Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4aa78d3
adding projection functionality, rethink automic double
harsimar Dec 10, 2024
bf15276
atomic double and some other changes
harsimar Dec 11, 2024
4ace19d
merge from master to get p3 changes
harsimar Dec 11, 2024
75718c7
pr comments and some build stuff
harsimar Dec 11, 2024
27d4979
changing guava version
harsimar Dec 11, 2024
619bb47
spotbug fix
harsimar Dec 11, 2024
915d579
starting to add tests
harsimar Dec 12, 2024
9c8fa37
added unit tests for derived metric projection
harsimar Dec 13, 2024
e3377d7
fix conflict from main
harsimar Dec 13, 2024
c599211
reorganize tests
harsimar Dec 13, 2024
a6ab936
fixing inconsistency with main re okio
harsimar Dec 13, 2024
494a1b9
pr comments & small refactorings
harsimar Dec 16, 2024
1351d2b
remove logging, spotless
harsimar Dec 16, 2024
a77adfe
validator
harsimar Dec 17, 2024
68b9c83
minor
harsimar Dec 17, 2024
d80053e
changes to concurrency handling
harsimar Dec 17, 2024
f83c116
moving synchronization to derivedMetricAggregation
harsimar Dec 17, 2024
d8b618f
moving derivedMetricAggregation to its own file
harsimar Dec 17, 2024
77f176a
merge projection into here
harsimar Dec 18, 2024
96e705f
validator round 1 - need to refactor for validator to store errors
harsimar Dec 18, 2024
6e81b55
finish implementation and starting to add tests
harsimar Dec 19, 2024
568f059
merge from main
harsimar Dec 19, 2024
de6bd93
spotless
harsimar Dec 20, 2024
f492237
remove unused classes
harsimar Dec 20, 2024
191783d
pr comments
harsimar Jan 6, 2025
c3c3c49
fix ci errors
harsimar Jan 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
changes to concurrency handling
  • Loading branch information
harsimar committed Dec 17, 2024
commit d80053ec47958d1aabdf00880abd12904b3115e7
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@
<suppress files="com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.RemoteDependency" checks="EnforceFinalFieldsCheck" />
<suppress files="com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.Request" checks="EnforceFinalFieldsCheck" />
<suppress files="com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.Trace" checks="EnforceFinalFieldsCheck" />
<suppress files="com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.filtering.DerivedMetricProjections" checks="EnforceFinalFieldsCheck" />
<suppress files="com.azure.monitor.opentelemetry.autoconfigure.AzureMonitorAutoConfigure.java" checks="com.azure.tools.checkstyle.checks.ExternalDependencyExposedCheck" />
<suppress files="com.azure.monitor.opentelemetry.autoconfigure.implementation.builders.TelemetryTruncation.java" checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" />
<suppress files="com.azure.monitor.opentelemetry.autoconfigure.implementation.heartbeat.DefaultHeartBeatPropertyProvider.java" checks="com.azure.tools.checkstyle.checks.GoodLoggingCheck" />
Expand Down
6 changes: 0 additions & 6 deletions sdk/monitor/azure-monitor-opentelemetry-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@
<artifactId>opentelemetry-semconv-incubating</artifactId>
<version>1.26.0-alpha</version> <!-- {x-version-update;io.opentelemetry.semconv:opentelemetry-semconv-incubating;external_dependency} -->
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.1.0-jre</version> <!-- {x-version-update;com.google.guava:guava;external_dependency} -->
</dependency>

<!-- Added this dependency to include necessary annotations used by reactor core.
Without this dependency, javadoc throws a warning as it cannot find enum When.MAYBE
Expand Down Expand Up @@ -205,7 +200,6 @@
<include>io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:[1.43.0]</include> <!-- {x-include-update;io.opentelemetry:opentelemetry-sdk-extension-autoconfigure;external_dependency} -->
<include>io.opentelemetry.semconv:opentelemetry-semconv-incubating:[1.26.0-alpha]</include> <!-- {x-include-update;io.opentelemetry.semconv:opentelemetry-semconv-incubating;external_dependency} -->
<include>com.squareup.okio:okio:[3.9.1]</include> <!-- {x-include-update;com.squareup.okio:okio;external_dependency} -->
<include>com.google.guava:guava:[33.1.0-jre]</include> <!-- {x-include-update;com.google.guava:guava;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,8 @@ private void applyMetricFilters(TelemetryColumns columns, TelemetryType telemetr
List<DerivedMetricInfo> metricsConfig = currentConfig.fetchMetricConfigForTelemetryType(telemetryType);
for (DerivedMetricInfo derivedMetricInfo : metricsConfig) {
if (Filter.checkMetricFilters(derivedMetricInfo, columns)) {
synchronized (currentCounters.derivedMetrics) {
// TODO (harskaur): In future PR, track any error that comes from calculateProjection
currentCounters.derivedMetrics.calculateProjection(derivedMetricInfo, columns);
}
// TODO (harskaur): In future PR, track any error that comes from calculateProjection
currentCounters.derivedMetrics.calculateProjection(derivedMetricInfo, columns);
}
}
}
Expand Down Expand Up @@ -432,9 +430,8 @@ private FinalCounters(Counters currentCounters) {
synchronized (currentCounters.documentList) {
this.documentList.addAll(currentCounters.documentList);
}
synchronized (currentCounters.derivedMetrics) {
this.projections = currentCounters.derivedMetrics.fetchFinalDerivedMetricValues();
}
this.projections = currentCounters.derivedMetrics.fetchFinalDerivedMetricValues();

}

private long getPhysicalMemory(@Nullable MemoryMXBean memory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,24 @@

import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.AggregationType;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.DerivedMetricInfo;
import com.google.common.util.concurrent.AtomicDouble;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class DerivedMetricProjections {

public static final String COUNT = "Count()";
private final Map<String, DerivedMetricAggregation> derivedMetricValues = new HashMap<>();
private final Object lock = new Object();

public DerivedMetricProjections(Map<String, AggregationType> projectionInfo) {
for (Map.Entry<String, AggregationType> entry : projectionInfo.entrySet()) {
AggregationType aggregationType = entry.getValue();
DerivedMetricAggregation value;
if (aggregationType.equals(AggregationType.MIN)) {
value = new DerivedMetricAggregation(Long.MAX_VALUE, aggregationType);
value = new DerivedMetricAggregation(Double.MAX_VALUE, aggregationType);
} else if (aggregationType.equals(AggregationType.MAX)) {
value = new DerivedMetricAggregation(Long.MIN_VALUE, aggregationType);
value = new DerivedMetricAggregation(Double.MIN_VALUE, aggregationType);
} else if (aggregationType.equals(AggregationType.SUM) || aggregationType.equals(AggregationType.AVG)) {
value = new DerivedMetricAggregation(0, aggregationType);
} else {
Expand All @@ -36,17 +35,19 @@ public DerivedMetricProjections(Map<String, AggregationType> projectionInfo) {
// This is intended to be called once for every post request
public Map<String, Double> fetchFinalDerivedMetricValues() {
Map<String, Double> result = new HashMap<>();
for (Map.Entry<String, DerivedMetricAggregation> entry : derivedMetricValues.entrySet()) {
String id = entry.getKey();
DerivedMetricAggregation dma = entry.getValue();
double intermediateValue = dma.aggregation.get();
long count = dma.count.get();
if (count == 0) {
result.put(id, 0.0);
} else if (dma.aggregationType.equals(AggregationType.AVG)) {
result.put(id, intermediateValue / count);
} else {
result.put(id, intermediateValue);
synchronized (lock) {
for (Map.Entry<String, DerivedMetricAggregation> entry : derivedMetricValues.entrySet()) {
String id = entry.getKey();
DerivedMetricAggregation dma = entry.getValue();
double intermediateValue = dma.aggregation;
long count = dma.count;
if (count == 0) {
result.put(id, 0.0);
} else if (dma.aggregationType.equals(AggregationType.AVG)) {
result.put(id, intermediateValue / count);
} else {
result.put(id, intermediateValue);
}
}
}
return result;
Expand All @@ -61,7 +62,7 @@ public void calculateProjection(DerivedMetricInfo derivedMetricInfo, TelemetryCo
} else if (KnownRequestColumns.DURATION.equals(derivedMetricInfo.getProjection())) {
long duration = columns.getFieldValue(KnownRequestColumns.DURATION, Long.class);
// in case duration from telemetrycolumns doesn't parse correctly.
// also quickpulse expects duration derived metrics to be reported in ms.
// also quickpulse expects duration derived metrics to be reported in millis.
incrementBy = duration != -1 ? (double) duration / 1000.0 : Double.NaN;
} else if (derivedMetricInfo.getProjection().startsWith(Filter.CUSTOM_DIM_FIELDNAME_PREFIX)) {
String customDimKey
Expand All @@ -77,18 +78,20 @@ public void calculateProjection(DerivedMetricInfo derivedMetricInfo, TelemetryCo
}

private void calculateAggregation(AggregationType type, String id, double incrementBy) {
DerivedMetricAggregation dma = derivedMetricValues.get(id);
dma.count.getAndAdd(1);
if (type.equals(AggregationType.SUM) || type.equals(AggregationType.AVG)) {
dma.aggregation.getAndAdd(incrementBy);
} else if (type.equals(AggregationType.MIN)) {
dma.aggregation.getAndAccumulate(incrementBy, Math::min);
} else if (type.equals(AggregationType.MAX)) {
dma.aggregation.getAndAccumulate(incrementBy, Math::max);
synchronized (lock) {
DerivedMetricAggregation dma = derivedMetricValues.get(id);
dma.count++;
if (type.equals(AggregationType.SUM) || type.equals(AggregationType.AVG)) {
dma.aggregation += incrementBy;
} else if (type.equals(AggregationType.MIN)) {
dma.aggregation = Math.min(dma.aggregation, incrementBy);
} else if (type.equals(AggregationType.MAX)) {
dma.aggregation = Math.max(dma.aggregation, incrementBy);
}
}
}

static class DerivedMetricAggregation {
private static class DerivedMetricAggregation {
// This class represents the intermediate state of a derived metric value.
// It keeps track of the count and the aggregated value so that these two
// fields can be used to determine the final value of a derived metric
Expand All @@ -101,12 +104,12 @@ static class DerivedMetricAggregation {
// When metric values are retrieved by the data fetcher, the final value will
// be determined based on the count and the aggregation.

final AtomicDouble aggregation;
final AtomicLong count = new AtomicLong(0);
double aggregation;
long count = 0;
final AggregationType aggregationType;

DerivedMetricAggregation(long initValue, AggregationType type) {
aggregation = new AtomicDouble(initValue);
DerivedMetricAggregation(double initValue, AggregationType type) {
aggregation = initValue;
aggregationType = type;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
requires io.opentelemetry.sdk.trace;
requires io.opentelemetry.semconv;
requires io.opentelemetry.semconv.incubating;
requires com.google.common;

opens com.azure.monitor.opentelemetry.autoconfigure.implementation.models to com.azure.core;
opens com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models to com.azure.core;
Expand Down
Loading