/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */package binarylogimport (binlogpb)var (// DefaultSink is the sink where the logs will be written to. It's exported // for the binarylog package to update.DefaultSinkSink = &noopSink{} // TODO(blog): change this default (file in /tmp).)// Sink writes log entry into the binary log sink.//// sink is a copy of the exported binarylog.Sink, to avoid circular dependency.typeSinkinterface {// Write will be called to write the log entry into the sink. // // It should be thread-safe so it can be called in parallel.Write(*binlogpb.GrpcLogEntry) error// Close will be called when the Sink is replaced by a new Sink.Close() error}type noopSink struct{}func ( *noopSink) (*binlogpb.GrpcLogEntry) error { returnnil }func ( *noopSink) () error { returnnil }// newWriterSink creates a binary log sink with the given writer.//// Write() marshals the proto message and writes it to the given writer. 
Each// message is prefixed with a 4 byte big endian unsigned integer as the length.//// No buffer is done, Close() doesn't try to close the writer.func newWriterSink( io.Writer) Sink {return &writerSink{out: }}type writerSink struct { out io.Writer}func ( *writerSink) ( *binlogpb.GrpcLogEntry) error { , := proto.Marshal()if != nil {grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", )return } := make([]byte, 4)binary.BigEndian.PutUint32(, uint32(len()))if , := .out.Write(); != nil {return }if , := .out.Write(); != nil {return }returnnil}func ( *writerSink) () error { returnnil }type bufferedSink struct { mu sync.Mutex closer io.Closer out Sink// out is built on buf. buf *bufio.Writer// buf is kept for flush. flusherStarted bool writeTicker *time.Ticker done chanstruct{}}func ( *bufferedSink) ( *binlogpb.GrpcLogEntry) error { .mu.Lock()defer .mu.Unlock()if !.flusherStarted {// Start the write loop when Write is called. .startFlushGoroutine() .flusherStarted = true }if := .out.Write(); != nil {return }returnnil}const ( bufFlushDuration = 60 * time.Second)func ( *bufferedSink) () { .writeTicker = time.NewTicker(bufFlushDuration)gofunc() {for {select {case<-.done:returncase<-.writeTicker.C: } .mu.Lock()if := .buf.Flush(); != nil {grpclogLogger.Warningf("failed to flush to Sink: %v", ) } .mu.Unlock() } }()}func ( *bufferedSink) () error { .mu.Lock()defer .mu.Unlock()if .writeTicker != nil { .writeTicker.Stop() }close(.done)if := .buf.Flush(); != nil {grpclogLogger.Warningf("failed to flush to Sink: %v", ) }if := .closer.Close(); != nil {grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", ) }if := .out.Close(); != nil {grpclogLogger.Warningf("failed to close the Sink: %v", ) }returnnil}// NewBufferedSink creates a binary log sink with the given WriteCloser.//// Write() marshals the proto message and writes it to the given writer. 
Each// message is prefixed with a 4 byte big endian unsigned integer as the length.//// Content is kept in a buffer, and is flushed every 60 seconds.//// Close closes the WriteCloser.func ( io.WriteCloser) Sink { := bufio.NewWriter()return &bufferedSink{closer: ,out: newWriterSink(),buf: ,done: make(chanstruct{}), }}
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.