Skip to content
247 changes: 247 additions & 0 deletions src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
// <copyright file="BatchingActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Trace.Export
{
/// <summary>
/// Implements processor that batches activities before calling exporter.
/// </summary>
public class BatchingActivityProcessor : ActivityProcessor, IDisposable
{
private const int DefaultMaxQueueSize = 2048;
private const int DefaultMaxExportBatchSize = 512;
private static readonly TimeSpan DefaultScheduleDelay = TimeSpan.FromMilliseconds(5000);
private readonly ConcurrentQueue<Activity> exportQueue;
private readonly int maxQueueSize;
private readonly int maxExportBatchSize;
private readonly TimeSpan scheduleDelay;
private readonly ActivityExporter exporter;
private readonly List<Activity> batch = new List<Activity>();
private CancellationTokenSource cts;
private volatile int currentQueueSize;
private bool stopping = false;

/// <summary>
/// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with default parameters:
/// <list type="bullet">
/// <item>
/// <description>maxQueueSize = 2048,</description>
/// </item>
/// <item>
/// <description>scheduleDelay = 5 sec,</description>
/// </item>
/// <item>
/// <description>maxExportBatchSize = 512</description>
/// </item>
/// </list>
/// </summary>
/// <param name="exporter">Exporter instance.</param>
public BatchingActivityProcessor(ActivityExporter exporter)
: this(exporter, DefaultMaxQueueSize, DefaultScheduleDelay, DefaultMaxExportBatchSize)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with custom settings.
/// </summary>
/// <param name="exporter">Exporter instance.</param>
/// <param name="maxQueueSize">Maximum queue size. After the size is reached activities are dropped by processor.</param>
/// <param name="scheduleDelay">The delay between two consecutive exports.</param>
/// <param name="maxExportBatchSize">The maximum batch size of every export. It must be smaller or equal to maxQueueSize.</param>
public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan scheduleDelay, int maxExportBatchSize)
{
if (maxQueueSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
}

if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize)
{
throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize));
}

this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
this.maxQueueSize = maxQueueSize;
this.scheduleDelay = scheduleDelay;
this.maxExportBatchSize = maxExportBatchSize;

this.cts = new CancellationTokenSource();
this.exportQueue = new ConcurrentQueue<Activity>();

// worker task that will last for lifetime of processor.
// Threads are also useless as exporter tasks run in thread pool threads.
Task.Run(() => this.Worker(this.cts.Token), this.cts.Token);
}

/// <inheritdoc/>
public override void OnStart(Activity activity)
{
}

/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
if (this.stopping)
{
return;
}

// because of race-condition between checking the size and enqueueing,
// we might end up with a bit more activities than maxQueueSize.
// Let's just tolerate it to avoid extra synchronization.
if (this.currentQueueSize >= this.maxQueueSize)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted();
return;
}

Interlocked.Increment(ref this.currentQueueSize);

this.exportQueue.Enqueue(activity);
}

/// <inheritdoc/>
public override async Task ShutdownAsync(CancellationToken cancellationToken)
{
if (!this.stopping)
{
this.stopping = true;

// This will stop the loop after current batch finishes.
this.cts.Cancel(false);
this.cts.Dispose();
this.cts = null;

// if there are more items, continue until cancellation token allows
while (this.currentQueueSize > 0 && !cancellationToken.IsCancellationRequested)
{
await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false);
}

await this.exporter.ShutdownAsync(cancellationToken);

// there is no point in waiting for a worker task if cancellation happens
// it's dead already or will die on the next iteration on its own

// ExportBatchAsync must never throw, we are here either because it was cancelled
// or because there are no items left
OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize);
}
}

public void Dispose()
{
this.Dispose(true);
}

protected virtual void Dispose(bool isDisposing)
{
if (!this.stopping)
{
this.ShutdownAsync(CancellationToken.None).ContinueWith(_ => { }).GetAwaiter().GetResult();
}

if (isDisposing)
{
if (this.exporter is IDisposable disposableExporter)
{
try
{
disposableExporter.Dispose();
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e);
}
}
}
}

private async Task ExportBatchAsync(CancellationToken cancellationToken)
{
try
{
if (cancellationToken.IsCancellationRequested)
{
return;
}

if (this.exportQueue.TryDequeue(out var nextActivity))
{
Interlocked.Decrement(ref this.currentQueueSize);
this.batch.Add(nextActivity);
}
else
{
// nothing in queue
return;
}

while (this.batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextActivity))
{
Interlocked.Decrement(ref this.currentQueueSize);
this.batch.Add(nextActivity);
}

var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false);
if (result != ExportResult.Success)
{
OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result);

// we do not support retries for now and leave it up to exporter
// as only exporter implementation knows how to retry: which items failed
// and what is the reasonable policy for that exporter.
}
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExportBatchAsync), ex);
}
finally
{
this.batch.Clear();
}
}

private async Task Worker(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var sw = Stopwatch.StartNew();
await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
{
return;
}

var remainingWait = this.scheduleDelay - sw.Elapsed;
if (remainingWait > TimeSpan.Zero)
{
await Task.Delay(remainingWait, cancellationToken).ConfigureAwait(false);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// <copyright file="TestActivityExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Trace.Export;
using System.Diagnostics;

namespace OpenTelemetry.Testing.Export
{
public class TestActivityExporter : ActivityExporter
{
private readonly ConcurrentQueue<Activity> activities = new ConcurrentQueue<Activity>();
private readonly Action<IEnumerable<Activity>> onExport;
public TestActivityExporter(Action<IEnumerable<Activity>> onExport)
{
this.onExport = onExport;
}

public Activity[] ExportedActivities => activities.ToArray();

public bool WasShutDown { get; private set; } = false;

public override Task<ExportResult> ExportAsync(IEnumerable<Activity> data, CancellationToken cancellationToken)
{
this.onExport?.Invoke(data);

foreach (var s in data)
{
this.activities.Enqueue(s);
}

return Task.FromResult(ExportResult.Success);
}

public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.WasShutDown = true;
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="TestExporter.cs" company="OpenTelemetry Authors">
// <copyright file="TestSpanExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -18,16 +18,15 @@
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Trace;
using OpenTelemetry.Trace.Export;

namespace OpenTelemetry.Testing.Export
{
public class TestExporter : SpanExporter
public class TestSpanExporter : SpanExporter
{
private readonly ConcurrentQueue<SpanData> spanDataList = new ConcurrentQueue<SpanData>();
private readonly Action<IEnumerable<SpanData>> onExport;
public TestExporter(Action<IEnumerable<SpanData>> onExport)
public TestSpanExporter(Action<IEnumerable<SpanData>> onExport)
{
this.onExport = onExport;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void PipelineBuilder_AddExporter()
{
var builder = new SpanProcessorPipelineBuilder();

var exporter = new TestExporter(null);
var exporter = new TestSpanExporter(null);
builder.SetExporter(exporter);

Assert.Same(exporter, builder.Exporter);
Expand All @@ -72,7 +72,7 @@ public void PipelineBuilder_AddExporterAndExportingProcessor()
{
var builder = new SpanProcessorPipelineBuilder();

var exporter = new TestExporter(null);
var exporter = new TestSpanExporter(null);
builder.SetExporter(exporter);

bool processorFactoryCalled = false;
Expand Down Expand Up @@ -203,7 +203,7 @@ public void PipelineBuilder_AddProcessorChainWithExporter()
Assert.NotNull(exporter);
return new SimpleSpanProcessor(exporter);
})
.SetExporter(new TestExporter(null));
.SetExporter(new TestSpanExporter(null));

var firstProcessor = (TestProcessor)builder.Build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void TracerBuilder_ValidArgs()
bool instrumentationFactoryCalled = true;

var sampler = new ProbabilitySampler(0.1);
var exporter = new TestExporter(_ => { });
var exporter = new TestSpanExporter(_ => { });
var options = new TracerConfiguration(1, 1, 1);

builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void CreateFactory_BuilderWithArgs()
{
var exporterCalledCount = 0;

var testExporter = new TestExporter(spans =>
var testExporter = new TestSpanExporter(spans =>
{
exporterCalledCount++;
Assert.Single(spans);
Expand Down Expand Up @@ -126,7 +126,7 @@ public void CreateFactory_BuilderWithMultiplePipelines()
{
var exporterCalledCount = 0;

var testExporter = new TestExporter(spans =>
var testExporter = new TestSpanExporter(spans =>
{
exporterCalledCount++;
Assert.Single(spans);
Expand Down
Loading