relay: add direct routing for tunnel

This commit is contained in:
ginuerzh 2023-02-06 21:16:56 +08:00
parent 02a5f4dde4
commit 6f9f5ce6ab
9 changed files with 58 additions and 36 deletions

View File

@ -333,7 +333,7 @@ func ParseIngress(cfg *config.IngressConfig) ingress.Ingress {
} }
rules = append(rules, xingress.Rule{ rules = append(rules, xingress.Rule{
Host: rule.Hostname, Hostname: rule.Hostname,
Endpoint: rule.Endpoint, Endpoint: rule.Endpoint,
}) })
} }

View File

@ -19,13 +19,12 @@ func (c *relayConnector) parseMetadata(md mdata.Metadata) (err error) {
const ( const (
connectTimeout = "connectTimeout" connectTimeout = "connectTimeout"
noDelay = "nodelay" noDelay = "nodelay"
tunnelID = "tunnelID"
) )
c.md.connectTimeout = mdutil.GetDuration(md, connectTimeout) c.md.connectTimeout = mdutil.GetDuration(md, connectTimeout)
c.md.noDelay = mdutil.GetBool(md, noDelay) c.md.noDelay = mdutil.GetBool(md, noDelay)
if s := mdutil.GetString(md, tunnelID); s != "" { if s := mdutil.GetString(md, "tunnelID", "tunnel.id"); s != "" {
uuid, err := uuid.Parse(s) uuid, err := uuid.Parse(s)
if err != nil { if err != nil {
return err return err

View File

@ -132,7 +132,11 @@ func (h *forwardHandler) Handle(ctx context.Context, conn net.Conn, opts ...hand
log.Debugf("%s >> %s", conn.RemoteAddr(), target.Addr) log.Debugf("%s >> %s", conn.RemoteAddr(), target.Addr)
cc, err := h.router.Dial(ctx, network, target.Addr) addr := target.Addr
if _, _, err := net.SplitHostPort(addr); err != nil {
addr += ":0"
}
cc, err := h.router.Dial(ctx, network, addr)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
// TODO: the router itself may be failed due to the failed node in the router, // TODO: the router itself may be failed due to the failed node in the router,

View File

@ -13,6 +13,7 @@ import (
"github.com/go-gost/x/internal/net/udp" "github.com/go-gost/x/internal/net/udp"
"github.com/go-gost/x/internal/util/mux" "github.com/go-gost/x/internal/util/mux"
relay_util "github.com/go-gost/x/internal/util/relay" relay_util "github.com/go-gost/x/internal/util/relay"
metrics "github.com/go-gost/x/metrics/wrapper"
xservice "github.com/go-gost/x/service" xservice "github.com/go-gost/x/service"
"github.com/google/uuid" "github.com/google/uuid"
) )
@ -129,23 +130,32 @@ func (h *relayHandler) bindUDP(ctx context.Context, conn net.Conn, network, addr
Status: relay.StatusOK, Status: relay.StatusOK,
} }
var pc net.PacketConn
var err error
bindAddr, _ := net.ResolveUDPAddr(network, address) bindAddr, _ := net.ResolveUDPAddr(network, address)
pc, err := net.ListenUDP(network, bindAddr) pc, err = net.ListenUDP(network, bindAddr)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return err return err
} }
serviceName := fmt.Sprintf("%s-ep-%s", h.options.Service, pc.LocalAddr())
log = log.WithFields(map[string]any{
"service": serviceName,
"listener": "udp",
"handler": "ep-udp",
"bind": fmt.Sprintf("%s/%s", pc.LocalAddr(), pc.LocalAddr().Network()),
})
pc = metrics.WrapPacketConn(serviceName, pc)
// pc = admission.WrapPacketConn(l.options.Admission, pc)
// pc = limiter.WrapPacketConn(l.options.TrafficLimiter, pc)
defer pc.Close() defer pc.Close()
af := &relay.AddrFeature{} af := &relay.AddrFeature{}
err = af.ParseFrom(pc.LocalAddr().String()) if err := af.ParseFrom(pc.LocalAddr().String()); err != nil {
if err != nil {
log.Warn(err) log.Warn(err)
} }
// Issue: may not reachable when host has multi-interface
af.Host, _, _ = net.SplitHostPort(conn.LocalAddr().String())
af.AType = relay.AddrIPv4
resp.Features = append(resp.Features, af) resp.Features = append(resp.Features, af)
if _, err := resp.WriteTo(conn); err != nil { if _, err := resp.WriteTo(conn); err != nil {
log.Error(err) log.Error(err)
@ -183,7 +193,6 @@ func (h *relayHandler) handleBindTunnel(ctx context.Context, conn net.Conn, netw
resp.WriteTo(conn) resp.WriteTo(conn)
return return
} }
connectorID := relay.NewConnectorID(uuid[:]) connectorID := relay.NewConnectorID(uuid[:])
if network == "udp" { if network == "udp" {
connectorID = relay.NewUDPConnectorID(uuid[:]) connectorID = relay.NewUDPConnectorID(uuid[:])

View File

@ -123,11 +123,10 @@ func (h *relayHandler) handleConnectTunnel(ctx context.Context, conn net.Conn, n
if ingress := h.md.ingress; ingress != nil { if ingress := h.md.ingress; ingress != nil {
tid = parseTunnelID(ingress.Get(host)) tid = parseTunnelID(ingress.Get(host))
} }
if !tid.Equal(tunnelID) && !h.md.directTunnel {
if !tid.Equal(tunnelID) {
resp.Status = relay.StatusBadRequest resp.Status = relay.StatusBadRequest
resp.WriteTo(conn) resp.WriteTo(conn)
err := fmt.Errorf("tunnel %s not found", tunnelID.String()) err := fmt.Errorf("not route to host %s", host)
log.Error(err) log.Error(err)
return err return err
} }

View File

@ -177,8 +177,13 @@ func (h *tunnelHandler) Handle(ctx context.Context, conn net.Conn, opts ...handl
if h.ingress != nil { if h.ingress != nil {
tunnelID = parseTunnelID(h.ingress.Get(host)) tunnelID = parseTunnelID(h.ingress.Get(host))
} }
if tunnelID.IsZero() {
err := fmt.Errorf("no route to host %s", host)
log.Error(err)
return err
}
if tunnelID.IsPrivate() { if tunnelID.IsPrivate() {
err := fmt.Errorf("access denied: tunnel %s is private", tunnelID) err := fmt.Errorf("access denied: tunnel %s is private for host %s", tunnelID, host)
log.Error(err) log.Error(err)
return err return err
} }

View File

@ -21,6 +21,7 @@ type metadata struct {
hash string hash string
entryPoint string entryPoint string
ingress ingress.Ingress ingress ingress.Ingress
directTunnel bool
} }
func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) { func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) {
@ -47,13 +48,23 @@ func (h *relayHandler) parseMetadata(md mdata.Metadata) (err error) {
h.md.entryPoint = mdutil.GetString(md, entryPoint) h.md.entryPoint = mdutil.GetString(md, entryPoint)
h.md.ingress = registry.IngressRegistry().Get(mdutil.GetString(md, "ingress")) h.md.ingress = registry.IngressRegistry().Get(mdutil.GetString(md, "ingress"))
h.md.directTunnel = mdutil.GetBool(md, "tunnel.direct")
if h.md.ingress == nil { if h.md.ingress == nil {
if ss := strings.Split(mdutil.GetString(md, "tunnel"), ":"); len(ss) == 2 { var rules []xingress.Rule
for _, s := range strings.Split(mdutil.GetString(md, "tunnel"), ",") {
ss := strings.SplitN(s, ":", 2)
if len(ss) != 2 {
continue
}
rules = append(rules, xingress.Rule{
Hostname: ss[0],
Endpoint: ss[1],
})
}
if len(rules) > 0 {
h.md.ingress = xingress.NewIngress( h.md.ingress = xingress.NewIngress(
xingress.RulesOption([]xingress.Rule{ xingress.RulesOption(rules),
{Host: ss[0], Endpoint: ss[1]},
}),
xingress.LoggerOption(logger.Default().WithFields(map[string]any{ xingress.LoggerOption(logger.Default().WithFields(map[string]any{
"kind": "ingress", "kind": "ingress",
})), })),

View File

@ -87,14 +87,9 @@ func (t *Tunnel) GetConnector(network string) *Connector {
var connectors []*Connector var connectors []*Connector
for _, c := range t.connectors { for _, c := range t.connectors {
if network == "udp" { if network == "udp" && c.id.IsUDP() ||
if c.id.IsUDP() { network != "udp" && !c.id.IsUDP() {
connectors = append(connectors, c) connectors = append(connectors, c)
}
} else {
if !c.id.IsUDP() {
connectors = append(connectors, c)
}
} }
} }
if len(connectors) == 0 { if len(connectors) == 0 {
@ -181,14 +176,14 @@ func parseTunnelID(s string) (tid relay.TunnelID) {
return relay.NewTunnelID(uuid[:]) return relay.NewTunnelID(uuid[:])
} }
func getTunnelConn(network string, pool *ConnectorPool, tunnelID relay.TunnelID, retry int, log logger.Logger) (conn net.Conn, err error) { func getTunnelConn(network string, pool *ConnectorPool, tid relay.TunnelID, retry int, log logger.Logger) (conn net.Conn, err error) {
if retry <= 0 { if retry <= 0 {
retry = 1 retry = 1
} }
for i := 0; i < retry; i++ { for i := 0; i < retry; i++ {
c := pool.Get(network, tunnelID) c := pool.Get(network, tid)
if c == nil { if c == nil {
err = fmt.Errorf("tunnel %s not available", tunnelID.String()) err = fmt.Errorf("tunnel %s not available", tid.String())
break break
} }

View File

@ -15,7 +15,7 @@ import (
) )
type Rule struct { type Rule struct {
Host string Hostname string
Endpoint string Endpoint string
} }
@ -122,10 +122,10 @@ func (ing *ingress) reload(ctx context.Context) error {
rules := make(map[string]Rule) rules := make(map[string]Rule)
fn := func(rule Rule) { fn := func(rule Rule) {
if rule.Host == "" || rule.Endpoint == "" { if rule.Hostname == "" || rule.Endpoint == "" {
return return
} }
host := rule.Host host := rule.Hostname
if host[0] == '*' { if host[0] == '*' {
host = host[1:] host = host[1:]
} }
@ -210,7 +210,7 @@ func (ing *ingress) parseRules(r io.Reader) (rules []Rule, err error) {
scanner := bufio.NewScanner(r) scanner := bufio.NewScanner(r)
for scanner.Scan() { for scanner.Scan() {
if rule := ing.parseLine(scanner.Text()); rule.Host != "" { if rule := ing.parseLine(scanner.Text()); rule.Hostname != "" {
rules = append(rules, rule) rules = append(rules, rule)
} }
} }
@ -287,7 +287,7 @@ func (ing *ingress) parseLine(s string) (rule Rule) {
} }
return Rule{ return Rule{
Host: sp[0], Hostname: sp[0],
Endpoint: sp[1], Endpoint: sp[1],
} }
} }