Skip to content

[DB-1849] Support forwarding of multi-stream writes to the leader#5459

Merged
timothycoleman merged 4 commits intomasterfrom
shaan1337/fwd-multi-stream-writes
Jan 15, 2026
Merged

[DB-1849] Support forwarding of multi-stream writes to the leader#5459
timothycoleman merged 4 commits intomasterfrom
shaan1337/fwd-multi-stream-writes

Conversation

@shaan1337
Copy link
Member

@shaan1337 shaan1337 commented Jan 14, 2026

User description

Changed: Support forwarding of multi-stream writes to the leader

#5437 uses a multi-stream append to write to two different streams when creating a user index. Trying to create a user index on a follower node through the HTTP API would result in the following error as multi-stream writes are currently not forwarded to the leader:

{"code":2,"message":"Exception was thrown by handler. LeaderInfo: Exception of type 'KurrentDB.Core.Services.Transport.Enumerators.ReadResponseException+NotHandled+LeaderInfo' was thrown.","details":[]}%

This PR adds support for forwarding of multi-stream writes to the leader through the TCP API.

Note that the change is not forward compatible - thus trying to forward a multi-stream write from a new version of a follower to an old version of the leader will result in this (benign) error:

Bad request received from 'leader-normal' [...], Error: Could not unwrap network package for command 140.

Auto-created Ticket

#5460

PR Type

Enhancement


Description

  • Add TCP protocol support for multi-stream write forwarding to leader

  • Implement authorization checks for multi-stream write operations

  • Add new TcpCommand types for multi-stream write requests/responses

  • Remove restriction preventing multi-stream writes on follower nodes


Diagram Walkthrough

flowchart LR
  Client["Client Request"]
  Follower["Follower Node"]
  Auth["Authorization Gateway"]
  Dispatcher["ClientWriteTcpDispatcher"]
  Leader["Leader Node"]
  Response["Write Response"]
  
  Client -->|Multi-stream Write| Follower
  Follower -->|Authorize Each Stream| Auth
  Auth -->|Wrap as WriteEventsMultiStream| Dispatcher
  Dispatcher -->|Forward via TCP 0x8C| Leader
  Leader -->|Process & Reply 0x8D| Dispatcher
  Dispatcher -->|Unwrap Response| Response
Loading

File Walkthrough

Relevant files
Enhancement
TcpClientMessageDto.cs
Add multi-stream write message constructors                           

src/KurrentDB.Core/Messages/TcpClientMessageDto.cs

  • Add constructor for WriteEventsMultiStream DTO to initialize stream
    IDs, expected versions, events, stream indexes, and leader requirement
    flag
  • Add constructor for WriteEventsMultiStreamCompleted DTO to initialize
    result, event numbers, positions, and failure information
+24/-0   
AuthorizationGateway.cs
Implement multi-stream write authorization                             

src/KurrentDB.Core/Services/AuthorizationGateway.cs

  • Replace single-stream authorization with multi-stream support in
    Authorize(WriteEvents) method
  • Add AuthorizeMany method to sequentially check authorization for each
    stream in multi-stream writes
  • Add AuthorizeManyAsync method to handle asynchronous authorization
    checks across multiple streams
  • Import System.Collections.Generic namespace
+51/-4   
ClientWriteTcpDispatcher.cs
Add multi-stream write TCP serialization support                 

src/KurrentDB.Core/Services/Transport/Tcp/ClientWriteTcpDispatcher.cs

  • Add UnwrapWriteEventsMultiStream method to deserialize multi-stream
    write requests from TCP packages
  • Add UnwrapWriteEventsMultiStreamCompleted method to deserialize
    multi-stream write responses
  • Modify WrapWriteEvents to handle both single-stream and multi-stream
    writes with appropriate DTO types
  • Split WrapWriteEventsCompleted into separate methods for single-stream
    and multi-stream responses
  • Register new unwrappers for WriteEventsMultiStream and
    WriteEventsMultiStreamCompleted TCP commands
+100/-6 
TcpCommand.cs
Define new TCP command types                                                         

src/KurrentDB.Core/Services/Transport/Tcp/TcpCommand.cs

  • Add WriteEventsMultiStream = 0x8C command type for multi-stream write
    requests
  • Add WriteEventsMultiStreamCompleted = 0x8D command type for
    multi-stream write responses
+3/-0     
ClientMessageDtos.proto
Define multi-stream write protobuf messages                           

src/Protos/ClientAPI/ClientMessageDtos.proto

  • Add WriteEventsMultiStream protobuf message with stream IDs, expected
    versions, events, stream indexes, and leader flag
  • Add WriteEventsMultiStreamCompleted protobuf message with result,
    event numbers, positions, and failure tracking
+19/-0   
Bug fix
ClusterVNodeController.cs
Remove multi-stream write forwarding restriction                 

src/KurrentDB.Core/Services/VNode/ClusterVNodeController.cs

  • Remove multi-stream write restriction from HandleAsNonLeader method
  • Update condition to only check RequireLeader flag, allowing
    multi-stream writes to be forwarded
+1/-2     

@shaan1337 shaan1337 force-pushed the shaan1337/fwd-multi-stream-writes branch 3 times, most recently from ace3009 to 06c8b9f Compare January 14, 2026 13:58
@shaan1337 shaan1337 changed the title Forwarding of multi-stream writes to the leader Support forwarding of multi-stream writes to the leader Jan 15, 2026
@shaan1337 shaan1337 force-pushed the shaan1337/fwd-multi-stream-writes branch from 06c8b9f to e00cef0 Compare January 15, 2026 07:07
@shaan1337 shaan1337 added the breaking This PR contains a breaking change label Jan 15, 2026
@shaan1337 shaan1337 marked this pull request as ready for review January 15, 2026 07:17
@shaan1337 shaan1337 requested a review from a team as a code owner January 15, 2026 07:17
@qodo-code-review
Copy link
Contributor

qodo-code-review bot commented Jan 15, 2026

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🔴
Authorization bypass

Description: The new multi-stream authorization flow can publish the write request before all async
access checks complete because AuthorizeMany calls AuthorizeManyAsync(...) when
CheckAccessAsync(...) is not completed but does not return/break, allowing the loop to
continue and potentially reach destination.Publish(request) while authorization is still
pending (authorization bypass for multi-stream writes).
AuthorizationGateway.cs [396-434]

Referred Code
void AuthorizeMany<TRequest>(
	ClaimsPrincipal user,
	ReadOnlyMemory<Operation> operations,
	IEnvelope replyTo,
	IPublisher destination,
	TRequest request,
	Func<TRequest, Message> createAccessDenied) where TRequest : Message {
	while (!operations.IsEmpty) {
		var operation = operations.Span[0];
		operations = operations[1..];

		var accessCheck = authorizationProvider.CheckAccessAsync(user, operation, CancellationToken.None);
		if (!accessCheck.IsCompleted)
			AuthorizeManyAsync(accessCheck, user, operations, replyTo, destination, request, createAccessDenied);
		else {
			if (accessCheck.Result)
				continue;

			replyTo.ReplyWith(createAccessDenied(request));
			return;
		}


 ... (clipped 18 lines)
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
🟢
No codebase code duplication found New Components Detected:
- AuthorizeMany
- AuthorizeManyAsync
- UnwrapWriteEventsMultiStreamCompleted
Custom Compliance
🟢
Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

🔴
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Missing input validation: The new multi-stream TCP unwrap path does not validate that event_stream_ids,
expected_versions, and event_stream_indexes are consistent and safe to use, risking
runtime exceptions or malformed request handling from untrusted network input.

Referred Code
private ClientMessage.WriteEvents UnwrapWriteEventsMultiStream(TcpPackage package, IEnvelope envelope, ClaimsPrincipal user) {
	var dto = package.Data.Deserialize<WriteEventsMultiStream>();
	if (dto == null)
		return null;

	var events = new Event[dto.Events.Count];
	for (int i = 0; i < events.Length; ++i) {
		// ReSharper disable PossibleNullReferenceException
		var e = dto.Events[i];
		// ReSharper restore PossibleNullReferenceException
		events[i] = new Event(new Guid(e.EventId.ToByteArray()), e.EventType, e.DataContentType == 1,
			e.Data.ToByteArray(), false, e.Metadata.ToByteArray());
	}

	var cts = new CancellationTokenSource();
	var envelopeWrapper = new CallbackEnvelope(OnMessage);
	cts.CancelAfter(_writeTimeout);

	return new ClientMessage.WriteEvents(
		Guid.NewGuid(),
		package.CorrelationId,


 ... (clipped 9 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Authorization bypass risk: AuthorizeMany starts an async access check via AuthorizeManyAsync but does not return,
allowing the loop to continue and potentially publish the write request before
authorization completes.

Referred Code
void AuthorizeMany<TRequest>(
	ClaimsPrincipal user,
	ReadOnlyMemory<Operation> operations,
	IEnvelope replyTo,
	IPublisher destination,
	TRequest request,
	Func<TRequest, Message> createAccessDenied) where TRequest : Message {
	while (!operations.IsEmpty) {
		var operation = operations.Span[0];
		operations = operations[1..];

		var accessCheck = authorizationProvider.CheckAccessAsync(user, operation, CancellationToken.None);
		if (!accessCheck.IsCompleted)
			AuthorizeManyAsync(accessCheck, user, operations, replyTo, destination, request, createAccessDenied);
		else {
			if (accessCheck.Result)
				continue;

			replyTo.ReplyWith(createAccessDenied(request));
			return;
		}


 ... (clipped 4 lines)

Learn more about managing compliance generic rules or creating your own custom rules

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@shaan1337 shaan1337 changed the title Support forwarding of multi-stream writes to the leader [DB-1849] Support forwarding of multi-stream writes to the leader Jan 15, 2026
@linear
Copy link

linear bot commented Jan 15, 2026

@qodo-code-review
Copy link
Contributor

qodo-code-review bot commented Jan 15, 2026

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent crashes from unhandled exceptions
Suggestion Impact:The commit wrapped the body of AuthorizeManyAsync in a try-catch, added Serilog logging of exceptions, and adjusted the authorization flow to early-return on access denial. This addresses the core concern of unhandled exceptions in an async void method.

code diff:

+using Serilog;
 using static KurrentDB.Core.Messages.ClientMessage;
 
 namespace KurrentDB.Core.Services;
@@ -429,10 +430,27 @@
 		IPublisher destination,
 		TRequest request,
 		Func<TRequest, Message> createAccessDenied) where TRequest : Message {
-		if (await accessCheck)
-			AuthorizeMany(user, operations, replyTo, destination, request, createAccessDenied);
-		else
-			replyTo.ReplyWith(createAccessDenied(request));
+		try {
+			if (!await accessCheck) {
+				replyTo.ReplyWith(createAccessDenied(request));
+				return;
+			}
+
+			while (!operations.IsEmpty) {
+				var operation = operations.Span[0];
+				operations = operations[1..];
+
+				if (await authorizationProvider.CheckAccessAsync(user, operation, CancellationToken.None))
+					continue;
+
+				replyTo.ReplyWith(createAccessDenied(request));
+				return;
+			}
+
+			destination.Publish(request);
+		} catch (Exception ex) {
+			Log.Error(ex, "Unhandled exception during authorization");
+		}

Add a try-catch block inside the async void method AuthorizeManyAsync to prevent
unhandled exceptions from await accessCheck from crashing the application.

src/KurrentDB.Core/Services/AuthorizationGateway.cs [422-434]

 async void AuthorizeManyAsync<TRequest>(
 	ValueTask<bool> accessCheck,
 	ClaimsPrincipal user,
 	ReadOnlyMemory<Operation> operations,
 	IEnvelope replyTo,
 	IPublisher destination,
 	TRequest request,
 	Func<TRequest, Message> createAccessDenied) where TRequest : Message {
-	if (await accessCheck)
-		AuthorizeMany(user, operations, replyTo, destination, request, createAccessDenied);
-	else
+	try {
+		if (await accessCheck)
+			AuthorizeMany(user, operations, replyTo, destination, request, createAccessDenied);
+		else
+			replyTo.ReplyWith(createAccessDenied(request));
+	} catch (Exception ex) {
+		// It's important to log the exception. Assuming a logger is available.
+		// Log.Error(ex, "Error during asynchronous authorization for multi-stream write.");
 		replyTo.ReplyWith(createAccessDenied(request));
+	}
 }

[Suggestion processed]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies that an unhandled exception in an async void method can crash the application and proposes a try-catch block to handle potential exceptions from await accessCheck gracefully.

Medium
Fix span pattern matching

Replace the invalid span pattern match msg.FailureStreamIndexes.Span is [ 0 ]
with an explicit length and index check (msg.FailureStreamIndexes.Length == 1 &&
msg.FailureStreamIndexes.Span[0] == 0).

src/KurrentDB.Core/Services/Transport/Tcp/ClientWriteTcpDispatcher.cs [226-227]

-if (msg.FailureStreamIndexes.Span.IsEmpty || msg.FailureStreamIndexes.Span is [ 0 ])
+if (msg.FailureStreamIndexes.Length == 0
+    || (msg.FailureStreamIndexes.Length == 1 && msg.FailureStreamIndexes.Span[0] == 0))
     return WrapWriteEventsCompletedForSingleStream(msg);
  • Apply / Chat
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that the pattern matching is [ 0 ] on a ReadOnlySpan<T> is invalid C# syntax and provides a correct, equivalent check, improving code correctness and preventing a compilation error.

Medium
  • Update

@shaan1337 shaan1337 force-pushed the shaan1337/fwd-multi-stream-writes branch from e00cef0 to c1ac8c3 Compare January 15, 2026 09:40
@shaan1337 shaan1337 force-pushed the shaan1337/fwd-multi-stream-writes branch from c48b66c to 198a088 Compare January 15, 2026 11:46
@shaan1337 shaan1337 removed the breaking This PR contains a breaking change label Jan 15, 2026
@timothycoleman timothycoleman merged commit 5b27d97 into master Jan 15, 2026
89 checks passed
@timothycoleman timothycoleman deleted the shaan1337/fwd-multi-stream-writes branch January 15, 2026 16:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants