Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
41a8899
Schema Config Update
ankitk-me Sep 28, 2023
f26457c
converter implementation
ankitk-me Oct 6, 2023
fd69f8b
Merge branch 'aklivity:develop' into convertor
ankitk-me Oct 6, 2023
fdb9d98
test coverage update
ankitk-me Oct 6, 2023
dcecf18
schema update
ankitk-me Oct 6, 2023
b12c609
Interface update to support Streaming validation
ankitk-me Oct 12, 2023
3621172
refactoring
ankitk-me Oct 18, 2023
8435d0e
bug fix
ankitk-me Oct 19, 2023
71451a7
IT fix
ankitk-me Oct 25, 2023
2a882d4
Feature/m1 docker build support (#376)
vordimous Oct 7, 2023
c177894
0 for no mqtt session expiry should be mapped to max integer value fo…
akrambek Oct 7, 2023
add6eb3
Bump alpine in /cloud/docker-image/src/main/docker/release (#484)
dependabot[bot] Oct 7, 2023
366daff
Better handle request with same group id (#498)
akrambek Oct 11, 2023
e362fe9
Prepare release 0.9.55
jfallows Oct 11, 2023
66680f9
Update CHANGELOG.md
jfallows Oct 11, 2023
6829614
Fix flow control bug in mqtt-kakfa publish (#524)
bmaidics Oct 20, 2023
bb9fb56
Add extraEnv to Deployment in the helm chart (#511)
attilakreiner Oct 21, 2023
a1235ce
Sporadic github action build failure fix (#522)
akrambek Oct 23, 2023
6eb8a43
Merge branch 'feature/schema-registry' into convertor
ankitk-me Oct 25, 2023
f7bf292
pom fix
ankitk-me Oct 25, 2023
fd59bbb
updating Varint32FW initialisation
ankitk-me Oct 25, 2023
4c264d7
Fragment Validator Interface & Schema Update
ankitk-me Oct 27, 2023
1e1e73d
String & Test Fragment Validator implementation
ankitk-me Oct 30, 2023
65f12f0
Addressing review feedback
ankitk-me Nov 1, 2023
705f7d4
Addressing review comments
ankitk-me Nov 2, 2023
02b3b7a
avro validator.yaml update
ankitk-me Nov 6, 2023
6191c15
Schema patch issue fix
ankitk-me Nov 7, 2023
7d6418a
dynamic value length allocation issue
ankitk-me Nov 13, 2023
9c533a5
addressing review comments
ankitk-me Nov 15, 2023
364acb4
Use parametric types to avoid both cast and warning about raw types
jfallows Nov 16, 2023
31e70f4
Use converted file to handle variable size converted value during pro…
jfallows Nov 16, 2023
69dfe0b
Prefix is byte followed by big endian int32, requires literal byte se…
jfallows Nov 16, 2023
767d008
adding test coverage
ankitk-me Nov 17, 2023
0b1e590
optimising object allocation Avro validator
ankitk-me Nov 17, 2023
ce59fb0
checkstyle fix
ankitk-me Nov 17, 2023
2d6a9e3
IT & implementation to support fetch message without Schema ID prefix
ankitk-me Nov 21, 2023
664cece
addressing review feedback
ankitk-me Nov 27, 2023
b4b02c8
fetch message without Schema ID prefix implementation
ankitk-me Nov 28, 2023
f4e90e9
Avro & Json Read Validator fix
ankitk-me Nov 28, 2023
8075684
fix checkstyle
ankitk-me Nov 28, 2023
4dec79f
ITs for convertor & updating Test Validator
ankitk-me Nov 29, 2023
11d3747
dynamic message size after conversion implementation
ankitk-me Dec 12, 2023
7281c2f
Merge branch 'feature/schema-registry' into convertor
ankitk-me Dec 13, 2023
08cc929
updating latest changes with Value & Fragment Validator interface.
ankitk-me Dec 13, 2023
3cf0185
Converter bug fix
ankitk-me Dec 14, 2023
a8f9c08
Addressing review feedback
ankitk-me Dec 18, 2023
1d82d23
addressing review comments
ankitk-me Dec 19, 2023
8fb1016
using ExpandableDirectByteBuffer with valid index & length
ankitk-me Dec 19, 2023
9f3e434
review feedback & adding functional interface to CatalogHandler
ankitk-me Dec 20, 2023
c48b2d7
encoded bug fix: position reset to 0
ankitk-me Dec 21, 2023
9b77963
Addressing review comments
ankitk-me Dec 21, 2023
49f2963
Avro unit test fix
ankitk-me Dec 21, 2023
ee1e231
return -1 and ignore prefix.sizeof() in case of validation failure
ankitk-me Dec 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
addressing review comments
  • Loading branch information
ankitk-me committed Dec 19, 2023
commit 1d82d23dbf0bc643a481328a6256b428c6ecf014
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler
private static final String REGISTER_SCHEMA_PATH = "/subjects/{0}/versions";
private static final byte MAGIC_BYTE = 0x0;
private static final int ENRICHED_LENGTH = 5;
private static final int MAX_PADDING_LEN = 5;
private static final int MAX_PADDING_LENGTH = 5;

private final MutableDirectBuffer prefixRO;
private final HttpClient client;
Expand Down Expand Up @@ -147,7 +147,7 @@ public int enrich(
@Override
public int maxPadding()
{
return MAX_PADDING_LEN;
return MAX_PADDING_LENGTH;
}

private String sendHttpRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ else if (catalog.id != NO_SCHEMA_ID)
int payloadIndex = index + progress;
if (FORMAT_JSON.equals(format))
{
byte[] record = deserializeRecord(schemaId, data, payloadIndex, payloadLength);
int recordLength = record.length;
int recordLength = encoded.position();
deserializeRecord(schemaId, data, payloadIndex, payloadLength);
recordLength = encoded.position() - recordLength;
if (recordLength > 0)
{
valLength = recordLength;
valueRO.wrap(record);
valueRO.wrap(encoded.buffer());
next.accept(valueRO, 0, valLength);
}
}
Expand All @@ -145,13 +146,12 @@ else if (validate(schemaId, data, payloadIndex, payloadLength))
return valLength;
}

private byte[] deserializeRecord(
private void deserializeRecord(
int schemaId,
DirectBuffer buffer,
int index,
int length)
{
encoded.reset();
try
{
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
Expand All @@ -169,6 +169,5 @@ record = reader.read(record, decoderFactory.binaryDecoder(in, decoder));
{
Comment thread
jfallows marked this conversation as resolved.
ex.printStackTrace();
}
return encoded.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.LongFunction;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.io.DirectBufferInputStream;
import org.agrona.io.ExpandableDirectBufferOutputStream;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

Expand All @@ -45,7 +49,8 @@ public abstract class AvroValidator
protected static final byte MAGIC_BYTE = 0x0;
protected static final String FORMAT_JSON = "json";

private static final InputStream EMPTY_STREAM = new ByteArrayInputStream(new byte[0]);
private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
private static final OutputStream EMPTY_OUTPUT_STREAM = new ByteArrayOutputStream(0);
private static final int JSON_FIELD_STRUCTURE_LENGTH = "\"\":\"\",".length();

protected final DirectBuffer valueRO;
Expand All @@ -54,9 +59,10 @@ public abstract class AvroValidator
protected final DecoderFactory decoderFactory;
protected final EncoderFactory encoderFactory;
protected final BinaryDecoder decoder;
protected final BinaryEncoder encoder;
protected final String subject;
protected final String format;
protected final ByteArrayOutputStream encoded;
protected final ExpandableDirectBufferOutputStream encoded;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming this to expandable as it is used for both encode and decode.

protected final DirectBufferInputStream in;

private final Int2ObjectCache<Schema> schemas;
Expand All @@ -70,8 +76,9 @@ protected AvroValidator(
LongFunction<CatalogHandler> supplyCatalog)
{
this.decoderFactory = DecoderFactory.get();
this.decoder = decoderFactory.binaryDecoder(EMPTY_STREAM, null);
this.decoder = decoderFactory.binaryDecoder(EMPTY_INPUT_STREAM, null);
this.encoderFactory = EncoderFactory.get();
this.encoder = encoderFactory.binaryEncoder(EMPTY_OUTPUT_STREAM, null);
CatalogedConfig cataloged = config.cataloged.get(0);
this.handler = supplyCatalog.apply(cataloged.id);
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null;
Expand All @@ -85,7 +92,7 @@ protected AvroValidator(
this.records = new Int2ObjectCache<>(1, 1024, i -> {});
this.paddings = new Int2IntHashMap(-1);
this.valueRO = new UnsafeBuffer();
this.encoded = new ByteArrayOutputStream();
this.encoded = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.in = new DirectBufferInputStream();
}

Expand All @@ -100,7 +107,7 @@ protected final boolean validate(
{
GenericRecord record = supplyRecord(schemaId);
in.wrap(buffer, index, length);
GenericDatumReader<GenericRecord> reader = readers.computeIfAbsent(schemaId, this::supplyReader);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
reader.read(record, decoderFactory.binaryDecoder(in, decoder));
status = true;
}
Expand All @@ -126,24 +133,25 @@ protected final int supplyPadding(
protected final GenericDatumReader<GenericRecord> supplyReader(
int schemaId)
{
return readers.computeIfAbsent(schemaId, id -> createReader(supplySchema(id)));
return readers.computeIfAbsent(schemaId, this::createReader);
}

protected final GenericDatumWriter<GenericRecord> supplyWriter(
int schemaId)
{
return writers.computeIfAbsent(schemaId, id -> createWriter(supplySchema(id)));
return writers.computeIfAbsent(schemaId, this::createWriter);
}

protected final GenericRecord supplyRecord(
int schemaId)
{
return records.computeIfAbsent(schemaId, id -> createRecord(supplySchema(schemaId)));
return records.computeIfAbsent(schemaId, this::createRecord);
}

private GenericDatumReader<GenericRecord> createReader(
Schema schema)
int schemaId)
{
Schema schema = supplySchema(schemaId);
GenericDatumReader<GenericRecord> reader = null;
if (schema != null)
{
Expand All @@ -153,8 +161,9 @@ private GenericDatumReader<GenericRecord> createReader(
}

private GenericDatumWriter<GenericRecord> createWriter(
Schema schema)
int schemaId)
{
Schema schema = supplySchema(schemaId);
GenericDatumWriter<GenericRecord> writer = null;
if (schema != null)
{
Expand All @@ -164,8 +173,9 @@ private GenericDatumWriter<GenericRecord> createWriter(
}

private GenericRecord createRecord(
Schema schema)
int schemaId)
{
Schema schema = supplySchema(schemaId);
GenericRecord record = null;
if (schema != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.validator.FragmentValidator;
Expand Down Expand Up @@ -91,13 +90,13 @@ private int validateComplete(
{
if (FORMAT_JSON.equals(format))
{
byte[] record = serializeJsonRecord(schemaId, data, index, length);

int recordLength = record.length;
int recordLength = encoded.position();
serializeJsonRecord(schemaId, data, index, length);
recordLength = encoded.position() - recordLength;
if (recordLength > 0)
{
valLength = recordLength + handler.enrich(schemaId, next);
valueRO.wrap(record);
valueRO.wrap(encoded.buffer());
next.accept(valueRO, 0, recordLength);
}
}
Expand All @@ -110,13 +109,12 @@ else if (validate(schemaId, data, index, length))
return valLength;
}

private byte[] serializeJsonRecord(
private void serializeJsonRecord(
int schemaId,
DirectBuffer buffer,
int index,
int length)
{
encoded.reset();
try
{
Schema schema = supplySchema(schemaId);
Expand All @@ -125,15 +123,14 @@ private byte[] serializeJsonRecord(
GenericRecord record = supplyRecord(schemaId);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the json decoder below be precreated and reused instead of creating afresh each time?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem possible, as we don't have a method that takes a JsonDecoder to reuse.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I was able to optimise BinaryEncoder & reuse the same instance.

in.wrap(buffer, index, length);
record = reader.read(record, decoderFactory.jsonDecoder(schema, in));
BinaryEncoder out = encoderFactory.binaryEncoder(encoded, null);
writer.write(record, out);
out.flush();
encoderFactory.binaryEncoder(encoded, encoder);
writer.write(record, encoder);
encoder.flush();
encoded.close();
}
catch (IOException | AvroRuntimeException ex)
{
ex.printStackTrace();
}
return encoded.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public void shouldVerifyValidAvroEvent()
CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
TestCatalogOptionsConfig.builder()
.id(9)
.exclude(true)
.schema(SCHEMA)
.build());
LongFunction<CatalogHandler> handler = value -> context.attach(catalogConfig);
Expand Down Expand Up @@ -173,7 +172,6 @@ public void shouldReadAvroEventExpectJson()
CatalogConfig catalogConfig = new CatalogConfig("test0", "test",
TestCatalogOptionsConfig.builder()
.id(9)
.exclude(true)
.schema(SCHEMA)
.build());
LongFunction<CatalogHandler> handler = value -> context.attach(catalogConfig);
Expand Down Expand Up @@ -207,6 +205,8 @@ public void shouldReadAvroEventExpectJson()

int progress = validator.validate(data, 0, data.capacity(), ValueConsumer.NOP);
assertEquals(expected.capacity(), progress);

assertEquals(expected.capacity(), validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand Down Expand Up @@ -248,6 +248,8 @@ public void shouldWriteJsonEventExpectAvro()
data.wrap(payload.getBytes(), 0, payload.getBytes().length);
int progress = validator.validate(data, 0, data.capacity(), ValueConsumer.NOP);
assertEquals(expected.capacity(), progress);

assertEquals(expected.capacity(), validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ public boolean validate(
int length)
{
final int limit = index + length;
validate:
while (index < limit)
{
final int charByte0 = data.getByte(index);
final int charByteCount = (charByte0 & 0b1000_0000) != 0
? Integer.numberOfLeadingZeros((~charByte0 & 0xff) << 24)
: 1;

for (int j = 1; j < charByteCount; j++)
final int charByteLimit = index + charByteCount;
for (int charByteIndex = index + 1; charByteIndex < charByteLimit; charByteIndex++)
{
if (index + j >= limit || (data.getByte(index + j) & 0b11000000) != 0b10000000)
if (charByteIndex >= limit || (data.getByte(charByteIndex) & 0b11000000) != 0b10000000)
{
break;
break validate;
}
}
index += charByteCount;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are calculating index + j more than once here and also j is not descriptive.

final int charByteLimit = index + charByteCount;
for (int charByteIndex = index + 1; charByteIndex < charByteLimit; charByteIndex++)
{
    if (charByteIndex >= limit || (data.getByte(charByteIndex) & 0b11000000) != 0b10000000)
    {
        break validate;
    }
}

I think the break above needs to break from the outer while loop, not just the for loop, so we'll need to add a validate: label to the while loop, agree?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private JsonSchema supplySchema(
private JsonProvider supplyProvider(
int schemaId)
{
return providers.computeIfAbsent(schemaId, id -> createProvider(supplySchema(id)));
return providers.computeIfAbsent(schemaId, this::createProvider);
}

private JsonSchema resolveSchema(
Expand All @@ -117,8 +117,9 @@ private JsonSchema resolveSchema(
}

private JsonProvider createProvider(
JsonSchema schema)
int schemaId)
{
JsonSchema schema = supplySchema(schemaId);
JsonProvider provider = null;
if (schema != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
public interface CatalogHandler
{
int NO_SCHEMA_ID = 0;
int NO_ENRICHMENT = 0;
int ZERO_PADDING = 0;

int register(
String subject,
Expand All @@ -39,11 +37,11 @@ default int enrich(
int schemaId,
ValueConsumer next)
{
return NO_ENRICHMENT;
return 0;
}

default int maxPadding()
{
return ZERO_PADDING;
return 0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename to encodePadding to indicate this is needed only for encode case.

Also let's rename Validator.maxPadding to Validator.padding since padding concept is already an upper bound in zilla.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@

public class TestCatalogHandler implements CatalogHandler
{
private static final int MAX_PADDING_LEN = 10;
private static final int MAX_PADDING_LENGTH = 10;
private static final byte MAGIC_BYTE = 0x0;
private static final int ENRICHED_LENGTH = 5;
private static final int PREFIX_LENGTH = 5;

private final String schema;
private final MutableDirectBuffer prefixRO;
private final int id;
private final boolean embed;
private final boolean exclude;

public TestCatalogHandler(
TestCatalogOptionsConfig config)
Expand All @@ -43,7 +42,6 @@ public TestCatalogHandler(
this.prefixRO = new UnsafeBuffer(new byte[5]);
this.id = config.id;
this.embed = config.embed;
this.exclude = config.exclude;
}

@Override
Expand All @@ -56,20 +54,16 @@ public int enrich(
{
prefixRO.putByte(0, MAGIC_BYTE);
prefixRO.putInt(1, schemaId, ByteOrder.BIG_ENDIAN);
next.accept(prefixRO, 0, 5);
length = ENRICHED_LENGTH;
}
else if (exclude)
{
length = ENRICHED_LENGTH;
next.accept(prefixRO, 0, PREFIX_LENGTH);
length = PREFIX_LENGTH;
}
return length;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to have booleans for embed and exclude, or is this more a function of whether the validator is read vs write?

If it is a property of the validator, then perhaps we need to pass this context from the validator to the catalog handler, either via a parameter or by having 2 methods on catalog handler?

}

@Override
public int maxPadding()
{
return MAX_PADDING_LEN;
return MAX_PADDING_LENGTH;
}

@Override
Expand Down
Loading