From ee2c05494ce12a20162a5c4bd16ebfc63ecfa245 Mon Sep 17 00:00:00 2001 From: ginuerzh Date: Wed, 21 Dec 2022 18:17:46 +0800 Subject: [PATCH] add more options for grpc --- api/swagger.yaml | 56 +++++++++++++++++++++++++++++++++++++++ config/parsing/parse.go | 6 +++++ dialer/grpc/conn.go | 1 + dialer/grpc/dialer.go | 12 +++++++-- dialer/grpc/metadata.go | 41 +++++++++++++++++++--------- limiter/conn/conn.go | 37 -------------------------- limiter/conn/limiter.go | 37 ++++++++++++++++++++++++++ limiter/rate/limiter.go | 30 +++++++++++++++++++++ limiter/rate/rate.go | 30 --------------------- listener/grpc/listener.go | 14 ++++++++++ listener/grpc/metadata.go | 46 +++++++++++++++++++++++--------- recorder/redis.go | 37 ++++++++++++++++++++++++++ 12 files changed, 254 insertions(+), 93 deletions(-) diff --git a/api/swagger.yaml b/api/swagger.yaml index f702498..b87e273 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -276,9 +276,15 @@ definitions: type: string type: array x-go-name: Bypasses + host: + type: string + x-go-name: Host name: type: string x-go-name: Name + protocol: + type: string + x-go-name: Protocol type: object x-go-package: github.com/go-gost/x/config ForwarderConfig: @@ -469,6 +475,50 @@ definitions: output: type: string x-go-name: Output + rotation: + $ref: '#/definitions/LogRotationConfig' + type: object + x-go-package: github.com/go-gost/x/config + LogRotationConfig: + properties: + compress: + description: |- + Compress determines if the rotated log files should be compressed + using gzip. The default is not to perform compression. + type: boolean + x-go-name: Compress + localTime: + description: |- + LocalTime determines if the time used for formatting the timestamps in + backup files is the computer's local time. The default is to use UTC + time. + type: boolean + x-go-name: LocalTime + maxAge: + description: |- + MaxAge is the maximum number of days to retain old log files based on the + timestamp encoded in their filename. Note that a day is defined as 24 + hours and may not exactly correspond to calendar days due to daylight + savings, leap seconds, etc. The default is not to remove old log files + based on age. + format: int64 + type: integer + x-go-name: MaxAge + maxBackups: + description: |- + MaxBackups is the maximum number of old log files to retain. The default + is to retain all old log files (though MaxAge may still cause them to get + deleted.) + format: int64 + type: integer + x-go-name: MaxBackups + maxSize: + description: |- + MaxSize is the maximum size in megabytes of the log file before it gets + rotated. It defaults to 100 megabytes. + format: int64 + type: integer + x-go-name: MaxSize type: object x-go-package: github.com/go-gost/x/config MetricsConfig: @@ -521,6 +571,9 @@ definitions: $ref: '#/definitions/ConnectorConfig' dialer: $ref: '#/definitions/DialerConfig' + host: + type: string + x-go-name: Host hosts: type: string x-go-name: Hosts @@ -534,6 +587,9 @@ definitions: name: type: string x-go-name: Name + protocol: + type: string + x-go-name: Protocol resolver: type: string x-go-name: Resolver diff --git a/config/parsing/parse.go b/config/parsing/parse.go index 2283ae0..b061df6 100644 --- a/config/parsing/parse.go +++ b/config/parsing/parse.go @@ -339,6 +339,12 @@ func ParseRecorder(cfg *config.RecorderConfig) (r recorder.Recorder) { xrecorder.KeyRedisRecorderOption(cfg.Redis.Key), xrecorder.PasswordRedisRecorderOption(cfg.Redis.Password), ) + case "sset": // sorted set + return xrecorder.RedisSortedSetRecorder(cfg.Redis.Addr, + xrecorder.DBRedisRecorderOption(cfg.Redis.DB), + xrecorder.KeyRedisRecorderOption(cfg.Redis.Key), + xrecorder.PasswordRedisRecorderOption(cfg.Redis.Password), + ) default: // redis set return xrecorder.RedisSetRecorder(cfg.Redis.Addr, xrecorder.DBRedisRecorderOption(cfg.Redis.DB), diff --git a/dialer/grpc/conn.go b/dialer/grpc/conn.go index 90ed28c..5bdc7a3 100644 --- a/dialer/grpc/conn.go +++ b/dialer/grpc/conn.go @@ -66,6 +66,7 @@ func (c *conn) Close() error { case <-c.closed: default: close(c.closed) + return c.c.CloseSend() } return nil diff --git a/dialer/grpc/dialer.go b/dialer/grpc/dialer.go index ad7c464..1714326 100644 --- a/dialer/grpc/dialer.go +++ b/dialer/grpc/dialer.go @@ -4,7 +4,6 @@ import ( "context" "net" "sync" - "time" "github.com/go-gost/core/dialer" md "github.com/go-gost/core/metadata" @@ -14,6 +13,7 @@ import ( "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" ) func init() { @@ -82,7 +82,7 @@ func (d *grpcDialer) Dial(ctx context.Context, addr string, opts ...dialer.DialO grpc.WithAuthority(host), grpc.WithConnectParams(grpc.ConnectParams{ Backoff: backoff.DefaultConfig, - MinConnectTimeout: 10 * time.Second, + MinConnectTimeout: d.md.minConnectTimeout, }), grpc.FailOnNonTempDialError(true), } @@ -92,6 +92,14 @@ func (d *grpcDialer) Dial(ctx context.Context, addr string, opts ...dialer.DialO grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) } + if d.md.keepalive { + grpcOpts = append(grpcOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: d.md.keepaliveTime, + Timeout: d.md.keepaliveTimeout, + PermitWithoutStream: d.md.keepalivePermitWithoutStream, + })) + } + cc, err := grpc.DialContext(ctx, addr, grpcOpts...) if err != nil { d.options.Logger.Error(err) diff --git a/dialer/grpc/metadata.go b/dialer/grpc/metadata.go index 28b0bda..882694b 100644 --- a/dialer/grpc/metadata.go +++ b/dialer/grpc/metadata.go @@ -1,26 +1,43 @@ package grpc import ( + "time" + mdata "github.com/go-gost/core/metadata" mdutil "github.com/go-gost/core/metadata/util" ) type metadata struct { - insecure bool - host string - path string + insecure bool + host string + path string + keepalive bool + keepaliveTime time.Duration + keepaliveTimeout time.Duration + keepalivePermitWithoutStream bool + minConnectTimeout time.Duration } func (d *grpcDialer) parseMetadata(md mdata.Metadata) (err error) { - const ( - insecure = "grpcInsecure" - host = "host" - path = "path" - ) - - d.md.insecure = mdutil.GetBool(md, insecure) - d.md.host = mdutil.GetString(md, host) - d.md.path = mdutil.GetString(md, path) + d.md.insecure = mdutil.GetBool(md, "grpc.insecure", "grpcInsecure", "insecure") + d.md.host = mdutil.GetString(md, "grpc.authority", "grpc.host", "host") + d.md.path = mdutil.GetString(md, "grpc.path", "path") + d.md.keepalive = mdutil.GetBool(md, "grpc.keepalive", "keepalive", "keepAlive") + if d.md.keepalive { + d.md.keepaliveTime = mdutil.GetDuration(md, "grpc.keepalive.time", "keepalive.time") + if d.md.keepaliveTime <= 0 { + d.md.keepaliveTime = 30 * time.Second + } + d.md.keepaliveTimeout = mdutil.GetDuration(md, "grpc.keepalive.timeout", "keepalive.timeout") + if d.md.keepaliveTimeout <= 0 { + d.md.keepaliveTimeout = 30 * time.Second + } + d.md.keepalivePermitWithoutStream = mdutil.GetBool(md, "grpc.keepalive.permitWithoutStream", "keepalive.permitWithoutStream") + } + d.md.minConnectTimeout = mdutil.GetDuration(md, "grpc.minConnectTimeout", "minConnectTimeout") + if d.md.minConnectTimeout <= 0 { + d.md.minConnectTimeout = 30 * time.Second + } return } diff --git a/limiter/conn/conn.go b/limiter/conn/conn.go index c49fa1e..2ec5107 100644 --- a/limiter/conn/conn.go +++ b/limiter/conn/conn.go @@ -5,7 +5,6 @@ import ( "context" "io" "net" - "sort" "strconv" "strings" "sync" @@ -22,42 +21,6 @@ const ( IPLimitKey = "$$" ) -type limiterGroup struct { - limiters []limiter.Limiter -} - -func newLimiterGroup(limiters ...limiter.Limiter) *limiterGroup { - sort.Slice(limiters, func(i, j int) bool { - return limiters[i].Limit() < limiters[j].Limit() - }) - return &limiterGroup{limiters: limiters} -} - -func (l *limiterGroup) Allow(n int) (b bool) { - var i int - - for i = range l.limiters { - if b = l.limiters[i].Allow(n); !b { - break - } - } - if !b && i > 0 && n > 0 { - for i := range l.limiters[:i] { - l.limiters[i].Allow(-n) - } - } - - return -} - -func (l *limiterGroup) Limit() int { - if len(l.limiters) == 0 { - return 0 - } - - return l.limiters[0].Limit() -} - type options struct { limits []string fileLoader loader.Loader diff --git a/limiter/conn/limiter.go b/limiter/conn/limiter.go index 9920a98..5d2f926 100644 --- a/limiter/conn/limiter.go +++ b/limiter/conn/limiter.go @@ -1,6 +1,7 @@ package conn import ( + "sort" "sync/atomic" limiter "github.com/go-gost/core/limiter/conn" @@ -28,3 +29,39 @@ func (l *llimiter) Allow(n int) bool { } return true } + +type limiterGroup struct { + limiters []limiter.Limiter +} + +func newLimiterGroup(limiters ...limiter.Limiter) *limiterGroup { + sort.Slice(limiters, func(i, j int) bool { + return limiters[i].Limit() < limiters[j].Limit() + }) + return &limiterGroup{limiters: limiters} +} + +func (l *limiterGroup) Allow(n int) (b bool) { + var i int + + for i = range l.limiters { + if b = l.limiters[i].Allow(n); !b { + break + } + } + if !b && i > 0 && n > 0 { + for i := range l.limiters[:i] { + l.limiters[i].Allow(-n) + } + } + + return +} + +func (l *limiterGroup) Limit() int { + if len(l.limiters) == 0 { + return 0 + } + + return l.limiters[0].Limit() +} diff --git a/limiter/rate/limiter.go b/limiter/rate/limiter.go index 8057185..f2ef982 100644 --- a/limiter/rate/limiter.go +++ b/limiter/rate/limiter.go @@ -1,6 +1,7 @@ package rate import ( + "sort" "time" limiter "github.com/go-gost/core/limiter/rate" @@ -24,3 +25,32 @@ func (l *rlimiter) Allow(n int) bool { func (l *rlimiter) Limit() float64 { return float64(l.limiter.Limit()) } + +type limiterGroup struct { + limiters []limiter.Limiter +} + +func newLimiterGroup(limiters ...limiter.Limiter) *limiterGroup { + sort.Slice(limiters, func(i, j int) bool { + return limiters[i].Limit() < limiters[j].Limit() + }) + return &limiterGroup{limiters: limiters} +} + +func (l *limiterGroup) Allow(n int) (b bool) { + b = true + for i := range l.limiters { + if v := l.limiters[i].Allow(n); !v { + b = false + } + } + return +} + +func (l *limiterGroup) Limit() float64 { + if len(l.limiters) == 0 { + return 0 + } + + return l.limiters[0].Limit() +} diff --git a/limiter/rate/rate.go b/limiter/rate/rate.go index 326588e..3461615 100644 --- a/limiter/rate/rate.go +++ b/limiter/rate/rate.go @@ -5,7 +5,6 @@ import ( "context" "io" "net" - "sort" "strconv" "strings" "sync" @@ -22,35 +21,6 @@ const ( IPLimitKey = "$$" ) -type limiterGroup struct { - limiters []limiter.Limiter -} - -func newLimiterGroup(limiters ...limiter.Limiter) *limiterGroup { - sort.Slice(limiters, func(i, j int) bool { - return limiters[i].Limit() < limiters[j].Limit() - }) - return &limiterGroup{limiters: limiters} -} - -func (l *limiterGroup) Allow(n int) (b bool) { - b = true - for i := range l.limiters { - if v := l.limiters[i].Allow(n); !v { - b = false - } - } - return -} - -func (l *limiterGroup) Limit() float64 { - if len(l.limiters) == 0 { - return 0 - } - - return l.limiters[0].Limit() -} - type options struct { limits []string fileLoader loader.Loader diff --git a/listener/grpc/listener.go b/listener/grpc/listener.go index 4e63660..546dbda 100644 --- a/listener/grpc/listener.go +++ b/listener/grpc/listener.go @@ -17,6 +17,7 @@ import ( "github.com/go-gost/x/registry" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" ) func init() { @@ -67,6 +68,19 @@ func (l *grpcListener) Init(md md.Metadata) (err error) { if !l.md.insecure { opts = append(opts, grpc.Creds(credentials.NewTLS(l.options.TLSConfig))) } + if l.md.keepalive { + opts = append(opts, + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: l.md.keepaliveMinTime, + PermitWithoutStream: l.md.keepalivePermitWithoutStream, + }), + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: l.md.keepaliveMaxConnectionIdle, + Time: l.md.keepaliveTime, + Timeout: l.md.keepaliveTimeout, + }), + ) + } l.server = grpc.NewServer(opts...) l.addr = ln.Addr() diff --git a/listener/grpc/metadata.go b/listener/grpc/metadata.go index 292918c..662e1ce 100644 --- a/listener/grpc/metadata.go +++ b/listener/grpc/metadata.go @@ -1,6 +1,8 @@ package grpc import ( + "time" + mdata "github.com/go-gost/core/metadata" mdutil "github.com/go-gost/core/metadata/util" ) @@ -10,24 +12,44 @@ const ( ) type metadata struct { - backlog int - insecure bool - path string + backlog int + insecure bool + path string + keepalive bool + keepaliveMinTime time.Duration + keepaliveTime time.Duration + keepaliveTimeout time.Duration + keepalivePermitWithoutStream bool + keepaliveMaxConnectionIdle time.Duration } func (l *grpcListener) parseMetadata(md mdata.Metadata) (err error) { - const ( - backlog = "backlog" - insecure = "grpcInsecure" - path = "path" - ) - - l.md.backlog = mdutil.GetInt(md, backlog) + l.md.backlog = mdutil.GetInt(md, "grpc.backlog", "backlog") if l.md.backlog <= 0 { l.md.backlog = defaultBacklog } - l.md.insecure = mdutil.GetBool(md, insecure) - l.md.path = mdutil.GetString(md, path) + l.md.insecure = mdutil.GetBool(md, "grpc.insecure", "grpcInsecure", "insecure") + l.md.path = mdutil.GetString(md, "grpc.path", "path") + + l.md.keepalive = mdutil.GetBool(md, "grpc.keepalive", "keepalive", "keepAlive") + if l.md.keepalive { + l.md.keepaliveMinTime = mdutil.GetDuration(md, "grpc.keepalive.minTime", "keepalive.minTime") + if l.md.keepaliveMinTime <= 0 { + l.md.keepaliveMinTime = 30 * time.Second + } + l.md.keepaliveTime = mdutil.GetDuration(md, "grpc.keepalive.time", "keepalive.time") + if l.md.keepaliveTime <= 0 { + l.md.keepaliveTime = 60 * time.Second + } + l.md.keepaliveTimeout = mdutil.GetDuration(md, "grpc.keepalive.timeout", "keepalive.timeout") + if l.md.keepaliveTimeout <= 0 { + l.md.keepaliveTimeout = 30 * time.Second + } + + l.md.keepalivePermitWithoutStream = mdutil.GetBool(md, "grpc.keepalive.permitWithoutStream", "keepalive.permitWithoutStream") + l.md.keepaliveMaxConnectionIdle = mdutil.GetDuration(md, "grpc.keepalive.maxConnectionIdle", "keepalive.maxConnectionIdle") + } + return } diff --git a/recorder/redis.go b/recorder/redis.go index 9157140..2c4749d 100644 --- a/recorder/redis.go +++ b/recorder/redis.go @@ -99,3 +99,40 @@ func (r *redisListRecorder) Record(ctx context.Context, b []byte) error { func (r *redisListRecorder) Close() error { return r.client.Close() } + +type redisSortedSetRecorder struct { + client *redis.Client + key string +} + +// RedisSortedSetRecorder records data to a redis sorted set. +func RedisSortedSetRecorder(addr string, opts ...RedisRecorderOption) recorder.Recorder { + var options redisRecorderOptions + for _, opt := range opts { + opt(&options) + } + + return &redisSortedSetRecorder{ + client: redis.NewClient(&redis.Options{ + Addr: addr, + Password: options.password, + DB: options.db, + }), + key: options.key, + } +} + +func (r *redisSortedSetRecorder) Record(ctx context.Context, b []byte) error { + if r.key == "" { + return nil + } + + return r.client.ZIncr(ctx, r.key, &redis.Z{ + Score: 1, + Member: b, + }).Err() +} + +func (r *redisSortedSetRecorder) Close() error { + return r.client.Close() +}