From 416405b1f08361803e4b5b51289670f51c78fa2a Mon Sep 17 00:00:00 2001 From: ginuerzh Date: Thu, 2 Feb 2023 19:18:10 +0800 Subject: [PATCH] add UDP support for reverse proxy tunnel --- admission/wrapper/conn.go | 7 +++ connector/relay/bind.go | 23 +++++---- connector/relay/conn.go | 78 +++++++++++++++++++++++++++++-- connector/relay/connector.go | 2 +- connector/relay/listener.go | 30 ++++++++---- connector/ss/udp/metadata.go | 2 +- handler/forward/local/handler.go | 36 ++++++++------ handler/forward/remote/handler.go | 22 ++++++--- handler/relay/bind.go | 19 +++----- handler/relay/conn.go | 3 -- handler/relay/connect.go | 2 +- handler/relay/entrypoint.go | 2 +- handler/relay/handler.go | 4 +- handler/relay/metadata.go | 2 +- handler/relay/tunnel.go | 27 +++++++---- handler/socks/v5/metadata.go | 2 +- handler/ss/udp/metadata.go | 2 +- handler/tap/metadata.go | 2 +- internal/net/transport.go | 10 ++-- internal/net/udp/relay.go | 2 +- internal/util/icmp/conn.go | 4 +- limiter/traffic/wrapper/conn.go | 7 +++ listener/ftcp/metadata.go | 4 +- listener/redirect/udp/metadata.go | 2 +- listener/rtcp/listener.go | 2 +- listener/rudp/listener.go | 2 +- listener/rudp/metadata.go | 4 +- listener/udp/metadata.go | 4 +- metrics/wrapper/conn.go | 7 +++ 29 files changed, 226 insertions(+), 87 deletions(-) diff --git a/admission/wrapper/conn.go b/admission/wrapper/conn.go index b7fe2f6..7d35bb3 100644 --- a/admission/wrapper/conn.go +++ b/admission/wrapper/conn.go @@ -232,3 +232,10 @@ func (c *udpConn) SetDSCP(n int) error { } return nil } + +func (c *udpConn) Metadata() metadata.Metadata { + if md, ok := c.PacketConn.(metadata.Metadatable); ok { + return md.Metadata() + } + return nil +} diff --git a/connector/relay/bind.go b/connector/relay/bind.go index 1980a53..2cc5751 100644 --- a/connector/relay/bind.go +++ b/connector/relay/bind.go @@ -17,7 +17,7 @@ import ( // Bind implements connector.Binder. func (c *relayConnector) Bind(ctx context.Context, conn net.Conn, network, address string, opts ...connector.BindOption) (net.Listener, error) { if !c.md.tunnelID.IsZero() { - return c.tunnel(ctx, conn, c.options.Logger) + return c.bindTunnel(ctx, conn, network, c.options.Logger) } log := c.options.Logger.WithFields(map[string]any{ @@ -43,31 +43,36 @@ func (c *relayConnector) Bind(ctx context.Context, conn net.Conn, network, addre } } -func (c *relayConnector) tunnel(ctx context.Context, conn net.Conn, log logger.Logger) (net.Listener, error) { - addr, cid, err := c.initTunnel(conn) +func (c *relayConnector) bindTunnel(ctx context.Context, conn net.Conn, network string, log logger.Logger) (net.Listener, error) { + addr, cid, err := c.initTunnel(conn, network) if err != nil { return nil, err } - log.Debugf("create tunnel %s connector %s OK", c.md.tunnelID.String(), cid) + log.Debugf("create tunnel %s connector %s/%s OK", c.md.tunnelID.String(), cid, network) session, err := mux.ServerSession(conn) if err != nil { return nil, err } - return &tcpListener{ + return &bindListener{ + network: network, addr: addr, session: session, logger: log, }, nil } -func (c *relayConnector) initTunnel(conn net.Conn) (addr net.Addr, cid relay.ConnectorID, err error) { +func (c *relayConnector) initTunnel(conn net.Conn, network string) (addr net.Addr, cid relay.ConnectorID, err error) { req := relay.Request{ Version: relay.Version1, Cmd: relay.CmdBind, } + if network == "udp" { + req.Cmd |= relay.FUDP + } + if c.options.Auth != nil { pwd, _ := c.options.Auth.Password() req.Features = append(req.Features, &relay.UserAuthFeature{ @@ -77,7 +82,7 @@ func (c *relayConnector) initTunnel(conn net.Conn) (addr net.Addr, cid relay.Con } req.Features = append(req.Features, &relay.TunnelFeature{ - ID: c.md.tunnelID, + ID: c.md.tunnelID.ID(), }) if _, err = req.WriteTo(conn); err != nil { return @@ -102,7 +107,7 @@ func (c *relayConnector) initTunnel(conn net.Conn) (addr net.Addr, cid relay.Con } case relay.FeatureTunnel: if feature, _ := f.(*relay.TunnelFeature); feature != nil { - cid = feature.ID + cid = relay.NewConnectorID(feature.ID[:]) } } } @@ -122,7 +127,7 @@ func (c *relayConnector) bindTCP(ctx context.Context, conn net.Conn, network, ad return nil, err } - return &tcpListener{ + return &bindListener{ addr: laddr, session: session, logger: log, diff --git a/connector/relay/conn.go b/connector/relay/conn.go index a2a6171..7a42e37 100644 --- a/connector/relay/conn.go +++ b/connector/relay/conn.go @@ -10,6 +10,7 @@ import ( "net" "sync" + "github.com/go-gost/core/common/bufpool" mdata "github.com/go-gost/core/metadata" "github.com/go-gost/relay" ) @@ -67,9 +68,11 @@ func (c *udpConn) Read(b []byte) (n int, err error) { if len(b) >= dlen { return io.ReadFull(c.Conn, b[:dlen]) } - buf := make([]byte, dlen) - _, err = io.ReadFull(c.Conn, buf) - n = copy(b, buf) + + buf := bufpool.Get(dlen) + defer bufpool.Put(buf) + _, err = io.ReadFull(c.Conn, *buf) + n = copy(b, *buf) return } @@ -137,3 +140,72 @@ func (c *bindConn) RemoteAddr() net.Addr { func (c *bindConn) Metadata() mdata.Metadata { return c.md } + +type bindUDPConn struct { + net.Conn + localAddr net.Addr + remoteAddr net.Addr + md mdata.Metadata +} + +func (c *bindUDPConn) Read(b []byte) (n int, err error) { + // 2-byte data length header + var bh [2]byte + _, err = io.ReadFull(c.Conn, bh[:]) + if err != nil { + return + } + + dlen := int(binary.BigEndian.Uint16(bh[:])) + if len(b) >= dlen { + n, err = io.ReadFull(c.Conn, b[:dlen]) + return + } + + buf := bufpool.Get(dlen) + defer bufpool.Put(buf) + + _, err = io.ReadFull(c.Conn, *buf) + n = copy(b, *buf) + + return +} + +func (c *bindUDPConn) Write(b []byte) (n int, err error) { + if len(b) > math.MaxUint16 { + err = errors.New("write: data maximum exceeded") + return + } + + // 2-byte data length header + var bh [2]byte + binary.BigEndian.PutUint16(bh[:], uint16(len(b))) + _, err = c.Conn.Write(bh[:]) + if err != nil { + return + } + return c.Conn.Write(b) +} + +func (c *bindUDPConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { + addr = c.remoteAddr + n, err = c.Read(b) + return +} + +func (c *bindUDPConn) WriteTo(b []byte, addr net.Addr) (n int, err error) { + return c.Write(b) +} + +func (c *bindUDPConn) LocalAddr() net.Addr { + return c.localAddr +} + +func (c *bindUDPConn) RemoteAddr() net.Addr { + return c.remoteAddr +} + +// Metadata implements metadata.Metadatable interface. +func (c *bindUDPConn) Metadata() mdata.Metadata { + return c.md +} diff --git a/connector/relay/connector.go b/connector/relay/connector.go index f752d5c..10a9130 100644 --- a/connector/relay/connector.go +++ b/connector/relay/connector.go @@ -88,7 +88,7 @@ func (c *relayConnector) Connect(ctx context.Context, conn net.Conn, network, ad if !c.md.tunnelID.IsZero() { req.Features = append(req.Features, &relay.TunnelFeature{ - ID: c.md.tunnelID, + ID: c.md.tunnelID.ID(), }) } diff --git a/connector/relay/listener.go b/connector/relay/listener.go index f47525f..3c6f8a6 100644 --- a/connector/relay/listener.go +++ b/connector/relay/listener.go @@ -6,18 +6,20 @@ import ( "strconv" "github.com/go-gost/core/logger" + mdata "github.com/go-gost/core/metadata" "github.com/go-gost/relay" "github.com/go-gost/x/internal/util/mux" mdx "github.com/go-gost/x/metadata" ) -type tcpListener struct { +type bindListener struct { + network string addr net.Addr session *mux.Session logger logger.Logger } -func (p *tcpListener) Accept() (net.Conn, error) { +func (p *bindListener) Accept() (net.Conn, error) { cc, err := p.session.Accept() if err != nil { return nil, err @@ -33,7 +35,7 @@ func (p *tcpListener) Accept() (net.Conn, error) { return conn, nil } -func (p *tcpListener) getPeerConn(conn net.Conn) (net.Conn, error) { +func (p *bindListener) getPeerConn(conn net.Conn) (net.Conn, error) { // second reply, peer connected resp := relay.Response{} if _, err := resp.ReadFrom(conn); err != nil { @@ -64,21 +66,33 @@ func (p *tcpListener) getPeerConn(conn net.Conn) (net.Conn, error) { return nil, err } + var md mdata.Metadata + if host != "" { + md = mdx.NewMetadata(map[string]any{"host": host}) + } + + if p.network == "udp" { + return &bindUDPConn{ + Conn: conn, + localAddr: p.addr, + remoteAddr: raddr, + md: md, + }, nil + } + cn := &bindConn{ Conn: conn, localAddr: p.addr, remoteAddr: raddr, - } - if host != "" { - cn.md = mdx.NewMetadata(map[string]any{"host": host}) + md: md, } return cn, nil } -func (p *tcpListener) Addr() net.Addr { +func (p *bindListener) Addr() net.Addr { return p.addr } -func (p *tcpListener) Close() error { +func (p *bindListener) Close() error { return p.session.Close() } diff --git a/connector/ss/udp/metadata.go b/connector/ss/udp/metadata.go index 5ceed75..8828edf 100644 --- a/connector/ss/udp/metadata.go +++ b/connector/ss/udp/metadata.go @@ -27,7 +27,7 @@ func (c *ssuConnector) parseMetadata(md mdata.Metadata) (err error) { if bs := mdutil.GetInt(md, bufferSize); bs > 0 { c.md.bufferSize = int(math.Min(math.Max(float64(bs), 512), 64*1024)) } else { - c.md.bufferSize = 1500 + c.md.bufferSize = 4096 } return diff --git a/handler/forward/local/handler.go b/handler/forward/local/handler.go index 7d7c4d7..4958e42 100644 --- a/handler/forward/local/handler.go +++ b/handler/forward/local/handler.go @@ -92,11 +92,9 @@ func (h *forwardHandler) Handle(ctx context.Context, conn net.Conn, opts ...hand var rw io.ReadWriter = conn var host string var protocol string - if h.md.sniffing { - if network == "tcp" { - rw, host, protocol, _ = forward.Sniffing(ctx, conn) - log.Debugf("sniffing: host=%s, protocol=%s", host, protocol) - } + if network == "tcp" && h.md.sniffing { + rw, host, protocol, _ = forward.Sniffing(ctx, conn) + log.Debugf("sniffing: host=%s, protocol=%s", host, protocol) } if protocol == forward.ProtoHTTP { @@ -107,19 +105,23 @@ func (h *forwardHandler) Handle(ctx context.Context, conn net.Conn, opts ...hand if _, _, err := net.SplitHostPort(host); err != nil { host = net.JoinHostPort(host, "0") } - target := &chain.Node{ - Addr: host, + + var target *chain.Node + if host != "" { + target = &chain.Node{ + Addr: host, + } } if h.hop != nil { target = h.hop.Select(ctx, chain.HostSelectOption(host), chain.ProtocolSelectOption(protocol), ) - if target == nil { - err := errors.New("target not available") - log.Error(err) - return err - } + } + if target == nil { + err := errors.New("target not available") + log.Error(err) + return err } log = log.WithFields(map[string]any{ @@ -172,7 +174,9 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l return err } - var target *chain.Node + target := &chain.Node{ + Addr: req.Host, + } if h.hop != nil { target = h.hop.Select(ctx, chain.HostSelectOption(req.Host), @@ -235,6 +239,9 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l go func() { defer cc.Close() err := xnet.CopyBuffer(rw, cc, 8192) + if err != nil { + resp.Write(rw) + } log.Debugf("close connection to node %s(%s), reason: %v", target.Name, target.Addr, err) connPool.Delete(target) }() @@ -266,9 +273,12 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l return err } + // cc.SetReadDeadline(time.Now().Add(10 * time.Second)) + return nil }() if err != nil { + // log.Error(err) break } } diff --git a/handler/forward/remote/handler.go b/handler/forward/remote/handler.go index 8f33705..5eb577a 100644 --- a/handler/forward/remote/handler.go +++ b/handler/forward/remote/handler.go @@ -92,11 +92,9 @@ func (h *forwardHandler) Handle(ctx context.Context, conn net.Conn, opts ...hand var rw io.ReadWriter = conn var host string var protocol string - if h.md.sniffing { - if network == "tcp" { - rw, host, protocol, _ = forward.Sniffing(ctx, conn) - log.Debugf("sniffing: host=%s, protocol=%s", host, protocol) - } + if network == "tcp" && h.md.sniffing { + rw, host, protocol, _ = forward.Sniffing(ctx, conn) + log.Debugf("sniffing: host=%s, protocol=%s", host, protocol) } if protocol == forward.ProtoHTTP { h.handleHTTP(ctx, rw, log) @@ -109,6 +107,11 @@ func (h *forwardHandler) Handle(ctx context.Context, conn net.Conn, opts ...hand } } var target *chain.Node + if host != "" { + target = &chain.Node{ + Addr: host, + } + } if h.hop != nil { target = h.hop.Select(ctx, chain.HostSelectOption(host), @@ -172,7 +175,9 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l return err } - var target *chain.Node + target := &chain.Node{ + Addr: req.Host, + } if h.hop != nil { target = h.hop.Select(ctx, chain.HostSelectOption(req.Host), @@ -235,6 +240,9 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l go func() { defer cc.Close() err := xnet.CopyBuffer(rw, cc, 8192) + if err != nil { + resp.Write(rw) + } log.Debugf("close connection to node %s(%s), reason: %v", target.Name, target.Addr, err) connPool.Delete(target) }() @@ -266,6 +274,8 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l return err } + // cc.SetReadDeadline(time.Now().Add(10 * time.Second)) + return nil }() if err != nil { diff --git a/handler/relay/bind.go b/handler/relay/bind.go index 24effee..4ef94a9 100644 --- a/handler/relay/bind.go +++ b/handler/relay/bind.go @@ -193,20 +193,12 @@ func (h *relayHandler) serveTCPBind(ctx context.Context, conn net.Conn, ln net.L } } -func (h *relayHandler) handleTunnel(ctx context.Context, conn net.Conn, tunnelID relay.TunnelID, log logger.Logger) (err error) { +func (h *relayHandler) handleBindTunnel(ctx context.Context, conn net.Conn, network string, tunnelID relay.TunnelID, log logger.Logger) (err error) { resp := relay.Response{ Version: relay.Version1, Status: relay.StatusOK, } - /* - if h.ep == nil { - resp.Status = relay.StatusServiceUnavailable - resp.WriteTo(conn) - return - } - */ - uuid, err := uuid.NewRandom() if err != nil { resp.Status = relay.StatusInternalServerError @@ -214,7 +206,10 @@ func (h *relayHandler) handleTunnel(ctx context.Context, conn net.Conn, tunnelID return } - connectorID := relay.NewTunnelID(uuid[:]) + connectorID := relay.NewConnectorID(uuid[:]) + if network == "udp" { + connectorID = relay.NewUDPConnectorID(uuid[:]) + } addr := ":0" if h.ep != nil { @@ -227,7 +222,7 @@ func (h *relayHandler) handleTunnel(ctx context.Context, conn net.Conn, tunnelID } resp.Features = append(resp.Features, af, &relay.TunnelFeature{ - ID: connectorID, + ID: connectorID.ID(), }, ) resp.WriteTo(conn) @@ -239,7 +234,7 @@ func (h *relayHandler) handleTunnel(ctx context.Context, conn net.Conn, tunnelID } h.pool.Add(tunnelID, NewConnector(connectorID, session)) - log.Debugf("tunnel %s connector %s established", tunnelID, connectorID) + log.Debugf("tunnel %s connector %s/%s established", tunnelID, connectorID, network) return } diff --git a/handler/relay/conn.go b/handler/relay/conn.go index 350ed0a..7c9a7b1 100644 --- a/handler/relay/conn.go +++ b/handler/relay/conn.go @@ -15,9 +15,6 @@ type tcpConn struct { } func (c *tcpConn) Read(b []byte) (n int, err error) { - if err != nil { - return - } return c.Conn.Read(b) } diff --git a/handler/relay/connect.go b/handler/relay/connect.go index fe2ac0c..90bb305 100644 --- a/handler/relay/connect.go +++ b/handler/relay/connect.go @@ -133,7 +133,7 @@ func (h *relayHandler) handleConnectTunnel(ctx context.Context, conn net.Conn, n return err } - cc, err := getTunnelConn(h.pool, tunnelID, 3, log) + cc, err := getTunnelConn(network, h.pool, tunnelID, 3, log) if err != nil { log.Error(err) return err diff --git a/handler/relay/entrypoint.go b/handler/relay/entrypoint.go index 6d4409e..e446435 100644 --- a/handler/relay/entrypoint.go +++ b/handler/relay/entrypoint.go @@ -128,7 +128,7 @@ func (h *epHandler) Handle(ctx context.Context, conn net.Conn, opts ...handler.H "tunnel": tunnelID.String(), }) - cc, err := getTunnelConn(h.pool, tunnelID, 3, log) + cc, err := getTunnelConn("tcp", h.pool, tunnelID, 3, log) if err != nil { log.Error(err) return err diff --git a/handler/relay/handler.go b/handler/relay/handler.go index a293bfc..760dd31 100644 --- a/handler/relay/handler.go +++ b/handler/relay/handler.go @@ -173,7 +173,7 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle } case relay.FeatureTunnel: if feature, _ := f.(*relay.TunnelFeature); feature != nil { - tunnelID = feature.ID + tunnelID = relay.NewTunnelID(feature.ID[:]) } } } @@ -210,7 +210,7 @@ func (h *relayHandler) Handle(ctx context.Context, conn net.Conn, opts ...handle return h.handleConnect(ctx, conn, network, address, log) case relay.CmdBind: if !tunnelID.IsZero() { - return h.handleTunnel(ctx, conn, tunnelID, log) + return h.handleBindTunnel(ctx, conn, network, tunnelID, log) } defer conn.Close() diff --git a/handler/relay/metadata.go b/handler/relay/metadata.go index bc8de2c..0f526d5 100644 --- a/handler/relay/metadata.go +++ b/handler/relay/metadata.go @@ -38,7 +38,7 @@ func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) { if bs := mdutil.GetInt(md, udpBufferSize); bs > 0 { h.md.udpBufferSize = int(math.Min(math.Max(float64(bs), 512), 64*1024)) } else { - h.md.udpBufferSize = 1500 + h.md.udpBufferSize = 4096 } h.md.hash = mdutil.GetString(md, hash) diff --git a/handler/relay/tunnel.go b/handler/relay/tunnel.go index 3a37a15..b4849ec 100644 --- a/handler/relay/tunnel.go +++ b/handler/relay/tunnel.go @@ -81,16 +81,27 @@ func (t *Tunnel) AddConnector(c *Connector) { t.connectors = append(t.connectors, c) } -func (t *Tunnel) GetConnector() *Connector { +func (t *Tunnel) GetConnector(network string) *Connector { t.mu.RLock() defer t.mu.RUnlock() - if len(t.connectors) == 0 { + var connectors []*Connector + for _, c := range t.connectors { + if network == "udp" { + if c.id.IsUDP() { + connectors = append(connectors, c) + } + } else { + if !c.id.IsUDP() { + connectors = append(connectors, c) + } + } + } + if len(connectors) == 0 { return nil } - n := atomic.AddUint64(&t.n, 1) - 1 - return t.connectors[n%uint64(len(t.connectors))] + return connectors[n%uint64(len(connectors))] } func (t *Tunnel) clean() { @@ -137,7 +148,7 @@ func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) { t.AddConnector(c) } -func (p *ConnectorPool) Get(tid relay.TunnelID) *Connector { +func (p *ConnectorPool) Get(network string, tid relay.TunnelID) *Connector { if p == nil { return nil } @@ -150,7 +161,7 @@ func (p *ConnectorPool) Get(tid relay.TunnelID) *Connector { return nil } - return t.GetConnector() + return t.GetConnector(network) } func parseTunnelID(s string) (tid relay.TunnelID) { @@ -170,12 +181,12 @@ func parseTunnelID(s string) (tid relay.TunnelID) { return relay.NewTunnelID(uuid[:]) } -func getTunnelConn(pool *ConnectorPool, tunnelID relay.TunnelID, retry int, log logger.Logger) (conn net.Conn, err error) { +func getTunnelConn(network string, pool *ConnectorPool, tunnelID relay.TunnelID, retry int, log logger.Logger) (conn net.Conn, err error) { if retry <= 0 { retry = 1 } for i := 0; i < retry; i++ { - c := pool.Get(tunnelID) + c := pool.Get(network, tunnelID) if c == nil { err = fmt.Errorf("tunnel %s not available", tunnelID.String()) break diff --git a/handler/socks/v5/metadata.go b/handler/socks/v5/metadata.go index 523cff9..27ca67e 100644 --- a/handler/socks/v5/metadata.go +++ b/handler/socks/v5/metadata.go @@ -37,7 +37,7 @@ func (h *socks5Handler) parseMetadata(md mdata.Metadata) (err error) { if bs := mdutil.GetInt(md, udpBufferSize); bs > 0 { h.md.udpBufferSize = int(math.Min(math.Max(float64(bs), 512), 64*1024)) } else { - h.md.udpBufferSize = 1500 + h.md.udpBufferSize = 4096 } h.md.compatibilityMode = mdutil.GetBool(md, compatibilityMode) diff --git a/handler/ss/udp/metadata.go b/handler/ss/udp/metadata.go index 2491fbb..a2d4a99 100644 --- a/handler/ss/udp/metadata.go +++ b/handler/ss/udp/metadata.go @@ -27,7 +27,7 @@ func (h *ssuHandler) parseMetadata(md mdata.Metadata) (err error) { if bs := mdutil.GetInt(md, bufferSize); bs > 0 { h.md.bufferSize = int(math.Min(math.Max(float64(bs), 512), 64*1024)) } else { - h.md.bufferSize = 1500 + h.md.bufferSize = 4096 } return } diff --git a/handler/tap/metadata.go b/handler/tap/metadata.go index 60065b2..4860075 100644 --- a/handler/tap/metadata.go +++ b/handler/tap/metadata.go @@ -19,7 +19,7 @@ func (h *tapHandler) parseMetadata(md mdata.Metadata) (err error) { h.md.key = mdutil.GetString(md, key) h.md.bufferSize = mdutil.GetInt(md, bufferSize) if h.md.bufferSize <= 0 { - h.md.bufferSize = 1500 + h.md.bufferSize = 4096 } return } diff --git a/internal/net/transport.go b/internal/net/transport.go index 375cd36..eca8efc 100644 --- a/internal/net/transport.go +++ b/internal/net/transport.go @@ -8,14 +8,18 @@ import ( "github.com/go-gost/core/common/bufpool" ) +const ( + bufferSize = 32 * 1024 +) + func Transport(rw1, rw2 io.ReadWriter) error { errc := make(chan error, 1) go func() { - errc <- CopyBuffer(rw1, rw2, 8192) + errc <- CopyBuffer(rw1, rw2, bufferSize) }() go func() { - errc <- CopyBuffer(rw2, rw1, 8192) + errc <- CopyBuffer(rw2, rw1, bufferSize) }() if err := <-errc; err != nil && err != io.EOF { @@ -26,7 +30,7 @@ func Transport(rw1, rw2 io.ReadWriter) error { } func CopyBuffer(dst io.Writer, src io.Reader, bufSize int) error { - buf := bufpool.Get(8192) + buf := bufpool.Get(bufSize) defer bufpool.Put(buf) _, err := io.CopyBuffer(dst, src, *buf) diff --git a/internal/net/udp/relay.go b/internal/net/udp/relay.go index b8c012e..62b6e84 100644 --- a/internal/net/udp/relay.go +++ b/internal/net/udp/relay.go @@ -41,7 +41,7 @@ func (r *Relay) SetBufferSize(n int) { func (r *Relay) Run() (err error) { bufSize := r.bufferSize if bufSize <= 0 { - bufSize = 1500 + bufSize = 4096 } errc := make(chan error, 2) diff --git a/internal/util/icmp/conn.go b/internal/util/icmp/conn.go index d782e5c..5d333aa 100644 --- a/internal/util/icmp/conn.go +++ b/internal/util/icmp/conn.go @@ -15,8 +15,8 @@ import ( ) const ( - readBufferSize = 1500 - writeBufferSize = 1500 + readBufferSize = 4096 + writeBufferSize = 4096 magicNumber = 0x474F5354 ) diff --git a/limiter/traffic/wrapper/conn.go b/limiter/traffic/wrapper/conn.go index 144d130..4cec7e0 100644 --- a/limiter/traffic/wrapper/conn.go +++ b/limiter/traffic/wrapper/conn.go @@ -449,3 +449,10 @@ func (c *udpConn) SetDSCP(n int) error { } return nil } + +func (c *udpConn) Metadata() metadata.Metadata { + if md, ok := c.PacketConn.(metadata.Metadatable); ok { + return md.Metadata() + } + return nil +} diff --git a/listener/ftcp/metadata.go b/listener/ftcp/metadata.go index 3ec653b..8ec5142 100644 --- a/listener/ftcp/metadata.go +++ b/listener/ftcp/metadata.go @@ -9,8 +9,8 @@ import ( const ( defaultTTL = 60 * time.Second - defaultReadBufferSize = 1500 - defaultReadQueueSize = 128 + defaultReadBufferSize = 4096 + defaultReadQueueSize = 1024 defaultBacklog = 128 ) diff --git a/listener/redirect/udp/metadata.go b/listener/redirect/udp/metadata.go index 465ce93..9aabd80 100644 --- a/listener/redirect/udp/metadata.go +++ b/listener/redirect/udp/metadata.go @@ -9,7 +9,7 @@ import ( const ( defaultTTL = 30 * time.Second - defaultReadBufferSize = 1500 + defaultReadBufferSize = 4096 ) type metadata struct { diff --git a/listener/rtcp/listener.go b/listener/rtcp/listener.go index ef1d187..340e25e 100644 --- a/listener/rtcp/listener.go +++ b/listener/rtcp/listener.go @@ -104,7 +104,7 @@ func (l *rtcpListener) Close() error { close(l.closed) if l.ln != nil { l.ln.Close() - l.ln = nil + // l.ln = nil } } diff --git a/listener/rudp/listener.go b/listener/rudp/listener.go index e1943d8..38d5efe 100644 --- a/listener/rudp/listener.go +++ b/listener/rudp/listener.go @@ -110,7 +110,7 @@ func (l *rudpListener) Close() error { close(l.closed) if l.ln != nil { l.ln.Close() - l.ln = nil + // l.ln = nil } } diff --git a/listener/rudp/metadata.go b/listener/rudp/metadata.go index a055694..fd71f7d 100644 --- a/listener/rudp/metadata.go +++ b/listener/rudp/metadata.go @@ -9,8 +9,8 @@ import ( const ( defaultTTL = 5 * time.Second - defaultReadBufferSize = 1500 - defaultReadQueueSize = 128 + defaultReadBufferSize = 4096 + defaultReadQueueSize = 1024 defaultBacklog = 128 ) diff --git a/listener/udp/metadata.go b/listener/udp/metadata.go index 9b9ed95..efd775c 100644 --- a/listener/udp/metadata.go +++ b/listener/udp/metadata.go @@ -9,8 +9,8 @@ import ( const ( defaultTTL = 5 * time.Second - defaultReadBufferSize = 1500 - defaultReadQueueSize = 128 + defaultReadBufferSize = 4096 + defaultReadQueueSize = 1024 defaultBacklog = 128 ) diff --git a/metrics/wrapper/conn.go b/metrics/wrapper/conn.go index 3cd4fe7..f17674a 100644 --- a/metrics/wrapper/conn.go +++ b/metrics/wrapper/conn.go @@ -286,3 +286,10 @@ func (c *udpConn) SetDSCP(n int) error { } return nil } + +func (c *udpConn) Metadata() metadata.Metadata { + if md, ok := c.PacketConn.(metadata.Metadatable); ok { + return md.Metadata() + } + return nil +}