add port range support for service

This commit is contained in:
ginuerzh
2024-07-08 22:38:21 +08:00
parent 96f4d7bf5c
commit c1d0887a9b
73 changed files with 1915 additions and 316 deletions

View File

@ -2,6 +2,8 @@ package chain
import (
"context"
"errors"
"fmt"
"net"
"time"
@ -10,9 +12,86 @@ import (
"github.com/go-gost/core/logger"
"github.com/go-gost/core/metrics"
"github.com/go-gost/core/selector"
xnet "github.com/go-gost/x/internal/net"
"github.com/go-gost/x/internal/net/dialer"
"github.com/go-gost/x/internal/net/udp"
xmetrics "github.com/go-gost/x/metrics"
)
var (
ErrEmptyRoute = errors.New("empty route")
)
var (
DefaultRoute chain.Route = &defaultRoute{}
)
// defaultRoute is a Route without nodes.
type defaultRoute struct{}
func (*defaultRoute) Dial(ctx context.Context, network, address string, opts ...chain.DialOption) (net.Conn, error) {
var options chain.DialOptions
for _, opt := range opts {
opt(&options)
}
netd := dialer.Dialer{
Interface: options.Interface,
Netns: options.Netns,
Logger: options.Logger,
}
if options.SockOpts != nil {
netd.Mark = options.SockOpts.Mark
}
return netd.Dial(ctx, network, address)
}
func (*defaultRoute) Bind(ctx context.Context, network, address string, opts ...chain.BindOption) (net.Listener, error) {
var options chain.BindOptions
for _, opt := range opts {
opt(&options)
}
switch network {
case "tcp", "tcp4", "tcp6":
addr, err := net.ResolveTCPAddr(network, address)
if err != nil {
return nil, err
}
return net.ListenTCP(network, addr)
case "udp", "udp4", "udp6":
addr, err := net.ResolveUDPAddr(network, address)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP(network, addr)
if err != nil {
return nil, err
}
logger := logger.Default().WithFields(map[string]any{
"network": network,
"address": address,
})
ln := udp.NewListener(conn, &udp.ListenConfig{
Backlog: options.Backlog,
ReadQueueSize: options.UDPDataQueueSize,
ReadBufferSize: options.UDPDataBufferSize,
TTL: options.UDPConnTTL,
KeepAlive: true,
Logger: logger,
})
return ln, err
default:
err := fmt.Errorf("network %s unsupported", network)
return nil, err
}
}
func (r *defaultRoute) Nodes() []*chain.Node {
return nil
}
type RouteOptions struct {
Chain chain.Chainer
}
@ -25,12 +104,12 @@ func ChainRouteOption(c chain.Chainer) RouteOption {
}
}
type route struct {
type chainRoute struct {
nodes []*chain.Node
options RouteOptions
}
func NewRoute(opts ...RouteOption) *route {
func NewRoute(opts ...RouteOption) *chainRoute {
var options RouteOptions
for _, opt := range opts {
if opt != nil {
@ -38,18 +117,18 @@ func NewRoute(opts ...RouteOption) *route {
}
}
return &route{
return &chainRoute{
options: options,
}
}
func (r *route) addNode(nodes ...*chain.Node) {
func (r *chainRoute) addNode(nodes ...*chain.Node) {
r.nodes = append(r.nodes, nodes...)
}
func (r *route) Dial(ctx context.Context, network, address string, opts ...chain.DialOption) (net.Conn, error) {
func (r *chainRoute) Dial(ctx context.Context, network, address string, opts ...chain.DialOption) (net.Conn, error) {
if len(r.Nodes()) == 0 {
return chain.DefaultRoute.Dial(ctx, network, address, opts...)
return DefaultRoute.Dial(ctx, network, address, opts...)
}
var options chain.DialOptions
@ -73,9 +152,9 @@ func (r *route) Dial(ctx context.Context, network, address string, opts ...chain
return cc, nil
}
func (r *route) Bind(ctx context.Context, network, address string, opts ...chain.BindOption) (net.Listener, error) {
func (r *chainRoute) Bind(ctx context.Context, network, address string, opts ...chain.BindOption) (net.Listener, error) {
if len(r.Nodes()) == 0 {
return chain.DefaultRoute.Bind(ctx, network, address, opts...)
return DefaultRoute.Bind(ctx, network, address, opts...)
}
var options chain.BindOptions
@ -106,7 +185,7 @@ func (r *route) Bind(ctx context.Context, network, address string, opts ...chain
return ln, nil
}
func (r *route) connect(ctx context.Context, logger logger.Logger) (conn net.Conn, err error) {
func (r *chainRoute) connect(ctx context.Context, logger logger.Logger) (conn net.Conn, err error) {
network := "ip"
node := r.nodes[0]
@ -138,7 +217,7 @@ func (r *route) connect(ctx context.Context, logger logger.Logger) (conn net.Con
}
}()
addr, err := chain.Resolve(ctx, network, node.Addr, node.Options().Resolver, node.Options().HostMapper, logger)
addr, err := xnet.Resolve(ctx, network, node.Addr, node.Options().Resolver, node.Options().HostMapper, logger)
marker := node.Marker()
if err != nil {
if marker != nil {
@ -182,7 +261,7 @@ func (r *route) connect(ctx context.Context, logger logger.Logger) (conn net.Con
preNode := node
for _, node := range r.nodes[1:] {
marker := node.Marker()
addr, err = chain.Resolve(ctx, network, node.Addr, node.Options().Resolver, node.Options().HostMapper, logger)
addr, err = xnet.Resolve(ctx, network, node.Addr, node.Options().Resolver, node.Options().HostMapper, logger)
if err != nil {
cn.Close()
if marker != nil {
@ -218,14 +297,14 @@ func (r *route) connect(ctx context.Context, logger logger.Logger) (conn net.Con
return
}
func (r *route) getNode(index int) *chain.Node {
func (r *chainRoute) getNode(index int) *chain.Node {
if r == nil || len(r.Nodes()) == 0 || index < 0 || index >= len(r.Nodes()) {
return nil
}
return r.nodes[index]
}
func (r *route) Nodes() []*chain.Node {
func (r *chainRoute) Nodes() []*chain.Node {
if r != nil {
return r.nodes
}

205
chain/router.go Normal file
View File

@ -0,0 +1,205 @@
package chain
import (
"bytes"
"context"
"fmt"
"net"
"time"
"github.com/go-gost/core/chain"
"github.com/go-gost/core/logger"
"github.com/go-gost/core/recorder"
xnet "github.com/go-gost/x/internal/net"
)
type Router struct {
options chain.RouterOptions
}
func NewRouter(opts ...chain.RouterOption) *Router {
r := &Router{}
for _, opt := range opts {
if opt != nil {
opt(&r.options)
}
}
if r.options.Timeout == 0 {
r.options.Timeout = 15 * time.Second
}
if r.options.Logger == nil {
r.options.Logger = logger.Default().WithFields(map[string]any{"kind": "router"})
}
return r
}
func (r *Router) Options() *chain.RouterOptions {
if r == nil {
return nil
}
return &r.options
}
func (r *Router) Dial(ctx context.Context, network, address string) (conn net.Conn, err error) {
if r.options.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.options.Timeout)
defer cancel()
}
host := address
if h, _, _ := net.SplitHostPort(address); h != "" {
host = h
}
r.record(ctx, recorder.RecorderServiceRouterDialAddress, []byte(host))
conn, err = r.dial(ctx, network, address)
if err != nil {
r.record(ctx, recorder.RecorderServiceRouterDialAddressError, []byte(host))
return
}
if network == "udp" || network == "udp4" || network == "udp6" {
if _, ok := conn.(net.PacketConn); !ok {
return &packetConn{conn}, nil
}
}
return
}
func (r *Router) record(ctx context.Context, name string, data []byte) error {
if len(data) == 0 {
return nil
}
for _, rec := range r.options.Recorders {
if rec.Record == name {
err := rec.Recorder.Record(ctx, data)
if err != nil {
r.options.Logger.Errorf("record %s: %v", name, err)
}
return err
}
}
return nil
}
func (r *Router) dial(ctx context.Context, network, address string) (conn net.Conn, err error) {
count := r.options.Retries + 1
if count <= 0 {
count = 1
}
r.options.Logger.Debugf("dial %s/%s", address, network)
for i := 0; i < count; i++ {
var ipAddr string
ipAddr, err = xnet.Resolve(ctx, "ip", address, r.options.Resolver, r.options.HostMapper, r.options.Logger)
if err != nil {
r.options.Logger.Error(err)
break
}
var route chain.Route
if r.options.Chain != nil {
route = r.options.Chain.Route(ctx, network, ipAddr, chain.WithHostRouteOption(address))
}
if r.options.Logger.IsLevelEnabled(logger.DebugLevel) {
buf := bytes.Buffer{}
for _, node := range routePath(route) {
fmt.Fprintf(&buf, "%s@%s > ", node.Name, node.Addr)
}
fmt.Fprintf(&buf, "%s", ipAddr)
r.options.Logger.Debugf("route(retry=%d) %s", i, buf.String())
}
if route == nil {
route = DefaultRoute
}
conn, err = route.Dial(ctx, network, ipAddr,
chain.InterfaceDialOption(r.options.IfceName),
chain.NetnsDialOption(r.options.Netns),
chain.SockOptsDialOption(r.options.SockOpts),
chain.LoggerDialOption(r.options.Logger),
)
if err == nil {
break
}
r.options.Logger.Errorf("route(retry=%d) %s", i, err)
}
return
}
func (r *Router) Bind(ctx context.Context, network, address string, opts ...chain.BindOption) (ln net.Listener, err error) {
if r.options.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.options.Timeout)
defer cancel()
}
count := r.options.Retries + 1
if count <= 0 {
count = 1
}
r.options.Logger.Debugf("bind on %s/%s", address, network)
for i := 0; i < count; i++ {
var route chain.Route
if r.options.Chain != nil {
route = r.options.Chain.Route(ctx, network, address)
if route == nil || len(route.Nodes()) == 0 {
err = ErrEmptyRoute
return
}
}
if r.options.Logger.IsLevelEnabled(logger.DebugLevel) {
buf := bytes.Buffer{}
for _, node := range routePath(route) {
fmt.Fprintf(&buf, "%s@%s > ", node.Name, node.Addr)
}
fmt.Fprintf(&buf, "%s", address)
r.options.Logger.Debugf("route(retry=%d) %s", i, buf.String())
}
if route == nil {
route = DefaultRoute
}
ln, err = route.Bind(ctx, network, address, opts...)
if err == nil {
break
}
r.options.Logger.Errorf("route(retry=%d) %s", i, err)
}
return
}
func routePath(route chain.Route) (path []*chain.Node) {
if route == nil {
return
}
for _, node := range route.Nodes() {
if tr := node.Options().Transport; tr != nil {
path = append(path, routePath(tr.Options().Route)...)
}
path = append(path, node)
}
return
}
type packetConn struct {
net.Conn
}
func (c *packetConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
n, err = c.Read(b)
addr = c.Conn.RemoteAddr()
return
}
func (c *packetConn) WriteTo(b []byte, addr net.Addr) (n int, err error) {
return c.Write(b)
}

106
chain/transport.go Normal file
View File

@ -0,0 +1,106 @@
package chain
import (
"context"
"net"
"github.com/go-gost/core/chain"
"github.com/go-gost/core/connector"
"github.com/go-gost/core/dialer"
net_dialer "github.com/go-gost/x/internal/net/dialer"
)
type Transport struct {
dialer dialer.Dialer
connector connector.Connector
options chain.TransportOptions
}
func NewTransport(d dialer.Dialer, c connector.Connector, opts ...chain.TransportOption) *Transport {
tr := &Transport{
dialer: d,
connector: c,
}
for _, opt := range opts {
if opt != nil {
opt(&tr.options)
}
}
return tr
}
func (tr *Transport) Dial(ctx context.Context, addr string) (net.Conn, error) {
netd := &net_dialer.Dialer{
Interface: tr.options.IfceName,
Netns: tr.options.Netns,
}
if tr.options.SockOpts != nil {
netd.Mark = tr.options.SockOpts.Mark
}
if tr.options.Route != nil && len(tr.options.Route.Nodes()) > 0 {
netd.DialFunc = func(ctx context.Context, network, addr string) (net.Conn, error) {
return tr.options.Route.Dial(ctx, network, addr)
}
}
opts := []dialer.DialOption{
dialer.HostDialOption(tr.options.Addr),
dialer.NetDialerDialOption(netd),
}
return tr.dialer.Dial(ctx, addr, opts...)
}
func (tr *Transport) Handshake(ctx context.Context, conn net.Conn) (net.Conn, error) {
var err error
if hs, ok := tr.dialer.(dialer.Handshaker); ok {
conn, err = hs.Handshake(ctx, conn,
dialer.AddrHandshakeOption(tr.options.Addr))
if err != nil {
return nil, err
}
}
if hs, ok := tr.connector.(connector.Handshaker); ok {
return hs.Handshake(ctx, conn)
}
return conn, nil
}
func (tr *Transport) Connect(ctx context.Context, conn net.Conn, network, address string) (net.Conn, error) {
netd := &net_dialer.Dialer{
Interface: tr.options.IfceName,
Netns: tr.options.Netns,
}
if tr.options.SockOpts != nil {
netd.Mark = tr.options.SockOpts.Mark
}
return tr.connector.Connect(ctx, conn, network, address,
connector.DialerConnectOption(netd),
)
}
func (tr *Transport) Bind(ctx context.Context, conn net.Conn, network, address string, opts ...connector.BindOption) (net.Listener, error) {
if binder, ok := tr.connector.(connector.Binder); ok {
return binder.Bind(ctx, conn, network, address, opts...)
}
return nil, connector.ErrBindUnsupported
}
func (tr *Transport) Multiplex() bool {
if mux, ok := tr.dialer.(dialer.Multiplexer); ok {
return mux.Multiplex()
}
return false
}
func (tr *Transport) Options() *chain.TransportOptions {
if tr != nil {
return &tr.options
}
return nil
}
func (tr *Transport) Copy() chain.Transporter {
tr2 := &Transport{}
*tr2 = *tr
return tr
}