add chain group

This commit is contained in:
ginuerzh
2022-09-02 10:54:42 +08:00
parent e77908a89e
commit b88ab3acdc
7 changed files with 286 additions and 143 deletions

View File

@ -54,7 +54,7 @@ func (r *Route) Dial(ctx context.Context, network, address string, opts ...DialO
return nil, err
}
cc, err := r.GetNode(r.Len()-1).Transport.Connect(ctx, conn, network, address)
cc, err := r.GetNode(r.Len()-1).transport.Connect(ctx, conn, network, address)
if err != nil {
conn.Close()
return nil, err
@ -72,7 +72,7 @@ func (r *Route) Bind(ctx context.Context, network, address string, opts ...conne
return nil, err
}
ln, err := r.GetNode(r.Len()-1).Transport.Bind(ctx, conn, network, address, opts...)
ln, err := r.GetNode(r.Len()-1).transport.Bind(ctx, conn, network, address, opts...)
if err != nil {
conn.Close()
return nil, err
@ -90,34 +90,54 @@ func (r *Route) connect(ctx context.Context) (conn net.Conn, err error) {
node := r.nodes[0]
defer func() {
if err != nil && r.chain != nil {
if v := metrics.GetCounter(metrics.MetricChainErrorsCounter,
metrics.Labels{"chain": r.chain.name, "node": node.Name}); v != nil {
v.Inc()
if r.chain != nil {
marker := r.chain.Marker()
// chain error
if err != nil {
if marker != nil {
marker.Mark()
}
if v := metrics.GetCounter(metrics.MetricChainErrorsCounter,
metrics.Labels{"chain": r.chain.name, "node": node.Name}); v != nil {
v.Inc()
}
} else {
if marker != nil {
marker.Reset()
}
}
}
}()
addr, err := resolve(ctx, network, node.Addr, node.Resolver, node.Hosts, r.logger)
addr, err := resolve(ctx, network, node.Addr, node.resolver, node.hostMapper, r.logger)
marker := node.Marker()
if err != nil {
node.Marker.Mark()
if marker != nil {
marker.Mark()
}
return
}
start := time.Now()
cc, err := node.Transport.Dial(ctx, addr)
cc, err := node.transport.Dial(ctx, addr)
if err != nil {
node.Marker.Mark()
if marker != nil {
marker.Mark()
}
return
}
cn, err := node.Transport.Handshake(ctx, cc)
cn, err := node.transport.Handshake(ctx, cc)
if err != nil {
cc.Close()
node.Marker.Mark()
if marker != nil {
marker.Mark()
}
return
}
node.Marker.Reset()
if marker != nil {
marker.Reset()
}
if r.chain != nil {
if v := metrics.GetObserver(metrics.MetricNodeConnectDurationObserver,
@ -128,25 +148,34 @@ func (r *Route) connect(ctx context.Context) (conn net.Conn, err error) {
preNode := node
for _, node := range r.nodes[1:] {
addr, err = resolve(ctx, network, node.Addr, node.Resolver, node.Hosts, r.logger)
marker := node.Marker()
addr, err = resolve(ctx, network, node.Addr, node.resolver, node.hostMapper, r.logger)
if err != nil {
cn.Close()
node.Marker.Mark()
if marker != nil {
marker.Mark()
}
return
}
cc, err = preNode.Transport.Connect(ctx, cn, "tcp", addr)
cc, err = preNode.transport.Connect(ctx, cn, "tcp", addr)
if err != nil {
cn.Close()
node.Marker.Mark()
if marker != nil {
marker.Mark()
}
return
}
cc, err = node.Transport.Handshake(ctx, cc)
cc, err = node.transport.Handshake(ctx, cc)
if err != nil {
cn.Close()
node.Marker.Mark()
if marker != nil {
marker.Mark()
}
return
}
node.Marker.Reset()
if marker != nil {
marker.Reset()
}
cn = cc
preNode = node
@ -176,8 +205,8 @@ func (r *Route) Path() (path []*Node) {
}
for _, node := range r.nodes {
if node.Transport != nil && node.Transport.route != nil {
path = append(path, node.Transport.route.Path()...)
if node.transport != nil && node.transport.route != nil {
path = append(path, node.transport.route.Path()...)
}
path = append(path, node)
}