From ce3d62759a3eaddfece6d907bff3c3b00e040f0d Mon Sep 17 00:00:00 2001 From: ginuerzh Date: Mon, 15 Nov 2021 12:55:05 +0800 Subject: [PATCH] add rtcp --- cmd/gost/config.go | 5 +- cmd/gost/gost.yml | 50 +++++- cmd/gost/register.go | 1 + pkg/{handler => chain}/router.go | 7 +- pkg/config/config.go | 5 +- pkg/connector/http/connector.go | 20 --- pkg/connector/http/metadata.go | 32 +++- pkg/connector/socks/v4/connector.go | 11 -- pkg/connector/socks/v4/metadata.go | 22 ++- pkg/connector/socks/v5/connector.go | 18 --- pkg/connector/socks/v5/metadata.go | 29 +++- pkg/handler/forward/local/handler.go | 8 +- pkg/handler/forward/local/metadata.go | 16 +- pkg/handler/http/handler.go | 35 +---- pkg/handler/http/metadata.go | 51 +++++- pkg/handler/socks/v4/handler.go | 19 +-- pkg/handler/socks/v4/metadata.go | 29 +++- pkg/handler/socks/v5/bind.go | 23 ++- pkg/handler/socks/v5/connect.go | 3 +- pkg/handler/socks/v5/handler.go | 24 +-- pkg/handler/socks/v5/mbind.go | 29 +++- pkg/handler/socks/v5/metadata.go | 44 +++--- pkg/handler/socks/v5/udp.go | 14 +- pkg/handler/socks/v5/udp_tun.go | 11 +- pkg/handler/ss/handler.go | 2 +- pkg/handler/ss/udp.go | 6 +- pkg/internal/utils/mux/mux.go | 40 +++-- pkg/listener/kcp/listener.go | 5 +- pkg/listener/kcp/metadata.go | 14 +- pkg/listener/option.go | 8 + pkg/listener/rtcp/conn.go | 17 ++ pkg/listener/rtcp/listener.go | 214 ++++++++++++++++++++++++++ pkg/listener/rtcp/metadata.go | 35 +++++ pkg/listener/rtcp/mux.go | 109 +++++++++++++ pkg/listener/tcp/listener.go | 7 - pkg/listener/tcp/metadata.go | 19 ++- pkg/listener/udp/listener.go | 54 ++----- pkg/listener/udp/metadata.go | 43 +++++- 38 files changed, 788 insertions(+), 291 deletions(-) rename pkg/{handler => chain}/router.go (92%) create mode 100644 pkg/listener/rtcp/conn.go create mode 100644 pkg/listener/rtcp/listener.go create mode 100644 pkg/listener/rtcp/metadata.go create mode 100644 pkg/listener/rtcp/mux.go diff --git a/cmd/gost/config.go b/cmd/gost/config.go index b2e8137..f587376 100644 --- a/cmd/gost/config.go +++ b/cmd/gost/config.go @@ -47,6 +47,7 @@ func buildService(cfg *config.Config) (services []*service.Service) { }) ln := registry.GetListener(svc.Listener.Type)( listener.AddrOption(svc.Addr), + listener.ChainOption(chains[svc.Listener.Chain]), listener.LoggerOption(listenerLogger), ) if err := ln.Init(metadata.MapMetadata(svc.Listener.Metadata)); err != nil { @@ -59,8 +60,8 @@ func buildService(cfg *config.Config) (services []*service.Service) { }) h := registry.GetHandler(svc.Handler.Type)( - handler.ChainOption(chains[svc.Chain]), - handler.BypassOption(bypasses[svc.Bypass]), + handler.ChainOption(chains[svc.Handler.Chain]), + handler.BypassOption(bypasses[svc.Handler.Bypass]), handler.LoggerOption(handlerLogger), ) diff --git a/cmd/gost/gost.yml b/cmd/gost/gost.yml index 4709e92..4fa4648 100644 --- a/cmd/gost/gost.yml +++ b/cmd/gost/gost.yml @@ -13,6 +13,8 @@ services: addr: ":28000" handler: type: http + chain: chain01 + # bypass: bypass01 metadata: proxyAgent: "gost/3.0" retry: 3 @@ -25,13 +27,13 @@ services: type: tcp metadata: keepAlive: 15s - chain: chain01 - # bypass: bypass01 - name: ss url: "ss://chacha20:gost@:8000" addr: ":28338" handler: type: ss + # chain: chain01 + # bypass: bypass01 metadata: method: chacha20-ietf password: gost @@ -43,26 +45,26 @@ services: type: tcp metadata: keepAlive: 15s - # chain: chain01 - # bypass: bypass01 - name: socks5 url: "socks5://gost:gost@:1080" addr: ":21080" handler: type: socks5 + # chain: chain-ss + # bypass: bypass01 metadata: auths: - gost:gost readTimeout: 5s retry: 3 notls: true + bind: true + udp: true # udpBufferSize: 4096 # range [512, 66560] listener: type: tcp metadata: keepAlive: 15s - chain: chain-ss - # bypass: bypass01 - name: socks5+tcp url: "socks5://gost:gost@:1080" addr: ":21081" @@ -93,6 +95,7 @@ services: failTimeout: 30s handler: type: forward + chain: chain-ss metadata: readTimeout: 5s retry: 3 @@ -100,7 +103,38 @@ services: type: udp metadata: keepAlive: 15s - chain: chain-ss + +- name: kcp-forward-tunnel + addr: ":8388" + forwarder: + targets: + - 127.0.0.1:28338 + handler: + type: forward + metadata: + readTimeout: 5s + retry: 3 + listener: + type: kcp + metadata: + keepAlive: 15s + +- name: rtcp + addr: ":28100" + forwarder: + targets: + - 192.168.8.8:80 + handler: + type: forward + metadata: + readTimeout: 5s + retry: 3 + listener: + type: rtcp + chain: chain-socks5 + metadata: + keepAlive: 15s + mux: true chains: - name: chain01 @@ -179,7 +213,7 @@ chains: - name: hop01 nodes: - name: node01 - addr: ":11080" + addr: ":21080" url: "http://gost:gost@:8081" # bypass: bypass01 connector: diff --git a/cmd/gost/register.go b/cmd/gost/register.go index a2ca242..a87682b 100644 --- a/cmd/gost/register.go +++ b/cmd/gost/register.go @@ -26,6 +26,7 @@ import ( _ "github.com/go-gost/gost/pkg/listener/obfs/http" _ "github.com/go-gost/gost/pkg/listener/obfs/tls" _ "github.com/go-gost/gost/pkg/listener/quic" + _ "github.com/go-gost/gost/pkg/listener/rtcp" _ "github.com/go-gost/gost/pkg/listener/tcp" _ "github.com/go-gost/gost/pkg/listener/tls" _ "github.com/go-gost/gost/pkg/listener/tls/mux" diff --git a/pkg/handler/router.go b/pkg/chain/router.go similarity index 92% rename from pkg/handler/router.go rename to pkg/chain/router.go index 4474e7b..49dfc55 100644 --- a/pkg/handler/router.go +++ b/pkg/chain/router.go @@ -1,4 +1,4 @@ -package handler +package chain import ( "bytes" @@ -6,17 +6,16 @@ import ( "fmt" "net" - "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/logger" ) type Router struct { - chain *chain.Chain + chain *Chain retries int logger logger.Logger } -func (r *Router) WithChain(chain *chain.Chain) *Router { +func (r *Router) WithChain(chain *Chain) *Router { r.chain = chain return r } diff --git a/pkg/config/config.go b/pkg/config/config.go index a4c72b9..96776d6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -42,11 +42,14 @@ type BypassConfig struct { } type ListenerConfig struct { Type string + Chain string Metadata map[string]interface{} } type HandlerConfig struct { Type string + Chain string + Bypass string Metadata map[string]interface{} } @@ -72,8 +75,6 @@ type ServiceConfig struct { Listener *ListenerConfig Handler *HandlerConfig Forwarder *ForwarderConfig - Chain string - Bypass string } type ChainConfig struct { diff --git a/pkg/connector/http/connector.go b/pkg/connector/http/connector.go index 27de560..8df23b0 100644 --- a/pkg/connector/http/connector.go +++ b/pkg/connector/http/connector.go @@ -9,7 +9,6 @@ import ( "net/http" "net/http/httputil" "net/url" - "strings" "time" "github.com/go-gost/gost/pkg/connector" @@ -111,22 +110,3 @@ func (c *httpConnector) Connect(ctx context.Context, conn net.Conn, network, add return conn, nil } - -func (c *httpConnector) parseMetadata(md md.Metadata) (err error) { - c.md.connectTimeout = md.GetDuration(connectTimeout) - c.md.UserAgent, _ = md.Get(userAgent).(string) - if c.md.UserAgent == "" { - c.md.UserAgent = defaultUserAgent - } - - if v := md.GetString(auth); v != "" { - ss := strings.SplitN(v, ":", 2) - if len(ss) == 1 { - c.md.User = url.User(ss[0]) - } else { - c.md.User = url.UserPassword(ss[0], ss[1]) - } - } - - return -} diff --git a/pkg/connector/http/metadata.go b/pkg/connector/http/metadata.go index b3b6741..5033fcb 100644 --- a/pkg/connector/http/metadata.go +++ b/pkg/connector/http/metadata.go @@ -2,13 +2,10 @@ package http import ( "net/url" + "strings" "time" -) -const ( - connectTimeout = "timeout" - userAgent = "userAgent" - auth = "auth" + md "github.com/go-gost/gost/pkg/metadata" ) const ( @@ -20,3 +17,28 @@ type metadata struct { UserAgent string User *url.Userinfo } + +func (c *httpConnector) parseMetadata(md md.Metadata) (err error) { + const ( + connectTimeout = "timeout" + userAgent = "userAgent" + auth = "auth" + ) + + c.md.connectTimeout = md.GetDuration(connectTimeout) + c.md.UserAgent, _ = md.Get(userAgent).(string) + if c.md.UserAgent == "" { + c.md.UserAgent = defaultUserAgent + } + + if v := md.GetString(auth); v != "" { + ss := strings.SplitN(v, ":", 2) + if len(ss) == 1 { + c.md.User = url.User(ss[0]) + } else { + c.md.User = url.UserPassword(ss[0], ss[1]) + } + } + + return +} diff --git a/pkg/connector/socks/v4/connector.go b/pkg/connector/socks/v4/connector.go index 512d6bd..124fb79 100644 --- a/pkg/connector/socks/v4/connector.go +++ b/pkg/connector/socks/v4/connector.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net" - "net/url" "strconv" "time" @@ -114,13 +113,3 @@ func (c *socks4Connector) Connect(ctx context.Context, conn net.Conn, network, a return conn, nil } - -func (c *socks4Connector) parseMetadata(md md.Metadata) (err error) { - if v := md.GetString(auth); v != "" { - c.md.User = url.User(v) - } - c.md.connectTimeout = md.GetDuration(connectTimeout) - c.md.disable4a = md.GetBool(disable4a) - - return -} diff --git a/pkg/connector/socks/v4/metadata.go b/pkg/connector/socks/v4/metadata.go index a73efed..41361c9 100644 --- a/pkg/connector/socks/v4/metadata.go +++ b/pkg/connector/socks/v4/metadata.go @@ -3,12 +3,8 @@ package v4 import ( "net/url" "time" -) -const ( - connectTimeout = "timeout" - auth = "auth" - disable4a = "disable4a" + md "github.com/go-gost/gost/pkg/metadata" ) type metadata struct { @@ -16,3 +12,19 @@ type metadata struct { User *url.Userinfo disable4a bool } + +func (c *socks4Connector) parseMetadata(md md.Metadata) (err error) { + const ( + connectTimeout = "timeout" + auth = "auth" + disable4a = "disable4a" + ) + + if v := md.GetString(auth); v != "" { + c.md.User = url.User(v) + } + c.md.connectTimeout = md.GetDuration(connectTimeout) + c.md.disable4a = md.GetBool(disable4a) + + return +} diff --git a/pkg/connector/socks/v5/connector.go b/pkg/connector/socks/v5/connector.go index dc38f04..a5dd0e8 100644 --- a/pkg/connector/socks/v5/connector.go +++ b/pkg/connector/socks/v5/connector.go @@ -6,8 +6,6 @@ import ( "errors" "fmt" "net" - "net/url" - "strings" "time" "github.com/go-gost/gosocks5" @@ -172,19 +170,3 @@ func (c *socks5Connector) connectUDP(ctx context.Context, conn net.Conn, network return socks.UDPTunClientConn(conn, addr), nil } - -func (c *socks5Connector) parseMetadata(md md.Metadata) (err error) { - if v := md.GetString(auth); v != "" { - ss := strings.SplitN(v, ":", 2) - if len(ss) == 1 { - c.md.User = url.User(ss[0]) - } else { - c.md.User = url.UserPassword(ss[0], ss[1]) - } - } - - c.md.connectTimeout = md.GetDuration(connectTimeout) - c.md.noTLS = md.GetBool(noTLS) - - return -} diff --git a/pkg/connector/socks/v5/metadata.go b/pkg/connector/socks/v5/metadata.go index 362c9af..336130f 100644 --- a/pkg/connector/socks/v5/metadata.go +++ b/pkg/connector/socks/v5/metadata.go @@ -3,13 +3,10 @@ package v5 import ( "crypto/tls" "net/url" + "strings" "time" -) -const ( - connectTimeout = "timeout" - auth = "auth" - noTLS = "notls" + md "github.com/go-gost/gost/pkg/metadata" ) type metadata struct { @@ -18,3 +15,25 @@ type metadata struct { tlsConfig *tls.Config noTLS bool } + +func (c *socks5Connector) parseMetadata(md md.Metadata) (err error) { + const ( + connectTimeout = "timeout" + auth = "auth" + noTLS = "notls" + ) + + if v := md.GetString(auth); v != "" { + ss := strings.SplitN(v, ":", 2) + if len(ss) == 1 { + c.md.User = url.User(ss[0]) + } else { + c.md.User = url.UserPassword(ss[0], ss[1]) + } + } + + c.md.connectTimeout = md.GetDuration(connectTimeout) + c.md.noTLS = md.GetBool(noTLS) + + return +} diff --git a/pkg/handler/forward/local/handler.go b/pkg/handler/forward/local/handler.go index c68768e..ef55200 100644 --- a/pkg/handler/forward/local/handler.go +++ b/pkg/handler/forward/local/handler.go @@ -75,7 +75,7 @@ func (h *localForwardHandler) Handle(ctx context.Context, conn net.Conn) { h.logger.Infof("%s >> %s", conn.RemoteAddr(), target.Addr()) - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) @@ -105,9 +105,3 @@ func (h *localForwardHandler) Handle(ctx context.Context, conn net.Conn) { }). Infof("%s >-< %s", conn.RemoteAddr(), target.Addr()) } - -func (h *localForwardHandler) parseMetadata(md md.Metadata) (err error) { - h.md.readTimeout = md.GetDuration(readTimeout) - h.md.retryCount = md.GetInt(retryCount) - return -} diff --git a/pkg/handler/forward/local/metadata.go b/pkg/handler/forward/local/metadata.go index 617a55b..b9e2c3a 100644 --- a/pkg/handler/forward/local/metadata.go +++ b/pkg/handler/forward/local/metadata.go @@ -2,14 +2,22 @@ package local import ( "time" -) -const ( - readTimeout = "readTimeout" - retryCount = "retry" + md "github.com/go-gost/gost/pkg/metadata" ) type metadata struct { readTimeout time.Duration retryCount int } + +func (h *localForwardHandler) parseMetadata(md md.Metadata) (err error) { + const ( + readTimeout = "readTimeout" + retryCount = "retry" + ) + + h.md.readTimeout = md.GetDuration(readTimeout) + h.md.retryCount = md.GetInt(retryCount) + return +} diff --git a/pkg/handler/http/handler.go b/pkg/handler/http/handler.go index 1900be3..057d72e 100644 --- a/pkg/handler/http/handler.go +++ b/pkg/handler/http/handler.go @@ -15,7 +15,6 @@ import ( "strings" "time" - "github.com/go-gost/gost/pkg/auth" "github.com/go-gost/gost/pkg/bypass" "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/handler" @@ -52,38 +51,6 @@ func (h *httpHandler) Init(md md.Metadata) error { return h.parseMetadata(md) } -func (h *httpHandler) parseMetadata(md md.Metadata) error { - h.md.proxyAgent = md.GetString(proxyAgentKey) - - if v, _ := md.Get(authsKey).([]interface{}); len(v) > 0 { - authenticator := auth.NewLocalAuthenticator(nil) - for _, auth := range v { - if s, _ := auth.(string); s != "" { - ss := strings.SplitN(s, ":", 2) - if len(ss) == 1 { - authenticator.Add(ss[0], "") - } else { - authenticator.Add(ss[0], ss[1]) - } - } - } - h.md.authenticator = authenticator - } - - if v := md.GetString(probeResistKey); v != "" { - if ss := strings.SplitN(v, ":", 2); len(ss) == 2 { - h.md.probeResist = &probeResist{ - Type: ss[0], - Value: ss[1], - Knock: md.GetString(knockKey), - } - } - } - h.md.retryCount = md.GetInt(retryCount) - - return nil -} - func (h *httpHandler) Handle(ctx context.Context, conn net.Conn) { defer conn.Close() @@ -209,7 +176,7 @@ func (h *httpHandler) handleRequest(ctx context.Context, conn net.Conn, req *htt req.Header.Del("Proxy-Authorization") - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) diff --git a/pkg/handler/http/metadata.go b/pkg/handler/http/metadata.go index 321fdfb..f7ad9c3 100644 --- a/pkg/handler/http/metadata.go +++ b/pkg/handler/http/metadata.go @@ -1,13 +1,10 @@ package http -import "github.com/go-gost/gost/pkg/auth" +import ( + "strings" -const ( - proxyAgentKey = "proxyAgent" - authsKey = "auths" - probeResistKey = "probeResist" - knockKey = "knock" - retryCount = "retry" + "github.com/go-gost/gost/pkg/auth" + md "github.com/go-gost/gost/pkg/metadata" ) type metadata struct { @@ -17,6 +14,46 @@ type metadata struct { probeResist *probeResist } +func (h *httpHandler) parseMetadata(md md.Metadata) error { + const ( + proxyAgentKey = "proxyAgent" + authsKey = "auths" + probeResistKey = "probeResist" + knockKey = "knock" + retryCount = "retry" + ) + + h.md.proxyAgent = md.GetString(proxyAgentKey) + + if v, _ := md.Get(authsKey).([]interface{}); len(v) > 0 { + authenticator := auth.NewLocalAuthenticator(nil) + for _, auth := range v { + if s, _ := auth.(string); s != "" { + ss := strings.SplitN(s, ":", 2) + if len(ss) == 1 { + authenticator.Add(ss[0], "") + } else { + authenticator.Add(ss[0], ss[1]) + } + } + } + h.md.authenticator = authenticator + } + + if v := md.GetString(probeResistKey); v != "" { + if ss := strings.SplitN(v, ":", 2); len(ss) == 2 { + h.md.probeResist = &probeResist{ + Type: ss[0], + Value: ss[1], + Knock: md.GetString(knockKey), + } + } + } + h.md.retryCount = md.GetInt(retryCount) + + return nil +} + type probeResist struct { Type string Value string diff --git a/pkg/handler/socks/v4/handler.go b/pkg/handler/socks/v4/handler.go index 73b26d9..f6f71c2 100644 --- a/pkg/handler/socks/v4/handler.go +++ b/pkg/handler/socks/v4/handler.go @@ -6,7 +6,6 @@ import ( "time" "github.com/go-gost/gosocks4" - "github.com/go-gost/gost/pkg/auth" "github.com/go-gost/gost/pkg/bypass" "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/handler" @@ -108,7 +107,7 @@ func (h *socks4Handler) handleConnect(ctx context.Context, conn net.Conn, req *g return } - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) @@ -142,19 +141,3 @@ func (h *socks4Handler) handleConnect(ctx context.Context, conn net.Conn, req *g func (h *socks4Handler) handleBind(ctx context.Context, conn net.Conn, req *gosocks4.Request) { // TODO: bind } - -func (h *socks4Handler) parseMetadata(md md.Metadata) (err error) { - if v, _ := md.Get(authsKey).([]interface{}); len(v) > 0 { - authenticator := auth.NewLocalAuthenticator(nil) - for _, auth := range v { - if v, _ := auth.(string); v != "" { - authenticator.Add(v, "") - } - } - h.md.authenticator = authenticator - } - - h.md.readTimeout = md.GetDuration(readTimeout) - h.md.retryCount = md.GetInt(retryCount) - return -} diff --git a/pkg/handler/socks/v4/metadata.go b/pkg/handler/socks/v4/metadata.go index a32b5d7..8b38166 100644 --- a/pkg/handler/socks/v4/metadata.go +++ b/pkg/handler/socks/v4/metadata.go @@ -4,12 +4,7 @@ import ( "time" "github.com/go-gost/gost/pkg/auth" -) - -const ( - authsKey = "auths" - readTimeout = "readTimeout" - retryCount = "retry" + md "github.com/go-gost/gost/pkg/metadata" ) type metadata struct { @@ -17,3 +12,25 @@ type metadata struct { readTimeout time.Duration retryCount int } + +func (h *socks4Handler) parseMetadata(md md.Metadata) (err error) { + const ( + authsKey = "auths" + readTimeout = "readTimeout" + retryCount = "retry" + ) + + if v, _ := md.Get(authsKey).([]interface{}); len(v) > 0 { + authenticator := auth.NewLocalAuthenticator(nil) + for _, auth := range v { + if v, _ := auth.(string); v != "" { + authenticator.Add(v, "") + } + } + h.md.authenticator = authenticator + } + + h.md.readTimeout = md.GetDuration(readTimeout) + h.md.retryCount = md.GetInt(retryCount) + return +} diff --git a/pkg/handler/socks/v5/bind.go b/pkg/handler/socks/v5/bind.go index 4ae5003..f72010d 100644 --- a/pkg/handler/socks/v5/bind.go +++ b/pkg/handler/socks/v5/bind.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-gost/gosocks5" + "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/handler" ) @@ -19,12 +20,20 @@ func (h *socks5Handler) handleBind(ctx context.Context, conn net.Conn, req *goso h.logger.Infof("%s >> %s", conn.RemoteAddr(), addr) + if !h.md.enableBind { + reply := gosocks5.NewReply(gosocks5.NotAllowed, nil) + reply.Write(conn) + h.logger.Debug(reply) + h.logger.Error("BIND is diabled") + return + } + if h.chain.IsEmpty() { h.bindLocal(ctx, conn, addr) return } - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) @@ -83,7 +92,7 @@ func (h *socks5Handler) bindLocal(ctx context.Context, conn net.Conn, addr strin h.logger = h.logger.WithFields(map[string]interface{}{ "bind": socksAddr.String(), }) - h.logger.Infof("bind on %s OK", socksAddr.String()) + h.logger.Debugf("bind on %s OK", &socksAddr) h.serveBind(ctx, conn, ln) } @@ -127,10 +136,19 @@ func (h *socks5Handler) serveBind(ctx context.Context, conn net.Conn, ln net.Lis case err := <-accept(): if err != nil { h.logger.Error(err) + + reply := gosocks5.NewReply(gosocks5.Failure, nil) + if err := reply.Write(pc2); err != nil { + h.logger.Error(err) + } + h.logger.Debug(reply) + return } defer rc.Close() + h.logger.Debugf("peer %s accepted", rc.RemoteAddr()) + raddr := gosocks5.Addr{} raddr.ParseFrom(rc.RemoteAddr().String()) reply := gosocks5.NewReply(gosocks5.Succeeded, &raddr) @@ -138,7 +156,6 @@ func (h *socks5Handler) serveBind(ctx context.Context, conn net.Conn, ln net.Lis h.logger.Error(err) } h.logger.Debug(reply) - h.logger.Infof("peer accepted: %s", raddr.String()) start := time.Now() h.logger.Infof("%s <-> %s", conn.RemoteAddr(), raddr.String()) diff --git a/pkg/handler/socks/v5/connect.go b/pkg/handler/socks/v5/connect.go index 8752516..1723360 100644 --- a/pkg/handler/socks/v5/connect.go +++ b/pkg/handler/socks/v5/connect.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-gost/gosocks5" + "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/handler" ) @@ -24,7 +25,7 @@ func (h *socks5Handler) handleConnect(ctx context.Context, conn net.Conn, addr s return } - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) diff --git a/pkg/handler/socks/v5/handler.go b/pkg/handler/socks/v5/handler.go index 324801f..3015712 100644 --- a/pkg/handler/socks/v5/handler.go +++ b/pkg/handler/socks/v5/handler.go @@ -90,29 +90,13 @@ func (h *socks5Handler) Handle(ctx context.Context, conn net.Conn) { case gosocks5.CmdConnect: h.handleConnect(ctx, conn, req.Addr.String()) case gosocks5.CmdBind: - if h.md.enableBind { - h.handleBind(ctx, conn, req) - } else { - h.logger.Error("BIND is diabled") - } + h.handleBind(ctx, conn, req) case socks.CmdMuxBind: - if h.md.enableBind { - h.handleMuxBind(ctx, conn, req) - } else { - h.logger.Error("BIND is diabled") - } + h.handleMuxBind(ctx, conn, req) case gosocks5.CmdUdp: - if h.md.enableUDP { - h.handleUDP(ctx, conn, req) - } else { - h.logger.Error("UDP relay is diabled") - } + h.handleUDP(ctx, conn, req) case socks.CmdUDPTun: - if h.md.enableUDP { - h.handleUDPTun(ctx, conn, req) - } else { - h.logger.Error("UDP relay is diabled") - } + h.handleUDPTun(ctx, conn, req) default: h.logger.Errorf("unknown cmd: %d", req.Cmd) resp := gosocks5.NewReply(gosocks5.CmdUnsupported, nil) diff --git a/pkg/handler/socks/v5/mbind.go b/pkg/handler/socks/v5/mbind.go index ca576c9..ebdca9a 100644 --- a/pkg/handler/socks/v5/mbind.go +++ b/pkg/handler/socks/v5/mbind.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-gost/gosocks5" + "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/handler" "github.com/go-gost/gost/pkg/internal/utils/mux" ) @@ -20,12 +21,20 @@ func (h *socks5Handler) handleMuxBind(ctx context.Context, conn net.Conn, req *g h.logger.Infof("%s >> %s", conn.RemoteAddr(), addr) + if !h.md.enableBind { + reply := gosocks5.NewReply(gosocks5.NotAllowed, nil) + reply.Write(conn) + h.logger.Debug(reply) + h.logger.Error("BIND is diabled") + return + } + if h.chain.IsEmpty() { h.muxBindLocal(ctx, conn, addr) return } - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) @@ -90,14 +99,14 @@ func (h *socks5Handler) muxBindLocal(ctx context.Context, conn net.Conn, addr st h.logger = h.logger.WithFields(map[string]interface{}{ "bind": socksAddr.String(), }) - h.logger.Infof("bind on: %s OK", socksAddr.String()) + h.logger.Debugf("bind on %s OK", &socksAddr) h.serveMuxBind(ctx, conn, ln) } func (h *socks5Handler) serveMuxBind(ctx context.Context, conn net.Conn, ln net.Listener) { // Upgrade connection to multiplex stream. - session, err := mux.NewMuxSession(conn) + session, err := mux.ClientSession(conn) if err != nil { h.logger.Error(err) return @@ -122,7 +131,7 @@ func (h *socks5Handler) serveMuxBind(ctx context.Context, conn net.Conn, ln net. h.logger.Error(err) return } - h.logger.Infof("peer accepted: %s", rc.RemoteAddr().String()) + h.logger.Debugf("peer %s accepted", rc.RemoteAddr()) go func(c net.Conn) { defer c.Close() @@ -134,6 +143,18 @@ func (h *socks5Handler) serveMuxBind(ctx context.Context, conn net.Conn, ln net. } defer sc.Close() + // incompatible with GOST v2.x + if !h.md.compatibilityMode { + addr := gosocks5.Addr{} + addr.ParseFrom(c.RemoteAddr().String()) + reply := gosocks5.NewReply(gosocks5.Succeeded, &addr) + if err := reply.Write(sc); err != nil { + h.logger.Error(err) + return + } + h.logger.Debug(reply) + } + t := time.Now() h.logger.Infof("%s <-> %s", conn.RemoteAddr(), c.RemoteAddr().String()) handler.Transport(sc, c) diff --git a/pkg/handler/socks/v5/metadata.go b/pkg/handler/socks/v5/metadata.go index 2a1f8f5..9036a62 100644 --- a/pkg/handler/socks/v5/metadata.go +++ b/pkg/handler/socks/v5/metadata.go @@ -11,30 +11,32 @@ import ( ) type metadata struct { - tlsConfig *tls.Config - authenticator auth.Authenticator - timeout time.Duration - readTimeout time.Duration - retryCount int - noTLS bool - enableBind bool - enableUDP bool - udpBufferSize int + tlsConfig *tls.Config + authenticator auth.Authenticator + timeout time.Duration + readTimeout time.Duration + retryCount int + noTLS bool + enableBind bool + enableUDP bool + udpBufferSize int + compatibilityMode bool } func (h *socks5Handler) parseMetadata(md md.Metadata) error { const ( - certFile = "certFile" - keyFile = "keyFile" - caFile = "caFile" - authsKey = "auths" - readTimeout = "readTimeout" - timeout = "timeout" - retryCount = "retry" - noTLS = "notls" - enableBind = "bind" - enableUDP = "udp" - udpBufferSize = "udpBufferSize" + certFile = "certFile" + keyFile = "keyFile" + caFile = "caFile" + authsKey = "auths" + readTimeout = "readTimeout" + timeout = "timeout" + retryCount = "retry" + noTLS = "notls" + enableBind = "bind" + enableUDP = "udp" + udpBufferSize = "udpBufferSize" + compatibilityMode = "comp" ) var err error @@ -81,5 +83,7 @@ func (h *socks5Handler) parseMetadata(md md.Metadata) error { h.md.udpBufferSize = 4096 // default buffer size } + h.md.compatibilityMode = md.GetBool(compatibilityMode) + return nil } diff --git a/pkg/handler/socks/v5/udp.go b/pkg/handler/socks/v5/udp.go index ec35246..807d5d5 100644 --- a/pkg/handler/socks/v5/udp.go +++ b/pkg/handler/socks/v5/udp.go @@ -9,7 +9,7 @@ import ( "time" "github.com/go-gost/gosocks5" - "github.com/go-gost/gost/pkg/handler" + "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/internal/bufpool" "github.com/go-gost/gost/pkg/internal/utils/socks" ) @@ -19,6 +19,14 @@ func (h *socks5Handler) handleUDP(ctx context.Context, conn net.Conn, req *gosoc "cmd": "udp", }) + if !h.md.enableUDP { + reply := gosocks5.NewReply(gosocks5.NotAllowed, nil) + reply.Write(conn) + h.logger.Debug(reply) + h.logger.Error("UDP relay is diabled") + return + } + relay, err := net.ListenUDP("udp", nil) if err != nil { h.logger.Error(err) @@ -43,7 +51,7 @@ func (h *socks5Handler) handleUDP(ctx context.Context, conn net.Conn, req *gosoc h.logger = h.logger.WithFields(map[string]interface{}{ "bind": saddr.String(), }) - h.logger.Infof("bind on %s OK", saddr.String()) + h.logger.Debugf("bind on %s OK", &saddr) if h.chain.IsEmpty() { // serve as standard socks5 udp relay. @@ -81,7 +89,7 @@ func (h *socks5Handler) handleUDP(ctx context.Context, conn net.Conn, req *gosoc } func (h *socks5Handler) getUDPTun(ctx context.Context) (conn net.Conn, err error) { - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) diff --git a/pkg/handler/socks/v5/udp_tun.go b/pkg/handler/socks/v5/udp_tun.go index dbd1891..abae6c6 100644 --- a/pkg/handler/socks/v5/udp_tun.go +++ b/pkg/handler/socks/v5/udp_tun.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-gost/gosocks5" + "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/handler" "github.com/go-gost/gost/pkg/internal/bufpool" "github.com/go-gost/gost/pkg/internal/utils/socks" @@ -16,6 +17,14 @@ func (h *socks5Handler) handleUDPTun(ctx context.Context, conn net.Conn, req *go "cmd": "udp-tun", }) + if !h.md.enableUDP { + reply := gosocks5.NewReply(gosocks5.NotAllowed, nil) + reply.Write(conn) + h.logger.Debug(reply) + h.logger.Error("UDP relay is diabled") + return + } + if h.chain.IsEmpty() { addr := req.Addr.String() @@ -56,7 +65,7 @@ func (h *socks5Handler) handleUDPTun(ctx context.Context, conn net.Conn, req *go return } - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) diff --git a/pkg/handler/ss/handler.go b/pkg/handler/ss/handler.go index a2f255a..00520f9 100644 --- a/pkg/handler/ss/handler.go +++ b/pkg/handler/ss/handler.go @@ -123,7 +123,7 @@ func (h *ssHandler) Handle(ctx context.Context, conn net.Conn) { return } - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) diff --git a/pkg/handler/ss/udp.go b/pkg/handler/ss/udp.go index 1553a1c..5f353a8 100644 --- a/pkg/handler/ss/udp.go +++ b/pkg/handler/ss/udp.go @@ -5,7 +5,7 @@ import ( "net" "time" - "github.com/go-gost/gost/pkg/handler" + "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/internal/bufpool" "github.com/go-gost/gost/pkg/internal/utils/socks" "github.com/go-gost/gost/pkg/internal/utils/ss" @@ -17,7 +17,7 @@ func (h *ssHandler) handleUDP(ctx context.Context, raddr net.Addr, conn net.Pack } // obtain a udp connection - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) @@ -51,7 +51,7 @@ func (h *ssHandler) handleUDP(ctx context.Context, raddr net.Addr, conn net.Pack func (h *ssHandler) handleUDPTun(ctx context.Context, conn net.Conn) { // obtain a udp connection - r := (&handler.Router{}). + r := (&chain.Router{}). WithChain(h.chain). WithRetry(h.md.retryCount). WithLogger(h.logger) diff --git a/pkg/internal/utils/mux/mux.go b/pkg/internal/utils/mux/mux.go index cd70aae..a3a6f8d 100644 --- a/pkg/internal/utils/mux/mux.go +++ b/pkg/internal/utils/mux/mux.go @@ -6,70 +6,80 @@ import ( smux "github.com/xtaci/smux" ) -type MuxSession struct { +type Session struct { conn net.Conn session *smux.Session } -func NewMuxSession(conn net.Conn) (*MuxSession, error) { - // Upgrade connection to multiplex stream. +func ClientSession(conn net.Conn) (*Session, error) { s, err := smux.Client(conn, smux.DefaultConfig()) if err != nil { return nil, err } - return &MuxSession{ + return &Session{ conn: conn, session: s, }, nil } -func (session *MuxSession) GetConn() (net.Conn, error) { +func ServerSession(conn net.Conn) (*Session, error) { + s, err := smux.Server(conn, smux.DefaultConfig()) + if err != nil { + return nil, err + } + return &Session{ + conn: conn, + session: s, + }, nil +} + +func (session *Session) GetConn() (net.Conn, error) { stream, err := session.session.OpenStream() if err != nil { return nil, err } - return &muxStreamConn{Conn: session.conn, stream: stream}, nil + return &StreamConn{Conn: session.conn, stream: stream}, nil } -func (session *MuxSession) Accept() (net.Conn, error) { +func (session *Session) Accept() (net.Conn, error) { stream, err := session.session.AcceptStream() if err != nil { return nil, err } - return &muxStreamConn{Conn: session.conn, stream: stream}, nil + return &StreamConn{Conn: session.conn, stream: stream}, nil } -func (session *MuxSession) Close() error { +func (session *Session) Close() error { if session.session == nil { return nil } return session.session.Close() } -func (session *MuxSession) IsClosed() bool { +func (session *Session) IsClosed() bool { if session.session == nil { return true } return session.session.IsClosed() } -func (session *MuxSession) NumStreams() int { +func (session *Session) NumStreams() int { return session.session.NumStreams() } -type muxStreamConn struct { +type StreamConn struct { net.Conn stream *smux.Stream } -func (c *muxStreamConn) Read(b []byte) (n int, err error) { +func (c *StreamConn) Read(b []byte) (n int, err error) { return c.stream.Read(b) } -func (c *muxStreamConn) Write(b []byte) (n int, err error) { +func (c *StreamConn) Write(b []byte) (n int, err error) { return c.stream.Write(b) } -func (c *muxStreamConn) Close() error { +func (c *StreamConn) Close() error { return c.stream.Close() } diff --git a/pkg/listener/kcp/listener.go b/pkg/listener/kcp/listener.go index 53e5c3c..09b7c16 100644 --- a/pkg/listener/kcp/listener.go +++ b/pkg/listener/kcp/listener.go @@ -48,6 +48,7 @@ func (l *kcpListener) Init(md md.Metadata) (err error) { config = DefaultConfig } config.Init() + l.md.config = config var ln *kcp.Listener @@ -168,7 +169,3 @@ func (l *kcpListener) mux(conn net.Conn) { } } } - -func (l *kcpListener) parseMetadata(md md.Metadata) (err error) { - return -} diff --git a/pkg/listener/kcp/metadata.go b/pkg/listener/kcp/metadata.go index 977c735..8b89741 100644 --- a/pkg/listener/kcp/metadata.go +++ b/pkg/listener/kcp/metadata.go @@ -1,8 +1,6 @@ package kcp -const ( - connQueueSize = "connQueueSize" -) +import md "github.com/go-gost/gost/pkg/metadata" const ( defaultQueueSize = 128 @@ -13,3 +11,13 @@ type metadata struct { connQueueSize int } + +func (l *kcpListener) parseMetadata(md md.Metadata) (err error) { + const ( + connQueueSize = "connQueueSize" + ) + + l.md.connQueueSize = md.GetInt(connQueueSize) + + return +} diff --git a/pkg/listener/option.go b/pkg/listener/option.go index ceb6b88..570dea5 100644 --- a/pkg/listener/option.go +++ b/pkg/listener/option.go @@ -1,11 +1,13 @@ package listener import ( + "github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/logger" ) type Options struct { Addr string + Chain *chain.Chain Logger logger.Logger } @@ -17,6 +19,12 @@ func AddrOption(addr string) Option { } } +func ChainOption(chain *chain.Chain) Option { + return func(opts *Options) { + opts.Chain = chain + } +} + func LoggerOption(logger logger.Logger) Option { return func(opts *Options) { opts.Logger = logger diff --git a/pkg/listener/rtcp/conn.go b/pkg/listener/rtcp/conn.go new file mode 100644 index 0000000..1bc27c4 --- /dev/null +++ b/pkg/listener/rtcp/conn.go @@ -0,0 +1,17 @@ +package rtcp + +import "net" + +type peerConn struct { + net.Conn + localAddr net.Addr + remoteAddr net.Addr +} + +func (c *peerConn) LocalAddr() net.Addr { + return c.localAddr +} + +func (c *peerConn) RemoteAddr() net.Addr { + return c.remoteAddr +} diff --git a/pkg/listener/rtcp/listener.go b/pkg/listener/rtcp/listener.go new file mode 100644 index 0000000..565f2d8 --- /dev/null +++ b/pkg/listener/rtcp/listener.go @@ -0,0 +1,214 @@ +package rtcp + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/go-gost/gosocks5" + "github.com/go-gost/gost/pkg/chain" + "github.com/go-gost/gost/pkg/internal/utils/mux" + "github.com/go-gost/gost/pkg/listener" + "github.com/go-gost/gost/pkg/logger" + md "github.com/go-gost/gost/pkg/metadata" + "github.com/go-gost/gost/pkg/registry" +) + +func init() { + registry.RegisterListener("rtcp", NewListener) +} + +type rtcpListener struct { + addr string + laddr net.Addr + chain *chain.Chain + md metadata + ln net.Listener + connChan chan net.Conn + session *mux.Session + sessionMux sync.Mutex + logger logger.Logger + closed chan struct{} +} + +func NewListener(opts ...listener.Option) listener.Listener { + options := &listener.Options{} + for _, opt := range opts { + opt(options) + } + return &rtcpListener{ + addr: options.Addr, + chain: options.Chain, + closed: make(chan struct{}), + logger: options.Logger, + } +} + +func (l *rtcpListener) Init(md md.Metadata) (err error) { + if err = l.parseMetadata(md); err != nil { + return + } + + laddr, err := net.ResolveTCPAddr("tcp", l.addr) + if err != nil { + return + } + + l.laddr = laddr + l.connChan = make(chan net.Conn, l.md.connQueueSize) + + if l.chain.IsEmpty() { + l.ln, err = net.ListenTCP("tcp", laddr) + return err + } + + go l.listenLoop() + + return +} + +func (l *rtcpListener) Addr() net.Addr { + return l.laddr +} + +func (l *rtcpListener) Close() error { + if l.ln != nil { + return l.ln.Close() + } + + select { + case <-l.closed: + default: + close(l.closed) + } + + return nil +} + +func (l *rtcpListener) Accept() (conn net.Conn, err error) { + if l.ln != nil { + return l.ln.Accept() + } + + select { + case conn = <-l.connChan: + case <-l.closed: + err = net.ErrClosed + } + + return +} + +func (l *rtcpListener) listenLoop() { + var tempDelay time.Duration + + for { + select { + case <-l.closed: + return + default: + } + + conn, err := l.accept() + + if err != nil { + if tempDelay == 0 { + tempDelay = 1000 * time.Millisecond + } else { + tempDelay *= 2 + } + if max := 6 * time.Second; tempDelay > max { + tempDelay = max + } + l.logger.Warnf("accept: %v, retrying in %v", err, tempDelay) + time.Sleep(tempDelay) + continue + } + + tempDelay = 0 + + select { + case l.connChan <- conn: + default: + conn.Close() + l.logger.Warnf("connection queue is full, client %s discarded", conn.RemoteAddr().String()) + } + } +} + +func (l *rtcpListener) accept() (net.Conn, error) { + if l.md.enableMux { + return l.muxAccept() + } + + r := (&chain.Router{}). + WithChain(l.chain). + WithRetry(l.md.retryCount). + WithLogger(l.logger) + cc, err := r.Connect(context.Background()) + if err != nil { + return nil, err + } + + conn, err := l.waitPeer(cc) + if err != nil { + l.logger.Error(err) + cc.Close() + return nil, err + } + + l.logger.Debugf("peer %s accepted", conn.RemoteAddr()) + + return conn, nil +} + +func (l *rtcpListener) waitPeer(conn net.Conn) (net.Conn, error) { + addr := gosocks5.Addr{} + addr.ParseFrom(l.addr) + req := gosocks5.NewRequest(gosocks5.CmdBind, &addr) + if err := req.Write(conn); err != nil { + l.logger.Error(err) + return nil, err + } + + // first reply, bind status + rep, err := gosocks5.ReadReply(conn) + if err != nil { + l.logger.Error(err) + return nil, err + } + + l.logger.Debug(rep) + + if rep.Rep != gosocks5.Succeeded { + err = fmt.Errorf("bind on %s failed", l.addr) + l.logger.Error(err) + return nil, err + } + l.logger.Debugf("bind on %s OK", rep.Addr) + + // second reply, peer connected + rep, err = gosocks5.ReadReply(conn) + if err != nil { + l.logger.Error(err) + return nil, err + } + if rep.Rep != gosocks5.Succeeded { + err = fmt.Errorf("peer connect failed") + l.logger.Error(err) + return nil, err + } + + raddr, err := net.ResolveTCPAddr("tcp", rep.Addr.String()) + if err != nil { + return nil, err + } + + return &peerConn{ + Conn: conn, + localAddr: l.laddr, + remoteAddr: raddr, + }, nil +} diff --git a/pkg/listener/rtcp/metadata.go b/pkg/listener/rtcp/metadata.go new file mode 100644 index 0000000..8ee306e --- /dev/null +++ b/pkg/listener/rtcp/metadata.go @@ -0,0 +1,35 @@ +package rtcp + +import ( + "time" + + md "github.com/go-gost/gost/pkg/metadata" +) + +const ( + defaultKeepAlivePeriod = 180 * time.Second + defaultConnQueueSize = 128 +) + +type metadata struct { + enableMux bool + connQueueSize int + retryCount int +} + +func (l *rtcpListener) parseMetadata(md md.Metadata) (err error) { + const ( + enableMux = "mux" + connQueueSize = "connQueueSize" + retryCount = "retry" + ) + + l.md.enableMux = md.GetBool(enableMux) + l.md.retryCount = md.GetInt(retryCount) + + l.md.connQueueSize = md.GetInt(connQueueSize) + if l.md.connQueueSize <= 0 { + l.md.connQueueSize = defaultConnQueueSize + } + return +} diff --git a/pkg/listener/rtcp/mux.go b/pkg/listener/rtcp/mux.go new file mode 100644 index 0000000..22a0b2b --- /dev/null +++ b/pkg/listener/rtcp/mux.go @@ -0,0 +1,109 @@ +package rtcp + +import ( + "context" + "fmt" + "net" + + "github.com/go-gost/gosocks5" + "github.com/go-gost/gost/pkg/chain" + "github.com/go-gost/gost/pkg/internal/utils/mux" + "github.com/go-gost/gost/pkg/internal/utils/socks" +) + +func (l *rtcpListener) muxAccept() (net.Conn, error) { + session, err := l.getSession() + if err != nil { + l.logger.Error(err) + return nil, err + } + + cc, err := session.Accept() + if err != nil { + session.Close() + return nil, err + } + + conn, err := l.getPeerConn(cc) + if err != nil { + l.logger.Error(err) + cc.Close() + return nil, err + } + + l.logger.Debugf("peer %s accepted", conn.RemoteAddr()) + + return conn, nil +} + +func (l *rtcpListener) getPeerConn(conn net.Conn) (net.Conn, error) { + // second reply, peer connected + rep, err := gosocks5.ReadReply(conn) + if err != nil { + return nil, err + } + if rep.Rep != gosocks5.Succeeded { + err = fmt.Errorf("peer connect failed") + return nil, err + } + + raddr, err := net.ResolveTCPAddr("tcp", rep.Addr.String()) + if err != nil { + return nil, err + } + + return &peerConn{ + Conn: conn, + localAddr: l.laddr, + remoteAddr: raddr, + }, nil +} + +func (l *rtcpListener) getSession() (s *mux.Session, err error) { + l.sessionMux.Lock() + defer l.sessionMux.Unlock() + + if l.session != nil && !l.session.IsClosed() { + return l.session, nil + } + + r := (&chain.Router{}). + WithChain(l.chain). + WithRetry(l.md.retryCount). + WithLogger(l.logger) + conn, err := r.Connect(context.Background()) + if err != nil { + return nil, err + } + + l.session, err = l.initSession(conn) + if err != nil { + conn.Close() + return + } + + return l.session, nil +} + +func (l *rtcpListener) initSession(conn net.Conn) (*mux.Session, error) { + addr := gosocks5.Addr{} + addr.ParseFrom(l.addr) + req := gosocks5.NewRequest(socks.CmdMuxBind, &addr) + if err := req.Write(conn); err != nil { + return nil, err + } + + // first reply, bind status + rep, err := gosocks5.ReadReply(conn) + if err != nil { + return nil, err + } + + if rep.Rep != gosocks5.Succeeded { + err = fmt.Errorf("bind on %s failed", l.addr) + return nil, err + } + l.logger.Debugf("bind on %s OK", rep.Addr) + + return mux.ServerSession(conn) +} diff --git a/pkg/listener/tcp/listener.go b/pkg/listener/tcp/listener.go index bf548fd..d2f964b 100644 --- a/pkg/listener/tcp/listener.go +++ b/pkg/listener/tcp/listener.go @@ -57,10 +57,3 @@ func (l *tcpListener) Init(md md.Metadata) (err error) { l.Listener = ln return } - -func (l *tcpListener) parseMetadata(md md.Metadata) (err error) { - l.md.keepAlive = md.GetBool(keepAlive) - l.md.keepAlivePeriod = md.GetDuration(keepAlivePeriod) - - return -} diff --git a/pkg/listener/tcp/metadata.go b/pkg/listener/tcp/metadata.go index ecf21ff..b93af21 100644 --- a/pkg/listener/tcp/metadata.go +++ b/pkg/listener/tcp/metadata.go @@ -1,10 +1,9 @@ package tcp -import "time" +import ( + "time" -const ( - keepAlive = "keepAlive" - keepAlivePeriod = "keepAlivePeriod" + md "github.com/go-gost/gost/pkg/metadata" ) const ( @@ -15,3 +14,15 @@ type metadata struct { keepAlive bool keepAlivePeriod time.Duration } + +func (l *tcpListener) parseMetadata(md md.Metadata) (err error) { + const ( + keepAlive = "keepAlive" + keepAlivePeriod = "keepAlivePeriod" + ) + + l.md.keepAlive = md.GetBool(keepAlive) + l.md.keepAlivePeriod = md.GetDuration(keepAlivePeriod) + + return +} diff --git a/pkg/listener/udp/listener.go b/pkg/listener/udp/listener.go index e595672..7f04000 100644 --- a/pkg/listener/udp/listener.go +++ b/pkg/listener/udp/listener.go @@ -15,14 +15,14 @@ func init() { } type udpListener struct { - addr string - md metadata - conn net.PacketConn - connChan chan net.Conn - errChan chan error - closeChan chan struct{} - connPool *connPool - logger logger.Logger + addr string + md metadata + conn net.PacketConn + connChan chan net.Conn + errChan chan error + closed chan struct{} + connPool *connPool + logger logger.Logger } func NewListener(opts ...listener.Option) listener.Listener { @@ -31,10 +31,10 @@ func NewListener(opts ...listener.Option) listener.Listener { opt(options) } return &udpListener{ - addr: options.Addr, - errChan: make(chan error, 1), - closeChan: make(chan struct{}), - logger: options.Logger, + addr: options.Addr, + errChan: make(chan error, 1), + closed: make(chan struct{}), + logger: options.Logger, } } @@ -75,13 +75,14 @@ func (l *udpListener) Accept() (conn net.Conn, err error) { func (l *udpListener) Close() error { select { - case <-l.closeChan: - return nil + case <-l.closed: default: - close(l.closeChan) + close(l.closed) l.connPool.Close() return l.conn.Close() } + + return nil } func (l *udpListener) Addr() net.Addr { @@ -126,26 +127,3 @@ func (l *udpListener) getConn(addr net.Addr) *conn { } return c } - -func (l *udpListener) parseMetadata(md md.Metadata) (err error) { - l.md.ttl = md.GetDuration(ttl) - if l.md.ttl <= 0 { - l.md.ttl = defaultTTL - } - l.md.readBufferSize = md.GetInt(readBufferSize) - if l.md.readBufferSize <= 0 { - l.md.readBufferSize = defaultReadBufferSize - } - - l.md.readQueueSize = md.GetInt(readQueueSize) - if l.md.readQueueSize <= 0 { - l.md.readQueueSize = defaultReadQueueSize - } - - l.md.connQueueSize = md.GetInt(connQueueSize) - if l.md.connQueueSize <= 0 { - l.md.connQueueSize = defaultConnQueueSize - } - - return -} diff --git a/pkg/listener/udp/metadata.go b/pkg/listener/udp/metadata.go index ed575ab..89cf7d8 100644 --- a/pkg/listener/udp/metadata.go +++ b/pkg/listener/udp/metadata.go @@ -1,6 +1,10 @@ package udp -import "time" +import ( + "time" + + md "github.com/go-gost/gost/pkg/metadata" +) const ( defaultTTL = 60 * time.Second @@ -9,13 +13,6 @@ const ( defaultConnQueueSize = 128 ) -const ( - ttl = "ttl" - readBufferSize = "readBufferSize" - readQueueSize = "readQueueSize" - connQueueSize = "connQueueSize" -) - type metadata struct { ttl time.Duration @@ -23,3 +20,33 @@ type metadata struct { readQueueSize int connQueueSize int } + +func (l *udpListener) parseMetadata(md md.Metadata) (err error) { + const ( + ttl = "ttl" + readBufferSize = "readBufferSize" + readQueueSize = "readQueueSize" + connQueueSize = "connQueueSize" + ) + + l.md.ttl = md.GetDuration(ttl) + if l.md.ttl <= 0 { + l.md.ttl = defaultTTL + } + l.md.readBufferSize = md.GetInt(readBufferSize) + if l.md.readBufferSize <= 0 { + l.md.readBufferSize = defaultReadBufferSize + } + + l.md.readQueueSize = md.GetInt(readQueueSize) + if l.md.readQueueSize <= 0 { + l.md.readQueueSize = defaultReadQueueSize + } + + l.md.connQueueSize = md.GetInt(connQueueSize) + if l.md.connQueueSize <= 0 { + l.md.connQueueSize = defaultConnQueueSize + } + + return +}