Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Review fixes
  • Loading branch information
bmaidics committed Feb 21, 2024
commit 945fbb570004daa350deae3df6c7022ef0f67e91
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.config;

import static io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaTopicType.DEFAULT_TOPIC_TYPE;
import static io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaOffsetType.HISTORICAL;
import static java.util.stream.Collectors.toList;

Expand All @@ -34,15 +35,13 @@

public final class KafkaBindingConfig
{
private static final KafkaTopicType DEFAULT_TOPIC_TYPE = new KafkaTopicType();

public final long id;
public final String name;
public final KafkaOptionsConfig options;
public final KindConfig kind;
public final List<KafkaRouteConfig> routes;
public final ToLongFunction<String> resolveId;
public final List<KafkaTopicType> kafkaTopicTypes;
public final List<KafkaTopicType> topicTypes;

public KafkaBindingConfig(
BindingConfig binding,
Expand All @@ -54,7 +53,7 @@ public KafkaBindingConfig(
this.options = KafkaOptionsConfig.class.cast(binding.options);
this.routes = binding.routes.stream().map(KafkaRouteConfig::new).collect(toList());
this.resolveId = binding.resolveId;
this.kafkaTopicTypes = options != null && options.topics != null
this.topicTypes = options != null && options.topics != null
? options.topics.stream().map(t -> new KafkaTopicType(context, t)).collect(toList()) : Collections.emptyList();
}

Expand Down Expand Up @@ -117,15 +116,15 @@ public KafkaOffsetType supplyDefaultOffset(
public KafkaTopicType resolveTopicType(
String topic)
{
KafkaTopicType topicType = DEFAULT_TOPIC_TYPE;
for (KafkaTopicType k : kafkaTopicTypes)
KafkaTopicType matchedType = DEFAULT_TOPIC_TYPE;
for (KafkaTopicType topicType : topicTypes)
{
if (k.matches(topic))
if (topicType.matches(topic))
{
topicType = k;
matchedType = topicType;
break;
}
}
return topicType;
return matchedType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.aklivity.zilla.runtime.binding.kafka.internal.config;

import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -24,13 +25,16 @@

public class KafkaTopicType
{
private final Matcher topicMatch;
public static final KafkaTopicType DEFAULT_TOPIC_TYPE = new KafkaTopicType();

public final ConverterHandler keyReader;
public final ConverterHandler keyWriter;
public final ConverterHandler valueReader;
public final ConverterHandler valueWriter;

public KafkaTopicType()
private final Matcher topicMatch;

private KafkaTopicType()
{
this.topicMatch = null;
this.keyReader = ConverterHandler.NONE;
Expand All @@ -44,10 +48,18 @@ public KafkaTopicType(
KafkaTopicConfig topicConfig)
{
this.topicMatch = topicConfig.name != null ? asMatcher(topicConfig.name) : null;
this.keyReader = topicConfig.key != null ? context.supplyReadConverter(topicConfig.key) : ConverterHandler.NONE;
this.keyWriter = topicConfig.key != null ? context.supplyWriteConverter(topicConfig.key) : ConverterHandler.NONE;
this.valueReader = topicConfig.value != null ? context.supplyReadConverter(topicConfig.value) : ConverterHandler.NONE;
this.valueWriter = topicConfig.value != null ? context.supplyWriteConverter(topicConfig.value) : ConverterHandler.NONE;
this.keyReader = Optional.ofNullable(topicConfig.key)
.map(context::supplyReadConverter)
.orElse(ConverterHandler.NONE);
this.keyWriter = Optional.ofNullable(topicConfig.key)
.map(context::supplyWriteConverter)
.orElse(ConverterHandler.NONE);
this.valueReader = Optional.ofNullable(topicConfig.value)
.map(context::supplyReadConverter)
.orElse(ConverterHandler.NONE);
this.valueWriter = Optional.ofNullable(topicConfig.value)
.map(context::supplyWriteConverter)
.orElse(ConverterHandler.NONE);
}

public boolean matches(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,16 @@ public void shouldFetchMergedMessageValueValid() throws Exception
k3po.finish();
}

@Test
@Configuration("cache.options.parameterized.topic.validate.yaml")
@Specification({
"${app}/merged.fetch.from.parameterized.topic.value.valid/client",
"${app}/unmerged.fetch.from.parameterized.topic.value.valid/server"})
public void shouldFetchMergedFromParameterizedTopicValid() throws Exception
{
k3po.finish();
}

Comment thread
bmaidics marked this conversation as resolved.
@Test
@Configuration("cache.options.validate.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#
# Copyright 2021-2023 Aklivity Inc.
#
# Aklivity licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

---
name: test
catalogs:
test0:
type: test
options:
id: 1
schema: |
{
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "status",
"type": "string"
}
],
"name": "Event",
"namespace": "io.aklivity.example",
"type": "record"
}
bindings:
app0:
type: kafka
kind: cache_client
routes:
- exit: cache0
when:
- topic: test.*
cache0:
type: kafka
kind: cache_server
options:
topics:
- name: test.{id}
Comment thread
bmaidics marked this conversation as resolved.
value:
model: test
capability: read
length: 13
catalog:
test0:
- id: 1
routes:
- exit: app1
when:
- topic: test.*

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
# Copyright 2021-2023 Aklivity Inc.
#
# Aklivity licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

connect "zilla://streams/app0"
option zilla:window 16
option zilla:transmission "half-duplex"

write zilla:begin.ext ${kafka:beginEx()
.typeId(zilla:id("kafka"))
.merged()
.capabilities("FETCH_ONLY")
.topic("test.id0")
.partition(0, 1)
.build()
.build()}

connected

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.merged()
.fetch()
.partition(0, 1, 2)
.build()
.build()}
read ${kafka:varint(3)} "id0" ${kafka:varint(8)} "positive"
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Copyright 2021-2023 Aklivity Inc.
#
# Aklivity licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

property deltaMillis 0L
property newTimestamp ${kafka:timestamp() + deltaMillis}

accept "zilla://streams/app0"
option zilla:window 8192
option zilla:transmission "half-duplex"

accepted

read zilla:begin.ext ${kafka:beginEx()
.typeId(zilla:id("kafka"))
.merged()
.capabilities("FETCH_ONLY")
.topic("test.id0")
.partition(0, 1)
.build()
.build()}

connected

write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.merged()
.fetch()
.timestamp(newTimestamp)
.partition(0, 1, 2)
.build()
.build()}
write ${kafka:varint(3)} "id0" ${kafka:varint(8)} "positive"
write flush
Loading