Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix remaining issues
  • Loading branch information
akrambek committed Oct 18, 2024
commit 8a89a8122e00d86dcbd2e850bda3b2bc16c65a1d
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void enterFunc_arg(
PostgreSqlParser.Func_argContext ctx)
{
String argName = ctx.param_name() != null ? ctx.param_name().getText() : null;
String argType = ctx.func_type().getText();
String argType = tokens.getText(ctx.func_type());
arguments.add(new FunctionArgument(argName, argType));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE SOURCE IF NOT EXISTS weather (*)\n"
"INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id\n"
"INCLUDE header 'zilla:identity' AS zilla_identity\n"
"INCLUDE timestamp AS timestamp\n"
"INCLUDE timestamp AS zilla_timestamp\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
Expand Down Expand Up @@ -82,8 +82,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
write "CREATE TOPIC IF NOT EXISTS weather "
"(city VARCHAR, temperature DOUBLE, date DATE,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP);"
"(city VARCHAR, temperature DOUBLE, date DATE);"
[0x00]

write flush
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE SOURCE IF NOT EXISTS weather (*)\n"
"INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id\n"
"INCLUDE header 'zilla:identity' AS zilla_identity\n"
"INCLUDE timestamp AS timestamp\n"
"INCLUDE timestamp AS zilla_timestamp\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
Expand Down Expand Up @@ -88,8 +88,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()
.build()}
read "CREATE TOPIC IF NOT EXISTS weather "
"(city VARCHAR, temperature DOUBLE, date DATE,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP);"
"(city VARCHAR, temperature DOUBLE, date DATE);"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE SOURCE IF NOT EXISTS cities_source (*)\n"
"INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header\n"
"INCLUDE header 'zilla:identity' AS zilla_identity_header\n"
"INCLUDE timestamp AS timestamp\n"
"INCLUDE timestamp AS zilla_timestamp_timestamp\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
Expand Down Expand Up @@ -70,7 +70,8 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view"
" AS SELECT id, name, description,"
" COALESCE(zilla_correlation_id, zilla_correlation_id_header::varchar) as zilla_correlation_id,"
" COALESCE(zilla_identity, zilla_identity_header::varchar) as zilla_identity, timestamp"
" COALESCE(zilla_identity, zilla_identity_header::varchar) as zilla_identity,"
" COALESCE(zilla_timestamp, zilla_timestamp_timestamp::varchar) as zilla_timestamp"
" FROM cities_source;"
[0x00]
write flush
Expand All @@ -96,7 +97,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()}
write "CREATE TABLE IF NOT EXISTS cities"
" (id VARCHAR, name VARCHAR, description VARCHAR,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]
write flush
Expand Down Expand Up @@ -191,7 +192,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()}
write "CREATE TOPIC IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]
write flush
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE SOURCE IF NOT EXISTS cities_source (*)\n"
"INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header\n"
"INCLUDE header 'zilla:identity' AS zilla_identity_header\n"
"INCLUDE timestamp AS timestamp\n"
"INCLUDE timestamp AS zilla_timestamp_timestamp\n"
"WITH (\n"
" connector='kafka',\n"
" properties.bootstrap.server='localhost:9092',\n"
Expand Down Expand Up @@ -73,7 +73,8 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE MATERIALIZED VIEW IF NOT EXISTS cities_view"
" AS SELECT id, name, description,"
" COALESCE(zilla_correlation_id, zilla_correlation_id_header::varchar) as zilla_correlation_id,"
" COALESCE(zilla_identity, zilla_identity_header::varchar) as zilla_identity, timestamp"
" COALESCE(zilla_identity, zilla_identity_header::varchar) as zilla_identity,"
" COALESCE(zilla_timestamp, zilla_timestamp_timestamp::varchar) as zilla_timestamp"
" FROM cities_source;"
[0x00]

Expand All @@ -98,7 +99,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()}
read "CREATE TABLE IF NOT EXISTS cities"
" (id VARCHAR, name VARCHAR, description VARCHAR,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]

Expand Down Expand Up @@ -192,7 +193,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()}
read "CREATE TOPIC IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()}
write "CREATE STREAM IF NOT EXISTS weather "
"(city VARCHAR, temperature DOUBLE, date DATE,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP);"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP);"
[0x00]

write flush
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()}
read "CREATE STREAM IF NOT EXISTS weather "
"(city VARCHAR, temperature DOUBLE, date DATE,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP);"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP);"
[0x00]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ write zilla:data.ext ${pgsql:dataEx()
.build()}
write "CREATE TABLE IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ read zilla:data.ext ${pgsql:dataEx()
.build()}
read "CREATE TABLE IF NOT EXISTS cities "
"(id VARCHAR, name VARCHAR, description VARCHAR,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, timestamp TIMESTAMP,"
" zilla_correlation_id VARCHAR, zilla_identity VARCHAR, zilla_timestamp TIMESTAMP,"
" PRIMARY KEY (id));"
[0x00]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,25 @@

public abstract class RisingwaveCommandTemplate
{
protected static final String ZILLA_CORRELATION_ID = "zilla_correlation_id";
protected static final String ZILLA_IDENTITY = "zilla_identity";
protected static final String ZILLA_TIMESTAMP = "zilla_timestamp";

protected final StringBuilder fieldBuilder = new StringBuilder();
protected final StringBuilder includeBuilder = new StringBuilder();
protected static final Map<String, String> ZILLA_MAPPINGS = new Object2ObjectHashMap<>();
static
{
ZILLA_MAPPINGS.put("zilla_correlation_id", "INCLUDE header 'zilla:correlation-id' AS %s\n");
ZILLA_MAPPINGS.put("zilla_identity", "INCLUDE header 'zilla:identity' AS %s\n");
ZILLA_MAPPINGS.put("timestamp", "INCLUDE timestamp AS %s\n");
ZILLA_MAPPINGS.put(ZILLA_CORRELATION_ID, "INCLUDE header 'zilla:correlation-id' AS %s\n");
ZILLA_MAPPINGS.put(ZILLA_IDENTITY, "INCLUDE header 'zilla:identity' AS %s\n");
ZILLA_MAPPINGS.put(ZILLA_TIMESTAMP, "INCLUDE timestamp AS %s\n");
}

protected static final Map<String, String> ZILLA_INCLUDE_TYPE_MAPPINGS = new Object2ObjectHashMap<>();
static
{
ZILLA_INCLUDE_TYPE_MAPPINGS.put("zilla_correlation_id", "VARCHAR");
ZILLA_INCLUDE_TYPE_MAPPINGS.put("zilla_identity", "VARCHAR");
ZILLA_INCLUDE_TYPE_MAPPINGS.put("timestamp", "TIMESTAMP");
ZILLA_INCLUDE_TYPE_MAPPINGS.put(ZILLA_CORRELATION_ID, "VARCHAR");
ZILLA_INCLUDE_TYPE_MAPPINGS.put(ZILLA_IDENTITY, "VARCHAR");
ZILLA_INCLUDE_TYPE_MAPPINGS.put(ZILLA_TIMESTAMP, "TIMESTAMP");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class RisingwaveCreateMaterializedViewTemplate extends RisingwaveCommandT
CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS %s;\u0000""";
private final String fieldFormat = "%s, ";
private final String includeFormat = "COALESCE(%s, %s_header::varchar) as %s, ";
private final String timestampFormat = "COALESCE(%s, %s_timestamp::varchar) as %s, ";

public RisingwaveCreateMaterializedViewTemplate()
{
Expand Down Expand Up @@ -63,9 +64,9 @@ public String generate(

includes.forEach((k, v) ->
{
if ("timestamp".equals(k))
if (ZILLA_TIMESTAMP.equals(k))
{
fieldBuilder.append(String.format(fieldFormat, k));
fieldBuilder.append(String.format(timestampFormat, k, k, k));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public String generateTableSource(
includeBuilder.append("\n");
includes.forEach((k, v) ->
{
if ("timestamp".equals(k))
if (ZILLA_TIMESTAMP.equals(k))
{
includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), k));
includeBuilder.append(String.format(ZILLA_MAPPINGS.get(k), "%s_timestamp".formatted(k)));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ public String generate(
fieldBuilder.setLength(0);

streamInfo.columns()
.forEach((k, v) -> fieldBuilder.append(
String.format(fieldFormat, k, v)));
.entrySet()
.stream()
.filter(e -> !ZILLA_MAPPINGS.containsKey(e.getKey()))
.forEach(e -> fieldBuilder.append(
String.format(fieldFormat, e.getKey(), e.getValue())));

fieldBuilder.delete(fieldBuilder.length() - 2, fieldBuilder.length());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ public void shouldGenerateMaterializedViewWithIncludes()
columns.put("id", "INT");
columns.put("zilla_correlation_id", "VARCHAR");
columns.put("zilla_identity", "VARCHAR");
columns.put("timestamp", "TIMESTAMP");
columns.put("zilla_timestamp", "TIMESTAMP");

TableInfo tableInfo = new TableInfo("test_table", columns, Set.of("id"));
String expectedSQL = "CREATE MATERIALIZED VIEW IF NOT EXISTS test_table_view AS SELECT id," +
" COALESCE(zilla_correlation_id, zilla_correlation_id_header::varchar) as zilla_correlation_id," +
" COALESCE(zilla_identity, zilla_identity_header::varchar) as zilla_identity, timestamp" +
" COALESCE(zilla_identity, zilla_identity_header::varchar) as zilla_identity," +
" COALESCE(zilla_timestamp, zilla_timestamp_timestamp::varchar) as zilla_timestamp" +
" FROM test_table_source;\u0000";

String actualSQL = template.generate(tableInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ public void shouldGenerateTableSourceWithValidTableInfoAndIncludes()
columns.put("id", "INT");
columns.put("zilla_correlation_id", "VARCHAR");
columns.put("zilla_identity", "VARCHAR");
columns.put("timestamp", "TIMESTAMP");
columns.put("zilla_timestamp", "TIMESTAMP");

TableInfo tableInfo = new TableInfo(
"test_table", columns, Set.of("id"));
String expectedSQL = """
CREATE SOURCE IF NOT EXISTS test_table_source (*)
INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id_header
INCLUDE header 'zilla:identity' AS zilla_identity_header
INCLUDE timestamp AS timestamp
INCLUDE timestamp AS zilla_timestamp_timestamp
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
Expand Down Expand Up @@ -109,6 +109,36 @@ CREATE SOURCE IF NOT EXISTS empty_stream (*)
assertEquals(expectedSQL, actualSQL);
}

@Test
public void shouldGenerateStreamSourceWithEmptyColumnsReturnsSQLWithIncludes()
{
Map<String, String> columns = new LinkedHashMap<>();
columns.put("id", "INT");
columns.put("zilla_correlation_id", "VARCHAR");
columns.put("zilla_identity", "VARCHAR");
columns.put("zilla_timestamp", "TIMESTAMP");

String expectedSQL = """
CREATE SOURCE IF NOT EXISTS include_stream (*)
INCLUDE header 'zilla:correlation-id' AS zilla_correlation_id
INCLUDE header 'zilla:identity' AS zilla_identity
INCLUDE timestamp AS zilla_timestamp
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test_db.include_stream',
scan.startup.mode='latest',
scan.startup.timestamp.millis='1627846260000'
) FORMAT PLAIN ENCODE AVRO (
schema.registry = 'http://localhost:8081'
);\u0000""";
StreamInfo streamInfo = new StreamInfo("include_stream", columns);

String actualSQL = template.generateStreamSource("test_db", streamInfo);

assertEquals(expectedSQL, actualSQL);
}

@Test
public void shouldGenerateTableSourceWithEmptyColumnsAndWithoutIncludes()
{
Expand Down