Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
WIP
  • Loading branch information
akrambek committed Oct 4, 2024
commit ffb6b13221b4df2318e7a82680b2bd583cc87d3d
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public enum PgsqlKafkaCommandType
{
CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()),
DROP_TOPIC_COMMAND("DROP TOPIC".getBytes()),
UNKNOWN_COMMAND("UNKNOWN".getBytes());

private final byte[] value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public enum PgsqlKafkaCompletionCommand
{
CREATE_TOPIC_COMMAND("CREATE_TOPIC".getBytes()),
DROP_TOPIC_COMMAND("DROP_TOPIC".getBytes()),
UNKNOWN_COMMAND("UNKNOWN".getBytes());

private final byte[] value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_VERSION_ID;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

import java.io.InputStreamReader;
Expand Down Expand Up @@ -143,6 +144,7 @@ public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
Object2ObjectHashMap<PgsqlKafkaCommandType, PgsqlDecoder> pgsqlDecoder =
new Object2ObjectHashMap<>();
pgsqlDecoder.put(PgsqlKafkaCommandType.CREATE_TOPIC_COMMAND, this::decodeCreateTopicCommand);
pgsqlDecoder.put(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND, this::decodeDropTopicCommand);
pgsqlDecoder.put(PgsqlKafkaCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand);
this.pgsqlDecoder = pgsqlDecoder;
}
Expand Down Expand Up @@ -232,6 +234,7 @@ private final class PgsqlProxy
private final String database;
private final PgsqlKafkaBindingConfig binding;
private final KafkaCreateTopicsProxy createTopicsProxy;
private final KafkaDeleteTopicsProxy deleteTopicsProxy;

private final IntArrayQueue queries;

Expand Down Expand Up @@ -281,6 +284,7 @@ private PgsqlProxy(
this.queries = new IntArrayQueue();

this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this);
this.deleteTopicsProxy = new KafkaDeleteTopicsProxy(routedId, resolvedId, this);
}

private void onAppMessage(
Expand Down Expand Up @@ -488,7 +492,7 @@ private void onCommandCompleted(
doAppWindow(traceId, authorization);
}

public void onKafkaCreateTopicsBegin(
public void onKafkaBegin(
long traceId,
long authorization)
{
Expand Down Expand Up @@ -990,6 +994,80 @@ private void doKafkaBegin(
traceId, authorization, 0, kafkaBeginEx);
}

@Override
protected void onKafkaBegin(
BeginFW begin)
{
final long sequence = begin.sequence();
final long acknowledge = begin.acknowledge();
final long traceId = begin.traceId();
final long authorization = begin.authorization();
final OctetsFW extension = begin.extension();

assert acknowledge <= sequence;
assert sequence >= replySeq;
assert acknowledge >= replyAck;

replySeq = sequence;
replyAck = acknowledge;
state = PgsqlKafkaState.openingReply(state);

assert replyAck <= replySeq;

final ExtensionFW beginEx = extension.get(extensionRO::tryWrap);
final KafkaBeginExFW kafkaBeginEx =
beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null;

boolean errorExits = kafkaBeginEx.response().deleteTopics().topics().anyMatch(t -> t.error() != 0);

if (!errorExits)
{
delegate.onKafkaBegin(traceId, authorization);

doKafkaWindow(traceId, authorization);
doKafkaEnd(traceId, authorization);
}
else
{
delegate.cleanup(traceId, authorization);
}
}
}

private final class KafkaDeleteTopicsProxy extends KafkaProxy
{
private KafkaDeleteTopicsProxy(
long originId,
long routedId,
PgsqlProxy delegate)
{
super(originId, routedId, delegate);
}

private void doKafkaBegin(
long traceId,
long authorization,
List<String> topics)
{
initialSeq = delegate.initialSeq;
initialAck = delegate.initialAck;
initialMax = delegate.initialMax;
state = PgsqlKafkaState.openingInitial(state);

final KafkaBeginExFW kafkaBeginEx =
kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.request(r -> r
.deleteTopics(c -> c
.names(ct ->
topics.forEach(t -> ct.item(i -> i.set(t, UTF_8))))
.timeout(config.kafkaTopicRequestTimeoutMs())))
.build();

kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, 0, kafkaBeginEx);
}

@Override
protected void onKafkaBegin(
BeginFW begin)
Expand Down Expand Up @@ -1018,7 +1096,7 @@ protected void onKafkaBegin(

if (!errorExits)
{
delegate.onKafkaCreateTopicsBegin(traceId, authorization);
delegate.onKafkaBegin(traceId, authorization);

doKafkaWindow(traceId, authorization);
doKafkaEnd(traceId, authorization);
Expand Down Expand Up @@ -1292,6 +1370,35 @@ else if (server.commandsProcessed == 0)
}
}

private void decodeDropTopicCommand(
PgsqlProxy server,
long traceId,
long authorization,
DirectBuffer buffer,
int offset,
int length)
{
if (server.commandsProcessed == 1)
{
server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.DROP_TOPIC_COMMAND);
}
else if (server.commandsProcessed == 0)
{
final CreateTable createTable = (CreateTable) parseStatement(buffer, offset, length);
final String topic = createTable.getTable().getName();

final PgsqlKafkaBindingConfig binding = server.binding;
final String subjectKey = String.format("%s.%s-key", server.database, topic);
final String subjectValue = String.format("%s.%s-value", server.database, topic);

binding.catalog.unregister(subjectKey);
binding.catalog.unregister(subjectValue);

final KafkaDeleteTopicsProxy deleteTopicsProxy = server.deleteTopicsProxy;
deleteTopicsProxy.doKafkaBegin(traceId, authorization, topics);
}
}

private void decodeUnknownCommand(
PgsqlProxy server,
long traceId,
Expand Down Expand Up @@ -1351,6 +1458,13 @@ private Statement parseStatement(
sql = sql.replace("CREATE TOPIC", "CREATE TABLE");
statement = parserManager.parse(new StringReader(sql));
}
if (decodeCommandType(buffer, offset, length).
equals(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND))
{
String sql = buffer.getStringWithoutLengthUtf8(offset, length);
sql = sql.replace("DROP TOPIC", "DROP TABLE");
statement = parserManager.parse(new StringReader(sql));
}
else
{
inputStream.wrap(buffer, offset, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
public class SchemaRegistryCatalogHandler implements CatalogHandler
{
private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}";
private static final String SUBJECT_PATH = "/subjects/{0}/versions";
private static final String REGISTER_SUBJECT_PATH = "/subjects/{0}/versions";
private static final String UNREGISTER_SUBJECT_PATH = "/subjects/{0}";
private static final String SCHEMA_PATH = "/schemas/ids/{0}";

private static final int MAX_PADDING_LENGTH = 5;
Expand Down Expand Up @@ -88,7 +89,7 @@ public int register(
{
int versionId = NO_VERSION_ID;

String response = sendPostHttpRequest(MessageFormat.format(SUBJECT_PATH, subject), schema);
String response = sendPostHttpRequest(MessageFormat.format(REGISTER_SUBJECT_PATH, subject), schema);
if (response != null)
{
versionId = request.resolveResponse(response);
Expand All @@ -97,6 +98,17 @@ public int register(
return versionId;
}

@Override
public void unregister(
String subject)
{
String response = sendDeleteHttpRequest(MessageFormat.format(UNREGISTER_SUBJECT_PATH, subject));
if (response != null)
{
request.resolveResponse(response);
}
}

@Override
public String resolve(
int schemaId)
Expand Down Expand Up @@ -390,6 +402,28 @@ private String sendPostHttpRequest(
return responseBody;
}

private String sendDeleteHttpRequest(
String path)
{
HttpRequest httpRequest = HttpRequest
.newBuilder(toURI(baseUrl, path))
.version(HttpClient.Version.HTTP_1_1)
.DELETE()
.build();

String responseBody;
try
{
HttpResponse<String> httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
responseBody = httpResponse.statusCode() == 200 ? httpResponse.body() : null;
}
catch (Exception ex)
{
responseBody = null;
}
return responseBody;
}

@Override
public String location()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,15 @@ public void shouldRegisterSchema() throws Exception
{
k3po.finish();
}

@Test
@Configuration("unregister/zilla.yaml")
@Specification({
"${net}/handshake/client",
"${app}/handshake/server",
"${remote}/unregister.schema" })
public void shouldUnregisterSchema() throws Exception
{
k3po.finish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ default int register(
return NO_VERSION_ID;
}

default void unregister(
String subject)
{
}

String resolve(
int schemaId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ else if (assertion.schema != null && !assertion.schema.equals(schema))
}
else
{
if (catalog.subject != null && schema == null)
{
handler.unregister(catalog.subject);
}
if (catalog.subject != null && schema != null)
{
handler.register(catalog.subject, schema);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

---
name: test
catalogs:
catalog0:
type: schema-registry
options:
url: http://localhost:8081
bindings:
net0:
type: test
kind: server
options:
catalog:
catalog0:
- subject: items-snapshots-value
exit: app0
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright 2021-2023 Aklivity Inc
#
# Licensed under the Aklivity Community License (the "License"); you may not use
# this file except in compliance with the License. You may obtain a copy of the
# License at
#
# https://www.aklivity.io/aklivity-community-license/
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#

accept "http://localhost:8081/subjects/items-snapshots-value"

accepted
connected

read http:method "DELETE"
read http:version "HTTP/1.1"
read http:header "Host" "localhost:8081"

read closed

write http:status "200" "OK"
write http:header "content-type" "application/vnd.schemaregistry.v1+json"
write http:content-length

write '[1]'

write close