Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f311ff2
feat: cross-process test log correlation via OTLP receiver (#4818)
thomhurst Apr 13, 2026
a32e2ad
fix: address code review findings on OTLP correlation PR
thomhurst Apr 13, 2026
13ee150
refactor: simplify after code review
thomhurst Apr 13, 2026
e3109a9
fix: gate CreateHttpClient override on EnableTelemetryCollection
thomhurst Apr 13, 2026
c3f804a
fix: address third-round review findings
thomhurst Apr 13, 2026
1f9ee36
docs: add telemetry correlation docs, enable by default
thomhurst Apr 13, 2026
8959f79
docs: note field ordering assumption in ParseResourceLogs
thomhurst Apr 13, 2026
b373da4
test: add thorough tests for OTLP log-to-test correlation
thomhurst Apr 13, 2026
ab164b9
fix: update public API snapshots for new InternalsVisibleTo entry
thomhurst Apr 14, 2026
4a9231d
feat: add OTLP integration tests and auto-inject telemetry env vars
thomhurst Apr 14, 2026
3aed4c2
refactor: simplify integration tests per code review
thomhurst Apr 14, 2026
dc20e0c
docs: address review feedback on shared handler and TraceId scope
thomhurst Apr 14, 2026
28d44bb
fix: generate unique TraceId per HTTP request for reliable correlation
thomhurst Apr 14, 2026
c40e230
feat: give each test its own TraceId via root activity with class link
thomhurst Apr 14, 2026
4359189
test: add engine activity tests, fix test body parent chain
thomhurst Apr 14, 2026
0587668
fix: restore parent-child activity hierarchy, move correlation to han…
thomhurst Apr 14, 2026
3e6575c
refactor: replace raw tag key strings with TUnitActivitySource constants
thomhurst Apr 14, 2026
3221c82
fix: replace fixed Task.Delay with polling in OtlpReceiverTests
thomhurst Apr 14, 2026
3ddbf74
fix: address review round 4 findings
thomhurst Apr 14, 2026
477a319
fix: remove mutually exclusive RunOnLinuxOnly/RunOnWindowsOnly attrs
thomhurst Apr 14, 2026
870a6c5
fix: add missing HangDump package to Aspire and ASP.NET test projects
thomhurst Apr 14, 2026
e911de2
fix: skip Aspire, ASP.NET, and RPC test modules on macOS
thomhurst Apr 14, 2026
3dd2b20
fix: enable Docker setup on macOS CI runners
thomhurst Apr 14, 2026
0874d0e
fix: skip Docker-dependent test modules on macOS
thomhurst Apr 14, 2026
87cfffa
fix: skip RPC tests globally until fixed (#5540)
thomhurst Apr 14, 2026
98fc40f
fix: restrict Docker-dependent CI modules to Linux, add missing ApiSe…
thomhurst Apr 14, 2026
08b62fb
fix: fix PublicAPI snapshot ordering and flaky severity test
thomhurst Apr 14, 2026
8c5e115
fix: move mock tests AOT publish into pipeline module
thomhurst Apr 14, 2026
068cdb0
Refactor code structure for improved readability and maintainability
thomhurst Apr 14, 2026
8034a5b
fix: share IntegrationTestFixture in WaitForHealthy tests
thomhurst Apr 14, 2026
f55d181
fix: restart Docker after setup to initialize iptables chains
thomhurst Apr 14, 2026
97178be
fix: remove docker/setup-docker-action
thomhurst Apr 14, 2026
4c3b872
fix: correct ASP.NET execution order assertions
thomhurst Apr 14, 2026
9145da5
fix: use CreateTablesAsync to avoid EnsureCreatedAsync race in parall…
thomhurst Apr 14, 2026
cad9f86
fix: use absolute path for AOT mock test publish output
thomhurst Apr 14, 2026
db8c9f9
merge: resolve conflict with main (NuGet.Protocol 7.3.1)
thomhurst Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: simplify after code review
- ProtobufReader: extract shared ReadLengthDelimited() helper, eliminating
  duplicated bounds-check logic across ReadBytes/ReadString/ReadEmbeddedMessage
- ProtobufReader: add ReadBytesAsSpan() to avoid byte[] allocation for trace ID
- OtlpReceiver: replace unbounded ConcurrentBag<Task> with ConcurrentDictionary
  that self-cleans via ContinueWith — prevents memory leak under sustained load
- OtlpReceiver: track forwarding tasks so DisposeAsync awaits them too
- OtlpReceiver: extract MaxPortBindingAttempts constant
- AspireFixture: cache HttpMessageHandler across CreateHttpClient calls to avoid
  allocating new SocketsHttpHandler per client
  • Loading branch information
thomhurst committed Apr 13, 2026
commit 13ee150a6c46d00269c95b384cefcbf7dd4886ee
8 changes: 6 additions & 2 deletions TUnit.Aspire/AspireFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class AspireFixture<TAppHost> : IAsyncInitializer, IAsyncDisposable
{
private DistributedApplication? _app;
private Telemetry.OtlpReceiver? _otlpReceiver;
private HttpMessageHandler? _httpHandler;

/// <summary>
/// The running Aspire distributed application.
Expand All @@ -46,7 +47,7 @@ public class AspireFixture<TAppHost> : IAsyncInitializer, IAsyncDisposable
/// <returns>An <see cref="HttpClient"/> configured to connect to the resource with baggage propagation.</returns>
public HttpClient CreateHttpClient(string resourceName, string? endpointName = null)
{
var handler = new Http.TUnitBaggagePropagationHandler
_httpHandler ??= new Http.TUnitBaggagePropagationHandler
{
InnerHandler = new SocketsHttpHandler
{
Expand All @@ -55,7 +56,7 @@ public HttpClient CreateHttpClient(string resourceName, string? endpointName = n
},
};

return new HttpClient(handler)
return new HttpClient(_httpHandler, disposeHandler: false)
{
BaseAddress = App.GetEndpoint(resourceName, endpointName),
};
Expand Down Expand Up @@ -346,6 +347,9 @@ public virtual async ValueTask DisposeAsync()
_otlpReceiver = null;
}

_httpHandler?.Dispose();
_httpHandler = null;

GC.SuppressFinalize(this);
}

Expand Down
46 changes: 18 additions & 28 deletions TUnit.Aspire/Telemetry/OtlpLogParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ private static void ParseScopeLogs(ProtobufReader reader, string resourceName, L
break;

case 9 when wireType == WireType.LengthDelimited:
var bytes = reader.ReadBytes();
if (bytes.Length == 16)
var traceBytes = reader.ReadBytesAsSpan();
if (traceBytes.Length == 16)
{
traceId = Convert.ToHexString(bytes);
traceId = Convert.ToHexString(traceBytes);
}

break;
Expand Down Expand Up @@ -285,49 +285,39 @@ public ulong ReadVarint()
return result;
}

public byte[] ReadBytes()
public ReadOnlySpan<byte> ReadBytesAsSpan()
{
var length = (int)ReadVarint();

if ((uint)length > (uint)_data.Length)
{
throw new InvalidOperationException(
$"Protobuf length-delimited field declares {length} bytes but only {_data.Length} remain.");
}
return ReadLengthDelimited();
}

var result = _data[..length].ToArray();
_data = _data[length..];
return result;
public byte[] ReadBytes()
{
return ReadLengthDelimited().ToArray();
}

public string ReadString()
{
var length = (int)ReadVarint();

if ((uint)length > (uint)_data.Length)
{
throw new InvalidOperationException(
$"Protobuf string field declares {length} bytes but only {_data.Length} remain.");
}

var result = Encoding.UTF8.GetString(_data[..length]);
_data = _data[length..];
return result;
return Encoding.UTF8.GetString(ReadLengthDelimited());
}

public ProtobufReader ReadEmbeddedMessage()
{
return new ProtobufReader(ReadLengthDelimited());
}

private ReadOnlySpan<byte> ReadLengthDelimited()
{
var length = (int)ReadVarint();

if ((uint)length > (uint)_data.Length)
{
throw new InvalidOperationException(
$"Protobuf embedded message declares {length} bytes but only {_data.Length} remain.");
$"Protobuf length-delimited field declares {length} bytes but only {_data.Length} remain.");
}

var embedded = new ProtobufReader(_data[..length]);
var result = _data[..length];
_data = _data[length..];
return embedded;
return result;
}

public void Skip(WireType wireType)
Expand Down
27 changes: 18 additions & 9 deletions TUnit.Aspire/Telemetry/OtlpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ namespace TUnit.Aspire.Telemetry;
/// </remarks>
internal sealed class OtlpReceiver : IAsyncDisposable
{
private const int MaxPortBindingAttempts = 10;

private readonly HttpListener _listener;
private readonly CancellationTokenSource _cts = new();
private readonly ConcurrentBag<Task> _inflightRequests = [];
private readonly ConcurrentDictionary<int, Task> _inflightTasks = new();
private readonly HttpClient? _forwardingClient;
private readonly string? _upstreamEndpoint;
private Task? _listenTask;
private int _taskIdCounter;

/// <summary>
/// The port the receiver is listening on.
Expand Down Expand Up @@ -81,11 +84,17 @@ private async Task ListenLoop()
}

// Process each request without blocking the listen loop
var task = Task.Run(() => ProcessRequestAsync(context));
_inflightRequests.Add(task);
TrackTask(Task.Run(() => ProcessRequestAsync(context)));
}
}

private void TrackTask(Task task)
{
var id = Interlocked.Increment(ref _taskIdCounter);
_inflightTasks[id] = task;
task.ContinueWith(_ => _inflightTasks.TryRemove(id, out Task? _), TaskContinuationOptions.ExecuteSynchronously);
}

private async Task ProcessRequestAsync(HttpListenerContext context)
{
if (_cts.IsCancellationRequested)
Expand Down Expand Up @@ -122,10 +131,10 @@ private async Task ProcessRequestAsync(HttpListenerContext context)
ProcessLogs(body);
}

// Forward to upstream if configured (fire-and-forget)
// Forward to upstream if configured
if (_upstreamEndpoint is not null && _forwardingClient is not null)
{
_ = ForwardAsync(path, body, request.ContentType);
TrackTask(ForwardAsync(path, body, request.ContentType));
}

// Return 200 OK with empty protobuf response
Expand Down Expand Up @@ -238,11 +247,11 @@ public async ValueTask DisposeAsync()
}
}

// Wait for any in-flight request processing to complete so we don't
// Wait for any in-flight request/forwarding tasks to complete so we don't
// access TraceRegistry or TestContext after they've been torn down.
try
{
await Task.WhenAll(_inflightRequests).ConfigureAwait(false);
await Task.WhenAll(_inflightTasks.Values).ConfigureAwait(false);
}
catch
{
Expand All @@ -259,7 +268,7 @@ public async ValueTask DisposeAsync()
/// </summary>
private static (HttpListener Listener, int Port) CreateListener()
{
for (var attempt = 0; attempt < 10; attempt++)
for (var attempt = 0; attempt < MaxPortBindingAttempts; attempt++)
{
var port = FindFreePort();
var listener = new HttpListener();
Expand All @@ -276,7 +285,7 @@ private static (HttpListener Listener, int Port) CreateListener()
}
}

throw new InvalidOperationException("Could not bind OTLP listener after 10 attempts.");
throw new InvalidOperationException($"Could not bind OTLP listener after {MaxPortBindingAttempts} attempts.");
}

private static int FindFreePort()
Expand Down
Loading