fix compile
juliuszsompolski committed Jul 6, 2023
commit 2ba6d26ddfe07effecf0eb6c34da9d4733c138b1
@@ -33,18 +33,6 @@ import org.apache.spark.connect.proto.ExecutePlanResponse
 class ExecutePlanResponseObserver(responseObserver: StreamObserver[ExecutePlanResponse])
     extends StreamObserver[ExecutePlanResponse] {
 
-  val jobTag =
-    "SparkConnect_" +
-      s"User_${sessionHolder.userId}_Session_${sessionHolder.sessionId}_Request_${operationId}"
-
-  def interrupt(): Unit = {
-    // TODO/WIP: This only interrupts active Spark jobs that are actively running.
-    // This would then throw the error from ExecutePlan and terminate it.
-    // But if the query is not running a Spark job, but executing code on Spark driver, this
-    // would be a noop and the execution will keep running.
-    sessionHolder.session.sparkContext.cancelJobsWithTag(jobTag)
-  }
-
   def onNext(r: ExecutePlanResponse): Unit = {
     responseObserver.onNext(r)
   }
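For reference, the interrupt() removed above relies on SparkContext's job-tag cancellation API (addJobTag / cancelJobsWithTag). A minimal, self-contained sketch of the caveat in the removed TODO - only work that is running as a Spark job gets cancelled - using a local session and an illustrative tag, neither taken from this patch:

import org.apache.spark.sql.SparkSession

object TagInterruptSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val sc = spark.sparkContext
    // Illustrative tag; the removed code builds it from userId, sessionId and operationId.
    val jobTag = "SparkConnect_User_u_Session_s_Request_op"

    val runner = new Thread(() => {
      sc.addJobTag(jobTag) // jobs submitted from this thread now carry the tag
      try {
        spark.range(Long.MaxValue).count() // long-running job
      } catch {
        case e: Exception => println(s"execution interrupted: ${e.getMessage}")
      } finally {
        sc.removeJobTag(jobTag)
      }
    })
    runner.start()

    Thread.sleep(2000)
    // Cancels jobs currently running under the tag. Driver-side code that is not
    // running a Spark job is unaffected, which is the caveat the removed TODO describes.
    sc.cancelJobsWithTag(jobTag)
    runner.join()
  }
}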
@@ -63,10 +63,10 @@ class SparkConnectPlanExecution(executionHolder: ExecutionHolder) {
       Dataset.ofRows(sessionHolder.session, planner.transformRelation(request.getPlan.getRoot))
     responseObserver.onNext(createSchemaResponse(request.getSessionId, dataframe.schema))
     processAsArrowBatches(request.getSessionId, dataframe, responseObserver)
-    responseObserver.onNext(createMetricsResponse(request.getSessionId, dataframe))
+    responseObserver.onNext(MetricGenerator.createMetricsResponse(request.getSessionId, dataframe))
     if (dataframe.queryExecution.observedMetrics.nonEmpty) {
       responseObserver.onNext(
-        SparkConnectStreamHandler.sendObservedMetricsToResponse(request.getSessionId, dataframe))
+        createObservedMetricsResponse(request.getSessionId, dataframe))
     }
     responseObserver.onCompleted()
   }
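The observedMetrics guard above only fires for plans built with Dataset.observe. A small sketch of where those metrics come from, with illustrative names and a local session not taken from this patch:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, sum}

object ObservedMetricsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Attach named metrics to the plan; they are computed as a side effect of the action.
    val observed = spark.range(100).toDF("id")
      .observe("my_metrics", count(col("id")).as("rows"), sum(col("id")).as("id_sum"))
    observed.collect()

    // After the action finishes, the metrics are exposed on the query execution;
    // this is the map whose nonEmpty the handler checks before sending a response.
    println(observed.queryExecution.observedMetrics)
  }
}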
@@ -27,7 +27,6 @@ import org.apache.spark.SparkSQLException
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{ExecutePlanRequest, ExecutePlanResponse}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connect.artifact.SparkConnectArtifactManager
 import org.apache.spark.sql.connect.common.ProtoUtils
 import org.apache.spark.sql.connect.execution.{ExecutePlanResponseObserver, SparkConnectPlanExecution}
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
@@ -96,7 +95,7 @@ case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) ex
     }
   }
 
-  protected def execute() = SparkConnectArtifactManager.withArtifactClassLoader {
+  protected def execute() = {
     try {
       // synchronized - check if already got interrupted while starting.
       synchronized {
@@ -111,8 +110,8 @@ case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) ex
       val debugString = requestString(executePlanRequest.get)
 
       // Set tag for query cancellation
-      session.sparkContext.addJobTag(executeHolder.jobTag)
-      session.sparkContext.setDescription(
+      session.sparkContext.addJobTag(jobTag)
+      session.sparkContext.setJobDescription(
         s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
       session.sparkContext.setInterruptOnCancel(true)
 
@@ -141,7 +140,7 @@ case class ExecutionHolder(operationId: String, sessionHolder: SessionHolder) ex
         // scalastyle:off
         logDebug(s"Exception in execute: $e")
         // Always cancel all remaining execution after error.
-        sessionHolder.session.sparkContext.cancelJobGroup(jobGroupId)
+        sessionHolder.session.sparkContext.cancelJobsWithTag(jobTag)
         executionError = if (interrupted) {
           // Turn the interrupt into OPERATION_CANCELLED error.
           Some(new SparkSQLException("OPERATION_CANCELLED", Map.empty))
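The ExecutionHolder changes above tag the execution thread, set a readable job description, and cancel by tag on error. A standalone sketch of that per-thread setup with a plain SparkSession, using illustrative tag and query strings rather than the session/execution holders from this patch:

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SparkSession

object ExecutionSetupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val sc = spark.sparkContext

    val jobTag = "SparkConnect_User_u_Session_s_Request_op"      // illustrative tag
    val debugString = "select id, count(*) from t group by id"   // stands in for the request text

    // Per-thread job properties, mirroring the diff: a tag for cancellation, a truncated
    // description for the UI, and interruption of task threads when jobs are cancelled.
    sc.addJobTag(jobTag)
    sc.setJobDescription(s"Spark Connect - ${StringUtils.abbreviate(debugString, 128)}")
    sc.setInterruptOnCancel(true)

    try {
      spark.range(10).count() // stands in for executing the plan
    } catch {
      case e: Exception =>
        // Cancel anything still running under this tag after an error, as the diff does.
        sc.cancelJobsWithTag(jobTag)
        throw e
    } finally {
      sc.removeJobTag(jobTag)
    }
  }
}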
@@ -26,7 +26,6 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.JobArtifactSet
 import org.apache.spark.SparkException
-import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession