add more options for grpc

This commit is contained in:
ginuerzh 2022-12-21 18:17:46 +08:00
parent 86acbf7f66
commit ee2c05494c
12 changed files with 254 additions and 93 deletions

View File

@ -276,9 +276,15 @@ definitions:
type: string
type: array
x-go-name: Bypasses
host:
type: string
x-go-name: Host
name:
type: string
x-go-name: Name
protocol:
type: string
x-go-name: Protocol
type: object
x-go-package: github.com/go-gost/x/config
ForwarderConfig:
@ -469,6 +475,50 @@ definitions:
output:
type: string
x-go-name: Output
rotation:
$ref: '#/definitions/LogRotationConfig'
type: object
x-go-package: github.com/go-gost/x/config
LogRotationConfig:
properties:
compress:
description: |-
Compress determines if the rotated log files should be compressed
using gzip. The default is not to perform compression.
type: boolean
x-go-name: Compress
localTime:
description: |-
LocalTime determines if the time used for formatting the timestamps in
backup files is the computer's local time. The default is to use UTC
time.
type: boolean
x-go-name: LocalTime
maxAge:
description: |-
MaxAge is the maximum number of days to retain old log files based on the
timestamp encoded in their filename. Note that a day is defined as 24
hours and may not exactly correspond to calendar days due to daylight
savings, leap seconds, etc. The default is not to remove old log files
based on age.
format: int64
type: integer
x-go-name: MaxAge
maxBackups:
description: |-
MaxBackups is the maximum number of old log files to retain. The default
is to retain all old log files (though MaxAge may still cause them to get
deleted.)
format: int64
type: integer
x-go-name: MaxBackups
maxSize:
description: |-
MaxSize is the maximum size in megabytes of the log file before it gets
rotated. It defaults to 100 megabytes.
format: int64
type: integer
x-go-name: MaxSize
type: object
x-go-package: github.com/go-gost/x/config
MetricsConfig:
@ -521,6 +571,9 @@ definitions:
$ref: '#/definitions/ConnectorConfig'
dialer:
$ref: '#/definitions/DialerConfig'
host:
type: string
x-go-name: Host
hosts:
type: string
x-go-name: Hosts
@ -534,6 +587,9 @@ definitions:
name:
type: string
x-go-name: Name
protocol:
type: string
x-go-name: Protocol
resolver:
type: string
x-go-name: Resolver

View File

@ -339,6 +339,12 @@ func ParseRecorder(cfg *config.RecorderConfig) (r recorder.Recorder) {
xrecorder.KeyRedisRecorderOption(cfg.Redis.Key),
xrecorder.PasswordRedisRecorderOption(cfg.Redis.Password),
)
case "sset": // sorted set
return xrecorder.RedisSortedSetRecorder(cfg.Redis.Addr,
xrecorder.DBRedisRecorderOption(cfg.Redis.DB),
xrecorder.KeyRedisRecorderOption(cfg.Redis.Key),
xrecorder.PasswordRedisRecorderOption(cfg.Redis.Password),
)
default: // redis set
return xrecorder.RedisSetRecorder(cfg.Redis.Addr,
xrecorder.DBRedisRecorderOption(cfg.Redis.DB),

View File

@ -66,6 +66,7 @@ func (c *conn) Close() error {
case <-c.closed:
default:
close(c.closed)
return c.c.CloseSend()
}
return nil

View File

@ -4,7 +4,6 @@ import (
"context"
"net"
"sync"
"time"
"github.com/go-gost/core/dialer"
md "github.com/go-gost/core/metadata"
@ -14,6 +13,7 @@ import (
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
func init() {
@ -82,7 +82,7 @@ func (d *grpcDialer) Dial(ctx context.Context, addr string, opts ...dialer.DialO
grpc.WithAuthority(host),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.DefaultConfig,
MinConnectTimeout: 10 * time.Second,
MinConnectTimeout: d.md.minConnectTimeout,
}),
grpc.FailOnNonTempDialError(true),
}
@ -92,6 +92,14 @@ func (d *grpcDialer) Dial(ctx context.Context, addr string, opts ...dialer.DialO
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
if d.md.keepalive {
grpcOpts = append(grpcOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: d.md.keepaliveTime,
Timeout: d.md.keepaliveTimeout,
PermitWithoutStream: d.md.keepalivePermitWithoutStream,
}))
}
cc, err := grpc.DialContext(ctx, addr, grpcOpts...)
if err != nil {
d.options.Logger.Error(err)

View File

@ -1,6 +1,8 @@
package grpc
import (
"time"
mdata "github.com/go-gost/core/metadata"
mdutil "github.com/go-gost/core/metadata/util"
)
@ -9,18 +11,33 @@ type metadata struct {
insecure bool
host string
path string
keepalive bool
keepaliveTime time.Duration
keepaliveTimeout time.Duration
keepalivePermitWithoutStream bool
minConnectTimeout time.Duration
}
func (d *grpcDialer) parseMetadata(md mdata.Metadata) (err error) {
const (
insecure = "grpcInsecure"
host = "host"
path = "path"
)
d.md.insecure = mdutil.GetBool(md, insecure)
d.md.host = mdutil.GetString(md, host)
d.md.path = mdutil.GetString(md, path)
d.md.insecure = mdutil.GetBool(md, "grpc.insecure", "grpcInsecure", "insecure")
d.md.host = mdutil.GetString(md, "grpc.authority", "grpc.host", "host")
d.md.path = mdutil.GetString(md, "grpc.path", "path")
d.md.keepalive = mdutil.GetBool(md, "grpc.keepalive", "keepalive", "keepAlive")
if d.md.keepalive {
d.md.keepaliveTime = mdutil.GetDuration(md, "grpc.keepalive.time", "keepalive.time")
if d.md.keepaliveTime <= 0 {
d.md.keepaliveTime = 30 * time.Second
}
d.md.keepaliveTimeout = mdutil.GetDuration(md, "grpc.keepalive.timeout", "keepalive.timeout")
if d.md.keepaliveTimeout <= 0 {
d.md.keepaliveTimeout = 30 * time.Second
}
d.md.keepalivePermitWithoutStream = mdutil.GetBool(md, "grpc.keepalive.permitWithoutStream", "keepalive.permitWithoutStream")
}
d.md.minConnectTimeout = mdutil.GetDuration(md, "grpc.minConnectTimeout", "minConnectTimeout")
if d.md.minConnectTimeout <= 0 {
d.md.minConnectTimeout = 30 * time.Second
}
return
}

View File

@ -5,7 +5,6 @@ import (
"context"
"io"
"net"
"sort"
"strconv"
"strings"
"sync"
@ -22,42 +21,6 @@ const (
IPLimitKey = "$$"
)
type limiterGroup struct {
limiters []limiter.Limiter
}
func newLimiterGroup(limiters ...limiter.Limiter) *limiterGroup {
sort.Slice(limiters, func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
})
return &limiterGroup{limiters: limiters}
}
func (l *limiterGroup) Allow(n int) (b bool) {
var i int
for i = range l.limiters {
if b = l.limiters[i].Allow(n); !b {
break
}
}
if !b && i > 0 && n > 0 {
for i := range l.limiters[:i] {
l.limiters[i].Allow(-n)
}
}
return
}
func (l *limiterGroup) Limit() int {
if len(l.limiters) == 0 {
return 0
}
return l.limiters[0].Limit()
}
type options struct {
limits []string
fileLoader loader.Loader

View File

@ -1,6 +1,7 @@
package conn
import (
"sort"
"sync/atomic"
limiter "github.com/go-gost/core/limiter/conn"
@ -28,3 +29,39 @@ func (l *llimiter) Allow(n int) bool {
}
return true
}
type limiterGroup struct {
limiters []limiter.Limiter
}
func newLimiterGroup(limiters ...limiter.Limiter) *limiterGroup {
sort.Slice(limiters, func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
})
return &limiterGroup{limiters: limiters}
}
func (l *limiterGroup) Allow(n int) (b bool) {
var i int
for i = range l.limiters {
if b = l.limiters[i].Allow(n); !b {
break
}
}
if !b && i > 0 && n > 0 {
for i := range l.limiters[:i] {
l.limiters[i].Allow(-n)
}
}
return
}
func (l *limiterGroup) Limit() int {
if len(l.limiters) == 0 {
return 0
}
return l.limiters[0].Limit()
}

View File

@ -1,6 +1,7 @@
package rate
import (
"sort"
"time"
limiter "github.com/go-gost/core/limiter/rate"
@ -24,3 +25,32 @@ func (l *rlimiter) Allow(n int) bool {
func (l *rlimiter) Limit() float64 {
return float64(l.limiter.Limit())
}
type limiterGroup struct {
limiters []limiter.Limiter
}
func newLimiterGroup(limiters ...limiter.Limiter) *limiterGroup {
sort.Slice(limiters, func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
})
return &limiterGroup{limiters: limiters}
}
func (l *limiterGroup) Allow(n int) (b bool) {
b = true
for i := range l.limiters {
if v := l.limiters[i].Allow(n); !v {
b = false
}
}
return
}
func (l *limiterGroup) Limit() float64 {
if len(l.limiters) == 0 {
return 0
}
return l.limiters[0].Limit()
}

View File

@ -5,7 +5,6 @@ import (
"context"
"io"
"net"
"sort"
"strconv"
"strings"
"sync"
@ -22,35 +21,6 @@ const (
IPLimitKey = "$$"
)
type limiterGroup struct {
limiters []limiter.Limiter
}
func newLimiterGroup(limiters ...limiter.Limiter) *limiterGroup {
sort.Slice(limiters, func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
})
return &limiterGroup{limiters: limiters}
}
func (l *limiterGroup) Allow(n int) (b bool) {
b = true
for i := range l.limiters {
if v := l.limiters[i].Allow(n); !v {
b = false
}
}
return
}
func (l *limiterGroup) Limit() float64 {
if len(l.limiters) == 0 {
return 0
}
return l.limiters[0].Limit()
}
type options struct {
limits []string
fileLoader loader.Loader

View File

@ -17,6 +17,7 @@ import (
"github.com/go-gost/x/registry"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
func init() {
@ -67,6 +68,19 @@ func (l *grpcListener) Init(md md.Metadata) (err error) {
if !l.md.insecure {
opts = append(opts, grpc.Creds(credentials.NewTLS(l.options.TLSConfig)))
}
if l.md.keepalive {
opts = append(opts,
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: l.md.keepaliveMinTime,
PermitWithoutStream: l.md.keepalivePermitWithoutStream,
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: l.md.keepaliveMaxConnectionIdle,
Time: l.md.keepaliveTime,
Timeout: l.md.keepaliveTimeout,
}),
)
}
l.server = grpc.NewServer(opts...)
l.addr = ln.Addr()

View File

@ -1,6 +1,8 @@
package grpc
import (
"time"
mdata "github.com/go-gost/core/metadata"
mdutil "github.com/go-gost/core/metadata/util"
)
@ -13,21 +15,41 @@ type metadata struct {
backlog int
insecure bool
path string
keepalive bool
keepaliveMinTime time.Duration
keepaliveTime time.Duration
keepaliveTimeout time.Duration
keepalivePermitWithoutStream bool
keepaliveMaxConnectionIdle time.Duration
}
func (l *grpcListener) parseMetadata(md mdata.Metadata) (err error) {
const (
backlog = "backlog"
insecure = "grpcInsecure"
path = "path"
)
l.md.backlog = mdutil.GetInt(md, backlog)
l.md.backlog = mdutil.GetInt(md, "grpc.backlog", "backlog")
if l.md.backlog <= 0 {
l.md.backlog = defaultBacklog
}
l.md.insecure = mdutil.GetBool(md, insecure)
l.md.path = mdutil.GetString(md, path)
l.md.insecure = mdutil.GetBool(md, "grpc.insecure", "grpcInsecure", "insecure")
l.md.path = mdutil.GetString(md, "grpc.path", "path")
l.md.keepalive = mdutil.GetBool(md, "grpc.keepalive", "keepalive", "keepAlive")
if l.md.keepalive {
l.md.keepaliveMinTime = mdutil.GetDuration(md, "grpc.keepalive.minTime", "keepalive.minTime")
if l.md.keepaliveMinTime <= 0 {
l.md.keepaliveMinTime = 30 * time.Second
}
l.md.keepaliveTime = mdutil.GetDuration(md, "grpc.keepalive.time", "keepalive.time")
if l.md.keepaliveTime <= 0 {
l.md.keepaliveTime = 60 * time.Second
}
l.md.keepaliveTimeout = mdutil.GetDuration(md, "grpc.keepalive.timeout", "keepalive.timeout")
if l.md.keepaliveTimeout <= 0 {
l.md.keepaliveTimeout = 30 * time.Second
}
l.md.keepalivePermitWithoutStream = mdutil.GetBool(md, "grpc.keepalive.permitWithoutStream", "keepalive.permitWithoutStream")
l.md.keepaliveMaxConnectionIdle = mdutil.GetDuration(md, "grpc.keepalive.maxConnectionIdle", "keepalive.maxConnectionIdle")
}
return
}

View File

@ -99,3 +99,40 @@ func (r *redisListRecorder) Record(ctx context.Context, b []byte) error {
func (r *redisListRecorder) Close() error {
return r.client.Close()
}
type redisSortedSetRecorder struct {
client *redis.Client
key string
}
// RedisSortedSetRecorder records data to a redis sorted set.
func RedisSortedSetRecorder(addr string, opts ...RedisRecorderOption) recorder.Recorder {
var options redisRecorderOptions
for _, opt := range opts {
opt(&options)
}
return &redisSortedSetRecorder{
client: redis.NewClient(&redis.Options{
Addr: addr,
Password: options.password,
DB: options.db,
}),
key: options.key,
}
}
func (r *redisSortedSetRecorder) Record(ctx context.Context, b []byte) error {
if r.key == "" {
return nil
}
return r.client.ZIncr(ctx, r.key, &redis.Z{
Score: 1,
Member: b,
}).Err()
}
func (r *redisSortedSetRecorder) Close() error {
return r.client.Close()
}