Commit message: lint
juliuszsompolski committed Jul 20, 2023
commit 5112ccfc48c7a2805773f0d15facb1e2f25c65a3
SparkSessionE2ESuite.scala
@@ -125,9 +125,12 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     spark.addTag("one")
     assert(spark.getTags() == Set("one"))
     try {
-      spark.range(10).map(n => {
-        Thread.sleep(30000); n
-      }).collect()
+      spark
+        .range(10)
+        .map(n => {
+          Thread.sleep(30000); n
+        })
+        .collect()
     } finally {
       spark.clearTags() // clear for the case of thread reuse by another Future
     }
@@ -139,9 +142,12 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     spark.addTag("one")
     spark.addTag("two") // duplicates shouldn't matter
     try {
-      spark.range(10).map(n => {
-        Thread.sleep(30000); n
-      }).collect()
+      spark
+        .range(10)
+        .map(n => {
+          Thread.sleep(30000); n
+        })
+        .collect()
     } finally {
       spark.clearTags() // clear for the case of thread reuse by another Future
     }
@@ -154,9 +160,12 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     spark.addTag("two")
     assert(spark.getTags() == Set("two"))
     try {
-      spark.range(10).map(n => {
-        Thread.sleep(30000); n
-      }).collect()
+      spark
+        .range(10)
+        .map(n => {
+          Thread.sleep(30000); n
+        })
+        .collect()
     } finally {
       spark.clearTags() // clear for the case of thread reuse by another Future
     }
@@ -170,9 +179,12 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     spark.removeTag("two") // check that remove works, despite duplicate add
     assert(spark.getTags() == Set("one"))
     try {
-      spark.range(10).map(n => {
-        Thread.sleep(30000); n
-      }).collect()
+      spark
+        .range(10)
+        .map(n => {
+          Thread.sleep(30000); n
+        })
+        .collect()
     } finally {
       spark.clearTags() // clear for the case of thread reuse by another Future
     }
@@ -184,7 +196,9 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     eventually(timeout(20.seconds), interval(1.seconds)) {
       val ids = spark.interruptTag("two")
       interrupted ++= ids
-      assert(interrupted.distinct.length == 2, s"Interrupted operations: ${interrupted.distinct}.")
+      assert(
+        interrupted.distinct.length == 2,
+        s"Interrupted operations: ${interrupted.distinct}.")
     }
     val e2 = intercept[SparkException] {
       ThreadUtils.awaitResult(q2, 1.minute)
@@ -201,7 +215,9 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     eventually(timeout(20.seconds), interval(1.seconds)) {
       val ids = spark.interruptTag("one")
       interrupted ++= ids
-      assert(interrupted.distinct.length == 2, s"Interrupted operations: ${interrupted.distinct}.")
+      assert(
+        interrupted.distinct.length == 2,
+        s"Interrupted operations: ${interrupted.distinct}.")
     }
     val e1 = intercept[SparkException] {
       ThreadUtils.awaitResult(q1, 1.minute)
@@ -218,9 +234,12 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     val session = spark
     import session.implicits._
 
-    val result = spark.range(10).map(n => {
-      Thread.sleep(5000); n
-    }).collectResult()
+    val result = spark
+      .range(10)
+      .map(n => {
+        Thread.sleep(5000); n
+      })
+      .collectResult()
     // cancel
     val operationId = result.operationId
Contributor (commenting on val operationId = result.operationId):
Is it possible that we block on receiving this operationId from the server? If that happens, can we only interrupt all?

Contributor Author (juliuszsompolski):
Currently, getting the operationId is indeed a bit awkward, and tagging the query and calling interruptTag is the more convenient API to use.
This will be improved in a followup PR for async and detachable execution. Some preparation is here already: https://github.com/apache/spark/pull/42009/files#diff-3cad257dc0c15b4d091beebdfd42659f803193c23667425d8926b84113a2a312R288. The operationId can also be passed in ExecutePlanRequest, so that the client knows it from the start and doesn't need to take it from the first response.
In my followup PR I also plan to add a response that is always sent right at the beginning of the query, so that even if the client did not set the operationId, it can get it right away.

     val canceledId = spark.interruptOperation(operationId)
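To make the trade-off discussed in the thread above concrete, here is a minimal sketch of the two interrupt styles, using only the client API exercised in this suite. It assumes spark is a Spark Connect SparkSession; the tag name and the ExecutionContext setup are illustrative, not code from the PR:

import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext = ExecutionContext.global
val session = spark
import session.implicits._

// Tag-based interrupt: the tag is attached (on the executing thread) before
// the query starts, so the client can cancel it without ever having received
// an operationId from the server.
val job = Future {
  spark.addTag("slow-query") // illustrative tag name
  try {
    spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
  } finally {
    spark.clearTags() // clear for the case of thread reuse by another Future
  }
}
// In the suite this call is retried with eventually() until the query has
// actually started on the server.
val interruptedByTag: Seq[String] = spark.interruptTag("slow-query")

// Operation-based interrupt: needs the operationId, which currently only
// becomes available from the result handle after execution has begun.
val result = spark.range(10).map(n => { Thread.sleep(5000); n }).collectResult()
val interruptedById: Seq[String] = spark.interruptOperation(result.operationId)

Both interrupt calls return the ids of the operations they interrupted, which is what the eventually blocks in this suite assert on.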
ExecuteThreadRunner.scala
@@ -86,8 +86,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
       } finally {
         executeHolder.sessionHolder.session.sparkContext.removeJobTag(executeHolder.jobTag)
         executeHolder.userDefinedTags.foreach { tag =>
-          executeHolder.sessionHolder.session.sparkContext.removeJobTag(
-            executeHolder.tagToSparkJobTag(tag))
+          executeHolder.sessionHolder.session.sparkContext
+            .removeJobTag(executeHolder.tagToSparkJobTag(tag))
         }
       }
     } catch {
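The cleanup above is one half of the server-side tag lifecycle. A rough, hypothetical sketch of that symmetry follows; the registration side and the helper name runWithTags are assumptions for illustration, not code from this diff:

import org.apache.spark.SparkContext

// Hypothetical helper showing how job-tag registration and removal bracket
// one execution; ExecuteThreadRunner does this with executeHolder's tags.
def runWithTags(sc: SparkContext, jobTag: String, userTags: Seq[String])(
    body: => Unit): Unit = {
  sc.addJobTag(jobTag)
  userTags.foreach(sc.addJobTag)
  try {
    // Spark jobs launched by body carry the tags, which is what lets the
    // server cancel them with SparkContext.cancelJobsWithTag.
    body
  } finally {
    // Job tags are per-thread and execution threads may be reused, so they
    // must always be removed, mirroring the finally block in the diff above.
    sc.removeJobTag(jobTag)
    userTags.foreach(sc.removeJobTag)
  }
}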