package server
import (
"context"
"fmt"
"maps"
"net/url"
"github.com/mark3labs/mcp-go/mcp"
)
// ClientSession is the minimal contract the server requires from a connected
// client session in order to register it and deliver notifications to it.
type ClientSession interface {
// Initialize takes no arguments and returns nothing; it is not called in
// this section of the file.
// NOTE(review): presumably marks the session as initialized — confirm
// against the implementations.
Initialize ()
// Initialized reports the session's initialization state. The server's
// notification senders skip or reject sessions for which it returns false.
Initialized () bool
// NotificationChannel returns the send-only channel onto which the server
// pushes JSON-RPC notifications using non-blocking sends.
NotificationChannel () chan <- mcp .JSONRPCNotification
// SessionID returns the identifier used as this session's key in the
// server's session registry.
SessionID () string
}
// SessionWithLogging is a ClientSession that additionally exposes a log level.
// The log-message senders require this interface and return
// ErrSessionDoesNotSupportLogging for sessions that lack it.
type SessionWithLogging interface {
ClientSession
// SetLogLevel receives a logging level for the session.
// NOTE(review): presumably the value later returned by GetLogLevel — confirm
// against the implementations.
SetLogLevel (level mcp .LoggingLevel )
// GetLogLevel returns the session's level; the log-message senders pass it
// to LoggingLevel.ShouldSendTo to decide whether a message is delivered.
GetLogLevel () mcp .LoggingLevel
}
// SessionWithTools is a ClientSession that can hold its own set of tools.
// AddSessionTools and DeleteSessionTools require this interface.
type SessionWithTools interface {
ClientSession
// GetSessionTools returns the session's tools. The server keys this map by
// tool name and copies it before modifying, so a nil result is acceptable.
GetSessionTools () map [string ]ServerTool
// SetSessionTools replaces the session's tools with the given map. The
// server always passes a freshly built map.
SetSessionTools (tools map [string ]ServerTool )
}
// SessionWithResources is a ClientSession that can hold its own set of
// resources. AddSessionResources and DeleteSessionResources require it.
type SessionWithResources interface {
ClientSession
// GetSessionResources returns the session's resources. The server keys this
// map by resource URI and copies it before modifying.
GetSessionResources () map [string ]ServerResource
// SetSessionResources replaces the session's resources with the given map.
// The server always passes a freshly built map.
SetSessionResources (resources map [string ]ServerResource )
}
// SessionWithResourceTemplates is a ClientSession that can hold its own set of
// resource templates. AddSessionResourceTemplates and
// DeleteSessionResourceTemplates require it.
type SessionWithResourceTemplates interface {
ClientSession
// GetSessionResourceTemplates returns the session's resource templates. The
// server keys this map by the template's raw URI-template string and copies
// it before modifying.
GetSessionResourceTemplates () map [string ]ServerResourceTemplate
// SetSessionResourceTemplates replaces the session's resource templates
// with the given map. The server always passes a freshly built map.
SetSessionResourceTemplates (templates map [string ]ServerResourceTemplate )
}
// SessionWithClientInfo is a ClientSession that exposes accessors for an
// mcp.Implementation describing the client and for its mcp.ClientCapabilities.
// None of these methods are called in this section of the file.
// NOTE(review): presumably populated from the client's initialize request —
// confirm against the request handlers.
type SessionWithClientInfo interface {
ClientSession
// GetClientInfo returns the client implementation info held by the session.
GetClientInfo () mcp .Implementation
// SetClientInfo receives the client implementation info for the session.
SetClientInfo (clientInfo mcp .Implementation )
// GetClientCapabilities returns the client capabilities held by the session.
GetClientCapabilities () mcp .ClientCapabilities
// SetClientCapabilities receives the client capabilities for the session.
SetClientCapabilities (clientCapabilities mcp .ClientCapabilities )
}
// SessionWithElicitation is a ClientSession that can carry an elicitation
// request. It is not used in this section of the file.
type SessionWithElicitation interface {
ClientSession
// RequestElicitation takes an elicitation request and returns the result or
// an error.
// NOTE(review): presumably forwards the request to the client and waits for
// its reply, honouring ctx — confirm against the implementations.
RequestElicitation (ctx context .Context , request mcp .ElicitationRequest ) (*mcp .ElicitationResult , error )
}
// SessionWithRoots is a ClientSession that can carry a list-roots request.
// It is not used in this section of the file.
type SessionWithRoots interface {
ClientSession
// ListRoots takes a list-roots request and returns the result or an error.
// NOTE(review): presumably forwards the request to the client and waits for
// its reply, honouring ctx — confirm against the implementations.
ListRoots (ctx context .Context , request mcp .ListRootsRequest ) (*mcp .ListRootsResult , error )
}
// SessionWithStreamableHTTPConfig is a ClientSession that wants to be told
// when the server is about to push a notification to it.
type SessionWithStreamableHTTPConfig interface {
ClientSession
// UpgradeToSSEWhenReceiveNotification is called by every notification
// sender in this file immediately before it attempts the channel send.
// NOTE(review): presumably switches a streamable-HTTP response to SSE so
// the notification can be streamed — confirm against the transport.
UpgradeToSSEWhenReceiveNotification ()
}
// clientSessionKey is the unexported context key type under which WithContext
// stores a ClientSession and ClientSessionFromContext looks it up.
type clientSessionKey struct {}
// ClientSessionFromContext returns the ClientSession stored in ctx under
// clientSessionKey, or nil when ctx holds no value of that type.
func ClientSessionFromContext(ctx context.Context) ClientSession {
	session, found := ctx.Value(clientSessionKey{}).(ClientSession)
	if !found {
		return nil
	}
	return session
}
// WithContext returns a child of ctx that carries session under
// clientSessionKey, so that ClientSessionFromContext can retrieve it later.
func (s *MCPServer) WithContext(ctx context.Context, session ClientSession) context.Context {
	var key clientSessionKey
	return context.WithValue(ctx, key, session)
}
// RegisterSession stores session in the server's session registry under its
// SessionID and then invokes the RegisterSession hooks.
//
// It returns ErrSessionExists, without invoking the hooks, when a session with
// the same ID is already registered.
func (s *MCPServer) RegisterSession(ctx context.Context, session ClientSession) error {
	id := session.SessionID()

	// LoadOrStore makes the existence check and the insertion a single step.
	_, alreadyRegistered := s.sessions.LoadOrStore(id, session)
	if alreadyRegistered {
		return ErrSessionExists
	}

	s.hooks.RegisterSession(ctx, session)
	return nil
}
// buildLogNotification converts a logging message notification into a generic
// JSON-RPC notification. The method is carried over unchanged, and the level,
// logger and data are placed in the notification's additional fields under
// the keys "level", "logger" and "data".
func (s *MCPServer) buildLogNotification(notification mcp.LoggingMessageNotification) mcp.JSONRPCNotification {
	fields := map[string]any{
		"level":  notification.Params.Level,
		"logger": notification.Params.Logger,
		"data":   notification.Params.Data,
	}
	inner := mcp.Notification{
		Method: notification.Method,
		Params: mcp.NotificationParams{AdditionalFields: fields},
	}
	return mcp.JSONRPCNotification{
		JSONRPC:      mcp.JSONRPC_VERSION,
		Notification: inner,
	}
}
// SendLogMessageToClient sends a log message notification to the session
// carried by ctx.
//
// It returns ErrNotificationNotInitialized when ctx holds no session or the
// session is not initialized, and ErrSessionDoesNotSupportLogging when the
// session does not implement SessionWithLogging. A message whose level should
// not be sent at the session's log level (per LoggingLevel.ShouldSendTo) is
// dropped and nil is returned. Otherwise the result of sendNotificationCore is
// returned.
func (s *MCPServer) SendLogMessageToClient(ctx context.Context, notification mcp.LoggingMessageNotification) error {
	session := ClientSessionFromContext(ctx)
	if session == nil {
		return ErrNotificationNotInitialized
	}
	if !session.Initialized() {
		return ErrNotificationNotInitialized
	}

	logging, supportsLogging := session.(SessionWithLogging)
	if !supportsLogging {
		return ErrSessionDoesNotSupportLogging
	}

	// Filter by the session's level before building the notification.
	sessionLevel := logging.GetLogLevel()
	if !notification.Params.Level.ShouldSendTo(sessionLevel) {
		return nil
	}

	return s.sendNotificationCore(ctx, session, s.buildLogNotification(notification))
}
// sendNotificationToAllClients offers notification to every initialized
// session in the registry without blocking on any of them.
//
// Sessions that implement SessionWithStreamableHTTPConfig have
// UpgradeToSSEWhenReceiveNotification called before the send is attempted. If
// a session's channel cannot accept the notification immediately, it is
// dropped for that session and, when error hooks are registered,
// ErrNotificationChannelBlocked is reported to them from a separate goroutine.
func (s *MCPServer) sendNotificationToAllClients(notification mcp.JSONRPCNotification) {
	deliver := func(_, value any) bool {
		session, isSession := value.(ClientSession)
		if !isSession || !session.Initialized() {
			// Skip this entry but keep iterating over the registry.
			return true
		}

		if upgrader, canUpgrade := session.(SessionWithStreamableHTTPConfig); canUpgrade {
			upgrader.UpgradeToSSEWhenReceiveNotification()
		}

		select {
		case session.NotificationChannel() <- notification:
			return true
		default:
			// The channel is not ready; fall through to error reporting.
		}

		if s.hooks == nil || len(s.hooks.OnError) == 0 {
			return true
		}

		blocked := ErrNotificationChannelBlocked
		registered := s.hooks
		// Report asynchronously so a slow hook cannot stall the broadcast.
		go func(sessionID string, hooks *Hooks) {
			background := context.Background()
			details := map[string]any{
				"method":    notification.Method,
				"sessionID": sessionID,
			}
			hooks.onError(background, nil, "notification", details,
				fmt.Errorf("notification channel blocked for session %s: %w", sessionID, blocked))
		}(session.SessionID(), registered)
		return true
	}

	s.sessions.Range(deliver)
}
// sendNotificationToSpecificClient attempts a non-blocking send of
// notification to session.
//
// A session that implements SessionWithStreamableHTTPConfig has
// UpgradeToSSEWhenReceiveNotification called first. The function returns nil
// when the channel accepted the notification. Otherwise it returns
// ErrNotificationChannelBlocked and, when error hooks are registered, also
// reports the failure to them from a separate goroutine using a background
// context.
func (s *MCPServer) sendNotificationToSpecificClient(session ClientSession, notification mcp.JSONRPCNotification) error {
	if upgrader, canUpgrade := session.(SessionWithStreamableHTTPConfig); canUpgrade {
		upgrader.UpgradeToSSEWhenReceiveNotification()
	}

	select {
	case session.NotificationChannel() <- notification:
		return nil
	default:
		// The channel is not ready; fall through to error reporting.
	}

	if s.hooks != nil && len(s.hooks.OnError) > 0 {
		blocked := ErrNotificationChannelBlocked
		background := context.Background()
		registered := s.hooks
		// Report asynchronously so a slow hook cannot stall the caller.
		go func(sID string, hooks *Hooks) {
			details := map[string]any{
				"method":    notification.Method,
				"sessionID": sID,
			}
			hooks.onError(background, nil, "notification", details,
				fmt.Errorf("notification channel blocked for session %s: %w", sID, blocked))
		}(session.SessionID(), registered)
	}
	return ErrNotificationChannelBlocked
}
// SendLogMessageToSpecificClient sends a log message notification to the
// session registered under sessionID.
//
// It returns ErrSessionNotFound when no such session is registered,
// ErrSessionNotInitialized when the stored value is not a ClientSession or the
// session is not initialized, and ErrSessionDoesNotSupportLogging when the
// session does not implement SessionWithLogging. A message whose level should
// not be sent at the session's log level is dropped and nil is returned.
// Otherwise the result of sendNotificationToSpecificClient is returned.
func (s *MCPServer) SendLogMessageToSpecificClient(sessionID string, notification mcp.LoggingMessageNotification) error {
	stored, found := s.sessions.Load(sessionID)
	if !found {
		return ErrSessionNotFound
	}

	session, isSession := stored.(ClientSession)
	if !isSession {
		return ErrSessionNotInitialized
	}
	if !session.Initialized() {
		return ErrSessionNotInitialized
	}

	logging, supportsLogging := session.(SessionWithLogging)
	if !supportsLogging {
		return ErrSessionDoesNotSupportLogging
	}

	// Filter by the session's level before building the notification.
	sessionLevel := logging.GetLogLevel()
	if !notification.Params.Level.ShouldSendTo(sessionLevel) {
		return nil
	}

	return s.sendNotificationToSpecificClient(session, s.buildLogNotification(notification))
}
// UnregisterSession removes the session stored under sessionID from the
// registry and, if the removed value is a ClientSession, invokes the
// UnregisterSession hooks with it. Unknown session IDs are ignored.
func (s *MCPServer) UnregisterSession(ctx context.Context, sessionID string) {
	removed, found := s.sessions.LoadAndDelete(sessionID)
	if !found {
		return
	}

	session, isSession := removed.(ClientSession)
	if !isSession {
		return
	}
	s.hooks.UnregisterSession(ctx, session)
}
// SendNotificationToAllClients wraps method and params in a JSON-RPC
// notification and hands it to sendNotificationToAllClients, which offers it
// to every initialized session. params becomes the notification's additional
// fields and may be nil.
func (s *MCPServer) SendNotificationToAllClients(method string, params map[string]any) {
	payload := mcp.NotificationParams{AdditionalFields: params}
	s.sendNotificationToAllClients(mcp.JSONRPCNotification{
		JSONRPC: mcp.JSONRPC_VERSION,
		Notification: mcp.Notification{
			Method: method,
			Params: payload,
		},
	})
}
// sendNotificationCore attempts a non-blocking send of notification to
// session.
//
// A session that implements SessionWithStreamableHTTPConfig has
// UpgradeToSSEWhenReceiveNotification called first. The function returns nil
// when the channel accepted the notification. Otherwise it returns
// ErrNotificationChannelBlocked and, when error hooks are registered, also
// reports the failure to them from a separate goroutine, passing the caller's
// ctx.
func (s *MCPServer) sendNotificationCore(ctx context.Context, session ClientSession, notification mcp.JSONRPCNotification) error {
	if upgrader, canUpgrade := session.(SessionWithStreamableHTTPConfig); canUpgrade {
		upgrader.UpgradeToSSEWhenReceiveNotification()
	}

	select {
	case session.NotificationChannel() <- notification:
		return nil
	default:
		// The channel is not ready; fall through to error reporting.
	}

	if s.hooks != nil && len(s.hooks.OnError) > 0 {
		// Capture the values the goroutine needs before starting it.
		notifiedMethod := notification.Method
		blocked := ErrNotificationChannelBlocked
		registered := s.hooks
		go func(sessionID string, hooks *Hooks) {
			details := map[string]any{
				"method":    notifiedMethod,
				"sessionID": sessionID,
			}
			hooks.onError(ctx, nil, "notification", details,
				fmt.Errorf("notification channel blocked for session %s: %w", sessionID, blocked))
		}(session.SessionID(), registered)
	}
	return ErrNotificationChannelBlocked
}
// SendNotificationToClient sends a notification with the given method and
// params to the session carried by ctx.
//
// It returns ErrNotificationNotInitialized when ctx holds no session or the
// session is not initialized; otherwise it returns the result of
// sendNotificationCore. params becomes the notification's additional fields
// and may be nil.
func (s *MCPServer) SendNotificationToClient(ctx context.Context, method string, params map[string]any) error {
	session := ClientSessionFromContext(ctx)
	if session == nil {
		return ErrNotificationNotInitialized
	}
	if !session.Initialized() {
		return ErrNotificationNotInitialized
	}

	payload := mcp.NotificationParams{AdditionalFields: params}
	return s.sendNotificationCore(ctx, session, mcp.JSONRPCNotification{
		JSONRPC: mcp.JSONRPC_VERSION,
		Notification: mcp.Notification{
			Method: method,
			Params: payload,
		},
	})
}
// SendNotificationToSpecificClient sends a notification with the given method
// and params to the session registered under sessionID.
//
// It returns ErrSessionNotFound when no such session is registered and
// ErrSessionNotInitialized when the stored value is not a ClientSession or the
// session is not initialized; otherwise it returns the result of
// sendNotificationToSpecificClient. params becomes the notification's
// additional fields and may be nil.
func (s *MCPServer) SendNotificationToSpecificClient(sessionID string, method string, params map[string]any) error {
	stored, found := s.sessions.Load(sessionID)
	if !found {
		return ErrSessionNotFound
	}

	session, isSession := stored.(ClientSession)
	if !isSession {
		return ErrSessionNotInitialized
	}
	if !session.Initialized() {
		return ErrSessionNotInitialized
	}

	payload := mcp.NotificationParams{AdditionalFields: params}
	return s.sendNotificationToSpecificClient(session, mcp.JSONRPCNotification{
		JSONRPC: mcp.JSONRPC_VERSION,
		Notification: mcp.Notification{
			Method: method,
			Params: payload,
		},
	})
}
// AddSessionTool adds a single tool and its handler to the session registered
// under sessionID. It is a convenience wrapper around AddSessionTools and
// returns that function's error.
func (s *MCPServer) AddSessionTool(sessionID string, tool mcp.Tool, handler ToolHandlerFunc) error {
	entry := ServerTool{Tool: tool, Handler: handler}
	return s.AddSessionTools(sessionID, entry)
}
// AddSessionTools adds tools to the session registered under sessionID,
// replacing any existing session tool that has the same name.
//
// It returns ErrSessionNotFound when no such session is registered and
// ErrSessionDoesNotSupportTools when the session does not implement
// SessionWithTools. The map returned by GetSessionTools is not mutated; a
// merged copy is built and handed to SetSessionTools.
//
// If the session is initialized and the server's tool capabilities have
// listChanged set, a "notifications/tools/list_changed" notification is sent
// to the session. A failure to send is not returned to the caller; it is
// reported to the error hooks, if any, from a separate goroutine.
func (s *MCPServer) AddSessionTools(sessionID string, tools ...ServerTool) error {
	stored, found := s.sessions.Load(sessionID)
	if !found {
		return ErrSessionNotFound
	}
	session, supported := stored.(SessionWithTools)
	if !supported {
		return ErrSessionDoesNotSupportTools
	}

	s.implicitlyRegisterToolCapabilities()

	// Build the merged tool set on a fresh map.
	current := session.GetSessionTools()
	merged := make(map[string]ServerTool, len(current)+len(tools))
	maps.Copy(merged, current)
	for _, entry := range tools {
		merged[entry.Tool.Name] = entry
	}
	session.SetSessionTools(merged)

	if !session.Initialized() {
		return nil
	}
	if s.capabilities.tools == nil || !s.capabilities.tools.listChanged {
		return nil
	}

	err := s.SendNotificationToSpecificClient(sessionID, "notifications/tools/list_changed", nil)
	if err == nil {
		return nil
	}
	if s.hooks == nil || len(s.hooks.OnError) == 0 {
		return nil
	}

	// The tools were added; a notification failure is reported, not returned.
	registered := s.hooks
	go func(sID string, hooks *Hooks) {
		background := context.Background()
		details := map[string]any{
			"method":    "notifications/tools/list_changed",
			"sessionID": sID,
		}
		hooks.onError(background, nil, "notification", details,
			fmt.Errorf("failed to send notification after adding tools: %w", err))
	}(sessionID, registered)
	return nil
}
// DeleteSessionTools removes the named tools from the session-specific tool
// set of the session registered under sessionID.
//
// It returns ErrSessionNotFound when no such session is registered and
// ErrSessionDoesNotSupportTools when the session does not implement
// SessionWithTools. Names that do not match an existing session tool are
// ignored. When no name matches, the call is a no-op: the session's tool map
// is left as it is and no notification is sent, which matches
// DeleteSessionResources and DeleteSessionResourceTemplates and avoids telling
// the client its tool list changed when it did not.
//
// When at least one tool was removed, a reduced copy of the map is handed to
// SetSessionTools (the map returned by GetSessionTools is not mutated). If the
// session is initialized and the server's tool capabilities have listChanged
// set, a "notifications/tools/list_changed" notification is then sent. A
// failure to send is not returned to the caller; it is reported to the error
// hooks, if any, from a separate goroutine.
func (s *MCPServer) DeleteSessionTools(sessionID string, names ...string) error {
	sessionValue, ok := s.sessions.Load(sessionID)
	if !ok {
		return ErrSessionNotFound
	}

	session, ok := sessionValue.(SessionWithTools)
	if !ok {
		return ErrSessionDoesNotSupportTools
	}

	sessionTools := session.GetSessionTools()
	if len(sessionTools) == 0 {
		// Nothing is stored for this session, so nothing can be removed.
		return nil
	}

	// Work on a copy so the map handed out by the session is not mutated.
	newSessionTools := make(map[string]ServerTool, len(sessionTools))
	maps.Copy(newSessionTools, sessionTools)

	deletedAny := false
	for _, name := range names {
		if _, exists := newSessionTools[name]; exists {
			delete(newSessionTools, name)
			deletedAny = true
		}
	}
	if !deletedAny {
		// The tool list is unchanged: skip the write and the notification.
		return nil
	}

	session.SetSessionTools(newSessionTools)

	if !session.Initialized() || s.capabilities.tools == nil || !s.capabilities.tools.listChanged {
		return nil
	}

	if err := s.SendNotificationToSpecificClient(sessionID, "notifications/tools/list_changed", nil); err != nil {
		// The tools were deleted; a notification failure is reported to the
		// hooks rather than returned.
		if s.hooks != nil && len(s.hooks.OnError) > 0 {
			hooks := s.hooks
			go func(sID string, hooks *Hooks) {
				ctx := context.Background()
				hooks.onError(ctx, nil, "notification", map[string]any{
					"method":    "notifications/tools/list_changed",
					"sessionID": sID,
				}, fmt.Errorf("failed to send notification after deleting tools: %w", err))
			}(sessionID, hooks)
		}
	}
	return nil
}
// AddSessionResource adds a single resource and its handler to the session
// registered under sessionID. It is a convenience wrapper around
// AddSessionResources and returns that function's error.
func (s *MCPServer) AddSessionResource(sessionID string, resource mcp.Resource, handler ResourceHandlerFunc) error {
	entry := ServerResource{Resource: resource, Handler: handler}
	return s.AddSessionResources(sessionID, entry)
}
// AddSessionResources adds resources to the session registered under
// sessionID, keyed by resource URI; an existing session resource with the same
// URI is replaced.
//
// It returns ErrSessionNotFound when no such session is registered and
// ErrSessionDoesNotSupportResources when the session does not implement
// SessionWithResources. Every resource must have a non-empty URI that
// url.ParseRequestURI accepts; on the first one that does not, an error is
// returned and the session's resources are left unchanged.
//
// The map returned by GetSessionResources is not mutated; a merged copy is
// handed to SetSessionResources. If the session is initialized and the
// server's resource capabilities have listChanged set, a
// "notifications/resources/list_changed" notification is sent. A failure to
// send is not returned to the caller; it is reported to the error hooks, if
// any, from a separate goroutine.
func (s *MCPServer) AddSessionResources(sessionID string, resources ...ServerResource) error {
	stored, found := s.sessions.Load(sessionID)
	if !found {
		return ErrSessionNotFound
	}
	session, supported := stored.(SessionWithResources)
	if !supported {
		return ErrSessionDoesNotSupportResources
	}

	// NOTE(review): presumably the second callback runs only when the first
	// reports that resource capabilities are not yet set — confirm against
	// implicitlyRegisterCapabilities.
	hasResourceCaps := func() bool { return s.capabilities.resources != nil }
	enableResourceCaps := func() {
		s.capabilities.resources = &resourceCapabilities{listChanged: true}
	}
	s.implicitlyRegisterCapabilities(hasResourceCaps, enableResourceCaps)

	// Validate and merge on a fresh map so a failure leaves the session as is.
	current := session.GetSessionResources()
	merged := make(map[string]ServerResource, len(current)+len(resources))
	maps.Copy(merged, current)
	for _, entry := range resources {
		uri := entry.Resource.URI
		if uri == "" {
			return fmt.Errorf("resource URI cannot be empty")
		}
		if _, err := url.ParseRequestURI(uri); err != nil {
			return fmt.Errorf("invalid resource URI: %w", err)
		}
		merged[uri] = entry
	}
	session.SetSessionResources(merged)

	if !session.Initialized() {
		return nil
	}
	if s.capabilities.resources == nil || !s.capabilities.resources.listChanged {
		return nil
	}

	err := s.SendNotificationToSpecificClient(sessionID, "notifications/resources/list_changed", nil)
	if err == nil {
		return nil
	}
	if s.hooks == nil || len(s.hooks.OnError) == 0 {
		return nil
	}

	// The resources were added; a notification failure is reported, not returned.
	registered := s.hooks
	go func(sID string, hooks *Hooks) {
		background := context.Background()
		details := map[string]any{
			"method":    "notifications/resources/list_changed",
			"sessionID": sID,
		}
		hooks.onError(background, nil, "notification", details,
			fmt.Errorf("failed to send notification after adding resources: %w", err))
	}(sessionID, registered)
	return nil
}
// DeleteSessionResources removes the resources with the given URIs from the
// session registered under sessionID.
//
// It returns ErrSessionNotFound when no such session is registered and
// ErrSessionDoesNotSupportResources when the session does not implement
// SessionWithResources. URIs that do not match an existing session resource
// are ignored. When the session has no resource map or no URI matches, the
// call is a no-op and nil is returned.
//
// When at least one resource was removed, a reduced copy of the map is handed
// to SetSessionResources. If the session is initialized and the server's
// resource capabilities have listChanged set, a
// "notifications/resources/list_changed" notification is then sent. A failure
// to send is not returned to the caller; it is reported to the error hooks, if
// any, from a separate goroutine.
func (s *MCPServer) DeleteSessionResources(sessionID string, uris ...string) error {
	stored, found := s.sessions.Load(sessionID)
	if !found {
		return ErrSessionNotFound
	}
	session, supported := stored.(SessionWithResources)
	if !supported {
		return ErrSessionDoesNotSupportResources
	}

	current := session.GetSessionResources()
	if current == nil {
		return nil
	}

	// Work on a copy so the map handed out by the session is not mutated.
	remaining := make(map[string]ServerResource, len(current))
	maps.Copy(remaining, current)
	removed := 0
	for _, uri := range uris {
		if _, present := remaining[uri]; !present {
			continue
		}
		delete(remaining, uri)
		removed++
	}
	if removed == 0 {
		// Nothing changed: skip both the write and the notification.
		return nil
	}
	session.SetSessionResources(remaining)

	if !session.Initialized() {
		return nil
	}
	if s.capabilities.resources == nil || !s.capabilities.resources.listChanged {
		return nil
	}

	err := s.SendNotificationToSpecificClient(sessionID, "notifications/resources/list_changed", nil)
	if err == nil {
		return nil
	}
	if s.hooks == nil || len(s.hooks.OnError) == 0 {
		return nil
	}

	// The resources were deleted; a notification failure is reported, not returned.
	registered := s.hooks
	go func(sID string, hooks *Hooks) {
		background := context.Background()
		details := map[string]any{
			"method":    "notifications/resources/list_changed",
			"sessionID": sID,
		}
		hooks.onError(background, nil, "notification", details,
			fmt.Errorf("failed to send notification after deleting resources: %w", err))
	}(sessionID, registered)
	return nil
}
// AddSessionResourceTemplate adds a single resource template and its handler
// to the session registered under sessionID. It is a convenience wrapper
// around AddSessionResourceTemplates and returns that function's error.
func (s *MCPServer) AddSessionResourceTemplate(sessionID string, template mcp.ResourceTemplate, handler ResourceTemplateHandlerFunc) error {
	entry := ServerResourceTemplate{Template: template, Handler: handler}
	return s.AddSessionResourceTemplates(sessionID, entry)
}
// AddSessionResourceTemplates adds resource templates to the session
// registered under sessionID, keyed by the template's raw URI-template string;
// an existing session template with the same key is replaced.
//
// It returns ErrSessionNotFound when no such session is registered and
// ErrSessionDoesNotSupportResourceTemplates when the session does not
// implement SessionWithResourceTemplates. Every template must have a non-nil
// URITemplate whose Raw value is non-empty, and a non-empty Name; on the first
// one that does not, an error is returned and the session's templates are left
// unchanged.
//
// The map returned by GetSessionResourceTemplates is not mutated; a merged
// copy is handed to SetSessionResourceTemplates. If the session is initialized
// and the server's resource capabilities have listChanged set, a
// "notifications/resources/list_changed" notification is sent. A failure to
// send is not returned to the caller; it is reported to the error hooks, if
// any, from a separate goroutine.
func (s *MCPServer) AddSessionResourceTemplates(sessionID string, templates ...ServerResourceTemplate) error {
	stored, found := s.sessions.Load(sessionID)
	if !found {
		return ErrSessionNotFound
	}
	session, supported := stored.(SessionWithResourceTemplates)
	if !supported {
		return ErrSessionDoesNotSupportResourceTemplates
	}

	// NOTE(review): presumably the second callback runs only when the first
	// reports that resource capabilities are not yet set — confirm against
	// implicitlyRegisterCapabilities.
	hasResourceCaps := func() bool { return s.capabilities.resources != nil }
	enableResourceCaps := func() {
		s.capabilities.resources = &resourceCapabilities{listChanged: true}
	}
	s.implicitlyRegisterCapabilities(hasResourceCaps, enableResourceCaps)

	// Validate and merge on a fresh map so a failure leaves the session as is.
	current := session.GetSessionResourceTemplates()
	merged := make(map[string]ServerResourceTemplate, len(current)+len(templates))
	maps.Copy(merged, current)
	for _, entry := range templates {
		if entry.Template.URITemplate == nil {
			return fmt.Errorf("resource template URITemplate cannot be nil")
		}
		key := entry.Template.URITemplate.Raw()
		if key == "" {
			return fmt.Errorf("resource template URITemplate cannot be empty")
		}
		if entry.Template.Name == "" {
			return fmt.Errorf("resource template name cannot be empty")
		}
		merged[key] = entry
	}
	session.SetSessionResourceTemplates(merged)

	if !session.Initialized() {
		return nil
	}
	if s.capabilities.resources == nil || !s.capabilities.resources.listChanged {
		return nil
	}

	err := s.SendNotificationToSpecificClient(sessionID, "notifications/resources/list_changed", nil)
	if err == nil {
		return nil
	}
	if s.hooks == nil || len(s.hooks.OnError) == 0 {
		return nil
	}

	// The templates were added; a notification failure is reported, not returned.
	registered := s.hooks
	go func(sID string, hooks *Hooks) {
		background := context.Background()
		details := map[string]any{
			"method":    "notifications/resources/list_changed",
			"sessionID": sID,
		}
		hooks.onError(background, nil, "notification", details,
			fmt.Errorf("failed to send notification after adding resource templates: %w", err))
	}(sessionID, registered)
	return nil
}
// DeleteSessionResourceTemplates removes the resource templates with the given
// raw URI-template strings from the session registered under sessionID.
//
// It returns ErrSessionNotFound when no such session is registered and
// ErrSessionDoesNotSupportResourceTemplates when the session does not
// implement SessionWithResourceTemplates. Strings that do not match an
// existing session template are ignored. When none match, the call is a no-op
// and nil is returned.
//
// When at least one template was removed, a reduced copy of the map is handed
// to SetSessionResourceTemplates. If the session is initialized and the
// server's resource capabilities have listChanged set, a
// "notifications/resources/list_changed" notification is then sent. A failure
// to send is not returned to the caller; it is reported to the error hooks, if
// any, from a separate goroutine.
func (s *MCPServer) DeleteSessionResourceTemplates(sessionID string, uriTemplates ...string) error {
	stored, found := s.sessions.Load(sessionID)
	if !found {
		return ErrSessionNotFound
	}
	session, supported := stored.(SessionWithResourceTemplates)
	if !supported {
		return ErrSessionDoesNotSupportResourceTemplates
	}

	// Work on a copy so the map handed out by the session is not mutated.
	current := session.GetSessionResourceTemplates()
	remaining := make(map[string]ServerResourceTemplate, len(current))
	maps.Copy(remaining, current)
	removed := 0
	for _, key := range uriTemplates {
		if _, present := remaining[key]; !present {
			continue
		}
		delete(remaining, key)
		removed++
	}
	if removed == 0 {
		// Nothing changed: skip both the write and the notification.
		return nil
	}
	session.SetSessionResourceTemplates(remaining)

	if !session.Initialized() {
		return nil
	}
	if s.capabilities.resources == nil || !s.capabilities.resources.listChanged {
		return nil
	}

	err := s.SendNotificationToSpecificClient(sessionID, "notifications/resources/list_changed", nil)
	if err == nil {
		return nil
	}
	if s.hooks == nil || len(s.hooks.OnError) == 0 {
		return nil
	}

	// The templates were deleted; a notification failure is reported, not returned.
	registered := s.hooks
	go func(sID string, hooks *Hooks) {
		background := context.Background()
		details := map[string]any{
			"method":    "notifications/resources/list_changed",
			"sessionID": sID,
		}
		hooks.onError(background, nil, "notification", details,
			fmt.Errorf("failed to send notification after deleting resource templates: %w", err))
	}(sessionID, registered)
	return nil
}
// The pages are generated with Golds v0.8.4. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.