Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d24d165
feat: add Batch Processor for Logs
Flash0ver Jun 26, 2025
aad0599
test: Batch Processor for Logs
Flash0ver Jun 26, 2025
76fcc1b
docs: Batch Processor for Logs
Flash0ver Jun 26, 2025
2ad33f6
test: fix unavailable API on TargetFramework=net48
Flash0ver Jun 27, 2025
38e1c04
test: run all Logs tests on full framework
Flash0ver Jun 27, 2025
f7a43b8
ref: remove usage of System.Threading.Lock
Flash0ver Jun 27, 2025
e6b0b74
ref: rename members for clarity
Flash0ver Jun 30, 2025
a84b78f
Merge branch 'feat/logs' into feat/logs-buffering
Flash0ver Jun 30, 2025
53c90ea
ref: delete Timer-Abstraction and change to System.Threading.Timer
Flash0ver Jul 1, 2025
6580632
ref: delete .ctor only called from tests
Flash0ver Jul 1, 2025
6e2ee9b
ref: switch Buffer-Processor to be lock-free but discarding
Flash0ver Jul 2, 2025
0774709
test: fix BatchBuffer and Tests
Flash0ver Jul 3, 2025
d9ae794
fix: flushing buffer on Timeout
Flash0ver Jul 8, 2025
7e1f5ea
feat: add Backpressure-ClientReport
Flash0ver Jul 10, 2025
365a2fb
ref: make BatchProcessor more resilient
Flash0ver Jul 11, 2025
c478391
Format code
getsentry-bot Jul 11, 2025
211beea
Merge branch 'feat/logs' into feat/logs-buffering
Flash0ver Jul 11, 2025
c699c2d
test: fix on .NET Framework
Flash0ver Jul 11, 2025
b21b537
fix: BatchBuffer flushed on Shutdown/Dispose
Flash0ver Jul 14, 2025
e8850db
ref: minimize locking
Flash0ver Jul 22, 2025
57f9ccc
Merge branch 'feat/logs' into feat/logs-buffering
Flash0ver Jul 22, 2025
c63bc53
ref: rename BatchProcessor to StructuredLogBatchProcessor
Flash0ver Jul 22, 2025
f28cc6d
ref: rename BatchBuffer to StructuredLogBatchBuffer
Flash0ver Jul 22, 2025
79ce02e
ref: remove internal options
Flash0ver Jul 23, 2025
0702796
test: ref
Flash0ver Jul 23, 2025
4e5f097
perf: update Benchmark result
Flash0ver Jul 23, 2025
1276725
ref: make SentryStructuredLogger. Flush abstract
Flash0ver Jul 24, 2025
3816fab
ref: guard an invariant of the Flush-Scope
Flash0ver Jul 24, 2025
28b6654
ref: remove unused values
Flash0ver Jul 24, 2025
1eef330
docs: improve comments
Flash0ver Jul 24, 2025
d72ca5c
perf: update Benchmark after signature change
Flash0ver Jul 25, 2025
49fefc1
ref: discard logs gracefully when Hub is (being) disposed
Flash0ver Jul 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
ref: minimize locking
  • Loading branch information
Flash0ver committed Jul 22, 2025
commit e8850db578a5f3a4ba9afb940fa934e365e89020
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
```

BenchmarkDotNet v0.13.12, macOS 15.5 (24F74) [Darwin 24.5.0]
Apple M3 Pro, 1 CPU, 12 logical and 12 physical cores
.NET SDK 9.0.301
[Host] : .NET 8.0.14 (8.0.1425.11118), Arm64 RyuJIT AdvSIMD
DefaultJob : .NET 8.0.14 (8.0.1425.11118), Arm64 RyuJIT AdvSIMD


```
| Method | BatchCount | OperationsPerInvoke | Mean | Error | StdDev | Gen0 | Allocated |
|---------------- |----------- |-------------------- |----------:|----------:|----------:|-------:|----------:|
| **EnqueueAndFlush** | **10** | **100** | **2.209 μs** | **0.0221 μs** | **0.0196 μs** | **0.7095** | **5.8 KB** |
| **EnqueueAndFlush** | **10** | **200** | **3.829 μs** | **0.0287 μs** | **0.0268 μs** | **1.3199** | **10.8 KB** |
| **EnqueueAndFlush** | **10** | **1000** | **18.363 μs** | **0.2813 μs** | **0.2889 μs** | **6.1951** | **50.8 KB** |
| **EnqueueAndFlush** | **100** | **100** | **1.021 μs** | **0.0133 μs** | **0.0118 μs** | **0.2441** | **2 KB** |
| **EnqueueAndFlush** | **100** | **200** | **1.816 μs** | **0.0211 μs** | **0.0176 μs** | **0.3910** | **3.2 KB** |
| **EnqueueAndFlush** | **100** | **1000** | **8.762 μs** | **0.0761 μs** | **0.0675 μs** | **1.5564** | **12.83 KB** |
246 changes: 152 additions & 94 deletions src/Sentry/Internal/BatchBuffer.cs
Original file line number Diff line number Diff line change
@@ -1,159 +1,156 @@
using Sentry.Infrastructure;
using Sentry.Threading;

namespace Sentry.Internal;

/// <summary>
/// A slim wrapper over an <see cref="System.Array"/>, intended for buffering.
/// <para>Requires a minimum capacity of 2.</para>
/// A wrapper over an <see cref="System.Array"/>, intended for reusable buffering.
/// </summary>
/// <remarks>
/// Not all members are thread-safe.
/// See individual members for notes on thread safety.
/// Must be attempted to flush via <see cref="TryEnterFlushScope"/> when either the <see cref="Capacity"/> is reached,
/// or when the <see cref="_timeout"/> is exceeded.
/// </remarks>
[DebuggerDisplay("Name = {Name}, Capacity = {Capacity}, IsEmpty = {IsEmpty}, IsFull = {IsFull}, AddCount = {AddCount}")]
[DebuggerDisplay("Name = {Name}, Capacity = {Capacity}, Additions = {_additions}, AddCount = {AddCount}")]
internal sealed class BatchBuffer<T> : IDisposable
{
private readonly T[] _array;
private int _additions;
private readonly CounterEvent _addCounter;
private readonly NonReentrantLock _addLock;
private readonly ScopedCountdownLock _addLock;

private readonly ISystemClock _clock;
private readonly Timer _timer;
private readonly TimeSpan _timeout;

private readonly Action<BatchBuffer<T>, DateTimeOffset> _timeoutExceededAction;

private DateTimeOffset _lastFlush = DateTimeOffset.MinValue;

/// <summary>
/// Create a new buffer.
/// </summary>
/// <param name="capacity">Length of the new buffer.</param>
/// <param name="timeout">When the timeout exceeds after an item has been added and the <paramref name="capacity"/> not yet been exceeded, <paramref name="timeoutExceededAction"/> is invoked.</param>
/// <param name="clock">The <see cref="ISystemClock"/> with which to interpret <paramref name="timeout"/>.</param>
/// <param name="timeoutExceededAction">The operation to execute when the <paramref name="timeout"/> exceeds if the buffer is neither empty nor full.</param>
/// <param name="name">Name of the new buffer.</param>
/// <exception cref="ArgumentOutOfRangeException">When <paramref name="capacity"/> is less than <see langword="2"/>.</exception>
public BatchBuffer(int capacity, string? name = null)
public BatchBuffer(int capacity, TimeSpan timeout, ISystemClock clock, Action<BatchBuffer<T>, DateTimeOffset> timeoutExceededAction, string? name = null)
{
ThrowIfLessThanTwo(capacity, nameof(capacity));
Name = name ?? "default";
ThrowIfNegativeOrZero(timeout, nameof(timeout));

_array = new T[capacity];
_additions = 0;
_addCounter = new CounterEvent();
_addLock = new NonReentrantLock();
_addLock = new ScopedCountdownLock();

_clock = clock;
_timer = new Timer(OnIntervalElapsed, this, Timeout.Infinite, Timeout.Infinite);
_timeout = timeout;

_timeoutExceededAction = timeoutExceededAction;
Name = name ?? "default";
}

/// <summary>
/// Name of the buffer.
/// </summary>
/// <remarks>
/// This property is thread-safe.
/// </remarks>
internal string Name { get; }

/// <summary>
/// Maximum number of elements that can be added to the buffer.
/// </summary>
/// <remarks>
/// This property is thread-safe.
/// </remarks>
internal int Capacity => _array.Length;

/// <summary>
/// Have any elements been added to the buffer?
/// Gets a value indicating whether this buffer is empty.
/// </summary>
/// <remarks>
/// This property is not thread-safe.
/// </remarks>
internal bool IsEmpty => _additions == 0;

/// <summary>
/// Have <see cref="Capacity"/> number of elements been added to the buffer?
/// Number of <see cref="Add"/> operations in progress.
/// </summary>
/// <remarks>
/// This property is not thread-safe.
/// This property is used for debugging only.
/// </remarks>
internal bool IsFull => _additions >= _array.Length;
private int AddCount => _addLock.Count;

/// <summary>
/// Number of <see cref="TryAdd"/> operations in progress.
/// Attempt to atomically add one element to the buffer.
/// </summary>
/// <remarks>
/// This property is used for debugging only.
/// </remarks>
private int AddCount => _addCounter.Count;
/// <param name="item">Element attempted to be added atomically.</param>
/// <returns>An <see cref="BatchBufferAddStatus"/> describing the result of the operation.</returns>
internal BatchBufferAddStatus Add(T item)
{
using var scope = _addLock.TryEnterCounterScope();
if (!scope.IsEntered)
{
return BatchBufferAddStatus.IgnoredIsFlushing;
}

var count = Interlocked.Increment(ref _additions);

if (count == 1)
{
EnableTimer();
_array[count - 1] = item;
return BatchBufferAddStatus.AddedFirst;
}

if (count < _array.Length)
{
_array[count - 1] = item;
return BatchBufferAddStatus.Added;
}

if (count == _array.Length)
{
DisableTimer();
_array[count - 1] = item;
return BatchBufferAddStatus.AddedLast;
}

Debug.Assert(count > _array.Length);
return BatchBufferAddStatus.IgnoredCapacityExceeded;
}

/// <summary>
/// Enters a <see cref="FlushScope"/> used to ensure that only a single flush operation is in progress.
/// </summary>
/// <returns>A <see cref="FlushScope"/> that must be disposed to exit.</returns>
/// <remarks>
/// This method is thread-safe.
/// Must be disposed to exit.
/// </remarks>
internal FlushScope TryEnterFlushScope(out bool lockTaken)
internal FlushScope TryEnterFlushScope()
{
if (_addLock.TryEnter())
var scope = _addLock.TryEnterLockScope();
if (scope.IsEntered)
{
lockTaken = true;
return new FlushScope(this);
return new FlushScope(this, scope);
}

lockTaken = false;
return new FlushScope();
}

/// <summary>
/// Exits the <see cref="FlushScope"/> through <see cref="FlushScope.Dispose"/>.
/// </summary>
/// <remarks>
/// This method is thread-safe.
/// </remarks>
private void ExitFlushScope()
{
_addLock.Exit();
Debug.Assert(_addLock.IsEngaged);
}

/// <summary>
/// Blocks the current thread until all <see cref="TryAdd"/> operations have completed.
/// Forces invocation of a Timeout of the active buffer.
/// </summary>
/// <remarks>
/// This method is thread-safe.
/// </remarks>
internal void WaitAddCompleted()
internal void OnIntervalElapsed(object? state)
{
_addCounter.Wait();
}

/// <summary>
/// Attempt to atomically add one element to the buffer.
/// </summary>
/// <param name="item">Element attempted to be added atomically.</param>
/// <param name="count">When this method returns <see langword="true"/>, is set to the Length at which the <paramref name="item"/> was added at.</param>
/// <returns><see langword="true"/> when <paramref name="item"/> was added atomically; <see langword="false"/> when <paramref name="item"/> was not added.</returns>
/// <remarks>
/// This method is thread-safe.
/// </remarks>
internal bool TryAdd(T item, out int count)
{
if (_addLock.IsEntered)
{
count = 0;
return false;
}

using var scope = _addCounter.EnterScope();

count = Interlocked.Increment(ref _additions);

if (count <= _array.Length)
{
_array[count - 1] = item;
return true;
}

return false;
var now = _clock.GetUtcNow();
_timeoutExceededAction(this, now);
}

/// <summary>
/// Returns a new Array consisting of the elements successfully added.
/// </summary>
/// <returns>An Array with Length of successful additions.</returns>
/// <remarks>
/// This method is not thread-safe.
/// </remarks>
internal T[] ToArrayAndClear()
private T[] ToArrayAndClear()
{
var additions = _additions;
var length = _array.Length;
Expand All @@ -169,12 +166,10 @@ internal T[] ToArrayAndClear()
/// </summary>
/// <param name="length">The Length of the buffer a new Array is created from.</param>
/// <returns>An Array with Length of <paramref name="length"/>.</returns>
/// <remarks>
/// This method is not thread-safe.
/// </remarks>
internal T[] ToArrayAndClear(int length)
private T[] ToArrayAndClear(int length)
{
Debug.Assert(_addCounter.IsSet);
Debug.Assert(_addLock.IsSet);

var array = ToArray(length);
Clear(length);
return array;
Expand Down Expand Up @@ -203,35 +198,87 @@ private void Clear(int length)
Array.Clear(_array, 0, length);
}

private void EnableTimer()
{
var updated = _timer.Change(_timeout, Timeout.InfiniteTimeSpan);
Debug.Assert(updated, "Timer was not successfully enabled.");
}

private void DisableTimer()
{
var updated = _timer.Change(Timeout.Infinite, Timeout.Infinite);
Debug.Assert(updated, "Timer was not successfully disabled.");
}

/// <inheritdoc />
public void Dispose()
{
_addCounter.Dispose();
_timer.Dispose();
_addLock.Dispose();
}

private static void ThrowIfLessThanTwo(int capacity, string paramName)
private static void ThrowIfLessThanTwo(int value, string paramName)
{
if (capacity < 2)
if (value < 2)
{
ThrowLessThanTwo(capacity, paramName);
ThrowLessThanTwo(value, paramName);
}

static void ThrowLessThanTwo(int value, string paramName)
{
throw new ArgumentOutOfRangeException(paramName, value, "Argument must be at least two.");
}
}

private static void ThrowLessThanTwo(int capacity, string paramName)
private static void ThrowIfNegativeOrZero(TimeSpan value, string paramName)
{
throw new ArgumentOutOfRangeException(paramName, capacity, "Argument must be at least two.");
if (value <= TimeSpan.Zero && value != Timeout.InfiniteTimeSpan)
{
ThrowNegativeOrZero(value, paramName);
}

static void ThrowNegativeOrZero(TimeSpan value, string paramName)
{
throw new ArgumentOutOfRangeException(paramName, value, "Argument must be a non-negative and non-zero value.");
}
}

/// <summary>
/// A scope than ensures only a single <see cref="Flush"/> operation is in progress,
/// and blocks the calling thread until all <see cref="Add"/> operations have finished.
/// When <see cref="IsEntered"/> is <see langword="true"/>, no more <see cref="Add"/> can be started,
/// which will then return <see cref="BatchBufferAddStatus.IgnoredIsFlushing"/> immediately.
/// </summary>
/// <remarks>
/// Only <see cref="Flush"/> when scope <see cref="IsEntered"/>.
/// </remarks>
internal ref struct FlushScope : IDisposable
{
private BatchBuffer<T>? _lockObj;
private ScopedCountdownLock.LockScope _scope;

internal FlushScope(BatchBuffer<T> lockObj)
internal FlushScope(BatchBuffer<T> lockObj, ScopedCountdownLock.LockScope scope)
{
_lockObj = lockObj;
_scope = scope;
}

internal bool IsEntered => _lockObj is not null;
internal bool IsEntered => _scope.IsEntered;

internal T[] Flush()
{
var lockObj = _lockObj;
if (lockObj is not null)
{
lockObj._lastFlush = lockObj._clock.GetUtcNow();
_scope.Wait();

var array = lockObj.ToArrayAndClear();
return array;
}

throw new ObjectDisposedException(nameof(FlushScope));
}

public void Dispose()
{
Expand All @@ -241,6 +288,17 @@ public void Dispose()
_lockObj = null;
lockObj.ExitFlushScope();
}

_scope.Dispose();
}
}
}

internal enum BatchBufferAddStatus : byte
{
AddedFirst,
Added,
AddedLast,
IgnoredCapacityExceeded,
IgnoredIsFlushing,
}
Loading