fix race condition

This commit is contained in:
ginuerzh 2023-10-18 19:19:43 +08:00
parent 3f3deb98b8
commit 54b56df214
3 changed files with 43 additions and 32 deletions

View File

@ -2,7 +2,6 @@ package local
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
@ -195,7 +194,7 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l
err = func() error { err = func() error {
req, err := http.ReadRequest(br) req, err := http.ReadRequest(br)
if err != nil { if err != nil {
log.Errorf("read http request: %v", err) // log.Errorf("read http request: %v", err)
return err return err
} }
@ -268,10 +267,14 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l
}) })
} }
if err := req.Write(cc); err != nil {
cc.Close()
log.Warnf("send request to node %s(%s): %v", target.Name, target.Addr, err)
return resp.Write(rw)
}
if req.Header.Get("Upgrade") == "websocket" { if req.Header.Get("Upgrade") == "websocket" {
var buf bytes.Buffer err := xnet.Transport(cc, xio.NewReadWriter(br, rw))
req.Write(&buf)
err := xnet.Transport(cc, xio.NewReadWriter(io.MultiReader(&buf, br), rw))
if err == nil { if err == nil {
err = io.EOF err = io.EOF
} }
@ -281,12 +284,6 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, log l
go func() { go func() {
defer cc.Close() defer cc.Close()
if err := req.Write(cc); err != nil {
log.Warnf("send request to node %s(%s): %v", target.Name, target.Addr, err)
resp.Write(rw)
return
}
res, err := http.ReadResponse(bufio.NewReader(cc), req) res, err := http.ReadResponse(bufio.NewReader(cc), req)
if err != nil { if err != nil {
log.Warnf("read response from node %s(%s): %v", target.Name, target.Addr, err) log.Warnf("read response from node %s(%s): %v", target.Name, target.Addr, err)

View File

@ -2,7 +2,6 @@ package remote
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"crypto/tls" "crypto/tls"
"errors" "errors"
@ -197,7 +196,7 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, remot
err = func() error { err = func() error {
req, err := http.ReadRequest(br) req, err := http.ReadRequest(br)
if err != nil { if err != nil {
log.Errorf("read http request: %v", err) // log.Errorf("read http request: %v", err)
return err return err
} }
@ -272,10 +271,14 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, remot
cc = proxyproto.WrapClientConn(h.md.proxyProtocol, remoteAddr, localAddr, cc) cc = proxyproto.WrapClientConn(h.md.proxyProtocol, remoteAddr, localAddr, cc)
if err := req.Write(cc); err != nil {
cc.Close()
log.Warnf("send request to node %s(%s): %v", target.Name, target.Addr, err)
return resp.Write(rw)
}
if req.Header.Get("Upgrade") == "websocket" { if req.Header.Get("Upgrade") == "websocket" {
var buf bytes.Buffer err := xnet.Transport(cc, xio.NewReadWriter(br, rw))
req.Write(&buf)
err := xnet.Transport(cc, xio.NewReadWriter(io.MultiReader(&buf, br), rw))
if err == nil { if err == nil {
err = io.EOF err = io.EOF
} }
@ -285,12 +288,6 @@ func (h *forwardHandler) handleHTTP(ctx context.Context, rw io.ReadWriter, remot
go func() { go func() {
defer cc.Close() defer cc.Close()
if err := req.Write(cc); err != nil {
log.Warnf("send request to node %s(%s): %v", target.Name, target.Addr, err)
resp.Write(rw)
return
}
res, err := http.ReadResponse(bufio.NewReader(cc), req) res, err := http.ReadResponse(bufio.NewReader(cc), req)
if err != nil { if err != nil {
log.Warnf("read response from node %s(%s): %v", target.Name, target.Addr, err) log.Warnf("read response from node %s(%s): %v", target.Name, target.Addr, err)

View File

@ -3,6 +3,7 @@ package rtcp
import ( import (
"context" "context"
"net" "net"
"sync"
"github.com/go-gost/core/chain" "github.com/go-gost/core/chain"
"github.com/go-gost/core/listener" "github.com/go-gost/core/listener"
@ -27,6 +28,7 @@ type rtcpListener struct {
logger logger.Logger logger logger.Logger
closed chan struct{} closed chan struct{}
options listener.Options options listener.Options
mu sync.Mutex
} }
func NewListener(opts ...listener.Option) listener.Listener { func NewListener(opts ...listener.Option) listener.Listener {
@ -72,23 +74,25 @@ func (l *rtcpListener) Accept() (conn net.Conn, err error) {
default: default:
} }
if l.ln == nil { ln := l.getListener()
l.ln, err = l.router.Bind( if ln == nil {
ln, err = l.router.Bind(
context.Background(), "tcp", l.laddr.String(), context.Background(), "tcp", l.laddr.String(),
chain.MuxBindOption(true), chain.MuxBindOption(true),
) )
if err != nil { if err != nil {
return nil, listener.NewAcceptError(err) return nil, listener.NewAcceptError(err)
} }
l.ln = metrics.WrapListener(l.options.Service, l.ln) ln = metrics.WrapListener(l.options.Service, ln)
l.ln = admission.WrapListener(l.options.Admission, l.ln) ln = admission.WrapListener(l.options.Admission, ln)
l.ln = limiter.WrapListener(l.options.TrafficLimiter, l.ln) ln = limiter.WrapListener(l.options.TrafficLimiter, ln)
l.ln = climiter.WrapListener(l.options.ConnLimiter, l.ln) ln = climiter.WrapListener(l.options.ConnLimiter, ln)
l.setListener(ln)
} }
conn, err = l.ln.Accept() conn, err = l.ln.Accept()
if err != nil { if err != nil {
l.ln.Close() ln.Close()
l.ln = nil l.setListener(nil)
return nil, listener.NewAcceptError(err) return nil, listener.NewAcceptError(err)
} }
return return
@ -103,8 +107,9 @@ func (l *rtcpListener) Close() error {
case <-l.closed: case <-l.closed:
default: default:
close(l.closed) close(l.closed)
if l.ln != nil { ln := l.getListener()
l.ln.Close() if ln != nil {
ln.Close()
// l.ln = nil // l.ln = nil
} }
} }
@ -112,6 +117,18 @@ func (l *rtcpListener) Close() error {
return nil return nil
} }
func (l *rtcpListener) setListener(ln net.Listener) {
l.mu.Lock()
defer l.mu.Unlock()
l.ln = ln
}
func (l *rtcpListener) getListener() net.Listener {
l.mu.Lock()
defer l.mu.Unlock()
return l.ln
}
type bindAddr struct { type bindAddr struct {
addr string addr string
} }