grpc: cancel stream

This commit is contained in:
ginuerzh 2022-12-30 19:34:48 +08:00
parent bb106e2d89
commit 3b245ec381
2 changed files with 8 additions and 18 deletions

View File

@ -1,8 +1,8 @@
package grpc package grpc
import ( import (
"context"
"errors" "errors"
"io"
"net" "net"
"time" "time"
@ -14,7 +14,7 @@ type conn struct {
rb []byte rb []byte
localAddr net.Addr localAddr net.Addr
remoteAddr net.Addr remoteAddr net.Addr
closed chan struct{} cancelFunc context.CancelFunc
} }
func (c *conn) Read(b []byte) (n int, err error) { func (c *conn) Read(b []byte) (n int, err error) {
@ -22,9 +22,6 @@ func (c *conn) Read(b []byte) (n int, err error) {
case <-c.c.Context().Done(): case <-c.c.Context().Done():
err = c.c.Context().Err() err = c.c.Context().Err()
return return
case <-c.closed:
err = io.ErrClosedPipe
return
default: default:
} }
@ -46,9 +43,6 @@ func (c *conn) Write(b []byte) (n int, err error) {
case <-c.c.Context().Done(): case <-c.c.Context().Done():
err = c.c.Context().Err() err = c.c.Context().Err()
return return
case <-c.closed:
err = io.ErrClosedPipe
return
default: default:
} }
@ -62,14 +56,8 @@ func (c *conn) Write(b []byte) (n int, err error) {
} }
func (c *conn) Close() error { func (c *conn) Close() error {
select { defer c.cancelFunc()
case <-c.closed:
default:
close(c.closed)
return c.c.CloseSend() return c.c.CloseSend()
}
return nil
} }
func (c *conn) LocalAddr() net.Addr { func (c *conn) LocalAddr() net.Addr {

View File

@ -109,8 +109,10 @@ func (d *grpcDialer) Dial(ctx context.Context, addr string, opts ...dialer.DialO
d.clients[addr] = client d.clients[addr] = client
} }
cli, err := client.TunnelX(context.Background(), d.md.path) ctx2, cancel := context.WithCancel(context.Background())
cli, err := client.TunnelX(ctx2, d.md.path)
if err != nil { if err != nil {
cancel()
return nil, err return nil, err
} }
@ -118,6 +120,6 @@ func (d *grpcDialer) Dial(ctx context.Context, addr string, opts ...dialer.DialO
c: cli, c: cli,
localAddr: &net.TCPAddr{}, localAddr: &net.TCPAddr{},
remoteAddr: &net.TCPAddr{}, remoteAddr: &net.TCPAddr{},
closed: make(chan struct{}), cancelFunc: cancel,
}, nil }, nil
} }