package msgio

import (
	
	
	

	pool 
)

// ErrMsgTooLarge is returned when the message length is excessive. In this
// file ReadMsg returns it when the decoded length exceeds the reader's
// configured maximum or is negative.
var ErrMsgTooLarge = errors.New("message too large")

// lengthSize is the number of bytes the writer reserves at the front of each
// frame for the uint32 length prefix; it is also the size of the reader's
// length scratch buffer.
// defaultMaxSize is the maximum message size NewReaderWithPool configures
// when the caller does not choose one.
const (
	lengthSize     = 4
	defaultMaxSize = 8 * 1024 * 1024 // 8 MiB
)

// Writer is the msgio Writer interface. It writes length-framed messages.
type Writer interface {

	// Write writes the passed-in buffer as a single message and reports how
	// many bytes of the buffer were accepted.
	Write([]byte) (int, error)

	// WriteMsg writes the passed-in buffer as a single message.
	WriteMsg([]byte) error
}

// WriteCloser combines a Writer and an io.Closer, in the same way
// io.WriteCloser does for io.Writer in the standard library.
type WriteCloser interface {
	Writer
	io.Closer
}

// Reader is the msgio Reader interface. It reads length-framed messages.
type Reader interface {

	// Read reads the next message from the Reader into the supplied buffer.
	// The client must pass a buffer large enough to hold the whole message,
	// or io.ErrShortBuffer will be returned.
	Read([]byte) (int, error)

	// ReadMsg reads the next message from the Reader and returns it.
	// Uses a pool.BufferPool internally to reuse buffers. The caller may pass
	// the returned slice to ReleaseMsg to signal that the buffer can be reused.
	ReadMsg() ([]byte, error)

	// ReleaseMsg signals that a buffer can be reused.
	ReleaseMsg([]byte)

	// NextMsgLen returns the length of the next (peeked) message. It does
	// not consume the message body, which stays available to Read/ReadMsg.
	// NOTE(review): the reader implementation in this file obtains the length
	// by reading the length prefix from the underlying io.Reader and caching
	// it, so bytes are consumed from the underlying stream.
	NextMsgLen() (int, error)
}

// ReadCloser combines a Reader and an io.Closer. It is the type returned by
// the NewReader* constructors in this file.
type ReadCloser interface {
	Reader
	io.Closer
}

// ReadWriter combines a Reader and a Writer: it can both read and write
// length-framed messages.
type ReadWriter interface {
	Reader
	Writer
}

// ReadWriteCloser combines a Reader, a Writer, and an io.Closer.
type ReadWriteCloser interface {
	Reader
	Writer
	io.Closer
}

// writer is the underlying type that implements the Writer interface.
//
// W is the destination every framed message is written to. pool supplies the
// scratch buffer in which the length prefix and payload are assembled, and
// lock is held for the whole of each message write.
type writer struct {
	W io.Writer

	pool *pool.BufferPool
	lock sync.Mutex
}

// NewWriter wraps an io.Writer with a msgio framed writer. The msgio.Writer
// will write the length prefix of every message written.
func ( io.Writer) WriteCloser {
	return NewWriterWithPool(, pool.GlobalPool)
}

// NewWriterWithPool is identical to NewWriter but allows the user to pass a
// custom buffer pool.
func ( io.Writer,  *pool.BufferPool) WriteCloser {
	return &writer{W: , pool: }
}

func ( *writer) ( []byte) (int, error) {
	 := .WriteMsg()
	if  != nil {
		return 0, 
	}
	return len(), nil
}

func ( *writer) ( []byte) ( error) {
	.lock.Lock()
	defer .lock.Unlock()

	 := .pool.Get(len() + lengthSize)
	NBO.PutUint32(, uint32(len()))
	copy([lengthSize:], )
	_,  = .W.Write()
	.pool.Put()

	return 
}

func ( *writer) () error {
	if ,  := .W.(io.Closer);  {
		return .Close()
	}
	return nil
}

// reader is the underlying type that implements the Reader interface.
//
// R is the stream messages are read from. lbuf is the scratch space handed to
// ReadLen when decoding a length prefix. next caches the length of the
// pending message, with -1 meaning no length has been read yet. pool supplies
// the buffers returned by ReadMsg, and lock is taken by the exported
// Read/ReadMsg/NextMsgLen methods.
type reader struct {
	R io.Reader

	lbuf [lengthSize]byte
	next int
	pool *pool.BufferPool
	lock sync.Mutex
	max  int // the maximal message size (in bytes) this reader handles
}

// NewReader wraps an io.Reader with a msgio framed reader. The msgio.Reader
// will read whole messages at a time (using the length). Assumes an equivalent
// writer on the other side.
func ( io.Reader) ReadCloser {
	return NewReaderWithPool(, pool.GlobalPool)
}

// NewReaderSize is equivalent to NewReader but allows one to
// specify a max message size.
func ( io.Reader,  int) ReadCloser {
	return NewReaderSizeWithPool(, , pool.GlobalPool)
}

// NewReaderWithPool is the same as NewReader but allows one to specify a buffer
// pool.
func ( io.Reader,  *pool.BufferPool) ReadCloser {
	return NewReaderSizeWithPool(, defaultMaxSize, )
}

// NewReaderWithPool is the same as NewReader but allows one to specify a buffer
// pool and a max message size.
func ( io.Reader,  int,  *pool.BufferPool) ReadCloser {
	if  == nil {
		panic("nil pool")
	}
	return &reader{
		R:    ,
		next: -1,
		pool: ,
		max:  ,
	}
}

// NextMsgLen reads the length of the next msg into s.lbuf, and returns it.
// WARNING: like Read, NextMsgLen is destructive. It reads from the internal
// reader.
func ( *reader) () (int, error) {
	.lock.Lock()
	defer .lock.Unlock()
	return .nextMsgLen()
}

func ( *reader) () (int, error) {
	if .next == -1 {
		,  := ReadLen(.R, .lbuf[:])
		if  != nil {
			return 0, 
		}

		.next = 
	}
	return .next, nil
}

func ( *reader) ( []byte) (int, error) {
	.lock.Lock()
	defer .lock.Unlock()

	,  := .nextMsgLen()
	if  != nil {
		return 0, 
	}

	if  > len() {
		return 0, io.ErrShortBuffer
	}

	,  := io.ReadFull(.R, [:])
	if  <  {
		.next =  -  // we only partially consumed the message.
	} else {
		.next = -1 // signal we've consumed this msg
	}
	return , 
}

func ( *reader) () ([]byte, error) {
	.lock.Lock()
	defer .lock.Unlock()

	,  := .nextMsgLen()
	if  != nil {
		return nil, 
	}

	if  == 0 {
		.next = -1
		return nil, nil
	}

	if  > .max ||  < 0 {
		return nil, ErrMsgTooLarge
	}

	 := .pool.Get()
	,  := io.ReadFull(.R, )
	if  <  {
		.next =  -  // we only partially consumed the message.
	} else {
		.next = -1 // signal we've consumed this msg
	}
	return [:], 
}

func ( *reader) ( []byte) {
	.pool.Put()
}

func ( *reader) () error {
	if ,  := .R.(io.Closer);  {
		return .Close()
	}
	return nil
}

// readWriter is the underlying type that implements a ReadWriter. It embeds
// a Reader and a Writer, so their methods are promoted directly onto it.
type readWriter struct {
	Reader
	Writer
}

// NewReadWriter wraps an io.ReadWriter with a msgio.ReadWriter. Writing
// and Reading will be appropriately framed.
func ( io.ReadWriter) ReadWriteCloser {
	return &readWriter{
		Reader: NewReader(),
		Writer: NewWriter(),
	}
}

// Combine wraps a pair of msgio.Writer and msgio.Reader with a msgio.ReadWriter.
func ( Writer,  Reader) ReadWriteCloser {
	return &readWriter{Reader: , Writer: }
}

func ( *readWriter) () error {
	var  []error

	if ,  := .Writer.(WriteCloser);  {
		if  := .Close();  != nil {
			 = append(, )
		}
	}
	if ,  := .Reader.(ReadCloser);  {
		if  := .Close();  != nil {
			 = append(, )
		}
	}

	if len() > 0 {
		return multiErr()
	}
	return nil
}

// multiErr is a util to return multiple errors as a single error value.
type multiErr []error

// Error implements the error interface. It returns "no errors" for an empty
// list; otherwise it returns "Multiple errors: " followed by the message of
// every contained error, separated by ", ".
func (m multiErr) Error() string {
	if len(m) == 0 {
		return "no errors"
	}

	s := "Multiple errors: "
	for i, e := range m {
		if i != 0 {
			s += ", "
		}
		s += e.Error()
	}
	return s
}