package msgio

import (
	"errors"
	"io"
	"sync"

	// NOTE(review): the import path was stripped by the page extraction; the
	// use of pool.GlobalPool and pool.BufferPool suggests go-buffer-pool.
	// Confirm against the module's go.mod.
	pool "github.com/libp2p/go-buffer-pool"
)

// ErrMsgTooLarge is returned when the message length is excessive.
var ErrMsgTooLarge = errors.New("message too large")

const (
	// lengthSize is the number of bytes used by the length prefix.
	lengthSize = 4
	// defaultMaxSize is the message size limit used by readers created
	// without an explicit limit.
	defaultMaxSize = 8 * 1024 * 1024 // 8mb
)

// Writer is the msgio Writer interface. It writes len-framed messages.
type Writer interface {

	// Write writes passed in buffer as a single message.
	Write([]byte) (int, error)

	// WriteMsg writes the msg in the passed in buffer.
	WriteMsg([]byte) error
}

// WriteCloser is a Writer + Closer interface. Like in `golang/pkg/io`
type WriteCloser interface {
	Writer
	io.Closer
}

// Reader is the msgio Reader interface. It reads len-framed messages.
type Reader interface {

	// Read reads the next message from the Reader.
	// The client must pass a buffer large enough, or io.ErrShortBuffer will be
	// returned.
	Read([]byte) (int, error)

	// ReadMsg reads the next message from the Reader.
	// Uses a pool.BufferPool internally to reuse buffers. User may call
	// ReleaseMsg(msg) to signal a buffer can be reused.
	ReadMsg() ([]byte, error)

	// ReleaseMsg signals a buffer can be reused.
	ReleaseMsg([]byte)

	// NextMsgLen returns the length of the next (peeked) message. Does
	// not destroy the message or have other adverse effects
	NextMsgLen() (int, error)
}

// ReadCloser combines a Reader and Closer.
type ReadCloser interface {
	Reader
	io.Closer
}

// ReadWriter combines a Reader and Writer.
type ReadWriter interface {
	Reader
	Writer
}

// ReadWriteCloser combines a Reader, a Writer, and Closer.
type ReadWriteCloser interface {
	Reader
	Writer
	io.Closer
}

// writer is the underlying type that implements the Writer interface.
type writer struct {
	W io.Writer

	pool *pool.BufferPool
	lock sync.Mutex
}

// NewWriter wraps an io.Writer with a msgio framed writer. The msgio.Writer
// will write the length prefix of every message written.
func NewWriter(w io.Writer) WriteCloser {
	return NewWriterWithPool(w, pool.GlobalPool)
}

// NewWriterWithPool is identical to NewWriter but allows the user to pass a
// custom buffer pool.
func NewWriterWithPool(w io.Writer, p *pool.BufferPool) WriteCloser {
	return &writer{W: w, pool: p}
}

// Write writes msg as a single framed message. On success it reports
// len(msg) bytes written (the length prefix is not counted); on failure it
// reports 0 and the error from WriteMsg.
func (w *writer) Write(msg []byte) (int, error) {
	if err := w.WriteMsg(msg); err != nil {
		return 0, err
	}
	return len(msg), nil
}

// WriteMsg writes the length prefix followed by msg to the underlying writer
// in a single Write call, holding the writer's lock for the duration.
func (w *writer) WriteMsg(msg []byte) (err error) {
	w.lock.Lock()
	defer w.lock.Unlock()

	// Assemble prefix and payload in one pooled buffer so the frame goes out
	// in one Write on the underlying writer.
	buf := w.pool.Get(len(msg) + lengthSize)
	// NOTE(review): NBO is not declared in this file; presumably a byte-order
	// value defined elsewhere in the package — confirm.
	NBO.PutUint32(buf, uint32(len(msg)))
	copy(buf[lengthSize:], msg)
	_, err = w.W.Write(buf)
	w.pool.Put(buf)

	return err
}

// Close closes the underlying writer if it implements io.Closer; otherwise it
// does nothing and returns nil.
func (w *writer) Close() error {
	if c, ok := w.W.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

// reader is the underlying type that implements the Reader interface.
type reader struct {
	R io.Reader

	lbuf [lengthSize]byte
	next int // length still owed for the pending message, or -1 when none is pending
	pool *pool.BufferPool
	lock sync.Mutex
	max  int // the maximal message size (in bytes) this reader handles
}

// NewReader wraps an io.Reader with a msgio framed reader. The msgio.Reader
// will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side.
func NewReader(r io.Reader) ReadCloser {
	return NewReaderWithPool(r, pool.GlobalPool)
}

// NewReaderSize is equivalent to NewReader but allows one to
// specify a max message size.
func NewReaderSize(r io.Reader, maxMessageSize int) ReadCloser {
	return NewReaderSizeWithPool(r, maxMessageSize, pool.GlobalPool)
}

// NewReaderWithPool is the same as NewReader but allows one to specify a buffer
// pool.
func NewReaderWithPool(r io.Reader, p *pool.BufferPool) ReadCloser {
	return NewReaderSizeWithPool(r, defaultMaxSize, p)
}

// NewReaderSizeWithPool is the same as NewReader but allows one to specify a
// buffer pool and a max message size. It panics if the pool is nil.
func NewReaderSizeWithPool(r io.Reader, maxMessageSize int, p *pool.BufferPool) ReadCloser {
	if p == nil {
		panic("nil pool")
	}
	return &reader{
		R:    r,
		next: -1,
		pool: p,
		max:  maxMessageSize,
	}
}

// NextMsgLen returns the length of the next message. If no length is pending
// it reads the length prefix from the underlying reader and remembers it, so
// repeated calls return the same value until the message is consumed by Read
// or ReadMsg.
func (r *reader) NextMsgLen() (int, error) {
	r.lock.Lock()
	defer r.lock.Unlock()
	return r.nextMsgLen()
}

// nextMsgLen is NextMsgLen without locking; callers must hold r.lock.
func (r *reader) nextMsgLen() (int, error) {
	if r.next == -1 {
		// NOTE(review): ReadLen is not declared in this file; presumably it
		// reads a length prefix from r.R using r.lbuf as scratch — confirm.
		n, err := ReadLen(r.R, r.lbuf[:])
		if err != nil {
			return 0, err
		}
		r.next = n
	}
	return r.next, nil
}

// Read reads the next message into msg and returns the number of bytes read.
// It returns io.ErrShortBuffer, without consuming the message, when msg is
// smaller than the pending message length.
func (r *reader) Read(msg []byte) (int, error) {
	r.lock.Lock()
	defer r.lock.Unlock()

	length, err := r.nextMsgLen()
	if err != nil {
		return 0, err
	}

	if length > len(msg) {
		return 0, io.ErrShortBuffer
	}

	read, err := io.ReadFull(r.R, msg[:length])
	if read < length {
		r.next = length - read // we only partially consumed the message.
	} else {
		r.next = -1 // signal we've consumed this msg
	}
	return read, err
}

// ReadMsg reads the next message into a buffer obtained from the reader's
// pool and returns it. A zero-length message yields (nil, nil). A length that
// is negative or exceeds the reader's max yields ErrMsgTooLarge.
func (r *reader) ReadMsg() ([]byte, error) {
	r.lock.Lock()
	defer r.lock.Unlock()

	length, err := r.nextMsgLen()
	if err != nil {
		return nil, err
	}

	if length == 0 {
		r.next = -1
		return nil, nil
	}

	if length > r.max || length < 0 {
		return nil, ErrMsgTooLarge
	}

	msg := r.pool.Get(length)
	read, err := io.ReadFull(r.R, msg)
	if read < length {
		r.next = length - read // we only partially consumed the message.
	} else {
		r.next = -1 // signal we've consumed this msg
	}
	return msg[:read], err
}

// ReleaseMsg returns msg to the reader's buffer pool.
func (r *reader) ReleaseMsg(msg []byte) {
	r.pool.Put(msg)
}

// Close closes the underlying reader if it implements io.Closer; otherwise it
// does nothing and returns nil.
func (r *reader) Close() error {
	if c, ok := r.R.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

// readWriter is the underlying type that implements a ReadWriter.
type readWriter struct {
	Reader
	Writer
}

// NewReadWriter wraps an io.ReadWriter with a msgio.ReadWriter. Writing
// and Reading will be appropriately framed.
func NewReadWriter(rw io.ReadWriter) ReadWriteCloser {
	return &readWriter{
		Reader: NewReader(rw),
		Writer: NewWriter(rw),
	}
}

// Combine wraps a pair of msgio.Writer and msgio.Reader with a msgio.ReadWriter.
func Combine(w Writer, r Reader) ReadWriteCloser {
	return &readWriter{Reader: r, Writer: w}
}

// Close closes the Writer and then the Reader, for whichever of them
// implements a Close method, and returns any failures combined as a multiErr.
func (rw *readWriter) Close() error {
	var errs []error

	if w, ok := rw.Writer.(WriteCloser); ok {
		if err := w.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	if r, ok := rw.Reader.(ReadCloser); ok {
		if err := r.Close(); err != nil {
			errs = append(errs, err)
		}
	}

	if len(errs) > 0 {
		return multiErr(errs)
	}
	return nil
}

// multiErr is a util to return multiple errors
type multiErr []error

// Error joins the messages of all contained errors with ", " after a
// "Multiple errors: " prefix, or returns "no errors" when the list is empty.
func (m multiErr) Error() string {
	if len(m) == 0 {
		return "no errors"
	}

	s := "Multiple errors: "
	for i, e := range m {
		if i != 0 {
			s += ", "
		}
		s += e.Error()
	}
	return s
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.