// Package rate provides rate limiting functionality at a global, network, and subnet level.
package rate import ( manet ) // Limit is the configuration for a token bucket rate limiter. // The bucket has a capacity of Burst, and is refilled at a rate of RPS tokens per second. // Initially, buckets are completley full, i.e. tokens in the bucket is equal to `Burst`. // In any given time interval T seconds, maximum events allowed will be `T*RPS + Burst`. type Limit struct { // RPS is the rate of requests per second in steady state. RPS float64 // Burst is the number of requests allowed over the RPS. Burst int } // PrefixLimit is a rate limit configuration that applies to a specific network prefix. type PrefixLimit struct { Prefix netip.Prefix Limit } // SubnetLimit is a rate limit configuration that applies to a specific subnet. type SubnetLimit struct { PrefixLength int Limit } // Limiter rate limits new streams for a service. It allows setting NetworkPrefix specific, // global, and subnet specific limits. Use 0 for no rate limiting. // The limiter maintains state that must be periodically cleaned up using Cleanup type Limiter struct { // NetworkPrefixLimits are limits for streams with peer IPs belonging to specific subnets. // It can be used to increase the limit for trusted networks and decrease the limit for specific networks. NetworkPrefixLimits []PrefixLimit // GlobalLimit is the limit for all streams where the peer IP doesn't fall within any // of the `NetworkPrefixLimits` GlobalLimit Limit // SubnetRateLimiter is a rate limiter for subnets. SubnetRateLimiter SubnetLimiter initOnce sync.Once globalBucket *rate.Limiter networkPrefixBuckets []*rate.Limiter // ith element ratelimits ith NetworkPrefixLimits } func ( *Limiter) () { .initOnce.Do(func() { if .GlobalLimit.RPS == 0 { .globalBucket = rate.NewLimiter(rate.Inf, 0) } else { .globalBucket = rate.NewLimiter(rate.Limit(.GlobalLimit.RPS), .GlobalLimit.Burst) } // clone the slice in case it's shared with other limiters .NetworkPrefixLimits = slices.Clone(.NetworkPrefixLimits) // sort such that the widest prefix (smallest bit count) is last. slices.SortFunc(.NetworkPrefixLimits, func(, PrefixLimit) int { return .Prefix.Bits() - .Prefix.Bits() }) .networkPrefixBuckets = make([]*rate.Limiter, 0, len(.NetworkPrefixLimits)) for , := range .NetworkPrefixLimits { if .RPS == 0 { .networkPrefixBuckets = append(.networkPrefixBuckets, rate.NewLimiter(rate.Inf, 0)) } else { .networkPrefixBuckets = append(.networkPrefixBuckets, rate.NewLimiter(rate.Limit(.RPS), .Burst)) } } }) } // Limit rate limits a StreamHandler function. func ( *Limiter) ( func( network.Stream)) func( network.Stream) { .init() return func( network.Stream) { := .Conn().RemoteMultiaddr() , := manet.ToIP() if != nil { = nil } , := netip.AddrFromSlice() if ! { = netip.Addr{} } if !.Allow() { _ = .ResetWithError(network.StreamRateLimited) return } () } } // Allow returns true if requests for `ipAddr` are within specified rate limits func ( *Limiter) ( netip.Addr) bool { .init() // Check buckets from the most specific to the least. // // This ensures that a single peer cannot take up all the tokens in the global // rate limiting bucket. We *MUST* follow this order because the rate limiter // implementation doesn't have a `ReturnToken` method. If we checked the global // bucket before the specific bucket, and the specific bucket rejected the // request, there's no way to return the token to the global bucket. So all // rejected requests from the specific bucket would take up tokens from the global bucket. // prefixs have been sorted from most to least specific so rejected requests for more // specific prefixes don't take up tokens from the less specific prefixes. := false for , := range .NetworkPrefixLimits { if .Prefix.Contains() { if !.networkPrefixBuckets[].Allow() { return false } = true } } if { return true } if !.SubnetRateLimiter.Allow(, time.Now()) { return false } return .globalBucket.Allow() } // SubnetLimiter rate limits requests per ip subnet. type SubnetLimiter struct { // IPv4SubnetLimits are the per subnet limits for streams with IPv4 Peers. IPv4SubnetLimits []SubnetLimit // IPv6SubnetLimits are the per subnet limits for streams with IPv6 Peers. IPv6SubnetLimits []SubnetLimit // GracePeriod is the time to wait to remove a full capacity bucket. // Keeping a bucket around helps prevent allocations GracePeriod time.Duration initOnce sync.Once mx sync.Mutex ipv4Heaps []*bucketHeap ipv6Heaps []*bucketHeap } func ( *SubnetLimiter) () { .initOnce.Do(func() { // smaller prefix length, i.e. largest subnet, last slices.SortFunc(.IPv4SubnetLimits, func(, SubnetLimit) int { return .PrefixLength - .PrefixLength }) slices.SortFunc(.IPv6SubnetLimits, func(, SubnetLimit) int { return .PrefixLength - .PrefixLength }) .ipv4Heaps = make([]*bucketHeap, len(.IPv4SubnetLimits)) for := range .IPv4SubnetLimits { .ipv4Heaps[] = &bucketHeap{ prefixBucket: make([]prefixBucketWithExpiry, 0), prefixToIndex: make(map[netip.Prefix]int), } heap.Init(.ipv4Heaps[]) } .ipv6Heaps = make([]*bucketHeap, len(.IPv6SubnetLimits)) for := range .IPv6SubnetLimits { .ipv6Heaps[] = &bucketHeap{ prefixBucket: make([]prefixBucketWithExpiry, 0), prefixToIndex: make(map[netip.Prefix]int), } heap.Init(.ipv6Heaps[]) } }) } // Allow returns true if requests for `ipAddr` are within specified rate limits func ( *SubnetLimiter) ( netip.Addr, time.Time) bool { .init() .mx.Lock() defer .mx.Unlock() .cleanUp() var []SubnetLimit var []*bucketHeap if .Is4() { = .IPv4SubnetLimits = .ipv4Heaps } else { = .IPv6SubnetLimits = .ipv6Heaps } for , := range { , := .Prefix(.PrefixLength) if != nil { return false // we have a ipaddr this shouldn't happen } := [].Get() if == (prefixBucketWithExpiry{}) { = prefixBucketWithExpiry{ Prefix: , tokenBucket: tokenBucket{rate.NewLimiter(rate.Limit(.RPS), .Burst)}, Expiry: , } } if !.Allow() { // bucket is empty, its expiry would have been set correctly the last time // it allowed a request. return false } .Expiry = .FullAt().Add(.GracePeriod) [].Upsert() } return true } // cleanUp removes limiters that have expired by now. func ( *SubnetLimiter) ( time.Time) { for , := range .ipv4Heaps { .Expire() } for , := range .ipv6Heaps { .Expire() } } // tokenBucket is a *rate.Limiter with a `FullAt` method. type tokenBucket struct { *rate.Limiter } // FullAt returns the instant at which the bucket will be full. func ( *tokenBucket) ( time.Time) time.Time { := float64(.Burst()) - .TokensAt() := float64(.Limit()) := time.Duration(( / ) * float64(time.Second)) return .Add() } // prefixBucketWithExpiry is a token bucket with a prefix and Expiry. The expiry is when the bucket // will be full with tokens. type prefixBucketWithExpiry struct { tokenBucket Prefix netip.Prefix Expiry time.Time } // bucketHeap is a heap of buckets ordered by their Expiry. At expiry, the bucket // is removed from the heap as a full bucket is indistinguishable from a new bucket. type bucketHeap struct { prefixBucket []prefixBucketWithExpiry prefixToIndex map[netip.Prefix]int } var _ heap.Interface = (*bucketHeap)(nil) // Upsert replaces the bucket with prefix `b.Prefix` with the provided bucket, `b`, or // inserts `b` if no bucket with prefix `b.Prefix` exists. func ( *bucketHeap) ( prefixBucketWithExpiry) { if , := .prefixToIndex[.Prefix]; { .prefixBucket[] = heap.Fix(, ) return } heap.Push(, ) } // Get returns the limiter for a prefix func ( *bucketHeap) ( netip.Prefix) prefixBucketWithExpiry { if , := .prefixToIndex[]; { return .prefixBucket[] } return prefixBucketWithExpiry{} } // Expire removes elements with expiry before `expiry` func ( *bucketHeap) ( time.Time) { for .Len() > 0 { := .prefixBucket[0] if .Expiry.After() { break } heap.Pop() } } // Methods for the heap interface // Len returns the length of the heap func ( *bucketHeap) () int { return len(.prefixBucket) } // Less compares two elements in the heap func ( *bucketHeap) (, int) bool { return .prefixBucket[].Expiry.Before(.prefixBucket[].Expiry) } // Swap swaps two elements in the heap func ( *bucketHeap) (, int) { .prefixBucket[], .prefixBucket[] = .prefixBucket[], .prefixBucket[] .prefixToIndex[.prefixBucket[].Prefix] = .prefixToIndex[.prefixBucket[].Prefix] = } // Push adds a new element to the heap func ( *bucketHeap) ( any) { := .(prefixBucketWithExpiry) .prefixBucket = append(.prefixBucket, ) .prefixToIndex[.Prefix] = len(.prefixBucket) - 1 } // Pop removes and returns the top element from the heap func ( *bucketHeap) () any { := len(.prefixBucket) := .prefixBucket[-1] .prefixBucket = .prefixBucket[0 : -1] delete(.prefixToIndex, .Prefix) return }