update sd

This commit is contained in:
ginuerzh 2023-11-02 22:31:52 +08:00
parent 68edeb2d59
commit d464be5fd0
5 changed files with 48 additions and 18 deletions

View File

@ -54,15 +54,15 @@ func (h *tunnelHandler) handleBind(ctx context.Context, conn net.Conn, network,
return return
} }
h.pool.Add(tunnelID, NewConnector(connectorID, session)) h.pool.Add(tunnelID, NewConnector(connectorID, tunnelID, h.id, session, h.md.sd), h.md.tunnelTTL)
if h.md.ingress != nil { if h.md.ingress != nil {
h.md.ingress.Set(ctx, addr, tunnelID.String()) h.md.ingress.Set(ctx, addr, tunnelID.String())
} }
if h.md.sd != nil { if h.md.sd != nil {
err := h.md.sd.Register(ctx, &sd.Service{ err := h.md.sd.Register(ctx, &sd.Service{
ID: connectorID.String(), ID: connectorID.String(),
Name: tunnelID.String(), Name: tunnelID.String(),
Node: h.id, Node: h.id,
Network: network, Network: network,
Address: h.md.entryPoint, Address: h.md.entryPoint,
}) })

View File

@ -20,7 +20,9 @@ type Dialer struct {
func (d *Dialer) Dial(ctx context.Context, network string, tid string) (conn net.Conn, node string, cid string, err error) { func (d *Dialer) Dial(ctx context.Context, network string, tid string) (conn net.Conn, node string, cid string, err error) {
retry := d.retry retry := d.retry
retry = 1 if retry <= 0 {
retry = 1
}
for i := 0; i < retry; i++ { for i := 0; i < retry; i++ {
c := d.pool.Get(network, tid) c := d.pool.Get(network, tid)

View File

@ -15,12 +15,17 @@ import (
"github.com/go-gost/x/registry" "github.com/go-gost/x/registry"
) )
const (
defaultTTL = 15 * time.Second
)
type metadata struct { type metadata struct {
readTimeout time.Duration readTimeout time.Duration
entryPoint string entryPoint string
entryPointID relay.TunnelID entryPointID relay.TunnelID
entryPointProxyProtocol int entryPointProxyProtocol int
directTunnel bool directTunnel bool
tunnelTTL time.Duration
ingress ingress.Ingress ingress ingress.Ingress
sd sd.SD sd sd.SD
muxCfg *mux.Config muxCfg *mux.Config
@ -29,6 +34,10 @@ type metadata struct {
func (h *tunnelHandler) parseMetadata(md mdata.Metadata) (err error) { func (h *tunnelHandler) parseMetadata(md mdata.Metadata) (err error) {
h.md.readTimeout = mdutil.GetDuration(md, "readTimeout") h.md.readTimeout = mdutil.GetDuration(md, "readTimeout")
h.md.tunnelTTL = mdutil.GetDuration(md, "tunnel.ttl")
if h.md.tunnelTTL <= 0 {
h.md.tunnelTTL = defaultTTL
}
h.md.directTunnel = mdutil.GetBool(md, "tunnel.direct") h.md.directTunnel = mdutil.GetBool(md, "tunnel.direct")
h.md.entryPoint = mdutil.GetString(md, "entrypoint") h.md.entryPoint = mdutil.GetString(md, "entrypoint")
h.md.entryPointID = parseTunnelID(mdutil.GetString(md, "entrypoint.id")) h.md.entryPointID = parseTunnelID(mdutil.GetString(md, "entrypoint.id"))

View File

@ -14,16 +14,22 @@ import (
) )
type Connector struct { type Connector struct {
id relay.ConnectorID id relay.ConnectorID
t time.Time tid relay.TunnelID
s *mux.Session node string
sd sd.SD
t time.Time
s *mux.Session
} }
func NewConnector(id relay.ConnectorID, s *mux.Session) *Connector { func NewConnector(id relay.ConnectorID, tid relay.TunnelID, node string, s *mux.Session, sd sd.SD) *Connector {
c := &Connector{ c := &Connector{
id: id, id: id,
t: time.Now(), tid: tid,
s: s, node: node,
sd: sd,
t: time.Now(),
s: s,
} }
go c.accept() go c.accept()
return c return c
@ -35,6 +41,13 @@ func (c *Connector) accept() {
if err != nil { if err != nil {
logger.Default().Errorf("connector %s: %v", c.id, err) logger.Default().Errorf("connector %s: %v", c.id, err)
c.s.Close() c.s.Close()
if c.sd != nil {
c.sd.Deregister(context.Background(), &sd.Service{
ID: c.id.String(),
Name: c.tid.String(),
Node: c.node,
})
}
return return
} }
conn.Close() conn.Close()
@ -58,14 +71,19 @@ type Tunnel struct {
close chan struct{} close chan struct{}
mu sync.RWMutex mu sync.RWMutex
sd sd.SD sd sd.SD
ttl time.Duration
} }
func NewTunnel(node string, tid relay.TunnelID) *Tunnel { func NewTunnel(node string, tid relay.TunnelID, ttl time.Duration) *Tunnel {
t := &Tunnel{ t := &Tunnel{
node: node, node: node,
id: tid, id: tid,
t: time.Now(), t: time.Now(),
close: make(chan struct{}), close: make(chan struct{}),
ttl: ttl,
}
if t.ttl <= 0 {
t.ttl = defaultTTL
} }
go t.clean() go t.clean()
return t return t
@ -127,7 +145,7 @@ func (t *Tunnel) CloseOnIdle() bool {
} }
func (t *Tunnel) clean() { func (t *Tunnel) clean() {
ticker := time.NewTicker(1 * time.Minute) ticker := time.NewTicker(t.ttl)
defer ticker.Stop() defer ticker.Stop()
for { for {
@ -188,7 +206,7 @@ func NewConnectorPool(node string, sd sd.SD) *ConnectorPool {
return p return p
} }
func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) { func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector, ttl time.Duration) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -196,7 +214,7 @@ func (p *ConnectorPool) Add(tid relay.TunnelID, c *Connector) {
t := p.tunnels[s] t := p.tunnels[s]
if t == nil { if t == nil {
t = NewTunnel(p.node, tid) t = NewTunnel(p.node, tid, ttl)
t.WithSD(p.sd) t.WithSD(p.sd)
p.tunnels[s] = t p.tunnels[s] = t

View File

@ -146,7 +146,7 @@ type sdService struct {
} }
type httpGetResponse struct { type httpGetResponse struct {
Services []*sdService Services []*sdService `json:"services"`
} }
type httpPlugin struct { type httpPlugin struct {
@ -327,8 +327,9 @@ func (p *httpPlugin) Get(ctx context.Context, name string) (services []*sd.Servi
continue continue
} }
services = append(services, &sd.Service{ services = append(services, &sd.Service{
Node: v.Node, ID: v.ID,
Name: v.Name, Name: v.Name,
Node: v.Node,
Network: v.Network, Network: v.Network,
Address: v.Address, Address: v.Address,
}) })