diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/client.rpt new file mode 100644 index 0000000000..10aa0d39ab --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/client.rpt @@ -0,0 +1,103 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SOURCE IF NOT EXISTS weather (*)\n" + "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" + "INCLUDE timestamp AS timestamp\n" + "INCLUDE header 'zilla:identity' AS owner_id\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.weather',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT PLAIN ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +connect "zilla://streams/app2" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TOPIC IF NOT EXISTS weather " + "(city VARCHAR, temperature DOUBLE, date DATE);" + [0x00] + +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TOPIC") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/server.rpt new file mode 100644 index 0000000000..ff8e4dec8c --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/server.rpt @@ -0,0 +1,109 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read await CREATE_TOPIC_COMPLETED + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SOURCE IF NOT EXISTS weather (*)\n" + "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" + "INCLUDE timestamp AS timestamp\n" + "INCLUDE header 'zilla:identity' AS owner_id\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.weather',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT PLAIN ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + + +accept "zilla://streams/app2" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TOPIC IF NOT EXISTS weather " + "(city VARCHAR, temperature DOUBLE, date DATE);" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TOPIC") + .build() + .build()} + +write notify CREATE_TOPIC_COMPLETED + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt new file mode 100644 index 0000000000..e702643dc8 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt @@ -0,0 +1,109 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TABLE IF NOT EXISTS cities (\n" + " *,\n" + " PRIMARY KEY (key)\n" + ")\n" + "INCLUDE KEY AS key\n" + "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" + "INCLUDE timestamp AS timestamp\n" + "INCLUDE header 'zilla:identity' AS owner_id\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] + +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + + +connect "zilla://streams/app2" + option zilla:window 8192 + option zilla:transmission "duplex" + + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TOPIC IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TOPIC") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt new file mode 100644 index 0000000000..00c261f9db --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt @@ -0,0 +1,113 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +accept "zilla://streams/app1" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read await CREATE_TOPIC_COMPLETED + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TABLE IF NOT EXISTS cities (\n" + " *,\n" + " PRIMARY KEY (key)\n" + ")\n" + "INCLUDE KEY AS key\n" + "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" + "INCLUDE timestamp AS timestamp\n" + "INCLUDE header 'zilla:identity' AS owner_id\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +accept "zilla://streams/app2" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TOPIC IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TOPIC") + .build() + .build()} + +write notify CREATE_TOPIC_COMPLETED + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt index fb70359d1e..be0c185f67 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt @@ -35,7 +35,8 @@ write zilla:data.ext ${pgsql:dataEx() write "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" " PRIMARY KEY (key)\n" - ") INCLUDE KEY AS key\n" + ")\n" + "INCLUDE KEY AS key\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt index 6479311838..7554c745e1 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt @@ -39,7 +39,8 @@ read zilla:data.ext ${pgsql:dataEx() read "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" " PRIMARY KEY (key)\n" - ") INCLUDE KEY AS key\n" + ")\n" + "INCLUDE KEY AS key\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt index ec71b51e3c..a2cb2b4ad2 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt @@ -35,7 +35,8 @@ write zilla:data.ext ${pgsql:dataEx() write "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" " PRIMARY KEY (key)\n" - ") INCLUDE KEY AS key\n" + ")\n" + "INCLUDE KEY AS key\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt index 69ae550915..6596416bdd 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt @@ -39,7 +39,8 @@ read zilla:data.ext ${pgsql:dataEx() read "CREATE TABLE IF NOT EXISTS cities (\n" " *,\n" " PRIMARY KEY (key)\n" - ") INCLUDE KEY AS key\n" + ")\n" + "INCLUDE KEY AS key\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/client.rpt new file mode 100644 index 0000000000..7529d60a4b --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/client.rpt @@ -0,0 +1,57 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TABLE IF NOT EXISTS weather " + "(city VARCHAR, temperature DOUBLE, date DATE)\n" + "INCLUDE zilla_correlation_id AS correlation_id\n" + "INCLUDE zilla_identity AS owner_id\n" + "INCLUDE timestamp as timestamp;" + [0x00] + +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/server.rpt new file mode 100644 index 0000000000..bf70fb8462 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/server.rpt @@ -0,0 +1,58 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TABLE IF NOT EXISTS weather " + "(city VARCHAR, temperature DOUBLE, date DATE)\n" + "INCLUDE zilla_correlation_id AS correlation_id\n" + "INCLUDE zilla_identity AS owner_id\n" + "INCLUDE timestamp as timestamp;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/client.rpt new file mode 100644 index 0000000000..d7aab69d57 --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/client.rpt @@ -0,0 +1,57 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TABLE IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id))\n" + "INCLUDE zilla_correlation_id AS correlation_id\n" + "INCLUDE zilla_identity AS owner_id\n" + "INCLUDE timestamp as timestamp;" + [0x00] + +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/server.rpt new file mode 100644 index 0000000000..1b3abad6af --- /dev/null +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/server.rpt @@ -0,0 +1,59 @@ +# +# Copyright 2021-2023 Aklivity Inc +# +# Licensed under the Aklivity Community License (the "License"); you may not use +# this file except in compliance with the License. You may obtain a copy of the +# License at +# +# https://www.aklivity.io/aklivity-community-license/ +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +property serverAddress "zilla://streams/app0" + +accept ${serverAddress} + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${pgsql:beginEx() + .typeId(zilla:id("pgsql")) + .parameter("user", "root") + .parameter("database", "dev") + .parameter("application_name", "psql") + .parameter("client_encoding", "UTF8") + .build()} + +connected + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TABLE IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id))\n" + "INCLUDE zilla_correlation_id AS correlation_id\n" + "INCLUDE zilla_identity AS owner_id\n" + "INCLUDE timestamp as timestamp;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java index 0e49252489..45ba219cb4 100644 --- a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java @@ -85,4 +85,22 @@ public void shouldCreateFunction() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/create.table.with.includes/client", + "${app}/create.table.with.includes/server" }) + public void shouldCreateTableWithIncludes() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/create.table.with.primary.key.and.includes/client", + "${app}/create.table.with.primary.key.and.includes/server" }) + public void shouldCreateTableWithPrimaryKeyAndIncludes() throws Exception + { + k3po.finish(); + } } diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java index 4da5b7f5f8..d25affd204 100644 --- a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java @@ -85,4 +85,22 @@ public void shouldCreateFunction() throws Exception { k3po.finish(); } + + @Test + @Specification({ + "${app}/create.table.with.includes/client", + "${app}/create.table.with.includes/server" }) + public void shouldCreateTableWithIncludes() throws Exception + { + k3po.finish(); + } + + @Test + @Specification({ + "${app}/create.table.with.primary.key.and.includes/client", + "${app}/create.table.with.primary.key.and.includes/server" }) + public void shouldCreateTableWithPrimaryKeyAndIncludes() throws Exception + { + k3po.finish(); + } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/CreateTableCommand.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/CreateTableCommand.java new file mode 100644 index 0000000000..2aec705ba1 --- /dev/null +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/CreateTableCommand.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021-2023 Aklivity Inc + * + * Licensed under the Aklivity Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * https://www.aklivity.io/aklivity-community-license/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; + +import java.util.Map; + +import net.sf.jsqlparser.statement.create.table.CreateTable; + +public final class CreateTableCommand +{ + public final CreateTable createTable; + public final Map includes; + + public CreateTableCommand( + CreateTable createTable, + Map includes) + { + this.createTable = createTable; + this.includes = includes; + } +} diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java index 94bb0bf90e..aad0491dbf 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java @@ -14,14 +14,33 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; +import java.io.StringReader; import java.util.List; +import java.util.Map; +import org.agrona.DirectBuffer; +import org.agrona.collections.Object2ObjectHashMap; + +import net.sf.jsqlparser.parser.CCJSqlParserManager; import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.statement.create.table.Index; public abstract class RisingwaveCommandTemplate { - public String getPrimaryKey( + private final CCJSqlParserManager parserManager = new CCJSqlParserManager(); + private final Map includeMap = new Object2ObjectHashMap<>(); + + protected final StringBuilder includeBuilder = new StringBuilder(); + protected static final Map ZILLA_MAPPINGS = new Object2ObjectHashMap<>(); + + static + { + ZILLA_MAPPINGS.put("zilla_correlation_id", "INCLUDE header 'zilla:correlation-id' AS %s\n"); + ZILLA_MAPPINGS.put("zilla_identity", "INCLUDE header 'zilla:identity' AS %s\n"); + ZILLA_MAPPINGS.put("timestamp", "INCLUDE timestamp AS %s\n"); + } + + public String primaryKey( CreateTable statement) { String primaryKey = null; @@ -44,4 +63,79 @@ public String getPrimaryKey( return primaryKey; } + + public CreateTableCommand parserCreateTable( + DirectBuffer buffer, + int offset, + int length) + { + String query = buffer.getStringWithoutLengthUtf8(offset, length); + + int includeIndex = query.indexOf("INCLUDE"); + + String createTablePart; + String includePart = null; + + CreateTable createTable = null; + Map includes = null; + + if (includeIndex != -1) + { + createTablePart = query.substring(0, includeIndex).trim(); + includePart = query.substring(includeIndex).trim(); + } + else + { + createTablePart = query.trim(); + } + + try + { + createTable = (CreateTable) parserManager.parse(new StringReader(createTablePart)); + } + catch (Exception ignore) + { + } + + if (includePart != null) + { + includes = parseSpecificIncludes(includePart); + } + + return new CreateTableCommand(createTable, includes); + } + + private Map parseSpecificIncludes( + String includePart) + { + String[] includeClauses = includePart.toLowerCase().split("include"); + for (String clause : includeClauses) + { + clause = clause.trim(); + if (!clause.isEmpty()) + { + String[] parts = clause.toLowerCase().split("as"); + if (parts.length == 2) + { + String key = parts[0].trim(); + String value = parts[1].trim().replace(";", ""); + + if (isValidInclude(key)) + { + includeMap.put(key, value); + } + } + } + } + + return includeMap; + } + + private static boolean isValidInclude( + String key) + { + return "zilla_correlation_id".equals(key) || + "zilla_identity".equals(key) || + "timestamp".equals(key); + } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java index 26cb3e096b..b8e96ccf01 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java @@ -14,13 +14,12 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; -import net.sf.jsqlparser.statement.Statement; -import net.sf.jsqlparser.statement.create.table.CreateTable; +import java.util.Map; public class RisingwaveCreateSourceTemplate extends RisingwaveCommandTemplate { private final String sqlFormat = """ - CREATE SOURCE IF NOT EXISTS %s (*) + CREATE SOURCE IF NOT EXISTS %s (*)%s WITH ( connector='kafka', properties.bootstrap.server='%s', @@ -47,11 +46,19 @@ public RisingwaveCreateSourceTemplate( public String generate( String database, - Statement statement) + CreateTableCommand command) { - CreateTable createTable = (CreateTable) statement; - String table = createTable.getTable().getName(); + String table = command.createTable.getTable().getName(); - return String.format(sqlFormat, table, bootstrapServer, database, table, scanStartupMil, schemaRegistry); + includeBuilder.setLength(0); + final Map includes = command.includes; + if (includes != null && !includes.isEmpty()) + { + includeBuilder.append("\n"); + includes.forEach((k, v) -> includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), v))); + includeBuilder.delete(includeBuilder.length() - 1, includeBuilder.length()); + } + + return String.format(sqlFormat, table, includeBuilder, bootstrapServer, database, table, scanStartupMil, schemaRegistry); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java index 8b264a3576..7e069f3b6f 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java @@ -14,7 +14,8 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; -import net.sf.jsqlparser.statement.Statement; +import java.util.Map; + import net.sf.jsqlparser.statement.create.table.CreateTable; public class RisingwaveCreateTableTemplate extends RisingwaveCommandTemplate @@ -23,7 +24,8 @@ public class RisingwaveCreateTableTemplate extends RisingwaveCommandTemplate CREATE TABLE IF NOT EXISTS %s ( *, PRIMARY KEY (key) - ) INCLUDE KEY AS key + ) + INCLUDE KEY AS key%s WITH ( connector='kafka', properties.bootstrap.server='%s', @@ -50,12 +52,21 @@ public RisingwaveCreateTableTemplate( public String generate( String database, - Statement statement) + CreateTableCommand command) { - CreateTable createTable = (CreateTable) statement; + CreateTable createTable = command.createTable; String table = createTable.getTable().getName(); - return String.format(sqlFormat, table, bootstrapServer, database, + includeBuilder.setLength(0); + final Map includes = command.includes; + if (includes != null && !includes.isEmpty()) + { + includeBuilder.append("\n"); + includes.forEach((k, v) -> includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), v))); + includeBuilder.delete(includeBuilder.length() - 1, includeBuilder.length()); + } + + return String.format(sqlFormat, table, includeBuilder, bootstrapServer, database, table, scanStartupMil, schemaRegistry); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java index 0e716228ad..9e2c627c56 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java @@ -16,7 +16,6 @@ import java.util.Map; -import net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.statement.create.view.CreateView; @@ -35,11 +34,10 @@ public RisingwaveCreateTopicTemplate() } public String generate( - Statement statement) + CreateTable createTable) { - CreateTable createTable = (CreateTable) statement; String topic = createTable.getTable().getName(); - String primaryKeyField = getPrimaryKey(createTable); + String primaryKeyField = primaryKey(createTable); String primaryKey = primaryKeyField != null ? String.format(primaryKeyFormat, primaryKeyField) : ""; fieldBuilder.setLength(0); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index 1455a884c3..0aa05d35ac 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -42,6 +42,7 @@ import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveBindingConfig; import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveCommandType; import io.aklivity.zilla.runtime.binding.risingwave.internal.config.RisingwaveRouteConfig; +import io.aklivity.zilla.runtime.binding.risingwave.internal.statement.CreateTableCommand; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.Flyweight; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.OctetsFW; import io.aklivity.zilla.runtime.binding.risingwave.internal.types.String32FW; @@ -66,7 +67,6 @@ import net.sf.jsqlparser.parser.CCJSqlParserManager; import net.sf.jsqlparser.statement.Statement; import net.sf.jsqlparser.statement.create.function.CreateFunction; -import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.statement.create.view.CreateView; public final class RisingwaveProxyFactory implements RisingwaveStreamFactory @@ -1475,23 +1475,23 @@ private void decodeCreateTableCommand( else { final RisingwaveBindingConfig binding = server.binding; - final CreateTable statement = (CreateTable) parseStatement(buffer, offset, length); - final String primaryKey = binding.createTable.getPrimaryKey(statement); + final CreateTableCommand command = binding.createTable.parserCreateTable(buffer, offset, length); + final String primaryKey = binding.createTable.primaryKey(command.createTable); String newStatement = ""; int progress = 0; if (server.commandsProcessed == 0) { - newStatement = binding.createTopic.generate(statement); + newStatement = binding.createTopic.generate(command.createTable); } else if (server.commandsProcessed == 1 && primaryKey != null) { - newStatement = binding.createTable.generate(server.database, statement); + newStatement = binding.createTable.generate(server.database, command); } else if (server.commandsProcessed == 1) { - newStatement = binding.createSource.generate(server.database, statement); + newStatement = binding.createSource.generate(server.database, command); } statementBuffer.putBytes(progress, newStatement.getBytes()); diff --git a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java index 615c8555de..a94346526c 100644 --- a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java +++ b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java @@ -100,4 +100,24 @@ public void shouldCreateFunction() throws Exception { k3po.finish(); } + + @Test + @Configuration("proxy.yaml") + @Specification({ + "${pgsql}/create.table.with.includes/client", + "${effective}/create.table.with.includes/server" }) + public void shouldCreateTableWithIncludes() throws Exception + { + k3po.finish(); + } + + @Test + @Configuration("proxy.yaml") + @Specification({ + "${pgsql}/create.table.with.primary.key.and.includes/client", + "${effective}/create.table.with.primary.key.and.includes/server" }) + public void shouldCreateTableWithPrimaryKeyAndIncludes() throws Exception + { + k3po.finish(); + } }