improve ss

This commit is contained in:
ginuerzh
2021-11-01 21:57:28 +08:00
parent e2995ece96
commit ec8615991b
71 changed files with 554 additions and 316 deletions

143
pkg/listener/ws/listener.go Normal file
View File

@ -0,0 +1,143 @@
package ws
import (
"crypto/tls"
"net"
"net/http"
"github.com/go-gost/gost/pkg/internal/utils"
"github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata"
"github.com/go-gost/gost/pkg/registry"
"github.com/gorilla/websocket"
)
func init() {
registry.RegisterListener("ws", NewListener)
registry.RegisterListener("wss", NewListener)
}
type wsListener struct {
saddr string
md metadata
addr net.Addr
upgrader *websocket.Upgrader
srv *http.Server
connChan chan net.Conn
errChan chan error
logger logger.Logger
}
func NewListener(opts ...listener.Option) listener.Listener {
options := &listener.Options{}
for _, opt := range opts {
opt(options)
}
return &wsListener{
saddr: options.Addr,
logger: options.Logger,
}
}
func (l *wsListener) Init(md md.Metadata) (err error) {
if err = l.parseMetadata(md); err != nil {
return
}
l.upgrader = &websocket.Upgrader{
HandshakeTimeout: l.md.handshakeTimeout,
ReadBufferSize: l.md.readBufferSize,
WriteBufferSize: l.md.writeBufferSize,
CheckOrigin: func(r *http.Request) bool { return true },
EnableCompression: l.md.enableCompression,
}
path := l.md.path
if path == "" {
path = defaultPath
}
mux := http.NewServeMux()
mux.Handle(path, http.HandlerFunc(l.upgrade))
l.srv = &http.Server{
Addr: l.saddr,
TLSConfig: l.md.tlsConfig,
Handler: mux,
ReadHeaderTimeout: l.md.readHeaderTimeout,
}
queueSize := l.md.connQueueSize
if queueSize <= 0 {
queueSize = defaultQueueSize
}
l.connChan = make(chan net.Conn, queueSize)
l.errChan = make(chan error, 1)
ln, err := net.Listen("tcp", l.saddr)
if err != nil {
return
}
if l.md.tlsConfig != nil {
ln = tls.NewListener(ln, l.md.tlsConfig)
}
l.addr = ln.Addr()
go func() {
err := l.srv.Serve(ln)
if err != nil {
l.errChan <- err
}
close(l.errChan)
}()
return
}
func (l *wsListener) Accept() (conn net.Conn, err error) {
var ok bool
select {
case conn = <-l.connChan:
case err, ok = <-l.errChan:
if !ok {
err = listener.ErrClosed
}
}
return
}
func (l *wsListener) Close() error {
return l.srv.Close()
}
func (l *wsListener) Addr() net.Addr {
return l.addr
}
func (l *wsListener) parseMetadata(md md.Metadata) (err error) {
l.md.tlsConfig, err = utils.LoadTLSConfig(
md.GetString(certFile),
md.GetString(keyFile),
md.GetString(caFile),
)
if err != nil {
return
}
return
}
func (l *wsListener) upgrade(w http.ResponseWriter, r *http.Request) {
conn, err := l.upgrader.Upgrade(w, r, l.md.responseHeader)
if err != nil {
l.logger.Error(err)
return
}
select {
case l.connChan <- utils.WebsocketServerConn(conn):
default:
conn.Close()
l.logger.Warn("connection queue is full")
}
}

View File

@ -0,0 +1,38 @@
package ws
import (
"crypto/tls"
"net/http"
"time"
)
const (
path = "path"
certFile = "certFile"
keyFile = "keyFile"
caFile = "caFile"
handshakeTimeout = "handshakeTimeout"
readHeaderTimeout = "readHeaderTimeout"
readBufferSize = "readBufferSize"
writeBufferSize = "writeBufferSize"
enableCompression = "enableCompression"
responseHeader = "responseHeader"
connQueueSize = "connQueueSize"
)
const (
defaultPath = "/ws"
defaultQueueSize = 128
)
type metadata struct {
path string
tlsConfig *tls.Config
handshakeTimeout time.Duration
readHeaderTimeout time.Duration
readBufferSize int
writeBufferSize int
enableCompression bool
responseHeader http.Header
connQueueSize int
}

View File

@ -0,0 +1,177 @@
package mux
import (
"crypto/tls"
"net"
"net/http"
"github.com/go-gost/gost/pkg/internal/utils"
"github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata"
"github.com/go-gost/gost/pkg/registry"
"github.com/gorilla/websocket"
"github.com/xtaci/smux"
)
func init() {
registry.RegisterListener("mws", NewListener)
registry.RegisterListener("mwss", NewListener)
}
type mwsListener struct {
saddr string
md metadata
addr net.Addr
upgrader *websocket.Upgrader
srv *http.Server
connChan chan net.Conn
errChan chan error
logger logger.Logger
}
func NewListener(opts ...listener.Option) listener.Listener {
options := &listener.Options{}
for _, opt := range opts {
opt(options)
}
return &mwsListener{
logger: options.Logger,
}
}
func (l *mwsListener) Init(md md.Metadata) (err error) {
if err = l.parseMetadata(md); err != nil {
return
}
l.upgrader = &websocket.Upgrader{
HandshakeTimeout: l.md.handshakeTimeout,
ReadBufferSize: l.md.readBufferSize,
WriteBufferSize: l.md.writeBufferSize,
CheckOrigin: func(r *http.Request) bool { return true },
EnableCompression: l.md.enableCompression,
}
path := l.md.path
if path == "" {
path = defaultPath
}
mux := http.NewServeMux()
mux.Handle(path, http.HandlerFunc(l.upgrade))
l.srv = &http.Server{
Addr: l.saddr,
TLSConfig: l.md.tlsConfig,
Handler: mux,
ReadHeaderTimeout: l.md.readHeaderTimeout,
}
l.connChan = make(chan net.Conn, l.md.connQueueSize)
l.errChan = make(chan error, 1)
ln, err := net.Listen("tcp", l.saddr)
if err != nil {
return
}
if l.md.tlsConfig != nil {
ln = tls.NewListener(ln, l.md.tlsConfig)
}
l.addr = ln.Addr()
go func() {
err := l.srv.Serve(ln)
if err != nil {
l.errChan <- err
}
close(l.errChan)
}()
return
}
func (l *mwsListener) Accept() (conn net.Conn, err error) {
var ok bool
select {
case conn = <-l.connChan:
case err, ok = <-l.errChan:
if !ok {
err = listener.ErrClosed
}
}
return
}
func (l *mwsListener) Close() error {
return l.srv.Close()
}
func (l *mwsListener) Addr() net.Addr {
return l.addr
}
func (l *mwsListener) parseMetadata(md md.Metadata) (err error) {
l.md.tlsConfig, err = utils.LoadTLSConfig(
md.GetString(certFile),
md.GetString(keyFile),
md.GetString(caFile),
)
if err != nil {
return
}
return
}
func (l *mwsListener) upgrade(w http.ResponseWriter, r *http.Request) {
conn, err := l.upgrader.Upgrade(w, r, l.md.responseHeader)
if err != nil {
l.logger.Error(err)
return
}
l.mux(utils.WebsocketServerConn(conn))
}
func (l *mwsListener) mux(conn net.Conn) {
smuxConfig := smux.DefaultConfig()
smuxConfig.KeepAliveDisabled = l.md.muxKeepAliveDisabled
if l.md.muxKeepAlivePeriod > 0 {
smuxConfig.KeepAliveInterval = l.md.muxKeepAlivePeriod
}
if l.md.muxKeepAliveTimeout > 0 {
smuxConfig.KeepAliveTimeout = l.md.muxKeepAliveTimeout
}
if l.md.muxMaxFrameSize > 0 {
smuxConfig.MaxFrameSize = l.md.muxMaxFrameSize
}
if l.md.muxMaxReceiveBuffer > 0 {
smuxConfig.MaxReceiveBuffer = l.md.muxMaxReceiveBuffer
}
if l.md.muxMaxStreamBuffer > 0 {
smuxConfig.MaxStreamBuffer = l.md.muxMaxStreamBuffer
}
session, err := smux.Server(conn, smuxConfig)
if err != nil {
l.logger.Error(err)
return
}
defer session.Close()
for {
stream, err := session.AcceptStream()
if err != nil {
l.logger.Error("accept stream:", err)
return
}
select {
case l.connChan <- stream:
case <-stream.GetDieCh():
stream.Close()
default:
stream.Close()
l.logger.Error("connection queue is full")
}
}
}

View File

@ -0,0 +1,52 @@
package mux
import (
"crypto/tls"
"net/http"
"time"
)
const (
path = "path"
certFile = "certFile"
keyFile = "keyFile"
caFile = "caFile"
handshakeTimeout = "handshakeTimeout"
readHeaderTimeout = "readHeaderTimeout"
readBufferSize = "readBufferSize"
writeBufferSize = "writeBufferSize"
enableCompression = "enableCompression"
responseHeader = "responseHeader"
connQueueSize = "connQueueSize"
muxKeepAliveDisabled = "muxKeepAliveDisabled"
muxKeepAlivePeriod = "muxKeepAlivePeriod"
muxKeepAliveTimeout = "muxKeepAliveTimeout"
muxMaxFrameSize = "muxMaxFrameSize"
muxMaxReceiveBuffer = "muxMaxReceiveBuffer"
muxMaxStreamBuffer = "muxMaxStreamBuffer"
)
const (
defaultPath = "/ws"
defaultQueueSize = 128
)
type metadata struct {
path string
tlsConfig *tls.Config
handshakeTimeout time.Duration
readHeaderTimeout time.Duration
readBufferSize int
writeBufferSize int
enableCompression bool
responseHeader http.Header
muxKeepAliveDisabled bool
muxKeepAlivePeriod time.Duration
muxKeepAliveTimeout time.Duration
muxMaxFrameSize int
muxMaxReceiveBuffer int
muxMaxStreamBuffer int
connQueueSize int
}