Skip to content
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se

* fix(exporter-zipkin): remove usages of deprecated `url.parse` from `node:url` [#5390](https://github.com/open-telemetry/opentelemetry-js/pull/5390) @chancancode
* fix(sdk-metrics): do not export from `PeriodicExportingMetricReader` when there are no metrics to export. [#5288](https://github.com/open-telemetry/opentelemetry-js/pull/5288) @jacksonweber
* fix(sdk-trace-base): always wait on pending export in SimpleSpanProcessor. [#5303](https://github.com/open-telemetry/opentelemetry-js/pull/5303) @anuraaga

### :books: (Refine Doc)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
ExportResultCode,
globalErrorHandler,
BindOnceFuture,
ExportResult,
} from '@opentelemetry/core';
import { Span } from '../Span';
import { SpanProcessor } from '../SpanProcessor';
Expand All @@ -38,16 +37,15 @@
*/
export class SimpleSpanProcessor implements SpanProcessor {
private _shutdownOnce: BindOnceFuture<void>;
private _unresolvedExports: Set<Promise<void>>;
private _pendingExports: Set<Promise<void>>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unresolved seemed to be referring specifically to the resource so I renamed to pending to be more generic


constructor(private readonly _exporter: SpanExporter) {
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
this._unresolvedExports = new Set<Promise<void>>();
this._pendingExports = new Set<Promise<void>>();
}

async forceFlush(): Promise<void> {
// await unresolved resources before resolving
await Promise.all(Array.from(this._unresolvedExports));
await Promise.all(Array.from(this._pendingExports));
if (this._exporter.forceFlush) {
await this._exporter.forceFlush();
}
Expand All @@ -64,43 +62,26 @@
return;
}

const doExport = () =>
internal
._export(this._exporter, [span])
.then((result: ExportResult) => {
if (result.code !== ExportResultCode.SUCCESS) {
globalErrorHandler(
result.error ??
new Error(
`SimpleSpanProcessor: span export failed (status ${result})`
)
);
}
})
.catch(error => {
globalErrorHandler(error);
});
const pendingExport = this._doExport(span).catch(err =>
globalErrorHandler(err)
);
// Enqueue this export to the pending list so it can be flushed by the user.
this._pendingExports.add(pendingExport);
pendingExport.finally(() => this._pendingExports.delete(pendingExport));
}

// Avoid scheduling a promise to make the behavior more predictable and easier to test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this since it seemed to be misleading - a promise is scheduled by doExport anyways so there's no way to avoid scheduling one

private async _doExport(span: ReadableSpan): Promise<void> {
if (span.resource.asyncAttributesPending) {
const exportPromise = (span.resource as Resource)
.waitForAsyncAttributes?.()
.then(
() => {
if (exportPromise != null) {
this._unresolvedExports.delete(exportPromise);
}
return doExport();
},
err => globalErrorHandler(err)
);
// Ensure resource is fully resolved before exporting.
await (span.resource as Resource).waitForAsyncAttributes?.();
}

// store the unresolved exports
if (exportPromise != null) {
this._unresolvedExports.add(exportPromise);
}
} else {
void doExport();
const result = await internal._export(this._exporter, [span]);
if (result.code !== ExportResultCode.SUCCESS) {
throw (
result.error ??
new Error(`SimpleSpanProcessor: span export failed (status ${result})`)

Check warning on line 83 in packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts

View check run for this annotation

Codecov / codecov/patch

packages/opentelemetry-sdk-trace-base/src/export/SimpleSpanProcessor.ts#L83

Added line #L83 was not covered by tests
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,40 @@ describe('SimpleSpanProcessor', () => {
);
});

it('should await doExport() and delete from _unresolvedExports', async () => {
it('should await doExport() and delete from _pendingExports', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const processor = new SimpleSpanProcessor(testExporterWithDelay);
const spanContext: SpanContext = {
traceId: 'a3cda95b652f4a1592b449d5929fda1b',
spanId: '5e0c63257de34c92',
traceFlags: TraceFlags.SAMPLED,
};
const tracer = provider.getTracer('default') as Tracer;
const span = new SpanImpl({
scope: tracer.instrumentationScope,
resource: tracer['_resource'],
context: ROOT_CONTEXT,
spanContext,
name: 'span-name',
kind: SpanKind.CLIENT,
spanLimits: tracer.getSpanLimits(),
spanProcessor: tracer['_spanProcessor'],
});
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.strictEqual(processor['_pendingExports'].size, 1);

await processor.forceFlush();

assert.strictEqual(processor['_pendingExports'].size, 0);

const exportedSpans = testExporterWithDelay.getFinishedSpans();

assert.strictEqual(exportedSpans.length, 1);
});

it('should await doExport() and delete from _pendingExports with async resource', async () => {
const testExporterWithDelay = new TestExporterWithDelay();
const processor = new SimpleSpanProcessor(testExporterWithDelay);

Expand Down Expand Up @@ -249,11 +282,11 @@ describe('SimpleSpanProcessor', () => {
processor.onStart(span, ROOT_CONTEXT);
processor.onEnd(span);

assert.strictEqual(processor['_unresolvedExports'].size, 1);
assert.strictEqual(processor['_pendingExports'].size, 1);

await processor.forceFlush();

assert.strictEqual(processor['_unresolvedExports'].size, 0);
assert.strictEqual(processor['_pendingExports'].size, 0);

const exportedSpans = testExporterWithDelay.getFinishedSpans();

Expand Down