add quic listener

This commit is contained in:
ginuerzh
2021-04-11 19:06:20 +08:00
parent f478584932
commit 7de1a38023
8 changed files with 508 additions and 19 deletions

View File

@ -47,16 +47,18 @@ func (l *Listener) Init(md listener.Metadata) (err error) {
}
config.Init()
var ln *kcp.Listener
if config.TCP {
var conn net.PacketConn
conn, err = tcpraw.Listen("tcp", addr)
if err != nil {
return
}
l.ln, err = kcp.ServeConn(
ln, err = kcp.ServeConn(
blockCrypt(config.Key, config.Crypt, Salt), config.DataShard, config.ParityShard, conn)
} else {
l.ln, err = kcp.ListenWithOptions(addr,
ln, err = kcp.ListenWithOptions(addr,
blockCrypt(config.Key, config.Crypt, Salt), config.DataShard, config.ParityShard)
}
if err != nil {
@ -64,22 +66,19 @@ func (l *Listener) Init(md listener.Metadata) (err error) {
}
if config.DSCP > 0 {
if err = l.ln.SetDSCP(config.DSCP); err != nil {
if err = ln.SetDSCP(config.DSCP); err != nil {
l.logger.Warn(err)
}
}
if err = l.ln.SetReadBuffer(config.SockBuf); err != nil {
if err = ln.SetReadBuffer(config.SockBuf); err != nil {
l.logger.Warn(err)
}
if err = l.ln.SetWriteBuffer(config.SockBuf); err != nil {
if err = ln.SetWriteBuffer(config.SockBuf); err != nil {
l.logger.Warn(err)
}
queueSize := l.md.connQueueSize
if queueSize <= 0 {
queueSize = defaultQueueSize
}
l.connChan = make(chan net.Conn, queueSize)
l.ln = ln
l.connChan = make(chan net.Conn, l.md.connQueueSize)
l.errChan = make(chan error, 1)
go l.listenLoop()
@ -133,6 +132,8 @@ func (l *Listener) listenLoop() {
}
func (l *Listener) mux(conn net.Conn) {
defer conn.Close()
smuxConfig := smux.DefaultConfig()
smuxConfig.MaxReceiveBuffer = l.md.config.SockBuf
smuxConfig.KeepAliveInterval = time.Duration(l.md.config.KeepAlive) * time.Second
@ -158,6 +159,7 @@ func (l *Listener) mux(conn net.Conn) {
select {
case l.connChan <- stream:
case <-stream.GetDieCh():
stream.Close()
default:
stream.Close()
l.logger.Error("connection queue is full")

View File

@ -0,0 +1,142 @@
package quic
import (
"context"
"errors"
"net"
"github.com/go-gost/gost/logger"
"github.com/go-gost/gost/server/listener"
"github.com/go-gost/gost/utils"
"github.com/lucas-clemente/quic-go"
)
var (
_ listener.Listener = (*Listener)(nil)
)
type Listener struct {
md metadata
ln quic.Listener
connChan chan net.Conn
errChan chan error
logger logger.Logger
}
func NewListener(opts ...listener.Option) *Listener {
options := &listener.Options{}
for _, opt := range opts {
opt(options)
}
return &Listener{
logger: options.Logger,
}
}
func (l *Listener) Init(md listener.Metadata) (err error) {
l.md, err = l.parseMetadata(md)
if err != nil {
return
}
laddr, err := net.ResolveUDPAddr("udp", l.md.addr)
if err != nil {
return
}
var conn net.PacketConn
conn, err = net.ListenUDP("udp", laddr)
if err != nil {
return
}
if l.md.cipherKey != nil {
conn = utils.QUICCipherConn(conn, l.md.cipherKey)
}
config := &quic.Config{
KeepAlive: l.md.keepAlive,
HandshakeIdleTimeout: l.md.HandshakeTimeout,
MaxIdleTimeout: l.md.MaxIdleTimeout,
}
ln, err := quic.Listen(conn, l.md.tlsConfig, config)
if err != nil {
return
}
l.ln = ln
l.connChan = make(chan net.Conn, l.md.connQueueSize)
l.errChan = make(chan error, 1)
go l.listenLoop()
return
}
func (l *Listener) Accept() (conn net.Conn, err error) {
var ok bool
select {
case conn = <-l.connChan:
case err, ok = <-l.errChan:
if !ok {
err = listener.ErrClosed
}
}
return
}
func (l *Listener) Close() error {
return l.ln.Close()
}
func (l *Listener) Addr() net.Addr {
return l.ln.Addr()
}
func (l *Listener) listenLoop() {
for {
ctx := context.Background()
session, err := l.ln.Accept(ctx)
if err != nil {
l.logger.Error("accept:", err)
l.errChan <- err
close(l.errChan)
return
}
go l.mux(ctx, session)
}
}
func (l *Listener) mux(ctx context.Context, session quic.Session) {
defer session.CloseWithError(0, "")
for {
stream, err := session.AcceptStream(ctx)
if err != nil {
l.logger.Error("accept stream:", err)
return
}
conn := utils.QUICConn(session, stream)
select {
case l.connChan <- conn:
case <-stream.Context().Done():
stream.Close()
default:
stream.Close()
l.logger.Error("connection queue is full")
}
}
}
func (l *Listener) parseMetadata(md listener.Metadata) (m metadata, err error) {
if val, ok := md[addr]; ok {
m.addr = val
} else {
err = errors.New("missing address")
return
}
return
}

View File

@ -0,0 +1,32 @@
package quic
import (
"crypto/tls"
"time"
)
const (
addr = "addr"
certFile = "certFile"
keyFile = "keyFile"
caFile = "caFile"
keepAlive = "keepAlive"
keepAlivePeriod = "keepAlivePeriod"
)
const (
defaultKeepAlivePeriod = 180 * time.Second
)
type metadata struct {
addr string
tlsConfig *tls.Config
keepAlive bool
HandshakeTimeout time.Duration
MaxIdleTimeout time.Duration
cipherKey []byte
connQueueSize int
}

View File

@ -104,6 +104,7 @@ func (l *Listener) mux(conn net.Conn) {
select {
case l.connChan <- stream:
case <-stream.GetDieCh():
stream.Close()
default:
stream.Close()
l.logger.Error("connection queue is full")

View File

@ -64,11 +64,7 @@ func (l *Listener) Init(md listener.Metadata) (err error) {
ReadHeaderTimeout: l.md.readHeaderTimeout,
}
queueSize := l.md.connQueueSize
if queueSize <= 0 {
queueSize = defaultQueueSize
}
l.connChan = make(chan net.Conn, queueSize)
l.connChan = make(chan net.Conn, l.md.connQueueSize)
l.errChan = make(chan error, 1)
ln, err := net.Listen("tcp", l.md.addr)
@ -173,6 +169,7 @@ func (l *Listener) mux(conn net.Conn) {
select {
case l.connChan <- stream:
case <-stream.GetDieCh():
stream.Close()
default:
stream.Close()
l.logger.Error("connection queue is full")