Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/cli/clisqlshell/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//pkg/sql/scanner",
"//pkg/sql/sqlfsm",
"//pkg/util/envutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_knz_go_libedit//:go-libedit",
],
Expand Down
8 changes: 8 additions & 0 deletions pkg/cli/clisqlshell/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

democlusterapi "github.com/cockroachdb/cockroach/pkg/cli/democluster/api"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

// Context represents the external configuration of the interactive
Expand Down Expand Up @@ -71,4 +72,11 @@ type internalContext struct {

// current database name, if known. This is maintained on a best-effort basis.
dbName string

// state about the current query.
mu struct {
syncutil.Mutex
cancelFn func()
doneCh chan struct{}
}
}
104 changes: 96 additions & 8 deletions pkg/cli/clisqlshell/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/url"
"os"
"os/exec"
"os/signal"
"path/filepath"
"regexp"
"sort"
Expand Down Expand Up @@ -1607,10 +1608,11 @@ func (c *cliState) doRunStatements(nextState cliStateEnum) cliStateEnum {
}

// Now run the statement/query.
c.exitErr = c.sqlExecCtx.RunQueryAndFormatResults(
context.Background(),
c.conn, c.iCtx.stdout, c.iCtx.stderr,
clisqlclient.MakeQuery(c.concatLines))
c.exitErr = c.runWithInterruptableCtx(func(ctx context.Context) error {
return c.sqlExecCtx.RunQueryAndFormatResults(ctx,
c.conn, c.iCtx.stdout, c.iCtx.stderr,
clisqlclient.MakeQuery(c.concatLines))
})
if c.exitErr != nil {
if !c.singleStatement {
clierror.OutputError(c.iCtx.stderr, c.exitErr, true /*showSeverity*/, false /*verbose*/)
Expand Down Expand Up @@ -1640,10 +1642,11 @@ func (c *cliState) doRunStatements(nextState cliStateEnum) cliStateEnum {
if strings.Contains(c.iCtx.autoTrace, "kv") {
traceType = "kv"
}
if err := c.sqlExecCtx.RunQueryAndFormatResults(
context.Background(),
c.conn, c.iCtx.stdout, c.iCtx.stderr,
clisqlclient.MakeQuery(fmt.Sprintf("SHOW %s TRACE FOR SESSION", traceType))); err != nil {
if err := c.runWithInterruptableCtx(func(ctx context.Context) error {
return c.sqlExecCtx.RunQueryAndFormatResults(ctx,
c.conn, c.iCtx.stdout, c.iCtx.stderr,
clisqlclient.MakeQuery(fmt.Sprintf("SHOW %s TRACE FOR SESSION", traceType)))
}); err != nil {
clierror.OutputError(c.iCtx.stderr, err, true /*showSeverity*/, false /*verbose*/)
if c.exitErr == nil {
// Both the query and SET tracing had encountered no error
Expand Down Expand Up @@ -1705,6 +1708,9 @@ func NewShell(

// RunInteractive implements the Shell interface.
func (c *cliState) RunInteractive(cmdIn, cmdOut, cmdErr *os.File) (exitErr error) {
finalFn := c.maybeHandleInterrupt()
defer finalFn()

return c.doRunShell(cliStart, cmdIn, cmdOut, cmdErr)
}

Expand Down Expand Up @@ -1986,3 +1992,85 @@ func (c *cliState) serverSideParse(sql string) (helpText string, err error) {
}
return "", nil
}

func (c *cliState) maybeHandleInterrupt() func() {
if !c.cliCtx.IsInteractive {
return func() {}
}
intCh := make(chan os.Signal, 1)
signal.Notify(intCh, os.Interrupt)
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case <-intCh:
c.iCtx.mu.Lock()
cancelFn, doneCh := c.iCtx.mu.cancelFn, c.iCtx.mu.doneCh
c.iCtx.mu.Unlock()
if cancelFn == nil {
// No query currently executing; nothing to do.
continue
}

fmt.Fprintf(c.iCtx.stderr, "\nattempting to cancel query...\n")
// Cancel the query's context, which should make the driver
// send a cancellation message.
cancelFn()

// Now wait for the shell to process the cancellation.
//
// If it takes too long (e.g. server has become unresponsive,
// or we're connected to a pre-22.1 server which does not
// support cancellation), fall back to the previous behavior
// which is to interrupt the shell altogether.
tooLongTimer := time.After(3 * time.Second)
wait:
for {
select {
case <-doneCh:
break wait
case <-tooLongTimer:
fmt.Fprintln(c.iCtx.stderr, "server does not respond to query cancellation; a second interrupt will stop the shell.")
signal.Reset(os.Interrupt)
}
}
// Re-arm the signal handler.
signal.Notify(intCh, os.Interrupt)

case <-ctx.Done():
// Shell is terminating.
return
}
}
}()
return cancel
}

func (c *cliState) runWithInterruptableCtx(fn func(ctx context.Context) error) error {
if !c.cliCtx.IsInteractive {
return fn(context.Background())
}
// The cancellation function can be used by the Ctrl+C handler
// to cancel this query.
ctx, cancel := context.WithCancel(context.Background())
// doneCh will be used on the return path to teach the Ctrl+C
// handler that the query has been cancelled successfully.
doneCh := make(chan struct{})
defer func() { close(doneCh) }()

// Inform the Ctrl+C handler that this query is executing.
c.iCtx.mu.Lock()
c.iCtx.mu.cancelFn = cancel
c.iCtx.mu.doneCh = doneCh
c.iCtx.mu.Unlock()
defer func() {
c.iCtx.mu.Lock()
c.iCtx.mu.cancelFn = nil
c.iCtx.mu.doneCh = nil
c.iCtx.mu.Unlock()
}()

// Now run the query.
err := fn(ctx)
return err
}
108 changes: 108 additions & 0 deletions pkg/cli/interactive_tests/test_interrupt.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#! /usr/bin/env expect -f

source [file join [file dirname $argv0] common.tcl]

start_server $argv

spawn $argv sql
eexpect "defaultdb>"

start_test "Check that interrupt with a partial line clears the line."
send "asasaa"
interrupt
eexpect "defaultdb>"
end_test

start_test "Check that interrupt with a multiline clears the current line."
send "select\r"
eexpect " -> "
send "'XXX'"
interrupt
send "'YYY';\r"
eexpect "column"
eexpect "YYY"
eexpect "defaultdb>"
end_test

start_test "Check that interrupt with an empty line prints an information message."
interrupt
eexpect "terminate input to exit"
eexpect "defaultdb>"
end_test

start_test "Check that interrupt can cancel a query."
send "select pg_sleep(1000000);\r"
eexpect "\r\n"
sleep 0.4
interrupt
eexpect "query execution canceled"
eexpect "57014"

# TODO(knz): we currently need to trigger a reconnection
# before we get a healthy prompt. This will be fixed
# in a later version.
send "\r"
eexpect "defaultdb>"
end_test

# Quit the SQL client, and open a unix shell.
send_eof
eexpect eof
spawn /bin/bash
set shell2_spawn_id $spawn_id
send "PS1=':''/# '\r"
eexpect ":/# "

start_test "Check that interactive, non-terminal queries are cancellable without terminating the shell."
send "$argv sql >/dev/null\r"
eexpect "\r\n"
sleep 0.4
send "select 'A'+3;\r"
eexpect "\r\n"
# Sanity check: we can still process _some_ SQL.
# stderr is not redirected so we still see errors.
eexpect "unsupported binary operator"

# Now on to check cancellation.
send "select pg_sleep(10000);\r"
sleep 0.4
interrupt
eexpect "query execution canceled"
eexpect "57014"

# TODO(knz): we currently need to trigger a reconnection
# before we get a healthy prompt. This will be fixed
# in a later version.
send "\rselect 1;\r"

# Send another query, expect an error. The shell should
# not have terminated by this point.
send "select 'A'+3;\r"
eexpect "\r\n"
eexpect "unsupported binary operator"

# Terminate SQL shell, expect unix shell.
send_eof
eexpect ":/# "

start_test "Check that non-interactive interrupts terminate the SQL shell."
send "cat | $argv sql\r"
eexpect "\r\n"
sleep 0.4
# Sanity check.
send "select 'XX'||'YY';\r"
eexpect "XXYY"

# Check what interrupt does.
send "select pg_sleep(10000);\r"
sleep 0.4
interrupt
# This exits the SQL shell directly. expect unix shell.
eexpect ":/# "

end_test

send "exit 0\r"
eexpect eof

stop_server $argv