Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
nits
  • Loading branch information
juliuszsompolski committed Jul 12, 2023
commit 0f889638a14759bb4bff73eb3f3899fe388d1084
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,21 @@ private[connect] class ExecuteGrpcResponseSender[T](

/**
* Attach to the executionObserver, consume responses from it, and send them to grpcObserver.
* @param lastSentIndex Start sending the stream from response after this.
* @param lastConsumedStreamIndex the last index that was already consumed and sent.
* This sender will start from index after that.
* 0 means start from beginning (since first response has index 1)
*
* @return true if the execution was detached before stream completed.
* The caller needs to finish the grpcObserver stream
* false if stream was finished. In this case, grpcObserver stream is already completed.
*/
def run(executionObserver: ExecuteResponseObserver[T], lastSentIndex: Long): Boolean = {
def run(
executionObserver: ExecuteResponseObserver[T],
lastConsumedStreamIndex: Long): Boolean = {
// register to be notified about available responses.
executionObserver.attachConsumer(this)

var nextIndex = lastSentIndex + 1
var nextIndex = lastConsumedStreamIndex + 1
var finished = false

while (!finished) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ private[connect] class ExecuteResponseObserver[T]()
* and to assert that all responses are consumed. */
private var highestConsumedIndex: Long = 0

// sender to notify of available responses.
/** Consumer that waits for available responses.
* There can be only one at a time, @see attachConsumer. */
private var responseSender: Option[ExecuteGrpcResponseSender[T]] = None

def onNext(r: T): Unit = synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder)

/** Launches the execution in a background thread, returns immediately. */
def start(): Unit = {
this.executionThread.start()
executionThread.start()
}

/** Joins the background execution thread after it is finished. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils

/**
* Handle ExecutePlanRequest where the operatoin to handle is Plan execution of type
* Handle ExecutePlanRequest where the operation to handle is of `Plan` type.
* proto.Plan.OpTypeCase.ROOT
* @param executeHolder
*/
class SparkConnectPlanExecution(executeHolder: ExecuteHolder) {
private[execution] class SparkConnectPlanExecution(executeHolder: ExecuteHolder) {

private val sessionHolder = executeHolder.sessionHolder
private val session = executeHolder.session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.connect.execution.{ExecuteGrpcResponseSender, ExecuteResponseObserver, ExecuteThreadRunner}

/**
* Object used to hold the Spark Connect execution state, and perform
* T - response type of the execution.
* Object used to hold the Spark Connect execution state.
*/
case class ExecuteHolder(
request: proto.ExecutePlanRequest,
operationId: String,
sessionHolder: SessionHolder) extends Logging {
private[connect] class ExecuteHolder(
val request: proto.ExecutePlanRequest,
val operationId: String,
val sessionHolder: SessionHolder) extends Logging {

val jobTag =
s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
private[connect] def createExecuteHolder(request: ExecutePlanRequest): ExecuteHolder = {

val operationId = UUID.randomUUID().toString
val executePlanHolder = ExecuteHolder(request, operationId, this)
val executePlanHolder = new ExecuteHolder(request, operationId, this)
assert(executions.putIfAbsent(operationId, executePlanHolder) == null)
executePlanHolder
}
Expand Down