diff --git a/handler/relay/bind.go b/handler/relay/bind.go index 4ef94a9..f137e6f 100644 --- a/handler/relay/bind.go +++ b/handler/relay/bind.go @@ -6,12 +6,14 @@ import ( "net" "time" + "github.com/go-gost/core/handler" + "github.com/go-gost/core/listener" "github.com/go-gost/core/logger" "github.com/go-gost/relay" - netpkg "github.com/go-gost/x/internal/net" "github.com/go-gost/x/internal/net/udp" "github.com/go-gost/x/internal/util/mux" relay_util "github.com/go-gost/x/internal/util/relay" + xservice "github.com/go-gost/x/service" "github.com/google/uuid" ) @@ -55,29 +57,70 @@ func (h *relayHandler) bindTCP(ctx context.Context, conn net.Conn, network, addr resp.WriteTo(conn) return err } + defer ln.Close() + + serviceName := fmt.Sprintf("%s-ep-%s", h.options.Service, ln.Addr()) + log = log.WithFields(map[string]any{ + "service": serviceName, + "listener": "tcp", + "handler": "ep-tcp", + "bind": fmt.Sprintf("%s/%s", ln.Addr(), ln.Addr().Network()), + }) af := &relay.AddrFeature{} - err = af.ParseFrom(ln.Addr().String()) - if err != nil { + if err := af.ParseFrom(ln.Addr().String()); err != nil { log.Warn(err) } - - // Issue: may not reachable when host has multi-interface - af.Host, _, _ = net.SplitHostPort(conn.LocalAddr().String()) - af.AType = relay.AddrIPv4 resp.Features = append(resp.Features, af) if _, err := resp.WriteTo(conn); err != nil { log.Error(err) - ln.Close() return err } - log = log.WithFields(map[string]any{ - "bind": fmt.Sprintf("%s/%s", ln.Addr(), ln.Addr().Network()), - }) - log.Debugf("bind on %s OK", ln.Addr()) + // Upgrade connection to multiplex session. + session, err := mux.ClientSession(conn) + if err != nil { + log.Error(err) + return err + } + defer session.Close() - return h.serveTCPBind(ctx, conn, ln, log) + epListener := newTCPListener(ln, + listener.AddrOption(address), + listener.ServiceOption(serviceName), + listener.LoggerOption(log.WithFields(map[string]any{ + "kind": "listener", + })), + ) + epHandler := newTCPHandler(session, + handler.ServiceOption(serviceName), + handler.LoggerOption(log.WithFields(map[string]any{ + "kind": "handler", + })), + ) + srv := xservice.NewService( + serviceName, epListener, epHandler, + xservice.LoggerOption(log.WithFields(map[string]any{ + "kind": "service", + })), + ) + + log = log.WithFields(map[string]any{}) + log.Debugf("bind on %s/%s OK", ln.Addr(), ln.Addr().Network()) + + go func() { + defer srv.Close() + for { + conn, err := session.Accept() + if err != nil { + log.Error(err) + return + } + conn.Close() // we do not handle incoming connections. + } + }() + + return srv.Serve() } func (h *relayHandler) bindUDP(ctx context.Context, conn net.Conn, network, address string, log logger.Logger) error { @@ -128,71 +171,6 @@ func (h *relayHandler) bindUDP(ctx context.Context, conn net.Conn, network, addr return nil } -func (h *relayHandler) serveTCPBind(ctx context.Context, conn net.Conn, ln net.Listener, log logger.Logger) error { - // Upgrade connection to multiplex stream. - session, err := mux.ClientSession(conn) - if err != nil { - log.Error(err) - return err - } - defer session.Close() - - go func() { - defer ln.Close() - for { - conn, err := session.Accept() - if err != nil { - log.Error(err) - return - } - conn.Close() // we do not handle incoming connections. - } - }() - - for { - rc, err := ln.Accept() - if err != nil { - log.Error(err) - return err - } - log.Debugf("peer %s accepted", rc.RemoteAddr()) - - go func(c net.Conn) { - defer c.Close() - - log = log.WithFields(map[string]any{ - "local": ln.Addr().String(), - "remote": c.RemoteAddr().String(), - }) - - sc, err := session.GetConn() - if err != nil { - log.Error(err) - return - } - defer sc.Close() - - af := &relay.AddrFeature{} - af.ParseFrom(c.RemoteAddr().String()) - resp := relay.Response{ - Version: relay.Version1, - Status: relay.StatusOK, - Features: []relay.Feature{af}, - } - if _, err := resp.WriteTo(sc); err != nil { - log.Error(err) - return - } - - t := time.Now() - log.Debugf("%s <-> %s", c.LocalAddr(), c.RemoteAddr()) - netpkg.Transport(sc, c) - log.WithFields(map[string]any{"duration": time.Since(t)}). - Debugf("%s >-< %s", c.LocalAddr(), c.RemoteAddr()) - }(rc) - } -} - func (h *relayHandler) handleBindTunnel(ctx context.Context, conn net.Conn, network string, tunnelID relay.TunnelID, log logger.Logger) (err error) { resp := relay.Response{ Version: relay.Version1, diff --git a/handler/relay/connect.go b/handler/relay/connect.go index 90bb305..062f89a 100644 --- a/handler/relay/connect.go +++ b/handler/relay/connect.go @@ -10,7 +10,6 @@ import ( "github.com/go-gost/core/logger" "github.com/go-gost/relay" - netpkg "github.com/go-gost/x/internal/net" xnet "github.com/go-gost/x/internal/net" sx "github.com/go-gost/x/internal/util/selector" ) @@ -89,7 +88,7 @@ func (h *relayHandler) handleConnect(ctx context.Context, conn net.Conn, network t := time.Now() log.Debugf("%s <-> %s", conn.RemoteAddr(), address) - netpkg.Transport(conn, cc) + xnet.Transport(conn, cc) log.WithFields(map[string]any{ "duration": time.Since(t), }).Debugf("%s >-< %s", conn.RemoteAddr(), address) diff --git a/handler/relay/entrypoint.go b/handler/relay/entrypoint.go index e446435..8975621 100644 --- a/handler/relay/entrypoint.go +++ b/handler/relay/entrypoint.go @@ -13,41 +13,35 @@ import ( md "github.com/go-gost/core/metadata" "github.com/go-gost/relay" admission "github.com/go-gost/x/admission/wrapper" + netpkg "github.com/go-gost/x/internal/net" xnet "github.com/go-gost/x/internal/net" "github.com/go-gost/x/internal/net/proxyproto" "github.com/go-gost/x/internal/util/forward" + "github.com/go-gost/x/internal/util/mux" climiter "github.com/go-gost/x/limiter/conn/wrapper" limiter "github.com/go-gost/x/limiter/traffic/wrapper" metrics "github.com/go-gost/x/metrics/wrapper" ) -type epListener struct { +type tcpListener struct { ln net.Listener options listener.Options } -func NewEntryPointListener(opts ...listener.Option) listener.Listener { +func newTCPListener(ln net.Listener, opts ...listener.Option) listener.Listener { options := listener.Options{} for _, opt := range opts { opt(&options) } - return &epListener{ + return &tcpListener{ + ln: ln, options: options, } } -func (l *epListener) Init(md md.Metadata) (err error) { - network := "tcp" - if xnet.IsIPv4(l.options.Addr) { - network = "tcp4" - } - ln, err := net.Listen(network, l.options.Addr) - if err != nil { - return - } - +func (l *tcpListener) Init(md md.Metadata) (err error) { // l.logger.Debugf("pp: %d", l.options.ProxyProtocol) - + ln := l.ln ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) ln = admission.WrapListener(l.options.Admission, ln) @@ -58,42 +52,106 @@ func (l *epListener) Init(md md.Metadata) (err error) { return } -func (l *epListener) Accept() (conn net.Conn, err error) { +func (l *tcpListener) Accept() (conn net.Conn, err error) { return l.ln.Accept() } -func (l *epListener) Addr() net.Addr { +func (l *tcpListener) Addr() net.Addr { return l.ln.Addr() } -func (l *epListener) Close() error { +func (l *tcpListener) Close() error { return l.ln.Close() } -type epHandler struct { - pool *ConnectorPool - ingress ingress.Ingress +type tcpHandler struct { + session *mux.Session options handler.Options } -func NewEntryPointHandler(pool *ConnectorPool, ingress ingress.Ingress, opts ...handler.Option) handler.Handler { +func newTCPHandler(session *mux.Session, opts ...handler.Option) handler.Handler { options := handler.Options{} for _, opt := range opts { opt(&options) } - return &epHandler{ + return &tcpHandler{ + session: session, + options: options, + } +} + +func (h *tcpHandler) Init(md md.Metadata) (err error) { + return +} + +func (h *tcpHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler.HandleOption) error { + defer conn.Close() + + start := time.Now() + log := h.options.Logger.WithFields(map[string]any{ + "remote": conn.RemoteAddr().String(), + "local": conn.LocalAddr().String(), + }) + + log.Infof("%s <> %s", conn.RemoteAddr(), conn.LocalAddr()) + defer func() { + log.WithFields(map[string]any{ + "duration": time.Since(start), + }).Infof("%s >< %s", conn.RemoteAddr(), conn.LocalAddr()) + }() + + cc, err := h.session.GetConn() + if err != nil { + log.Error(err) + return err + } + defer cc.Close() + + af := &relay.AddrFeature{} + af.ParseFrom(conn.RemoteAddr().String()) + resp := relay.Response{ + Version: relay.Version1, + Status: relay.StatusOK, + Features: []relay.Feature{af}, + } + if _, err := resp.WriteTo(cc); err != nil { + log.Error(err) + return err + } + + t := time.Now() + log.Debugf("%s <-> %s", conn.RemoteAddr(), cc.RemoteAddr()) + netpkg.Transport(conn, cc) + log.WithFields(map[string]any{"duration": time.Since(t)}). + Debugf("%s >-< %s", conn.RemoteAddr(), cc.RemoteAddr()) + return nil +} + +type tunnelHandler struct { + pool *ConnectorPool + ingress ingress.Ingress + options handler.Options +} + +func newTunnelHandler(pool *ConnectorPool, ingress ingress.Ingress, opts ...handler.Option) handler.Handler { + options := handler.Options{} + for _, opt := range opts { + opt(&options) + } + + return &tunnelHandler{ pool: pool, ingress: ingress, options: options, } } -func (h *epHandler) Init(md md.Metadata) (err error) { +func (h *tunnelHandler) Init(md md.Metadata) (err error) { return } -func (h *epHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler.HandleOption) error { +func (h *tunnelHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler.HandleOption) error { defer conn.Close() start := time.Now() diff --git a/handler/relay/handler.go b/handler/relay/handler.go index 760dd31..f5a2d44 100644 --- a/handler/relay/handler.go +++ b/handler/relay/handler.go @@ -14,6 +14,7 @@ import ( md "github.com/go-gost/core/metadata" "github.com/go-gost/core/service" "github.com/go-gost/relay" + xnet "github.com/go-gost/x/internal/net" "github.com/go-gost/x/registry" xservice "github.com/go-gost/x/service" ) @@ -71,13 +72,25 @@ func (h *relayHandler) initEntryPoint() (err error) { return } - serviceName := fmt.Sprintf("%s-ep", h.options.Service) + network := "tcp" + if xnet.IsIPv4(h.md.entryPoint) { + network = "tcp4" + } + + ln, err := net.Listen(network, h.md.entryPoint) + if err != nil { + h.options.Logger.Error(err) + return + } + + serviceName := fmt.Sprintf("%s-ep-%s", h.options.Service, ln.Addr()) log := h.options.Logger.WithFields(map[string]any{ "service": serviceName, - "listener": "tunnel", - "handler": "tunnel", + "listener": "tcp", + "handler": "ep-tunnel", + "kind": "service", }) - epListener := NewEntryPointListener( + epListener := newTCPListener(ln, listener.AddrOption(h.md.entryPoint), listener.ServiceOption(serviceName), listener.LoggerOption(log.WithFields(map[string]any{ @@ -87,7 +100,7 @@ func (h *relayHandler) initEntryPoint() (err error) { if err = epListener.Init(nil); err != nil { return } - epHandler := NewEntryPointHandler( + epHandler := newTunnelHandler( h.pool, h.md.ingress, handler.ServiceOption(serviceName),