Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
41a8899
Schema Config Update
ankitk-me Sep 28, 2023
f26457c
converter implementation
ankitk-me Oct 6, 2023
fd69f8b
Merge branch 'aklivity:develop' into convertor
ankitk-me Oct 6, 2023
fdb9d98
test coverage update
ankitk-me Oct 6, 2023
dcecf18
schema update
ankitk-me Oct 6, 2023
b12c609
Interface update to support Streaming validation
ankitk-me Oct 12, 2023
3621172
refactoring
ankitk-me Oct 18, 2023
8435d0e
bug fix
ankitk-me Oct 19, 2023
71451a7
IT fix
ankitk-me Oct 25, 2023
2a882d4
Feature/m1 docker build support (#376)
vordimous Oct 7, 2023
c177894
0 for no mqtt session expiry should be mapped to max integer value fo…
akrambek Oct 7, 2023
add6eb3
Bump alpine in /cloud/docker-image/src/main/docker/release (#484)
dependabot[bot] Oct 7, 2023
366daff
Better handle request with same group id (#498)
akrambek Oct 11, 2023
e362fe9
Prepare release 0.9.55
jfallows Oct 11, 2023
66680f9
Update CHANGELOG.md
jfallows Oct 11, 2023
6829614
Fix flow control bug in mqtt-kakfa publish (#524)
bmaidics Oct 20, 2023
bb9fb56
Add extraEnv to Deployment in the helm chart (#511)
attilakreiner Oct 21, 2023
a1235ce
Sporadic github action build failure fix (#522)
akrambek Oct 23, 2023
6eb8a43
Merge branch 'feature/schema-registry' into convertor
ankitk-me Oct 25, 2023
f7bf292
pom fix
ankitk-me Oct 25, 2023
fd59bbb
updating Varint32FW initialisation
ankitk-me Oct 25, 2023
4c264d7
Fragment Validator Interface & Schema Update
ankitk-me Oct 27, 2023
1e1e73d
String & Test Fragment Validator implementation
ankitk-me Oct 30, 2023
65f12f0
Addressing review feedback
ankitk-me Nov 1, 2023
705f7d4
Addressing review comments
ankitk-me Nov 2, 2023
02b3b7a
avro validator.yaml update
ankitk-me Nov 6, 2023
6191c15
Schema patch issue fix
ankitk-me Nov 7, 2023
7d6418a
dynamic value length allocation issue
ankitk-me Nov 13, 2023
9c533a5
addressing review comments
ankitk-me Nov 15, 2023
364acb4
Use parametric types to avoid both cast and warning about raw types
jfallows Nov 16, 2023
31e70f4
Use converted file to handle variable size converted value during pro…
jfallows Nov 16, 2023
69dfe0b
Prefix is byte followed by big endian int32, requires literal byte se…
jfallows Nov 16, 2023
767d008
adding test coverage
ankitk-me Nov 17, 2023
0b1e590
optimising object allocation Avro validator
ankitk-me Nov 17, 2023
ce59fb0
checkstyle fix
ankitk-me Nov 17, 2023
2d6a9e3
IT & implementation to support fetch message without Schema ID prefix
ankitk-me Nov 21, 2023
664cece
addressing review feedback
ankitk-me Nov 27, 2023
b4b02c8
fetch message without Schema ID prefix implementation
ankitk-me Nov 28, 2023
f4e90e9
Avro & Json Read Validator fix
ankitk-me Nov 28, 2023
8075684
fix checkstyle
ankitk-me Nov 28, 2023
4dec79f
ITs for convertor & updating Test Validator
ankitk-me Nov 29, 2023
11d3747
dynamic message size after conversion implementation
ankitk-me Dec 12, 2023
7281c2f
Merge branch 'feature/schema-registry' into convertor
ankitk-me Dec 13, 2023
08cc929
updating latest changes with Value & Fragment Validator interface.
ankitk-me Dec 13, 2023
3cf0185
Converter bug fix
ankitk-me Dec 14, 2023
a8f9c08
Addressing review feedback
ankitk-me Dec 18, 2023
1d82d23
addressing review comments
ankitk-me Dec 19, 2023
8fb1016
using ExpandableDirectByteBuffer with valid index & length
ankitk-me Dec 19, 2023
9f3e434
review feedback & adding functional interface to CatalogHandler
ankitk-me Dec 20, 2023
c48b2d7
encoded bug fix: position reset to 0
ankitk-me Dec 21, 2023
9b77963
Addressing review comments
ankitk-me Dec 21, 2023
49f2963
Avro unit test fix
ankitk-me Dec 21, 2023
ee1e231
return -1 and ignore prefix.sizeof() in case of validation failure
ankitk-me Dec 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
adding test coverage
  • Loading branch information
ankitk-me committed Nov 17, 2023
commit 767d0080b4a4f14cb876815e609cacf0b1fc3748
2 changes: 1 addition & 1 deletion incubator/validator-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.87</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.92</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,14 @@ private int validateComplete(
{
byte[] record = deserializeAvroRecord(schema, payloadBytes, progress, length - progress);
valueRO.wrap(record);
next.accept(valueRO, 0, valLength);
valLength = record.length;
}
else if (validate(schema, payloadBytes, progress, length - progress))
{
valueRO.wrap(data);
next.accept(data, index, length);
valLength = length;
}
if (valLength != -1)
{
next.accept(valueRO, 0, valLength);
}
}
return valLength;
}
Expand All @@ -126,7 +123,7 @@ private byte[] deserializeAvroRecord(
try
{
reader = new GenericDatumReader(schema);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's introduce fetchReader(int schemaId) that returns GenericDatumReader so that we don't need to keep creating new instances of GenericDatumReader for the same Schema.

GenericRecord record = (GenericRecord) reader.read(null,
GenericRecord record = reader.read(null,
decoder.binaryDecoder(bytes, offset, length, null));
JsonEncoder jsonEncoder = encoder.jsonEncoder(record.getSchema(), outputStream);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename jsonEncoder to out.

writer = record instanceof SpecificRecord ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.aklivity.zilla.runtime.engine.config.CatalogConfig;
import io.aklivity.zilla.runtime.engine.test.internal.catalog.TestCatalog;
import io.aklivity.zilla.runtime.engine.test.internal.catalog.config.TestCatalogOptionsConfig;
import io.aklivity.zilla.runtime.engine.validator.function.FragmentConsumer;
import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer;
import io.aklivity.zilla.runtime.validator.avro.config.AvroValidatorConfig;

Expand Down Expand Up @@ -214,4 +215,45 @@ public void shouldWriteJsonEventExpectAvro()
int progress = validator.validate(data, 0, data.capacity(), ValueConsumer.NOP);
assertEquals(expected.capacity(), progress);
}

@Test
public void shouldWriteValidFragmentAvroEvent()
{
CatalogConfig catalogConfig = new CatalogConfig("test0", "test", new TestCatalogOptionsConfig(SCHEMA));
LongFunction<CatalogHandler> handler = value -> context.attach(catalogConfig);
AvroWriteValidator validator = new AvroWriteValidator(avroConfig, handler);

DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f,
0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
data.wrap(bytes, 0, bytes.length);

byte[] expectedBytes = {0x00, 0x00, 0x00, 0x00, 0x01, 0x06, 0x69, 0x64,
0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
DirectBuffer expected = new UnsafeBuffer();
expected.wrap(expectedBytes);

assertEquals(0, validator.validate(0x00, data, 0, data.capacity(), FragmentConsumer.NOP));

assertEquals(expected.capacity(), validator.validate(0x01, data, 0, data.capacity(), FragmentConsumer.NOP));
}

@Test
public void shouldVerifyValidFragmentAvroEvent()
{
CatalogConfig catalogConfig = new CatalogConfig("test0", "test", new TestCatalogOptionsConfig(SCHEMA));
LongFunction<CatalogHandler> handler = value -> context.attach(catalogConfig);
AvroReadValidator validator = new AvroReadValidator(avroConfig, handler);

DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {0x00, 0x00, 0x00, 0x00, 0x09, 0x06, 0x69, 0x64,
0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
data.wrap(bytes, 0, bytes.length);

assertEquals(0, validator.validate(0x00, data, 0, data.capacity(), FragmentConsumer.NOP));

assertEquals(data.capacity(), validator.validate(0x01, data, 0, data.capacity(), FragmentConsumer.NOP));
}
}
4 changes: 2 additions & 2 deletions incubator/validator-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.70</jacoco.coverage.ratio>
<jacoco.missed.count>1</jacoco.missed.count>
<jacoco.coverage.ratio>0.82</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.ValidatorConfig;
import io.aklivity.zilla.runtime.engine.validator.FragmentValidator;
import io.aklivity.zilla.runtime.engine.validator.ValueValidator;
import io.aklivity.zilla.runtime.validator.core.config.IntegerValidatorConfig;

public class IntegerValidatorFactoryTest
{
@Test
@SuppressWarnings("unchecked")
public void shouldCreateReadValidator()
public void shouldCreateValueReader()
{
// GIVEN
ValidatorConfig validator = new IntegerValidatorConfig();
Expand All @@ -47,7 +48,7 @@ public void shouldCreateReadValidator()

Comment thread
ankitk-me marked this conversation as resolved.
@Test
@SuppressWarnings("unchecked")
public void shouldCreateWriteValidator()
public void shouldCreateValueWriter()
{
// GIVEN
ValidatorConfig validator = new IntegerValidatorConfig();
Expand All @@ -60,4 +61,36 @@ public void shouldCreateWriteValidator()
// THEN
assertThat(writer, instanceOf(IntegerValidator.class));
}

@Test
@SuppressWarnings("unchecked")
public void shouldCreateFragmentReader()
{
// GIVEN
ValidatorConfig validator = new IntegerValidatorConfig();
LongFunction<CatalogHandler> supplyCatalog = mock(LongFunction.class);
IntegerValidatorFactory factory = new IntegerValidatorFactory();

// WHEN
FragmentValidator reader = factory.createFragmentReader(validator, supplyCatalog);

// THEN
assertThat(reader, instanceOf(IntegerValidator.class));
}

@Test
@SuppressWarnings("unchecked")
public void shouldCreateFragmentWriter()
{
// GIVEN
ValidatorConfig validator = new IntegerValidatorConfig();
LongFunction<CatalogHandler> supplyCatalog = mock(LongFunction.class);
IntegerValidatorFactory factory = new IntegerValidatorFactory();

// WHEN
FragmentValidator writer = factory.createFragmentWriter(validator, supplyCatalog);

// THEN
assertThat(writer, instanceOf(IntegerValidator.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;

import io.aklivity.zilla.runtime.engine.validator.function.FragmentConsumer;
import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer;
import io.aklivity.zilla.runtime.validator.core.config.IntegerValidatorConfig;

Expand Down Expand Up @@ -47,4 +48,17 @@ public void shouldVerifyInvalidInteger()
data.wrap(bytes, 0, bytes.length);
assertEquals(-1, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
public void shouldVerifyValidFragmentInteger()
{
DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {0, 0, 0, 42};
data.wrap(bytes, 0, bytes.length);

assertEquals(0, validator.validate(0x00, data, 0, data.capacity(), FragmentConsumer.NOP));

assertEquals(data.capacity(), validator.validate(0x01, data, 0, data.capacity(), FragmentConsumer.NOP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.ValidatorConfig;
import io.aklivity.zilla.runtime.engine.validator.FragmentValidator;
import io.aklivity.zilla.runtime.engine.validator.ValueValidator;
import io.aklivity.zilla.runtime.validator.core.config.LongValidatorConfig;

public class LongValidatorFactoryTest
{
@Test
@SuppressWarnings("unchecked")
public void shouldCreateReadValidator()
public void shouldCreateValueReader()
{
// GIVEN
ValidatorConfig validator = new LongValidatorConfig();
Expand All @@ -47,7 +48,7 @@ public void shouldCreateReadValidator()

Comment thread
ankitk-me marked this conversation as resolved.
Outdated
@Test
@SuppressWarnings("unchecked")
public void shouldCreateWriteValidator()
public void shouldCreateValueWriter()
{
// GIVEN
ValidatorConfig validator = new LongValidatorConfig();
Expand All @@ -60,4 +61,36 @@ public void shouldCreateWriteValidator()
// THEN
assertThat(writer, instanceOf(LongValidator.class));
}

@Test
@SuppressWarnings("unchecked")
public void shouldCreateFragmentReader()
{
// GIVEN
ValidatorConfig validator = new LongValidatorConfig();
LongFunction<CatalogHandler> supplyCatalog = mock(LongFunction.class);
LongValidatorFactory factory = new LongValidatorFactory();

// WHEN
FragmentValidator reader = factory.createFragmentReader(validator, supplyCatalog);

// THEN
assertThat(reader, instanceOf(LongValidator.class));
}

@Test
@SuppressWarnings("unchecked")
public void shouldCreateFragmentWriter()
{
// GIVEN
ValidatorConfig validator = new LongValidatorConfig();
LongFunction<CatalogHandler> supplyCatalog = mock(LongFunction.class);
LongValidatorFactory factory = new LongValidatorFactory();

// WHEN
FragmentValidator writer = factory.createFragmentWriter(validator, supplyCatalog);

// THEN
assertThat(writer, instanceOf(LongValidator.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;

import io.aklivity.zilla.runtime.engine.validator.function.FragmentConsumer;
import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer;
import io.aklivity.zilla.runtime.validator.core.config.LongValidatorConfig;

Expand Down Expand Up @@ -47,4 +48,17 @@ public void shouldVerifyInvalidLong()
data.wrap(bytes, 0, bytes.length);
assertEquals(-1, validator.validate(data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
public void shouldVerifyValidFragmentLong()
{
DirectBuffer data = new UnsafeBuffer();

byte[] bytes = {0, 0, 0, 0, 0, 0, 0, 42};
data.wrap(bytes, 0, bytes.length);

assertEquals(0, validator.validate(0x00, data, 0, data.capacity(), FragmentConsumer.NOP));

assertEquals(data.capacity(), validator.validate(0x01, data, 0, data.capacity(), FragmentConsumer.NOP));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.validator.core;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.nio.charset.StandardCharsets;

import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Test;

public class StringEncodingTest
{
@Test
public void shouldVerifyValidUTF8()
{
DirectBuffer data = new UnsafeBuffer();

byte[] bytes = "Valid String".getBytes();
data.wrap(bytes, 0, bytes.length);

assertTrue(StringEncoding.UTF_8.validate(data, 0, bytes.length));
}

@Test
public void shouldVerifyValidUTF16()
{
DirectBuffer data = new UnsafeBuffer();

byte[] bytes = "Valid String".getBytes(StandardCharsets.UTF_8);
data.wrap(bytes, 0, bytes.length);

assertTrue(StringEncoding.UTF_8.validate(data, 0, bytes.length));
}

@Test
public void shouldVerifyStringEncodingOf()
{
assertEquals(StringEncoding.UTF_8, StringEncoding.of("utf_8"));
assertEquals(StringEncoding.UTF_16, StringEncoding.of("utf_16"));
assertEquals(StringEncoding.INVALID, StringEncoding.of("invalid_encoding"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,22 @@ public void shouldVerifyIncompleteMessage()
data.wrap(bytes, 0, bytes.length);
assertEquals(0, validator.validate(FLAGS_INIT, data, 0, data.capacity(), FragmentConsumer.NOP));
}

@Test
public void shouldVerifyValidFragmentUTF8()
{
StringValidatorConfig config = StringValidatorConfig.builder()
.encoding("utf_8")
.build();
StringValidator validator = new StringValidator(config);

DirectBuffer data = new UnsafeBuffer();

byte[] bytes = "Valid String".getBytes();
data.wrap(bytes, 0, bytes.length);

assertEquals(0, validator.validate(0x00, data, 0, data.capacity(), FragmentConsumer.NOP));

assertEquals(data.capacity(), validator.validate(0x01, data, 0, data.capacity(), FragmentConsumer.NOP));
}
}
2 changes: 1 addition & 1 deletion incubator/validator-json/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.82</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.88</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.leadpony.justify.api.JsonSchema;

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
Expand All @@ -30,8 +29,6 @@

public class JsonReadValidator extends JsonValidator implements ValueValidator, FragmentValidator
{
private final DirectBuffer valueRO = new UnsafeBuffer();

public JsonReadValidator(
JsonValidatorConfig config,
LongFunction<CatalogHandler> supplyCatalog)
Expand Down Expand Up @@ -86,9 +83,8 @@ private int validateComplete(
JsonSchema schema = fetchSchema(schemaId);
if (schema != null && validate(schema, data, progress, length - progress))
{
valueRO.wrap(data);
next.accept(data, index, length);
valLength = length;
next.accept(valueRO, 0, length);
}
return valLength;
}
Expand Down
Loading