// Package server provides MCP (Model Context Protocol) server implementations.
package server import ( ) // resourceEntry holds both a resource and its handler type resourceEntry struct { resource mcp.Resource handler ResourceHandlerFunc } // resourceTemplateEntry holds both a template and its handler type resourceTemplateEntry struct { template mcp.ResourceTemplate handler ResourceTemplateHandlerFunc } // taskEntry holds task state and associated data type taskEntry struct { task mcp.Task sessionID string toolName string // Name of the tool that created this task createdAt time.Time // When the task was created (for metrics) result any // The actual result once completed resultErr error // Error if task failed cancelFunc context.CancelFunc // Function to cancel the task done chan struct{} // Channel to signal task completion completed bool // Whether the task has been completed (guards done channel closure) } // ServerOption is a function that configures an MCPServer. type ServerOption func(*MCPServer) // ResourceHandlerFunc is a function that returns resource contents. type ResourceHandlerFunc func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) // ResourceTemplateHandlerFunc is a function that returns a resource template. type ResourceTemplateHandlerFunc func(ctx context.Context, request mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) // PromptHandlerFunc handles prompt requests with given arguments. type PromptHandlerFunc func(ctx context.Context, request mcp.GetPromptRequest) (*mcp.GetPromptResult, error) // ToolHandlerFunc handles tool calls with given arguments. type ToolHandlerFunc func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) // TaskToolHandlerFunc handles tool calls that execute asynchronously. // It returns immediately with task creation info; the actual result is // retrieved later via tasks/result. type TaskToolHandlerFunc func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CreateTaskResult, error) // ToolHandlerMiddleware is a middleware function that wraps a ToolHandlerFunc. type ToolHandlerMiddleware func(ToolHandlerFunc) ToolHandlerFunc // ResourceHandlerMiddleware is a middleware function that wraps a ResourceHandlerFunc. type ResourceHandlerMiddleware func(ResourceHandlerFunc) ResourceHandlerFunc // ToolFilterFunc is a function that filters tools based on context, typically using session information. type ToolFilterFunc func(ctx context.Context, tools []mcp.Tool) []mcp.Tool // PromptHandlerMiddleware is a middleware function that wraps a PromptHandlerFunc. type PromptHandlerMiddleware func(PromptHandlerFunc) PromptHandlerFunc // PromptFilterFunc is a function that filters prompts based on context, typically using session information. type PromptFilterFunc func(ctx context.Context, prompts []mcp.Prompt) []mcp.Prompt // ServerTool combines a Tool with its ToolHandlerFunc. type ServerTool struct { Tool mcp.Tool Handler ToolHandlerFunc } // ServerTaskTool combines a Tool with its TaskToolHandlerFunc. type ServerTaskTool struct { Tool mcp.Tool Handler TaskToolHandlerFunc } // ServerPrompt combines a Prompt with its handler function. type ServerPrompt struct { Prompt mcp.Prompt Handler PromptHandlerFunc } // ServerResource combines a Resource with its handler function. type ServerResource struct { Resource mcp.Resource Handler ResourceHandlerFunc } // ServerResourceTemplate combines a ResourceTemplate with its handler function. type ServerResourceTemplate struct { Template mcp.ResourceTemplate Handler ResourceTemplateHandlerFunc } // serverKey is the context key for storing the server instance type serverKey struct{} // ServerFromContext retrieves the MCPServer instance from a context func ( context.Context) *MCPServer { if , := .Value(serverKey{}).(*MCPServer); { return } return nil } // UnparsableMessageError is attached to the RequestError when json.Unmarshal // fails on the request. type UnparsableMessageError struct { message json.RawMessage method mcp.MCPMethod err error } func ( *UnparsableMessageError) () string { return fmt.Sprintf("unparsable %s request: %s", .method, .err) } func ( *UnparsableMessageError) () error { return .err } func ( *UnparsableMessageError) () json.RawMessage { return .message } func ( *UnparsableMessageError) () mcp.MCPMethod { return .method } // RequestError is an error that can be converted to a JSON-RPC error. // Implements Unwrap() to allow inspecting the error chain. type requestError struct { id any code int err error } func ( *requestError) () string { return fmt.Sprintf("request error: %s", .err) } func ( *requestError) () mcp.JSONRPCError { return mcp.JSONRPCError{ JSONRPC: mcp.JSONRPC_VERSION, ID: mcp.NewRequestId(.id), Error: mcp.NewJSONRPCErrorDetails(.code, .err.Error(), nil), } } func ( *requestError) () error { return .err } // NotificationHandlerFunc handles incoming notifications. type NotificationHandlerFunc func(ctx context.Context, notification mcp.JSONRPCNotification) // MCPServer implements a Model Context Protocol server that can handle various types of requests // including resources, prompts, and tools. type MCPServer struct { // Separate mutexes for different resource types resourcesMu sync.RWMutex resourceMiddlewareMu sync.RWMutex promptsMu sync.RWMutex toolsMu sync.RWMutex toolMiddlewareMu sync.RWMutex promptMiddlewareMu sync.RWMutex notificationHandlersMu sync.RWMutex capabilitiesMu sync.RWMutex toolFiltersMu sync.RWMutex promptFiltersMu sync.RWMutex tasksMu sync.RWMutex name string version string implementation mcp.Implementation instructions string resources map[string]resourceEntry resourceTemplates map[string]resourceTemplateEntry prompts map[string]mcp.Prompt promptHandlers map[string]PromptHandlerFunc tools map[string]ServerTool taskTools map[string]ServerTaskTool toolHandlerMiddlewares []ToolHandlerMiddleware resourceHandlerMiddlewares []ResourceHandlerMiddleware promptHandlerMiddlewares []PromptHandlerMiddleware toolFilters []ToolFilterFunc promptFilters []PromptFilterFunc notificationHandlers map[string]NotificationHandlerFunc promptCompletionProvider PromptCompletionProvider resourceCompletionProvider ResourceCompletionProvider capabilities serverCapabilities paginationLimit *int sessions sync.Map hooks *Hooks taskHooks *TaskHooks tasks map[string]*taskEntry expiredTasks map[string]time.Time // Tracks recently expired task IDs with expiration timestamp maxConcurrentTasks *int // Optional limit on concurrent running tasks activeTasks int // Current count of running (non-terminal) tasks inflightCancels sync.Map // Maps request ID -> context.CancelFunc for in-flight requests } // WithPaginationLimit sets the pagination limit for the server. func ( int) ServerOption { return func( *MCPServer) { .paginationLimit = & } } // serverCapabilities defines the supported features of the MCP server type serverCapabilities struct { tools *toolCapabilities resources *resourceCapabilities prompts *promptCapabilities logging *bool sampling *bool elicitation *bool roots *bool tasks *taskCapabilities completions *bool experimental map[string]any } // resourceCapabilities defines the supported resource-related features type resourceCapabilities struct { subscribe bool listChanged bool } // promptCapabilities defines the supported prompt-related features type promptCapabilities struct { listChanged bool } // toolCapabilities defines the supported tool-related features type toolCapabilities struct { listChanged bool } // taskCapabilities defines the supported task-related features type taskCapabilities struct { list bool cancel bool toolCallTasks bool } // WithResourceCapabilities configures resource-related server capabilities func (, bool) ServerOption { return func( *MCPServer) { // Always create a non-nil capability object .capabilities.resources = &resourceCapabilities{ subscribe: , listChanged: , } } } // WithPromptCompletionProvider sets a custom prompt completion provider func ( PromptCompletionProvider) ServerOption { return func( *MCPServer) { .promptCompletionProvider = } } // WithResourceCompletionProvider sets a custom resource completion provider func ( ResourceCompletionProvider) ServerOption { return func( *MCPServer) { .resourceCompletionProvider = } } // WithToolHandlerMiddleware allows adding a middleware for the // tool handler call chain. func ( ToolHandlerMiddleware, ) ServerOption { return func( *MCPServer) { .toolMiddlewareMu.Lock() .toolHandlerMiddlewares = append(.toolHandlerMiddlewares, ) .toolMiddlewareMu.Unlock() } } // Use adds one or more tool handler middlewares to the server. // Middleware is applied in the order added (outermost first), matching net/http convention. func ( *MCPServer) ( ...ToolHandlerMiddleware) { .toolMiddlewareMu.Lock() .toolHandlerMiddlewares = append(.toolHandlerMiddlewares, ...) .toolMiddlewareMu.Unlock() } // WithResourceHandlerMiddleware allows adding a middleware for the // resource handler call chain. func ( ResourceHandlerMiddleware, ) ServerOption { return func( *MCPServer) { .resourceMiddlewareMu.Lock() .resourceHandlerMiddlewares = append(.resourceHandlerMiddlewares, ) .resourceMiddlewareMu.Unlock() } } // WithResourceRecovery adds a middleware that recovers from panics in resource handlers. func () ServerOption { return WithResourceHandlerMiddleware(func( ResourceHandlerFunc) ResourceHandlerFunc { return func( context.Context, mcp.ReadResourceRequest) ( []mcp.ResourceContents, error) { defer func() { if := recover(); != nil { = fmt.Errorf( "panic recovered in %s resource handler: %v", .Params.URI, , ) } }() return (, ) } }) } // WithToolFilter adds a filter function that will be applied to tools before they are returned in list_tools func ( ToolFilterFunc, ) ServerOption { return func( *MCPServer) { .toolFiltersMu.Lock() .toolFilters = append(.toolFilters, ) .toolFiltersMu.Unlock() } } // WithPromptHandlerMiddleware allows adding a middleware for the // prompt handler call chain. func ( PromptHandlerMiddleware, ) ServerOption { return func( *MCPServer) { .promptMiddlewareMu.Lock() .promptHandlerMiddlewares = append(.promptHandlerMiddlewares, ) .promptMiddlewareMu.Unlock() } } // WithPromptFilter adds a filter function that will be applied to prompts before they are returned in list_prompts func ( PromptFilterFunc, ) ServerOption { return func( *MCPServer) { .promptFiltersMu.Lock() .promptFilters = append(.promptFilters, ) .promptFiltersMu.Unlock() } } // WithRecovery adds a middleware that recovers from panics in tool handlers. func () ServerOption { return WithToolHandlerMiddleware(func( ToolHandlerFunc) ToolHandlerFunc { return func( context.Context, mcp.CallToolRequest) ( *mcp.CallToolResult, error) { defer func() { if := recover(); != nil { = fmt.Errorf( "panic recovered in %s tool handler: %v", .Params.Name, , ) } }() return (, ) } }) } // WithHooks allows adding hooks that will be called before or after // either [all] requests or before / after specific request methods, or else // prior to returning an error to the client. func ( *Hooks) ServerOption { return func( *MCPServer) { .hooks = } } // GetHooks returns the server's current Hooks instance, or nil if no hooks // have been configured. The returned pointer can be used to add additional // hooks via the Add* methods without replacing existing hook registrations. func ( *MCPServer) () *Hooks { return .hooks } // WithTaskHooks allows adding hooks for task lifecycle events. // Use these hooks to monitor task execution, track metrics, and observe // task-augmented tool behavior. func ( *TaskHooks) ServerOption { return func( *MCPServer) { .taskHooks = } } // WithMaxConcurrentTasks sets a limit on the maximum number of concurrent running tasks. // When this limit is reached, attempts to create new tasks will fail with an error. // If not set (or set to 0), there is no limit on concurrent tasks. func ( int) ServerOption { return func( *MCPServer) { .maxConcurrentTasks = & } } // WithPromptCapabilities configures prompt-related server capabilities func ( bool) ServerOption { return func( *MCPServer) { // Always create a non-nil capability object .capabilities.prompts = &promptCapabilities{ listChanged: , } } } // WithToolCapabilities configures tool-related server capabilities func ( bool) ServerOption { return func( *MCPServer) { // Always create a non-nil capability object .capabilities.tools = &toolCapabilities{ listChanged: , } } } // WithLogging enables logging capabilities for the server func () ServerOption { return func( *MCPServer) { .capabilities.logging = mcp.ToBoolPtr(true) } } // WithElicitation enables elicitation capabilities for the server func () ServerOption { return func( *MCPServer) { .capabilities.elicitation = mcp.ToBoolPtr(true) } } // WithRoots returns a ServerOption that enables the roots capability on the MCPServer func () ServerOption { return func( *MCPServer) { .capabilities.roots = mcp.ToBoolPtr(true) } } // WithTaskCapabilities configures task-related server capabilities func (, , bool) ServerOption { return func( *MCPServer) { // Always create a non-nil capability object .capabilities.tasks = &taskCapabilities{ list: , cancel: , toolCallTasks: , } } } // WithInstructions sets the server instructions for the client returned in the initialize response func ( string) ServerOption { return func( *MCPServer) { .instructions = } } // WithCompletions enables the completion capability func () ServerOption { return func( *MCPServer) { .capabilities.completions = mcp.ToBoolPtr(true) } } // WithExperimental sets experimental, non-standard capabilities on the server. func ( map[string]any) ServerOption { return func( *MCPServer) { .capabilities.experimental = } } // WithIcons sets the server icons for the implementation metadata returned // during initialization. The icons slice and nested Sizes fields are defensively // copied to prevent external mutation. func ( ...mcp.Icon) ServerOption { return func( *MCPServer) { := make([]mcp.Icon, len()) for , := range { [] = if .Sizes != nil { [].Sizes = make([]string, len(.Sizes)) copy([].Sizes, .Sizes) } } .implementation.Icons = } } // WithTitle sets the human-readable display title for the server implementation. func ( string) ServerOption { return func( *MCPServer) { .implementation.Title = } } // WithDescription sets the description for the server implementation. func ( string) ServerOption { return func( *MCPServer) { .implementation.Description = } } // WithWebsiteURL sets the website URL for the server implementation. func ( string) ServerOption { return func( *MCPServer) { .implementation.WebsiteURL = } } // NewMCPServer creates a new MCP server instance with the given name, version and options func ( , string, ...ServerOption, ) *MCPServer { := &MCPServer{ resources: make(map[string]resourceEntry), resourceTemplates: make(map[string]resourceTemplateEntry), prompts: make(map[string]mcp.Prompt), promptHandlers: make(map[string]PromptHandlerFunc), tools: make(map[string]ServerTool), taskTools: make(map[string]ServerTaskTool), toolHandlerMiddlewares: make([]ToolHandlerMiddleware, 0), resourceHandlerMiddlewares: make([]ResourceHandlerMiddleware, 0), promptHandlerMiddlewares: make([]PromptHandlerMiddleware, 0), name: , version: , notificationHandlers: make(map[string]NotificationHandlerFunc), tasks: make(map[string]*taskEntry), expiredTasks: make(map[string]time.Time), promptCompletionProvider: &DefaultPromptCompletionProvider{}, resourceCompletionProvider: &DefaultResourceCompletionProvider{}, capabilities: serverCapabilities{ tools: nil, resources: nil, prompts: nil, logging: nil, sampling: nil, elicitation: nil, roots: nil, tasks: nil, completions: nil, }, } for , := range { () } return } // GenerateInProcessSessionID generates a unique session ID for inprocess clients func ( *MCPServer) () string { return GenerateInProcessSessionID() } // AddResources registers multiple resources at once func ( *MCPServer) ( ...ServerResource) { .implicitlyRegisterResourceCapabilities() .resourcesMu.Lock() for , := range { .resources[.Resource.URI] = resourceEntry{ resource: .Resource, handler: .Handler, } } .resourcesMu.Unlock() // When the list of available resources changes, servers that declared the listChanged capability SHOULD send a notification if .capabilities.resources.listChanged { // Send notification to all initialized sessions .SendNotificationToAllClients(mcp.MethodNotificationResourcesListChanged, nil) } } // SetResources replaces all existing resources with the provided list func ( *MCPServer) ( ...ServerResource) { .resourcesMu.Lock() .resources = make(map[string]resourceEntry, len()) .resourcesMu.Unlock() .AddResources(...) } // AddResource registers a new resource and its handler func ( *MCPServer) ( mcp.Resource, ResourceHandlerFunc, ) { .AddResources(ServerResource{Resource: , Handler: }) } // DeleteResources removes resources from the server func ( *MCPServer) ( ...string) { .resourcesMu.Lock() var bool for , := range { if , := .resources[]; { delete(.resources, ) = true } } .resourcesMu.Unlock() // Send notification to all initialized sessions if listChanged capability is enabled and we actually remove a resource if && .capabilities.resources != nil && .capabilities.resources.listChanged { .SendNotificationToAllClients(mcp.MethodNotificationResourcesListChanged, nil) } } // RemoveResource removes a resource from the server func ( *MCPServer) ( string) { .resourcesMu.Lock() , := .resources[] if { delete(.resources, ) } .resourcesMu.Unlock() // Send notification to all initialized sessions if listChanged capability is enabled and we actually remove a resource if && .capabilities.resources != nil && .capabilities.resources.listChanged { .SendNotificationToAllClients(mcp.MethodNotificationResourcesListChanged, nil) } } // AddResourceTemplates registers multiple resource templates at once func ( *MCPServer) ( ...ServerResourceTemplate) { .implicitlyRegisterResourceCapabilities() .resourcesMu.Lock() for , := range { .resourceTemplates[.Template.URITemplate.Raw()] = resourceTemplateEntry{ template: .Template, handler: .Handler, } } .resourcesMu.Unlock() // When the list of available resources changes, servers that declared the listChanged capability SHOULD send a notification if .capabilities.resources.listChanged { // Send notification to all initialized sessions .SendNotificationToAllClients(mcp.MethodNotificationResourcesListChanged, nil) } } // SetResourceTemplates replaces all existing resource templates with the provided list func ( *MCPServer) ( ...ServerResourceTemplate) { .resourcesMu.Lock() .resourceTemplates = make(map[string]resourceTemplateEntry, len()) .resourcesMu.Unlock() .AddResourceTemplates(...) } // AddResourceTemplate registers a new resource template and its handler func ( *MCPServer) ( mcp.ResourceTemplate, ResourceTemplateHandlerFunc, ) { .AddResourceTemplates(ServerResourceTemplate{Template: , Handler: }) } // AddPrompts registers multiple prompts at once func ( *MCPServer) ( ...ServerPrompt) { .implicitlyRegisterPromptCapabilities() .promptsMu.Lock() for , := range { .prompts[.Prompt.Name] = .Prompt .promptHandlers[.Prompt.Name] = .Handler } .promptsMu.Unlock() // When the list of available prompts changes, servers that declared the listChanged capability SHOULD send a notification. if .capabilities.prompts.listChanged { // Send notification to all initialized sessions .SendNotificationToAllClients(mcp.MethodNotificationPromptsListChanged, nil) } } // AddPrompt registers a new prompt handler with the given name func ( *MCPServer) ( mcp.Prompt, PromptHandlerFunc) { .AddPrompts(ServerPrompt{Prompt: , Handler: }) } // SetPrompts replaces all existing prompts with the provided list func ( *MCPServer) ( ...ServerPrompt) { .promptsMu.Lock() .prompts = make(map[string]mcp.Prompt, len()) .promptHandlers = make(map[string]PromptHandlerFunc, len()) .promptsMu.Unlock() .AddPrompts(...) } // DeletePrompts removes prompts from the server func ( *MCPServer) ( ...string) { .promptsMu.Lock() var bool for , := range { if , := .prompts[]; { delete(.prompts, ) delete(.promptHandlers, ) = true } } .promptsMu.Unlock() // Send notification to all initialized sessions if listChanged capability is enabled, and we actually remove a prompt if && .capabilities.prompts != nil && .capabilities.prompts.listChanged { // Send notification to all initialized sessions .SendNotificationToAllClients(mcp.MethodNotificationPromptsListChanged, nil) } } // AddTool registers a new tool and its handler func ( *MCPServer) ( mcp.Tool, ToolHandlerFunc) { .AddTools(ServerTool{Tool: , Handler: }) } // AddTaskTool registers a new task tool and its handler func ( *MCPServer) ( mcp.Tool, TaskToolHandlerFunc) { .AddTaskTools(ServerTaskTool{Tool: , Handler: }) } // Register tool capabilities due to a tool being added. Default to // listChanged: true, but don't change the value if we've already explicitly // registered tools.listChanged false. func ( *MCPServer) () { .implicitlyRegisterCapabilities( func() bool { return .capabilities.tools != nil }, func() { .capabilities.tools = &toolCapabilities{listChanged: true} }, ) } func ( *MCPServer) () { .implicitlyRegisterCapabilities( func() bool { return .capabilities.resources != nil }, func() { .capabilities.resources = &resourceCapabilities{} }, ) } func ( *MCPServer) () { .implicitlyRegisterCapabilities( func() bool { return .capabilities.prompts != nil }, func() { .capabilities.prompts = &promptCapabilities{} }, ) } func ( *MCPServer) ( func() bool, func()) { .capabilitiesMu.RLock() if () { .capabilitiesMu.RUnlock() return } .capabilitiesMu.RUnlock() .capabilitiesMu.Lock() if !() { () } .capabilitiesMu.Unlock() } // AddTools registers multiple tools at once func ( *MCPServer) ( ...ServerTool) { .implicitlyRegisterToolCapabilities() .toolsMu.Lock() for , := range { := .Tool.Name // Check for collision with task tools if , := .taskTools[]; { .toolsMu.Unlock() panic(fmt.Sprintf("tool name '%s' already registered as task tool", )) } .tools[] = } .toolsMu.Unlock() // When the list of available tools changes, servers that declared the listChanged capability SHOULD send a notification. if .capabilities.tools.listChanged { // Send notification to all initialized sessions .SendNotificationToAllClients(mcp.MethodNotificationToolsListChanged, nil) } } // AddTaskTools registers multiple task tools at once func ( *MCPServer) ( ...ServerTaskTool) { .implicitlyRegisterToolCapabilities() .toolsMu.Lock() for , := range { := .Tool.Name // Check for collision with regular tools if , := .tools[]; { .toolsMu.Unlock() panic(fmt.Sprintf("task tool name '%s' already registered as regular tool", )) } .taskTools[] = } .toolsMu.Unlock() // When the list of available tools changes, servers that declared the listChanged capability SHOULD send a notification. if .capabilities.tools.listChanged { // Send notification to all initialized sessions .SendNotificationToAllClients(mcp.MethodNotificationToolsListChanged, nil) } } // SetTools replaces all existing tools with the provided list func ( *MCPServer) ( ...ServerTool) { .toolsMu.Lock() .tools = make(map[string]ServerTool, len()) .toolsMu.Unlock() .AddTools(...) } // GetTool retrieves the specified tool func ( *MCPServer) ( string) *ServerTool { .toolsMu.RLock() defer .toolsMu.RUnlock() if , := .tools[]; { return & } return nil } func ( *MCPServer) () map[string]*ServerTool { .toolsMu.RLock() defer .toolsMu.RUnlock() if len(.tools) == 0 { return nil } // Create a copy to prevent external modification := make(map[string]*ServerTool, len(.tools)) for , := range .tools { [] = & } return } // DeleteTools removes tools from the server func ( *MCPServer) ( ...string) { .toolsMu.Lock() var bool for , := range { if , := .tools[]; { delete(.tools, ) = true } } .toolsMu.Unlock() // When the list of available tools changes, servers that declared the listChanged capability SHOULD send a notification. if && .capabilities.tools != nil && .capabilities.tools.listChanged { // Send notification to all initialized sessions .SendNotificationToAllClients(mcp.MethodNotificationToolsListChanged, nil) } } // AddNotificationHandler registers a new handler for incoming notifications func ( *MCPServer) ( string, NotificationHandlerFunc, ) { .notificationHandlersMu.Lock() defer .notificationHandlersMu.Unlock() .notificationHandlers[] = } func ( *MCPServer) ( context.Context, any, mcp.InitializeRequest, ) (*mcp.InitializeResult, *requestError) { := mcp.ServerCapabilities{} // Only add resource capabilities if they're configured if .capabilities.resources != nil { .Resources = &struct { bool `json:"subscribe,omitempty"` bool `json:"listChanged,omitempty"` }{ : .capabilities.resources.subscribe, : .capabilities.resources.listChanged, } } // Only add prompt capabilities if they're configured if .capabilities.prompts != nil { .Prompts = &struct { bool `json:"listChanged,omitempty"` }{ : .capabilities.prompts.listChanged, } } // Only add tool capabilities if they're configured if .capabilities.tools != nil { .Tools = &struct { bool `json:"listChanged,omitempty"` }{ : .capabilities.tools.listChanged, } } if .capabilities.logging != nil && *.capabilities.logging { .Logging = &struct{}{} } if .capabilities.sampling != nil && *.capabilities.sampling { .Sampling = &struct{}{} } if .capabilities.elicitation != nil && *.capabilities.elicitation { .Elicitation = &mcp.ElicitationCapability{} } if .capabilities.roots != nil && *.capabilities.roots { .Roots = &struct{}{} } // Only add task capabilities if they're configured if .capabilities.tasks != nil { := &mcp.TasksCapability{} if .capabilities.tasks.list { .List = &struct{}{} } if .capabilities.tasks.cancel { .Cancel = &struct{}{} } if .capabilities.tasks.toolCallTasks { .Requests = &mcp.TaskRequestsCapability{ Tools: &struct { *struct{} `json:"call,omitempty"` }{ : &struct{}{}, }, } } .Tasks = } if .capabilities.completions != nil && *.capabilities.completions { .Completions = &struct{}{} } if .capabilities.experimental != nil { .Experimental = .capabilities.experimental } := mcp.InitializeResult{ ProtocolVersion: .protocolVersion(.Params.ProtocolVersion), ServerInfo: mcp.Implementation{ Name: .name, Version: .version, Title: .implementation.Title, Description: .implementation.Description, WebsiteURL: .implementation.WebsiteURL, Icons: .implementation.Icons, }, Capabilities: , Instructions: .instructions, } if := ClientSessionFromContext(); != nil { .Initialize() // Store client info if the session supports it if , := .(SessionWithClientInfo); { .SetClientInfo(.Params.ClientInfo) .SetClientCapabilities(.Params.Capabilities) } } return &, nil } func ( *MCPServer) ( string) string { // For backwards compatibility, if the server does not receive an MCP-Protocol-Version header, // and has no other way to identify the version - for example, by relying on the protocol version negotiated // during initialization - the server SHOULD assume protocol version 2025-03-26 // https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#protocol-version-header if len() == 0 { = "2025-03-26" } if slices.Contains(mcp.ValidProtocolVersions, ) { return } return mcp.LATEST_PROTOCOL_VERSION } func ( *MCPServer) ( context.Context, any, mcp.PingRequest, ) (*mcp.EmptyResult, *requestError) { return &mcp.EmptyResult{}, nil } func ( *MCPServer) ( context.Context, any, mcp.SetLevelRequest, ) (*mcp.EmptyResult, *requestError) { := ClientSessionFromContext() if == nil || !.Initialized() { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: ErrSessionNotInitialized, } } , := .(SessionWithLogging) if ! { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: ErrSessionDoesNotSupportLogging, } } := .Params.Level // Validate logging level switch { case mcp.LoggingLevelDebug, mcp.LoggingLevelInfo, mcp.LoggingLevelNotice, mcp.LoggingLevelWarning, mcp.LoggingLevelError, mcp.LoggingLevelCritical, mcp.LoggingLevelAlert, mcp.LoggingLevelEmergency: // Valid level default: return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: fmt.Errorf("invalid logging level '%s'", ), } } .SetLogLevel() return &mcp.EmptyResult{}, nil } func listByPagination[ mcp.Named]( context.Context, *MCPServer, mcp.Cursor, [], ) ([], mcp.Cursor, error) { := 0 if != "" { , := base64.StdEncoding.DecodeString(string()) if != nil { return nil, "", } := string() = sort.Search(len(), func( int) bool { return [].GetName() > }) } := len() if .paginationLimit != nil { if len() > +*.paginationLimit { = + *.paginationLimit } } := [:] // set the next cursor := func() mcp.Cursor { if .paginationLimit != nil && len() >= *.paginationLimit { := [len()-1].GetName() := base64.StdEncoding.EncodeToString([]byte()) return mcp.Cursor() } return "" }() return , , nil } func ( *MCPServer) ( context.Context, any, mcp.ListResourcesRequest, ) (*mcp.ListResourcesResult, *requestError) { .resourcesMu.RLock() := make(map[string]mcp.Resource, len(.resources)) for , := range .resources { [] = .resource } .resourcesMu.RUnlock() // Check if there are session-specific resources := ClientSessionFromContext() if != nil { if , := .(SessionWithResources); { if := .GetSessionResources(); != nil { // Merge session-specific resources with global resources for , := range { [] = .Resource } } } } // Sort the resources by name := slices.SortedFunc(maps.Values(), func(, mcp.Resource) int { return cmp.Compare(.Name, .Name) }) // Apply pagination , , := listByPagination( , , .Params.Cursor, , ) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } if == nil { = []mcp.Resource{} } := mcp.ListResourcesResult{ Resources: , PaginatedResult: mcp.PaginatedResult{ NextCursor: , }, } return &, nil } func ( *MCPServer) ( context.Context, any, mcp.ListResourceTemplatesRequest, ) (*mcp.ListResourceTemplatesResult, *requestError) { // Get global templates .resourcesMu.RLock() := make(map[string]mcp.ResourceTemplate, len(.resourceTemplates)) for , := range .resourceTemplates { [] = .template } .resourcesMu.RUnlock() // Check if there are session-specific resource templates := ClientSessionFromContext() if != nil { if , := .(SessionWithResourceTemplates); { if := .GetSessionResourceTemplates(); != nil { // Merge session-specific templates with global templates // Session templates override global ones for , := range { [] = .Template } } } } // Convert map to slice for sorting and pagination := make([]mcp.ResourceTemplate, 0, len()) for , := range { = append(, ) } sort.Slice(, func(, int) bool { return [].Name < [].Name }) , , := listByPagination( , , .Params.Cursor, , ) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } := mcp.ListResourceTemplatesResult{ ResourceTemplates: , PaginatedResult: mcp.PaginatedResult{ NextCursor: , }, } return &, nil } func ( *MCPServer) ( context.Context, any, mcp.ReadResourceRequest, ) (*mcp.ReadResourceResult, *requestError) { .resourcesMu.RLock() // First check session-specific resources var ResourceHandlerFunc var bool := ClientSessionFromContext() if != nil { if , := .(SessionWithResources); { if := .GetSessionResources(); != nil { , := [.Params.URI] if { = .Handler = true } } } } // If not found in session tools, check global tools if ! { , := .resources[.Params.URI] if { = .handler = true } } // First try direct resource handlers if { .resourcesMu.RUnlock() := .resourceMiddlewareMu.RLock() := .resourceHandlerMiddlewares // Apply middlewares in reverse order for := len() - 1; >= 0; -- { = []() } .resourceMiddlewareMu.RUnlock() , := (, ) if != nil { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: , } } return &mcp.ReadResourceResult{Contents: }, nil } // If no direct handler found, try matching against templates var ResourceTemplateHandlerFunc var bool // First check session templates if available if != nil { if , := .(SessionWithResourceTemplates); { := .GetSessionResourceTemplates() for , := range { if .Template.URITemplate == nil { continue } if matchesTemplate(.Params.URI, .Template.URITemplate) { = .Handler = true := .Template.URITemplate.Match(.Params.URI) // Convert matched variables to a map .Params.Arguments = make(map[string]any, len()) for , := range { .Params.Arguments[] = .V } break } } } } // If not found in session templates, check global templates if ! { for , := range .resourceTemplates { := .template if .URITemplate == nil { continue } if matchesTemplate(.Params.URI, .URITemplate) { = .handler = true := .URITemplate.Match(.Params.URI) // Convert matched variables to a map .Params.Arguments = make(map[string]any, len()) for , := range { .Params.Arguments[] = .V } break } } } .resourcesMu.RUnlock() if { // If a match is found, then we have a final handler and can // apply middlewares. .resourceMiddlewareMu.RLock() := ResourceHandlerFunc() := .resourceHandlerMiddlewares // Apply middlewares in reverse order for := len() - 1; >= 0; -- { = []() } .resourceMiddlewareMu.RUnlock() , := (, ) if != nil { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: , } } return &mcp.ReadResourceResult{Contents: }, nil } return nil, &requestError{ id: , code: mcp.RESOURCE_NOT_FOUND, err: fmt.Errorf( "handler not found for resource URI '%s': %w", .Params.URI, ErrResourceNotFound, ), } } // matchesTemplate checks if a URI matches a URI template pattern func matchesTemplate( string, *mcp.URITemplate) bool { return .Regexp().MatchString() } func ( *MCPServer) ( context.Context, any, mcp.ListPromptsRequest, ) (*mcp.ListPromptsResult, *requestError) { .promptsMu.RLock() := make([]mcp.Prompt, 0, len(.prompts)) for , := range .prompts { = append(, ) } .promptsMu.RUnlock() // sort prompts by name sort.Slice(, func(, int) bool { return [].Name < [].Name }) // Apply prompt filters if any are defined .promptFiltersMu.RLock() if len(.promptFilters) > 0 { for , := range .promptFilters { = (, ) } } .promptFiltersMu.RUnlock() , , := listByPagination( , , .Params.Cursor, , ) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } := mcp.ListPromptsResult{ Prompts: , PaginatedResult: mcp.PaginatedResult{ NextCursor: , }, } return &, nil } func ( *MCPServer) ( context.Context, any, mcp.GetPromptRequest, ) (*mcp.GetPromptResult, *requestError) { .promptsMu.RLock() , := .promptHandlers[.Params.Name] .promptsMu.RUnlock() if ! { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: fmt.Errorf("prompt '%s' not found: %w", .Params.Name, ErrPromptNotFound), } } := .promptMiddlewareMu.RLock() := .promptHandlerMiddlewares // Apply middlewares in reverse order for := len() - 1; >= 0; -- { = []() } .promptMiddlewareMu.RUnlock() , := (, ) if != nil { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: , } } return , nil } func ( *MCPServer) ( context.Context, any, mcp.ListToolsRequest, ) (*mcp.ListToolsResult, *requestError) { // Get the base tools from the server (both regular and task tools) .toolsMu.RLock() := make([]mcp.Tool, 0, len(.tools)+len(.taskTools)) // Get all tool names for consistent ordering := make([]string, 0, len(.tools)+len(.taskTools)) for := range .tools { = append(, ) } for := range .taskTools { = append(, ) } // Sort the tool names for consistent ordering sort.Strings() // Add tools in sorted order for , := range { if , := .tools[]; { = append(, .Tool) } else if , := .taskTools[]; { = append(, .Tool) } } .toolsMu.RUnlock() // Check if there are session-specific tools := ClientSessionFromContext() if != nil { if , := .(SessionWithTools); { if := .GetSessionTools(); != nil { // Override or add session-specific tools // We need to create a map first to merge the tools properly := make(map[string]mcp.Tool) // Add global tools first for , := range { [.Name] = } // Then override with session-specific tools for , := range { [] = .Tool } // Convert back to slice = make([]mcp.Tool, 0, len()) for , := range { = append(, ) } // Sort again to maintain consistent ordering sort.Slice(, func(, int) bool { return [].Name < [].Name }) } } } // Apply tool filters if any are defined .toolFiltersMu.RLock() if len(.toolFilters) > 0 { for , := range .toolFilters { = (, ) } } .toolFiltersMu.RUnlock() // Apply pagination , , := listByPagination( , , .Params.Cursor, , ) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } := mcp.ListToolsResult{ Tools: , PaginatedResult: mcp.PaginatedResult{ NextCursor: , }, } return &, nil } func ( *MCPServer) ( context.Context, any, mcp.CallToolRequest, ) (any, *requestError) { // First check session-specific tools var ServerTool var bool var bool := ClientSessionFromContext() if != nil { if , := .(SessionWithTools); { if := .GetSessionTools(); != nil { var bool , = [.Params.Name] if { = true } } } } // If not found in session tools, check global tools if ! { .toolsMu.RLock() , = .tools[.Params.Name] // If not in regular tools, check task tools if ! { if , := .taskTools[.Params.Name]; { // Convert ServerTaskTool to ServerTool for validation // The tool metadata is the same, we just need it for checking task support = ServerTool{ Tool: .Tool, Handler: nil, // Handler will be used from taskTool in handleTaskAugmentedToolCall } = true = true } } .toolsMu.RUnlock() } if ! { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: fmt.Errorf("tool '%s' not found: %w", .Params.Name, ErrToolNotFound), } } // Validate task support requirements if .Tool.Execution != nil && .Tool.Execution.TaskSupport == mcp.TaskSupportRequired { if .Params.Task == nil { return nil, &requestError{ id: , code: mcp.METHOD_NOT_FOUND, err: fmt.Errorf("tool '%s' requires task augmentation", .Params.Name), } } } // Check if this should be executed as a task (hybrid mode support) // Tools with TaskSupportOptional or TaskSupportRequired can be executed as tasks := .Params.Task != nil && .Tool.Execution != nil && (.Tool.Execution.TaskSupport == mcp.TaskSupportOptional || .Tool.Execution.TaskSupport == mcp.TaskSupportRequired) if { // Route to task-augmented execution handler return .handleTaskAugmentedToolCall(, , ) } if { return nil, &requestError{ id: , code: mcp.METHOD_NOT_FOUND, err: fmt.Errorf("tool '%s' does not support synchronous execution", .Params.Name), } } := .Handler .toolMiddlewareMu.RLock() := .toolHandlerMiddlewares // Apply middlewares in reverse order for := len() - 1; >= 0; -- { = []() } .toolMiddlewareMu.RUnlock() , := (, ) if != nil { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: , } } return , nil } // handleTaskAugmentedToolCall handles tool calls that are executed as tasks. // It creates a task entry, starts async execution, and returns CreateTaskResult immediately. func ( *MCPServer) ( context.Context, any, mcp.CallToolRequest, ) (*mcp.CreateTaskResult, *requestError) { // Look up the tool - check both taskTools and regular tools .toolsMu.RLock() , := .taskTools[.Params.Name] , := .tools[.Params.Name] .toolsMu.RUnlock() // Determine which tool to use and validate task support var ServerTaskTool var bool if { // Tool is registered as a task tool = = true } else if { // Tool is a regular tool with task support // Validate that it actually supports task augmentation if .Tool.Execution == nil || (.Tool.Execution.TaskSupport != mcp.TaskSupportOptional && .Tool.Execution.TaskSupport != mcp.TaskSupportRequired) { return nil, &requestError{ id: , code: mcp.METHOD_NOT_FOUND, err: fmt.Errorf("tool '%s' does not support task augmentation", .Params.Name), } } = false } else { // Tool not found in either map return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: fmt.Errorf("tool '%s' not found", .Params.Name), } } // Generate task ID (UUID v4) := uuid.New().String() // Extract TTL from task params var *int64 if .Params.Task != nil { = .Params.Task.TTL } // Create task entry (pollInterval is nil - server doesn't set a default) , := .createTask(, , .Params.Name, , nil) if != nil { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: , } } // Execute tool asynchronously // For regular tools being used as tasks, we need different execution logic if { go .executeTaskTool(, , , ) } else { // Execute regular tool wrapped as a task go .executeRegularToolAsTask(, , , ) } // Return CreateTaskResult immediately with task as top-level field // Make a copy of the task to avoid data races with background goroutine .tasksMu.RLock() := .task .tasksMu.RUnlock() return &mcp.CreateTaskResult{ Task: , }, nil } // executeTaskTool executes a task tool handler asynchronously. // It creates a cancellable context, stores the cancel function for potential cancellation, // and executes the handler in the background, storing the result when complete. func ( *MCPServer) ( context.Context, *taskEntry, ServerTaskTool, mcp.CallToolRequest, ) { // Create cancellable context for this task execution , := context.WithCancel() defer () // Store cancel func in entry so it can be cancelled via tasks/cancel .tasksMu.Lock() .cancelFunc = .tasksMu.Unlock() // Execute the task tool handler , := .Handler(, ) if != nil { // If the error is due to context cancellation, don't mark as failed. // The cancelTask method will handle setting the proper status. // However, if cancelTask hasn't been called yet, we should still mark it. if errors.Is(, context.Canceled) || errors.Is(, context.DeadlineExceeded) { // Check if task was already cancelled via tasks/cancel .tasksMu.Lock() := .task.Status == mcp.TaskStatusCancelled .tasksMu.Unlock() if ! { // Handler detected cancellation before tasks/cancel was called // Mark as cancelled with the context error message := time.Now() := .Sub(.createdAt) .tasksMu.Lock() if !.completed { .task.Status = mcp.TaskStatusCancelled .task.StatusMessage = .Error() .task.LastUpdatedAt = .UTC().Format(time.RFC3339) .completed = true close(.done) // Decrement active tasks counter .activeTasks-- .sendTaskStatusNotification(.task) // Fire task cancellation hook if .taskHooks != nil { := TaskMetrics{ TaskID: .task.TaskId, ToolName: .toolName, Status: .task.Status, StatusMessage: .task.StatusMessage, CreatedAt: .createdAt, CompletedAt: &, Duration: , SessionID: .sessionID, } .taskHooks.taskCancelled(, ) } } .tasksMu.Unlock() } return } // Task failed - complete with error .completeTask(, nil, ) return } // Task succeeded - store the CreateTaskResult // Note: The actual result will be retrieved later via tasks/result .completeTask(, , nil) } // executeRegularToolAsTask executes a regular tool handler asynchronously as a task. // This is used for hybrid mode where a tool with TaskSupportOptional is called with task params. func ( *MCPServer) ( context.Context, *taskEntry, ServerTool, mcp.CallToolRequest, ) { // Create cancellable context for this task execution , := context.WithCancel() defer () // Store cancel func in entry so it can be cancelled via tasks/cancel .tasksMu.Lock() .cancelFunc = .tasksMu.Unlock() // Execute the regular tool handler with middleware applied := .Handler .toolMiddlewareMu.RLock() := .toolHandlerMiddlewares for := len() - 1; >= 0; -- { = []() } .toolMiddlewareMu.RUnlock() , := (, ) if != nil { // If the error is due to context cancellation, don't mark as failed. // The cancelTask method will handle setting the proper status. // However, if cancelTask hasn't been called yet, we should still mark it. if errors.Is(, context.Canceled) || errors.Is(, context.DeadlineExceeded) { // Check if task was already cancelled via tasks/cancel .tasksMu.Lock() := .task.Status == mcp.TaskStatusCancelled .tasksMu.Unlock() if ! { // Handler detected cancellation before tasks/cancel was called // Mark as cancelled with the context error message := time.Now() := .Sub(.createdAt) .tasksMu.Lock() if !.completed { .task.Status = mcp.TaskStatusCancelled .task.StatusMessage = .Error() .task.LastUpdatedAt = .UTC().Format(time.RFC3339) .completed = true close(.done) // Decrement active tasks counter .activeTasks-- .sendTaskStatusNotification(.task) // Fire task cancellation hook if .taskHooks != nil { := TaskMetrics{ TaskID: .task.TaskId, ToolName: .toolName, Status: .task.Status, StatusMessage: .task.StatusMessage, CreatedAt: .createdAt, CompletedAt: &, Duration: , SessionID: .sessionID, } .taskHooks.taskCancelled(, ) } } .tasksMu.Unlock() } return } // Task failed - complete with error .completeTask(, nil, ) return } // Task succeeded - store the CallToolResult directly // When retrieved via tasks/result, this will be returned to the client .completeTask(, , nil) } func ( *MCPServer) ( context.Context, mcp.JSONRPCNotification, ) mcp.JSONRPCMessage { // Handle cancellation notifications per MCP spec if .Method == "notifications/cancelled" { if , := .Params.AdditionalFields["requestId"]; { := inflightKey(, ) if , := .inflightCancels.LoadAndDelete(); { if , := .(context.CancelFunc); { () } } } return nil } .notificationHandlersMu.RLock() , := .notificationHandlers[.Method] .notificationHandlersMu.RUnlock() if { (, ) } return nil } // inflightKey returns a session-scoped key for the inflight cancellation map. // This prevents cross-session request ID collisions in multi-client scenarios. func inflightKey( context.Context, any) string { if := ClientSessionFromContext(); != nil { return fmt.Sprintf("%s:%v", .SessionID(), ) } return fmt.Sprintf(":%v", ) } func createResponse( any, any) mcp.JSONRPCMessage { return mcp.NewJSONRPCResultResponse(mcp.NewRequestId(), ) } func createErrorResponse( any, int, string, ) mcp.JSONRPCMessage { return mcp.JSONRPCError{ JSONRPC: mcp.JSONRPC_VERSION, ID: mcp.NewRequestId(), Error: mcp.NewJSONRPCErrorDetails(, , nil), } } // // Task Request Handlers // // handleGetTask handles tasks/get requests to retrieve task status. func ( *MCPServer) ( context.Context, any, mcp.GetTaskRequest, ) (*mcp.GetTaskResult, *requestError) { , , := .getTask(, .Params.TaskId) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } := mcp.NewGetTaskResult() return &, nil } // handleListTasks handles tasks/list requests to list all tasks. func ( *MCPServer) ( context.Context, any, mcp.ListTasksRequest, ) (*mcp.ListTasksResult, *requestError) { := .listTasks() // Sort tasks by TaskId for consistent pagination sort.Slice(, func(, int) bool { return [].TaskId < [].TaskId }) // Apply pagination , , := listByPagination( , , .Params.Cursor, , ) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } := mcp.ListTasksResult{ Tasks: , PaginatedResult: mcp.PaginatedResult{ NextCursor: , }, } return &, nil } // handleTaskResult handles tasks/result requests to get task results. func ( *MCPServer) ( context.Context, any, mcp.TaskResultRequest, ) (*mcp.TaskResultResult, *requestError) { , , := .getTask(, .Params.TaskId) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } // Wait for task completion if not terminal if !.Status.IsTerminal() { select { case <-: // Task completed case <-.Done(): return nil, &requestError{ id: , code: mcp.REQUEST_INTERRUPTED, err: .Err(), } } } // Re-fetch the task entry to get the final result/error under lock , := .getTaskEntry(, .Params.TaskId) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } // Read result and error under lock .tasksMu.RLock() := .result := .resultErr := .task.TaskId .tasksMu.RUnlock() // Return error if task failed if != nil { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: , } } // Extract the CallToolResult and populate TaskResultResult := &mcp.TaskResultResult{ Result: mcp.Result{ Meta: mcp.WithRelatedTask(), }, } switch taskResult := .(type) { case *mcp.CallToolResult: .Content = .Content .StructuredContent = .StructuredContent .IsError = .IsError mergeTaskResultMeta(, .Meta) case *mcp.CreateTaskResult: .Content = .Content .StructuredContent = .StructuredContent .IsError = .IsError mergeTaskResultMeta(, .Meta) } return , nil } func mergeTaskResultMeta( *mcp.TaskResultResult, *mcp.Meta) { if == nil { return } if .Meta.AdditionalFields == nil { .Meta.AdditionalFields = make(map[string]any) } for , := range .AdditionalFields { if != mcp.RelatedTaskMetaKey { .Meta.AdditionalFields[] = } } } // handleCancelTask handles tasks/cancel requests to cancel a task. func ( *MCPServer) ( context.Context, any, mcp.CancelTaskRequest, ) (*mcp.CancelTaskResult, *requestError) { := .cancelTask(, .Params.TaskId) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } // Get the updated task , , := .getTask(, .Params.TaskId) if != nil { return nil, &requestError{ id: , code: mcp.INVALID_PARAMS, err: , } } := mcp.NewCancelTaskResult() return &, nil } func ( *MCPServer) ( context.Context, any, mcp.CompleteRequest, ) (*mcp.CompleteResult, *requestError) { var *mcp.Completion var error switch ref := .Params.Ref.(type) { case mcp.PromptReference: , = .promptCompletionProvider.CompletePromptArgument( , .Name, .Params.Argument, .Params.Context, ) case mcp.ResourceReference: , = .resourceCompletionProvider.CompleteResourceArgument( , .URI, .Params.Argument, .Params.Context, ) default: return nil, &requestError{ id: , code: mcp.INVALID_REQUEST, err: fmt.Errorf("unknown reference type: %v", ), } } if != nil { return nil, &requestError{ id: , code: mcp.INTERNAL_ERROR, err: , } } // Defensive nil check: default providers always return non-nil completions, // but custom providers might erroneously return nil. Treat as empty result. if == nil { return &mcp.CompleteResult{}, nil } return &mcp.CompleteResult{ Completion: *, }, nil } // // Task Management Methods // // createTask creates a new task entry and returns it. // Returns an error if the max concurrent tasks limit is exceeded. func ( *MCPServer) ( context.Context, string, string, *int64, *int64) (*taskEntry, error) { // Build task entry first (no lock needed) := []mcp.TaskOption{} if != nil { = append(, mcp.WithTaskTTL(*)) } if != nil { = append(, mcp.WithTaskPollInterval(*)) } := mcp.NewTask(, ...) := time.Now() := &taskEntry{ task: , sessionID: getSessionID(), toolName: , createdAt: , done: make(chan struct{}), } // Single critical section for check + increment + insert .tasksMu.Lock() defer .tasksMu.Unlock() // Check concurrent task limit if .maxConcurrentTasks != nil && *.maxConcurrentTasks > 0 { if .activeTasks >= *.maxConcurrentTasks { return nil, fmt.Errorf("max concurrent tasks limit reached (%d)", *.maxConcurrentTasks) } } // Increment active task counter and insert task atomically .activeTasks++ .tasks[] = // Fire task created hook if .taskHooks != nil { := TaskMetrics{ TaskID: , ToolName: , Status: .Status, CreatedAt: , SessionID: getSessionID(), } .taskHooks.taskCreated(, ) } // Start TTL cleanup if specified if != nil && * > 0 { go .scheduleTaskCleanup(, *) } return , nil } // getTask retrieves a task by ID, checking session isolation if applicable. // Returns a copy of the task and the done channel for waiting on completion. func ( *MCPServer) ( context.Context, string) (mcp.Task, chan struct{}, error) { .tasksMu.RLock() , := .tasks[] if ! { // Check if this task was recently expired if , := .expiredTasks[]; { .tasksMu.RUnlock() return mcp.Task{}, nil, fmt.Errorf("task has expired") } .tasksMu.RUnlock() return mcp.Task{}, nil, fmt.Errorf("task not found") } // Verify session isolation := getSessionID() if .sessionID != "" && != "" && .sessionID != { .tasksMu.RUnlock() return mcp.Task{}, nil, fmt.Errorf("task not found") } // Return a copy of the task and the done channel := .task := .done .tasksMu.RUnlock() return , , nil } // getTaskEntry retrieves the raw task entry for internal use (requires caller to handle synchronization). func ( *MCPServer) ( context.Context, string) (*taskEntry, error) { .tasksMu.RLock() , := .tasks[] if ! { // Check if this task was recently expired if , := .expiredTasks[]; { .tasksMu.RUnlock() return nil, fmt.Errorf("task has expired") } .tasksMu.RUnlock() return nil, fmt.Errorf("task not found") } .tasksMu.RUnlock() // Verify session isolation := getSessionID() if .sessionID != "" && != "" && .sessionID != { return nil, fmt.Errorf("task not found") } return , nil } // listTasks returns copies of all tasks for the current session. func ( *MCPServer) ( context.Context) []mcp.Task { := getSessionID() .tasksMu.RLock() defer .tasksMu.RUnlock() var []mcp.Task for , := range .tasks { // Filter by session if applicable if == "" || .sessionID == "" || .sessionID == { = append(, .task) } } return } // completeTask marks a task as completed with the given result. func ( *MCPServer) ( *taskEntry, any, error) { .tasksMu.Lock() defer .tasksMu.Unlock() // Guard against double completion if .completed { return } := time.Now() := .Sub(.createdAt) if != nil { .task.Status = mcp.TaskStatusFailed .task.StatusMessage = .Error() .resultErr = } else { .task.Status = mcp.TaskStatusCompleted .result = } // Update the lastUpdatedAt timestamp .task.LastUpdatedAt = .UTC().Format(time.RFC3339) // Mark as completed and signal .completed = true close(.done) // Decrement active tasks counter .activeTasks-- // Send task status notification .sendTaskStatusNotification(.task) // Fire task hooks if .taskHooks != nil { := TaskMetrics{ TaskID: .task.TaskId, ToolName: .toolName, Status: .task.Status, StatusMessage: .task.StatusMessage, CreatedAt: .createdAt, CompletedAt: &, Duration: , SessionID: .sessionID, Error: , } if != nil { .taskHooks.taskFailed(context.Background(), ) } else { .taskHooks.taskCompleted(context.Background(), ) } } } // cancelTask cancels a running task. func ( *MCPServer) ( context.Context, string) error { , := .getTaskEntry(, ) if != nil { return } .tasksMu.Lock() defer .tasksMu.Unlock() // Don't allow cancelling already completed tasks if .completed { return fmt.Errorf("cannot cancel task in terminal status: %s", .task.Status) } // Cancel the context if available if .cancelFunc != nil { .cancelFunc() } := time.Now() := .Sub(.createdAt) .task.Status = mcp.TaskStatusCancelled .task.StatusMessage = "Task cancelled by request" // Update the lastUpdatedAt timestamp .task.LastUpdatedAt = .UTC().Format(time.RFC3339) // Mark as completed and signal .completed = true close(.done) // Decrement active tasks counter .activeTasks-- // Send task status notification .sendTaskStatusNotification(.task) // Fire task cancellation hook if .taskHooks != nil { := TaskMetrics{ TaskID: .task.TaskId, ToolName: .toolName, Status: .task.Status, StatusMessage: .task.StatusMessage, CreatedAt: .createdAt, CompletedAt: &, Duration: , SessionID: .sessionID, } .taskHooks.taskCancelled(, ) } return nil } // scheduleTaskCleanup schedules a task for cleanup after its TTL expires. func ( *MCPServer) ( string, int64) { time.Sleep(time.Duration() * time.Millisecond) .tasksMu.Lock() delete(.tasks, ) // Record that this task expired for better error messages // Keep the tombstone for 5 minutes to allow clients to distinguish // between "not found" and "expired" .expiredTasks[] = time.Now() .tasksMu.Unlock() // Clean up the tombstone after 5 minutes go func() { time.Sleep(5 * time.Minute) .tasksMu.Lock() delete(.expiredTasks, ) .tasksMu.Unlock() }() } // sendTaskStatusNotification sends a notification when a task's status changes. func ( *MCPServer) ( mcp.Task) { // Convert task to map[string]any for notification params := map[string]any{ "taskId": .TaskId, "status": .Status, "createdAt": .CreatedAt, "lastUpdatedAt": .LastUpdatedAt, } if .StatusMessage != "" { ["statusMessage"] = .StatusMessage } if .TTL != nil { ["ttl"] = *.TTL } if .PollInterval != nil { ["pollInterval"] = *.PollInterval } .SendNotificationToAllClients(mcp.MethodNotificationTasksStatus, ) } // getSessionID extracts the session ID from the context. func getSessionID( context.Context) string { if := ClientSessionFromContext(); != nil { return .SessionID() } return "" }