Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Protobuf Schema & Descriptor cache detect schema change update
  • Loading branch information
ankitk-me committed Jan 30, 2024
commit 6308e4dd65900ebff5952ae4b2b36fc34d01a890
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class AvroModelFactorySpiTest
{
@Test
public void shouldCreateReader()
public void shouldLoadAndCreate()
{
Configuration config = new Configuration();
ModelFactory factory = ModelFactory.instantiate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.function.LongFunction;
import java.util.zip.CRC32C;

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -45,7 +46,7 @@
import io.aklivity.zilla.runtime.model.protobuf.internal.parser.Protobuf3Lexer;
import io.aklivity.zilla.runtime.model.protobuf.internal.parser.Protobuf3Parser;

public class ProtobufConverterHandler
public class ProtobufModelHandler
{
protected static final byte[] ZERO_INDEX = new byte[]{0x0};
protected static final String VIEW_JSON = "json";
Expand All @@ -66,8 +67,10 @@ public class ProtobufConverterHandler
private final Object2ObjectHashMap<String, DynamicMessage.Builder> builders;
private final FileDescriptor[] dependencies;
private final Int2IntHashMap paddings;
private final Int2IntHashMap crcCache;
private final CRC32C crc32c;

protected ProtobufConverterHandler(
protected ProtobufModelHandler(
ProtobufModelConfig config,
LongFunction<CatalogHandler> supplyCatalog)
{
Expand All @@ -86,6 +89,8 @@ protected ProtobufConverterHandler(
this.indexes = new LinkedList<>();
this.paddings = new Int2IntHashMap(-1);
this.out = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.crc32c = new CRC32C();
this.crcCache = new Int2IntHashMap(0);
}

protected FileDescriptor supplyDescriptor(
Expand Down Expand Up @@ -156,10 +161,11 @@ protected int supplyJsonFormatPadding(
}

protected DynamicMessage.Builder supplyDynamicMessageBuilder(
Descriptors.Descriptor descriptor)
Descriptors.Descriptor descriptor,
boolean cacheUpdate)
{
DynamicMessage.Builder builder;
if (builders.containsKey(descriptor.getFullName()))
if (builders.containsKey(descriptor.getFullName()) && !cacheUpdate)
{
builder = builders.get(descriptor.getFullName());
}
Expand All @@ -171,6 +177,26 @@ protected DynamicMessage.Builder supplyDynamicMessageBuilder(
return builder;
}

protected boolean invalidateCacheOnSchemaUpdate(
int schemaId)
{
boolean update = false;
if (crcCache.containsKey(schemaId))
{
String schemaText = handler.resolve(schemaId);
int checkSum = generateCRC32C(schemaText);
if (schemaText != null && crcCache.get(schemaId) != checkSum)
{
crcCache.remove(schemaId);
descriptors.remove(schemaId);
tree.remove(schemaId);
paddings.remove(schemaId);
update = true;
}
}
return update;
}

private DynamicMessage.Builder createDynamicMessageBuilder(
Descriptors.Descriptor descriptor)
{
Expand Down Expand Up @@ -235,6 +261,7 @@ private FileDescriptor createDescriptors(
String schemaText = handler.resolve(schemaId);
if (schemaText != null)
{
crcCache.put(schemaId, generateCRC32C(schemaText));
CharStream input = CharStreams.fromString(schemaText);
Protobuf3Lexer lexer = new Protobuf3Lexer(input);
CommonTokenStream tokens = new CommonTokenStream(lexer);
Expand Down Expand Up @@ -270,4 +297,13 @@ private DescriptorTree createDescriptorTree(
}
return tree;
}

private int generateCRC32C(
String schemaText)
{
byte[] bytes = schemaText.getBytes();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same feedback here as for avro.

crc32c.reset();
crc32c.update(bytes, 0, bytes.length);
return (int) crc32c.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.model.protobuf.config.ProtobufModelConfig;

public class ProtobufReadConverterHandler extends ProtobufConverterHandler implements ConverterHandler
public class ProtobufReadConverterHandler extends ProtobufModelHandler implements ConverterHandler
{
private final JsonFormat.Printer printer;
private final OutputStreamWriter output;
Expand Down Expand Up @@ -112,14 +112,15 @@ private int validate(
ValueConsumer next)
{
int valLength = -1;
boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId);
DescriptorTree tree = supplyDescriptorTree(schemaId);
if (tree != null)
{
Descriptors.Descriptor descriptor = tree.findByIndexes(indexes);
if (descriptor != null)
{
in.wrap(data, index, length);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate);
validate:
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.model.protobuf.config.ProtobufModelConfig;

public class ProtobufWriteConverterHandler extends ProtobufConverterHandler implements ConverterHandler
public class ProtobufWriteConverterHandler extends ProtobufModelHandler implements ConverterHandler
{
private final DirectBuffer indexesRO;
private final InputStreamReader input;
Expand Down Expand Up @@ -93,6 +93,7 @@ private boolean validate(
int length)
{
boolean status = false;
boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId);
DescriptorTree trees = supplyDescriptorTree(schemaId);
if (trees != null && catalog.record != null)
{
Expand All @@ -104,7 +105,7 @@ private boolean validate(
indexes.add(tree.indexes.size());
indexes.addAll(tree.indexes);
in.wrap(buffer, index, length);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate);
try
{
DynamicMessage message = builder.mergeFrom(in).build();
Expand Down Expand Up @@ -152,6 +153,7 @@ private int serializeJsonRecord(
ValueConsumer next)
{
int valLength = -1;
boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId);
DescriptorTree tree = supplyDescriptorTree(schemaId);
if (tree != null && catalog.record != null)
{
Expand All @@ -162,7 +164,7 @@ private int serializeJsonRecord(
indexes.clear();
indexes.add(tree.indexes.size());
indexes.addAll(tree.indexes);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate);
in.wrap(buffer, index, length);
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class ProtobufModelFactorySpiTest
{
@Test
public void shouldCreateReader()
public void shouldLoadAndCreate()
{
Configuration config = new Configuration();
ModelFactory factory = ModelFactory.instantiate();
Expand All @@ -51,7 +51,7 @@ public void shouldCreateReader()
.build();

assertThat(model, instanceOf(ProtobufModel.class));
assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(ProtobufConverterHandler.class));
assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(ProtobufConverterHandler.class));
assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(ProtobufReadConverterHandler.class));
assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(ProtobufWriteConverterHandler.class));
}
}