Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Include generated
  • Loading branch information
akrambek committed Jan 8, 2025
commit e08bb0b8f946e0edb7789e2e154c53206dea1988
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ write zilla:data.ext ${pgsql:dataEx()
write "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
" SELECT\n"
" c.correlation_id,\n"
" c.owner_id,\n"
" c.created_at,\n"
" CASE\n"
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
" ELSE 'PaymentDeclined'\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ read zilla:data.ext ${pgsql:dataEx()
read "CREATE MATERIALIZED VIEW zb_catalog.send_payment_handler AS\n"
" SELECT\n"
" c.correlation_id,\n"
" c.owner_id,\n"
" c.created_at,\n"
" CASE\n"
" WHEN balance >= c.amount THEN 'PaymentSent'\n"
" ELSE 'PaymentDeclined'\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ private final class CreateHandlerMaterializedViewState implements RisingwaveMacr
CREATE MATERIALIZED VIEW %s.%s AS
SELECT
c.correlation_id,
%s,
%s
FROM %s.%s_commands c
%s
Expand Down Expand Up @@ -316,6 +317,11 @@ public void onStarted(
}
}

String include = command.columns().stream()
.filter(c -> ZILLA_MAPPINGS.containsKey(c.generatedAlways()))
.map(c -> "c.%s".formatted(c.name()))
.collect(Collectors.joining(",\n "));

String commandName = command.commandHandlers().entrySet().stream()
.filter(e -> e.getValue().equals(name))
.map(Map.Entry::getKey)
Expand All @@ -324,7 +330,8 @@ public void onStarted(

String where = "c.%s = '%s'".formatted(command.dispatchOn(), commandName);

String sqlQuery = String.format(sqlFormat, systemSchema, name, columns, systemSchema, from, join, where);
String sqlQuery = String.format(sqlFormat, systemSchema, name, include, columns,
systemSchema, from, join, where);

handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
}
Expand Down