Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
json convertor changes and unit tests
  • Loading branch information
ankitk-me committed Jan 8, 2024
commit ef84156927d35d22f0201759e29ba4e57f71af49
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ bindings:
options:
value:
type: protobuf
format: json
catalog:
catalog0:
- subject: test0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
{
"const": "protobuf"
},
"format":
{
"type": "string",
"enum":
[
"json"
]
},
"catalog":
{
"type": "object",
Expand Down
6 changes: 6 additions & 0 deletions incubator/validator-protobuf/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ WARRANTIES OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.

This project includes:
error-prone annotations under Apache 2.0
FindBugs-jsr305 under The Apache Software License, Version 2.0
Gson under Apache-2.0
Guava: Google Core Libraries for Java under Apache License, Version 2.0
J2ObjC Annotations under Apache License, Version 2.0
Protocol Buffers [Core] under BSD-3-Clause
Protocol Buffers [Util] under BSD-3-Clause


This project also includes code under copyright of the following entities:
Expand Down
7 changes: 6 additions & 1 deletion incubator/validator-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.84</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.90</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand All @@ -48,6 +48,11 @@
<artifactId>protobuf-java</artifactId>
<version>3.24.4</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.25.1</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected DescriptorTree findByIndexes(

for (Integer index : indexes)
{
current = current.getChild(index);
current = current.findChild(index);
if (current == null)
{
return null;
Expand All @@ -100,7 +100,7 @@ private DescriptorTree findParent(
return this.children.getOrDefault(part, null);
}

private DescriptorTree getChild(
private DescriptorTree findChild(
int index)
{
DescriptorTree tree = this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.util.JsonFormat;

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.validator.FragmentValidator;
Expand All @@ -33,11 +34,14 @@

public class ProtobufReadValidator extends ProtobufValidator implements ValueValidator, FragmentValidator
{
private final JsonFormat.Printer printer;

public ProtobufReadValidator(
ProtobufValidatorConfig config,
LongFunction<CatalogHandler> supplyCatalog)
{
super(config, supplyCatalog);
this.printer = JsonFormat.printer();
}

@Override
Expand All @@ -46,7 +50,25 @@ public int padding(
int index,
int length)
{
return FragmentValidator.super.padding(data, index, length);
int padding = 0;
if (FORMAT_JSON.equals(format))
{
int schemaId = handler.resolve(data, index, length);

if (schemaId == NO_SCHEMA_ID)
{
if (catalog.id != NO_SCHEMA_ID)
{
schemaId = catalog.id;
}
else
{
schemaId = handler.resolve(subject, catalog.version);
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (catalog.id != NO_SCHEMA_ID)
{
schemaId = catalog.id;
}
else
{
schemaId = handler.resolve(subject, catalog.version);
}
schemaId = catalog.id != NO_SCHEMA_ID ? catalog.id : handler.resolve(subject, catalog.version);

}
padding = supplyJsonFormatPadding(schemaId);
}
return padding;
}

@Override
Expand Down Expand Up @@ -88,8 +110,6 @@ private int decodePayload(
int length,
ValueConsumer next)
{
int valLength = -1;

if (schemaId == NO_SCHEMA_ID)
{
if (catalog.id != NO_SCHEMA_ID)
Expand All @@ -103,24 +123,18 @@ private int decodePayload(
}

int progress = decodeIndexes(data, index, length);
int currentIndex = index + progress;
int remainingLength = length - progress;

if (validate(schemaId, data, currentIndex, remainingLength))
{
next.accept(data, currentIndex, remainingLength);
valLength = remainingLength;
}
return valLength;
return validate(schemaId, data, index + progress, length - progress, next);
}

private boolean validate(
private int validate(
int schemaId,
DirectBuffer buffer,
DirectBuffer data,
int index,
int length)
int length,
ValueConsumer next)
{
boolean status = false;
int valLength = -1;
Descriptors.FileDescriptor fileDescriptor = supplyDescriptor(schemaId);
if (fileDescriptor != null)
{
Expand All @@ -130,16 +144,31 @@ private boolean validate(
Descriptors.Descriptor descriptor = tree.descriptor;
try
{
in.wrap(buffer, index, length);
in.wrap(data, index, length);
DynamicMessage message = DynamicMessage.parseFrom(descriptor, in);
status = message.getUnknownFields().asMap().isEmpty();
if (message.getUnknownFields().asMap().isEmpty())
{
if (FORMAT_JSON.equals(format))
{
String json = printer.print(message);
int jsonLength = json.length();
valueRO.wrap(json.getBytes());
next.accept(valueRO, 0, jsonLength);
valLength = jsonLength;
}
else
{
next.accept(data, index, length);
valLength = length;
}
}
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
return status;
return valLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,23 @@

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.collections.Object2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.io.DirectBufferInputStream;
import org.antlr.v4.runtime.BailErrorStrategy;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
Expand All @@ -42,15 +49,20 @@
public class ProtobufValidator
{
protected static final byte ZERO_INDEX = 0x0;
protected static final String FORMAT_JSON = "json";

protected final SchemaConfig catalog;
protected final CatalogHandler handler;
protected final String subject;
protected final String format;
protected final List<Integer> indexes;
protected final DirectBufferInputStream in;
protected final DirectBuffer valueRO;

private final Int2ObjectCache<FileDescriptor> descriptors;
private final Object2ObjectHashMap<String, DynamicMessage.Builder> builders;
private final FileDescriptor[] dependencies;
private final Int2IntHashMap paddings;

protected ProtobufValidator(
ProtobufValidatorConfig config,
Expand All @@ -62,10 +74,14 @@ protected ProtobufValidator(
this.subject = catalog != null && catalog.subject != null
? catalog.subject
: config.subject;
this.format = config.format;
this.descriptors = new Int2ObjectCache<>(1, 1024, i -> {});
this.builders = new Object2ObjectHashMap<>();
this.in = new DirectBufferInputStream();
this.dependencies = new FileDescriptor[0];
this.indexes = new LinkedList<>();
this.paddings = new Int2IntHashMap(-1);
this.valueRO = new UnsafeBuffer();
}

protected FileDescriptor supplyDescriptor(
Expand Down Expand Up @@ -116,6 +132,40 @@ protected int decodeIndexes(
return progress;
}

protected int supplyIndexPadding(
int schemaId)
{
return paddings.computeIfAbsent(schemaId, id -> calculateIndexPadding(supplyDescriptor(id)));
}

protected int supplyJsonFormatPadding(
int schemaId)
{
return paddings.computeIfAbsent(schemaId, id -> calculateJsonFormatPadding(supplyDescriptor(id)));
}

protected DynamicMessage.Builder supplyDynamicMessageBuilder(
Descriptors.Descriptor descriptor)
{
DynamicMessage.Builder builder;
if (builders.containsKey(catalog.record))
{
builder = builders.get(catalog.record);
}
else
{
builder = createDynamicMessageBuilder(descriptor);
builders.put(catalog.record, builder);
}
return builder;
}

private DynamicMessage.Builder createDynamicMessageBuilder(
Descriptors.Descriptor descriptor)
{
return DynamicMessage.newBuilder(descriptor);
}

private int decodeIndex(
byte encodedByte)
{
Expand All @@ -130,6 +180,41 @@ private int decodeIndex(
return (result >>> 1) ^ -(result & 1);
}

private int calculateIndexPadding(
FileDescriptor descriptor)
{
int padding = 0;
if (descriptor != null)
{
DescriptorTree tree = new DescriptorTree(descriptor).findByName(catalog.record);
if (tree != null)
{
padding = tree.indexes.size() + 1;
}
}
return padding;
}

private int calculateJsonFormatPadding(
FileDescriptor descriptor)
{
int padding = 0;

if (descriptor != null)
{
try
{
padding = 2 + JsonFormat.printer().print(descriptor.toProto()).length();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should reuse this printer?
How often is this method called when messages are flowing?
Why 2 + ...? Suggest using a constant to better document the intent.

}
catch (InvalidProtocolBufferException e)
{
e.printStackTrace();
}

}
return padding;
}

private FileDescriptor createDescriptors(
int schemaId)
{
Expand Down
Loading