Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP event id
  • Loading branch information
attilakreiner committed Mar 13, 2024
commit 059d9e1484a47b80ff1d67adbba6ba94a7945862
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ public class SchemaRegistryEventContext
private final EventFW.Builder eventRW = new EventFW.Builder();
private final SchemaRegistryEventExFW.Builder schemaRegistryEventExRW = new SchemaRegistryEventExFW.Builder();
private final int schemaRegistryTypeId;
private final int remoteAccessRejectedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public SchemaRegistryEventContext(
EngineContext context)
{
this.schemaRegistryTypeId = context.supplyTypeId(SchemaRegistryCatalog.NAME);
this.remoteAccessRejectedEventId = context.supplyEventId("catalog.schema.registry.remote.access.rejected");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand All @@ -64,6 +66,7 @@ public void remoteAccessRejected(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(remoteAccessRejectedEventId)
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(catalogId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ public class HttpEventContext
private final EventFW.Builder eventRW = new EventFW.Builder();
private final HttpEventExFW.Builder httpEventExRW = new HttpEventExFW.Builder();
private final int httpTypeId;
private final int requestAcceptedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public HttpEventContext(
EngineContext context)
{
this.httpTypeId = context.supplyTypeId(HttpBinding.NAME);
this.requestAcceptedEventId = context.supplyEventId("binding.http.request.accepted");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand All @@ -78,6 +80,7 @@ public void requestAccepted(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(requestAcceptedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand Down Expand Up @@ -107,6 +110,7 @@ public void requestAccepted(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(requestAcceptedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ public class KafkaEventContext
private final EventFW.Builder eventRW = new EventFW.Builder();
private final KafkaEventExFW.Builder kafkaEventExRW = new KafkaEventExFW.Builder();
private final int kafkaTypeId;
private final int authorizationFailedEventId;
private final int apiVersionRejectedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public KafkaEventContext(
EngineContext context)
{
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
this.authorizationFailedEventId = context.supplyEventId("binding.kafka.authorization.failed");
this.apiVersionRejectedEventId = context.supplyEventId("binding.kafka.api.version.rejected");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand All @@ -63,6 +67,7 @@ public void authorizationFailed(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(authorizationFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand All @@ -87,6 +92,7 @@ public void apiVersionRejected(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(apiVersionRejectedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@ public class TcpEventContext
private final TcpEventExFW.Builder tcpEventExRW = new TcpEventExFW.Builder();

private final int tcpTypeId;
private final int dnsFailedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public TcpEventContext(
EngineContext context)
{
this.tcpTypeId = context.supplyTypeId(TcpBinding.NAME);
this.dnsFailedEventId = context.supplyEventId("binding.tcp.dns.failed");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}

public void dnsResolutionFailed(
public void dnsFailed(
long traceId,
long bindingId,
String address)
Expand All @@ -63,6 +65,7 @@ public void dnsResolutionFailed(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(dnsFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ else if (binding.routes == TcpBindingConfig.DEFAULT_CLIENT_ROUTES)
}
catch (TcpDnsFailedException ex)
{
event.dnsResolutionFailed(traceId, binding.id, ex.hostname);
event.dnsFailed(traceId, binding.id, ex.hostname);
}
return resolved;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,23 @@ public class TlsEventContext
private final EventFW.Builder eventRW = new EventFW.Builder();
private final TlsEventExFW.Builder tlsEventExRW = new TlsEventExFW.Builder();
private final int tlsTypeId;
private final int tlsFailedEventId;
private final int tlsProtocolRejectedEventId;
private final int tlsKeyRejectedEventId;
private final int tlsPeerNotVerifiedEventId;
private final int tlsHandshakeFailedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public TlsEventContext(
EngineContext context)
{
this.tlsTypeId = context.supplyTypeId(TlsBinding.NAME);
this.tlsFailedEventId = context.supplyEventId("binding.tls.tls.failed");
this.tlsProtocolRejectedEventId = context.supplyEventId("binding.tls.protocol.rejected");
this.tlsKeyRejectedEventId = context.supplyEventId("binding.tls.key.rejected");
this.tlsPeerNotVerifiedEventId = context.supplyEventId("binding.tls.peer.not.verified");
this.tlsHandshakeFailedEventId = context.supplyEventId("binding.tls.handshake.failed");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand All @@ -64,6 +74,7 @@ public void tlsFailed(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(tlsFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand All @@ -84,6 +95,7 @@ public void tlsProtocolRejected(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(tlsProtocolRejectedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand All @@ -104,6 +116,7 @@ public void tlsKeyRejected(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(tlsKeyRejectedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand All @@ -124,6 +137,7 @@ public void tlsPeerNotVerified(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(tlsPeerNotVerifiedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand All @@ -144,6 +158,7 @@ public void tlsHandshakeFailed(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(tlsHandshakeFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ String supplyLocalName(
String supplyQName(
long namespacedId);

int supplyEventId(
String name);

BindingHandler streamFactory();

GuardHandler supplyGuard(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,13 @@ public String supplyQName(
labels.lookupLabel(NamespacedId.localId(namespacedId)));
}

@Override
public int supplyEventId(
String name)
{
return labels.supplyLabelId(name);
}

@Override
public int supplyTypeId(
String name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ public class TestEventContext
private final AtomicBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final EventFW.Builder eventRW = new EventFW.Builder();
private final int testTypeId;
private final int connectedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public TestEventContext(
EngineContext context)
{
this.testTypeId = context.supplyTypeId(TestBinding.NAME);
this.connectedEventId = context.supplyEventId("binding.test.connected");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand All @@ -54,6 +56,7 @@ public void connected(
String8FW extension = new String8FW(message);
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(connectedEventId)
.timestamp(timestamp)
.traceId(traceId)
.namespacedId(bindingId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ public class JwtEventContext
private final EventFW.Builder eventRW = new EventFW.Builder();
private final JwtEventExFW.Builder jwtEventExRW = new JwtEventExFW.Builder();
private final int jwtTypeId;
private final int authorizationFailedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public JwtEventContext(
EngineContext context)
{
this.jwtTypeId = context.supplyTypeId(JwtGuard.NAME);
this.authorizationFailedEventId = context.supplyEventId("guard.jwt.authorization.failed");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand All @@ -61,6 +63,7 @@ public void authorizationFailed(
.build();
EventFW event = eventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.id(authorizationFailedEventId)
.timestamp(clock.millis())
.traceId(traceId)
.namespacedId(bindingId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ scope core
{
struct Event
{
int32 id;
int64 timestamp;
int64 traceId;
int64 namespacedId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ telemetry:
events:
- qname: test.net0
message: test event message
id: binding.test.connected
bindings:
net0:
type: test
Expand Down