From c2f49e94443cbd88d5b7d55b5decf7c55ed24f1c Mon Sep 17 00:00:00 2001 From: ginuerzh Date: Tue, 5 Apr 2022 17:55:20 +0800 Subject: [PATCH] mv metrics to github.com/go-gost/x --- chain/route.go | 11 +- common/net/relay/relay.go | 126 --------------------- common/net/transport.go | 50 --------- metrics/metrics.go | 221 +++++-------------------------------- metrics/noop.go | 43 ++++++++ metrics/nop.go | 23 ---- metrics/service/service.go | 65 ----------- metrics/wrapper/conn.go | 96 ++++++++++++++-- service/service.go | 34 ++++-- 9 files changed, 190 insertions(+), 479 deletions(-) delete mode 100644 common/net/relay/relay.go delete mode 100644 common/net/transport.go create mode 100644 metrics/noop.go delete mode 100644 metrics/nop.go delete mode 100644 metrics/service/service.go diff --git a/chain/route.go b/chain/route.go index 9a4d503..c42f516 100644 --- a/chain/route.go +++ b/chain/route.go @@ -88,7 +88,10 @@ func (r *Route) connect(ctx context.Context) (conn net.Conn, err error) { defer func() { if err != nil && r.chain != nil { - metrics.ChainErrors(r.chain.name).Inc() + if v := metrics.GetCounter(metrics.MetricChainErrorsCounter, + metrics.Labels{"chain": r.chain.name}); v != nil { + v.Inc() + } } }() @@ -116,8 +119,10 @@ func (r *Route) connect(ctx context.Context) (conn net.Conn, err error) { } node.Marker.Reset() - metrics.ChainNodeConnectSeconds(r.chain.name, node.Name). - Observe(time.Since(start).Seconds()) + if v := metrics.GetObserver(metrics.MetricNodeConnectDurationObserver, + metrics.Labels{"chain": r.chain.name, "node": node.Name}); v != nil { + v.Observe(time.Since(start).Seconds()) + } preNode := node for _, node := range r.nodes[1:] { diff --git a/common/net/relay/relay.go b/common/net/relay/relay.go deleted file mode 100644 index af4cebe..0000000 --- a/common/net/relay/relay.go +++ /dev/null @@ -1,126 +0,0 @@ -package relay - -import ( - "net" - - "github.com/go-gost/core/bypass" - "github.com/go-gost/core/common/bufpool" - "github.com/go-gost/core/logger" -) - -type UDPRelay struct { - pc1 net.PacketConn - pc2 net.PacketConn - - bypass bypass.Bypass - bufferSize int - logger logger.Logger -} - -func NewUDPRelay(pc1, pc2 net.PacketConn) *UDPRelay { - return &UDPRelay{ - pc1: pc1, - pc2: pc2, - } -} - -func (r *UDPRelay) WithBypass(bp bypass.Bypass) *UDPRelay { - r.bypass = bp - return r -} - -func (r *UDPRelay) WithLogger(logger logger.Logger) *UDPRelay { - r.logger = logger - return r -} - -func (r *UDPRelay) SetBufferSize(n int) { - r.bufferSize = n -} - -func (r *UDPRelay) Run() (err error) { - bufSize := r.bufferSize - if bufSize <= 0 { - bufSize = 1500 - } - - errc := make(chan error, 2) - - go func() { - for { - err := func() error { - b := bufpool.Get(bufSize) - defer bufpool.Put(b) - - n, raddr, err := r.pc1.ReadFrom(*b) - if err != nil { - return err - } - - if r.bypass != nil && r.bypass.Contains(raddr.String()) { - if r.logger != nil { - r.logger.Warn("bypass: ", raddr) - } - return nil - } - - if _, err := r.pc2.WriteTo((*b)[:n], raddr); err != nil { - return err - } - - if r.logger != nil { - r.logger.Debugf("%s >>> %s data: %d", - r.pc2.LocalAddr(), raddr, n) - - } - - return nil - }() - - if err != nil { - errc <- err - return - } - } - }() - - go func() { - for { - err := func() error { - b := bufpool.Get(bufSize) - defer bufpool.Put(b) - - n, raddr, err := r.pc2.ReadFrom(*b) - if err != nil { - return err - } - - if r.bypass != nil && r.bypass.Contains(raddr.String()) { - if r.logger != nil { - r.logger.Warn("bypass: ", raddr) - } - return nil - } - - if _, err := r.pc1.WriteTo((*b)[:n], raddr); err != nil { - return err - } - - if r.logger != nil { - r.logger.Debugf("%s <<< %s data: %d", - r.pc2.LocalAddr(), raddr, n) - - } - - return nil - }() - - if err != nil { - errc <- err - return - } - } - }() - - return <-errc -} diff --git a/common/net/transport.go b/common/net/transport.go deleted file mode 100644 index 93674af..0000000 --- a/common/net/transport.go +++ /dev/null @@ -1,50 +0,0 @@ -package net - -import ( - "bufio" - "io" - "net" - - "github.com/go-gost/core/common/bufpool" -) - -func Transport(rw1, rw2 io.ReadWriter) error { - errc := make(chan error, 1) - go func() { - errc <- copyBuffer(rw1, rw2) - }() - - go func() { - errc <- copyBuffer(rw2, rw1) - }() - - err := <-errc - if err != nil && err == io.EOF { - err = nil - } - return err -} - -func copyBuffer(dst io.Writer, src io.Reader) error { - buf := bufpool.Get(4 * 1024) - defer bufpool.Put(buf) - - _, err := io.CopyBuffer(dst, src, *buf) - return err -} - -type bufferReaderConn struct { - net.Conn - br *bufio.Reader -} - -func NewBufferReaderConn(conn net.Conn, br *bufio.Reader) net.Conn { - return &bufferReaderConn{ - Conn: conn, - br: br, - } -} - -func (c *bufferReaderConn) Read(b []byte) (int, error) { - return c.br.Read(b) -} diff --git a/metrics/metrics.go b/metrics/metrics.go index 7d69657..65652f3 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -1,17 +1,31 @@ package metrics -import ( - "os" +type MetricName string - "github.com/prometheus/client_golang/prometheus" +const ( + MetricServicesGauge MetricName = "gost_services" + MetricServiceRequestsCounter MetricName = "gost_service_requests_total" + MetricServiceRequestsInFlightGauge MetricName = "gost_service_requests_in_flight" + MetricServiceRequestsDurationObserver MetricName = "gost_service_request_duration_seconds" + MetricServiceTransferInputBytesCounter MetricName = "gost_service_transfer_input_bytes_total" + MetricServiceTransferOutputBytesCounter MetricName = "gost_service_transfer_output_bytes_total" + MetricNodeConnectDurationObserver MetricName = "gost_chain_node_connect_duration_seconds" + MetricServiceHandlerErrorsCounter MetricName = "gost_service_handler_errors_total" + MetricChainErrorsCounter MetricName = "gost_chain_errors_total" ) +type Labels map[string]string + var ( - metrics *Metrics + metrics Metrics = Noop() ) -func SetGlobal(m *Metrics) { - metrics = m +func SetGlobal(m Metrics) { + if m != nil { + metrics = m + } else { + metrics = Noop() + } } type Gauge interface { @@ -30,195 +44,20 @@ type Observer interface { Observe(float64) } -type Metrics struct { - host string - services *prometheus.GaugeVec - requests *prometheus.CounterVec - requestsInFlight *prometheus.GaugeVec - requestSeconds *prometheus.HistogramVec - chainNodeConnectSecconds *prometheus.HistogramVec - inputBytes *prometheus.CounterVec - outputBytes *prometheus.CounterVec - handlerErrors *prometheus.CounterVec - chainErrors *prometheus.CounterVec +type Metrics interface { + Counter(name MetricName, labels Labels) Counter + Gauge(name MetricName, labels Labels) Gauge + Observer(name MetricName, labels Labels) Observer } -func NewMetrics() *Metrics { - host, _ := os.Hostname() - m := &Metrics{ - host: host, - services: prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "gost_services", - Help: "Current number of services", - }, - []string{"host"}), - - requests: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gost_service_requests_total", - Help: "Total number of requests", - }, - []string{"host", "service"}), - - requestsInFlight: prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "gost_service_requests_in_flight", - Help: "Current in-flight requests", - }, - []string{"host", "service"}), - - requestSeconds: prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "gost_service_request_duration_seconds", - Help: "Distribution of request latencies", - Buckets: []float64{ - .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 15, 30, 60, - }, - }, - []string{"host", "service"}), - chainNodeConnectSecconds: prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "gost_chain_node_connect_duration_seconds", - Help: "Distribution of chain node connect latencies", - Buckets: []float64{ - .01, .05, .1, .25, .5, 1, 1.5, 2, 5, 10, 15, 30, 60, - }, - }, - []string{"host", "chain", "node"}), - inputBytes: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gost_service_transfer_input_bytes_total", - Help: "Total service input data transfer size in bytes", - }, - []string{"host", "service"}), - outputBytes: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gost_service_transfer_output_bytes_total", - Help: "Total service output data transfer size in bytes", - }, - []string{"host", "service"}), - handlerErrors: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gost_service_handler_errors_total", - Help: "Total service handler errors", - }, - []string{"host", "service"}), - chainErrors: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "gost_chain_errors_total", - Help: "Total chain errors", - }, - []string{"host", "chain"}), - } - prometheus.MustRegister(m.services) - prometheus.MustRegister(m.requests) - prometheus.MustRegister(m.requestsInFlight) - prometheus.MustRegister(m.requestSeconds) - prometheus.MustRegister(m.chainNodeConnectSecconds) - prometheus.MustRegister(m.inputBytes) - prometheus.MustRegister(m.outputBytes) - prometheus.MustRegister(m.handlerErrors) - prometheus.MustRegister(m.chainErrors) - return m +func GetCounter(name MetricName, labels Labels) Counter { + return metrics.Counter(name, labels) } -func Services() Gauge { - if metrics == nil || metrics.services == nil { - return nilGauge - } - return metrics.services. - With(prometheus.Labels{ - "host": metrics.host, - }) +func GetGauge(name MetricName, labels Labels) Gauge { + return metrics.Gauge(name, labels) } -func Requests(service string) Counter { - if metrics == nil || metrics.requests == nil { - return nilCounter - } - - return metrics.requests. - With(prometheus.Labels{ - "host": metrics.host, - "service": service, - }) -} - -func RequestsInFlight(service string) Gauge { - if metrics == nil || metrics.requestsInFlight == nil { - return nilGauge - } - return metrics.requestsInFlight. - With(prometheus.Labels{ - "host": metrics.host, - "service": service, - }) -} - -func RequestSeconds(service string) Observer { - if metrics == nil || metrics.requestSeconds == nil { - return nilObserver - } - return metrics.requestSeconds. - With(prometheus.Labels{ - "host": metrics.host, - "service": service, - }) -} - -func ChainNodeConnectSeconds(chain, node string) Observer { - if metrics == nil || metrics.chainNodeConnectSecconds == nil { - return nilObserver - } - return metrics.chainNodeConnectSecconds. - With(prometheus.Labels{ - "host": metrics.host, - "chain": chain, - "node": node, - }) -} - -func InputBytes(service string) Counter { - if metrics == nil || metrics.inputBytes == nil { - return nilCounter - } - return metrics.inputBytes. - With(prometheus.Labels{ - "host": metrics.host, - "service": service, - }) -} - -func OutputBytes(service string) Counter { - if metrics == nil || metrics.outputBytes == nil { - return nilCounter - } - return metrics.outputBytes. - With(prometheus.Labels{ - "host": metrics.host, - "service": service, - }) -} - -func HandlerErrors(service string) Counter { - if metrics == nil || metrics.handlerErrors == nil { - return nilCounter - } - return metrics.handlerErrors. - With(prometheus.Labels{ - "host": metrics.host, - "service": service, - }) -} - -func ChainErrors(chain string) Counter { - if metrics == nil || metrics.chainErrors == nil { - return nilCounter - } - return metrics.chainErrors. - With(prometheus.Labels{ - "host": metrics.host, - "chain": chain, - }) +func GetObserver(name MetricName, labels Labels) Observer { + return metrics.Observer(name, labels) } diff --git a/metrics/noop.go b/metrics/noop.go new file mode 100644 index 0000000..ff7e410 --- /dev/null +++ b/metrics/noop.go @@ -0,0 +1,43 @@ +package metrics + +var ( + nopGauge = &noopGauge{} + nopCounter = &noopCounter{} + nopObserver = &noopObserver{} + + noop Metrics = &noopMetrics{} +) + +type noopMetrics struct{} + +func Noop() Metrics { + return noop +} + +func (m *noopMetrics) Counter(name MetricName, labels Labels) Counter { + return nopCounter +} + +func (m *noopMetrics) Gauge(name MetricName, labels Labels) Gauge { + return nopGauge +} + +func (m *noopMetrics) Observer(name MetricName, labels Labels) Observer { + return nopObserver +} + +type noopGauge struct{} + +func (*noopGauge) Inc() {} +func (*noopGauge) Dec() {} +func (*noopGauge) Add(v float64) {} +func (*noopGauge) Set(v float64) {} + +type noopCounter struct{} + +func (*noopCounter) Inc() {} +func (*noopCounter) Add(v float64) {} + +type noopObserver struct{} + +func (*noopObserver) Observe(v float64) {} diff --git a/metrics/nop.go b/metrics/nop.go deleted file mode 100644 index 823a47f..0000000 --- a/metrics/nop.go +++ /dev/null @@ -1,23 +0,0 @@ -package metrics - -var ( - nilGauge = &nopGauge{} - nilCounter = &nopCounter{} - nilObserver = &nopObserver{} -) - -type nopGauge struct{} - -func (*nopGauge) Inc() {} -func (*nopGauge) Dec() {} -func (*nopGauge) Add(v float64) {} -func (*nopGauge) Set(v float64) {} - -type nopCounter struct{} - -func (*nopCounter) Inc() {} -func (*nopCounter) Add(v float64) {} - -type nopObserver struct{} - -func (*nopObserver) Observe(v float64) {} diff --git a/metrics/service/service.go b/metrics/service/service.go deleted file mode 100644 index 5eba0bb..0000000 --- a/metrics/service/service.go +++ /dev/null @@ -1,65 +0,0 @@ -package service - -import ( - "net" - "net/http" - - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -const ( - DefaultPath = "/metrics" -) - -type options struct { - path string -} - -type Option func(*options) - -func PathOption(path string) Option { - return func(o *options) { - o.path = path - } -} - -type Service struct { - s *http.Server - ln net.Listener -} - -func NewService(addr string, opts ...Option) (*Service, error) { - ln, err := net.Listen("tcp", addr) - if err != nil { - return nil, err - } - - var options options - for _, opt := range opts { - opt(&options) - } - if options.path == "" { - options.path = DefaultPath - } - - mux := http.NewServeMux() - mux.Handle(options.path, promhttp.Handler()) - return &Service{ - s: &http.Server{ - Handler: mux, - }, - ln: ln, - }, nil -} - -func (s *Service) Serve() error { - return s.s.Serve(s.ln) -} - -func (s *Service) Addr() net.Addr { - return s.ln.Addr() -} - -func (s *Service) Close() error { - return s.s.Close() -} diff --git a/metrics/wrapper/conn.go b/metrics/wrapper/conn.go index ab036ad..5bcfda0 100644 --- a/metrics/wrapper/conn.go +++ b/metrics/wrapper/conn.go @@ -28,13 +28,25 @@ func WrapConn(service string, c net.Conn) net.Conn { func (c *serverConn) Read(b []byte) (n int, err error) { n, err = c.Conn.Read(b) - metrics.InputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferInputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } func (c *serverConn) Write(b []byte) (n int, err error) { n, err = c.Conn.Write(b) - metrics.OutputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferOutputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } @@ -61,13 +73,25 @@ func WrapPacketConn(service string, pc net.PacketConn) net.PacketConn { func (c *packetConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { n, addr, err = c.PacketConn.ReadFrom(p) - metrics.InputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferInputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } func (c *packetConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { n, err = c.PacketConn.WriteTo(p, addr) - metrics.OutputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferOutputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } @@ -107,7 +131,13 @@ func (c *udpConn) SetWriteBuffer(n int) error { func (c *udpConn) Read(b []byte) (n int, err error) { if nc, ok := c.PacketConn.(io.Reader); ok { n, err = nc.Read(b) - metrics.InputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferInputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } err = errUnsupport @@ -116,14 +146,26 @@ func (c *udpConn) Read(b []byte) (n int, err error) { func (c *udpConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { n, addr, err = c.PacketConn.ReadFrom(p) - metrics.InputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferInputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } func (c *udpConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) { if nc, ok := c.PacketConn.(readUDP); ok { n, addr, err = nc.ReadFromUDP(b) - metrics.InputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferInputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } err = errUnsupport @@ -133,7 +175,13 @@ func (c *udpConn) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) { func (c *udpConn) ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAddr, err error) { if nc, ok := c.PacketConn.(readUDP); ok { n, oobn, flags, addr, err = nc.ReadMsgUDP(b, oob) - metrics.InputBytes(c.service).Add(float64(n + oobn)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferInputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } err = errUnsupport @@ -143,7 +191,13 @@ func (c *udpConn) ReadMsgUDP(b, oob []byte) (n, oobn, flags int, addr *net.UDPAd func (c *udpConn) Write(b []byte) (n int, err error) { if nc, ok := c.PacketConn.(io.Writer); ok { n, err = nc.Write(b) - metrics.OutputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferOutputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } err = errUnsupport @@ -152,14 +206,26 @@ func (c *udpConn) Write(b []byte) (n int, err error) { func (c *udpConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { n, err = c.PacketConn.WriteTo(p, addr) - metrics.OutputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferOutputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } func (c *udpConn) WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error) { if nc, ok := c.PacketConn.(writeUDP); ok { n, err = nc.WriteToUDP(b, addr) - metrics.OutputBytes(c.service).Add(float64(n)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferOutputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } err = errUnsupport @@ -169,7 +235,13 @@ func (c *udpConn) WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error) { func (c *udpConn) WriteMsgUDP(b, oob []byte, addr *net.UDPAddr) (n, oobn int, err error) { if nc, ok := c.PacketConn.(writeUDP); ok { n, oobn, err = nc.WriteMsgUDP(b, oob, addr) - metrics.OutputBytes(c.service).Add(float64(n + oobn)) + if counter := metrics.GetCounter( + metrics.MetricServiceTransferOutputBytesCounter, + metrics.Labels{ + "service": c.service, + }); counter != nil { + counter.Add(float64(n)) + } return } err = errUnsupport diff --git a/service/service.go b/service/service.go index c9cc298..3df74ad 100644 --- a/service/service.go +++ b/service/service.go @@ -66,8 +66,12 @@ func (s *service) Close() error { } func (s *service) Serve() error { - metrics.Services().Inc() - defer metrics.Services().Dec() + if v := metrics.GetGauge( + metrics.MetricServicesGauge, + metrics.Labels{}); v != nil { + v.Inc() + defer v.Dec() + } var tempDelay time.Duration for { @@ -98,22 +102,34 @@ func (s *service) Serve() error { } go func() { - metrics.Requests(s.name).Inc() + if v := metrics.GetCounter(metrics.MetricServiceRequestsCounter, + metrics.Labels{"service": s.name}); v != nil { + v.Inc() + } - metrics.RequestsInFlight(s.name).Inc() - defer metrics.RequestsInFlight(s.name).Dec() + if v := metrics.GetGauge(metrics.MetricServiceRequestsInFlightGauge, + metrics.Labels{"service": s.name}); v != nil { + v.Inc() + defer v.Dec() + } start := time.Now() - defer func() { - metrics.RequestSeconds(s.name).Observe(time.Since(start).Seconds()) - }() + if v := metrics.GetObserver(metrics.MetricServiceRequestsDurationObserver, + metrics.Labels{"service": s.name}); v != nil { + defer func() { + v.Observe(float64(time.Since(start).Seconds())) + }() + } if err := s.handler.Handle( context.Background(), conn, ); err != nil { s.options.logger.Error(err) - metrics.HandlerErrors(s.name).Inc() + if v := metrics.GetCounter(metrics.MetricServiceHandlerErrorsCounter, + metrics.Labels{"service": s.name}); v != nil { + v.Inc() + } } }() }