// Package rpc2 provides bi-directional RPC client and server similar to net/rpc.
package rpc2

import (
	"context"
	"errors"
	"io"
	"log"
	"reflect"
	"sync"
)

// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
	mutex      sync.Mutex // protects pending, seq, closing, shutdown
	sending    sync.Mutex // serializes request writes and guards request
	request    Request    // temp area used in send() and Notify()
	seq        uint64     // next sequence number; 0 is reserved for notifications
	pending    map[uint64]*Call
	closing    bool // set by Close
	shutdown   bool // set by readLoop once it has stopped reading
	server     bool // when set, readLoop does not log its terminating error
	codec      Codec
	handlers   map[string]*handler
	disconnect chan struct{} // closed by readLoop on exit
	State      *State        // additional information to associate with client
	blocking   bool          // whether to block request handling
}

// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
func NewClient(conn io.ReadWriteCloser) *Client {
	return NewClientWithCodec(NewGobCodec(conn))
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec Codec) *Client {
	return &Client{
		codec:      codec,
		pending:    make(map[uint64]*Call),
		handlers:   make(map[string]*handler),
		disconnect: make(chan struct{}),
		seq:        1, // 0 means notification.
	}
}

// SetBlocking puts the client in blocking mode.
// In blocking mode, received requests are processed synchronously.
// If you have methods that may take a long time, other subsequent requests may time out.
func (c *Client) SetBlocking(blocking bool) {
	c.blocking = blocking
}

// Run the client's read loop.
// You must run this method before calling any methods on the server.
func (c *Client) Run() {
	c.readLoop()
}

// DisconnectNotify returns a channel that is closed
// when the client connection has gone away.
func (c *Client) DisconnectNotify() chan struct{} {
	return c.disconnect
}

// Handle registers the handler function for the given method.
// If a handler already exists for method, Handle panics.
func (c *Client) Handle(method string, handlerFunc interface{}) {
	addHandler(c.handlers, method, handlerFunc)
}

// readLoop reads messages from codec.
// It reads a request or a response to the previous request.
// If the message is request, calls the handler function.
// If the message is response, sends the reply to the associated call.
//
// When reading stops, every pending call is completed with the terminating
// error, the disconnect channel is closed, and the codec is closed unless
// Close already did so.
func (c *Client) readLoop() {
	var err error
	var req Request
	var resp Response
	for err == nil {
		// Reset both headers so fields from the previous message do not leak.
		req = Request{}
		resp = Response{}
		if err = c.codec.ReadHeader(&req, &resp); err != nil {
			break
		}

		if req.Method != "" {
			// request comes to server
			if err = c.readRequest(&req); err != nil {
				debugln("rpc2: error reading request:", err.Error())
			}
		} else {
			// response comes to client
			if err = c.readResponse(&resp); err != nil {
				debugln("rpc2: error reading response:", err.Error())
			}
		}
	}

	// Terminate pending calls. Lock order (sending, then mutex) matches send().
	c.sending.Lock()
	c.mutex.Lock()
	c.shutdown = true
	closing := c.closing
	if err == io.EOF {
		if closing {
			err = ErrShutdown
		} else {
			err = io.ErrUnexpectedEOF
		}
	}
	for _, call := range c.pending {
		call.Error = err
		call.done()
	}
	c.mutex.Unlock()
	c.sending.Unlock()

	if err != io.EOF && !closing && !c.server {
		debugln("rpc2: client protocol error:", err)
	}
	close(c.disconnect)
	if !closing {
		// Close() has not been called, so the codec is still ours to close.
		c.codec.Close()
	}
}

// handleRequest invokes the handler for req with the decoded argument argv
// and, unless req is a notification (Seq == 0), writes the response.
func (c *Client) handleRequest(req Request, method *handler, argv reflect.Value) {
	// Invoke the method, providing a new value for the reply.
	replyv := reflect.New(method.replyType.Elem())

	returnValues := method.fn.Call([]reflect.Value{reflect.ValueOf(c), argv, replyv})

	// Do not send response if request is a notification.
	if req.Seq == 0 {
		return
	}

	// The return value for the method is an error.
	errInter := returnValues[0].Interface()
	errmsg := ""
	if errInter != nil {
		errmsg = errInter.(error).Error()
	}
	resp := &Response{
		Seq:   req.Seq,
		Error: errmsg,
	}
	if err := c.codec.WriteResponse(resp, replyv.Interface()); err != nil {
		debugln("rpc2: error writing response:", err.Error())
	}
}

// readRequest looks up the handler for req, decodes the request body into a
// value of the handler's argument type, and dispatches to handleRequest:
// inline in blocking mode, otherwise in a new goroutine.
// If no handler is registered, an error response is written instead.
func (c *Client) readRequest(req *Request) error {
	method, ok := c.handlers[req.Method]
	if !ok {
		resp := &Response{
			Seq:   req.Seq,
			Error: "rpc2: can't find method " + req.Method,
		}
		// There is no reply value; the response doubles as a placeholder body.
		return c.codec.WriteResponse(resp, resp)
	}

	// Decode the argument value.
	var argv reflect.Value
	argIsValue := false // if true, need to indirect before calling.
	if method.argType.Kind() == reflect.Ptr {
		argv = reflect.New(method.argType.Elem())
	} else {
		argv = reflect.New(method.argType)
		argIsValue = true
	}
	// argv guaranteed to be a pointer now.
	if err := c.codec.ReadRequestBody(argv.Interface()); err != nil {
		return err
	}
	if argIsValue {
		argv = argv.Elem()
	}

	if c.blocking {
		c.handleRequest(*req, method, argv)
	} else {
		go c.handleRequest(*req, method, argv)
	}

	return nil
}

// readResponse matches resp to its pending call by sequence number, reads
// the response body, and completes the call. The returned error is non-nil
// only when reading the body of an orphaned or error response fails.
func (c *Client) readResponse(resp *Response) error {
	seq := resp.Seq
	c.mutex.Lock()
	call := c.pending[seq]
	delete(c.pending, seq)
	c.mutex.Unlock()

	var err error
	switch {
	case call == nil:
		// We've got no pending call. That usually means that
		// WriteRequest partially failed, and call was already
		// removed; response is a server telling us about an
		// error reading request body. We should still attempt
		// to read error body, but there's no one to give it to.
		err = c.codec.ReadResponseBody(nil)
		if err != nil {
			err = errors.New("reading error body: " + err.Error())
		}
	case resp.Error != "":
		// We've got an error response. Give this to the request;
		// any subsequent requests will get the ReadResponseBody
		// error if there is one.
		call.Error = ServerError(resp.Error)
		err = c.codec.ReadResponseBody(nil)
		if err != nil {
			err = errors.New("reading error body: " + err.Error())
		}
		call.done()
	default:
		err = c.codec.ReadResponseBody(call.Reply)
		if err != nil {
			call.Error = errors.New("reading body " + err.Error())
		}
		call.done()
	}

	return err
}

// Close marks the client as closing and closes the codec.
// It returns ErrShutdown if the client is already closing or shut down.
// It does not itself wait for outstanding calls; those are completed by the
// read loop when it stops.
func (c *Client) Close() error {
	c.mutex.Lock()
	if c.shutdown || c.closing {
		c.mutex.Unlock()
		return ErrShutdown
	}
	c.closing = true
	c.mutex.Unlock()
	return c.codec.Close()
}

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (c *Client) Go(method string, args interface{}, reply interface{}, done chan *Call) *Call {
	call := new(Call)
	call.Method = method
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous
		// RPCs that will be using that channel. If the channel
		// is totally unbuffered, it's best not to run at all.
		if cap(done) == 0 {
			log.Panic("rpc2: done channel is unbuffered")
		}
	}
	call.Done = done
	c.send(call)
	return call
}

// CallWithContext invokes the named function, waits for it to complete, and
// returns its error status, or an error from Context timeout.
//
// If ctx is done first, the call stays registered as pending until a
// response arrives or the connection shuts down; its result is discarded.
func (c *Client) CallWithContext(ctx context.Context, method string, args interface{}, reply interface{}) error {
	call := c.Go(method, args, reply, make(chan *Call, 1))
	select {
	case <-call.Done:
		return call.Error
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (c *Client) Call(method string, args interface{}, reply interface{}) error {
	return c.CallWithContext(context.Background(), method, args, reply)
}

// done signals completion by sending the call on its Done channel without
// blocking; if the channel is full the notification is dropped.
func (call *Call) done() {
	select {
	case call.Done <- call:
		// ok
	default:
		// We don't want to block here. It is the caller's responsibility to make
		// sure the channel has enough buffer space. See comment in Go().
		debugln("rpc2: discarding Call reply due to insufficient Done chan capacity")
	}
}

// ServerError represents an error that has been returned from
// the remote side of the RPC connection.
type ServerError string

// Error returns the remote error message.
func (e ServerError) Error() string {
	return string(e)
}

// ErrShutdown is returned when the connection is closing or closed.
var ErrShutdown = errors.New("connection is shut down")

// Call represents an active RPC.
type Call struct {
	Method string      // The name of the service and method to call.
	Args   interface{} // The argument to the function (*struct).
	Reply  interface{} // The reply from the function (*struct).
	Error  error       // After completion, the error status.
	Done   chan *Call  // Strobes when call is complete.
}

// send registers call under a fresh sequence number and writes the request.
// If the client is closing or shut down, or the write fails, the call is
// completed immediately with the corresponding error.
func (c *Client) send(call *Call) {
	c.sending.Lock()
	defer c.sending.Unlock()

	// Register this call.
	c.mutex.Lock()
	if c.shutdown || c.closing {
		call.Error = ErrShutdown
		c.mutex.Unlock()
		call.done()
		return
	}
	seq := c.seq
	c.seq++
	c.pending[seq] = call
	c.mutex.Unlock()

	// Encode and send the request.
	c.request.Seq = seq
	c.request.Method = call.Method
	err := c.codec.WriteRequest(&c.request, call.Args)
	if err != nil {
		// Re-read from pending: the read loop may already have completed
		// and removed this call, in which case there is nothing to signal.
		c.mutex.Lock()
		call = c.pending[seq]
		delete(c.pending, seq)
		c.mutex.Unlock()
		if call != nil {
			call.Error = err
			call.done()
		}
	}
}

// Notify sends a request to the receiver but does not wait for a return value.
// It returns ErrShutdown if the client is closing or shut down, otherwise the
// result of writing the request.
func (c *Client) Notify(method string, args interface{}) error {
	c.sending.Lock()
	defer c.sending.Unlock()

	// shutdown and closing are written under mutex by readLoop and Close,
	// so they must be read under it too. Lock order matches send().
	c.mutex.Lock()
	down := c.shutdown || c.closing
	c.mutex.Unlock()
	if down {
		return ErrShutdown
	}

	c.request.Seq = 0 // 0 marks a notification; no response is expected.
	c.request.Method = method
	return c.codec.WriteRequest(&c.request, args)
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.