add client label for metrics
This commit is contained in:
parent
de5ce1e1ca
commit
7576651a67
@ -47,8 +47,8 @@ func newTCPListener(ln net.Listener, opts ...listener.Option) listener.Listener
|
||||
func (l *tcpListener) Init(md md.Metadata) (err error) {
|
||||
// l.logger.Debugf("pp: %d", l.options.ProxyProtocol)
|
||||
ln := l.ln
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -77,8 +77,8 @@ func (l *dtlsListener) Init(md md.Metadata) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -58,8 +58,8 @@ func (l *grpcListener) Init(md md.Metadata) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -79,8 +79,8 @@ func (l *h2Listener) Init(md md.Metadata) (err error) {
|
||||
return err
|
||||
}
|
||||
l.addr = ln.Addr()
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -68,8 +68,8 @@ func (l *http2Listener) Init(md md.Metadata) (err error) {
|
||||
return err
|
||||
}
|
||||
l.addr = ln.Addr()
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -56,8 +56,8 @@ func (l *mtlsListener) Init(md md.Metadata) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -98,8 +98,8 @@ func (l *mwsListener) Init(md md.Metadata) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -52,8 +52,8 @@ func (l *obfsListener) Init(md md.Metadata) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -51,8 +51,8 @@ func (l *obfsListener) Init(md md.Metadata) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -59,8 +59,8 @@ func (l *redirectListener) Init(md md.Metadata) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -58,8 +58,8 @@ func (l *sshListener) Init(md md.Metadata) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -67,8 +67,8 @@ func (l *sshdListener) Init(md md.Metadata) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -54,8 +54,8 @@ func (l *tcpListener) Init(md md.Metadata) (err error) {
|
||||
|
||||
l.logger.Debugf("pp: %d", l.options.ProxyProtocol)
|
||||
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -52,8 +52,8 @@ func (l *tlsListener) Init(md md.Metadata) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -93,8 +93,8 @@ func (l *wsListener) Init(md md.Metadata) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = proxyproto.WrapListener(l.options.ProxyProtocol, ln, 10*time.Second)
|
||||
ln = metrics.WrapListener(l.options.Service, ln)
|
||||
ln = admission.WrapListener(l.options.Admission, ln)
|
||||
ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
|
||||
ln = climiter.WrapListener(l.options.ConnLimiter, ln)
|
||||
|
@ -30,7 +30,7 @@ func NewMetrics() metrics.Metrics {
|
||||
Name: string(MetricServiceRequestsInFlightGauge),
|
||||
Help: "Current in-flight requests",
|
||||
},
|
||||
[]string{"host", "service"}),
|
||||
[]string{"host", "service", "client"}),
|
||||
},
|
||||
counters: map[metrics.MetricName]*prometheus.CounterVec{
|
||||
MetricServiceRequestsCounter: prometheus.NewCounterVec(
|
||||
@ -38,7 +38,7 @@ func NewMetrics() metrics.Metrics {
|
||||
Name: string(MetricServiceRequestsCounter),
|
||||
Help: "Total number of requests",
|
||||
},
|
||||
[]string{"host", "service"}),
|
||||
[]string{"host", "service", "client"}),
|
||||
MetricServiceTransferInputBytesCounter: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: string(MetricServiceTransferInputBytesCounter),
|
||||
@ -56,7 +56,7 @@ func NewMetrics() metrics.Metrics {
|
||||
Name: string(MetricServiceHandlerErrorsCounter),
|
||||
Help: "Total service handler errors",
|
||||
},
|
||||
[]string{"host", "service"}),
|
||||
[]string{"host", "service", "client"}),
|
||||
MetricChainErrorsCounter: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: string(MetricChainErrorsCounter),
|
||||
|
@ -145,12 +145,12 @@ func (s *defaultService) Serve() error {
|
||||
}
|
||||
tempDelay = 0
|
||||
|
||||
for _, rec := range s.options.recorders {
|
||||
if rec.Record == recorder.RecorderServiceClientAddress {
|
||||
host := conn.RemoteAddr().String()
|
||||
if h, _, _ := net.SplitHostPort(host); h != "" {
|
||||
host = h
|
||||
}
|
||||
for _, rec := range s.options.recorders {
|
||||
if rec.Record == recorder.RecorderServiceClientAddress {
|
||||
if err := rec.Recorder.Record(context.Background(), []byte(host)); err != nil {
|
||||
s.options.logger.Errorf("record %s: %v", rec.Record, err)
|
||||
}
|
||||
@ -166,12 +166,12 @@ func (s *defaultService) Serve() error {
|
||||
|
||||
go func() {
|
||||
if v := xmetrics.GetCounter(xmetrics.MetricServiceRequestsCounter,
|
||||
metrics.Labels{"service": s.name}); v != nil {
|
||||
metrics.Labels{"service": s.name, "client": host}); v != nil {
|
||||
v.Inc()
|
||||
}
|
||||
|
||||
if v := xmetrics.GetGauge(xmetrics.MetricServiceRequestsInFlightGauge,
|
||||
metrics.Labels{"service": s.name}); v != nil {
|
||||
metrics.Labels{"service": s.name, "client": host}); v != nil {
|
||||
v.Inc()
|
||||
defer v.Dec()
|
||||
}
|
||||
@ -184,14 +184,13 @@ func (s *defaultService) Serve() error {
|
||||
}()
|
||||
}
|
||||
|
||||
host, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
|
||||
ctx := sx.ContextWithHash(context.Background(), &sx.Hash{Source: host})
|
||||
ctx = ContextWithSid(ctx, xid.New().String())
|
||||
|
||||
if err := s.handler.Handle(ctx, conn); err != nil {
|
||||
s.options.logger.Error(err)
|
||||
if v := xmetrics.GetCounter(xmetrics.MetricServiceHandlerErrorsCounter,
|
||||
metrics.Labels{"service": s.name}); v != nil {
|
||||
metrics.Labels{"service": s.name, "client": host}); v != nil {
|
||||
v.Inc()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user