-
Notifications
You must be signed in to change notification settings - Fork 862
Batching activity processor #755
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
522a8d6
639b58a
66cceca
cd21b18
0c808c5
82f1499
d2abd31
ef75193
64254ae
5c58475
30f1d14
9a2230e
aec5fad
3d78f77
36879b6
27ece54
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,247 @@ | ||
| // <copyright file="BatchingActivityProcessor.cs" company="OpenTelemetry Authors"> | ||
| // Copyright The OpenTelemetry Authors | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
| // </copyright> | ||
|
|
||
| using System; | ||
| using System.Collections.Concurrent; | ||
| using System.Collections.Generic; | ||
| using System.Diagnostics; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using OpenTelemetry.Internal; | ||
|
|
||
| namespace OpenTelemetry.Trace.Export | ||
| { | ||
| /// <summary> | ||
| /// Implements processor that batches activities before calling exporter. | ||
| /// </summary> | ||
| public class BatchingActivityProcessor : ActivityProcessor, IDisposable | ||
| { | ||
| private const int DefaultMaxQueueSize = 2048; | ||
| private const int DefaultMaxExportBatchSize = 512; | ||
| private static readonly TimeSpan DefaultScheduleDelay = TimeSpan.FromMilliseconds(5000); | ||
| private readonly ConcurrentQueue<Activity> exportQueue; | ||
| private readonly int maxQueueSize; | ||
| private readonly int maxExportBatchSize; | ||
| private readonly TimeSpan scheduleDelay; | ||
| private readonly ActivityExporter exporter; | ||
| private readonly List<Activity> batch = new List<Activity>(); | ||
| private CancellationTokenSource cts; | ||
| private volatile int currentQueueSize; | ||
| private bool stopping = false; | ||
reyang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with default parameters: | ||
| /// <list type="bullet"> | ||
| /// <item> | ||
| /// <description>maxQueueSize = 2048,</description> | ||
| /// </item> | ||
| /// <item> | ||
| /// <description>scheduleDelay = 5 sec,</description> | ||
| /// </item> | ||
| /// <item> | ||
| /// <description>maxExportBatchSize = 512</description> | ||
| /// </item> | ||
| /// </list> | ||
| /// </summary> | ||
| /// <param name="exporter">Exporter instance.</param> | ||
| public BatchingActivityProcessor(ActivityExporter exporter) | ||
| : this(exporter, DefaultMaxQueueSize, DefaultScheduleDelay, DefaultMaxExportBatchSize) | ||
| { | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with custom settings. | ||
| /// </summary> | ||
| /// <param name="exporter">Exporter instance.</param> | ||
| /// <param name="maxQueueSize">Maximum queue size. After the size is reached activities are dropped by processor.</param> | ||
| /// <param name="scheduleDelay">The delay between two consecutive exports.</param> | ||
| /// <param name="maxExportBatchSize">The maximum batch size of every export. It must be smaller or equal to maxQueueSize.</param> | ||
| public BatchingActivityProcessor(ActivityExporter exporter, int maxQueueSize, TimeSpan scheduleDelay, int maxExportBatchSize) | ||
| { | ||
| if (maxQueueSize <= 0) | ||
| { | ||
| throw new ArgumentOutOfRangeException(nameof(maxQueueSize)); | ||
| } | ||
|
|
||
| if (maxExportBatchSize <= 0 || maxExportBatchSize > maxQueueSize) | ||
| { | ||
| throw new ArgumentOutOfRangeException(nameof(maxExportBatchSize)); | ||
| } | ||
|
|
||
| this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter)); | ||
| this.maxQueueSize = maxQueueSize; | ||
| this.scheduleDelay = scheduleDelay; | ||
| this.maxExportBatchSize = maxExportBatchSize; | ||
|
|
||
| this.cts = new CancellationTokenSource(); | ||
| this.exportQueue = new ConcurrentQueue<Activity>(); | ||
|
|
||
| // worker task that will last for lifetime of processor. | ||
| // Threads are also useless as exporter tasks run in thread pool threads. | ||
| Task.Run(() => this.Worker(this.cts.Token), this.cts.Token); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public override void OnStart(Activity activity) | ||
| { | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public override void OnEnd(Activity activity) | ||
| { | ||
| if (this.stopping) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| // because of race-condition between checking the size and enqueueing, | ||
| // we might end up with a bit more activities than maxQueueSize. | ||
| // Let's just tolerate it to avoid extra synchronization. | ||
| if (this.currentQueueSize >= this.maxQueueSize) | ||
| { | ||
| OpenTelemetrySdkEventSource.Log.SpanProcessorQueueIsExhausted(); | ||
| return; | ||
| } | ||
|
|
||
| Interlocked.Increment(ref this.currentQueueSize); | ||
|
|
||
| this.exportQueue.Enqueue(activity); | ||
| } | ||
|
|
||
| /// <inheritdoc/> | ||
| public override async Task ShutdownAsync(CancellationToken cancellationToken) | ||
| { | ||
| if (!this.stopping) | ||
| { | ||
| this.stopping = true; | ||
|
|
||
| // This will stop the loop after current batch finishes. | ||
| this.cts.Cancel(false); | ||
| this.cts.Dispose(); | ||
| this.cts = null; | ||
|
|
||
| // if there are more items, continue until cancellation token allows | ||
| while (this.currentQueueSize > 0 && !cancellationToken.IsCancellationRequested) | ||
| { | ||
| await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false); | ||
| } | ||
|
|
||
| await this.exporter.ShutdownAsync(cancellationToken); | ||
|
|
||
| // there is no point in waiting for a worker task if cancellation happens | ||
| // it's dead already or will die on the next iteration on its own | ||
|
|
||
| // ExportBatchAsync must never throw, we are here either because it was cancelled | ||
| // or because there are no items left | ||
| OpenTelemetrySdkEventSource.Log.ShutdownEvent(this.currentQueueSize); | ||
| } | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| this.Dispose(true); | ||
| } | ||
|
|
||
| protected virtual void Dispose(bool isDisposing) | ||
| { | ||
| if (!this.stopping) | ||
| { | ||
| this.ShutdownAsync(CancellationToken.None).ContinueWith(_ => { }).GetAwaiter().GetResult(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could introduce exception which is against the .NET guideline: The object must not throw an exception if its Dispose method is called multiple times. Reason: this.cts.Cancel(false);
this.cts.Dispose();
this.cts = null;
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's with
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. This will suppress an exception.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @reyang @CodeBlanch @cijothomas
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'd open a new issue and merge this one. So that we can make progress and keep track of fixing the underlying issue.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created an issue #769 to track it. |
||
| } | ||
|
|
||
| if (isDisposing) | ||
| { | ||
| if (this.exporter is IDisposable disposableExporter) | ||
| { | ||
| try | ||
| { | ||
| disposableExporter.Dispose(); | ||
| } | ||
| catch (Exception e) | ||
| { | ||
| OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private async Task ExportBatchAsync(CancellationToken cancellationToken) | ||
| { | ||
| try | ||
| { | ||
| if (cancellationToken.IsCancellationRequested) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| if (this.exportQueue.TryDequeue(out var nextActivity)) | ||
| { | ||
| Interlocked.Decrement(ref this.currentQueueSize); | ||
| this.batch.Add(nextActivity); | ||
| } | ||
| else | ||
| { | ||
| // nothing in queue | ||
| return; | ||
| } | ||
|
|
||
| while (this.batch.Count < this.maxExportBatchSize && this.exportQueue.TryDequeue(out nextActivity)) | ||
| { | ||
| Interlocked.Decrement(ref this.currentQueueSize); | ||
| this.batch.Add(nextActivity); | ||
| } | ||
|
|
||
| var result = await this.exporter.ExportAsync(this.batch, cancellationToken).ConfigureAwait(false); | ||
| if (result != ExportResult.Success) | ||
| { | ||
| OpenTelemetrySdkEventSource.Log.ExporterErrorResult(result); | ||
|
|
||
| // we do not support retries for now and leave it up to exporter | ||
| // as only exporter implementation knows how to retry: which items failed | ||
| // and what is the reasonable policy for that exporter. | ||
| } | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.ExportBatchAsync), ex); | ||
| } | ||
| finally | ||
| { | ||
| this.batch.Clear(); | ||
| } | ||
| } | ||
|
|
||
| private async Task Worker(CancellationToken cancellationToken) | ||
| { | ||
| while (!cancellationToken.IsCancellationRequested) | ||
| { | ||
| var sw = Stopwatch.StartNew(); | ||
| await this.ExportBatchAsync(cancellationToken).ConfigureAwait(false); | ||
|
|
||
| if (cancellationToken.IsCancellationRequested) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| var remainingWait = this.scheduleDelay - sw.Elapsed; | ||
| if (remainingWait > TimeSpan.Zero) | ||
| { | ||
| await Task.Delay(remainingWait, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.