diff --git a/connector/relay/bind.go b/connector/relay/bind.go index 1e6b04c..08f279d 100644 --- a/connector/relay/bind.go +++ b/connector/relay/bind.go @@ -16,10 +16,6 @@ import ( // Bind implements connector.Binder. func (c *relayConnector) Bind(ctx context.Context, conn net.Conn, network, address string, opts ...connector.BindOption) (net.Listener, error) { - if !c.md.tunnelID.IsZero() { - return c.bindTunnel(ctx, conn, network, address, c.options.Logger) - } - log := c.options.Logger.WithFields(map[string]any{ "network": network, "address": address, @@ -43,86 +39,6 @@ func (c *relayConnector) Bind(ctx context.Context, conn net.Conn, network, addre } } -func (c *relayConnector) bindTunnel(ctx context.Context, conn net.Conn, network, address string, log logger.Logger) (net.Listener, error) { - addr, cid, err := c.initTunnel(conn, network, address) - if err != nil { - return nil, err - } - log.Infof("create tunnel on %s/%s OK, tunnel=%s, connector=%s", addr, network, c.md.tunnelID.String(), cid) - - session, err := mux.ServerSession(conn, c.md.muxCfg) - if err != nil { - return nil, err - } - - return &bindListener{ - network: network, - addr: addr, - session: session, - logger: log, - }, nil -} - -func (c *relayConnector) initTunnel(conn net.Conn, network, address string) (addr net.Addr, cid relay.ConnectorID, err error) { - req := relay.Request{ - Version: relay.Version1, - Cmd: relay.CmdBind, - } - - if network == "udp" { - req.Cmd |= relay.FUDP - } - - if c.options.Auth != nil { - pwd, _ := c.options.Auth.Password() - req.Features = append(req.Features, &relay.UserAuthFeature{ - Username: c.options.Auth.Username(), - Password: pwd, - }) - } - - af := &relay.AddrFeature{} - af.ParseFrom(address) - - req.Features = append(req.Features, af, - &relay.TunnelFeature{ - ID: c.md.tunnelID.ID(), - }, - ) - if _, err = req.WriteTo(conn); err != nil { - return - } - - // first reply, bind status - resp := relay.Response{} - if _, err = resp.ReadFrom(conn); err != nil { - return - } - - if resp.Status != relay.StatusOK { - err = fmt.Errorf("%d: create tunnel %s failed", resp.Status, c.md.tunnelID.String()) - return - } - - for _, f := range resp.Features { - switch f.Type() { - case relay.FeatureAddr: - if feature, _ := f.(*relay.AddrFeature); feature != nil { - addr = &bindAddr{ - network: network, - addr: net.JoinHostPort(feature.Host, strconv.Itoa(int(feature.Port))), - } - } - case relay.FeatureTunnel: - if feature, _ := f.(*relay.TunnelFeature); feature != nil { - cid = relay.NewConnectorID(feature.ID[:]) - } - } - } - - return -} - func (c *relayConnector) bindTCP(ctx context.Context, conn net.Conn, network, address string, log logger.Logger) (net.Listener, error) { laddr, err := c.bind(conn, relay.CmdBind, network, address) if err != nil { @@ -177,6 +93,14 @@ func (c *relayConnector) bind(conn net.Conn, cmd relay.CmdType, network, address Password: pwd, }) } + + nid := relay.NetworkTCP + if network == "udp" || network == "udp4" || network == "udp6" { + nid = relay.NetworkUDP + } + req.Features = append(req.Features, &relay.NetworkFeature{ + Network: nid, + }) fa := &relay.AddrFeature{} fa.ParseFrom(address) req.Features = append(req.Features, fa) diff --git a/connector/relay/conn.go b/connector/relay/conn.go index 1712051..4e9cc4f 100644 --- a/connector/relay/conn.go +++ b/connector/relay/conn.go @@ -19,6 +19,7 @@ type tcpConn struct { net.Conn wbuf *bytes.Buffer once sync.Once + mu sync.Mutex } func (c *tcpConn) Read(b []byte) (n int, err error) { @@ -36,6 +37,10 @@ func (c *tcpConn) Read(b []byte) (n int, err error) { func (c *tcpConn) Write(b []byte) (n int, err error) { n = len(b) // force byte length consistent + + c.mu.Lock() + defer c.mu.Unlock() + if c.wbuf != nil && c.wbuf.Len() > 0 { c.wbuf.Write(b) // append the data to the cached header _, err = c.Conn.Write(c.wbuf.Bytes()) @@ -50,6 +55,7 @@ type udpConn struct { net.Conn wbuf *bytes.Buffer once sync.Once + mu sync.Mutex } func (c *udpConn) Read(b []byte) (n int, err error) { @@ -88,6 +94,10 @@ func (c *udpConn) Write(b []byte) (n int, err error) { } n = len(b) + + c.mu.Lock() + defer c.mu.Unlock() + if c.wbuf != nil && c.wbuf.Len() > 0 { var bb [2]byte binary.BigEndian.PutUint16(bb[:], uint16(len(b))) diff --git a/connector/relay/connector.go b/connector/relay/connector.go index 9e41ca2..cfd4251 100644 --- a/connector/relay/connector.go +++ b/connector/relay/connector.go @@ -101,12 +101,6 @@ func (c *relayConnector) Connect(ctx context.Context, conn net.Conn, network, ad req.Features = append(req.Features, af) } - if !c.md.tunnelID.IsZero() { - req.Features = append(req.Features, &relay.TunnelFeature{ - ID: c.md.tunnelID.ID(), - }) - } - if c.md.noDelay { if _, err := req.WriteTo(conn); err != nil { return nil, err diff --git a/connector/relay/metadata.go b/connector/relay/metadata.go index 8cf22af..460eb1b 100644 --- a/connector/relay/metadata.go +++ b/connector/relay/metadata.go @@ -5,15 +5,12 @@ import ( mdata "github.com/go-gost/core/metadata" mdutil "github.com/go-gost/core/metadata/util" - "github.com/go-gost/relay" "github.com/go-gost/x/internal/util/mux" - "github.com/google/uuid" ) type metadata struct { connectTimeout time.Duration noDelay bool - tunnelID relay.TunnelID muxCfg *mux.Config } @@ -26,14 +23,6 @@ func (c *relayConnector) parseMetadata(md mdata.Metadata) (err error) { c.md.connectTimeout = mdutil.GetDuration(md, connectTimeout) c.md.noDelay = mdutil.GetBool(md, noDelay) - if s := mdutil.GetString(md, "tunnelID", "tunnel.id"); s != "" { - uuid, err := uuid.Parse(s) - if err != nil { - return err - } - c.md.tunnelID = relay.NewTunnelID(uuid[:]) - } - c.md.muxCfg = &mux.Config{ Version: mdutil.GetInt(md, "mux.version"), KeepAliveInterval: mdutil.GetDuration(md, "mux.keepaliveInterval"), diff --git a/handler/relay/bind.go b/handler/relay/bind.go index 26016e9..459daa1 100644 --- a/handler/relay/bind.go +++ b/handler/relay/bind.go @@ -2,8 +2,6 @@ package relay import ( "context" - "crypto/md5" - "encoding/hex" "fmt" "net" "time" @@ -17,7 +15,6 @@ import ( relay_util "github.com/go-gost/x/internal/util/relay" metrics "github.com/go-gost/x/metrics/wrapper" xservice "github.com/go-gost/x/service" - "github.com/google/uuid" ) func (h *relayHandler) handleBind(ctx context.Context, conn net.Conn, network, address string, log logger.Logger) error { @@ -182,54 +179,3 @@ func (h *relayHandler) bindUDP(ctx context.Context, conn net.Conn, network, addr }).Debugf("%s >-< %s", conn.RemoteAddr(), pc.LocalAddr()) return nil } - -func (h *relayHandler) handleBindTunnel(ctx context.Context, conn net.Conn, network, address string, tunnelID relay.TunnelID, log logger.Logger) (err error) { - resp := relay.Response{ - Version: relay.Version1, - Status: relay.StatusOK, - } - - uuid, err := uuid.NewRandom() - if err != nil { - resp.Status = relay.StatusInternalServerError - resp.WriteTo(conn) - return - } - connectorID := relay.NewConnectorID(uuid[:]) - if network == "udp" { - connectorID = relay.NewUDPConnectorID(uuid[:]) - } - - addr := address - if host, port, _ := net.SplitHostPort(addr); host == "" { - v := md5.Sum([]byte(tunnelID.String())) - host = hex.EncodeToString(v[:8]) - addr = net.JoinHostPort(host, port) - } - af := &relay.AddrFeature{} - err = af.ParseFrom(addr) - if err != nil { - log.Warn(err) - } - resp.Features = append(resp.Features, af, - &relay.TunnelFeature{ - ID: connectorID.ID(), - }, - ) - resp.WriteTo(conn) - - // Upgrade connection to multiplex session. - session, err := mux.ClientSession(conn, h.md.muxCfg) - if err != nil { - return - } - - h.pool.Add(tunnelID, NewConnector(connectorID, session)) - if h.md.ingress != nil { - h.md.ingress.Set(ctx, addr, tunnelID.String()) - } - - log.Debugf("%s/%s: tunnel=%s, connector=%s established", address, network, tunnelID, connectorID) - - return -} diff --git a/handler/relay/connect.go b/handler/relay/connect.go index 18dcae9..c67bd89 100644 --- a/handler/relay/connect.go +++ b/handler/relay/connect.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net" - "strconv" "time" "github.com/go-gost/core/logger" @@ -113,99 +112,3 @@ func (h *relayHandler) handleConnect(ctx context.Context, conn net.Conn, network return nil } - -func (h *relayHandler) handleConnectTunnel(ctx context.Context, conn net.Conn, network, address string, tunnelID relay.TunnelID, log logger.Logger) error { - log = log.WithFields(map[string]any{ - "dst": fmt.Sprintf("%s/%s", address, network), - "cmd": "connect", - "tunnel": tunnelID.String(), - }) - - log.Debugf("%s >> %s/%s", conn.RemoteAddr(), address, network) - - resp := relay.Response{ - Version: relay.Version1, - Status: relay.StatusOK, - } - - host, sp, _ := net.SplitHostPort(address) - - if h.options.Bypass != nil && h.options.Bypass.Contains(ctx, network, address) { - log.Debug("bypass: ", address) - resp.Status = relay.StatusForbidden - _, err := resp.WriteTo(conn) - return err - } - - var tid relay.TunnelID - if ingress := h.md.ingress; ingress != nil { - tid = parseTunnelID(ingress.Get(ctx, host)) - } - - if !tid.Equal(tunnelID) && !h.md.directTunnel { - resp.Status = relay.StatusHostUnreachable - resp.WriteTo(conn) - err := fmt.Errorf("no route to host %s", host) - log.Error(err) - return err - } - - cc, _, err := getTunnelConn(network, h.pool, tunnelID, 3, log) - if err != nil { - resp.Status = relay.StatusServiceUnavailable - resp.WriteTo(conn) - log.Error(err) - return err - } - defer cc.Close() - - log.Debugf("%s >> %s", conn.RemoteAddr(), cc.RemoteAddr()) - - if h.md.noDelay { - if _, err := resp.WriteTo(conn); err != nil { - log.Error(err) - return err - } - } else { - rc := &tcpConn{ - Conn: conn, - } - // cache the header - if _, err := resp.WriteTo(&rc.wbuf); err != nil { - return err - } - conn = rc - } - - var features []relay.Feature - af := &relay.AddrFeature{} // source/visitor address - af.ParseFrom(conn.RemoteAddr().String()) - features = append(features, af) - - if host != "" { - port, _ := strconv.Atoi(sp) - // target host - af = &relay.AddrFeature{ - AType: relay.AddrDomain, - Host: host, - Port: uint16(port), - } - features = append(features, af) - } - - resp = relay.Response{ - Version: relay.Version1, - Status: relay.StatusOK, - Features: features, - } - resp.WriteTo(cc) - - t := time.Now() - log.Debugf("%s <-> %s", conn.RemoteAddr(), cc.RemoteAddr()) - xnet.Transport(conn, cc) - log.WithFields(map[string]any{ - "duration": time.Since(t), - }).Debugf("%s >-< %s", conn.RemoteAddr(), cc.RemoteAddr()) - - return nil -} diff --git a/handler/relay/entrypoint.go b/handler/relay/entrypoint.go index 839f7c6..1c3030a 100644 --- a/handler/relay/entrypoint.go +++ b/handler/relay/entrypoint.go @@ -1,25 +1,17 @@ package relay import ( - "bufio" "context" - "fmt" - "io" "net" - "net/http" - "net/http/httputil" "time" "github.com/go-gost/core/handler" - "github.com/go-gost/core/ingress" "github.com/go-gost/core/listener" - "github.com/go-gost/core/logger" md "github.com/go-gost/core/metadata" "github.com/go-gost/relay" admission "github.com/go-gost/x/admission/wrapper" xnet "github.com/go-gost/x/internal/net" "github.com/go-gost/x/internal/net/proxyproto" - "github.com/go-gost/x/internal/util/forward" "github.com/go-gost/x/internal/util/mux" climiter "github.com/go-gost/x/limiter/conn/wrapper" limiter "github.com/go-gost/x/limiter/traffic/wrapper" @@ -130,203 +122,3 @@ func (h *tcpHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler. Debugf("%s >-< %s", conn.RemoteAddr(), cc.RemoteAddr()) return nil } - -type tunnelHandler struct { - pool *ConnectorPool - ingress ingress.Ingress - options handler.Options -} - -func newTunnelHandler(pool *ConnectorPool, ingress ingress.Ingress, opts ...handler.Option) handler.Handler { - options := handler.Options{} - for _, opt := range opts { - opt(&options) - } - - return &tunnelHandler{ - pool: pool, - ingress: ingress, - options: options, - } -} - -func (h *tunnelHandler) Init(md md.Metadata) (err error) { - return -} - -func (h *tunnelHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler.HandleOption) error { - defer conn.Close() - - start := time.Now() - log := h.options.Logger.WithFields(map[string]any{ - "remote": conn.RemoteAddr().String(), - "local": conn.LocalAddr().String(), - }) - - log.Infof("%s <> %s", conn.RemoteAddr(), conn.LocalAddr()) - defer func() { - log.WithFields(map[string]any{ - "duration": time.Since(start), - }).Infof("%s >< %s", conn.RemoteAddr(), conn.LocalAddr()) - }() - - var rw io.ReadWriter = conn - var host string - var protocol string - rw, host, protocol, _ = forward.Sniffing(ctx, conn) - h.options.Logger.Debugf("sniffing: host=%s, protocol=%s", host, protocol) - - if protocol == forward.ProtoHTTP { - return h.handleHTTP(ctx, conn.RemoteAddr(), rw, log) - } - - var tunnelID relay.TunnelID - if h.ingress != nil { - tunnelID = parseTunnelID(h.ingress.Get(ctx, host)) - } - if tunnelID.IsZero() { - err := fmt.Errorf("no route to host %s", host) - log.Error(err) - return err - } - if tunnelID.IsPrivate() { - err := fmt.Errorf("access denied: tunnel %s is private for host %s", tunnelID, host) - log.Error(err) - return err - } - log = log.WithFields(map[string]any{ - "tunnel": tunnelID.String(), - }) - - cc, _, err := getTunnelConn("tcp", h.pool, tunnelID, 3, log) - if err != nil { - log.Error(err) - return err - } - defer cc.Close() - - log.Debugf("%s >> %s", conn.RemoteAddr(), cc.RemoteAddr()) - - var features []relay.Feature - af := &relay.AddrFeature{} - af.ParseFrom(conn.RemoteAddr().String()) // client address - features = append(features, af) - - if host != "" { - // target host - af := &relay.AddrFeature{} - af.ParseFrom(host) - features = append(features, af) - } - - resp := relay.Response{ - Version: relay.Version1, - Status: relay.StatusOK, - Features: features, - } - resp.WriteTo(cc) - - t := time.Now() - log.Debugf("%s <-> %s", conn.RemoteAddr(), cc.RemoteAddr()) - xnet.Transport(rw, cc) - log.WithFields(map[string]any{ - "duration": time.Since(t), - }).Debugf("%s >-< %s", conn.RemoteAddr(), cc.RemoteAddr()) - - return nil -} - -func (h *tunnelHandler) handleHTTP(ctx context.Context, raddr net.Addr, rw io.ReadWriter, log logger.Logger) (err error) { - br := bufio.NewReader(rw) - - for { - resp := &http.Response{ - ProtoMajor: 1, - ProtoMinor: 1, - Header: http.Header{}, - StatusCode: http.StatusServiceUnavailable, - } - - err = func() error { - req, err := http.ReadRequest(br) - if err != nil { - return err - } - - var tunnelID relay.TunnelID - if h.ingress != nil { - tunnelID = parseTunnelID(h.ingress.Get(ctx, req.Host)) - } - if tunnelID.IsZero() { - err := fmt.Errorf("no route to host %s", req.Host) - log.Error(err) - resp.StatusCode = http.StatusBadGateway - return resp.Write(rw) - } - if tunnelID.IsPrivate() { - err := fmt.Errorf("access denied: tunnel %s is private for host %s", tunnelID, req.Host) - log.Error(err) - resp.StatusCode = http.StatusBadGateway - return resp.Write(rw) - } - - log = log.WithFields(map[string]any{ - "host": req.Host, - "tunnel": tunnelID.String(), - }) - - cc, cid, err := getTunnelConn("tcp", h.pool, tunnelID, 3, log) - if err != nil { - log.Error(err) - return resp.Write(rw) - } - defer cc.Close() - - log.Debugf("new connection to tunnel %s(connector %s)", tunnelID, cid) - - var features []relay.Feature - af := &relay.AddrFeature{} - af.ParseFrom(raddr.String()) - features = append(features, af) - - if host := req.Host; host != "" { - if h, _, _ := net.SplitHostPort(host); h == "" { - host = net.JoinHostPort(host, "80") - } - af := &relay.AddrFeature{} - af.ParseFrom(host) - features = append(features, af) - } - - (&relay.Response{ - Version: relay.Version1, - Status: relay.StatusOK, - Features: features, - }).WriteTo(cc) - - if log.IsLevelEnabled(logger.TraceLevel) { - dump, _ := httputil.DumpRequest(req, false) - log.Trace(string(dump)) - } - if err := req.Write(cc); err != nil { - log.Warnf("send request to tunnel %s: %v", tunnelID, err) - return resp.Write(rw) - } - - res, err := http.ReadResponse(bufio.NewReader(cc), req) - if err != nil { - log.Warnf("read response from tunnel %s: %v", tunnelID, err) - return resp.Write(rw) - } - defer res.Body.Close() - - return res.Write(rw) - }() - if err != nil { - // log.Error(err) - break - } - } - - return -} diff --git a/handler/relay/handler.go b/handler/relay/handler.go index c42e6e5..5047dad 100644 --- a/handler/relay/handler.go +++ b/handler/relay/handler.go @@ -3,7 +3,6 @@ package relay import ( "context" "errors" - "fmt" "net" "strconv" "time" @@ -11,14 +10,11 @@ import ( "github.com/go-gost/core/chain" "github.com/go-gost/core/handler" "github.com/go-gost/core/hop" - "github.com/go-gost/core/listener" md "github.com/go-gost/core/metadata" "github.com/go-gost/core/service" "github.com/go-gost/relay" - xnet "github.com/go-gost/x/internal/net" auth_util "github.com/go-gost/x/internal/util/auth" "github.com/go-gost/x/registry" - xservice "github.com/go-gost/x/service" ) var ( @@ -38,7 +34,6 @@ type relayHandler struct { md metadata options handler.Options ep service.Service - pool *ConnectorPool } func NewHandler(opts ...handler.Option) handler.Handler { @@ -49,7 +44,6 @@ func NewHandler(opts ...handler.Option) handler.Handler { return &relayHandler{ options: options, - pool: NewConnectorPool(), } } @@ -63,68 +57,9 @@ func (h *relayHandler) Init(md md.Metadata) (err error) { h.router = chain.NewRouter(chain.LoggerRouterOption(h.options.Logger)) } - if err = h.initEntryPoint(); err != nil { - return - } return nil } -func (h *relayHandler) initEntryPoint() (err error) { - if h.md.entryPoint == "" { - return - } - - network := "tcp" - if xnet.IsIPv4(h.md.entryPoint) { - network = "tcp4" - } - - ln, err := net.Listen(network, h.md.entryPoint) - if err != nil { - h.options.Logger.Error(err) - return - } - - serviceName := fmt.Sprintf("%s-ep-%s", h.options.Service, ln.Addr()) - log := h.options.Logger.WithFields(map[string]any{ - "service": serviceName, - "listener": "tcp", - "handler": "ep-tunnel", - "kind": "service", - }) - epListener := newTCPListener(ln, - listener.AddrOption(h.md.entryPoint), - listener.ServiceOption(serviceName), - listener.ProxyProtocolOption(h.md.entryPointProxyProtocol), - listener.LoggerOption(log.WithFields(map[string]any{ - "kind": "listener", - })), - ) - if err = epListener.Init(nil); err != nil { - return - } - epHandler := newTunnelHandler( - h.pool, - h.md.ingress, - handler.ServiceOption(serviceName), - handler.LoggerOption(log.WithFields(map[string]any{ - "kind": "handler", - })), - ) - if err = epHandler.Init(nil); err != nil { - return - } - - h.ep = xservice.NewService( - serviceName, epListener, epHandler, - xservice.LoggerOption(log), - ) - go h.ep.Serve() - log.Infof("entrypoint: %s", h.ep.Addr()) - - return -} - // Forward implements handler.Forwarder. func (h *relayHandler) Forward(hop hop.Hop) { h.hop = hop @@ -179,7 +114,6 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle var user, pass string var address string var networkID relay.NetworkID - var tunnelID relay.TunnelID for _, f := range req.Features { switch f.Type() { case relay.FeatureUserAuth: @@ -190,10 +124,6 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle if feature, _ := f.(*relay.AddrFeature); feature != nil { address = net.JoinHostPort(feature.Host, strconv.Itoa(int(feature.Port))) } - case relay.FeatureTunnel: - if feature, _ := f.(*relay.TunnelFeature); feature != nil { - tunnelID = relay.NewTunnelID(feature.ID[:]) - } case relay.FeatureNetwork: if feature, _ := f.(*relay.NetworkFeature); feature != nil { networkID = feature.Network @@ -230,16 +160,10 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle case 0, relay.CmdConnect: defer conn.Close() - if !tunnelID.IsZero() { - return h.handleConnectTunnel(ctx, conn, network, address, tunnelID, log) - } return h.handleConnect(ctx, conn, network, address, log) case relay.CmdBind: - if !tunnelID.IsZero() { - return h.handleBindTunnel(ctx, conn, network, address, tunnelID, log) - } - defer conn.Close() + return h.handleBind(ctx, conn, network, address, log) default: resp.Status = relay.StatusBadRequest diff --git a/handler/relay/metadata.go b/handler/relay/metadata.go index 0b526cf..06479c0 100644 --- a/handler/relay/metadata.go +++ b/handler/relay/metadata.go @@ -2,41 +2,29 @@ package relay import ( "math" - "strings" "time" - "github.com/go-gost/core/ingress" - "github.com/go-gost/core/logger" mdata "github.com/go-gost/core/metadata" mdutil "github.com/go-gost/core/metadata/util" - xingress "github.com/go-gost/x/ingress" "github.com/go-gost/x/internal/util/mux" - "github.com/go-gost/x/registry" ) type metadata struct { - readTimeout time.Duration - enableBind bool - udpBufferSize int - noDelay bool - hash string - directTunnel bool - entryPoint string - entryPointProxyProtocol int - ingress ingress.Ingress - muxCfg *mux.Config + readTimeout time.Duration + enableBind bool + udpBufferSize int + noDelay bool + hash string + muxCfg *mux.Config } func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) { const ( - readTimeout = "readTimeout" - enableBind = "bind" - udpBufferSize = "udpBufferSize" - noDelay = "nodelay" - hash = "hash" - entryPoint = "entryPoint" - entryPointID = "entryPoint.id" - entryPointProxyProtocol = "entryPoint.proxyProtocol" + readTimeout = "readTimeout" + enableBind = "bind" + udpBufferSize = "udpBufferSize" + noDelay = "nodelay" + hash = "hash" ) h.md.readTimeout = mdutil.GetDuration(md, readTimeout) @@ -51,33 +39,6 @@ func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) { h.md.hash = mdutil.GetString(md, hash) - h.md.directTunnel = mdutil.GetBool(md, "tunnel.direct") - h.md.entryPoint = mdutil.GetString(md, entryPoint) - h.md.entryPointProxyProtocol = mdutil.GetInt(md, entryPointProxyProtocol) - - h.md.ingress = registry.IngressRegistry().Get(mdutil.GetString(md, "ingress")) - if h.md.ingress == nil { - var rules []xingress.Rule - for _, s := range strings.Split(mdutil.GetString(md, "tunnel"), ",") { - ss := strings.SplitN(s, ":", 2) - if len(ss) != 2 { - continue - } - rules = append(rules, xingress.Rule{ - Hostname: ss[0], - Endpoint: ss[1], - }) - } - if len(rules) > 0 { - h.md.ingress = xingress.NewIngress( - xingress.RulesOption(rules), - xingress.LoggerOption(logger.Default().WithFields(map[string]any{ - "kind": "ingress", - })), - ) - } - } - h.md.muxCfg = &mux.Config{ Version: mdutil.GetInt(md, "mux.version"), KeepAliveInterval: mdutil.GetDuration(md, "mux.keepaliveInterval"), diff --git a/handler/relay/tunnel.go b/handler/relay/tunnel.go deleted file mode 100644 index 98e9a05..0000000 --- a/handler/relay/tunnel.go +++ /dev/null @@ -1,200 +0,0 @@ -package relay - -import ( - "fmt" - "net" - "sync" - "sync/atomic" - "time" - - "github.com/go-gost/core/logger" - "github.com/go-gost/relay" - "github.com/go-gost/x/internal/util/mux" - "github.com/google/uuid" -) - -type Connector struct { - id relay.ConnectorID - t time.Time - s *mux.Session -} - -func NewConnector(id relay.ConnectorID, s *mux.Session) *Connector { - c := &Connector{ - id: id, - t: time.Now(), - s: s, - } - go c.accept() - return c -} - -func (c *Connector) accept() { - for { - conn, err := c.s.Accept() - if err != nil { - logger.Default().Errorf("connector %s: %v", c.id, err) - c.s.Close() - return - } - conn.Close() - } -} - -func (c *Connector) ID() relay.ConnectorID { - return c.id -} - -func (c *Connector) Session() *mux.Session { - return c.s -} - -type Tunnel struct { - id relay.TunnelID - connectors []*Connector - t time.Time - n uint64 - mu sync.RWMutex -} - -func NewTunnel(id relay.TunnelID) *Tunnel { - t := &Tunnel{ - id: id, - t: time.Now(), - } - go t.clean() - return t -} - -func (t *Tunnel) ID() relay.TunnelID { - return t.id -} - -func (t *Tunnel) AddConnector(c *Connector) { - if c == nil { - return - } - - t.mu.Lock() - defer t.mu.Unlock() - - t.connectors = append(t.connectors, c) -} - -func (t *Tunnel) GetConnector(network string) *Connector { - t.mu.RLock() - defer t.mu.RUnlock() - - var connectors []*Connector - for _, c := range t.connectors { - if network == "udp" && c.id.IsUDP() || - network != "udp" && !c.id.IsUDP() { - connectors = append(connectors, c) - } - } - if len(connectors) == 0 { - return nil - } - n := atomic.AddUint64(&t.n, 1) - 1 - return connectors[n%uint64(len(connectors))] -} - -func (t *Tunnel) clean() { - ticker := time.NewTicker(30 * time.Second) - for range ticker.C { - t.mu.Lock() - var connectors []*Connector - for _, c := range t.connectors { - if c.Session().IsClosed() { - logger.Default().Debugf("remove tunnel %s connector %s", t.id, c.id) - continue - } - connectors = append(connectors, c) - } - if len(connectors) != len(t.connectors) { - t.connectors = connectors - } - t.mu.Unlock() - } -} - -type ConnectorPool struct { - tunnels map[string]*Tunnel - mu sync.RWMutex -} - -func NewConnectorPool() *ConnectorPool { - return &ConnectorPool{ - tunnels: make(map[string]*Tunnel), - } -} - -func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) { - p.mu.Lock() - defer p.mu.Unlock() - - s := tid.String() - - t := p.tunnels[s] - if t == nil { - t = NewTunnel(tid) - p.tunnels[s] = t - } - t.AddConnector(c) -} - -func (p *ConnectorPool) Get(network string, tid relay.TunnelID) *Connector { - if p == nil { - return nil - } - - p.mu.RLock() - defer p.mu.RUnlock() - - t := p.tunnels[tid.String()] - if t == nil { - return nil - } - - return t.GetConnector(network) -} - -func parseTunnelID(s string) (tid relay.TunnelID) { - if s == "" { - return - } - private := false - if s[0] == '$' { - private = true - s = s[1:] - } - uuid, _ := uuid.Parse(s) - - if private { - return relay.NewPrivateTunnelID(uuid[:]) - } - return relay.NewTunnelID(uuid[:]) -} - -func getTunnelConn(network string, pool *ConnectorPool, tid relay.TunnelID, retry int, log logger.Logger) (conn net.Conn, cid relay.ConnectorID, err error) { - if retry <= 0 { - retry = 1 - } - for i := 0; i < retry; i++ { - c := pool.Get(network, tid) - if c == nil { - err = fmt.Errorf("tunnel %s not available", tid.String()) - break - } - - conn, err = c.Session().GetConn() - if err != nil { - log.Error(err) - continue - } - cid = c.id - break - } - - return -} diff --git a/handler/tun/client.go b/handler/tun/client.go index b3ece0c..a02bb7c 100644 --- a/handler/tun/client.go +++ b/handler/tun/client.go @@ -44,7 +44,7 @@ func (h *tunHandler) handleClient(ctx context.Context, conn net.Conn, raddr stri ctx, cancel := context.WithCancel(ctx) defer cancel() - go h.keepAlive(ctx, cc, ips) + go h.keepalive(ctx, cc, ips) return h.transportClient(conn, cc, log) }() @@ -57,7 +57,7 @@ func (h *tunHandler) handleClient(ctx context.Context, conn net.Conn, raddr stri } } -func (h *tunHandler) keepAlive(ctx context.Context, conn net.Conn, ips []net.IP) { +func (h *tunHandler) keepalive(ctx context.Context, conn net.Conn, ips []net.IP) { // handshake keepAliveData := bufpool.Get(keepAliveHeaderLength + len(ips)*net.IPv6len) defer bufpool.Put(keepAliveData)