Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4aa78d3
adding projection functionality, rethink automic double
harsimar Dec 10, 2024
bf15276
atomic double and some other changes
harsimar Dec 11, 2024
4ace19d
merge from master to get p3 changes
harsimar Dec 11, 2024
75718c7
pr comments and some build stuff
harsimar Dec 11, 2024
27d4979
changing guava version
harsimar Dec 11, 2024
619bb47
spotbug fix
harsimar Dec 11, 2024
915d579
starting to add tests
harsimar Dec 12, 2024
9c8fa37
added unit tests for derived metric projection
harsimar Dec 13, 2024
e3377d7
fix conflict from main
harsimar Dec 13, 2024
c599211
reorganize tests
harsimar Dec 13, 2024
a6ab936
fixing inconsistency with main re okio
harsimar Dec 13, 2024
494a1b9
pr comments & small refactorings
harsimar Dec 16, 2024
1351d2b
remove logging, spotless
harsimar Dec 16, 2024
a77adfe
validator
harsimar Dec 17, 2024
68b9c83
minor
harsimar Dec 17, 2024
d80053e
changes to concurrency handling
harsimar Dec 17, 2024
f83c116
moving synchronization to derivedMetricAggregation
harsimar Dec 17, 2024
d8b618f
moving derivedMetricAggregation to its own file
harsimar Dec 17, 2024
77f176a
merge projection into here
harsimar Dec 18, 2024
96e705f
validator round 1 - need to refactor for validator to store errors
harsimar Dec 18, 2024
6e81b55
finish implementation and starting to add tests
harsimar Dec 19, 2024
568f059
merge from main
harsimar Dec 19, 2024
de6bd93
spotless
harsimar Dec 20, 2024
f492237
remove unused classes
harsimar Dec 20, 2024
191783d
pr comments
harsimar Jan 6, 2025
c3c3c49
fix ci errors
harsimar Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
adding projection functionality, rethink automic double
  • Loading branch information
harsimar committed Dec 10, 2024
commit 4aa78d336ba7cbdd4fbcc344b788d9eac202beed
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,9 @@
import com.azure.monitor.opentelemetry.autoconfigure.implementation.models.TelemetryExceptionDetails;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.models.MessageData;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.models.ContextTagKeys;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.filtering.DependencyDataColumns;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.filtering.FilteringConfiguration;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.filtering.TelemetryColumns;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.filtering.ExceptionDataColumns;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.filtering.RequestDataColumns;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.RemoteDependency;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.Request;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.KeyValuePairString;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.DocumentIngress;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.filtering.*;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.*;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.Exception;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.FilterConjunctionGroupInfo;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.DerivedMetricInfo;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.utils.CpuPerformanceCounterCalculator;
import reactor.util.annotation.Nullable;

Expand All @@ -36,6 +27,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -75,7 +67,7 @@ synchronized void disable() {

synchronized void enable(Supplier<String> instrumentationKeySupplier) {
this.instrumentationKeySupplier = instrumentationKeySupplier;
counters.set(new Counters());
counters.set(new Counters(configuration.get().getValidProjectionInitInfo()));
}

synchronized void setQuickPulseStatus(QuickPulseStatus quickPulseStatus) {
Expand All @@ -89,7 +81,7 @@ synchronized QuickPulseStatus getQuickPulseStatus() {

@Nullable
synchronized FinalCounters getAndRestart() {
Counters currentCounters = counters.getAndSet(new Counters());
Counters currentCounters = counters.getAndSet(new Counters(configuration.get().getValidProjectionInitInfo()));
if (currentCounters != null) {
return new FinalCounters(currentCounters);
}
Expand Down Expand Up @@ -168,9 +160,7 @@ private boolean matchesDocumentFilters(TelemetryColumns columns, String telemetr
}

private void applyMetricFilters(TelemetryColumns columns, String telemetryType,
FilteringConfiguration currentConfig) {
// TODO (harskaur): In a future PR, use Filter class to check if columns match any filter
// TODO (harskaur): If columns matches a filter, then create/increment a derived metric
FilteringConfiguration currentConfig, Counters currentCounters) {
// TODO (harskaur): when this PR is merged, remove logging (it is for manual testing & making sure the build does not complain about useless methods)
List<DerivedMetricInfo> metricsConfig = currentConfig.fetchMetricConfigForTelemetryType(telemetryType);
try {
Expand All @@ -180,6 +170,13 @@ private void applyMetricFilters(TelemetryColumns columns, String telemetryType,
} catch (IOException e) {
logger.error(e.getMessage());
}
for (DerivedMetricInfo derivedMetricInfo : metricsConfig) {
if (Filter.checkMetricFilters(derivedMetricInfo, columns)) {
synchronized (currentCounters.derivedMetrics) {
currentCounters.derivedMetrics.calculateProjection(derivedMetricInfo, columns);
}
}
}
}

private void addDependency(RemoteDependencyData telemetry, int itemCount, FilteringConfiguration currentConfig) {
Expand All @@ -195,7 +192,7 @@ private void addDependency(RemoteDependencyData telemetry, int itemCount, Filter
}

DependencyDataColumns columns = new DependencyDataColumns(telemetry);
applyMetricFilters(columns, "Dependency", currentConfig);
applyMetricFilters(columns, "Dependency", currentConfig, counters);

if (matchesDocumentFilters(columns, "Dependency", currentConfig)) {
RemoteDependency dependencyDoc = new RemoteDependency();
Expand Down Expand Up @@ -223,7 +220,7 @@ private void addException(TelemetryExceptionData exceptionData, int itemCount,
counters.exceptions.addAndGet(itemCount);

ExceptionDataColumns columns = new ExceptionDataColumns(exceptionData);
applyMetricFilters(columns, "Exception", currentConfig);
applyMetricFilters(columns, "Exception", currentConfig, counters);

if (matchesDocumentFilters(columns, "Exception", currentConfig)) {
List<TelemetryExceptionDetails> exceptionList = exceptionData.getExceptions();
Expand Down Expand Up @@ -257,7 +254,7 @@ private void addRequest(RequestData requestTelemetry, int itemCount, String oper
}

RequestDataColumns columns = new RequestDataColumns(requestTelemetry);
applyMetricFilters(columns, "Request", currentConfig);
applyMetricFilters(columns, "Request", currentConfig, counters);

if (matchesDocumentFilters(columns, "Request", currentConfig)) {
Request requestDoc = new Request();
Expand Down Expand Up @@ -382,6 +379,8 @@ class FinalCounters {
final double processNormalizedCpuUsage;
final List<DocumentIngress> documentList = new ArrayList<>();

final Map<String, Double> projections;

private FinalCounters(Counters currentCounters) {

processPhysicalMemory = getPhysicalMemory(memory);
Expand All @@ -401,6 +400,9 @@ private FinalCounters(Counters currentCounters) {
synchronized (currentCounters.documentList) {
this.documentList.addAll(currentCounters.documentList);
}
synchronized (currentCounters.derivedMetrics) {
this.projections = currentCounters.derivedMetrics.fetchFinalDerivedMetricValues();
}
}

private long getPhysicalMemory(@Nullable MemoryMXBean memory) {
Expand Down Expand Up @@ -453,6 +455,12 @@ static class Counters {
final AtomicInteger unsuccessfulRdds = new AtomicInteger(0);
final List<DocumentIngress> documentList = new ArrayList<>();

final DerivedMetricProjections derivedMetrics;

Counters(Map<String, AggregationType> projectionInfo) {
derivedMetrics = new DerivedMetricProjections(projectionInfo);
}

static long encodeCountAndDuration(long count, long duration) {
if (count > MAX_COUNT || duration > MAX_DURATION) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ private static List<MetricPoint> addMetricsToMonitoringDataPoint(QuickPulseDataC
metrics.put("\\Processor(_Total)\\% Processor Time", counters.processNormalizedCpuUsage); // TODO: remove old cpu counter name when service side makes the UI change
metrics.put("\\% Process\\Processor Time Normalized", counters.processNormalizedCpuUsage);

metrics.putAll(counters.projections);

for (Map.Entry<String, Double> entry : metrics.entrySet()) {
MetricPoint point = new MetricPoint();
point.setName(entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,18 @@ public boolean matchesCustomDimFilter(FilterInfo filter, String trimmedFieldName
}
}

public double getCustomDimValueForProjection(String key) {
double result = Double.NaN;
if (customDimensions.containsKey(key)) {
String value = customDimensions.get(key);
try {
result = Double.valueOf(value);
} catch (NumberFormatException e) {
// TODO (harskaur): track this error in the error tracker, as this means a customer asked to project a dimension that did not have a numeric value
return result;
}
}
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,8 @@ public boolean checkAllCustomDims(FilterInfo filter, TelemetryColumns data) {
public boolean checkCustomDimFilter(FilterInfo filter, TelemetryColumns data, String trimmedFieldName) {
return customDims.matchesCustomDimFilter(filter, trimmedFieldName);
}

public double getCustomDimValueForProjection(String key) {
return customDims.getCustomDimValueForProjection(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.filtering;

import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.AggregationType;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.DerivedMetricInfo;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class DerivedMetricProjections {

public static final String COUNT = "Count()";
private final Map<String, DerivedMetricAggregation> derivedMetricValues = new HashMap<>();

public DerivedMetricProjections(Map<String, AggregationType> projectionInfo) {
for (Map.Entry<String, AggregationType> entry : projectionInfo.entrySet()) {
AggregationType aggregationType = entry.getValue();
DerivedMetricAggregation value;
if (aggregationType.equals(AggregationType.MIN)) {
value = new DerivedMetricAggregation(Long.MAX_VALUE, aggregationType);
} else if (aggregationType.equals(AggregationType.MAX)) {
value = new DerivedMetricAggregation(Long.MIN_VALUE, aggregationType);
} else if (aggregationType.equals(AggregationType.SUM) || aggregationType.equals(AggregationType.AVG)) {
value = new DerivedMetricAggregation(0, aggregationType);
} else {
value = null; // we should never hit this case - that means the UI gave us an invalid aggregation type
}
derivedMetricValues.put(entry.getKey(), value);
}
}

// This is intended to be called once for every post request
public Map<String, Double> fetchFinalDerivedMetricValues() {
Map<String, Double> result = new HashMap<>();
for (Map.Entry<String, DerivedMetricAggregation> entry : derivedMetricValues.entrySet()) {
String id = entry.getKey();
DerivedMetricAggregation dma = entry.getValue();
double intermediateValue = dma.aggregation.doubleValue();
double count = dma.count.doubleValue();
if (count == 0) {
result.put(id, 0.0);
} else {
if (dma.aggregationType.equals(AggregationType.AVG)) {
result.put(id, intermediateValue / count);
} else {
result.put(id, intermediateValue);
}
}
}
return result;
}

public void calculateProjection(DerivedMetricInfo derivedMetricInfo, TelemetryColumns columns) {
double incrementBy = Double.NaN;
if (COUNT.equals(derivedMetricInfo.getProjection())) {
incrementBy = 1.0;
} else if (KnownRequestColumns.DURATION.equals(derivedMetricInfo.getProjection())) {
if (columns instanceof RequestDataColumns || columns instanceof DependencyDataColumns) {
long duration = columns.getFieldValue(KnownRequestColumns.DURATION, Long.class);
// in case duration from telemetrycolumns doesn't parse correctly.
incrementBy = duration != -1 ? (double) duration : Double.NaN;
}
// The UI doesn't allow for Trace/Exception metrics charts to selection a projection that is a Duration,
// so letting that case slip though.
} else if (derivedMetricInfo.getProjection().startsWith(Filter.CUSTOM_DIM_FIELDNAME_PREFIX)) {
String customDimKey = derivedMetricInfo.getProjection().substring(Filter.CUSTOM_DIM_FIELDNAME_PREFIX.length());
incrementBy = columns.getCustomDimValueForProjection(customDimKey);
// It is possible for the custom dim value to not parse to a double, or for the custom dim key to not be present.
// For now, such cases produce Double.Nan and get skipped when calculating projection.
// TODO (harskaur): For future PR, the error tracker should track the errors mentioned in lines above.
}

if (incrementBy != Double.NaN) {
calculateAggregation(derivedMetricInfo.getAggregation(), derivedMetricInfo.getId(), incrementBy);
}
}

private void calculateAggregation(AggregationType type, String id, double incrementBy) {
DerivedMetricAggregation dma = derivedMetricValues.get(id);
dma.count.getAndAdd(1);
// TODO (harskaur): Use atomic double?? Long will turn out inaccurate with custom dim projections
if (type.equals(AggregationType.SUM) || type.equals(AggregationType.AVG)) {
dma.aggregation.getAndAdd((long) incrementBy);
} else if (type.equals(AggregationType.MIN)) {
dma.aggregation.getAndAccumulate((long) incrementBy , Math::min);
} else if (type.equals(AggregationType.MAX)) {
dma.aggregation.getAndAccumulate((long) incrementBy , Math::max);
}
}

static class DerivedMetricAggregation {
// This class represents the intermediate state of a derived metric value.
// It keeps track of the count and the aggregated value so that these two
// fields can be used to determine the final value of a derived metric
// when the data fetcher asks for it.

// Depending on the aggregationType, aggregation holds different values.
// For min, it is the current minimum value
// For max, it is the current max value
// For sum & avg, this represents the current sum.
// When metric values are retrieved by the data fetcher, the final value will
// be determined based on the count and the aggregation.

// TODO (harskaur): Use atomic double?? Long will turn out inaccurate with custom dim projections
AtomicLong aggregation;
AtomicLong count = new AtomicLong(0);
AggregationType aggregationType;
DerivedMetricAggregation(long initValue, AggregationType type) {
aggregation = new AtomicLong(initValue);
aggregationType = type;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@ public List<String> getAllFieldValuesAsString() {
}
return result;
}

public double getCustomDimValueForProjection(String key) {
return customDims.getCustomDimValueForProjection(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.CollectionConfigurationInfo;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.DocumentStreamInfo;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.DocumentFilterConjunctionGroupInfo;
import com.azure.monitor.opentelemetry.autoconfigure.implementation.quickpulse.swagger.models.AggregationType;


import java.util.Set;
import java.util.List;
Expand All @@ -26,16 +28,20 @@ public class FilteringConfiguration {

private final String etag;

private Map<String, AggregationType> validProjectionInfo;

public FilteringConfiguration() {
validDerivedMetricInfos = new HashMap<>();
validDocumentFilterConjunctionGroupInfos = new HashMap<>();
etag = "";
validProjectionInfo = new HashMap<>();
}

public FilteringConfiguration(CollectionConfigurationInfo configuration) {
validDerivedMetricInfos = parseMetricFilterConfiguration(configuration);
validDocumentFilterConjunctionGroupInfos = parseDocumentFilterConfiguration(configuration);
etag = configuration.getETag();
validProjectionInfo = initValidProjectionInfo();
}

public List<DerivedMetricInfo> fetchMetricConfigForTelemetryType(String telemetryType) {
Expand All @@ -59,6 +65,10 @@ public String getETag() {
return etag;
}

public Map<String, AggregationType> getValidProjectionInitInfo() {
return new HashMap<>(validProjectionInfo);
}

private Map<String, Map<String, List<FilterConjunctionGroupInfo>>>
parseDocumentFilterConfiguration(CollectionConfigurationInfo configuration) {
Map<String, Map<String, List<FilterConjunctionGroupInfo>>> result = new HashMap<>();
Expand Down Expand Up @@ -112,4 +122,14 @@ public String getETag() {
return result;
}

private Map<String, AggregationType> initValidProjectionInfo() {
Map<String, AggregationType> result = new HashMap<>();
for (List<DerivedMetricInfo> derivedMetricInfoList : validDerivedMetricInfos.values()) {
for (DerivedMetricInfo derivedMetricInfo : derivedMetricInfoList) {
result.put(derivedMetricInfo.getId(), derivedMetricInfo.getAggregation());
}
}
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ public boolean checkCustomDimFilter(FilterInfo filter, TelemetryColumns data, St
return customDims.matchesCustomDimFilter(filter, trimmedFieldName);
}

public double getCustomDimValueForProjection(String key) {
return customDims.getCustomDimValueForProjection(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ public interface TelemetryColumns {

boolean checkCustomDimFilter(FilterInfo filter, TelemetryColumns data, String trimmedFieldName);

double getCustomDimValueForProjection(String key);

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ public List<String> getAllFieldValuesAsString() {
result.add((String) mapping.get(KnownTraceColumns.MESSAGE));
return result;
}

public double getCustomDimValueForProjection(String key) {
return customDims.getCustomDimValueForProjection(key);
}
}