Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
renames
  • Loading branch information
juliuszsompolski committed Jul 6, 2023
commit bbf87618d78b49c4d3864b4359051e7c6c7337f1
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.connect.common.LiteralValueProtoConverter.toLiteralProto
import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_ARROW_MAX_BATCH_SIZE
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.connect.service.ExecutionHolder
import org.apache.spark.sql.connect.service.ExecuteHolder
import org.apache.spark.sql.execution.{LocalTableScanExec, SQLExecution}
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.types.StructType
Expand All @@ -40,17 +40,17 @@ import org.apache.spark.util.ThreadUtils
/**
* Handle ExecutePlanRequest where the operatoin to handle is Plan execution of type
* proto.Plan.OpTypeCase.ROOT
* @param executionHolder
* @param executeHolder
*/
class SparkConnectPlanExecution(executionHolder: ExecutionHolder) {
class SparkConnectPlanExecution(executeHolder: ExecuteHolder) {

private val sessionHolder = executionHolder.sessionHolder
private val session = executionHolder.session
private val sessionHolder = executeHolder.sessionHolder
private val session = executeHolder.session

def handlePlan(responseObserver: ExecutePlanResponseObserver): Unit = {
val request = executionHolder.executePlanRequest.getOrElse {
val request = executeHolder.executePlanRequest.getOrElse {
throw new IllegalStateException(
s"Execution ${executionHolder.operationId} doesn't have an ExecutePlanRequest.")
s"Execution ${executeHolder.operationId} doesn't have an ExecutePlanRequest.")
}
if (request.getPlan.getOpTypeCase != proto.Plan.OpTypeCase.ROOT) {
throw new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.util.Utils
/**
* Object used to hold the Spark Connect execution state, and perform
*/
case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {
case class ExecuteHolder(operationId: String, sessionHolder: SessionHolder) extends Logging {

val jobTag =
s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@ import org.apache.spark.util.Utils
case class SessionHolder(userId: String, sessionId: String, session: SparkSession)
extends Logging {

val executions: ConcurrentMap[String, ExecutionHolder] =
new ConcurrentHashMap[String, ExecutionHolder]()
val executions: ConcurrentMap[String, ExecuteHolder] =
new ConcurrentHashMap[String, ExecuteHolder]()

// Mapping from relation ID (passed to client) to runtime dataframe. Used for callbacks like
// foreachBatch() in Streaming. Lazy since most sessions don't need it.
private lazy val dataFrameCache: ConcurrentMap[String, DataFrame] = new ConcurrentHashMap()

private[connect] def createExecutionHolder(): ExecutionHolder = {
private[connect] def createExecuteHolder(): ExecuteHolder = {

val operationId = UUID.randomUUID().toString
val executePlanHolder = ExecutionHolder(operationId, this)
val executePlanHolder = ExecuteHolder(operationId, this)
assert(executions.putIfAbsent(operationId, executePlanHolder) == null)
executePlanHolder
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
def handle(v: ExecutePlanRequest): Unit = {
val sessionHolder = SparkConnectService
.getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
val executionHolder = sessionHolder.createExecutionHolder()
val executeHolder = sessionHolder.createExecuteHolder()
try {
executionHolder.run(v, responseObserver)
executeHolder.run(v, responseObserver)
} finally {
sessionHolder.removeExecutePlanHolder(executionHolder.operationId)
sessionHolder.removeExecutePlanHolder(executeHolder.operationId)
}
}
}