diff --git a/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroConverterHandler.java b/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java similarity index 85% rename from incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroConverterHandler.java rename to incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java index c85d45194d..34628373c7 100644 --- a/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroConverterHandler.java +++ b/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelHandler.java @@ -20,6 +20,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.function.LongFunction; +import java.util.zip.CRC32C; import org.agrona.DirectBuffer; import org.agrona.ExpandableDirectByteBuffer; @@ -43,7 +44,7 @@ import io.aklivity.zilla.runtime.engine.config.SchemaConfig; import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig; -public abstract class AvroConverterHandler +public abstract class AvroModelHandler { protected static final String VIEW_JSON = "json"; @@ -67,8 +68,10 @@ public abstract class AvroConverterHandler private final Int2ObjectCache> writers; private final Int2ObjectCache records; private final Int2IntHashMap paddings; + private final Int2IntHashMap crcCache; + private final CRC32C crc32c; - protected AvroConverterHandler( + protected AvroModelHandler( AvroModelConfig config, LongFunction supplyCatalog) { @@ -90,6 +93,8 @@ protected AvroConverterHandler( this.paddings = new Int2IntHashMap(-1); this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer()); this.in = new DirectBufferInputStream(); + this.crc32c = new CRC32C(); + this.crcCache = new Int2IntHashMap(0); } protected final boolean validate( @@ -101,6 +106,7 @@ protected final boolean validate( boolean status = false; try { + invalidateCacheOnSchemaUpdate(schemaId); GenericRecord record = supplyRecord(schemaId); in.wrap(buffer, index, length); GenericDatumReader reader = supplyReader(schemaId); @@ -147,6 +153,26 @@ protected final GenericRecord supplyRecord( return records.computeIfAbsent(schemaId, this::createRecord); } + protected void invalidateCacheOnSchemaUpdate( + int schemaId) + { + if (crcCache.containsKey(schemaId)) + { + String schemaText = handler.resolve(schemaId); + int checkSum = generateCRC32C(schemaText); + if (schemaText != null && crcCache.get(schemaId) != checkSum) + { + crcCache.remove(schemaId); + schemas.remove(schemaId); + readers.remove(schemaId); + writers.remove(schemaId); + records.remove(schemaId); + paddings.remove(schemaId); + + } + } + } + private GenericDatumReader createReader( int schemaId) { @@ -191,6 +217,7 @@ private Schema resolveSchema( if (schemaText != null) { schema = new Schema.Parser().parse(schemaText); + crcCache.put(schemaId, generateCRC32C(schemaText)); } return schema; } @@ -217,4 +244,13 @@ private int calculatePadding( } return padding; } + + private int generateCRC32C( + String schemaText) + { + byte[] bytes = schemaText.getBytes(); + crc32c.reset(); + crc32c.update(bytes, 0, bytes.length); + return (int) crc32c.getValue(); + } } diff --git a/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java b/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java index 7b47f60817..1e7f50cd3f 100644 --- a/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java +++ b/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroReadConverterHandler.java @@ -32,7 +32,7 @@ import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig; -public class AvroReadConverterHandler extends AvroConverterHandler implements ConverterHandler +public class AvroReadConverterHandler extends AvroModelHandler implements ConverterHandler { public AvroReadConverterHandler( AvroModelConfig config, @@ -125,6 +125,7 @@ private void deserializeRecord( { try { + invalidateCacheOnSchemaUpdate(schemaId); GenericDatumReader reader = supplyReader(schemaId); GenericDatumWriter writer = supplyWriter(schemaId); if (reader != null) diff --git a/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java b/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java index 7fa17fba7a..c3d3520eef 100644 --- a/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java +++ b/incubator/model-avro/src/main/java/io/aklivity/zilla/runtime/model/avro/internal/AvroWriteConverterHandler.java @@ -29,7 +29,7 @@ import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig; -public class AvroWriteConverterHandler extends AvroConverterHandler implements ConverterHandler +public class AvroWriteConverterHandler extends AvroModelHandler implements ConverterHandler { public AvroWriteConverterHandler( AvroModelConfig config, @@ -80,6 +80,7 @@ private int serializeJsonRecord( { try { + invalidateCacheOnSchemaUpdate(schemaId); Schema schema = supplySchema(schemaId); GenericDatumReader reader = supplyReader(schemaId); GenericDatumWriter writer = supplyWriter(schemaId); diff --git a/incubator/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelFactorySpiTest.java b/incubator/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelFactorySpiTest.java index 86ff4bad04..5e89a00f20 100644 --- a/incubator/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelFactorySpiTest.java +++ b/incubator/model-avro/src/test/java/io/aklivity/zilla/runtime/model/avro/internal/AvroModelFactorySpiTest.java @@ -31,7 +31,7 @@ public class AvroModelFactorySpiTest { @Test - public void shouldCreateReader() + public void shouldLoadAndCreate() { Configuration config = new Configuration(); ModelFactory factory = ModelFactory.instantiate(); @@ -51,7 +51,7 @@ public void shouldCreateReader() .build(); assertThat(model, instanceOf(AvroModel.class)); - assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(AvroConverterHandler.class)); - assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(AvroConverterHandler.class)); + assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(AvroReadConverterHandler.class)); + assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(AvroWriteConverterHandler.class)); } } diff --git a/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonConverterHandler.java b/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonModelHandler.java similarity index 78% rename from incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonConverterHandler.java rename to incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonModelHandler.java index ef4b254b31..aa6da0b554 100644 --- a/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonConverterHandler.java +++ b/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonModelHandler.java @@ -16,12 +16,14 @@ import java.io.StringReader; import java.util.function.LongFunction; +import java.util.zip.CRC32C; import jakarta.json.spi.JsonProvider; import jakarta.json.stream.JsonParser; import jakarta.json.stream.JsonParserFactory; import org.agrona.DirectBuffer; +import org.agrona.collections.Int2IntHashMap; import org.agrona.collections.Int2ObjectCache; import org.agrona.io.DirectBufferInputStream; import org.leadpony.justify.api.JsonSchema; @@ -35,7 +37,7 @@ import io.aklivity.zilla.runtime.engine.config.SchemaConfig; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; -public abstract class JsonConverterHandler +public abstract class JsonModelHandler { protected final SchemaConfig catalog; protected final CatalogHandler handler; @@ -46,9 +48,11 @@ public abstract class JsonConverterHandler private final JsonProvider schemaProvider; private final JsonValidationService service; private final JsonParserFactory factory; + private final CRC32C crc32c; + private final Int2IntHashMap crcCache; private DirectBufferInputStream in; - public JsonConverterHandler( + public JsonModelHandler( JsonModelConfig config, LongFunction supplyCatalog) { @@ -64,6 +68,8 @@ public JsonConverterHandler( this.schemas = new Int2ObjectCache<>(1, 1024, i -> {}); this.providers = new Int2ObjectCache<>(1, 1024, i -> {}); this.in = new DirectBufferInputStream(); + this.crc32c = new CRC32C(); + this.crcCache = new Int2IntHashMap(0); } protected final boolean validate( @@ -75,6 +81,7 @@ protected final boolean validate( boolean status = false; try { + invalidateCacheOnSchemaUpdate(schemaId); JsonProvider provider = supplyProvider(schemaId); in.wrap(buffer, index, length); provider.createReader(in).readValue(); @@ -87,18 +94,34 @@ protected final boolean validate( return status; } - private JsonSchema supplySchema( + protected void invalidateCacheOnSchemaUpdate( int schemaId) { - return schemas.computeIfAbsent(schemaId, this::resolveSchema); + if (crcCache.containsKey(schemaId)) + { + String schemaText = handler.resolve(schemaId); + int checkSum = generateCRC32C(schemaText); + if (schemaText != null && crcCache.get(schemaId) != checkSum) + { + crcCache.remove(schemaId); + schemas.remove(schemaId); + providers.remove(schemaId); + } + } } - private JsonProvider supplyProvider( + protected JsonProvider supplyProvider( int schemaId) { return providers.computeIfAbsent(schemaId, this::createProvider); } + private JsonSchema supplySchema( + int schemaId) + { + return schemas.computeIfAbsent(schemaId, this::resolveSchema); + } + private JsonSchema resolveSchema( int schemaId) { @@ -109,6 +132,7 @@ private JsonSchema resolveSchema( JsonParser schemaParser = factory.createParser(new StringReader(schemaText)); JsonSchemaReader reader = service.createSchemaReader(schemaParser); schema = reader.read(); + crcCache.put(schemaId, generateCRC32C(schemaText)); } return schema; @@ -125,4 +149,13 @@ private JsonProvider createProvider( } return provider; } + + private int generateCRC32C( + String schemaText) + { + byte[] bytes = schemaText.getBytes(); + crc32c.reset(); + crc32c.update(bytes, 0, bytes.length); + return (int) crc32c.getValue(); + } } diff --git a/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonReadConverterHandler.java b/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonReadConverterHandler.java index 2b0137f595..ad62353542 100644 --- a/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonReadConverterHandler.java +++ b/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonReadConverterHandler.java @@ -25,7 +25,7 @@ import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; -public class JsonReadConverterHandler extends JsonConverterHandler implements ConverterHandler +public class JsonReadConverterHandler extends JsonModelHandler implements ConverterHandler { public JsonReadConverterHandler( JsonModelConfig config, diff --git a/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonValidatorHandler.java b/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonValidatorHandler.java index abe45e9a53..62c8d3f916 100644 --- a/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonValidatorHandler.java +++ b/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonValidatorHandler.java @@ -14,40 +14,23 @@ */ package io.aklivity.zilla.runtime.model.json.internal; -import java.io.StringReader; import java.util.function.LongFunction; import jakarta.json.spi.JsonProvider; import jakarta.json.stream.JsonParser; -import jakarta.json.stream.JsonParserFactory; import jakarta.json.stream.JsonParsingException; import org.agrona.DirectBuffer; import org.agrona.ExpandableDirectByteBuffer; -import org.agrona.collections.Int2ObjectCache; import org.agrona.io.DirectBufferInputStream; -import org.leadpony.justify.api.JsonSchema; -import org.leadpony.justify.api.JsonSchemaReader; -import org.leadpony.justify.api.JsonValidationService; -import org.leadpony.justify.api.ProblemHandler; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; -import io.aklivity.zilla.runtime.engine.config.CatalogedConfig; -import io.aklivity.zilla.runtime.engine.config.SchemaConfig; import io.aklivity.zilla.runtime.engine.model.ValidatorHandler; import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; -public class JsonValidatorHandler implements ValidatorHandler +public class JsonValidatorHandler extends JsonModelHandler implements ValidatorHandler { - private final SchemaConfig catalog; - private final CatalogHandler handler; - private final String subject; - private final Int2ObjectCache schemas; - private final Int2ObjectCache providers; - private final JsonProvider schemaProvider; - private final JsonValidationService service; - private final JsonParserFactory factory; private final DirectBufferInputStream in; private final ExpandableDirectByteBuffer buffer; @@ -58,17 +41,7 @@ public JsonValidatorHandler( JsonModelConfig config, LongFunction supplyCatalog) { - this.schemaProvider = JsonProvider.provider(); - this.service = JsonValidationService.newInstance(); - this.factory = schemaProvider.createParserFactory(null); - CatalogedConfig cataloged = config.cataloged.get(0); - this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null; - this.handler = supplyCatalog.apply(cataloged.id); - this.subject = catalog != null && catalog.subject != null - ? catalog.subject - : config.subject; - this.schemas = new Int2ObjectCache<>(1, 1024, i -> {}); - this.providers = new Int2ObjectCache<>(1, 1024, i -> {}); + super(config, supplyCatalog); this.buffer = new ExpandableDirectByteBuffer(); this.in = new DirectBufferInputStream(buffer); } @@ -83,10 +56,6 @@ public boolean validate( { boolean status = true; - int schemaId = catalog != null && catalog.id > 0 - ? catalog.id - : handler.resolve(subject, catalog.version); - try { if ((flags & FLAGS_INIT) != 0x00) @@ -100,6 +69,12 @@ public boolean validate( if ((flags & FLAGS_FIN) != 0x00) { in.wrap(buffer, 0, progress); + + int schemaId = catalog != null && catalog.id > 0 + ? catalog.id + : handler.resolve(subject, catalog.version); + invalidateCacheOnSchemaUpdate(schemaId); + JsonProvider provider = supplyProvider(schemaId); parser = provider.createParser(in); while (parser.hasNext()) @@ -116,43 +91,4 @@ public boolean validate( return status; } - - private JsonSchema supplySchema( - int schemaId) - { - return schemas.computeIfAbsent(schemaId, this::resolveSchema); - } - - private JsonProvider supplyProvider( - int schemaId) - { - return providers.computeIfAbsent(schemaId, this::createProvider); - } - - private JsonSchema resolveSchema( - int schemaId) - { - JsonSchema schema = null; - String schemaText = handler.resolve(schemaId); - if (schemaText != null) - { - JsonParser schemaParser = factory.createParser(new StringReader(schemaText)); - JsonSchemaReader reader = service.createSchemaReader(schemaParser); - schema = reader.read(); - } - - return schema; - } - - private JsonProvider createProvider( - int schemaId) - { - JsonSchema schema = supplySchema(schemaId); - JsonProvider provider = null; - if (schema != null) - { - provider = service.createJsonProvider(schema, parser -> ProblemHandler.throwing()); - } - return provider; - } } diff --git a/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonWriteConverterHandler.java b/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonWriteConverterHandler.java index de9fc5bc53..9286f1eba2 100644 --- a/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonWriteConverterHandler.java +++ b/incubator/model-json/src/main/java/io/aklivity/zilla/runtime/model/json/internal/JsonWriteConverterHandler.java @@ -23,7 +23,7 @@ import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig; -public class JsonWriteConverterHandler extends JsonConverterHandler implements ConverterHandler +public class JsonWriteConverterHandler extends JsonModelHandler implements ConverterHandler { public JsonWriteConverterHandler( JsonModelConfig config, diff --git a/incubator/model-json/src/test/java/io/aklivity/zilla/runtime/model/json/internal/JsonConverterTest.java b/incubator/model-json/src/test/java/io/aklivity/zilla/runtime/model/json/internal/JsonConverterTest.java index a47f8b1dc2..cac188613c 100644 --- a/incubator/model-json/src/test/java/io/aklivity/zilla/runtime/model/json/internal/JsonConverterTest.java +++ b/incubator/model-json/src/test/java/io/aklivity/zilla/runtime/model/json/internal/JsonConverterTest.java @@ -210,5 +210,7 @@ public void shouldVerifyInvalidJsonArray() byte[] bytes = payload.getBytes(); data.wrap(bytes, 0, bytes.length); assertEquals(-1, converter.convert(data, 0, data.capacity(), ValueConsumer.NOP)); + + converter.invalidateCacheOnSchemaUpdate(9); } } diff --git a/incubator/model-json/src/test/java/io/aklivity/zilla/runtime/model/json/internal/JsonModelFactorySpiTest.java b/incubator/model-json/src/test/java/io/aklivity/zilla/runtime/model/json/internal/JsonModelFactorySpiTest.java index 6b0d571329..5aa0afa399 100644 --- a/incubator/model-json/src/test/java/io/aklivity/zilla/runtime/model/json/internal/JsonModelFactorySpiTest.java +++ b/incubator/model-json/src/test/java/io/aklivity/zilla/runtime/model/json/internal/JsonModelFactorySpiTest.java @@ -16,6 +16,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import org.junit.Test; @@ -31,13 +32,13 @@ public class JsonModelFactorySpiTest { @Test - public void shouldCreateReader() + public void shouldLoadAndCreate() { Configuration config = new Configuration(); ModelFactory factory = ModelFactory.instantiate(); Model model = factory.create("json", config); - ModelContext context = new JsonModelContext(mock(EngineContext.class)); + ModelContext context = model.supply(mock(EngineContext.class)); ModelConfig modelConfig = JsonModelConfig.builder() .subject("test-value") @@ -51,7 +52,9 @@ public void shouldCreateReader() .build(); assertThat(model, instanceOf(JsonModel.class)); - assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(JsonConverterHandler.class)); - assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(JsonConverterHandler.class)); + assertEquals(model.name(), "json"); + assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(JsonReadConverterHandler.class)); + assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(JsonWriteConverterHandler.class)); + assertThat(context.supplyValidatorHandler(modelConfig), instanceOf(JsonValidatorHandler.class)); } } diff --git a/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufConverterHandler.java b/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufModelHandler.java similarity index 86% rename from incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufConverterHandler.java rename to incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufModelHandler.java index 3fb20561be..43feea1f1f 100644 --- a/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufConverterHandler.java +++ b/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufModelHandler.java @@ -18,6 +18,7 @@ import java.util.LinkedList; import java.util.List; import java.util.function.LongFunction; +import java.util.zip.CRC32C; import org.agrona.BitUtil; import org.agrona.DirectBuffer; @@ -45,7 +46,7 @@ import io.aklivity.zilla.runtime.model.protobuf.internal.parser.Protobuf3Lexer; import io.aklivity.zilla.runtime.model.protobuf.internal.parser.Protobuf3Parser; -public class ProtobufConverterHandler +public class ProtobufModelHandler { protected static final byte[] ZERO_INDEX = new byte[]{0x0}; protected static final String VIEW_JSON = "json"; @@ -66,8 +67,10 @@ public class ProtobufConverterHandler private final Object2ObjectHashMap builders; private final FileDescriptor[] dependencies; private final Int2IntHashMap paddings; + private final Int2IntHashMap crcCache; + private final CRC32C crc32c; - protected ProtobufConverterHandler( + protected ProtobufModelHandler( ProtobufModelConfig config, LongFunction supplyCatalog) { @@ -86,6 +89,8 @@ protected ProtobufConverterHandler( this.indexes = new LinkedList<>(); this.paddings = new Int2IntHashMap(-1); this.out = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer()); + this.crc32c = new CRC32C(); + this.crcCache = new Int2IntHashMap(0); } protected FileDescriptor supplyDescriptor( @@ -156,10 +161,11 @@ protected int supplyJsonFormatPadding( } protected DynamicMessage.Builder supplyDynamicMessageBuilder( - Descriptors.Descriptor descriptor) + Descriptors.Descriptor descriptor, + boolean cacheUpdate) { DynamicMessage.Builder builder; - if (builders.containsKey(descriptor.getFullName())) + if (builders.containsKey(descriptor.getFullName()) && !cacheUpdate) { builder = builders.get(descriptor.getFullName()); } @@ -171,6 +177,26 @@ protected DynamicMessage.Builder supplyDynamicMessageBuilder( return builder; } + protected boolean invalidateCacheOnSchemaUpdate( + int schemaId) + { + boolean update = false; + if (crcCache.containsKey(schemaId)) + { + String schemaText = handler.resolve(schemaId); + int checkSum = generateCRC32C(schemaText); + if (schemaText != null && crcCache.get(schemaId) != checkSum) + { + crcCache.remove(schemaId); + descriptors.remove(schemaId); + tree.remove(schemaId); + paddings.remove(schemaId); + update = true; + } + } + return update; + } + private DynamicMessage.Builder createDynamicMessageBuilder( Descriptors.Descriptor descriptor) { @@ -235,6 +261,7 @@ private FileDescriptor createDescriptors( String schemaText = handler.resolve(schemaId); if (schemaText != null) { + crcCache.put(schemaId, generateCRC32C(schemaText)); CharStream input = CharStreams.fromString(schemaText); Protobuf3Lexer lexer = new Protobuf3Lexer(input); CommonTokenStream tokens = new CommonTokenStream(lexer); @@ -270,4 +297,13 @@ private DescriptorTree createDescriptorTree( } return tree; } + + private int generateCRC32C( + String schemaText) + { + byte[] bytes = schemaText.getBytes(); + crc32c.reset(); + crc32c.update(bytes, 0, bytes.length); + return (int) crc32c.getValue(); + } } diff --git a/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufReadConverterHandler.java b/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufReadConverterHandler.java index 010dace5d0..9df88b31e1 100644 --- a/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufReadConverterHandler.java +++ b/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufReadConverterHandler.java @@ -31,7 +31,7 @@ import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; import io.aklivity.zilla.runtime.model.protobuf.config.ProtobufModelConfig; -public class ProtobufReadConverterHandler extends ProtobufConverterHandler implements ConverterHandler +public class ProtobufReadConverterHandler extends ProtobufModelHandler implements ConverterHandler { private final JsonFormat.Printer printer; private final OutputStreamWriter output; @@ -112,6 +112,7 @@ private int validate( ValueConsumer next) { int valLength = -1; + boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId); DescriptorTree tree = supplyDescriptorTree(schemaId); if (tree != null) { @@ -119,7 +120,7 @@ private int validate( if (descriptor != null) { in.wrap(data, index, length); - DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor); + DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate); validate: try { diff --git a/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufWriteConverterHandler.java b/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufWriteConverterHandler.java index e55778ddde..941cdd08f3 100644 --- a/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufWriteConverterHandler.java +++ b/incubator/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufWriteConverterHandler.java @@ -31,7 +31,7 @@ import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; import io.aklivity.zilla.runtime.model.protobuf.config.ProtobufModelConfig; -public class ProtobufWriteConverterHandler extends ProtobufConverterHandler implements ConverterHandler +public class ProtobufWriteConverterHandler extends ProtobufModelHandler implements ConverterHandler { private final DirectBuffer indexesRO; private final InputStreamReader input; @@ -93,6 +93,7 @@ private boolean validate( int length) { boolean status = false; + boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId); DescriptorTree trees = supplyDescriptorTree(schemaId); if (trees != null && catalog.record != null) { @@ -104,7 +105,7 @@ private boolean validate( indexes.add(tree.indexes.size()); indexes.addAll(tree.indexes); in.wrap(buffer, index, length); - DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor); + DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate); try { DynamicMessage message = builder.mergeFrom(in).build(); @@ -152,6 +153,7 @@ private int serializeJsonRecord( ValueConsumer next) { int valLength = -1; + boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId); DescriptorTree tree = supplyDescriptorTree(schemaId); if (tree != null && catalog.record != null) { @@ -162,7 +164,7 @@ private int serializeJsonRecord( indexes.clear(); indexes.add(tree.indexes.size()); indexes.addAll(tree.indexes); - DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor); + DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate); in.wrap(buffer, index, length); try { diff --git a/incubator/model-protobuf/src/test/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufModelFactorySpiTest.java b/incubator/model-protobuf/src/test/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufModelFactorySpiTest.java index 90645dbcc9..27a7f643a5 100644 --- a/incubator/model-protobuf/src/test/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufModelFactorySpiTest.java +++ b/incubator/model-protobuf/src/test/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufModelFactorySpiTest.java @@ -31,7 +31,7 @@ public class ProtobufModelFactorySpiTest { @Test - public void shouldCreateReader() + public void shouldLoadAndCreate() { Configuration config = new Configuration(); ModelFactory factory = ModelFactory.instantiate(); @@ -51,7 +51,7 @@ public void shouldCreateReader() .build(); assertThat(model, instanceOf(ProtobufModel.class)); - assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(ProtobufConverterHandler.class)); - assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(ProtobufConverterHandler.class)); + assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(ProtobufReadConverterHandler.class)); + assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(ProtobufWriteConverterHandler.class)); } }