package gojay
import (
"strconv"
"sync"
"time"
)
type MarshalerStream interface {
MarshalStream (enc *StreamEncoder )
}
type StreamEncoder struct {
mux *sync .RWMutex
*Encoder
nConsumer int
delimiter byte
deadline *time .Time
done chan struct {}
}
func (s *StreamEncoder ) EncodeStream (m MarshalerStream ) {
if s .nConsumer == 1 {
go consume (s , s , m )
return
}
go consume (s , s , m )
for i := 1 ; i < s .nConsumer ; i ++ {
s .mux .RLock ()
select {
case <- s .done :
default :
ss := Stream .borrowEncoder (s .w )
ss .mux .Lock ()
ss .done = s .done
ss .buf = make ([]byte , 0 , 512 )
ss .delimiter = s .delimiter
go consume (s , ss , m )
ss .mux .Unlock ()
}
s .mux .RUnlock ()
}
return
}
func (s *StreamEncoder ) LineDelimited () *StreamEncoder {
s .delimiter = '\n'
return s
}
func (s *StreamEncoder ) CommaDelimited () *StreamEncoder {
s .delimiter = ','
return s
}
func (s *StreamEncoder ) NConsumer (n int ) *StreamEncoder {
s .nConsumer = n
return s
}
func (s *StreamEncoder ) Release () {
s .isPooled = 1
streamEncPool .Put (s )
}
func (s *StreamEncoder ) Done () <-chan struct {} {
return s .done
}
func (s *StreamEncoder ) Err () error {
return s .err
}
func (s *StreamEncoder ) Deadline () (time .Time , bool ) {
if s .deadline != nil {
return *s .deadline , true
}
return time .Time {}, false
}
func (s *StreamEncoder ) SetDeadline (t time .Time ) {
s .deadline = &t
}
func (s *StreamEncoder ) Value (key interface {}) interface {} {
return nil
}
func (s *StreamEncoder ) Cancel (err error ) {
s .mux .Lock ()
defer s .mux .Unlock ()
select {
case <- s .done :
default :
s .err = err
close (s .done )
}
}
func (s *StreamEncoder ) AddObject (v MarshalerJSONObject ) {
if v .IsNil () {
return
}
s .Encoder .writeByte ('{' )
v .MarshalJSONObject (s .Encoder )
s .Encoder .writeByte ('}' )
s .Encoder .writeByte (s .delimiter )
}
func (s *StreamEncoder ) AddString (v string ) {
s .Encoder .writeByte ('"' )
s .Encoder .writeString (v )
s .Encoder .writeByte ('"' )
s .Encoder .writeByte (s .delimiter )
}
func (s *StreamEncoder ) AddArray (v MarshalerJSONArray ) {
s .Encoder .writeByte ('[' )
v .MarshalJSONArray (s .Encoder )
s .Encoder .writeByte (']' )
s .Encoder .writeByte (s .delimiter )
}
func (s *StreamEncoder ) AddInt (value int ) {
s .buf = strconv .AppendInt (s .buf , int64 (value ), 10 )
s .Encoder .writeByte (s .delimiter )
}
func (s *StreamEncoder ) AddFloat64 (value float64 ) {
s .buf = strconv .AppendFloat (s .buf , value , 'f' , -1 , 64 )
s .Encoder .writeByte (s .delimiter )
}
func (s *StreamEncoder ) AddFloat (value float64 ) {
s .AddFloat64 (value )
}
func consume(init *StreamEncoder , s *StreamEncoder , m MarshalerStream ) {
defer s .Release ()
for {
select {
case <- init .Done ():
return
default :
m .MarshalStream (s )
if s .Encoder .err != nil {
init .Cancel (s .Encoder .err )
return
}
i , err := s .Encoder .Write ()
if err != nil || i == 0 {
init .Cancel (err )
return
}
}
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .