Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Checkpoint
  • Loading branch information
akrambek committed Oct 3, 2024
commit 42ea5ad4d5dce025b55ac955b37d8f757ea0cebe
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ write zilla:data.ext ${pgsql:dataEx()
.build()}
write "CREATE SOURCE IF NOT EXISTS weather (*)\n"
"INCLUDE header 'zilla:correlation-id' AS correlation_id\n"
"INCLUDE header 'zilla:identity' AS identity\n"
"INCLUDE timestamp AS timestamp\n"
"INCLUDE header 'zilla:identity' AS owner_id\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ read zilla:data.ext ${pgsql:dataEx()
.build()}
read "CREATE SOURCE IF NOT EXISTS weather (*)\n"
"INCLUDE header 'zilla:correlation-id' AS correlation_id\n"
"INCLUDE header 'zilla:identity' AS identity\n"
"INCLUDE timestamp AS timestamp\n"
"INCLUDE header 'zilla:identity' AS owner_id\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ write zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
write "CREATE SOURCE IF NOT EXISTS cities_source (*)\n"
"INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header,\n"
"INCLUDE timestamp AS timestamp,\n"
"INCLUDE header 'zilla:identity' AS zilla_identity_header,\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
" topic='dev.cities',\n"
" scan.startup.mode='latest',\n"
" scan.startup.timestamp.millis='140000000'\n"
") FORMAT PLAIN ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
[0x00]
write "CREATE SOURCE IF NOT EXISTS cities_source (*)\n"
"INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header\n"
"INCLUDE header 'zilla:identity' AS zilla_identity_header\n"
"INCLUDE timestamp AS timestamp\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
" topic='dev.cities',\n"
" scan.startup.mode='latest',\n"
" scan.startup.timestamp.millis='140000000'\n"
") FORMAT PLAIN ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
[0x00]
write flush

read advised zilla:flush ${pgsql:flushEx()
Expand All @@ -67,11 +67,12 @@ write zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
write "CREATE MATERIALIZED VIEW cities_view AS\n"
" SELECT id, name, description,"
" COALESCE(correlation_id, zilla_correlation_id_header::varchar) as correlation_id,"
" COALESCE(identity, zilla_identity_header::varchar) as identity,
" timestamp FROM cities_source;"
write "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view AS"
" SELECT id, name, description,"
" COALESCE(correlation_id, zilla_correlation_id_header::varchar) as correlation_id,"
" COALESCE(identity, zilla_identity_header::varchar) as identity,"
" timestamp"
" FROM cities_source;"
[0x00]
write flush

Expand All @@ -94,9 +95,9 @@ write zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
write "CREATE TABLE IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" correlation_id VARCHAR, identity as VARCHAR, timestamp TIMESTAMP,"
write "CREATE TABLE IF NOT EXISTS cities"
" (id VARCHAR, name VARCHAR, description VARCHAR,"
" correlation_id VARCHAR, identity VARCHAR, timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]
write flush
Expand All @@ -121,7 +122,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
write "CREATE SINK cities_view_sink INTO cities FROM cities_view;"
[0x00]
[0x00]
write flush

read advised zilla:flush ${pgsql:flushEx()
Expand All @@ -146,13 +147,13 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE SINK cities_sink\n"
"FROM cities\n"
"WITH (\n"
" connector='kafka',\n"
" topic='dev.cities',\n"
" properties.bootstrap.server='localhost:9092',\n"
" primary_key='id'\n
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
" topic='dev.cities',\n"
" primary_key='id'\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
" schema.registry='http://localhost:8081'\n"
") KEY ENCODE TEXT;"
[0x00]
write flush

Expand Down Expand Up @@ -191,7 +192,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()}
write "CREATE TOPIC IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" correlation_id VARCHAR, identity as VARCHAR, timestamp TIMESTAMP,"
" correlation_id VARCHAR, identity VARCHAR, timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]
write flush
Expand All @@ -209,4 +210,3 @@ read advised zilla:flush ${pgsql:flushEx()
.status("IDLE")
.build()
.build()}

Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ read zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
read "CREATE SOURCE IF NOT EXISTS cities_source (*)\n"
"INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header,\n"
"INCLUDE timestamp AS timestamp,\n"
"INCLUDE header 'zilla:identity' AS zilla_identity_header,\n"
"INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header\n"
"INCLUDE header 'zilla:identity' AS zilla_identity_header\n"
"INCLUDE timestamp AS timestamp\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
Expand Down Expand Up @@ -70,11 +70,12 @@ read zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
read "CREATE MATERIALIZED VIEW cities_view AS\n"
" SELECT id, name, description,"
" COALESCE(correlation_id, zilla_correlation_id_header::varchar) as correlation_id,"
" COALESCE(identity, zilla_identity_header::varchar) as identity,
" timestamp FROM cities_source;"
read "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view AS"
" SELECT id, name, description,"
" COALESCE(correlation_id, zilla_correlation_id_header::varchar) as correlation_id,"
" COALESCE(identity, zilla_identity_header::varchar) as identity,"
" timestamp"
" FROM cities_source;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
Expand All @@ -96,9 +97,9 @@ read zilla:data.ext ${pgsql:dataEx()
.query()
.build()
.build()}
read "CREATE TABLE IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" correlation_id VARCHAR, identity as VARCHAR, timestamp TIMESTAMP,"
read "CREATE TABLE IF NOT EXISTS cities"
" (id VARCHAR, name VARCHAR, description VARCHAR,"
" correlation_id VARCHAR, identity VARCHAR, timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]

Expand Down Expand Up @@ -146,13 +147,13 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE SINK cities_sink\n"
"FROM cities\n"
"WITH (\n"
" connector='kafka',\n"
" topic='dev.cities',\n"
" properties.bootstrap.server='localhost:9092',\n"
" primary_key='id'\n
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
" topic='dev.cities',\n"
" primary_key='id'\n"
") FORMAT UPSERT ENCODE AVRO (\n"
" schema.registry = 'http://localhost:8081'\n"
");"
" schema.registry='http://localhost:8081'\n"
") KEY ENCODE TEXT;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
Expand Down Expand Up @@ -192,7 +193,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()}
read "CREATE TOPIC IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" correlation_id VARCHAR, identity as VARCHAR, timestamp TIMESTAMP,"
" correlation_id VARCHAR, identity VARCHAR, timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE STREAM IF NOT EXISTS weather "
"(city VARCHAR, temperature DOUBLE, date DATE)\n"
"INCLUDE zilla_correlation_id AS correlation_id\n"
"INCLUDE zilla_identity AS owner_id\n"
"INCLUDE zilla_identity AS identity\n"
"INCLUDE timestamp as timestamp;"
[0x00]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE STREAM IF NOT EXISTS weather "
"(city VARCHAR, temperature DOUBLE, date DATE)\n"
"INCLUDE zilla_correlation_id AS correlation_id\n"
"INCLUDE zilla_identity AS owner_id\n"
"INCLUDE zilla_identity AS identity\n"
"INCLUDE timestamp as timestamp;"
[0x00]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE TABLE IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id))\n"
"INCLUDE zilla_correlation_id AS correlation_id\n"
"INCLUDE zilla_identity AS owner_id\n"
"INCLUDE zilla_identity AS identity\n"
"INCLUDE timestamp as timestamp;"
[0x00]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE TABLE IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR, PRIMARY KEY (id))\n"
"INCLUDE zilla_correlation_id AS correlation_id\n"
"INCLUDE zilla_identity AS owner_id\n"
"INCLUDE zilla_identity AS identity\n"
"INCLUDE timestamp as timestamp;"
[0x00]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.aklivity.zilla.runtime.binding.risingwave.internal.statement;

import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -28,18 +29,26 @@
public abstract class RisingwaveCommandTemplate
{
private final CCJSqlParserManager parserManager = new CCJSqlParserManager();
private final Map<String, String> includeMap = new Object2ObjectHashMap<>();
private final Map<String, String> includeMap = new LinkedHashMap<>();

protected final StringBuilder fieldBuilder = new StringBuilder();
protected final StringBuilder includeBuilder = new StringBuilder();
protected static final Map<String, String> ZILLA_MAPPINGS = new Object2ObjectHashMap<>();

static
{
ZILLA_MAPPINGS.put("zilla_correlation_id", "INCLUDE header 'zilla:correlation-id' AS %s\n");
ZILLA_MAPPINGS.put("zilla_identity", "INCLUDE header 'zilla:identity' AS %s\n");
ZILLA_MAPPINGS.put("timestamp", "INCLUDE timestamp AS %s\n");
}

protected static final Map<String, String> ZILLA_INCLUDE_TYPE_MAPPINGS = new Object2ObjectHashMap<>();
static
{
ZILLA_INCLUDE_TYPE_MAPPINGS.put("zilla_correlation_id", "VARCHAR");
ZILLA_INCLUDE_TYPE_MAPPINGS.put("zilla_identity", "VARCHAR");
ZILLA_INCLUDE_TYPE_MAPPINGS.put("timestamp", "TIMESTAMP");
}

public String primaryKey(
CreateTable statement)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public class RisingwaveCreateMaterializedViewTemplate extends RisingwaveCommandT
{
private final String sqlFormat = """
CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS %s;\u0000""";
private final String fieldFormat = "%s, ";
private final String includeFormat = "COALESCE(%s, zilla_%s_header::varchar) as %s, ";

public RisingwaveCreateMaterializedViewTemplate()
{
Expand All @@ -41,6 +43,31 @@ public String generate(
CreateTable createTable = command.createTable;
String name = createTable.getTable().getName();

return String.format(sqlFormat, "%s_view".formatted(name), "SELECT * FROM %s_source".formatted(name));
String select = "*";

if (command.includes != null)
{
fieldBuilder.setLength(0);

createTable.getColumnDefinitions()
.forEach(c -> fieldBuilder.append(
String.format(fieldFormat, c.getColumnName())));
command.includes.forEach((k, v) ->
{
if ("timestamp".equals(k))
{
fieldBuilder.append(String.format(fieldFormat, v));
}
else
{
fieldBuilder.append(String.format(includeFormat, v, v, v));
}
});

fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length());
select = fieldBuilder.toString();
}

return String.format(sqlFormat, "%s_view".formatted(name), "SELECT %s FROM %s_source".formatted(select, name));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,52 @@ public RisingwaveCreateSourceTemplate(
this.scanStartupMil = scanStartupMil;
}

public String generate(
public String generateStreamSource(
String database,
RisingwaveCreateTableCommand command)
{
return generate(database, "", command);
String table = command.createTable.getTable().getName();

includeBuilder.setLength(0);
final Map<String, String> includes = command.includes;
if (includes != null && !includes.isEmpty())
{
includeBuilder.append("\n");
includes.forEach((k, v) -> includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), v)));
includeBuilder.delete(includeBuilder.length() - 1, includeBuilder.length());
}

return String.format(sqlFormat, table, includeBuilder, bootstrapServer, database, table, scanStartupMil, schemaRegistry);
}

public String generate(
public String generateTableSource(
String database,
String prefix,
RisingwaveCreateTableCommand command)
{
String table = command.createTable.getTable().getName();
String sourceName = "%s%s".formatted(table, prefix);
String sourceName = "%s_source".formatted(table);

includeBuilder.setLength(0);
final Map<String, String> includes = command.includes;
if (includes != null && !includes.isEmpty())
{
includeBuilder.append("\n");
includes.forEach((k, v) -> includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), v)));
includes.forEach((k, v) ->
{
if ("timestamp".equals(k))
{
includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), v));
}
else
{
includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), "zilla_%s_header".formatted(v)));
}

});
includeBuilder.delete(includeBuilder.length() - 1, includeBuilder.length());
}

return String.format(sqlFormat, sourceName, includeBuilder, bootstrapServer, database, table, scanStartupMil, schemaRegistry);
return String.format(sqlFormat, sourceName, includeBuilder, bootstrapServer,
database, table, scanStartupMil, schemaRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public class RisingwaveCreateTableTemplate extends RisingwaveCommandTemplate
private final String primaryKeyFormat = ", PRIMARY KEY (%s)";
private final String fieldFormat = "%s %s, ";

private final StringBuilder fieldBuilder = new StringBuilder();

public RisingwaveCreateTableTemplate()
{
}
Expand All @@ -42,6 +40,13 @@ public String generate(
createTable.getColumnDefinitions()
.forEach(c -> fieldBuilder.append(
String.format(fieldFormat, c.getColumnName(), c.getColDataType().getDataType())));

if (command.includes != null)
{
command.includes.forEach((k, v) -> fieldBuilder.append(
String.format(fieldFormat, v, ZILLA_INCLUDE_TYPE_MAPPINGS.get(k))));
}

fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length());

return String.format(sqlFormat, topic, fieldBuilder, primaryKey);
Expand Down
Loading