package libp2pwebrtc
import (
"errors"
"os"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/transport/webrtc/pb"
"github.com/libp2p/go-msgio/pbio"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v4"
)
const (
maxSendMessageSize = 16384
protoOverhead = 5
varintOverhead = 2
maxTotalControlMessagesSize = 50
maxFINACKWait = 10 * time .Second
maxReceiveMessageSize = 256 <<10 + 1 <<10
)
type receiveState uint8
const (
receiveStateReceiving receiveState = iota
receiveStateDataRead
receiveStateReset
)
type sendState uint8
const (
sendStateSending sendState = iota
sendStateDataSent
sendStateDataReceived
sendStateReset
)
type stream struct {
mx sync .Mutex
readerMx sync .Mutex
reader pbio .Reader
readError error
nextMessage *pb .Message
receiveState receiveState
writer pbio .Writer
writeStateChanged chan struct {}
sendState sendState
writeDeadline time .Time
writeError error
maxSendMessageSize int
controlMessageReaderOnce sync .Once
controlMessageReaderEndTime time .Time
onDoneOnce sync .Once
onDone func ()
id uint16
dataChannel *datachannel .DataChannel
closeForShutdownErr error
}
var _ network .MuxedStream = &stream {}
func newStream(
channel *webrtc .DataChannel ,
rwc datachannel .ReadWriteCloser ,
maxSendMessageSize int ,
onDone func (),
) *stream {
s := &stream {
reader : pbio .NewDelimitedReader (rwc , maxReceiveMessageSize ),
writer : pbio .NewDelimitedWriter (rwc ),
writeStateChanged : make (chan struct {}, 1 ),
id : *channel .ID (),
dataChannel : rwc .(*datachannel .DataChannel ),
onDone : onDone ,
maxSendMessageSize : maxSendMessageSize ,
}
s .dataChannel .SetBufferedAmountLowThreshold (uint64 (s .sendBufferLowThreshold ()))
s .dataChannel .OnBufferedAmountLow (func () {
s .notifyWriteStateChanged ()
})
return s
}
func (s *stream ) Close () error {
s .mx .Lock ()
isClosed := s .closeForShutdownErr != nil
s .mx .Unlock ()
if isClosed {
return nil
}
defer s .cleanup ()
closeWriteErr := s .CloseWrite ()
closeReadErr := s .CloseRead ()
if closeWriteErr != nil || closeReadErr != nil {
s .Reset ()
return errors .Join (closeWriteErr , closeReadErr )
}
s .mx .Lock ()
if s .controlMessageReaderEndTime .IsZero () {
s .controlMessageReaderEndTime = time .Now ().Add (maxFINACKWait )
s .setDataChannelReadDeadline (time .Now ().Add (-1 * time .Hour ))
}
s .mx .Unlock ()
return nil
}
func (s *stream ) Reset () error {
return s .ResetWithError (0 )
}
func (s *stream ) ResetWithError (errCode network .StreamErrorCode ) error {
s .mx .Lock ()
isClosed := s .closeForShutdownErr != nil
s .mx .Unlock ()
if isClosed {
return nil
}
defer s .cleanup ()
cancelWriteErr := s .cancelWrite (errCode )
closeReadErr := s .closeRead (errCode , false )
s .setDataChannelReadDeadline (time .Now ().Add (-1 * time .Hour ))
return errors .Join (closeReadErr , cancelWriteErr )
}
func (s *stream ) closeForShutdown (closeErr error ) {
defer s .cleanup ()
s .mx .Lock ()
defer s .mx .Unlock ()
s .closeForShutdownErr = closeErr
s .notifyWriteStateChanged ()
}
func (s *stream ) SetDeadline (t time .Time ) error {
_ = s .SetReadDeadline (t )
return s .SetWriteDeadline (t )
}
func (s *stream ) processIncomingFlag (msg *pb .Message ) {
if msg .Flag == nil {
return
}
switch msg .GetFlag () {
case pb .Message_STOP_SENDING :
if s .sendState == sendStateSending || s .sendState == sendStateDataSent {
s .sendState = sendStateReset
s .writeError = &network .StreamError {Remote : true , ErrorCode : network .StreamErrorCode (msg .GetErrorCode ())}
}
s .notifyWriteStateChanged ()
case pb .Message_FIN_ACK :
s .sendState = sendStateDataReceived
s .notifyWriteStateChanged ()
case pb .Message_FIN :
if s .receiveState == receiveStateReceiving {
s .receiveState = receiveStateDataRead
}
if err := s .writer .WriteMsg (&pb .Message {Flag : pb .Message_FIN_ACK .Enum ()}); err != nil {
log .Debugf ("failed to send FIN_ACK: %s" , err )
}
s .spawnControlMessageReader ()
case pb .Message_RESET :
if s .receiveState == receiveStateReceiving {
s .receiveState = receiveStateReset
s .readError = &network .StreamError {Remote : true , ErrorCode : network .StreamErrorCode (msg .GetErrorCode ())}
}
if s .sendState == sendStateSending || s .sendState == sendStateDataSent {
s .sendState = sendStateReset
s .writeError = &network .StreamError {Remote : true , ErrorCode : network .StreamErrorCode (msg .GetErrorCode ())}
}
s .spawnControlMessageReader ()
}
}
func (s *stream ) spawnControlMessageReader () {
s .controlMessageReaderOnce .Do (func () {
go func () {
defer s .setDataChannelReadDeadline (time .Time {})
defer s .dataChannel .Close ()
s .setDataChannelReadDeadline (time .Now ().Add (-1 * time .Hour ))
s .readerMx .Lock ()
s .mx .Lock ()
defer s .mx .Unlock ()
s .readerMx .Unlock ()
if s .nextMessage != nil {
s .processIncomingFlag (s .nextMessage )
s .nextMessage = nil
}
var msg pb .Message
for {
if s .closeForShutdownErr != nil {
return
}
if s .sendState == sendStateDataReceived || s .sendState == sendStateReset {
return
}
if !s .controlMessageReaderEndTime .IsZero () && time .Now ().After (s .controlMessageReaderEndTime ) {
return
}
s .setDataChannelReadDeadline (s .controlMessageReaderEndTime )
s .mx .Unlock ()
err := s .reader .ReadMsg (&msg )
s .mx .Lock ()
if err != nil {
if errors .Is (err , os .ErrDeadlineExceeded ) {
continue
}
return
}
s .processIncomingFlag (&msg )
}
}()
})
}
func (s *stream ) cleanup () {
s .onDoneOnce .Do (func () {
if s .onDone != nil {
s .onDone ()
}
})
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .