add sd for tunnel

This commit is contained in:
ginuerzh
2023-10-31 22:59:14 +08:00
parent e8d5e719a4
commit a7166b8206
17 changed files with 795 additions and 173 deletions

View File

@ -3,24 +3,17 @@ package tunnel
import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/go-gost/core/logger"
"github.com/go-gost/core/recorder"
"github.com/go-gost/core/sd"
"github.com/go-gost/relay"
"github.com/go-gost/x/internal/util/mux"
"github.com/google/uuid"
)
type connectorMetadata struct {
Op string
Network string
Server string
}
type Connector struct {
id relay.ConnectorID
t time.Time
@ -58,18 +51,20 @@ func (c *Connector) Session() *mux.Session {
}
type Tunnel struct {
node string
id relay.TunnelID
connectors []*Connector
t time.Time
n uint64
close chan struct{}
mu sync.RWMutex
recorder recorder.Recorder
sd sd.SD
}
func NewTunnel(id relay.TunnelID) *Tunnel {
func NewTunnel(node string, tid relay.TunnelID) *Tunnel {
t := &Tunnel{
id: id,
node: node,
id: tid,
t: time.Now(),
close: make(chan struct{}),
}
@ -77,8 +72,8 @@ func NewTunnel(id relay.TunnelID) *Tunnel {
return t
}
func (t *Tunnel) WithRecorder(recorder recorder.Recorder) {
t.recorder = recorder
func (t *Tunnel) WithSD(sd sd.SD) {
t.sd = sd
}
func (t *Tunnel) ID() relay.TunnelID {
@ -142,30 +137,21 @@ func (t *Tunnel) clean() {
t.mu.Lock()
if len(t.connectors) == 0 {
t.mu.Unlock()
break
}
var connectors []*Connector
for _, c := range t.connectors {
if c.Session().IsClosed() {
logger.Default().Debugf("remove tunnel: %s, connector: %s", t.id, c.id)
if t.recorder != nil {
t.recorder.Record(context.Background(),
[]byte(fmt.Sprintf("%s:%s", t.id, c.id)),
recorder.MetadataReocrdOption(connectorMetadata{
Op: "del",
}),
)
if t.sd != nil {
t.sd.Deregister(context.Background(), fmt.Sprintf("%s:%s:%s", t.node, t.id, c.id))
}
continue
}
connectors = append(connectors, c)
if t.recorder != nil {
t.recorder.Record(context.Background(),
[]byte(fmt.Sprintf("%s:%s", t.id, c.id)),
recorder.MetadataReocrdOption(connectorMetadata{
Op: "set",
}),
)
if t.sd != nil {
t.sd.Renew(context.Background(), fmt.Sprintf("%s:%s:%s", t.node, t.id, c.id))
}
}
if len(connectors) != len(t.connectors) {
@ -179,23 +165,22 @@ func (t *Tunnel) clean() {
}
type ConnectorPool struct {
tunnels map[string]*Tunnel
mu sync.RWMutex
recorder recorder.Recorder
node string
sd sd.SD
tunnels map[string]*Tunnel
mu sync.RWMutex
}
func NewConnectorPool() *ConnectorPool {
func NewConnectorPool(node string, sd sd.SD) *ConnectorPool {
p := &ConnectorPool{
node: node,
sd: sd,
tunnels: make(map[string]*Tunnel),
}
go p.closeIdles()
return p
}
func (p *ConnectorPool) WithRecorder(recorder recorder.Recorder) {
p.recorder = recorder
}
func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) {
p.mu.Lock()
defer p.mu.Unlock()
@ -204,15 +189,15 @@ func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) {
t := p.tunnels[s]
if t == nil {
t = NewTunnel(tid)
t.WithRecorder(p.recorder)
t = NewTunnel(p.node, tid)
t.WithSD(p.sd)
p.tunnels[s] = t
}
t.AddConnector(c)
}
func (p *ConnectorPool) Get(network string, tid relay.TunnelID) *Connector {
func (p *ConnectorPool) Get(network string, tid string) *Connector {
if p == nil {
return nil
}
@ -220,7 +205,7 @@ func (p *ConnectorPool) Get(network string, tid relay.TunnelID) *Connector {
p.mu.RLock()
defer p.mu.RUnlock()
t := p.tunnels[tid.String()]
t := p.tunnels[tid]
if t == nil {
return nil
}
@ -260,31 +245,3 @@ func parseTunnelID(s string) (tid relay.TunnelID) {
}
return relay.NewTunnelID(uuid[:])
}
func getTunnelConn(network string, pool *ConnectorPool, tid relay.TunnelID, retry int, log logger.Logger) (conn net.Conn, cid relay.ConnectorID, err error) {
if tid.IsZero() {
err = ErrTunnelID
return
}
if retry <= 0 {
retry = 1
}
for i := 0; i < retry; i++ {
c := pool.Get(network, tid)
if c == nil {
err = fmt.Errorf("tunnel %s not available", tid.String())
break
}
conn, err = c.Session().GetConn()
if err != nil {
log.Error(err)
continue
}
cid = c.id
break
}
return
}