Skip to content

Commit f8fd577

Browse files
jfallowsclaude
andauthored
fix(binding-mcp): defer reply content-type for elicitation-capable sessions (#1844)
* fix(binding-mcp): defer reply content-type for elicitation-capable sessions When an mcp server fronts an arbitrary mcp-speaking upstream, the reply-side frames can arrive as: reply BEGIN, then an elicitation/create CHALLENGE, then the result. McpRequestStream.onAppBegin committed the HTTP response content-type eagerly (application/json, absent a progressToken), so a subsequent elicitation/create CHALLENGE — which must stream over text/event-stream — could not be honored. PR #1842 detects this and fails loud; this change avoids it arising in the first place. Defer the reply content-type when the session can elicit (requestTimeout > 0, i.e. the client advertised elicitation.url and options.timeout is set): the net reply BEGIN is not emitted until the first frame that determines the format — JSON on the first result DATA, or text/event-stream when an elicitation/create CHALLENGE arrives (server.onAppChallenge, with replyOpening still false so the #1842 guard is correctly skipped). Sessions that cannot elicit (requestTimeout == 0) keep the original eager behavior, preserving the cancel/abort/shutdown choreography. A shared ensureResponseBegun() commits the BEGIN exactly once and is invoked from onAppData, onAppEnd, onAppAbort and doAppCancel so every terminal path still emits a response. #1823 (resumable flush -> JSON) and #1842 (result-before-challenge -> abort) both still hold under deferral. Adds tools.call.elicit.deferred.begin (McpServerIT) exercising the reply-BEGIN-before-CHALLENGE order, which fails on develop and passes with this change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(binding-mcp): address review — fold begin guard into doEncodeResponseBegin - Add a no-extension McpServer.doEncodeResponseBegin overload that holds the replyOpening guard and builds the default success BEGIN (content-type from sseUpgrade); McpRequestStream calls it directly instead of a local helper. - Rename scenario tools.call.elicit.deferred.begin -> tools.call.elicit.deferred and drop verbose script comments. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * build: bump k3po to 3.4.3 3.4.3 pins the transitive org.kaazing net.* dependencies to concrete versions instead of version ranges, eliminating the remote maven-metadata lookup during notice:check that hung the CI build. https://claude.ai/code/session_01Br7T7CmnoMLUrtEpF4j9AG --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 7fde2a0 commit f8fd577

5 files changed

Lines changed: 306 additions & 20 deletions

File tree

  • runtime/binding-mcp/src
  • specs/binding-mcp.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mcp/streams

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
<byteman.version>4.0.26</byteman.version>
6262
<jmock.version>2.6.0</jmock.version>
6363
<mockito.version>5.23.0</mockito.version>
64-
<k3po.version>3.4.2</k3po.version>
64+
<k3po.version>3.4.3</k3po.version>
6565
<jmh.version>1.37</jmh.version>
6666
</properties>
6767

runtime/binding-mcp/src/main/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpServerFactory.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2191,6 +2191,22 @@ private void onAppChallenge(
21912191
.build());
21922192
}
21932193

2194+
private void doEncodeResponseBegin(
2195+
long traceId,
2196+
long authorization)
2197+
{
2198+
if (!McpState.replyOpening(state))
2199+
{
2200+
final String contentType = sseUpgrade ? CONTENT_TYPE_EVENT_STREAM : CONTENT_TYPE_JSON;
2201+
doEncodeResponseBegin(traceId, authorization, httpBeginExRW.wrap(codecBuffer, 0, codecBuffer.capacity())
2202+
.typeId(httpTypeId)
2203+
.headersItem(h -> h.name(HTTP_HEADER_STATUS).value(STATUS_200))
2204+
.headersItem(h -> h.name(HTTP_HEADER_CONTENT_TYPE).value(contentType))
2205+
.inject(this::injectAltSvc)
2206+
.build());
2207+
}
2208+
}
2209+
21942210
private void doEncodeResponseBegin(
21952211
long traceId,
21962212
long authorization,
@@ -4457,7 +4473,7 @@ private void doAppCancel(
44574473
long traceId,
44584474
long authorization)
44594475
{
4460-
server.doNetBegin(traceId, authorization, emptyRO);
4476+
server.doEncodeResponseBegin(traceId, authorization);
44614477
server.doNetAbort(traceId, authorization);
44624478
doAppReset(traceId, authorization);
44634479
doAppAbort(traceId, authorization);
@@ -4639,25 +4655,9 @@ private void onAppBegin(
46394655
server.sseUpgrade = true;
46404656
}
46414657

4642-
if (server.sseUpgrade)
4643-
{
4644-
server.doEncodeResponseBegin(traceId, authorization,
4645-
httpBeginExRW.wrap(codecBuffer, 0, codecBuffer.capacity())
4646-
.typeId(httpTypeId)
4647-
.headersItem(h -> h.name(HTTP_HEADER_STATUS).value(STATUS_200))
4648-
.headersItem(h -> h.name(HTTP_HEADER_CONTENT_TYPE).value(CONTENT_TYPE_EVENT_STREAM))
4649-
.inject(server::injectAltSvc)
4650-
.build());
4651-
}
4652-
else
4658+
if (server.sseUpgrade || session.requestTimeout == 0L)
46534659
{
4654-
server.doEncodeResponseBegin(traceId, authorization,
4655-
httpBeginExRW.wrap(codecBuffer, 0, codecBuffer.capacity())
4656-
.typeId(httpTypeId)
4657-
.headersItem(h -> h.name(HTTP_HEADER_STATUS).value(STATUS_200))
4658-
.headersItem(h -> h.name(HTTP_HEADER_CONTENT_TYPE).value(CONTENT_TYPE_JSON))
4659-
.inject(server::injectAltSvc)
4660-
.build());
4660+
server.doEncodeResponseBegin(traceId, authorization);
46614661
}
46624662

46634663
flushAppWindow(traceId, authorization, 0L, 0, 0, encodeMax);
@@ -4695,6 +4695,7 @@ else if (payload != null)
46954695
}
46964696
else
46974697
{
4698+
server.doEncodeResponseBegin(traceId, authorization);
46984699
server.doEncodeResponseData(traceId, authorization, payload.value());
46994700
}
47004701
}
@@ -4834,6 +4835,7 @@ private void onAppEnd(
48344835
}
48354836
else
48364837
{
4838+
server.doEncodeResponseBegin(traceId, authorization);
48374839
server.doEncodeResponseEnd(traceId, authorization);
48384840
}
48394841

@@ -4846,6 +4848,7 @@ private void onAppAbort(
48464848
final long traceId = abort.traceId();
48474849
final long authorization = abort.authorization();
48484850

4851+
server.doEncodeResponseBegin(traceId, authorization);
48494852
server.doNetAbort(traceId, authorization);
48504853

48514854
doAppAbort(traceId, authorization);

runtime/binding-mcp/src/test/java/io/aklivity/zilla/runtime/binding/mcp/internal/stream/McpServerIT.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,16 @@ public void shouldCallToolElicitAfterResult() throws Exception
334334
k3po.finish();
335335
}
336336

337+
@Test
338+
@Configuration("server.timeout.yaml")
339+
@Specification({
340+
"${net}/tools.call.elicit.deferred/client",
341+
"${app}/tools.call.elicit.deferred/server"})
342+
public void shouldCallToolElicitDeferred() throws Exception
343+
{
344+
k3po.finish();
345+
}
346+
337347
@Test
338348
@Configuration("server.timeout.yaml")
339349
@Specification({
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
property serverAddress "zilla://streams/app0"
17+
18+
accept ${serverAddress}
19+
option zilla:window 8192
20+
option zilla:transmission "half-duplex"
21+
22+
accepted
23+
24+
read zilla:begin.ext ${mcp:matchBeginEx()
25+
.typeId(zilla:id("mcp"))
26+
.lifecycle()
27+
.build()
28+
.build()}
29+
30+
connected
31+
32+
write zilla:begin.ext ${mcp:beginEx()
33+
.typeId(zilla:id("mcp"))
34+
.lifecycle()
35+
.sessionId("session-1")
36+
.build()
37+
.build()}
38+
write flush
39+
40+
accepted
41+
42+
read zilla:begin.ext ${mcp:matchBeginEx()
43+
.typeId(zilla:id("mcp"))
44+
.toolsCall()
45+
.sessionId("session-1")
46+
.name("get_weather")
47+
.contentLength(59)
48+
.build()
49+
.build()}
50+
51+
connected
52+
53+
read '{'
54+
'"name":"get_weather",'
55+
'"arguments":'
56+
'{'
57+
'"location": "New York"'
58+
'}'
59+
'}'
60+
61+
write flush
62+
63+
read advise zilla:challenge ${mcp:challengeEx()
64+
.typeId(zilla:id("mcp"))
65+
.elicitCreate()
66+
.id("1")
67+
.url("https://server.example.com/authorize?state=7f3a9b1c&redirect_uri=%s".formatted(http:encodeQuery("https://replace.me/callback")))
68+
.build()
69+
.build()}
70+
71+
read advised zilla:flush ${mcp:matchFlushEx()
72+
.typeId(zilla:id("mcp"))
73+
.elicitCallback()
74+
.url("http://localhost:8080/mcp/auth/callback?code=xyz&state=7f3a9b1c")
75+
.build()
76+
.build()}
77+
78+
write advise zilla:flush ${mcp:flushEx()
79+
.typeId(zilla:id("mcp"))
80+
.elicitComplete()
81+
.id("1")
82+
.status("COMPLETED")
83+
.build()
84+
.build()}
85+
86+
write '{'
87+
'"content":'
88+
'['
89+
'{'
90+
'"type": "text",'
91+
'"text": "Current weather in New York:\\nTemperature: 72°F\\nConditions: Partly cloudy"'
92+
'}'
93+
'],'
94+
'"isError": false'
95+
'}'
96+
write flush
97+
98+
write close
99+
100+
read closed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
#
2+
# Copyright 2021-2024 Aklivity Inc
3+
#
4+
# Licensed under the Aklivity Community License (the "License"); you may not use
5+
# this file except in compliance with the License. You may obtain a copy of the
6+
# License at
7+
#
8+
# https://www.aklivity.io/aklivity-community-license/
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
13+
# specific language governing permissions and limitations under the License.
14+
#
15+
16+
connect "zilla://streams/net0"
17+
option zilla:window 8192
18+
option zilla:transmission "half-duplex"
19+
20+
write zilla:begin.ext ${http:beginEx()
21+
.typeId(zilla:id("http"))
22+
.header(":method", "POST")
23+
.header(":scheme", "http")
24+
.header(":authority", "localhost:8080")
25+
.header(":path", "/mcp")
26+
.header("content-type", "application/json")
27+
.header("accept", "application/json, text/event-stream")
28+
.header("mcp-protocol-version", "2025-11-25")
29+
.build()}
30+
31+
connected
32+
33+
write '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{"elicitation":{"url":{}}},"clientInfo":{"name":"test","version":"1.0"}}}'
34+
write close
35+
36+
read zilla:begin.ext ${http:matchBeginEx()
37+
.typeId(zilla:id("http"))
38+
.header(":status", "200")
39+
.header("content-type", "application/json")
40+
.header("mcp-session-id", "transport-1")
41+
.build()}
42+
43+
read '{"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2025-11-25","capabilities":{"prompts":{"listChanged":true},"resources":{"listChanged":true},"tools":{"listChanged":true}},"serverInfo":{"name":"zilla","version":"1.0"}}}'
44+
read closed
45+
read notify LIFECYCLE_INITIALIZING
46+
47+
connect await LIFECYCLE_INITIALIZING
48+
"zilla://streams/net0"
49+
option zilla:window 8192
50+
option zilla:transmission "half-duplex"
51+
52+
write zilla:begin.ext ${http:beginEx()
53+
.typeId(zilla:id("http"))
54+
.header(":method", "POST")
55+
.header(":scheme", "http")
56+
.header(":authority", "localhost:8080")
57+
.header(":path", "/mcp")
58+
.header("content-type", "application/json")
59+
.header("accept", "application/json, text/event-stream")
60+
.header("mcp-protocol-version", "2025-11-25")
61+
.header("mcp-session-id", "transport-1")
62+
.build()}
63+
64+
connected
65+
66+
write '{"jsonrpc":"2.0","method":"notifications/initialized"}'
67+
write close
68+
69+
read closed
70+
read notify LIFECYCLE_INITIALIZED
71+
72+
connect await LIFECYCLE_INITIALIZED
73+
"zilla://streams/net0"
74+
option zilla:window 8192
75+
option zilla:transmission "half-duplex"
76+
77+
write zilla:begin.ext ${http:beginEx()
78+
.typeId(zilla:id("http"))
79+
.header(":method", "POST")
80+
.header(":scheme", "http")
81+
.header(":authority", "localhost:8080")
82+
.header(":path", "/mcp")
83+
.header("content-type", "application/json")
84+
.header("accept", "application/json, text/event-stream")
85+
.header("mcp-protocol-version", "2025-11-25")
86+
.header("mcp-session-id", "transport-1")
87+
.header("content-length", "115")
88+
.build()}
89+
90+
connected
91+
92+
write '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"get_weather","arguments":{"location": "New York"}}}'
93+
94+
write close
95+
96+
read zilla:begin.ext ${http:matchBeginEx()
97+
.typeId(zilla:id("http"))
98+
.header(":status", "200")
99+
.header("content-type", "text/event-stream")
100+
.build()}
101+
102+
read 'id: 2:1\n'
103+
read 'data: {'
104+
'"jsonrpc":"2.0",'
105+
'"method":"elicitation/create",'
106+
'"params":'
107+
'{'
108+
'"mode":"url",'
109+
'"elicitationId":"elicit-1",'
110+
'"url":"https://server.example.com/authorize?state=transport-1.elicit-1.7f3a9b1c&redirect_uri='
111+
${http:encodeQuery("https://localhost:8080/mcp/auth/callback")}
112+
'"'
113+
'}'
114+
'}\n'
115+
read '\n'
116+
read notify ELICIT_CREATED
117+
118+
read 'id: 2:1\n'
119+
read 'data: {'
120+
'"jsonrpc":"2.0",'
121+
'"method":"elicitation/complete",'
122+
'"params":'
123+
'{'
124+
'"elicitationId":"elicit-1",'
125+
'"status":"completed"'
126+
'}'
127+
'}\n'
128+
read '\n'
129+
130+
read 'data: {'
131+
'"jsonrpc":"2.0",'
132+
'"id":2,'
133+
'"result":'
134+
'{'
135+
'"content":'
136+
'['
137+
'{'
138+
'"type": "text",'
139+
'"text": "Current weather in New York:\\nTemperature: 72°F\\nConditions: Partly cloudy"'
140+
'}'
141+
'],'
142+
'"isError": false'
143+
'}'
144+
'}\n'
145+
read '\n'
146+
147+
read closed
148+
149+
connect await ELICIT_CREATED
150+
"zilla://streams/net0"
151+
option zilla:window 8192
152+
option zilla:transmission "half-duplex"
153+
154+
write zilla:begin.ext ${http:beginEx()
155+
.typeId(zilla:id("http"))
156+
.header(":method", "GET")
157+
.header(":scheme", "http")
158+
.header(":authority", "localhost:8080")
159+
.header(":path", "/mcp/auth/callback?code=xyz&state=transport-1.elicit-1.7f3a9b1c")
160+
.build()}
161+
162+
connected
163+
164+
write close
165+
166+
read zilla:begin.ext ${http:matchBeginEx()
167+
.typeId(zilla:id("http"))
168+
.header(":status", "200")
169+
.header("content-type", "text/plain")
170+
.build()}
171+
172+
read 'Authorization complete, you may close this tab.'
173+
read closed

0 commit comments

Comments
 (0)