diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java new file mode 100644 index 00000000000..dff6d3b2618 --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java @@ -0,0 +1,96 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.exemplar; + +import io.opentelemetry.sdk.common.Clock; +import io.opentelemetry.sdk.metrics.data.DoubleExemplarData; +import io.opentelemetry.sdk.metrics.data.LongExemplarData; +import java.util.List; +import java.util.Random; +import java.util.function.Supplier; + +/** + * An interface for constructing an appropriate ExemplarReservoir for a given metric "memory cell". + */ +public interface ExemplarReservoirFactory { + ExemplarReservoir createLongExemplarReservoir(); + + ExemplarReservoir createDoubleExemplarReservoir(); + + /** An exemplar reservoir that stores no exemplars. */ + static ExemplarReservoirFactory noSamples() { + return new ExemplarReservoirFactory() { + @Override + public ExemplarReservoir createLongExemplarReservoir() { + return NoopExemplarReservoir.LONG_INSTANCE; + } + + @Override + public ExemplarReservoir createDoubleExemplarReservoir() { + return NoopExemplarReservoir.DOUBLE_INSTANCE; + } + + @Override + public String toString() { + return "noSamples"; + } + }; + } + + /** + * A reservoir with fixed size that stores the given number of exemplars. + * + * @param clock The clock to use when annotating measurements with time. + * @param size The maximum number of exemplars to preserve. + * @param randomSupplier The random number generator to use for sampling. + */ + static ExemplarReservoirFactory fixedSize( + Clock clock, int size, Supplier randomSupplier) { + return new ExemplarReservoirFactory() { + @Override + public ExemplarReservoir createLongExemplarReservoir() { + return RandomFixedSizeExemplarReservoir.createLong(clock, size, randomSupplier); + } + + @Override + public ExemplarReservoir createDoubleExemplarReservoir() { + return RandomFixedSizeExemplarReservoir.createDouble(clock, size, randomSupplier); + } + + @Override + public String toString() { + return "fixedSize(" + size + ")"; + } + }; + } + + /** + * A Reservoir sampler that preserves the latest seen measurement per-histogram bucket. + * + * @param clock The clock to use when annotating measurements with time. + * @param boundaries A list of (inclusive) upper bounds for the histogram. Should be in order from + * lowest to highest. + */ + static ExemplarReservoirFactory histogramBucket(Clock clock, List boundaries) { + return new ExemplarReservoirFactory() { + @Override + public ExemplarReservoir createLongExemplarReservoir() { + throw new UnsupportedOperationException( + "Cannot create long exemplars for histogram buckets"); + } + + @Override + public ExemplarReservoir createDoubleExemplarReservoir() { + return new HistogramExemplarReservoir(clock, boundaries); + } + + @Override + public String toString() { + return "histogramBucket(" + boundaries + ")"; + } + }; + } +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/AggregationExtension.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/AggregationExtension.java new file mode 100644 index 00000000000..a3132c6bcdf --- /dev/null +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/AggregationExtension.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics.internal.view; + +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory; + +/** + * An interface which allows customized configuration of aggregators. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public interface AggregationExtension extends Aggregation, AggregatorFactory { + /** Override the exemplar reservoir used for this aggregation. */ + AggregationExtension setExemplarReservoirFactory(ExemplarReservoirFactory reservoirFactory); +} diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java index a0408bc32fd..dda62921310 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java @@ -14,11 +14,11 @@ import io.opentelemetry.sdk.metrics.data.MetricDataType; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleBase2ExponentialHistogramAggregator; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory; /** * Exponential bucket histogram aggregation configuration. @@ -26,20 +26,28 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class Base2ExponentialHistogramAggregation implements Aggregation, AggregatorFactory { +public final class Base2ExponentialHistogramAggregation implements AggregationExtension { private static final int DEFAULT_MAX_BUCKETS = 160; private static final int DEFAULT_MAX_SCALE = 20; - + private static final ExemplarReservoirFactory DEFAULT_RESERVOIR = + ExemplarReservoirFactory.fixedSize( + Clock.getDefault(), + Runtime.getRuntime().availableProcessors(), + RandomSupplier.platformDefault()); private static final Aggregation DEFAULT = - new Base2ExponentialHistogramAggregation(DEFAULT_MAX_BUCKETS, DEFAULT_MAX_SCALE); + new Base2ExponentialHistogramAggregation( + DEFAULT_MAX_BUCKETS, DEFAULT_MAX_SCALE, DEFAULT_RESERVOIR); private final int maxBuckets; private final int maxScale; + private final ExemplarReservoirFactory reservoirFactory; - private Base2ExponentialHistogramAggregation(int maxBuckets, int maxScale) { + private Base2ExponentialHistogramAggregation( + int maxBuckets, int maxScale, ExemplarReservoirFactory reservoirFactory) { this.maxBuckets = maxBuckets; this.maxScale = maxScale; + this.reservoirFactory = reservoirFactory; } public static Aggregation getDefault() { @@ -60,7 +68,7 @@ public static Aggregation getDefault() { public static Aggregation create(int maxBuckets, int maxScale) { checkArgument(maxBuckets >= 1, "maxBuckets must be > 0"); checkArgument(maxScale <= 20 && maxScale >= -10, "maxScale must be -10 <= x <= 20"); - return new Base2ExponentialHistogramAggregation(maxBuckets, maxScale); + return new Base2ExponentialHistogramAggregation(maxBuckets, maxScale, DEFAULT_RESERVOIR); } @Override @@ -71,11 +79,7 @@ public Aggregator createAggr new DoubleBase2ExponentialHistogramAggregator( () -> ExemplarReservoir.filtered( - exemplarFilter, - ExemplarReservoir.doubleFixedSizeReservoir( - Clock.getDefault(), - Runtime.getRuntime().availableProcessors(), - RandomSupplier.platformDefault())), + exemplarFilter, reservoirFactory.createDoubleExemplarReservoir()), maxBuckets, maxScale); } @@ -99,4 +103,11 @@ public String toString() { + maxScale + "}"; } + + @Override + public AggregationExtension setExemplarReservoirFactory( + ExemplarReservoirFactory reservoirFactory) { + return new Base2ExponentialHistogramAggregation( + this.maxBuckets, this.maxScale, reservoirFactory); + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java index 798c9ea11b4..239b7b26bf7 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java @@ -10,11 +10,12 @@ import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Aggregation that selects the specified default based on instrument. @@ -22,9 +23,9 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class DefaultAggregation implements Aggregation, AggregatorFactory { +public final class DefaultAggregation implements AggregationExtension { - private static final Aggregation INSTANCE = new DefaultAggregation(); + private static final Aggregation INSTANCE = new DefaultAggregation(null); public static Aggregation getInstance() { return INSTANCE; @@ -33,39 +34,49 @@ public static Aggregation getInstance() { private static final ThrottlingLogger logger = new ThrottlingLogger(Logger.getLogger(DefaultAggregation.class.getName())); - private DefaultAggregation() {} + @Nullable private final ExemplarReservoirFactory reservoirFactory; - private static Aggregation resolve(InstrumentDescriptor instrument, boolean withAdvice) { + private DefaultAggregation(@Nullable ExemplarReservoirFactory reservoirFactory) { + this.reservoirFactory = reservoirFactory; + } + + private static AggregationExtension resolve(InstrumentDescriptor instrument, boolean withAdvice) { switch (instrument.getType()) { case COUNTER: case UP_DOWN_COUNTER: case OBSERVABLE_COUNTER: case OBSERVABLE_UP_DOWN_COUNTER: - return SumAggregation.getInstance(); + return (AggregationExtension) SumAggregation.getInstance(); case HISTOGRAM: if (withAdvice && instrument.getAdvice().getExplicitBucketBoundaries() != null) { - return ExplicitBucketHistogramAggregation.create( - instrument.getAdvice().getExplicitBucketBoundaries()); + return (AggregationExtension) + ExplicitBucketHistogramAggregation.create( + instrument.getAdvice().getExplicitBucketBoundaries()); } - return ExplicitBucketHistogramAggregation.getDefault(); + return (AggregationExtension) ExplicitBucketHistogramAggregation.getDefault(); case OBSERVABLE_GAUGE: - return LastValueAggregation.getInstance(); + return (AggregationExtension) LastValueAggregation.getInstance(); } logger.log(Level.WARNING, "Unable to find default aggregation for instrument: " + instrument); - return DropAggregation.getInstance(); + return (AggregationExtension) DropAggregation.getInstance(); } @Override public Aggregator createAggregator( InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) { - return ((AggregatorFactory) resolve(instrumentDescriptor, /* withAdvice= */ true)) + if (this.reservoirFactory != null) { + return resolve(instrumentDescriptor, /* withAdvice= */ true) + .setExemplarReservoirFactory(this.reservoirFactory) + .createAggregator(instrumentDescriptor, exemplarFilter); + } + return resolve(instrumentDescriptor, /* withAdvice= */ true) .createAggregator(instrumentDescriptor, exemplarFilter); } @Override public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor) { // This should always return true - return ((AggregatorFactory) resolve(instrumentDescriptor, /* withAdvice= */ false)) + return resolve(instrumentDescriptor, /* withAdvice= */ false) .isCompatibleWithInstrument(instrumentDescriptor); } @@ -73,4 +84,10 @@ public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescrip public String toString() { return "DefaultAggregation"; } + + @Override + public AggregationExtension setExemplarReservoirFactory( + ExemplarReservoirFactory reservoirFactory) { + return new DefaultAggregation(reservoirFactory); + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java index df02ce003db..22f0b9cbdb6 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java @@ -9,9 +9,9 @@ import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory; /** * Configuration representing no aggregation. @@ -19,7 +19,7 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class DropAggregation implements Aggregation, AggregatorFactory { +public final class DropAggregation implements AggregationExtension { private static final Aggregation INSTANCE = new DropAggregation(); @@ -45,4 +45,10 @@ public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescrip public String toString() { return "DropAggregation"; } + + @Override + public AggregationExtension setExemplarReservoirFactory( + ExemplarReservoirFactory reservoirFactory) { + throw new UnsupportedOperationException("DropAggregation does not allow exemplars"); + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ExplicitBucketHistogramAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ExplicitBucketHistogramAggregation.java index 1ec3867dc7e..7c935ca2c03 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ExplicitBucketHistogramAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/ExplicitBucketHistogramAggregation.java @@ -10,12 +10,12 @@ import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleExplicitBucketHistogramAggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.ExplicitBucketHistogramUtils; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory; import java.util.List; /** @@ -24,27 +24,35 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class ExplicitBucketHistogramAggregation implements Aggregation, AggregatorFactory { +public final class ExplicitBucketHistogramAggregation implements AggregationExtension { private static final Aggregation DEFAULT = new ExplicitBucketHistogramAggregation( - ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES); + ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES, + ExemplarReservoirFactory.histogramBucket( + Clock.getDefault(), + ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES)); public static Aggregation getDefault() { return DEFAULT; } public static Aggregation create(List bucketBoundaries) { - return new ExplicitBucketHistogramAggregation(bucketBoundaries); + return new ExplicitBucketHistogramAggregation( + bucketBoundaries, + ExemplarReservoirFactory.histogramBucket(Clock.getDefault(), bucketBoundaries)); } private final List bucketBoundaries; private final double[] bucketBoundaryArray; + private final ExemplarReservoirFactory reservoirFactory; - private ExplicitBucketHistogramAggregation(List bucketBoundaries) { + private ExplicitBucketHistogramAggregation( + List bucketBoundaries, ExemplarReservoirFactory reservoirFactory) { this.bucketBoundaries = bucketBoundaries; // We need to fail here if our bucket boundaries are ill-configured. this.bucketBoundaryArray = ExplicitBucketHistogramUtils.createBoundaryArray(bucketBoundaries); + this.reservoirFactory = reservoirFactory; } @Override @@ -56,9 +64,7 @@ public Aggregator createAggr bucketBoundaryArray, () -> ExemplarReservoir.filtered( - exemplarFilter, - ExemplarReservoir.histogramBucketReservoir( - Clock.getDefault(), bucketBoundaries))); + exemplarFilter, reservoirFactory.createDoubleExemplarReservoir())); } @Override @@ -76,4 +82,10 @@ public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescrip public String toString() { return "ExplicitBucketHistogramAggregation(" + bucketBoundaries.toString() + ")"; } + + @Override + public AggregationExtension setExemplarReservoirFactory( + ExemplarReservoirFactory reservoirFactory) { + return new ExplicitBucketHistogramAggregation(this.bucketBoundaries, reservoirFactory); + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/LastValueAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/LastValueAggregation.java index 5f6039532e0..d61fc61db6e 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/LastValueAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/LastValueAggregation.java @@ -10,12 +10,12 @@ import io.opentelemetry.sdk.metrics.data.ExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleLastValueAggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.LongLastValueAggregator; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory; /** * Last-value aggregation configuration. @@ -23,15 +23,20 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class LastValueAggregation implements Aggregation, AggregatorFactory { +public final class LastValueAggregation implements AggregationExtension { - private static final Aggregation INSTANCE = new LastValueAggregation(); + private static final Aggregation INSTANCE = + new LastValueAggregation(ExemplarReservoirFactory.noSamples()); public static Aggregation getInstance() { return INSTANCE; } - private LastValueAggregation() {} + private LastValueAggregation(ExemplarReservoirFactory reservoirFactory) { + this.reservoirFactory = reservoirFactory; + } + + private final ExemplarReservoirFactory reservoirFactory; @Override @SuppressWarnings("unchecked") @@ -41,9 +46,17 @@ public Aggregator createAggr // For the initial version we do not sample exemplars on gauges. switch (instrumentDescriptor.getValueType()) { case LONG: - return (Aggregator) new LongLastValueAggregator(ExemplarReservoir::longNoSamples); + return (Aggregator) + new LongLastValueAggregator( + () -> + ExemplarReservoir.filtered( + exemplarFilter, reservoirFactory.createLongExemplarReservoir())); case DOUBLE: - return (Aggregator) new DoubleLastValueAggregator(ExemplarReservoir::doubleNoSamples); + return (Aggregator) + new DoubleLastValueAggregator( + () -> + ExemplarReservoir.filtered( + exemplarFilter, reservoirFactory.createDoubleExemplarReservoir())); } throw new IllegalArgumentException("Invalid instrument value type"); } @@ -57,4 +70,10 @@ public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescrip public String toString() { return "LastValueAggregation"; } + + @Override + public AggregationExtension setExemplarReservoirFactory( + ExemplarReservoirFactory reservoirFactory) { + return new LastValueAggregation(reservoirFactory); + } } diff --git a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/SumAggregation.java b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/SumAggregation.java index b689d9f58f1..fd9a8419bc9 100644 --- a/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/SumAggregation.java +++ b/sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/SumAggregation.java @@ -13,12 +13,12 @@ import io.opentelemetry.sdk.metrics.data.LongExemplarData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator; -import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory; import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleSumAggregator; import io.opentelemetry.sdk.metrics.internal.aggregator.LongSumAggregator; import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory; import java.util.function.Supplier; /** @@ -27,14 +27,26 @@ *

This class is internal and is hence not for public use. Its APIs are unstable and can change * at any time. */ -public final class SumAggregation implements Aggregation, AggregatorFactory { +public final class SumAggregation implements AggregationExtension { private static final SumAggregation INSTANCE = new SumAggregation(); public static Aggregation getInstance() { return INSTANCE; } - private SumAggregation() {} + private SumAggregation(ExemplarReservoirFactory reservoirFactory) { + this.reservoirFactory = reservoirFactory; + } + + private SumAggregation() { + this( + ExemplarReservoirFactory.fixedSize( + Clock.getDefault(), + Runtime.getRuntime().availableProcessors(), + RandomSupplier.platformDefault())); + } + + private final ExemplarReservoirFactory reservoirFactory; @Override @SuppressWarnings("unchecked") @@ -43,27 +55,20 @@ public Aggregator createAggr switch (instrumentDescriptor.getValueType()) { case LONG: { - Supplier> reservoirFactory = + Supplier> reservoirSupplier = () -> ExemplarReservoir.filtered( - exemplarFilter, - ExemplarReservoir.longFixedSizeReservoir( - Clock.getDefault(), - Runtime.getRuntime().availableProcessors(), - RandomSupplier.platformDefault())); - return (Aggregator) new LongSumAggregator(instrumentDescriptor, reservoirFactory); + exemplarFilter, reservoirFactory.createLongExemplarReservoir()); + return (Aggregator) new LongSumAggregator(instrumentDescriptor, reservoirSupplier); } case DOUBLE: { - Supplier> reservoirFactory = + Supplier> reservoirSupplier = () -> ExemplarReservoir.filtered( - exemplarFilter, - ExemplarReservoir.doubleFixedSizeReservoir( - Clock.getDefault(), - Runtime.getRuntime().availableProcessors(), - RandomSupplier.platformDefault())); - return (Aggregator) new DoubleSumAggregator(instrumentDescriptor, reservoirFactory); + exemplarFilter, reservoirFactory.createDoubleExemplarReservoir()); + return (Aggregator) + new DoubleSumAggregator(instrumentDescriptor, reservoirSupplier); } } throw new IllegalArgumentException("Invalid instrument value type"); @@ -87,4 +92,11 @@ public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescrip public String toString() { return "SumAggregation"; } + + @Override + public AggregationExtension setExemplarReservoirFactory( + ExemplarReservoirFactory reservoirFactory) { + // We copy-on-write as we don't expect much customization here. + return new SumAggregation(reservoirFactory); + } } diff --git a/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkCustomExemplarReservoirTest.java b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkCustomExemplarReservoirTest.java new file mode 100644 index 00000000000..87845a3cbb4 --- /dev/null +++ b/sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/SdkCustomExemplarReservoirTest.java @@ -0,0 +1,84 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.metrics; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter; +import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory; +import io.opentelemetry.sdk.metrics.internal.view.AggregationExtension; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.sdk.testing.time.TestClock; +import java.util.Random; +import org.junit.jupiter.api.Test; + +/** Unit tests for using the AggregationExtension and ExemplarReservoirFactory internal APIs. */ +public class SdkCustomExemplarReservoirTest { + private static final Resource RESOURCE = + Resource.create(Attributes.of(stringKey("resource_key"), "resource_value")); + private final TestClock testClock = TestClock.create(); + + private Meter initialize(InMemoryMetricReader memory, ExemplarReservoirFactory reservoirFactory) { + SdkMeterProvider sdkMeterProvider = + SdkMeterProvider.builder() + .setClock(testClock) + .setResource(RESOURCE) + .setExemplarFilter(ExemplarFilter.alwaysOn()) + .registerView( + InstrumentSelector.builder().setName("test").build(), + View.builder() + .setAggregation( + ((AggregationExtension) Aggregation.sum()) + .setExemplarReservoirFactory(reservoirFactory)) + .build()) + .registerMetricReader(memory) + .build(); + return sdkMeterProvider.get(getClass().getName()); + } + + @Test + void collectMetrics_withCustomExemplarReservoir() { + // Create reservoir that always samples, and makes sure we get the first two samples. + ExemplarReservoirFactory testReservor = + ExemplarReservoirFactory.fixedSize( + testClock, + 2, + () -> + new Random() { + @Override + public int nextInt(int bound) { + return bound - 1; + } + }); + InMemoryMetricReader sdkMeterReader = InMemoryMetricReader.create(); + Meter sdkMeter = initialize(sdkMeterReader, testReservor); + DoubleCounter instrument = sdkMeter.counterBuilder("test").ofDoubles().build(); + instrument.add(10); + instrument.add(1); + assertThat(sdkMeterReader.collectAllMetrics()) + .satisfiesExactly( + metric -> + assertThat(metric) + .hasResource(RESOURCE) + .hasDoubleSumSatisfying( + sum -> + sum.isMonotonic() + .isCumulative() + .hasPointsSatisfying( + point -> + point + .hasValue(11) + // TODO - has exemplars matching reservoir behavior + .hasExemplarsSatisfying( + exemplar -> exemplar.hasValue(10), + exemplar -> exemplar.hasValue(1))))); + } +}