Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
135 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
9fe7a91
Merge branch 'aklivity:develop' into develop
akrambek Dec 11, 2023
7e3d237
Merge branch 'aklivity:develop' into develop
akrambek Dec 12, 2023
33c4411
Merge branch 'aklivity:develop' into develop
akrambek Dec 13, 2023
fe9e318
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
d8b5e5c
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
ebca7ef
Merge branch 'aklivity:develop' into develop
akrambek Dec 18, 2023
5e3e059
Merge branch 'aklivity:develop' into develop
akrambek Dec 22, 2023
ee71db9
Merge branch 'aklivity:develop' into develop
akrambek Dec 24, 2023
0b7a15a
Merge branch 'aklivity:develop' into develop
akrambek Dec 25, 2023
be13489
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
95df84c
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
3ebdbf5
Merge branch 'aklivity:develop' into develop
akrambek Dec 28, 2023
24ad9e1
Merge branch 'aklivity:develop' into develop
akrambek Dec 30, 2023
6d21fec
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
368a0a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
7069f1a
Merge branch 'aklivity:develop' into develop
akrambek Jan 2, 2024
09b7041
Merge branch 'aklivity:develop' into develop
akrambek Jan 3, 2024
98f1faa
Merge branch 'aklivity:develop' into develop
akrambek Jan 4, 2024
371391a
Merge branch 'aklivity:develop' into develop
akrambek Jan 5, 2024
c6a0882
Merge branch 'aklivity:develop' into develop
akrambek Jan 8, 2024
f99f009
Merge branch 'aklivity:develop' into develop
akrambek Jan 9, 2024
a110b68
Merge branch 'aklivity:develop' into develop
akrambek Jan 11, 2024
80c4625
Merge branch 'aklivity:develop' into develop
akrambek Jan 16, 2024
6617e20
Merge branch 'aklivity:develop' into develop
akrambek Jan 19, 2024
dea9f53
Merge branch 'aklivity:develop' into develop
akrambek Jan 20, 2024
b74db57
Merge branch 'aklivity:develop' into develop
akrambek Jan 23, 2024
4617b54
Merge branch 'aklivity:develop' into develop
akrambek Jan 30, 2024
b3b421d
Merge branch 'aklivity:develop' into develop
akrambek Jan 31, 2024
73d64b1
Merge branch 'aklivity:develop' into develop
akrambek Feb 1, 2024
7bb546e
Merge branch 'aklivity:develop' into develop
akrambek Feb 2, 2024
b1c7901
Merge branch 'aklivity:develop' into develop
akrambek Feb 8, 2024
949df2f
Merge branch 'aklivity:develop' into develop
akrambek Feb 13, 2024
ca946b8
Merge branch 'aklivity:develop' into develop
akrambek Feb 14, 2024
f9dcd75
Merge branch 'aklivity:develop' into develop
akrambek Feb 21, 2024
e1e5e75
Merge branch 'aklivity:develop' into develop
akrambek Feb 22, 2024
5f50549
Merge branch 'aklivity:develop' into develop
akrambek Feb 23, 2024
32725be
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
83b145a
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
75e7709
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
d4c3117
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
a438cfe
Merge branch 'aklivity:develop' into develop
akrambek Mar 1, 2024
36f8f1e
Merge branch 'aklivity:develop' into develop
akrambek Mar 4, 2024
cf8a5c7
Merge branch 'aklivity:develop' into develop
akrambek Mar 10, 2024
7b46270
Merge branch 'aklivity:develop' into develop
akrambek Mar 12, 2024
6461ebf
Merge branch 'aklivity:develop' into develop
akrambek Mar 13, 2024
a97cfb3
Merge branch 'aklivity:develop' into develop
akrambek Mar 14, 2024
fc97dc2
Merge branch 'aklivity:develop' into develop
akrambek Mar 16, 2024
a5edb3f
Merge branch 'aklivity:develop' into develop
akrambek Mar 17, 2024
7a79fd6
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
82c24c3
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
1f3f305
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
3a12b8b
Merge branch 'aklivity:develop' into develop
akrambek Mar 25, 2024
7684d2a
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
d40e2cd
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
fcd6cd2
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
141a871
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
782b711
Merge branch 'aklivity:develop' into develop
akrambek Apr 10, 2024
0c93c5f
Merge branch 'aklivity:develop' into develop
akrambek Apr 10, 2024
70f2543
Merge branch 'aklivity:develop' into develop
akrambek Apr 11, 2024
c451d70
Merge branch 'aklivity:develop' into develop
akrambek Apr 12, 2024
45f6a83
Merge branch 'aklivity:develop' into develop
akrambek Apr 12, 2024
32ba7e9
Merge branch 'aklivity:develop' into develop
akrambek Apr 16, 2024
95ff994
Merge branch 'aklivity:develop' into develop
akrambek Apr 22, 2024
49787e7
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
bb3b121
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
3d6cf14
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
9bc44b5
Merge branch 'aklivity:develop' into develop
akrambek May 2, 2024
4e82b78
Merge branch 'aklivity:develop' into develop
akrambek May 8, 2024
dd0ae1b
Merge branch 'aklivity:develop' into develop
akrambek May 10, 2024
688c928
Merge branch 'aklivity:develop' into develop
akrambek May 15, 2024
027a6dc
Merge branch 'aklivity:develop' into develop
akrambek May 16, 2024
7fb8aea
Merge branch 'aklivity:develop' into develop
akrambek May 20, 2024
57b40a8
filtering by structured value field(s)
ankitk-me Jun 10, 2024
e340fb1
model-json extract support
ankitk-me Jun 11, 2024
0ccc615
Test Converter update to support extracted header for ITs
ankitk-me Jun 12, 2024
fa1916e
incorrect index fix
ankitk-me Jun 12, 2024
6b3d555
incorrect index fix for number
ankitk-me Jun 12, 2024
0d15ed9
Merge remote-tracking branch 'origin' into payloadFilter
ankitk-me Jun 12, 2024
bff1a15
Merge branch 'aklivity:develop' into develop
akrambek Jun 12, 2024
11f8b1e
Merge branch 'aklivity:develop' into develop
akrambek Jun 12, 2024
94c5656
Merge remote-tracking branch 'origin' into payloadFilter
ankitk-me Jun 12, 2024
e66f27c
addressing review comments
ankitk-me Jun 14, 2024
d421999
update to reuse matcher object
ankitk-me Jun 14, 2024
6832657
Merge branch 'aklivity:develop' into develop
akrambek Jun 19, 2024
6ffc602
support for Avro model and header format change
ankitk-me Jun 19, 2024
864ea63
support for Protobuf model
ankitk-me Jun 21, 2024
0caa9f5
WIP
akrambek Jun 24, 2024
f35a45a
Merge branch 'aklivity:develop' into develop
akrambek Jun 24, 2024
ff652e5
Merge branch 'develop' into feature/asyncapi-topics
akrambek Jun 24, 2024
2ebdc1c
WIP
akrambek Jun 27, 2024
7de1be5
Support other format data types in catalog
akrambek Jun 28, 2024
8fa527d
Remove comment
akrambek Jun 28, 2024
86ea213
Support kafka key validation
akrambek Jun 28, 2024
1f13ca9
WIP
akrambek Jun 28, 2024
ec2f8a9
Merge branch 'aklivity:develop' into develop
akrambek Jun 29, 2024
b8fd378
Merge branch 'develop' into feature/key-validation
akrambek Jun 29, 2024
bc7af09
WIP
akrambek Jun 29, 2024
769fa52
WIP
akrambek Jun 29, 2024
8f874d1
Merge branch 'aklivity:develop' into develop
akrambek Jun 29, 2024
8a84564
Merge branch 'develop' into feature/key-validation
akrambek Jun 29, 2024
2141e7e
Merge branch 'feature/key-validation' into feature/asyncapi-schema-re…
akrambek Jun 29, 2024
eef2dd6
Fix conflict
akrambek Jun 29, 2024
e866c11
WIP
akrambek Jun 30, 2024
21e57ff
WIP
akrambek Jun 30, 2024
fe30d17
WIP
akrambek Jun 30, 2024
85678f4
WIP
akrambek Jun 30, 2024
5cbdb14
WIP
akrambek Jul 1, 2024
7f2842c
Merge branch 'aklivity:develop' into develop
akrambek Jul 1, 2024
f73451d
Merge branch 'develop' into feature/asyncapi-schema-registry
akrambek Jul 1, 2024
88bc237
Apply feedback from PR
akrambek Jul 2, 2024
67ee5f0
Check if the binding exists
akrambek Jul 4, 2024
d9c8edb
Merge branch 'aklivity:develop' into develop
akrambek Jul 4, 2024
e2174c8
Merge branch 'develop' into feature/asyncapi-schema-registry
akrambek Jul 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
support for Protobuf model
  • Loading branch information
ankitk-me committed Jun 21, 2024
commit 864ea635f7f76389f40255ef3c7d7c2167d2f5e9
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.model.avro.internal;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.model.avro.internal.types.OctetsFW;

public class AvroField
{
public final OctetsFW value;
public final MutableDirectBuffer buffer;

public AvroField()
{
this.value = new OctetsFW();
this.buffer = new UnsafeBuffer(new byte[24]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.io.DirectBufferInputStream;
import org.agrona.io.ExpandableDirectBufferOutputStream;
import org.apache.avro.AvroRuntimeException;
Expand Down Expand Up @@ -65,7 +64,6 @@ public abstract class AvroModelHandler
private static final int COMMA_LENGTH = ",".length();
private static final int JSON_FIELD_ARRAY_LENGTH = "\"\":[]," .length() + COMMA_LENGTH * 100;
private static final int JSON_FIELD_MAP_LENGTH = "\"\":{},".length();
private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer();

protected final SchemaConfig catalog;
protected final CatalogHandler handler;
Expand All @@ -78,7 +76,7 @@ public abstract class AvroModelHandler
protected final ExpandableDirectBufferOutputStream expandable;
protected final DirectBufferInputStream in;
protected final AvroModelEventContext event;
protected final Map<String, OctetsFW> extracted;
protected final Map<String, AvroField> extracted;

private final Int2ObjectCache<Schema> schemas;
private final Int2ObjectCache<GenericDatumReader<GenericRecord>> readers;
Expand All @@ -90,9 +88,8 @@ public abstract class AvroModelHandler
private final AvroLongFW longRO;
private final AvroFloatFW floatRO;
private final AvroDoubleFW doubleRO;
private final MutableDirectBuffer text;

private int progress;
protected int progress;

protected AvroModelHandler(
AvroModelConfig config,
Expand Down Expand Up @@ -123,7 +120,6 @@ protected AvroModelHandler(
this.longRO = new AvroLongFW();
this.floatRO = new AvroFloatFW();
this.doubleRO = new AvroDoubleFW();
this.text = new UnsafeBuffer(new byte[24]);

}

Expand All @@ -136,10 +132,6 @@ protected final boolean validate(
int length)
{
boolean status = false;
for (OctetsFW value: extracted.values())
{
value.wrap(EMPTY_BUFFER, 0, 0);
}
try
{
GenericRecord record = supplyRecord(schemaId);
Expand All @@ -163,7 +155,7 @@ protected final boolean validate(
return status;
}

private void extractFields(
protected void extractFields(
DirectBuffer buffer,
int length,
Schema schema)
Expand Down Expand Up @@ -304,7 +296,7 @@ private void extract(
Schema schema,
DirectBuffer data,
int limit,
OctetsFW octets)
AvroField field)
{
switch (schema.getType())
{
Expand All @@ -316,19 +308,22 @@ private void extract(
AvroBytesFW bytes = bytesRO.wrap(data, progress, limit);
OctetsFW value = bytes.value();
progress = bytes.limit();
if (octets != null)
if (field != null)
{
OctetsFW octets = field.value;
octets.wrap(value.buffer(), value.offset(), value.limit());
}
break;
case ENUM:
case INT:
AvroIntFW int32 = intRO.wrap(data, progress, limit);
int intValue = int32.value();
progress = int32.limit();
int length = text.putIntAscii(0, intValue);
if (octets != null)
if (field != null)
{
octets.wrap(text, 0, length);
MutableDirectBuffer text = field.buffer;
int length = text.putIntAscii(0, intValue);
field.value.wrap(text, 0, length);
}
break;
case FLOAT:
Expand All @@ -337,20 +332,22 @@ private void extract(
DirectBuffer buffer = avroFloat.value().value();
float floatValue = Float.intBitsToFloat(decodeNumberBytes(len, buffer));
progress = avroFloat.limit();
length = text.putStringWithoutLengthAscii(0, String.valueOf(floatValue));
if (octets != null)
if (field != null)
{
octets.wrap(text, 0, length);
MutableDirectBuffer text = field.buffer;
int length = text.putStringWithoutLengthAscii(0, String.valueOf(floatValue));
field.value.wrap(text, 0, length);
}
break;
case LONG:
AvroLongFW avroLong = longRO.wrap(data, progress, limit);
long longValue = avroLong.value();
progress = avroLong.limit();
length = text.putLongAscii(0, longValue);
if (octets != null)
if (field != null)
{
octets.wrap(text, 0, length);
MutableDirectBuffer text = field.buffer;
int length = text.putLongAscii(0, longValue);
field.value.wrap(text, 0, length);
}
break;
case DOUBLE:
Expand All @@ -365,20 +362,29 @@ private void extract(
double doubleValue = Double.longBitsToDouble((((long) decoded) & 0xffffffffL) |
(((long) decodeNumberBytes(len, buffer)) << 32));
progress = avroDouble.limit();
length = text.putStringWithoutLengthAscii(0, String.valueOf(doubleValue));
if (octets != null)
if (field != null)
{
octets.wrap(text, 0, length);
MutableDirectBuffer text = field.buffer;
int length = text.putStringWithoutLengthAscii(0, String.valueOf(doubleValue));
field.value.wrap(text, 0, length);
}
break;
case BOOLEAN:
AvroBooleanFW avroBoolean = new AvroBooleanFW().wrap(data, progress, limit);
value = avroBoolean.value();
progress = avroBoolean.limit();
if (octets != null)
if (field != null)
{
octets.wrap(value.buffer(), value.offset(), value.limit());
field.value.wrap(value.buffer(), value.offset(), value.limit());
}
break;
case FIXED:
int fixedSize = schema.getFixedSize();
if (field != null)
{
field.value.wrap(data, progress, progress + fixedSize);
}
progress += fixedSize;
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.regex.Pattern;

import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
Expand All @@ -38,6 +39,7 @@ public class AvroReadConverterHandler extends AvroModelHandler implements Conver
{
private static final String PATH = "^\\$\\.([A-Za-z_][A-Za-z0-9_]*)$";
private static final Pattern PATH_PATTERN = Pattern.compile(PATH);
private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer();

private final Matcher matcher;

Expand Down Expand Up @@ -82,7 +84,7 @@ public void extract(
{
if (matcher.reset(path).matches())
{
extracted.put(matcher.group(1), new OctetsFW());
extracted.put(matcher.group(1), new AvroField());
}
}

Expand All @@ -95,6 +97,10 @@ public int convert(
int length,
ValueConsumer next)
{
for (AvroField field: extracted.values())
{
field.value.wrap(EMPTY_BUFFER, 0, 0);
}
return handler.decode(traceId, bindingId, data, index, length, next, this::decodePayload);
}

Expand All @@ -105,7 +111,7 @@ public int extractedLength(
OctetsFW value = null;
if (matcher.reset(path).matches())
{
value = extracted.get(matcher.group(1));
value = extracted.get(matcher.group(1)).value;
}
return value != null ? value.sizeof() : 0;
}
Expand All @@ -117,7 +123,7 @@ public void extracted(
{
if (matcher.reset(path).matches())
{
OctetsFW value = extracted.get(matcher.group(1));
OctetsFW value = extracted.get(matcher.group(1)).value;
if (value != null && value.sizeof() != 0)
{
visitor.visit(value.buffer(), value.offset(), value.sizeof());
Expand Down Expand Up @@ -188,6 +194,9 @@ record = reader.read(record, decoderFactory.binaryDecoder(in, decoder));
JsonEncoder out = encoderFactory.jsonEncoder(schema, expandable);
writer.write(record, out);
out.flush();

progress = index;
extractFields(buffer, length, schema);
}
}
catch (IOException | AvroRuntimeException ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,28 @@ public void shouldExtract()
assertEquals(data.capacity(), converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));

assertEquals(8, converter.extractedLength(stringPath));
final ConverterHandler.FieldVisitor visitor = (buffer, index, length) ->
ConverterHandler.FieldVisitor visitor = (buffer, index, length) ->
{
assertEquals("positive", buffer.getStringWithoutLengthUtf8(index, length));
};
converter.extracted(stringPath, visitor);

ConverterHandler.FieldVisitor doubleVisitor = (buffer, index, length) ->
{
assertEquals("1.2", buffer.getStringWithoutLengthUtf8(index, length));
};
converter.extracted(doublePath, doubleVisitor);

ConverterHandler.FieldVisitor intVisitor = (buffer, index, length) ->
{
assertEquals("1", buffer.getStringWithoutLengthUtf8(index, length));
};
converter.extracted(intPath, intVisitor);

ConverterHandler.FieldVisitor floatVisitor = (buffer, index, length) ->
{
assertEquals("2.2", buffer.getStringWithoutLengthUtf8(index, length));
};
converter.extracted(floatPath, floatVisitor);
}
}
2 changes: 1 addition & 1 deletion runtime/model-protobuf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</licenses>

<properties>
<jacoco.coverage.ratio>0.92</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.89</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.model.protobuf.internal;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.model.protobuf.internal.types.OctetsFW;

public class ProtobufField
{
public final OctetsFW value;
public final MutableDirectBuffer buffer;

public ProtobufField()
{
this.value = new OctetsFW();
this.buffer = new UnsafeBuffer(new byte[24]);
}
}
Loading