Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion incubator/catalog-schema-registry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.90</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.85</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

accept "zilla://streams/app0"
option zilla:window 8192

accepted
connected

accept "http://localhost:4318/v1/logs"
accepted
connected

read http:method "POST"
read http:version "HTTP/1.1"
read http:header "Host" "localhost:4318"
read
'{'
'"resourceLogs":['
'{'
'"resource":{'
'"attributes":['
'{'
'"key":"service.namespace",'
'"value":{'
'"stringValue":"example"'
'}'
'}'
']'
'},'
'"scopeLogs":['
'{'
'"scope":{'
'"name":"OtlpLogsSerializer",'
'"version":"1.0.0"'
'},'
Comment thread
attilakreiner marked this conversation as resolved.
'"logRecords":['
'{'
'"timeUnixNano":42000000,'
'"observedTimeUnixNano":42000000,'
'"traceId":"8000000000000001",'
'"spanId":"100000004",'
Comment thread
attilakreiner marked this conversation as resolved.
Outdated
'"body":{'
'"stringValue":"body"'
'},'
'"attributes":['
'{'
'"key":"event.name",'
'"value":{'
'"stringValue":"test.test"'
'}'
'},'
'{'
'"key":"extension",'
'"value":{'
'"stringValue":"test event message"'
'}'
'}'
']'
'}'
']'
'}'
']'
'}'
']'
'}'
read closed

write http:status "200" "OK"
write close
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

---
name: test
telemetry:
attributes:
service.namespace: example
exporters:
test0:
type: otlp
options:
interval: 1
signals:
- logs
endpoint:
location: http://localhost:4318
bindings:
net0:
type: test
kind: server
options:
events:
- timestamp: 42
message: test event message
exit: app0
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ telemetry:
type: otlp
options:
interval: 1
signals:
- metrics
endpoint:
location: http://localhost:4318
bindings:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

connect "zilla://streams/net0"
option zilla:window 8192

connected
Comment thread
attilakreiner marked this conversation as resolved.
Outdated
3 changes: 2 additions & 1 deletion incubator/exporter-otlp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@
</artifactItem>
</artifactItems>
<includes>
io/aklivity/zilla/specs/exporter/otlp/application/*,
io/aklivity/zilla/specs/exporter/otlp/application/**/*,
io/aklivity/zilla/specs/exporter/otlp/network/**/*,
io/aklivity/zilla/specs/exporter/otlp/config/*
</includes>
<outputDirectory>${project.build.directory}/test-classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class OtlpOptionsConfig extends OptionsConfig
public enum OtlpSignalsConfig
{
METRICS,
LOGS,
}

public long interval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
public class OtlpOverridesConfig
Comment thread
attilakreiner marked this conversation as resolved.
{
public URI metrics;
public URI logs;

public OtlpOverridesConfig(
URI metrics)
URI metrics,
URI logs)
{
this.metrics = metrics;
this.logs = logs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.exporter.otlp.internal;

import static io.aklivity.zilla.runtime.exporter.otlp.config.OtlpOptionsConfig.OtlpSignalsConfig.LOGS;
import static io.aklivity.zilla.runtime.exporter.otlp.config.OtlpOptionsConfig.OtlpSignalsConfig.METRICS;

import java.net.HttpURLConnection;
Expand All @@ -36,6 +37,8 @@
import io.aklivity.zilla.runtime.engine.metrics.reader.MetricsReader;
import io.aklivity.zilla.runtime.exporter.otlp.config.OtlpOptionsConfig;
import io.aklivity.zilla.runtime.exporter.otlp.internal.config.OtlpExporterConfig;
import io.aklivity.zilla.runtime.exporter.otlp.internal.serializer.EventReader;
import io.aklivity.zilla.runtime.exporter.otlp.internal.serializer.OtlpLogsSerializer;
import io.aklivity.zilla.runtime.exporter.otlp.internal.serializer.OtlpMetricsSerializer;

public class OltpExporterHandler implements ExporterHandler
Expand All @@ -49,18 +52,21 @@ public class OltpExporterHandler implements ExporterHandler
private final Set<OtlpOptionsConfig.OtlpSignalsConfig> signals;
private final String protocol;
private final URI metricsEndpoint;
private final URI logsEndpoint;
private final long interval;
private final Collector collector;
private final LongFunction<KindConfig> resolveKind;
private final List<AttributeConfig> attributes;
private final HttpClient httpClient;
private final Consumer<HttpResponse<String>> responseHandler;

private OtlpMetricsSerializer serializer;
private OtlpMetricsSerializer metricsSerializer;
private OtlpLogsSerializer logsSerializer;
private long nextAttempt;
private long lastSuccess;
private boolean warningLogged;
private CompletableFuture<HttpResponse<String>> response;
private CompletableFuture<HttpResponse<String>> metricsResponse;
private CompletableFuture<HttpResponse<String>> logsResponse;

public OltpExporterHandler(
OltpConfiguration config,
Expand All @@ -75,6 +81,7 @@ public OltpExporterHandler(
this.warningInterval = config.warningInterval().toMillis();
this.context = context;
this.metricsEndpoint = exporter.resolveMetrics();
this.logsEndpoint = exporter.resolveLogs();
this.signals = exporter.resolveSignals();
this.protocol = exporter.resolveProtocol();
this.interval = exporter.resolveInterval();
Expand All @@ -88,11 +95,12 @@ public OltpExporterHandler(
@Override
public void start()
{
assert signals.contains(METRICS);
assert HTTP.equals(protocol);

MetricsReader metrics = new MetricsReader(collector, context::supplyLocalName);
serializer = new OtlpMetricsSerializer(metrics.records(), attributes, context::resolveMetric, resolveKind);
metricsSerializer = new OtlpMetricsSerializer(metrics.records(), attributes, context::resolveMetric, resolveKind);
EventReader eventReader = new EventReader(context);
logsSerializer = new OtlpLogsSerializer(attributes, eventReader);
lastSuccess = System.currentTimeMillis();
nextAttempt = lastSuccess + interval;
}
Expand All @@ -104,17 +112,30 @@ public int export()
long now = System.currentTimeMillis();
if (now >= nextAttempt)
{
if (response == null || response.isDone())
if (signals.contains(METRICS) && (metricsResponse == null || metricsResponse.isDone()))
{
String json = serializer.serializeAll();
HttpRequest request = HttpRequest.newBuilder()
String metricsJson = metricsSerializer.serializeAll();
HttpRequest metricsRequest = HttpRequest.newBuilder()
.uri(metricsEndpoint)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(json))
.POST(HttpRequest.BodyPublishers.ofString(metricsJson))
.timeout(timeoutInterval)
.build();
response = httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
response.thenAccept(responseHandler);
metricsResponse = httpClient.sendAsync(metricsRequest, HttpResponse.BodyHandlers.ofString());
metricsResponse.thenAccept(responseHandler);
nextAttempt = now + retryInterval;
}
if (signals.contains(LOGS) && (logsResponse == null || logsResponse.isDone()))
{
String logsJson = logsSerializer.serializeAll();
HttpRequest logsRequest = HttpRequest.newBuilder()
.uri(logsEndpoint)
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(logsJson))
.timeout(timeoutInterval)
.build();
logsResponse = httpClient.sendAsync(logsRequest, HttpResponse.BodyHandlers.ofString());
logsResponse.thenAccept(responseHandler);
Comment thread
attilakreiner marked this conversation as resolved.
Outdated
nextAttempt = now + retryInterval;
}
if (!warningLogged && now - lastSuccess > warningInterval)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.exporter.otlp.internal.config;

import static io.aklivity.zilla.runtime.exporter.otlp.config.OtlpOptionsConfig.OtlpSignalsConfig.LOGS;
import static io.aklivity.zilla.runtime.exporter.otlp.config.OtlpOptionsConfig.OtlpSignalsConfig.METRICS;

import java.net.URI;
Expand All @@ -26,7 +27,8 @@
public class OtlpExporterConfig
{
private static final String DEFAULT_METRICS_PATH = "/v1/metrics";
private static final Set<OtlpOptionsConfig.OtlpSignalsConfig> DEFAULT_SIGNALS = Set.of(METRICS);
private static final String DEFAULT_LOGS_PATH = "/v1/logs";
private static final Set<OtlpOptionsConfig.OtlpSignalsConfig> DEFAULT_SIGNALS = Set.of(METRICS, LOGS);
private static final long DEFAULT_INTERVAL = Duration.ofSeconds(30).toMillis();

private final OtlpOptionsConfig options;
Expand Down Expand Up @@ -56,6 +58,25 @@ public URI resolveMetrics()
return result;
}

public URI resolveLogs()
{
assert options != null;
assert options.endpoint != null;
assert options.endpoint.location != null;

URI result;
URI location = options.endpoint.location;
if (options.endpoint.overrides != null && options.endpoint.overrides.logs != null)
{
result = location.resolve(options.endpoint.overrides.logs);
}
else
{
result = location.resolve(DEFAULT_LOGS_PATH);
}
return result;
}

public Set<OtlpOptionsConfig.OtlpSignalsConfig> resolveSignals()
{
assert options != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
public class OtlpOverridesAdapter implements JsonbAdapter<OtlpOverridesConfig, JsonObject>
{
private static final String METRICS_NAME = "metrics";
private static final String LOGS_NAME = "logs";

@Override
public JsonObject adaptToJson(
Expand All @@ -36,6 +37,10 @@ public JsonObject adaptToJson(
{
object.add(METRICS_NAME, overrides.metrics.toString());
}
if (overrides.logs != null)
{
object.add(LOGS_NAME, overrides.logs.toString());
}
return object.build();
}

Expand All @@ -46,6 +51,9 @@ public OtlpOverridesConfig adaptFromJson(
URI metrics = object.containsKey(METRICS_NAME)
? URI.create(object.getString(METRICS_NAME))
: null;
return new OtlpOverridesConfig(metrics);
URI logs = object.containsKey(LOGS_NAME)
? URI.create(object.getString(LOGS_NAME))
: null;
return new OtlpOverridesConfig(metrics, logs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package io.aklivity.zilla.runtime.exporter.otlp.internal.config;

import static io.aklivity.zilla.runtime.exporter.otlp.config.OtlpOptionsConfig.OtlpSignalsConfig.LOGS;
import static io.aklivity.zilla.runtime.exporter.otlp.config.OtlpOptionsConfig.OtlpSignalsConfig.METRICS;

import java.util.Set;
Expand All @@ -29,6 +30,7 @@
public class OtlpSignalsAdapter implements JsonbAdapter<Set<OtlpOptionsConfig.OtlpSignalsConfig>, JsonArray>
{
private static final String METRICS_NAME = "metrics";
private static final String LOGS_NAME = "logs";

@Override
public JsonArray adaptToJson(
Expand All @@ -48,6 +50,10 @@ public Set<OtlpOptionsConfig.OtlpSignalsConfig> adaptFromJson(
{
signals.add(METRICS);
}
if (array.contains(Json.createValue(LOGS_NAME)))
{
signals.add(LOGS);
}
return signals;
}
}
Loading