Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Flow optimisation, module-info changes & bug fixes
  • Loading branch information
ankitk-me committed Jan 9, 2024
commit a842df3675add8354f16777794ad559d505169be
6 changes: 6 additions & 0 deletions cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>validator-protobuf</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>

Expand Down
2 changes: 2 additions & 0 deletions cloud/docker-image/src/main/docker/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
<include>com/fasterxml/jackson/**</include>
<include>org/yaml/snakeyaml/**</include>
<include>org/junit/**</include>
<include>com/google/**</include>
<include>org/checkerframework/**</include>
</includes>
</fileSet>
</fileSets>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"io.aklivity.zilla:validator-avro",
"io.aklivity.zilla:validator-core",
"io.aklivity.zilla:validator-json",
"io.aklivity.zilla:validator-protobuf",
"io.aklivity.zilla:vault-filesystem",
"org.slf4j:slf4j-simple",
"org.antlr:antlr4-runtime"
Expand Down
6 changes: 6 additions & 0 deletions incubator/command-generate/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>validator-protobuf</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>vault-filesystem</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
requires io.aklivity.zilla.runtime.validator.avro;
requires io.aklivity.zilla.runtime.validator.core;
requires io.aklivity.zilla.runtime.validator.json;
requires io.aklivity.zilla.runtime.validator.protobuf;

requires com.fasterxml.jackson.dataformat.yaml;
requires com.fasterxml.jackson.databind;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ bindings:
catalog0:
- subject: test0
version: latest
record: SimpleMessage
record: example
exit: test
2 changes: 1 addition & 1 deletion incubator/validator-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.25.1</version>
<version>3.24.4</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_SCHEMA_ID;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.function.LongFunction;

import org.agrona.DirectBuffer;
Expand All @@ -35,13 +36,18 @@
public class ProtobufReadValidator extends ProtobufValidator implements ValueValidator, FragmentValidator
{
private final JsonFormat.Printer printer;
private final OutputStreamWriter output;

public ProtobufReadValidator(
ProtobufValidatorConfig config,
LongFunction<CatalogHandler> supplyCatalog)
{
super(config, supplyCatalog);
this.printer = JsonFormat.printer();
this.printer = JsonFormat.printer()
.omittingInsignificantWhitespace()
.preservingProtoFieldNames()
.includingDefaultValueFields();
this.output = new OutputStreamWriter(out);
}

@Override
Expand Down Expand Up @@ -135,32 +141,38 @@ private int validate(
ValueConsumer next)
{
int valLength = -1;
Descriptors.FileDescriptor fileDescriptor = supplyDescriptor(schemaId);
if (fileDescriptor != null)
DescriptorTree tree = supplyDescriptorTree(schemaId);
if (tree != null)
{
DescriptorTree tree = new DescriptorTree(fileDescriptor).findByIndexes(indexes);
tree = tree.findByIndexes(indexes);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps findByIndexes should return Descriptor directly?

if (tree != null)
{
Descriptors.Descriptor descriptor = tree.descriptor;
in.wrap(data, index, length);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
validate:
try
{
in.wrap(data, index, length);
DynamicMessage message = DynamicMessage.parseFrom(descriptor, in);
if (message.getUnknownFields().asMap().isEmpty())
builder.mergeFrom(in);
DynamicMessage message = builder.build();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
builder.mergeFrom(in);
DynamicMessage message = builder.build();
DynamicMessage message = builder.mergeFrom(in).build();

builder.clear();
if (!message.getUnknownFields().asMap().isEmpty())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This .asMap() still has allocation logic, but it may be the best we can do with these APIs.

{
if (FORMAT_JSON.equals(format))
{
String json = printer.print(message);
int jsonLength = json.length();
valueRO.wrap(json.getBytes());
next.accept(valueRO, 0, jsonLength);
valLength = jsonLength;
}
else
{
next.accept(data, index, length);
valLength = length;
}
break validate;
}

if (FORMAT_JSON.equals(format))
{
out.wrap(out.buffer());
printer.appendTo(message, output);
output.flush();
valLength = out.position();
next.accept(out.buffer(), 0, valLength);
}
else
{
next.accept(data, index, length);
valLength = length;
}
}
catch (IOException e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.collections.Object2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.io.DirectBufferInputStream;
import org.agrona.io.ExpandableDirectBufferOutputStream;
import org.antlr.v4.runtime.BailErrorStrategy;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
Expand Down Expand Up @@ -57,9 +58,10 @@ public class ProtobufValidator
protected final String format;
protected final List<Integer> indexes;
protected final DirectBufferInputStream in;
protected final DirectBuffer valueRO;
protected final ExpandableDirectBufferOutputStream out;

private final Int2ObjectCache<FileDescriptor> descriptors;
private final Int2ObjectCache<DescriptorTree> tree;
private final Object2ObjectHashMap<String, DynamicMessage.Builder> builders;
private final FileDescriptor[] dependencies;
private final Int2IntHashMap paddings;
Expand All @@ -76,12 +78,13 @@ protected ProtobufValidator(
: config.subject;
this.format = config.format;
this.descriptors = new Int2ObjectCache<>(1, 1024, i -> {});
this.tree = new Int2ObjectCache<>(1, 1024, i -> {});
this.builders = new Object2ObjectHashMap<>();
this.in = new DirectBufferInputStream();
this.dependencies = new FileDescriptor[0];
this.indexes = new LinkedList<>();
this.paddings = new Int2IntHashMap(-1);
this.valueRO = new UnsafeBuffer();
this.out = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
}

protected FileDescriptor supplyDescriptor(
Expand All @@ -90,6 +93,12 @@ protected FileDescriptor supplyDescriptor(
return descriptors.computeIfAbsent(schemaId, this::createDescriptors);
}

protected DescriptorTree supplyDescriptorTree(
int schemaId)
{
return tree.computeIfAbsent(schemaId, this::createDescriptorTree);
}

protected byte[] encodeIndexes()
{
int size = indexes.size();
Expand Down Expand Up @@ -118,6 +127,7 @@ protected int decodeIndexes(
int length)
{
int progress = 0;
indexes.clear();
int encodedLength = decodeIndex(data.getByte(index));
progress += BitUtil.SIZE_OF_BYTE;
if (encodedLength == 0)
Expand All @@ -135,7 +145,7 @@ protected int decodeIndexes(
protected int supplyIndexPadding(
int schemaId)
{
return paddings.computeIfAbsent(schemaId, id -> calculateIndexPadding(supplyDescriptor(id)));
return paddings.computeIfAbsent(schemaId, this::calculateIndexPadding);
}

protected int supplyJsonFormatPadding(
Expand All @@ -148,14 +158,14 @@ protected DynamicMessage.Builder supplyDynamicMessageBuilder(
Descriptors.Descriptor descriptor)
{
DynamicMessage.Builder builder;
if (builders.containsKey(catalog.record))
if (builders.containsKey(descriptor.getFullName()))
{
builder = builders.get(catalog.record);
builder = builders.get(descriptor.getFullName());
}
else
{
builder = createDynamicMessageBuilder(descriptor);
builders.put(catalog.record, builder);
builders.put(descriptor.getFullName(), builder);
}
return builder;
}
Expand All @@ -181,12 +191,13 @@ private int decodeIndex(
}

private int calculateIndexPadding(
FileDescriptor descriptor)
int schemaId)
{
int padding = 0;
if (descriptor != null)
DescriptorTree trees = supplyDescriptorTree(schemaId);
if (trees != null && catalog.record != null)
{
DescriptorTree tree = new DescriptorTree(descriptor).findByName(catalog.record);
DescriptorTree tree = trees.findByName(catalog.record);
if (tree != null)
{
padding = tree.indexes.size() + 1;
Expand Down Expand Up @@ -245,4 +256,17 @@ private FileDescriptor createDescriptors(
}
return descriptor;
}

private DescriptorTree createDescriptorTree(
int schemaId)
{
DescriptorTree tree = null;
FileDescriptor descriptor = supplyDescriptor(schemaId);

if (descriptor != null)
{
tree = new DescriptorTree(descriptor);
}
return tree;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
package io.aklivity.zilla.runtime.validator.protobuf;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.function.LongFunction;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.io.ExpandableDirectBufferOutputStream;
import org.agrona.io.DirectBufferInputStream;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
Expand All @@ -36,15 +36,19 @@
public class ProtobufWriteValidator extends ProtobufValidator implements ValueValidator, FragmentValidator
{
private final DirectBuffer indexesRO;
private final ExpandableDirectBufferOutputStream out;
private final InputStreamReader input;
private final DirectBufferInputStream in;
private final JsonFormat.Parser parser;

public ProtobufWriteValidator(
ProtobufValidatorConfig config,
LongFunction<CatalogHandler> supplyCatalog)
{
super(config, supplyCatalog);
this.indexesRO = new UnsafeBuffer();
this.out = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.in = new DirectBufferInputStream();
this.input = new InputStreamReader(in);
this.parser = JsonFormat.parser();
}

@Override
Expand Down Expand Up @@ -113,19 +117,22 @@ private boolean validate(
int length)
{
boolean status = false;
Descriptors.FileDescriptor fileDescriptor = supplyDescriptor(schemaId);
if (fileDescriptor != null)
DescriptorTree trees = supplyDescriptorTree(schemaId);
if (trees != null && catalog.record != null)
{
DescriptorTree tree = new DescriptorTree(fileDescriptor).findByName(catalog.record);
DescriptorTree tree = trees.findByName(catalog.record);
if (tree != null)
{
Descriptors.Descriptor descriptor = tree.descriptor;
indexes.add(tree.indexes.size());
indexes.addAll(tree.indexes);
in.wrap(buffer, index, length);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
try
{
in.wrap(buffer, index, length);
DynamicMessage message = DynamicMessage.parseFrom(descriptor, in);
builder.mergeFrom(in);
DynamicMessage message = builder.build();
builder.clear();
status = message.getUnknownFields().asMap().isEmpty();
}
catch (IOException e)
Expand Down Expand Up @@ -155,7 +162,8 @@ private int encode(
indexesRO.wrap(encodeIndexes());
valLength = indexes.size();
}
next.accept(valueRO, 0, valLength);
indexes.clear();
next.accept(indexesRO, 0, valLength);
next.accept(buffer, index, length);
return valLength + length;
}
Expand All @@ -168,22 +176,22 @@ private int serializeJsonRecord(
ValueConsumer next)
{
int valLength = -1;
Descriptors.FileDescriptor fileDescriptor = supplyDescriptor(schemaId);
if (fileDescriptor != null)
DescriptorTree tree = supplyDescriptorTree(schemaId);
if (tree != null && catalog.record != null)
{
DescriptorTree tree = new DescriptorTree(fileDescriptor).findByName(catalog.record);
tree = tree.findByName(catalog.record);
if (tree != null)
{
Descriptors.Descriptor descriptor = tree.descriptor;
indexes.add(tree.indexes.size());
indexes.addAll(tree.indexes);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
in.wrap(buffer, index, length);
try
{
byte[] byteArray = new byte[length];
buffer.getBytes(index, byteArray);
JsonFormat.parser().merge(new String(byteArray), builder);
parser.merge(input, builder);
DynamicMessage message = builder.build();
builder.clear();
if (message.isInitialized() && message.getUnknownFields().asMap().isEmpty())
{
out.wrap(out.buffer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
module io.aklivity.zilla.runtime.validator.protobuf
{
requires org.antlr.antlr4.runtime;
requires protobuf.java;
requires io.aklivity.zilla.runtime.engine;

exports io.aklivity.zilla.runtime.validator.json.config;
exports io.aklivity.zilla.runtime.validator.protobuf.config;

provides io.aklivity.zilla.runtime.engine.config.ValidatorConfigAdapterSpi
with io.aklivity.zilla.runtime.validator.protobuf.config.ProtobufValidatorConfigAdapter;
Expand Down
Loading