This commit is contained in:
ginuerzh
2021-11-16 16:12:16 +08:00
parent 2c0ce35b0b
commit 83dacf67d5
15 changed files with 543 additions and 266 deletions

102
pkg/common/util/udp/conn.go Normal file
View File

@ -0,0 +1,102 @@
package udp
import (
"errors"
"net"
"sync"
"sync/atomic"
"github.com/go-gost/gost/pkg/common/bufpool"
)
// Conn is a server side connection for UDP client peer, it implements net.Conn and net.PacketConn.
type Conn struct {
net.PacketConn
localAddr net.Addr
remoteAddr net.Addr
rc chan []byte // data receive queue
idle int32 // indicate the connection is idle
closed chan struct{}
closeMutex sync.Mutex
}
func NewConn(c net.PacketConn, localAddr, remoteAddr net.Addr, queueSize int) *Conn {
return &Conn{
PacketConn: c,
localAddr: localAddr,
remoteAddr: remoteAddr,
rc: make(chan []byte, queueSize),
closed: make(chan struct{}),
}
}
func (c *Conn) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
select {
case bb := <-c.rc:
n = copy(b, bb)
c.SetIdle(false)
bufpool.Put(bb)
case <-c.closed:
err = net.ErrClosed
return
}
addr = c.remoteAddr
return
}
func (c *Conn) Read(b []byte) (n int, err error) {
n, _, err = c.ReadFrom(b)
return
}
func (c *Conn) Write(b []byte) (n int, err error) {
return c.WriteTo(b, c.remoteAddr)
}
func (c *Conn) Close() error {
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
select {
case <-c.closed:
default:
close(c.closed)
}
return nil
}
func (c *Conn) LocalAddr() net.Addr {
return c.localAddr
}
func (c *Conn) RemoteAddr() net.Addr {
return c.remoteAddr
}
func (c *Conn) IsIdle() bool {
return atomic.LoadInt32(&c.idle) > 0
}
func (c *Conn) SetIdle(idle bool) {
v := int32(0)
if idle {
v = 1
}
atomic.StoreInt32(&c.idle, v)
}
func (c *Conn) WriteQueue(b []byte) error {
select {
case c.rc <- b:
return nil
case <-c.closed:
return net.ErrClosed
default:
return errors.New("recv queue is full")
}
}

100
pkg/common/util/udp/pool.go Normal file
View File

@ -0,0 +1,100 @@
package udp
import (
"sync"
"time"
"github.com/go-gost/gost/pkg/logger"
)
type ConnPool struct {
m sync.Map
ttl time.Duration
closed chan struct{}
logger logger.Logger
}
func NewConnPool(ttl time.Duration) *ConnPool {
p := &ConnPool{
ttl: ttl,
closed: make(chan struct{}),
}
go p.idleCheck()
return p
}
func (p *ConnPool) WithLogger(logger logger.Logger) *ConnPool {
p.logger = logger
return p
}
func (p *ConnPool) Get(key interface{}) (c *Conn, ok bool) {
v, ok := p.m.Load(key)
if ok {
c, ok = v.(*Conn)
}
return
}
func (p *ConnPool) Set(key interface{}, c *Conn) {
p.m.Store(key, c)
}
func (p *ConnPool) Delete(key interface{}) {
p.m.Delete(key)
}
func (p *ConnPool) Close() {
select {
case <-p.closed:
return
default:
}
close(p.closed)
p.m.Range(func(k, v interface{}) bool {
if c, ok := v.(*Conn); ok && c != nil {
c.Close()
}
return true
})
}
func (p *ConnPool) idleCheck() {
ticker := time.NewTicker(p.ttl)
defer ticker.Stop()
for {
select {
case <-ticker.C:
size := 0
idles := 0
p.m.Range(func(key, value interface{}) bool {
c, ok := value.(*Conn)
if !ok || c == nil {
p.Delete(key)
return true
}
size++
if c.IsIdle() {
idles++
p.Delete(key)
c.Close()
return true
}
c.SetIdle(true)
return true
})
if idles > 0 {
p.logger.Debugf("connection pool: size=%d, idle=%d", size, idles)
}
case <-p.closed:
return
}
}
}