package server

import (
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	

	
	
	
)

// StreamableHTTPOption is a functional option that configures a
// StreamableHTTPServer. Options are applied in the order they are passed to
// the constructor, so a later option overrides an earlier one that writes the
// same field.
type StreamableHTTPOption func(*StreamableHTTPServer)

// WithEndpointPath sets the endpoint path for the server.
// The default is "/mcp".
// It only takes effect for the `Start` method. When the server is used as an
// http.Handler, it has no effect.
func ( string) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		// Normalize the endpoint path to ensure it starts with a slash and doesn't end with one.
		// strings.Trim removes every leading and trailing "/" before one is prepended.
		 := "/" + strings.Trim(, "/")
		.endpointPath = 
	}
}

// WithStateLess sets the server to stateless mode.
// If true, the server will manage no session information. Every request will be treated
// as a new session. No session id is returned to the client.
// The default is false; a false argument leaves the configured resolver untouched.
//
// Note: This is a convenience method. It is identical to setting the
// WithSessionIdManager option to a StatelessSessionIdManager.
func ( bool) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		if  {
			.sessionIdManagerResolver = NewDefaultSessionIdManagerResolver(&StatelessSessionIdManager{})
		}
	}
}

// WithSessionIdManager sets a custom session id generator for the server.
// By default, the server uses StatelessGeneratingSessionIdManager (generates IDs but no local validation).
// A nil manager is replaced by a StatelessSessionIdManager.
// Note: Options are applied in order; the last one wins. If combined with
// WithStateLess or WithSessionIdManagerResolver, whichever is applied last takes effect.
func ( SessionIdManager) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		if  == nil {
			// Fall back to the stateless manager rather than storing nil.
			.sessionIdManagerResolver = NewDefaultSessionIdManagerResolver(&StatelessSessionIdManager{})
			return
		}
		.sessionIdManagerResolver = NewDefaultSessionIdManagerResolver()
	}
}

// WithSessionIdManagerResolver sets a custom session id manager resolver for the server.
// This allows for request-based session id management strategies.
// A nil resolver is replaced by a default resolver wrapping a StatelessSessionIdManager.
// Note: Options are applied in order; the last one wins. If combined with
// WithStateLess or WithSessionIdManager, whichever is applied last takes effect.
func ( SessionIdManagerResolver) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		if  == nil {
			// Never store a nil resolver; the handlers call it unconditionally.
			.sessionIdManagerResolver = NewDefaultSessionIdManagerResolver(&StatelessSessionIdManager{})
			return
		}
		.sessionIdManagerResolver = 
	}
}

// WithStateful enables stateful session management using InsecureStatefulSessionIdManager.
// This requires sticky sessions in multi-instance deployments.
// A false argument leaves the configured resolver untouched.
func ( bool) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		if  {
			.sessionIdManagerResolver = NewDefaultSessionIdManagerResolver(&InsecureStatefulSessionIdManager{})
		}
	}
}

// WithHeartbeatInterval sets the heartbeat interval. A positive interval means the
// server will periodically send a "ping" request to the client through the GET
// connection, to keep the connection from being closed by the network
// infrastructure (e.g. gateways). If the client does not establish a GET
// connection, it has no effect. The default is not to send heartbeats.
func ( time.Duration) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		.listenHeartbeatInterval = 
	}
}

// WithDisableStreaming prevents the server from responding to GET requests with
// a streaming response. Instead, it will respond with a 405 Method Not Allowed status.
// This can be useful in scenarios where streaming is not desired or supported.
// The default is false, meaning streaming is enabled.
// Unlike the boolean session options, the value is stored as given, so passing
// false explicitly re-enables streaming.
func ( bool) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		.disableStreaming = 
	}
}

// WithHTTPContextFunc sets a function that will be called to customise the
// context passed to the server, using the incoming request.
// This can be used to inject context values from headers, for example.
// The POST handler invokes it after the session has been attached to the
// request context.
func ( HTTPContextFunc) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		.contextFunc = 
	}
}

// WithStreamableHTTPServer sets the HTTP server instance for StreamableHTTPServer.
// NOTE: When providing a custom HTTP server, you must handle routing yourself.
// If routing is not set up, the server will start but won't handle any MCP requests.
// Start only fills in the server's Addr when it is empty and rejects a
// non-empty Addr that differs from the address passed to Start.
func ( *http.Server) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		.httpServer = 
	}
}

// WithLogger sets the logger for the server.
// The default is util.DefaultLogger().
func ( util.Logger) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		.logger = 
	}
}

// WithTLSCert sets the TLS certificate and key files for HTTPS support.
// Both certFile and keyFile must be provided to enable TLS; Start returns an
// error when only one of them is set or when either file cannot be stat'ed.
func (,  string) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		.tlsCertFile = 
		.tlsKeyFile = 
	}
}

// WithSessionIdleTTL sets the idle TTL for per-session transport state.
// When enabled, a background sweeper periodically removes entries from
// per-session stores (tools, resources, resource templates, log levels,
// request IDs) for sessions that have been idle longer than the given
// duration. This prevents memory leaks when clients disconnect without
// sending a DELETE request. A zero or negative value disables the sweeper
// (the default).
//
// The sweeper is started by the constructor and runs every TTL/2, but never
// more often than once per second.
func ( time.Duration) StreamableHTTPOption {
	return func( *StreamableHTTPServer) {
		.sessionIdleTTL = 
	}
}

// StreamableHTTPServer implements a Streamable-http based MCP server.
// It communicates with clients over the HTTP protocol, supporting both direct
// HTTP responses and SSE streams.
// https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http
//
// Usage:
//
//	server := NewStreamableHTTPServer(mcpServer)
//	server.Start(":8080") // The final url for client is http://xxxx:8080/mcp by default
//
// or the server itself can be used as a http.Handler, which is convenient to
// integrate with existing http servers, or advanced usage:
//
//	handler := NewStreamableHTTPServer(mcpServer)
//	http.Handle("/streamable-http", handler)
//	http.ListenAndServe(":8080", nil)
//
// Notice:
// GET handlers (listening) register a session with the MCP server for the
// lifetime of the connection. POST handlers (request/notification) register a
// session only after an initialize request, and only if none is registered for
// that session ID yet; every other POST reuses a registered or active session,
// or runs with an ephemeral one that is never registered.
//
// The current implementation does not support the following features from the specification:
//   - Stream Resumability
type StreamableHTTPServer struct {
	server                   *MCPServer
	sessionTools             *sessionToolsStore
	sessionResources         *sessionResourcesStore
	sessionResourceTemplates *sessionResourceTemplatesStore
	sessionRequestIDs        sync.Map // sessionId --> last requestID(*atomic.Int64)
	activeSessions           sync.Map // sessionId --> *streamableHttpSession (for sampling responses)

	// httpServer is created lazily by Start or injected via
	// WithStreamableHTTPServer; mu guards access to it in Start and Shutdown.
	httpServer *http.Server
	mu         sync.RWMutex

	endpointPath             string
	contextFunc              HTTPContextFunc
	sessionIdManagerResolver SessionIdManagerResolver
	sessionIdManager         SessionIdManager // for non-request contexts (sweeper)
	listenHeartbeatInterval  time.Duration
	logger                   util.Logger
	sessionLogLevels         *sessionLogLevelsStore
	disableStreaming         bool

	// TLS files used by Start; both must be set to serve HTTPS.
	tlsCertFile string
	tlsKeyFile  string

	// Idle-session sweeping; the sweeper only runs when sessionIdleTTL > 0.
	sessionIdleTTL    time.Duration
	sessionLastActive sync.Map // sessionID → *atomic.Int64 (unix nanos)
	sweeperCancel     context.CancelFunc
}

// NewStreamableHTTPServer creates a new streamable-http server instance.
//
// Defaults before options are applied: endpoint path "/mcp", a default resolver
// wrapping StatelessGeneratingSessionIdManager, util.DefaultLogger(), and empty
// per-session stores. If a positive session idle TTL was configured, the
// background sweeper is started here; Shutdown cancels it.
func ( *MCPServer,  ...StreamableHTTPOption) *StreamableHTTPServer {
	 := &StreamableHTTPServer{
		server:                   ,
		sessionTools:             newSessionToolsStore(),
		sessionLogLevels:         newSessionLogLevelsStore(),
		endpointPath:             "/mcp",
		sessionIdManagerResolver: NewDefaultSessionIdManagerResolver(&StatelessGeneratingSessionIdManager{}),
		logger:                   util.DefaultLogger(),
		sessionResources:         newSessionResourcesStore(),
		sessionResourceTemplates: newSessionResourceTemplatesStore(),
	}

	// Apply all options in the order given; later options overwrite earlier ones.
	for ,  := range  {
		()
	}

	// Cache the session ID manager for use in non-request contexts (sweeper).
	// DefaultSessionIdManagerResolver always returns the same manager,
	// so resolving it once at startup is semantically identical.
	// For any other resolver type the cached manager stays nil.
	if ,  := .sessionIdManagerResolver.(*DefaultSessionIdManagerResolver);  {
		.sessionIdManager = .manager
	}

	// The sweeper gets its own cancellable context, detached from any request.
	if .sessionIdleTTL > 0 {
		,  := context.WithCancel(context.Background())
		.sweeperCancel = 
		.startSessionSweeper()
	}

	return 
}

// ServeHTTP implements the http.Handler interface.
// It dispatches on the request method: POST, GET and DELETE go to their
// dedicated handlers; any other method is answered with http.NotFound (404).
func ( *StreamableHTTPServer) ( http.ResponseWriter,  *http.Request) {
	switch .Method {
	case http.MethodPost:
		.handlePost(, )
	case http.MethodGet:
		.handleGet(, )
	case http.MethodDelete:
		.handleDelete(, )
	default:
		http.NotFound(, )
	}
}

// Start begins serving the http server on the specified address and path
// (endpointPath). like:
//
//	s.Start(":8080")
func ( *StreamableHTTPServer) ( string) error {
	.mu.Lock()
	if .httpServer == nil {
		 := http.NewServeMux()
		.Handle(.endpointPath, )
		.httpServer = &http.Server{
			Addr:    ,
			Handler: ,
		}
	} else {
		if .httpServer.Addr == "" {
			.httpServer.Addr = 
		} else if .httpServer.Addr !=  {
			return fmt.Errorf("conflicting listen address: WithStreamableHTTPServer(%q) vs Start(%q)", .httpServer.Addr, )
		}
	}
	 := .httpServer
	.mu.Unlock()

	if .tlsCertFile != "" || .tlsKeyFile != "" {
		if .tlsCertFile == "" || .tlsKeyFile == "" {
			return fmt.Errorf("both TLS cert and key must be provided")
		}
		if ,  := os.Stat(.tlsCertFile);  != nil {
			return fmt.Errorf("failed to find TLS certificate file: %w", )
		}
		if ,  := os.Stat(.tlsKeyFile);  != nil {
			return fmt.Errorf("failed to find TLS key file: %w", )
		}
		return .ListenAndServeTLS(.tlsCertFile, .tlsKeyFile)
	}

	return .ListenAndServe()
}

// Shutdown stops the idle-session sweeper (if one was started) and shuts down
// the HTTP server held by this instance, returning that server's Shutdown
// error. When no HTTP server is set (e.g. the instance is only used as an
// http.Handler) it returns nil.
//
// NOTE(review): the previous doc said this closes all active sessions; nothing
// in this function touches the session maps — confirm whether that is handled
// elsewhere.
func ( *StreamableHTTPServer) ( context.Context) error {
	if .sweeperCancel != nil {
		.sweeperCancel()
	}

	// shutdown the server if needed (may use as a http.Handler)
	// Read the pointer under the read lock, then call Shutdown without holding it.
	.mu.RLock()
	 := .httpServer
	.mu.RUnlock()
	if  != nil {
		return .Shutdown()
	}
	return nil
}

// --- internal methods ---

// handlePost serves POST requests, which carry JSON-RPC requests,
// notifications, or client responses to server-initiated requests.
//
// Flow: validate the content type and JSON body; short-circuit empty ping
// responses; hand responses (result/error without method) to
// handleSamplingResponse; otherwise resolve or generate the session ID, pick a
// registered, active, or ephemeral session, run the message through the MCP
// server while forwarding any notifications as SSE events, and finally write
// the response either as an SSE event or as plain JSON.
func ( *StreamableHTTPServer) ( http.ResponseWriter,  *http.Request) {
	// POST requests carry request/notification messages

	// Check content type
	 := .Header.Get("Content-Type")
	, ,  := mime.ParseMediaType()
	if  != nil ||  != "application/json" {
		http.Error(, "Invalid content type: must be 'application/json'", http.StatusBadRequest)
		return
	}

	// Check the request body is valid json, meanwhile, get the request Method
	,  := io.ReadAll(.Body)
	if  != nil {
		.writeJSONRPCError(, nil, mcp.PARSE_ERROR, fmt.Sprintf("read request body error: %v", ))
		return
	}
	// First, try to parse as a response (sampling responses don't have a method field)
	var  struct {
		     json.RawMessage `json:"id"`
		 json.RawMessage `json:"result,omitempty"`
		  json.RawMessage `json:"error,omitempty"`
		 mcp.MCPMethod   `json:"method,omitempty"`
	}
	if  := json.Unmarshal(, &);  != nil {
		.writeJSONRPCError(, nil, mcp.PARSE_ERROR, "request body is not valid json")
		return
	}

	// Detect empty ping responses and skip session ID validation for them.
	// Two shapes are recognised: one where both optional payloads are empty,
	// and one with an explicit empty object and nothing else.
	 := . == "" && . != nil &&
		(isJSONEmpty(.) && isJSONEmpty(.))
	 := . == "" && . != nil &&
		isExplicitEmptyObject(.) && len(bytes.TrimSpace(.)) == 0

	if  {
		.WriteHeader(http.StatusAccepted)
		return
	}
	if  {
		// Returning without writing lets net/http send its default 200 OK.
		return
	}

	// Check if this is a sampling response (has result/error but no method)
	 := . == "" && . != nil &&
		(. != nil || . != nil)

	 := . == mcp.MethodInitialize

	// Handle sampling responses separately
	if  {
		// NOTE(review): every failure path of handleSamplingResponse visible in
		// this file already calls http.Error before returning its error, so the
		// http.Error below looks like a second write on the same response —
		// confirm and consider logging only.
		if  := .handleSamplingResponse(, , );  != nil {
			.logger.Errorf("Failed to handle sampling response: %v", )
			http.Error(, "Failed to handle sampling response", http.StatusInternalServerError)
		}
		return
	}

	// Prepare the session for the mcp server
	// The session is ephemeral. Its life is the same as the request. It's only created
	// for interaction with the mcp server.
	var  string
	 := .sessionIdManagerResolver.ResolveSessionIdManager()
	if  {
		// generate a new one for initialize request
		 = .Generate()
	} else {
		// Get session ID from header.
		// Stateful servers need the client to carry the session ID.
		 = .Header.Get(HeaderKeySessionID)
		,  := .Validate()
		if  != nil {
			http.Error(, "Invalid session ID", http.StatusNotFound)
			return
		}
		if  {
			http.Error(, "Session terminated", http.StatusNotFound)
			return
		}
	}

	// Record activity for the idle sweeper (no-op when the sweeper is disabled).
	.touchSession()

	// For non-initialize requests, try to reuse existing registered session
	var  *streamableHttpSession
	if ! {
		if ,  := .server.sessions.Load();  {
			if ,  := .(*streamableHttpSession);  {
				 = 
			}
		}
	}

	// Check if a persistent session exists (for sampling support), otherwise create ephemeral session
	// Persistent sessions are created by GET (continuous listening) connections
	if  == nil {
		if ,  := .activeSessions.Load();  {
			if ,  := .(*streamableHttpSession);  {
				 = 
			}
		}
	}

	// Create ephemeral session if no persistent session exists
	if  == nil {
		 = newStreamableHttpSession(, .sessionTools, .sessionResources, .sessionResourceTemplates, .sessionLogLevels)
	}

	// Set the client context before handling the message
	 := .server.WithContext(.Context(), )
	if .contextFunc != nil {
		 = .contextFunc(, )
	}

	// Handle potential notifications emitted while the message is processed.
	// The mutex serialises writes to the response between the goroutine below
	// and this function; the flag records whether SSE headers were already
	// sent; the channel is closed once this function takes over the response.
	 := sync.Mutex{}
	 := false
	 := make(chan struct{})

	 = context.WithValue(, requestHeader, .Header)
	go func() {
		for {
			select {
			case  := <-.notificationChannel:
				func() {
					.Lock()
					defer .Unlock()
					// if the done chan is closed, as the request is terminated, just return
					select {
					case <-:
						return
					default:
					}
					// Flush after each event (when the writer supports it) so the
					// client sees the notification immediately.
					defer func() {
						,  := .(http.Flusher)
						if  {
							.Flush()
						}
					}()

					// On the first notification, upgrade the response to an SSE stream.
					if ! {
						.Header().Set("Content-Type", "text/event-stream")
						.Header().Set("Connection", "keep-alive")
						.Header().Set("Cache-Control", "no-cache")
						.WriteHeader(http.StatusOK)
						 = true
					}
					 := writeSSEEvent(, )
					if  != nil {
						.logger.Errorf("Failed to write SSE event: %v", )
						return
					}
				}()
			case <-:
				return
			case <-.Done():
				return
			}
		}
	}()

	// Process message through MCPServer
	 := .server.HandleMessage(, )
	if  == nil {
		// No response to send (e.g. the message was a notification): stop the
		// forwarding goroutine and answer 202 unless SSE headers already went out.
		.Lock()
		close()
		if ! {
			.Unlock()
			.WriteHeader(http.StatusAccepted)
		} else {
			.Unlock()
		}
		return
	}

	// Write response
	.Lock()

	// Drain notifications that are already queued so they are delivered before
	// the final response; the default case stops as soon as the channel is empty.
:
	for {
		select {
		case  := <-.notificationChannel:
			if ! {
				.Header().Set("Content-Type", "text/event-stream")
				.Header().Set("Connection", "keep-alive")
				.Header().Set("Cache-Control", "no-cache")
				.WriteHeader(http.StatusOK)
				 = true
			}
			if  := writeSSEEvent(, );  != nil {
				.logger.Errorf("Failed to write SSE event during drain: %v", )
			}
			if ,  := .(http.Flusher);  {
				.Flush()
			}
		default:
			break 
		}
	}

	// close the done chan before unlocking to signal the goroutine to stop
	close()
	.Unlock()
	// The request context is already finished: nothing more to write.
	if .Err() != nil {
		return
	}
	// If client-server communication already upgraded to SSE stream
	// Also check upgradedHeader: a notification during HandleMessage processing
	// may have already written SSE headers on this response, so we must continue
	// in SSE mode to avoid writing JSON on top of SSE data.
	if .upgradeToSSE.Load() ||  {
		if ! {
			.Header().Set("Content-Type", "text/event-stream")
			.Header().Set("Connection", "keep-alive")
			.Header().Set("Cache-Control", "no-cache")
			.WriteHeader(http.StatusOK)
			 = true
		}
		if  := writeSSEEvent(, );  != nil {
			.logger.Errorf("Failed to write final SSE response event: %v", )
		}
	} else {
		.Header().Set("Content-Type", "application/json")
		if  &&  != "" {
			// send the session ID back to the client
			.Header().Set(HeaderKeySessionID, )
		}
		.WriteHeader(http.StatusOK)
		 := json.NewEncoder().Encode()
		if  != nil {
			.logger.Errorf("Failed to write response: %v", )
		}
	}

	// Register session after successful initialization
	// Only register if not already registered (e.g., by a GET connection)
	if  &&  != "" {
		if ,  := .server.sessions.Load(); ! {
			// Store in activeSessions to prevent duplicate registration from GET
			.activeSessions.Store(, )
			// Register the session with the MCPServer for notification support
			if  := .server.RegisterSession(, );  != nil {
				.logger.Errorf("Failed to register POST session: %v", )
				.activeSessions.Delete()
				// Don't fail the request, just log the error
			}
		}
	}
}

// handleGet serves GET requests, which open a long-lived SSE stream used to
// push server-initiated messages (notifications, sampling, elicitation and
// list-roots requests, and optional heartbeat pings) to the client.
//
// It answers 405 when streaming is disabled or the ResponseWriter cannot
// flush, and 400 when session registration fails. Otherwise it blocks until
// the request context is done or a write fails.
func ( *StreamableHTTPServer) ( http.ResponseWriter,  *http.Request) {
	// GET requests are for listening to notifications
	// https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#listening-for-messages-from-the-server
	if .disableStreaming {
		.logger.Infof("Rejected GET request: streaming is disabled (session: %s)", .Header.Get(HeaderKeySessionID))
		http.Error(, "Streaming is disabled on this server", http.StatusMethodNotAllowed)
		return
	}

	// Check streaming support in the responseWriter. This can happen if the responseWriter has been overridden.
	// If not supported, return 405 early.
	,  := .(http.Flusher)
	if ! {
		http.Error(, "Streaming unsupported", http.StatusMethodNotAllowed)
		return
	}

	 := .Header.Get(HeaderKeySessionID)
	// The MCP specification doesn't require validating session ID for GET requests.
	// If no session ID is provided by the client, generate one using the configured SessionIdManager
	// so that custom session id generators are honored consistently across POST/GET flows.
	if  == "" {
		 := .sessionIdManagerResolver.ResolveSessionIdManager()
		 = .Generate()
	}

	// Get or create session atomically to prevent TOCTOU races
	// where concurrent GETs could both create and register duplicate sessions
	var  *streamableHttpSession
	 := newStreamableHttpSession(, .sessionTools, .sessionResources, .sessionResourceTemplates, .sessionLogLevels)
	,  := .activeSessions.LoadOrStore(, )
	 = .(*streamableHttpSession)

	if ! {
		// We created a new session, need to register it
		if  := .server.RegisterSession(.Context(), );  != nil {
			.activeSessions.Delete()
			http.Error(, fmt.Sprintf("Session registration failed: %v", ), http.StatusBadRequest)
			return
		}
		// Only the handler that stored the session tears it down on exit.
		defer .server.UnregisterSession(.Context(), )
		defer .activeSessions.Delete()
	}

	.touchSession()

	// Send the SSE response headers and flush them so the client sees the
	// stream as open before any event is produced.
	.Header().Set("Content-Type", "text/event-stream")
	.Header().Set("Cache-Control", "no-cache")
	.Header().Set("Connection", "keep-alive")
	.WriteHeader(http.StatusOK)

	.Flush()

	// Start notification handler for this session.
	// The done channel is closed when this handler returns and stops both
	// helper goroutines; the buffered channel funnels everything that must be
	// written to the response into the loop at the bottom of this function.
	 := make(chan struct{})
	defer close()
	 := make(chan any, 16)

	go func() {
		for {
			select {
			case  := <-.notificationChannel:
				select {
				case  <- &:
				case <-:
					return
				}
			case  := <-.samplingRequestChan:
				// Send sampling request to client via SSE
				 := mcp.JSONRPCRequest{
					JSONRPC: "2.0",
					ID:      mcp.NewRequestId(.requestID),
					Request: mcp.Request{
						Method: string(mcp.MethodSamplingCreateMessage),
					},
					Params: .request.CreateMessageParams,
				}
				select {
				case  <- :
				case <-:
					return
				}
			case  := <-.elicitationRequestChan:
				// Send elicitation request to client via SSE
				 := mcp.JSONRPCRequest{
					JSONRPC: "2.0",
					ID:      mcp.NewRequestId(.requestID),
					Request: mcp.Request{
						Method: string(mcp.MethodElicitationCreate),
					},
					Params: .request.Params,
				}
				select {
				case  <- :
				case <-:
					return
				}
			case  := <-.rootsRequestChan:
				// Send list roots request to client via SSE
				 := mcp.JSONRPCRequest{
					JSONRPC: "2.0",
					ID:      mcp.NewRequestId(.requestID),
					Request: mcp.Request{
						Method: string(mcp.MethodListRoots),
					},
				}
				select {
				case  <- :
				case <-:
					return
				}
			case <-:
				return
			}
		}
	}()

	if .listenHeartbeatInterval > 0 {
		// heartbeat to keep the connection alive
		go func() {
			 := time.NewTicker(.listenHeartbeatInterval)
			defer .Stop()
			for {
				select {
				case <-.C:
					// Each ping gets a fresh per-session request ID.
					 := mcp.JSONRPCRequest{
						JSONRPC: "2.0",
						ID:      mcp.NewRequestId(.nextRequestID()),
						Request: mcp.Request{
							Method: "ping",
						},
					}
					select {
					case  <- :
					case <-:
						return
					}
				case <-:
					return
				}
			}
		}()
	}

	// Keep the connection open until the client disconnects
	//
	// There is an Available() check when the handler ends, and it may race with Flush(),
	// so we use a separate channel to send the data, instead of flushing directly in another goroutine.
	for {
		select {
		case  := <-:
			if  == nil {
				continue
			}
			if  := writeSSEEvent(, );  != nil {
				.logger.Errorf("Failed to write SSE event: %v", )
				return
			}
			.Flush()
			// Outgoing traffic counts as activity for the idle sweeper.
			.touchSession()
		case <-.Context().Done():
			return
		}
	}
}

// handleDelete serves DELETE requests, which terminate the session named in
// the session ID header. It answers 500 when the session id manager's
// Terminate call fails, 405 when the manager reports that termination is not
// allowed, and 200 after the per-session transport state has been cleaned up.
func ( *StreamableHTTPServer) ( http.ResponseWriter,  *http.Request) {
	// A DELETE request terminates the session
	 := .Header.Get(HeaderKeySessionID)
	 := .sessionIdManagerResolver.ResolveSessionIdManager()
	,  := .Terminate()
	if  != nil {
		http.Error(, fmt.Sprintf("Session termination failed: %v", ), http.StatusInternalServerError)
		return
	}
	if  {
		http.Error(, "Session termination not allowed", http.StatusMethodNotAllowed)
		return
	}

	.cleanupSessionState(.Context(), )

	.WriteHeader(http.StatusOK)
}

// writeSSEEvent JSON-encodes the given value and writes it to the writer as a
// single server-sent event of type "message" ("event: message", one "data:"
// line, then a blank line). It returns a wrapped error if marshalling or
// writing fails. It does not flush; callers flush the writer themselves.
func writeSSEEvent( io.Writer,  any) error {
	,  := json.Marshal()
	if  != nil {
		return fmt.Errorf("failed to marshal data: %w", )
	}
	_,  = fmt.Fprintf(, "event: message\ndata: %s\n\n", )
	if  != nil {
		return fmt.Errorf("failed to write SSE event: %w", )
	}
	return nil
}

// handleSamplingResponse processes incoming sampling responses from clients.
//
// It requires a session ID header, validates the session, parses the JSON-RPC
// id as an int64, converts the payload into a samplingResponseItem (an error
// payload takes precedence over a result) and delivers it to the waiting
// request of that session. On success it answers 202 Accepted.
//
// On every failure path it both writes an HTTP error to the response and
// returns a non-nil error, so callers should not write another response.
func ( *StreamableHTTPServer) ( http.ResponseWriter,  *http.Request,  struct {
	     json.RawMessage `json:"id"`
	 json.RawMessage `json:"result,omitempty"`
	  json.RawMessage `json:"error,omitempty"`
	 mcp.MCPMethod   `json:"method,omitempty"`
}) error {
	// Get session ID from header
	 := .Header.Get(HeaderKeySessionID)
	if  == "" {
		http.Error(, "Missing session ID for sampling response", http.StatusBadRequest)
		return fmt.Errorf("missing session ID")
	}

	// Validate session
	 := .sessionIdManagerResolver.ResolveSessionIdManager()
	,  := .Validate()
	if  != nil {
		http.Error(, "Invalid session ID", http.StatusNotFound)
		return 
	}
	if  {
		http.Error(, "Session terminated", http.StatusNotFound)
		return fmt.Errorf("session terminated")
	}

	// Parse the request ID; only integer IDs are accepted here.
	var  int64
	if  := json.Unmarshal(., &);  != nil {
		http.Error(, "Invalid request ID in sampling response", http.StatusBadRequest)
		return 
	}

	// Create the sampling response item
	 := samplingResponseItem{
		requestID: ,
	}

	// Parse result or error
	if . != nil {
		// Parse error
		var  struct {
			    int    `json:"code"`
			 string `json:"message"`
		}
		if  := json.Unmarshal(., &);  != nil {
			.err = fmt.Errorf("failed to parse error: %v", )
		} else {
			.err = fmt.Errorf("sampling error %d: %s", ., .)
		}
	} else if . != nil {
		// Store the result to be unmarshaled later
		.result = .
	} else {
		.err = fmt.Errorf("sampling response has neither result nor error")
	}

	// Find the corresponding session and deliver the response
	// The response is delivered to the specific session identified by sessionID
	if  := .deliverSamplingResponse(, );  != nil {
		.logger.Errorf("Failed to deliver sampling response: %v", )
		http.Error(, "Failed to deliver response", http.StatusInternalServerError)
		return 
	}

	// Acknowledge receipt
	.WriteHeader(http.StatusAccepted)
	return nil
}

// deliverSamplingResponse delivers a sampling response to the appropriate session.
//
// It looks up the active session by ID, then the pending request's response
// channel by request ID, and performs a non-blocking send. An error is
// returned when the session or pending request is missing, when a stored value
// has an unexpected type, or when the channel cannot accept the item right now.
func ( *StreamableHTTPServer) ( string,  samplingResponseItem) error {
	// Look up the active session
	,  := .activeSessions.Load()
	if ! {
		return fmt.Errorf("no active session found for session %s", )
	}

	,  := .(*streamableHttpSession)
	if ! {
		return fmt.Errorf("invalid session type for session %s", )
	}

	// Look up the dedicated response channel for this specific request
	,  := .samplingRequests.Load(.requestID)
	if ! {
		return fmt.Errorf("no pending request found for session %s, request %d", , .requestID)
	}

	,  := .(chan samplingResponseItem)
	if ! {
		return fmt.Errorf("invalid response channel type for session %s, request %d", , .requestID)
	}

	// Non-blocking send: if the channel cannot take the item immediately, fail
	// right away (there is no timeout) so the HTTP handler never blocks here.
	select {
	case  <- :
		.logger.Infof("Delivered sampling response for session %s, request %d", , .requestID)
		return nil
	default:
		return fmt.Errorf("failed to deliver sampling response for session %s, request %d: channel full or blocked", , .requestID)
	}
}

// writeJSONRPCError writes a JSON-RPC error response with the given error details.
// The response is built with createErrorResponse from the request id, error
// code and message, and is always sent with HTTP status 400 and content type
// application/json. Encoding failures are logged, not returned.
func ( *StreamableHTTPServer) (
	 http.ResponseWriter,
	 any,
	 int,
	 string,
) {
	 := createErrorResponse(, , )
	.Header().Set("Content-Type", "application/json")
	.WriteHeader(http.StatusBadRequest)
	 := json.NewEncoder().Encode()
	if  != nil {
		.logger.Errorf("Failed to write JSONRPCError: %v", )
	}
}

// nextRequestID gets the next incrementing requestID for the current session.
// The per-session counter is created on first use (LoadOrStore) and advanced
// atomically, so the first ID handed out for a session is 1.
func ( *StreamableHTTPServer) ( string) int64 {
	,  := .sessionRequestIDs.LoadOrStore(, new(atomic.Int64))
	 := .(*atomic.Int64)
	return .Add(1)
}

// touchSession records the current time as the last activity for the given session.
// It is a no-op when the sweeper is disabled (sessionIdleTTL <= 0) or sessionID is empty.
// The timestamp is stored as Unix nanoseconds in a per-session atomic value
// that is created on first use.
func ( *StreamableHTTPServer) ( string) {
	if  == "" || .sessionIdleTTL <= 0 {
		return
	}
	 := time.Now().UnixNano()
	,  := .sessionLastActive.LoadOrStore(, new(atomic.Int64))
	.(*atomic.Int64).Store()
}

// cleanupSessionState removes all per-session transport state for the given session ID:
// the MCP server registration, the active-session entry, the per-session
// tools, resources, resource templates and log levels, the request-ID counter
// and the last-activity timestamp.
func ( *StreamableHTTPServer) ( context.Context,  string) {
	// Unregister first to stop notification routing before deleting data.
	.server.UnregisterSession(, )
	.activeSessions.Delete()
	.sessionTools.delete()
	.sessionResources.delete()
	.sessionResourceTemplates.delete()
	.sessionLogLevels.delete()
	.sessionRequestIDs.Delete()
	.sessionLastActive.Delete()
}

// startSessionSweeper launches a background goroutine that periodically removes
// transport state for sessions that have been idle longer than sessionIdleTTL.
// The goroutine exits when the given context is done.
func ( *StreamableHTTPServer) ( context.Context) {
	// Sweep at half the TTL, but never more often than once per second.
	 := max(.sessionIdleTTL/2, time.Second)

	go func() {
		 := time.NewTicker()
		defer .Stop()

		for {
			select {
			case <-.Done():
				return
			case <-.C:
				.sweepExpiredSessions()
			}
		}
	}()
}

// sweepExpiredSessions iterates all tracked sessions and cleans up those
// whose last activity exceeds sessionIdleTTL.
// Entries whose key or value has an unexpected type are dropped from the
// tracking map. For expired sessions the session id manager's Terminate is
// called (its result is ignored) before the transport state is removed.
func ( *StreamableHTTPServer) () {
	 := time.Now().UnixNano()
	 := .sessionIdleTTL.Nanoseconds()

	.sessionLastActive.Range(func(,  any) bool {
		,  := .(string)
		if ! {
			.sessionLastActive.Delete()
			return true
		}
		,  := .(*atomic.Int64)
		if ! {
			.sessionLastActive.Delete()
			return true
		}

		// Still within the TTL: keep the session.
		 := .Load()
		if - <  {
			return true
		}

		// Re-check: if lastActive changed since we read it, the session
		// was touched concurrently — skip it. A small TOCTOU window
		// remains between this check and cleanup, but it is acceptable
		// for a distributed best-effort sweeper.
		if .Load() !=  {
			return true
		}

		.logger.Infof("Sweeping expired session: %s", )
		// Prefer the manager cached at construction time; fall back to the
		// resolver with a nil request when no manager was cached.
		 := .sessionIdManager
		if  == nil {
			 = .sessionIdManagerResolver.ResolveSessionIdManager(nil)
		}
		// Best effort: a failed or disallowed Terminate does not stop cleanup.
		_, _ = .Terminate()
		.cleanupSessionState(context.Background(), )
		return true
	})
}

// --- session ---
// sessionLogLevelsStore is a mutex-guarded map from a string key to an
// mcp.LoggingLevel. The server uses one instance for per-session log levels.
type sessionLogLevelsStore struct {
	mu   sync.RWMutex
	logs map[string]mcp.LoggingLevel
}

// newSessionLogLevelsStore returns an empty, ready-to-use log level store.
func newSessionLogLevelsStore() *sessionLogLevelsStore {
	levels := map[string]mcp.LoggingLevel{}
	return &sessionLogLevelsStore{logs: levels}
}

// Returns the logging level stored under the given key, or
// mcp.LoggingLevelError when nothing is stored for it. Takes the read lock.
func ( *sessionLogLevelsStore) ( string) mcp.LoggingLevel {
	.mu.RLock()
	defer .mu.RUnlock()
	,  := .logs[]
	if ! {
		return mcp.LoggingLevelError
	}
	return 
}

// Stores the logging level under the given key, replacing any previous value.
// Takes the write lock.
func ( *sessionLogLevelsStore) ( string,  mcp.LoggingLevel) {
	.mu.Lock()
	defer .mu.Unlock()
	.logs[] = 
}

// Removes the entry for the given key; a missing key is a no-op.
// Takes the write lock.
func ( *sessionLogLevelsStore) ( string) {
	.mu.Lock()
	defer .mu.Unlock()
	delete(.logs, )
}

// sessionResourcesStore is a mutex-guarded, two-level map holding the
// resources of each session, keyed by resource URI.
type sessionResourcesStore struct {
	mu        sync.RWMutex
	resources map[string]map[string]ServerResource // sessionID -> resourceURI -> resource
}

// newSessionResourcesStore returns an empty, ready-to-use resources store.
func newSessionResourcesStore() *sessionResourcesStore {
	bySession := map[string]map[string]ServerResource{}
	return &sessionResourcesStore{resources: bySession}
}

// Returns a copy of the resource map stored under the given key. The result
// is never nil: an unknown key yields an empty map. Because it is a copy,
// callers may modify it without affecting the store. Takes the read lock.
func ( *sessionResourcesStore) ( string) map[string]ServerResource {
	.mu.RLock()
	defer .mu.RUnlock()
	 := make(map[string]ServerResource, len(.resources[]))
	maps.Copy(, .resources[])
	return 
}

// Replaces the resource map stored under the given key with a copy of the
// supplied map, so later changes to the caller's map do not leak into the
// store. Takes the write lock.
func ( *sessionResourcesStore) ( string,  map[string]ServerResource) {
	.mu.Lock()
	defer .mu.Unlock()
	 := make(map[string]ServerResource, len())
	maps.Copy(, )
	.resources[] = 
}

// Removes all resources stored under the given key; a missing key is a no-op.
// Takes the write lock.
func ( *sessionResourcesStore) ( string) {
	.mu.Lock()
	defer .mu.Unlock()
	delete(.resources, )
}

// sessionResourceTemplatesStore is a mutex-guarded, two-level map holding the
// resource templates of each session, keyed by URI template.
type sessionResourceTemplatesStore struct {
	mu        sync.RWMutex
	templates map[string]map[string]ServerResourceTemplate // sessionID -> uriTemplate -> template
}

// newSessionResourceTemplatesStore returns an empty, ready-to-use resource
// templates store.
func newSessionResourceTemplatesStore() *sessionResourceTemplatesStore {
	bySession := map[string]map[string]ServerResourceTemplate{}
	return &sessionResourceTemplatesStore{templates: bySession}
}

// Returns a copy of the template map stored under the given key. The result
// is never nil: an unknown key yields an empty map. Because it is a copy,
// callers may modify it without affecting the store. Takes the read lock.
func ( *sessionResourceTemplatesStore) ( string) map[string]ServerResourceTemplate {
	.mu.RLock()
	defer .mu.RUnlock()
	 := make(map[string]ServerResourceTemplate, len(.templates[]))
	maps.Copy(, .templates[])
	return 
}

// Replaces the template map stored under the given key with a copy of the
// supplied map, so later changes to the caller's map do not leak into the
// store. Takes the write lock.
func ( *sessionResourceTemplatesStore) ( string,  map[string]ServerResourceTemplate) {
	.mu.Lock()
	defer .mu.Unlock()
	 := make(map[string]ServerResourceTemplate, len())
	maps.Copy(, )
	.templates[] = 
}

// Removes all templates stored under the given key; a missing key is a no-op.
// Takes the write lock.
func ( *sessionResourceTemplatesStore) ( string) {
	.mu.Lock()
	defer .mu.Unlock()
	delete(.templates, )
}

// sessionToolsStore is a mutex-guarded, two-level map holding the tools of
// each session, keyed by tool name.
type sessionToolsStore struct {
	mu    sync.RWMutex
	tools map[string]map[string]ServerTool // sessionID -> toolName -> tool
}

// newSessionToolsStore returns an empty, ready-to-use tools store.
func newSessionToolsStore() *sessionToolsStore {
	bySession := map[string]map[string]ServerTool{}
	return &sessionToolsStore{tools: bySession}
}

// Returns a copy of the tool map stored under the given key. The result is
// never nil: an unknown key yields an empty map. Because it is a copy, callers
// may modify it without affecting the store. Takes the read lock.
func ( *sessionToolsStore) ( string) map[string]ServerTool {
	.mu.RLock()
	defer .mu.RUnlock()
	 := make(map[string]ServerTool, len(.tools[]))
	maps.Copy(, .tools[])
	return 
}

// Replaces the tool map stored under the given key with a copy of the
// supplied map, so later changes to the caller's map do not leak into the
// store. Takes the write lock.
func ( *sessionToolsStore) ( string,  map[string]ServerTool) {
	.mu.Lock()
	defer .mu.Unlock()
	 := make(map[string]ServerTool, len())
	maps.Copy(, )
	.tools[] = 
}

// Removes all tools stored under the given key; a missing key is a no-op.
// Takes the write lock.
func ( *sessionToolsStore) ( string) {
	.mu.Lock()
	defer .mu.Unlock()
	delete(.tools, )
}

// Sampling support types for HTTP transport

// samplingRequestItem is a server-to-client sampling request queued on a
// session. The GET handler turns it into a JSON-RPC request using requestID
// and the request's CreateMessageParams.
type samplingRequestItem struct {
	requestID int64
	request   mcp.CreateMessageRequest
	response  chan samplingResponseItem
}

// samplingResponseItem is a client's reply to a server-initiated request.
// result holds the raw JSON result to be unmarshaled later; err is set when
// the client sent an error payload or the reply could not be interpreted.
type samplingResponseItem struct {
	requestID int64
	result    json.RawMessage
	err       error
}

// Elicitation support types for HTTP transport

// elicitationRequestItem is a server-to-client elicitation request queued on a
// session. The GET handler turns it into a JSON-RPC request using requestID
// and the request's Params.
type elicitationRequestItem struct {
	requestID int64
	request   mcp.ElicitationRequest
	response  chan samplingResponseItem
}

// Roots support types for HTTP transport

// rootsRequestItem is a server-to-client list-roots request queued on a
// session. The GET handler turns it into a JSON-RPC request using requestID;
// no params are sent.
type rootsRequestItem struct {
	requestID int64
	request   mcp.ListRootsRequest
	response  chan samplingResponseItem
}

// streamableHttpSession is a session for the streamable-http transport.
// In POST handlers (request/notification) it is usually ephemeral and only
// exists for the life of the request handler; after an initialize request the
// POST handler may register it with the MCP server.
// In GET handlers (listening) it is a real session that is registered with the
// MCP server for as long as the connection is open.
type streamableHttpSession struct {
	sessionID           string
	notificationChannel chan mcp.JSONRPCNotification // server -> client notifications
	tools               *sessionToolsStore
	resources           *sessionResourcesStore
	resourceTemplates   *sessionResourceTemplatesStore
	upgradeToSSE        atomic.Bool
	logLevels           *sessionLogLevelsStore
	clientInfo          atomic.Value // stores session-specific client info
	clientCapabilities  atomic.Value // stores session-specific client capabilities

	// Sampling support for bidirectional communication
	samplingRequestChan    chan samplingRequestItem    // server -> client sampling requests
	elicitationRequestChan chan elicitationRequestItem // server -> client elicitation requests
	rootsRequestChan       chan rootsRequestItem       // server -> client list roots requests

	samplingRequests sync.Map     // requestID -> chan samplingResponseItem awaiting the client's reply
	requestIDCounter atomic.Int64 // for generating unique request IDs
}

// newStreamableHttpSession builds a session from a string ID and the four
// store pointers it is given (tools, resources, resource templates, log
// levels). The stores are kept by pointer, not copied. The constructor itself
// allocates the buffered channels: 100 slots for notifications and 10 slots
// each for sampling, elicitation and list-roots requests.
func newStreamableHttpSession( string,  *sessionToolsStore,  *sessionResourcesStore,  *sessionResourceTemplatesStore,  *sessionLogLevelsStore) *streamableHttpSession {
	 := &streamableHttpSession{
		sessionID:              ,
		notificationChannel:    make(chan mcp.JSONRPCNotification, 100),
		tools:                  ,
		resources:              ,
		resourceTemplates:      ,
		logLevels:              ,
		samplingRequestChan:    make(chan samplingRequestItem, 10),
		elicitationRequestChan: make(chan elicitationRequestItem, 10),
		rootsRequestChan:       make(chan rootsRequestItem, 10),
	}
	return 
}

// Returns the session's ID string as set at construction.
func ( *streamableHttpSession) () string {
	return .sessionID
}

// Returns the session's notification channel as a send-only channel, so
// callers can queue server -> client notifications but cannot receive from it.
func ( *streamableHttpSession) () chan<- mcp.JSONRPCNotification {
	return .notificationChannel
}

// Intentionally a no-op: this session type has no initialization step.
func ( *streamableHttpSession) () {
	// do nothing
	// the session is ephemeral, no real initialized action needed
}

// Always reports true; this session type tracks no initialization state.
func ( *streamableHttpSession) () bool {
	// the session is ephemeral, no real initialized action needed
	return true
}

// Records the given logging level for this session by writing it to the
// logLevels store under the session's ID.
func ( *streamableHttpSession) ( mcp.LoggingLevel) {
	.logLevels.set(.sessionID, )
}

// Returns the logging level the logLevels store holds for this session's ID.
// What the store returns for an unknown session is not visible in this chunk.
func ( *streamableHttpSession) () mcp.LoggingLevel {
	return .logLevels.get(.sessionID)
}

// Compile-time check that *streamableHttpSession satisfies ClientSession.
var _ ClientSession = (*streamableHttpSession)(nil)

// Returns the tool map that the tools store holds for this session's ID.
func ( *streamableHttpSession) () map[string]ServerTool {
	return .tools.get(.sessionID)
}

// Replaces the tool map that the tools store holds for this session's ID.
func ( *streamableHttpSession) ( map[string]ServerTool) {
	.tools.set(.sessionID, )
}

// Returns the resource map that the resources store holds for this session's ID.
func ( *streamableHttpSession) () map[string]ServerResource {
	return .resources.get(.sessionID)
}

// Replaces the resource map that the resources store holds for this session's ID.
func ( *streamableHttpSession) ( map[string]ServerResource) {
	.resources.set(.sessionID, )
}

// Returns the resource-template map that the resourceTemplates store holds for
// this session's ID.
func ( *streamableHttpSession) () map[string]ServerResourceTemplate {
	return .resourceTemplates.get(.sessionID)
}

// Replaces the resource-template map that the resourceTemplates store holds for
// this session's ID.
func ( *streamableHttpSession) ( map[string]ServerResourceTemplate) {
	.resourceTemplates.set(.sessionID, )
}

// Returns the client info stored for this session. If nothing has been stored
// yet, or the stored value is not an mcp.Implementation, the zero
// mcp.Implementation is returned.
func ( *streamableHttpSession) () mcp.Implementation {
	// atomic.Value.Load returns nil until the first Store.
	if  := .clientInfo.Load();  != nil {
		if ,  := .(mcp.Implementation);  {
			return 
		}
	}
	return mcp.Implementation{}
}

// Atomically stores the given client info on the session.
func ( *streamableHttpSession) ( mcp.Implementation) {
	.clientInfo.Store()
}

// Returns the client capabilities stored for this session. If nothing has been
// stored yet, or the stored value is not an mcp.ClientCapabilities, the zero
// mcp.ClientCapabilities is returned.
func ( *streamableHttpSession) () mcp.ClientCapabilities {
	// atomic.Value.Load returns nil until the first Store.
	if  := .clientCapabilities.Load();  != nil {
		if ,  := .(mcp.ClientCapabilities);  {
			return 
		}
	}
	return mcp.ClientCapabilities{}
}

// Atomically stores the given client capabilities on the session.
func ( *streamableHttpSession) ( mcp.ClientCapabilities) {
	.clientCapabilities.Store()
}

// Compile-time checks that *streamableHttpSession satisfies the optional
// per-session capability interfaces.
var (
	_ SessionWithTools             = (*streamableHttpSession)(nil)
	_ SessionWithResources         = (*streamableHttpSession)(nil)
	_ SessionWithResourceTemplates = (*streamableHttpSession)(nil)
	_ SessionWithLogging           = (*streamableHttpSession)(nil)
	_ SessionWithClientInfo        = (*streamableHttpSession)(nil)
)

// Atomically sets the session's upgradeToSSE flag to true. How the flag is
// consumed is not visible in this chunk.
func ( *streamableHttpSession) () {
	.upgradeToSSE.Store(true)
}

// Compile-time check that *streamableHttpSession satisfies
// SessionWithStreamableHTTPConfig.
var _ SessionWithStreamableHTTPConfig = (*streamableHttpSession)(nil)

// RequestSampling implements SessionWithSampling interface for HTTP transport.
//
// It allocates a new request ID, queues a samplingRequestItem on
// samplingRequestChan without blocking, and then waits for the reply on a
// per-request response channel or for the context to be done.
//
// It returns an error when the queue is full, when the context is done before
// queuing or before a reply arrives, when the reply carries an error, or when
// the reply cannot be decoded into an mcp.CreateMessageResult.
func ( *streamableHttpSession) ( context.Context,  mcp.CreateMessageRequest) (*mcp.CreateMessageResult, error) {
	// Generate unique request ID
	 := .requestIDCounter.Add(1)

	// Create response channel for this specific request.
	// Buffer of 1 so a single reply can be sent even if nobody is receiving.
	 := make(chan samplingResponseItem, 1)

	// Create the sampling request item
	 := samplingRequestItem{
		requestID: ,
		request:   ,
		response:  ,
	}

	// Store the pending request; the deferred Delete removes it on every return path.
	.samplingRequests.Store(, )
	defer .samplingRequests.Delete()

	// Send the sampling request via the channel (non-blocking):
	// the default case fires immediately when the queue is full.
	select {
	case .samplingRequestChan <- :
		// Request queued successfully
	case <-.Done():
		return nil, .Err()
	default:
		return nil, fmt.Errorf("sampling request queue is full - server overloaded")
	}

	// Wait for response or context cancellation
	select {
	case  := <-:
		if .err != nil {
			return nil, .err
		}
		var  mcp.CreateMessageResult
		if  := json.Unmarshal(.result, &);  != nil {
			// NOTE(review): %v flattens the cause, so errors.Is/As cannot see it;
			// the content-parse error below uses %w. Consider %w here as well.
			return nil, fmt.Errorf("failed to unmarshal sampling response: %v", )
		}

		// Parse content from map[string]any to proper Content type (TextContent, ImageContent, AudioContent)
		// HTTP transport unmarshals Content as map[string]any, we need to convert it to the proper type
		if ,  := .Content.(map[string]any);  {
			,  := mcp.ParseContent()
			if  != nil {
				return nil, fmt.Errorf("failed to parse sampling response content: %w", )
			}
			.Content = 
		}

		return &, nil
	case <-.Done():
		return nil, .Err()
	}
}

// ListRoots implements SessionWithRoots interface for HTTP transport.
// It sends a list roots request to the client via SSE and waits for the response.
//
// The request is queued on rootsRequestChan without blocking. An error is
// returned when the queue is full, when the context is done before queuing or
// before a reply arrives, when the reply carries an error, or when the reply
// cannot be decoded into an mcp.ListRootsResult.
func ( *streamableHttpSession) ( context.Context,  mcp.ListRootsRequest) (*mcp.ListRootsResult, error) {
	// Generate unique request ID
	 := .requestIDCounter.Add(1)

	// Create response channel for this specific request.
	// Buffer of 1 so a single reply can be sent even if nobody is receiving.
	 := make(chan samplingResponseItem, 1)

	// Create the roots request item
	 := rootsRequestItem{
		requestID: ,
		request:   ,
		response:  ,
	}

	// Store the pending request in the same map the sampling requests use;
	// the deferred Delete removes it on every return path.
	.samplingRequests.Store(, )
	defer .samplingRequests.Delete()

	// Send the list roots request via the channel (non-blocking):
	// the default case fires immediately when the queue is full.
	select {
	case .rootsRequestChan <- :
		// Request queued successfully
	case <-.Done():
		return nil, .Err()
	default:
		return nil, fmt.Errorf("list roots request queue is full - server overloaded")
	}

	// Wait for response or context cancellation
	select {
	case  := <-:
		if .err != nil {
			return nil, .err
		}
		var  mcp.ListRootsResult
		if  := json.Unmarshal(.result, &);  != nil {
			// NOTE(review): %v flattens the cause, so errors.Is/As cannot see it;
			// consider %w.
			return nil, fmt.Errorf("failed to unmarshal list roots response: %v", )
		}
		return &, nil
	case <-.Done():
		return nil, .Err()
	}
}

// RequestElicitation implements SessionWithElicitation interface for HTTP transport.
//
// It allocates a new request ID, queues an elicitationRequestItem on
// elicitationRequestChan without blocking, and then waits for the reply on a
// per-request response channel or for the context to be done.
//
// It returns an error when the queue is full, when the context is done before
// queuing or before a reply arrives, when the reply carries an error, or when
// the reply cannot be decoded into an mcp.ElicitationResult.
func ( *streamableHttpSession) ( context.Context,  mcp.ElicitationRequest) (*mcp.ElicitationResult, error) {
	// Generate unique request ID
	 := .requestIDCounter.Add(1)

	// Create response channel for this specific request.
	// Buffer of 1 so a single reply can be sent even if nobody is receiving.
	 := make(chan samplingResponseItem, 1)

	// Create the elicitation request item
	 := elicitationRequestItem{
		requestID: ,
		request:   ,
		response:  ,
	}

	// Store the pending request in the same map the sampling requests use;
	// the deferred Delete removes it on every return path.
	.samplingRequests.Store(, )
	defer .samplingRequests.Delete()

	// Send the elicitation request via the channel (non-blocking):
	// the default case fires immediately when the queue is full.
	select {
	case .elicitationRequestChan <- :
		// Request queued successfully
	case <-.Done():
		return nil, .Err()
	default:
		return nil, fmt.Errorf("elicitation request queue is full - server overloaded")
	}

	// Wait for response or context cancellation
	select {
	case  := <-:
		if .err != nil {
			return nil, .err
		}
		var  mcp.ElicitationResult
		if  := json.Unmarshal(.result, &);  != nil {
			// NOTE(review): %v flattens the cause, so errors.Is/As cannot see it;
			// consider %w.
			return nil, fmt.Errorf("failed to unmarshal elicitation response: %v", )
		}
		return &, nil
	case <-.Done():
		return nil, .Err()
	}
}

// Compile-time checks that *streamableHttpSession satisfies the server ->
// client request interfaces (sampling, elicitation, list roots).
var _ SessionWithSampling = (*streamableHttpSession)(nil)
var _ SessionWithElicitation = (*streamableHttpSession)(nil)
var _ SessionWithRoots = (*streamableHttpSession)(nil)

// --- session id manager ---

// SessionIdManagerResolver resolves a SessionIdManager based on the HTTP request.
// Implementations must handle a nil r, which may be passed from non-request
// contexts such as the session idle TTL sweeper.
type SessionIdManagerResolver interface {
	// ResolveSessionIdManager returns the SessionIdManager to use for r.
	// r may be nil.
	ResolveSessionIdManager(r *http.Request) SessionIdManager
}

// SessionIdManager generates, validates and terminates session IDs.
type SessionIdManager interface {
	// Generate returns a new session ID. The stateless implementation in this
	// file returns an empty string.
	Generate() string
	// Validate checks if a session ID is valid and not terminated.
	// Returns isTerminated=true if the ID is valid but belongs to a terminated session.
	// Returns err!=nil if the ID format is invalid or lookup failed.
	Validate(sessionID string) (isTerminated bool, err error)
	// Terminate marks a session ID as terminated.
	// Returns isNotAllowed=true if the server policy prevents client termination.
	// Returns err!=nil if the ID is invalid or termination failed.
	Terminate(sessionID string) (isNotAllowed bool, err error)
}

// DefaultSessionIdManagerResolver is a simple resolver that returns the same
// SessionIdManager for all requests.
type DefaultSessionIdManagerResolver struct {
	manager SessionIdManager
}

// NewDefaultSessionIdManagerResolver creates a new DefaultSessionIdManagerResolver
// with the given SessionIdManager. A nil manager is replaced with a
// StatelessSessionIdManager, so the resolver never returns nil.
func ( SessionIdManager) *DefaultSessionIdManagerResolver {
	if  == nil {
		 = &StatelessSessionIdManager{}
	}
	return &DefaultSessionIdManagerResolver{manager: }
}

// ResolveSessionIdManager returns the configured SessionIdManager for all
// requests. The request argument is never read, so a nil request is fine.
func ( *DefaultSessionIdManagerResolver) ( *http.Request) SessionIdManager {
	return .manager
}

// StatelessSessionIdManager performs no session management at all: it issues
// no session IDs and accepts every ID it is asked about.
type StatelessSessionIdManager struct{}

// Always returns the empty string: stateless mode issues no session ID.
func ( *StatelessSessionIdManager) () string {
	return ""
}

// Accepts any session ID: always returns false with a nil error.
func ( *StatelessSessionIdManager) ( string) ( bool,  error) {
	// In stateless mode, ignore session IDs completely - don't validate or reject them
	return false, nil
}

// No-op: there is no session state to terminate. Always returns false with a
// nil error.
func ( *StatelessSessionIdManager) ( string) ( bool,  error) {
	return false, nil
}

// StatelessGeneratingSessionIdManager generates session IDs but doesn't
// validate them locally. It keeps no record of issued IDs and checks only the
// ID format, which allows it to work across multiple server instances.
type StatelessGeneratingSessionIdManager struct{}

// Returns a new ID made of idPrefix followed by a freshly generated UUID
// string. The ID is not recorded anywhere.
func ( *StatelessGeneratingSessionIdManager) () string {
	return idPrefix + uuid.New().String()
}

// Checks the format of a session ID only: it must start with idPrefix and the
// remainder must parse as a UUID. Returns an "invalid session id" error when
// either check fails. The boolean result is always false, because this manager
// does not track terminated sessions.
func ( *StatelessGeneratingSessionIdManager) ( string) ( bool,  error) {
	// Only validate format, not existence - allows cross-instance operation
	if !strings.HasPrefix(, idPrefix) {
		return false, fmt.Errorf("invalid session id: %s", )
	}
	// The prefix check above guarantees the slice below is in range.
	if ,  := uuid.Parse([len(idPrefix):]);  != nil {
		return false, fmt.Errorf("invalid session id: %s", )
	}
	return false, nil
}

// No-op: always returns false with a nil error and does not inspect the ID.
func ( *StatelessGeneratingSessionIdManager) ( string) ( bool,  error) {
	// No-op termination since we don't track sessions
	return false, nil
}

// InsecureStatefulSessionIdManager generates UUID-based session IDs and tracks
// active sessions.
// It validates both the format and the existence of session IDs.
// For more secure session IDs, use a more complex generator, such as a JWT.
type InsecureStatefulSessionIdManager struct {
	// sessions holds IDs that have been generated and not yet terminated.
	sessions sync.Map
	// terminated holds IDs that were terminated; entries are never removed in
	// the code visible here.
	terminated sync.Map
}

// idPrefix is prepended to the UUID in every generated session ID and is
// required by the format checks in the validating managers.
const idPrefix = "mcp-session-"

// Creates a new ID (idPrefix followed by a fresh UUID string), records it in
// the sessions map as active, and returns it.
func ( *InsecureStatefulSessionIdManager) () string {
	 := idPrefix + uuid.New().String()
	.sessions.Store(, true)
	return 
}

// Validates a session ID in four steps, in this order:
//  1. it must start with idPrefix, otherwise an "invalid session id" error;
//  2. the remainder must parse as a UUID, otherwise the same error;
//  3. if it is in the terminated map, returns true with a nil error;
//  4. if it is not in the sessions map, returns a "session not found" error.
//
// An ID that passes all four returns false with a nil error.
func ( *InsecureStatefulSessionIdManager) ( string) ( bool,  error) {
	if !strings.HasPrefix(, idPrefix) {
		return false, fmt.Errorf("invalid session id: %s", )
	}
	// The prefix check above guarantees the slice below is in range.
	if ,  := uuid.Parse([len(idPrefix):]);  != nil {
		return false, fmt.Errorf("invalid session id: %s", )
	}
	// Terminated is checked before existence, because terminating an ID removes
	// it from sessions.
	if ,  := .terminated.Load();  {
		return true, nil
	}
	if ,  := .sessions.Load(); ! {
		return false, fmt.Errorf("session not found: %s", )
	}
	return false, nil
}

// Marks an active session ID as terminated and removes it from the active set.
// IDs that are already terminated, or that were never issued, are left
// untouched. Every path returns false with a nil error.
// NOTE(review): the load-then-store sequence is not atomic as a whole; confirm
// whether concurrent calls for the same ID matter to callers.
func ( *InsecureStatefulSessionIdManager) ( string) ( bool,  error) {
	if ,  := .terminated.Load();  {
		return false, nil
	}
	// Unknown IDs are ignored rather than recorded as terminated.
	if ,  := .sessions.Load(); ! {
		return false, nil
	}
	.terminated.Store(, true)
	.sessions.Delete()
	return false, nil
}

// NewTestStreamableHTTPServer creates a test server for testing purposes.
// It builds a StreamableHTTPServer from the given MCP server and options and
// serves it from a new httptest.Server, which it returns. httptest.NewServer
// starts listening immediately, so callers should Close the returned server
// when done.
func ( *MCPServer,  ...StreamableHTTPOption) *httptest.Server {
	 := NewStreamableHTTPServer(, ...)
	 := httptest.NewServer()
	return 
}

// isJSONEmpty reports whether the provided JSON value is "empty":
//   - null
//   - empty object: {}
//   - empty array: []
//
// Whitespace between the brackets is allowed. It also treats
// nil/whitespace-only input as empty.
// It does NOT treat 0, false, "" or non-empty composites as empty.
//
// The whole trimmed value must match one of the forms above, so input with
// anything after the closing bracket (for example `{}x` or `[]]`) is not empty.
func isJSONEmpty(data json.RawMessage) bool {
	trimmed := bytes.TrimSpace(data)
	if len(trimmed) == 0 {
		return true
	}

	switch trimmed[0] {
	case '{':
		return isEmptyJSONComposite(trimmed, '}')
	case '[':
		return isEmptyJSONComposite(trimmed, ']')
	case 'n':
		return string(trimmed) == "null"
	}
	// Strings (including ""), numbers, booleans and anything else are never empty.
	return false
}

// isEmptyJSONComposite reports whether trimmed, which starts with an opening
// bracket, ends with closer and has only ASCII whitespace in between.
func isEmptyJSONComposite(trimmed []byte, closer byte) bool {
	last := len(trimmed) - 1
	if last < 1 || trimmed[last] != closer {
		return false
	}
	for _, b := range trimmed[1:last] {
		// Bytes >= 0x80 are parts of multi-byte UTF-8 sequences; casting them to
		// rune would wrongly match U+0085 and U+00A0 in unicode.IsSpace.
		if b >= 0x80 || !unicode.IsSpace(rune(b)) {
			return false
		}
	}
	return true
}

// isExplicitEmptyObject reports whether data is a JSON object literal with no fields.
//
// Nil, empty and whitespace-only input return false, as does any value whose
// first non-space byte is not '{' (so null and [] are not empty objects here).
// Input that starts with '{' but fails to unmarshal into a map also returns
// false.
func isExplicitEmptyObject( json.RawMessage) bool {
	if len() == 0 {
		return false
	}

	 := bytes.TrimSpace()
	if len() == 0 || [0] != '{' {
		return false
	}

	// Decode fully instead of scanning bytes, so malformed objects are rejected.
	var  map[string]json.RawMessage
	if  := json.Unmarshal(, &);  != nil {
		return false
	}

	return len() == 0
}