package swarm
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
)
// Compile-time check that *Stream satisfies network.Stream.
var _ network.Stream = (*Stream)(nil)
// Stream is the swarm's stream type. It wraps a network.MuxedStream and
// associates it with the Conn it belongs to.
type Stream struct {
// id is the numeric part of the string returned by ID().
id uint64
// stream is the underlying muxed stream; Read, Write, Close, Reset and the
// deadline setters all delegate to it.
stream network .MuxedStream
// conn is the connection this stream belongs to.
conn *Conn
// scope is returned by Scope() and is informed of protocol changes in
// SetProtocol().
scope network .StreamManagementScope
// closeMx guards isClosed and acceptStreamGoroutineCompleted.
closeMx sync .Mutex
// isClosed is set once by closeAndRemoveStream().
isClosed bool
// acceptStreamGoroutineCompleted indicates whether the accept-stream goroutine
// has completed; set by completeAcceptStreamGoroutine(). The stream is removed
// from conn only after both this flag and isClosed are true.
acceptStreamGoroutineCompleted bool
// protocol holds the protocol ID set via SetProtocol(); nil until then.
protocol atomic .Pointer [protocol .ID ]
// stat is the value returned by Stat().
stat network .Stats
}
// ID returns the stream's identifier: the ID of the owning connection,
// a dash, and the stream's numeric id.
func (s *Stream) ID() string {
	connID := s.conn.ID()
	return fmt.Sprintf("%s-%d", connID, s.id)
}
// String returns a human-readable description of the stream, built from the
// transport of the underlying connection and the local and remote
// multiaddr/peer pairs of the owning Conn.
func (s *Stream) String() string {
	c := s.conn
	return fmt.Sprintf(
		"<swarm.Stream[%s] %s (%s) <-> %s (%s)>",
		c.conn.Transport(),
		c.LocalMultiaddr(),
		c.LocalPeer(),
		c.RemoteMultiaddr(),
		c.RemotePeer(),
	)
}
// Conn returns the connection this stream belongs to.
func (s *Stream) Conn() network.Conn {
	var c network.Conn = s.conn
	return c
}
// Read reads from the underlying muxed stream into p and returns its result
// unchanged.
//
// When the swarm's bwc is non-nil, the number of bytes read is reported to it
// via LogRecvMessage and LogRecvMessageStream (tagged with the stream's
// protocol and the remote peer). The report is made regardless of whether the
// read returned an error.
func (s *Stream) Read(p []byte) (int, error) {
	n, err := s.stream.Read(p)
	if bwc := s.conn.swarm.bwc; bwc != nil {
		size := int64(n)
		bwc.LogRecvMessage(size)
		bwc.LogRecvMessageStream(size, s.Protocol(), s.conn.RemotePeer())
	}
	return n, err
}
// Write writes p to the underlying muxed stream and returns its result
// unchanged.
//
// When the swarm's bwc is non-nil, the number of bytes written is reported to
// it via LogSentMessage and LogSentMessageStream (tagged with the stream's
// protocol and the remote peer). The report is made regardless of whether the
// write returned an error.
func (s *Stream) Write(p []byte) (int, error) {
	n, err := s.stream.Write(p)
	if bwc := s.conn.swarm.bwc; bwc != nil {
		size := int64(n)
		bwc.LogSentMessage(size)
		bwc.LogSentMessageStream(size, s.Protocol(), s.conn.RemotePeer())
	}
	return n, err
}
// Close closes the underlying muxed stream and then runs the swarm-side
// close bookkeeping (closeAndRemoveStream). The bookkeeping runs even when
// the underlying Close fails; that error is returned to the caller.
func (s *Stream) Close() error {
	closeErr := s.stream.Close()
	s.closeAndRemoveStream()
	return closeErr
}
// Reset resets the underlying muxed stream and then runs the swarm-side
// close bookkeeping (closeAndRemoveStream). The bookkeeping runs even when
// the underlying Reset fails; that error is returned to the caller.
func (s *Stream) Reset() error {
	resetErr := s.stream.Reset()
	s.closeAndRemoveStream()
	return resetErr
}
// ResetWithError resets the underlying muxed stream, passing errCode through
// to it, and then runs the swarm-side close bookkeeping
// (closeAndRemoveStream). The bookkeeping runs even when the underlying reset
// fails; that error is returned to the caller.
func (s *Stream) ResetWithError(errCode network.StreamErrorCode) error {
	resetErr := s.stream.ResetWithError(errCode)
	s.closeAndRemoveStream()
	return resetErr
}
// closeAndRemoveStream performs the one-time close bookkeeping for the
// stream. The first call marks the stream closed and calls Done on the
// swarm's refs; later calls do nothing.
//
// The stream is removed from its connection here only if the accept-stream
// goroutine has already completed. Otherwise removal is left to
// completeAcceptStreamGoroutine, which checks isClosed under the same mutex.
func (s *Stream) closeAndRemoveStream() {
	s.closeMx.Lock()
	defer s.closeMx.Unlock()

	// Already closed: the bookkeeping below must run at most once.
	if s.isClosed {
		return
	}

	s.isClosed = true
	s.conn.swarm.refs.Done()

	if !s.acceptStreamGoroutineCompleted {
		return
	}
	s.conn.removeStream(s)
}
// CloseWrite delegates to CloseWrite on the underlying muxed stream.
// It does not run the swarm-side close bookkeeping.
func (s *Stream) CloseWrite() error {
	err := s.stream.CloseWrite()
	return err
}
// CloseRead delegates to CloseRead on the underlying muxed stream.
// It does not run the swarm-side close bookkeeping.
func (s *Stream) CloseRead() error {
	err := s.stream.CloseRead()
	return err
}
// completeAcceptStreamGoroutine records that the accept-stream goroutine has
// completed. The first call sets the flag; later calls do nothing.
//
// If the stream was already closed by the time the flag is set, the stream is
// removed from its connection here, because closeAndRemoveStream skips the
// removal while the flag is still false.
func (s *Stream) completeAcceptStreamGoroutine() {
	s.closeMx.Lock()
	defer s.closeMx.Unlock()

	// Already recorded: nothing more to do.
	if s.acceptStreamGoroutineCompleted {
		return
	}

	s.acceptStreamGoroutineCompleted = true

	if !s.isClosed {
		return
	}
	s.conn.removeStream(s)
}
// Protocol returns the protocol ID stored by SetProtocol, or the empty
// protocol ID if none has been stored yet.
func (s *Stream) Protocol() protocol.ID {
	if id := s.protocol.Load(); id != nil {
		return *id
	}
	return ""
}
// SetProtocol sets the protocol for this stream. The stream's scope is
// informed first; if it returns an error, that error is returned and the
// stored protocol is left unchanged. Otherwise p is stored and nil is
// returned.
func (s *Stream) SetProtocol(p protocol.ID) error {
	err := s.scope.SetProtocol(p)
	if err != nil {
		return err
	}

	s.protocol.Store(&p)
	return nil
}
// SetDeadline delegates to SetDeadline on the underlying muxed stream.
func (s *Stream) SetDeadline(t time.Time) error {
	err := s.stream.SetDeadline(t)
	return err
}
// SetReadDeadline delegates to SetReadDeadline on the underlying muxed stream.
func (s *Stream) SetReadDeadline(t time.Time) error {
	err := s.stream.SetReadDeadline(t)
	return err
}
// SetWriteDeadline delegates to SetWriteDeadline on the underlying muxed
// stream.
func (s *Stream) SetWriteDeadline(t time.Time) error {
	err := s.stream.SetWriteDeadline(t)
	return err
}
// Stat returns the value of the stream's stat field.
func (s *Stream) Stat() network.Stats {
	stats := s.stat
	return stats
}
// Scope returns the stream's scope, exposed as a network.StreamScope.
func (s *Stream) Scope() network.StreamScope {
	var scope network.StreamScope = s.scope
	return scope
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.