Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
TTL based cache update cleanup
  • Loading branch information
ankitk-me committed Feb 1, 2024
commit f601836e7f67852c234b1ea3edc4266888f86e9c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.LongFunction;
import java.util.zip.CRC32C;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
Expand Down Expand Up @@ -68,8 +67,6 @@ public abstract class AvroModelHandler
private final Int2ObjectCache<GenericDatumWriter<GenericRecord>> writers;
private final Int2ObjectCache<GenericRecord> records;
private final Int2IntHashMap paddings;
private final Int2IntHashMap crcCache;
private final CRC32C crc32c;

protected AvroModelHandler(
AvroModelConfig config,
Expand All @@ -93,8 +90,6 @@ protected AvroModelHandler(
this.paddings = new Int2IntHashMap(-1);
this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.in = new DirectBufferInputStream();
this.crc32c = new CRC32C();
this.crcCache = new Int2IntHashMap(0);
}

protected final boolean validate(
Expand All @@ -106,7 +101,6 @@ protected final boolean validate(
boolean status = false;
try
{
invalidateCacheOnSchemaUpdate(schemaId);
GenericRecord record = supplyRecord(schemaId);
in.wrap(buffer, index, length);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
Expand Down Expand Up @@ -153,26 +147,6 @@ protected final GenericRecord supplyRecord(
return records.computeIfAbsent(schemaId, this::createRecord);
}

protected void invalidateCacheOnSchemaUpdate(
int schemaId)
{
if (crcCache.containsKey(schemaId))
{
String schemaText = handler.resolve(schemaId);
int checkSum = generateCRC32C(schemaText);
if (schemaText != null && crcCache.get(schemaId) != checkSum)
{
crcCache.remove(schemaId);
schemas.remove(schemaId);
readers.remove(schemaId);
writers.remove(schemaId);
records.remove(schemaId);
paddings.remove(schemaId);

}
}
}

private GenericDatumReader<GenericRecord> createReader(
int schemaId)
{
Expand Down Expand Up @@ -217,7 +191,6 @@ private Schema resolveSchema(
if (schemaText != null)
{
schema = new Schema.Parser().parse(schemaText);
crcCache.put(schemaId, generateCRC32C(schemaText));
}
return schema;
}
Expand All @@ -244,13 +217,4 @@ private int calculatePadding(
}
return padding;
}

private int generateCRC32C(
String schemaText)
{
byte[] bytes = schemaText.getBytes();
crc32c.reset();
crc32c.update(bytes, 0, bytes.length);
return (int) crc32c.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ private void deserializeRecord(
{
try
{
invalidateCacheOnSchemaUpdate(schemaId);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
GenericDatumWriter<GenericRecord> writer = supplyWriter(schemaId);
if (reader != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ private int serializeJsonRecord(
{
try
{
invalidateCacheOnSchemaUpdate(schemaId);
Schema schema = supplySchema(schemaId);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
GenericDatumWriter<GenericRecord> writer = supplyWriter(schemaId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@

import java.io.StringReader;
import java.util.function.LongFunction;
import java.util.zip.CRC32C;

import jakarta.json.spi.JsonProvider;
import jakarta.json.stream.JsonParser;
import jakarta.json.stream.JsonParserFactory;

import org.agrona.DirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.io.DirectBufferInputStream;
import org.leadpony.justify.api.JsonSchema;
Expand All @@ -48,8 +46,6 @@ public abstract class JsonModelHandler
private final JsonProvider schemaProvider;
private final JsonValidationService service;
private final JsonParserFactory factory;
private final CRC32C crc32c;
private final Int2IntHashMap crcCache;
private DirectBufferInputStream in;

public JsonModelHandler(
Expand All @@ -68,8 +64,6 @@ public JsonModelHandler(
this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
this.providers = new Int2ObjectCache<>(1, 1024, i -> {});
this.in = new DirectBufferInputStream();
this.crc32c = new CRC32C();
this.crcCache = new Int2IntHashMap(0);
}

protected final boolean validate(
Expand All @@ -81,7 +75,6 @@ protected final boolean validate(
boolean status = false;
try
{
invalidateCacheOnSchemaUpdate(schemaId);
JsonProvider provider = supplyProvider(schemaId);
in.wrap(buffer, index, length);
provider.createReader(in).readValue();
Expand All @@ -94,22 +87,6 @@ protected final boolean validate(
return status;
}

protected void invalidateCacheOnSchemaUpdate(
int schemaId)
{
if (crcCache.containsKey(schemaId))
{
String schemaText = handler.resolve(schemaId);
int checkSum = generateCRC32C(schemaText);
if (schemaText != null && crcCache.get(schemaId) != checkSum)
{
crcCache.remove(schemaId);
schemas.remove(schemaId);
providers.remove(schemaId);
}
}
}

protected JsonProvider supplyProvider(
int schemaId)
{
Expand All @@ -132,7 +109,6 @@ private JsonSchema resolveSchema(
JsonParser schemaParser = factory.createParser(new StringReader(schemaText));
JsonSchemaReader reader = service.createSchemaReader(schemaParser);
schema = reader.read();
crcCache.put(schemaId, generateCRC32C(schemaText));
}

return schema;
Expand All @@ -149,13 +125,4 @@ private JsonProvider createProvider(
}
return provider;
}

private int generateCRC32C(
String schemaText)
{
byte[] bytes = schemaText.getBytes();
crc32c.reset();
crc32c.update(bytes, 0, bytes.length);
return (int) crc32c.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public boolean validate(
int schemaId = catalog != null && catalog.id > 0
? catalog.id
: handler.resolve(subject, catalog.version);
invalidateCacheOnSchemaUpdate(schemaId);

JsonProvider provider = supplyProvider(schemaId);
parser = provider.createParser(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ public void shouldVerifyInvalidJsonArray()
"]";
byte[] bytes = payload.getBytes();
data.wrap(bytes, 0, bytes.length);
assertEquals(-1, converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));

converter.invalidateCacheOnSchemaUpdate(9);
assertEquals(-1, converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.function.LongFunction;
import java.util.zip.CRC32C;

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -67,8 +66,6 @@ public class ProtobufModelHandler
private final Object2ObjectHashMap<String, DynamicMessage.Builder> builders;
private final FileDescriptor[] dependencies;
private final Int2IntHashMap paddings;
private final Int2IntHashMap crcCache;
private final CRC32C crc32c;

protected ProtobufModelHandler(
ProtobufModelConfig config,
Expand All @@ -89,8 +86,6 @@ protected ProtobufModelHandler(
this.indexes = new LinkedList<>();
this.paddings = new Int2IntHashMap(-1);
this.out = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.crc32c = new CRC32C();
this.crcCache = new Int2IntHashMap(0);
}

protected FileDescriptor supplyDescriptor(
Expand Down Expand Up @@ -161,11 +156,10 @@ protected int supplyJsonFormatPadding(
}

protected DynamicMessage.Builder supplyDynamicMessageBuilder(
Descriptors.Descriptor descriptor,
boolean cacheUpdate)
Descriptors.Descriptor descriptor)
{
DynamicMessage.Builder builder;
if (builders.containsKey(descriptor.getFullName()) && !cacheUpdate)
if (builders.containsKey(descriptor.getFullName()))
{
builder = builders.get(descriptor.getFullName());
}
Expand All @@ -177,26 +171,6 @@ protected DynamicMessage.Builder supplyDynamicMessageBuilder(
return builder;
}

protected boolean invalidateCacheOnSchemaUpdate(
int schemaId)
{
boolean update = false;
if (crcCache.containsKey(schemaId))
{
String schemaText = handler.resolve(schemaId);
int checkSum = generateCRC32C(schemaText);
if (schemaText != null && crcCache.get(schemaId) != checkSum)
{
crcCache.remove(schemaId);
descriptors.remove(schemaId);
tree.remove(schemaId);
paddings.remove(schemaId);
update = true;
}
}
return update;
}

private DynamicMessage.Builder createDynamicMessageBuilder(
Descriptors.Descriptor descriptor)
{
Expand Down Expand Up @@ -261,7 +235,6 @@ private FileDescriptor createDescriptors(
String schemaText = handler.resolve(schemaId);
if (schemaText != null)
{
crcCache.put(schemaId, generateCRC32C(schemaText));
CharStream input = CharStreams.fromString(schemaText);
Protobuf3Lexer lexer = new Protobuf3Lexer(input);
CommonTokenStream tokens = new CommonTokenStream(lexer);
Expand Down Expand Up @@ -297,13 +270,4 @@ private DescriptorTree createDescriptorTree(
}
return tree;
}

private int generateCRC32C(
String schemaText)
{
byte[] bytes = schemaText.getBytes();
crc32c.reset();
crc32c.update(bytes, 0, bytes.length);
return (int) crc32c.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,14 @@ private int validate(
ValueConsumer next)
{
int valLength = -1;
boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId);
DescriptorTree tree = supplyDescriptorTree(schemaId);
if (tree != null)
{
Descriptors.Descriptor descriptor = tree.findByIndexes(indexes);
if (descriptor != null)
{
in.wrap(data, index, length);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
validate:
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ private boolean validate(
int length)
{
boolean status = false;
boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId);
DescriptorTree trees = supplyDescriptorTree(schemaId);
if (trees != null && catalog.record != null)
{
Expand All @@ -105,7 +104,7 @@ private boolean validate(
indexes.add(tree.indexes.size());
indexes.addAll(tree.indexes);
in.wrap(buffer, index, length);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
try
{
DynamicMessage message = builder.mergeFrom(in).build();
Expand Down Expand Up @@ -153,7 +152,6 @@ private int serializeJsonRecord(
ValueConsumer next)
{
int valLength = -1;
boolean cacheUpdate = invalidateCacheOnSchemaUpdate(schemaId);
DescriptorTree tree = supplyDescriptorTree(schemaId);
if (tree != null && catalog.record != null)
{
Expand All @@ -164,7 +162,7 @@ private int serializeJsonRecord(
indexes.clear();
indexes.add(tree.indexes.size());
indexes.addAll(tree.indexes);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor, cacheUpdate);
DynamicMessage.Builder builder = supplyDynamicMessageBuilder(descriptor);
in.wrap(buffer, index, length);
try
{
Expand Down