Skip to content

Commit cf79126

Browse files
committed
Move subscription management to a dedicated class and add subscription cleanup on session end
1 parent be4d5f0 commit cf79126

File tree

3 files changed

+119
-17
lines changed

3 files changed

+119
-17
lines changed

samples/EverythingServer/Program.cs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,24 @@
1515

1616
var builder = WebApplication.CreateBuilder(args);
1717

18-
// Subscriptions tracks resource URIs to McpServer instances
19-
// Use thread-safe data structures since handlers can run in parallel
20-
// even in the context of a single session.
21-
ConcurrentDictionary<string, ConcurrentBag<IMcpServer>> subscriptions = new();
22-
2318
builder.Services
2419
.AddMcpServer()
25-
.WithHttpTransport()
20+
.WithHttpTransport(options =>
21+
{
22+
// Add a RunSessionHandler to remove all subscriptions for the session when it ends
23+
options.RunSessionHandler = async (httpContext, mcpServer, token) =>
24+
{
25+
try
26+
{
27+
await mcpServer.RunAsync(token);
28+
}
29+
finally
30+
{
31+
// This code runs when the session ends
32+
SubscriptionManager.RemoveAllSubscriptions(mcpServer);
33+
}
34+
};
35+
})
2636
.WithTools<AddTool>()
2737
.WithTools<AnnotatedMessageTool>()
2838
.WithTools<EchoTool>()
@@ -35,10 +45,13 @@
3545
.WithResources<SimpleResourceType>()
3646
.WithSubscribeToResourcesHandler(async (ctx, ct) =>
3747
{
48+
if (ctx.Server.SessionId == null)
49+
{
50+
throw new McpException("Cannot add subscription for server with null SessionId");
51+
}
3852
if (ctx.Params?.Uri is { } uri)
3953
{
40-
var bag = subscriptions.GetOrAdd(uri, _ => new ConcurrentBag<IMcpServer>());
41-
bag.Add(ctx.Server);
54+
SubscriptionManager.AddSubscription(uri, ctx.Server);
4255

4356
await ctx.Server.SampleAsync([
4457
new ChatMessage(ChatRole.System, "You are a helpful test server"),
@@ -56,14 +69,13 @@ await ctx.Server.SampleAsync([
5669
})
5770
.WithUnsubscribeFromResourcesHandler(async (ctx, ct) =>
5871
{
72+
if (ctx.Server.SessionId == null)
73+
{
74+
throw new McpException("Cannot remove subscription for server with null SessionId");
75+
}
5976
if (ctx.Params?.Uri is { } uri)
6077
{
61-
if (subscriptions.TryGetValue(uri, out var bag))
62-
{
63-
// Remove ctx.Server from the subscription bag (ConcurrentBag does not support removal, so recreate)
64-
var newBag = new ConcurrentBag<IMcpServer>(bag.Where(s => s != ctx.Server));
65-
subscriptions[uri] = newBag;
66-
}
78+
SubscriptionManager.RemoveSubscription(uri, ctx.Server);
6779
}
6880
return new EmptyResult();
6981
})
@@ -143,7 +155,6 @@ await ctx.Server.SampleAsync([
143155
.WithLogging(b => b.SetResourceBuilder(resource))
144156
.UseOtlpExporter();
145157

146-
builder.Services.AddSingleton(subscriptions);
147158
builder.Services.AddHostedService<SubscriptionMessageSender>();
148159
builder.Services.AddHostedService<LoggingUpdateMessageSender>();
149160

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
using ModelContextProtocol;
2+
using ModelContextProtocol.Server;
3+
4+
// This class manages subscriptions to resources by McpServer instances.
5+
// The subscription information must be accessed in a thread-safe manner since handlers
6+
// can run in parallel even in the context of a single session.
7+
static class SubscriptionManager
8+
{
9+
// Subscriptions tracks resource URIs to bags of McpServer instances (thread-safe via locking)
10+
private static Dictionary<string, List<IMcpServer>> subscriptions = new();
11+
12+
// SessionSubscriptions is a secondary index to subscriptions to allow efficient removal of all
13+
// subscriptions for a given session when it ends. (thread-safe via locking)
14+
private static Dictionary<string /* sessionId */, List<string> /* uris */> sessionSubscriptions = new();
15+
16+
private static readonly object _subscriptionsLock = new();
17+
18+
public static void AddSubscription(string uri, IMcpServer server)
19+
{
20+
if (server.SessionId == null)
21+
{
22+
throw new McpException("Cannot add subscription for server with null SessionId");
23+
}
24+
lock (_subscriptionsLock)
25+
{
26+
subscriptions[uri] ??= new List<IMcpServer>();
27+
subscriptions[uri].Add(server);
28+
sessionSubscriptions[server.SessionId] ??= new List<string>();
29+
sessionSubscriptions[server.SessionId].Add(uri);
30+
}
31+
}
32+
33+
public static void RemoveSubscription(string uri, IMcpServer server)
34+
{
35+
if (server.SessionId == null)
36+
{
37+
throw new McpException("Cannot remove subscription for server with null SessionId");
38+
}
39+
lock (_subscriptionsLock)
40+
{
41+
if (subscriptions.ContainsKey(uri))
42+
{
43+
// Remove the server from the list of subscriptions for the URI
44+
subscriptions[uri] = subscriptions[uri].Where(s => s.SessionId != server.SessionId).ToList();
45+
if (subscriptions[uri]?.Count == 0)
46+
{
47+
subscriptions.Remove(uri);
48+
}
49+
}
50+
// Remove the URI from the list of subscriptions for the session
51+
sessionSubscriptions[server.SessionId]?.Remove(uri);
52+
if (sessionSubscriptions[server.SessionId]?.Count == 0)
53+
{
54+
sessionSubscriptions.Remove(server.SessionId);
55+
}
56+
}
57+
}
58+
59+
public static IDictionary<string, List<IMcpServer>> GetSubscriptions()
60+
{
61+
lock (_subscriptionsLock)
62+
{
63+
// Return a copy of the subscriptions dictionary to avoid external modification
64+
return subscriptions.ToDictionary(entry => entry.Key,
65+
entry => entry.Value.ToList());
66+
}
67+
}
68+
69+
public static void RemoveAllSubscriptions(IMcpServer server)
70+
{
71+
if (server.SessionId is { } sessionId)
72+
{
73+
lock (_subscriptionsLock)
74+
{
75+
// Remove all subscriptions for the session
76+
if (sessionSubscriptions.TryGetValue(sessionId, out var uris))
77+
{
78+
foreach (var uri in uris)
79+
{
80+
subscriptions[uri] = subscriptions[uri].Where(s => s.SessionId != sessionId).ToList();
81+
if (subscriptions[uri]?.Count == 0)
82+
{
83+
subscriptions.Remove(uri);
84+
}
85+
}
86+
sessionSubscriptions.Remove(sessionId);
87+
}
88+
}
89+
}
90+
}
91+
}

samples/EverythingServer/SubscriptionMessageSender.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using ModelContextProtocol.Server;
33

44
using System.Collections.Concurrent;
5-
internal class SubscriptionMessageSender(ConcurrentDictionary<string, ConcurrentBag<IMcpServer>> subscriptions) : BackgroundService
5+
internal class SubscriptionMessageSender() : BackgroundService
66
{
77
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
88
{
@@ -13,7 +13,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
1313
{
1414
try
1515
{
16-
foreach (var (uri, servers) in subscriptions)
16+
foreach (var (uri, servers) in SubscriptionManager.GetSubscriptions())
1717
{
1818
foreach (var server in servers)
1919
{

0 commit comments

Comments
 (0)