Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP k3po
  • Loading branch information
attilakreiner committed Mar 11, 2024
commit 39c2286aeb09bf10080414ec89eaad29562a55a7
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

accept "zilla://streams/app0"
option zilla:window 8192

accepted
connected

accept "http://localhost:4318/v1/logs"
accepted
connected

read http:method "POST"
read http:version "HTTP/1.1"
read http:header "Host" "localhost:4318"
read
'{'
'"resourceLogs":['
'{'
'"resource":{'
'"attributes":['
'{'
'"key":"service.namespace",'
'"value":{'
'"stringValue":"example"'
'}'
'}'
']'
'},'
'"scopeLogs":['
'{'
'"scope":{'
'"name":"OtlpLogsSerializer",'
'"version":"1.0.0"'
'},'
Comment thread
attilakreiner marked this conversation as resolved.
'"logRecords":['
'{'
'"timeUnixNano":42000000,'
'"observedTimeUnixNano":42000000,'
'"traceId":"8000000000000001",'
'"spanId":"100000004",'
Comment thread
attilakreiner marked this conversation as resolved.
Outdated
'"body":{'
'"stringValue":"body"'
'},'
'"attributes":['
'{'
'"key":"event.name",'
'"value":{'
'"stringValue":"test.test"'
'}'
'},'
'{'
'"key":"extension",'
'"value":{'
'"stringValue":"test event message"'
'}'
'}'
']'
'}'
']'
'}'
']'
'}'
']'
'}'
read closed

write http:status "200" "OK"
write close
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

---
name: test
telemetry:
attributes:
service.namespace: example
exporters:
test0:
type: otlp
options:
interval: 1
signals:
- logs
endpoint:
location: http://localhost:4318
bindings:
net0:
type: test
kind: server
options:
events:
- timestamp: 42
message: test event message
exit: app0
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ telemetry:
type: otlp
options:
interval: 1
signals:
- metrics
endpoint:
location: http://localhost:4318
bindings:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

connect "zilla://streams/net0"
option zilla:window 8192

connected
Comment thread
attilakreiner marked this conversation as resolved.
Outdated
3 changes: 2 additions & 1 deletion incubator/exporter-otlp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@
</artifactItem>
</artifactItems>
<includes>
io/aklivity/zilla/specs/exporter/otlp/application/*,
io/aklivity/zilla/specs/exporter/otlp/application/**/*,
io/aklivity/zilla/specs/exporter/otlp/network/**/*,
io/aklivity/zilla/specs/exporter/otlp/config/*
</includes>
<outputDirectory>${project.build.directory}/test-classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.aklivity.zilla.runtime.engine.metrics.reader.MetricsReader;
import io.aklivity.zilla.runtime.exporter.otlp.config.OtlpOptionsConfig;
import io.aklivity.zilla.runtime.exporter.otlp.internal.config.OtlpExporterConfig;
import io.aklivity.zilla.runtime.exporter.otlp.internal.serializer.EventReader;
import io.aklivity.zilla.runtime.exporter.otlp.internal.serializer.OtlpLogsSerializer;
import io.aklivity.zilla.runtime.exporter.otlp.internal.serializer.OtlpMetricsSerializer;

Expand Down Expand Up @@ -98,7 +99,8 @@ public void start()

MetricsReader metrics = new MetricsReader(collector, context::supplyLocalName);
metricsSerializer = new OtlpMetricsSerializer(metrics.records(), attributes, context::resolveMetric, resolveKind);
logsSerializer = new OtlpLogsSerializer(attributes);
EventReader eventReader = new EventReader(context);
logsSerializer = new OtlpLogsSerializer(attributes, eventReader);
lastSuccess = System.currentTimeMillis();
nextAttempt = lastSuccess + interval;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.exporter.otlp.internal.serializer;

import java.io.StringReader;
import java.util.concurrent.TimeUnit;

import jakarta.json.Json;
import jakarta.json.JsonArray;
import jakarta.json.JsonArrayBuilder;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.JsonReader;

import org.agrona.DirectBuffer;

import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageReader;
import io.aklivity.zilla.runtime.engine.event.EventFormatter;
import io.aklivity.zilla.runtime.exporter.otlp.internal.types.event.EventFW;

public class EventReader
{
private static final int MESSAGE_COUNT_LIMIT = 100;
private static final String TIME_UNIX_NANO = "timeUnixNano";
private static final String OBSERVED_TIME_UNIX_NANO = "observedTimeUnixNano";
private static final String TRACE_ID = "traceId";
private static final String SPAN_ID = "spanId";
private static final String BODY = "body";
private static final String ATTRIBUTES = "attributes";
private static final String BODY_FORMAT = "{\"stringValue\": \"%s\"}";
private static final String STRING_ATTRIBUTE_FORMAT = "{\"key\":\"%s\", \"value\":{\"stringValue\": \"%s\"}}";
private static final String INT_ATTRIBUTE_FORMAT = "{\"key\":\"%s\", \"value\":{\"intValue\": \"%d\"}}";

private final EngineContext context;
private final MessageReader readEvent;
private final EventFormatter formatter;
private final EventFW eventRO = new EventFW();

private JsonArrayBuilder eventsJson;
private JsonObjectBuilder eventJson;
private JsonArrayBuilder eventAttributesJson;

public EventReader(
EngineContext context)
{
this.context = context;
this.readEvent = context.supplyEventReader();
this.formatter = context.supplyEventFormatter();
}

public JsonArray readEvents()
{
eventsJson = Json.createArrayBuilder();
readEvent.read(this::handleEvent, MESSAGE_COUNT_LIMIT);
return eventsJson.build();
}

private void handleEvent(
int msgTypeId,
DirectBuffer buffer,
int index,
int length)
{
final EventFW event = eventRO.wrap(buffer, index, index + length);
String qname = context.supplyQName(event.namespacedId());
eventJson = Json.createObjectBuilder();
long nanos = TimeUnit.MILLISECONDS.toNanos(event.timestamp());
eventJson.add(TIME_UNIX_NANO, nanos);
eventJson.add(OBSERVED_TIME_UNIX_NANO, nanos);
eventJson.add(TRACE_ID, String.format("%x", event.traceId()));
eventJson.add(SPAN_ID, String.format("%x", event.namespacedId()));
addBody("body");
eventAttributesJson = Json.createArrayBuilder();
addStringAttribute("event.name", "test.test");
String extension = formatter.format(msgTypeId, buffer, index, length);
addStringAttribute("extension", extension);
//addIntAttribute("id", 42);
eventJson.add(ATTRIBUTES, eventAttributesJson);
eventsJson.add(eventJson);
System.out.println(eventJson);
System.out.printf("%s 0x%016x [%d]\n", qname, event.traceId(), event.timestamp());
Comment thread
attilakreiner marked this conversation as resolved.
Outdated
}

private void addBody(
String body)
{
String json = String.format(BODY_FORMAT, body);
JsonReader reader = Json.createReader(new StringReader(json));
eventJson.add(BODY, reader.readObject());
}

private void addStringAttribute(
String key,
String value)
{
String json = String.format(STRING_ATTRIBUTE_FORMAT, key, value);
JsonReader reader = Json.createReader(new StringReader(json));
eventAttributesJson.add(reader.readObject());
}

private void addIntAttribute(
String key,
int value)
{
String json = String.format(INT_ATTRIBUTE_FORMAT, key, value);
JsonReader reader = Json.createReader(new StringReader(json));
eventAttributesJson.add(reader.readObject());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
*/
package io.aklivity.zilla.runtime.exporter.otlp.internal.serializer;

import java.io.StringReader;
import java.util.List;

import jakarta.json.Json;
import jakarta.json.JsonArray;
import jakarta.json.JsonArrayBuilder;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;

import io.aklivity.zilla.runtime.engine.config.AttributeConfig;

Expand All @@ -31,49 +29,21 @@ public class OtlpLogsSerializer
private static final String SCOPE_VERSION = "1.0.0";

private final List<AttributeConfig> attributes;
private final EventReader eventReader;

public OtlpLogsSerializer(
List<AttributeConfig> attributes)
List<AttributeConfig> attributes,
EventReader eventReader)
{
this.attributes = attributes;
this.eventReader = eventReader;
}

public String serializeAll()
{
JsonArrayBuilder attributesArray = Json.createArrayBuilder();
attributes.forEach(attr -> attributesArray.add(attributeToJson(attr)));
//JsonArrayBuilder logsArray = Json.createArrayBuilder();
String logRecords =
"[\n" +
" {\n" +
" \"timeUnixNano\": \"1544712660300000000\",\n" +
" \"observedTimeUnixNano\": \"1544712660300000000\",\n" +
" \"severityNumber\": 10,\n" +
" \"severityText\": \"Information\",\n" +
" \"traceId\": \"5B8EFFF798038103D269B633813FC60C\",\n" +
" \"spanId\": \"EEE19B7EC3C1B174\",\n" +
" \"body\": {\n" +
" \"stringValue\": \"1st Example log record\"\n" +
" },\n" +
" \"attributes\": [\n" +
" {\n" +
" \"key\": \"string.attribute\",\n" +
" \"value\": {\n" +
" \"stringValue\": \"jo napot\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"key\": \"int.attribute\",\n" +
" \"value\": {\n" +
" \"intValue\": \"77\"\n" +
" }\n" +
" }\n" +
" ]\n" +
" }\n" +
"]";
JsonReader reader = Json.createReader(new StringReader(logRecords));
JsonArray logsArray = reader.readArray();
// TODO: Ati - serialize logs to logsArray
JsonArray logsArray = eventReader.readEvents();
return createJson(attributesArray, logsArray);
}

Expand All @@ -91,7 +61,6 @@ private JsonObject attributeToJson(

private String createJson(
JsonArrayBuilder attributes,
//JsonArrayBuilder logsArray)
JsonArray logsArray)
{
JsonObject resource = Json.createObjectBuilder()
Expand Down
Loading