add metrics for service

This commit is contained in:
ginuerzh 2022-03-05 00:28:13 +08:00
parent 8d8785f534
commit e587b4df7c
34 changed files with 548 additions and 118 deletions

View File

@ -11,7 +11,7 @@ import (
"github.com/go-gost/gost/pkg/config" "github.com/go-gost/gost/pkg/config"
"github.com/go-gost/gost/pkg/metadata" "github.com/go-gost/gost/pkg/metadata"
"github.com/go-gost/gost/pkg/metrics" metrics "github.com/go-gost/gost/pkg/metrics/service"
"github.com/go-gost/gost/pkg/registry" "github.com/go-gost/gost/pkg/registry"
) )
@ -46,6 +46,12 @@ func buildConfigFromCmd(services, nodes stringList) (*config.Config, error) {
} }
} }
if v := os.Getenv("GOST_LOGGING_LEVEL"); v != "" {
cfg.Log = &config.LogConfig{
Level: v,
}
}
var chain *config.ChainConfig var chain *config.ChainConfig
if len(nodes) > 0 { if len(nodes) > 0 {
chain = &config.ChainConfig{ chain = &config.ChainConfig{

View File

@ -8,7 +8,7 @@ import (
"github.com/go-gost/gost/pkg/config" "github.com/go-gost/gost/pkg/config"
"github.com/go-gost/gost/pkg/config/parsing" "github.com/go-gost/gost/pkg/config/parsing"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
"github.com/go-gost/gost/pkg/metrics" metrics "github.com/go-gost/gost/pkg/metrics/service"
"github.com/go-gost/gost/pkg/registry" "github.com/go-gost/gost/pkg/registry"
"github.com/go-gost/gost/pkg/service" "github.com/go-gost/gost/pkg/service"
) )

View File

@ -10,7 +10,7 @@ import (
"github.com/go-gost/gost/pkg/config" "github.com/go-gost/gost/pkg/config"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
"github.com/go-gost/gost/pkg/metrics" metrics "github.com/go-gost/gost/pkg/metrics/service"
) )
var ( var (

222
pkg/common/metrics/conn.go Normal file
View File

@ -0,0 +1,222 @@
package metrics
import (
"errors"
"io"
"net"
"syscall"
"github.com/go-gost/gost/pkg/metrics"
)
var (
errUnsupport = errors.New("unsupported operation")
)
// ServerConn is a server side Conn with metrics supported.
type serverConn struct {
net.Conn
service string
}
func WrapConn(service string, c net.Conn) net.Conn {
return &serverConn{
service: service,
Conn: c,
}
}
func (c *serverConn) Read(b []byte) (n int, err error) {
n, err = c.Conn.Read(b)
metrics.RequestInputBytes(c.service).Add(float64(n))
return
}
func (c *serverConn) Write(b []byte) (n int, err error) {
n, err = c.Conn.Write(b)
metrics.RequestOutputBytes(c.service).Add(float64(n))
return
}
type packetConn struct {
net.PacketConn
service string
}
func WrapPacketConn(service string, pc net.PacketConn) net.PacketConn {
return &packetConn{
PacketConn: pc,
service: service,
}
}
func (c *packetConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
n, addr, err = c.PacketConn.ReadFrom(p)
metrics.RequestInputBytes(c.service).Add(float64(n))
return
}
func (c *packetConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
n, err = c.PacketConn.WriteTo(p, addr)
metrics.RequestOutputBytes(c.service).Add(float64(n))
return
}
type udpConn struct {
net.PacketConn
service string
}
func WrapUDPConn(service string, pc net.PacketConn) UDPConn {
return &udpConn{
PacketConn: pc,
service: service,
}
}
func (c *udpConn) RemoteAddr() net.Addr {
if nc, ok := c.PacketConn.(remoteAddr); ok {
return nc.RemoteAddr()
}
return nil
}
func (c *udpConn) SetReadBuffer(n int) error {
if nc, ok := c.PacketConn.(setBuffer); ok {
return nc.SetReadBuffer(n)
}
return errUnsupport
}
func (c *udpConn) SetWriteBuffer(n int) error {
if nc, ok := c.PacketConn.(setBuffer); ok {
return nc.SetWriteBuffer(n)
}
return errUnsupport
}
func (c *udpConn) Read(b []byte) (n int, err error) {
if nc, ok := c.PacketConn.(io.Reader); ok {
n, err = nc.Read(b)
metrics.RequestInputBytes(c.service).Add(float64(n))
return
}
err = errUnsupport
return
}
func (c *udpConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
n, addr, err = c.PacketConn.ReadFrom(p)
metrics.RequestInputBytes(c.service).Add(float64(n))
return
}
func (c *udpConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
if nc, ok := c.PacketConn.(readUDP); ok {
n, addr, err = nc.ReadFromUDP(b)
metrics.RequestInputBytes(c.service).Add(float64(n))
return
}
err = errUnsupport
return
}
func (c *udpConn) ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error) {
if nc, ok := c.PacketConn.(readUDP); ok {
n, oobn, flags, addr, err = nc.ReadMsgUDP(b, oob)
metrics.RequestInputBytes(c.service).Add(float64(n + oobn))
return
}
err = errUnsupport
return
}
func (c *udpConn) Write(b []byte) (n int, err error) {
if nc, ok := c.PacketConn.(io.Writer); ok {
n, err = nc.Write(b)
metrics.RequestOutputBytes(c.service).Add(float64(n))
return
}
err = errUnsupport
return
}
func (c *udpConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
n, err = c.PacketConn.WriteTo(p, addr)
metrics.RequestOutputBytes(c.service).Add(float64(n))
return
}
func (c *udpConn) WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error) {
if nc, ok := c.PacketConn.(writeUDP); ok {
n, err = nc.WriteToUDP(b, addr)
metrics.RequestOutputBytes(c.service).Add(float64(n))
return
}
err = errUnsupport
return
}
func (c *udpConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error) {
if nc, ok := c.PacketConn.(writeUDP); ok {
n, oobn, err = nc.WriteMsgUDP(b, oob, addr)
metrics.RequestOutputBytes(c.service).Add(float64(n + oobn))
return
}
err = errUnsupport
return
}
func (c *udpConn) SyscallConn() (rc syscall.RawConn, err error) {
if nc, ok := c.PacketConn.(syscallConn); ok {
return nc.SyscallConn()
}
err = errUnsupport
return
}
func (c *udpConn) SetDSCP(n int) error {
if nc, ok := c.PacketConn.(setDSCP); ok {
return nc.SetDSCP(n)
}
return nil
}
type UDPConn interface {
net.PacketConn
io.Reader
io.Writer
readUDP
writeUDP
setBuffer
syscallConn
remoteAddr
}
type setBuffer interface {
SetReadBuffer(bytes int) error
SetWriteBuffer(bytes int) error
}
type readUDP interface {
ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error)
}
type writeUDP interface {
WriteToUDP(b []byte, addr *net.UDPAddr) (int, error)
WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error)
}
type syscallConn interface {
SyscallConn() (syscall.RawConn, error)
}
type remoteAddr interface {
RemoteAddr() net.Addr
}
// tcpraw.TCPConn
type setDSCP interface {
SetDSCP(int) error
}

View File

@ -0,0 +1,23 @@
package metrics
import "net"
type listener struct {
service string
net.Listener
}
func WrapListener(service string, ln net.Listener) net.Listener {
return &listener{
service: service,
Listener: ln,
}
}
func (ln *listener) Accept() (net.Conn, error) {
c, err := ln.Listener.Accept()
if err != nil {
return nil, err
}
return WrapConn(ln.service, c), nil
}

View File

@ -59,6 +59,7 @@ func ParseService(cfg *config.ServiceConfig) (service.Service, error) {
listener.AuthOption(parseAuth(cfg.Listener.Auth)), listener.AuthOption(parseAuth(cfg.Listener.Auth)),
listener.TLSConfigOption(tlsConfig), listener.TLSConfigOption(tlsConfig),
listener.LoggerOption(listenerLogger), listener.LoggerOption(listenerLogger),
listener.ServiceOption(cfg.Name),
) )
if cfg.Listener.Metadata == nil { if cfg.Listener.Metadata == nil {
@ -119,7 +120,7 @@ func ParseService(cfg *config.ServiceConfig) (service.Service, error) {
return nil, err return nil, err
} }
s := service.NewService(ln, h, s := service.NewService(cfg.Name, ln, h,
service.AdmissionOption(registry.AdmissionRegistry().Get(cfg.Admission)), service.AdmissionOption(registry.AdmissionRegistry().Get(cfg.Admission)),
service.LoggerOption(serviceLogger), service.LoggerOption(serviceLogger),
) )

View File

@ -9,6 +9,7 @@ import (
"net/http" "net/http"
"strings" "strings"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata" md "github.com/go-gost/gost/pkg/metadata"
@ -112,6 +113,7 @@ func (l *dnsListener) Accept() (conn net.Conn, err error) {
var ok bool var ok bool
select { select {
case conn = <-l.cqueue: case conn = <-l.cqueue:
conn = metrics.WrapConn(l.options.Service, conn)
case err, ok = <-l.errChan: case err, ok = <-l.errChan:
if !ok { if !ok {
err = listener.ErrClosed err = listener.ErrClosed

View File

@ -10,7 +10,7 @@ import (
// serverConn is a server side connection for UDP client peer, it implements net.Conn and net.PacketConn. // serverConn is a server side connection for UDP client peer, it implements net.Conn and net.PacketConn.
type serverConn struct { type serverConn struct {
net.PacketConn pc net.PacketConn
raddr net.Addr raddr net.Addr
rc chan []byte // data receive queue rc chan []byte // data receive queue
fresh int32 fresh int32
@ -34,11 +34,11 @@ func newServerConn(conn net.PacketConn, raddr net.Addr, cfg *serverConnConfig) *
cfg = &serverConnConfig{} cfg = &serverConnConfig{}
} }
c := &serverConn{ c := &serverConn{
PacketConn: conn, pc: conn,
raddr: raddr, raddr: raddr,
rc: make(chan []byte, cfg.qsize), rc: make(chan []byte, cfg.qsize),
closed: make(chan struct{}), closed: make(chan struct{}),
config: cfg, config: cfg,
} }
go c.ttlWait() go c.ttlWait()
return c return c
@ -54,11 +54,6 @@ func (c *serverConn) send(b []byte) error {
} }
func (c *serverConn) Read(b []byte) (n int, err error) { func (c *serverConn) Read(b []byte) (n int, err error) {
n, _, err = c.ReadFrom(b)
return
}
func (c *serverConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
select { select {
case bb := <-c.rc: case bb := <-c.rc:
n = copy(b, bb) n = copy(b, bb)
@ -68,13 +63,11 @@ func (c *serverConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
return return
} }
addr = c.raddr
return return
} }
func (c *serverConn) Write(b []byte) (n int, err error) { func (c *serverConn) Write(b []byte) (n int, err error) {
return c.WriteTo(b, c.raddr) return c.pc.WriteTo(b, c.raddr)
} }
func (c *serverConn) Close() error { func (c *serverConn) Close() error {
@ -93,10 +86,26 @@ func (c *serverConn) Close() error {
return nil return nil
} }
func (c *serverConn) LocalAddr() net.Addr {
return c.pc.LocalAddr()
}
func (c *serverConn) RemoteAddr() net.Addr { func (c *serverConn) RemoteAddr() net.Addr {
return c.raddr return c.raddr
} }
func (c *serverConn) SetDeadline(t time.Time) error {
return c.pc.SetDeadline(t)
}
func (c *serverConn) SetReadDeadline(t time.Time) error {
return c.pc.SetReadDeadline(t)
}
func (c *serverConn) SetWriteDeadline(t time.Time) error {
return c.pc.SetWriteDeadline(t)
}
func (c *serverConn) ttlWait() { func (c *serverConn) ttlWait() {
ticker := time.NewTicker(c.config.ttl) ticker := time.NewTicker(c.config.ttl)
defer ticker.Stop() defer ticker.Stop()

View File

@ -5,6 +5,7 @@ import (
"sync" "sync"
"sync/atomic" "sync/atomic"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata" md "github.com/go-gost/gost/pkg/metadata"
@ -17,23 +18,23 @@ func init() {
} }
type ftcpListener struct { type ftcpListener struct {
addr string
md metadata
conn net.PacketConn conn net.PacketConn
connChan chan net.Conn connChan chan net.Conn
errChan chan error errChan chan error
connPool connPool connPool connPool
logger logger.Logger logger logger.Logger
md metadata
options listener.Options
} }
func NewListener(opts ...listener.Option) listener.Listener { func NewListener(opts ...listener.Option) listener.Listener {
options := &listener.Options{} options := listener.Options{}
for _, opt := range opts { for _, opt := range opts {
opt(options) opt(&options)
} }
return &ftcpListener{ return &ftcpListener{
addr: options.Addr, logger: options.Logger,
logger: options.Logger, options: options,
} }
} }
@ -42,7 +43,7 @@ func (l *ftcpListener) Init(md md.Metadata) (err error) {
return return
} }
l.conn, err = tcpraw.Listen("tcp", addr) l.conn, err = tcpraw.Listen("tcp", l.options.Addr)
if err != nil { if err != nil {
return return
} }
@ -59,6 +60,7 @@ func (l *ftcpListener) Accept() (conn net.Conn, err error) {
var ok bool var ok bool
select { select {
case conn = <-l.connChan: case conn = <-l.connChan:
conn = metrics.WrapConn(l.options.Service, conn)
case err, ok = <-l.errChan: case err, ok = <-l.errChan:
if !ok { if !ok {
err = listener.ErrClosed err = listener.ErrClosed

View File

@ -3,6 +3,7 @@ package grpc
import ( import (
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
pb "github.com/go-gost/gost/pkg/common/util/grpc/proto" pb "github.com/go-gost/gost/pkg/common/util/grpc/proto"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -42,15 +43,11 @@ func (l *grpcListener) Init(md md.Metadata) (err error) {
return return
} }
laddr, err := net.ResolveTCPAddr("tcp", l.options.Addr) ln, err := net.Listen("tcp", l.options.Addr)
if err != nil {
return
}
ln, err := net.ListenTCP("tcp", laddr)
if err != nil { if err != nil {
return return
} }
ln = metrics.WrapListener(l.options.Service, ln)
var opts []grpc.ServerOption var opts []grpc.ServerOption
if !l.md.insecure { if !l.md.insecure {

View File

@ -7,6 +7,7 @@ import (
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata" md "github.com/go-gost/gost/pkg/metadata"
@ -68,6 +69,7 @@ func (l *h2Listener) Init(md md.Metadata) (err error) {
return err return err
} }
l.addr = ln.Addr() l.addr = ln.Addr()
ln = metrics.WrapListener(l.options.Service, ln)
if l.h2c { if l.h2c {
l.server.Handler = h2c.NewHandler( l.server.Handler = h2c.NewHandler(

View File

@ -5,7 +5,7 @@ import (
"net" "net"
"net/http" "net/http"
"github.com/go-gost/gost/pkg/common/util" "github.com/go-gost/gost/pkg/common/metrics"
http2_util "github.com/go-gost/gost/pkg/internal/util/http2" http2_util "github.com/go-gost/gost/pkg/internal/util/http2"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -58,11 +58,10 @@ func (l *http2Listener) Init(md md.Metadata) (err error) {
return err return err
} }
l.addr = ln.Addr() l.addr = ln.Addr()
ln = metrics.WrapListener(l.options.Service, ln)
ln = tls.NewListener( ln = tls.NewListener(
&util.TCPKeepAliveListener{ ln,
TCPListener: ln.(*net.TCPListener),
},
l.options.TLSConfig, l.options.TLSConfig,
) )

View File

@ -3,6 +3,7 @@ package http3
import ( import (
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
pht_util "github.com/go-gost/gost/pkg/internal/util/pht" pht_util "github.com/go-gost/gost/pkg/internal/util/pht"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -64,7 +65,11 @@ func (l *http3Listener) Init(md md.Metadata) (err error) {
} }
func (l *http3Listener) Accept() (conn net.Conn, err error) { func (l *http3Listener) Accept() (conn net.Conn, err error) {
return l.server.Accept() conn, err = l.server.Accept()
if err != nil {
return
}
return metrics.WrapConn(l.options.Service, conn), nil
} }
func (l *http3Listener) Addr() net.Addr { func (l *http3Listener) Addr() net.Addr {

View File

@ -4,6 +4,7 @@ import (
"net" "net"
"time" "time"
"github.com/go-gost/gost/pkg/common/metrics"
kcp_util "github.com/go-gost/gost/pkg/common/util/kcp" kcp_util "github.com/go-gost/gost/pkg/common/util/kcp"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -19,22 +20,23 @@ func init() {
} }
type kcpListener struct { type kcpListener struct {
addr string conn net.PacketConn
ln *kcp.Listener ln *kcp.Listener
cqueue chan net.Conn cqueue chan net.Conn
errChan chan error errChan chan error
logger logger.Logger logger logger.Logger
md metadata md metadata
options listener.Options
} }
func NewListener(opts ...listener.Option) listener.Listener { func NewListener(opts ...listener.Option) listener.Listener {
options := &listener.Options{} options := listener.Options{}
for _, opt := range opts { for _, opt := range opts {
opt(options) opt(&options)
} }
return &kcpListener{ return &kcpListener{
addr: options.Addr, logger: options.Logger,
logger: options.Logger, options: options,
} }
} }
@ -46,37 +48,45 @@ func (l *kcpListener) Init(md md.Metadata) (err error) {
config := l.md.config config := l.md.config
config.Init() config.Init()
var ln *kcp.Listener var conn net.PacketConn
if config.TCP { if config.TCP {
var conn net.PacketConn conn, err = tcpraw.Listen("tcp", l.options.Addr)
conn, err = tcpraw.Listen("tcp", l.addr) } else {
var udpAddr *net.UDPAddr
udpAddr, err = net.ResolveUDPAddr("udp", l.options.Addr)
if err != nil { if err != nil {
return return
} }
ln, err = kcp.ServeConn( conn, err = net.ListenUDP("udp", udpAddr)
kcp_util.BlockCrypt(config.Key, config.Crypt, kcp_util.DefaultSalt), config.DataShard, config.ParityShard, conn)
} else {
ln, err = kcp.ListenWithOptions(l.addr,
kcp_util.BlockCrypt(config.Key, config.Crypt, kcp_util.DefaultSalt), config.DataShard, config.ParityShard)
} }
if err != nil { if err != nil {
return return
} }
conn = metrics.WrapUDPConn(l.options.Service, conn)
ln, err := kcp.ServeConn(
kcp_util.BlockCrypt(config.Key, config.Crypt, kcp_util.DefaultSalt),
config.DataShard, config.ParityShard, conn)
if err != nil {
return
}
if config.DSCP > 0 { if config.DSCP > 0 {
if err = ln.SetDSCP(config.DSCP); err != nil { if er := ln.SetDSCP(config.DSCP); er != nil {
l.logger.Warn(err) l.logger.Warn(er)
} }
} }
if err = ln.SetReadBuffer(config.SockBuf); err != nil { if er := ln.SetReadBuffer(config.SockBuf); er != nil {
l.logger.Warn(err) l.logger.Warn(er)
} }
if err = ln.SetWriteBuffer(config.SockBuf); err != nil { if er := ln.SetWriteBuffer(config.SockBuf); er != nil {
l.logger.Warn(err) l.logger.Warn(er)
} }
l.ln = ln l.ln = ln
l.conn = conn
l.cqueue = make(chan net.Conn, l.md.backlog) l.cqueue = make(chan net.Conn, l.md.backlog)
l.errChan = make(chan error, 1) l.errChan = make(chan error, 1)
@ -98,6 +108,7 @@ func (l *kcpListener) Accept() (conn net.Conn, err error) {
} }
func (l *kcpListener) Close() error { func (l *kcpListener) Close() error {
l.conn.Close()
return l.ln.Close() return l.ln.Close()
} }

View File

@ -3,6 +3,7 @@ package http
import ( import (
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata" md "github.com/go-gost/gost/pkg/metadata"
@ -14,20 +15,20 @@ func init() {
} }
type obfsListener struct { type obfsListener struct {
addr string
md metadata
net.Listener net.Listener
logger logger.Logger logger logger.Logger
md metadata
options listener.Options
} }
func NewListener(opts ...listener.Option) listener.Listener { func NewListener(opts ...listener.Option) listener.Listener {
options := &listener.Options{} options := listener.Options{}
for _, opt := range opts { for _, opt := range opts {
opt(options) opt(&options)
} }
return &obfsListener{ return &obfsListener{
addr: options.Addr, logger: options.Logger,
logger: options.Logger, options: options,
} }
} }
@ -36,14 +37,11 @@ func (l *obfsListener) Init(md md.Metadata) (err error) {
return return
} }
laddr, err := net.ResolveTCPAddr("tcp", l.addr) ln, err := net.Listen("tcp", l.options.Addr)
if err != nil {
return
}
ln, err := net.ListenTCP("tcp", laddr)
if err != nil { if err != nil {
return return
} }
ln = metrics.WrapListener(l.options.Service, ln)
l.Listener = ln l.Listener = ln
return return

View File

@ -3,6 +3,7 @@ package tls
import ( import (
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata" md "github.com/go-gost/gost/pkg/metadata"
@ -14,20 +15,20 @@ func init() {
} }
type obfsListener struct { type obfsListener struct {
addr string
md metadata
net.Listener net.Listener
logger logger.Logger logger logger.Logger
md metadata
options listener.Options
} }
func NewListener(opts ...listener.Option) listener.Listener { func NewListener(opts ...listener.Option) listener.Listener {
options := &listener.Options{} options := listener.Options{}
for _, opt := range opts { for _, opt := range opts {
opt(options) opt(&options)
} }
return &obfsListener{ return &obfsListener{
addr: options.Addr, logger: options.Logger,
logger: options.Logger, options: options,
} }
} }
@ -36,14 +37,11 @@ func (l *obfsListener) Init(md md.Metadata) (err error) {
return return
} }
laddr, err := net.ResolveTCPAddr("tcp", l.addr) ln, err := net.Listen("tcp", l.options.Addr)
if err != nil {
return
}
ln, err := net.ListenTCP("tcp", laddr)
if err != nil { if err != nil {
return return
} }
ln = metrics.WrapListener(l.options.Service, ln)
l.Listener = ln l.Listener = ln
return return

View File

@ -16,6 +16,7 @@ type Options struct {
TLSConfig *tls.Config TLSConfig *tls.Config
Chain chain.Chainer Chain chain.Chainer
Logger logger.Logger Logger logger.Logger
Service string
} }
type Option func(opts *Options) type Option func(opts *Options)
@ -55,3 +56,9 @@ func LoggerOption(logger logger.Logger) Option {
opts.Logger = logger opts.Logger = logger
} }
} }
func ServiceOption(service string) Option {
return func(opts *Options) {
opts.Service = service
}
}

View File

@ -5,6 +5,7 @@ package pht
import ( import (
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
pht_util "github.com/go-gost/gost/pkg/internal/util/pht" pht_util "github.com/go-gost/gost/pkg/internal/util/pht"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -78,7 +79,12 @@ func (l *phtListener) Init(md md.Metadata) (err error) {
} }
func (l *phtListener) Accept() (conn net.Conn, err error) { func (l *phtListener) Accept() (conn net.Conn, err error) {
return l.server.Accept() conn, err = l.server.Accept()
if err != nil {
return
}
conn = metrics.WrapConn(l.options.Service, conn)
return
} }
func (l *phtListener) Addr() net.Addr { func (l *phtListener) Addr() net.Addr {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
quic_util "github.com/go-gost/gost/pkg/internal/util/quic" quic_util "github.com/go-gost/gost/pkg/internal/util/quic"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -52,6 +53,7 @@ func (l *quicListener) Init(md md.Metadata) (err error) {
} }
var conn net.PacketConn = uc var conn net.PacketConn = uc
// conn = metrics.WrapPacketConn(l.options.Service, conn)
if l.md.cipherKey != nil { if l.md.cipherKey != nil {
conn = quic_util.CipherPacketConn(uc, l.md.cipherKey) conn = quic_util.CipherPacketConn(uc, l.md.cipherKey)
@ -88,6 +90,7 @@ func (l *quicListener) Accept() (conn net.Conn, err error) {
var ok bool var ok bool
select { select {
case conn = <-l.cqueue: case conn = <-l.cqueue:
conn = metrics.WrapConn(l.options.Service, conn)
case err, ok = <-l.errChan: case err, ok = <-l.errChan:
if !ok { if !ok {
err = listener.ErrClosed err = listener.ErrClosed

View File

@ -14,20 +14,20 @@ func init() {
} }
type redirectListener struct { type redirectListener struct {
addr string ln *net.UDPConn
ln *net.UDPConn logger logger.Logger
logger logger.Logger md metadata
md metadata options listener.Options
} }
func NewListener(opts ...listener.Option) listener.Listener { func NewListener(opts ...listener.Option) listener.Listener {
options := &listener.Options{} options := listener.Options{}
for _, opt := range opts { for _, opt := range opts {
opt(options) opt(&options)
} }
return &redirectListener{ return &redirectListener{
addr: options.Addr, logger: options.Logger,
logger: options.Logger, options: options,
} }
} }
@ -36,7 +36,7 @@ func (l *redirectListener) Init(md md.Metadata) (err error) {
return return
} }
laddr, err := net.ResolveUDPAddr("udp", l.addr) laddr, err := net.ResolveUDPAddr("udp", l.options.Addr)
if err != nil { if err != nil {
return return
} }
@ -51,7 +51,12 @@ func (l *redirectListener) Init(md md.Metadata) (err error) {
} }
func (l *redirectListener) Accept() (conn net.Conn, err error) { func (l *redirectListener) Accept() (conn net.Conn, err error) {
return l.accept() conn, err = l.accept()
if err != nil {
return
}
// conn = metrics.WrapConn(l.options.Service, conn)
return
} }
func (l *redirectListener) Addr() net.Addr { func (l *redirectListener) Addr() net.Addr {

View File

@ -5,6 +5,7 @@ import (
"net" "net"
"github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/chain"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/connector" "github.com/go-gost/gost/pkg/connector"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -64,12 +65,14 @@ func (l *rtcpListener) Accept() (conn net.Conn, err error) {
} }
if l.ln == nil { if l.ln == nil {
l.ln, err = l.router.Bind(context.Background(), "tcp", l.laddr.String(), l.ln, err = l.router.Bind(
context.Background(), "tcp", l.laddr.String(),
connector.MuxBindOption(true), connector.MuxBindOption(true),
) )
if err != nil { if err != nil {
return nil, connector.NewAcceptError(err) return nil, connector.NewAcceptError(err)
} }
l.ln = metrics.WrapListener(l.options.Service, l.ln)
} }
conn, err = l.ln.Accept() conn, err = l.ln.Accept()
if err != nil { if err != nil {

View File

@ -5,6 +5,7 @@ import (
"net" "net"
"github.com/go-gost/gost/pkg/chain" "github.com/go-gost/gost/pkg/chain"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/connector" "github.com/go-gost/gost/pkg/connector"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -17,7 +18,7 @@ func init() {
} }
type rudpListener struct { type rudpListener struct {
laddr *net.UDPAddr laddr net.Addr
ln net.Listener ln net.Listener
router *chain.Router router *chain.Router
closed chan struct{} closed chan struct{}
@ -64,7 +65,8 @@ func (l *rudpListener) Accept() (conn net.Conn, err error) {
} }
if l.ln == nil { if l.ln == nil {
l.ln, err = l.router.Bind(context.Background(), "udp", l.laddr.String(), l.ln, err = l.router.Bind(
context.Background(), "udp", l.laddr.String(),
connector.BacklogBindOption(l.md.backlog), connector.BacklogBindOption(l.md.backlog),
connector.UDPConnTTLBindOption(l.md.ttl), connector.UDPConnTTLBindOption(l.md.ttl),
connector.UDPDataBufferSizeBindOption(l.md.readBufferSize), connector.UDPDataBufferSizeBindOption(l.md.readBufferSize),
@ -80,6 +82,11 @@ func (l *rudpListener) Accept() (conn net.Conn, err error) {
l.ln = nil l.ln = nil
return nil, connector.NewAcceptError(err) return nil, connector.NewAcceptError(err)
} }
if pc, ok := conn.(net.PacketConn); ok {
conn = metrics.WrapUDPConn(l.options.Service, pc)
}
return return
} }

View File

@ -5,6 +5,7 @@ import (
"net" "net"
"time" "time"
"github.com/go-gost/gost/pkg/common/metrics"
ssh_util "github.com/go-gost/gost/pkg/internal/util/ssh" ssh_util "github.com/go-gost/gost/pkg/internal/util/ssh"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -18,7 +19,6 @@ func init() {
} }
type sshListener struct { type sshListener struct {
addr string
net.Listener net.Listener
config *ssh.ServerConfig config *ssh.ServerConfig
cqueue chan net.Conn cqueue chan net.Conn
@ -34,7 +34,6 @@ func NewListener(opts ...listener.Option) listener.Listener {
opt(&options) opt(&options)
} }
return &sshListener{ return &sshListener{
addr: options.Addr,
logger: options.Logger, logger: options.Logger,
options: options, options: options,
} }
@ -45,11 +44,12 @@ func (l *sshListener) Init(md md.Metadata) (err error) {
return return
} }
ln, err := net.Listen("tcp", l.addr) ln, err := net.Listen("tcp", l.options.Addr)
if err != nil { if err != nil {
return err return err
} }
ln = metrics.WrapListener(l.options.Service, ln)
l.Listener = ln l.Listener = ln
config := &ssh.ServerConfig{ config := &ssh.ServerConfig{

View File

@ -7,6 +7,7 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/go-gost/gost/pkg/common/metrics"
ssh_util "github.com/go-gost/gost/pkg/internal/util/ssh" ssh_util "github.com/go-gost/gost/pkg/internal/util/ssh"
sshd_util "github.com/go-gost/gost/pkg/internal/util/sshd" sshd_util "github.com/go-gost/gost/pkg/internal/util/sshd"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
@ -27,7 +28,6 @@ func init() {
} }
type sshdListener struct { type sshdListener struct {
addr string
net.Listener net.Listener
config *ssh.ServerConfig config *ssh.ServerConfig
cqueue chan net.Conn cqueue chan net.Conn
@ -43,7 +43,6 @@ func NewListener(opts ...listener.Option) listener.Listener {
opt(&options) opt(&options)
} }
return &sshdListener{ return &sshdListener{
addr: options.Addr,
logger: options.Logger, logger: options.Logger,
options: options, options: options,
} }
@ -54,11 +53,12 @@ func (l *sshdListener) Init(md md.Metadata) (err error) {
return return
} }
ln, err := net.Listen("tcp", l.addr) ln, err := net.Listen("tcp", l.options.Addr)
if err != nil { if err != nil {
return err return err
} }
ln = metrics.WrapListener(l.options.Service, ln)
l.Listener = ln l.Listener = ln
config := &ssh.ServerConfig{ config := &ssh.ServerConfig{

View File

@ -3,6 +3,7 @@ package tcp
import ( import (
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata" md "github.com/go-gost/gost/pkg/metadata"
@ -36,15 +37,11 @@ func (l *tcpListener) Init(md md.Metadata) (err error) {
return return
} }
laddr, err := net.ResolveTCPAddr("tcp", l.options.Addr) ln, err := net.Listen("tcp", l.options.Addr)
if err != nil {
return
}
ln, err := net.ListenTCP("tcp", laddr)
if err != nil { if err != nil {
return return
} }
l.Listener = metrics.WrapListener(l.options.Service, ln)
l.Listener = ln
return return
} }

View File

@ -4,6 +4,7 @@ import (
"crypto/tls" "crypto/tls"
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata" md "github.com/go-gost/gost/pkg/metadata"
@ -41,6 +42,7 @@ func (l *tlsListener) Init(md md.Metadata) (err error) {
if err != nil { if err != nil {
return return
} }
ln = metrics.WrapListener(l.options.Service, ln)
l.Listener = tls.NewListener(ln, l.options.TLSConfig) l.Listener = tls.NewListener(ln, l.options.TLSConfig)

View File

@ -4,6 +4,7 @@ import (
"crypto/tls" "crypto/tls"
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
md "github.com/go-gost/gost/pkg/metadata" md "github.com/go-gost/gost/pkg/metadata"
@ -44,6 +45,8 @@ func (l *mtlsListener) Init(md md.Metadata) (err error) {
if err != nil { if err != nil {
return return
} }
ln = metrics.WrapListener(l.options.Service, ln)
l.Listener = tls.NewListener(ln, l.options.TLSConfig) l.Listener = tls.NewListener(ln, l.options.TLSConfig)
l.cqueue = make(chan net.Conn, l.md.backlog) l.cqueue = make(chan net.Conn, l.md.backlog)

View File

@ -3,6 +3,7 @@ package udp
import ( import (
"net" "net"
"github.com/go-gost/gost/pkg/common/metrics"
"github.com/go-gost/gost/pkg/common/util/udp" "github.com/go-gost/gost/pkg/common/util/udp"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -15,20 +16,20 @@ func init() {
} }
type udpListener struct { type udpListener struct {
addr string
md metadata
net.Listener net.Listener
logger logger.Logger logger logger.Logger
md metadata
options listener.Options
} }
func NewListener(opts ...listener.Option) listener.Listener { func NewListener(opts ...listener.Option) listener.Listener {
options := &listener.Options{} options := listener.Options{}
for _, opt := range opts { for _, opt := range opts {
opt(options) opt(&options)
} }
return &udpListener{ return &udpListener{
addr: options.Addr, logger: options.Logger,
logger: options.Logger, options: options,
} }
} }
@ -37,7 +38,7 @@ func (l *udpListener) Init(md md.Metadata) (err error) {
return return
} }
laddr, err := net.ResolveUDPAddr("udp", l.addr) laddr, err := net.ResolveUDPAddr("udp", l.options.Addr)
if err != nil { if err != nil {
return return
} }
@ -47,7 +48,9 @@ func (l *udpListener) Init(md md.Metadata) (err error) {
return return
} }
l.Listener = udp.NewListener(conn, laddr, l.Listener = udp.NewListener(
metrics.WrapPacketConn(l.options.Service, conn),
laddr,
l.md.backlog, l.md.backlog,
l.md.readQueueSize, l.md.readBufferSize, l.md.readQueueSize, l.md.readBufferSize,
l.md.ttl, l.md.ttl,

View File

@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"github.com/go-gost/gost/pkg/common/metrics"
ws_util "github.com/go-gost/gost/pkg/internal/util/ws" ws_util "github.com/go-gost/gost/pkg/internal/util/ws"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -82,6 +83,8 @@ func (l *wsListener) Init(md md.Metadata) (err error) {
if err != nil { if err != nil {
return return
} }
ln = metrics.WrapListener(l.options.Service, ln)
if l.tlsEnabled { if l.tlsEnabled {
ln = tls.NewListener(ln, l.options.TLSConfig) ln = tls.NewListener(ln, l.options.TLSConfig)
} }

View File

@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
"github.com/go-gost/gost/pkg/common/metrics"
ws_util "github.com/go-gost/gost/pkg/internal/util/ws" ws_util "github.com/go-gost/gost/pkg/internal/util/ws"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
@ -87,6 +88,8 @@ func (l *mwsListener) Init(md md.Metadata) (err error) {
if err != nil { if err != nil {
return return
} }
ln = metrics.WrapListener(l.options.Service, ln)
if l.tlsEnabled { if l.tlsEnabled {
ln = tls.NewListener(ln, l.options.TLSConfig) ln = tls.NewListener(ln, l.options.TLSConfig)
} }

95
pkg/metrics/metrics.go Normal file
View File

@ -0,0 +1,95 @@
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
global = newMetrics()
)
type Metrics struct {
services prometheus.Gauge
requests *prometheus.CounterVec
requestsInFlight *prometheus.GaugeVec
requestSeconds *prometheus.HistogramVec
requestInputBytes *prometheus.CounterVec
requestOutputBytes *prometheus.CounterVec
}
func newMetrics() *Metrics {
m := &Metrics{
services: prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "gost_services",
Help: "Current number of services",
}),
requests: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gost_service_requests_total",
Help: "Total number of requests",
},
[]string{"service"}),
requestsInFlight: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gost_service_requests_in_flight",
Help: "Current in-flight requests",
},
[]string{"service"}),
requestSeconds: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "gost_service_request_duration_seconds",
Help: "Distribution of request latencies",
Buckets: []float64{
.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 15, 20, 30,
},
},
[]string{"service"}),
requestInputBytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gost_service_request_transfer_input_bytes_total",
Help: "Total request input data transfer size in bytes",
},
[]string{"service"}),
requestOutputBytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gost_service_request_transfer_output_bytes_total",
Help: "Total request output data transfer size in bytes",
},
[]string{"service"}),
}
prometheus.MustRegister(m.services)
prometheus.MustRegister(m.requests)
prometheus.MustRegister(m.requestsInFlight)
prometheus.MustRegister(m.requestSeconds)
prometheus.MustRegister(m.requestInputBytes)
prometheus.MustRegister(m.requestOutputBytes)
return m
}
func Services() prometheus.Gauge {
return global.services
}
func Requests(service string) prometheus.Counter {
return global.requests.With(prometheus.Labels{"service": service})
}
func RequestsInFlight(service string) prometheus.Gauge {
return global.requestsInFlight.With(prometheus.Labels{"service": service})
}
func RequestSeconds(service string) prometheus.Observer {
return global.requestSeconds.With(prometheus.Labels{"service": service})
}
func RequestInputBytes(service string) prometheus.Counter {
return global.requestInputBytes.With(prometheus.Labels{"service": service})
}
func RequestOutputBytes(service string) prometheus.Counter {
return global.requestOutputBytes.With(prometheus.Labels{"service": service})
}

View File

@ -1,4 +1,4 @@
package metrics package service
import ( import (
"net" "net"

View File

@ -7,7 +7,7 @@ import (
) )
var ( var (
ErrInvalid = errors.New("resolver invalid") ErrInvalid = errors.New("resolver is invalid")
) )
type Resolver interface { type Resolver interface {

View File

@ -9,6 +9,8 @@ import (
"github.com/go-gost/gost/pkg/handler" "github.com/go-gost/gost/pkg/handler"
"github.com/go-gost/gost/pkg/listener" "github.com/go-gost/gost/pkg/listener"
"github.com/go-gost/gost/pkg/logger" "github.com/go-gost/gost/pkg/logger"
"github.com/go-gost/gost/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
) )
type options struct { type options struct {
@ -37,17 +39,19 @@ type Service interface {
} }
type service struct { type service struct {
name string
listener listener.Listener listener listener.Listener
handler handler.Handler handler handler.Handler
options options options options
} }
func NewService(ln listener.Listener, h handler.Handler, opts ...Option) Service { func NewService(name string, ln listener.Listener, h handler.Handler, opts ...Option) Service {
var options options var options options
for _, opt := range opts { for _, opt := range opts {
opt(&options) opt(&options)
} }
return &service{ return &service{
name: name,
listener: ln, listener: ln,
handler: h, handler: h,
options: options, options: options,
@ -63,6 +67,9 @@ func (s *service) Close() error {
} }
func (s *service) Serve() error { func (s *service) Serve() error {
metrics.Services().Inc()
defer metrics.Services().Dec()
var tempDelay time.Duration var tempDelay time.Duration
for { for {
conn, e := s.listener.Accept() conn, e := s.listener.Accept()
@ -92,6 +99,17 @@ func (s *service) Serve() error {
continue continue
} }
go s.handler.Handle(context.Background(), conn) go func() {
metrics.Requests(s.name).Inc()
metrics.RequestsInFlight(s.name).Inc()
defer metrics.RequestsInFlight(s.name).Dec()
timer := prometheus.NewTimer(
metrics.RequestSeconds(s.name))
defer timer.ObserveDuration()
s.handler.Handle(context.Background(), conn)
}()
} }
} }