// Copyright 2016 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package http2import ()// RFC 7540, Section 5.3.5: the default weight is 16.const priorityDefaultWeightRFC7540 = 15// 16 = 15 + 1// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.typePriorityWriteSchedulerConfigstruct {// MaxClosedNodesInTree controls the maximum number of closed streams to // retain in the priority tree. Setting this to zero saves a small amount // of memory at the cost of performance. // // See RFC 7540, Section 5.3.4: // "It is possible for a stream to become closed while prioritization // information ... is in transit. ... This potentially creates suboptimal // prioritization, since the stream could be given a priority that is // different from what is intended. To avoid these problems, an endpoint // SHOULD retain stream prioritization state for a period after streams // become closed. The longer state is retained, the lower the chance that // streams are assigned incorrect or default priority values." MaxClosedNodesInTree int// MaxIdleNodesInTree controls the maximum number of idle streams to // retain in the priority tree. Setting this to zero saves a small amount // of memory at the cost of performance. // // See RFC 7540, Section 5.3.4: // Similarly, streams that are in the "idle" state can be assigned // priority or become a parent of other streams. This allows for the // creation of a grouping node in the dependency tree, which enables // more flexible expressions of priority. Idle streams begin with a // default priority (Section 5.3.5). MaxIdleNodesInTree int// ThrottleOutOfOrderWrites enables write throttling to help ensure that // data is delivered in priority order. This works around a race where // stream B depends on stream A and both streams are about to call Write // to queue DATA frames. 
If B wins the race, a naive scheduler would eagerly // write as much data from B as possible, but this is suboptimal because A // is a higher-priority stream. With throttling enabled, we write a small // amount of data from B to minimize the amount of bandwidth that B can // steal from A. ThrottleOutOfOrderWrites bool}// NewPriorityWriteScheduler constructs a WriteScheduler that schedules// frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.// If cfg is nil, default options are used.func ( *PriorityWriteSchedulerConfig) WriteScheduler {if == nil {// For justification of these defaults, see: // https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY = &PriorityWriteSchedulerConfig{MaxClosedNodesInTree: 10,MaxIdleNodesInTree: 10,ThrottleOutOfOrderWrites: false, } } := &priorityWriteSchedulerRFC7540{nodes: make(map[uint32]*priorityNodeRFC7540),maxClosedNodesInTree: .MaxClosedNodesInTree,maxIdleNodesInTree: .MaxIdleNodesInTree,enableWriteThrottle: .ThrottleOutOfOrderWrites, } .nodes[0] = &.rootif .ThrottleOutOfOrderWrites { .writeThrottleLimit = 1024 } else { .writeThrottleLimit = math.MaxInt32 }return}type priorityNodeStateRFC7540 intconst ( priorityNodeOpenRFC7540 priorityNodeStateRFC7540 = iota priorityNodeClosedRFC7540 priorityNodeIdleRFC7540)// priorityNodeRFC7540 is a node in an HTTP/2 priority tree.// Each node is associated with a single stream ID.// See RFC 7540, Section 5.3.type priorityNodeRFC7540 struct { q writeQueue// queue of pending frames to write id uint32// id of the stream, or 0 for the root of the tree weight uint8// the actual weight is weight+1, so the value is in [1,256] state priorityNodeStateRFC7540// open | closed | idle bytes int64// number of bytes written by this node, or 0 if closed subtreeBytes int64// sum(node.bytes) of all nodes in this subtree// These links form the priority tree. 
parent *priorityNodeRFC7540 kids *priorityNodeRFC7540// start of the kids list prev, next *priorityNodeRFC7540// doubly-linked list of siblings}func ( *priorityNodeRFC7540) ( *priorityNodeRFC7540) {if == {panic("setParent to self") }if .parent == {return }// Unlink from current parent.if := .parent; != nil {if .prev == nil { .kids = .next } else { .prev.next = .next }if .next != nil { .next.prev = .prev } }// Link to new parent. // If parent=nil, remove n from the tree. // Always insert at the head of parent.kids (this is assumed by walkReadyInOrder). .parent = if == nil { .next = nil .prev = nil } else { .next = .kids .prev = nilif .next != nil { .next.prev = } .kids = }}func ( *priorityNodeRFC7540) ( int64) { .bytes += for ; != nil; = .parent { .subtreeBytes += }}// walkReadyInOrder iterates over the tree in priority order, calling f for each node// with a non-empty write queue. When f returns true, this function returns true and the// walk halts. tmp is used as scratch space for sorting.//// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true// if any ancestor p of n is still open (ignoring the root node).func ( *priorityNodeRFC7540) ( bool, *[]*priorityNodeRFC7540, func(*priorityNodeRFC7540, bool) bool) bool {if !.q.empty() && (, ) {returntrue }if .kids == nil {returnfalse }// Don't consider the root "open" when updating openParent since // we can't send data frames on the root stream (only control frames).if .id != 0 { = || (.state == priorityNodeOpenRFC7540) }// Common case: only one kid or all kids have the same weight. // Some clients don't use weights; other clients (like web browsers) // use mostly-linear priority trees. := .kids.weight := falsefor := .kids.next; != nil; = .next {if .weight != { = truebreak } }if ! {for := .kids; != nil; = .next {if .(, , ) {returntrue } }returnfalse }// Uncommon case: sort the child nodes. 
We remove the kids from the parent, // then re-insert after sorting so we can reuse tmp for future sort calls. * = (*)[:0]for .kids != nil { * = append(*, .kids) .kids.setParent(nil) }sort.Sort(sortPriorityNodeSiblingsRFC7540(*))for := len(*) - 1; >= 0; -- { (*)[].setParent() // setParent inserts at the head of n.kids }for := .kids; != nil; = .next {if .(, , ) {returntrue } }returnfalse}type sortPriorityNodeSiblingsRFC7540 []*priorityNodeRFC7540func ( sortPriorityNodeSiblingsRFC7540) () int { returnlen() }func ( sortPriorityNodeSiblingsRFC7540) (, int) { [], [] = [], [] }func ( sortPriorityNodeSiblingsRFC7540) (, int) bool {// Prefer the subtree that has sent fewer bytes relative to its weight. // See sections 5.3.2 and 5.3.4. , := float64([].weight)+1, float64([].subtreeBytes) , := float64([].weight)+1, float64([].subtreeBytes)if == 0 && == 0 {return >= }if == 0 {returnfalse }return / <= /}type priorityWriteSchedulerRFC7540 struct {// root is the root of the priority tree, where root.id = 0. // The root queues control frames that are not associated with any stream. root priorityNodeRFC7540// nodes maps stream ids to priority tree nodes. nodes map[uint32]*priorityNodeRFC7540// maxID is the maximum stream id in nodes. maxID uint32// lists of nodes that have been closed or are idle, but are kept in // the tree for improved prioritization. When the lengths exceed either // maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded. closedNodes, idleNodes []*priorityNodeRFC7540// From the config. maxClosedNodesInTree int maxIdleNodesInTree int writeThrottleLimit int32 enableWriteThrottle bool// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations. tmp []*priorityNodeRFC7540// pool of empty queues for reuse. 
queuePool writeQueuePool}func ( *priorityWriteSchedulerRFC7540) ( uint32, OpenStreamOptions) {// The stream may be currently idle but cannot be opened or closed.if := .nodes[]; != nil {if .state != priorityNodeIdleRFC7540 {panic(fmt.Sprintf("stream %d already opened", )) } .state = priorityNodeOpenRFC7540return }// RFC 7540, Section 5.3.5: // "All streams are initially assigned a non-exclusive dependency on stream 0x0. // Pushed streams initially depend on their associated stream. In both cases, // streams are assigned a default weight of 16." := .nodes[.PusherID]if == nil { = &.root } := &priorityNodeRFC7540{q: *.queuePool.get(),id: ,weight: priorityDefaultWeightRFC7540,state: priorityNodeOpenRFC7540, } .setParent() .nodes[] = if > .maxID { .maxID = }}func ( *priorityWriteSchedulerRFC7540) ( uint32) {if == 0 {panic("violation of WriteScheduler interface: cannot close stream 0") }if .nodes[] == nil {panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", )) }if .nodes[].state != priorityNodeOpenRFC7540 {panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", )) } := .nodes[] .state = priorityNodeClosedRFC7540 .addBytes(-.bytes) := .q .queuePool.put(&)if .maxClosedNodesInTree > 0 { .addClosedOrIdleNode(&.closedNodes, .maxClosedNodesInTree, ) } else { .removeNode() }}func ( *priorityWriteSchedulerRFC7540) ( uint32, PriorityParam) {if == 0 {panic("adjustPriority on root") }// If streamID does not exist, there are two cases: // - A closed stream that has been removed (this will have ID <= maxID) // - An idle stream that is being used for "grouping" (this will have ID > maxID) := .nodes[]if == nil {if <= .maxID || .maxIdleNodesInTree == 0 {return } .maxID = = &priorityNodeRFC7540{q: *.queuePool.get(),id: ,weight: priorityDefaultWeightRFC7540,state: priorityNodeIdleRFC7540, } .setParent(&.root) .nodes[] = .addClosedOrIdleNode(&.idleNodes, .maxIdleNodesInTree, ) }// Section 5.3.1: A dependency on a stream that is not 
currently in the tree // results in that stream being given a default priority (Section 5.3.5). := .nodes[.StreamDep]if == nil { .setParent(&.root) .weight = priorityDefaultWeightRFC7540return }// Ignore if the client tries to make a node its own parent.if == {return }// Section 5.3.3: // "If a stream is made dependent on one of its own dependencies, the // formerly dependent stream is first moved to be dependent on the // reprioritized stream's previous parent. The moved dependency retains // its weight." // // That is: if parent depends on n, move parent to depend on n.parent.for := .parent; != nil; = .parent {if == { .setParent(.parent)break } }// Section 5.3.3: The exclusive flag causes the stream to become the sole // dependency of its parent stream, causing other dependencies to become // dependent on the exclusive stream.if .Exclusive { := .kidsfor != nil { := .nextif != { .setParent() } = } } .setParent() .weight = .Weight}func ( *priorityWriteSchedulerRFC7540) ( FrameWriteRequest) {var *priorityNodeRFC7540if .isControl() { = &.root } else { := .StreamID() = .nodes[]if == nil {// id is an idle or closed stream. wr should not be a HEADERS or // DATA frame. In other case, we push wr onto the root, rather // than creating a new priorityNode.if .DataSize() > 0 {panic("add DATA on non-open stream") } = &.root } } .q.push()}func ( *priorityWriteSchedulerRFC7540) () ( FrameWriteRequest, bool) { .root.walkReadyInOrder(false, &.tmp, func( *priorityNodeRFC7540, bool) bool { := int32(math.MaxInt32)if { = .writeThrottleLimit } , = .q.consume()if ! 
{returnfalse } .addBytes(int64(.DataSize()))// If B depends on A and B continuously has data available but A // does not, gradually increase the throttling limit to allow B to // steal more and more bandwidth from A.if { .writeThrottleLimit += 1024if .writeThrottleLimit < 0 { .writeThrottleLimit = math.MaxInt32 } } elseif .enableWriteThrottle { .writeThrottleLimit = 1024 }returntrue })return , }func ( *priorityWriteSchedulerRFC7540) ( *[]*priorityNodeRFC7540, int, *priorityNodeRFC7540) {if == 0 {return }iflen(*) == {// Remove the oldest node, then shift left. .removeNode((*)[0]) := (*)[1:]copy(*, ) * = (*)[:len()] } * = append(*, )}func ( *priorityWriteSchedulerRFC7540) ( *priorityNodeRFC7540) {for .kids != nil { .kids.setParent(.parent) } .setParent(nil)delete(.nodes, .id)}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.