/* * * Copyright 2024 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */package transportimport ()// ServerStream implements streaming functionality for a gRPC server.typeServerStreamstruct {Stream// Embed for common stream functionality. st internalServerTransport ctxDone <-chanstruct{} // closed at the end of stream. Cache of ctx.Done() (for performance)// cancel is invoked at the end of stream to cancel ctx. It also stops the // timer for monitoring the rpc deadline if configured. cancel func()// Holds compressor names passed in grpc-accept-encoding metadata from the // client. clientAdvertisedCompressors string// hdrMu protects outgoing header and trailer metadata. hdrMu sync.Mutex header metadata.MD// the outgoing header metadata. Updated by WriteHeader. headerSent atomic.Bool// atomically set when the headers are sent out. headerWireLength int}// Read reads an n byte message from the input stream.func ( *ServerStream) ( int) (mem.BufferSlice, error) { , := .Stream.read()if == nil { .st.incrMsgRecv() }return , }// SendHeader sends the header metadata for the given stream.func ( *ServerStream) ( metadata.MD) error {return .st.writeHeader(, )}// Write writes the hdr and data bytes to the output stream.func ( *ServerStream) ( []byte, mem.BufferSlice, *WriteOptions) error {return .st.write(, , , )}// WriteStatus sends the status of a stream to the client. 
WriteStatus is// the final call made on a stream and always occurs.func ( *ServerStream) ( *status.Status) error {return .st.writeStatus(, )}// isHeaderSent indicates whether headers have been sent.func ( *ServerStream) () bool {return .headerSent.Load()}// updateHeaderSent updates headerSent and returns true// if it was already set.func ( *ServerStream) () bool {return .headerSent.Swap(true)}// RecvCompress returns the compression algorithm applied to the inbound// message. It is empty string if there is no compression applied.func ( *ServerStream) () string {return .recvCompress}// SendCompress returns the send compressor name.func ( *ServerStream) () string {return .sendCompress}// ContentSubtype returns the content-subtype for a request. For example, a// content-subtype of "proto" will result in a content-type of// "application/grpc+proto". This will always be lowercase. See// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for// more details.func ( *ServerStream) () string {return .contentSubtype}// SetSendCompress sets the compression algorithm to the stream.func ( *ServerStream) ( string) error {if .isHeaderSent() || .getState() == streamDone {returnerrors.New("transport: set send compressor called after headers sent or stream done") } .sendCompress = returnnil}// SetContext sets the context of the stream. This will be deleted once the// stats handler callouts all move to gRPC layer.func ( *ServerStream) ( context.Context) { .ctx = }// ClientAdvertisedCompressors returns the compressor names advertised by the// client via grpc-accept-encoding header.func ( *ServerStream) () []string { := strings.Split(.clientAdvertisedCompressors, ",")for , := range { [] = strings.TrimSpace() }return}// Header returns the header metadata of the stream. It returns the out header// after t.WriteHeader is called. 
It does not block and must not be called// until after WriteHeader.func ( *ServerStream) () (metadata.MD, error) {// Return the header in stream. It will be the out // header after t.WriteHeader is called.return .header.Copy(), nil}// HeaderWireLength returns the size of the headers of the stream as received// from the wire.func ( *ServerStream) () int {return .headerWireLength}// SetHeader sets the header metadata. This can be called multiple times.// This should not be called in parallel to other data writes.func ( *ServerStream) ( metadata.MD) error {if .Len() == 0 {returnnil }if .isHeaderSent() || .getState() == streamDone {returnErrIllegalHeaderWrite } .hdrMu.Lock() .header = metadata.Join(.header, ) .hdrMu.Unlock()returnnil}// SetTrailer sets the trailer metadata which will be sent with the RPC status// by the server. This can be called multiple times.// This should not be called parallel to other data writes.func ( *ServerStream) ( metadata.MD) error {if .Len() == 0 {returnnil }if .getState() == streamDone {returnErrIllegalHeaderWrite } .hdrMu.Lock() .trailer = metadata.Join(.trailer, ) .hdrMu.Unlock()returnnil}func ( *ServerStream) ( int) { .st.adjustWindow(, uint32())}func ( *ServerStream) ( int) { .st.updateWindow(, uint32())}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.