add observer
This commit is contained in:
@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os/exec"
|
||||
@ -13,10 +14,12 @@ import (
|
||||
"github.com/go-gost/core/listener"
|
||||
"github.com/go-gost/core/logger"
|
||||
"github.com/go-gost/core/metrics"
|
||||
"github.com/go-gost/core/observer"
|
||||
"github.com/go-gost/core/recorder"
|
||||
"github.com/go-gost/core/service"
|
||||
ctxvalue "github.com/go-gost/x/internal/ctx"
|
||||
xmetrics "github.com/go-gost/x/metrics"
|
||||
"github.com/go-gost/x/stats"
|
||||
"github.com/rs/xid"
|
||||
)
|
||||
|
||||
@ -27,6 +30,8 @@ type options struct {
|
||||
postUp []string
|
||||
preDown []string
|
||||
postDown []string
|
||||
stats *stats.Stats
|
||||
observer observer.Observer
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
@ -68,6 +73,18 @@ func PostDownOption(cmds []string) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func StatsOption(stats *stats.Stats) Option {
|
||||
return func(opts *options) {
|
||||
opts.stats = stats
|
||||
}
|
||||
}
|
||||
|
||||
func ObserverOption(observer observer.Observer) Option {
|
||||
return func(opts *options) {
|
||||
opts.observer = observer
|
||||
}
|
||||
}
|
||||
|
||||
func LoggerOption(logger logger.Logger) Option {
|
||||
return func(opts *options) {
|
||||
opts.logger = logger
|
||||
@ -78,6 +95,7 @@ type defaultService struct {
|
||||
name string
|
||||
listener listener.Listener
|
||||
handler handler.Handler
|
||||
status *Status
|
||||
options options
|
||||
}
|
||||
|
||||
@ -91,7 +109,13 @@ func NewService(name string, ln listener.Listener, h handler.Handler, opts ...Op
|
||||
listener: ln,
|
||||
handler: h,
|
||||
options: options,
|
||||
status: &Status{
|
||||
createTime: time.Now(),
|
||||
events: make([]Event, 0, MaxEventSize),
|
||||
stats: options.stats,
|
||||
},
|
||||
}
|
||||
s.setState(StateRunning)
|
||||
|
||||
s.execCmds("pre-up", s.options.preUp)
|
||||
|
||||
@ -104,6 +128,14 @@ func (s *defaultService) Addr() net.Addr {
|
||||
|
||||
func (s *defaultService) Serve() error {
|
||||
s.execCmds("post-up", s.options.postUp)
|
||||
s.setState(StateReady)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
if s.status.Stats() != nil {
|
||||
go s.observeStats(ctx)
|
||||
}
|
||||
|
||||
if v := xmetrics.GetGauge(
|
||||
xmetrics.MetricServicesGauge,
|
||||
@ -126,14 +158,25 @@ func (s *defaultService) Serve() error {
|
||||
if max := 5 * time.Second; tempDelay > max {
|
||||
tempDelay = max
|
||||
}
|
||||
|
||||
s.setState(StateFailed)
|
||||
|
||||
s.options.logger.Warnf("accept: %v, retrying in %v", e, tempDelay)
|
||||
time.Sleep(tempDelay)
|
||||
continue
|
||||
}
|
||||
s.setState(StateClosed)
|
||||
s.options.logger.Errorf("accept: %v", e)
|
||||
|
||||
return e
|
||||
}
|
||||
tempDelay = 0
|
||||
|
||||
if tempDelay > 0 {
|
||||
tempDelay = 0
|
||||
s.setState(StateReady)
|
||||
}
|
||||
|
||||
s.status.stats.Add(stats.KindTotalConns, 1)
|
||||
|
||||
clientAddr := conn.RemoteAddr().String()
|
||||
clientIP := clientAddr
|
||||
@ -141,7 +184,7 @@ func (s *defaultService) Serve() error {
|
||||
clientIP = h
|
||||
}
|
||||
|
||||
ctx := ctxvalue.ContextWithSid(context.Background(), ctxvalue.Sid(xid.New().String()))
|
||||
ctx := ctxvalue.ContextWithSid(ctx, ctxvalue.Sid(xid.New().String()))
|
||||
ctx = ctxvalue.ContextWithClientAddr(ctx, ctxvalue.ClientAddr(clientAddr))
|
||||
ctx = ctxvalue.ContextWithHash(ctx, &ctxvalue.Hash{Source: clientIP})
|
||||
|
||||
@ -161,6 +204,9 @@ func (s *defaultService) Serve() error {
|
||||
}
|
||||
|
||||
go func() {
|
||||
s.status.stats.Add(stats.KindCurrentConns, 1)
|
||||
defer s.status.stats.Add(stats.KindCurrentConns, -1)
|
||||
|
||||
if v := xmetrics.GetCounter(xmetrics.MetricServiceRequestsCounter,
|
||||
metrics.Labels{"service": s.name, "client": clientIP}); v != nil {
|
||||
v.Inc()
|
||||
@ -186,11 +232,16 @@ func (s *defaultService) Serve() error {
|
||||
metrics.Labels{"service": s.name, "client": clientIP}); v != nil {
|
||||
v.Inc()
|
||||
}
|
||||
s.status.stats.Add(stats.KindTotalErrs, 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *defaultService) Status() *Status {
|
||||
return s.status
|
||||
}
|
||||
|
||||
func (s *defaultService) Close() error {
|
||||
s.execCmds("pre-down", s.options.preDown)
|
||||
defer s.execCmds("post-down", s.options.postDown)
|
||||
@ -214,3 +265,64 @@ func (s *defaultService) execCmds(phase string, cmds []string) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *defaultService) setState(state State) {
|
||||
s.status.setState(state)
|
||||
|
||||
msg := fmt.Sprintf("service %s is %s", s.name, state)
|
||||
s.status.addEvent(Event{
|
||||
Time: time.Now(),
|
||||
Message: msg,
|
||||
})
|
||||
|
||||
if obs := s.options.observer; obs != nil {
|
||||
obs.Observe(context.Background(), []observer.Event{ServiceEvent{
|
||||
Kind: "service",
|
||||
Service: s.name,
|
||||
State: state,
|
||||
Msg: msg,
|
||||
}})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *defaultService) observeStats(ctx context.Context) {
|
||||
if s.options.observer == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
st := s.status.Stats()
|
||||
if !st.IsUpdated() {
|
||||
break
|
||||
}
|
||||
s.options.observer.Observe(ctx, []observer.Event{
|
||||
stats.StatsEvent{
|
||||
Kind: "service",
|
||||
Service: s.name,
|
||||
TotalConns: st.Get(stats.KindTotalConns),
|
||||
CurrentConns: st.Get(stats.KindCurrentConns),
|
||||
InputBytes: st.Get(stats.KindInputBytes),
|
||||
OutputBytes: st.Get(stats.KindOutputBytes),
|
||||
},
|
||||
})
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type ServiceEvent struct {
|
||||
Kind string
|
||||
Service string
|
||||
State State
|
||||
Msg string
|
||||
}
|
||||
|
||||
func (ServiceEvent) Type() observer.EventType {
|
||||
return observer.EventStatus
|
||||
}
|
||||
|
Reference in New Issue
Block a user