diff --git a/handler/relay/entrypoint.go b/handler/relay/entrypoint.go index f8fbab6..93fd479 100644 --- a/handler/relay/entrypoint.go +++ b/handler/relay/entrypoint.go @@ -47,8 +47,8 @@ func newTCPListener(ln net.Listener, opts ...listener.Option) listener.Listener func (l *tcpListener) Init(md md.Metadata) (err error) { // l.logger.Debugf("pp: %d", l.options.ProxyProtocol) ln := l.ln - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/dtls/listener.go b/listener/dtls/listener.go index b9ca169..3796b3a 100644 --- a/listener/dtls/listener.go +++ b/listener/dtls/listener.go @@ -77,8 +77,8 @@ func (l *dtlsListener) Init(md md.Metadata) (err error) { if err != nil { return } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/grpc/listener.go b/listener/grpc/listener.go index 546dbda..fb3f862 100644 --- a/listener/grpc/listener.go +++ b/listener/grpc/listener.go @@ -58,8 +58,8 @@ func (l *grpcListener) Init(md md.Metadata) (err error) { if err != nil { return } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/http2/h2/listener.go b/listener/http2/h2/listener.go index f0a1d5f..f8cea7d 100644 --- a/listener/http2/h2/listener.go +++ b/listener/http2/h2/listener.go @@ -79,8 +79,8 @@ func (l *h2Listener) Init(md md.Metadata) (err error) { return err } l.addr = ln.Addr() - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/http2/listener.go b/listener/http2/listener.go index 666c3f9..5870492 100644 --- a/listener/http2/listener.go +++ b/listener/http2/listener.go @@ -68,8 +68,8 @@ func (l *http2Listener) Init(md md.Metadata) (err error) { return err } l.addr = ln.Addr() - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/mtls/listener.go b/listener/mtls/listener.go index ed5d1aa..9cfe92b 100644 --- a/listener/mtls/listener.go +++ b/listener/mtls/listener.go @@ -56,8 +56,8 @@ func (l *mtlsListener) Init(md md.Metadata) (err error) { return } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/mws/listener.go b/listener/mws/listener.go index 3823f3c..9884ebd 100644 --- a/listener/mws/listener.go +++ b/listener/mws/listener.go @@ -98,8 +98,8 @@ func (l *mwsListener) Init(md md.Metadata) (err error) { if err != nil { return } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/obfs/http/listener.go b/listener/obfs/http/listener.go index 5bbe2d9..8872ac7 100644 --- a/listener/obfs/http/listener.go +++ b/listener/obfs/http/listener.go @@ -52,8 +52,8 @@ func (l *obfsListener) Init(md md.Metadata) (err error) { if err != nil { return } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/obfs/tls/listener.go b/listener/obfs/tls/listener.go index aac7c64..ec7758a 100644 --- a/listener/obfs/tls/listener.go +++ b/listener/obfs/tls/listener.go @@ -51,8 +51,8 @@ func (l *obfsListener) Init(md md.Metadata) (err error) { if err != nil { return } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/redirect/tcp/listener.go b/listener/redirect/tcp/listener.go index c036cb9..47df75e 100644 --- a/listener/redirect/tcp/listener.go +++ b/listener/redirect/tcp/listener.go @@ -59,8 +59,8 @@ func (l *redirectListener) Init(md md.Metadata) (err error) { return err } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/ssh/listener.go b/listener/ssh/listener.go index ad0c006..dafecfe 100644 --- a/listener/ssh/listener.go +++ b/listener/ssh/listener.go @@ -58,8 +58,8 @@ func (l *sshListener) Init(md md.Metadata) (err error) { return err } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/sshd/listener.go b/listener/sshd/listener.go index 70a1640..c70b040 100644 --- a/listener/sshd/listener.go +++ b/listener/sshd/listener.go @@ -67,8 +67,8 @@ func (l *sshdListener) Init(md md.Metadata) (err error) { return err } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/tcp/listener.go b/listener/tcp/listener.go index c91c6e2..451afd0 100644 --- a/listener/tcp/listener.go +++ b/listener/tcp/listener.go @@ -54,8 +54,8 @@ func (l *tcpListener) Init(md md.Metadata) (err error) { l.logger.Debugf("pp: %d", l.options.ProxyProtocol) - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/tls/listener.go b/listener/tls/listener.go index 82712e0..6f1204f 100644 --- a/listener/tls/listener.go +++ b/listener/tls/listener.go @@ -52,8 +52,8 @@ func (l *tlsListener) Init(md md.Metadata) (err error) { if err != nil { return } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/listener/ws/listener.go b/listener/ws/listener.go index a07c4d1..05f8456 100644 --- a/listener/ws/listener.go +++ b/listener/ws/listener.go @@ -93,8 +93,8 @@ func (l *wsListener) Init(md md.Metadata) (err error) { if err != nil { return } - ln = metrics.WrapListener(l.options.Service, ln) ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second) + ln = metrics.WrapListener(l.options.Service, ln) ln = admission.WrapListener(l.options.Admission, ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln) diff --git a/metrics/prom.go b/metrics/prom.go index 413380f..d84dbe0 100644 --- a/metrics/prom.go +++ b/metrics/prom.go @@ -30,7 +30,7 @@ func NewMetrics() metrics.Metrics { Name: string(MetricServiceRequestsInFlightGauge), Help: "Current in-flight requests", }, - []string{"host", "service"}), + []string{"host", "service", "client"}), }, counters: map[metrics.MetricName]*prometheus.CounterVec{ MetricServiceRequestsCounter: prometheus.NewCounterVec( @@ -38,7 +38,7 @@ func NewMetrics() metrics.Metrics { Name: string(MetricServiceRequestsCounter), Help: "Total number of requests", }, - []string{"host", "service"}), + []string{"host", "service", "client"}), MetricServiceTransferInputBytesCounter: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: string(MetricServiceTransferInputBytesCounter), @@ -56,7 +56,7 @@ func NewMetrics() metrics.Metrics { Name: string(MetricServiceHandlerErrorsCounter), Help: "Total service handler errors", }, - []string{"host", "service"}), + []string{"host", "service", "client"}), MetricChainErrorsCounter: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: string(MetricChainErrorsCounter), diff --git a/service/service.go b/service/service.go index bb455fb..eec8316 100644 --- a/service/service.go +++ b/service/service.go @@ -145,12 +145,12 @@ func (s *defaultService) Serve() error { } tempDelay = 0 + host := conn.RemoteAddr().String() + if h, _, _ := net.SplitHostPort(host); h != "" { + host = h + } for _, rec := range s.options.recorders { if rec.Record == recorder.RecorderServiceClientAddress { - host := conn.RemoteAddr().String() - if h, _, _ := net.SplitHostPort(host); h != "" { - host = h - } if err := rec.Recorder.Record(context.Background(), []byte(host)); err != nil { s.options.logger.Errorf("record %s: %v", rec.Record, err) } @@ -166,12 +166,12 @@ func (s *defaultService) Serve() error { go func() { if v := xmetrics.GetCounter(xmetrics.MetricServiceRequestsCounter, - metrics.Labels{"service": s.name}); v != nil { + metrics.Labels{"service": s.name, "client": host}); v != nil { v.Inc() } if v := xmetrics.GetGauge(xmetrics.MetricServiceRequestsInFlightGauge, - metrics.Labels{"service": s.name}); v != nil { + metrics.Labels{"service": s.name, "client": host}); v != nil { v.Inc() defer v.Dec() } @@ -184,14 +184,13 @@ func (s *defaultService) Serve() error { }() } - host, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) ctx := sx.ContextWithHash(context.Background(), &sx.Hash{Source: host}) ctx = ContextWithSid(ctx, xid.New().String()) if err := s.handler.Handle(ctx, conn); err != nil { s.options.logger.Error(err) if v := xmetrics.GetCounter(xmetrics.MetricServiceHandlerErrorsCounter, - metrics.Labels{"service": s.name}); v != nil { + metrics.Labels{"service": s.name, "client": host}); v != nil { v.Inc() } }