diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream.with.includes/client.rpt similarity index 98% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/client.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream.with.includes/client.rpt index 10aa0d39ab..679a78508e 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream.with.includes/client.rpt @@ -34,8 +34,8 @@ write zilla:data.ext ${pgsql:dataEx() .build()} write "CREATE SOURCE IF NOT EXISTS weather (*)\n" "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" + "INCLUDE header 'zilla:identity' AS identity\n" "INCLUDE timestamp AS timestamp\n" - "INCLUDE header 'zilla:identity' AS owner_id\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream.with.includes/server.rpt similarity index 98% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/server.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream.with.includes/server.rpt index ff8e4dec8c..9871eea900 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.includes/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream.with.includes/server.rpt @@ -38,8 +38,8 @@ read zilla:data.ext ${pgsql:dataEx() .build()} read "CREATE SOURCE IF NOT EXISTS weather (*)\n" "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" + "INCLUDE header 'zilla:identity' AS identity\n" "INCLUDE timestamp AS timestamp\n" - "INCLUDE header 'zilla:identity' AS owner_id\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream/client.rpt similarity index 100% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/client.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream/client.rpt diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream/server.rpt similarity index 100% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table/server.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.stream/server.rpt diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.tables/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.streams/client.rpt similarity index 100% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.tables/client.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.streams/client.rpt diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.tables/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.streams/server.rpt similarity index 100% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.tables/server.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.streams/server.rpt diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt index f9451bc7bb..7c587a897b 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/client.rpt @@ -32,14 +32,10 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE TABLE IF NOT EXISTS cities (\n" - " *,\n" - " PRIMARY KEY (id)\n" - ")\n" - "INCLUDE KEY AS key\n" - "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" +write "CREATE SOURCE IF NOT EXISTS cities_source (*)\n" + "INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header\n" + "INCLUDE header 'zilla:identity' AS zilla_identity_header\n" "INCLUDE timestamp AS timestamp\n" - "INCLUDE header 'zilla:identity' AS owner_id\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" @@ -50,7 +46,60 @@ write "CREATE TABLE IF NOT EXISTS cities (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view AS" + " SELECT id, name, description," + " COALESCE(correlation_id, zilla_correlation_id_header::varchar) as correlation_id," + " COALESCE(identity, zilla_identity_header::varchar) as identity," + " timestamp" + " FROM cities_source;" + [0x00] +write flush +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_MATERIALIZED_VIEW") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TABLE IF NOT EXISTS cities" + " (id VARCHAR, name VARCHAR, description VARCHAR," + " correlation_id VARCHAR, identity VARCHAR, timestamp TIMESTAMP," + " PRIMARY KEY (id));" + [0x00] write flush read advised zilla:flush ${pgsql:flushEx() @@ -67,12 +116,65 @@ read advised zilla:flush ${pgsql:flushEx() .build() .build()} +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SINK cities_view_sink INTO cities FROM cities_view;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SINK cities_sink\n" + "FROM cities\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " primary_key='id'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry='http://localhost:8081'\n" + ") KEY ENCODE TEXT;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} connect "zilla://streams/app2" option zilla:window 8192 option zilla:transmission "duplex" - write zilla:begin.ext ${pgsql:beginEx() .typeId(zilla:id("pgsql")) .parameter("user", "root") @@ -89,7 +191,9 @@ write zilla:data.ext ${pgsql:dataEx() .build() .build()} write "CREATE TOPIC IF NOT EXISTS cities " - "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + "(id VARCHAR, name VARCHAR, description VARCHAR," + " correlation_id VARCHAR, identity VARCHAR, timestamp TIMESTAMP," + " PRIMARY KEY (id));" [0x00] write flush @@ -106,4 +210,3 @@ read advised zilla:flush ${pgsql:flushEx() .status("IDLE") .build() .build()} - diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt index 189026d000..4bc34b76a3 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key.and.includes/server.rpt @@ -36,29 +36,130 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE TABLE IF NOT EXISTS cities (\n" - " *,\n" - " PRIMARY KEY (id)\n" - ")\n" - "INCLUDE KEY AS key\n" - "INCLUDE header 'zilla:correlation-id' AS correlation_id\n" - "INCLUDE timestamp AS timestamp\n" - "INCLUDE header 'zilla:identity' AS owner_id\n" +read "CREATE SOURCE IF NOT EXISTS cities_source (*)\n" + "INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header\n" + "INCLUDE header 'zilla:identity' AS zilla_identity_header\n" + "INCLUDE timestamp AS timestamp\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT PLAIN ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view AS" + " SELECT id, name, description," + " COALESCE(correlation_id, zilla_correlation_id_header::varchar) as correlation_id," + " COALESCE(identity, zilla_identity_header::varchar) as identity," + " timestamp" + " FROM cities_source;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_MATERIALIZED_VIEW") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TABLE IF NOT EXISTS cities" + " (id VARCHAR, name VARCHAR, description VARCHAR," + " correlation_id VARCHAR, identity VARCHAR, timestamp TIMESTAMP," + " PRIMARY KEY (id));" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_TABLE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SINK cities_view_sink INTO cities FROM cities_view;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SINK cities_sink\n" + "FROM cities\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" " topic='dev.cities',\n" - " scan.startup.mode='latest',\n" - " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT PLAIN ENCODE AVRO (\n" - " schema.registry = 'http://localhost:8081'\n" - ");" + " primary_key='id'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry='http://localhost:8081'\n" + ") KEY ENCODE TEXT;" [0x00] write advise zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_SINK") .build() .build()} @@ -91,7 +192,9 @@ read zilla:data.ext ${pgsql:dataEx() .build() .build()} read "CREATE TOPIC IF NOT EXISTS cities " - "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + "(id VARCHAR, name VARCHAR, description VARCHAR," + " correlation_id VARCHAR, identity VARCHAR, timestamp TIMESTAMP," + " PRIMARY KEY (id));" [0x00] write advise zilla:flush ${pgsql:flushEx() @@ -109,5 +212,3 @@ write advise zilla:flush ${pgsql:flushEx() .status("IDLE") .build() .build()} - - diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt index be60e5fe79..b5829feab7 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/client.rpt @@ -32,11 +32,7 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE TABLE IF NOT EXISTS cities (\n" - " *,\n" - " PRIMARY KEY (id)\n" - ")\n" - "INCLUDE KEY AS key\n" +write "CREATE SOURCE IF NOT EXISTS cities_source (*)\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" @@ -47,7 +43,53 @@ write "CREATE TABLE IF NOT EXISTS cities (\n" " schema.registry = 'http://localhost:8081'\n" ");" [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view AS SELECT * FROM cities_source;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_MATERIALIZED_VIEW") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TABLE IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] write flush read advised zilla:flush ${pgsql:flushEx() @@ -64,12 +106,65 @@ read advised zilla:flush ${pgsql:flushEx() .build() .build()} +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SINK cities_view_sink INTO cities FROM cities_view;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SINK cities_sink\n" + "FROM cities\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " primary_key='id'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry='http://localhost:8081'\n" + ") KEY ENCODE TEXT;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} connect "zilla://streams/app2" option zilla:window 8192 option zilla:transmission "duplex" - write zilla:begin.ext ${pgsql:beginEx() .typeId(zilla:id("pgsql")) .parameter("user", "root") @@ -103,4 +198,3 @@ read advised zilla:flush ${pgsql:flushEx() .status("IDLE") .build() .build()} - diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt index 8d2121373d..11d987ffc1 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/create.table.with.primary.key/server.rpt @@ -36,20 +36,61 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE TABLE IF NOT EXISTS cities (\n" - " *,\n" - " PRIMARY KEY (id)\n" - ")\n" - "INCLUDE KEY AS key\n" - "WITH (\n" - " connector='kafka',\n" - " properties.bootstrap.server='localhost:9092',\n" - " topic='dev.cities',\n" - " scan.startup.mode='latest',\n" - " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT PLAIN ENCODE AVRO (\n" - " schema.registry = 'http://localhost:8081'\n" - ");" +read "CREATE SOURCE IF NOT EXISTS cities_source (*)\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT PLAIN ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view AS SELECT * FROM cities_source;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_MATERIALIZED_VIEW") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TABLE IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" [0x00] write advise zilla:flush ${pgsql:flushEx() @@ -66,6 +107,59 @@ write advise zilla:flush ${pgsql:flushEx() .build() .build()} +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SINK cities_view_sink INTO cities FROM cities_view;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SINK cities_sink\n" + "FROM cities\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " primary_key='id'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry='http://localhost:8081'\n" + ") KEY ENCODE TEXT;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + accept "zilla://streams/app2" option zilla:window 8192 option zilla:transmission "duplex" @@ -106,5 +200,3 @@ write advise zilla:flush ${pgsql:flushEx() .status("IDLE") .build() .build()} - - diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt index ab484b7434..98a4b465ce 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/client.rpt @@ -32,11 +32,7 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE TABLE IF NOT EXISTS cities (\n" - " *,\n" - " PRIMARY KEY (id)\n" - ")\n" - "INCLUDE KEY AS key\n" +write "CREATE SOURCE IF NOT EXISTS cities_source (*)\n" "WITH (\n" " connector='kafka',\n" " properties.bootstrap.server='localhost:9092',\n" @@ -49,6 +45,53 @@ write "CREATE TABLE IF NOT EXISTS cities (\n" [0x00] write flush +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view AS SELECT * FROM cities_source;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_MATERIALIZED_VIEW") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE TABLE IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" + [0x00] +write flush + read advised zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() @@ -63,6 +106,61 @@ read advised zilla:flush ${pgsql:flushEx() .build() .build()} +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SINK cities_view_sink INTO cities FROM cities_view;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +write zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +write "CREATE SINK cities_sink\n" + "FROM cities\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " primary_key='id'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry='http://localhost:8081'\n" + ") KEY ENCODE TEXT;" + [0x00] +write flush + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +read advised zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + write zilla:data.ext ${pgsql:dataEx() .typeId(zilla:id("pgsql")) .query() diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt index 39350568c4..b3378e42e4 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/effective/query.with.multiple.statements/server.rpt @@ -36,20 +36,61 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE TABLE IF NOT EXISTS cities (\n" - " *,\n" - " PRIMARY KEY (id)\n" - ")\n" - "INCLUDE KEY AS key\n" - "WITH (\n" - " connector='kafka',\n" - " properties.bootstrap.server='localhost:9092',\n" - " topic='dev.cities',\n" - " scan.startup.mode='latest',\n" - " scan.startup.timestamp.millis='140000000'\n" - ") FORMAT PLAIN ENCODE AVRO (\n" - " schema.registry = 'http://localhost:8081'\n" - ");" +read "CREATE SOURCE IF NOT EXISTS cities_source (*)\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " scan.startup.mode='latest',\n" + " scan.startup.timestamp.millis='140000000'\n" + ") FORMAT PLAIN ENCODE AVRO (\n" + " schema.registry = 'http://localhost:8081'\n" + ");" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SOURCE") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view AS SELECT * FROM cities_source;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_MATERIALIZED_VIEW") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE TABLE IF NOT EXISTS cities " + "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id));" [0x00] write advise zilla:flush ${pgsql:flushEx() @@ -66,6 +107,59 @@ write advise zilla:flush ${pgsql:flushEx() .build() .build()} +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SINK cities_view_sink INTO cities FROM cities_view;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + +read zilla:data.ext ${pgsql:dataEx() + .typeId(zilla:id("pgsql")) + .query() + .build() + .build()} +read "CREATE SINK cities_sink\n" + "FROM cities\n" + "WITH (\n" + " connector='kafka',\n" + " properties.bootstrap.server='localhost:9092',\n" + " topic='dev.cities',\n" + " primary_key='id'\n" + ") FORMAT UPSERT ENCODE AVRO (\n" + " schema.registry='http://localhost:8081'\n" + ") KEY ENCODE TEXT;" + [0x00] + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .completion() + .tag("CREATE_SINK") + .build() + .build()} + +write advise zilla:flush ${pgsql:flushEx() + .typeId(zilla:id("pgsql")) + .ready() + .status("IDLE") + .build() + .build()} + read zilla:data.ext ${pgsql:dataEx() .typeId(zilla:id("pgsql")) .query() diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream.with.includes/client.rpt similarity index 93% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/client.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream.with.includes/client.rpt index 7529d60a4b..dbb1c245b7 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream.with.includes/client.rpt @@ -32,10 +32,10 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE TABLE IF NOT EXISTS weather " +write "CREATE STREAM IF NOT EXISTS weather " "(city VARCHAR, temperature DOUBLE, date DATE)\n" "INCLUDE zilla_correlation_id AS correlation_id\n" - "INCLUDE zilla_identity AS owner_id\n" + "INCLUDE zilla_identity AS identity\n" "INCLUDE timestamp as timestamp;" [0x00] @@ -44,7 +44,7 @@ write flush read advised zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_STREAM") .build() .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream.with.includes/server.rpt similarity index 93% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/server.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream.with.includes/server.rpt index bf70fb8462..19833142cf 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.includes/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream.with.includes/server.rpt @@ -36,17 +36,17 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE TABLE IF NOT EXISTS weather " +read "CREATE STREAM IF NOT EXISTS weather " "(city VARCHAR, temperature DOUBLE, date DATE)\n" "INCLUDE zilla_correlation_id AS correlation_id\n" - "INCLUDE zilla_identity AS owner_id\n" + "INCLUDE zilla_identity AS identity\n" "INCLUDE timestamp as timestamp;" [0x00] write advise zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_STREAM") .build() .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream/client.rpt similarity index 94% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table/client.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream/client.rpt index f13127bff9..245b79577a 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream/client.rpt @@ -32,7 +32,7 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE TABLE IF NOT EXISTS weather " +write "CREATE STREAM IF NOT EXISTS weather " "(city VARCHAR, temperature DOUBLE, date DATE);" [0x00] @@ -41,7 +41,7 @@ write flush read advised zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_STREAM") .build() .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream/server.rpt similarity index 95% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table/server.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream/server.rpt index 2e2cc77ca9..45dc55a0b7 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.stream/server.rpt @@ -36,14 +36,14 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE TABLE IF NOT EXISTS weather " +read "CREATE STREAM IF NOT EXISTS weather " "(city VARCHAR, temperature DOUBLE, date DATE);" [0x00] write advise zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_STREAM") .build() .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.tables/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.streams/client.rpt similarity index 92% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.tables/client.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.streams/client.rpt index be9c1b188e..044b7660a5 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.tables/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.streams/client.rpt @@ -32,7 +32,7 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE TABLE IF NOT EXISTS weather_c " +write "CREATE STREAM IF NOT EXISTS weather_c " "(city VARCHAR, temperature DOUBLE, date DATE);" [0x00] @@ -41,7 +41,7 @@ write flush read advised zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_STREAM") .build() .build()} @@ -57,7 +57,7 @@ write zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -write "CREATE TABLE IF NOT EXISTS weather_f " +write "CREATE STREAM IF NOT EXISTS weather_f " "(city VARCHAR, temperature DOUBLE, date DATE);" [0x00] @@ -66,7 +66,7 @@ write flush read advised zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_STREAM") .build() .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.tables/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.streams/server.rpt similarity index 92% rename from incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.tables/server.rpt rename to incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.streams/server.rpt index 8ec0dc1570..f852dbdb84 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.tables/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.streams/server.rpt @@ -36,14 +36,14 @@ read zilla:data.ext ${pgsql:dataEx() .query() .build() .build()} -read "CREATE TABLE IF NOT EXISTS weather_c " +read "CREATE STREAM IF NOT EXISTS weather_c " "(city VARCHAR, temperature DOUBLE, date DATE);" [0x00] write advise zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_STREAM") .build() .build()} @@ -54,14 +54,14 @@ write advise zilla:flush ${pgsql:flushEx() .build() .build()} -read "CREATE TABLE IF NOT EXISTS weather_f " +read "CREATE STREAM IF NOT EXISTS weather_f " "(city VARCHAR, temperature DOUBLE, date DATE);" [0x00] write advise zilla:flush ${pgsql:flushEx() .typeId(zilla:id("pgsql")) .completion() - .tag("CREATE_TABLE") + .tag("CREATE_STREAM") .build() .build()} diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/client.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/client.rpt index d7aab69d57..f40e1041f9 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/client.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/client.rpt @@ -35,7 +35,7 @@ write zilla:data.ext ${pgsql:dataEx() write "CREATE TABLE IF NOT EXISTS cities " "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id))\n" "INCLUDE zilla_correlation_id AS correlation_id\n" - "INCLUDE zilla_identity AS owner_id\n" + "INCLUDE zilla_identity AS identity\n" "INCLUDE timestamp as timestamp;" [0x00] diff --git a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/server.rpt b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/server.rpt index 1b3abad6af..ed5fda502c 100644 --- a/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/server.rpt +++ b/incubator/binding-risingwave.spec/src/main/scripts/io/aklivity/zilla/specs/binding/risingwave/streams/pgsql/create.table.with.primary.key.and.includes/server.rpt @@ -39,7 +39,7 @@ read zilla:data.ext ${pgsql:dataEx() read "CREATE TABLE IF NOT EXISTS cities " "(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id))\n" "INCLUDE zilla_correlation_id AS correlation_id\n" - "INCLUDE zilla_identity AS owner_id\n" + "INCLUDE zilla_identity AS identity\n" "INCLUDE timestamp as timestamp;" [0x00] diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java index d1b671d511..b2a3a4dff1 100644 --- a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/EffectiveIT.java @@ -48,10 +48,10 @@ public void shouldCreateTableWithPrimaryKey() throws Exception @Test @Specification({ - "${app}/create.table/client", - "${app}/create.table/server" + "${app}/create.stream/client", + "${app}/create.stream/server" }) - public void shouldCreateTable() throws Exception + public void shouldCreateStream() throws Exception { k3po.finish(); } @@ -107,9 +107,9 @@ public void shouldCreateFunctionEmbedded() throws Exception @Test @Specification({ - "${app}/create.table.with.includes/client", - "${app}/create.table.with.includes/server" }) - public void shouldCreateTableWithIncludes() throws Exception + "${app}/create.stream.with.includes/client", + "${app}/create.stream.with.includes/server" }) + public void shouldCreateStreamWithIncludes() throws Exception { k3po.finish(); } @@ -134,9 +134,9 @@ public void shouldShowTablesWithNewline() throws Exception @Test @Specification({ - "${app}/create.tables/client", - "${app}/create.tables/server" }) - public void shouldCreateTables() throws Exception + "${app}/create.streams/client", + "${app}/create.streams/server" }) + public void shouldCreateStreams() throws Exception { k3po.finish(); } diff --git a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java index 7227533b33..ceb626e751 100644 --- a/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java +++ b/incubator/binding-risingwave.spec/src/test/java/io/aklivity/zilla/specs/binding/risingwave/streams/PgsqlIT.java @@ -48,10 +48,10 @@ public void shouldCreateTableWithPrimaryKey() throws Exception @Test @Specification({ - "${app}/create.table/client", - "${app}/create.table/server" + "${app}/create.stream/client", + "${app}/create.stream/server" }) - public void shouldCreateTable() throws Exception + public void shouldCreateStream() throws Exception { k3po.finish(); } @@ -107,9 +107,9 @@ public void shouldCreateFunctionEmbedded() throws Exception @Test @Specification({ - "${app}/create.table.with.includes/client", - "${app}/create.table.with.includes/server" }) - public void shouldCreateTableWithIncludes() throws Exception + "${app}/create.stream.with.includes/client", + "${app}/create.stream.with.includes/server" }) + public void shouldCreateStreamWithIncludes() throws Exception { k3po.finish(); } @@ -134,8 +134,8 @@ public void shouldShowTablesWithNewline() throws Exception @Test @Specification({ - "${app}/create.tables/client", - "${app}/create.tables/server" }) + "${app}/create.streams/client", + "${app}/create.streams/server" }) public void shouldCreateTables() throws Exception { k3po.finish(); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java index 602e124808..20431d5e40 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveBindingConfig.java @@ -81,8 +81,7 @@ public RisingwaveBindingConfig( udf = options.udfs.get(0); } - this.createTable = new RisingwaveCreateTableTemplate(bootstrapServer, - location, config.kafkaScanStartupTimestampMillis()); + this.createTable = new RisingwaveCreateTableTemplate(); this.createSource = new RisingwaveCreateSourceTemplate(bootstrapServer, location, config.kafkaScanStartupTimestampMillis()); this.createSink = new RisingwaveCreateSinkTemplate(bootstrapServer, location); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java index c0a0409270..1c16bea36e 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/config/RisingwaveCommandType.java @@ -20,6 +20,7 @@ public enum RisingwaveCommandType { CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()), CREATE_TABLE_COMMAND("CREATE TABLE".getBytes()), + CREATE_STREAM_COMMAND("CREATE STREAM".getBytes()), CREATE_MATERIALIZED_VIEW_COMMAND("CREATE MATERIALIZED VIEW".getBytes()), CREATE_FUNCTION_COMMAND("CREATE FUNCTION".getBytes()), UNKNOWN_COMMAND("UNKNOWN".getBytes()); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java index d97fb34c48..bb65d54a24 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCommandTemplate.java @@ -15,6 +15,7 @@ package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; import java.io.StringReader; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -28,11 +29,11 @@ public abstract class RisingwaveCommandTemplate { private final CCJSqlParserManager parserManager = new CCJSqlParserManager(); - private final Map includeMap = new Object2ObjectHashMap<>(); + private final Map includeMap = new LinkedHashMap<>(); + protected final StringBuilder fieldBuilder = new StringBuilder(); protected final StringBuilder includeBuilder = new StringBuilder(); protected static final Map ZILLA_MAPPINGS = new Object2ObjectHashMap<>(); - static { ZILLA_MAPPINGS.put("zilla_correlation_id", "INCLUDE header 'zilla:correlation-id' AS %s\n"); @@ -40,6 +41,14 @@ public abstract class RisingwaveCommandTemplate ZILLA_MAPPINGS.put("timestamp", "INCLUDE timestamp AS %s\n"); } + protected static final Map ZILLA_INCLUDE_TYPE_MAPPINGS = new Object2ObjectHashMap<>(); + static + { + ZILLA_INCLUDE_TYPE_MAPPINGS.put("zilla_correlation_id", "VARCHAR"); + ZILLA_INCLUDE_TYPE_MAPPINGS.put("zilla_identity", "VARCHAR"); + ZILLA_INCLUDE_TYPE_MAPPINGS.put("timestamp", "TIMESTAMP"); + } + public String primaryKey( CreateTable statement) { @@ -70,6 +79,7 @@ public RisingwaveCreateTableCommand parserCreateTable( int length) { String query = buffer.getStringWithoutLengthUtf8(offset, length); + query = query.replaceAll("(?i)\\bCREATE\\s+STREAM\\b", "CREATE TABLE"); int includeIndex = query.indexOf("INCLUDE"); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateMaterializedViewTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateMaterializedViewTemplate.java index 17c227df3a..4a57891eb3 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateMaterializedViewTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateMaterializedViewTemplate.java @@ -14,25 +14,60 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; -import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.statement.create.view.CreateView; public class RisingwaveCreateMaterializedViewTemplate extends RisingwaveCommandTemplate { private final String sqlFormat = """ CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS %s;\u0000"""; + private final String fieldFormat = "%s, "; + private final String includeFormat = "COALESCE(%s, zilla_%s_header::varchar) as %s, "; public RisingwaveCreateMaterializedViewTemplate() { } public String generate( - Statement statement) + CreateView createView) { - CreateView createView = (CreateView) statement; String view = createView.getView().getName(); String select = createView.getSelect().toString(); return String.format(sqlFormat, view, select); } + + public String generate( + RisingwaveCreateTableCommand command) + { + CreateTable createTable = command.createTable; + String name = createTable.getTable().getName(); + + String select = "*"; + + if (command.includes != null) + { + fieldBuilder.setLength(0); + + createTable.getColumnDefinitions() + .forEach(c -> fieldBuilder.append( + String.format(fieldFormat, c.getColumnName()))); + command.includes.forEach((k, v) -> + { + if ("timestamp".equals(k)) + { + fieldBuilder.append(String.format(fieldFormat, v)); + } + else + { + fieldBuilder.append(String.format(includeFormat, v, v, v)); + } + }); + + fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length()); + select = fieldBuilder.toString(); + } + + return String.format(sqlFormat, "%s_view".formatted(name), "SELECT %s FROM %s_source".formatted(select, name)); + } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java index d5133d2eaf..e30a55b5f3 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSinkTemplate.java @@ -17,12 +17,14 @@ import java.util.Map; import java.util.Optional; -import net.sf.jsqlparser.statement.Statement; +import net.sf.jsqlparser.statement.create.table.CreateTable; import net.sf.jsqlparser.statement.create.view.CreateView; public class RisingwaveCreateSinkTemplate extends RisingwaveCommandTemplate { private final String sqlFormat = """ + CREATE SINK %s_view_sink INTO %s FROM %s_view;\u0000"""; + private final String sqlKafkaFormat = """ CREATE SINK %s_sink FROM %s WITH ( @@ -49,9 +51,8 @@ public RisingwaveCreateSinkTemplate( public String generate( String database, Map columns, - Statement statement) + CreateView createView) { - CreateView createView = (CreateView) statement; String viewName = createView.getView().getName(); Optional> primaryKeyMatch = columns.entrySet().stream() @@ -68,6 +69,25 @@ public String generate( String textPrimaryKey = primaryKeyMatch.map(Map.Entry::getKey).orElse(null); String primaryKey = textPrimaryKey != null ? primaryKeyFormat.formatted(textPrimaryKey) : ""; - return String.format(sqlFormat, viewName, viewName, bootstrapServer, database, viewName, primaryKey, schemaRegistry); + return String.format(sqlKafkaFormat, viewName, viewName, bootstrapServer, database, viewName, primaryKey, schemaRegistry); + } + + public String generate( + String database, + String primaryKey, + CreateTable createTable) + { + String table = createTable.getTable().getName(); + + return String.format(sqlKafkaFormat, table, table, bootstrapServer, database, table, + primaryKeyFormat.formatted(primaryKey), schemaRegistry); + } + + public String generate( + CreateTable createTable) + { + String table = createTable.getTable().getName(); + + return String.format(sqlFormat, table, table, table); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java index da095da905..1e64e8456d 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateSourceTemplate.java @@ -44,7 +44,7 @@ public RisingwaveCreateSourceTemplate( this.scanStartupMil = scanStartupMil; } - public String generate( + public String generateStreamSource( String database, RisingwaveCreateTableCommand command) { @@ -61,4 +61,35 @@ public String generate( return String.format(sqlFormat, table, includeBuilder, bootstrapServer, database, table, scanStartupMil, schemaRegistry); } + + public String generateTableSource( + String database, + RisingwaveCreateTableCommand command) + { + String table = command.createTable.getTable().getName(); + String sourceName = "%s_source".formatted(table); + + includeBuilder.setLength(0); + final Map includes = command.includes; + if (includes != null && !includes.isEmpty()) + { + includeBuilder.append("\n"); + includes.forEach((k, v) -> + { + if ("timestamp".equals(k)) + { + includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), v)); + } + else + { + includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), "zilla_%s_header".formatted(v))); + } + + }); + includeBuilder.delete(includeBuilder.length() - 1, includeBuilder.length()); + } + + return String.format(sqlFormat, sourceName, includeBuilder, bootstrapServer, + database, table, scanStartupMil, schemaRegistry); + } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java index ac4d8507dc..9e8f98acf0 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTableTemplate.java @@ -14,61 +14,41 @@ */ package io.aklivity.zilla.runtime.binding.risingwave.internal.statement; -import java.util.Map; - import net.sf.jsqlparser.statement.create.table.CreateTable; public class RisingwaveCreateTableTemplate extends RisingwaveCommandTemplate { private final String sqlFormat = """ - CREATE TABLE IF NOT EXISTS %s ( - *, - PRIMARY KEY (%s) - ) - INCLUDE KEY AS key%s - WITH ( - connector='kafka', - properties.bootstrap.server='%s', - topic='%s.%s', - scan.startup.mode='latest', - scan.startup.timestamp.millis='%d' - ) FORMAT PLAIN ENCODE AVRO ( - schema.registry = '%s' - );\u0000"""; - - private final String bootstrapServer; - private final String schemaRegistry; - private final long scanStartupMil; + CREATE TABLE IF NOT EXISTS %s (%s%s);\u0000"""; + private final String primaryKeyFormat = ", PRIMARY KEY (%s)"; + private final String fieldFormat = "%s %s, "; - public RisingwaveCreateTableTemplate( - String bootstrapServer, - String schemaRegistry, - long scanStartupMil) + public RisingwaveCreateTableTemplate() { - this.bootstrapServer = bootstrapServer; - this.schemaRegistry = schemaRegistry; - this.scanStartupMil = scanStartupMil; } public String generate( - String database, RisingwaveCreateTableCommand command) { CreateTable createTable = command.createTable; - String table = createTable.getTable().getName(); + String topic = createTable.getTable().getName(); + String primaryKeyField = primaryKey(createTable); + String primaryKey = primaryKeyField != null ? String.format(primaryKeyFormat, primaryKeyField) : ""; - String primaryKey = primaryKey(createTable); + fieldBuilder.setLength(0); - includeBuilder.setLength(0); - final Map includes = command.includes; - if (includes != null && !includes.isEmpty()) + createTable.getColumnDefinitions() + .forEach(c -> fieldBuilder.append( + String.format(fieldFormat, c.getColumnName(), c.getColDataType().getDataType()))); + + if (command.includes != null) { - includeBuilder.append("\n"); - includes.forEach((k, v) -> includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), v))); - includeBuilder.delete(includeBuilder.length() - 1, includeBuilder.length()); + command.includes.forEach((k, v) -> fieldBuilder.append( + String.format(fieldFormat, v, ZILLA_INCLUDE_TYPE_MAPPINGS.get(k)))); } - return String.format(sqlFormat, table, primaryKey, includeBuilder, bootstrapServer, database, - table, scanStartupMil, schemaRegistry); + fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length()); + + return String.format(sqlFormat, topic, fieldBuilder, primaryKey); } } diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java index 9e2c627c56..f088a3111a 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveCreateTopicTemplate.java @@ -45,6 +45,32 @@ public String generate( createTable.getColumnDefinitions() .forEach(c -> fieldBuilder.append( String.format(fieldFormat, c.getColumnName(), c.getColDataType().getDataType()))); + + fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length()); + + return String.format(sqlFormat, topic, fieldBuilder, primaryKey); + } + + public String generate( + RisingwaveCreateTableCommand command) + { + CreateTable createTable = command.createTable; + String topic = createTable.getTable().getName(); + String primaryKeyField = primaryKey(createTable); + String primaryKey = primaryKeyField != null ? String.format(primaryKeyFormat, primaryKeyField) : ""; + + fieldBuilder.setLength(0); + + createTable.getColumnDefinitions() + .forEach(c -> fieldBuilder.append( + String.format(fieldFormat, c.getColumnName(), c.getColDataType().getDataType()))); + + if (command.includes != null) + { + command.includes.forEach((k, v) -> fieldBuilder.append( + String.format(fieldFormat, v, ZILLA_INCLUDE_TYPE_MAPPINGS.get(k)))); + } + fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length()); return String.format(sqlFormat, topic, fieldBuilder, primaryKey); diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveSqlCommandParser.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveSqlCommandParser.java index 619a25ddfb..5fe8d3a60b 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveSqlCommandParser.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/statement/RisingwaveSqlCommandParser.java @@ -27,7 +27,7 @@ public final class RisingwaveSqlCommandParser private static final String SQL_COMMAND_PATTERN = "(?i)\\b(CREATE FUNCTION)\\b.*?\\$\\$(.*?)\\$\\$\\s*;[\\x00\\n]*" + "|\\b(CREATE FUNCTION)\\b.*?RETURNS .*?AS.*?;[\\x00\\n]*" + - "|\\b(CREATE MATERIALIZED VIEW|CREATE SOURCE|CREATE SINK|CREATE INDEX" + + "|\\b(CREATE MATERIALIZED VIEW|CREATE SOURCE|CREATE SINK|CREATE INDEX|CREATE STREAM" + "|CREATE VIEW|SHOW TABLES|DESCRIBE|SHOW)\\b.*?;[\\x00\\n]*" + "|\\b(SELECT|INSERT|UPDATE|DELETE|ALTER|DROP|CREATE TABLE|CREATE SCHEMA|CREATE DATABASE)\\b.*?;[\\x00\\n]*"; diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java index b80098e3e8..5c3204e749 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveCompletionCommand.java @@ -19,6 +19,7 @@ public enum RisingwaveCompletionCommand UNKNOWN_COMMAND("UNKNOWN".getBytes()), CREATE_TABLE_COMMAND("CREATE_TABLE".getBytes()), CREATE_MATERIALIZED_VIEW_COMMAND("CREATE_MATERIALIZED_VIEW".getBytes()), + CREATE_STREAM_COMMAND("CREATE_STREAM".getBytes()), CREATE_FUNCTION_COMMAND("CREATE_FUNCTION".getBytes()); private final byte[] value; diff --git a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java index 418a848edd..a4e1a31aee 100644 --- a/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java +++ b/incubator/binding-risingwave/src/main/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/RisingwaveProxyFactory.java @@ -144,6 +144,7 @@ public final class RisingwaveProxyFactory implements RisingwaveStreamFactory Object2ObjectHashMap clientTransforms = new Object2ObjectHashMap<>(); clientTransforms.put(RisingwaveCommandType.CREATE_TABLE_COMMAND, this::decodeCreateTableCommand); + clientTransforms.put(RisingwaveCommandType.CREATE_STREAM_COMMAND, this::decodeCreateStreamCommand); clientTransforms.put(RisingwaveCommandType.CREATE_MATERIALIZED_VIEW_COMMAND, this::decodeCreateMaterializedViewCommand); clientTransforms.put(RisingwaveCommandType.CREATE_FUNCTION_COMMAND, this::decodeCreateFunctionCommand); clientTransforms.put(RisingwaveCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand); @@ -1466,7 +1467,7 @@ private void decodeCreateTableCommand( int offset, int length) { - if (server.commandsProcessed == 2) + if (server.commandsProcessed == 6) { server.onCommandCompleted(traceId, authorization, length, RisingwaveCompletionCommand.CREATE_TABLE_COMMAND); } @@ -1481,15 +1482,68 @@ private void decodeCreateTableCommand( if (server.commandsProcessed == 0) { - newStatement = binding.createTopic.generate(command.createTable); + newStatement = binding.createTopic.generate(command); + } + else if (server.commandsProcessed == 1) + { + newStatement = binding.createSource.generateTableSource(server.database, command); + } + else if (server.commandsProcessed == 2) + { + newStatement = binding.createView.generate(command); + } + else if (server.commandsProcessed == 3) + { + newStatement = binding.createTable.generate(command); + } + else if (server.commandsProcessed == 4) + { + newStatement = binding.createSink.generate(command.createTable); + } + else if (server.commandsProcessed == 5) + { + newStatement = binding.createSink.generate(server.database, primaryKey, command.createTable); } - else if (server.commandsProcessed == 1 && primaryKey != null) + + statementBuffer.putBytes(progress, newStatement.getBytes()); + progress += newStatement.length(); + + final RisingwaveRouteConfig route = + server.binding.resolve(authorization, statementBuffer, 0, progress); + + final PgsqlClient client = server.streamsByRouteIds.get(route.id); + client.doPgsqlQuery(traceId, authorization, statementBuffer, 0, progress); + client.typeCommand = ignoreFlushCommand; + } + } + + private void decodeCreateStreamCommand( + PgsqlServer server, + long traceId, + long authorization, + DirectBuffer buffer, + int offset, + int length) + { + if (server.commandsProcessed == 2) + { + server.onCommandCompleted(traceId, authorization, length, RisingwaveCompletionCommand.CREATE_STREAM_COMMAND); + } + else + { + final RisingwaveBindingConfig binding = server.binding; + final RisingwaveCreateTableCommand command = binding.createTable.parserCreateTable(buffer, offset, length); + + String newStatement = ""; + int progress = 0; + + if (server.commandsProcessed == 0) { - newStatement = binding.createTable.generate(server.database, command); + newStatement = binding.createTopic.generate(command.createTable); } else if (server.commandsProcessed == 1) { - newStatement = binding.createSource.generate(server.database, command); + newStatement = binding.createSource.generateStreamSource(server.database, command); } statementBuffer.putBytes(progress, newStatement.getBytes()); diff --git a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java index 316ab6d996..9eed51a8c0 100644 --- a/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java +++ b/incubator/binding-risingwave/src/test/java/io/aklivity/zilla/runtime/binding/risingwave/internal/stream/ProxyIT.java @@ -62,9 +62,9 @@ public void shouldCreateTableWithPrimaryKey() throws Exception @Test @Configuration("proxy.yaml") @Specification({ - "${pgsql}/create.table/client", - "${effective}/create.table/server" }) - public void shouldCreateTable() throws Exception + "${pgsql}/create.stream/client", + "${effective}/create.stream/server" }) + public void shouldCreateStream() throws Exception { k3po.finish(); } @@ -72,9 +72,9 @@ public void shouldCreateTable() throws Exception @Test @Configuration("proxy.yaml") @Specification({ - "${pgsql}/create.tables/client", - "${effective}/create.tables/server" }) - public void shouldCreateTables() throws Exception + "${pgsql}/create.streams/client", + "${effective}/create.streams/server" }) + public void shouldCreateStreams() throws Exception { k3po.finish(); } @@ -144,9 +144,9 @@ public void shouldCreateFunctionEmbedded() throws Exception @Test @Configuration("proxy.yaml") @Specification({ - "${pgsql}/create.table.with.includes/client", - "${effective}/create.table.with.includes/server" }) - public void shouldCreateTableWithIncludes() throws Exception + "${pgsql}/create.stream.with.includes/client", + "${effective}/create.stream.with.includes/server" }) + public void shouldCreateStreamWithIncludes() throws Exception { k3po.finish(); }