package rcmgr
import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/network"
)
type trace struct {
path string
ctx context .Context
cancel func ()
wg sync .WaitGroup
mx sync .Mutex
done bool
pendingWrites []interface {}
reporters []TraceReporter
}
type TraceReporter interface {
ConsumeEvent (TraceEvt )
}
func WithTrace (path string ) Option {
return func (r *resourceManager ) error {
if r .trace == nil {
r .trace = &trace {path : path }
} else {
r .trace .path = path
}
return nil
}
}
func WithTraceReporter (reporter TraceReporter ) Option {
return func (r *resourceManager ) error {
if r .trace == nil {
r .trace = &trace {}
}
r .trace .reporters = append (r .trace .reporters , reporter )
return nil
}
}
type TraceEvtTyp string
const (
TraceStartEvt TraceEvtTyp = "start"
TraceCreateScopeEvt TraceEvtTyp = "create_scope"
TraceDestroyScopeEvt TraceEvtTyp = "destroy_scope"
TraceReserveMemoryEvt TraceEvtTyp = "reserve_memory"
TraceBlockReserveMemoryEvt TraceEvtTyp = "block_reserve_memory"
TraceReleaseMemoryEvt TraceEvtTyp = "release_memory"
TraceAddStreamEvt TraceEvtTyp = "add_stream"
TraceBlockAddStreamEvt TraceEvtTyp = "block_add_stream"
TraceRemoveStreamEvt TraceEvtTyp = "remove_stream"
TraceAddConnEvt TraceEvtTyp = "add_conn"
TraceBlockAddConnEvt TraceEvtTyp = "block_add_conn"
TraceRemoveConnEvt TraceEvtTyp = "remove_conn"
)
type scopeClass struct {
name string
}
func (s scopeClass ) MarshalJSON () ([]byte , error ) {
name := s .name
var span string
if idx := strings .Index (name , "span:" ); idx > -1 {
name = name [:idx -1 ]
span = name [idx +5 :]
}
if name == "system" || name == "transient" || name == "allowlistedSystem" || name == "allowlistedTransient" {
return json .Marshal (struct {
Class string
Span string `json:",omitempty"`
}{
Class : name ,
Span : span ,
})
}
if strings .HasPrefix (name , "conn-" ) {
return json .Marshal (struct {
Class string
Conn string
Span string `json:",omitempty"`
}{
Class : "conn" ,
Conn : name [5 :],
Span : span ,
})
}
if strings .HasPrefix (name , "stream-" ) {
return json .Marshal (struct {
Class string
Stream string
Span string `json:",omitempty"`
}{
Class : "stream" ,
Stream : name [7 :],
Span : span ,
})
}
if strings .HasPrefix (name , "peer:" ) {
return json .Marshal (struct {
Class string
Peer string
Span string `json:",omitempty"`
}{
Class : "peer" ,
Peer : name [5 :],
Span : span ,
})
}
if strings .HasPrefix (name , "service:" ) {
if idx := strings .Index (name , "peer:" ); idx > 0 {
return json .Marshal (struct {
Class string
Service string
Peer string
Span string `json:",omitempty"`
}{
Class : "service-peer" ,
Service : name [8 : idx -1 ],
Peer : name [idx +5 :],
Span : span ,
})
} else {
return json .Marshal (struct {
Class string
Service string
Span string `json:",omitempty"`
}{
Class : "service" ,
Service : name [8 :],
Span : span ,
})
}
}
if strings .HasPrefix (name , "protocol:" ) {
if idx := strings .Index (name , "peer:" ); idx > -1 {
return json .Marshal (struct {
Class string
Protocol string
Peer string
Span string `json:",omitempty"`
}{
Class : "protocol-peer" ,
Protocol : name [9 : idx -1 ],
Peer : name [idx +5 :],
Span : span ,
})
} else {
return json .Marshal (struct {
Class string
Protocol string
Span string `json:",omitempty"`
}{
Class : "protocol" ,
Protocol : name [9 :],
Span : span ,
})
}
}
return nil , fmt .Errorf ("unrecognized scope: %s" , name )
}
type TraceEvt struct {
Time string
Type TraceEvtTyp
Scope *scopeClass `json:",omitempty"`
Name string `json:",omitempty"`
Limit interface {} `json:",omitempty"`
Priority uint8 `json:",omitempty"`
Delta int64 `json:",omitempty"`
DeltaIn int `json:",omitempty"`
DeltaOut int `json:",omitempty"`
Memory int64 `json:",omitempty"`
StreamsIn int `json:",omitempty"`
StreamsOut int `json:",omitempty"`
ConnsIn int `json:",omitempty"`
ConnsOut int `json:",omitempty"`
FD int `json:",omitempty"`
}
func (t *trace ) push (evt TraceEvt ) {
t .mx .Lock ()
defer t .mx .Unlock ()
if t .done {
return
}
evt .Time = time .Now ().Format (time .RFC3339Nano )
if evt .Name != "" {
evt .Scope = &scopeClass {name : evt .Name }
}
for _ , reporter := range t .reporters {
reporter .ConsumeEvent (evt )
}
if t .path != "" {
t .pendingWrites = append (t .pendingWrites , evt )
}
}
func (t *trace ) backgroundWriter (out io .WriteCloser ) {
defer t .wg .Done ()
defer out .Close ()
gzOut := gzip .NewWriter (out )
defer gzOut .Close ()
jsonOut := json .NewEncoder (gzOut )
ticker := time .NewTicker (time .Second )
defer ticker .Stop ()
var pend []interface {}
getEvents := func () {
t .mx .Lock ()
tmp := t .pendingWrites
t .pendingWrites = pend [:0 ]
pend = tmp
t .mx .Unlock ()
}
for {
select {
case <- ticker .C :
getEvents ()
if len (pend ) == 0 {
continue
}
if err := t .writeEvents (pend , jsonOut ); err != nil {
log .Warnf ("error writing rcmgr trace: %s" , err )
t .mx .Lock ()
t .done = true
t .mx .Unlock ()
return
}
if err := gzOut .Flush (); err != nil {
log .Warnf ("error flushing rcmgr trace: %s" , err )
t .mx .Lock ()
t .done = true
t .mx .Unlock ()
return
}
case <- t .ctx .Done ():
getEvents ()
if len (pend ) == 0 {
return
}
if err := t .writeEvents (pend , jsonOut ); err != nil {
log .Warnf ("error writing rcmgr trace: %s" , err )
return
}
if err := gzOut .Flush (); err != nil {
log .Warnf ("error flushing rcmgr trace: %s" , err )
}
return
}
}
}
func (t *trace ) writeEvents (pend []interface {}, jout *json .Encoder ) error {
for _ , e := range pend {
if err := jout .Encode (e ); err != nil {
return err
}
}
return nil
}
func (t *trace ) Start (limits Limiter ) error {
if t == nil {
return nil
}
t .ctx , t .cancel = context .WithCancel (context .Background ())
if t .path != "" {
out , err := os .OpenFile (t .path , os .O_CREATE |os .O_WRONLY |os .O_TRUNC , 0644 )
if err != nil {
return nil
}
t .wg .Add (1 )
go t .backgroundWriter (out )
}
t .push (TraceEvt {
Type : TraceStartEvt ,
Limit : limits ,
})
return nil
}
func (t *trace ) Close () error {
if t == nil {
return nil
}
t .mx .Lock ()
if t .done {
t .mx .Unlock ()
return nil
}
t .cancel ()
t .done = true
t .mx .Unlock ()
t .wg .Wait ()
return nil
}
func (t *trace ) CreateScope (scope string , limit Limit ) {
if t == nil {
return
}
t .push (TraceEvt {
Type : TraceCreateScopeEvt ,
Name : scope ,
Limit : limit ,
})
}
func (t *trace ) DestroyScope (scope string ) {
if t == nil {
return
}
t .push (TraceEvt {
Type : TraceDestroyScopeEvt ,
Name : scope ,
})
}
func (t *trace ) ReserveMemory (scope string , prio uint8 , size , mem int64 ) {
if t == nil {
return
}
if size == 0 {
return
}
t .push (TraceEvt {
Type : TraceReserveMemoryEvt ,
Name : scope ,
Priority : prio ,
Delta : size ,
Memory : mem ,
})
}
func (t *trace ) BlockReserveMemory (scope string , prio uint8 , size , mem int64 ) {
if t == nil {
return
}
if size == 0 {
return
}
t .push (TraceEvt {
Type : TraceBlockReserveMemoryEvt ,
Name : scope ,
Priority : prio ,
Delta : size ,
Memory : mem ,
})
}
func (t *trace ) ReleaseMemory (scope string , size , mem int64 ) {
if t == nil {
return
}
if size == 0 {
return
}
t .push (TraceEvt {
Type : TraceReleaseMemoryEvt ,
Name : scope ,
Delta : -size ,
Memory : mem ,
})
}
func (t *trace ) AddStream (scope string , dir network .Direction , nstreamsIn , nstreamsOut int ) {
if t == nil {
return
}
var deltaIn , deltaOut int
if dir == network .DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
t .push (TraceEvt {
Type : TraceAddStreamEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
StreamsIn : nstreamsIn ,
StreamsOut : nstreamsOut ,
})
}
func (t *trace ) BlockAddStream (scope string , dir network .Direction , nstreamsIn , nstreamsOut int ) {
if t == nil {
return
}
var deltaIn , deltaOut int
if dir == network .DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
t .push (TraceEvt {
Type : TraceBlockAddStreamEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
StreamsIn : nstreamsIn ,
StreamsOut : nstreamsOut ,
})
}
func (t *trace ) RemoveStream (scope string , dir network .Direction , nstreamsIn , nstreamsOut int ) {
if t == nil {
return
}
var deltaIn , deltaOut int
if dir == network .DirInbound {
deltaIn = -1
} else {
deltaOut = -1
}
t .push (TraceEvt {
Type : TraceRemoveStreamEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
StreamsIn : nstreamsIn ,
StreamsOut : nstreamsOut ,
})
}
func (t *trace ) AddStreams (scope string , deltaIn , deltaOut , nstreamsIn , nstreamsOut int ) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 {
return
}
t .push (TraceEvt {
Type : TraceAddStreamEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
StreamsIn : nstreamsIn ,
StreamsOut : nstreamsOut ,
})
}
func (t *trace ) BlockAddStreams (scope string , deltaIn , deltaOut , nstreamsIn , nstreamsOut int ) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 {
return
}
t .push (TraceEvt {
Type : TraceBlockAddStreamEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
StreamsIn : nstreamsIn ,
StreamsOut : nstreamsOut ,
})
}
func (t *trace ) RemoveStreams (scope string , deltaIn , deltaOut , nstreamsIn , nstreamsOut int ) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 {
return
}
t .push (TraceEvt {
Type : TraceRemoveStreamEvt ,
Name : scope ,
DeltaIn : -deltaIn ,
DeltaOut : -deltaOut ,
StreamsIn : nstreamsIn ,
StreamsOut : nstreamsOut ,
})
}
func (t *trace ) AddConn (scope string , dir network .Direction , usefd bool , nconnsIn , nconnsOut , nfd int ) {
if t == nil {
return
}
var deltaIn , deltaOut , deltafd int
if dir == network .DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
if usefd {
deltafd = 1
}
t .push (TraceEvt {
Type : TraceAddConnEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
Delta : int64 (deltafd ),
ConnsIn : nconnsIn ,
ConnsOut : nconnsOut ,
FD : nfd ,
})
}
func (t *trace ) BlockAddConn (scope string , dir network .Direction , usefd bool , nconnsIn , nconnsOut , nfd int ) {
if t == nil {
return
}
var deltaIn , deltaOut , deltafd int
if dir == network .DirInbound {
deltaIn = 1
} else {
deltaOut = 1
}
if usefd {
deltafd = 1
}
t .push (TraceEvt {
Type : TraceBlockAddConnEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
Delta : int64 (deltafd ),
ConnsIn : nconnsIn ,
ConnsOut : nconnsOut ,
FD : nfd ,
})
}
func (t *trace ) RemoveConn (scope string , dir network .Direction , usefd bool , nconnsIn , nconnsOut , nfd int ) {
if t == nil {
return
}
var deltaIn , deltaOut , deltafd int
if dir == network .DirInbound {
deltaIn = -1
} else {
deltaOut = -1
}
if usefd {
deltafd = -1
}
t .push (TraceEvt {
Type : TraceRemoveConnEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
Delta : int64 (deltafd ),
ConnsIn : nconnsIn ,
ConnsOut : nconnsOut ,
FD : nfd ,
})
}
func (t *trace ) AddConns (scope string , deltaIn , deltaOut , deltafd , nconnsIn , nconnsOut , nfd int ) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
return
}
t .push (TraceEvt {
Type : TraceAddConnEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
Delta : int64 (deltafd ),
ConnsIn : nconnsIn ,
ConnsOut : nconnsOut ,
FD : nfd ,
})
}
func (t *trace ) BlockAddConns (scope string , deltaIn , deltaOut , deltafd , nconnsIn , nconnsOut , nfd int ) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
return
}
t .push (TraceEvt {
Type : TraceBlockAddConnEvt ,
Name : scope ,
DeltaIn : deltaIn ,
DeltaOut : deltaOut ,
Delta : int64 (deltafd ),
ConnsIn : nconnsIn ,
ConnsOut : nconnsOut ,
FD : nfd ,
})
}
func (t *trace ) RemoveConns (scope string , deltaIn , deltaOut , deltafd , nconnsIn , nconnsOut , nfd int ) {
if t == nil {
return
}
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
return
}
t .push (TraceEvt {
Type : TraceRemoveConnEvt ,
Name : scope ,
DeltaIn : -deltaIn ,
DeltaOut : -deltaOut ,
Delta : -int64 (deltafd ),
ConnsIn : nconnsIn ,
ConnsOut : nconnsOut ,
FD : nfd ,
})
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .