From ffb6b13221b4df2318e7a82680b2bd583cc87d3d Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 4 Oct 2024 11:58:52 -0700 Subject: [PATCH 1/4] WIP --- .../stream/PgsqlKafkaCommandType.java | 1 + .../stream/PgsqlKafkaCompletionCommand.java | 1 + .../stream/PgsqlKafkaProxyFactory.java | 118 +++++++++++++++++- .../handler/SchemaRegistryCatalogHandler.java | 38 +++++- .../registry/internal/SchemaRegistryIT.java | 11 ++ .../engine/catalog/CatalogHandler.java | 5 + .../internal/binding/TestBindingFactory.java | 4 + .../registry/config/unregister/zilla.yaml | 31 +++++ .../registry/streams/unregister.schema.rpt | 33 +++++ 9 files changed, 238 insertions(+), 4 deletions(-) create mode 100644 specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/config/unregister/zilla.yaml create mode 100644 specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/streams/unregister.schema.rpt diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java index 60ca9fd8b7..e7f4f22c7e 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java @@ -22,6 +22,7 @@ public enum PgsqlKafkaCommandType { CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()), + DROP_TOPIC_COMMAND("DROP TOPIC".getBytes()), UNKNOWN_COMMAND("UNKNOWN".getBytes()); private final byte[] value; diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java index c1d7352a22..085b7ea29a 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java @@ -17,6 +17,7 @@ public enum PgsqlKafkaCompletionCommand { CREATE_TOPIC_COMMAND("CREATE_TOPIC".getBytes()), + DROP_TOPIC_COMMAND("DROP_TOPIC".getBytes()), UNKNOWN_COMMAND("UNKNOWN".getBytes()); private final byte[] value; diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index d7ca9e8180..934d58acec 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -16,6 +16,7 @@ import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT; import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_VERSION_ID; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import java.io.InputStreamReader; @@ -143,6 +144,7 @@ public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory Object2ObjectHashMap pgsqlDecoder = new Object2ObjectHashMap<>(); pgsqlDecoder.put(PgsqlKafkaCommandType.CREATE_TOPIC_COMMAND, this::decodeCreateTopicCommand); + pgsqlDecoder.put(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND, this::decodeDropTopicCommand); pgsqlDecoder.put(PgsqlKafkaCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand); this.pgsqlDecoder = pgsqlDecoder; } @@ -232,6 +234,7 @@ private final class PgsqlProxy private final String database; private final PgsqlKafkaBindingConfig binding; private final KafkaCreateTopicsProxy createTopicsProxy; + private final KafkaDeleteTopicsProxy deleteTopicsProxy; private final IntArrayQueue queries; @@ -281,6 +284,7 @@ private PgsqlProxy( this.queries = new IntArrayQueue(); this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this); + this.deleteTopicsProxy = new KafkaDeleteTopicsProxy(routedId, resolvedId, this); } private void onAppMessage( @@ -488,7 +492,7 @@ private void onCommandCompleted( doAppWindow(traceId, authorization); } - public void onKafkaCreateTopicsBegin( + public void onKafkaBegin( long traceId, long authorization) { @@ -990,6 +994,80 @@ private void doKafkaBegin( traceId, authorization, 0, kafkaBeginEx); } + @Override + protected void onKafkaBegin( + BeginFW begin) + { + final long sequence = begin.sequence(); + final long acknowledge = begin.acknowledge(); + final long traceId = begin.traceId(); + final long authorization = begin.authorization(); + final OctetsFW extension = begin.extension(); + + assert acknowledge <= sequence; + assert sequence >= replySeq; + assert acknowledge >= replyAck; + + replySeq = sequence; + replyAck = acknowledge; + state = PgsqlKafkaState.openingReply(state); + + assert replyAck <= replySeq; + + final ExtensionFW beginEx = extension.get(extensionRO::tryWrap); + final KafkaBeginExFW kafkaBeginEx = + beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null; + + boolean errorExits = kafkaBeginEx.response().deleteTopics().topics().anyMatch(t -> t.error() != 0); + + if (!errorExits) + { + delegate.onKafkaBegin(traceId, authorization); + + doKafkaWindow(traceId, authorization); + doKafkaEnd(traceId, authorization); + } + else + { + delegate.cleanup(traceId, authorization); + } + } + } + + private final class KafkaDeleteTopicsProxy extends KafkaProxy + { + private KafkaDeleteTopicsProxy( + long originId, + long routedId, + PgsqlProxy delegate) + { + super(originId, routedId, delegate); + } + + private void doKafkaBegin( + long traceId, + long authorization, + List topics) + { + initialSeq = delegate.initialSeq; + initialAck = delegate.initialAck; + initialMax = delegate.initialMax; + state = PgsqlKafkaState.openingInitial(state); + + final KafkaBeginExFW kafkaBeginEx = + kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(kafkaTypeId) + .request(r -> r + .deleteTopics(c -> c + .names(ct -> + topics.forEach(t -> ct.item(i -> i.set(t, UTF_8)))) + .timeout(config.kafkaTopicRequestTimeoutMs()))) + .build(); + + kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization, 0, kafkaBeginEx); + } + @Override protected void onKafkaBegin( BeginFW begin) @@ -1018,7 +1096,7 @@ protected void onKafkaBegin( if (!errorExits) { - delegate.onKafkaCreateTopicsBegin(traceId, authorization); + delegate.onKafkaBegin(traceId, authorization); doKafkaWindow(traceId, authorization); doKafkaEnd(traceId, authorization); @@ -1292,6 +1370,35 @@ else if (server.commandsProcessed == 0) } } + private void decodeDropTopicCommand( + PgsqlProxy server, + long traceId, + long authorization, + DirectBuffer buffer, + int offset, + int length) + { + if (server.commandsProcessed == 1) + { + server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.DROP_TOPIC_COMMAND); + } + else if (server.commandsProcessed == 0) + { + final CreateTable createTable = (CreateTable) parseStatement(buffer, offset, length); + final String topic = createTable.getTable().getName(); + + final PgsqlKafkaBindingConfig binding = server.binding; + final String subjectKey = String.format("%s.%s-key", server.database, topic); + final String subjectValue = String.format("%s.%s-value", server.database, topic); + + binding.catalog.unregister(subjectKey); + binding.catalog.unregister(subjectValue); + + final KafkaDeleteTopicsProxy deleteTopicsProxy = server.deleteTopicsProxy; + deleteTopicsProxy.doKafkaBegin(traceId, authorization, topics); + } + } + private void decodeUnknownCommand( PgsqlProxy server, long traceId, @@ -1351,6 +1458,13 @@ private Statement parseStatement( sql = sql.replace("CREATE TOPIC", "CREATE TABLE"); statement = parserManager.parse(new StringReader(sql)); } + if (decodeCommandType(buffer, offset, length). + equals(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND)) + { + String sql = buffer.getStringWithoutLengthUtf8(offset, length); + sql = sql.replace("DROP TOPIC", "DROP TABLE"); + statement = parserManager.parse(new StringReader(sql)); + } else { inputStream.wrap(buffer, offset, length); diff --git a/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java index 10de4803db..54fa0676eb 100644 --- a/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java +++ b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java @@ -42,7 +42,8 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler { private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}"; - private static final String SUBJECT_PATH = "/subjects/{0}/versions"; + private static final String REGISTER_SUBJECT_PATH = "/subjects/{0}/versions"; + private static final String UNREGISTER_SUBJECT_PATH = "/subjects/{0}"; private static final String SCHEMA_PATH = "/schemas/ids/{0}"; private static final int MAX_PADDING_LENGTH = 5; @@ -88,7 +89,7 @@ public int register( { int versionId = NO_VERSION_ID; - String response = sendPostHttpRequest(MessageFormat.format(SUBJECT_PATH, subject), schema); + String response = sendPostHttpRequest(MessageFormat.format(REGISTER_SUBJECT_PATH, subject), schema); if (response != null) { versionId = request.resolveResponse(response); @@ -97,6 +98,17 @@ public int register( return versionId; } + @Override + public void unregister( + String subject) + { + String response = sendDeleteHttpRequest(MessageFormat.format(UNREGISTER_SUBJECT_PATH, subject)); + if (response != null) + { + request.resolveResponse(response); + } + } + @Override public String resolve( int schemaId) @@ -390,6 +402,28 @@ private String sendPostHttpRequest( return responseBody; } + private String sendDeleteHttpRequest( + String path) + { + HttpRequest httpRequest = HttpRequest + .newBuilder(toURI(baseUrl, path)) + .version(HttpClient.Version.HTTP_1_1) + .DELETE() + .build(); + + String responseBody; + try + { + HttpResponse httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString()); + responseBody = httpResponse.statusCode() == 200 ? httpResponse.body() : null; + } + catch (Exception ex) + { + responseBody = null; + } + return responseBody; + } + @Override public String location() { diff --git a/runtime/catalog-schema-registry/src/test/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryIT.java b/runtime/catalog-schema-registry/src/test/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryIT.java index d22a449690..704c6f25ad 100644 --- a/runtime/catalog-schema-registry/src/test/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryIT.java +++ b/runtime/catalog-schema-registry/src/test/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryIT.java @@ -145,4 +145,15 @@ public void shouldRegisterSchema() throws Exception { k3po.finish(); } + + @Test + @Configuration("unregister/zilla.yaml") + @Specification({ + "${net}/handshake/client", + "${app}/handshake/server", + "${remote}/unregister.schema" }) + public void shouldUnregisterSchema() throws Exception + { + k3po.finish(); + } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java index 65b3aac632..38d52b1a7f 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java @@ -69,6 +69,11 @@ default int register( return NO_VERSION_ID; } + default void unregister( + String subject) + { + } + String resolve( int schemaId); diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java index e7e5a3e590..b5b6cb58aa 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java @@ -343,6 +343,10 @@ else if (assertion.schema != null && !assertion.schema.equals(schema)) } else { + if (catalog.subject != null && schema == null) + { + handler.unregister(catalog.subject); + } if (catalog.subject != null && schema != null) { handler.register(catalog.subject, schema); diff --git a/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/config/unregister/zilla.yaml b/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/config/unregister/zilla.yaml new file mode 100644 index 0000000000..aac3817758 --- /dev/null +++ b/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/config/unregister/zilla.yaml @@ -0,0 +1,31 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +catalogs: + catalog0: + type: schema-registry + options: + url: http://localhost:8081 +bindings: + net0: + type: test + kind: server + options: + catalog: + catalog0: + - subject: items-snapshots-value + exit: app0 diff --git a/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/streams/unregister.schema.rpt b/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/streams/unregister.schema.rpt new file mode 100644 index 0000000000..98873d56dd --- /dev/null +++ b/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/streams/unregister.schema.rpt @@ -0,0 +1,33 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "http://localhost:8081/subjects/items-snapshots-value" + +accepted +connected + +read http:method "DELETE" +read http:version "HTTP/1.1" +read http:header "Host" "localhost:8081" + +read closed + +write http:status "200" "OK" +write http:header "content-type" "application/vnd.schemaregistry.v1+json" +write http:content-length + +write '[1]' + +write close From 598faa125a1c84bce347f74222171f2c41e672ba Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 4 Oct 2024 12:26:54 -0700 Subject: [PATCH 2/4] WIP --- .../engine/test/internal/binding/TestBindingFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java index b5b6cb58aa..115764015e 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java @@ -351,7 +351,7 @@ else if (assertion.schema != null && !assertion.schema.equals(schema)) { handler.register(catalog.subject, schema); } - else if (catalog.subject != null && catalog.version != null) + else if (catalog.subject != null && catalog.version != null && schema != null) { handler.resolve(catalog.subject, catalog.version); } From 8629818c60ffa036f1da662663ce09957dd15b2f Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 4 Oct 2024 12:42:44 -0700 Subject: [PATCH 3/4] Checkpoint --- .../stream/PgsqlKafkaProxyFactory.java | 9 ++-- .../pgsql/kafka/internal/stream/ProxyIT.java | 11 +++++ .../handler/SchemaRegistryCatalogHandler.java | 24 ++++++---- .../serializer/UnregisterSchemaRequest.java | 46 +++++++++++++++++++ .../engine/catalog/CatalogHandler.java | 3 +- 5 files changed, 80 insertions(+), 13 deletions(-) create mode 100644 runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/serializer/UnregisterSchemaRequest.java diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index 934d58acec..7b9c94a912 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -65,6 +65,7 @@ import net.sf.jsqlparser.parser.CCJSqlParserManager; import net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.create.table.CreateTable; +import net.sf.jsqlparser.statement.drop.Drop; public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory { @@ -1092,7 +1093,7 @@ protected void onKafkaBegin( final KafkaBeginExFW kafkaBeginEx = beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null; - boolean errorExits = kafkaBeginEx.response().createTopics().topics().anyMatch(t -> t.error() != 0); + boolean errorExits = kafkaBeginEx.response().deleteTopics().topics().anyMatch(t -> t.error() != 0); if (!errorExits) { @@ -1384,8 +1385,8 @@ private void decodeDropTopicCommand( } else if (server.commandsProcessed == 0) { - final CreateTable createTable = (CreateTable) parseStatement(buffer, offset, length); - final String topic = createTable.getTable().getName(); + final Drop drop = (Drop) parseStatement(buffer, offset, length); + final String topic = drop.getName().getName(); final PgsqlKafkaBindingConfig binding = server.binding; final String subjectKey = String.format("%s.%s-key", server.database, topic); @@ -1395,7 +1396,7 @@ else if (server.commandsProcessed == 0) binding.catalog.unregister(subjectValue); final KafkaDeleteTopicsProxy deleteTopicsProxy = server.deleteTopicsProxy; - deleteTopicsProxy.doKafkaBegin(traceId, authorization, topics); + deleteTopicsProxy.doKafkaBegin(traceId, authorization, List.of("%s.%s".formatted(server.database, topic))); } } diff --git a/incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java b/incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java index 58d08cba16..ea24574a15 100644 --- a/incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java +++ b/incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java @@ -58,4 +58,15 @@ public void shouldCreateTopic() throws Exception { k3po.finish(); } + + @Test + @Configuration("proxy.yaml") + @Specification({ + "${pgsql}/drop.topic/client", + "${kafka}/drop.topic/server" + }) + public void shouldDropTopic() throws Exception + { + k3po.finish(); + } } diff --git a/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java index 54fa0676eb..587c991d91 100644 --- a/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java +++ b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler; import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler.CachedSchemaId.IN_PROGRESS; +import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest.NO_VERSIONS; import java.net.URI; import java.net.http.HttpClient; @@ -35,6 +36,7 @@ import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryCatalogConfig; import io.aklivity.zilla.runtime.catalog.schema.registry.internal.events.SchemaRegistryEventContext; import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.RegisterSchemaRequest; +import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest; import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.SchemaRegistryPrefixFW; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; @@ -56,7 +58,8 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler private final HttpClient client; private final String baseUrl; - private final RegisterSchemaRequest request; + private final RegisterSchemaRequest registerRequest; + private final UnregisterSchemaRequest unregisterRequest; private final CRC32C crc32c; private final Int2ObjectCache schemas; private final Int2ObjectCache schemaIds; @@ -71,7 +74,8 @@ public SchemaRegistryCatalogHandler( { this.baseUrl = catalog.options.url; this.client = HttpClient.newHttpClient(); - this.request = new RegisterSchemaRequest(); + this.registerRequest = new RegisterSchemaRequest(); + this.unregisterRequest = new UnregisterSchemaRequest(); this.crc32c = new CRC32C(); this.schemas = new Int2ObjectCache<>(1, 1024, i -> {}); this.schemaIds = new Int2ObjectCache<>(1, 1024, i -> {}); @@ -92,21 +96,25 @@ public int register( String response = sendPostHttpRequest(MessageFormat.format(REGISTER_SUBJECT_PATH, subject), schema); if (response != null) { - versionId = request.resolveResponse(response); + versionId = registerRequest.resolveResponse(response); } return versionId; } @Override - public void unregister( + public int[] unregister( String subject) { + int[] versions = NO_VERSIONS; + String response = sendDeleteHttpRequest(MessageFormat.format(UNREGISTER_SUBJECT_PATH, subject)); if (response != null) { - request.resolveResponse(response); + versions = unregisterRequest.resolveResponse(response); } + + return versions; } @Override @@ -161,7 +169,7 @@ public String resolve( { event.onRetrievableSchemaId(catalogId, schemaId); } - newFuture.complete(new CachedSchema(request.resolveSchemaResponse(response), retryAttempts)); + newFuture.complete(new CachedSchema(registerRequest.resolveSchemaResponse(response), retryAttempts)); } } catch (Throwable ex) @@ -263,8 +271,8 @@ else if (response != null) { event.onRetrievableSchemaSubjectVersion(catalogId, subject, version); } - newFuture.complete(new CachedSchemaId(System.currentTimeMillis(), request.resolveResponse(response), - retryAttempts, retryAfter)); + newFuture.complete(new CachedSchemaId(System.currentTimeMillis(), + registerRequest.resolveResponse(response), retryAttempts, retryAfter)); } } catch (Throwable ex) diff --git a/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/serializer/UnregisterSchemaRequest.java b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/serializer/UnregisterSchemaRequest.java new file mode 100644 index 0000000000..84ff7d4361 --- /dev/null +++ b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/serializer/UnregisterSchemaRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer; + +import java.io.StringReader; +import java.util.stream.IntStream; + +import jakarta.json.Json; +import jakarta.json.JsonArray; +import jakarta.json.JsonReader; +import jakarta.json.stream.JsonParsingException; + +public class UnregisterSchemaRequest +{ + public static final int[] NO_VERSIONS = new int[0]; + + public int[] resolveResponse( + String response) + { + try + { + JsonReader reader = Json.createReader(new StringReader(response)); + JsonArray array = reader.readArray(); + + return IntStream.range(0, array.size()) + .map(array::getInt) + .toArray(); + } + catch (JsonParsingException ex) + { + return NO_VERSIONS; + } + } +} diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java index 38d52b1a7f..ceabb8fdcf 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java @@ -69,9 +69,10 @@ default int register( return NO_VERSION_ID; } - default void unregister( + default int[] unregister( String subject) { + return new int[0]; } String resolve( From 91f627616ff6f20f01265ef48b400fc4726cb233 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 4 Oct 2024 16:31:24 -0700 Subject: [PATCH 4/4] Fix typo --- .../pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index 7b9c94a912..6461b89336 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -1019,7 +1019,7 @@ protected void onKafkaBegin( final KafkaBeginExFW kafkaBeginEx = beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null; - boolean errorExits = kafkaBeginEx.response().deleteTopics().topics().anyMatch(t -> t.error() != 0); + boolean errorExits = kafkaBeginEx.response().createTopics().topics().anyMatch(t -> t.error() != 0); if (!errorExits) {