diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaConfiguration.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaConfiguration.java index 35ed7c5814..93fcf954e0 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaConfiguration.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaConfiguration.java @@ -57,6 +57,7 @@ public class KafkaConfiguration extends Configuration public static final PropertyDef KAFKA_CACHE_CLEANUP_POLICY; public static final IntPropertyDef KAFKA_CACHE_MAX_MESSAGE_BYTES; public static final LongPropertyDef KAFKA_CACHE_RETENTION_MILLIS; + public static final LongPropertyDef KAFKA_CACHE_RETENTION_MILLIS_MAX; public static final LongPropertyDef KAFKA_CACHE_RETENTION_BYTES; public static final LongPropertyDef KAFKA_CACHE_DELETE_RETENTION_MILLIS; public static final LongPropertyDef KAFKA_CACHE_MIN_COMPACTION_LAG_MILLIS; @@ -120,6 +121,7 @@ public class KafkaConfiguration extends Configuration KafkaConfiguration::cleanupPolicy, "delete"); KAFKA_CACHE_MAX_MESSAGE_BYTES = config.property("cache.max.message.bytes", 1000012); KAFKA_CACHE_RETENTION_MILLIS = config.property("cache.retention.ms", 604800000L); + KAFKA_CACHE_RETENTION_MILLIS_MAX = config.property("cache.retention.ms.max", 30000L); KAFKA_CACHE_RETENTION_BYTES = config.property("cache.retention.bytes", -1L); KAFKA_CACHE_DELETE_RETENTION_MILLIS = config.property("cache.delete.retention.ms", 86400000L); KAFKA_CACHE_MIN_COMPACTION_LAG_MILLIS = config.property("cache.min.compaction.lag.ms", 0L); @@ -235,6 +237,11 @@ public long cacheRetentionMillis() return KAFKA_CACHE_RETENTION_MILLIS.getAsLong(this); } + public long cacheRetentionMillisMax() + { + return KAFKA_CACHE_RETENTION_MILLIS_MAX.getAsLong(this); + } + public long cacheSegmentMillis() { return KAFKA_CACHE_SEGMENT_MILLIS.getAsLong(this); diff --git a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java index bd45d81cc9..46f9aaa512 100644 --- a/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java +++ b/runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheServerFetchFactory.java @@ -163,6 +163,7 @@ public final class KafkaCacheServerFetchFactory implements BindingHandler private final int reconnectDelay; private final EngineContext context; private final boolean verbose; + private final long retentionMillisMaxLive; public KafkaCacheServerFetchFactory( KafkaConfiguration config, @@ -188,6 +189,7 @@ public KafkaCacheServerFetchFactory( this.supplyCacheRoute = supplyCacheRoute; this.reconnectDelay = config.cacheServerReconnect(); this.verbose = config.verbose(); + this.retentionMillisMaxLive = config.cacheRetentionMillisMax(); } @Override @@ -527,7 +529,7 @@ private KafkaCacheServerFetchFanout( this.partition = partition; this.deltaType = deltaType; this.defaultOffset = defaultOffset; - this.retentionMillisMax = defaultOffset == LIVE ? SECONDS.toMillis(30) : Long.MAX_VALUE; + this.retentionMillisMax = defaultOffset == LIVE ? retentionMillisMaxLive : Long.MAX_VALUE; this.members = new ArrayList<>(); this.leaderId = leaderId; this.convertKey = topicType.keyReader; @@ -1190,7 +1192,7 @@ private void onServerFanoutInitialSignalSegmentDelete( final long now = currentTimeMillis(); Node segmentNode = partition.sentinel().next(); - while (!segmentNode.sentinel() && + while (!segmentNode.sentinel() && segmentNode != partition.head() && partition.deleteAt(segmentNode.segment(), retentionMillisMax) <= now) { segmentNode.remove(); diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaConfigurationTest.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaConfigurationTest.java index f2e7096b9c..8242d39570 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaConfigurationTest.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/KafkaConfigurationTest.java @@ -16,6 +16,7 @@ package io.aklivity.zilla.runtime.binding.kafka.internal; import static io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration.KAFKA_CACHE_CLIENT_CLEANUP_DELAY; +import static io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration.KAFKA_CACHE_RETENTION_MILLIS_MAX; import static io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration.KAFKA_CACHE_SERVER_RECONNECT_DELAY; import static io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration.KAFKA_CLIENT_CONNECTION_POOL_CLEANUP_MILLIS; import static io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration.KAFKA_CLIENT_INSTANCE_ID; @@ -38,6 +39,9 @@ public class KafkaConfigurationTest public static final String KAFKA_CLIENT_SASL_SCRAM_NONCE_NAME = "zilla.binding.kafka.client.sasl.scram.nonce"; public static final String KAFKA_CLIENT_INSTANCE_ID_NAME = "zilla.binding.kafka.client.instance.id"; + public static final String KAFKA_CACHE_RETENTION_MILLIS_MAX_NAME = + "zilla.binding.kafka.cache.retention.ms.max"; + @Test public void shouldVerifyConstants() throws Exception { @@ -48,5 +52,6 @@ public void shouldVerifyConstants() throws Exception assertEquals(KAFKA_CACHE_CLIENT_CLEANUP_DELAY.name(), KAFKA_CACHE_CLIENT_CLEANUP_DELAY_NAME); assertEquals(KAFKA_CLIENT_SASL_SCRAM_NONCE.name(), KAFKA_CLIENT_SASL_SCRAM_NONCE_NAME); assertEquals(KAFKA_CLIENT_INSTANCE_ID.name(), KAFKA_CLIENT_INSTANCE_ID_NAME); + assertEquals(KAFKA_CACHE_RETENTION_MILLIS_MAX.name(), KAFKA_CACHE_RETENTION_MILLIS_MAX_NAME); } } diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheFetchIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheFetchIT.java index e1460d0399..6f4255c7c3 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheFetchIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheFetchIT.java @@ -36,11 +36,13 @@ import io.aklivity.k3po.runtime.junit.annotation.Specification; import io.aklivity.k3po.runtime.junit.rules.K3poRule; import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding; +import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfigurationTest; import io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCache; import io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCachePartition; import io.aklivity.zilla.runtime.binding.kafka.internal.cache.KafkaCacheTopic; import io.aklivity.zilla.runtime.engine.test.EngineRule; import io.aklivity.zilla.runtime.engine.test.annotation.Configuration; +import io.aklivity.zilla.runtime.engine.test.annotation.Configure; public class CacheFetchIT { @@ -451,6 +453,22 @@ public void shouldReceiveMessagesWithNoFilter() throws Exception k3po.finish(); } + @Test + @Configuration("cache.options.live.yaml") + @Specification({ + "${app}/messages.after.retention.max/client", + "${app}/messages.after.retention.max/server"}) + @ScriptProperty("serverAddress \"zilla://streams/app1\"") + @Configure(name = KafkaConfigurationTest.KAFKA_CACHE_RETENTION_MILLIS_MAX_NAME, value = "200") + public void shouldReceiveMessagesAfterRetentionMax() throws Exception + { + partition.append(10L); + k3po.start(); + Thread.sleep(250); + k3po.notifyBarrier("SEND_MESSAGE_2"); + k3po.finish(); + } + @Test @Configuration("cache.yaml") @Specification({ diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/cache.options.live.yaml b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/cache.options.live.yaml new file mode 100644 index 0000000000..66b85e1f16 --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/config/cache.options.live.yaml @@ -0,0 +1,31 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +--- +name: test +bindings: + app0: + type: kafka + kind: cache_client + exit: cache0 + cache0: + type: kafka + kind: cache_server + options: + topics: + - name: test + defaultOffset: live + exit: app1 diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/fetch/messages.after.retention.max/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/fetch/messages.after.retention.max/client.rpt new file mode 100644 index 0000000000..058928cb3e --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/fetch/messages.after.retention.max/client.rpt @@ -0,0 +1,91 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +read zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} + +read notify ROUTED_BROKER_CLIENT + +connect await ROUTED_BROKER_CLIENT + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "half-duplex" + option zilla:affinity 0xb1 + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .fetch() + .topic("test") + .partition(0, 10) + .build() + .build()} + +connected + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .fetch() + .topic("test") + .partition(0, 10, 10) + .build() + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .fetch() + .partition(0, 10, 10) + .build() + .build()} +read "Hello, world" + +read advised zilla:flush ${kafka:flushEx() + .typeId(zilla:id("kafka")) + .fetch() + .partition(0, 10, 10) + .build() + .build()} + +read zilla:data.ext ${kafka:matchDataEx() + .typeId(zilla:id("kafka")) + .fetch() + .partition(0, 20, 20) + .build() + .build()} +read "Hello, again" diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/fetch/messages.after.retention.max/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/fetch/messages.after.retention.max/server.rpt new file mode 100644 index 0000000000..dc1c488dbb --- /dev/null +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/fetch/messages.after.retention.max/server.rpt @@ -0,0 +1,93 @@ +# +# Copyright 2021-2024 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +property deltaMillis 0L +property newTimestamp ${kafka:timestamp() + deltaMillis} + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "half-duplex" + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .meta() + .topic("test") + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .meta() + .partition(0, 177) + .build() + .build()} +write flush + +accepted + +read zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .fetch() + .topic("test") + .partition(0, 10) + .build() + .build()} + +connected + +write zilla:begin.ext ${kafka:beginEx() + .typeId(zilla:id("kafka")) + .fetch() + .topic("test") + .partition(0, 10, 10) + .build() + .build()} +write flush + +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .fetch() + .timestamp(newTimestamp) + .partition(0, 10, 10) + .build() + .build()} +write "Hello, world" +write flush + +write await SEND_MESSAGE_2 +write zilla:data.ext ${kafka:dataEx() + .typeId(zilla:id("kafka")) + .fetch() + .timestamp(newTimestamp) + .partition(0, 20, 20) + .build() + .build()} +write "Hello, again" +write flush