Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
lint
  • Loading branch information
juliuszsompolski committed Jul 20, 2023
commit 6095e6b01aa6dfcb6d43051a08fe41780f9736fd
Original file line number Diff line number Diff line change
Expand Up @@ -632,13 +632,13 @@ message InterruptRequest {
enum InterruptType {
INTERRUPT_TYPE_UNSPECIFIED = 0;

// Interrupt all running executions within the session with provided the session_id.
// Interrupt all running executions within the session with the provided session_id.
INTERRUPT_TYPE_ALL = 1;

// Interrupt all running executions within the session with the provided tag.
// Interrupt all running executions within the session with the provided operation_tag.
INTERRUPT_TYPE_TAG = 2;

// Interrupt the running execution within the session with the provided id.
// Interrupt the running execution within the session with the provided operation_id.
INTERRUPT_TYPE_ID = 3;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ private[connect] class ExecuteHolder(
s"Spark Connect execution tag cannot contain '${SparkContext.SPARK_JOB_TAGS_SEP}'.")
}
if (tag.isEmpty) {
throw new IllegalArgumentException(
"Spark Connect execution tag cannot be an empty string.")
throw new IllegalArgumentException("Spark Connect execution tag cannot be an empty string.")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
case _: IllegalArgumentException =>
throw new SparkSQLException(
errorClass = "INVALID_HANDLE.FORMAT",
messageParameters = Map("handle" -> request.getOperationId)
)
messageParameters = Map("handle" -> request.getOperationId))
}
} else {
UUID.randomUUID().toString
Expand All @@ -74,8 +73,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
if (oldExecute != null) {
throw new SparkSQLException(
errorClass = "INVALID_HANDLE.ALREADY_EXISTS",
messageParameters = Map("handle" -> operationId)
)
messageParameters = Map("handle" -> operationId))
}
executePlanHolder
}
Expand All @@ -88,8 +86,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
executions.remove(operationId)
}

/** Interrupt all executions in the session.
* @return list of operationIds of interrupted executions */
/**
* Interrupt all executions in the session.
* @return
* list of operationIds of interrupted executions
*/
private[connect] def interruptAll(): Seq[String] = {
val interruptedIds = new mutable.ArrayBuffer[String]()
executions.asScala.values.foreach { execute =>
Expand All @@ -99,8 +100,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
interruptedIds.toSeq
}

/** Interrupt executions in the session with a given tag.
* @return list of operationIds of interrupted executions */
/**
* Interrupt executions in the session with a given tag.
* @return
* list of operationIds of interrupted executions
*/
private[connect] def interruptTag(tag: String): Seq[String] = {
val interruptedIds = new mutable.ArrayBuffer[String]()
executions.asScala.values.foreach { execute =>
Expand All @@ -112,8 +116,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
interruptedIds.toSeq
}

/** Interrupt the execution with the given operation_id
* @return list of operationIds of interrupted executions (one element or empty) */
/**
* Interrupt the execution with the given operation_id
* @return
* list of operationIds of interrupted executions (one element or empty)
*/
private[connect] def interruptOperation(operationId: String): Seq[String] = {
val interruptedIds = new mutable.ArrayBuffer[String]()
Option(executions.get(operationId)).foreach { execute =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ class SparkConnectInterruptHandler(responseObserver: StreamObserver[proto.Interr
throw new IllegalArgumentException(
s"INTERRUPT_TYPE_ID requested, but no operation_id provided.")
}
sessionHolder.interruptTag(v.getOperationId)
sessionHolder.interruptOperation(v.getOperationId)
case other =>
throw new UnsupportedOperationException(s"Unknown InterruptType $other!")
}

val response = proto.InterruptResponse.newBuilder()
val response = proto.InterruptResponse
.newBuilder()
.setSessionId(v.getSessionId)
.addAllInterruptedIds(interruptedIds.asJava)
.build()
Expand Down