From ab925c84e0e5ffa2c5be7b6f338b4f999e8b2177 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Mon, 16 Sep 2024 21:52:42 +0530 Subject: [PATCH 1/5] create `function` support in `risingwave` binding --- .../risingwave/config/proxy.function.yaml | 30 ++++++++++ .../schema/risingwave.schema.patch.json | 21 ++++++- .../effective/create.function/client.rpt | 56 +++++++++++++++++++ .../effective/create.function/server.rpt | 56 +++++++++++++++++++ .../streams/pgsql/create.function/client.rpt | 54 ++++++++++++++++++ .../streams/pgsql/create.function/server.rpt | 55 ++++++++++++++++++ .../risingwave/streams/EffectiveIT.java | 10 ++++ .../binding/risingwave/streams/PgsqlIT.java | 10 ++++ .../config/RisingwaveOptionConfigBuilder.java | 15 ++++- .../config/RisingwaveOptionsConfig.java | 5 +- .../config/RisingwaveUdfConfig.java | 39 +++++++++++++ .../config/RisingwaveUdfConfigBuilder.java | 51 +++++++++++++++++ .../config/RisingwaveBindingConfig.java | 3 + .../config/RisingwaveCommandType.java | 1 + .../RisingwaveOptionsConfigAdapter.java | 12 ++++ .../config/RisingwaveUdfConfigAdapter.java | 56 +++++++++++++++++++ .../RisingwaveCreateFunctionTemplate.java | 45 +++++++++++++++ .../stream/RisingwaveCompletionCommand.java | 3 +- .../stream/RisingwaveProxyFactory.java | 45 ++++++++++++++- .../risingwave/internal/stream/ProxyIT.java | 10 ++++ 20 files changed, 570 insertions(+), 7 deletions(-) create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/client.rpt create mode 100644 incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/server.rpt create mode 100644 incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfig.java create mode 100644 incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java create mode 100644 incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveUdfConfigAdapter.java create mode 100644 incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml new file mode 100644 index 0000000000..1d0f88bb74 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml @@ -0,0 +1,30 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +--- +name: test +bindings: + app0: + type: risingwave + kind: proxy + options: + udf: + url: http://localhost:8815 + routes: + - exit: app1 + when: + - commands: + - "CREATE FUNCTION" + exit: app1 diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json index e94438df33..d2dfcc29aa 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json @@ -59,6 +59,21 @@ "$ref": "#/$defs/converter" } } + }, + "udf": + { + "title": "UDF", + "type": "object", + "properties": + { + "url": + { + "title": "URL", + "type": "string", + "pattern": "^([a-zA-Z0-9\\\\.-]+)(:(\\\\{[a-zA-Z_]+\\\\}|[0-9]+))?$" + } + }, + "additionalProperties": false } }, "additionalProperties": false @@ -83,7 +98,11 @@ "items": { "type": "string", - "enum": [ "CREATE TOPIC" ] + "enum": + [ + "CREATE TOPIC", + "CREATE FUNCTION" + ] } } } diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt new file mode 100644 index 0000000000..afabf8623d --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt @@ -0,0 +1,56 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE FUNCTION IF NOT EXISTS series(int) RETURNS TABLE (x int) " + "AS series" + "LANGUAGE java" + "USING LINK 'http://localhost:8815';" + [0x00] + +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_FUNCTION") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt new file mode 100644 index 0000000000..ce62f96d1c --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt @@ -0,0 +1,56 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE FUNCTION IF NOT EXISTS series(int) RETURNS TABLE (x int) " + "AS series" + "LANGUAGE java" + "USING LINK 'http://localhost:8815';" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_FUNCTION") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/client.rpt new file mode 100644 index 0000000000..3d64898b9c --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/client.rpt @@ -0,0 +1,54 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE FUNCTION IF NOT EXISTS series(int) RETURNS TABLE (x int) " + "AS series using LINK http://localhost:8815;" + [0x00] + +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_FUNCTION") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/server.rpt new file mode 100644 index 0000000000..e4433a89f9 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/server.rpt @@ -0,0 +1,55 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE FUNCTION IF NOT EXISTS series(int) RETURNS TABLE (x int) " + "AS series;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_FUNCTION") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java index 71283a1b9c..0e49252489 100644 --- a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java @@ -75,4 +75,14 @@ public void shouldHandleQueryWithMultiStatements() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/create.function/client", + "${app}/create.function/server" + }) + public void shouldCreateFunction() throws Exception + { + k3po.finish(); + } } diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java index 2d732d2fe5..4da5b7f5f8 100644 --- a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java @@ -75,4 +75,14 @@ public void shouldHandleQueryWithMultiStatements() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/create.function/client", + "${app}/create.function/server" + }) + public void shouldCreateFunction() throws Exception + { + k3po.finish(); + } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionConfigBuilder.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionConfigBuilder.java index 43bda8aa12..da82b2fd3c 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionConfigBuilder.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionConfigBuilder.java @@ -23,6 +23,7 @@ public class RisingwaveOptionConfigBuilder extends ConfigBuilder mapper; private RisingwaveKafkaConfig kafka; + private RisingwaveUdfConfig udf; RisingwaveOptionConfigBuilder( Function mapper) @@ -49,8 +50,20 @@ public RisingwaveKafkaConfigBuilder> kafka() return RisingwaveKafkaConfig.builder(this::kafka); } + public RisingwaveOptionConfigBuilder udf( + RisingwaveUdfConfig udf) + { + this.udf = udf; + return this; + } + + public RisingwaveUdfConfigBuilder> udf() + { + return RisingwaveUdfConfig.builder(this::udf); + } + public T build() { - return mapper.apply(new RisingwaveOptionsConfig(kafka)); + return mapper.apply(new RisingwaveOptionsConfig(kafka, udf)); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionsConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionsConfig.java index c550b3b08b..c47dded112 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionsConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionsConfig.java @@ -19,6 +19,7 @@ public final class RisingwaveOptionsConfig extends OptionsConfig { public final RisingwaveKafkaConfig kafka; + public final RisingwaveUdfConfig udf; public static RisingwaveOptionConfigBuilder builder() { @@ -26,8 +27,10 @@ public static RisingwaveOptionConfigBuilder builder() } RisingwaveOptionsConfig( - RisingwaveKafkaConfig kafka) + RisingwaveKafkaConfig kafka, + RisingwaveUdfConfig udf) { this.kafka = kafka; + this.udf = udf; } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfig.java new file mode 100644 index 0000000000..dcdac45788 --- /dev/null +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfig.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.risingwave.config; + +import java.util.function.Function; + +public class RisingwaveUdfConfig +{ + public final String url; + + public static RisingwaveUdfConfigBuilder builder() + { + return new RisingwaveUdfConfigBuilder<>(RisingwaveUdfConfig.class::cast); + } + + public static RisingwaveUdfConfigBuilder builder( + Function mapper) + { + return new RisingwaveUdfConfigBuilder<>(mapper); + } + + RisingwaveUdfConfig( + String url) + { + this.url = url; + } +} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java new file mode 100644 index 0000000000..d6f6a95f3b --- /dev/null +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java @@ -0,0 +1,51 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.risingwave.config; + +import java.util.function.Function; + +import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; + +public class RisingwaveUdfConfigBuilder extends ConfigBuilder> +{ + private final Function mapper; + + private String url; + + RisingwaveUdfConfigBuilder( + Function mapper) + { + this.mapper = mapper; + } + + @Override + @SuppressWarnings("unchecked") + protected Class> thisType() + { + return (Class>) getClass(); + } + + public RisingwaveUdfConfigBuilder url( + String url) + { + this.url = url; + return this; + } + + public T build() + { + return mapper.apply(new RisingwaveUdfConfig(url)); + } +} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java index 3305c1be09..f37c0d0c84 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java @@ -23,6 +23,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveOptionsConfig; import io.aklivity.zilla.runtime.binding.risingwave.internal.RisingwaveConfiguration; +import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateFunctionTemplate; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateMaterializedViewTemplate; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateSinkTemplate; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateSourceTemplate; @@ -47,6 +48,7 @@ public final class RisingwaveBindingConfig public final RisingwaveCreateTableTemplate createTable; public final RisingwaveCreateSourceTemplate createSource; public final RisingwaveCreateSinkTemplate createSink; + public final RisingwaveCreateFunctionTemplate createFunction; public RisingwaveBindingConfig( RisingwaveConfiguration config, @@ -72,6 +74,7 @@ public RisingwaveBindingConfig( this.createTopic = new RisingwaveCreateTopicTemplate(); this.createView = new RisingwaveCreateMaterializedViewTemplate(); this.describeView = new RisingwaveDescribeMaterializedViewTemplate(); + this.createFunction = new RisingwaveCreateFunctionTemplate(options.udf.url); } public RisingwaveRouteConfig resolve( diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java index 28d9e617fc..c0a0409270 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java @@ -21,6 +21,7 @@ public enum RisingwaveCommandType CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()), CREATE_TABLE_COMMAND("CREATE TABLE".getBytes()), CREATE_MATERIALIZED_VIEW_COMMAND("CREATE MATERIALIZED VIEW".getBytes()), + CREATE_FUNCTION_COMMAND("CREATE FUNCTION".getBytes()), UNKNOWN_COMMAND("UNKNOWN".getBytes()); private final byte[] value; diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapter.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapter.java index 6f609a035e..89d71983ac 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapter.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapter.java @@ -28,8 +28,10 @@ public final class RisingwaveOptionsConfigAdapter implements OptionsConfigAdapterSpi, JsonbAdapter { private static final String KAFKA_NAME = "kafka"; + private static final String UDF_NAME = "udf"; private final RisingwaveKafkaConfigAdapter kafka = new RisingwaveKafkaConfigAdapter(); + private final RisingwaveUdfConfigAdapter udf = new RisingwaveUdfConfigAdapter(); @Override public Kind kind() @@ -56,6 +58,11 @@ public JsonObject adaptToJson( object.add(KAFKA_NAME, kafka.adaptToJson(options.kafka)); } + if (options.udf != null) + { + object.add(UDF_NAME, udf.adaptToJson(options.udf)); + } + return object.build(); } @@ -69,6 +76,11 @@ public OptionsConfig adaptFromJson( options.kafka(kafka.adaptFromJson(object.getJsonObject(KAFKA_NAME))); } + if (object.containsKey(UDF_NAME)) + { + options.udf(udf.adaptFromJson(object.getJsonObject(UDF_NAME))); + } + return options.build(); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveUdfConfigAdapter.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveUdfConfigAdapter.java new file mode 100644 index 0000000000..697c098284 --- /dev/null +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveUdfConfigAdapter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.risingwave.internal.config; + +import jakarta.json.Json; +import jakarta.json.JsonObject; +import jakarta.json.JsonObjectBuilder; +import jakarta.json.bind.adapter.JsonbAdapter; + +import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveUdfConfig; +import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveUdfConfigBuilder; + +public final class RisingwaveUdfConfigAdapter implements JsonbAdapter +{ + private static final String URL_NAME = "url"; + + @Override + public JsonObject adaptToJson( + RisingwaveUdfConfig udf) + { + JsonObjectBuilder object = Json.createObjectBuilder(); + + if (udf.url != null) + { + object.add(URL_NAME, udf.url); + } + + return object.build(); + } + + @Override + public RisingwaveUdfConfig adaptFromJson( + JsonObject object) + { + RisingwaveUdfConfigBuilder builder = RisingwaveUdfConfig.builder(); + + if (object.containsKey(URL_NAME)) + { + builder.url(object.getString(URL_NAME)); + } + + return builder.build(); + } +} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java new file mode 100644 index 0000000000..eb48160624 --- /dev/null +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; + +import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.create.function.CreateFunction; + +public class RisingwaveCreateFunctionTemplate extends RisingwaveCommandTemplate +{ + private final String sqlFormat = """ + CREATE FUNCTION IF NOT EXISTS IF NOT EXISTS %s(%s) + RETURNS %s + LANGUAGE %s + AS %s + USING LINK '%s'; + \u0000"""; + + private final String link; + + public RisingwaveCreateFunctionTemplate( + String link) + { + this.link = link; + } + + public String generate( + Statement statement) + { + CreateFunction createFunction = (CreateFunction) statement; + + return null; //sqlFormat.formatted(name, parameters, returnType, lang, as, link); + } +} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java index 2224cff8a6..b80098e3e8 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java @@ -18,7 +18,8 @@ public enum RisingwaveCompletionCommand { UNKNOWN_COMMAND("UNKNOWN".getBytes()), CREATE_TABLE_COMMAND("CREATE_TABLE".getBytes()), - CREATE_MATERIALIZED_VIEW_COMMAND("CREATE_MATERIALIZED_VIEW".getBytes()); + CREATE_MATERIALIZED_VIEW_COMMAND("CREATE_MATERIALIZED_VIEW".getBytes()), + CREATE_FUNCTION_COMMAND("CREATE_FUNCTION".getBytes()); private final byte[] value; diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index ba1d815e2f..346496f691 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -64,6 +64,7 @@ import io.aklivity.zilla.runtime.engine.config.BindingConfig; import net.sf.jsqlparser.parser.CCJSqlParserManager; import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.create.function.CreateFunction; import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.statement.create.view.CreateView; @@ -141,6 +142,7 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory new Object2ObjectHashMap<>(); clientTransforms.put(RisingwaveCommandType.CREATE_TABLE_COMMAND, this::onDecodeCreateTableCommand); clientTransforms.put(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND, this::onDecodeCreateMaterializedViewCommand); + clientTransforms.put(RisingwaveCommandType.CREATE_FUNCTION_COMMAND, this::onDecodeCreateFunctionCommand); clientTransforms.put(RisingwaveCommandType.UNKNOWN_COMMAND, this::onDecodeUnknownCommand); this.clientTransforms = clientTransforms; } @@ -1533,6 +1535,43 @@ else if (server.commandsProcessed == 3) } } + private void onDecodeCreateFunctionCommand( + PgsqlServer server, + long traceId, + long authorization, + DirectBuffer buffer, + int offset, + int length) + { + if (server.commandsProcessed == 1) + { + server.onCommandCompleted(traceId, authorization, length, RisingwaveCompletionCommand.CREATE_FUNCTION_COMMAND); + } + else + { + final RisingwaveBindingConfig binding = server.binding; + final CreateFunction statement = (CreateFunction) parseStatement(buffer, offset, length); + + String newStatement = ""; + int progress = 0; + + if (server.commandsProcessed == 0) + { + newStatement = binding.createFunction.generate(statement); + } + + statementBuffer.putBytes(progress, newStatement.getBytes()); + progress += newStatement.length(); + + final RisingwaveRouteConfig route = + server.binding.resolve(authorization, statementBuffer, 0, progress); + + final PgsqlClient client = server.streamsByRouteIds.get(route.id); + client.doPgsqlQuery(traceId, authorization, statementBuffer, 0, progress); + client.typeCommand = ignoreFlushCommand; + } + } + private void onDecodeUnknownCommand( PgsqlServer server, long traceId, @@ -1635,8 +1674,8 @@ private Statement parseStatement( try { //TODO: Try to generalize it - if (decodeCommandType(buffer, offset, length). - equals(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND)) + RisingwaveCommandType commandType = decodeCommandType(buffer, offset, length); + if (commandType.equals(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND)) { String sql = buffer.getStringWithoutLengthUtf8(offset, length); // Replace "CREATE MATERIALIZED VIEW" with "CREATE VIEW" for compatibility @@ -1653,7 +1692,7 @@ private Statement parseStatement( } catch (Exception ignored) { - //NOOP + ignored.printStackTrace(); } return statement; diff --git a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java index 714d982c90..615c8555de 100644 --- a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java +++ b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java @@ -90,4 +90,14 @@ public void shouldCreateMaterializedView() throws Exception { k3po.finish(); } + + @Test + @Configuration("proxy.function.yaml") + @Specification({ + "${pgsql}/create.function/client", + "${effective}/create.function/server" }) + public void shouldCreateFunction() throws Exception + { + k3po.finish(); + } } From 88e040cd7214f5028a22872075dc48fdd737e7ff Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Tue, 17 Sep 2024 19:25:36 +0530 Subject: [PATCH 2/5] addressing review comments --- .../risingwave/config/proxy.function.yaml | 20 +++++++---- .../schema/risingwave.schema.patch.json | 34 +++++++++++-------- .../effective/create.function/client.rpt | 6 ++-- .../effective/create.function/server.rpt | 6 ++-- .../streams/pgsql/create.function/client.rpt | 4 +-- .../streams/pgsql/create.function/server.rpt | 4 +-- .../config/RisingwaveOptionConfigBuilder.java | 12 +++++-- .../config/RisingwaveOptionsConfig.java | 8 +++-- .../config/RisingwaveUdfConfig.java | 9 +++-- .../config/RisingwaveUdfConfigBuilder.java | 19 ++++++++--- .../config/RisingwaveBindingConfig.java | 2 +- .../RisingwaveOptionsConfigAdapter.java | 12 +++++-- .../config/RisingwaveUdfConfigAdapter.java | 21 +++++++++--- .../RisingwaveCreateFunctionTemplate.java | 32 ++++++++++------- .../stream/RisingwaveProxyFactory.java | 11 ++++++ .../RisingwaveOptionsConfigAdapterTest.java | 12 ++++++- 16 files changed, 146 insertions(+), 66 deletions(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml index 1d0f88bb74..c35c368626 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml @@ -15,16 +15,24 @@ --- name: test +catalogs: + catalog0: + type: test + options: + url: http://localhost:8081 bindings: app0: type: risingwave kind: proxy options: udf: - url: http://localhost:8815 - routes: - - exit: app1 - when: - - commands: - - "CREATE FUNCTION" + - server: http://localhost:8815 + kafka: + properties: + bootstrap.server: localhost:9092 + format: + model: test + catalog: + catalog0: + - strategy: topic exit: app1 diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json index d2dfcc29aa..d322cb17c5 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json @@ -63,17 +63,27 @@ "udf": { "title": "UDF", - "type": "object", - "properties": + "type": "array", + "items": { - "url": + "type": "object", + "properties": { - "title": "URL", - "type": "string", - "pattern": "^([a-zA-Z0-9\\\\.-]+)(:(\\\\{[a-zA-Z_]+\\\\}|[0-9]+))?$" - } - }, - "additionalProperties": false + "server": + { + "title": "Server", + "type": "string", + "pattern": "^([a-zA-Z0-9\\\\.-]+)(:(\\\\{[a-zA-Z_]+\\\\}|[0-9]+))?$" + }, + "language": + { + "title": "Language", + "type": "string", + "default": "java" + } + }, + "additionalProperties": false + } } }, "additionalProperties": false @@ -98,11 +108,7 @@ "items": { "type": "string", - "enum": - [ - "CREATE TOPIC", - "CREATE FUNCTION" - ] + "enum": [ "CREATE TOPIC" ] } } } diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt index afabf8623d..ab7ceebb46 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt @@ -32,9 +32,9 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE FUNCTION IF NOT EXISTS series(int) RETURNS TABLE (x int) " - "AS series" - "LANGUAGE java" +write "CREATE FUNCTION series ( int ) RETURNS TABLE ( x int ) " + "AS series " + "LANGUAGE java " "USING LINK 'http://localhost:8815';" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt index ce62f96d1c..d22b8ad016 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt @@ -34,9 +34,9 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE FUNCTION IF NOT EXISTS series(int) RETURNS TABLE (x int) " - "AS series" - "LANGUAGE java" +read "CREATE FUNCTION series ( int ) RETURNS TABLE ( x int ) " + "AS series " + "LANGUAGE java " "USING LINK 'http://localhost:8815';" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/client.rpt index 3d64898b9c..8f7384ab62 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/client.rpt @@ -32,8 +32,8 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE FUNCTION IF NOT EXISTS series(int) RETURNS TABLE (x int) " - "AS series using LINK http://localhost:8815;" +write "CREATE FUNCTION series(int) RETURNS TABLE (x int) " + "AS series;" [0x00] write flush diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/server.rpt index e4433a89f9..073520fff6 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.function/server.rpt @@ -36,8 +36,8 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE FUNCTION IF NOT EXISTS series(int) RETURNS TABLE (x int) " - "AS series;" +read "CREATE FUNCTION series(int) RETURNS TABLE (x int) " + "AS series;" [0x00] write advise zilla:flush ${pgsql:flushEx() diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionConfigBuilder.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionConfigBuilder.java index da82b2fd3c..04ab007e0f 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionConfigBuilder.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionConfigBuilder.java @@ -14,6 +14,8 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.config; +import java.util.ArrayList; +import java.util.List; import java.util.function.Function; import io.aklivity.zilla.runtime.engine.config.ConfigBuilder; @@ -23,7 +25,7 @@ public class RisingwaveOptionConfigBuilder extends ConfigBuilder mapper; private RisingwaveKafkaConfig kafka; - private RisingwaveUdfConfig udf; + private List udfs; RisingwaveOptionConfigBuilder( Function mapper) @@ -53,7 +55,11 @@ public RisingwaveKafkaConfigBuilder> kafka() public RisingwaveOptionConfigBuilder udf( RisingwaveUdfConfig udf) { - this.udf = udf; + if (udfs == null) + { + udfs = new ArrayList<>(); + } + udfs.add(udf); return this; } @@ -64,6 +70,6 @@ public RisingwaveUdfConfigBuilder> udf() public T build() { - return mapper.apply(new RisingwaveOptionsConfig(kafka, udf)); + return mapper.apply(new RisingwaveOptionsConfig(kafka, udfs)); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionsConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionsConfig.java index c47dded112..c663193266 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionsConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveOptionsConfig.java @@ -14,12 +14,14 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.config; +import java.util.List; + import io.aklivity.zilla.runtime.engine.config.OptionsConfig; public final class RisingwaveOptionsConfig extends OptionsConfig { public final RisingwaveKafkaConfig kafka; - public final RisingwaveUdfConfig udf; + public final List udfs; public static RisingwaveOptionConfigBuilder builder() { @@ -28,9 +30,9 @@ public static RisingwaveOptionConfigBuilder builder() RisingwaveOptionsConfig( RisingwaveKafkaConfig kafka, - RisingwaveUdfConfig udf) + List udfs) { this.kafka = kafka; - this.udf = udf; + this.udfs = udfs; } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfig.java index dcdac45788..a97cd8e261 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfig.java @@ -18,7 +18,8 @@ public class RisingwaveUdfConfig { - public final String url; + public final String server; + public final String language; public static RisingwaveUdfConfigBuilder builder() { @@ -32,8 +33,10 @@ public static RisingwaveUdfConfigBuilder builder( } RisingwaveUdfConfig( - String url) + String server, + String language) { - this.url = url; + this.server = server; + this.language = language; } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java index d6f6a95f3b..79e10536a4 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java @@ -22,7 +22,8 @@ public class RisingwaveUdfConfigBuilder extends ConfigBuilder mapper; - private String url; + private String server; + private String language; RisingwaveUdfConfigBuilder( Function mapper) @@ -37,15 +38,23 @@ protected Class> thisType() return (Class>) getClass(); } - public RisingwaveUdfConfigBuilder url( - String url) + public RisingwaveUdfConfigBuilder server( + String server) { - this.url = url; + this.server = server; + return this; + } + + public RisingwaveUdfConfigBuilder language( + String language) + { + this.language = language; return this; } public T build() { - return mapper.apply(new RisingwaveUdfConfig(url)); + String language = this.language != null ? this.language : "java"; + return mapper.apply(new RisingwaveUdfConfig(server, language)); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java index f37c0d0c84..aa3a1140df 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java @@ -74,7 +74,7 @@ public RisingwaveBindingConfig( this.createTopic = new RisingwaveCreateTopicTemplate(); this.createView = new RisingwaveCreateMaterializedViewTemplate(); this.describeView = new RisingwaveDescribeMaterializedViewTemplate(); - this.createFunction = new RisingwaveCreateFunctionTemplate(options.udf.url); + this.createFunction = new RisingwaveCreateFunctionTemplate(options.udfs.get(0)); } public RisingwaveRouteConfig resolve( diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapter.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapter.java index 89d71983ac..98871da4ad 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapter.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapter.java @@ -15,6 +15,8 @@ package io.aklivity.zilla.runtime.binding.risingwave.internal.config; import jakarta.json.Json; +import jakarta.json.JsonArray; +import jakarta.json.JsonArrayBuilder; import jakarta.json.JsonObject; import jakarta.json.JsonObjectBuilder; import jakarta.json.bind.adapter.JsonbAdapter; @@ -58,9 +60,12 @@ public JsonObject adaptToJson( object.add(KAFKA_NAME, kafka.adaptToJson(options.kafka)); } - if (options.udf != null) + if (options.udfs != null && !options.udfs.isEmpty()) { - object.add(UDF_NAME, udf.adaptToJson(options.udf)); + JsonArrayBuilder udfs = Json.createArrayBuilder(); + options.udfs.forEach(f -> udfs.add(udf.adaptToJson(f))); + + object.add(UDF_NAME, udfs); } return object.build(); @@ -78,7 +83,8 @@ public OptionsConfig adaptFromJson( if (object.containsKey(UDF_NAME)) { - options.udf(udf.adaptFromJson(object.getJsonObject(UDF_NAME))); + JsonArray udfs = object.getJsonArray(UDF_NAME); + udfs.forEach(f -> options.udf(udf.adaptFromJson((JsonObject) f))); } return options.build(); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveUdfConfigAdapter.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveUdfConfigAdapter.java index 697c098284..03b6754f30 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveUdfConfigAdapter.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveUdfConfigAdapter.java @@ -24,7 +24,8 @@ public final class RisingwaveUdfConfigAdapter implements JsonbAdapter { - private static final String URL_NAME = "url"; + private static final String SERVER_NAME = "server"; + private static final String LANGUAGE_NAME = "language"; @Override public JsonObject adaptToJson( @@ -32,9 +33,14 @@ public JsonObject adaptToJson( { JsonObjectBuilder object = Json.createObjectBuilder(); - if (udf.url != null) + if (udf.server != null) { - object.add(URL_NAME, udf.url); + object.add(SERVER_NAME, udf.server); + } + + if (udf.language != null) + { + object.add(LANGUAGE_NAME, udf.language); } return object.build(); @@ -46,9 +52,14 @@ public RisingwaveUdfConfig adaptFromJson( { RisingwaveUdfConfigBuilder builder = RisingwaveUdfConfig.builder(); - if (object.containsKey(URL_NAME)) + if (object.containsKey(SERVER_NAME)) + { + builder.server(object.getString(SERVER_NAME)); + } + + if (object.containsKey(LANGUAGE_NAME)) { - builder.url(object.getString(URL_NAME)); + builder.language(object.getString(LANGUAGE_NAME)); } return builder.build(); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java index eb48160624..dc4f17a26a 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java @@ -14,32 +14,40 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; +import java.util.List; + +import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveUdfConfig; import net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.create.function.CreateFunction; public class RisingwaveCreateFunctionTemplate extends RisingwaveCommandTemplate { - private final String sqlFormat = """ - CREATE FUNCTION IF NOT EXISTS IF NOT EXISTS %s(%s) - RETURNS %s - LANGUAGE %s - AS %s - USING LINK '%s'; - \u0000"""; - - private final String link; + private final RisingwaveUdfConfig udf; public RisingwaveCreateFunctionTemplate( - String link) + RisingwaveUdfConfig udf) { - this.link = link; + this.udf = udf; } public String generate( Statement statement) { CreateFunction createFunction = (CreateFunction) statement; + List parts = createFunction.getFunctionDeclarationParts(); + if (!parts.stream().anyMatch(item -> item.equalsIgnoreCase("LANGUAGE"))) + { + createFunction.addFunctionDeclarationParts("LANGUAGE", udf.language); + } + + if (!parts.stream().anyMatch(item -> item.equalsIgnoreCase("LINK"))) + { + createFunction.addFunctionDeclarationParts("USING", "LINK", "'%s'".formatted(udf.server)); + } + + parts.removeIf(part -> part.equals(";")); + createFunction.addFunctionDeclarationParts(";"); - return null; //sqlFormat.formatted(name, parameters, returnType, lang, as, link); + return createFunction.toString(); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index 346496f691..f87d02813f 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -1684,6 +1684,17 @@ private Statement parseStatement( sql = sql.replace("IF NOT EXISTS", ""); statement = parserManager.parse(new StringReader(sql)); } + else if (commandType.equals(RisingwaveCommandType.CREATE_FUNCTION_COMMAND)) + { + if (buffer.getByte(offset + length + Byte.BYTES) == END_OF_FIELD) + { + length -= Byte.BYTES; + } + inputStream.wrap(buffer, offset, length); + statement = parserManager.parse(reader); + /*String sql = buffer.getStringWithoutLengthUtf8(offset, length); + statement = parserManager.parse(new StringReader(sql));*/ + } else { inputStream.wrap(buffer, offset, length); diff --git a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapterTest.java b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapterTest.java index 3c436d6a23..7030baf210 100644 --- a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapterTest.java +++ b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapterTest.java @@ -82,7 +82,17 @@ public void shouldReadOptions() ] } } - } + }, + "udf": [ + { + "server": "http://udf.zillabase.dev:8815", + "language": "java" + }, + { + "server": "http://udf-python.zillabase.dev:8815", + "language": "python" + } + ] }"""; RisingwaveOptionsConfig options = jsonb.fromJson(text, RisingwaveOptionsConfig.class); From fa6102147736f72e8725f0116e91a71c8528e3fe Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Wed, 18 Sep 2024 09:46:08 +0530 Subject: [PATCH 3/5] IT fix --- .../effective/create.function/client.rpt | 7 +-- .../effective/create.function/server.rpt | 7 +-- .../config/RisingwaveBindingConfig.java | 35 +++++++++---- .../RisingwaveCreateFunctionTemplate.java | 51 +++++++++++++++---- .../stream/RisingwaveProxyFactory.java | 7 +-- 5 files changed, 77 insertions(+), 30 deletions(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt index ab7ceebb46..e07f659466 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/client.rpt @@ -32,9 +32,10 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE FUNCTION series ( int ) RETURNS TABLE ( x int ) " - "AS series " - "LANGUAGE java " +write "CREATE FUNCTION series(int)\n" + "RETURNS TABLE (x int)\n" + "AS series\n" + "LANGUAGE java\n" "USING LINK 'http://localhost:8815';" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt index d22b8ad016..885d785192 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.function/server.rpt @@ -34,9 +34,10 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE FUNCTION series ( int ) RETURNS TABLE ( x int ) " - "AS series " - "LANGUAGE java " +read "CREATE FUNCTION series(int)\n" + "RETURNS TABLE (x int)\n" + "AS series\n" + "LANGUAGE java\n" "USING LINK 'http://localhost:8815';" [0x00] diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java index aa3a1140df..49b1b8e8e3 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java @@ -22,6 +22,7 @@ import org.agrona.DirectBuffer; import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveOptionsConfig; +import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveUdfConfig; import io.aklivity.zilla.runtime.binding.risingwave.internal.RisingwaveConfiguration; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateFunctionTemplate; import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.RisingwaveCreateMaterializedViewTemplate; @@ -61,20 +62,34 @@ public RisingwaveBindingConfig( this.kind = binding.kind; this.routes = binding.routes.stream().map(RisingwaveRouteConfig::new).collect(toList()); - final CatalogedConfig cataloged = options.kafka.format.cataloged.get(0); - cataloged.id = binding.resolveId.applyAsLong(cataloged.name); + String bootstrapServer = null; + String location = null; + RisingwaveUdfConfig udf = null; - final CatalogHandler catalogHandler = supplyCatalog.apply(cataloged.id); - this.createTable = new RisingwaveCreateTableTemplate(options.kafka.properties.bootstrapServer, - catalogHandler.location(), config.kafkaScanStartupTimestampMillis()); - this.createSource = new RisingwaveCreateSourceTemplate(options.kafka.properties.bootstrapServer, - catalogHandler.location(), config.kafkaScanStartupTimestampMillis()); - this.createSink = new RisingwaveCreateSinkTemplate( - options.kafka.properties.bootstrapServer, catalogHandler.location()); + if (options.kafka != null) + { + final CatalogedConfig cataloged = options.kafka.format.cataloged.get(0); + cataloged.id = binding.resolveId.applyAsLong(cataloged.name); + + final CatalogHandler catalogHandler = supplyCatalog.apply(cataloged.id); + bootstrapServer = options.kafka.properties.bootstrapServer; + location = catalogHandler.location(); + } + + if (options.udfs != null) + { + udf = options.udfs.get(0); + } + + this.createTable = new RisingwaveCreateTableTemplate(bootstrapServer, + location, config.kafkaScanStartupTimestampMillis()); + this.createSource = new RisingwaveCreateSourceTemplate(bootstrapServer, + location, config.kafkaScanStartupTimestampMillis()); + this.createSink = new RisingwaveCreateSinkTemplate(bootstrapServer, location); this.createTopic = new RisingwaveCreateTopicTemplate(); this.createView = new RisingwaveCreateMaterializedViewTemplate(); this.describeView = new RisingwaveDescribeMaterializedViewTemplate(); - this.createFunction = new RisingwaveCreateFunctionTemplate(options.udfs.get(0)); + this.createFunction = new RisingwaveCreateFunctionTemplate(udf); } public RisingwaveRouteConfig resolve( diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java index dc4f17a26a..33cb6a958a 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateFunctionTemplate.java @@ -22,6 +22,13 @@ public class RisingwaveCreateFunctionTemplate extends RisingwaveCommandTemplate { + private final String sqlFormat = """ + CREATE FUNCTION %s(%s) + RETURNS %s + AS %s + LANGUAGE %s + USING LINK '%s';\u0000"""; + private final RisingwaveUdfConfig udf; public RisingwaveCreateFunctionTemplate( @@ -35,19 +42,45 @@ public String generate( { CreateFunction createFunction = (CreateFunction) statement; List parts = createFunction.getFunctionDeclarationParts(); - if (!parts.stream().anyMatch(item -> item.equalsIgnoreCase("LANGUAGE"))) - { - createFunction.addFunctionDeclarationParts("LANGUAGE", udf.language); - } - if (!parts.stream().anyMatch(item -> item.equalsIgnoreCase("LINK"))) + String functionName = parts.get(0); + + int paramStartIndex = parts.indexOf("("); + int paramEndIndex = parts.indexOf(")"); + String parameters = paramStartIndex >= 0 && paramEndIndex > paramStartIndex + ? String.join(" ", parts.subList(paramStartIndex + 1, paramEndIndex)) + : ""; + + int returnsIndex = parts.indexOf("RETURNS"); + String returnType = returnsIndex >= 0 ? parts.get(returnsIndex + 1) : ""; + + if ("TABLE".equalsIgnoreCase(returnType)) { - createFunction.addFunctionDeclarationParts("USING", "LINK", "'%s'".formatted(udf.server)); + int tableStartIndex = -1; + int tableEndIndex = -1; + for (int i = returnsIndex; i < parts.size(); i++) + { + if (parts.get(i).equals("(")) + { + tableStartIndex = i; + } + else if (parts.get(i).equals(")")) + { + tableEndIndex = i; + break; + } + } + + if (tableStartIndex >= 0 && tableEndIndex > tableStartIndex) + { + String tableDefinition = String.join(" ", parts.subList(tableStartIndex + 1, tableEndIndex)); + returnType += " (" + tableDefinition + ")"; + } } - parts.removeIf(part -> part.equals(";")); - createFunction.addFunctionDeclarationParts(";"); + int asIndex = parts.indexOf("AS"); + String body = asIndex >= 0 ? parts.get(asIndex + 1) : ""; - return createFunction.toString(); + return sqlFormat.formatted(functionName, parameters, returnType, body, udf.language, udf.server); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index f87d02813f..3568713529 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -1690,10 +1690,8 @@ else if (commandType.equals(RisingwaveCommandType.CREATE_FUNCTION_COMMAND)) { length -= Byte.BYTES; } - inputStream.wrap(buffer, offset, length); - statement = parserManager.parse(reader); - /*String sql = buffer.getStringWithoutLengthUtf8(offset, length); - statement = parserManager.parse(new StringReader(sql));*/ + String sql = buffer.getStringWithoutLengthUtf8(offset, length); + statement = parserManager.parse(new StringReader(sql)); } else { @@ -1703,7 +1701,6 @@ else if (commandType.equals(RisingwaveCommandType.CREATE_FUNCTION_COMMAND)) } catch (Exception ignored) { - ignored.printStackTrace(); } return statement; From acc945f9d3ff58c1bd06611c281264f83cf728d6 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Wed, 18 Sep 2024 11:13:58 +0530 Subject: [PATCH 4/5] addressing review comments & config validation tests --- .../risingwave/config/proxy.function.yaml | 13 ----- .../schema/risingwave.schema.patch.json | 4 +- .../binding/risingwave/config/SchemaTest.java | 52 +++++++++++++++++++ .../config/RisingwaveUdfConfigBuilder.java | 4 +- .../RisingwaveOptionsConfigAdapterTest.java | 15 ++++-- 5 files changed, 70 insertions(+), 18 deletions(-) create mode 100644 incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/config/SchemaTest.java diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml index c35c368626..4eb1dc5d79 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/config/proxy.function.yaml @@ -15,11 +15,6 @@ --- name: test -catalogs: - catalog0: - type: test - options: - url: http://localhost:8081 bindings: app0: type: risingwave @@ -27,12 +22,4 @@ bindings: options: udf: - server: http://localhost:8815 - kafka: - properties: - bootstrap.server: localhost:9092 - format: - model: test - catalog: - catalog0: - - strategy: topic exit: app1 diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json index d322cb17c5..b9fada8a4b 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json @@ -83,7 +83,9 @@ } }, "additionalProperties": false - } + }, + "minItems": 1, + "maxItems": 1 } }, "additionalProperties": false diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/config/SchemaTest.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/config/SchemaTest.java new file mode 100644 index 0000000000..e7d165aec9 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/config/SchemaTest.java @@ -0,0 +1,52 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.specs.binding.risingwave.config; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +import jakarta.json.JsonObject; + +import org.junit.Rule; +import org.junit.Test; + +import io.aklivity.zilla.specs.engine.config.ConfigSchemaRule; + +public class SchemaTest +{ + @Rule + public final ConfigSchemaRule schema = new ConfigSchemaRule() + .schemaPatch("io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json") + .schemaPatch("io/aklivity/zilla/specs/engine/schema/catalog/test.schema.patch.json") + .schemaPatch("io/aklivity/zilla/specs/engine/schema/model/test.schema.patch.json") + .configurationRoot("io/aklivity/zilla/specs/binding/risingwave/config"); + + @Test + public void shouldValidateOptionsUdf() + { + JsonObject config = schema.validate("proxy.function.yaml"); + + assertThat(config, not(nullValue())); + } + + @Test + public void shouldValidateOptionsKafka() + { + JsonObject config = schema.validate("proxy.yaml"); + + assertThat(config, not(nullValue())); + } +} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java index 79e10536a4..092a5f0741 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/config/RisingwaveUdfConfigBuilder.java @@ -20,6 +20,8 @@ public class RisingwaveUdfConfigBuilder extends ConfigBuilder> { + private static final String LANGUAGE_DEFAULT = "java"; + private final Function mapper; private String server; @@ -54,7 +56,7 @@ public RisingwaveUdfConfigBuilder language( public T build() { - String language = this.language != null ? this.language : "java"; + String language = this.language != null ? this.language : LANGUAGE_DEFAULT; return mapper.apply(new RisingwaveUdfConfig(server, language)); } } diff --git a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapterTest.java b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapterTest.java index 7030baf210..f3e01d23ae 100644 --- a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapterTest.java +++ b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveOptionsConfigAdapterTest.java @@ -36,6 +36,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveKafkaConfig; import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveKafkaPropertiesConfig; import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveOptionsConfig; +import io.aklivity.zilla.runtime.binding.risingwave.config.RisingwaveUdfConfig; import io.aklivity.zilla.runtime.engine.config.CatalogedConfig; import io.aklivity.zilla.runtime.engine.config.ConfigAdapterContext; import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapter; @@ -85,8 +86,7 @@ public void shouldReadOptions() }, "udf": [ { - "server": "http://udf.zillabase.dev:8815", - "language": "java" + "server": "http://udf.zillabase.dev:8815" }, { "server": "http://udf-python.zillabase.dev:8815", @@ -98,12 +98,15 @@ public void shouldReadOptions() RisingwaveOptionsConfig options = jsonb.fromJson(text, RisingwaveOptionsConfig.class); assertThat(options, not(nullValue())); + assertEquals(options.udfs.get(0).server, "http://udf.zillabase.dev:8815"); + assertEquals(options.udfs.get(0).language, "java"); } @Test public void shouldWriteOptions() { - String expected = "{\"kafka\":{\"properties\":{\"bootstrap.server\":\"localhost:9092\"},\"format\":\"test\"}}"; + String expected = "{\"kafka\":{\"properties\":{\"bootstrap.server\":\"localhost:9092\"},\"format\":\"test\"}," + + "\"udf\":[{\"server\":\"http://udf-python.zillabase.dev:8815\",\"language\":\"python\"}]}"; RisingwaveKafkaPropertiesConfig properties = RisingwaveKafkaPropertiesConfig.builder() .inject(identity()) @@ -122,10 +125,16 @@ public void shouldWriteOptions() .format(model) .build(); + RisingwaveUdfConfig udf = RisingwaveUdfConfig.builder() + .server("http://udf-python.zillabase.dev:8815") + .language("python") + .build(); + RisingwaveOptionsConfig options = RisingwaveOptionsConfig.builder() .inject(identity()) .kafka(kafka) + .udf(udf) .build(); String text = jsonb.toJson(options); From a5f7bfd92b0f886873831b394781f392b4fe345a Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Wed, 18 Sep 2024 20:13:43 +0530 Subject: [PATCH 5/5] `"const": "java"` for `language` --- .../binding/risingwave/schema/risingwave.schema.patch.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json index b9fada8a4b..1a58ec38b4 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/schema/risingwave.schema.patch.json @@ -79,7 +79,8 @@ { "title": "Language", "type": "string", - "default": "java" + "default": "java", + "const": "java" } }, "additionalProperties": false