-
Notifications
You must be signed in to change notification settings - Fork 70
Validator Interface Update & Converter Changes #533
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
jfallows
merged 53 commits into
aklivity:feature/schema-registry
from
ankitk-me:convertor
Dec 22, 2023
Merged
Changes from 23 commits
Commits
Show all changes
53 commits
Select commit
Hold shift + click to select a range
41a8899
Schema Config Update
ankitk-me f26457c
converter implementation
ankitk-me fd69f8b
Merge branch 'aklivity:develop' into convertor
ankitk-me fdb9d98
test coverage update
ankitk-me dcecf18
schema update
ankitk-me b12c609
Interface update to support Streaming validation
ankitk-me 3621172
refactoring
ankitk-me 8435d0e
bug fix
ankitk-me 71451a7
IT fix
ankitk-me 2a882d4
Feature/m1 docker build support (#376)
vordimous c177894
0 for no mqtt session expiry should be mapped to max integer value fo…
akrambek add6eb3
Bump alpine in /cloud/docker-image/src/main/docker/release (#484)
dependabot[bot] 366daff
Better handle request with same group id (#498)
akrambek e362fe9
Prepare release 0.9.55
jfallows 66680f9
Update CHANGELOG.md
jfallows 6829614
Fix flow control bug in mqtt-kakfa publish (#524)
bmaidics bb9fb56
Add extraEnv to Deployment in the helm chart (#511)
attilakreiner a1235ce
Sporadic github action build failure fix (#522)
akrambek 6eb8a43
Merge branch 'feature/schema-registry' into convertor
ankitk-me f7bf292
pom fix
ankitk-me fd59bbb
updating Varint32FW initialisation
ankitk-me 4c264d7
Fragment Validator Interface & Schema Update
ankitk-me 1e1e73d
String & Test Fragment Validator implementation
ankitk-me 65f12f0
Addressing review feedback
ankitk-me 705f7d4
Addressing review comments
ankitk-me 02b3b7a
avro validator.yaml update
ankitk-me 6191c15
Schema patch issue fix
ankitk-me 7d6418a
dynamic value length allocation issue
ankitk-me 9c533a5
addressing review comments
ankitk-me 364acb4
Use parametric types to avoid both cast and warning about raw types
jfallows 31e70f4
Use converted file to handle variable size converted value during pro…
jfallows 69dfe0b
Prefix is byte followed by big endian int32, requires literal byte se…
jfallows 767d008
adding test coverage
ankitk-me 0b1e590
optimising object allocation Avro validator
ankitk-me ce59fb0
checkstyle fix
ankitk-me 2d6a9e3
IT & implementation to support fetch message without Schema ID prefix
ankitk-me 664cece
addressing review feedback
ankitk-me b4b02c8
fetch message without Schema ID prefix implementation
ankitk-me f4e90e9
Avro & Json Read Validator fix
ankitk-me 8075684
fix checkstyle
ankitk-me 4dec79f
ITs for convertor & updating Test Validator
ankitk-me 11d3747
dynamic message size after conversion implementation
ankitk-me 7281c2f
Merge branch 'feature/schema-registry' into convertor
ankitk-me 08cc929
updating latest changes with Value & Fragment Validator interface.
ankitk-me 3cf0185
Converter bug fix
ankitk-me a8f9c08
Addressing review feedback
ankitk-me 1d82d23
addressing review comments
ankitk-me 8fb1016
using ExpandableDirectByteBuffer with valid index & length
ankitk-me 9f3e434
review feedback & adding functional interface to CatalogHandler
ankitk-me c48b2d7
encoded bug fix: position reset to 0
ankitk-me 9b77963
Addressing review comments
ankitk-me 49f2963
Avro unit test fix
ankitk-me ee1e231
return -1 and ignore prefix.sizeof() in case of validation failure
ankitk-me File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
50 changes: 50 additions & 0 deletions
50
...r-avro.spec/src/main/scripts/io/aklivity/zilla/specs/validator/avro/config/validator.yaml
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| # | ||
| # Copyright 2021-2023 Aklivity Inc | ||
| # | ||
| # Licensed under the Aklivity Community License (the "License"); you may not use | ||
| # this file except in compliance with the License. You may obtain a copy of the | ||
| # License at | ||
| # | ||
| # https://www.aklivity.io/aklivity-community-license/ | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| # WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations under the License. | ||
| # | ||
|
|
||
| --- | ||
| name: test | ||
| catalogs: | ||
| test0: | ||
| type: test | ||
| options: | ||
| schema: | | ||
| { | ||
| "fields": [ | ||
| { | ||
| "name": "id", | ||
| "type": "string" | ||
| }, | ||
| { | ||
| "name": "status", | ||
| "type": "string" | ||
| } | ||
| ], | ||
| "name": "Event", | ||
| "namespace": "io.aklivity.example", | ||
| "type": "record" | ||
| } | ||
| bindings: | ||
| test: | ||
| kind: server | ||
| type: test | ||
| options: | ||
| value: | ||
| type: avro | ||
| expect: json | ||
| catalog: | ||
| catalog0: | ||
| - subject: test0 | ||
| version: latest | ||
| exit: test |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
44 changes: 44 additions & 0 deletions
44
...tor-avro.spec/src/test/java/io/aklivity/zilla/specs/validator/avro/config/SchemaTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Copyright 2021-2023 Aklivity Inc | ||
| * | ||
| * Licensed under the Aklivity Community License (the "License"); you may not use | ||
| * this file except in compliance with the License. You may obtain a copy of the | ||
| * License at | ||
| * | ||
| * https://www.aklivity.io/aklivity-community-license/ | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations under the License. | ||
| */ | ||
| package io.aklivity.zilla.specs.validator.avro.config; | ||
|
|
||
| import static org.hamcrest.MatcherAssert.assertThat; | ||
| import static org.hamcrest.Matchers.not; | ||
| import static org.hamcrest.Matchers.nullValue; | ||
|
|
||
| import jakarta.json.JsonObject; | ||
|
|
||
| import org.junit.Rule; | ||
| import org.junit.Test; | ||
|
|
||
| import io.aklivity.zilla.specs.engine.config.ConfigSchemaRule; | ||
|
|
||
| public class SchemaTest | ||
| { | ||
| @Rule | ||
| public final ConfigSchemaRule schema = new ConfigSchemaRule() | ||
| .schemaPatch("io/aklivity/zilla/specs/engine/schema/binding/test.schema.patch.json") | ||
| .schemaPatch("io/aklivity/zilla/specs/engine/schema/catalog/test.schema.patch.json") | ||
| .schemaPatch("io/aklivity/zilla/specs/validator/avro/schema/avro.schema.patch.json") | ||
| .configurationRoot("io/aklivity/zilla/specs/validator/avro/config"); | ||
|
|
||
| @Test | ||
| public void shouldValidateCatalog() | ||
| { | ||
| JsonObject config = schema.validate("validator.yaml"); | ||
|
|
||
| assertThat(config, not(nullValue())); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
132 changes: 132 additions & 0 deletions
132
...r-avro/src/main/java/io/aklivity/zilla/runtime/validator/avro/AvroReadValueValidator.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
| /* | ||
| * Copyright 2021-2023 Aklivity Inc | ||
| * | ||
| * Licensed under the Aklivity Community License (the "License"); you may not use | ||
| * this file except in compliance with the License. You may obtain a copy of the | ||
| * License at | ||
| * | ||
| * https://www.aklivity.io/aklivity-community-license/ | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
| * WARRANTIES OF ANY KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations under the License. | ||
| */ | ||
| package io.aklivity.zilla.runtime.validator.avro; | ||
|
|
||
| import java.io.ByteArrayOutputStream; | ||
| import java.nio.ByteBuffer; | ||
| import java.util.function.LongFunction; | ||
| import java.util.function.ToLongFunction; | ||
|
|
||
| import org.agrona.DirectBuffer; | ||
| import org.agrona.MutableDirectBuffer; | ||
| import org.agrona.concurrent.UnsafeBuffer; | ||
| import org.apache.avro.Schema; | ||
| import org.apache.avro.generic.GenericDatumReader; | ||
| import org.apache.avro.generic.GenericDatumWriter; | ||
| import org.apache.avro.generic.GenericRecord; | ||
| import org.apache.avro.io.JsonEncoder; | ||
| import org.apache.avro.specific.SpecificDatumWriter; | ||
| import org.apache.avro.specific.SpecificRecord; | ||
|
|
||
| import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; | ||
| import io.aklivity.zilla.runtime.engine.validator.function.ValueConsumer; | ||
| import io.aklivity.zilla.runtime.validator.avro.config.AvroValidatorConfig; | ||
|
|
||
| public class AvroReadValueValidator extends AvroValueValidator | ||
| { | ||
| public AvroReadValueValidator( | ||
| AvroValidatorConfig config, | ||
| ToLongFunction<String> resolveId, | ||
| LongFunction<CatalogHandler> supplyCatalog) | ||
| { | ||
| super(config, resolveId, supplyCatalog); | ||
| } | ||
|
|
||
| @Override | ||
| public int validate( | ||
| DirectBuffer data, | ||
| int index, | ||
| int length, | ||
| ValueConsumer next) | ||
| { | ||
| MutableDirectBuffer value = new UnsafeBuffer(); | ||
| int valLength = -1; | ||
|
|
||
| byte[] payloadBytes = new byte[length]; | ||
| data.getBytes(0, payloadBytes); | ||
| ByteBuffer byteBuf = ByteBuffer.wrap(payloadBytes); | ||
|
|
||
| int schemaId; | ||
| Schema schema; | ||
| if (byteBuf.get() == MAGIC_BYTE) | ||
| { | ||
| schemaId = byteBuf.getInt(); | ||
| int size = length - 1 - 4; | ||
| byte[] valBytes = new byte[size]; | ||
| data.getBytes(length - size, valBytes); | ||
| schema = fetchSchema(schemaId); | ||
| if (schema != null) | ||
| { | ||
| if ("json".equals(expect)) | ||
| { | ||
| byte[] record = serializeAvroRecord(schema, valBytes); | ||
| value.wrap(record); | ||
| valLength = record.length; | ||
| } | ||
| else if (validate(schema, valBytes)) | ||
| { | ||
| value.wrap(data); | ||
| valLength = length; | ||
| } | ||
| } | ||
| } | ||
| else | ||
| { | ||
| schemaId = catalog != null ? catalog.id : 0; | ||
| schema = fetchSchema(schemaId); | ||
| if (schema != null) | ||
| { | ||
| if ("json".equals(expect)) | ||
| { | ||
| byte[] record = serializeAvroRecord(schema, payloadBytes); | ||
| value.wrap(record); | ||
| valLength = record.length; | ||
| } | ||
| else if (validate(schema, payloadBytes)) | ||
| { | ||
| value.wrap(data); | ||
| valLength = length; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| next.accept(value, index, valLength); | ||
| return valLength; | ||
| } | ||
|
|
||
| private byte[] serializeAvroRecord( | ||
| Schema schema, | ||
| byte[] payloadBytes) | ||
| { | ||
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); | ||
| try | ||
| { | ||
| reader = new GenericDatumReader(schema); | ||
| GenericRecord record = (GenericRecord) reader.read(null, | ||
| decoder.binaryDecoder(payloadBytes, null)); | ||
| JsonEncoder jsonEncoder = encoder.jsonEncoder(record.getSchema(), outputStream); | ||
| writer = record instanceof SpecificRecord ? | ||
| new SpecificDatumWriter<>(record.getSchema()) : | ||
| new GenericDatumWriter<>(record.getSchema()); | ||
| writer.write(record, jsonEncoder); | ||
| jsonEncoder.flush(); | ||
| outputStream.close(); | ||
| } | ||
| catch (Exception e) | ||
| { | ||
| } | ||
| return outputStream.toByteArray(); | ||
| } | ||
|
ankitk-me marked this conversation as resolved.
Outdated
|
||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.