package redis

// NOTE(review): this file was recovered from a text extraction that dropped
// most identifiers (receiver, parameter and local names, several method
// names) and the import list. Names that could not be read directly from the
// surviving doc comments or the Pipeliner interface are flagged below —
// confirm them against the rest of the package before merging.

import (
	"context"
	"errors"
)

// pipelineExecer is the callback a Pipeline uses to send a batch of queued
// commands. It receives the commands in the order they were queued.
type pipelineExecer func(context.Context, []Cmder) error

// Pipeliner is a mechanism to realise the Redis Pipeline technique.
//
// Pipelining is a technique to greatly speed up processing by packing
// operations into batches, sending them to Redis at once and reading the
// replies in a single step.
// See https://redis.io/topics/pipelining
//
// Pay attention: a Pipeline is not a transaction, so you can get unexpected
// results in case of big pipelines and small read/write timeouts.
// The Redis client has retransmission logic in case of timeouts, so a pipeline
// can be retransmitted and commands can be executed more than once.
// To avoid this, it is a good idea to use reasonably bigger read/write
// timeouts depending on your batch size and/or use TxPipeline.
type Pipeliner interface {
	StatefulCmdable

	// Len is to obtain the number of commands in the pipeline that have not yet been executed.
	Len() int

	// Do is an API for executing any command.
	// If a certain Redis command is not yet supported, you can use Do to execute it.
	Do(ctx context.Context, args ...interface{}) *Cmd

	// Process is to put the commands to be executed into the pipeline buffer.
	Process(ctx context.Context, cmd Cmder) error

	// Discard is to discard all commands in the cache that have not yet been executed.
	Discard()

	// Exec is to send all the commands buffered in the pipeline to the redis-server.
	Exec(ctx context.Context) ([]Cmder, error)
}

// Compile-time check that *Pipeline satisfies Pipeliner.
var _ Pipeliner = (*Pipeline)(nil)

// Pipeline implements pipelining as described in
// http://redis.io/topics/pipelining.
// Please note: it is not safe for concurrent use by multiple goroutines.
type Pipeline struct {
	cmdable
	statefulCmdable

	exec pipelineExecer // sends a queued batch; invoked by Exec
	cmds []Cmder        // commands queued since the last Exec/Discard
}

// init points both embedded command sets at Process, so commands issued
// through them are handed to this pipeline's Process method.
//
// NOTE(review): the method name was missing from the extracted source; "init"
// is assumed — confirm against the call sites in this package.
func (c *Pipeline) init() {
	c.cmdable = c.Process
	c.statefulCmdable = c.Process
}

// Len returns the number of queued commands.
func (c *Pipeline) Len() int {
	return len(c.cmds)
}

// Do queues the custom command for later execution.
//
// When called with no arguments the returned command carries an error and is
// not queued.
func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	if len(args) == 0 {
		cmd.SetErr(errors.New("redis: please enter the command to be executed"))
		return cmd
	}
	// Process only appends to the queue and always returns nil, so its
	// result is intentionally discarded.
	_ = c.Process(ctx, cmd)
	return cmd
}

// Process queues the cmd for later execution. It always returns nil.
func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error {
	c.cmds = append(c.cmds, cmd)
	return nil
}

// Discard resets the pipeline and discards queued commands.
func (c *Pipeline) Discard() {
	// Truncate in place so the backing array is reused by later Process calls.
	c.cmds = c.cmds[:0]
}

// Exec executes all previously queued commands using one
// client-server roundtrip.
//
// Exec always returns list of commands and error of the first failed
// command if any. With an empty queue it returns (nil, nil) without
// invoking the executor.
func (c *Pipeline) Exec(ctx context.Context) ([]Cmder, error) {
	if len(c.cmds) == 0 {
		return nil, nil
	}

	// Detach the batch before executing so the pipeline starts empty again
	// and the returned slice is not aliased by later Process calls.
	cmds := c.cmds
	c.cmds = nil

	return cmds, c.exec(ctx, cmds)
}

// Pipelined calls fn with this pipeline and then executes whatever has been
// queued. If fn returns an error, that error is returned and Exec is not
// called.
func (c *Pipeline) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	if err := fn(c); err != nil {
		return nil, err
	}
	return c.Exec(ctx)
}

// Pipeline returns the receiver itself.
//
// NOTE(review): the method name was missing from the extracted source;
// "Pipeline" is assumed from the Pipeliner return type — confirm.
func (c *Pipeline) Pipeline() Pipeliner {
	return c
}

// TxPipelined delegates to Pipelined.
//
// NOTE(review): the method name was missing from the extracted source;
// "TxPipelined" is assumed from the signature and the Pipelined call — confirm.
func (c *Pipeline) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipelined(ctx, fn)
}

// TxPipeline returns the receiver itself.
//
// NOTE(review): the method name was missing from the extracted source;
// "TxPipeline" is assumed — confirm.
func (c *Pipeline) TxPipeline() Pipeliner {
	return c
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.