Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
135 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
akrambek Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
akrambek Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
akrambek Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
9fe7a91
Merge branch 'aklivity:develop' into develop
akrambek Dec 11, 2023
7e3d237
Merge branch 'aklivity:develop' into develop
akrambek Dec 12, 2023
33c4411
Merge branch 'aklivity:develop' into develop
akrambek Dec 13, 2023
fe9e318
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
d8b5e5c
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
ebca7ef
Merge branch 'aklivity:develop' into develop
akrambek Dec 18, 2023
5e3e059
Merge branch 'aklivity:develop' into develop
akrambek Dec 22, 2023
ee71db9
Merge branch 'aklivity:develop' into develop
akrambek Dec 24, 2023
0b7a15a
Merge branch 'aklivity:develop' into develop
akrambek Dec 25, 2023
be13489
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
95df84c
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
3ebdbf5
Merge branch 'aklivity:develop' into develop
akrambek Dec 28, 2023
24ad9e1
Merge branch 'aklivity:develop' into develop
akrambek Dec 30, 2023
6d21fec
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
368a0a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
7069f1a
Merge branch 'aklivity:develop' into develop
akrambek Jan 2, 2024
09b7041
Merge branch 'aklivity:develop' into develop
akrambek Jan 3, 2024
98f1faa
Merge branch 'aklivity:develop' into develop
akrambek Jan 4, 2024
371391a
Merge branch 'aklivity:develop' into develop
akrambek Jan 5, 2024
c6a0882
Merge branch 'aklivity:develop' into develop
akrambek Jan 8, 2024
f99f009
Merge branch 'aklivity:develop' into develop
akrambek Jan 9, 2024
a110b68
Merge branch 'aklivity:develop' into develop
akrambek Jan 11, 2024
80c4625
Merge branch 'aklivity:develop' into develop
akrambek Jan 16, 2024
6617e20
Merge branch 'aklivity:develop' into develop
akrambek Jan 19, 2024
dea9f53
Merge branch 'aklivity:develop' into develop
akrambek Jan 20, 2024
b74db57
Merge branch 'aklivity:develop' into develop
akrambek Jan 23, 2024
4617b54
Merge branch 'aklivity:develop' into develop
akrambek Jan 30, 2024
b3b421d
Merge branch 'aklivity:develop' into develop
akrambek Jan 31, 2024
73d64b1
Merge branch 'aklivity:develop' into develop
akrambek Feb 1, 2024
7bb546e
Merge branch 'aklivity:develop' into develop
akrambek Feb 2, 2024
b1c7901
Merge branch 'aklivity:develop' into develop
akrambek Feb 8, 2024
949df2f
Merge branch 'aklivity:develop' into develop
akrambek Feb 13, 2024
ca946b8
Merge branch 'aklivity:develop' into develop
akrambek Feb 14, 2024
f9dcd75
Merge branch 'aklivity:develop' into develop
akrambek Feb 21, 2024
e1e5e75
Merge branch 'aklivity:develop' into develop
akrambek Feb 22, 2024
5f50549
Merge branch 'aklivity:develop' into develop
akrambek Feb 23, 2024
32725be
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
83b145a
Merge branch 'aklivity:develop' into develop
akrambek Feb 28, 2024
75e7709
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
d4c3117
Merge branch 'aklivity:develop' into develop
akrambek Feb 29, 2024
a438cfe
Merge branch 'aklivity:develop' into develop
akrambek Mar 1, 2024
36f8f1e
Merge branch 'aklivity:develop' into develop
akrambek Mar 4, 2024
cf8a5c7
Merge branch 'aklivity:develop' into develop
akrambek Mar 10, 2024
7b46270
Merge branch 'aklivity:develop' into develop
akrambek Mar 12, 2024
6461ebf
Merge branch 'aklivity:develop' into develop
akrambek Mar 13, 2024
a97cfb3
Merge branch 'aklivity:develop' into develop
akrambek Mar 14, 2024
fc97dc2
Merge branch 'aklivity:develop' into develop
akrambek Mar 16, 2024
a5edb3f
Merge branch 'aklivity:develop' into develop
akrambek Mar 17, 2024
7a79fd6
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
82c24c3
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
1f3f305
Merge branch 'aklivity:develop' into develop
akrambek Mar 19, 2024
3a12b8b
Merge branch 'aklivity:develop' into develop
akrambek Mar 25, 2024
7684d2a
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
d40e2cd
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
fcd6cd2
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
141a871
Merge branch 'aklivity:develop' into develop
akrambek Apr 9, 2024
782b711
Merge branch 'aklivity:develop' into develop
akrambek Apr 10, 2024
0c93c5f
Merge branch 'aklivity:develop' into develop
akrambek Apr 10, 2024
70f2543
Merge branch 'aklivity:develop' into develop
akrambek Apr 11, 2024
c451d70
Merge branch 'aklivity:develop' into develop
akrambek Apr 12, 2024
45f6a83
Merge branch 'aklivity:develop' into develop
akrambek Apr 12, 2024
32ba7e9
Merge branch 'aklivity:develop' into develop
akrambek Apr 16, 2024
95ff994
Merge branch 'aklivity:develop' into develop
akrambek Apr 22, 2024
49787e7
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
bb3b121
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
3d6cf14
Merge branch 'aklivity:develop' into develop
akrambek May 1, 2024
9bc44b5
Merge branch 'aklivity:develop' into develop
akrambek May 2, 2024
4e82b78
Merge branch 'aklivity:develop' into develop
akrambek May 8, 2024
dd0ae1b
Merge branch 'aklivity:develop' into develop
akrambek May 10, 2024
688c928
Merge branch 'aklivity:develop' into develop
akrambek May 15, 2024
027a6dc
Merge branch 'aklivity:develop' into develop
akrambek May 16, 2024
7fb8aea
Merge branch 'aklivity:develop' into develop
akrambek May 20, 2024
57b40a8
filtering by structured value field(s)
ankitk-me Jun 10, 2024
e340fb1
model-json extract support
ankitk-me Jun 11, 2024
0ccc615
Test Converter update to support extracted header for ITs
ankitk-me Jun 12, 2024
fa1916e
incorrect index fix
ankitk-me Jun 12, 2024
6b3d555
incorrect index fix for number
ankitk-me Jun 12, 2024
0d15ed9
Merge remote-tracking branch 'origin' into payloadFilter
ankitk-me Jun 12, 2024
bff1a15
Merge branch 'aklivity:develop' into develop
akrambek Jun 12, 2024
11f8b1e
Merge branch 'aklivity:develop' into develop
akrambek Jun 12, 2024
94c5656
Merge remote-tracking branch 'origin' into payloadFilter
ankitk-me Jun 12, 2024
e66f27c
addressing review comments
ankitk-me Jun 14, 2024
d421999
update to reuse matcher object
ankitk-me Jun 14, 2024
6832657
Merge branch 'aklivity:develop' into develop
akrambek Jun 19, 2024
6ffc602
support for Avro model and header format change
ankitk-me Jun 19, 2024
864ea63
support for Protobuf model
ankitk-me Jun 21, 2024
0caa9f5
WIP
akrambek Jun 24, 2024
f35a45a
Merge branch 'aklivity:develop' into develop
akrambek Jun 24, 2024
ff652e5
Merge branch 'develop' into feature/asyncapi-topics
akrambek Jun 24, 2024
2ebdc1c
WIP
akrambek Jun 27, 2024
7de1be5
Support other format data types in catalog
akrambek Jun 28, 2024
8fa527d
Remove comment
akrambek Jun 28, 2024
86ea213
Support kafka key validation
akrambek Jun 28, 2024
1f13ca9
WIP
akrambek Jun 28, 2024
ec2f8a9
Merge branch 'aklivity:develop' into develop
akrambek Jun 29, 2024
b8fd378
Merge branch 'develop' into feature/key-validation
akrambek Jun 29, 2024
bc7af09
WIP
akrambek Jun 29, 2024
769fa52
WIP
akrambek Jun 29, 2024
8f874d1
Merge branch 'aklivity:develop' into develop
akrambek Jun 29, 2024
8a84564
Merge branch 'develop' into feature/key-validation
akrambek Jun 29, 2024
2141e7e
Merge branch 'feature/key-validation' into feature/asyncapi-schema-re…
akrambek Jun 29, 2024
eef2dd6
Fix conflict
akrambek Jun 29, 2024
e866c11
WIP
akrambek Jun 30, 2024
21e57ff
WIP
akrambek Jun 30, 2024
fe30d17
WIP
akrambek Jun 30, 2024
85678f4
WIP
akrambek Jun 30, 2024
5cbdb14
WIP
akrambek Jul 1, 2024
7f2842c
Merge branch 'aklivity:develop' into develop
akrambek Jul 1, 2024
f73451d
Merge branch 'develop' into feature/asyncapi-schema-registry
akrambek Jul 1, 2024
88bc237
Apply feedback from PR
akrambek Jul 2, 2024
67ee5f0
Check if the binding exists
akrambek Jul 4, 2024
d9c8edb
Merge branch 'aklivity:develop' into develop
akrambek Jul 4, 2024
e2174c8
Merge branch 'develop' into feature/asyncapi-schema-registry
akrambek Jul 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
support for Avro model and header format change
  • Loading branch information
ankitk-me committed Jun 19, 2024
commit 6ffc602d070ed341342ee79169982b594b9a5e75
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType;
Expand All @@ -29,6 +31,11 @@

public final class KafkaTopicConfigBuilder<T> extends ConfigBuilder<T, KafkaTopicConfigBuilder<T>>
{
private static final String PATH = "^\\$\\{message\\.value\\.([A-Za-z_][A-Za-z0-9_]*)\\}$";
private static final Pattern PATH_PATTERN = Pattern.compile(PATH);
private static final String INTERNAL_PATH_PATTERN = "$.%s";

private final Matcher matcher;
private final Function<KafkaTopicConfig, T> mapper;
private String name;
private KafkaOffsetType defaultOffset;
Expand All @@ -42,6 +49,7 @@ public final class KafkaTopicConfigBuilder<T> extends ConfigBuilder<T, KafkaTopi
{
this.mapper = mapper;
this.headers = new ArrayList<>();
this.matcher = PATH_PATTERN.matcher("");
}

@Override
Expand Down Expand Up @@ -94,7 +102,11 @@ public KafkaTopicConfigBuilder<T> header(
{
this.headers = new ArrayList<>();
}
this.headers.add(new KafkaTopicHeaderType(new String32FW(name, UTF_8), path));
if (matcher.reset(path).matches())
{
this.headers.add(new KafkaTopicHeaderType(new String32FW(name, UTF_8),
String.format(INTERNAL_PATH_PATTERN, matcher.group(1))));
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

import static jakarta.json.JsonValue.ValueType.OBJECT;

import java.util.Map;

import jakarta.json.Json;
import jakarta.json.JsonArray;
import jakarta.json.JsonArrayBuilder;
import jakarta.json.JsonObject;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.JsonString;
import jakarta.json.JsonValue;
import jakarta.json.bind.adapter.JsonbAdapter;

Expand Down Expand Up @@ -79,12 +80,10 @@ public JsonObject adaptToJson(

if (topic.headers != null && !topic.headers.isEmpty())
{
JsonArrayBuilder headers = Json.createArrayBuilder();
JsonObjectBuilder headers = Json.createObjectBuilder();
for (KafkaTopicHeaderType header : topic.headers)
{
JsonObjectBuilder headerJson = Json.createObjectBuilder();
headerJson.add(header.name.asString(), header.path);
headers.add(headerJson);
headers.add(header.name.asString(), header.path);
}
object.add(HEADERS_NAME, headers);
}
Expand Down Expand Up @@ -139,17 +138,14 @@ public KafkaTopicConfig adaptFromJson(
topicBuilder.value(converter.adaptFromJson(valueObject.build()));
}

JsonArray headers = object.containsKey(HEADERS_NAME) ? object.getJsonArray(HEADERS_NAME) : null;
JsonObject headers = object.containsKey(HEADERS_NAME) ? object.getJsonObject(HEADERS_NAME) : null;

if (headers != null && !headers.isEmpty())
if (headers != null)
{
for (JsonValue header: headers)
for (Map.Entry<String, JsonValue> entry : headers.entrySet())
{
JsonObject headerJson = header.asJsonObject();
for (String headerName: headerJson.keySet())
{
topicBuilder.header(headerName, headerJson.getString(headerName));
}
JsonString jsonString = (JsonString) entry.getValue();
topicBuilder.header(entry.getKey(), jsonString.getString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;

import jakarta.json.bind.Jsonb;
import jakarta.json.bind.JsonbBuilder;
Expand Down Expand Up @@ -228,11 +229,9 @@ public void shouldReadHeadersOptions()
"{" +
"\"name\": \"test\"," +
"\"headers\":" +
"[" +
"{" +
"\"correlation-id\": \"$.correlation-id\"" +
"}" +
"]" +
"{" +
"\"correlation-id\": \"${message.value.correlationId}\"" +
"}" +
"}" +
"]" +
"}";
Expand All @@ -241,10 +240,8 @@ public void shouldReadHeadersOptions()

assertThat(options, not(nullValue()));
assertThat(options.bootstrap, equalTo(singletonList("test")));
assertThat(options.topics, equalTo(singletonList(KafkaTopicConfig.builder()
.name("test")
.header("correlation-id", "$.correlation-id")
.build())));
assertEquals(options.topics.get(0).headers.get(0).name.asString(), "correlation-id");
assertEquals(options.topics.get(0).headers.get(0).path, "$.correlationId");
}

@Test
Expand All @@ -254,7 +251,7 @@ public void shouldWriteHeadersOptions()
.bootstrap(singletonList("test"))
.topics(singletonList(KafkaTopicConfig.builder()
.name("test")
.header("correlation-id", "$.correlation-id")
.header("correlation-id", "${message.value.correlationId}")
.value(TestModelConfig.builder().length(0).build())
.build()))
.build();
Expand All @@ -263,6 +260,6 @@ public void shouldWriteHeadersOptions()

assertThat(text, not(nullValue()));
assertThat(text, equalTo("{\"bootstrap\":[\"test\"]," +
"\"topics\":[{\"name\":\"test\",\"value\":\"test\",\"headers\":[{\"correlation-id\":\"$.correlation-id\"}]}]}"));
"\"topics\":[{\"name\":\"test\",\"value\":\"test\",\"headers\":{\"correlation-id\":\"$.correlationId\"}}]}"));
}
}
2 changes: 1 addition & 1 deletion runtime/model-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<artifactId>flyweight-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<scopeNames>core avro_model</scopeNames>
<scopeNames>internal core avro_model</scopeNames>
<packageName>io.aklivity.zilla.runtime.model.avro.internal.types</packageName>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.io.DirectBufferInputStream;
import org.agrona.io.ExpandableDirectBufferOutputStream;
import org.apache.avro.AvroRuntimeException;
Expand All @@ -42,6 +46,13 @@
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;
import io.aklivity.zilla.runtime.model.avro.internal.types.AvroBooleanFW;
import io.aklivity.zilla.runtime.model.avro.internal.types.AvroBytesFW;
import io.aklivity.zilla.runtime.model.avro.internal.types.AvroDoubleFW;
import io.aklivity.zilla.runtime.model.avro.internal.types.AvroFloatFW;
import io.aklivity.zilla.runtime.model.avro.internal.types.AvroIntFW;
import io.aklivity.zilla.runtime.model.avro.internal.types.AvroLongFW;
import io.aklivity.zilla.runtime.model.avro.internal.types.OctetsFW;

public abstract class AvroModelHandler
{
Expand All @@ -54,6 +65,7 @@ public abstract class AvroModelHandler
private static final int COMMA_LENGTH = ",".length();
private static final int JSON_FIELD_ARRAY_LENGTH = "\"\":[]," .length() + COMMA_LENGTH * 100;
private static final int JSON_FIELD_MAP_LENGTH = "\"\":{},".length();
private static final DirectBuffer EMPTY_BUFFER = new UnsafeBuffer();

protected final SchemaConfig catalog;
protected final CatalogHandler handler;
Expand All @@ -66,12 +78,21 @@ public abstract class AvroModelHandler
protected final ExpandableDirectBufferOutputStream expandable;
protected final DirectBufferInputStream in;
protected final AvroModelEventContext event;
protected final Map<String, OctetsFW> extracted;

private final Int2ObjectCache<Schema> schemas;
private final Int2ObjectCache<GenericDatumReader<GenericRecord>> readers;
private final Int2ObjectCache<GenericDatumWriter<GenericRecord>> writers;
private final Int2ObjectCache<GenericRecord> records;
private final Int2IntHashMap paddings;
private final AvroBytesFW bytesRO;
private final AvroIntFW intRO;
private final AvroLongFW longRO;
private final AvroFloatFW floatRO;
private final AvroDoubleFW doubleRO;
private final MutableDirectBuffer text;

private int progress;

protected AvroModelHandler(
AvroModelConfig config,
Expand All @@ -96,6 +117,14 @@ protected AvroModelHandler(
this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.in = new DirectBufferInputStream();
this.event = new AvroModelEventContext(context);
this.extracted = new HashMap<>();
this.bytesRO = new AvroBytesFW();
this.intRO = new AvroIntFW();
this.longRO = new AvroLongFW();
this.floatRO = new AvroFloatFW();
this.doubleRO = new AvroDoubleFW();
this.text = new UnsafeBuffer(new byte[24]);

}

protected final boolean validate(
Expand All @@ -107,16 +136,25 @@ protected final boolean validate(
int length)
{
boolean status = false;
for (OctetsFW value: extracted.values())
{
value.wrap(EMPTY_BUFFER, 0, 0);
}
try
{
GenericRecord record = supplyRecord(schemaId);
in.wrap(buffer, index, length);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
Schema schema = supplySchema(schemaId);
if (reader != null)
{
reader.read(record, decoderFactory.binaryDecoder(in, decoder));
decoderFactory.binaryDecoder(in, decoder);
reader.read(record, decoder);
status = true;

}
progress = index;
extractFields(buffer, length, schema);
}
catch (IOException | AvroRuntimeException ex)
{
Expand All @@ -125,6 +163,17 @@ protected final boolean validate(
return status;
}

private void extractFields(
DirectBuffer buffer,
int length,
Schema schema)
{
for (Schema.Field field : schema.getFields())
{
extract(field.schema(), buffer, length, extracted.get(field.name()));
}
}

protected final Schema supplySchema(
int schemaId)
{
Expand Down Expand Up @@ -250,4 +299,97 @@ private int calculatePadding(
}
return padding;
}

private void extract(
Schema schema,
DirectBuffer data,
int limit,
OctetsFW octets)
{
switch (schema.getType())
{
case RECORD:
extractFields(data, limit, schema);
break;
case BYTES:
case STRING:
AvroBytesFW bytes = bytesRO.wrap(data, progress, limit);
OctetsFW value = bytes.value();
progress = bytes.limit();
if (octets != null)
{
octets.wrap(value.buffer(), value.offset(), value.limit());
}
break;
case INT:
AvroIntFW int32 = intRO.wrap(data, progress, limit);
int intValue = int32.value();
progress = int32.limit();
int length = text.putIntAscii(0, intValue);
if (octets != null)
{
octets.wrap(text, 0, length);
}
break;
case FLOAT:
AvroFloatFW avroFloat = floatRO.wrap(data, progress, limit);
int len = 0;
DirectBuffer buffer = avroFloat.value().value();
float floatValue = Float.intBitsToFloat(decodeNumberBytes(len, buffer));
progress = avroFloat.limit();
length = text.putStringWithoutLengthAscii(0, String.valueOf(floatValue));
if (octets != null)
{
octets.wrap(text, 0, length);
}
break;
case LONG:
AvroLongFW avroLong = longRO.wrap(data, progress, limit);
long longValue = avroLong.value();
progress = avroLong.limit();
length = text.putLongAscii(0, longValue);
if (octets != null)
{
octets.wrap(text, 0, length);
}
break;
case DOUBLE:
AvroDoubleFW avroDouble = doubleRO.wrap(data, progress, limit);
len = 0;
buffer = avroDouble.value().value();
int decoded = (buffer.getByte(len++) & 0xff) |
((buffer.getByte(len++) & 0xff) << 8) |
((buffer.getByte(len++) & 0xff) << 16) |
((buffer.getByte(len++) & 0xff) << 24);

double doubleValue = Double.longBitsToDouble((((long) decoded) & 0xffffffffL) |
(((long) decodeNumberBytes(len, buffer)) << 32));
progress = avroDouble.limit();
length = text.putStringWithoutLengthAscii(0, String.valueOf(doubleValue));
if (octets != null)
{
octets.wrap(text, 0, length);
}
break;
case BOOLEAN:
AvroBooleanFW avroBoolean = new AvroBooleanFW().wrap(data, progress, limit);
value = avroBoolean.value();
progress = avroBoolean.limit();
if (octets != null)
{
octets.wrap(value.buffer(), value.offset(), value.limit());
}
break;
}
}

private static int decodeNumberBytes(
int len,
DirectBuffer buffer)
{
return (buffer.getByte(len++) & 0xff) |
((buffer.getByte(len++) & 0xff) << 8) |
((buffer.getByte(len++) & 0xff) << 16) |
((buffer.getByte(len) & 0xff) << 24);
}
}
Loading