From e587b4df7c05b36a13739e3ba1512fdf64d57db2 Mon Sep 17 00:00:00 2001 From: ginuerzh Date: Sat, 5 Mar 2022 00:28:13 +0800 Subject: [PATCH] add metrics for service --- cmd/gost/cmd.go | 8 +- cmd/gost/config.go | 2 +- cmd/gost/main.go | 2 +- pkg/common/metrics/conn.go | 222 ++++++++++++++++++++++++++ pkg/common/metrics/listener.go | 23 +++ pkg/config/parsing/service.go | 3 +- pkg/listener/dns/listener.go | 2 + pkg/listener/ftcp/conn.go | 37 +++-- pkg/listener/ftcp/listener.go | 16 +- pkg/listener/grpc/listener.go | 9 +- pkg/listener/http2/h2/listener.go | 2 + pkg/listener/http2/listener.go | 7 +- pkg/listener/http3/listener.go | 7 +- pkg/listener/kcp/listener.go | 49 +++--- pkg/listener/obfs/http/listener.go | 22 ++- pkg/listener/obfs/tls/listener.go | 22 ++- pkg/listener/option.go | 7 + pkg/listener/pht/listener.go | 8 +- pkg/listener/quic/listener.go | 3 + pkg/listener/redirect/udp/listener.go | 25 +-- pkg/listener/rtcp/listener.go | 5 +- pkg/listener/rudp/listener.go | 11 +- pkg/listener/ssh/listener.go | 6 +- pkg/listener/sshd/listener.go | 6 +- pkg/listener/tcp/listener.go | 9 +- pkg/listener/tls/listener.go | 2 + pkg/listener/tls/mux/listener.go | 3 + pkg/listener/udp/listener.go | 21 +-- pkg/listener/ws/listener.go | 3 + pkg/listener/ws/mux/listener.go | 3 + pkg/metrics/metrics.go | 95 +++++++++++ pkg/metrics/{ => service}/service.go | 2 +- pkg/resolver/resolver.go | 2 +- pkg/service/service.go | 22 ++- 34 files changed, 548 insertions(+), 118 deletions(-) create mode 100644 pkg/common/metrics/conn.go create mode 100644 pkg/common/metrics/listener.go create mode 100644 pkg/metrics/metrics.go rename pkg/metrics/{ => service}/service.go (98%) diff --git a/cmd/gost/cmd.go b/cmd/gost/cmd.go index b29788a..8205e0d 100644 --- a/cmd/gost/cmd.go +++ b/cmd/gost/cmd.go @@ -11,7 +11,7 @@ import ( "github.com/go-gost/gost/pkg/config" "github.com/go-gost/gost/pkg/metadata" - "github.com/go-gost/gost/pkg/metrics" + metrics "github.com/go-gost/gost/pkg/metrics/service" "github.com/go-gost/gost/pkg/registry" ) @@ -46,6 +46,12 @@ func buildConfigFromCmd(services, nodes stringList) (*config.Config, error) { } } + if v := os.Getenv("GOST_LOGGING_LEVEL"); v != "" { + cfg.Log = &config.LogConfig{ + Level: v, + } + } + var chain *config.ChainConfig if len(nodes) > 0 { chain = &config.ChainConfig{ diff --git a/cmd/gost/config.go b/cmd/gost/config.go index 1a77ef0..e1a7f35 100644 --- a/cmd/gost/config.go +++ b/cmd/gost/config.go @@ -8,7 +8,7 @@ import ( "github.com/go-gost/gost/pkg/config" "github.com/go-gost/gost/pkg/config/parsing" "github.com/go-gost/gost/pkg/logger" - "github.com/go-gost/gost/pkg/metrics" + metrics "github.com/go-gost/gost/pkg/metrics/service" "github.com/go-gost/gost/pkg/registry" "github.com/go-gost/gost/pkg/service" ) diff --git a/cmd/gost/main.go b/cmd/gost/main.go index 67fed91..fdf2350 100644 --- a/cmd/gost/main.go +++ b/cmd/gost/main.go @@ -10,7 +10,7 @@ import ( "github.com/go-gost/gost/pkg/config" "github.com/go-gost/gost/pkg/logger" - "github.com/go-gost/gost/pkg/metrics" + metrics "github.com/go-gost/gost/pkg/metrics/service" ) var ( diff --git a/pkg/common/metrics/conn.go b/pkg/common/metrics/conn.go new file mode 100644 index 0000000..b831b8a --- /dev/null +++ b/pkg/common/metrics/conn.go @@ -0,0 +1,222 @@ +package metrics + +import ( + "errors" + "io" + "net" + "syscall" + + "github.com/go-gost/gost/pkg/metrics" +) + +var ( + errUnsupport = errors.New("unsupported operation") +) + +// ServerConn is a server side Conn with metrics supported. +type serverConn struct { + net.Conn + service string +} + +func WrapConn(service string, c net.Conn) net.Conn { + return &serverConn{ + service: service, + Conn: c, + } +} + +func (c *serverConn) Read(b []byte) (n int, err error) { + n, err = c.Conn.Read(b) + metrics.RequestInputBytes(c.service).Add(float64(n)) + return +} + +func (c *serverConn) Write(b []byte) (n int, err error) { + n, err = c.Conn.Write(b) + metrics.RequestOutputBytes(c.service).Add(float64(n)) + return +} + +type packetConn struct { + net.PacketConn + service string +} + +func WrapPacketConn(service string, pc net.PacketConn) net.PacketConn { + return &packetConn{ + PacketConn: pc, + service: service, + } +} + +func (c *packetConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + n, addr, err = c.PacketConn.ReadFrom(p) + metrics.RequestInputBytes(c.service).Add(float64(n)) + return +} + +func (c *packetConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + n, err = c.PacketConn.WriteTo(p, addr) + metrics.RequestOutputBytes(c.service).Add(float64(n)) + return +} + +type udpConn struct { + net.PacketConn + service string +} + +func WrapUDPConn(service string, pc net.PacketConn) UDPConn { + return &udpConn{ + PacketConn: pc, + service: service, + } +} + +func (c *udpConn) RemoteAddr() net.Addr { + if nc, ok := c.PacketConn.(remoteAddr); ok { + return nc.RemoteAddr() + } + return nil +} + +func (c *udpConn) SetReadBuffer(n int) error { + if nc, ok := c.PacketConn.(setBuffer); ok { + return nc.SetReadBuffer(n) + } + return errUnsupport +} + +func (c *udpConn) SetWriteBuffer(n int) error { + if nc, ok := c.PacketConn.(setBuffer); ok { + return nc.SetWriteBuffer(n) + } + return errUnsupport +} + +func (c *udpConn) Read(b []byte) (n int, err error) { + if nc, ok := c.PacketConn.(io.Reader); ok { + n, err = nc.Read(b) + metrics.RequestInputBytes(c.service).Add(float64(n)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + n, addr, err = c.PacketConn.ReadFrom(p) + metrics.RequestInputBytes(c.service).Add(float64(n)) + return +} + +func (c *udpConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) { + if nc, ok := c.PacketConn.(readUDP); ok { + n, addr, err = nc.ReadFromUDP(b) + metrics.RequestInputBytes(c.service).Add(float64(n)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error) { + if nc, ok := c.PacketConn.(readUDP); ok { + n, oobn, flags, addr, err = nc.ReadMsgUDP(b, oob) + metrics.RequestInputBytes(c.service).Add(float64(n + oobn)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) Write(b []byte) (n int, err error) { + if nc, ok := c.PacketConn.(io.Writer); ok { + n, err = nc.Write(b) + metrics.RequestOutputBytes(c.service).Add(float64(n)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + n, err = c.PacketConn.WriteTo(p, addr) + metrics.RequestOutputBytes(c.service).Add(float64(n)) + return +} + +func (c *udpConn) WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error) { + if nc, ok := c.PacketConn.(writeUDP); ok { + n, err = nc.WriteToUDP(b, addr) + metrics.RequestOutputBytes(c.service).Add(float64(n)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error) { + if nc, ok := c.PacketConn.(writeUDP); ok { + n, oobn, err = nc.WriteMsgUDP(b, oob, addr) + metrics.RequestOutputBytes(c.service).Add(float64(n + oobn)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) SyscallConn() (rc syscall.RawConn, err error) { + if nc, ok := c.PacketConn.(syscallConn); ok { + return nc.SyscallConn() + } + err = errUnsupport + return +} + +func (c *udpConn) SetDSCP(n int) error { + if nc, ok := c.PacketConn.(setDSCP); ok { + return nc.SetDSCP(n) + } + return nil +} + +type UDPConn interface { + net.PacketConn + io.Reader + io.Writer + readUDP + writeUDP + setBuffer + syscallConn + remoteAddr +} + +type setBuffer interface { + SetReadBuffer(bytes int) error + SetWriteBuffer(bytes int) error +} + +type readUDP interface { + ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) + ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error) +} + +type writeUDP interface { + WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) + WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error) +} + +type syscallConn interface { + SyscallConn() (syscall.RawConn, error) +} + +type remoteAddr interface { + RemoteAddr() net.Addr +} + +// tcpraw.TCPConn +type setDSCP interface { + SetDSCP(int) error +} diff --git a/pkg/common/metrics/listener.go b/pkg/common/metrics/listener.go new file mode 100644 index 0000000..f389ea4 --- /dev/null +++ b/pkg/common/metrics/listener.go @@ -0,0 +1,23 @@ +package metrics + +import "net" + +type listener struct { + service string + net.Listener +} + +func WrapListener(service string, ln net.Listener) net.Listener { + return &listener{ + service: service, + Listener: ln, + } +} + +func (ln *listener) Accept() (net.Conn, error) { + c, err := ln.Listener.Accept() + if err != nil { + return nil, err + } + return WrapConn(ln.service, c), nil +} diff --git a/pkg/config/parsing/service.go b/pkg/config/parsing/service.go index b46d94d..8977ad7 100644 --- a/pkg/config/parsing/service.go +++ b/pkg/config/parsing/service.go @@ -59,6 +59,7 @@ func ParseService(cfg *config.ServiceConfig) (service.Service, error) { listener.AuthOption(parseAuth(cfg.Listener.Auth)), listener.TLSConfigOption(tlsConfig), listener.LoggerOption(listenerLogger), + listener.ServiceOption(cfg.Name), ) if cfg.Listener.Metadata == nil { @@ -119,7 +120,7 @@ func ParseService(cfg *config.ServiceConfig) (service.Service, error) { return nil, err } - s := service.NewService(ln, h, + s := service.NewService(cfg.Name, ln, h, service.AdmissionOption(registry.AdmissionRegistry().Get(cfg.Admission)), service.LoggerOption(serviceLogger), ) diff --git a/pkg/listener/dns/listener.go b/pkg/listener/dns/listener.go index 04a7b47..e6bbc3d 100644 --- a/pkg/listener/dns/listener.go +++ b/pkg/listener/dns/listener.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" md "github.com/go-gost/gost/pkg/metadata" @@ -112,6 +113,7 @@ func (l *dnsListener) Accept() (conn net.Conn, err error) { var ok bool select { case conn = <-l.cqueue: + conn = metrics.WrapConn(l.options.Service, conn) case err, ok = <-l.errChan: if !ok { err = listener.ErrClosed diff --git a/pkg/listener/ftcp/conn.go b/pkg/listener/ftcp/conn.go index 35c59ed..df02303 100644 --- a/pkg/listener/ftcp/conn.go +++ b/pkg/listener/ftcp/conn.go @@ -10,7 +10,7 @@ import ( // serverConn is a server side connection for UDP client peer, it implements net.Conn and net.PacketConn. type serverConn struct { - net.PacketConn + pc net.PacketConn raddr net.Addr rc chan []byte // data receive queue fresh int32 @@ -34,11 +34,11 @@ func newServerConn(conn net.PacketConn, raddr net.Addr, cfg *serverConnConfig) * cfg = &serverConnConfig{} } c := &serverConn{ - PacketConn: conn, - raddr: raddr, - rc: make(chan []byte, cfg.qsize), - closed: make(chan struct{}), - config: cfg, + pc: conn, + raddr: raddr, + rc: make(chan []byte, cfg.qsize), + closed: make(chan struct{}), + config: cfg, } go c.ttlWait() return c @@ -54,11 +54,6 @@ func (c *serverConn) send(b []byte) error { } func (c *serverConn) Read(b []byte) (n int, err error) { - n, _, err = c.ReadFrom(b) - return -} - -func (c *serverConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { select { case bb := <-c.rc: n = copy(b, bb) @@ -68,13 +63,11 @@ func (c *serverConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { return } - addr = c.raddr - return } func (c *serverConn) Write(b []byte) (n int, err error) { - return c.WriteTo(b, c.raddr) + return c.pc.WriteTo(b, c.raddr) } func (c *serverConn) Close() error { @@ -93,10 +86,26 @@ func (c *serverConn) Close() error { return nil } +func (c *serverConn) LocalAddr() net.Addr { + return c.pc.LocalAddr() +} + func (c *serverConn) RemoteAddr() net.Addr { return c.raddr } +func (c *serverConn) SetDeadline(t time.Time) error { + return c.pc.SetDeadline(t) +} + +func (c *serverConn) SetReadDeadline(t time.Time) error { + return c.pc.SetReadDeadline(t) +} + +func (c *serverConn) SetWriteDeadline(t time.Time) error { + return c.pc.SetWriteDeadline(t) +} + func (c *serverConn) ttlWait() { ticker := time.NewTicker(c.config.ttl) defer ticker.Stop() diff --git a/pkg/listener/ftcp/listener.go b/pkg/listener/ftcp/listener.go index d56d7da..7239d11 100644 --- a/pkg/listener/ftcp/listener.go +++ b/pkg/listener/ftcp/listener.go @@ -5,6 +5,7 @@ import ( "sync" "sync/atomic" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" md "github.com/go-gost/gost/pkg/metadata" @@ -17,23 +18,23 @@ func init() { } type ftcpListener struct { - addr string - md metadata conn net.PacketConn connChan chan net.Conn errChan chan error connPool connPool logger logger.Logger + md metadata + options listener.Options } func NewListener(opts ...listener.Option) listener.Listener { - options := &listener.Options{} + options := listener.Options{} for _, opt := range opts { - opt(options) + opt(&options) } return &ftcpListener{ - addr: options.Addr, - logger: options.Logger, + logger: options.Logger, + options: options, } } @@ -42,7 +43,7 @@ func (l *ftcpListener) Init(md md.Metadata) (err error) { return } - l.conn, err = tcpraw.Listen("tcp", addr) + l.conn, err = tcpraw.Listen("tcp", l.options.Addr) if err != nil { return } @@ -59,6 +60,7 @@ func (l *ftcpListener) Accept() (conn net.Conn, err error) { var ok bool select { case conn = <-l.connChan: + conn = metrics.WrapConn(l.options.Service, conn) case err, ok = <-l.errChan: if !ok { err = listener.ErrClosed diff --git a/pkg/listener/grpc/listener.go b/pkg/listener/grpc/listener.go index d84b7fc..10847ae 100644 --- a/pkg/listener/grpc/listener.go +++ b/pkg/listener/grpc/listener.go @@ -3,6 +3,7 @@ package grpc import ( "net" + "github.com/go-gost/gost/pkg/common/metrics" pb "github.com/go-gost/gost/pkg/common/util/grpc/proto" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -42,15 +43,11 @@ func (l *grpcListener) Init(md md.Metadata) (err error) { return } - laddr, err := net.ResolveTCPAddr("tcp", l.options.Addr) - if err != nil { - return - } - - ln, err := net.ListenTCP("tcp", laddr) + ln, err := net.Listen("tcp", l.options.Addr) if err != nil { return } + ln = metrics.WrapListener(l.options.Service, ln) var opts []grpc.ServerOption if !l.md.insecure { diff --git a/pkg/listener/http2/h2/listener.go b/pkg/listener/http2/h2/listener.go index 2b253c2..50210b2 100644 --- a/pkg/listener/http2/h2/listener.go +++ b/pkg/listener/http2/h2/listener.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httputil" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" md "github.com/go-gost/gost/pkg/metadata" @@ -68,6 +69,7 @@ func (l *h2Listener) Init(md md.Metadata) (err error) { return err } l.addr = ln.Addr() + ln = metrics.WrapListener(l.options.Service, ln) if l.h2c { l.server.Handler = h2c.NewHandler( diff --git a/pkg/listener/http2/listener.go b/pkg/listener/http2/listener.go index 0d398da..9c25fa5 100644 --- a/pkg/listener/http2/listener.go +++ b/pkg/listener/http2/listener.go @@ -5,7 +5,7 @@ import ( "net" "net/http" - "github.com/go-gost/gost/pkg/common/util" + "github.com/go-gost/gost/pkg/common/metrics" http2_util "github.com/go-gost/gost/pkg/internal/util/http2" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -58,11 +58,10 @@ func (l *http2Listener) Init(md md.Metadata) (err error) { return err } l.addr = ln.Addr() + ln = metrics.WrapListener(l.options.Service, ln) ln = tls.NewListener( - &util.TCPKeepAliveListener{ - TCPListener: ln.(*net.TCPListener), - }, + ln, l.options.TLSConfig, ) diff --git a/pkg/listener/http3/listener.go b/pkg/listener/http3/listener.go index 76aa269..63a4e8d 100644 --- a/pkg/listener/http3/listener.go +++ b/pkg/listener/http3/listener.go @@ -3,6 +3,7 @@ package http3 import ( "net" + "github.com/go-gost/gost/pkg/common/metrics" pht_util "github.com/go-gost/gost/pkg/internal/util/pht" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -64,7 +65,11 @@ func (l *http3Listener) Init(md md.Metadata) (err error) { } func (l *http3Listener) Accept() (conn net.Conn, err error) { - return l.server.Accept() + conn, err = l.server.Accept() + if err != nil { + return + } + return metrics.WrapConn(l.options.Service, conn), nil } func (l *http3Listener) Addr() net.Addr { diff --git a/pkg/listener/kcp/listener.go b/pkg/listener/kcp/listener.go index 54d11f5..6b8703f 100644 --- a/pkg/listener/kcp/listener.go +++ b/pkg/listener/kcp/listener.go @@ -4,6 +4,7 @@ import ( "net" "time" + "github.com/go-gost/gost/pkg/common/metrics" kcp_util "github.com/go-gost/gost/pkg/common/util/kcp" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -19,22 +20,23 @@ func init() { } type kcpListener struct { - addr string + conn net.PacketConn ln *kcp.Listener cqueue chan net.Conn errChan chan error logger logger.Logger md metadata + options listener.Options } func NewListener(opts ...listener.Option) listener.Listener { - options := &listener.Options{} + options := listener.Options{} for _, opt := range opts { - opt(options) + opt(&options) } return &kcpListener{ - addr: options.Addr, - logger: options.Logger, + logger: options.Logger, + options: options, } } @@ -46,37 +48,45 @@ func (l *kcpListener) Init(md md.Metadata) (err error) { config := l.md.config config.Init() - var ln *kcp.Listener + var conn net.PacketConn if config.TCP { - var conn net.PacketConn - conn, err = tcpraw.Listen("tcp", l.addr) + conn, err = tcpraw.Listen("tcp", l.options.Addr) + } else { + var udpAddr *net.UDPAddr + udpAddr, err = net.ResolveUDPAddr("udp", l.options.Addr) if err != nil { return } - ln, err = kcp.ServeConn( - kcp_util.BlockCrypt(config.Key, config.Crypt, kcp_util.DefaultSalt), config.DataShard, config.ParityShard, conn) - } else { - ln, err = kcp.ListenWithOptions(l.addr, - kcp_util.BlockCrypt(config.Key, config.Crypt, kcp_util.DefaultSalt), config.DataShard, config.ParityShard) + conn, err = net.ListenUDP("udp", udpAddr) } if err != nil { return } + conn = metrics.WrapUDPConn(l.options.Service, conn) + + ln, err := kcp.ServeConn( + kcp_util.BlockCrypt(config.Key, config.Crypt, kcp_util.DefaultSalt), + config.DataShard, config.ParityShard, conn) + if err != nil { + return + } + if config.DSCP > 0 { - if err = ln.SetDSCP(config.DSCP); err != nil { - l.logger.Warn(err) + if er := ln.SetDSCP(config.DSCP); er != nil { + l.logger.Warn(er) } } - if err = ln.SetReadBuffer(config.SockBuf); err != nil { - l.logger.Warn(err) + if er := ln.SetReadBuffer(config.SockBuf); er != nil { + l.logger.Warn(er) } - if err = ln.SetWriteBuffer(config.SockBuf); err != nil { - l.logger.Warn(err) + if er := ln.SetWriteBuffer(config.SockBuf); er != nil { + l.logger.Warn(er) } l.ln = ln + l.conn = conn l.cqueue = make(chan net.Conn, l.md.backlog) l.errChan = make(chan error, 1) @@ -98,6 +108,7 @@ func (l *kcpListener) Accept() (conn net.Conn, err error) { } func (l *kcpListener) Close() error { + l.conn.Close() return l.ln.Close() } diff --git a/pkg/listener/obfs/http/listener.go b/pkg/listener/obfs/http/listener.go index 9411248..1867099 100644 --- a/pkg/listener/obfs/http/listener.go +++ b/pkg/listener/obfs/http/listener.go @@ -3,6 +3,7 @@ package http import ( "net" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" md "github.com/go-gost/gost/pkg/metadata" @@ -14,20 +15,20 @@ func init() { } type obfsListener struct { - addr string - md metadata net.Listener - logger logger.Logger + logger logger.Logger + md metadata + options listener.Options } func NewListener(opts ...listener.Option) listener.Listener { - options := &listener.Options{} + options := listener.Options{} for _, opt := range opts { - opt(options) + opt(&options) } return &obfsListener{ - addr: options.Addr, - logger: options.Logger, + logger: options.Logger, + options: options, } } @@ -36,14 +37,11 @@ func (l *obfsListener) Init(md md.Metadata) (err error) { return } - laddr, err := net.ResolveTCPAddr("tcp", l.addr) - if err != nil { - return - } - ln, err := net.ListenTCP("tcp", laddr) + ln, err := net.Listen("tcp", l.options.Addr) if err != nil { return } + ln = metrics.WrapListener(l.options.Service, ln) l.Listener = ln return diff --git a/pkg/listener/obfs/tls/listener.go b/pkg/listener/obfs/tls/listener.go index fd6fd93..b8860a7 100644 --- a/pkg/listener/obfs/tls/listener.go +++ b/pkg/listener/obfs/tls/listener.go @@ -3,6 +3,7 @@ package tls import ( "net" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" md "github.com/go-gost/gost/pkg/metadata" @@ -14,20 +15,20 @@ func init() { } type obfsListener struct { - addr string - md metadata net.Listener - logger logger.Logger + logger logger.Logger + md metadata + options listener.Options } func NewListener(opts ...listener.Option) listener.Listener { - options := &listener.Options{} + options := listener.Options{} for _, opt := range opts { - opt(options) + opt(&options) } return &obfsListener{ - addr: options.Addr, - logger: options.Logger, + logger: options.Logger, + options: options, } } @@ -36,14 +37,11 @@ func (l *obfsListener) Init(md md.Metadata) (err error) { return } - laddr, err := net.ResolveTCPAddr("tcp", l.addr) - if err != nil { - return - } - ln, err := net.ListenTCP("tcp", laddr) + ln, err := net.Listen("tcp", l.options.Addr) if err != nil { return } + ln = metrics.WrapListener(l.options.Service, ln) l.Listener = ln return diff --git a/pkg/listener/option.go b/pkg/listener/option.go index 353effd..8cea96a 100644 --- a/pkg/listener/option.go +++ b/pkg/listener/option.go @@ -16,6 +16,7 @@ type Options struct { TLSConfig *tls.Config Chain chain.Chainer Logger logger.Logger + Service string } type Option func(opts *Options) @@ -55,3 +56,9 @@ func LoggerOption(logger logger.Logger) Option { opts.Logger = logger } } + +func ServiceOption(service string) Option { + return func(opts *Options) { + opts.Service = service + } +} diff --git a/pkg/listener/pht/listener.go b/pkg/listener/pht/listener.go index 5a863fe..d7454dd 100644 --- a/pkg/listener/pht/listener.go +++ b/pkg/listener/pht/listener.go @@ -5,6 +5,7 @@ package pht import ( "net" + "github.com/go-gost/gost/pkg/common/metrics" pht_util "github.com/go-gost/gost/pkg/internal/util/pht" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -78,7 +79,12 @@ func (l *phtListener) Init(md md.Metadata) (err error) { } func (l *phtListener) Accept() (conn net.Conn, err error) { - return l.server.Accept() + conn, err = l.server.Accept() + if err != nil { + return + } + conn = metrics.WrapConn(l.options.Service, conn) + return } func (l *phtListener) Addr() net.Addr { diff --git a/pkg/listener/quic/listener.go b/pkg/listener/quic/listener.go index 74727ed..16b5326 100644 --- a/pkg/listener/quic/listener.go +++ b/pkg/listener/quic/listener.go @@ -4,6 +4,7 @@ import ( "context" "net" + "github.com/go-gost/gost/pkg/common/metrics" quic_util "github.com/go-gost/gost/pkg/internal/util/quic" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -52,6 +53,7 @@ func (l *quicListener) Init(md md.Metadata) (err error) { } var conn net.PacketConn = uc + // conn = metrics.WrapPacketConn(l.options.Service, conn) if l.md.cipherKey != nil { conn = quic_util.CipherPacketConn(uc, l.md.cipherKey) @@ -88,6 +90,7 @@ func (l *quicListener) Accept() (conn net.Conn, err error) { var ok bool select { case conn = <-l.cqueue: + conn = metrics.WrapConn(l.options.Service, conn) case err, ok = <-l.errChan: if !ok { err = listener.ErrClosed diff --git a/pkg/listener/redirect/udp/listener.go b/pkg/listener/redirect/udp/listener.go index 1a067ca..e2883cb 100644 --- a/pkg/listener/redirect/udp/listener.go +++ b/pkg/listener/redirect/udp/listener.go @@ -14,20 +14,20 @@ func init() { } type redirectListener struct { - addr string - ln *net.UDPConn - logger logger.Logger - md metadata + ln *net.UDPConn + logger logger.Logger + md metadata + options listener.Options } func NewListener(opts ...listener.Option) listener.Listener { - options := &listener.Options{} + options := listener.Options{} for _, opt := range opts { - opt(options) + opt(&options) } return &redirectListener{ - addr: options.Addr, - logger: options.Logger, + logger: options.Logger, + options: options, } } @@ -36,7 +36,7 @@ func (l *redirectListener) Init(md md.Metadata) (err error) { return } - laddr, err := net.ResolveUDPAddr("udp", l.addr) + laddr, err := net.ResolveUDPAddr("udp", l.options.Addr) if err != nil { return } @@ -51,7 +51,12 @@ func (l *redirectListener) Init(md md.Metadata) (err error) { } func (l *redirectListener) Accept() (conn net.Conn, err error) { - return l.accept() + conn, err = l.accept() + if err != nil { + return + } + // conn = metrics.WrapConn(l.options.Service, conn) + return } func (l *redirectListener) Addr() net.Addr { diff --git a/pkg/listener/rtcp/listener.go b/pkg/listener/rtcp/listener.go index 3def291..d983e07 100644 --- a/pkg/listener/rtcp/listener.go +++ b/pkg/listener/rtcp/listener.go @@ -5,6 +5,7 @@ import ( "net" "github.com/go-gost/gost/pkg/chain" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/connector" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -64,12 +65,14 @@ func (l *rtcpListener) Accept() (conn net.Conn, err error) { } if l.ln == nil { - l.ln, err = l.router.Bind(context.Background(), "tcp", l.laddr.String(), + l.ln, err = l.router.Bind( + context.Background(), "tcp", l.laddr.String(), connector.MuxBindOption(true), ) if err != nil { return nil, connector.NewAcceptError(err) } + l.ln = metrics.WrapListener(l.options.Service, l.ln) } conn, err = l.ln.Accept() if err != nil { diff --git a/pkg/listener/rudp/listener.go b/pkg/listener/rudp/listener.go index 9f0f2a5..3d8dad1 100644 --- a/pkg/listener/rudp/listener.go +++ b/pkg/listener/rudp/listener.go @@ -5,6 +5,7 @@ import ( "net" "github.com/go-gost/gost/pkg/chain" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/connector" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -17,7 +18,7 @@ func init() { } type rudpListener struct { - laddr *net.UDPAddr + laddr net.Addr ln net.Listener router *chain.Router closed chan struct{} @@ -64,7 +65,8 @@ func (l *rudpListener) Accept() (conn net.Conn, err error) { } if l.ln == nil { - l.ln, err = l.router.Bind(context.Background(), "udp", l.laddr.String(), + l.ln, err = l.router.Bind( + context.Background(), "udp", l.laddr.String(), connector.BacklogBindOption(l.md.backlog), connector.UDPConnTTLBindOption(l.md.ttl), connector.UDPDataBufferSizeBindOption(l.md.readBufferSize), @@ -80,6 +82,11 @@ func (l *rudpListener) Accept() (conn net.Conn, err error) { l.ln = nil return nil, connector.NewAcceptError(err) } + + if pc, ok := conn.(net.PacketConn); ok { + conn = metrics.WrapUDPConn(l.options.Service, pc) + } + return } diff --git a/pkg/listener/ssh/listener.go b/pkg/listener/ssh/listener.go index da8fd44..afbea97 100644 --- a/pkg/listener/ssh/listener.go +++ b/pkg/listener/ssh/listener.go @@ -5,6 +5,7 @@ import ( "net" "time" + "github.com/go-gost/gost/pkg/common/metrics" ssh_util "github.com/go-gost/gost/pkg/internal/util/ssh" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -18,7 +19,6 @@ func init() { } type sshListener struct { - addr string net.Listener config *ssh.ServerConfig cqueue chan net.Conn @@ -34,7 +34,6 @@ func NewListener(opts ...listener.Option) listener.Listener { opt(&options) } return &sshListener{ - addr: options.Addr, logger: options.Logger, options: options, } @@ -45,11 +44,12 @@ func (l *sshListener) Init(md md.Metadata) (err error) { return } - ln, err := net.Listen("tcp", l.addr) + ln, err := net.Listen("tcp", l.options.Addr) if err != nil { return err } + ln = metrics.WrapListener(l.options.Service, ln) l.Listener = ln config := &ssh.ServerConfig{ diff --git a/pkg/listener/sshd/listener.go b/pkg/listener/sshd/listener.go index 57ae9b6..6163f6e 100644 --- a/pkg/listener/sshd/listener.go +++ b/pkg/listener/sshd/listener.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/go-gost/gost/pkg/common/metrics" ssh_util "github.com/go-gost/gost/pkg/internal/util/ssh" sshd_util "github.com/go-gost/gost/pkg/internal/util/sshd" "github.com/go-gost/gost/pkg/listener" @@ -27,7 +28,6 @@ func init() { } type sshdListener struct { - addr string net.Listener config *ssh.ServerConfig cqueue chan net.Conn @@ -43,7 +43,6 @@ func NewListener(opts ...listener.Option) listener.Listener { opt(&options) } return &sshdListener{ - addr: options.Addr, logger: options.Logger, options: options, } @@ -54,11 +53,12 @@ func (l *sshdListener) Init(md md.Metadata) (err error) { return } - ln, err := net.Listen("tcp", l.addr) + ln, err := net.Listen("tcp", l.options.Addr) if err != nil { return err } + ln = metrics.WrapListener(l.options.Service, ln) l.Listener = ln config := &ssh.ServerConfig{ diff --git a/pkg/listener/tcp/listener.go b/pkg/listener/tcp/listener.go index 9e5f43e..84cc164 100644 --- a/pkg/listener/tcp/listener.go +++ b/pkg/listener/tcp/listener.go @@ -3,6 +3,7 @@ package tcp import ( "net" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" md "github.com/go-gost/gost/pkg/metadata" @@ -36,15 +37,11 @@ func (l *tcpListener) Init(md md.Metadata) (err error) { return } - laddr, err := net.ResolveTCPAddr("tcp", l.options.Addr) - if err != nil { - return - } - ln, err := net.ListenTCP("tcp", laddr) + ln, err := net.Listen("tcp", l.options.Addr) if err != nil { return } + l.Listener = metrics.WrapListener(l.options.Service, ln) - l.Listener = ln return } diff --git a/pkg/listener/tls/listener.go b/pkg/listener/tls/listener.go index 7db56d8..d62c144 100644 --- a/pkg/listener/tls/listener.go +++ b/pkg/listener/tls/listener.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "net" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" md "github.com/go-gost/gost/pkg/metadata" @@ -41,6 +42,7 @@ func (l *tlsListener) Init(md md.Metadata) (err error) { if err != nil { return } + ln = metrics.WrapListener(l.options.Service, ln) l.Listener = tls.NewListener(ln, l.options.TLSConfig) diff --git a/pkg/listener/tls/mux/listener.go b/pkg/listener/tls/mux/listener.go index 99a2595..8346cba 100644 --- a/pkg/listener/tls/mux/listener.go +++ b/pkg/listener/tls/mux/listener.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "net" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" md "github.com/go-gost/gost/pkg/metadata" @@ -44,6 +45,8 @@ func (l *mtlsListener) Init(md md.Metadata) (err error) { if err != nil { return } + + ln = metrics.WrapListener(l.options.Service, ln) l.Listener = tls.NewListener(ln, l.options.TLSConfig) l.cqueue = make(chan net.Conn, l.md.backlog) diff --git a/pkg/listener/udp/listener.go b/pkg/listener/udp/listener.go index 51afa84..2a0ab86 100644 --- a/pkg/listener/udp/listener.go +++ b/pkg/listener/udp/listener.go @@ -3,6 +3,7 @@ package udp import ( "net" + "github.com/go-gost/gost/pkg/common/metrics" "github.com/go-gost/gost/pkg/common/util/udp" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -15,20 +16,20 @@ func init() { } type udpListener struct { - addr string - md metadata net.Listener - logger logger.Logger + logger logger.Logger + md metadata + options listener.Options } func NewListener(opts ...listener.Option) listener.Listener { - options := &listener.Options{} + options := listener.Options{} for _, opt := range opts { - opt(options) + opt(&options) } return &udpListener{ - addr: options.Addr, - logger: options.Logger, + logger: options.Logger, + options: options, } } @@ -37,7 +38,7 @@ func (l *udpListener) Init(md md.Metadata) (err error) { return } - laddr, err := net.ResolveUDPAddr("udp", l.addr) + laddr, err := net.ResolveUDPAddr("udp", l.options.Addr) if err != nil { return } @@ -47,7 +48,9 @@ func (l *udpListener) Init(md md.Metadata) (err error) { return } - l.Listener = udp.NewListener(conn, laddr, + l.Listener = udp.NewListener( + metrics.WrapPacketConn(l.options.Service, conn), + laddr, l.md.backlog, l.md.readQueueSize, l.md.readBufferSize, l.md.ttl, diff --git a/pkg/listener/ws/listener.go b/pkg/listener/ws/listener.go index a2ccbf3..532c241 100644 --- a/pkg/listener/ws/listener.go +++ b/pkg/listener/ws/listener.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httputil" + "github.com/go-gost/gost/pkg/common/metrics" ws_util "github.com/go-gost/gost/pkg/internal/util/ws" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -82,6 +83,8 @@ func (l *wsListener) Init(md md.Metadata) (err error) { if err != nil { return } + ln = metrics.WrapListener(l.options.Service, ln) + if l.tlsEnabled { ln = tls.NewListener(ln, l.options.TLSConfig) } diff --git a/pkg/listener/ws/mux/listener.go b/pkg/listener/ws/mux/listener.go index 2913134..aeebbea 100644 --- a/pkg/listener/ws/mux/listener.go +++ b/pkg/listener/ws/mux/listener.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httputil" + "github.com/go-gost/gost/pkg/common/metrics" ws_util "github.com/go-gost/gost/pkg/internal/util/ws" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" @@ -87,6 +88,8 @@ func (l *mwsListener) Init(md md.Metadata) (err error) { if err != nil { return } + ln = metrics.WrapListener(l.options.Service, ln) + if l.tlsEnabled { ln = tls.NewListener(ln, l.options.TLSConfig) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 0000000..4cea365 --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,95 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + global = newMetrics() +) + +type Metrics struct { + services prometheus.Gauge + requests *prometheus.CounterVec + requestsInFlight *prometheus.GaugeVec + requestSeconds *prometheus.HistogramVec + requestInputBytes *prometheus.CounterVec + requestOutputBytes *prometheus.CounterVec +} + +func newMetrics() *Metrics { + m := &Metrics{ + services: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "gost_services", + Help: "Current number of services", + }), + + requests: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gost_service_requests_total", + Help: "Total number of requests", + }, + []string{"service"}), + + requestsInFlight: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gost_service_requests_in_flight", + Help: "Current in-flight requests", + }, + []string{"service"}), + + requestSeconds: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gost_service_request_duration_seconds", + Help: "Distribution of request latencies", + Buckets: []float64{ + .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 15, 20, 30, + }, + }, + []string{"service"}), + requestInputBytes: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gost_service_request_transfer_input_bytes_total", + Help: "Total request input data transfer size in bytes", + }, + []string{"service"}), + requestOutputBytes: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gost_service_request_transfer_output_bytes_total", + Help: "Total request output data transfer size in bytes", + }, + []string{"service"}), + } + prometheus.MustRegister(m.services) + prometheus.MustRegister(m.requests) + prometheus.MustRegister(m.requestsInFlight) + prometheus.MustRegister(m.requestSeconds) + prometheus.MustRegister(m.requestInputBytes) + prometheus.MustRegister(m.requestOutputBytes) + return m +} + +func Services() prometheus.Gauge { + return global.services +} + +func Requests(service string) prometheus.Counter { + return global.requests.With(prometheus.Labels{"service": service}) +} + +func RequestsInFlight(service string) prometheus.Gauge { + return global.requestsInFlight.With(prometheus.Labels{"service": service}) +} + +func RequestSeconds(service string) prometheus.Observer { + return global.requestSeconds.With(prometheus.Labels{"service": service}) +} + +func RequestInputBytes(service string) prometheus.Counter { + return global.requestInputBytes.With(prometheus.Labels{"service": service}) +} + +func RequestOutputBytes(service string) prometheus.Counter { + return global.requestOutputBytes.With(prometheus.Labels{"service": service}) +} diff --git a/pkg/metrics/service.go b/pkg/metrics/service/service.go similarity index 98% rename from pkg/metrics/service.go rename to pkg/metrics/service/service.go index ae14efd..99588ca 100644 --- a/pkg/metrics/service.go +++ b/pkg/metrics/service/service.go @@ -1,4 +1,4 @@ -package metrics +package service import ( "net" diff --git a/pkg/resolver/resolver.go b/pkg/resolver/resolver.go index 5cc959d..ad0435c 100644 --- a/pkg/resolver/resolver.go +++ b/pkg/resolver/resolver.go @@ -7,7 +7,7 @@ import ( ) var ( - ErrInvalid = errors.New("resolver invalid") + ErrInvalid = errors.New("resolver is invalid") ) type Resolver interface { diff --git a/pkg/service/service.go b/pkg/service/service.go index c6b66a2..19cd7c0 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -9,6 +9,8 @@ import ( "github.com/go-gost/gost/pkg/handler" "github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/logger" + "github.com/go-gost/gost/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" ) type options struct { @@ -37,17 +39,19 @@ type Service interface { } type service struct { + name string listener listener.Listener handler handler.Handler options options } -func NewService(ln listener.Listener, h handler.Handler, opts ...Option) Service { +func NewService(name string, ln listener.Listener, h handler.Handler, opts ...Option) Service { var options options for _, opt := range opts { opt(&options) } return &service{ + name: name, listener: ln, handler: h, options: options, @@ -63,6 +67,9 @@ func (s *service) Close() error { } func (s *service) Serve() error { + metrics.Services().Inc() + defer metrics.Services().Dec() + var tempDelay time.Duration for { conn, e := s.listener.Accept() @@ -92,6 +99,17 @@ func (s *service) Serve() error { continue } - go s.handler.Handle(context.Background(), conn) + go func() { + metrics.Requests(s.name).Inc() + + metrics.RequestsInFlight(s.name).Inc() + defer metrics.RequestsInFlight(s.name).Dec() + + timer := prometheus.NewTimer( + metrics.RequestSeconds(s.name)) + defer timer.ObserveDuration() + + s.handler.Handle(context.Background(), conn) + }() } }