improve tunnel handler

This commit is contained in:
ginuerzh 2023-10-27 22:11:11 +08:00
parent a4e3c25b22
commit 0bef7c0cdf
14 changed files with 158 additions and 44 deletions

View File

@ -115,7 +115,7 @@ func (p *httpPlugin) Admit(ctx context.Context, addr string, opts ...admission.O
return
}
req, err := http.NewRequest(http.MethodPost, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(v))
if err != nil {
return
}

View File

@ -125,7 +125,7 @@ func (p *httpPlugin) Authenticate(ctx context.Context, user, password string, op
return
}
req, err := http.NewRequest(http.MethodPost, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(v))
if err != nil {
return
}

View File

@ -138,7 +138,7 @@ func (p *httpPlugin) Contains(ctx context.Context, network, addr string, opts ..
return
}
req, err := http.NewRequest(http.MethodPost, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(v))
if err != nil {
return
}

2
go.mod
View File

@ -7,7 +7,7 @@ require (
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/gin-contrib/cors v1.3.1
github.com/gin-gonic/gin v1.9.1
github.com/go-gost/core v0.0.0-20231020111249-6431cd8bb957
github.com/go-gost/core v0.0.0-20231027140845-d975ec3c7477
github.com/go-gost/gosocks4 v0.0.1
github.com/go-gost/gosocks5 v0.4.0
github.com/go-gost/plugin v0.0.0-20231020155519-e190e1c74d78

6
go.sum
View File

@ -91,8 +91,10 @@ github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SU
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gost/core v0.0.0-20231020111249-6431cd8bb957 h1:Ch7m/rplsCHjpGuOzgXV+OrXmGxNa/UVLUGV2yUFGhQ=
github.com/go-gost/core v0.0.0-20231020111249-6431cd8bb957/go.mod h1:ndkgWVYRLwupVaFFWv8ML1Nr8tD3xhHK245PLpUDg4E=
github.com/go-gost/core v0.0.0-20231026142046-9e767d674527 h1:BLhpnK+J9A3vugXCJrC+BNjz2Q4qdEE8IWIlWr7VOaw=
github.com/go-gost/core v0.0.0-20231026142046-9e767d674527/go.mod h1:ndkgWVYRLwupVaFFWv8ML1Nr8tD3xhHK245PLpUDg4E=
github.com/go-gost/core v0.0.0-20231027140845-d975ec3c7477 h1:a49XfrB4mgbw7z7oN/WTovx0X7SbxdfoANsEDTy9CqI=
github.com/go-gost/core v0.0.0-20231027140845-d975ec3c7477/go.mod h1:ndkgWVYRLwupVaFFWv8ML1Nr8tD3xhHK245PLpUDg4E=
github.com/go-gost/gosocks4 v0.0.1 h1:+k1sec8HlELuQV7rWftIkmy8UijzUt2I6t+iMPlGB2s=
github.com/go-gost/gosocks4 v0.0.1/go.mod h1:3B6L47HbU/qugDg4JnoFPHgJXE43Inz8Bah1QaN9qCc=
github.com/go-gost/gosocks5 v0.4.0 h1:EIrOEkpJez4gwHrMa33frA+hHXJyevjp47thpMQsJzI=

View File

@ -4,9 +4,11 @@ import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"net"
"github.com/go-gost/core/logger"
"github.com/go-gost/core/recorder"
"github.com/go-gost/relay"
"github.com/go-gost/x/internal/util/mux"
"github.com/google/uuid"
@ -57,8 +59,15 @@ func (h *tunnelHandler) handleBind(ctx context.Context, conn net.Conn, network,
if h.md.ingress != nil {
h.md.ingress.Set(ctx, addr, tunnelID.String())
}
if h.recorder.Recorder != nil {
h.recorder.Recorder.Record(ctx, tunnelID[:])
if h.recorder != nil {
h.recorder.Record(ctx,
[]byte(fmt.Sprintf("%s:%s", tunnelID, connectorID)),
recorder.MetadataReocrdOption(connectorMetadata{
Op: "add",
Network: network,
Server: conn.LocalAddr().String(),
}),
)
}
log.Debugf("%s/%s: tunnel=%s, connector=%s established", addr, network, tunnelID, connectorID)

View File

@ -39,7 +39,7 @@ type tunnelHandler struct {
md metadata
options handler.Options
pool *ConnectorPool
recorder recorder.RecorderObject
recorder recorder.Recorder
epSvc service.Service
ep *entrypoint
}
@ -52,7 +52,6 @@ func NewHandler(opts ...handler.Option) handler.Handler {
return &tunnelHandler{
options: options,
pool: NewConnectorPool(),
}
}
@ -68,12 +67,14 @@ func (h *tunnelHandler) Init(md md.Metadata) (err error) {
if opts := h.router.Options(); opts != nil {
for _, ro := range opts.Recorders {
if ro.Record == xrecorder.RecorderServiceHandlerTunnelEndpoint {
h.recorder = ro
if ro.Record == xrecorder.RecorderServiceHandlerTunnelConnector {
h.recorder = ro.Recorder
break
}
}
}
h.pool = NewConnectorPool()
h.pool.WithRecorder(h.recorder)
h.ep = &entrypoint{
pool: h.pool,

View File

@ -1,6 +1,7 @@
package tunnel
import (
"context"
"fmt"
"net"
"sync"
@ -8,11 +9,18 @@ import (
"time"
"github.com/go-gost/core/logger"
"github.com/go-gost/core/recorder"
"github.com/go-gost/relay"
"github.com/go-gost/x/internal/util/mux"
"github.com/google/uuid"
)
type connectorMetadata struct {
Op string
Network string
Server string
}
type Connector struct {
id relay.ConnectorID
t time.Time
@ -54,18 +62,25 @@ type Tunnel struct {
connectors []*Connector
t time.Time
n uint64
close chan struct{}
mu sync.RWMutex
recorder recorder.Recorder
}
func NewTunnel(id relay.TunnelID) *Tunnel {
t := &Tunnel{
id: id,
t: time.Now(),
close: make(chan struct{}),
}
go t.clean()
return t
}
func (t *Tunnel) WithRecorder(recorder recorder.Recorder) {
t.recorder = recorder
}
func (t *Tunnel) ID() relay.TunnelID {
return t.id
}
@ -87,6 +102,9 @@ func (t *Tunnel) GetConnector(network string) *Connector {
var connectors []*Connector
for _, c := range t.connectors {
if c.Session().IsClosed() {
continue
}
if network == "udp" && c.id.IsUDP() ||
network != "udp" && !c.id.IsUDP() {
connectors = append(connectors, c)
@ -99,34 +117,83 @@ func (t *Tunnel) GetConnector(network string) *Connector {
return connectors[n%uint64(len(connectors))]
}
func (t *Tunnel) CloseOnIdle() bool {
t.mu.RLock()
defer t.mu.RUnlock()
select {
case <-t.close:
default:
if len(t.connectors) == 0 {
close(t.close)
return true
}
}
return false
}
func (t *Tunnel) clean() {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.mu.Lock()
if len(t.connectors) == 0 {
t.mu.Unlock()
}
var connectors []*Connector
for _, c := range t.connectors {
if c.Session().IsClosed() {
logger.Default().Debugf("remove tunnel %s connector %s", t.id, c.id)
logger.Default().Debugf("remove tunnel: %s, connector: %s", t.id, c.id)
if t.recorder != nil {
t.recorder.Record(context.Background(),
[]byte(fmt.Sprintf("%s:%s", t.id, c.id)),
recorder.MetadataReocrdOption(connectorMetadata{
Op: "del",
}),
)
}
continue
}
connectors = append(connectors, c)
if t.recorder != nil {
t.recorder.Record(context.Background(),
[]byte(fmt.Sprintf("%s:%s", t.id, c.id)),
recorder.MetadataReocrdOption(connectorMetadata{
Op: "set",
}),
)
}
}
if len(connectors) != len(t.connectors) {
t.connectors = connectors
}
t.mu.Unlock()
case <-t.close:
return
}
}
}
type ConnectorPool struct {
tunnels map[string]*Tunnel
mu sync.RWMutex
recorder recorder.Recorder
}
func NewConnectorPool() *ConnectorPool {
return &ConnectorPool{
p := &ConnectorPool{
tunnels: make(map[string]*Tunnel),
}
go p.closeIdles()
return p
}
func (p *ConnectorPool) WithRecorder(recorder recorder.Recorder) {
p.recorder = recorder
}
func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) {
@ -138,6 +205,8 @@ func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) {
t := p.tunnels[s]
if t == nil {
t = NewTunnel(tid)
t.WithRecorder(p.recorder)
p.tunnels[s] = t
}
t.AddConnector(c)
@ -159,6 +228,22 @@ func (p *ConnectorPool) Get(network string, tid relay.TunnelID) *Connector {
return t.GetConnector(network)
}
func (p *ConnectorPool) closeIdles() {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for range ticker.C {
p.mu.Lock()
for k, v := range p.tunnels {
if v.CloseOnIdle() {
delete(p.tunnels, k)
logger.Default().Debugf("remove idle tunnel: %s", k)
}
}
p.mu.Unlock()
}
}
func parseTunnelID(s string) (tid relay.TunnelID) {
if s == "" {
return

View File

@ -159,7 +159,7 @@ func (p *httpPlugin) Select(ctx context.Context, opts ...hop.SelectOption) *chai
return nil
}
req, err := http.NewRequest(http.MethodPost, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(v))
if err != nil {
p.log.Error(err)
return nil

View File

@ -11,8 +11,8 @@ import (
"github.com/go-gost/core/hosts"
"github.com/go-gost/core/logger"
"github.com/go-gost/plugin/hosts/proto"
auth_util "github.com/go-gost/x/internal/util/auth"
"github.com/go-gost/x/internal/plugin"
auth_util "github.com/go-gost/x/internal/util/auth"
"google.golang.org/grpc"
)
@ -133,7 +133,7 @@ func (p *httpPlugin) Lookup(ctx context.Context, network, host string, opts ...h
return
}
req, err := http.NewRequest(http.MethodPost, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(v))
if err != nil {
return
}

View File

@ -134,7 +134,7 @@ func (p *httpPlugin) Get(ctx context.Context, host string, opts ...ingress.GetOp
return
}
req, err := http.NewRequest(http.MethodPost, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(v))
if err != nil {
return
}
@ -174,7 +174,7 @@ func (p *httpPlugin) Set(ctx context.Context, host, endpoint string, opts ...ing
return
}
req, err := http.NewRequest(http.MethodPut, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPut, p.url, bytes.NewReader(v))
if err != nil {
return
}

View File

@ -53,9 +53,17 @@ func (p *grpcPlugin) Record(ctx context.Context, b []byte, opts ...recorder.Reco
return nil
}
_, err := p.client.Record(context.Background(),
var options recorder.RecordOptions
for _, opt := range opts {
opt(&options)
}
md, _ := json.Marshal(options.Metadata)
_, err := p.client.Record(ctx,
&proto.RecordRequest{
Data: b,
Metadata: md,
})
if err != nil {
p.log.Error(err)
@ -73,6 +81,7 @@ func (p *grpcPlugin) Close() error {
type httpPluginRequest struct {
Data []byte `json:"data"`
Metadata []byte `json:"metadata"`
}
type httpPluginResponse struct {
@ -109,15 +118,23 @@ func (p *httpPlugin) Record(ctx context.Context, b []byte, opts ...recorder.Reco
return nil
}
var options recorder.RecordOptions
for _, opt := range opts {
opt(&options)
}
md, _ := json.Marshal(options.Metadata)
rb := httpPluginRequest{
Data: b,
Metadata: md,
}
v, err := json.Marshal(&rb)
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPost, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(v))
if err != nil {
return err
}

View File

@ -2,5 +2,5 @@ package recorder
const (
RecorderServiceHandlerSerial = "recorder.service.handler.serial"
RecorderServiceHandlerTunnelEndpoint = "recorder.service.handler.tunnel.endpoint"
RecorderServiceHandlerTunnelConnector = "recorder.service.handler.tunnel.connector"
)

View File

@ -56,7 +56,7 @@ func (p *grpcPlugin) Resolve(ctx context.Context, network, host string, opts ...
return
}
r, err := p.client.Resolve(context.Background(),
r, err := p.client.Resolve(ctx,
&proto.ResolveRequest{
Network: network,
Host: host,
@ -134,7 +134,7 @@ func (p *httpPlugin) Resolve(ctx context.Context, network, host string, opts ...
return
}
req, err := http.NewRequest(http.MethodPost, p.url, bytes.NewReader(v))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(v))
if err != nil {
return
}