From 82cd924c867fe05bf3b4f31bc490a4fee8170fb6 Mon Sep 17 00:00:00 2001 From: ginuerzh Date: Sat, 14 Jan 2023 13:15:15 +0800 Subject: [PATCH] add tunnel feature for relay --- config/config.go | 17 + config/parsing/parse.go | 56 ++++ config/parsing/service.go | 1 + connector/relay/bind.go | 79 ++++- connector/relay/connector.go | 6 +- connector/relay/listener.go | 1 + connector/relay/metadata.go | 12 + go.mod | 5 +- go.sum | 10 +- handler/forward/local/handler.go | 2 +- handler/forward/remote/handler.go | 2 +- handler/relay/bind.go | 47 +++ handler/relay/entrypoint.go | 170 ++++++++++ handler/relay/handler.go | 130 ++++++-- handler/relay/metadata.go | 10 + handler/relay/tunnel.go | 149 +++++++++ ingress/ingress.go | 304 ++++++++++++++++++ .../util}/forward/forward.go | 0 registry/ingress.go | 37 +++ registry/registry.go | 7 + 20 files changed, 1000 insertions(+), 45 deletions(-) create mode 100644 handler/relay/entrypoint.go create mode 100644 handler/relay/tunnel.go create mode 100644 ingress/ingress.go rename {handler/forward/internal => internal/util}/forward/forward.go (100%) create mode 100644 registry/ingress.go diff --git a/config/config.go b/config/config.go index 9ab2148..65146b7 100644 --- a/config/config.go +++ b/config/config.go @@ -193,6 +193,20 @@ type HostsConfig struct { HTTP *HTTPLoader `yaml:"http,omitempty" json:"http,omitempty"` } +type IngressRuleConfig struct { + Hostname string `json:"hostname"` + Endpoint string `json:"endpoint"` +} + +type IngressConfig struct { + Name string `json:"name"` + Rules []*IngressRuleConfig `yaml:",omitempty" json:"rules,omitempty"` + Reload time.Duration `yaml:",omitempty" json:"reload,omitempty"` + File *FileLoader `yaml:",omitempty" json:"file,omitempty"` + Redis *RedisLoader `yaml:",omitempty" json:"redis,omitempty"` + HTTP *HTTPLoader `yaml:"http,omitempty" json:"http,omitempty"` +} + type RecorderConfig struct { Name string `json:"name"` File *FileRecorder `yaml:",omitempty" json:"file,omitempty"` @@ -246,6 +260,7 @@ type HandlerConfig struct { Authers []string `yaml:",omitempty" json:"authers,omitempty"` Auth *AuthConfig `yaml:",omitempty" json:"auth,omitempty"` TLS *TLSConfig `yaml:",omitempty" json:"tls,omitempty"` + Ingress string `yaml:",omitempty" json:"ingress,omitempty"` Metadata map[string]any `yaml:",omitempty" json:"metadata,omitempty"` } @@ -357,6 +372,7 @@ type Config struct { Bypasses []*BypassConfig `yaml:",omitempty" json:"bypasses,omitempty"` Resolvers []*ResolverConfig `yaml:",omitempty" json:"resolvers,omitempty"` Hosts []*HostsConfig `yaml:",omitempty" json:"hosts,omitempty"` + Ingresses []*IngressConfig `yaml:",omitempty" json:"ingresses,omitempty"` Recorders []*RecorderConfig `yaml:",omitempty" json:"recorders,omitempty"` Limiters []*LimiterConfig `yaml:",omitempty" json:"limiters,omitempty"` CLimiters []*LimiterConfig `yaml:"climiters,omitempty" json:"climiters,omitempty"` @@ -404,6 +420,7 @@ func (c *Config) Write(w io.Writer, format string) error { default: enc := yaml.NewEncoder(w) defer enc.Close() + enc.SetIndent(2) return enc.Encode(c) } diff --git a/config/parsing/parse.go b/config/parsing/parse.go index b061df6..a19ef0f 100644 --- a/config/parsing/parse.go +++ b/config/parsing/parse.go @@ -9,6 +9,7 @@ import ( "github.com/go-gost/core/bypass" "github.com/go-gost/core/chain" "github.com/go-gost/core/hosts" + "github.com/go-gost/core/ingress" "github.com/go-gost/core/limiter/conn" "github.com/go-gost/core/limiter/rate" "github.com/go-gost/core/limiter/traffic" @@ -21,6 +22,7 @@ import ( bypass_impl "github.com/go-gost/x/bypass" "github.com/go-gost/x/config" xhosts "github.com/go-gost/x/hosts" + xingress "github.com/go-gost/x/ingress" "github.com/go-gost/x/internal/loader" xconn "github.com/go-gost/x/limiter/conn" xrate "github.com/go-gost/x/limiter/rate" @@ -319,6 +321,60 @@ func ParseHosts(cfg *config.HostsConfig) hosts.HostMapper { return xhosts.NewHostMapper(opts...) } +func ParseIngress(cfg *config.IngressConfig) ingress.Ingress { + if cfg == nil { + return nil + } + + var rules []xingress.Rule + for _, rule := range cfg.Rules { + if rule.Hostname == "" || rule.Endpoint == "" { + continue + } + + rules = append(rules, xingress.Rule{ + Host: rule.Hostname, + Endpoint: rule.Endpoint, + }) + } + opts := []xingress.Option{ + xingress.RulesOption(rules), + xingress.ReloadPeriodOption(cfg.Reload), + xingress.LoggerOption(logger.Default().WithFields(map[string]any{ + "kind": "ingress", + "ingress": cfg.Name, + })), + } + if cfg.File != nil && cfg.File.Path != "" { + opts = append(opts, xingress.FileLoaderOption(loader.FileLoader(cfg.File.Path))) + } + if cfg.Redis != nil && cfg.Redis.Addr != "" { + switch cfg.Redis.Type { + case "set": // redis set + opts = append(opts, xingress.RedisLoaderOption(loader.RedisSetLoader( + cfg.Redis.Addr, + loader.DBRedisLoaderOption(cfg.Redis.DB), + loader.PasswordRedisLoaderOption(cfg.Redis.Password), + loader.KeyRedisLoaderOption(cfg.Redis.Key), + ))) + default: // redis hash + opts = append(opts, xingress.RedisLoaderOption(loader.RedisHashLoader( + cfg.Redis.Addr, + loader.DBRedisLoaderOption(cfg.Redis.DB), + loader.PasswordRedisLoaderOption(cfg.Redis.Password), + loader.KeyRedisLoaderOption(cfg.Redis.Key), + ))) + } + } + if cfg.HTTP != nil && cfg.HTTP.URL != "" { + opts = append(opts, xingress.HTTPLoaderOption(loader.HTTPLoader( + cfg.HTTP.URL, + loader.TimeoutHTTPLoaderOption(cfg.HTTP.Timeout), + ))) + } + return xingress.NewIngress(opts...) +} + func ParseRecorder(cfg *config.RecorderConfig) (r recorder.Recorder) { if cfg == nil { return nil diff --git a/config/parsing/service.go b/config/parsing/service.go index 483b5bb..1665765 100644 --- a/config/parsing/service.go +++ b/config/parsing/service.go @@ -200,6 +200,7 @@ func ParseService(cfg *config.ServiceConfig) (service.Service, error) { handler.TLSConfigOption(tlsConfig), handler.RateLimiterOption(registry.RateLimiterRegistry().Get(cfg.RLimiter)), handler.LoggerOption(handlerLogger), + handler.ServiceOption(cfg.Name), ) } else { return nil, fmt.Errorf("unregistered handler: %s", cfg.Handler.Type) diff --git a/connector/relay/bind.go b/connector/relay/bind.go index 94e6c18..f99d33f 100644 --- a/connector/relay/bind.go +++ b/connector/relay/bind.go @@ -16,6 +16,10 @@ import ( // Bind implements connector.Binder. func (c *relayConnector) Bind(ctx context.Context, conn net.Conn, network, address string, opts ...connector.BindOption) (net.Listener, error) { + if !c.md.tunnelID.IsZero() { + return c.tunnel(ctx, conn, c.options.Logger) + } + log := c.options.Logger.WithFields(map[string]any{ "network": network, "address": address, @@ -39,8 +43,75 @@ func (c *relayConnector) Bind(ctx context.Context, conn net.Conn, network, addre } } +func (c *relayConnector) tunnel(ctx context.Context, conn net.Conn, log logger.Logger) (net.Listener, error) { + addr, cid, err := c.initTunnel(conn) + if err != nil { + return nil, err + } + log.Debugf("create tunnel %s connector %s OK", c.md.tunnelID, cid) + + session, err := mux.ServerSession(conn) + if err != nil { + return nil, err + } + + return &tcpListener{ + addr: addr, + session: session, + logger: log, + }, nil +} + +func (c *relayConnector) initTunnel(conn net.Conn) (addr net.Addr, cid relay.ConnectorID, err error) { + req := relay.Request{ + Version: relay.Version1, + Cmd: relay.CmdBind, + } + + if c.options.Auth != nil { + pwd, _ := c.options.Auth.Password() + req.Features = append(req.Features, &relay.UserAuthFeature{ + Username: c.options.Auth.Username(), + Password: pwd, + }) + } + + req.Features = append(req.Features, &relay.TunnelFeature{ + ID: c.md.tunnelID, + }) + if _, err = req.WriteTo(conn); err != nil { + return + } + + // first reply, bind status + resp := relay.Response{} + if _, err = resp.ReadFrom(conn); err != nil { + return + } + + if resp.Status != relay.StatusOK { + err = fmt.Errorf("%d: create tunnel %s failed", resp.Status, c.md.tunnelID) + return + } + + for _, f := range resp.Features { + switch f.Type() { + case relay.FeatureAddr: + if feature, _ := f.(*relay.AddrFeature); feature != nil { + addr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(feature.Host, strconv.Itoa(int(feature.Port)))) + } + case relay.FeatureTunnel: + if feature, _ := f.(*relay.TunnelFeature); feature != nil { + cid = feature.ID + } + } + } + + return +} + func (c *relayConnector) bindTCP(ctx context.Context, conn net.Conn, network, address string, log logger.Logger) (net.Listener, error) { - laddr, err := c.bind(conn, relay.BIND, network, address) + laddr, err := c.bind(conn, relay.CmdBind, network, address) if err != nil { return nil, err } @@ -59,7 +130,7 @@ func (c *relayConnector) bindTCP(ctx context.Context, conn net.Conn, network, ad } func (c *relayConnector) bindUDP(ctx context.Context, conn net.Conn, network, address string, opts *connector.BindOptions, log logger.Logger) (net.Listener, error) { - laddr, err := c.bind(conn, relay.FUDP|relay.BIND, network, address) + laddr, err := c.bind(conn, relay.FUDP|relay.CmdBind, network, address) if err != nil { return nil, err } @@ -80,10 +151,10 @@ func (c *relayConnector) bindUDP(ctx context.Context, conn net.Conn, network, ad return ln, nil } -func (c *relayConnector) bind(conn net.Conn, cmd uint8, network, address string) (net.Addr, error) { +func (c *relayConnector) bind(conn net.Conn, cmd relay.CmdType, network, address string) (net.Addr, error) { req := relay.Request{ Version: relay.Version1, - Flags: cmd, + Cmd: cmd, } if c.options.Auth != nil { diff --git a/connector/relay/connector.go b/connector/relay/connector.go index be4616e..1e6f540 100644 --- a/connector/relay/connector.go +++ b/connector/relay/connector.go @@ -53,14 +53,14 @@ func (c *relayConnector) Connect(ctx context.Context, conn net.Conn, network, ad req := relay.Request{ Version: relay.Version1, - Flags: relay.CONNECT, + Cmd: relay.CmdConnect, } if network == "udp" || network == "udp4" || network == "udp6" { - req.Flags |= relay.FUDP + req.Cmd |= relay.FUDP // UDP association if address == "" { - baddr, err := c.bind(conn, relay.FUDP|relay.BIND, network, address) + baddr, err := c.bind(conn, relay.FUDP|relay.CmdBind, network, address) if err != nil { return nil, err } diff --git a/connector/relay/listener.go b/connector/relay/listener.go index d6b1cbd..432b3ae 100644 --- a/connector/relay/listener.go +++ b/connector/relay/listener.go @@ -25,6 +25,7 @@ func (p *tcpListener) Accept() (net.Conn, error) { conn, err := p.getPeerConn(cc) if err != nil { cc.Close() + p.logger.Errorf("get peer failed: %s", err) return nil, err } diff --git a/connector/relay/metadata.go b/connector/relay/metadata.go index 385c632..910fd3d 100644 --- a/connector/relay/metadata.go +++ b/connector/relay/metadata.go @@ -5,21 +5,33 @@ import ( mdata "github.com/go-gost/core/metadata" mdutil "github.com/go-gost/core/metadata/util" + "github.com/go-gost/relay" + "github.com/google/uuid" ) type metadata struct { connectTimeout time.Duration noDelay bool + tunnelID relay.TunnelID } func (c *relayConnector) parseMetadata(md mdata.Metadata) (err error) { const ( connectTimeout = "connectTimeout" noDelay = "nodelay" + tunnelID = "tunnelID" ) c.md.connectTimeout = mdutil.GetDuration(md, connectTimeout) c.md.noDelay = mdutil.GetBool(md, noDelay) + if s := mdutil.GetString(md, tunnelID); s != "" { + uuid, err := uuid.Parse(s) + if err != nil { + return err + } + copy(c.md.tunnelID[:], uuid[:]) + } + return } diff --git a/go.mod b/go.mod index 4c9df7d..603f587 100644 --- a/go.mod +++ b/go.mod @@ -7,14 +7,15 @@ require ( github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d github.com/gin-contrib/cors v1.3.1 github.com/gin-gonic/gin v1.8.2 - github.com/go-gost/core v0.0.0-20221221101823-36ed0eae2dec + github.com/go-gost/core v0.0.0-20230114050924-1a8c1ccb1dc5 github.com/go-gost/gosocks4 v0.0.1 github.com/go-gost/gosocks5 v0.3.1-0.20211109033403-d894d75b7f09 - github.com/go-gost/relay v0.1.1-0.20211123134818-8ef7fd81ffd7 + github.com/go-gost/relay v0.2.0 github.com/go-gost/tls-dissector v0.0.2-0.20220408131628-aac992c27451 github.com/go-redis/redis/v8 v8.11.5 github.com/gobwas/glob v0.2.3 github.com/golang/snappy v0.0.4 + github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.5.0 github.com/lucas-clemente/quic-go v0.31.1 github.com/miekg/dns v1.1.50 diff --git a/go.sum b/go.sum index d120f73..e6e38a0 100644 --- a/go.sum +++ b/go.sum @@ -91,14 +91,14 @@ github.com/gin-gonic/gin v1.8.2/go.mod h1:qw5AYuDrzRTnhvusDsrov+fDIxp9Dleuu12h8n github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= -github.com/go-gost/core v0.0.0-20221221101823-36ed0eae2dec h1:QxAIPNaXlWpqCVYFjkRjw9EIJte3qI+J94Kh213TdaI= -github.com/go-gost/core v0.0.0-20221221101823-36ed0eae2dec/go.mod h1:R08B7BVdhWsYHX8s7wkEBpeKqc4+YFP6bLLFoao0J/A= +github.com/go-gost/core v0.0.0-20230114050924-1a8c1ccb1dc5 h1:X33/ce0ShlkTL28XxpLtirAvOJk99bWduSn7yuSAEU8= +github.com/go-gost/core v0.0.0-20230114050924-1a8c1ccb1dc5/go.mod h1:R08B7BVdhWsYHX8s7wkEBpeKqc4+YFP6bLLFoao0J/A= github.com/go-gost/gosocks4 v0.0.1 h1:+k1sec8HlELuQV7rWftIkmy8UijzUt2I6t+iMPlGB2s= github.com/go-gost/gosocks4 v0.0.1/go.mod h1:3B6L47HbU/qugDg4JnoFPHgJXE43Inz8Bah1QaN9qCc= github.com/go-gost/gosocks5 v0.3.1-0.20211109033403-d894d75b7f09 h1:A95M6UWcfZgOuJkQ7QLfG0Hs5peWIUSysCDNz4pfe04= github.com/go-gost/gosocks5 v0.3.1-0.20211109033403-d894d75b7f09/go.mod h1:1G6I7HP7VFVxveGkoK8mnprnJqSqJjdcASKsdUn4Pp4= -github.com/go-gost/relay v0.1.1-0.20211123134818-8ef7fd81ffd7 h1:itaaJhQJ19kUXEB4Igb0EbY8m+1Py2AaNNSBds/9gk4= -github.com/go-gost/relay v0.1.1-0.20211123134818-8ef7fd81ffd7/go.mod h1:lcX+23LCQ3khIeASBo+tJ/WbwXFO32/N5YN6ucuYTG8= +github.com/go-gost/relay v0.2.0 h1:8udTweykgDUdOY1j1U90fApNuG7Sp7pvKoiIp3eV6ME= +github.com/go-gost/relay v0.2.0/go.mod h1:lcX+23LCQ3khIeASBo+tJ/WbwXFO32/N5YN6ucuYTG8= github.com/go-gost/tls-dissector v0.0.2-0.20220408131628-aac992c27451 h1:xj8gUZGYO3nb5+6Bjw9+tsFkA9sYynrOvDvvC4uDV2I= github.com/go-gost/tls-dissector v0.0.2-0.20220408131628-aac992c27451/go.mod h1:/9QfdewqmHdaE362Hv5nDaSWLx3pCmtD870d6GaquXs= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -195,6 +195,8 @@ github.com/google/pprof v0.0.0-20221219190121-3cb0bae90811 h1:wORs2YN3R3ona/CXYu github.com/google/pprof v0.0.0-20221219190121-3cb0bae90811/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= diff --git a/handler/forward/local/handler.go b/handler/forward/local/handler.go index 5f8ec12..63ca7e8 100644 --- a/handler/forward/local/handler.go +++ b/handler/forward/local/handler.go @@ -12,8 +12,8 @@ import ( "github.com/go-gost/core/handler" md "github.com/go-gost/core/metadata" xchain "github.com/go-gost/x/chain" - "github.com/go-gost/x/handler/forward/internal/forward" netpkg "github.com/go-gost/x/internal/net" + "github.com/go-gost/x/internal/util/forward" "github.com/go-gost/x/registry" ) diff --git a/handler/forward/remote/handler.go b/handler/forward/remote/handler.go index d2b7abf..3581509 100644 --- a/handler/forward/remote/handler.go +++ b/handler/forward/remote/handler.go @@ -11,8 +11,8 @@ import ( "github.com/go-gost/core/chain" "github.com/go-gost/core/handler" md "github.com/go-gost/core/metadata" - "github.com/go-gost/x/handler/forward/internal/forward" netpkg "github.com/go-gost/x/internal/net" + "github.com/go-gost/x/internal/util/forward" "github.com/go-gost/x/registry" ) diff --git a/handler/relay/bind.go b/handler/relay/bind.go index ebaeef3..1b03466 100644 --- a/handler/relay/bind.go +++ b/handler/relay/bind.go @@ -12,6 +12,7 @@ import ( "github.com/go-gost/x/internal/net/udp" "github.com/go-gost/x/internal/util/mux" relay_util "github.com/go-gost/x/internal/util/relay" + "github.com/google/uuid" ) func (h *relayHandler) handleBind(ctx context.Context, conn net.Conn, network, address string, log logger.Logger) error { @@ -191,3 +192,49 @@ func (h *relayHandler) serveTCPBind(ctx context.Context, conn net.Conn, ln net.L }(rc) } } + +func (h *relayHandler) handleTunnel(ctx context.Context, conn net.Conn, tunnelID relay.TunnelID, log logger.Logger) (err error) { + resp := relay.Response{ + Version: relay.Version1, + Status: relay.StatusOK, + } + + if h.ep == nil { + resp.Status = relay.StatusServiceUnavailable + resp.WriteTo(conn) + return + } + + uuid, err := uuid.NewRandom() + if err != nil { + resp.Status = relay.StatusInternalServerError + resp.WriteTo(conn) + return + } + + var connectorID relay.ConnectorID + copy(connectorID[:], uuid[:]) + + af := &relay.AddrFeature{} + err = af.ParseFrom(h.ep.Addr().String()) + if err != nil { + log.Warn(err) + } + resp.Features = append(resp.Features, af, + &relay.TunnelFeature{ + ID: connectorID, + }, + ) + resp.WriteTo(conn) + + // Upgrade connection to multiplex session. + session, err := mux.ClientSession(conn) + if err != nil { + return + } + + h.pool.Add(tunnelID, NewConnector(connectorID, session)) + log.Debugf("tunnel %s connector %s established", tunnelID, connectorID) + + return +} diff --git a/handler/relay/entrypoint.go b/handler/relay/entrypoint.go new file mode 100644 index 0000000..22e677f --- /dev/null +++ b/handler/relay/entrypoint.go @@ -0,0 +1,170 @@ +package relay + +import ( + "context" + "fmt" + "io" + "net" + "time" + + "github.com/go-gost/core/handler" + "github.com/go-gost/core/ingress" + "github.com/go-gost/core/listener" + md "github.com/go-gost/core/metadata" + "github.com/go-gost/relay" + admission "github.com/go-gost/x/admission/wrapper" + xnet "github.com/go-gost/x/internal/net" + "github.com/go-gost/x/internal/net/proxyproto" + "github.com/go-gost/x/internal/util/forward" + climiter "github.com/go-gost/x/limiter/conn/wrapper" + limiter "github.com/go-gost/x/limiter/traffic/wrapper" + metrics "github.com/go-gost/x/metrics/wrapper" + "github.com/google/uuid" +) + +type epListener struct { + ln net.Listener + options listener.Options +} + +func NewEntryPointListener(opts ...listener.Option) listener.Listener { + options := listener.Options{} + for _, opt := range opts { + opt(&options) + } + return &epListener{ + options: options, + } +} + +func (l *epListener) Init(md md.Metadata) (err error) { + network := "tcp" + if xnet.IsIPv4(l.options.Addr) { + network = "tcp4" + } + ln, err := net.Listen(network, l.options.Addr) + if err != nil { + return + } + + // l.logger.Debugf("pp: %d", l.options.ProxyProtocol) + + ln = metrics.WrapListener(l.options.Service, ln) + ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = admission.WrapListener(l.options.Admission, ln) + ln = limiter.WrapListener(l.options.TrafficLimiter, ln) + ln = climiter.WrapListener(l.options.ConnLimiter, ln) + l.ln = ln + + return +} + +func (l *epListener) Accept() (conn net.Conn, err error) { + return l.ln.Accept() +} + +func (l *epListener) Addr() net.Addr { + return l.ln.Addr() +} + +func (l *epListener) Close() error { + return l.ln.Close() +} + +type epHandler struct { + pool *ConnectorPool + ingress ingress.Ingress + options handler.Options +} + +func NewEntryPointHandler(pool *ConnectorPool, ingress ingress.Ingress, opts ...handler.Option) handler.Handler { + options := handler.Options{} + for _, opt := range opts { + opt(&options) + } + + return &epHandler{ + pool: pool, + ingress: ingress, + options: options, + } +} + +func (h *epHandler) Init(md md.Metadata) (err error) { + return +} + +func (h *epHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler.HandleOption) error { + defer conn.Close() + + start := time.Now() + log := h.options.Logger.WithFields(map[string]any{ + "remote": conn.RemoteAddr().String(), + "local": conn.LocalAddr().String(), + }) + + log.Infof("%s <> %s", conn.RemoteAddr(), conn.LocalAddr()) + defer func() { + log.WithFields(map[string]any{ + "duration": time.Since(start), + }).Infof("%s >< %s", conn.RemoteAddr(), conn.LocalAddr()) + }() + + var rw io.ReadWriter = conn + var host string + var protocol string + rw, host, protocol, _ = forward.Sniffing(ctx, conn) + h.options.Logger.Debugf("sniffing: host=%s, protocol=%s", host, protocol) + + var tunnelID relay.TunnelID + if h.ingress != nil { + v := h.ingress.Get(host) + uuid, _ := uuid.Parse(v) + copy(tunnelID[:], uuid[:]) + } + log = log.WithFields(map[string]any{ + "tunnel": tunnelID.String(), + }) + + var cc net.Conn + var err error + for i := 0; i < 3; i++ { + c := h.pool.Get(tunnelID) + if c == nil { + err = fmt.Errorf("tunnel %s not available", tunnelID.String()) + break + } + + cc, err = c.Session().GetConn() + if err != nil { + log.Error(err) + continue + } + break + } + if err != nil { + log.Error(err) + return err + } + defer cc.Close() + + log.Debugf("%s >> %s", conn.RemoteAddr(), cc.RemoteAddr()) + + af := &relay.AddrFeature{} + af.ParseFrom(conn.RemoteAddr().String()) + resp := relay.Response{ + Version: relay.Version1, + Status: relay.StatusOK, + Features: []relay.Feature{af}, + } + resp.WriteTo(cc) + + t := time.Now() + log.Debugf("%s <-> %s", conn.RemoteAddr(), cc.RemoteAddr()) + xnet.Transport(rw, cc) + log.WithFields(map[string]any{ + "duration": time.Since(t), + }).Debugf("%s >-< %s", conn.RemoteAddr(), cc.RemoteAddr()) + + return nil +} diff --git a/handler/relay/handler.go b/handler/relay/handler.go index 5ef92af..5e7e2ce 100644 --- a/handler/relay/handler.go +++ b/handler/relay/handler.go @@ -3,20 +3,26 @@ package relay import ( "context" "errors" + "fmt" "net" "strconv" "time" "github.com/go-gost/core/chain" "github.com/go-gost/core/handler" + "github.com/go-gost/core/listener" md "github.com/go-gost/core/metadata" + "github.com/go-gost/core/service" "github.com/go-gost/relay" "github.com/go-gost/x/registry" + xservice "github.com/go-gost/x/service" ) var ( - ErrBadVersion = errors.New("relay: bad version") - ErrUnknownCmd = errors.New("relay: unknown command") + ErrBadVersion = errors.New("relay: bad version") + ErrUnknownCmd = errors.New("relay: unknown command") + ErrUnauthorized = errors.New("relay: unauthorized") + ErrRateLimit = errors.New("relay: rate limiting exceeded") ) func init() { @@ -28,6 +34,8 @@ type relayHandler struct { router *chain.Router md metadata options handler.Options + ep service.Service + pool *ConnectorPool } func NewHandler(opts ...handler.Option) handler.Handler { @@ -38,6 +46,7 @@ func NewHandler(opts ...handler.Option) handler.Handler { return &relayHandler{ options: options, + pool: NewConnectorPool(), } } @@ -51,17 +60,61 @@ func (h *relayHandler) Init(md md.Metadata) (err error) { h.router = chain.NewRouter(chain.LoggerRouterOption(h.options.Logger)) } + if err = h.initEntryPoint(); err != nil { + return + } return nil } +func (h *relayHandler) initEntryPoint() (err error) { + if h.md.entryPoint == "" { + return + } + + serviceName := fmt.Sprintf("%s-ep", h.options.Service) + log := h.options.Logger.WithFields(map[string]any{ + "service": serviceName, + "listener": "tunnel", + "handler": "tunnel", + }) + epListener := NewEntryPointListener( + listener.AddrOption(h.md.entryPoint), + listener.ServiceOption(serviceName), + listener.LoggerOption(log.WithFields(map[string]any{ + "kind": "listener", + })), + ) + if err = epListener.Init(nil); err != nil { + return + } + epHandler := NewEntryPointHandler( + h.pool, + h.md.ingress, + handler.ServiceOption(serviceName), + handler.LoggerOption(log.WithFields(map[string]any{ + "kind": "handler", + })), + ) + if err = epHandler.Init(nil); err != nil { + return + } + + h.ep = xservice.NewService( + serviceName, epListener, epHandler, + xservice.LoggerOption(log), + ) + go h.ep.Serve() + log.Infof("entrypoint: %s", h.ep.Addr()) + + return +} + // Forward implements handler.Forwarder. func (h *relayHandler) Forward(hop chain.Hop) { h.hop = hop } -func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler.HandleOption) error { - defer conn.Close() - +func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler.HandleOption) (err error) { start := time.Now() log := h.options.Logger.WithFields(map[string]any{ "remote": conn.RemoteAddr().String(), @@ -69,14 +122,19 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle }) log.Infof("%s <> %s", conn.RemoteAddr(), conn.LocalAddr()) + + var tunnelID relay.TunnelID defer func() { + if tunnelID.IsZero() || err != nil { + conn.Close() + } log.WithFields(map[string]any{ "duration": time.Since(start), }).Infof("%s >< %s", conn.RemoteAddr(), conn.LocalAddr()) }() if !h.checkRateLimit(conn.RemoteAddr()) { - return nil + return ErrRateLimit } if h.md.readTimeout > 0 { @@ -85,28 +143,38 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle req := relay.Request{} if _, err := req.ReadFrom(conn); err != nil { - log.Error(err) return err } conn.SetReadDeadline(time.Time{}) + resp := relay.Response{ + Version: relay.Version1, + Status: relay.StatusOK, + } + if req.Version != relay.Version1 { - err := ErrBadVersion - log.Error(err) - return err + resp.Status = relay.StatusBadRequest + resp.WriteTo(conn) + return ErrBadVersion } var user, pass string var address string for _, f := range req.Features { - if f.Type() == relay.FeatureUserAuth { - feature := f.(*relay.UserAuthFeature) - user, pass = feature.Username, feature.Password - } - if f.Type() == relay.FeatureAddr { - feature := f.(*relay.AddrFeature) - address = net.JoinHostPort(feature.Host, strconv.Itoa(int(feature.Port))) + switch f.Type() { + case relay.FeatureUserAuth: + if feature, _ := f.(*relay.UserAuthFeature); feature != nil { + user, pass = feature.Username, feature.Password + } + case relay.FeatureAddr: + if feature, _ := f.(*relay.AddrFeature); feature != nil { + address = net.JoinHostPort(feature.Host, strconv.Itoa(int(feature.Port))) + } + case relay.FeatureTunnel: + if feature, _ := f.(*relay.TunnelFeature); feature != nil { + tunnelID = feature.ID + } } } @@ -114,19 +182,15 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle log = log.WithFields(map[string]any{"user": user}) } - resp := relay.Response{ - Version: relay.Version1, - Status: relay.StatusOK, - } - if h.options.Auther != nil && !h.options.Auther.Authenticate(user, pass) { + if h.options.Auther != nil && + !h.options.Auther.Authenticate(user, pass) { resp.Status = relay.StatusUnauthorized - log.Error("unauthorized") - _, err := resp.WriteTo(conn) - return err + resp.WriteTo(conn) + return ErrUnauthorized } network := "tcp" - if (req.Flags & relay.FUDP) == relay.FUDP { + if (req.Cmd & relay.FUDP) == relay.FUDP { network = "udp" } @@ -141,13 +205,19 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle return h.handleForward(ctx, conn, network, log) } - switch req.Flags & relay.CmdMask { - case 0, relay.CONNECT: + switch req.Cmd & relay.CmdMask { + case 0, relay.CmdConnect: return h.handleConnect(ctx, conn, network, address, log) - case relay.BIND: + case relay.CmdBind: + if !tunnelID.IsZero() { + return h.handleTunnel(ctx, conn, tunnelID, log) + } return h.handleBind(ctx, conn, network, address, log) + default: + resp.Status = relay.StatusBadRequest + resp.WriteTo(conn) + return ErrUnknownCmd } - return ErrUnknownCmd } func (h *relayHandler) checkRateLimit(addr net.Addr) bool { diff --git a/handler/relay/metadata.go b/handler/relay/metadata.go index 27fbae7..bc8de2c 100644 --- a/handler/relay/metadata.go +++ b/handler/relay/metadata.go @@ -4,8 +4,10 @@ import ( "math" "time" + "github.com/go-gost/core/ingress" mdata "github.com/go-gost/core/metadata" mdutil "github.com/go-gost/core/metadata/util" + "github.com/go-gost/x/registry" ) type metadata struct { @@ -14,6 +16,8 @@ type metadata struct { udpBufferSize int noDelay bool hash string + entryPoint string + ingress ingress.Ingress } func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) { @@ -23,6 +27,8 @@ func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) { udpBufferSize = "udpBufferSize" noDelay = "nodelay" hash = "hash" + entryPoint = "entryPoint" + ingress = "ingress" ) h.md.readTimeout = mdutil.GetDuration(md, readTimeout) @@ -36,5 +42,9 @@ func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) { } h.md.hash = mdutil.GetString(md, hash) + + h.md.entryPoint = mdutil.GetString(md, entryPoint) + h.md.ingress = registry.IngressRegistry().Get(mdutil.GetString(md, ingress)) + return } diff --git a/handler/relay/tunnel.go b/handler/relay/tunnel.go new file mode 100644 index 0000000..27e9745 --- /dev/null +++ b/handler/relay/tunnel.go @@ -0,0 +1,149 @@ +package relay + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/go-gost/core/logger" + "github.com/go-gost/relay" + "github.com/go-gost/x/internal/util/mux" +) + +type Connector struct { + id relay.ConnectorID + t time.Time + s *mux.Session +} + +func NewConnector(id relay.ConnectorID, s *mux.Session) *Connector { + c := &Connector{ + id: id, + t: time.Now(), + s: s, + } + go c.accept() + return c +} + +func (c *Connector) accept() { + for { + conn, err := c.s.Accept() + if err != nil { + logger.Default().Errorf("connector %s: %v", c.id, err) + c.s.Close() + return + } + conn.Close() + } +} + +func (c *Connector) ID() relay.ConnectorID { + return c.id +} + +func (c *Connector) Session() *mux.Session { + return c.s +} + +type Tunnel struct { + id relay.TunnelID + connectors []*Connector + t time.Time + n uint64 + mu sync.RWMutex +} + +func NewTunnel(id relay.TunnelID) *Tunnel { + t := &Tunnel{ + id: id, + t: time.Now(), + } + go t.clean() + return t +} + +func (t *Tunnel) ID() relay.TunnelID { + return t.id +} + +func (t *Tunnel) AddConnector(c *Connector) { + if c == nil { + return + } + + t.mu.Lock() + defer t.mu.Unlock() + + t.connectors = append(t.connectors, c) +} + +func (t *Tunnel) GetConnector() *Connector { + t.mu.RLock() + defer t.mu.RUnlock() + + if len(t.connectors) == 0 { + return nil + } + + n := atomic.AddUint64(&t.n, 1) - 1 + return t.connectors[n%uint64(len(t.connectors))] +} + +func (t *Tunnel) clean() { + ticker := time.NewTicker(3 * time.Second) + for range ticker.C { + t.mu.Lock() + var connectors []*Connector + for _, c := range t.connectors { + if c.Session().IsClosed() { + logger.Default().Debugf("remove tunnel %s connector %s", t.id, c.id) + continue + } + connectors = append(connectors, c) + } + if len(connectors) != len(t.connectors) { + t.connectors = connectors + } + t.mu.Unlock() + } +} + +type ConnectorPool struct { + tunnels map[relay.TunnelID]*Tunnel + mu sync.RWMutex +} + +func NewConnectorPool() *ConnectorPool { + return &ConnectorPool{ + tunnels: make(map[relay.TunnelID]*Tunnel), + } +} + +func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) { + p.mu.Lock() + defer p.mu.Unlock() + + t := p.tunnels[tid] + if t == nil { + t = NewTunnel(tid) + p.tunnels[tid] = t + } + t.AddConnector(c) +} + +func (p *ConnectorPool) Get(tid relay.TunnelID) *Connector { + if p == nil { + return nil + } + + p.mu.RLock() + defer p.mu.RUnlock() + + t := p.tunnels[tid] + if t == nil { + return nil + } + + return t.GetConnector() +} diff --git a/ingress/ingress.go b/ingress/ingress.go new file mode 100644 index 0000000..71bd788 --- /dev/null +++ b/ingress/ingress.go @@ -0,0 +1,304 @@ +package ingress + +import ( + "bufio" + "context" + "io" + "net" + "strings" + "sync" + "time" + + ingress_pkg "github.com/go-gost/core/ingress" + "github.com/go-gost/core/logger" + "github.com/go-gost/x/internal/loader" +) + +type Rule struct { + Host string + Endpoint string +} + +type options struct { + rules []Rule + fileLoader loader.Loader + redisLoader loader.Loader + httpLoader loader.Loader + period time.Duration + logger logger.Logger +} + +type Option func(opts *options) + +func RulesOption(rules []Rule) Option { + return func(opts *options) { + opts.rules = rules + } +} + +func ReloadPeriodOption(period time.Duration) Option { + return func(opts *options) { + opts.period = period + } +} + +func FileLoaderOption(fileLoader loader.Loader) Option { + return func(opts *options) { + opts.fileLoader = fileLoader + } +} + +func RedisLoaderOption(redisLoader loader.Loader) Option { + return func(opts *options) { + opts.redisLoader = redisLoader + } +} + +func HTTPLoaderOption(httpLoader loader.Loader) Option { + return func(opts *options) { + opts.httpLoader = httpLoader + } +} + +func LoggerOption(logger logger.Logger) Option { + return func(opts *options) { + opts.logger = logger + } +} + +type ingress struct { + rules map[string]Rule + cancelFunc context.CancelFunc + options options + mu sync.RWMutex +} + +// NewIngress creates and initializes a new Ingress. +func NewIngress(opts ...Option) ingress_pkg.Ingress { + var options options + for _, opt := range opts { + opt(&options) + } + + ctx, cancel := context.WithCancel(context.TODO()) + + ing := &ingress{ + cancelFunc: cancel, + options: options, + } + + if err := ing.reload(ctx); err != nil { + options.logger.Warnf("reload: %v", err) + } + if ing.options.period > 0 { + go ing.periodReload(ctx) + } + + return ing +} + +func (ing *ingress) periodReload(ctx context.Context) error { + period := ing.options.period + if period < time.Second { + period = time.Second + } + ticker := time.NewTicker(period) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := ing.reload(ctx); err != nil { + ing.options.logger.Warnf("reload: %v", err) + // return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (ing *ingress) reload(ctx context.Context) error { + rules := make(map[string]Rule) + + fn := func(rule Rule) { + if rule.Host == "" || rule.Endpoint == "" { + return + } + host := rule.Host + if host[0] == '*' { + host = "." + host[1:] + } + rules[host] = rule + } + + for _, rule := range ing.options.rules { + fn(rule) + } + + v, err := ing.load(ctx) + if err != nil { + return err + } + for _, rule := range v { + fn(rule) + } + + ing.mu.Lock() + defer ing.mu.Unlock() + + ing.rules = rules + + return nil +} + +func (ing *ingress) load(ctx context.Context) (rules []Rule, err error) { + if ing.options.fileLoader != nil { + if lister, ok := ing.options.fileLoader.(loader.Lister); ok { + list, er := lister.List(ctx) + if er != nil { + ing.options.logger.Warnf("file loader: %v", er) + } + for _, s := range list { + rules = append(rules, ing.parseLine(s)) + } + } else { + r, er := ing.options.fileLoader.Load(ctx) + if er != nil { + ing.options.logger.Warnf("file loader: %v", er) + } + if v, _ := ing.parseRules(r); v != nil { + rules = append(rules, v...) + } + } + } + if ing.options.redisLoader != nil { + if lister, ok := ing.options.redisLoader.(loader.Lister); ok { + list, er := lister.List(ctx) + if er != nil { + ing.options.logger.Warnf("redis loader: %v", er) + } + for _, v := range list { + rules = append(rules, ing.parseLine(v)) + } + } else { + r, er := ing.options.redisLoader.Load(ctx) + if er != nil { + ing.options.logger.Warnf("redis loader: %v", er) + } + v, _ := ing.parseRules(r) + rules = append(rules, v...) + } + } + if ing.options.httpLoader != nil { + r, er := ing.options.httpLoader.Load(ctx) + if er != nil { + ing.options.logger.Warnf("http loader: %v", er) + } + v, _ := ing.parseRules(r) + rules = append(rules, v...) + } + + ing.options.logger.Debugf("load items %d", len(rules)) + return +} + +func (ing *ingress) parseRules(r io.Reader) (rules []Rule, err error) { + if r == nil { + return + } + + scanner := bufio.NewScanner(r) + for scanner.Scan() { + if rule := ing.parseLine(scanner.Text()); rule.Host != "" { + rules = append(rules, rule) + } + } + + err = scanner.Err() + return +} + +func (ing *ingress) Get(host string) string { + if host == "" || ing == nil { + return "" + } + + // try to strip the port + if v, _, _ := net.SplitHostPort(host); v != "" { + host = v + } + + if ing == nil || len(ing.rules) == 0 { + return "" + } + + ing.options.logger.Debugf("ingress: lookup %s", host) + ep := ing.lookup(host) + if ep == "" { + ep = ing.lookup("." + host) + } + if ep == "" { + s := host + for { + if index := strings.IndexByte(s, '.'); index > 0 { + ep = ing.lookup(s[index:]) + s = s[index+1:] + if ep == "" { + continue + } + } + break + } + } + + if ep != "" { + ing.options.logger.Debugf("ingress: %s -> %s", host, ep) + } + + return ep +} + +func (ing *ingress) lookup(host string) string { + if ing == nil || len(ing.rules) == 0 { + return "" + } + + ing.mu.RLock() + defer ing.mu.RUnlock() + + return ing.rules[host].Endpoint +} + +func (ing *ingress) parseLine(s string) (rule Rule) { + line := strings.Replace(s, "\t", " ", -1) + line = strings.TrimSpace(line) + if n := strings.IndexByte(line, '#'); n >= 0 { + line = line[:n] + } + var sp []string + for _, s := range strings.Split(line, " ") { + if s = strings.TrimSpace(s); s != "" { + sp = append(sp, s) + } + } + if len(sp) < 2 { + return // invalid lines are ignored + } + + return Rule{ + Host: sp[0], + Endpoint: sp[1], + } +} + +func (ing *ingress) Close() error { + ing.cancelFunc() + if ing.options.fileLoader != nil { + ing.options.fileLoader.Close() + } + if ing.options.redisLoader != nil { + ing.options.redisLoader.Close() + } + return nil +} diff --git a/handler/forward/internal/forward/forward.go b/internal/util/forward/forward.go similarity index 100% rename from handler/forward/internal/forward/forward.go rename to internal/util/forward/forward.go diff --git a/registry/ingress.go b/registry/ingress.go new file mode 100644 index 0000000..0c039d0 --- /dev/null +++ b/registry/ingress.go @@ -0,0 +1,37 @@ +package registry + +import ( + "github.com/go-gost/core/ingress" +) + +type ingressRegistry struct { + registry[ingress.Ingress] +} + +func (r *ingressRegistry) Register(name string, v ingress.Ingress) error { + return r.registry.Register(name, v) +} + +func (r *ingressRegistry) Get(name string) ingress.Ingress { + if name != "" { + return &ingressWrapper{name: name, r: r} + } + return nil +} + +func (r *ingressRegistry) get(name string) ingress.Ingress { + return r.registry.Get(name) +} + +type ingressWrapper struct { + name string + r *ingressRegistry +} + +func (w *ingressWrapper) Get(host string) string { + v := w.r.get(w.name) + if v == nil { + return "" + } + return v.Get(host) +} diff --git a/registry/registry.go b/registry/registry.go index 7978dc4..2d9e211 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -10,6 +10,7 @@ import ( "github.com/go-gost/core/bypass" "github.com/go-gost/core/chain" "github.com/go-gost/core/hosts" + "github.com/go-gost/core/ingress" "github.com/go-gost/core/limiter/conn" "github.com/go-gost/core/limiter/rate" "github.com/go-gost/core/limiter/traffic" @@ -41,6 +42,8 @@ var ( trafficLimiterReg reg.Registry[traffic.TrafficLimiter] = new(trafficLimiterRegistry) connLimiterReg reg.Registry[conn.ConnLimiter] = new(connLimiterRegistry) rateLimiterReg reg.Registry[rate.RateLimiter] = new(rateLimiterRegistry) + + ingressReg reg.Registry[ingress.Ingress] = new(ingressRegistry) ) type registry[T any] struct { @@ -155,3 +158,7 @@ func ConnLimiterRegistry() reg.Registry[conn.ConnLimiter] { func RateLimiterRegistry() reg.Registry[rate.RateLimiter] { return rateLimiterReg } + +func IngressRegistry() reg.Registry[ingress.Ingress] { + return ingressReg +}