@@ -44,6 +44,8 @@ public class StreamableHttpClientTransport implements McpClientTransport {
44
44
45
45
private static final Logger LOGGER = LoggerFactory .getLogger (StreamableHttpClientTransport .class );
46
46
47
+ private static final String MCP_SESSION_ID = "Mcp-Session-Id" ;
48
+
47
49
private final HttpClientSseClientTransport sseClientTransport ;
48
50
49
51
private final HttpClient httpClient ;
@@ -58,6 +60,8 @@ public class StreamableHttpClientTransport implements McpClientTransport {
58
60
59
61
private final AtomicReference <String > lastEventId = new AtomicReference <>();
60
62
63
+ private final AtomicReference <String > mcpSessionId = new AtomicReference <>();
64
+
61
65
private final AtomicBoolean fallbackToSse = new AtomicBoolean (false );
62
66
63
67
StreamableHttpClientTransport (final HttpClient httpClient , final HttpRequest .Builder requestBuilder ,
@@ -173,15 +177,19 @@ public Mono<Void> connect(final Function<Mono<McpSchema.JSONRPCMessage>, Mono<Mc
173
177
}
174
178
175
179
return Mono .defer (() -> Mono .fromFuture (() -> {
176
- final HttpRequest .Builder builder = requestBuilder .copy ()
180
+ final HttpRequest .Builder request = requestBuilder .copy ()
177
181
.GET ()
178
182
.header ("Accept" , "text/event-stream" )
179
183
.uri (uri );
180
184
final String lastId = lastEventId .get ();
181
185
if (lastId != null ) {
182
- builder .header ("Last-Event-ID" , lastId );
186
+ request .header ("Last-Event-ID" , lastId );
187
+ }
188
+ if (mcpSessionId .get () != null ) {
189
+ request .header (MCP_SESSION_ID , mcpSessionId .get ());
183
190
}
184
- return httpClient .sendAsync (builder .build (), HttpResponse .BodyHandlers .ofInputStream ());
191
+
192
+ return httpClient .sendAsync (request .build (), HttpResponse .BodyHandlers .ofInputStream ());
185
193
}).flatMap (response -> {
186
194
if (response .statusCode () == 405 || response .statusCode () == 404 ) {
187
195
LOGGER .warn ("Operation not allowed, falling back to SSE" );
@@ -216,20 +224,34 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
216
224
}
217
225
218
226
return serializeJson (message ).flatMap (json -> {
219
- final HttpRequest request = requestBuilder .copy ()
227
+ final HttpRequest . Builder request = requestBuilder .copy ()
220
228
.POST (HttpRequest .BodyPublishers .ofString (json ))
221
229
.header ("Accept" , "application/json, text/event-stream" )
222
230
.header ("Content-Type" , "application/json" )
223
- .uri (uri )
224
- .build ();
225
- return Mono .fromFuture (httpClient .sendAsync (request , HttpResponse .BodyHandlers .ofInputStream ()))
231
+ .uri (uri );
232
+ if (mcpSessionId .get () != null ) {
233
+ request .header (MCP_SESSION_ID , mcpSessionId .get ());
234
+ }
235
+
236
+ return Mono .fromFuture (httpClient .sendAsync (request .build (), HttpResponse .BodyHandlers .ofInputStream ()))
226
237
.flatMap (response -> {
227
238
239
+ // server may assign a session ID at initialization time, if yes we
240
+ // have to use it for any subsequent requests
241
+ response .headers ().firstValue (MCP_SESSION_ID ).map (String ::trim ).ifPresent (this .mcpSessionId ::set );
242
+
228
243
// If the response is 202 Accepted, there's no body to process
229
244
if (response .statusCode () == 202 ) {
230
245
return Mono .empty ();
231
246
}
232
247
248
+ // must like server terminate session and the client need to start a
249
+ // new session by sending a new `InitializeRequest` without a session
250
+ // ID attached.
251
+ if (mcpSessionId .get () != null && response .statusCode () == 404 ) {
252
+ mcpSessionId .set (null );
253
+ }
254
+
233
255
if (response .statusCode () == 405 || response .statusCode () == 404 ) {
234
256
LOGGER .warn ("Operation not allowed, falling back to SSE" );
235
257
fallbackToSse .set (true );
0 commit comments