package server
import (
"cmp"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"maps"
"slices"
"sort"
"sync"
"time"
"github.com/google/uuid"
"github.com/mark3labs/mcp-go/mcp"
)
type resourceEntry struct {
resource mcp .Resource
handler ResourceHandlerFunc
}
type resourceTemplateEntry struct {
template mcp .ResourceTemplate
handler ResourceTemplateHandlerFunc
}
type taskEntry struct {
task mcp .Task
sessionID string
toolName string
createdAt time .Time
result any
resultErr error
cancelFunc context .CancelFunc
done chan struct {}
completed bool
}
type ServerOption func (*MCPServer )
type ResourceHandlerFunc func (ctx context .Context , request mcp .ReadResourceRequest ) ([]mcp .ResourceContents , error )
type ResourceTemplateHandlerFunc func (ctx context .Context , request mcp .ReadResourceRequest ) ([]mcp .ResourceContents , error )
type PromptHandlerFunc func (ctx context .Context , request mcp .GetPromptRequest ) (*mcp .GetPromptResult , error )
type ToolHandlerFunc func (ctx context .Context , request mcp .CallToolRequest ) (*mcp .CallToolResult , error )
type TaskToolHandlerFunc func (ctx context .Context , request mcp .CallToolRequest ) (*mcp .CreateTaskResult , error )
type ToolHandlerMiddleware func (ToolHandlerFunc ) ToolHandlerFunc
type ResourceHandlerMiddleware func (ResourceHandlerFunc ) ResourceHandlerFunc
type ToolFilterFunc func (ctx context .Context , tools []mcp .Tool ) []mcp .Tool
type PromptHandlerMiddleware func (PromptHandlerFunc ) PromptHandlerFunc
type PromptFilterFunc func (ctx context .Context , prompts []mcp .Prompt ) []mcp .Prompt
type ServerTool struct {
Tool mcp .Tool
Handler ToolHandlerFunc
}
type ServerTaskTool struct {
Tool mcp .Tool
Handler TaskToolHandlerFunc
}
type ServerPrompt struct {
Prompt mcp .Prompt
Handler PromptHandlerFunc
}
type ServerResource struct {
Resource mcp .Resource
Handler ResourceHandlerFunc
}
type ServerResourceTemplate struct {
Template mcp .ResourceTemplate
Handler ResourceTemplateHandlerFunc
}
type serverKey struct {}
func ServerFromContext (ctx context .Context ) *MCPServer {
if srv , ok := ctx .Value (serverKey {}).(*MCPServer ); ok {
return srv
}
return nil
}
type UnparsableMessageError struct {
message json .RawMessage
method mcp .MCPMethod
err error
}
func (e *UnparsableMessageError ) Error () string {
return fmt .Sprintf ("unparsable %s request: %s" , e .method , e .err )
}
func (e *UnparsableMessageError ) Unwrap () error {
return e .err
}
func (e *UnparsableMessageError ) GetMessage () json .RawMessage {
return e .message
}
func (e *UnparsableMessageError ) GetMethod () mcp .MCPMethod {
return e .method
}
type requestError struct {
id any
code int
err error
}
func (e *requestError ) Error () string {
return fmt .Sprintf ("request error: %s" , e .err )
}
func (e *requestError ) ToJSONRPCError () mcp .JSONRPCError {
return mcp .JSONRPCError {
JSONRPC : mcp .JSONRPC_VERSION ,
ID : mcp .NewRequestId (e .id ),
Error : mcp .NewJSONRPCErrorDetails (e .code , e .err .Error(), nil ),
}
}
func (e *requestError ) Unwrap () error {
return e .err
}
type NotificationHandlerFunc func (ctx context .Context , notification mcp .JSONRPCNotification )
type MCPServer struct {
resourcesMu sync .RWMutex
resourceMiddlewareMu sync .RWMutex
promptsMu sync .RWMutex
toolsMu sync .RWMutex
toolMiddlewareMu sync .RWMutex
promptMiddlewareMu sync .RWMutex
notificationHandlersMu sync .RWMutex
capabilitiesMu sync .RWMutex
toolFiltersMu sync .RWMutex
promptFiltersMu sync .RWMutex
tasksMu sync .RWMutex
name string
version string
implementation mcp .Implementation
instructions string
resources map [string ]resourceEntry
resourceTemplates map [string ]resourceTemplateEntry
prompts map [string ]mcp .Prompt
promptHandlers map [string ]PromptHandlerFunc
tools map [string ]ServerTool
taskTools map [string ]ServerTaskTool
toolHandlerMiddlewares []ToolHandlerMiddleware
resourceHandlerMiddlewares []ResourceHandlerMiddleware
promptHandlerMiddlewares []PromptHandlerMiddleware
toolFilters []ToolFilterFunc
promptFilters []PromptFilterFunc
notificationHandlers map [string ]NotificationHandlerFunc
promptCompletionProvider PromptCompletionProvider
resourceCompletionProvider ResourceCompletionProvider
capabilities serverCapabilities
paginationLimit *int
sessions sync .Map
hooks *Hooks
taskHooks *TaskHooks
tasks map [string ]*taskEntry
expiredTasks map [string ]time .Time
maxConcurrentTasks *int
activeTasks int
inflightCancels sync .Map
}
func WithPaginationLimit (limit int ) ServerOption {
return func (s *MCPServer ) {
s .paginationLimit = &limit
}
}
type serverCapabilities struct {
tools *toolCapabilities
resources *resourceCapabilities
prompts *promptCapabilities
logging *bool
sampling *bool
elicitation *bool
roots *bool
tasks *taskCapabilities
completions *bool
experimental map [string ]any
}
type resourceCapabilities struct {
subscribe bool
listChanged bool
}
type promptCapabilities struct {
listChanged bool
}
type toolCapabilities struct {
listChanged bool
}
type taskCapabilities struct {
list bool
cancel bool
toolCallTasks bool
}
func WithResourceCapabilities (subscribe , listChanged bool ) ServerOption {
return func (s *MCPServer ) {
s .capabilities .resources = &resourceCapabilities {
subscribe : subscribe ,
listChanged : listChanged ,
}
}
}
func WithPromptCompletionProvider (provider PromptCompletionProvider ) ServerOption {
return func (s *MCPServer ) {
s .promptCompletionProvider = provider
}
}
func WithResourceCompletionProvider (provider ResourceCompletionProvider ) ServerOption {
return func (s *MCPServer ) {
s .resourceCompletionProvider = provider
}
}
func WithToolHandlerMiddleware (
toolHandlerMiddleware ToolHandlerMiddleware ,
) ServerOption {
return func (s *MCPServer ) {
s .toolMiddlewareMu .Lock ()
s .toolHandlerMiddlewares = append (s .toolHandlerMiddlewares , toolHandlerMiddleware )
s .toolMiddlewareMu .Unlock ()
}
}
func (s *MCPServer ) Use (mw ...ToolHandlerMiddleware ) {
s .toolMiddlewareMu .Lock ()
s .toolHandlerMiddlewares = append (s .toolHandlerMiddlewares , mw ...)
s .toolMiddlewareMu .Unlock ()
}
func WithResourceHandlerMiddleware (
resourceHandlerMiddleware ResourceHandlerMiddleware ,
) ServerOption {
return func (s *MCPServer ) {
s .resourceMiddlewareMu .Lock ()
s .resourceHandlerMiddlewares = append (s .resourceHandlerMiddlewares , resourceHandlerMiddleware )
s .resourceMiddlewareMu .Unlock ()
}
}
func WithResourceRecovery () ServerOption {
return WithResourceHandlerMiddleware (func (next ResourceHandlerFunc ) ResourceHandlerFunc {
return func (ctx context .Context , request mcp .ReadResourceRequest ) (result []mcp .ResourceContents , err error ) {
defer func () {
if r := recover (); r != nil {
err = fmt .Errorf (
"panic recovered in %s resource handler: %v" ,
request .Params .URI ,
r ,
)
}
}()
return next (ctx , request )
}
})
}
func WithToolFilter (
toolFilter ToolFilterFunc ,
) ServerOption {
return func (s *MCPServer ) {
s .toolFiltersMu .Lock ()
s .toolFilters = append (s .toolFilters , toolFilter )
s .toolFiltersMu .Unlock ()
}
}
func WithPromptHandlerMiddleware (
promptHandlerMiddleware PromptHandlerMiddleware ,
) ServerOption {
return func (s *MCPServer ) {
s .promptMiddlewareMu .Lock ()
s .promptHandlerMiddlewares = append (s .promptHandlerMiddlewares , promptHandlerMiddleware )
s .promptMiddlewareMu .Unlock ()
}
}
func WithPromptFilter (
promptFilter PromptFilterFunc ,
) ServerOption {
return func (s *MCPServer ) {
s .promptFiltersMu .Lock ()
s .promptFilters = append (s .promptFilters , promptFilter )
s .promptFiltersMu .Unlock ()
}
}
func WithRecovery () ServerOption {
return WithToolHandlerMiddleware (func (next ToolHandlerFunc ) ToolHandlerFunc {
return func (ctx context .Context , request mcp .CallToolRequest ) (result *mcp .CallToolResult , err error ) {
defer func () {
if r := recover (); r != nil {
err = fmt .Errorf (
"panic recovered in %s tool handler: %v" ,
request .Params .Name ,
r ,
)
}
}()
return next (ctx , request )
}
})
}
func WithHooks (hooks *Hooks ) ServerOption {
return func (s *MCPServer ) {
s .hooks = hooks
}
}
func (s *MCPServer ) GetHooks () *Hooks {
return s .hooks
}
func WithTaskHooks (taskHooks *TaskHooks ) ServerOption {
return func (s *MCPServer ) {
s .taskHooks = taskHooks
}
}
func WithMaxConcurrentTasks (limit int ) ServerOption {
return func (s *MCPServer ) {
s .maxConcurrentTasks = &limit
}
}
func WithPromptCapabilities (listChanged bool ) ServerOption {
return func (s *MCPServer ) {
s .capabilities .prompts = &promptCapabilities {
listChanged : listChanged ,
}
}
}
func WithToolCapabilities (listChanged bool ) ServerOption {
return func (s *MCPServer ) {
s .capabilities .tools = &toolCapabilities {
listChanged : listChanged ,
}
}
}
func WithLogging () ServerOption {
return func (s *MCPServer ) {
s .capabilities .logging = mcp .ToBoolPtr (true )
}
}
func WithElicitation () ServerOption {
return func (s *MCPServer ) {
s .capabilities .elicitation = mcp .ToBoolPtr (true )
}
}
func WithRoots () ServerOption {
return func (s *MCPServer ) {
s .capabilities .roots = mcp .ToBoolPtr (true )
}
}
func WithTaskCapabilities (list , cancel , toolCallTasks bool ) ServerOption {
return func (s *MCPServer ) {
s .capabilities .tasks = &taskCapabilities {
list : list ,
cancel : cancel ,
toolCallTasks : toolCallTasks ,
}
}
}
func WithInstructions (instructions string ) ServerOption {
return func (s *MCPServer ) {
s .instructions = instructions
}
}
func WithCompletions () ServerOption {
return func (s *MCPServer ) {
s .capabilities .completions = mcp .ToBoolPtr (true )
}
}
func WithExperimental (experimental map [string ]any ) ServerOption {
return func (s *MCPServer ) {
s .capabilities .experimental = experimental
}
}
func WithIcons (icons ...mcp .Icon ) ServerOption {
return func (s *MCPServer ) {
copied := make ([]mcp .Icon , len (icons ))
for i , icon := range icons {
copied [i ] = icon
if icon .Sizes != nil {
copied [i ].Sizes = make ([]string , len (icon .Sizes ))
copy (copied [i ].Sizes , icon .Sizes )
}
}
s .implementation .Icons = copied
}
}
func WithTitle (title string ) ServerOption {
return func (s *MCPServer ) {
s .implementation .Title = title
}
}
func WithDescription (description string ) ServerOption {
return func (s *MCPServer ) {
s .implementation .Description = description
}
}
func WithWebsiteURL (websiteURL string ) ServerOption {
return func (s *MCPServer ) {
s .implementation .WebsiteURL = websiteURL
}
}
func NewMCPServer (
name , version string ,
opts ...ServerOption ,
) *MCPServer {
s := &MCPServer {
resources : make (map [string ]resourceEntry ),
resourceTemplates : make (map [string ]resourceTemplateEntry ),
prompts : make (map [string ]mcp .Prompt ),
promptHandlers : make (map [string ]PromptHandlerFunc ),
tools : make (map [string ]ServerTool ),
taskTools : make (map [string ]ServerTaskTool ),
toolHandlerMiddlewares : make ([]ToolHandlerMiddleware , 0 ),
resourceHandlerMiddlewares : make ([]ResourceHandlerMiddleware , 0 ),
promptHandlerMiddlewares : make ([]PromptHandlerMiddleware , 0 ),
name : name ,
version : version ,
notificationHandlers : make (map [string ]NotificationHandlerFunc ),
tasks : make (map [string ]*taskEntry ),
expiredTasks : make (map [string ]time .Time ),
promptCompletionProvider : &DefaultPromptCompletionProvider {},
resourceCompletionProvider : &DefaultResourceCompletionProvider {},
capabilities : serverCapabilities {
tools : nil ,
resources : nil ,
prompts : nil ,
logging : nil ,
sampling : nil ,
elicitation : nil ,
roots : nil ,
tasks : nil ,
completions : nil ,
},
}
for _ , opt := range opts {
opt (s )
}
return s
}
func (s *MCPServer ) GenerateInProcessSessionID () string {
return GenerateInProcessSessionID ()
}
func (s *MCPServer ) AddResources (resources ...ServerResource ) {
s .implicitlyRegisterResourceCapabilities ()
s .resourcesMu .Lock ()
for _ , entry := range resources {
s .resources [entry .Resource .URI ] = resourceEntry {
resource : entry .Resource ,
handler : entry .Handler ,
}
}
s .resourcesMu .Unlock ()
if s .capabilities .resources .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationResourcesListChanged , nil )
}
}
func (s *MCPServer ) SetResources (resources ...ServerResource ) {
s .resourcesMu .Lock ()
s .resources = make (map [string ]resourceEntry , len (resources ))
s .resourcesMu .Unlock ()
s .AddResources (resources ...)
}
func (s *MCPServer ) AddResource (
resource mcp .Resource ,
handler ResourceHandlerFunc ,
) {
s .AddResources (ServerResource {Resource : resource , Handler : handler })
}
func (s *MCPServer ) DeleteResources (uris ...string ) {
s .resourcesMu .Lock ()
var exists bool
for _ , uri := range uris {
if _ , ok := s .resources [uri ]; ok {
delete (s .resources , uri )
exists = true
}
}
s .resourcesMu .Unlock ()
if exists && s .capabilities .resources != nil && s .capabilities .resources .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationResourcesListChanged , nil )
}
}
func (s *MCPServer ) RemoveResource (uri string ) {
s .resourcesMu .Lock ()
_ , exists := s .resources [uri ]
if exists {
delete (s .resources , uri )
}
s .resourcesMu .Unlock ()
if exists && s .capabilities .resources != nil && s .capabilities .resources .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationResourcesListChanged , nil )
}
}
func (s *MCPServer ) AddResourceTemplates (resourceTemplates ...ServerResourceTemplate ) {
s .implicitlyRegisterResourceCapabilities ()
s .resourcesMu .Lock ()
for _ , entry := range resourceTemplates {
s .resourceTemplates [entry .Template .URITemplate .Raw ()] = resourceTemplateEntry {
template : entry .Template ,
handler : entry .Handler ,
}
}
s .resourcesMu .Unlock ()
if s .capabilities .resources .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationResourcesListChanged , nil )
}
}
func (s *MCPServer ) SetResourceTemplates (templates ...ServerResourceTemplate ) {
s .resourcesMu .Lock ()
s .resourceTemplates = make (map [string ]resourceTemplateEntry , len (templates ))
s .resourcesMu .Unlock ()
s .AddResourceTemplates (templates ...)
}
func (s *MCPServer ) AddResourceTemplate (
template mcp .ResourceTemplate ,
handler ResourceTemplateHandlerFunc ,
) {
s .AddResourceTemplates (ServerResourceTemplate {Template : template , Handler : handler })
}
func (s *MCPServer ) AddPrompts (prompts ...ServerPrompt ) {
s .implicitlyRegisterPromptCapabilities ()
s .promptsMu .Lock ()
for _ , entry := range prompts {
s .prompts [entry .Prompt .Name ] = entry .Prompt
s .promptHandlers [entry .Prompt .Name ] = entry .Handler
}
s .promptsMu .Unlock ()
if s .capabilities .prompts .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationPromptsListChanged , nil )
}
}
func (s *MCPServer ) AddPrompt (prompt mcp .Prompt , handler PromptHandlerFunc ) {
s .AddPrompts (ServerPrompt {Prompt : prompt , Handler : handler })
}
func (s *MCPServer ) SetPrompts (prompts ...ServerPrompt ) {
s .promptsMu .Lock ()
s .prompts = make (map [string ]mcp .Prompt , len (prompts ))
s .promptHandlers = make (map [string ]PromptHandlerFunc , len (prompts ))
s .promptsMu .Unlock ()
s .AddPrompts (prompts ...)
}
func (s *MCPServer ) DeletePrompts (names ...string ) {
s .promptsMu .Lock ()
var exists bool
for _ , name := range names {
if _ , ok := s .prompts [name ]; ok {
delete (s .prompts , name )
delete (s .promptHandlers , name )
exists = true
}
}
s .promptsMu .Unlock ()
if exists && s .capabilities .prompts != nil && s .capabilities .prompts .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationPromptsListChanged , nil )
}
}
func (s *MCPServer ) AddTool (tool mcp .Tool , handler ToolHandlerFunc ) {
s .AddTools (ServerTool {Tool : tool , Handler : handler })
}
func (s *MCPServer ) AddTaskTool (tool mcp .Tool , handler TaskToolHandlerFunc ) {
s .AddTaskTools (ServerTaskTool {Tool : tool , Handler : handler })
}
func (s *MCPServer ) implicitlyRegisterToolCapabilities () {
s .implicitlyRegisterCapabilities (
func () bool { return s .capabilities .tools != nil },
func () { s .capabilities .tools = &toolCapabilities {listChanged : true } },
)
}
func (s *MCPServer ) implicitlyRegisterResourceCapabilities () {
s .implicitlyRegisterCapabilities (
func () bool { return s .capabilities .resources != nil },
func () { s .capabilities .resources = &resourceCapabilities {} },
)
}
func (s *MCPServer ) implicitlyRegisterPromptCapabilities () {
s .implicitlyRegisterCapabilities (
func () bool { return s .capabilities .prompts != nil },
func () { s .capabilities .prompts = &promptCapabilities {} },
)
}
func (s *MCPServer ) implicitlyRegisterCapabilities (check func () bool , register func ()) {
s .capabilitiesMu .RLock ()
if check () {
s .capabilitiesMu .RUnlock ()
return
}
s .capabilitiesMu .RUnlock ()
s .capabilitiesMu .Lock ()
if !check () {
register ()
}
s .capabilitiesMu .Unlock ()
}
func (s *MCPServer ) AddTools (tools ...ServerTool ) {
s .implicitlyRegisterToolCapabilities ()
s .toolsMu .Lock ()
for _ , entry := range tools {
name := entry .Tool .Name
if _ , exists := s .taskTools [name ]; exists {
s .toolsMu .Unlock ()
panic (fmt .Sprintf ("tool name '%s' already registered as task tool" , name ))
}
s .tools [name ] = entry
}
s .toolsMu .Unlock ()
if s .capabilities .tools .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationToolsListChanged , nil )
}
}
func (s *MCPServer ) AddTaskTools (taskTools ...ServerTaskTool ) {
s .implicitlyRegisterToolCapabilities ()
s .toolsMu .Lock ()
for _ , entry := range taskTools {
name := entry .Tool .Name
if _ , exists := s .tools [name ]; exists {
s .toolsMu .Unlock ()
panic (fmt .Sprintf ("task tool name '%s' already registered as regular tool" , name ))
}
s .taskTools [name ] = entry
}
s .toolsMu .Unlock ()
if s .capabilities .tools .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationToolsListChanged , nil )
}
}
func (s *MCPServer ) SetTools (tools ...ServerTool ) {
s .toolsMu .Lock ()
s .tools = make (map [string ]ServerTool , len (tools ))
s .toolsMu .Unlock ()
s .AddTools (tools ...)
}
func (s *MCPServer ) GetTool (toolName string ) *ServerTool {
s .toolsMu .RLock ()
defer s .toolsMu .RUnlock ()
if tool , ok := s .tools [toolName ]; ok {
return &tool
}
return nil
}
func (s *MCPServer ) ListTools () map [string ]*ServerTool {
s .toolsMu .RLock ()
defer s .toolsMu .RUnlock ()
if len (s .tools ) == 0 {
return nil
}
toolsCopy := make (map [string ]*ServerTool , len (s .tools ))
for name , tool := range s .tools {
toolsCopy [name ] = &tool
}
return toolsCopy
}
func (s *MCPServer ) DeleteTools (names ...string ) {
s .toolsMu .Lock ()
var exists bool
for _ , name := range names {
if _ , ok := s .tools [name ]; ok {
delete (s .tools , name )
exists = true
}
}
s .toolsMu .Unlock ()
if exists && s .capabilities .tools != nil && s .capabilities .tools .listChanged {
s .SendNotificationToAllClients (mcp .MethodNotificationToolsListChanged , nil )
}
}
func (s *MCPServer ) AddNotificationHandler (
method string ,
handler NotificationHandlerFunc ,
) {
s .notificationHandlersMu .Lock ()
defer s .notificationHandlersMu .Unlock ()
s .notificationHandlers [method ] = handler
}
func (s *MCPServer ) handleInitialize (
ctx context .Context ,
_ any ,
request mcp .InitializeRequest ,
) (*mcp .InitializeResult , *requestError ) {
capabilities := mcp .ServerCapabilities {}
if s .capabilities .resources != nil {
capabilities .Resources = &struct {
Subscribe bool `json:"subscribe,omitempty"`
ListChanged bool `json:"listChanged,omitempty"`
}{
Subscribe : s .capabilities .resources .subscribe ,
ListChanged : s .capabilities .resources .listChanged ,
}
}
if s .capabilities .prompts != nil {
capabilities .Prompts = &struct {
ListChanged bool `json:"listChanged,omitempty"`
}{
ListChanged : s .capabilities .prompts .listChanged ,
}
}
if s .capabilities .tools != nil {
capabilities .Tools = &struct {
ListChanged bool `json:"listChanged,omitempty"`
}{
ListChanged : s .capabilities .tools .listChanged ,
}
}
if s .capabilities .logging != nil && *s .capabilities .logging {
capabilities .Logging = &struct {}{}
}
if s .capabilities .sampling != nil && *s .capabilities .sampling {
capabilities .Sampling = &struct {}{}
}
if s .capabilities .elicitation != nil && *s .capabilities .elicitation {
capabilities .Elicitation = &mcp .ElicitationCapability {}
}
if s .capabilities .roots != nil && *s .capabilities .roots {
capabilities .Roots = &struct {}{}
}
if s .capabilities .tasks != nil {
tasksCapability := &mcp .TasksCapability {}
if s .capabilities .tasks .list {
tasksCapability .List = &struct {}{}
}
if s .capabilities .tasks .cancel {
tasksCapability .Cancel = &struct {}{}
}
if s .capabilities .tasks .toolCallTasks {
tasksCapability .Requests = &mcp .TaskRequestsCapability {
Tools : &struct {
Call *struct {} `json:"call,omitempty"`
}{
Call : &struct {}{},
},
}
}
capabilities .Tasks = tasksCapability
}
if s .capabilities .completions != nil && *s .capabilities .completions {
capabilities .Completions = &struct {}{}
}
if s .capabilities .experimental != nil {
capabilities .Experimental = s .capabilities .experimental
}
result := mcp .InitializeResult {
ProtocolVersion : s .protocolVersion (request .Params .ProtocolVersion ),
ServerInfo : mcp .Implementation {
Name : s .name ,
Version : s .version ,
Title : s .implementation .Title ,
Description : s .implementation .Description ,
WebsiteURL : s .implementation .WebsiteURL ,
Icons : s .implementation .Icons ,
},
Capabilities : capabilities ,
Instructions : s .instructions ,
}
if session := ClientSessionFromContext (ctx ); session != nil {
session .Initialize ()
if sessionWithClientInfo , ok := session .(SessionWithClientInfo ); ok {
sessionWithClientInfo .SetClientInfo (request .Params .ClientInfo )
sessionWithClientInfo .SetClientCapabilities (request .Params .Capabilities )
}
}
return &result , nil
}
func (s *MCPServer ) protocolVersion (clientVersion string ) string {
if len (clientVersion ) == 0 {
clientVersion = "2025-03-26"
}
if slices .Contains (mcp .ValidProtocolVersions , clientVersion ) {
return clientVersion
}
return mcp .LATEST_PROTOCOL_VERSION
}
func (s *MCPServer ) handlePing (
_ context .Context ,
_ any ,
_ mcp .PingRequest ,
) (*mcp .EmptyResult , *requestError ) {
return &mcp .EmptyResult {}, nil
}
func (s *MCPServer ) handleSetLevel (
ctx context .Context ,
id any ,
request mcp .SetLevelRequest ,
) (*mcp .EmptyResult , *requestError ) {
clientSession := ClientSessionFromContext (ctx )
if clientSession == nil || !clientSession .Initialized () {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : ErrSessionNotInitialized ,
}
}
sessionLogging , ok := clientSession .(SessionWithLogging )
if !ok {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : ErrSessionDoesNotSupportLogging ,
}
}
level := request .Params .Level
switch level {
case mcp .LoggingLevelDebug , mcp .LoggingLevelInfo , mcp .LoggingLevelNotice ,
mcp .LoggingLevelWarning , mcp .LoggingLevelError , mcp .LoggingLevelCritical ,
mcp .LoggingLevelAlert , mcp .LoggingLevelEmergency :
default :
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : fmt .Errorf ("invalid logging level '%s'" , level ),
}
}
sessionLogging .SetLogLevel (level )
return &mcp .EmptyResult {}, nil
}
func listByPagination[T mcp .Named ](
_ context .Context ,
s *MCPServer ,
cursor mcp .Cursor ,
allElements []T ,
) ([]T , mcp .Cursor , error ) {
startPos := 0
if cursor != "" {
c , err := base64 .StdEncoding .DecodeString (string (cursor ))
if err != nil {
return nil , "" , err
}
cString := string (c )
startPos = sort .Search (len (allElements ), func (i int ) bool {
return allElements [i ].GetName () > cString
})
}
endPos := len (allElements )
if s .paginationLimit != nil {
if len (allElements ) > startPos +*s .paginationLimit {
endPos = startPos + *s .paginationLimit
}
}
elementsToReturn := allElements [startPos :endPos ]
nextCursor := func () mcp .Cursor {
if s .paginationLimit != nil && len (elementsToReturn ) >= *s .paginationLimit {
nc := elementsToReturn [len (elementsToReturn )-1 ].GetName ()
toString := base64 .StdEncoding .EncodeToString ([]byte (nc ))
return mcp .Cursor (toString )
}
return ""
}()
return elementsToReturn , nextCursor , nil
}
func (s *MCPServer ) handleListResources (
ctx context .Context ,
id any ,
request mcp .ListResourcesRequest ,
) (*mcp .ListResourcesResult , *requestError ) {
s .resourcesMu .RLock ()
resourceMap := make (map [string ]mcp .Resource , len (s .resources ))
for uri , entry := range s .resources {
resourceMap [uri ] = entry .resource
}
s .resourcesMu .RUnlock ()
session := ClientSessionFromContext (ctx )
if session != nil {
if sessionWithResources , ok := session .(SessionWithResources ); ok {
if sessionResources := sessionWithResources .GetSessionResources (); sessionResources != nil {
for uri , serverResource := range sessionResources {
resourceMap [uri ] = serverResource .Resource
}
}
}
}
resourcesList := slices .SortedFunc (maps .Values (resourceMap ), func (a , b mcp .Resource ) int {
return cmp .Compare (a .Name , b .Name )
})
resourcesToReturn , nextCursor , err := listByPagination (
ctx ,
s ,
request .Params .Cursor ,
resourcesList ,
)
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
if resourcesToReturn == nil {
resourcesToReturn = []mcp .Resource {}
}
result := mcp .ListResourcesResult {
Resources : resourcesToReturn ,
PaginatedResult : mcp .PaginatedResult {
NextCursor : nextCursor ,
},
}
return &result , nil
}
func (s *MCPServer ) handleListResourceTemplates (
ctx context .Context ,
id any ,
request mcp .ListResourceTemplatesRequest ,
) (*mcp .ListResourceTemplatesResult , *requestError ) {
s .resourcesMu .RLock ()
templateMap := make (map [string ]mcp .ResourceTemplate , len (s .resourceTemplates ))
for uri , entry := range s .resourceTemplates {
templateMap [uri ] = entry .template
}
s .resourcesMu .RUnlock ()
session := ClientSessionFromContext (ctx )
if session != nil {
if sessionWithTemplates , ok := session .(SessionWithResourceTemplates ); ok {
if sessionTemplates := sessionWithTemplates .GetSessionResourceTemplates (); sessionTemplates != nil {
for uriTemplate , serverTemplate := range sessionTemplates {
templateMap [uriTemplate ] = serverTemplate .Template
}
}
}
}
templates := make ([]mcp .ResourceTemplate , 0 , len (templateMap ))
for _ , template := range templateMap {
templates = append (templates , template )
}
sort .Slice (templates , func (i , j int ) bool {
return templates [i ].Name < templates [j ].Name
})
templatesToReturn , nextCursor , err := listByPagination (
ctx ,
s ,
request .Params .Cursor ,
templates ,
)
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
result := mcp .ListResourceTemplatesResult {
ResourceTemplates : templatesToReturn ,
PaginatedResult : mcp .PaginatedResult {
NextCursor : nextCursor ,
},
}
return &result , nil
}
func (s *MCPServer ) handleReadResource (
ctx context .Context ,
id any ,
request mcp .ReadResourceRequest ,
) (*mcp .ReadResourceResult , *requestError ) {
s .resourcesMu .RLock ()
var handler ResourceHandlerFunc
var ok bool
session := ClientSessionFromContext (ctx )
if session != nil {
if sessionWithResources , typeAssertOk := session .(SessionWithResources ); typeAssertOk {
if sessionResources := sessionWithResources .GetSessionResources (); sessionResources != nil {
resource , sessionOk := sessionResources [request .Params .URI ]
if sessionOk {
handler = resource .Handler
ok = true
}
}
}
}
if !ok {
globalResource , rok := s .resources [request .Params .URI ]
if rok {
handler = globalResource .handler
ok = true
}
}
if ok {
s .resourcesMu .RUnlock ()
finalHandler := handler
s .resourceMiddlewareMu .RLock ()
mw := s .resourceHandlerMiddlewares
for i := len (mw ) - 1 ; i >= 0 ; i -- {
finalHandler = mw [i ](finalHandler )
}
s .resourceMiddlewareMu .RUnlock ()
contents , err := finalHandler (ctx , request )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : err ,
}
}
return &mcp .ReadResourceResult {Contents : contents }, nil
}
var matchedHandler ResourceTemplateHandlerFunc
var matched bool
if session != nil {
if sessionWithTemplates , ok := session .(SessionWithResourceTemplates ); ok {
sessionTemplates := sessionWithTemplates .GetSessionResourceTemplates ()
for _ , serverTemplate := range sessionTemplates {
if serverTemplate .Template .URITemplate == nil {
continue
}
if matchesTemplate (request .Params .URI , serverTemplate .Template .URITemplate ) {
matchedHandler = serverTemplate .Handler
matched = true
matchedVars := serverTemplate .Template .URITemplate .Match (request .Params .URI )
request .Params .Arguments = make (map [string ]any , len (matchedVars ))
for name , value := range matchedVars {
request .Params .Arguments [name ] = value .V
}
break
}
}
}
}
if !matched {
for _ , entry := range s .resourceTemplates {
template := entry .template
if template .URITemplate == nil {
continue
}
if matchesTemplate (request .Params .URI , template .URITemplate ) {
matchedHandler = entry .handler
matched = true
matchedVars := template .URITemplate .Match (request .Params .URI )
request .Params .Arguments = make (map [string ]any , len (matchedVars ))
for name , value := range matchedVars {
request .Params .Arguments [name ] = value .V
}
break
}
}
}
s .resourcesMu .RUnlock ()
if matched {
s .resourceMiddlewareMu .RLock ()
finalHandler := ResourceHandlerFunc (matchedHandler )
mw := s .resourceHandlerMiddlewares
for i := len (mw ) - 1 ; i >= 0 ; i -- {
finalHandler = mw [i ](finalHandler )
}
s .resourceMiddlewareMu .RUnlock ()
contents , err := finalHandler (ctx , request )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : err ,
}
}
return &mcp .ReadResourceResult {Contents : contents }, nil
}
return nil , &requestError {
id : id ,
code : mcp .RESOURCE_NOT_FOUND ,
err : fmt .Errorf (
"handler not found for resource URI '%s': %w" ,
request .Params .URI ,
ErrResourceNotFound ,
),
}
}
func matchesTemplate(uri string , template *mcp .URITemplate ) bool {
return template .Regexp ().MatchString (uri )
}
func (s *MCPServer ) handleListPrompts (
ctx context .Context ,
id any ,
request mcp .ListPromptsRequest ,
) (*mcp .ListPromptsResult , *requestError ) {
s .promptsMu .RLock ()
prompts := make ([]mcp .Prompt , 0 , len (s .prompts ))
for _ , prompt := range s .prompts {
prompts = append (prompts , prompt )
}
s .promptsMu .RUnlock ()
sort .Slice (prompts , func (i , j int ) bool {
return prompts [i ].Name < prompts [j ].Name
})
s .promptFiltersMu .RLock ()
if len (s .promptFilters ) > 0 {
for _ , filter := range s .promptFilters {
prompts = filter (ctx , prompts )
}
}
s .promptFiltersMu .RUnlock ()
promptsToReturn , nextCursor , err := listByPagination (
ctx ,
s ,
request .Params .Cursor ,
prompts ,
)
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
result := mcp .ListPromptsResult {
Prompts : promptsToReturn ,
PaginatedResult : mcp .PaginatedResult {
NextCursor : nextCursor ,
},
}
return &result , nil
}
func (s *MCPServer ) handleGetPrompt (
ctx context .Context ,
id any ,
request mcp .GetPromptRequest ,
) (*mcp .GetPromptResult , *requestError ) {
s .promptsMu .RLock ()
handler , ok := s .promptHandlers [request .Params .Name ]
s .promptsMu .RUnlock ()
if !ok {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : fmt .Errorf ("prompt '%s' not found: %w" , request .Params .Name , ErrPromptNotFound ),
}
}
finalHandler := handler
s .promptMiddlewareMu .RLock ()
mw := s .promptHandlerMiddlewares
for i := len (mw ) - 1 ; i >= 0 ; i -- {
finalHandler = mw [i ](finalHandler )
}
s .promptMiddlewareMu .RUnlock ()
result , err := finalHandler (ctx , request )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : err ,
}
}
return result , nil
}
func (s *MCPServer ) handleListTools (
ctx context .Context ,
id any ,
request mcp .ListToolsRequest ,
) (*mcp .ListToolsResult , *requestError ) {
s .toolsMu .RLock ()
tools := make ([]mcp .Tool , 0 , len (s .tools )+len (s .taskTools ))
toolNames := make ([]string , 0 , len (s .tools )+len (s .taskTools ))
for name := range s .tools {
toolNames = append (toolNames , name )
}
for name := range s .taskTools {
toolNames = append (toolNames , name )
}
sort .Strings (toolNames )
for _ , name := range toolNames {
if tool , ok := s .tools [name ]; ok {
tools = append (tools , tool .Tool )
} else if taskTool , ok := s .taskTools [name ]; ok {
tools = append (tools , taskTool .Tool )
}
}
s .toolsMu .RUnlock ()
session := ClientSessionFromContext (ctx )
if session != nil {
if sessionWithTools , ok := session .(SessionWithTools ); ok {
if sessionTools := sessionWithTools .GetSessionTools (); sessionTools != nil {
toolMap := make (map [string ]mcp .Tool )
for _ , tool := range tools {
toolMap [tool .Name ] = tool
}
for name , serverTool := range sessionTools {
toolMap [name ] = serverTool .Tool
}
tools = make ([]mcp .Tool , 0 , len (toolMap ))
for _ , tool := range toolMap {
tools = append (tools , tool )
}
sort .Slice (tools , func (i , j int ) bool {
return tools [i ].Name < tools [j ].Name
})
}
}
}
s .toolFiltersMu .RLock ()
if len (s .toolFilters ) > 0 {
for _ , filter := range s .toolFilters {
tools = filter (ctx , tools )
}
}
s .toolFiltersMu .RUnlock ()
toolsToReturn , nextCursor , err := listByPagination (
ctx ,
s ,
request .Params .Cursor ,
tools ,
)
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
result := mcp .ListToolsResult {
Tools : toolsToReturn ,
PaginatedResult : mcp .PaginatedResult {
NextCursor : nextCursor ,
},
}
return &result , nil
}
func (s *MCPServer ) handleToolCall (
ctx context .Context ,
id any ,
request mcp .CallToolRequest ,
) (any , *requestError ) {
var tool ServerTool
var ok bool
var taskToolOnly bool
session := ClientSessionFromContext (ctx )
if session != nil {
if sessionWithTools , typeAssertOk := session .(SessionWithTools ); typeAssertOk {
if sessionTools := sessionWithTools .GetSessionTools (); sessionTools != nil {
var sessionOk bool
tool , sessionOk = sessionTools [request .Params .Name ]
if sessionOk {
ok = true
}
}
}
}
if !ok {
s .toolsMu .RLock ()
tool , ok = s .tools [request .Params .Name ]
if !ok {
if taskTool , taskOk := s .taskTools [request .Params .Name ]; taskOk {
tool = ServerTool {
Tool : taskTool .Tool ,
Handler : nil ,
}
ok = true
taskToolOnly = true
}
}
s .toolsMu .RUnlock ()
}
if !ok {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : fmt .Errorf ("tool '%s' not found: %w" , request .Params .Name , ErrToolNotFound ),
}
}
if tool .Tool .Execution != nil && tool .Tool .Execution .TaskSupport == mcp .TaskSupportRequired {
if request .Params .Task == nil {
return nil , &requestError {
id : id ,
code : mcp .METHOD_NOT_FOUND ,
err : fmt .Errorf ("tool '%s' requires task augmentation" , request .Params .Name ),
}
}
}
shouldExecuteAsTask := request .Params .Task != nil &&
tool .Tool .Execution != nil &&
(tool .Tool .Execution .TaskSupport == mcp .TaskSupportOptional ||
tool .Tool .Execution .TaskSupport == mcp .TaskSupportRequired )
if shouldExecuteAsTask {
return s .handleTaskAugmentedToolCall (ctx , id , request )
}
if taskToolOnly {
return nil , &requestError {
id : id ,
code : mcp .METHOD_NOT_FOUND ,
err : fmt .Errorf ("tool '%s' does not support synchronous execution" , request .Params .Name ),
}
}
finalHandler := tool .Handler
s .toolMiddlewareMu .RLock ()
mw := s .toolHandlerMiddlewares
for i := len (mw ) - 1 ; i >= 0 ; i -- {
finalHandler = mw [i ](finalHandler )
}
s .toolMiddlewareMu .RUnlock ()
result , err := finalHandler (ctx , request )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : err ,
}
}
return result , nil
}
func (s *MCPServer ) handleTaskAugmentedToolCall (
ctx context .Context ,
id any ,
request mcp .CallToolRequest ,
) (*mcp .CreateTaskResult , *requestError ) {
s .toolsMu .RLock ()
taskTool , isTaskTool := s .taskTools [request .Params .Name ]
regularTool , isRegularTool := s .tools [request .Params .Name ]
s .toolsMu .RUnlock ()
var toolToUse ServerTaskTool
var hasTaskHandler bool
if isTaskTool {
toolToUse = taskTool
hasTaskHandler = true
} else if isRegularTool {
if regularTool .Tool .Execution == nil ||
(regularTool .Tool .Execution .TaskSupport != mcp .TaskSupportOptional &&
regularTool .Tool .Execution .TaskSupport != mcp .TaskSupportRequired ) {
return nil , &requestError {
id : id ,
code : mcp .METHOD_NOT_FOUND ,
err : fmt .Errorf ("tool '%s' does not support task augmentation" , request .Params .Name ),
}
}
hasTaskHandler = false
} else {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : fmt .Errorf ("tool '%s' not found" , request .Params .Name ),
}
}
taskID := uuid .New ().String ()
var ttl *int64
if request .Params .Task != nil {
ttl = request .Params .Task .TTL
}
entry , err := s .createTask (ctx , taskID , request .Params .Name , ttl , nil )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : err ,
}
}
if hasTaskHandler {
go s .executeTaskTool (ctx , entry , toolToUse , request )
} else {
go s .executeRegularToolAsTask (ctx , entry , regularTool , request )
}
s .tasksMu .RLock ()
taskCopy := entry .task
s .tasksMu .RUnlock ()
return &mcp .CreateTaskResult {
Task : taskCopy ,
}, nil
}
func (s *MCPServer ) executeTaskTool (
ctx context .Context ,
entry *taskEntry ,
taskTool ServerTaskTool ,
request mcp .CallToolRequest ,
) {
taskCtx , cancel := context .WithCancel (ctx )
defer cancel ()
s .tasksMu .Lock ()
entry .cancelFunc = cancel
s .tasksMu .Unlock ()
result , err := taskTool .Handler (taskCtx , request )
if err != nil {
if errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ) {
s .tasksMu .Lock ()
alreadyCancelled := entry .task .Status == mcp .TaskStatusCancelled
s .tasksMu .Unlock ()
if !alreadyCancelled {
cancelledAt := time .Now ()
duration := cancelledAt .Sub (entry .createdAt )
s .tasksMu .Lock ()
if !entry .completed {
entry .task .Status = mcp .TaskStatusCancelled
entry .task .StatusMessage = err .Error()
entry .task .LastUpdatedAt = cancelledAt .UTC ().Format (time .RFC3339 )
entry .completed = true
close (entry .done )
s .activeTasks --
s .sendTaskStatusNotification (entry .task )
if s .taskHooks != nil {
metrics := TaskMetrics {
TaskID : entry .task .TaskId ,
ToolName : entry .toolName ,
Status : entry .task .Status ,
StatusMessage : entry .task .StatusMessage ,
CreatedAt : entry .createdAt ,
CompletedAt : &cancelledAt ,
Duration : duration ,
SessionID : entry .sessionID ,
}
s .taskHooks .taskCancelled (ctx , metrics )
}
}
s .tasksMu .Unlock ()
}
return
}
s .completeTask (entry , nil , err )
return
}
s .completeTask (entry , result , nil )
}
func (s *MCPServer ) executeRegularToolAsTask (
ctx context .Context ,
entry *taskEntry ,
regularTool ServerTool ,
request mcp .CallToolRequest ,
) {
taskCtx , cancel := context .WithCancel (ctx )
defer cancel ()
s .tasksMu .Lock ()
entry .cancelFunc = cancel
s .tasksMu .Unlock ()
finalHandler := regularTool .Handler
s .toolMiddlewareMu .RLock ()
mw := s .toolHandlerMiddlewares
for i := len (mw ) - 1 ; i >= 0 ; i -- {
finalHandler = mw [i ](finalHandler )
}
s .toolMiddlewareMu .RUnlock ()
result , err := finalHandler (taskCtx , request )
if err != nil {
if errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ) {
s .tasksMu .Lock ()
alreadyCancelled := entry .task .Status == mcp .TaskStatusCancelled
s .tasksMu .Unlock ()
if !alreadyCancelled {
cancelledAt := time .Now ()
duration := cancelledAt .Sub (entry .createdAt )
s .tasksMu .Lock ()
if !entry .completed {
entry .task .Status = mcp .TaskStatusCancelled
entry .task .StatusMessage = err .Error()
entry .task .LastUpdatedAt = cancelledAt .UTC ().Format (time .RFC3339 )
entry .completed = true
close (entry .done )
s .activeTasks --
s .sendTaskStatusNotification (entry .task )
if s .taskHooks != nil {
metrics := TaskMetrics {
TaskID : entry .task .TaskId ,
ToolName : entry .toolName ,
Status : entry .task .Status ,
StatusMessage : entry .task .StatusMessage ,
CreatedAt : entry .createdAt ,
CompletedAt : &cancelledAt ,
Duration : duration ,
SessionID : entry .sessionID ,
}
s .taskHooks .taskCancelled (ctx , metrics )
}
}
s .tasksMu .Unlock ()
}
return
}
s .completeTask (entry , nil , err )
return
}
s .completeTask (entry , result , nil )
}
func (s *MCPServer ) handleNotification (
ctx context .Context ,
notification mcp .JSONRPCNotification ,
) mcp .JSONRPCMessage {
if notification .Method == "notifications/cancelled" {
if reqID , ok := notification .Params .AdditionalFields ["requestId" ]; ok {
key := inflightKey (ctx , reqID )
if cancel , loaded := s .inflightCancels .LoadAndDelete (key ); loaded {
if cancelFunc , ok := cancel .(context .CancelFunc ); ok {
cancelFunc ()
}
}
}
return nil
}
s .notificationHandlersMu .RLock ()
handler , ok := s .notificationHandlers [notification .Method ]
s .notificationHandlersMu .RUnlock ()
if ok {
handler (ctx , notification )
}
return nil
}
func inflightKey(ctx context .Context , requestID any ) string {
if session := ClientSessionFromContext (ctx ); session != nil {
return fmt .Sprintf ("%s:%v" , session .SessionID (), requestID )
}
return fmt .Sprintf (":%v" , requestID )
}
func createResponse(id any , result any ) mcp .JSONRPCMessage {
return mcp .NewJSONRPCResultResponse (mcp .NewRequestId (id ), result )
}
func createErrorResponse(
id any ,
code int ,
message string ,
) mcp .JSONRPCMessage {
return mcp .JSONRPCError {
JSONRPC : mcp .JSONRPC_VERSION ,
ID : mcp .NewRequestId (id ),
Error : mcp .NewJSONRPCErrorDetails (code , message , nil ),
}
}
func (s *MCPServer ) handleGetTask (
ctx context .Context ,
id any ,
request mcp .GetTaskRequest ,
) (*mcp .GetTaskResult , *requestError ) {
task , _ , err := s .getTask (ctx , request .Params .TaskId )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
result := mcp .NewGetTaskResult (task )
return &result , nil
}
func (s *MCPServer ) handleListTasks (
ctx context .Context ,
id any ,
request mcp .ListTasksRequest ,
) (*mcp .ListTasksResult , *requestError ) {
tasks := s .listTasks (ctx )
sort .Slice (tasks , func (i , j int ) bool {
return tasks [i ].TaskId < tasks [j ].TaskId
})
tasksToReturn , nextCursor , err := listByPagination (
ctx ,
s ,
request .Params .Cursor ,
tasks ,
)
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
result := mcp .ListTasksResult {
Tasks : tasksToReturn ,
PaginatedResult : mcp .PaginatedResult {
NextCursor : nextCursor ,
},
}
return &result , nil
}
func (s *MCPServer ) handleTaskResult (
ctx context .Context ,
id any ,
request mcp .TaskResultRequest ,
) (*mcp .TaskResultResult , *requestError ) {
task , done , err := s .getTask (ctx , request .Params .TaskId )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
if !task .Status .IsTerminal () {
select {
case <- done :
case <- ctx .Done ():
return nil , &requestError {
id : id ,
code : mcp .REQUEST_INTERRUPTED ,
err : ctx .Err (),
}
}
}
entry , err := s .getTaskEntry (ctx , request .Params .TaskId )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
s .tasksMu .RLock ()
storedResult := entry .result
resultErr := entry .resultErr
taskID := entry .task .TaskId
s .tasksMu .RUnlock ()
if resultErr != nil {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : resultErr ,
}
}
result := &mcp .TaskResultResult {
Result : mcp .Result {
Meta : mcp .WithRelatedTask (taskID ),
},
}
switch taskResult := storedResult .(type ) {
case *mcp .CallToolResult :
result .Content = taskResult .Content
result .StructuredContent = taskResult .StructuredContent
result .IsError = taskResult .IsError
mergeTaskResultMeta (result , taskResult .Meta )
case *mcp .CreateTaskResult :
result .Content = taskResult .Content
result .StructuredContent = taskResult .StructuredContent
result .IsError = taskResult .IsError
mergeTaskResultMeta (result , taskResult .Meta )
}
return result , nil
}
func mergeTaskResultMeta(result *mcp .TaskResultResult , meta *mcp .Meta ) {
if meta == nil {
return
}
if result .Meta .AdditionalFields == nil {
result .Meta .AdditionalFields = make (map [string ]any )
}
for k , v := range meta .AdditionalFields {
if k != mcp .RelatedTaskMetaKey {
result .Meta .AdditionalFields [k ] = v
}
}
}
func (s *MCPServer ) handleCancelTask (
ctx context .Context ,
id any ,
request mcp .CancelTaskRequest ,
) (*mcp .CancelTaskResult , *requestError ) {
err := s .cancelTask (ctx , request .Params .TaskId )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
task , _ , err := s .getTask (ctx , request .Params .TaskId )
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INVALID_PARAMS ,
err : err ,
}
}
result := mcp .NewCancelTaskResult (task )
return &result , nil
}
func (s *MCPServer ) handleComplete (
ctx context .Context ,
id any ,
request mcp .CompleteRequest ,
) (*mcp .CompleteResult , *requestError ) {
var completion *mcp .Completion
var err error
switch ref := request .Params .Ref .(type ) {
case mcp .PromptReference :
completion , err = s .promptCompletionProvider .CompletePromptArgument (
ctx ,
ref .Name ,
request .Params .Argument ,
request .Params .Context ,
)
case mcp .ResourceReference :
completion , err = s .resourceCompletionProvider .CompleteResourceArgument (
ctx ,
ref .URI ,
request .Params .Argument ,
request .Params .Context ,
)
default :
return nil , &requestError {
id : id ,
code : mcp .INVALID_REQUEST ,
err : fmt .Errorf ("unknown reference type: %v" , ref ),
}
}
if err != nil {
return nil , &requestError {
id : id ,
code : mcp .INTERNAL_ERROR ,
err : err ,
}
}
if completion == nil {
return &mcp .CompleteResult {}, nil
}
return &mcp .CompleteResult {
Completion : *completion ,
}, nil
}
func (s *MCPServer ) createTask (ctx context .Context , taskID string , toolName string , ttl *int64 , pollInterval *int64 ) (*taskEntry , error ) {
opts := []mcp .TaskOption {}
if ttl != nil {
opts = append (opts , mcp .WithTaskTTL (*ttl ))
}
if pollInterval != nil {
opts = append (opts , mcp .WithTaskPollInterval (*pollInterval ))
}
task := mcp .NewTask (taskID , opts ...)
createdAt := time .Now ()
entry := &taskEntry {
task : task ,
sessionID : getSessionID (ctx ),
toolName : toolName ,
createdAt : createdAt ,
done : make (chan struct {}),
}
s .tasksMu .Lock ()
defer s .tasksMu .Unlock ()
if s .maxConcurrentTasks != nil && *s .maxConcurrentTasks > 0 {
if s .activeTasks >= *s .maxConcurrentTasks {
return nil , fmt .Errorf ("max concurrent tasks limit reached (%d)" , *s .maxConcurrentTasks )
}
}
s .activeTasks ++
s .tasks [taskID ] = entry
if s .taskHooks != nil {
metrics := TaskMetrics {
TaskID : taskID ,
ToolName : toolName ,
Status : task .Status ,
CreatedAt : createdAt ,
SessionID : getSessionID (ctx ),
}
s .taskHooks .taskCreated (ctx , metrics )
}
if ttl != nil && *ttl > 0 {
go s .scheduleTaskCleanup (taskID , *ttl )
}
return entry , nil
}
func (s *MCPServer ) getTask (ctx context .Context , taskID string ) (mcp .Task , chan struct {}, error ) {
s .tasksMu .RLock ()
entry , exists := s .tasks [taskID ]
if !exists {
if _ , wasExpired := s .expiredTasks [taskID ]; wasExpired {
s .tasksMu .RUnlock ()
return mcp .Task {}, nil , fmt .Errorf ("task has expired" )
}
s .tasksMu .RUnlock ()
return mcp .Task {}, nil , fmt .Errorf ("task not found" )
}
sessionID := getSessionID (ctx )
if entry .sessionID != "" && sessionID != "" && entry .sessionID != sessionID {
s .tasksMu .RUnlock ()
return mcp .Task {}, nil , fmt .Errorf ("task not found" )
}
taskCopy := entry .task
done := entry .done
s .tasksMu .RUnlock ()
return taskCopy , done , nil
}
func (s *MCPServer ) getTaskEntry (ctx context .Context , taskID string ) (*taskEntry , error ) {
s .tasksMu .RLock ()
entry , exists := s .tasks [taskID ]
if !exists {
if _ , wasExpired := s .expiredTasks [taskID ]; wasExpired {
s .tasksMu .RUnlock ()
return nil , fmt .Errorf ("task has expired" )
}
s .tasksMu .RUnlock ()
return nil , fmt .Errorf ("task not found" )
}
s .tasksMu .RUnlock ()
sessionID := getSessionID (ctx )
if entry .sessionID != "" && sessionID != "" && entry .sessionID != sessionID {
return nil , fmt .Errorf ("task not found" )
}
return entry , nil
}
func (s *MCPServer ) listTasks (ctx context .Context ) []mcp .Task {
sessionID := getSessionID (ctx )
s .tasksMu .RLock ()
defer s .tasksMu .RUnlock ()
var tasks []mcp .Task
for _ , entry := range s .tasks {
if sessionID == "" || entry .sessionID == "" || entry .sessionID == sessionID {
tasks = append (tasks , entry .task )
}
}
return tasks
}
func (s *MCPServer ) completeTask (entry *taskEntry , result any , err error ) {
s .tasksMu .Lock ()
defer s .tasksMu .Unlock ()
if entry .completed {
return
}
completedAt := time .Now ()
duration := completedAt .Sub (entry .createdAt )
if err != nil {
entry .task .Status = mcp .TaskStatusFailed
entry .task .StatusMessage = err .Error()
entry .resultErr = err
} else {
entry .task .Status = mcp .TaskStatusCompleted
entry .result = result
}
entry .task .LastUpdatedAt = completedAt .UTC ().Format (time .RFC3339 )
entry .completed = true
close (entry .done )
s .activeTasks --
s .sendTaskStatusNotification (entry .task )
if s .taskHooks != nil {
metrics := TaskMetrics {
TaskID : entry .task .TaskId ,
ToolName : entry .toolName ,
Status : entry .task .Status ,
StatusMessage : entry .task .StatusMessage ,
CreatedAt : entry .createdAt ,
CompletedAt : &completedAt ,
Duration : duration ,
SessionID : entry .sessionID ,
Error : err ,
}
if err != nil {
s .taskHooks .taskFailed (context .Background (), metrics )
} else {
s .taskHooks .taskCompleted (context .Background (), metrics )
}
}
}
func (s *MCPServer ) cancelTask (ctx context .Context , taskID string ) error {
entry , err := s .getTaskEntry (ctx , taskID )
if err != nil {
return err
}
s .tasksMu .Lock ()
defer s .tasksMu .Unlock ()
if entry .completed {
return fmt .Errorf ("cannot cancel task in terminal status: %s" , entry .task .Status )
}
if entry .cancelFunc != nil {
entry .cancelFunc ()
}
cancelledAt := time .Now ()
duration := cancelledAt .Sub (entry .createdAt )
entry .task .Status = mcp .TaskStatusCancelled
entry .task .StatusMessage = "Task cancelled by request"
entry .task .LastUpdatedAt = cancelledAt .UTC ().Format (time .RFC3339 )
entry .completed = true
close (entry .done )
s .activeTasks --
s .sendTaskStatusNotification (entry .task )
if s .taskHooks != nil {
metrics := TaskMetrics {
TaskID : entry .task .TaskId ,
ToolName : entry .toolName ,
Status : entry .task .Status ,
StatusMessage : entry .task .StatusMessage ,
CreatedAt : entry .createdAt ,
CompletedAt : &cancelledAt ,
Duration : duration ,
SessionID : entry .sessionID ,
}
s .taskHooks .taskCancelled (ctx , metrics )
}
return nil
}
func (s *MCPServer ) scheduleTaskCleanup (taskID string , ttlMs int64 ) {
time .Sleep (time .Duration (ttlMs ) * time .Millisecond )
s .tasksMu .Lock ()
delete (s .tasks , taskID )
s .expiredTasks [taskID ] = time .Now ()
s .tasksMu .Unlock ()
go func () {
time .Sleep (5 * time .Minute )
s .tasksMu .Lock ()
delete (s .expiredTasks , taskID )
s .tasksMu .Unlock ()
}()
}
func (s *MCPServer ) sendTaskStatusNotification (task mcp .Task ) {
taskMap := map [string ]any {
"taskId" : task .TaskId ,
"status" : task .Status ,
"createdAt" : task .CreatedAt ,
"lastUpdatedAt" : task .LastUpdatedAt ,
}
if task .StatusMessage != "" {
taskMap ["statusMessage" ] = task .StatusMessage
}
if task .TTL != nil {
taskMap ["ttl" ] = *task .TTL
}
if task .PollInterval != nil {
taskMap ["pollInterval" ] = *task .PollInterval
}
s .SendNotificationToAllClients (mcp .MethodNotificationTasksStatus , taskMap )
}
func getSessionID(ctx context .Context ) string {
if session := ClientSessionFromContext (ctx ); session != nil {
return session .SessionID ()
}
return ""
}
The pages are generated with Golds v0.8.4 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .