package rpc
import (
"context"
"net"
"os"
"strconv"
"sync/atomic"
"github.com/soheilhy/cmux"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)
// MuxNewServerFn is an optional factory used by Mux to build the Server which
// will handle a newly accepted connection. It receives the connection's
// sequence number (num) and the accepted connection itself (conn). When it
// returns an error, the mux closes the connection and keeps accepting.
type MuxNewServerFn func (num int , conn net .Conn ) (*Server , error )
// ssM is a shorthand for states.MuxStates, used to reference the mux's state
// names throughout this file.
var ssM = states .MuxStates
// Mux listens on a single address and starts a separate RPC Server for every
// accepted connection. Its lifecycle is driven by the state machine in Mach.
type Mux struct {
// Embedded handler which ExceptionState delegates to.
*am .ExceptionHandler
// Mach is the mux's own state machine, created by NewMux.
Mach *am .Machine
// Source is passed to NewServer for each connection when NewServerFn is nil.
Source am .Api
// NewServerFn, when set, creates the Server for each accepted connection
// instead of the default NewServer call.
NewServerFn MuxNewServerFn
// Args is forwarded to ServerOpts.Args of default-created servers.
Args any
// ArgsPrefix is forwarded to ServerOpts.ArgsPrefix of default-created servers.
ArgsPrefix string
// Name is used in the machine ID ("rm-" + Name) and as the prefix of
// default-created server names.
Name string
// Addr is the address to listen on. After a successful start it is
// overwritten with the listener's actual address.
Addr string
// Listener may be preset by the caller; otherwise StartState creates a tcp4
// listener on Addr. StartEnd closes it and resets it to nil.
Listener net .Listener
// LogEnabled gates the mux's own log messages (see the log method).
LogEnabled bool
// NewServerErr holds the error stored by NewServerErrState.
NewServerErr error
// clients is only read (by HasClientsEnd) in the code visible here.
clients []net .Conn
// cmux is the connection multiplexer wrapped around Listener.
cmux cmux .CMux
// connCount numbers accepted connections, starting at 0.
connCount atomic .Int64
}
// NewMux builds a Mux called name together with its state machine (ID
// "rm-"+name, tagged "rpc-mux"). newServerFn is optional and, when set, is used
// to create the per-connection servers. opts may be nil, in which case default
// options are used. Logging is enabled when the EnvAmRpcLogMux environment
// variable is non-empty, and machine debugging is set up when EnvAmRpcDbg is
// non-empty. An error is returned only when the machine cannot be created.
func NewMux(
	ctx context.Context, name string, newServerFn MuxNewServerFn, opts *MuxOpts,
) (*Mux, error) {
	// Treat missing options as all-defaults.
	if opts == nil {
		opts = new(MuxOpts)
	}

	mux := &Mux{
		Name:        name,
		NewServerFn: newServerFn,
		Args:        opts.Args,
		ArgsPrefix:  opts.ArgsPrefix,
	}
	mux.LogEnabled = os.Getenv(EnvAmRpcLogMux) != ""

	// The mux itself is bound as the handlers struct of the new machine.
	machOpts := &am.Opts{Tags: []string{"rpc-mux"}}
	mach, err := am.NewCommon(ctx, "rm-"+name, states.MuxSchema, ssM.Names(),
		mux, opts.Parent, machOpts)
	if err != nil {
		return nil, err
	}

	mach.SemLogger().SetArgsMapper(LogArgs)
	// NOTE(review): MuxGroups is paired with ssC here while the rest of this
	// file uses ssM for the mux's states - confirm this is intended.
	mach.SetGroups(states.MuxGroups, ssC)
	mux.Mach = mach

	// Debugging is best-effort, so a setup failure is deliberately ignored.
	if dbg := os.Getenv(EnvAmRpcDbg); dbg != "" {
		_ = amhelp.MachDebugEnv(mach)
	}

	return mux, nil
}
// ExceptionState is the handler for the Exception state; it forwards the event
// to the embedded am.ExceptionHandler.
func (m *Mux) ExceptionState(e *am.Event) {
	handler := m.ExceptionHandler
	handler.ExceptionState(e)
}
// NewServerErrEnter accepts the NewServerErr state only when the event's
// arguments parse successfully and carry a non-nil error.
func (m *Mux) NewServerErrEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}

	return args.Err != nil
}
// NewServerErrState stores the error carried by the event's arguments in
// m.NewServerErr. NewServerErrEnter guards that the arguments and the error
// are present.
func (m *Mux) NewServerErrState(e *am.Event) {
	m.NewServerErr = ParseArgs(e.Args).Err
}
// StartEnter allows the Start state only when there is a way to create
// servers: either a custom NewServerFn or a Source for the default servers.
func (m *Mux) StartEnter(e *am.Event) bool {
	if m.NewServerFn != nil {
		return true
	}

	return m.Source != nil
}
// StartState brings the mux up in a background goroutine: it creates a tcp4
// listener on m.Addr unless m.Listener is already set, wraps the listener in a
// cmux, records the effective address in m.Addr, starts the accept loop and
// then serves. A listen or serve failure is reported to the machine and the
// Start state is removed.
func (m *Mux) StartState(e *am.Event) {
	var (
		mach     = m.Mach
		stateCtx = mach.NewStateCtx(ssM.Start)
		listenOn = m.Addr
	)

	serve := func() {
		// The state may have been left before this goroutine got scheduled.
		if stateCtx.Err() != nil {
			return
		}

		// Create a listener only when the caller did not provide one.
		if m.Listener == nil {
			var lc net.ListenConfig
			ln, err := lc.Listen(stateCtx, "tcp4", listenOn)
			if err != nil {
				AddErrNetwork(e, mach, err)
				mach.EvRemove1(e, ssM.Start, nil)
				return
			}
			m.Listener = ln
		}

		m.cmux = cmux.New(m.Listener)
		m.Addr = m.Listener.Addr().String()
		m.log("mux started on %s", m.Addr)

		// A single catch-all matcher feeds every connection to the accept loop.
		go m.accept(m.cmux.Match(cmux.Any()))

		// Serve blocks until the multiplexer stops.
		err := m.cmux.Serve()
		if err == nil {
			return
		}
		mach.AddErr(err, nil)
		mach.EvRemove1(e, ssM.Start, nil)
	}

	go serve()
}
// StartEnd runs when the Start state is left; it closes the listener, if there
// is one, and clears the field.
func (m *Mux) StartEnd(e *am.Event) {
	lis := m.Listener
	if lis == nil {
		return
	}

	// The close error is deliberately ignored, the mux is going down anyway.
	_ = lis.Close()
	m.Listener = nil
}
// ClientConnectedState immediately requests the removal of the
// ClientConnected state, so the state only marks the moment a client connects.
func (m *Mux) ClientConnectedState(e *am.Event) {
	mach := m.Mach
	mach.EvRemove1(e, ssM.ClientConnected, nil)
}
// HasClientsEnd reports whether the list of tracked clients is empty.
func (m *Mux) HasClientsEnd(e *am.Event) bool {
	if len(m.clients) > 0 {
		return false
	}

	return true
}
// HealthcheckState is the handler for the Healthcheck state. It currently does
// nothing.
func (m *Mux ) HealthcheckState (e *am .Event ) {
}
// accept runs the accept loop on listener l. It first requests the Ready state
// with the listener's address, and then, for every accepted connection:
// signals ClientConnected, assigns the connection a sequence number, creates a
// Server for it (via NewServerFn when set, otherwise via NewServer with
// m.Source), hands the connection to the server and starts it. Each server is
// stopped and disposed once its client disconnects.
//
// The loop exits once accepting fails after the Start state has been left or
// the machine's context is done. Other accept errors are reported to the
// machine and accepting continues. Panics are converted into machine errors.
func (m *Mux) accept(l net.Listener) {
	mach := m.Mach
	defer mach.PanicToErr(nil)

	// Bound to the Start state, which lets the loop tell a shutdown apart from
	// a transient accept failure.
	startCtx := mach.NewStateCtx(ssM.Start)

	go mach.Add1(ssM.Ready, Pass(&A{
		Addr: l.Addr().String(),
	}))

	for {
		conn, err := l.Accept()
		if err != nil {
			// A closed listener fails every Accept call. Without this exit
			// the loop would spin forever, flooding the machine with errors.
			if startCtx.Err() != nil || mach.Ctx().Err() != nil {
				return
			}
			mach.AddErr(err, nil)
			continue
		}

		mach.Add1(ssM.ClientConnected, Pass(&A{
			Addr: conn.RemoteAddr().String(),
		}))

		// Sequence number of this connection (the counter's previous value).
		num := m.connCount.Add(1) - 1

		var server *Server
		if m.NewServerFn == nil {
			server, err = NewServer(mach.Ctx(), ":0",
				m.Name+"-"+strconv.Itoa(int(num)), m.Source, &ServerOpts{
					Parent:     mach,
					Args:       m.Args,
					ArgsPrefix: m.ArgsPrefix,
				})
		} else {
			server, err = m.NewServerFn(int(num), conn)
		}
		if err != nil {
			// The connection is useless without a server to handle it.
			_ = conn.Close()
			mach.Log("failed to create a new server: %s", err)
			continue
		}

		// The server works on the already accepted connection.
		server.Conn = conn
		server.Start()

		// Dispose the server after its client has connected and disconnected,
		// or as soon as the mux leaves the Start state.
		go func() {
			muxCtx := mach.NewStateCtx(ssM.Start)
			<-server.Mach.When1(ssS.ClientConnected, muxCtx)
			<-server.Mach.WhenNot1(ssS.ClientConnected, muxCtx)
			server.Stop(true)
		}()
	}
}
// Start requests the Start state on the mux's machine and returns the result
// of that mutation.
func (m *Mux) Start() am.Result {
	res := m.Mach.Add1(ssM.Start, nil)

	return res
}
// Stop requests the removal of the Start state and returns the result of that
// mutation. When dispose is true, the mux's machine is disposed afterwards.
func (m *Mux) Stop(dispose bool) am.Result {
	result := m.Mach.Remove1(ssM.Start, nil)
	if !dispose {
		return result
	}

	m.Mach.Dispose()

	return result
}
// log writes a formatted message to the machine's log, but only when
// LogEnabled is set.
func (m *Mux) log(msg string, args ...any) {
	if m.LogEnabled {
		m.Mach.Log(msg, args...)
	}
}
// MuxOpts carries the optional settings accepted by NewMux.
type MuxOpts struct {
// Parent is passed to am.NewCommon as the parent of the mux's machine.
Parent am .Api
// Args is copied to Mux.Args.
Args any
// ArgsPrefix is copied to Mux.ArgsPrefix.
ArgsPrefix string
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.