// Package rpc2 provides bi-directional RPC client and server similar to net/rpc.
package rpc2 import ( ) // Client represents an RPC Client. // There may be multiple outstanding Calls associated // with a single Client, and a Client may be used by // multiple goroutines simultaneously. type Client struct { mutex sync.Mutex // protects pending, seq, request sending sync.Mutex request Request // temp area used in send() seq uint64 pending map[uint64]*Call closing bool shutdown bool server bool codec Codec handlers map[string]*handler disconnect chan struct{} State *State // additional information to associate with client blocking bool // whether to block request handling } // NewClient returns a new Client to handle requests to the // set of services at the other end of the connection. // It adds a buffer to the write side of the connection so // the header and payload are sent as a unit. func ( io.ReadWriteCloser) *Client { return NewClientWithCodec(NewGobCodec()) } // NewClientWithCodec is like NewClient but uses the specified // codec to encode requests and decode responses. func ( Codec) *Client { return &Client{ codec: , pending: make(map[uint64]*Call), handlers: make(map[string]*handler), disconnect: make(chan struct{}), seq: 1, // 0 means notification. } } // SetBlocking puts the client in blocking mode. // In blocking mode, received requests are processes synchronously. // If you have methods that may take a long time, other subsequent requests may time out. func ( *Client) ( bool) { .blocking = } // Run the client's read loop. // You must run this method before calling any methods on the server. func ( *Client) () { .readLoop() } // DisconnectNotify returns a channel that is closed // when the client connection has gone away. func ( *Client) () chan struct{} { return .disconnect } // Handle registers the handler function for the given method. If a handler already exists for method, Handle panics. func ( *Client) ( string, interface{}) { addHandler(.handlers, , ) } // readLoop reads messages from codec. // It reads a reqeust or a response to the previous request. // If the message is request, calls the handler function. // If the message is response, sends the reply to the associated call. func ( *Client) () { var error var Request var Response for == nil { = Request{} = Response{} if = .codec.ReadHeader(&, &); != nil { break } if .Method != "" { // request comes to server if = .readRequest(&); != nil { debugln("rpc2: error reading request:", .Error()) } } else { // response comes to client if = .readResponse(&); != nil { debugln("rpc2: error reading response:", .Error()) } } } // Terminate pending calls. .sending.Lock() .mutex.Lock() .shutdown = true := .closing if == io.EOF { if { = ErrShutdown } else { = io.ErrUnexpectedEOF } } for , := range .pending { .Error = .done() } .mutex.Unlock() .sending.Unlock() if != io.EOF && ! && !.server { debugln("rpc2: client protocol error:", ) } close(.disconnect) if ! { .codec.Close() } } func ( *Client) ( Request, *handler, reflect.Value) { // Invoke the method, providing a new value for the reply. := reflect.New(.replyType.Elem()) := .fn.Call([]reflect.Value{reflect.ValueOf(), , }) // Do not send response if request is a notification. if .Seq == 0 { return } // The return value for the method is an error. := [0].Interface() := "" if != nil { = .(error).Error() } := &Response{ Seq: .Seq, Error: , } if := .codec.WriteResponse(, .Interface()); != nil { debugln("rpc2: error writing response:", .Error()) } } func ( *Client) ( *Request) error { , := .handlers[.Method] if ! { := &Response{ Seq: .Seq, Error: "rpc2: can't find method " + .Method, } return .codec.WriteResponse(, ) } // Decode the argument value. var reflect.Value := false // if true, need to indirect before calling. if .argType.Kind() == reflect.Ptr { = reflect.New(.argType.Elem()) } else { = reflect.New(.argType) = true } // argv guaranteed to be a pointer now. if := .codec.ReadRequestBody(.Interface()); != nil { return } if { = .Elem() } if .blocking { .handleRequest(*, , ) } else { go .handleRequest(*, , ) } return nil } func ( *Client) ( *Response) error { := .Seq .mutex.Lock() := .pending[] delete(.pending, ) .mutex.Unlock() var error switch { case == nil: // We've got no pending call. That usually means that // WriteRequest partially failed, and call was already // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to. = .codec.ReadResponseBody(nil) if != nil { = errors.New("reading error body: " + .Error()) } case .Error != "": // We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one. .Error = ServerError(.Error) = .codec.ReadResponseBody(nil) if != nil { = errors.New("reading error body: " + .Error()) } .done() default: = .codec.ReadResponseBody(.Reply) if != nil { .Error = errors.New("reading body " + .Error()) } .done() } return } // Close waits for active calls to finish and closes the codec. func ( *Client) () error { .mutex.Lock() if .shutdown || .closing { .mutex.Unlock() return ErrShutdown } .closing = true .mutex.Unlock() return .codec.Close() } // Go invokes the function asynchronously. It returns the Call structure representing // the invocation. The done channel will signal when the call is complete by returning // the same Call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. func ( *Client) ( string, interface{}, interface{}, chan *Call) *Call { := new(Call) .Method = .Args = .Reply = if == nil { = make(chan *Call, 10) // buffered. } else { // If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. if cap() == 0 { log.Panic("rpc2: done channel is unbuffered") } } .Done = .send() return } // CallWithContext invokes the named function, waits for it to complete, and // returns its error status, or an error from Context timeout. func ( *Client) ( context.Context, string, interface{}, interface{}) error { := .Go(, , , make(chan *Call, 1)) select { case <-.Done: return .Error case <-.Done(): return .Err() } return nil } // Call invokes the named function, waits for it to complete, and returns its error status. func ( *Client) ( string, interface{}, interface{}) error { return .CallWithContext(context.Background(), , , ) } func ( *Call) () { select { case .Done <- : // ok default: // We don't want to block here. It is the caller's responsibility to make // sure the channel has enough buffer space. See comment in Go(). debugln("rpc2: discarding Call reply due to insufficient Done chan capacity") } } // ServerError represents an error that has been returned from // the remote side of the RPC connection. type ServerError string func ( ServerError) () string { return string() } // ErrShutdown is returned when the connection is closing or closed. var ErrShutdown = errors.New("connection is shut down") // Call represents an active RPC. type Call struct { Method string // The name of the service and method to call. Args interface{} // The argument to the function (*struct). Reply interface{} // The reply from the function (*struct). Error error // After completion, the error status. Done chan *Call // Strobes when call is complete. } func ( *Client) ( *Call) { .sending.Lock() defer .sending.Unlock() // Register this call. .mutex.Lock() if .shutdown || .closing { .Error = ErrShutdown .mutex.Unlock() .done() return } := .seq .seq++ .pending[] = .mutex.Unlock() // Encode and send the request. .request.Seq = .request.Method = .Method := .codec.WriteRequest(&.request, .Args) if != nil { .mutex.Lock() = .pending[] delete(.pending, ) .mutex.Unlock() if != nil { .Error = .done() } } } // Notify sends a request to the receiver but does not wait for a return value. func ( *Client) ( string, interface{}) error { .sending.Lock() defer .sending.Unlock() if .shutdown || .closing { return ErrShutdown } .request.Seq = 0 .request.Method = return .codec.WriteRequest(&.request, ) }