// Copyright 2025 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.package http2import ()type streamMetadata struct { location *writeQueue priority PriorityParam}type priorityWriteSchedulerRFC9218 struct {// control contains control frames (SETTINGS, PING, etc.). control writeQueue// heads contain the head of a circular list of streams. // We put these heads within a nested array that represents urgency and // incremental, as defined in // https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters. // 8 represents u=0 up to u=7, and 2 represents i=false and i=true. heads [8][2]*writeQueue// streams contains a mapping between each stream ID and their metadata, so // we can quickly locate them when needing to, for example, adjust their // priority. streams map[uint32]streamMetadata// queuePool are empty queues for reuse. queuePool writeQueuePool// prioritizeIncremental is used to determine whether we should prioritize // incremental streams or not, when urgency is the same in a given Pop() // call. prioritizeIncremental bool}func newPriorityWriteSchedulerRFC9218() WriteScheduler { := &priorityWriteSchedulerRFC9218{streams: make(map[uint32]streamMetadata), }return}func ( *priorityWriteSchedulerRFC9218) ( uint32, OpenStreamOptions) {if .streams[].location != nil {panic(fmt.Errorf("stream %d already opened", )) } := .queuePool.get() .streams[] = streamMetadata{location: ,priority: .priority, } , := .priority.urgency, .priority.incrementalif .heads[][] == nil { .heads[][] = .next = .prev = } else {// Queues are stored in a ring. // Insert the new stream before ws.head, putting it at the end of the list. .prev = .heads[][].prev .next = .heads[][] .prev.next = .next.prev = }}func ( *priorityWriteSchedulerRFC9218) ( uint32) { := .streams[] , , := .location, .priority.urgency, .priority.incrementalif == nil {return }if .next == {// This was the only open stream. .heads[][] = nil } else { .prev.next = .next .next.prev = .previf .heads[][] == { .heads[][] = .next } }delete(.streams, ) .queuePool.put()}func ( *priorityWriteSchedulerRFC9218) ( uint32, PriorityParam) { := .streams[] , , := .location, .priority.urgency, .priority.incrementalif == nil {return }// Remove stream from current location.if .next == {// This was the only open stream. .heads[][] = nil } else { .prev.next = .next .next.prev = .previf .heads[][] == { .heads[][] = .next } }// Insert stream to the new queue. , = .urgency, .incrementalif .heads[][] == nil { .heads[][] = .next = .prev = } else {// Queues are stored in a ring. // Insert the new stream before ws.head, putting it at the end of the list. .prev = .heads[][].prev .next = .heads[][] .prev.next = .next.prev = }// Update the metadata. .streams[] = streamMetadata{location: ,priority: , }}func ( *priorityWriteSchedulerRFC9218) ( FrameWriteRequest) {if .isControl() { .control.push()return } := .streams[.StreamID()].locationif == nil {// This is a closed stream. // wr should not be a HEADERS or DATA frame. // We push the request onto the control queue.if .DataSize() > 0 {panic("add DATA on non-open stream") } .control.push()return } .push()}func ( *priorityWriteSchedulerRFC9218) () (FrameWriteRequest, bool) {// Control and RST_STREAM frames first.if !.control.empty() {return .control.shift(), true }// On the next Pop(), we want to prioritize incremental if we prioritized // non-incremental request of the same urgency this time. Vice-versa. // i.e. when there are incremental and non-incremental requests at the same // priority, we give 50% of our bandwidth to the incremental ones in // aggregate and 50% to the first non-incremental one (since // non-incremental streams do not use round-robin writes). .prioritizeIncremental = !.prioritizeIncremental// Always prioritize lowest u (i.e. highest urgency level).for := range .heads {for := range .heads[] {// When we want to prioritize incremental, we try to pop i=true // first before i=false when u is the same.if .prioritizeIncremental { = ( + 1) % 2 } := .heads[][]if == nil {continue }for {if , := .consume(math.MaxInt32); {if == 1 {// For incremental streams, we update head to q.next so // we can round-robin between multiple streams that can // immediately benefit from partial writes. .heads[][] = .next } else {// For non-incremental streams, we try to finish one to // completion rather than doing round-robin. However, // we update head here so that if q.consume() is !ok // (e.g. the stream has no more frame to consume), head // is updated to the next q that has frames to consume // on future iterations. This way, we do not prioritize // writing to unavailable stream on next Pop() calls, // preventing head-of-line blocking. .heads[][] = }return , true } = .nextif == .heads[][] {break } } } }returnFrameWriteRequest{}, false}
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.