package transport
import (
"context"
"fmt"
"io"
"math"
"net"
"net/http"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/proxyattributes"
istatus "google.golang.org/grpc/internal/status"
isyscall "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
var clientConnectionCounter uint64
var goAwayLoopyWriterTimeout = 5 * time .Second
var metadataFromOutgoingContextRaw = internal .FromOutgoingContextRaw .(func (context .Context ) (metadata .MD , [][]string , bool ))
type http2Client struct {
lastRead int64
ctx context .Context
cancel context .CancelFunc
ctxDone <-chan struct {}
userAgent string
address resolver .Address
md metadata .MD
conn net .Conn
loopy *loopyWriter
remoteAddr net .Addr
localAddr net .Addr
authInfo credentials .AuthInfo
readerDone chan struct {}
writerDone chan struct {}
goAway chan struct {}
keepaliveDone chan struct {}
framer *framer
controlBuf *controlBuffer
fc *trInFlow
scheme string
isSecure bool
perRPCCreds []credentials .PerRPCCredentials
kp keepalive .ClientParameters
keepaliveEnabled bool
statsHandlers []stats .Handler
initialWindowSize int32
maxSendHeaderListSize *uint32
bdpEst *bdpEstimator
maxConcurrentStreams uint32
streamQuota int64
streamsQuotaAvailable chan struct {}
waitingStreams uint32
registeredCompressors string
mu sync .Mutex
nextID uint32
state transportState
activeStreams map [uint32 ]*ClientStream
prevGoAwayID uint32
goAwayReason GoAwayReason
goAwayDebugMessage string
kpDormancyCond *sync .Cond
kpDormant bool
channelz *channelz .Socket
onClose func (GoAwayReason )
bufferPool mem .BufferPool
connectionID uint64
logger *grpclog .PrefixLogger
}
func dial(ctx context .Context , fn func (context .Context , string ) (net .Conn , error ), addr resolver .Address , grpcUA string ) (net .Conn , error ) {
address := addr .Addr
networkType , ok := networktype .Get (addr )
if fn != nil {
if networkType == "unix" && !strings .HasPrefix (address , "\x00" ) {
if filepath .IsAbs (address ) {
return fn (ctx , "unix://" +address )
}
return fn (ctx , "unix:" +address )
}
return fn (ctx , address )
}
if !ok {
networkType , address = ParseDialTarget (address )
}
if opts , present := proxyattributes .Get (addr ); present {
return proxyDial (ctx , addr , grpcUA , opts )
}
return internal .NetDialerWithTCPKeepalive ().DialContext (ctx , networkType , address )
}
func isTemporary(err error ) bool {
switch err := err .(type ) {
case interface {
Temporary () bool
}:
return err .Temporary ()
case interface {
Timeout () bool
}:
return err .Timeout ()
}
return true
}
func NewHTTP2Client (connectCtx , ctx context .Context , addr resolver .Address , opts ConnectOptions , onClose func (GoAwayReason )) (_ ClientTransport , err error ) {
scheme := "http"
ctx , cancel := context .WithCancel (ctx )
defer func () {
if err != nil {
cancel ()
}
}()
connectCtx = icredentials .NewClientHandshakeInfoContext (connectCtx , credentials .ClientHandshakeInfo {Attributes : addr .Attributes })
conn , err := dial (connectCtx , opts .Dialer , addr , opts .UserAgent )
if err != nil {
if opts .FailOnNonTempDialError {
return nil , connectionErrorf (isTemporary (err ), err , "transport: error while dialing: %v" , err )
}
return nil , connectionErrorf (true , err , "transport: Error while dialing: %v" , err )
}
defer func (conn net .Conn ) {
if err != nil {
conn .Close ()
}
}(conn )
ctxMonitorDone := grpcsync .NewEvent ()
newClientCtx , newClientDone := context .WithCancel (connectCtx )
defer func () {
newClientDone ()
<-ctxMonitorDone .Done ()
}()
go func (conn net .Conn ) {
defer ctxMonitorDone .Fire ()
<-newClientCtx .Done ()
if err := connectCtx .Err (); err != nil {
if logger .V (logLevel ) {
logger .Infof ("Aborting due to connect deadline expiring: %v" , err )
}
conn .Close ()
}
}(conn )
kp := opts .KeepaliveParams
if kp .Time == 0 {
kp .Time = defaultClientKeepaliveTime
}
if kp .Timeout == 0 {
kp .Timeout = defaultClientKeepaliveTimeout
}
keepaliveEnabled := false
if kp .Time != infinity {
if err = isyscall .SetTCPUserTimeout (conn , kp .Timeout ); err != nil {
return nil , connectionErrorf (false , err , "transport: failed to set TCP_USER_TIMEOUT: %v" , err )
}
keepaliveEnabled = true
}
var (
isSecure bool
authInfo credentials .AuthInfo
)
transportCreds := opts .TransportCredentials
perRPCCreds := opts .PerRPCCredentials
if b := opts .CredsBundle ; b != nil {
if t := b .TransportCredentials (); t != nil {
transportCreds = t
}
if t := b .PerRPCCredentials (); t != nil {
perRPCCreds = append (perRPCCreds , t )
}
}
if transportCreds != nil {
conn , authInfo , err = transportCreds .ClientHandshake (connectCtx , addr .ServerName , conn )
if err != nil {
return nil , connectionErrorf (isTemporary (err ), err , "transport: authentication handshake failed: %v" , err )
}
for _ , cd := range perRPCCreds {
if cd .RequireTransportSecurity () {
if ci , ok := authInfo .(interface {
GetCommonAuthInfo () credentials .CommonAuthInfo
}); ok {
secLevel := ci .GetCommonAuthInfo ().SecurityLevel
if secLevel != credentials .InvalidSecurityLevel && secLevel < credentials .PrivacyAndIntegrity {
return nil , connectionErrorf (true , nil , "transport: cannot send secure credentials on an insecure connection" )
}
}
}
}
isSecure = true
if transportCreds .Info ().SecurityProtocol == "tls" {
scheme = "https"
}
}
icwz := int32 (initialWindowSize )
if opts .InitialConnWindowSize >= defaultWindowSize {
icwz = opts .InitialConnWindowSize
}
writeBufSize := opts .WriteBufferSize
readBufSize := opts .ReadBufferSize
maxHeaderListSize := defaultClientMaxHeaderListSize
if opts .MaxHeaderListSize != nil {
maxHeaderListSize = *opts .MaxHeaderListSize
}
t := &http2Client {
ctx : ctx ,
ctxDone : ctx .Done (),
cancel : cancel ,
userAgent : opts .UserAgent ,
registeredCompressors : grpcutil .RegisteredCompressors (),
address : addr ,
conn : conn ,
remoteAddr : conn .RemoteAddr (),
localAddr : conn .LocalAddr (),
authInfo : authInfo ,
readerDone : make (chan struct {}),
writerDone : make (chan struct {}),
goAway : make (chan struct {}),
keepaliveDone : make (chan struct {}),
framer : newFramer (conn , writeBufSize , readBufSize , opts .SharedWriteBuffer , maxHeaderListSize ),
fc : &trInFlow {limit : uint32 (icwz )},
scheme : scheme ,
activeStreams : make (map [uint32 ]*ClientStream ),
isSecure : isSecure ,
perRPCCreds : perRPCCreds ,
kp : kp ,
statsHandlers : opts .StatsHandlers ,
initialWindowSize : initialWindowSize ,
nextID : 1 ,
maxConcurrentStreams : defaultMaxStreamsClient ,
streamQuota : defaultMaxStreamsClient ,
streamsQuotaAvailable : make (chan struct {}, 1 ),
keepaliveEnabled : keepaliveEnabled ,
bufferPool : opts .BufferPool ,
onClose : onClose ,
}
var czSecurity credentials .ChannelzSecurityValue
if au , ok := authInfo .(credentials .ChannelzSecurityInfo ); ok {
czSecurity = au .GetSecurityValue ()
}
t .channelz = channelz .RegisterSocket (
&channelz .Socket {
SocketType : channelz .SocketTypeNormal ,
Parent : opts .ChannelzParent ,
SocketMetrics : channelz .SocketMetrics {},
EphemeralMetrics : t .socketMetrics ,
LocalAddr : t .localAddr ,
RemoteAddr : t .remoteAddr ,
SocketOptions : channelz .GetSocketOption (t .conn ),
Security : czSecurity ,
})
t .logger = prefixLoggerForClientTransport (t )
t .ctx = peer .NewContext (t .ctx , t .getPeer ())
if md , ok := addr .Metadata .(*metadata .MD ); ok {
t .md = *md
} else if md := imetadata .Get (addr ); md != nil {
t .md = md
}
t .controlBuf = newControlBuffer (t .ctxDone )
if opts .InitialWindowSize >= defaultWindowSize {
t .initialWindowSize = opts .InitialWindowSize
}
if !opts .StaticWindowSize {
t .bdpEst = &bdpEstimator {
bdp : initialWindowSize ,
updateFlowControl : t .updateFlowControl ,
}
}
for _ , sh := range t .statsHandlers {
t .ctx = sh .TagConn (t .ctx , &stats .ConnTagInfo {
RemoteAddr : t .remoteAddr ,
LocalAddr : t .localAddr ,
})
connBegin := &stats .ConnBegin {
Client : true ,
}
sh .HandleConn (t .ctx , connBegin )
}
if t .keepaliveEnabled {
t .kpDormancyCond = sync .NewCond (&t .mu )
go t .keepalive ()
}
readerErrCh := make (chan error , 1 )
go t .reader (readerErrCh )
defer func () {
if err != nil {
close (t .writerDone )
t .Close (err )
}
}()
n , err := t .conn .Write (clientPreface )
if err != nil {
err = connectionErrorf (true , err , "transport: failed to write client preface: %v" , err )
return nil , err
}
if n != len (clientPreface ) {
err = connectionErrorf (true , nil , "transport: preface mismatch, wrote %d bytes; want %d" , n , len (clientPreface ))
return nil , err
}
var ss []http2 .Setting
if t .initialWindowSize != defaultWindowSize {
ss = append (ss , http2 .Setting {
ID : http2 .SettingInitialWindowSize ,
Val : uint32 (t .initialWindowSize ),
})
}
if opts .MaxHeaderListSize != nil {
ss = append (ss , http2 .Setting {
ID : http2 .SettingMaxHeaderListSize ,
Val : *opts .MaxHeaderListSize ,
})
}
err = t .framer .fr .WriteSettings (ss ...)
if err != nil {
err = connectionErrorf (true , err , "transport: failed to write initial settings frame: %v" , err )
return nil , err
}
if delta := uint32 (icwz - defaultWindowSize ); delta > 0 {
if err := t .framer .fr .WriteWindowUpdate (0 , delta ); err != nil {
err = connectionErrorf (true , err , "transport: failed to write window update: %v" , err )
return nil , err
}
}
t .connectionID = atomic .AddUint64 (&clientConnectionCounter , 1 )
if err := t .framer .writer .Flush (); err != nil {
return nil , err
}
if err = <-readerErrCh ; err != nil {
return nil , err
}
go func () {
t .loopy = newLoopyWriter (clientSide , t .framer , t .controlBuf , t .bdpEst , t .conn , t .logger , t .outgoingGoAwayHandler , t .bufferPool )
if err := t .loopy .run (); !isIOError (err ) {
t .conn .Close ()
}
close (t .writerDone )
}()
return t , nil
}
func (t *http2Client ) newStream (ctx context .Context , callHdr *CallHdr ) *ClientStream {
s := &ClientStream {
Stream : &Stream {
method : callHdr .Method ,
sendCompress : callHdr .SendCompress ,
buf : newRecvBuffer (),
contentSubtype : callHdr .ContentSubtype ,
},
ct : t ,
done : make (chan struct {}),
headerChan : make (chan struct {}),
doneFunc : callHdr .DoneFunc ,
}
s .wq = newWriteQuota (defaultWriteQuota , s .done )
s .requestRead = func (n int ) {
t .adjustWindow (s , uint32 (n ))
}
s .ctx = ctx
s .trReader = &transportReader {
reader : &recvBufferReader {
ctx : s .ctx ,
ctxDone : s .ctx .Done (),
recv : s .buf ,
closeStream : func (err error ) {
s .Close (err )
},
},
windowHandler : func (n int ) {
t .updateWindow (s , uint32 (n ))
},
}
return s
}
func (t *http2Client ) getPeer () *peer .Peer {
return &peer .Peer {
Addr : t .remoteAddr ,
AuthInfo : t .authInfo ,
LocalAddr : t .localAddr ,
}
}
func (t *http2Client ) outgoingGoAwayHandler (g *goAway ) (bool , error ) {
t .mu .Lock ()
maxStreamID := t .nextID - 2
t .mu .Unlock ()
if err := t .framer .fr .WriteGoAway (maxStreamID , http2 .ErrCodeNo , g .debugData ); err != nil {
return false , err
}
return false , g .closeConn
}
func (t *http2Client ) createHeaderFields (ctx context .Context , callHdr *CallHdr ) ([]hpack .HeaderField , error ) {
aud := t .createAudience (callHdr )
ri := credentials .RequestInfo {
Method : callHdr .Method ,
AuthInfo : t .authInfo ,
}
ctxWithRequestInfo := credentials .NewContextWithRequestInfo (ctx , ri )
authData , err := t .getTrAuthData (ctxWithRequestInfo , aud )
if err != nil {
return nil , err
}
callAuthData , err := t .getCallAuthData (ctxWithRequestInfo , aud , callHdr )
if err != nil {
return nil , err
}
hfLen := 7
hfLen += len (authData ) + len (callAuthData )
headerFields := make ([]hpack .HeaderField , 0 , hfLen )
headerFields = append (headerFields , hpack .HeaderField {Name : ":method" , Value : "POST" })
headerFields = append (headerFields , hpack .HeaderField {Name : ":scheme" , Value : t .scheme })
headerFields = append (headerFields , hpack .HeaderField {Name : ":path" , Value : callHdr .Method })
headerFields = append (headerFields , hpack .HeaderField {Name : ":authority" , Value : callHdr .Host })
headerFields = append (headerFields , hpack .HeaderField {Name : "content-type" , Value : grpcutil .ContentType (callHdr .ContentSubtype )})
headerFields = append (headerFields , hpack .HeaderField {Name : "user-agent" , Value : t .userAgent })
headerFields = append (headerFields , hpack .HeaderField {Name : "te" , Value : "trailers" })
if callHdr .PreviousAttempts > 0 {
headerFields = append (headerFields , hpack .HeaderField {Name : "grpc-previous-rpc-attempts" , Value : strconv .Itoa (callHdr .PreviousAttempts )})
}
registeredCompressors := t .registeredCompressors
if callHdr .SendCompress != "" {
headerFields = append (headerFields , hpack .HeaderField {Name : "grpc-encoding" , Value : callHdr .SendCompress })
if !grpcutil .IsCompressorNameRegistered (callHdr .SendCompress ) {
if registeredCompressors != "" {
registeredCompressors += ","
}
registeredCompressors += callHdr .SendCompress
}
}
if registeredCompressors != "" {
headerFields = append (headerFields , hpack .HeaderField {Name : "grpc-accept-encoding" , Value : registeredCompressors })
}
if dl , ok := ctx .Deadline (); ok {
timeout := time .Until (dl )
if timeout <= 0 {
return nil , status .Error (codes .DeadlineExceeded , context .DeadlineExceeded .Error())
}
headerFields = append (headerFields , hpack .HeaderField {Name : "grpc-timeout" , Value : grpcutil .EncodeDuration (timeout )})
}
for k , v := range authData {
headerFields = append (headerFields , hpack .HeaderField {Name : k , Value : encodeMetadataHeader (k , v )})
}
for k , v := range callAuthData {
headerFields = append (headerFields , hpack .HeaderField {Name : k , Value : encodeMetadataHeader (k , v )})
}
if md , added , ok := metadataFromOutgoingContextRaw (ctx ); ok {
var k string
for k , vv := range md {
if isReservedHeader (k ) {
continue
}
for _ , v := range vv {
headerFields = append (headerFields , hpack .HeaderField {Name : k , Value : encodeMetadataHeader (k , v )})
}
}
for _ , vv := range added {
for i , v := range vv {
if i %2 == 0 {
k = strings .ToLower (v )
continue
}
if isReservedHeader (k ) {
continue
}
headerFields = append (headerFields , hpack .HeaderField {Name : k , Value : encodeMetadataHeader (k , v )})
}
}
}
for k , vv := range t .md {
if isReservedHeader (k ) {
continue
}
for _ , v := range vv {
headerFields = append (headerFields , hpack .HeaderField {Name : k , Value : encodeMetadataHeader (k , v )})
}
}
return headerFields , nil
}
func (t *http2Client ) createAudience (callHdr *CallHdr ) string {
if len (t .perRPCCreds ) == 0 && callHdr .Creds == nil {
return ""
}
host := strings .TrimSuffix (callHdr .Host , ":443" )
pos := strings .LastIndex (callHdr .Method , "/" )
if pos == -1 {
pos = len (callHdr .Method )
}
return "https://" + host + callHdr .Method [:pos ]
}
func (t *http2Client ) getTrAuthData (ctx context .Context , audience string ) (map [string ]string , error ) {
if len (t .perRPCCreds ) == 0 {
return nil , nil
}
authData := map [string ]string {}
for _ , c := range t .perRPCCreds {
data , err := c .GetRequestMetadata (ctx , audience )
if err != nil {
if st , ok := status .FromError (err ); ok {
if istatus .IsRestrictedControlPlaneCode (st ) {
err = status .Errorf (codes .Internal , "transport: received per-RPC creds error with illegal status: %v" , err )
}
return nil , err
}
return nil , status .Errorf (codes .Unauthenticated , "transport: per-RPC creds failed due to error: %v" , err )
}
for k , v := range data {
k = strings .ToLower (k )
authData [k ] = v
}
}
return authData , nil
}
func (t *http2Client ) getCallAuthData (ctx context .Context , audience string , callHdr *CallHdr ) (map [string ]string , error ) {
var callAuthData map [string ]string
if callCreds := callHdr .Creds ; callCreds != nil {
if callCreds .RequireTransportSecurity () {
ri , _ := credentials .RequestInfoFromContext (ctx )
if !t .isSecure || credentials .CheckSecurityLevel (ri .AuthInfo , credentials .PrivacyAndIntegrity ) != nil {
return nil , status .Error (codes .Unauthenticated , "transport: cannot send secure credentials on an insecure connection" )
}
}
data , err := callCreds .GetRequestMetadata (ctx , audience )
if err != nil {
if st , ok := status .FromError (err ); ok {
if istatus .IsRestrictedControlPlaneCode (st ) {
err = status .Errorf (codes .Internal , "transport: received per-RPC creds error with illegal status: %v" , err )
}
return nil , err
}
return nil , status .Errorf (codes .Internal , "transport: per-RPC creds failed due to error: %v" , err )
}
callAuthData = make (map [string ]string , len (data ))
for k , v := range data {
k = strings .ToLower (k )
callAuthData [k ] = v
}
}
return callAuthData , nil
}
type NewStreamError struct {
Err error
AllowTransparentRetry bool
}
func (e NewStreamError ) Error () string {
return e .Err .Error()
}
func (t *http2Client ) NewStream (ctx context .Context , callHdr *CallHdr ) (*ClientStream , error ) {
ctx = peer .NewContext (ctx , t .getPeer ())
if t .address .ServerName != "" {
newCallHdr := *callHdr
newCallHdr .Host = t .address .ServerName
callHdr = &newCallHdr
}
if callHdr .Authority != "" {
auth , ok := t .authInfo .(credentials .AuthorityValidator )
if !ok {
return nil , &NewStreamError {Err : status .Errorf (codes .Unavailable , "credentials type %q does not implement the AuthorityValidator interface, but authority override specified with CallAuthority call option" , t .authInfo .AuthType ())}
}
if err := auth .ValidateAuthority (callHdr .Authority ); err != nil {
return nil , &NewStreamError {Err : status .Errorf (codes .Unavailable , "failed to validate authority %q : %v" , callHdr .Authority , err )}
}
newCallHdr := *callHdr
newCallHdr .Host = callHdr .Authority
callHdr = &newCallHdr
}
headerFields , err := t .createHeaderFields (ctx , callHdr )
if err != nil {
return nil , &NewStreamError {Err : err , AllowTransparentRetry : false }
}
s := t .newStream (ctx , callHdr )
cleanup := func (err error ) {
if s .swapState (streamDone ) == streamDone {
return
}
s .unprocessed .Store (true )
s .write (recvMsg {err : err })
close (s .done )
if atomic .CompareAndSwapUint32 (&s .headerChanClosed , 0 , 1 ) {
close (s .headerChan )
}
}
hdr := &headerFrame {
hf : headerFields ,
endStream : false ,
initStream : func (uint32 ) error {
t .mu .Lock ()
if t .state == closing {
t .mu .Unlock ()
cleanup (ErrConnClosing )
return ErrConnClosing
}
if channelz .IsOn () {
t .channelz .SocketMetrics .StreamsStarted .Add (1 )
t .channelz .SocketMetrics .LastLocalStreamCreatedTimestamp .Store (time .Now ().UnixNano ())
}
if t .kpDormant {
t .kpDormancyCond .Signal ()
}
t .mu .Unlock ()
return nil
},
onOrphaned : cleanup ,
wq : s .wq ,
}
firstTry := true
var ch chan struct {}
transportDrainRequired := false
checkForStreamQuota := func () bool {
if t .streamQuota <= 0 {
if firstTry {
t .waitingStreams ++
}
ch = t .streamsQuotaAvailable
return false
}
if !firstTry {
t .waitingStreams --
}
t .streamQuota --
t .mu .Lock ()
if t .state == draining || t .activeStreams == nil {
t .mu .Unlock ()
return false
}
hdr .streamID = t .nextID
t .nextID += 2
transportDrainRequired = t .nextID > MaxStreamID
s .id = hdr .streamID
s .fc = &inFlow {limit : uint32 (t .initialWindowSize )}
t .activeStreams [s .id ] = s
t .mu .Unlock ()
if t .streamQuota > 0 && t .waitingStreams > 0 {
select {
case t .streamsQuotaAvailable <- struct {}{}:
default :
}
}
return true
}
var hdrListSizeErr error
checkForHeaderListSize := func () bool {
if t .maxSendHeaderListSize == nil {
return true
}
var sz int64
for _ , f := range hdr .hf {
if sz += int64 (f .Size ()); sz > int64 (*t .maxSendHeaderListSize ) {
hdrListSizeErr = status .Errorf (codes .Internal , "header list size to send violates the maximum size (%d bytes) set by server" , *t .maxSendHeaderListSize )
return false
}
}
return true
}
for {
success , err := t .controlBuf .executeAndPut (func () bool {
return checkForHeaderListSize () && checkForStreamQuota ()
}, hdr )
if err != nil {
return nil , &NewStreamError {Err : err , AllowTransparentRetry : true }
}
if success {
break
}
if hdrListSizeErr != nil {
return nil , &NewStreamError {Err : hdrListSizeErr }
}
firstTry = false
select {
case <- ch :
case <- ctx .Done ():
return nil , &NewStreamError {Err : ContextErr (ctx .Err ())}
case <- t .goAway :
return nil , &NewStreamError {Err : errStreamDrain , AllowTransparentRetry : true }
case <- t .ctx .Done ():
return nil , &NewStreamError {Err : ErrConnClosing , AllowTransparentRetry : true }
}
}
if len (t .statsHandlers ) != 0 {
header , ok := metadata .FromOutgoingContext (ctx )
if ok {
header .Set ("user-agent" , t .userAgent )
} else {
header = metadata .Pairs ("user-agent" , t .userAgent )
}
for _ , sh := range t .statsHandlers {
outHeader := &stats .OutHeader {
Client : true ,
FullMethod : callHdr .Method ,
RemoteAddr : t .remoteAddr ,
LocalAddr : t .localAddr ,
Compression : callHdr .SendCompress ,
Header : header ,
}
sh .HandleRPC (s .ctx , outHeader )
}
}
if transportDrainRequired {
if t .logger .V (logLevel ) {
t .logger .Infof ("Draining transport: t.nextID > MaxStreamID" )
}
t .GracefulClose ()
}
return s , nil
}
func (t *http2Client ) closeStream (s *ClientStream , err error , rst bool , rstCode http2 .ErrCode , st *status .Status , mdata map [string ][]string , eosReceived bool ) {
if s .swapState (streamDone ) == streamDone {
<-s .done
return
}
s .status = st
if len (mdata ) > 0 {
s .trailer = mdata
}
if err != nil {
s .write (recvMsg {err : err })
}
if atomic .CompareAndSwapUint32 (&s .headerChanClosed , 0 , 1 ) {
s .noHeaders = true
close (s .headerChan )
}
cleanup := &cleanupStream {
streamID : s .id ,
onWrite : func () {
t .mu .Lock ()
if t .activeStreams != nil {
delete (t .activeStreams , s .id )
}
t .mu .Unlock ()
if channelz .IsOn () {
if eosReceived {
t .channelz .SocketMetrics .StreamsSucceeded .Add (1 )
} else {
t .channelz .SocketMetrics .StreamsFailed .Add (1 )
}
}
},
rst : rst ,
rstCode : rstCode ,
}
addBackStreamQuota := func () bool {
t .streamQuota ++
if t .streamQuota > 0 && t .waitingStreams > 0 {
select {
case t .streamsQuotaAvailable <- struct {}{}:
default :
}
}
return true
}
t .controlBuf .executeAndPut (addBackStreamQuota , cleanup )
close (s .done )
if s .doneFunc != nil {
s .doneFunc ()
}
}
func (t *http2Client ) Close (err error ) {
t .conn .SetWriteDeadline (time .Now ().Add (time .Second * 10 ))
t .mu .Lock ()
if t .state == closing {
t .mu .Unlock ()
return
}
if t .logger .V (logLevel ) {
t .logger .Infof ("Closing: %v" , err )
}
if t .state != draining {
t .onClose (GoAwayInvalid )
}
t .state = closing
streams := t .activeStreams
t .activeStreams = nil
if t .kpDormant {
t .kpDormancyCond .Signal ()
}
goAwayDebugMessage := t .goAwayDebugMessage
t .mu .Unlock ()
t .controlBuf .put (&goAway {code : http2 .ErrCodeNo , debugData : []byte ("client transport shutdown" ), closeConn : err })
timer := time .NewTimer (goAwayLoopyWriterTimeout )
defer timer .Stop ()
select {
case <- t .writerDone :
case <- timer .C :
t .logger .Infof ("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport." , goAwayLoopyWriterTimeout )
}
t .cancel ()
t .conn .Close ()
<-t .readerDone
if t .keepaliveEnabled {
<-t .keepaliveDone
}
channelz .RemoveEntry (t .channelz .ID )
var st *status .Status
if len (goAwayDebugMessage ) > 0 {
st = status .Newf (codes .Unavailable , "closing transport due to: %v, received prior goaway: %v" , err , goAwayDebugMessage )
err = st .Err ()
} else {
st = status .New (codes .Unavailable , err .Error())
}
for _ , s := range streams {
t .closeStream (s , err , false , http2 .ErrCodeNo , st , nil , false )
}
for _ , sh := range t .statsHandlers {
connEnd := &stats .ConnEnd {
Client : true ,
}
sh .HandleConn (t .ctx , connEnd )
}
}
func (t *http2Client ) GracefulClose () {
t .mu .Lock ()
if t .state == draining || t .state == closing {
t .mu .Unlock ()
return
}
if t .logger .V (logLevel ) {
t .logger .Infof ("GracefulClose called" )
}
t .onClose (GoAwayInvalid )
t .state = draining
active := len (t .activeStreams )
t .mu .Unlock ()
if active == 0 {
t .Close (connectionErrorf (true , nil , "no active streams left to process while draining" ))
return
}
t .controlBuf .put (&incomingGoAway {})
}
func (t *http2Client ) write (s *ClientStream , hdr []byte , data mem .BufferSlice , opts *WriteOptions ) error {
if opts .Last {
if !s .compareAndSwapState (streamActive , streamWriteDone ) {
return errStreamDone
}
} else if s .getState () != streamActive {
return errStreamDone
}
df := &dataFrame {
streamID : s .id ,
endStream : opts .Last ,
h : hdr ,
data : data ,
}
dataLen := data .Len ()
if hdr != nil || dataLen != 0 {
if err := s .wq .get (int32 (len (hdr ) + dataLen )); err != nil {
return err
}
}
data .Ref ()
if err := t .controlBuf .put (df ); err != nil {
data .Free ()
return err
}
t .incrMsgSent ()
return nil
}
func (t *http2Client ) getStream (f http2 .Frame ) *ClientStream {
t .mu .Lock ()
s := t .activeStreams [f .Header ().StreamID ]
t .mu .Unlock ()
return s
}
func (t *http2Client ) adjustWindow (s *ClientStream , n uint32 ) {
if w := s .fc .maybeAdjust (n ); w > 0 {
t .controlBuf .put (&outgoingWindowUpdate {streamID : s .id , increment : w })
}
}
func (t *http2Client ) updateWindow (s *ClientStream , n uint32 ) {
if w := s .fc .onRead (n ); w > 0 {
t .controlBuf .put (&outgoingWindowUpdate {streamID : s .id , increment : w })
}
}
func (t *http2Client ) updateFlowControl (n uint32 ) {
updateIWS := func () bool {
t .initialWindowSize = int32 (n )
t .mu .Lock ()
for _ , s := range t .activeStreams {
s .fc .newLimit (n )
}
t .mu .Unlock ()
return true
}
t .controlBuf .executeAndPut (updateIWS , &outgoingWindowUpdate {streamID : 0 , increment : t .fc .newLimit (n )})
t .controlBuf .put (&outgoingSettings {
ss : []http2 .Setting {
{
ID : http2 .SettingInitialWindowSize ,
Val : n ,
},
},
})
}
func (t *http2Client ) handleData (f *http2 .DataFrame ) {
size := f .Header ().Length
var sendBDPPing bool
if t .bdpEst != nil {
sendBDPPing = t .bdpEst .add (size )
}
if w := t .fc .onData (size ); w > 0 {
t .controlBuf .put (&outgoingWindowUpdate {
streamID : 0 ,
increment : w ,
})
}
if sendBDPPing {
if w := t .fc .reset (); w > 0 {
t .controlBuf .put (&outgoingWindowUpdate {
streamID : 0 ,
increment : w ,
})
}
t .controlBuf .put (bdpPing )
}
s := t .getStream (f )
if s == nil {
return
}
if size > 0 {
if err := s .fc .onData (size ); err != nil {
t .closeStream (s , io .EOF , true , http2 .ErrCodeFlowControl , status .New (codes .Internal , err .Error()), nil , false )
return
}
if f .Header ().Flags .Has (http2 .FlagDataPadded ) {
if w := s .fc .onRead (size - uint32 (len (f .Data ()))); w > 0 {
t .controlBuf .put (&outgoingWindowUpdate {s .id , w })
}
}
if len (f .Data ()) > 0 {
pool := t .bufferPool
if pool == nil {
pool = mem .DefaultBufferPool ()
}
s .write (recvMsg {buffer : mem .Copy (f .Data (), pool )})
}
}
if f .StreamEnded () {
t .closeStream (s , io .EOF , false , http2 .ErrCodeNo , status .New (codes .Internal , "server closed the stream without sending trailers" ), nil , true )
}
}
func (t *http2Client ) handleRSTStream (f *http2 .RSTStreamFrame ) {
s := t .getStream (f )
if s == nil {
return
}
if f .ErrCode == http2 .ErrCodeRefusedStream {
s .unprocessed .Store (true )
}
statusCode , ok := http2ErrConvTab [f .ErrCode ]
if !ok {
if t .logger .V (logLevel ) {
t .logger .Infof ("Received a RST_STREAM frame with code %q, but found no mapped gRPC status" , f .ErrCode )
}
statusCode = codes .Unknown
}
if statusCode == codes .Canceled {
if d , ok := s .ctx .Deadline (); ok && !d .After (time .Now ()) {
statusCode = codes .DeadlineExceeded
}
}
st := status .Newf (statusCode , "stream terminated by RST_STREAM with error code: %v" , f .ErrCode )
t .closeStream (s , st .Err (), false , http2 .ErrCodeNo , st , nil , false )
}
func (t *http2Client ) handleSettings (f *http2 .SettingsFrame , isFirst bool ) {
if f .IsAck () {
return
}
var maxStreams *uint32
var ss []http2 .Setting
var updateFuncs []func ()
f .ForeachSetting (func (s http2 .Setting ) error {
switch s .ID {
case http2 .SettingMaxConcurrentStreams :
maxStreams = new (uint32 )
*maxStreams = s .Val
case http2 .SettingMaxHeaderListSize :
updateFuncs = append (updateFuncs , func () {
t .maxSendHeaderListSize = new (uint32 )
*t .maxSendHeaderListSize = s .Val
})
default :
ss = append (ss , s )
}
return nil
})
if isFirst && maxStreams == nil {
maxStreams = new (uint32 )
*maxStreams = math .MaxUint32
}
sf := &incomingSettings {
ss : ss ,
}
if maxStreams != nil {
updateStreamQuota := func () {
delta := int64 (*maxStreams ) - int64 (t .maxConcurrentStreams )
t .maxConcurrentStreams = *maxStreams
t .streamQuota += delta
if delta > 0 && t .waitingStreams > 0 {
close (t .streamsQuotaAvailable )
t .streamsQuotaAvailable = make (chan struct {}, 1 )
}
}
updateFuncs = append (updateFuncs , updateStreamQuota )
}
t .controlBuf .executeAndPut (func () bool {
for _ , f := range updateFuncs {
f ()
}
return true
}, sf )
}
func (t *http2Client ) handlePing (f *http2 .PingFrame ) {
if f .IsAck () {
if t .bdpEst != nil {
t .bdpEst .calculate (f .Data )
}
return
}
pingAck := &ping {ack : true }
copy (pingAck .data [:], f .Data [:])
t .controlBuf .put (pingAck )
}
func (t *http2Client ) handleGoAway (f *http2 .GoAwayFrame ) error {
t .mu .Lock ()
if t .state == closing {
t .mu .Unlock ()
return nil
}
if f .ErrCode == http2 .ErrCodeEnhanceYourCalm && string (f .DebugData ()) == "too_many_pings" {
logger .Errorf ("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"." )
}
id := f .LastStreamID
if id > 0 && id %2 == 0 {
t .mu .Unlock ()
return connectionErrorf (true , nil , "received goaway with non-zero even-numbered stream id: %v" , id )
}
select {
case <- t .goAway :
if id > t .prevGoAwayID {
t .mu .Unlock ()
return connectionErrorf (true , nil , "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v" , id , t .prevGoAwayID )
}
default :
t .setGoAwayReason (f )
close (t .goAway )
defer t .controlBuf .put (&incomingGoAway {})
if t .state != draining {
t .onClose (t .goAwayReason )
t .state = draining
}
}
upperLimit := t .prevGoAwayID
if upperLimit == 0 {
upperLimit = math .MaxUint32
}
t .prevGoAwayID = id
if len (t .activeStreams ) == 0 {
t .mu .Unlock ()
return connectionErrorf (true , nil , "received goaway and there are no active streams" )
}
streamsToClose := make ([]*ClientStream , 0 )
for streamID , stream := range t .activeStreams {
if streamID > id && streamID <= upperLimit {
stream .unprocessed .Store (true )
streamsToClose = append (streamsToClose , stream )
}
}
t .mu .Unlock ()
for _ , stream := range streamsToClose {
t .closeStream (stream , errStreamDrain , false , http2 .ErrCodeNo , statusGoAway , nil , false )
}
return nil
}
func (t *http2Client ) setGoAwayReason (f *http2 .GoAwayFrame ) {
t .goAwayReason = GoAwayNoReason
if f .ErrCode == http2 .ErrCodeEnhanceYourCalm {
if string (f .DebugData ()) == "too_many_pings" {
t .goAwayReason = GoAwayTooManyPings
}
}
if len (f .DebugData ()) == 0 {
t .goAwayDebugMessage = fmt .Sprintf ("code: %s" , f .ErrCode )
} else {
t .goAwayDebugMessage = fmt .Sprintf ("code: %s, debug data: %q" , f .ErrCode , string (f .DebugData ()))
}
}
func (t *http2Client ) GetGoAwayReason () (GoAwayReason , string ) {
t .mu .Lock ()
defer t .mu .Unlock ()
return t .goAwayReason , t .goAwayDebugMessage
}
func (t *http2Client ) handleWindowUpdate (f *http2 .WindowUpdateFrame ) {
t .controlBuf .put (&incomingWindowUpdate {
streamID : f .Header ().StreamID ,
increment : f .Increment ,
})
}
func (t *http2Client ) operateHeaders (frame *http2 .MetaHeadersFrame ) {
s := t .getStream (frame )
if s == nil {
return
}
endStream := frame .StreamEnded ()
s .bytesReceived .Store (true )
initialHeader := atomic .LoadUint32 (&s .headerChanClosed ) == 0
if !initialHeader && !endStream {
st := status .New (codes .Internal , "a HEADERS frame cannot appear in the middle of a stream" )
t .closeStream (s , st .Err (), true , http2 .ErrCodeProtocol , st , nil , false )
return
}
if frame .Truncated {
se := status .New (codes .Internal , "peer header list size exceeded limit" )
t .closeStream (s , se .Err (), true , http2 .ErrCodeFrameSize , se , nil , endStream )
return
}
var (
isGRPC = !initialHeader
mdata = make (map [string ][]string )
contentTypeErr = "malformed header: missing HTTP content-type"
grpcMessage string
recvCompress string
httpStatusCode *int
httpStatusErr string
rawStatusCode = codes .Unknown
headerError string
)
if initialHeader {
httpStatusErr = "malformed header: missing HTTP status"
}
for _ , hf := range frame .Fields {
switch hf .Name {
case "content-type" :
if _ , validContentType := grpcutil .ContentSubtype (hf .Value ); !validContentType {
contentTypeErr = fmt .Sprintf ("transport: received unexpected content-type %q" , hf .Value )
break
}
contentTypeErr = ""
mdata [hf .Name ] = append (mdata [hf .Name ], hf .Value )
isGRPC = true
case "grpc-encoding" :
recvCompress = hf .Value
case "grpc-status" :
code , err := strconv .ParseInt (hf .Value , 10 , 32 )
if err != nil {
se := status .New (codes .Internal , fmt .Sprintf ("transport: malformed grpc-status: %v" , err ))
t .closeStream (s , se .Err (), true , http2 .ErrCodeProtocol , se , nil , endStream )
return
}
rawStatusCode = codes .Code (uint32 (code ))
case "grpc-message" :
grpcMessage = decodeGrpcMessage (hf .Value )
case ":status" :
if hf .Value == "200" {
httpStatusErr = ""
statusCode := 200
httpStatusCode = &statusCode
break
}
c , err := strconv .ParseInt (hf .Value , 10 , 32 )
if err != nil {
se := status .New (codes .Internal , fmt .Sprintf ("transport: malformed http-status: %v" , err ))
t .closeStream (s , se .Err (), true , http2 .ErrCodeProtocol , se , nil , endStream )
return
}
statusCode := int (c )
httpStatusCode = &statusCode
httpStatusErr = fmt .Sprintf (
"unexpected HTTP status code received from server: %d (%s)" ,
statusCode ,
http .StatusText (statusCode ),
)
default :
if isReservedHeader (hf .Name ) && !isWhitelistedHeader (hf .Name ) {
break
}
v , err := decodeMetadataHeader (hf .Name , hf .Value )
if err != nil {
headerError = fmt .Sprintf ("transport: malformed %s: %v" , hf .Name , err )
logger .Warningf ("Failed to decode metadata header (%q, %q): %v" , hf .Name , hf .Value , err )
break
}
mdata [hf .Name ] = append (mdata [hf .Name ], v )
}
}
if !isGRPC || httpStatusErr != "" {
var code = codes .Internal
if httpStatusCode != nil {
var ok bool
code , ok = HTTPStatusConvTab [*httpStatusCode ]
if !ok {
code = codes .Unknown
}
}
var errs []string
if httpStatusErr != "" {
errs = append (errs , httpStatusErr )
}
if contentTypeErr != "" {
errs = append (errs , contentTypeErr )
}
se := status .New (code , strings .Join (errs , "; " ))
t .closeStream (s , se .Err (), true , http2 .ErrCodeProtocol , se , nil , endStream )
return
}
if headerError != "" {
se := status .New (codes .Internal , headerError )
t .closeStream (s , se .Err (), true , http2 .ErrCodeProtocol , se , nil , endStream )
return
}
if !endStream {
if atomic .CompareAndSwapUint32 (&s .headerChanClosed , 0 , 1 ) {
s .headerValid = true
s .recvCompress = recvCompress
if len (mdata ) > 0 {
s .header = mdata
}
close (s .headerChan )
}
}
for _ , sh := range t .statsHandlers {
if !endStream {
inHeader := &stats .InHeader {
Client : true ,
WireLength : int (frame .Header ().Length ),
Header : metadata .MD (mdata ).Copy (),
Compression : s .recvCompress ,
}
sh .HandleRPC (s .ctx , inHeader )
} else {
inTrailer := &stats .InTrailer {
Client : true ,
WireLength : int (frame .Header ().Length ),
Trailer : metadata .MD (mdata ).Copy (),
}
sh .HandleRPC (s .ctx , inTrailer )
}
}
if !endStream {
return
}
status := istatus .NewWithProto (rawStatusCode , grpcMessage , mdata [grpcStatusDetailsBinHeader ])
rstStream := s .getState () == streamActive
t .closeStream (s , io .EOF , rstStream , http2 .ErrCodeNo , status , mdata , true )
}
func (t *http2Client ) readServerPreface () error {
frame , err := t .framer .fr .ReadFrame ()
if err != nil {
return connectionErrorf (true , err , "error reading server preface: %v" , err )
}
sf , ok := frame .(*http2 .SettingsFrame )
if !ok {
return connectionErrorf (true , nil , "initial http2 frame from server is not a settings frame: %T" , frame )
}
t .handleSettings (sf , true )
return nil
}
func (t *http2Client ) reader (errCh chan <- error ) {
var errClose error
defer func () {
close (t .readerDone )
if errClose != nil {
t .Close (errClose )
}
}()
if err := t .readServerPreface (); err != nil {
errCh <- err
return
}
close (errCh )
if t .keepaliveEnabled {
atomic .StoreInt64 (&t .lastRead , time .Now ().UnixNano ())
}
for {
t .controlBuf .throttle ()
frame , err := t .framer .fr .ReadFrame ()
if t .keepaliveEnabled {
atomic .StoreInt64 (&t .lastRead , time .Now ().UnixNano ())
}
if err != nil {
if se , ok := err .(http2 .StreamError ); ok {
t .mu .Lock ()
s := t .activeStreams [se .StreamID ]
t .mu .Unlock ()
if s != nil {
code := http2ErrConvTab [se .Code ]
errorDetail := t .framer .fr .ErrorDetail ()
var msg string
if errorDetail != nil {
msg = errorDetail .Error()
} else {
msg = "received invalid frame"
}
t .closeStream (s , status .Error (code , msg ), true , http2 .ErrCodeProtocol , status .New (code , msg ), nil , false )
}
continue
}
errClose = connectionErrorf (true , err , "error reading from server: %v" , err )
return
}
switch frame := frame .(type ) {
case *http2 .MetaHeadersFrame :
t .operateHeaders (frame )
case *http2 .DataFrame :
t .handleData (frame )
case *http2 .RSTStreamFrame :
t .handleRSTStream (frame )
case *http2 .SettingsFrame :
t .handleSettings (frame , false )
case *http2 .PingFrame :
t .handlePing (frame )
case *http2 .GoAwayFrame :
errClose = t .handleGoAway (frame )
case *http2 .WindowUpdateFrame :
t .handleWindowUpdate (frame )
default :
if logger .V (logLevel ) {
logger .Errorf ("transport: http2Client.reader got unhandled frame type %v." , frame )
}
}
}
}
func (t *http2Client ) keepalive () {
var err error
defer func () {
close (t .keepaliveDone )
if err != nil {
t .Close (err )
}
}()
p := &ping {data : [8 ]byte {}}
outstandingPing := false
timeoutLeft := time .Duration (0 )
prevNano := time .Now ().UnixNano ()
timer := time .NewTimer (t .kp .Time )
for {
select {
case <- timer .C :
lastRead := atomic .LoadInt64 (&t .lastRead )
if lastRead > prevNano {
outstandingPing = false
timer .Reset (time .Duration (lastRead ) + t .kp .Time - time .Duration (time .Now ().UnixNano ()))
prevNano = lastRead
continue
}
if outstandingPing && timeoutLeft <= 0 {
err = connectionErrorf (true , nil , "keepalive ping failed to receive ACK within timeout" )
return
}
t .mu .Lock ()
if t .state == closing {
t .mu .Unlock ()
return
}
if len (t .activeStreams ) < 1 && !t .kp .PermitWithoutStream {
outstandingPing = false
t .kpDormant = true
t .kpDormancyCond .Wait ()
}
t .kpDormant = false
t .mu .Unlock ()
if !outstandingPing {
if channelz .IsOn () {
t .channelz .SocketMetrics .KeepAlivesSent .Add (1 )
}
t .controlBuf .put (p )
timeoutLeft = t .kp .Timeout
outstandingPing = true
}
sleepDuration := min (t .kp .Time , timeoutLeft )
timeoutLeft -= sleepDuration
timer .Reset (sleepDuration )
case <- t .ctx .Done ():
if !timer .Stop () {
<-timer .C
}
return
}
}
}
func (t *http2Client ) Error () <-chan struct {} {
return t .ctx .Done ()
}
func (t *http2Client ) GoAway () <-chan struct {} {
return t .goAway
}
func (t *http2Client ) socketMetrics () *channelz .EphemeralSocketMetrics {
return &channelz .EphemeralSocketMetrics {
LocalFlowControlWindow : int64 (t .fc .getSize ()),
RemoteFlowControlWindow : t .getOutFlowWindow (),
}
}
func (t *http2Client ) RemoteAddr () net .Addr { return t .remoteAddr }
func (t *http2Client ) incrMsgSent () {
if channelz .IsOn () {
t .channelz .SocketMetrics .MessagesSent .Add (1 )
t .channelz .SocketMetrics .LastMessageSentTimestamp .Store (time .Now ().UnixNano ())
}
}
func (t *http2Client ) incrMsgRecv () {
if channelz .IsOn () {
t .channelz .SocketMetrics .MessagesReceived .Add (1 )
t .channelz .SocketMetrics .LastMessageReceivedTimestamp .Store (time .Now ().UnixNano ())
}
}
func (t *http2Client ) getOutFlowWindow () int64 {
resp := make (chan uint32 , 1 )
timer := time .NewTimer (time .Second )
defer timer .Stop ()
t .controlBuf .put (&outFlowControlSizeRequest {resp })
select {
case sz := <- resp :
return int64 (sz )
case <- t .ctxDone :
return -1
case <- timer .C :
return -2
}
}
func (t *http2Client ) stateForTesting () transportState {
t .mu .Lock ()
defer t .mu .Unlock ()
return t .state
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .