Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
41a8899
Schema Config Update
ankitk-me Sep 28, 2023
f26457c
converter implementation
ankitk-me Oct 6, 2023
fd69f8b
Merge branch 'aklivity:develop' into convertor
ankitk-me Oct 6, 2023
fdb9d98
test coverage update
ankitk-me Oct 6, 2023
dcecf18
schema update
ankitk-me Oct 6, 2023
b12c609
Interface update to support Streaming validation
ankitk-me Oct 12, 2023
3621172
refactoring
ankitk-me Oct 18, 2023
8435d0e
bug fix
ankitk-me Oct 19, 2023
71451a7
IT fix
ankitk-me Oct 25, 2023
2a882d4
Feature/m1 docker build support (#376)
vordimous Oct 7, 2023
c177894
0 for no mqtt session expiry should be mapped to max integer value fo…
akrambek Oct 7, 2023
add6eb3
Bump alpine in /cloud/docker-image/src/main/docker/release (#484)
dependabot[bot] Oct 7, 2023
366daff
Better handle request with same group id (#498)
akrambek Oct 11, 2023
e362fe9
Prepare release 0.9.55
jfallows Oct 11, 2023
66680f9
Update CHANGELOG.md
jfallows Oct 11, 2023
6829614
Fix flow control bug in mqtt-kakfa publish (#524)
bmaidics Oct 20, 2023
bb9fb56
Add extraEnv to Deployment in the helm chart (#511)
attilakreiner Oct 21, 2023
a1235ce
Sporadic github action build failure fix (#522)
akrambek Oct 23, 2023
6eb8a43
Merge branch 'feature/schema-registry' into convertor
ankitk-me Oct 25, 2023
f7bf292
pom fix
ankitk-me Oct 25, 2023
fd59bbb
updating Varint32FW initialisation
ankitk-me Oct 25, 2023
4c264d7
Fragment Validator Interface & Schema Update
ankitk-me Oct 27, 2023
1e1e73d
String & Test Fragment Validator implementation
ankitk-me Oct 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updating Varint32FW initialisation
  • Loading branch information
ankitk-me committed Oct 25, 2023
commit fd59bbb3897eff002f98a4fa8db8dd900dcfad5f
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public final class KafkaCachePartition
private final MutableDirectBuffer entryInfo = new UnsafeBuffer(new byte[6 * Long.BYTES + 3 * Integer.BYTES + Short.BYTES]);
private final MutableDirectBuffer valueInfo = new UnsafeBuffer(new byte[Integer.BYTES]);

private final Varint32FW.Builder varIntRW = new Varint32FW.Builder();
private final Varint32FW.Builder varIntRW = new Varint32FW.Builder().wrap(new UnsafeBuffer(new byte[5]), 0, 5);
private final Array32FW<KafkaHeaderFW> headersRO = new Array32FW<KafkaHeaderFW>(new KafkaHeaderFW());

private final DirectBufferInputStream ancestorIn = new DirectBufferInputStream();
Expand Down Expand Up @@ -394,19 +394,16 @@ public void writeEntryStart(
{
final ValueConsumer writeValue = (buffer, index, length) ->
{
Varint32FW newLength = varIntRW
.wrap(new UnsafeBuffer(new byte[5]), 0, 5)
.set(length)
.build();
Varint32FW newLength = varIntRW.set(length).build();
logFile.appendBytes(newLength);
logFile.appendBytes(buffer, index, length);
};
OctetsFW value = key.value();
int validated = validator.validate(value.buffer(), value.offset(), value.sizeof(), writeValue);
if (validated == -1)
{
logFile.appendBytes(key);
// For Fetch Validation failure, we still push the event to Cache
logFile.appendBytes(key);
// TODO: Placeholder to log fetch validation failure
}
}
Expand Down Expand Up @@ -452,8 +449,8 @@ public void writeEntryContinue(
int validated = validator.validate(payload.buffer(), payload.offset(), payload.sizeof(), writeValue);
if (validated == -1)
{
logFile.appendBytes(payload.buffer(), payload.offset(), payload.sizeof());
// For Fetch Validation failure, we still push the event to Cache
logFile.appendBytes(payload.buffer(), payload.offset(), payload.sizeof());
// TODO: Placeholder to log fetch validation failure
}
}
Expand Down Expand Up @@ -590,10 +587,7 @@ public int writeProduceEntryStart(
{
final ValueConsumer writeValue = (buffer, index, length) ->
{
Varint32FW newLength = varIntRW
.wrap(new UnsafeBuffer(new byte[5]), 0, 5)
.set(length)
.build();
Varint32FW newLength = varIntRW.set(length).build();
logFile.appendBytes(newLength);
logFile.appendBytes(buffer, index, length);
};
Expand Down