Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Avro Schema & GenericDatum cache detect schema change update
  • Loading branch information
ankitk-me committed Jan 30, 2024
commit 691bf2e53875f7a645c7fcc6e27a4f652c922746
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.LongFunction;
import java.util.zip.CRC32C;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
Expand All @@ -43,7 +44,7 @@
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;

public abstract class AvroConverterHandler
public abstract class AvroModelHandler
{
protected static final String VIEW_JSON = "json";

Expand All @@ -67,8 +68,10 @@ public abstract class AvroConverterHandler
private final Int2ObjectCache<GenericDatumWriter<GenericRecord>> writers;
private final Int2ObjectCache<GenericRecord> records;
private final Int2IntHashMap paddings;
private final Int2IntHashMap crcCache;
private final CRC32C crc32c;

protected AvroConverterHandler(
protected AvroModelHandler(
AvroModelConfig config,
LongFunction<CatalogHandler> supplyCatalog)
{
Expand All @@ -90,6 +93,8 @@ protected AvroConverterHandler(
this.paddings = new Int2IntHashMap(-1);
this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.in = new DirectBufferInputStream();
this.crc32c = new CRC32C();
this.crcCache = new Int2IntHashMap(0);
}

protected final boolean validate(
Expand All @@ -101,6 +106,7 @@ protected final boolean validate(
boolean status = false;
try
{
invalidateCacheOnSchemaUpdate(schemaId);
GenericRecord record = supplyRecord(schemaId);
in.wrap(buffer, index, length);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
Expand Down Expand Up @@ -147,6 +153,26 @@ protected final GenericRecord supplyRecord(
return records.computeIfAbsent(schemaId, this::createRecord);
}

protected void invalidateCacheOnSchemaUpdate(
int schemaId)
{
if (crcCache.containsKey(schemaId))
{
String schemaText = handler.resolve(schemaId);
int checkSum = generateCRC32C(schemaText);
if (schemaText != null && crcCache.get(schemaId) != checkSum)
{
crcCache.remove(schemaId);
schemas.remove(schemaId);
readers.remove(schemaId);
writers.remove(schemaId);
records.remove(schemaId);
paddings.remove(schemaId);

}
}
}

private GenericDatumReader<GenericRecord> createReader(
int schemaId)
{
Expand Down Expand Up @@ -191,6 +217,7 @@ private Schema resolveSchema(
if (schemaText != null)
{
schema = new Schema.Parser().parse(schemaText);
crcCache.put(schemaId, generateCRC32C(schemaText));
}
return schema;
}
Expand All @@ -217,4 +244,13 @@ private int calculatePadding(
}
return padding;
}

private int generateCRC32C(
String schemaText)
{
byte[] bytes = schemaText.getBytes();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is going to allocate a new byte[] on every call to validate.
Let's instead store the actual schemaText rather than a crc32c hash to compare.

crc32c.reset();
crc32c.update(bytes, 0, bytes.length);
return (int) crc32c.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;

public class AvroReadConverterHandler extends AvroConverterHandler implements ConverterHandler
public class AvroReadConverterHandler extends AvroModelHandler implements ConverterHandler
{
public AvroReadConverterHandler(
AvroModelConfig config,
Expand Down Expand Up @@ -125,6 +125,7 @@ private void deserializeRecord(
{
try
{
invalidateCacheOnSchemaUpdate(schemaId);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
GenericDatumWriter<GenericRecord> writer = supplyWriter(schemaId);
if (reader != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;

public class AvroWriteConverterHandler extends AvroConverterHandler implements ConverterHandler
public class AvroWriteConverterHandler extends AvroModelHandler implements ConverterHandler
{
public AvroWriteConverterHandler(
AvroModelConfig config,
Expand Down Expand Up @@ -80,6 +80,7 @@ private int serializeJsonRecord(
{
try
{
invalidateCacheOnSchemaUpdate(schemaId);
Schema schema = supplySchema(schemaId);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
GenericDatumWriter<GenericRecord> writer = supplyWriter(schemaId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void shouldCreateReader()
.build();

assertThat(model, instanceOf(AvroModel.class));
assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(AvroConverterHandler.class));
assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(AvroConverterHandler.class));
assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(AvroReadConverterHandler.class));
assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(AvroWriteConverterHandler.class));
}
}