package wsutil

// NOTE(review): the import paths below were lost in the extracted source
// (the import block was empty). They are restored from the package qualifiers
// used in this file (fmt, io, pool, pbytes, ws) -- confirm the module paths.
import (
	"fmt"
	"io"

	"github.com/gobwas/pool"
	"github.com/gobwas/pool/pbytes"
	"github.com/gobwas/ws"
)

// DefaultWriteBuffer contains the size of Writer's default buffer. It is used
// by Writer constructor functions.
var DefaultWriteBuffer = 4096

var (
	// ErrNotEmpty is returned by Writer.WriteThrough() to indicate that buffer is
	// not empty and write through could not be done. That is, caller should call
	// Writer.FlushFragment() to make buffer empty.
	ErrNotEmpty = fmt.Errorf("writer not empty")

	// ErrControlOverflow is returned by ControlWriter.Write() to indicate that
	// no more data could be written to the underlying io.Writer because
	// MaxControlFramePayloadSize limit is reached.
	ErrControlOverflow = fmt.Errorf("control frame payload overflow")
)

// Constants which represent frame length ranges.
const (
	len7  = int64(125) // 126 and 127 are reserved values
	len16 = int64(^uint16(0))
	len64 = int64((^uint64(0)) >> 1)
)

// ControlWriter is a wrapper around Writer that contains some guards for
// buffered writes of control frames.
type ControlWriter struct {
	w     *Writer
	limit int // Maximum number of payload bytes accepted per frame.
	n     int // Payload bytes accepted since the last Flush().
}

// NewControlWriter returns a new ControlWriter with Writer inside whose buffer
// size is at most ws.MaxControlFramePayloadSize + ws.MaxHeaderSize.
func NewControlWriter(dest io.Writer, state ws.State, op ws.OpCode) *ControlWriter {
	return &ControlWriter{
		w:     NewWriterSize(dest, state, op, ws.MaxControlFramePayloadSize),
		limit: ws.MaxControlFramePayloadSize,
	}
}

// NewControlWriterBuffer returns a new ControlWriter with buf as a buffer.
//
// Note that it reserves x bytes of buf for header data, where x could be
// ws.MinHeaderSize or ws.MinHeaderSize+4 (depending on state). At most
// (ws.MaxControlFramePayloadSize + x) bytes of buf will be used.
//
// It panics if len(buf) <= ws.MinHeaderSize + x.
func NewControlWriterBuffer(dest io.Writer, state ws.State, op ws.OpCode, buf []byte) *ControlWriter {
	most := ws.MaxControlFramePayloadSize + headerSize(state, ws.MaxControlFramePayloadSize)
	if len(buf) > most {
		buf = buf[:most]
	}

	w := NewWriterBuffer(dest, state, op, buf)

	return &ControlWriter{
		w:     w,
		limit: len(w.buf),
	}
}

// Write implements io.Writer. It writes to the underlying Writer until it
// returns error or until ControlWriter write limit will be exceeded.
//
// The limit applies to the total number of bytes accepted since the last
// Flush(), not to a single call: otherwise several small writes could overflow
// the buffer and make the underlying Writer emit a fragmented control frame.
func (c *ControlWriter) Write(p []byte) (n int, err error) {
	if c.n+len(p) > c.limit {
		return 0, ErrControlOverflow
	}
	n, err = c.w.Write(p)
	c.n += n
	return n, err
}

// Flush flushes all buffered data to the underlying io.Writer and starts
// accounting for the next control frame payload from zero.
func (c *ControlWriter) Flush() error {
	c.n = 0
	return c.w.Flush()
}

var writers = pool.New(128, 65536)

// GetWriter tries to reuse Writer getting it from the pool.
//
// This function is intended for memory consumption optimizations, because
// NewWriter*() functions make allocations for inner buffer.
//
// Note that it ceils n to the power of two.
//
// If you have your own bytes buffer pool you could use NewWriterBuffer to use
// pooled bytes in writer.
func GetWriter(dest io.Writer, state ws.State, op ws.OpCode, n int) *Writer {
	x, m := writers.Get(n)
	if x != nil {
		w := x.(*Writer)
		w.Reset(dest, state, op)
		return w
	}
	// NOTE: we use m instead of n, because m is an attempt to reuse w of such
	// size in the future.
	return NewWriterBufferSize(dest, state, op, m)
}

// PutWriter puts w for future reuse by GetWriter().
func PutWriter(w *Writer) {
	w.Reset(nil, 0, 0)
	writers.Put(w, w.Size())
}

// Writer contains logic of buffering output data into a WebSocket fragments.
// It is much the same as bufio.Writer, except the thing that it works with
// WebSocket frames, not the raw data.
//
// Writer writes frames with specified OpCode.
// It uses ws.State to decide whether the output frames must be masked.
//
// Note that it does not check control frame size or other RFC rules.
// That is, it must be used with special care to write control frames without
// violation of RFC. You could use ControlWriter that wraps Writer and contains
// some guards for writing control frames.
//
// If an error occurs writing to a Writer, no more data will be accepted and
// all subsequent writes will return the error.
//
// After all data has been written, the client should call the Flush() method
// to guarantee all data has been forwarded to the underlying io.Writer.
type Writer struct {
	// dest specifies a destination of buffer flushes.
	dest io.Writer

	// op specifies the WebSocket operation code used in flushed frames.
	op ws.OpCode

	// state specifies the state of the Writer.
	state ws.State

	// extensions is a list of negotiated extensions for writer Dest.
	// It is used to meet the specs and set appropriate bits in fragment
	// header RSV segment.
	extensions []SendExtension

	// noFlush reports whether buffer must grow instead of being flushed.
	noFlush bool

	// Raw representation of the buffer, including reserved header bytes.
	raw []byte

	// Writeable part of buffer, without reserved header bytes.
	// Resetting this to nil will not result in reallocation if raw is not nil.
	// And vice versa: if buf is not nil, then Writer is assumed as ready and
	// initialized.
	buf []byte

	// Buffered bytes counter.
	n int

	// dirty is set once Write(), ReadFrom() or WriteThrough() was called, so
	// that Flush() emits a final frame even when nothing is buffered.
	dirty bool

	// fseq counts fragments already sent for the current message; when it is
	// greater than zero, frames are written with the continuation opcode.
	fseq int

	// err is the sticky error of the last failed write to dest.
	err error
}

// NewWriter returns a new Writer whose buffer has the DefaultWriteBuffer size.
func NewWriter(dest io.Writer, state ws.State, op ws.OpCode) *Writer {
	return NewWriterBufferSize(dest, state, op, 0)
}

// NewWriterSize returns a new Writer whose buffer size is at most n + ws.MaxHeaderSize.
// That is, output frames payload length could be up to n, except the case when
// Write() is called on empty Writer with len(p) > n.
//
// If n <= 0 then the default buffer size is used as Writer's buffer size.
func NewWriterSize(dest io.Writer, state ws.State, op ws.OpCode, n int) *Writer {
	if n > 0 {
		n += headerSize(state, n)
	}
	return NewWriterBufferSize(dest, state, op, n)
}

// NewWriterBufferSize returns a new Writer whose buffer size is equal to n.
// If n <= ws.MinHeaderSize then the default buffer size is used.
//
// Note that Writer will reserve x bytes for header data, where x is in range
// [ws.MinHeaderSize,ws.MaxHeaderSize]. That is, frames flushed by Writer
// will not have payload length equal to n, except the case when Write() is
// called on empty Writer with len(p) > n.
func NewWriterBufferSize(dest io.Writer, state ws.State, op ws.OpCode, n int) *Writer {
	if n <= ws.MinHeaderSize {
		n = DefaultWriteBuffer
	}
	return NewWriterBuffer(dest, state, op, make([]byte, n))
}

// NewWriterBuffer returns a new Writer with buf as a buffer.
//
// Note that it reserves x bytes of buf for header data, where x is in range
// [ws.MinHeaderSize,ws.MaxHeaderSize] (depending on state and buf size).
//
// You could use ws.HeaderSize() to calculate number of bytes needed to store
// header data.
//
// It panics if len(buf) is too small to fit header and payload data.
func NewWriterBuffer(dest io.Writer, state ws.State, op ws.OpCode, buf []byte) *Writer {
	w := &Writer{
		dest:  dest,
		state: state,
		op:    op,
		raw:   buf,
	}
	w.initBuf()
	return w
}

// initBuf points w.buf at the payload part of w.raw, leaving the leading bytes
// reserved for the frame header. It panics if raw cannot hold at least one
// payload byte after the reservation.
func (w *Writer) initBuf() {
	offset := reserve(w.state, len(w.raw))
	if len(w.raw) <= offset {
		panic("wsutil: writer buffer is too small")
	}
	w.buf = w.raw[offset:]
}

// Reset resets Writer as it was created by New() methods.
// Note that Reset also drops extensions and other options that were set after
// Writer initialization.
func (w *Writer) Reset(dest io.Writer, state ws.State, op ws.OpCode) {
	w.dest = dest
	w.state = state
	w.op = op

	// The header reservation depends on state, so buf must be recalculated.
	w.initBuf()

	w.n = 0
	w.dirty = false
	w.fseq = 0
	w.extensions = w.extensions[:0]
	w.noFlush = false
}

// ResetOp is a quick version of Reset().
// ResetOp does reset unwritten fragments and does not reset results of
// SetExtensions() or DisableFlush() methods.
func (w *Writer) ResetOp(op ws.OpCode) {
	w.op = op
	w.n = 0
	w.dirty = false
	w.fseq = 0
}

// SetExtensions adds xs as extensions to be used during writes.
func (w *Writer) SetExtensions(xs ...SendExtension) {
	w.extensions = xs
}

// DisableFlush denies Writer to write fragments.
func (w *Writer) DisableFlush() {
	w.noFlush = true
}

// Size returns the size of the underlying buffer in bytes (not including
// WebSocket header bytes).
func (w *Writer) Size() int {
	return len(w.buf)
}

// Available returns how many bytes are unused in the buffer.
func (w *Writer) Available() int {
	return len(w.buf) - w.n
}

// Buffered returns the number of bytes that have been written into the current
// buffer.
func (w *Writer) Buffered() int {
	return w.n
}

// Write implements io.Writer.
//
// Note that even if the Writer was created to have N-sized buffer, Write()
// with payload of N bytes will not fit into that buffer. Writer reserves some
// space to fit WebSocket header data.
func (w *Writer) Write(p []byte) (n int, err error) {
	// Even empty p may make a sense.
	w.dirty = true

	var nn int
	for len(p) > w.Available() && w.err == nil {
		if w.noFlush {
			w.Grow(len(p))
			continue
		}
		if w.Buffered() == 0 {
			// Large write, empty buffer. Write directly from p to avoid copy.
			// Trade off here is that we make additional Write() to underlying
			// io.Writer when writing frame header.
			//
			// On large buffers additional write is better than copying.
			nn, err = w.WriteThrough(p)
			if err != nil {
				// WriteThrough may fail without setting w.err (an extension
				// rejected the header). Nothing was consumed in that case, so
				// looping again would never terminate.
				return n + nn, err
			}
		} else {
			nn = copy(w.buf[w.n:], p)
			w.n += nn
			// The result is recorded in w.err, which is checked by the loop
			// condition and below.
			w.FlushFragment()
		}
		n += nn
		p = p[nn:]
	}
	if w.err != nil {
		return n, w.err
	}
	nn = copy(w.buf[w.n:], p)
	w.n += nn
	n += nn

	// Even if w.Available() == 0 we will not flush buffer preventively because
	// this could bring unwanted fragmentation. That is, user could create
	// buffer with size that fits exactly all further Write() call, and then
	// call Flush(), expecting that single and not fragmented frame will be
	// sent. With preemptive flush this case will produce two frames – last one
	// will be empty and just to set fin = true.

	return n, w.err
}

// ceilPowerOfTwo sets every bit below the highest set bit of n and adds one.
// Note that an exact power of two is therefore rounded up to the next one.
func ceilPowerOfTwo(n int) int {
	n |= n >> 1
	n |= n >> 2
	n |= n >> 4
	n |= n >> 8
	n |= n >> 16
	n |= n >> 32
	n++
	return n
}

// Grow grows Writer's internal buffer capacity to guarantee space for another
// n bytes of _payload_ -- that is, frame header is not included in n.
func (w *Writer) Grow(n int) {
	// NOTE: we must respect the possibility of header reserved bytes grow.
	var (
		size       = len(w.raw)
		prevOffset = len(w.raw) - len(w.buf)
		nextOffset = len(w.raw) - len(w.buf)
		buffered   = w.Buffered()
	)
	for free := size - nextOffset - buffered; free < n; {
		// This loop runs twice only at split cases, when reservation of raw
		// buffer space for the header shrinks capacity of new buffer such that
		// it still less than n.
		//
		// Loop is safe here because:
		// - (offset + buffered + n) is greater than size, otherwise (free < n)
		//	 would be false:
		//		size  = offset + buffered + freeSpace (free)
		//		size' = offset + buffered + wantSpace (n)
		//	 Since (free < n) is true in the loop condition, size' is guaranteed
		//	 to be greater => no infinite loop.
		size = ceilPowerOfTwo(nextOffset + buffered + n)
		nextOffset = reserve(w.state, size)
		free = size - nextOffset - buffered
	}
	if size < len(w.raw) {
		panic("wsutil: buffer grow leads to its reduce")
	}
	if size == len(w.raw) {
		return
	}
	// Copy the old contents so that buffered payload starts right after the
	// (possibly larger) header reservation of the new buffer.
	p := make([]byte, size)
	copy(p[nextOffset-prevOffset:], w.raw[:prevOffset+buffered])
	w.raw = p
	w.buf = w.raw[nextOffset:]
}

// WriteThrough writes data bypassing the buffer.
// Note that Writer's buffer must be empty before calling WriteThrough().
func (w *Writer) WriteThrough(p []byte) (n int, err error) {
	if w.err != nil {
		return 0, w.err
	}
	if w.Buffered() != 0 {
		return 0, ErrNotEmpty
	}

	var frame ws.Frame
	frame.Header = ws.Header{
		OpCode: w.opCode(),
		Fin:    false,
		Length: int64(len(p)),
	}
	for _, x := range w.extensions {
		frame.Header, err = x.SetBits(frame.Header)
		if err != nil {
			return 0, err
		}
	}
	if w.state.ClientSide() {
		// Should copy bytes to prevent corruption of caller data.
		payload := pbytes.GetLen(len(p))
		defer pbytes.Put(payload)
		copy(payload, p)

		frame.Payload = payload
		frame = ws.MaskFrameInPlace(frame)
	} else {
		frame.Payload = p
	}

	w.err = ws.WriteFrame(w.dest, frame)
	if w.err == nil {
		n = len(p)
	}

	w.dirty = true
	w.fseq++

	return n, w.err
}

// ReadFrom implements io.ReaderFrom.
func (w *Writer) ReadFrom(src io.Reader) (n int64, err error) {
	var nn int
	for err == nil {
		if w.Available() == 0 {
			if w.noFlush {
				w.Grow(w.Buffered()) // Twice bigger.
			} else {
				err = w.FlushFragment()
			}
			continue
		}

		// We copy the behavior of bufio.Writer here.
		// Also, from the docs on io.ReaderFrom:
		//   ReadFrom reads data from r until EOF or error.
		//
		// See https://codereview.appspot.com/76400048/#ps1
		const maxEmptyReads = 100
		var nr int
		for nr < maxEmptyReads {
			nn, err = src.Read(w.buf[w.n:])
			if nn != 0 || err != nil {
				break
			}
			nr++
		}
		if nr == maxEmptyReads {
			return n, io.ErrNoProgress
		}

		w.n += nn
		n += int64(nn)
	}
	if err == io.EOF {
		// NOTE: Do not flush preemptively.
		// See the Write() sources for more info.
		err = nil
		w.dirty = true
	}
	return n, err
}

// Flush writes any buffered data to the underlying io.Writer.
// It sends the frame with "fin" flag set to true.
//
// If no Write() or ReadFrom() was made, then Flush() does nothing.
func (w *Writer) Flush() error {
	if (!w.dirty && w.Buffered() == 0) || w.err != nil {
		return w.err
	}

	w.err = w.flushFragment(true)
	w.n = 0
	w.dirty = false
	w.fseq = 0

	return w.err
}

// FlushFragment writes any buffered data to the underlying io.Writer.
// It sends the frame with "fin" flag set to false.
func (w *Writer) FlushFragment() error {
	if w.Buffered() == 0 || w.err != nil {
		return w.err
	}

	w.err = w.flushFragment(false)
	w.n = 0
	w.fseq++

	return w.err
}

// flushFragment encodes the header for the buffered payload into the reserved
// head of the raw buffer and sends header and payload to dest with a single
// Write() call. On the client side the buffered payload is masked in place.
func (w *Writer) flushFragment(fin bool) (err error) {
	var (
		payload = w.buf[:w.n]
		header  = ws.Header{
			OpCode: w.opCode(),
			Fin:    fin,
			Length: int64(len(payload)),
		}
	)
	for _, ext := range w.extensions {
		header, err = ext.SetBits(header)
		if err != nil {
			return err
		}
	}
	if w.state.ClientSide() {
		header.Masked = true
		header.Mask = ws.NewMask()
		ws.Cipher(payload, header.Mask, 0)
	}
	// Write header to the header segment of the raw buffer.
	var (
		offset = len(w.raw) - len(w.buf)
		// The reservation may be bigger than the actual header, so the header
		// is right-aligned to sit immediately before the payload.
		skip = offset - ws.HeaderSize(header)
	)
	buf := bytesWriter{
		buf: w.raw[skip:offset],
	}
	if err := ws.WriteHeader(&buf, header); err != nil {
		// Must never be reached.
		panic("dump header error: " + err.Error())
	}
	_, err = w.dest.Write(w.raw[skip : offset+w.n])
	return err
}

// opCode returns the opcode for the next frame: continuation if some fragment
// of the current message was already sent, and the configured one otherwise.
func (w *Writer) opCode() ws.OpCode {
	if w.fseq > 0 {
		return ws.OpContinuation
	}
	return w.op
}

var errNoSpace = fmt.Errorf("not enough buffer space")

// bytesWriter is an io.Writer which fills a fixed-size byte slice.
type bytesWriter struct {
	buf []byte
	pos int
}

// Write copies p into the remaining space of w.buf. It returns errNoSpace if
// p does not fit completely.
func (w *bytesWriter) Write(p []byte) (int, error) {
	n := copy(w.buf[w.pos:], p)
	w.pos += n
	if n != len(p) {
		return n, errNoSpace
	}
	return n, nil
}

// writeFrame writes a single frame with payload p to w, masking a copy of p
// when s is client side.
func writeFrame(w io.Writer, s ws.State, op ws.OpCode, fin bool, p []byte) error {
	var frame ws.Frame
	if s.ClientSide() {
		// Should copy bytes to prevent corruption of caller data.
		payload := pbytes.GetLen(len(p))
		defer pbytes.Put(payload)

		copy(payload, p)

		frame = ws.NewFrame(op, fin, payload)
		frame = ws.MaskFrameInPlace(frame)
	} else {
		frame = ws.NewFrame(op, fin, p)
	}

	return ws.WriteFrame(w, frame)
}

// reserve calculates number of bytes need to be reserved for frame header.
//
// Note that instead of ws.HeaderSize() it does calculation based on the buffer
// size, not the payload size.
func reserve(state ws.State, n int) (offset int) {
	var mask int
	if state.ClientSide() {
		mask = 4
	}

	switch {
	case n <= int(len7)+mask+2:
		return mask + 2
	case n <= int(len16)+mask+4:
		return mask + 4
	default:
		return mask + 10
	}
}

// headerSize returns number of bytes needed to encode header of a frame with
// given state and length.
func headerSize(s ws.State, n int) int {
	return ws.HeaderSize(ws.Header{
		Length: int64(n),
		Masked: s.ClientSide(),
	})
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.