package yamux
import (
"io"
"math"
"sync"
"sync/atomic"
"time"
)
// streamState describes where a stream is in its overall lifecycle.
type streamState int

// Lifecycle states of a stream, numbered from zero in declaration order.
const (
	// streamInit is the zero value; sendFlags emits SYN when it sees this
	// state and moves the stream to streamSYNSent.
	streamInit = streamState(iota)

	// streamSYNSent is entered once sendFlags has handed out the SYN flag.
	streamSYNSent

	// streamSYNReceived makes sendFlags emit ACK and move the stream to
	// streamEstablished.
	streamSYNReceived

	// streamEstablished is reached after an ACK was either emitted by
	// sendFlags or observed by processFlags.
	streamEstablished

	// streamFinished is set once the stream was reset, force-closed, or no
	// longer has an open half.
	streamFinished
)
// halfStreamState describes one direction (read or write) of a stream.
type halfStreamState int

// States of a single stream direction, numbered from zero in declaration order.
const (
	// halfOpen is the zero value: the direction can still be used.
	halfOpen = halfStreamState(iota)

	// halfClosed marks a direction that was shut down in an orderly way
	// (CloseWrite for the write half, a received FIN for the read half).
	halfClosed

	// halfReset marks a direction that was aborted; the matching readErr or
	// writeErr field of the stream holds the error reported to callers.
	halfReset
)
// Stream is one logical, bidirectional stream multiplexed over a Session.
type Stream struct {
	// sendWindow is the number of bytes write may still send. It is only
	// accessed through sync/atomic operations.
	sendWindow uint32

	// memorySpan is asked to reserve memory before the receive window grows.
	memorySpan MemoryManager

	// id is the stream identifier placed in every frame header.
	id uint32

	// session is the owning session; all frames are sent through it.
	session *Session

	// recvWindow is the size the receive buffer is grown to when a window
	// update is sent.
	recvWindow uint32

	// epochStart is the creation time or the time of the last window update
	// that was sent, whichever is later.
	epochStart time.Time

	// state is the overall lifecycle state of the stream.
	state streamState

	// writeState is the state of the outgoing direction.
	writeState halfStreamState

	// readState is the state of the incoming direction.
	readState halfStreamState

	// writeErr is returned by writes once writeState is halfReset.
	writeErr error

	// readErr is returned by reads once readState is halfReset.
	readErr error

	// stateLock guards state, writeState, readState, writeErr and readErr.
	stateLock sync.Mutex

	// recvBuf holds received data until Read consumes it.
	recvBuf segmentedBuffer

	// recvNotifyCh wakes a blocked Read.
	recvNotifyCh chan struct{}

	// sendNotifyCh wakes a blocked write.
	sendNotifyCh chan struct{}

	// readDeadline bounds how long Read may block.
	readDeadline pipeDeadline

	// writeDeadline bounds how long write may block.
	writeDeadline pipeDeadline
}
// newStream builds a Stream with the given id and lifecycle state that sends
// its frames through session.
//
// The send window starts at initialStreamWindow. The receive buffer is created
// with initialWindow, while recvWindow is taken from the session's
// InitialStreamWindowSize setting; sendWindowUpdate later grows the buffer
// towards recvWindow. Both notification channels have a capacity of one.
func newStream(session *Session, id uint32, state streamState, initialWindow uint32, memorySpan MemoryManager) *Stream {
	return &Stream{
		id:         id,
		session:    session,
		state:      state,
		memorySpan: memorySpan,

		readDeadline:  makePipeDeadline(),
		writeDeadline: makePipeDeadline(),

		sendWindow: initialStreamWindow,
		recvWindow: session.config.InitialStreamWindowSize,
		recvBuf:    newSegmentedBuffer(initialWindow),
		epochStart: time.Now(),

		recvNotifyCh: make(chan struct{}, 1),
		sendNotifyCh: make(chan struct{}, 1),
	}
}
// Session returns the session this stream was created on.
func (s *Stream) Session() *Session {
	owner := s.session
	return owner
}
// StreamID returns the numeric identifier of the stream.
func (s *Stream) StreamID() uint32 {
	streamID := s.id
	return streamID
}
// Read copies buffered stream data into b.
//
// While the receive buffer is empty it blocks until new data or a state change
// is signalled, or until the read deadline fires (ErrTimeout). It returns
// io.EOF once the read half is closed and the buffer is drained, and the stored
// read error once the read half is reset. After a successful read a window
// update may be sent; an error from sending it is returned together with n.
func (s *Stream) Read(b []byte) (n int, err error) {
	for {
		// Take a consistent snapshot of the read half.
		s.stateLock.Lock()
		readState, readErr := s.readState, s.readErr
		s.stateLock.Unlock()

		switch readState {
		case halfReset:
			return 0, readErr
		case halfClosed:
			// Closed, but buffered data is still handed out first.
			if s.recvBuf.Len() == 0 {
				return 0, io.EOF
			}
		case halfOpen:
			// Nothing to do: fall through to the buffer check.
		default:
			panic("unknown state")
		}

		if s.recvBuf.Len() != 0 {
			break
		}

		// Nothing buffered: wait for a wake-up and re-check the state, or
		// give up when the deadline expires.
		select {
		case <-s.recvNotifyCh:
		case <-s.readDeadline.wait():
			return 0, ErrTimeout
		}
	}

	// The buffer's own error result is deliberately ignored, as before.
	n, _ = s.recvBuf.Read(b)

	err = s.sendWindowUpdate(s.readDeadline.wait())
	return n, err
}
// Write sends b on the stream, splitting it into as many frames as write
// needs. It returns the number of bytes accepted so far together with the
// first error encountered, or len(b) and nil when everything was sent.
func (s *Stream) Write(b []byte) (int, error) {
	written := 0
	for rest := b; len(rest) > 0; rest = b[written:] {
		n, err := s.write(rest)
		written += n
		if err != nil {
			return written, err
		}
	}
	return written, nil
}
// write sends at most one data frame taken from the front of b and returns
// how many bytes of b that frame carried.
//
// It fails with ErrStreamClosed when the write half is closed and with the
// stored write error when it is reset. While the send window is zero it blocks
// until woken through sendNotifyCh or until the write deadline fires
// (ErrTimeout).
func (s *Stream) write(b []byte) (n int, err error) {
	var window uint32
	for {
		// Take a consistent snapshot of the write half.
		s.stateLock.Lock()
		writeState, writeErr := s.writeState, s.writeErr
		s.stateLock.Unlock()

		switch writeState {
		case halfClosed:
			return 0, ErrStreamClosed
		case halfReset:
			return 0, writeErr
		case halfOpen:
			// Writable: go on to the window check.
		default:
			panic("unknown state")
		}

		window = atomic.LoadUint32(&s.sendWindow)
		if window != 0 {
			break
		}

		// No window left: wait for a wake-up and re-check the state, or
		// give up when the deadline expires.
		select {
		case <-s.sendNotifyCh:
		case <-s.writeDeadline.wait():
			return 0, ErrTimeout
		}
	}

	flags := s.sendFlags()

	// The frame is limited by the send window, by the largest payload that
	// fits into one message, and by what is left of b.
	size := min(window, s.session.config.MaxMessageSize-headerSize, uint32(len(b)))

	frameHdr := encode(typeData, flags, s.id, size)
	if err = s.session.sendMsg(frameHdr, b[:size], s.writeDeadline.wait(), true); err != nil {
		return 0, err
	}

	// Adding the two's complement of size subtracts it from the window.
	atomic.AddUint32(&s.sendWindow, ^uint32(size-1))
	return int(size), nil
}
// sendFlags returns the handshake flag the next outgoing frame has to carry
// and advances the stream state accordingly: SYN for a stream in streamInit,
// ACK for a stream in streamSYNReceived, and no flag in any other state.
func (s *Stream) sendFlags() uint16 {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	if s.state == streamInit {
		s.state = streamSYNSent
		return flagSYN
	}
	if s.state == streamSYNReceived {
		s.state = streamEstablished
		return flagACK
	}
	return 0
}
// sendWindowUpdate grows the receive buffer towards recvWindow and, when the
// buffer reports that an update is needed, sends a window update frame with
// the resulting delta. deadline is passed through to the session's sendMsg.
//
// If no handshake flag is pending, an RTT measurement is available, and less
// than four RTTs have passed since epochStart, the receive window is doubled
// first, capped at the configured MaxStreamWindowSize, provided the memory
// manager grants the additional bytes.
func (s *Stream) sendWindowUpdate(deadline <-chan struct{}) error {
	flags := s.sendFlags()

	// A pending handshake flag forces the update.
	needed, delta := s.recvBuf.GrowTo(s.recvWindow, flags != 0)
	if !needed {
		return nil
	}

	now := time.Now()
	rtt := s.session.getRTT()
	if flags == 0 && rtt > 0 && now.Sub(s.epochStart) < rtt*4 {
		// Above half the uint32 range doubling would overflow, so the
		// configured maximum is used directly in that case.
		target := s.session.config.MaxStreamWindowSize
		if s.recvWindow <= math.MaxUint32/2 {
			target = min(s.recvWindow*2, target)
		}
		if target > s.recvWindow {
			extra := target - s.recvWindow
			// A refused reservation simply leaves the window unchanged.
			if err := s.memorySpan.ReserveMemory(int(extra), 128); err == nil {
				s.recvWindow = target
				_, delta = s.recvBuf.GrowTo(s.recvWindow, true)
			}
		}
	}

	s.epochStart = now
	return s.session.sendMsg(encode(typeWindowUpdate, flags, s.id, delta), nil, deadline, true)
}
// sendClose sends a zero-length window update frame carrying FIN in addition
// to any pending handshake flag. The frame is sent without a deadline.
func (s *Stream) sendClose() error {
	flags := s.sendFlags() | flagFIN
	return s.session.sendMsg(encode(typeWindowUpdate, flags, s.id, 0), nil, nil, true)
}
// sendReset sends a window update frame carrying RST, with errCode stored in
// the header's length field. The frame is sent without a deadline.
func (s *Stream) sendReset(errCode uint32) error {
	return s.session.sendMsg(encode(typeWindowUpdate, flagRST, s.id, errCode), nil, nil, true)
}
// Reset aborts the stream in both directions using error code 0.
// It is shorthand for ResetWithError(0).
func (s *Stream) Reset() error {
	const errCode uint32 = 0
	return s.ResetWithError(errCode)
}
// ResetWithError aborts the stream in both directions.
//
// Every half that is still open is switched to halfReset and from then on
// reports a local StreamError carrying errCode. An RST frame is sent unless
// the stream is still in streamInit. Calling it on a finished stream is a
// no-op. It always returns nil; an error from sending the RST frame is
// discarded.
func (s *Stream) ResetWithError(errCode uint32) error {
	s.stateLock.Lock()

	var notifyPeer bool
	switch s.state {
	case streamSYNSent, streamSYNReceived, streamEstablished:
		notifyPeer = true
	case streamInit:
		// sendFlags has not handed out a SYN yet, so no RST is sent.
	case streamFinished:
		s.stateLock.Unlock()
		return nil
	default:
		panic("unhandled state")
	}

	// A half that is already closed or reset keeps its current state.
	if s.writeState == halfOpen {
		s.writeErr = &StreamError{Remote: false, ErrorCode: errCode}
		s.writeState = halfReset
	}
	if s.readState == halfOpen {
		s.readErr = &StreamError{Remote: false, ErrorCode: errCode}
		s.readState = halfReset
	}
	s.state = streamFinished
	s.notifyWaiting()
	s.stateLock.Unlock()

	if notifyPeer {
		// Best effort: the stream is torn down locally either way.
		_ = s.sendReset(errCode)
	}
	s.cleanup()
	return nil
}
// CloseWrite closes the write half of the stream and sends a FIN frame.
//
// It returns nil if the write half is already closed and the stored write
// error if it was reset. If the read half is no longer open either, the stream
// is marked finished and cleaned up after the FIN was sent. The returned error
// is the result of sending the FIN frame.
func (s *Stream) CloseWrite() error {
	s.stateLock.Lock()
	switch s.writeState {
	case halfReset:
		s.stateLock.Unlock()
		return s.writeErr
	case halfClosed:
		s.stateLock.Unlock()
		return nil
	case halfOpen:
		// Still writable: close it below.
	default:
		panic("invalid state")
	}

	s.writeState = halfClosed
	fullyClosed := s.readState != halfOpen
	if fullyClosed {
		s.state = streamFinished
	}
	s.stateLock.Unlock()

	// Wake blocked readers and writers so they observe the new state.
	s.notifyWaiting()

	sendErr := s.sendClose()
	if fullyClosed {
		s.cleanup()
	}
	return sendErr
}
// CloseRead shuts down the read half of the stream. Later reads fail with
// ErrStreamReset. It is a no-op when the read half is already closed or reset.
// If the write half is no longer open either, the stream is marked finished
// and cleaned up. It always returns nil.
func (s *Stream) CloseRead() error {
	s.stateLock.Lock()
	switch s.readState {
	case halfClosed, halfReset:
		s.stateLock.Unlock()
		return nil
	case halfOpen:
		// Still readable: reset it below.
	default:
		panic("invalid state")
	}

	s.readState = halfReset
	s.readErr = ErrStreamReset
	fullyClosed := s.writeState != halfOpen
	if fullyClosed {
		s.state = streamFinished
	}
	s.stateLock.Unlock()

	// Wake blocked readers and writers so they observe the new state.
	s.notifyWaiting()

	if fullyClosed {
		s.cleanup()
	}
	return nil
}
// Close shuts down both directions of the stream: the read half first, then
// the write half. The returned error is the one reported by CloseWrite.
func (s *Stream) Close() error {
	// The result of CloseRead is intentionally discarded.
	_ = s.CloseRead()

	closeErr := s.CloseWrite()
	return closeErr
}
// forceClose marks the stream as finished without sending anything. Each half
// that is still open is switched to halfReset and reports err from then on.
// Blocked readers and writers are woken, and both deadlines are set to the
// zero time.
func (s *Stream) forceClose(err error) {
	s.stateLock.Lock()
	if s.readState == halfOpen {
		s.readErr = err
		s.readState = halfReset
	}
	if s.writeState == halfOpen {
		s.writeErr = err
		s.writeState = halfReset
	}
	s.state = streamFinished
	s.notifyWaiting()
	s.stateLock.Unlock()

	var zeroTime time.Time
	s.readDeadline.set(zeroTime)
	s.writeDeadline.set(zeroTime)
}
// cleanup asks the session to close this stream's id and sets both deadlines
// to the zero time (presumably clearing any pending deadline; verify against
// pipeDeadline.set).
func (s *Stream) cleanup() {
	s.session.closeStream(s.id)

	var zeroTime time.Time
	s.readDeadline.set(zeroTime)
	s.writeDeadline.set(zeroTime)
}
// processFlags applies the ACK, FIN and RST flags of a received frame to the
// stream, in that order.
//
//   - ACK moves a stream in streamSYNSent to streamEstablished and reports the
//     stream to the session as established.
//   - FIN closes an open read half; if the write half is no longer open the
//     stream is finished.
//   - RST resets every half that is still open and finishes the stream. The
//     error is ErrStreamReset, or a remote StreamError built from the header
//     length when the frame is a window update.
//
// When the stream ends up finished, cleanup runs on return, at which point the
// state lock is not held.
func (s *Stream) processFlags(hdr header, flags uint16) {
	has := func(flag uint16) bool { return flags&flag == flag }

	finished := false
	defer func() {
		if finished {
			s.cleanup()
		}
	}()

	if has(flagACK) {
		s.stateLock.Lock()
		if s.state == streamSYNSent {
			s.state = streamEstablished
		}
		s.stateLock.Unlock()
		s.session.establishStream(s.id)
	}

	if has(flagFIN) {
		s.stateLock.Lock()
		wasOpen := s.readState == halfOpen
		if wasOpen {
			s.readState = halfClosed
			if s.writeState != halfOpen {
				// Neither direction is open any more.
				finished = true
				s.state = streamFinished
			}
		}
		s.stateLock.Unlock()

		// Only a real state change needs to wake anyone up.
		if wasOpen {
			s.notifyWaiting()
		}
	}

	if has(flagRST) {
		s.stateLock.Lock()
		var resetErr error = ErrStreamReset
		if hdr.MsgType() == typeWindowUpdate {
			// For window update frames the length field holds the error code.
			resetErr = &StreamError{Remote: true, ErrorCode: hdr.Length()}
		}
		if s.readState == halfOpen {
			s.readErr = resetErr
			s.readState = halfReset
		}
		if s.writeState == halfOpen {
			s.writeErr = resetErr
			s.writeState = halfReset
		}
		s.state = streamFinished
		s.stateLock.Unlock()

		finished = true
		s.notifyWaiting()
	}
}
// notifyWaiting signals the receive notification channel and then the send
// notification channel through asyncNotify.
func (s *Stream) notifyWaiting() {
	for _, ch := range [...]chan struct{}{s.recvNotifyCh, s.sendNotifyCh} {
		asyncNotify(ch)
	}
}
// incrSendWindow handles a received frame that enlarges the send window: it
// applies the frame's flags, adds the header length to sendWindow, and signals
// sendNotifyCh.
//
// NOTE(review): processFlags interprets the length of an RST window update as
// an error code, yet the same value is still added to sendWindow here. Writes
// on a reset stream fail before the window is consulted, so this looks
// harmless; confirm.
func (s *Stream) incrSendWindow(hdr header, flags uint16) {
	s.processFlags(hdr, flags)

	delta := hdr.Length()
	atomic.AddUint32(&s.sendWindow, delta)
	asyncNotify(s.sendNotifyCh)
}
// readData handles a received data frame: it applies the frame's flags and,
// if the header announces a non-zero length, appends that many bytes from conn
// to the receive buffer and signals recvNotifyCh. A failure to append is
// logged and returned.
func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
	s.processFlags(hdr, flags)

	size := hdr.Length()
	if size == 0 {
		// A frame that only carries flags has no payload to read.
		return nil
	}

	err := s.recvBuf.Append(conn, size)
	if err != nil {
		s.session.logger.Printf("[ERR] yamux: Failed to read stream data on stream %d: %v", s.id, err)
		return err
	}

	asyncNotify(s.recvNotifyCh)
	return nil
}
// SetDeadline applies t as the read deadline and then as the write deadline.
// The write deadline is left untouched if setting the read deadline fails.
func (s *Stream) SetDeadline(t time.Time) error {
	err := s.SetReadDeadline(t)
	if err == nil {
		err = s.SetWriteDeadline(t)
	}
	return err
}
// SetReadDeadline sets the deadline for blocked reads to t. The call has no
// effect when the read half is not open. It always returns nil.
func (s *Stream) SetReadDeadline(t time.Time) error {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	if s.readState != halfOpen {
		return nil
	}
	s.readDeadline.set(t)
	return nil
}
// SetWriteDeadline sets the deadline for blocked writes to t. The call has no
// effect when the write half is not open. It always returns nil.
func (s *Stream) SetWriteDeadline(t time.Time) error {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	if s.writeState != halfOpen {
		return nil
	}
	s.writeDeadline.set(t)
	return nil
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.