Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
41a8899
Schema Config Update
ankitk-me Sep 28, 2023
f26457c
converter implementation
ankitk-me Oct 6, 2023
fd69f8b
Merge branch 'aklivity:develop' into convertor
ankitk-me Oct 6, 2023
fdb9d98
test coverage update
ankitk-me Oct 6, 2023
dcecf18
schema update
ankitk-me Oct 6, 2023
b12c609
Interface update to support Streaming validation
ankitk-me Oct 12, 2023
3621172
refactoring
ankitk-me Oct 18, 2023
8435d0e
bug fix
ankitk-me Oct 19, 2023
71451a7
IT fix
ankitk-me Oct 25, 2023
2a882d4
Feature/m1 docker build support (#376)
vordimous Oct 7, 2023
c177894
0 for no mqtt session expiry should be mapped to max integer value fo…
akrambek Oct 7, 2023
add6eb3
Bump alpine in /cloud/docker-image/src/main/docker/release (#484)
dependabot[bot] Oct 7, 2023
366daff
Better handle request with same group id (#498)
akrambek Oct 11, 2023
e362fe9
Prepare release 0.9.55
jfallows Oct 11, 2023
66680f9
Update CHANGELOG.md
jfallows Oct 11, 2023
6829614
Fix flow control bug in mqtt-kakfa publish (#524)
bmaidics Oct 20, 2023
bb9fb56
Add extraEnv to Deployment in the helm chart (#511)
attilakreiner Oct 21, 2023
a1235ce
Sporadic github action build failure fix (#522)
akrambek Oct 23, 2023
6eb8a43
Merge branch 'feature/schema-registry' into convertor
ankitk-me Oct 25, 2023
f7bf292
pom fix
ankitk-me Oct 25, 2023
fd59bbb
updating Varint32FW initialisation
ankitk-me Oct 25, 2023
4c264d7
Fragment Validator Interface & Schema Update
ankitk-me Oct 27, 2023
1e1e73d
String & Test Fragment Validator implementation
ankitk-me Oct 30, 2023
65f12f0
Addressing review feedback
ankitk-me Nov 1, 2023
705f7d4
Addressing review comments
ankitk-me Nov 2, 2023
02b3b7a
avro validator.yaml update
ankitk-me Nov 6, 2023
6191c15
Schema patch issue fix
ankitk-me Nov 7, 2023
7d6418a
dynamic value length allocation issue
ankitk-me Nov 13, 2023
9c533a5
addressing review comments
ankitk-me Nov 15, 2023
364acb4
Use parametric types to avoid both cast and warning about raw types
jfallows Nov 16, 2023
31e70f4
Use converted file to handle variable size converted value during pro…
jfallows Nov 16, 2023
69dfe0b
Prefix is byte followed by big endian int32, requires literal byte se…
jfallows Nov 16, 2023
767d008
adding test coverage
ankitk-me Nov 17, 2023
0b1e590
optimising object allocation Avro validator
ankitk-me Nov 17, 2023
ce59fb0
checkstyle fix
ankitk-me Nov 17, 2023
2d6a9e3
IT & implementation to support fetch message without Schema ID prefix
ankitk-me Nov 21, 2023
664cece
addressing review feedback
ankitk-me Nov 27, 2023
b4b02c8
fetch message without Schema ID prefix implementation
ankitk-me Nov 28, 2023
f4e90e9
Avro & Json Read Validator fix
ankitk-me Nov 28, 2023
8075684
fix checkstyle
ankitk-me Nov 28, 2023
4dec79f
ITs for convertor & updating Test Validator
ankitk-me Nov 29, 2023
11d3747
dynamic message size after conversion implementation
ankitk-me Dec 12, 2023
7281c2f
Merge branch 'feature/schema-registry' into convertor
ankitk-me Dec 13, 2023
08cc929
updating latest changes with Value & Fragment Validator interface.
ankitk-me Dec 13, 2023
3cf0185
Converter bug fix
ankitk-me Dec 14, 2023
a8f9c08
Addressing review feedback
ankitk-me Dec 18, 2023
1d82d23
addressing review comments
ankitk-me Dec 19, 2023
8fb1016
using ExpandableDirectByteBuffer with valid index & length
ankitk-me Dec 19, 2023
9f3e434
review feedback & adding functional interface to CatalogHandler
ankitk-me Dec 20, 2023
c48b2d7
encoded bug fix: position reset to 0
ankitk-me Dec 21, 2023
9b77963
Addressing review comments
ankitk-me Dec 21, 2023
49f2963
Avro unit test fix
ankitk-me Dec 21, 2023
ee1e231
return -1 and ignore prefix.sizeof() in case of validation failure
ankitk-me Dec 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressing review feedback
  • Loading branch information
ankitk-me committed Dec 18, 2023
commit a8f9c0826d1c5c932d23fc4a6f644e396e3785d1
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ public InlineCatalogHandler(
registerSchema(config.subjects);
}

@Override
public String type()
{
return INLINE;
}

@Override
public int register(
String subject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteOrder;
import java.text.MessageFormat;
import java.util.zip.CRC32C;

import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.RegisterSchemaRequest;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer;

public class SchemaRegistryCatalogHandler implements CatalogHandler
{
private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}";
private static final String SCHEMA_PATH = "/schemas/ids/{0}";
private static final String REGISTER_SCHEMA_PATH = "/subjects/{0}/versions";
private static final byte MAGIC_BYTE = 0x0;
private static final int ENRICHED_LENGTH = 5;
private static final int MAX_PADDING_LEN = 5;

private final MutableDirectBuffer prefixRO;
private final HttpClient client;
private final String baseUrl;
private final RegisterSchemaRequest request;
Expand All @@ -49,12 +57,7 @@ public SchemaRegistryCatalogHandler(
this.crc32c = new CRC32C();
this.cache = new Int2ObjectCache<>(1, 1024, i -> {});
this.schemaIdCache = new Int2ObjectCache<>(1, 1024, i -> {});
}

@Override
public String type()
{
return SCHEMA_REGISTRY;
this.prefixRO = new UnsafeBuffer(new byte[5]);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have this structure generated from an internal idl instead for better readability, much like we do for the kafka cache entry descriptions and other protocol codec flyweights.

}

@Override
Expand Down Expand Up @@ -130,6 +133,23 @@ public int resolve(
return schemaId;
}

@Override
public int enrich(
int schemaId,
ValueConsumer next)
{
prefixRO.putByte(0, MAGIC_BYTE);
prefixRO.putInt(1, schemaId, ByteOrder.BIG_ENDIAN);
next.accept(prefixRO, 0, 5);
return ENRICHED_LENGTH;
}

@Override
public int maxPadding()
{
return MAX_PADDING_LEN;
}

private String sendHttpRequest(
String path)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.rules.RuleChain.outerRule;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -31,6 +33,7 @@
import org.kaazing.k3po.junit.rules.K3poRule;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer;

public class SchemaRegistryIT
{
Expand Down Expand Up @@ -153,4 +156,23 @@ public void shouldResolveSchemaViaSubjectVersionFromCache() throws Exception
assertThat(schema, not(nullValue()));
assertEquals(expected, schema);
}

@Test
public void shouldVerifyMaxPadding()
{
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);

assertEquals(5, catalog.maxPadding());
}

@Test
public void shouldVerifyEnrichedData()
{
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);

MutableDirectBuffer value = new UnsafeBuffer(new byte[ 5]);
value.putBytes(0, new byte[]{0x00, 0x00, 0x00, 0x00, 0x01});

assertEquals(value.capacity(), catalog.enrich(1, ValueConsumer.NOP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.agrona.DirectBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.JsonEncoder;

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
Expand Down Expand Up @@ -64,7 +67,7 @@ else if (catalog.id != NO_SCHEMA_ID)
{
schemaId = handler.resolve(subject, catalog.version);
}
padding = paddings.computeIfAbsent(schemaId, this::supplyPadding);
padding = supplyPadding(schemaId);
}
return padding;
}
Expand Down Expand Up @@ -117,7 +120,7 @@ else if (catalog.id != NO_SCHEMA_ID)
schemaId = handler.resolve(subject, catalog.version);
}

reader = readers.computeIfAbsent(schemaId, this::supplyReader);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
if (reader != null)
{
int payloadLength = length - progress;
Expand All @@ -128,7 +131,7 @@ else if (catalog.id != NO_SCHEMA_ID)
int recordLength = record.length;
if (recordLength > 0)
{
valLength = record.length;
valLength = recordLength;
valueRO.wrap(record);
next.accept(valueRO, 0, valLength);
}
Expand All @@ -151,12 +154,13 @@ private byte[] deserializeRecord(
encoded.reset();
try
{
record = records.computeIfAbsent(schemaId, this::supplyRecord);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
GenericDatumWriter<GenericRecord> writer = supplyWriter(schemaId);
GenericRecord record = supplyRecord(schemaId);
in.wrap(buffer, index, length);
record = reader.read(record, decoderFactory.binaryDecoder(in, decoder));
Schema schema = record.getSchema();
JsonEncoder out = encoderFactory.jsonEncoder(schema, encoded);
writer = writers.computeIfAbsent(schemaId, this::supplyWriter);
writer.write(record, out);
out.flush();
encoded.close();
Expand Down
Loading