@@ -16,6 +16,7 @@ namespace ModelContextProtocol.Protocol.Transport;
16
16
/// </summary>
17
17
internal sealed class SseClientSessionTransport : TransportBase
18
18
{
19
+ private readonly string _endpointName ;
19
20
private readonly HttpClient _httpClient ;
20
21
private readonly SseClientTransportOptions _options ;
21
22
private readonly Uri _sseEndpoint ;
@@ -25,16 +26,15 @@ internal sealed class SseClientSessionTransport : TransportBase
25
26
private readonly ILogger _logger ;
26
27
private readonly TaskCompletionSource < bool > _connectionEstablished ;
27
28
28
- private string EndpointName => $ "Client (SSE) for ({ _options . Id } : { _options . Name } )";
29
-
30
29
/// <summary>
31
30
/// SSE transport for client endpoints. Unlike stdio it does not launch a process, but connects to an existing server.
32
31
/// The HTTP server can be local or remote, and must support the SSE protocol.
33
32
/// </summary>
34
33
/// <param name="transportOptions">Configuration options for the transport.</param>
35
34
/// <param name="httpClient">The HTTP client instance used for requests.</param>
36
35
/// <param name="loggerFactory">Logger factory for creating loggers.</param>
37
- public SseClientSessionTransport ( SseClientTransportOptions transportOptions , HttpClient httpClient , ILoggerFactory ? loggerFactory )
36
+ /// <param name="endpointName">The endpoint name used for logging purposes.</param>
37
+ public SseClientSessionTransport ( SseClientTransportOptions transportOptions , HttpClient httpClient , ILoggerFactory ? loggerFactory , string endpointName )
38
38
: base ( loggerFactory )
39
39
{
40
40
Throw . IfNull ( transportOptions ) ;
@@ -46,6 +46,7 @@ public SseClientSessionTransport(SseClientTransportOptions transportOptions, Htt
46
46
_connectionCts = new CancellationTokenSource ( ) ;
47
47
_logger = ( ILogger ? ) loggerFactory ? . CreateLogger < SseClientTransport > ( ) ?? NullLogger . Instance ;
48
48
_connectionEstablished = new TaskCompletionSource < bool > ( ) ;
49
+ _endpointName = endpointName ;
49
50
}
50
51
51
52
/// <inheritdoc/>
@@ -55,14 +56,14 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default)
55
56
{
56
57
if ( IsConnected )
57
58
{
58
- _logger . TransportAlreadyConnected ( EndpointName ) ;
59
+ _logger . TransportAlreadyConnected ( _endpointName ) ;
59
60
throw new McpTransportException ( "Transport is already connected" ) ;
60
61
}
61
62
62
63
// Start message receiving loop
63
64
_receiveTask = ReceiveMessagesAsync ( _connectionCts . Token ) ;
64
65
65
- _logger . TransportReadingMessages ( EndpointName ) ;
66
+ _logger . TransportReadingMessages ( _endpointName ) ;
66
67
67
68
await _connectionEstablished . Task . WaitAsync ( _options . ConnectionTimeout , cancellationToken ) . ConfigureAwait ( false ) ;
68
69
}
@@ -73,7 +74,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default)
73
74
}
74
75
catch ( Exception ex )
75
76
{
76
- _logger . TransportConnectFailed ( EndpointName , ex ) ;
77
+ _logger . TransportConnectFailed ( _endpointName , ex ) ;
77
78
await CloseAsync ( ) . ConfigureAwait ( false ) ;
78
79
throw new McpTransportException ( "Failed to connect transport" , ex ) ;
79
80
}
@@ -116,29 +117,29 @@ public override async Task SendMessageAsync(
116
117
// If the response is not a JSON-RPC response, it is an SSE message
117
118
if ( responseContent . Equals ( "accepted" , StringComparison . OrdinalIgnoreCase ) )
118
119
{
119
- _logger . SSETransportPostAccepted ( EndpointName , messageId ) ;
120
+ _logger . SSETransportPostAccepted ( _endpointName , messageId ) ;
120
121
// The response will arrive as an SSE message
121
122
}
122
123
else
123
124
{
124
125
JsonRpcResponse initializeResponse = JsonSerializer . Deserialize ( responseContent , McpJsonUtilities . JsonContext . Default . JsonRpcResponse ) ??
125
126
throw new McpTransportException ( "Failed to initialize client" ) ;
126
127
127
- _logger . TransportReceivedMessageParsed ( EndpointName , messageId ) ;
128
+ _logger . TransportReceivedMessageParsed ( _endpointName , messageId ) ;
128
129
await WriteMessageAsync ( initializeResponse , cancellationToken ) . ConfigureAwait ( false ) ;
129
- _logger . TransportMessageWritten ( EndpointName , messageId ) ;
130
+ _logger . TransportMessageWritten ( _endpointName , messageId ) ;
130
131
}
131
132
return ;
132
133
}
133
134
134
135
// Otherwise, check if the response was accepted (the response will come as an SSE message)
135
136
if ( responseContent . Equals ( "accepted" , StringComparison . OrdinalIgnoreCase ) )
136
137
{
137
- _logger . SSETransportPostAccepted ( EndpointName , messageId ) ;
138
+ _logger . SSETransportPostAccepted ( _endpointName , messageId ) ;
138
139
}
139
140
else
140
141
{
141
- _logger . SSETransportPostNotAccepted ( EndpointName , messageId , responseContent ) ;
142
+ _logger . SSETransportPostNotAccepted ( _endpointName , messageId , responseContent ) ;
142
143
throw new McpTransportException ( "Failed to send message" ) ;
143
144
}
144
145
}
@@ -216,17 +217,17 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
216
217
}
217
218
catch ( OperationCanceledException ) when ( cancellationToken . IsCancellationRequested )
218
219
{
219
- _logger . TransportReadMessagesCancelled ( EndpointName ) ;
220
+ _logger . TransportReadMessagesCancelled ( _endpointName ) ;
220
221
// Normal shutdown
221
222
}
222
223
catch ( IOException ) when ( cancellationToken . IsCancellationRequested )
223
224
{
224
- _logger . TransportReadMessagesCancelled ( EndpointName ) ;
225
+ _logger . TransportReadMessagesCancelled ( _endpointName ) ;
225
226
// Normal shutdown
226
227
}
227
228
catch ( Exception ex ) when ( ! cancellationToken . IsCancellationRequested )
228
229
{
229
- _logger . TransportConnectionError ( EndpointName , ex ) ;
230
+ _logger . TransportConnectionError ( _endpointName , ex ) ;
230
231
231
232
reconnectAttempts ++ ;
232
233
if ( reconnectAttempts >= _options . MaxReconnectAttempts )
@@ -245,7 +246,7 @@ private async Task ProcessSseMessage(string data, CancellationToken cancellation
245
246
{
246
247
if ( ! IsConnected )
247
248
{
248
- _logger . TransportMessageReceivedBeforeConnected ( EndpointName , data ) ;
249
+ _logger . TransportMessageReceivedBeforeConnected ( _endpointName , data ) ;
249
250
return ;
250
251
}
251
252
@@ -254,7 +255,7 @@ private async Task ProcessSseMessage(string data, CancellationToken cancellation
254
255
var message = JsonSerializer . Deserialize ( data , McpJsonUtilities . JsonContext . Default . IJsonRpcMessage ) ;
255
256
if ( message == null )
256
257
{
257
- _logger . TransportMessageParseUnexpectedType ( EndpointName , data ) ;
258
+ _logger . TransportMessageParseUnexpectedType ( _endpointName , data ) ;
258
259
return ;
259
260
}
260
261
@@ -264,13 +265,13 @@ private async Task ProcessSseMessage(string data, CancellationToken cancellation
264
265
messageId = messageWithId . Id . ToString ( ) ;
265
266
}
266
267
267
- _logger . TransportReceivedMessageParsed ( EndpointName , messageId ) ;
268
+ _logger . TransportReceivedMessageParsed ( _endpointName , messageId ) ;
268
269
await WriteMessageAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
269
- _logger . TransportMessageWritten ( EndpointName , messageId ) ;
270
+ _logger . TransportMessageWritten ( _endpointName , messageId ) ;
270
271
}
271
272
catch ( JsonException ex )
272
273
{
273
- _logger . TransportMessageParseFailed ( EndpointName , data , ex ) ;
274
+ _logger . TransportMessageParseFailed ( _endpointName , data , ex ) ;
274
275
}
275
276
}
276
277
@@ -280,7 +281,7 @@ private void HandleEndpointEvent(string data)
280
281
{
281
282
if ( string . IsNullOrEmpty ( data ) )
282
283
{
283
- _logger . TransportEndpointEventInvalid ( EndpointName , data ) ;
284
+ _logger . TransportEndpointEventInvalid ( _endpointName , data ) ;
284
285
return ;
285
286
}
286
287
@@ -308,7 +309,7 @@ private void HandleEndpointEvent(string data)
308
309
}
309
310
catch ( JsonException ex )
310
311
{
311
- _logger . TransportEndpointEventParseFailed ( EndpointName , data , ex ) ;
312
+ _logger . TransportEndpointEventParseFailed ( _endpointName , data , ex ) ;
312
313
throw new McpTransportException ( "Failed to parse endpoint event" , ex ) ;
313
314
}
314
315
}
0 commit comments