diff --git a/src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs b/src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs
new file mode 100644
index 00000000000..7faba54b150
--- /dev/null
+++ b/src/OpenTelemetry/Trace/Export/BatchingActivityProcessor.cs
@@ -0,0 +1,257 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using OpenTelemetry.Internal;
+
+namespace OpenTelemetry.Trace.Export
+{
+ ///
+ /// Implements processor that batches activities before calling exporter.
+ ///
+ public class BatchingActivityProcessor : ActivityProcessor, IDisposable
+ {
+ private const int DefaultMaxQueueSize = 2048;
+ private const int DefaultMaxExportBatchSize = 512;
+ private static readonly TimeSpan DefaultScheduledDelay = TimeSpan.FromMilliseconds(5000);
+ private static readonly TimeSpan DefaultExporterTimeout = TimeSpan.FromMilliseconds(30000);
+ private readonly ConcurrentQueue exportQueue;
+ private readonly int maxQueueSize;
+ private readonly int maxExportBatchSize;
+ private readonly TimeSpan scheduledDelay;
+ private readonly TimeSpan exporterTimeout;
+ private readonly ActivityExporter exporter;
+ private readonly List batch = new List();
+ private CancellationTokenSource cts;
+ private volatile int currentQueueSize;
+ private bool stopping = false;
+
+ ///
+ /// Initializes a new instance of the class with default parameters:
+ ///
+ /// -
+ /// maxQueueSize = 2048,
+ ///
+ /// -
+ /// scheduledDelay = 5 sec,
+ ///
+ /// -
+ /// exporterTimeout = 30 sec,
+ ///
+ /// -
+ /// maxExportBatchSize = 512
+ ///
+ ///
+ ///
+ /// Exporter instance.
+ public BatchingActivityProcessor(ActivityExporter exporter)
+ : this(exporter, DefaultMaxQueueSize, DefaultScheduledDelay, DefaultExporterTimeout, DefaultMaxExportBatchSize)
+ {
+ }
+
+ ///
+ /// Initializes a new instance of the class with custom settings.
+ ///
+ /// Exporter instance.
+ /// Maximum queue size. After the size is reached activities are dropped by processor.
+ /// The delay between two consecutive exports.
+ /// Maximum allowed time to export data.
+ /// The maximum batch size of every export. It must be smaller or equal to maxQueueSize.
+ public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan scheduledDelay, TimeSpan exporterTimeout, int maxExportBatchSize)
+ {
+ if (maxQueueSize <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
+ }
+
+ if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize)
+ {
+ throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize));
+ }
+
+ this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
+ this.maxQueueSize = maxQueueSize;
+ this.scheduledDelay = scheduledDelay;
+ this.exporterTimeout = exporterTimeout;
+ this.maxExportBatchSize = maxExportBatchSize;
+
+ this.cts = new CancellationTokenSource();
+ this.exportQueue = new ConcurrentQueue();
+
+ // worker task that will last for lifetime of processor.
+ // Threads are also useless as exporter tasks run in thread pool threads.
+ Task.Run(() => this.Worker(this.cts.Token), this.cts.Token);
+ }
+
+ ///
+ public override void OnStart(Activity activity)
+ {
+ }
+
+ ///
+ public override void OnEnd(Activity activity)
+ {
+ if (this.stopping)
+ {
+ return;
+ }
+
+ // because of race-condition between checking the size and enqueueing,
+ // we might end up with a bit more activities than maxQueueSize.
+ // Let's just tolerate it to avoid extra synchronization.
+ if (this.currentQueueSize >= this.maxQueueSize)
+ {
+ OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted();
+ return;
+ }
+
+ Interlocked.Increment(ref this.currentQueueSize);
+
+ this.exportQueue.Enqueue(activity);
+ }
+
+ ///
+ public override async Task ShutdownAsync(CancellationToken cancellationToken)
+ {
+ if (!this.stopping)
+ {
+ this.stopping = true;
+
+ // This will stop the loop after current batch finishes.
+ this.cts.Cancel(false);
+ this.cts.Dispose();
+ this.cts = null;
+
+ // if there are more items, continue until cancellation token allows
+ while (this.currentQueueSize > 0 && !cancellationToken.IsCancellationRequested)
+ {
+ await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false);
+ }
+
+ await this.exporter.ShutdownAsync(cancellationToken);
+
+ // there is no point in waiting for a worker task if cancellation happens
+ // it's dead already or will die on the next iteration on its own
+
+ // ExportBatchAsync must never throw, we are here either because it was cancelled
+ // or because there are no items left
+ OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize);
+ }
+ }
+
+ public void Dispose()
+ {
+ this.Dispose(true);
+ }
+
+ protected virtual void Dispose(bool isDisposing)
+ {
+ if (!this.stopping)
+ {
+ this.ShutdownAsync(CancellationToken.None).ContinueWith(_ => { }).GetAwaiter().GetResult();
+ }
+
+ if (isDisposing)
+ {
+ if (this.exporter is IDisposable disposableExporter)
+ {
+ try
+ {
+ disposableExporter.Dispose();
+ }
+ catch (Exception e)
+ {
+ OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e);
+ }
+ }
+ }
+ }
+
+ private async Task ExportBatchAsync(CancellationToken cancellationToken)
+ {
+ try
+ {
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return;
+ }
+
+ if (this.exportQueue.TryDequeue(out var nextActivity))
+ {
+ Interlocked.Decrement(ref this.currentQueueSize);
+ this.batch.Add(nextActivity);
+ }
+ else
+ {
+ // nothing in queue
+ return;
+ }
+
+ while (this.batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextActivity))
+ {
+ Interlocked.Decrement(ref this.currentQueueSize);
+ this.batch.Add(nextActivity);
+ }
+
+ var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false);
+ if (result != ExportResult.Success)
+ {
+ OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result);
+
+ // we do not support retries for now and leave it up to exporter
+ // as only exporter implementation knows how to retry: which items failed
+ // and what is the reasonable policy for that exporter.
+ }
+ }
+ catch (Exception ex)
+ {
+ OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExportBatchAsync), ex);
+ }
+ finally
+ {
+ this.batch.Clear();
+ }
+ }
+
+ private async Task Worker(CancellationToken cancellationToken)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var sw = Stopwatch.StartNew();
+ using (var exportCancellationTokenSource = new CancellationTokenSource(this.exporterTimeout))
+ {
+ await this.ExportBatchAsync(exportCancellationTokenSource.Token).ConfigureAwait(false);
+ }
+
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return;
+ }
+
+ var remainingWait = this.scheduledDelay - sw.Elapsed;
+ if (remainingWait > TimeSpan.Zero)
+ {
+ await Task.Delay(remainingWait, cancellationToken).ConfigureAwait(false);
+ }
+ }
+ }
+ }
+}
diff --git a/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs
index 54feea0d9f6..c2dbf99a331 100644
--- a/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestActivityExporter.cs
@@ -25,7 +25,7 @@ namespace OpenTelemetry.Testing.Export
{
public class TestActivityExporter : ActivityExporter
{
- private readonly ConcurrentQueue spanDataList = new ConcurrentQueue();
+ private readonly ConcurrentQueue activities = new ConcurrentQueue();
private readonly Action> onExport;
public TestActivityExporter(Action> onExport)
@@ -33,17 +33,24 @@ public TestActivityExporter(Action> onExport)
this.onExport = onExport;
}
- public Activity[] ExportedSpans => this.spanDataList.ToArray();
+ public Activity[] ExportedActivities => this.activities.ToArray();
public bool WasShutDown { get; private set; } = false;
public override Task ExportAsync(IEnumerable data, CancellationToken cancellationToken)
{
+ // Added a sleep for zero milliseconds to respect cancellation time set by export timeout.
+ Thread.Sleep(0);
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return default;
+ }
+
this.onExport?.Invoke(data);
foreach (var s in data)
{
- this.spanDataList.Enqueue(s);
+ this.activities.Enqueue(s);
}
return Task.FromResult(ExportResult.Success);
diff --git a/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestExporter.cs b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestSpanExporter.cs
similarity index 90%
rename from test/OpenTelemetry.Tests/Implementation/Testing/Export/TestExporter.cs
rename to test/OpenTelemetry.Tests/Implementation/Testing/Export/TestSpanExporter.cs
index cd253d1f553..b004b15b554 100644
--- a/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestExporter.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Testing/Export/TestSpanExporter.cs
@@ -1,4 +1,4 @@
-//
+//
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,12 +22,12 @@
namespace OpenTelemetry.Testing.Export
{
- public class TestExporter : SpanExporter
+ public class TestSpanExporter : SpanExporter
{
private readonly ConcurrentQueue spanDataList = new ConcurrentQueue();
private readonly Action> onExport;
- public TestExporter(Action> onExport)
+ public TestSpanExporter(Action> onExport)
{
this.onExport = onExport;
}
diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Config/SpanProcessorPipelineTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Config/SpanProcessorPipelineTests.cs
index 2f5efdc02f6..6a890eb0b59 100644
--- a/test/OpenTelemetry.Tests/Implementation/Trace/Config/SpanProcessorPipelineTests.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Trace/Config/SpanProcessorPipelineTests.cs
@@ -55,7 +55,7 @@ public void PipelineBuilder_AddExporter()
{
var builder = new SpanProcessorPipelineBuilder();
- var exporter = new TestExporter(null);
+ var exporter = new TestSpanExporter(null);
builder.SetExporter(exporter);
Assert.Same(exporter, builder.Exporter);
@@ -72,7 +72,7 @@ public void PipelineBuilder_AddExporterAndExportingProcessor()
{
var builder = new SpanProcessorPipelineBuilder();
- var exporter = new TestExporter(null);
+ var exporter = new TestSpanExporter(null);
builder.SetExporter(exporter);
bool processorFactoryCalled = false;
@@ -203,7 +203,7 @@ public void PipelineBuilder_AddProcessorChainWithExporter()
Assert.NotNull(exporter);
return new SimpleSpanProcessor(exporter);
})
- .SetExporter(new TestExporter(null));
+ .SetExporter(new TestSpanExporter(null));
var firstProcessor = (TestProcessor)builder.Build();
diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerBuilderTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerBuilderTests.cs
index 6f2078df24d..72f369b2e77 100644
--- a/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerBuilderTests.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerBuilderTests.cs
@@ -56,7 +56,7 @@ public void TracerBuilder_ValidArgs()
bool instrumentationFactoryCalled = true;
var sampler = new ProbabilitySampler(0.1);
- var exporter = new TestExporter(_ => { });
+ var exporter = new TestSpanExporter(_ => { });
var options = new TracerConfiguration(1, 1, 1);
builder
diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerFactoryTest.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerFactoryTest.cs
index fe7e9989689..8524a84f3f8 100644
--- a/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerFactoryTest.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Trace/Config/TracerFactoryTest.cs
@@ -57,7 +57,7 @@ public void CreateFactory_BuilderWithArgs()
{
var exporterCalledCount = 0;
- var testExporter = new TestExporter(spans =>
+ var testExporter = new TestSpanExporter(spans =>
{
exporterCalledCount++;
Assert.Single(spans);
@@ -124,7 +124,7 @@ public void CreateFactory_BuilderWithMultiplePipelines()
{
var exporterCalledCount = 0;
- var testExporter = new TestExporter(spans =>
+ var testExporter = new TestSpanExporter(spans =>
{
exporterCalledCount++;
Assert.Single(spans);
diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingActivityProcessorTests.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingActivityProcessorTests.cs
new file mode 100644
index 00000000000..e2c5f307daa
--- /dev/null
+++ b/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingActivityProcessorTests.cs
@@ -0,0 +1,403 @@
+//
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using OpenTelemetry.Testing.Export;
+using OpenTelemetry.Trace.Configuration;
+using OpenTelemetry.Trace.Samplers;
+using Xunit;
+
+namespace OpenTelemetry.Trace.Export.Test
+{
+ public class BatchingActivityProcessorTests : IDisposable
+ {
+ private const string ActivityName1 = "MyActivityName/1";
+ private const string ActivityName2 = "MyActivityName/2";
+ private const string ActivitySourceName = "my.source";
+
+ private static readonly TimeSpan DefaultDelay = TimeSpan.FromMilliseconds(30);
+ private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(1);
+ private static readonly ActivitySource Source = new ActivitySource(ActivitySourceName);
+
+ [Fact]
+ public void ThrowsOnInvalidArguments()
+ {
+ Assert.Throws(() => new BatchingActivityProcessor(null));
+ Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 0, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 0));
+ Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 2048, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 0));
+ Assert.Throws(() => new BatchingActivityProcessor(new TestActivityExporter(null), 512, TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(5), 513));
+ }
+
+ [Fact]
+ public async Task ShutdownTwice()
+ {
+ using var activityProcessor = new BatchingActivityProcessor(new TestActivityExporter(null));
+ await activityProcessor.ShutdownAsync(CancellationToken.None);
+
+ // does not throw
+ await activityProcessor.ShutdownAsync(CancellationToken.None);
+ }
+
+ [Fact]
+ public async Task ShutdownWithHugeScheduledDelay()
+ {
+ using var activityProcessor =
+ new BatchingActivityProcessor(new TestActivityExporter(null), 128, TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(100), 32);
+ var sw = Stopwatch.StartNew();
+ using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
+ {
+ cts.Token.ThrowIfCancellationRequested();
+ await activityProcessor.ShutdownAsync(cts.Token).ConfigureAwait(false);
+ }
+
+ sw.Stop();
+ Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100));
+ }
+
+ [Fact]
+ public void CancelWithExporterTimeoutMilliSeconds()
+ {
+ var activityExporter = new TestActivityExporter(null);
+ using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(0), 1);
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+
+ var activity1 = this.CreateActivity(ActivityName1);
+
+ var exported = this.WaitForActivities(activityExporter, 0, DefaultTimeout);
+ }
+
+ [Fact]
+ public void ExportDifferentSampledActivities()
+ {
+ var activityExporter = new TestActivityExporter(null);
+ using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 128);
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+
+ var activity1 = this.CreateActivity(ActivityName1);
+ var activity2 = this.CreateActivity(ActivityName2);
+
+ var exported = this.WaitForActivities(activityExporter, 2, DefaultTimeout);
+
+ Assert.Equal(2, exported.Length);
+ Assert.Contains(activity1, exported);
+ Assert.Contains(activity2, exported);
+ }
+
+ [Fact]
+ public void ExporterIsSlowerThanDelay()
+ {
+ var exportStartTimes = new List();
+ var exportEndTimes = new List();
+ var activityExporter = new TestActivityExporter(_ =>
+ {
+ exportStartTimes.Add(Stopwatch.GetTimestamp());
+ Thread.Sleep(50);
+ exportEndTimes.Add(Stopwatch.GetTimestamp());
+ });
+
+ using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(30), DefaultTimeout, 2);
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+
+ var activities = new List();
+ for (int i = 0; i < 20; i++)
+ {
+ activities.Add(this.CreateActivity(i.ToString()));
+ }
+
+ var exported = this.WaitForActivities(activityExporter, 20, TimeSpan.FromSeconds(2));
+
+ Assert.Equal(activities.Count, exported.Length);
+ Assert.InRange(exportStartTimes.Count, 10, 20);
+
+ for (int i = 1; i < exportStartTimes.Count - 1; i++)
+ {
+ Assert.InRange(exportStartTimes[i], exportEndTimes[i - 1] + 1, exportStartTimes[i + 1] - 1);
+ }
+ }
+
+ [Fact]
+ public void AddActivityAfterQueueIsExhausted()
+ {
+ int exportCalledCount = 0;
+ var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
+ using var activityProcessor = new BatchingActivityProcessor(activityExporter, 1, TimeSpan.FromMilliseconds(100), DefaultTimeout, 1);
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+
+ var activities = new List();
+ for (int i = 0; i < 20; i++)
+ {
+ activities.Add(this.CreateActivity(i.ToString()));
+ }
+
+ var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout);
+
+ Assert.Equal(1, exportCalledCount);
+ Assert.InRange(exported.Length, 1, 2);
+ Assert.Contains(activities.First(), exported);
+ }
+
+ [Fact]
+ public void ExportMoreActivitiesThanTheMaxBatchSize()
+ {
+ var exporterCalled = new ManualResetEvent(false);
+ int exportCalledCount = 0;
+ var activityExporter = new TestActivityExporter(_ =>
+ {
+ exporterCalled.Set();
+ Interlocked.Increment(ref exportCalledCount);
+ });
+
+ using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 3);
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+
+ var activity1 = this.CreateActivity(ActivityName1);
+ var activity2 = this.CreateActivity(ActivityName1);
+ var activity3 = this.CreateActivity(ActivityName1);
+ var activity4 = this.CreateActivity(ActivityName1);
+ var activity5 = this.CreateActivity(ActivityName1);
+ var activity6 = this.CreateActivity(ActivityName1);
+
+ // wait for exporter to be called to stabilize tests on the build server
+ exporterCalled.WaitOne(TimeSpan.FromSeconds(10));
+
+ var exported = this.WaitForActivities(activityExporter, 6, DefaultTimeout);
+
+ Assert.InRange(exportCalledCount, 2, 6);
+
+ Assert.Equal(6, exported.Count());
+ Assert.Contains(activity1, exported);
+ Assert.Contains(activity2, exported);
+ Assert.Contains(activity3, exported);
+ Assert.Contains(activity4, exported);
+ Assert.Contains(activity5, exported);
+ Assert.Contains(activity6, exported);
+ }
+
+ [Fact(Skip = "Reenable once AlwaysParentActivitySampler is added")]
+ public void ExportNotSampledActivities()
+ {
+ int exportCalledCount = 0;
+ var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
+ using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 1);
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .SetSampler(new AlwaysOffActivitySampler())
+ .AddActivitySource(ActivitySourceName)
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+
+ var activity1 = this.CreateSampledEndedActivity(ActivityName1);
+ var activity2 = this.CreateNotSampledEndedActivity(ActivityName2);
+
+ // Activities are recorded and exported in the same order as they are created, we test that a non
+ // sampled activity is not exported by creating and ending a sampled activity after a non sampled activity
+ // and checking that the first exported activity is the sampled activity (the non sampled did not get
+ // exported).
+ var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout);
+ Assert.Equal(1, exportCalledCount);
+
+ // Need to check this because otherwise the variable activity1 is unused, other option is to not
+ // have a activity1 variable.
+ Assert.Single(exported);
+ Assert.Contains(activity1, exported);
+ }
+
+ [Fact]
+ public void ProcessorDoesNotBlockOnExporter()
+ {
+ var resetEvent = new ManualResetEvent(false);
+ var activityExporter = new TestActivityExporter(_ => resetEvent.WaitOne(TimeSpan.FromSeconds(10)));
+ using var activityProcessor = new BatchingActivityProcessor(activityExporter, 128, DefaultDelay, DefaultTimeout, 128);
+
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+
+ var activity = Source.StartActivity("foo");
+
+ // does not block
+ var sw = Stopwatch.StartNew();
+ activity?.Stop();
+ sw.Stop();
+
+ Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100));
+
+ resetEvent.Set();
+
+ var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout);
+
+ Assert.Single(exported);
+ }
+
+ [Fact]
+ public async Task ShutdownOnNotEmptyQueueFullFlush()
+ {
+ const int batchSize = 2;
+ int exportCalledCount = 0;
+ var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
+ using var activityProcessor =
+ new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(100), DefaultTimeout, batchSize);
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+
+ var activities = new List();
+ for (int i = 0; i < 100; i++)
+ {
+ activities.Add(this.CreateActivity(i.ToString()));
+ }
+
+ Assert.True(activityExporter.ExportedActivities.Length < activities.Count);
+ using (var cts = new CancellationTokenSource(DefaultTimeout))
+ {
+ await activityProcessor.ShutdownAsync(cts.Token);
+ }
+
+ Assert.True(activityExporter.WasShutDown);
+ Assert.Equal(activities.Count, activityExporter.ExportedActivities.Length);
+ Assert.InRange(exportCalledCount, activities.Count / batchSize, activities.Count);
+ }
+
+ [Fact]
+ public async Task ShutdownOnNotEmptyQueueNotFullFlush()
+ {
+ const int batchSize = 2;
+ int exportCalledCount = 0;
+
+ // we'll need about 1.5 sec to export all activities
+ // we export 100 activities in batches of 2, each export takes 30ms, in one thread
+ var activityExporter = new TestActivityExporter(_ =>
+ {
+ Interlocked.Increment(ref exportCalledCount);
+ Thread.Sleep(30);
+ });
+
+ using var activityProcessor =
+ new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(100), DefaultTimeout, batchSize);
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => activityProcessor)));
+ var activities = new List();
+ for (int i = 0; i < 100; i++)
+ {
+ activities.Add(this.CreateActivity(i.ToString()));
+ }
+
+ Assert.True(activityExporter.ExportedActivities.Length < activities.Count);
+
+ // we won't bs able to export all before cancellation will fire
+ using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)))
+ {
+ await activityProcessor.ShutdownAsync(cts.Token);
+ }
+
+ var exportedCount = activityExporter.ExportedActivities.Length;
+ Assert.True(exportedCount < activities.Count);
+ }
+
+ [Fact]
+ public void DisposeFlushes()
+ {
+ const int batchSize = 1;
+ int exportCalledCount = 0;
+ var activityExporter = new TestActivityExporter(_ => Interlocked.Increment(ref exportCalledCount));
+ var activities = new List();
+ using (var batchingActivityProcessor = new BatchingActivityProcessor(activityExporter, 128, TimeSpan.FromMilliseconds(100), DefaultTimeout, batchSize))
+ {
+ var openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(b => b
+ .AddActivitySource(ActivitySourceName)
+ .SetSampler(new AlwaysOnActivitySampler())
+ .AddProcessorPipeline(pp => pp.AddProcessor(ap => batchingActivityProcessor)));
+ for (int i = 0; i < 3; i++)
+ {
+ activities.Add(this.CreateActivity(i.ToString()));
+ }
+
+ Assert.True(activityExporter.ExportedActivities.Length < activities.Count);
+ }
+
+ Assert.True(activityExporter.WasShutDown);
+ Assert.Equal(activities.Count, activityExporter.ExportedActivities.Length);
+ Assert.Equal(activities.Count / batchSize, exportCalledCount);
+ }
+
+ public void Dispose()
+ {
+ Activity.Current = null;
+ }
+
+ private Activity CreateActivity(string activityName)
+ {
+ var activity = Source.StartActivity(activityName);
+ activity?.Stop();
+ return activity;
+ }
+
+ private Activity CreateSampledEndedActivity(string activityName)
+ {
+ var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded);
+
+ var activity = Source.StartActivity(activityName, ActivityKind.Internal, context);
+ activity.Stop();
+ return activity;
+ }
+
+ private Activity CreateNotSampledEndedActivity(string activityName)
+ {
+ var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.None);
+
+ var activity = Source.StartActivity(activityName, ActivityKind.Server, context);
+ activity?.Stop();
+ return activity;
+ }
+
+ private Activity[] WaitForActivities(TestActivityExporter exporter, int activityCount, TimeSpan timeout)
+ {
+ var sw = Stopwatch.StartNew();
+ while (exporter.ExportedActivities.Length < activityCount && sw.Elapsed <= timeout)
+ {
+ Thread.Sleep(10);
+ }
+
+ Assert.True(
+ exporter.ExportedActivities.Length >= activityCount,
+ $"Expected at least {activityCount}, got {exporter.ExportedActivities.Length}");
+
+ return exporter.ExportedActivities;
+ }
+ }
+}
diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingSpanProcessorTest.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingSpanProcessorTest.cs
index 81123bc485a..ddfa90d0bca 100644
--- a/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingSpanProcessorTest.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Trace/Export/BatchingSpanProcessorTest.cs
@@ -39,15 +39,15 @@ public class BatchingSpanProcessorTest : IDisposable
public void ThrowsOnInvalidArguments()
{
Assert.Throws(() => new BatchingSpanProcessor(null));
- Assert.Throws(() => new BatchingSpanProcessor(new TestExporter(null), 0, TimeSpan.FromSeconds(5), 0));
- Assert.Throws(() => new BatchingSpanProcessor(new TestExporter(null), 2048, TimeSpan.FromSeconds(5), 0));
- Assert.Throws(() => new BatchingSpanProcessor(new TestExporter(null), 512, TimeSpan.FromSeconds(5), 513));
+ Assert.Throws(() => new BatchingSpanProcessor(new TestSpanExporter(null), 0, TimeSpan.FromSeconds(5), 0));
+ Assert.Throws(() => new BatchingSpanProcessor(new TestSpanExporter(null), 2048, TimeSpan.FromSeconds(5), 0));
+ Assert.Throws(() => new BatchingSpanProcessor(new TestSpanExporter(null), 512, TimeSpan.FromSeconds(5), 513));
}
[Fact]
public async Task ShutdownTwice()
{
- using var spanProcessor = new BatchingSpanProcessor(new TestExporter(null));
+ using var spanProcessor = new BatchingSpanProcessor(new TestSpanExporter(null));
await spanProcessor.ShutdownAsync(CancellationToken.None);
// does not throw
@@ -58,7 +58,7 @@ public async Task ShutdownTwice()
public async Task ShutdownWithHugeScheduleDelay()
{
using var spanProcessor =
- new BatchingSpanProcessor(new TestExporter(null), 128, TimeSpan.FromMinutes(1), 32);
+ new BatchingSpanProcessor(new TestSpanExporter(null), 128, TimeSpan.FromMinutes(1), 32);
var sw = Stopwatch.StartNew();
using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
{
@@ -73,7 +73,7 @@ public async Task ShutdownWithHugeScheduleDelay()
[Fact]
public void ExportDifferentSampledSpans()
{
- var spanExporter = new TestExporter(null);
+ var spanExporter = new TestSpanExporter(null);
using var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 128);
var span1 = this.CreateSampledEndedSpan(SpanName1, spanProcessor);
var span2 = this.CreateSampledEndedSpan(SpanName2, spanProcessor);
@@ -90,7 +90,7 @@ public void ExporterIsSlowerThanDelay()
{
var exportStartTimes = new List();
var exportEndTimes = new List();
- var spanExporter = new TestExporter(_ =>
+ var spanExporter = new TestSpanExporter(_ =>
{
exportStartTimes.Add(Stopwatch.GetTimestamp());
Thread.Sleep(50);
@@ -119,7 +119,7 @@ public void ExporterIsSlowerThanDelay()
public void AddSpanAfterQueueIsExhausted()
{
int exportCalledCount = 0;
- var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount));
+ var spanExporter = new TestSpanExporter(_ => Interlocked.Increment(ref exportCalledCount));
using var spanProcessor = new BatchingSpanProcessor(spanExporter, 1, TimeSpan.FromMilliseconds(100), 1);
var spans = new List();
for (int i = 0; i < 20; i++)
@@ -139,7 +139,7 @@ public void ExportMoreSpansThanTheMaxBatchSize()
{
var exporterCalled = new ManualResetEvent(false);
int exportCalledCount = 0;
- var spanExporter = new TestExporter(_ =>
+ var spanExporter = new TestSpanExporter(_ =>
{
exporterCalled.Set();
Interlocked.Increment(ref exportCalledCount);
@@ -173,7 +173,7 @@ public void ExportMoreSpansThanTheMaxBatchSize()
public void ExportNotSampledSpans()
{
int exportCalledCount = 0;
- var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount));
+ var spanExporter = new TestSpanExporter(_ => Interlocked.Increment(ref exportCalledCount));
using var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, DefaultDelay, 3);
var span1 = this.CreateNotSampledEndedSpan(SpanName1, spanProcessor);
var span2 = this.CreateSampledEndedSpan(SpanName2, spanProcessor);
@@ -195,7 +195,7 @@ public void ExportNotSampledSpans()
public void ProcessorDoesNotBlockOnExporter()
{
var resetEvent = new ManualResetEvent(false);
- var spanExporter = new TestExporter(_ => resetEvent.WaitOne(TimeSpan.FromSeconds(10)));
+ var spanExporter = new TestSpanExporter(_ => resetEvent.WaitOne(TimeSpan.FromSeconds(10)));
using var factory = TracerFactory.Create(b => b
.AddProcessorPipeline(p => p
.SetExporter(spanExporter)
@@ -224,7 +224,7 @@ public async Task ShutdownOnNotEmptyQueueFullFlush()
{
const int batchSize = 2;
int exportCalledCount = 0;
- var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount));
+ var spanExporter = new TestSpanExporter(_ => Interlocked.Increment(ref exportCalledCount));
using var spanProcessor =
new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize);
var spans = new List();
@@ -252,7 +252,7 @@ public async Task ShutdownOnNotEmptyQueueNotFullFlush()
// we'll need about 1.5 sec to export all spans
// we export 100 spans in batches of 2, each export takes 30ms, in one thread
- var spanExporter = new TestExporter(_ =>
+ var spanExporter = new TestSpanExporter(_ =>
{
Interlocked.Increment(ref exportCalledCount);
Thread.Sleep(30);
@@ -283,7 +283,7 @@ public void DisposeFlushes()
{
const int batchSize = 2;
int exportCalledCount = 0;
- var spanExporter = new TestExporter(_ => Interlocked.Increment(ref exportCalledCount));
+ var spanExporter = new TestSpanExporter(_ => Interlocked.Increment(ref exportCalledCount));
var spans = new List();
using (var spanProcessor = new BatchingSpanProcessor(spanExporter, 128, TimeSpan.FromMilliseconds(100), batchSize))
{
@@ -305,7 +305,7 @@ public void Dispose()
Activity.Current = null;
}
- private SpanData[] WaitForSpans(TestExporter exporter, int spanCount, TimeSpan timeout)
+ private SpanData[] WaitForSpans(TestSpanExporter exporter, int spanCount, TimeSpan timeout)
{
var sw = Stopwatch.StartNew();
while (exporter.ExportedSpans.Length < spanCount && sw.Elapsed <= timeout)
diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleActivityProcessorTest.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleActivityProcessorTest.cs
index cea63f1affb..28252c70d32 100644
--- a/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleActivityProcessorTest.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleActivityProcessorTest.cs
@@ -167,10 +167,10 @@ private Activity[] WaitForSpans(TestActivityExporter exporter, int spanCount, Ti
() =>
{
Thread.Sleep(0);
- return exporter.ExportedSpans.Length >= spanCount;
+ return exporter.ExportedActivities.Length >= spanCount;
}, timeout + TimeSpan.FromMilliseconds(20)));
- return exporter.ExportedSpans;
+ return exporter.ExportedActivities;
}
}
}
diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleSpanProcessorTest.cs b/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleSpanProcessorTest.cs
index 1d4429be2d2..6d45cabdfc3 100644
--- a/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleSpanProcessorTest.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Trace/Export/SimpleSpanProcessorTest.cs
@@ -30,12 +30,12 @@ public class SimpleSpanProcessorTest : IDisposable
private const string SpanName1 = "MySpanName/1";
private const string SpanName2 = "MySpanName/2";
- private TestExporter spanExporter;
+ private TestSpanExporter spanExporter;
private Tracer tracer;
public SimpleSpanProcessorTest()
{
- this.spanExporter = new TestExporter(null);
+ this.spanExporter = new TestSpanExporter(null);
this.tracer = TracerFactory.Create(b => b
.AddProcessorPipeline(p => p
.SetExporter(this.spanExporter)
@@ -53,7 +53,7 @@ public void ThrowsOnNullExporter()
[Fact]
public void ThrowsInExporter()
{
- this.spanExporter = new TestExporter(_ => throw new ArgumentException("123"));
+ this.spanExporter = new TestSpanExporter(_ => throw new ArgumentException("123"));
this.tracer = TracerFactory.Create(b => b
.AddProcessorPipeline(p => p
.SetExporter(this.spanExporter)
@@ -70,7 +70,7 @@ public void ThrowsInExporter()
[Fact]
public void ProcessorDoesNotBlockOnExporter()
{
- this.spanExporter = new TestExporter(async _ => await Task.Delay(500));
+ this.spanExporter = new TestSpanExporter(async _ => await Task.Delay(500));
this.tracer = TracerFactory.Create(b => b
.AddProcessorPipeline(p => p
.SetExporter(this.spanExporter)
@@ -95,7 +95,7 @@ public void ProcessorDoesNotBlockOnExporter()
[Fact]
public async Task ShutdownTwice()
{
- var spanProcessor = new SimpleSpanProcessor(new TestExporter(null));
+ var spanProcessor = new SimpleSpanProcessor(new TestSpanExporter(null));
await spanProcessor.ShutdownAsync(CancellationToken.None).ConfigureAwait(false);
@@ -156,7 +156,7 @@ private SpanSdk CreateNotSampledEndedSpan(string spanName)
return span;
}
- private SpanData[] WaitForSpans(TestExporter exporter, int spanCount, TimeSpan timeout)
+ private SpanData[] WaitForSpans(TestSpanExporter exporter, int spanCount, TimeSpan timeout)
{
Assert.True(
SpinWait.SpinUntil(
diff --git a/test/OpenTelemetry.Tests/Implementation/Trace/TracerTest.cs b/test/OpenTelemetry.Tests/Implementation/Trace/TracerTest.cs
index e749e89a7c6..13647495e0d 100644
--- a/test/OpenTelemetry.Tests/Implementation/Trace/TracerTest.cs
+++ b/test/OpenTelemetry.Tests/Implementation/Trace/TracerTest.cs
@@ -40,7 +40,7 @@ public class TracerTest
public TracerTest()
{
- this.spanProcessor = new SimpleSpanProcessor(new TestExporter(null));
+ this.spanProcessor = new SimpleSpanProcessor(new TestSpanExporter(null));
this.tracerConfiguration = new TracerConfiguration();
this.tracerFactory = TracerFactory.Create(b => b
.AddProcessorPipeline(p => p.AddProcessor(_ => this.spanProcessor)));
@@ -50,7 +50,7 @@ public TracerTest()
[Fact]
public void BadConstructorArgumentsThrow()
{
- var noopProc = new SimpleSpanProcessor(new TestExporter(null));
+ var noopProc = new SimpleSpanProcessor(new TestSpanExporter(null));
Assert.Throws(() => new TracerSdk(null, new AlwaysOnSampler(), new TracerConfiguration(), Resource.Empty));
Assert.Throws(() => new TracerSdk(noopProc, new AlwaysOnSampler(), null, Resource.Empty));
Assert.Throws(() => new TracerSdk(noopProc, new AlwaysOnSampler(), new TracerConfiguration(), null));