diff --git a/go.mod b/go.mod index 7de6fff..370fa72 100644 --- a/go.mod +++ b/go.mod @@ -2,15 +2,13 @@ module github.com/go-gost/core go 1.18 -replace github.com/templexxx/cpu v0.0.7 => github.com/templexxx/cpu v0.0.10-0.20211111114238-98168dcec14a - require ( github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d github.com/go-gost/gosocks4 v0.0.1 github.com/go-gost/gosocks5 v0.3.1-0.20211109033403-d894d75b7f09 - github.com/go-gost/metrics v0.0.0-20220314135054-2263ae431a5f github.com/gobwas/glob v0.2.3 github.com/miekg/dns v1.1.45 + github.com/prometheus/client_golang v1.12.1 github.com/sirupsen/logrus v1.8.1 github.com/xtaci/smux v1.5.16 golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 @@ -22,7 +20,6 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.6 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect - github.com/prometheus/client_golang v1.12.1 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/go.sum b/go.sum index 5b28dd7..95992de 100644 --- a/go.sum +++ b/go.sum @@ -67,8 +67,6 @@ github.com/go-gost/gosocks4 v0.0.1 h1:+k1sec8HlELuQV7rWftIkmy8UijzUt2I6t+iMPlGB2 github.com/go-gost/gosocks4 v0.0.1/go.mod h1:3B6L47HbU/qugDg4JnoFPHgJXE43Inz8Bah1QaN9qCc= github.com/go-gost/gosocks5 v0.3.1-0.20211109033403-d894d75b7f09 h1:A95M6UWcfZgOuJkQ7QLfG0Hs5peWIUSysCDNz4pfe04= github.com/go-gost/gosocks5 v0.3.1-0.20211109033403-d894d75b7f09/go.mod h1:1G6I7HP7VFVxveGkoK8mnprnJqSqJjdcASKsdUn4Pp4= -github.com/go-gost/metrics v0.0.0-20220314135054-2263ae431a5f h1:gNquUvOvPXUpq4Xk7ed7motbVN5t0HMqImf96k+pzlU= -github.com/go-gost/metrics v0.0.0-20220314135054-2263ae431a5f/go.mod h1:Ac2Pigx5GMJEznkP9wLdBJ36+rYwWiJPqWk7lrg3FKg= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= diff --git a/listener/rtcp/listener.go b/listener/rtcp/listener.go index 184024b..c7e3847 100644 --- a/listener/rtcp/listener.go +++ b/listener/rtcp/listener.go @@ -9,8 +9,8 @@ import ( "github.com/go-gost/core/listener" "github.com/go-gost/core/logger" md "github.com/go-gost/core/metadata" + metrics "github.com/go-gost/core/metrics/wrapper" "github.com/go-gost/core/registry" - metrics "github.com/go-gost/metrics/wrapper" ) func init() { diff --git a/listener/rudp/listener.go b/listener/rudp/listener.go index d4ad230..0a6a598 100644 --- a/listener/rudp/listener.go +++ b/listener/rudp/listener.go @@ -9,8 +9,8 @@ import ( "github.com/go-gost/core/listener" "github.com/go-gost/core/logger" md "github.com/go-gost/core/metadata" + metrics "github.com/go-gost/core/metrics/wrapper" "github.com/go-gost/core/registry" - metrics "github.com/go-gost/metrics/wrapper" ) func init() { diff --git a/listener/tcp/listener.go b/listener/tcp/listener.go index 5dcacc9..1396349 100644 --- a/listener/tcp/listener.go +++ b/listener/tcp/listener.go @@ -6,8 +6,8 @@ import ( "github.com/go-gost/core/listener" "github.com/go-gost/core/logger" md "github.com/go-gost/core/metadata" + metrics "github.com/go-gost/core/metrics/wrapper" "github.com/go-gost/core/registry" - metrics "github.com/go-gost/metrics/wrapper" ) func init() { diff --git a/listener/tls/listener.go b/listener/tls/listener.go index e19a92e..bfe22f3 100644 --- a/listener/tls/listener.go +++ b/listener/tls/listener.go @@ -8,8 +8,8 @@ import ( "github.com/go-gost/core/listener" "github.com/go-gost/core/logger" md "github.com/go-gost/core/metadata" + metrics "github.com/go-gost/core/metrics/wrapper" "github.com/go-gost/core/registry" - metrics "github.com/go-gost/metrics/wrapper" ) func init() { diff --git a/listener/udp/listener.go b/listener/udp/listener.go index a5169bd..c516b2d 100644 --- a/listener/udp/listener.go +++ b/listener/udp/listener.go @@ -7,8 +7,8 @@ import ( "github.com/go-gost/core/listener" "github.com/go-gost/core/logger" md "github.com/go-gost/core/metadata" + metrics "github.com/go-gost/core/metrics/wrapper" "github.com/go-gost/core/registry" - metrics "github.com/go-gost/metrics/wrapper" ) func init() { diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..85d5ee0 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,182 @@ +package metrics + +import ( + "os" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + metrics *Metrics +) + +func SetGlobal(m *Metrics) { + metrics = m +} + +type Gauge interface { + Inc() + Dec() + Add(float64) + Set(float64) +} + +type Counter interface { + Inc() + Add(float64) +} + +type Observer interface { + Observe(float64) +} + +type Metrics struct { + host string + services *prometheus.GaugeVec + requests *prometheus.CounterVec + requestsInFlight *prometheus.GaugeVec + requestSeconds *prometheus.HistogramVec + inputBytes *prometheus.CounterVec + outputBytes *prometheus.CounterVec + handlerErrors *prometheus.CounterVec +} + +func NewMetrics() *Metrics { + host, _ := os.Hostname() + m := &Metrics{ + host: host, + services: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gost_services", + Help: "Current number of services", + }, + []string{"host"}), + + requests: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gost_service_requests_total", + Help: "Total number of requests", + }, + []string{"host", "service"}), + + requestsInFlight: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "gost_service_requests_in_flight", + Help: "Current in-flight requests", + }, + []string{"host", "service"}), + + requestSeconds: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gost_service_request_duration_seconds", + Help: "Distribution of request latencies", + Buckets: []float64{ + .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 15, 30, 60, + }, + }, + []string{"host", "service"}), + inputBytes: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gost_service_transfer_input_bytes_total", + Help: "Total service input data transfer size in bytes", + }, + []string{"host", "service"}), + outputBytes: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gost_service_transfer_output_bytes_total", + Help: "Total service output data transfer size in bytes", + }, + []string{"host", "service"}), + handlerErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "gost_service_handler_errors_total", + Help: "Total service handler errors", + }, + []string{"host", "service"}), + } + prometheus.MustRegister(m.services) + prometheus.MustRegister(m.requests) + prometheus.MustRegister(m.requestsInFlight) + prometheus.MustRegister(m.requestSeconds) + prometheus.MustRegister(m.inputBytes) + prometheus.MustRegister(m.outputBytes) + prometheus.MustRegister(m.handlerErrors) + return m +} + +func Services() Gauge { + if metrics == nil || metrics.services == nil { + return nilGauge + } + return metrics.services. + With(prometheus.Labels{ + "host": metrics.host, + }) +} + +func Requests(service string) Counter { + if metrics == nil || metrics.requests == nil { + return nilCounter + } + + return metrics.requests. + With(prometheus.Labels{ + "host": metrics.host, + "service": service, + }) +} + +func RequestsInFlight(service string) Gauge { + if metrics == nil || metrics.requestsInFlight == nil { + return nilGauge + } + return metrics.requestsInFlight. + With(prometheus.Labels{ + "host": metrics.host, + "service": service, + }) +} + +func RequestSeconds(service string) Observer { + if metrics == nil || metrics.requestSeconds == nil { + return nilObserver + } + return metrics.requestSeconds. + With(prometheus.Labels{ + "host": metrics.host, + "service": service, + }) +} + +func InputBytes(service string) Counter { + if metrics == nil || metrics.inputBytes == nil { + return nilCounter + } + return metrics.inputBytes. + With(prometheus.Labels{ + "host": metrics.host, + "service": service, + }) +} + +func OutputBytes(service string) Counter { + if metrics == nil || metrics.outputBytes == nil { + return nilCounter + } + return metrics.outputBytes. + With(prometheus.Labels{ + "host": metrics.host, + "service": service, + }) +} + +func HandlerErrors(service string) Counter { + if metrics == nil || metrics.handlerErrors == nil { + return nilCounter + } + return metrics.handlerErrors. + With(prometheus.Labels{ + "host": metrics.host, + "service": service, + }) +} diff --git a/metrics/nop.go b/metrics/nop.go new file mode 100644 index 0000000..823a47f --- /dev/null +++ b/metrics/nop.go @@ -0,0 +1,23 @@ +package metrics + +var ( + nilGauge = &nopGauge{} + nilCounter = &nopCounter{} + nilObserver = &nopObserver{} +) + +type nopGauge struct{} + +func (*nopGauge) Inc() {} +func (*nopGauge) Dec() {} +func (*nopGauge) Add(v float64) {} +func (*nopGauge) Set(v float64) {} + +type nopCounter struct{} + +func (*nopCounter) Inc() {} +func (*nopCounter) Add(v float64) {} + +type nopObserver struct{} + +func (*nopObserver) Observe(v float64) {} diff --git a/metrics/service/service.go b/metrics/service/service.go new file mode 100644 index 0000000..5eba0bb --- /dev/null +++ b/metrics/service/service.go @@ -0,0 +1,65 @@ +package service + +import ( + "net" + "net/http" + + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const ( + DefaultPath = "/metrics" +) + +type options struct { + path string +} + +type Option func(*options) + +func PathOption(path string) Option { + return func(o *options) { + o.path = path + } +} + +type Service struct { + s *http.Server + ln net.Listener +} + +func NewService(addr string, opts ...Option) (*Service, error) { + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + var options options + for _, opt := range opts { + opt(&options) + } + if options.path == "" { + options.path = DefaultPath + } + + mux := http.NewServeMux() + mux.Handle(options.path, promhttp.Handler()) + return &Service{ + s: &http.Server{ + Handler: mux, + }, + ln: ln, + }, nil +} + +func (s *Service) Serve() error { + return s.s.Serve(s.ln) +} + +func (s *Service) Addr() net.Addr { + return s.ln.Addr() +} + +func (s *Service) Close() error { + return s.s.Close() +} diff --git a/metrics/wrapper/conn.go b/metrics/wrapper/conn.go new file mode 100644 index 0000000..ad7ae2f --- /dev/null +++ b/metrics/wrapper/conn.go @@ -0,0 +1,222 @@ +package wrapper + +import ( + "errors" + "io" + "net" + "syscall" + + "github.com/go-gost/core/metrics" +) + +var ( + errUnsupport = errors.New("unsupported operation") +) + +// ServerConn is a server side Conn with metrics supported. +type serverConn struct { + net.Conn + service string +} + +func WrapConn(service string, c net.Conn) net.Conn { + return &serverConn{ + service: service, + Conn: c, + } +} + +func (c *serverConn) Read(b []byte) (n int, err error) { + n, err = c.Conn.Read(b) + metrics.InputBytes(c.service).Add(float64(n)) + return +} + +func (c *serverConn) Write(b []byte) (n int, err error) { + n, err = c.Conn.Write(b) + metrics.OutputBytes(c.service).Add(float64(n)) + return +} + +type packetConn struct { + net.PacketConn + service string +} + +func WrapPacketConn(service string, pc net.PacketConn) net.PacketConn { + return &packetConn{ + PacketConn: pc, + service: service, + } +} + +func (c *packetConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + n, addr, err = c.PacketConn.ReadFrom(p) + metrics.InputBytes(c.service).Add(float64(n)) + return +} + +func (c *packetConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + n, err = c.PacketConn.WriteTo(p, addr) + metrics.OutputBytes(c.service).Add(float64(n)) + return +} + +type udpConn struct { + net.PacketConn + service string +} + +func WrapUDPConn(service string, pc net.PacketConn) UDPConn { + return &udpConn{ + PacketConn: pc, + service: service, + } +} + +func (c *udpConn) RemoteAddr() net.Addr { + if nc, ok := c.PacketConn.(remoteAddr); ok { + return nc.RemoteAddr() + } + return nil +} + +func (c *udpConn) SetReadBuffer(n int) error { + if nc, ok := c.PacketConn.(setBuffer); ok { + return nc.SetReadBuffer(n) + } + return errUnsupport +} + +func (c *udpConn) SetWriteBuffer(n int) error { + if nc, ok := c.PacketConn.(setBuffer); ok { + return nc.SetWriteBuffer(n) + } + return errUnsupport +} + +func (c *udpConn) Read(b []byte) (n int, err error) { + if nc, ok := c.PacketConn.(io.Reader); ok { + n, err = nc.Read(b) + metrics.InputBytes(c.service).Add(float64(n)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + n, addr, err = c.PacketConn.ReadFrom(p) + metrics.InputBytes(c.service).Add(float64(n)) + return +} + +func (c *udpConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) { + if nc, ok := c.PacketConn.(readUDP); ok { + n, addr, err = nc.ReadFromUDP(b) + metrics.InputBytes(c.service).Add(float64(n)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error) { + if nc, ok := c.PacketConn.(readUDP); ok { + n, oobn, flags, addr, err = nc.ReadMsgUDP(b, oob) + metrics.InputBytes(c.service).Add(float64(n + oobn)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) Write(b []byte) (n int, err error) { + if nc, ok := c.PacketConn.(io.Writer); ok { + n, err = nc.Write(b) + metrics.OutputBytes(c.service).Add(float64(n)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + n, err = c.PacketConn.WriteTo(p, addr) + metrics.OutputBytes(c.service).Add(float64(n)) + return +} + +func (c *udpConn) WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error) { + if nc, ok := c.PacketConn.(writeUDP); ok { + n, err = nc.WriteToUDP(b, addr) + metrics.OutputBytes(c.service).Add(float64(n)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error) { + if nc, ok := c.PacketConn.(writeUDP); ok { + n, oobn, err = nc.WriteMsgUDP(b, oob, addr) + metrics.OutputBytes(c.service).Add(float64(n + oobn)) + return + } + err = errUnsupport + return +} + +func (c *udpConn) SyscallConn() (rc syscall.RawConn, err error) { + if nc, ok := c.PacketConn.(syscallConn); ok { + return nc.SyscallConn() + } + err = errUnsupport + return +} + +func (c *udpConn) SetDSCP(n int) error { + if nc, ok := c.PacketConn.(setDSCP); ok { + return nc.SetDSCP(n) + } + return nil +} + +type UDPConn interface { + net.PacketConn + io.Reader + io.Writer + readUDP + writeUDP + setBuffer + syscallConn + remoteAddr +} + +type setBuffer interface { + SetReadBuffer(bytes int) error + SetWriteBuffer(bytes int) error +} + +type readUDP interface { + ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) + ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error) +} + +type writeUDP interface { + WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) + WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error) +} + +type syscallConn interface { + SyscallConn() (syscall.RawConn, error) +} + +type remoteAddr interface { + RemoteAddr() net.Addr +} + +// tcpraw.TCPConn +type setDSCP interface { + SetDSCP(int) error +} diff --git a/metrics/wrapper/listener.go b/metrics/wrapper/listener.go new file mode 100644 index 0000000..e79f522 --- /dev/null +++ b/metrics/wrapper/listener.go @@ -0,0 +1,23 @@ +package wrapper + +import "net" + +type listener struct { + service string + net.Listener +} + +func WrapListener(service string, ln net.Listener) net.Listener { + return &listener{ + service: service, + Listener: ln, + } +} + +func (ln *listener) Accept() (net.Conn, error) { + c, err := ln.Listener.Accept() + if err != nil { + return nil, err + } + return WrapConn(ln.service, c), nil +} diff --git a/service/service.go b/service/service.go index f5ff926..c9cc298 100644 --- a/service/service.go +++ b/service/service.go @@ -9,7 +9,7 @@ import ( "github.com/go-gost/core/handler" "github.com/go-gost/core/listener" "github.com/go-gost/core/logger" - "github.com/go-gost/metrics" + "github.com/go-gost/core/metrics" ) type options struct {