diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala index 44b634af95ca9..72c2b0e3f1095 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala @@ -241,14 +241,13 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message]( // The state of interrupted, response and lastIndex are changed under executionObserver // monitor, and will notify upon state change. if (response.isEmpty) { + val timeout = Math.max(1, deadlineTimeMillis - System.currentTimeMillis()) // Wake up more frequently to send the progress updates. val progressTimeout = executeHolder.sessionHolder.session.sessionState.conf .getConf(CONNECT_PROGRESS_REPORT_INTERVAL) // If the progress feature is disabled, wait for the deadline. - val timeout = if (progressTimeout > 0) { - progressTimeout - } else { - Math.max(1, deadlineTimeMillis - System.currentTimeMillis()) + if (progressTimeout > 0L) { + Math.min(progressTimeout, timeout) } logTrace(s"Wait for response to become available with timeout=$timeout ms.") executionObserver.responseLock.wait(timeout)