package mux
import (
"errors"
"io"
"net"
"sync"
"github.com/pion/ice/v4"
"github.com/pion/logging"
"github.com/pion/transport/v3/packetio"
)
// Limits applied by the mux.
const (
// maxBufferSize is the value handed to SetLimitSize on every endpoint
// buffer created by NewEndpoint.
maxBufferSize = 1000 * 1000
// maxPendingPackets caps how many packets without a matching endpoint
// dispatch keeps queued; further unmatched packets are logged and dropped.
maxPendingPackets = 15
)
// Config collects the arguments consumed by NewMux.
type Config struct {
// Conn is the underlying connection the Mux reads packets from and
// closes in Close.
Conn net .Conn
// BufferSize is the size, in bytes, of the buffer used for each read
// from Conn.
BufferSize int
// LoggerFactory creates the "mux" logger. NewMux calls it
// unconditionally, so it must not be nil.
LoggerFactory logging .LoggerFactory
}
// Mux reads packets from a single connection and hands each one to a
// registered Endpoint whose MatchFunc accepts it. Packets that match no
// endpoint are queued (up to maxPendingPackets) until a matching endpoint
// is created.
type Mux struct {
// nextConn is the connection read by readLoop and closed by Close.
nextConn net .Conn
// bufferSize is the size in bytes of the read buffer allocated by readLoop.
bufferSize int
// lock guards endpoints, isClosed and pendingPackets.
lock sync .Mutex
// endpoints maps every registered endpoint to the predicate that selects
// its packets.
endpoints map [*Endpoint ]MatchFunc
// isClosed is set by Close; once set, unmatched packets are no longer queued.
isClosed bool
// pendingPackets holds copies of packets that matched no endpoint at
// dispatch time.
pendingPackets [][]byte
// closedCh is closed when readLoop returns; Close waits on it.
closedCh chan struct {}
log logging .LeveledLogger
}
// NewMux builds a Mux on top of config.Conn and starts its read loop in a
// background goroutine.
//
// config.LoggerFactory must be non-nil; it is used to create the "mux"
// logger. The read loop runs until the connection reports a terminal error,
// typically after Close.
func NewMux(config Config) *Mux {
	m := new(Mux)
	m.nextConn = config.Conn
	m.bufferSize = config.BufferSize
	m.endpoints = make(map[*Endpoint]MatchFunc)
	m.closedCh = make(chan struct{})
	m.log = config.LoggerFactory.NewLogger("mux")

	go m.readLoop()

	return m
}
// NewEndpoint registers a new Endpoint that receives every packet accepted
// by matchFunc.
//
// The endpoint's buffer is limited to maxBufferSize. After registration, a
// background goroutine delivers any queued pending packets that matchFunc
// accepts.
func (m *Mux) NewEndpoint(matchFunc MatchFunc) *Endpoint {
	buf := packetio.NewBuffer()
	buf.SetLimitSize(maxBufferSize)

	ep := &Endpoint{mux: m, buffer: buf}

	m.lock.Lock()
	m.endpoints[ep] = matchFunc
	m.lock.Unlock()

	// Drain the pending queue asynchronously so the caller is not blocked.
	go m.handlePendingPackets(ep, matchFunc)

	return ep
}
// RemoveEndpoint unregisters e from the Mux so it is no longer considered
// when dispatching packets. Removing an endpoint that is not registered is a
// no-op.
func (m *Mux) RemoveEndpoint(e *Endpoint) {
	m.lock.Lock()
	delete(m.endpoints, e)
	m.lock.Unlock()
}
// Close shuts the Mux down: it closes and unregisters every endpoint, marks
// the Mux as closed, closes the underlying connection and then waits for the
// read loop to exit.
//
// The first error from closing an endpoint or the connection is returned
// immediately; in that case the remaining steps are not performed.
func (m *Mux) Close() error {
	// Phase 1, under the lock: tear down the endpoints and flag the mux.
	shutEndpoints := func() error {
		m.lock.Lock()
		defer m.lock.Unlock()

		for ep := range m.endpoints {
			if closeErr := ep.close(); closeErr != nil {
				return closeErr
			}
			delete(m.endpoints, ep)
		}
		m.isClosed = true

		return nil
	}
	if err := shutEndpoints(); err != nil {
		return err
	}

	// Phase 2, lock released: closing the connection unblocks readLoop.
	if err := m.nextConn.Close(); err != nil {
		return err
	}

	// Wait for readLoop to finish.
	<-m.closedCh

	return nil
}
// readLoop reads packets from the underlying connection and dispatches each
// one until a terminal condition occurs. closedCh is closed on exit so that
// Close can observe termination.
//
// Read errors are handled as follows:
//   - io.EOF or ice.ErrClosed: exit silently.
//   - io.ErrShortBuffer or packetio.ErrTimeout: log and keep reading.
//   - anything else: log and exit.
//
// Any dispatch error ends the loop; io.ErrClosedPipe does so without logging.
func (m *Mux) readLoop() {
	defer close(m.closedCh)

	packet := make([]byte, m.bufferSize)
	for {
		n, err := m.nextConn.Read(packet)
		if err != nil {
			if errors.Is(err, io.EOF) || errors.Is(err, ice.ErrClosed) {
				return
			}
			if errors.Is(err, io.ErrShortBuffer) || errors.Is(err, packetio.ErrTimeout) {
				m.log.Errorf("mux: failed to read from packetio.Buffer %s", err.Error())
				continue
			}
			m.log.Errorf("mux: ending readLoop packetio.Buffer error %s", err.Error())
			return
		}

		err = m.dispatch(packet[:n])
		if err == nil {
			continue
		}
		if !errors.Is(err, io.ErrClosedPipe) {
			m.log.Errorf("mux: ending readLoop dispatch error %s", err.Error())
		}
		return
	}
}
// dispatch routes one packet to an endpoint whose MatchFunc accepts it.
//
// Zero-length packets are logged and ignored. When no endpoint matches and
// the Mux is not closed, a copy of the packet is queued for a future endpoint
// unless the queue already holds maxPendingPackets entries, in which case the
// packet is dropped. A full endpoint buffer also drops the packet without
// error. Any other write error is returned to the caller.
func (m *Mux) dispatch(buf []byte) error {
	if len(buf) == 0 {
		m.log.Warnf("Warning: mux: unable to dispatch zero length packet")
		return nil
	}

	m.lock.Lock()

	var target *Endpoint
	for candidate, accepts := range m.endpoints {
		if accepts(buf) {
			target = candidate
			break
		}
	}

	if target != nil {
		// Release the lock before writing so a slow endpoint cannot stall
		// endpoint registration or removal.
		m.lock.Unlock()

		_, err := target.buffer.Write(buf)
		if errors.Is(err, packetio.ErrFull) {
			m.log.Infof("mux: endpoint buffer is full, dropping packet")
			return nil
		}
		return err
	}

	// No match: the pending queue is touched below, so keep the lock held.
	defer m.lock.Unlock()

	if m.isClosed {
		return nil
	}

	if len(m.pendingPackets) >= maxPendingPackets {
		m.log.Warnf(
			"Warning: mux: no endpoint for packet starting with %d, not adding to queue size(%d)",
			buf[0],
			len(m.pendingPackets),
		)
		return nil
	}

	m.log.Warnf(
		"Warning: mux: no endpoint for packet starting with %d, adding to queue size(%d)",
		buf[0],
		len(m.pendingPackets),
	)
	// buf aliases readLoop's reusable read buffer, so store a private copy.
	m.pendingPackets = append(m.pendingPackets, append([]byte{}, buf...))

	return nil
}
// handlePendingPackets delivers queued packets to a newly created endpoint.
//
// Every pending packet accepted by matchFunc is written to endpoint's buffer
// and removed from the queue; a failed write is logged and the packet is
// still removed. Packets that do not match stay queued in their original
// order. The whole operation runs under the Mux lock.
func (m *Mux) handlePendingPackets(endpoint *Endpoint, matchFunc MatchFunc) {
	m.lock.Lock()
	defer m.lock.Unlock()

	// Length 0 with capacity len: the slice is filled via append below.
	// Allocating it with a non-zero length would leave that many leading nil
	// entries in the queue, which would count toward maxPendingPackets and
	// later be offered to match functions as empty packets.
	remaining := make([][]byte, 0, len(m.pendingPackets))
	for _, buf := range m.pendingPackets {
		if !matchFunc(buf) {
			remaining = append(remaining, buf)
			continue
		}
		if _, err := endpoint.buffer.Write(buf); err != nil {
			m.log.Warnf("Warning: mux: error writing packet to endpoint from pending queue: %s", err)
		}
	}
	m.pendingPackets = remaining
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.