Source file: cache.go
Belonging package: github.com/dgraph-io/ristretto/v2
/*
 * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
 * SPDX-License-Identifier: Apache-2.0
 */

// Ristretto is a fast, fixed size, in-memory cache with a dual focus on
// throughput and hit ratio performance. You can easily add Ristretto to an
// existing system and keep the most valuable data where you need it.
package ristretto

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/dgraph-io/ristretto/v2/z"
)

var (
	// TODO: find the optimal value for this or make it configurable
	setBufSize = 32 * 1024
)

const itemSize = int64(unsafe.Sizeof(storeItem[any]{}))

func zeroValue[T any]() T {
	var zero T
	return zero
}

// Key is the generic type constraint for the keys stored in the cache.
type Key = z.Key

// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
// from as many goroutines as you want.
type Cache[K Key, V any] struct {
	// storedItems is the central concurrent hashmap where key-value items are stored.
	storedItems store[V]
	// cachePolicy determines what gets let in to the cache and what gets kicked out.
	cachePolicy *defaultPolicy[V]
	// getBuf is a custom ring buffer implementation that gets pushed to when
	// keys are read.
	getBuf *ringBuffer
	// setBuf is a buffer allowing us to batch/drop Sets during times of high
	// contention.
	setBuf chan *Item[V]
	// onEvict is called for item evictions.
	onEvict func(*Item[V])
	// onReject is called when an item is rejected via admission policy.
	onReject func(*Item[V])
	// onExit is called whenever a value goes out of scope from the cache.
	onExit (func(V))
	// KeyToHash function is used to customize the key hashing algorithm.
	// Each key will be hashed using the provided function.
	// If keyToHash is not set, the default keyToHash function is used.
	keyToHash func(K) (uint64, uint64)
	// stop is used to stop the processItems goroutine.
	stop chan struct{}
	done chan struct{}
	// isClosed indicates whether the cache is closed.
	isClosed atomic.Bool
	// cost calculates cost from a value.
	cost func(value V) int64
	// ignoreInternalCost dictates whether to ignore the cost of internally storing
	// the item in the cost calculation.
	ignoreInternalCost bool
	// cleanupTicker is used to periodically check for entries whose TTL has passed.
	cleanupTicker *time.Ticker
	// Metrics contains a running log of important statistics like hits, misses,
	// and dropped items.
	Metrics *Metrics
}

// Config is passed to NewCache for creating new Cache instances.
type Config[K Key, V any] struct {
	// NumCounters determines the number of counters (keys) to keep that hold
	// access frequency information. It's generally a good idea to have more
	// counters than the max cache capacity, as this will improve eviction
	// accuracy and subsequent hit ratios.
	//
	// For example, if you expect your cache to hold 1,000,000 items when full,
	// NumCounters should be 10,000,000 (10x). Each counter takes up roughly
	// 3 bytes (4 bits for each counter * 4 copies plus about a byte per
	// counter for the bloom filter). Note that the number of counters is
	// internally rounded up to the nearest power of 2, so the space usage
	// may be a little larger than 3 bytes * NumCounters.
	//
	// We've seen good performance in setting this to 10x the number of items
	// you expect to keep in the cache when full.
	NumCounters int64

	// MaxCost is how eviction decisions are made.
	// For example, if MaxCost is
	// 100 and a new item with a cost of 1 increases total cache cost to 101,
	// 1 item will be evicted.
	//
	// MaxCost can be considered as the cache capacity, in whatever units you
	// choose to use.
	//
	// For example, if you want the cache to have a max capacity of 100MB, you
	// would set MaxCost to 100,000,000 and pass an item's number of bytes as
	// the `cost` parameter for calls to Set. If new items are accepted, the
	// eviction process will take care of making room for the new item and not
	// overflowing the MaxCost value.
	//
	// MaxCost could be anything as long as it matches how you're using the cost
	// values when calling Set.
	MaxCost int64

	// BufferItems determines the size of Get buffers.
	//
	// Unless you have a rare use case, using `64` as the BufferItems value
	// results in good performance.
	//
	// If for some reason you see Get performance decreasing with lots of
	// contention (you shouldn't), try increasing this value in increments of 64.
	// This is a fine-tuning mechanism and you probably won't have to touch this.
	BufferItems int64

	// Metrics is true when you want a variety of stats about the cache.
	// There is some overhead to keeping statistics, so you should only set this
	// flag to true when testing or when throughput performance isn't a major factor.
	Metrics bool

	// OnEvict is called for every eviction with the evicted item.
	OnEvict func(item *Item[V])

	// OnReject is called for every rejection done via the policy.
	OnReject func(item *Item[V])

	// OnExit is called whenever a value is removed from cache. This can be
	// used to do manual memory deallocation. It is also called on eviction
	// and on rejection of the value.
	OnExit func(val V)

	// ShouldUpdate is called when a value already exists in cache and is being updated.
	// If ShouldUpdate returns true, the cache continues with the update (Set). If the
	// function returns false, no changes are made in the cache.
	// If the value doesn't
	// already exist, the cache continues setting that value for the given key.
	//
	// In this function, you can check whether the new value is valid. For example, if
	// your value has a timestamp associated with it, you could check whether the new
	// value has the latest timestamp, preventing you from setting an older value.
	ShouldUpdate func(cur, prev V) bool

	// KeyToHash function is used to customize the key hashing algorithm.
	// Each key will be hashed using the provided function. If KeyToHash
	// is not set, the default keyToHash function is used.
	//
	// Ristretto has a variety of defaults depending on the underlying interface type
	// (see https://github.com/hypermodeinc/ristretto/blob/main/z/z.go#L19-L41).
	//
	// Note that if you want 128-bit hashes you should use both values
	// returned by the function. If you want to use 64-bit hashes, you can
	// just return the first uint64 and return 0 for the second uint64.
	KeyToHash func(key K) (uint64, uint64)

	// Cost evaluates a value and outputs a corresponding cost. This function is run
	// after Set is called for a new item or an item is updated with a cost param of 0.
	//
	// Cost is an optional function you can pass to the Config in order to evaluate
	// item cost at runtime, and only when the Set call isn't going to be dropped. This
	// is useful if calculating item cost is particularly expensive and you don't want to
	// waste time on items that will be dropped anyway.
	//
	// To signal to Ristretto that you'd like to use this Cost function:
	//   1. Set the Cost field to a non-nil function.
	//   2. When calling Set for new items or item updates, use a `cost` of 0.
	Cost func(value V) int64

	// IgnoreInternalCost set to true indicates to the cache that the cost of
	// internally storing the value should be ignored. This is useful when the
	// cost passed to Set is not using bytes as units.
	// Keep in mind that setting
	// this to true will increase the memory usage.
	IgnoreInternalCost bool

	// TtlTickerDurationInSec controls the cleanup ticker that removes keys whose
	// TTL has expired; the ticker fires every TtlTickerDurationInSec/2 seconds.
	TtlTickerDurationInSec int64
}

type itemFlag byte

const (
	itemNew itemFlag = iota
	itemDelete
	itemUpdate
)

// Item is a full representation of what's stored in the cache for each key-value pair.
type Item[V any] struct {
	flag       itemFlag
	Key        uint64
	Conflict   uint64
	Value      V
	Cost       int64
	Expiration time.Time
	wait       chan struct{}
}

// NewCache returns a new Cache instance and any configuration error.
func NewCache[K Key, V any](config *Config[K, V]) (*Cache[K, V], error) {
	switch {
	case config.NumCounters == 0:
		return nil, errors.New("NumCounters can't be zero")
	case config.NumCounters < 0:
		return nil, errors.New("NumCounters can't be negative number")
	case config.MaxCost == 0:
		return nil, errors.New("MaxCost can't be zero")
	case config.MaxCost < 0:
		return nil, errors.New("MaxCost can't be negative number")
	case config.BufferItems == 0:
		return nil, errors.New("BufferItems can't be zero")
	case config.BufferItems < 0:
		return nil, errors.New("BufferItems can't be negative number")
	case config.TtlTickerDurationInSec == 0:
		config.TtlTickerDurationInSec = bucketDurationSecs
	}
	policy := newPolicy[V](config.NumCounters, config.MaxCost)
	cache := &Cache[K, V]{
		storedItems:        newStore[V](),
		cachePolicy:        policy,
		getBuf:             newRingBuffer(policy, config.BufferItems),
		setBuf:             make(chan *Item[V], setBufSize),
		keyToHash:          config.KeyToHash,
		stop:               make(chan struct{}),
		done:               make(chan struct{}),
		cost:               config.Cost,
		ignoreInternalCost: config.IgnoreInternalCost,
		cleanupTicker:      time.NewTicker(time.Duration(config.TtlTickerDurationInSec) * time.Second / 2),
	}
	cache.storedItems.SetShouldUpdateFn(config.ShouldUpdate)
	cache.onExit = func(val V) {
		if config.OnExit != nil {
			config.OnExit(val)
		}
	}
	cache.onEvict = func(item *Item[V]) {
		if config.OnEvict != nil {
			config.OnEvict(item)
		}
		cache.onExit(item.Value)
	}
	cache.onReject = func(item *Item[V]) {
		if config.OnReject != nil {
			config.OnReject(item)
		}
		cache.onExit(item.Value)
	}
	if cache.keyToHash == nil {
		cache.keyToHash = z.KeyToHash[K]
	}
	if config.Metrics {
		cache.collectMetrics()
	}
	// NOTE: benchmarks seem to show that performance decreases the more
	// goroutines we have running cache.processItems(), so 1
	// should usually be sufficient.
	go cache.processItems()

	return cache, nil
}

// Wait blocks until all buffered writes have been applied. This ensures a call to Set()
// will be visible to future calls to Get().
func (c *Cache[K, V]) Wait() {
	if c == nil || c.isClosed.Load() {
		return
	}
	wait := make(chan struct{})
	c.setBuf <- &Item[V]{wait: wait}
	<-wait
}

// Get returns the value (if any) and a boolean representing whether the
// value was found or not. The value can be nil and the boolean can be true at
// the same time. Get will not return expired items.
func (c *Cache[K, V]) Get(key K) (V, bool) {
	if c == nil || c.isClosed.Load() {
		return zeroValue[V](), false
	}
	keyHash, conflictHash := c.keyToHash(key)

	c.getBuf.Push(keyHash)
	value, ok := c.storedItems.Get(keyHash, conflictHash)
	if ok {
		c.Metrics.add(hit, keyHash, 1)
	} else {
		c.Metrics.add(miss, keyHash, 1)
	}
	return value, ok
}

// Set attempts to add the key-value item to the cache. If it returns false,
// then the Set was dropped and the key-value item isn't added to the cache. If
// it returns true, there's still a chance it could be dropped by the policy if
// it's determined that the key-value item isn't worth keeping, but otherwise the
// item will be added and other items will be evicted in order to make room.
//
// To dynamically evaluate the item's cost using the Config.Cost function, set
// the cost parameter to 0 and Cost will be run when needed in order to find
// the item's true cost.
//
// Set writes the value of type V as is. If type V is a pointer type, it is OK
// to update the memory pointed to by the pointer. Updating the pointer itself
// will not be reflected in the cache. Be careful when using slice types as the
// value type V. Calling `append` may update the underlying array pointer, which
// will not be reflected in the cache.
func (c *Cache[K, V]) Set(key K, value V, cost int64) bool {
	return c.SetWithTTL(key, value, cost, 0*time.Second)
}

// SetWithTTL works like Set but adds a key-value pair to the cache that will expire
// after the specified TTL (time to live) has passed. A zero value means the value never
// expires, which is identical to calling Set.
// A negative value is a no-op and the value
// is discarded.
//
// See Set for more information.
func (c *Cache[K, V]) SetWithTTL(key K, value V, cost int64, ttl time.Duration) bool {
	if c == nil || c.isClosed.Load() {
		return false
	}

	var expiration time.Time
	switch {
	case ttl == 0:
		// No expiration.
		break
	case ttl < 0:
		// Treat this as a no-op.
		return false
	default:
		expiration = time.Now().Add(ttl)
	}

	keyHash, conflictHash := c.keyToHash(key)
	i := &Item[V]{
		flag:       itemNew,
		Key:        keyHash,
		Conflict:   conflictHash,
		Value:      value,
		Cost:       cost,
		Expiration: expiration,
	}
	// The cost is updated eventually (by processItems). The expiration must be
	// updated immediately to prevent items from being prematurely removed from the map.
	if prev, ok := c.storedItems.Update(i); ok {
		c.onExit(prev)
		i.flag = itemUpdate
	}
	// Attempt to send item to cachePolicy.
	select {
	case c.setBuf <- i:
		return true
	default:
		if i.flag == itemUpdate {
			// Return true if this was an update operation since we've already
			// updated the storedItems. For all the other operations (set/delete), we
			// return false which means the item was not inserted.
			return true
		}
		c.Metrics.add(dropSets, keyHash, 1)
		return false
	}
}

// Del deletes the key-value item from the cache if it exists.
func (c *Cache[K, V]) Del(key K) {
	if c == nil || c.isClosed.Load() {
		return
	}
	keyHash, conflictHash := c.keyToHash(key)
	// Delete immediately.
	_, prev := c.storedItems.Del(keyHash, conflictHash)
	c.onExit(prev)
	// If we've set an item, it would be applied slightly later.
	// So we must push the same item to `setBuf` with the deletion flag.
	// This ensures that if a set is followed by a delete, it will be
	// applied in the correct order.
	c.setBuf <- &Item[V]{
		flag:     itemDelete,
		Key:      keyHash,
		Conflict: conflictHash,
	}
}

// GetTTL returns the TTL for the specified key and a bool that is true if the
// item was found and is not expired.
func (c *Cache[K, V]) GetTTL(key K) (time.Duration, bool) {
	if c == nil {
		return 0, false
	}

	keyHash, conflictHash := c.keyToHash(key)
	if _, ok := c.storedItems.Get(keyHash, conflictHash); !ok {
		// not found
		return 0, false
	}

	expiration := c.storedItems.Expiration(keyHash)
	if expiration.IsZero() {
		// found but no expiration
		return 0, true
	}

	if time.Now().After(expiration) {
		// found but expired
		return 0, false
	}

	return time.Until(expiration), true
}

// Close stops all goroutines and closes all channels.
func (c *Cache[K, V]) Close() {
	if c == nil || c.isClosed.Load() {
		return
	}
	c.Clear()

	// Block until processItems goroutine is returned.
	c.stop <- struct{}{}
	<-c.done
	close(c.stop)
	close(c.done)
	close(c.setBuf)
	c.cachePolicy.Close()
	c.cleanupTicker.Stop()
	c.isClosed.Store(true)
}

// Clear empties the hashmap and zeroes all cachePolicy counters. Note that this is
// not an atomic operation (but that shouldn't be a problem as it's assumed that
// Set/Get calls won't be occurring until after this).
func (c *Cache[K, V]) Clear() {
	if c == nil || c.isClosed.Load() {
		return
	}
	// Block until processItems goroutine is returned.
	c.stop <- struct{}{}
	<-c.done

	// Clear out the setBuf channel.
loop:
	for {
		select {
		case i := <-c.setBuf:
			if i.wait != nil {
				close(i.wait)
				continue
			}
			if i.flag != itemUpdate {
				// For itemUpdate, the value is already set in storedItems, so there is no need to call
So, no need to call// onEvict here..onEvict()}default:break}}// Clear value hashmap and cachePolicy data..cachePolicy.Clear().storedItems.Clear(.onEvict)// Only reset metrics if they're enabled.if .Metrics != nil {.Metrics.Clear()}// Restart processItems goroutine.go .processItems()}// MaxCost returns the max cost of the cache.func ( *Cache[, ]) () int64 {if == nil {return 0}return .cachePolicy.MaxCost()}// UpdateMaxCost updates the maxCost of an existing cache.func ( *Cache[, ]) ( int64) {if == nil {return}.cachePolicy.UpdateMaxCost()}// processItems is ran by goroutines processing the Set buffer.func ( *Cache[, ]) () {:= make(map[uint64]time.Time):= 100000 // TODO: Make this configurable via options.:= func( uint64) {if .Metrics == nil {return}[] = time.Now()if len() > {for := range {if len() <= {break}delete(, )}}}:= func( *Item[]) {if , := [.Key]; {.Metrics.trackEviction(int64(time.Since() / time.Second))delete(, .Key)}if .onEvict != nil {.onEvict()}}for {select {case := <-.setBuf:if .wait != nil {close(.wait)continue}// Calculate item cost value if new or update.if .Cost == 0 && .cost != nil && .flag != itemDelete {.Cost = .cost(.Value)}if !.ignoreInternalCost {// Add the cost of internally storing the object..Cost += itemSize}switch .flag {case itemNew:, := .cachePolicy.Add(.Key, .Cost)if {.storedItems.Set().Metrics.add(keyAdd, .Key, 1)(.Key)} else {.onReject()}for , := range {.Conflict, .Value = .storedItems.Del(.Key, 0)()}case itemUpdate:.cachePolicy.Update(.Key, .Cost)case itemDelete:.cachePolicy.Del(.Key) // Deals with metrics updates., := .storedItems.Del(.Key, .Conflict).onExit()}case <-.cleanupTicker.C:.storedItems.Cleanup(.cachePolicy, )case <-.stop:.done <- struct{}{}return}}}// collectMetrics just creates a new *Metrics instance and adds the pointers// to the cache and policy instances.func ( *Cache[, ]) () {.Metrics = newMetrics().cachePolicy.CollectMetrics(.Metrics)}type metricType intconst (// The following 2 keep track of hits and misses.hit = 
iotamiss// The following 3 keep track of number of keys added, updated and evicted.keyAddkeyUpdatekeyEvict// The following 2 keep track of cost of keys added and evicted.costAddcostEvict// The following keep track of how many sets were dropped or rejected later.dropSetsrejectSets// The following 2 keep track of how many gets were kept and dropped on the// floor.dropGetskeepGets// This should be the final enum. Other enums should be set before this.doNotUse)func stringFor( metricType) string {switch {case hit:return "hit"case miss:return "miss"case keyAdd:return "keys-added"case keyUpdate:return "keys-updated"case keyEvict:return "keys-evicted"case costAdd:return "cost-added"case costEvict:return "cost-evicted"case dropSets:return "sets-dropped"case rejectSets:return "sets-rejected" // by policy.case dropGets:return "gets-dropped"case keepGets:return "gets-kept"default:return "unidentified"}}// Metrics is a snapshot of performance statistics for the lifetime of a cache instance.type Metrics struct {all [doNotUse][]*uint64mu sync.RWMutexlife *z.HistogramData // Tracks the life expectancy of a key.}func newMetrics() *Metrics {:= &Metrics{life: z.NewHistogramData(z.HistogramBounds(1, 16)),}for := 0; < doNotUse; ++ {.all[] = make([]*uint64, 256):= .all[]for := range {[] = new(uint64)}}return}func ( *Metrics) ( metricType, , uint64) {if == nil {return}:= .all[]// Avoid false sharing by padding at least 64 bytes of space between two// atomic counters which would be incremented.:= ( % 25) * 10atomic.AddUint64([], )}func ( *Metrics) ( metricType) uint64 {if == nil {return 0}:= .all[]var uint64for := range {+= atomic.LoadUint64([])}return}// Hits is the number of Get calls where a value was found for the corresponding key.func ( *Metrics) () uint64 {return .get(hit)}// Misses is the number of Get calls where a value was not found for the corresponding key.func ( *Metrics) () uint64 {return .get(miss)}// KeysAdded is the total number of Set calls where a new key-value item 
// was added.
func (p *Metrics) KeysAdded() uint64 {
	return p.get(keyAdd)
}

// KeysUpdated is the total number of Set calls where the value was updated.
func (p *Metrics) KeysUpdated() uint64 {
	return p.get(keyUpdate)
}

// KeysEvicted is the total number of keys evicted.
func (p *Metrics) KeysEvicted() uint64 {
	return p.get(keyEvict)
}

// CostAdded is the sum of costs that have been added (successful Set calls).
func (p *Metrics) CostAdded() uint64 {
	return p.get(costAdd)
}

// CostEvicted is the sum of all costs that have been evicted.
func (p *Metrics) CostEvicted() uint64 {
	return p.get(costEvict)
}

// SetsDropped is the number of Set calls that don't make it into internal
// buffers (due to contention or some other reason).
func (p *Metrics) SetsDropped() uint64 {
	return p.get(dropSets)
}

// SetsRejected is the number of Set calls rejected by the policy (TinyLFU).
func (p *Metrics) SetsRejected() uint64 {
	return p.get(rejectSets)
}

// GetsDropped is the number of Get counter increments that are dropped
// internally.
func (p *Metrics) GetsDropped() uint64 {
	return p.get(dropGets)
}

// GetsKept is the number of Get counter increments that are kept.
func (p *Metrics) GetsKept() uint64 {
	return p.get(keepGets)
}

// Ratio is the number of Hits over all accesses (Hits + Misses).
// This is the fraction of successful Get calls, between 0 and 1.
func (p *Metrics) Ratio() float64 {
	if p == nil {
		return 0.0
	}
	hits, misses := p.get(hit), p.get(miss)
	if hits == 0 && misses == 0 {
		return 0.0
	}
	return float64(hits) / float64(hits+misses)
}

func (p *Metrics) trackEviction(numSeconds int64) {
	if p == nil {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	p.life.Update(numSeconds)
}

func (p *Metrics) LifeExpectancySeconds() *z.HistogramData {
	if p == nil {
		return nil
	}
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.life.Copy()
}

// Clear resets all the metrics.
func (p *Metrics) Clear() {
	if p == nil {
		return
	}
	for i := 0; i < doNotUse; i++ {
		for j := range p.all[i] {
			atomic.StoreUint64(p.all[i][j], 0)
		}
	}
	p.mu.Lock()
	p.life = z.NewHistogramData(z.HistogramBounds(1, 16))
	p.mu.Unlock()
}

// String returns a string representation of the metrics.
func (p *Metrics) String() string {
	if p == nil {
		return ""
	}
	var buf bytes.Buffer
	for i := 0; i < doNotUse; i++ {
		t := metricType(i)
		fmt.Fprintf(&buf, "%s: %d ", stringFor(t), p.get(t))
	}
	fmt.Fprintf(&buf, "gets-total: %d ", p.get(hit)+p.get(miss))
	fmt.Fprintf(&buf, "hit-ratio: %.2f", p.Ratio())
	return buf.String()
}
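The NumCounters comment in Config above gives a sizing rule (10x the expected item count, about 3 bytes per counter, rounded up to a power of 2). The sketch below turns that rule into arithmetic. `nextPowerOfTwo` and `estimateCounterBytes` are hypothetical helpers. They are not Ristretto's internal code, only an estimate built from the figures the comment quotes.

```go
package main

import "fmt"

// nextPowerOfTwo returns the smallest power of two >= n (for n >= 1).
// Hypothetical helper mirroring the rounding described in the NumCounters docs.
func nextPowerOfTwo(n int64) int64 {
	p := int64(1)
	for p < n {
		p <<= 1
	}
	return p
}

// estimateCounterBytes approximates counter memory using the ~3 bytes per
// counter quoted in the NumCounters comment (4 bits * 4 copies, plus about
// one byte for the bloom filter), applied after power-of-two rounding.
func estimateCounterBytes(numCounters int64) int64 {
	const bytesPerCounter = 3
	return nextPowerOfTwo(numCounters) * bytesPerCounter
}

func main() {
	expectedItems := int64(1_000_000)
	numCounters := 10 * expectedItems // the 10x rule of thumb
	fmt.Println("NumCounters:", numCounters)
	fmt.Println("rounded counters:", nextPowerOfTwo(numCounters))
	fmt.Println("approx bytes:", estimateCounterBytes(numCounters))
}
```

The comment's own example (1,000,000 items, so NumCounters = 10,000,000) rounds up to 2^24 = 16,777,216 counters. That gives roughly 50,331,648 bytes (48 MiB), compared with the naive 3 * 10,000,000 = 30,000,000 bytes. This gap is why the comment warns that space usage "may be a little larger".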
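processItems above computes an item's final cost in two steps. First, if the caller passed a cost of 0, a Cost function is configured, and the item is not a delete, that function is run. Second, unless IgnoreInternalCost is set, the fixed per-item overhead itemSize is added. The sketch below isolates that rule. `effectiveCost` is a hypothetical helper, and its `internalSize` parameter stands in for itemSize. The real itemSize is `unsafe.Sizeof(storeItem[any]{})`, which depends on the platform.

```go
package main

import "fmt"

// effectiveCost mirrors the cost rule applied in processItems.
// costFn may be nil (no Config.Cost); internalSize stands in for itemSize.
func effectiveCost[V any](cost int64, value V, isDelete bool,
	costFn func(V) int64, ignoreInternalCost bool, internalSize int64) int64 {
	// Only compute the cost lazily for new items/updates that passed 0.
	if cost == 0 && costFn != nil && !isDelete {
		cost = costFn(value)
	}
	// Account for the bookkeeping overhead unless told to ignore it.
	if !ignoreInternalCost {
		cost += internalSize
	}
	return cost
}

func main() {
	byLen := func(s string) int64 { return int64(len(s)) }
	fmt.Println(effectiveCost(0, "hello", false, byLen, false, 64))
	fmt.Println(effectiveCost(10, "hello", false, byLen, true, 64))
}
```

The first call runs the Cost function (5) and adds the 64-unit overhead. The second uses the caller's cost as is. The second case also illustrates why IgnoreInternalCost can increase real memory use: the overhead is still allocated but no longer counted against MaxCost.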
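SetWithTTL, processItems and Wait above follow a single-consumer pattern. Writers push onto a buffered channel with a non-blocking `select`, dropping the write when the buffer is full. Wait enqueues a sentinel whose channel the consumer closes when it reaches it, so every write queued before the sentinel has already been applied. The following is a stripped-down sketch of that pattern under simplifying assumptions: a plain map replaces Ristretto's sharded store, and there is no admission policy. The `miniCache` type and its methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// item is either a write (key/value) or a Wait sentinel (wait != nil).
type item struct {
	key   string
	value int
	wait  chan struct{}
}

// miniCache is a hypothetical, simplified stand-in for Cache: writes are
// buffered and applied in order by a single goroutine, like processItems.
type miniCache struct {
	mu     sync.RWMutex
	data   map[string]int
	setBuf chan *item
}

func newMiniCache(bufSize int) *miniCache {
	c := &miniCache{data: make(map[string]int), setBuf: make(chan *item, bufSize)}
	go c.process()
	return c
}

// process is the single consumer that applies buffered writes in FIFO order.
func (c *miniCache) process() {
	for i := range c.setBuf {
		if i.wait != nil {
			close(i.wait) // all writes queued before this sentinel are applied
			continue
		}
		c.mu.Lock()
		c.data[i.key] = i.value
		c.mu.Unlock()
	}
}

// Set enqueues a write without blocking and returns false (dropping the
// write) when the buffer is full, like the select/default in SetWithTTL.
func (c *miniCache) Set(key string, value int) bool {
	select {
	case c.setBuf <- &item{key: key, value: value}:
		return true
	default:
		return false
	}
}

// Wait blocks until every write enqueued before it has been applied.
func (c *miniCache) Wait() {
	done := make(chan struct{})
	c.setBuf <- &item{wait: done}
	<-done
}

func (c *miniCache) Get(key string) (int, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.data[key]
	return v, ok
}

func main() {
	c := newMiniCache(16)
	c.Set("a", 1)
	c.Wait() // without this, Get may run before the write is applied
	fmt.Println(c.Get("a"))
}
```

Because the channel is FIFO and there is exactly one consumer, a sentinel cannot overtake earlier writes. This ordering is what lets Wait guarantee read-your-writes without locking the hot Set path.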
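Metrics.add above spreads each metric over 256 separately allocated uint64 counters. It picks one with `(hash % 25) * 10`, so only indices 0, 10, ..., 240 are ever incremented, and concurrent increments for different keys tend to land on different counters. Metrics.get then sums the whole slice. A self-contained sketch of that striping idea follows. `stripedCounter` is a hypothetical name. Like the original, it relies on index spacing between individually allocated counters rather than explicit struct padding, so the 64-byte separation is a heuristic that depends on how the allocator lays those allocations out.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const numSlots = 256

// stripedCounter is a hypothetical sketch of one metric's layout in Metrics:
// many independently allocated counters that are summed on read.
type stripedCounter struct {
	slots []*uint64
}

func newStripedCounter() *stripedCounter {
	s := &stripedCounter{slots: make([]*uint64, numSlots)}
	for i := range s.slots {
		s.slots[i] = new(uint64)
	}
	return s
}

// add picks one of 25 slots, 10 indices apart, based on the key hash.
func (s *stripedCounter) add(hash, delta uint64) {
	idx := (hash % 25) * 10
	atomic.AddUint64(s.slots[idx], delta)
}

// get sums every slot; the total is exact once all writers have finished.
func (s *stripedCounter) get() uint64 {
	var total uint64
	for _, p := range s.slots {
		total += atomic.LoadUint64(p)
	}
	return total
}

func main() {
	c := newStripedCounter()
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				c.add(uint64(g*1000+i), 1)
			}
		}(g)
	}
	wg.Wait()
	fmt.Println(c.get())
}
```

The trade-off is cheap, mostly uncontended writes in exchange for reads that touch every slot. This suits metrics, which are incremented on every Get/Set but read rarely.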
The pages are generated with Golds v0.8.4 (GOOS=linux GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu.