diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java index 60ca9fd8b7..e7f4f22c7e 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCommandType.java @@ -22,6 +22,7 @@ public enum PgsqlKafkaCommandType { CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()), + DROP_TOPIC_COMMAND("DROP TOPIC".getBytes()), UNKNOWN_COMMAND("UNKNOWN".getBytes()); private final byte[] value; diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java index c1d7352a22..085b7ea29a 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaCompletionCommand.java @@ -17,6 +17,7 @@ public enum PgsqlKafkaCompletionCommand { CREATE_TOPIC_COMMAND("CREATE_TOPIC".getBytes()), + DROP_TOPIC_COMMAND("DROP_TOPIC".getBytes()), UNKNOWN_COMMAND("UNKNOWN".getBytes()); private final byte[] value; diff --git a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java index d7ca9e8180..6461b89336 100644 --- a/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java +++ b/incubator/binding-pgsql-kafka/src/main/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/PgsqlKafkaProxyFactory.java @@ -16,6 +16,7 @@ import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT; import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_VERSION_ID; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import java.io.InputStreamReader; @@ -64,6 +65,7 @@ import net.sf.jsqlparser.parser.CCJSqlParserManager; import net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.create.table.CreateTable; +import net.sf.jsqlparser.statement.drop.Drop; public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory { @@ -143,6 +145,7 @@ public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory Object2ObjectHashMap pgsqlDecoder = new Object2ObjectHashMap<>(); pgsqlDecoder.put(PgsqlKafkaCommandType.CREATE_TOPIC_COMMAND, this::decodeCreateTopicCommand); + pgsqlDecoder.put(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND, this::decodeDropTopicCommand); pgsqlDecoder.put(PgsqlKafkaCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand); this.pgsqlDecoder = pgsqlDecoder; } @@ -232,6 +235,7 @@ private final class PgsqlProxy private final String database; private final PgsqlKafkaBindingConfig binding; private final KafkaCreateTopicsProxy createTopicsProxy; + private final KafkaDeleteTopicsProxy deleteTopicsProxy; private final IntArrayQueue queries; @@ -281,6 +285,7 @@ private PgsqlProxy( this.queries = new IntArrayQueue(); this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this); + this.deleteTopicsProxy = new KafkaDeleteTopicsProxy(routedId, resolvedId, this); } private void onAppMessage( @@ -488,7 +493,7 @@ private void onCommandCompleted( doAppWindow(traceId, authorization); } - public void onKafkaCreateTopicsBegin( + public void onKafkaBegin( long traceId, long authorization) { @@ -1018,7 +1023,81 @@ protected void onKafkaBegin( if (!errorExits) { - delegate.onKafkaCreateTopicsBegin(traceId, authorization); + delegate.onKafkaBegin(traceId, authorization); + + doKafkaWindow(traceId, authorization); + doKafkaEnd(traceId, authorization); + } + else + { + delegate.cleanup(traceId, authorization); + } + } + } + + private final class KafkaDeleteTopicsProxy extends KafkaProxy + { + private KafkaDeleteTopicsProxy( + long originId, + long routedId, + PgsqlProxy delegate) + { + super(originId, routedId, delegate); + } + + private void doKafkaBegin( + long traceId, + long authorization, + List topics) + { + initialSeq = delegate.initialSeq; + initialAck = delegate.initialAck; + initialMax = delegate.initialMax; + state = PgsqlKafkaState.openingInitial(state); + + final KafkaBeginExFW kafkaBeginEx = + kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity()) + .typeId(kafkaTypeId) + .request(r -> r + .deleteTopics(c -> c + .names(ct -> + topics.forEach(t -> ct.item(i -> i.set(t, UTF_8)))) + .timeout(config.kafkaTopicRequestTimeoutMs()))) + .build(); + + kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax, + traceId, authorization, 0, kafkaBeginEx); + } + + @Override + protected void onKafkaBegin( + BeginFW begin) + { + final long sequence = begin.sequence(); + final long acknowledge = begin.acknowledge(); + final long traceId = begin.traceId(); + final long authorization = begin.authorization(); + final OctetsFW extension = begin.extension(); + + assert acknowledge <= sequence; + assert sequence >= replySeq; + assert acknowledge >= replyAck; + + replySeq = sequence; + replyAck = acknowledge; + state = PgsqlKafkaState.openingReply(state); + + assert replyAck <= replySeq; + + final ExtensionFW beginEx = extension.get(extensionRO::tryWrap); + final KafkaBeginExFW kafkaBeginEx = + beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null; + + boolean errorExits = kafkaBeginEx.response().deleteTopics().topics().anyMatch(t -> t.error() != 0); + + if (!errorExits) + { + delegate.onKafkaBegin(traceId, authorization); doKafkaWindow(traceId, authorization); doKafkaEnd(traceId, authorization); @@ -1292,6 +1371,35 @@ else if (server.commandsProcessed == 0) } } + private void decodeDropTopicCommand( + PgsqlProxy server, + long traceId, + long authorization, + DirectBuffer buffer, + int offset, + int length) + { + if (server.commandsProcessed == 1) + { + server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.DROP_TOPIC_COMMAND); + } + else if (server.commandsProcessed == 0) + { + final Drop drop = (Drop) parseStatement(buffer, offset, length); + final String topic = drop.getName().getName(); + + final PgsqlKafkaBindingConfig binding = server.binding; + final String subjectKey = String.format("%s.%s-key", server.database, topic); + final String subjectValue = String.format("%s.%s-value", server.database, topic); + + binding.catalog.unregister(subjectKey); + binding.catalog.unregister(subjectValue); + + final KafkaDeleteTopicsProxy deleteTopicsProxy = server.deleteTopicsProxy; + deleteTopicsProxy.doKafkaBegin(traceId, authorization, List.of("%s.%s".formatted(server.database, topic))); + } + } + private void decodeUnknownCommand( PgsqlProxy server, long traceId, @@ -1351,6 +1459,13 @@ private Statement parseStatement( sql = sql.replace("CREATE TOPIC", "CREATE TABLE"); statement = parserManager.parse(new StringReader(sql)); } + if (decodeCommandType(buffer, offset, length). + equals(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND)) + { + String sql = buffer.getStringWithoutLengthUtf8(offset, length); + sql = sql.replace("DROP TOPIC", "DROP TABLE"); + statement = parserManager.parse(new StringReader(sql)); + } else { inputStream.wrap(buffer, offset, length); diff --git a/incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java b/incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java index 58d08cba16..ea24574a15 100644 --- a/incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java +++ b/incubator/binding-pgsql-kafka/src/test/java/io/aklivity/zilla/runtime/binding/pgsql/kafka/internal/stream/ProxyIT.java @@ -58,4 +58,15 @@ public void shouldCreateTopic() throws Exception { k3po.finish(); } + + @Test + @Configuration("proxy.yaml") + @Specification({ + "${pgsql}/drop.topic/client", + "${kafka}/drop.topic/server" + }) + public void shouldDropTopic() throws Exception + { + k3po.finish(); + } } diff --git a/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java index 10de4803db..587c991d91 100644 --- a/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java +++ b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/handler/SchemaRegistryCatalogHandler.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler; import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.handler.CachedSchemaId.IN_PROGRESS; +import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest.NO_VERSIONS; import java.net.URI; import java.net.http.HttpClient; @@ -35,6 +36,7 @@ import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryCatalogConfig; import io.aklivity.zilla.runtime.catalog.schema.registry.internal.events.SchemaRegistryEventContext; import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.RegisterSchemaRequest; +import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.UnregisterSchemaRequest; import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.SchemaRegistryPrefixFW; import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler; import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer; @@ -42,7 +44,8 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler { private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}"; - private static final String SUBJECT_PATH = "/subjects/{0}/versions"; + private static final String REGISTER_SUBJECT_PATH = "/subjects/{0}/versions"; + private static final String UNREGISTER_SUBJECT_PATH = "/subjects/{0}"; private static final String SCHEMA_PATH = "/schemas/ids/{0}"; private static final int MAX_PADDING_LENGTH = 5; @@ -55,7 +58,8 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler private final HttpClient client; private final String baseUrl; - private final RegisterSchemaRequest request; + private final RegisterSchemaRequest registerRequest; + private final UnregisterSchemaRequest unregisterRequest; private final CRC32C crc32c; private final Int2ObjectCache schemas; private final Int2ObjectCache schemaIds; @@ -70,7 +74,8 @@ public SchemaRegistryCatalogHandler( { this.baseUrl = catalog.options.url; this.client = HttpClient.newHttpClient(); - this.request = new RegisterSchemaRequest(); + this.registerRequest = new RegisterSchemaRequest(); + this.unregisterRequest = new UnregisterSchemaRequest(); this.crc32c = new CRC32C(); this.schemas = new Int2ObjectCache<>(1, 1024, i -> {}); this.schemaIds = new Int2ObjectCache<>(1, 1024, i -> {}); @@ -88,15 +93,30 @@ public int register( { int versionId = NO_VERSION_ID; - String response = sendPostHttpRequest(MessageFormat.format(SUBJECT_PATH, subject), schema); + String response = sendPostHttpRequest(MessageFormat.format(REGISTER_SUBJECT_PATH, subject), schema); if (response != null) { - versionId = request.resolveResponse(response); + versionId = registerRequest.resolveResponse(response); } return versionId; } + @Override + public int[] unregister( + String subject) + { + int[] versions = NO_VERSIONS; + + String response = sendDeleteHttpRequest(MessageFormat.format(UNREGISTER_SUBJECT_PATH, subject)); + if (response != null) + { + versions = unregisterRequest.resolveResponse(response); + } + + return versions; + } + @Override public String resolve( int schemaId) @@ -149,7 +169,7 @@ public String resolve( { event.onRetrievableSchemaId(catalogId, schemaId); } - newFuture.complete(new CachedSchema(request.resolveSchemaResponse(response), retryAttempts)); + newFuture.complete(new CachedSchema(registerRequest.resolveSchemaResponse(response), retryAttempts)); } } catch (Throwable ex) @@ -251,8 +271,8 @@ else if (response != null) { event.onRetrievableSchemaSubjectVersion(catalogId, subject, version); } - newFuture.complete(new CachedSchemaId(System.currentTimeMillis(), request.resolveResponse(response), - retryAttempts, retryAfter)); + newFuture.complete(new CachedSchemaId(System.currentTimeMillis(), + registerRequest.resolveResponse(response), retryAttempts, retryAfter)); } } catch (Throwable ex) @@ -390,6 +410,28 @@ private String sendPostHttpRequest( return responseBody; } + private String sendDeleteHttpRequest( + String path) + { + HttpRequest httpRequest = HttpRequest + .newBuilder(toURI(baseUrl, path)) + .version(HttpClient.Version.HTTP_1_1) + .DELETE() + .build(); + + String responseBody; + try + { + HttpResponse httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString()); + responseBody = httpResponse.statusCode() == 200 ? httpResponse.body() : null; + } + catch (Exception ex) + { + responseBody = null; + } + return responseBody; + } + @Override public String location() { diff --git a/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/serializer/UnregisterSchemaRequest.java b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/serializer/UnregisterSchemaRequest.java new file mode 100644 index 0000000000..84ff7d4361 --- /dev/null +++ b/runtime/catalog-schema-registry/src/main/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/serializer/UnregisterSchemaRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer; + +import java.io.StringReader; +import java.util.stream.IntStream; + +import jakarta.json.Json; +import jakarta.json.JsonArray; +import jakarta.json.JsonReader; +import jakarta.json.stream.JsonParsingException; + +public class UnregisterSchemaRequest +{ + public static final int[] NO_VERSIONS = new int[0]; + + public int[] resolveResponse( + String response) + { + try + { + JsonReader reader = Json.createReader(new StringReader(response)); + JsonArray array = reader.readArray(); + + return IntStream.range(0, array.size()) + .map(array::getInt) + .toArray(); + } + catch (JsonParsingException ex) + { + return NO_VERSIONS; + } + } +} diff --git a/runtime/catalog-schema-registry/src/test/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryIT.java b/runtime/catalog-schema-registry/src/test/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryIT.java index d22a449690..704c6f25ad 100644 --- a/runtime/catalog-schema-registry/src/test/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryIT.java +++ b/runtime/catalog-schema-registry/src/test/java/io/aklivity/zilla/runtime/catalog/schema/registry/internal/SchemaRegistryIT.java @@ -145,4 +145,15 @@ public void shouldRegisterSchema() throws Exception { k3po.finish(); } + + @Test + @Configuration("unregister/zilla.yaml") + @Specification({ + "${net}/handshake/client", + "${app}/handshake/server", + "${remote}/unregister.schema" }) + public void shouldUnregisterSchema() throws Exception + { + k3po.finish(); + } } diff --git a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java index 65b3aac632..ceabb8fdcf 100644 --- a/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java +++ b/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/catalog/CatalogHandler.java @@ -69,6 +69,12 @@ default int register( return NO_VERSION_ID; } + default int[] unregister( + String subject) + { + return new int[0]; + } + String resolve( int schemaId); diff --git a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java index e7e5a3e590..115764015e 100644 --- a/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java +++ b/runtime/engine/src/test/java/io/aklivity/zilla/runtime/engine/test/internal/binding/TestBindingFactory.java @@ -343,11 +343,15 @@ else if (assertion.schema != null && !assertion.schema.equals(schema)) } else { + if (catalog.subject != null && schema == null) + { + handler.unregister(catalog.subject); + } if (catalog.subject != null && schema != null) { handler.register(catalog.subject, schema); } - else if (catalog.subject != null && catalog.version != null) + else if (catalog.subject != null && catalog.version != null && schema != null) { handler.resolve(catalog.subject, catalog.version); } diff --git a/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/config/unregister/zilla.yaml b/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/config/unregister/zilla.yaml new file mode 100644 index 0000000000..aac3817758 --- /dev/null +++ b/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/config/unregister/zilla.yaml @@ -0,0 +1,31 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +catalogs: + catalog0: + type: schema-registry + options: + url: http://localhost:8081 +bindings: + net0: + type: test + kind: server + options: + catalog: + catalog0: + - subject: items-snapshots-value + exit: app0 diff --git a/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/streams/unregister.schema.rpt b/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/streams/unregister.schema.rpt new file mode 100644 index 0000000000..98873d56dd --- /dev/null +++ b/specs/catalog-schema-registry.spec/src/main/scripts/io/aklivity/zilla/specs/catalog/schema/registry/streams/unregister.schema.rpt @@ -0,0 +1,33 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "http://localhost:8081/subjects/items-snapshots-value" + +accepted +connected + +read http:method "DELETE" +read http:version "HTTP/1.1" +read http:header "Host" "localhost:8081" + +read closed + +write http:status "200" "OK" +write http:header "content-type" "application/vnd.schemaregistry.v1+json" +write http:content-length + +write '[1]' + +write close