Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,30 @@ read advised zilla:flush ${pgsql:flushEx()
.build()
.build()}

write zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
write "FLUSH;"
[0x00]

write flush

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("FLUSH")
.build()
.build()}

read advised zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}

connect "zilla://streams/app1"
option zilla:window 8192
option zilla:transmission "duplex"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,29 @@ write advise zilla:flush ${pgsql:flushEx()
.build()
.build()}

read zilla:data.ext ${pgsql:dataEx()
.typeId(zilla:id("pgsql"))
.query()
.build()
.build()}
read "FLUSH;"
[0x00]

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.completion()
.tag("FLUSH")
.build()
.build()}

write advise zilla:flush ${pgsql:flushEx()
.typeId(zilla:id("pgsql"))
.ready()
.status("IDLE")
.build()
.build()}


accepted


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.aklivity.zilla.runtime.binding.risingwave.internal.stream.RisingwaveCompletionCommand;
import io.aklivity.zilla.runtime.binding.risingwave.internal.types.stream.PgsqlFlushExFW;

public class RisingwaveCreateZfunctionMacro
public class RisingwaveCreateZfunctionMacro extends RisingwaveMacroBase
{
private static final String ZFUNCTION_NAME = "zfunctions";

Expand All @@ -35,6 +35,8 @@ public RisingwaveCreateZfunctionMacro(
CreateZfunction command,
RisingwaveMacroHandler handler)
{
super(sql, handler);

this.systemSchema = systemSchema;
this.user = user;
this.sql = sql;
Expand Down Expand Up @@ -67,6 +69,43 @@ public void onStarted(
handler.doExecuteSystemClient(traceId, authorization, sqlQuery);
}

@Override
public RisingwaveMacroState onReady(
long traceId,
long authorization,
PgsqlFlushExFW flushEx)
{
FlushState state = new FlushState();
state.onStarted(traceId, authorization);

return state;
}

@Override
public RisingwaveMacroState onError(
long traceId,
long authorization,
PgsqlFlushExFW flushEx)
{
handler.doFlushProxy(traceId, authorization, flushEx);

return errorState();
}
}

private final class FlushState implements RisingwaveMacroState
{
private final String sqlFormat = """
FLUSH;\u0000""";

@Override
public void onStarted(
long traceId,
long authorization)
{
handler.doExecuteSystemClient(traceId, authorization, sqlFormat);
}

@Override
public RisingwaveMacroState onCompletion(
long traceId,
Expand Down