From 7c24416504c6a03003de16ff45b5c06d98843ddb Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 30 Apr 2025 17:36:45 -0700 Subject: [PATCH] Remove batching support from StreamableHttpServerTransport --- .../Transport/StreamClientSessionTransport.cs | 2 +- .../Transport/StreamableHttpPostTransport.cs | 45 ++----------------- src/ModelContextProtocol/Shared/McpSession.cs | 2 +- .../StreamableHttpServerConformanceTests.cs | 33 -------------- 4 files changed, 5 insertions(+), 77 deletions(-) diff --git a/src/ModelContextProtocol/Protocol/Transport/StreamClientSessionTransport.cs b/src/ModelContextProtocol/Protocol/Transport/StreamClientSessionTransport.cs index 1abc05b5..fc8672e0 100644 --- a/src/ModelContextProtocol/Protocol/Transport/StreamClientSessionTransport.cs +++ b/src/ModelContextProtocol/Protocol/Transport/StreamClientSessionTransport.cs @@ -70,7 +70,7 @@ public override async Task SendMessageAsync(JsonRpcMessage message, Cancellation id = messageWithId.Id.ToString(); } - var json = JsonSerializer.Serialize(message, McpJsonUtilities.DefaultOptions.GetTypeInfo(typeof(JsonRpcMessage))); + var json = JsonSerializer.Serialize(message, McpJsonUtilities.JsonContext.Default.JsonRpcMessage); using var _ = await _sendLock.LockAsync(cancellationToken).ConfigureAwait(false); try diff --git a/src/ModelContextProtocol/Protocol/Transport/StreamableHttpPostTransport.cs b/src/ModelContextProtocol/Protocol/Transport/StreamableHttpPostTransport.cs index 29d06267..4cdb30b3 100644 --- a/src/ModelContextProtocol/Protocol/Transport/StreamableHttpPostTransport.cs +++ b/src/ModelContextProtocol/Protocol/Transport/StreamableHttpPostTransport.cs @@ -32,7 +32,9 @@ public async ValueTask RunAsync(CancellationToken cancellationToken) // The incomingChannel is null to handle the potential client GET request to handle unsolicited JsonRpcMessages. if (incomingChannel is not null) { - await OnPostBodyReceivedAsync(httpBodies.Input, cancellationToken).ConfigureAwait(false); + var message = await JsonSerializer.DeserializeAsync(httpBodies.Input.AsStream(), + McpJsonUtilities.JsonContext.Default.JsonRpcMessage, cancellationToken).ConfigureAwait(false); + await OnMessageReceivedAsync(message, cancellationToken).ConfigureAwait(false); } if (_pendingRequests.Count == 0) @@ -72,24 +74,6 @@ public async ValueTask DisposeAsync() } } - private async ValueTask OnPostBodyReceivedAsync(PipeReader streamableHttpRequestBody, CancellationToken cancellationToken) - { - if (!await IsJsonArrayAsync(streamableHttpRequestBody, cancellationToken).ConfigureAwait(false)) - { - var message = await JsonSerializer.DeserializeAsync(streamableHttpRequestBody.AsStream(), McpJsonUtilities.JsonContext.Default.JsonRpcMessage, cancellationToken).ConfigureAwait(false); - await OnMessageReceivedAsync(message, cancellationToken).ConfigureAwait(false); - } - else - { - // Batched JSON-RPC message - var messages = JsonSerializer.DeserializeAsyncEnumerable(streamableHttpRequestBody.AsStream(), McpJsonUtilities.JsonContext.Default.JsonRpcMessage, cancellationToken).ConfigureAwait(false); - await foreach (var message in messages.WithCancellation(cancellationToken)) - { - await OnMessageReceivedAsync(message, cancellationToken).ConfigureAwait(false); - } - } - } - private async ValueTask OnMessageReceivedAsync(JsonRpcMessage? message, CancellationToken cancellationToken) { if (message is null) @@ -108,27 +92,4 @@ private async ValueTask OnMessageReceivedAsync(JsonRpcMessage? message, Cancella Throw.IfNull(incomingChannel); await incomingChannel.WriteAsync(message, cancellationToken).ConfigureAwait(false); } - - private async ValueTask IsJsonArrayAsync(PipeReader requestBody, CancellationToken cancellationToken) - { - // REVIEW: Should we bother trimming whitespace before checking for '['? - var firstCharacterResult = await requestBody.ReadAtLeastAsync(1, cancellationToken).ConfigureAwait(false); - - try - { - if (firstCharacterResult.Buffer.Length == 0) - { - return false; - } - - Span firstCharBuffer = stackalloc byte[1]; - firstCharacterResult.Buffer.Slice(0, 1).CopyTo(firstCharBuffer); - return firstCharBuffer[0] == (byte)'['; - } - finally - { - // Never consume data when checking for '['. System.Text.Json still needs to consume it. - requestBody.AdvanceTo(firstCharacterResult.Buffer.Start); - } - } } diff --git a/src/ModelContextProtocol/Shared/McpSession.cs b/src/ModelContextProtocol/Shared/McpSession.cs index 95b0509d..fc7136a9 100644 --- a/src/ModelContextProtocol/Shared/McpSession.cs +++ b/src/ModelContextProtocol/Shared/McpSession.cs @@ -75,7 +75,7 @@ public McpSession( StdioClientSessionTransport or StdioServerTransport => "stdio", StreamClientSessionTransport or StreamServerTransport => "stream", SseClientSessionTransport or SseResponseStreamTransport => "sse", - StreamableHttpServerTransport or StreamableHttpPostTransport => "http", + StreamableHttpClientSessionTransport or StreamableHttpServerTransport or StreamableHttpPostTransport => "http", _ => "unknownTransport" }; diff --git a/tests/ModelContextProtocol.AspNetCore.Tests/StreamableHttpServerConformanceTests.cs b/tests/ModelContextProtocol.AspNetCore.Tests/StreamableHttpServerConformanceTests.cs index 56e25936..9e5ce6fa 100644 --- a/tests/ModelContextProtocol.AspNetCore.Tests/StreamableHttpServerConformanceTests.cs +++ b/tests/ModelContextProtocol.AspNetCore.Tests/StreamableHttpServerConformanceTests.cs @@ -182,39 +182,6 @@ public async Task InitializeJsonRpcRequest_IsHandled_WithCompleteSseResponse() await CallInitializeAndValidateAsync(); } - [Fact] - public async Task BatchedJsonRpcRequests_IsHandled_WithCompleteSseResponse() - { - await StartAsync(); - - using var response = await HttpClient.PostAsync("", JsonContent($"[{InitializeRequest},{EchoRequest}]"), TestContext.Current.CancellationToken); - Assert.Equal(HttpStatusCode.OK, response.StatusCode); - - var eventCount = 0; - await foreach (var sseEvent in ReadSseAsync(response.Content)) - { - var jsonRpcResponse = JsonSerializer.Deserialize(sseEvent, GetJsonTypeInfo()); - Assert.NotNull(jsonRpcResponse); - var responseId = Assert.IsType(jsonRpcResponse.Id.Id); - - switch (responseId) - { - case 1: - AssertServerInfo(jsonRpcResponse); - break; - case 2: - AssertEchoResponse(jsonRpcResponse); - break; - default: - throw new Exception($"Unexpected response ID: {jsonRpcResponse.Id}"); - } - - eventCount++; - } - - Assert.Equal(2, eventCount); - } - [Fact] public async Task SingleJsonRpcRequest_ThatThrowsIsHandled_WithCompleteSseResponse() {