Changes from 1 commit

Commits (33)
e93cedd  tags api (xupefei, Aug 20, 2024)
694db1b  Merge branch 'master' of github.com:apache/spark into reverse-api-tag (xupefei, Aug 20, 2024)
40610a7  rename (xupefei, Aug 20, 2024)
a70d7d2  address comments (xupefei, Aug 21, 2024)
0656e25  . (xupefei, Aug 21, 2024)
6b6ca7f  . (xupefei, Aug 21, 2024)
d3cd5f5  new approach (xupefei, Aug 23, 2024)
0922dd2  address comments (xupefei, Aug 26, 2024)
2a6fcc6  return job IDs earlier (xupefei, Aug 26, 2024)
ef0fddf  doc (xupefei, Aug 26, 2024)
f2ad163  no mention of spark session in core (xupefei, Aug 27, 2024)
ab00685  re (xupefei, Aug 27, 2024)
dd10f46  fix test (xupefei, Aug 27, 2024)
bc9b76d  revert some changes (xupefei, Aug 28, 2024)
8656810  undo (xupefei, Aug 28, 2024)
1dfafad  wip (xupefei, Aug 28, 2024)
1d4d5cc  . (xupefei, Aug 29, 2024)
a35c4e5  Merge branch 'master' of github.com:apache/spark into reverse-api-tag (xupefei, Aug 29, 2024)
d1208c4  revert unnessesary changes and fix tests (xupefei, Aug 29, 2024)
13342cf  comment (xupefei, Aug 29, 2024)
3879989  oh no (xupefei, Aug 29, 2024)
cf6437f  remove internal tags (xupefei, Aug 30, 2024)
4d7da3b  Merge branch 'master' of github.com:apache/spark into reverse-api-tag (xupefei, Aug 30, 2024)
b3b7cbc  test (xupefei, Aug 30, 2024)
7c9294e  Merge branch 'master' of github.com:apache/spark into reverse-api-tag (xupefei, Aug 30, 2024)
7338b1d  move doc to api (xupefei, Aug 30, 2024)
905bf91  fix test (xupefei, Sep 3, 2024)
514b5e4  address mridulm's comments (xupefei, Sep 10, 2024)
c6fb41f  address herman's comments (xupefei, Sep 10, 2024)
2a0292c  address hyukjin's comment (xupefei, Sep 10, 2024)
2d059b3  Merge branch 'master' of github.com:apache/spark into reverse-api-tag (xupefei, Sep 10, 2024)
a55c47c  scalastyle (xupefei, Sep 16, 2024)
e66ba0a  fmt (xupefei, Sep 17, 2024)
remove internal tags
xupefei committed Aug 30, 2024
commit cf6437f396c71dbcad1e46fd38b45644f0cd0c62
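This commit folds the internal tag namespace into the public one: session and execution tags now flow through the same addJobTag/removeJobTag path as user tags. For orientation, here is a minimal sketch of the public thread-local tag API on SparkContext (Spark 3.5+) that the change standardizes on; the app name, master, and tag names are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}

object JobTagApiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("job-tags").setMaster("local[2]"))

    sc.addJobTag("etl")                 // tag every job started by this thread
    assert(sc.getJobTags() == Set("etl"))
    sc.range(0, 1000).count()           // this job carries the "etl" tag

    sc.cancelJobsWithTag("etl")         // request cancellation of all jobs carrying the tag
    sc.removeJobTag("etl")              // drop a single tag ...
    sc.clearJobTags()                   // ... or all tags at once
    sc.stop()
  }
}
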
42 changes: 4 additions & 38 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -912,18 +912,11 @@ class SparkContext(config: SparkConf) extends Logging {
   */
  def addJobTag(tag: String): Unit = {
    SparkContext.throwIfInvalidTag(tag)
-   val existingTags = getJobTags() ++ getInternalJobTags()
+   val existingTags = getJobTags()
    val newTags = (existingTags + tag).mkString(SparkContext.SPARK_JOB_TAGS_SEP)
    setLocalProperty(SparkContext.SPARK_JOB_TAGS, newTags)
  }

- /**
-  * Add a tag to be assigned to all the jobs started by this thread. The tag will be prefixed with
-  * an internal prefix to avoid conflicts with user tags.
-  */
- private[spark] def addInternalJobTag(tag: String): Unit =
-   addJobTag(s"${SparkContext.SPARK_JOB_TAGS_INTERNAL_PREFIX}$tag")
-
  /**
   * Remove a tag previously added to be assigned to all the jobs started by this thread.
   * Noop if such a tag was not added earlier.
@@ -934,7 +927,7 @@ class SparkContext(config: SparkConf) extends Logging {
   */
  def removeJobTag(tag: String): Unit = {
    SparkContext.throwIfInvalidTag(tag)
-   val existingTags = getJobTags() ++ getInternalJobTags()
+   val existingTags = getJobTags()
    val newTags = (existingTags - tag).mkString(SparkContext.SPARK_JOB_TAGS_SEP)
    if (newTags.isEmpty) {
      clearJobTags()
@@ -943,22 +936,6 @@ class SparkContext(config: SparkConf) extends Logging {
    }
  }

- /**
-  * Remove an internal tag previously added to be assigned to all the jobs started by this thread.
-  */
- private[spark] def removeInternalJobTag(tag: String): Unit =
-   removeJobTag(s"${SparkContext.SPARK_JOB_TAGS_INTERNAL_PREFIX}$tag")
-
- /**
-  * Get the tags that are currently set to be assigned to all the jobs started by this thread.
-  */
- private[spark] def getInternalJobTags(): Set[String] = {
-   Option(getLocalProperty(SparkContext.SPARK_JOB_TAGS))
-     .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
-     .getOrElse(Set())
-     .filter(_.startsWith(SparkContext.SPARK_JOB_TAGS_INTERNAL_PREFIX)) // only internal tags
- }
-
  /**
   * Get the tags that are currently set to be assigned to all the jobs started by this thread.
   *
@@ -968,8 +945,7 @@ class SparkContext(config: SparkConf) extends Logging {
    Option(getLocalProperty(SparkContext.SPARK_JOB_TAGS))
      .map(_.split(SparkContext.SPARK_JOB_TAGS_SEP).toSet)
      .getOrElse(Set())
-     .filterNot(_.startsWith(SparkContext.SPARK_JOB_TAGS_INTERNAL_PREFIX)) // exclude internal tags
-     .filter(_.nonEmpty) // empty string tag should not happen, but be defensive
+     .filter(!_.isEmpty) // empty string tag should not happen, but be defensive
  }

  /**
@@ -978,14 +954,7 @@ class SparkContext(config: SparkConf) extends Logging {
   * @since 3.5.0
   */
  def clearJobTags(): Unit = {
-   val internalTags = getInternalJobTags() // exclude internal tags
-   if (internalTags.isEmpty) {
-     setLocalProperty(SparkContext.SPARK_JOB_TAGS, null)
-   } else {
-     setLocalProperty(
-       SparkContext.SPARK_JOB_TAGS,
-       internalTags.mkString(SparkContext.SPARK_JOB_TAGS_SEP))
-   }
+   setLocalProperty(SparkContext.SPARK_JOB_TAGS, null)
  }

  /**
@@ -3148,9 +3117,6 @@ object SparkContext extends Logging {
  /** Separator of tags in SPARK_JOB_TAGS property */
  private[spark] val SPARK_JOB_TAGS_SEP = ","

- /** Prefix to mark a tag to be visible internally, not by users */
- private[spark] val SPARK_JOB_TAGS_INTERNAL_PREFIX = "~~spark~internal~tag~~"
-
  // Same rules apply to Spark Connect execution tags, see ExecuteHolder.throwIfInvalidTag
  private[spark] def throwIfInvalidTag(tag: String) = {
    if (tag == null) {
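The throwIfInvalidTag helper is cut off above at the null check. Based on the visible rule and the comma separator defined just before it, the sketch below shows what callers can expect addJobTag to reject; the exact exception messages are assumptions.

import org.apache.spark.{SparkConf, SparkContext}

object TagValidationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("tag-validation").setMaster("local[1]"))

    // A tag containing "," cannot round-trip through the comma-joined
    // SPARK_JOB_TAGS property, so it is rejected up front.
    try sc.addJobTag("bad,tag")
    catch { case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}") }

    // Null is rejected by the `if (tag == null)` branch visible above.
    try sc.addJobTag(null)
    catch { case e: IllegalArgumentException => println(s"rejected: ${e.getMessage}") }

    sc.stop()
  }
}
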
14 changes: 5 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -885,7 +885,7 @@ class SparkSession private(
   * @since 4.0.0
   */
  def interruptAll(): Seq[String] =
-   doInterruptTag(sessionJobTag, "as part of cancellation of all jobs", tagIsInternal = true)
+   doInterruptTag(sessionJobTag, "as part of cancellation of all jobs")

  /**
   * Request to interrupt all currently running operations of this session with the given job tag.
@@ -897,16 +897,12 @@ class SparkSession private(
  def interruptTag(tag: String): Seq[String] = {
    val realTag = managedJobTags.get(tag)
    if (realTag == null) return Seq.empty
-   doInterruptTag(realTag, s"part of cancelled job tags $tag", tagIsInternal = false)
+   doInterruptTag(realTag, s"part of cancelled job tags $tag")
  }

- private def doInterruptTag(
-     tag: String,
-     reason: String,
-     tagIsInternal: Boolean): Seq[String] = {
-   val realTag = if (tagIsInternal) s"${SparkContext.SPARK_JOB_TAGS_INTERNAL_PREFIX}$tag" else tag
+ private def doInterruptTag(tag: String, reason: String): Seq[String] = {
    val cancelledTags =
-     sparkContext.cancelJobsWithTagWithFuture(realTag, reason)
+     sparkContext.cancelJobsWithTagWithFuture(tag, reason)

    ThreadUtils.awaitResult(cancelledTags, 60.seconds)
      .flatMap(job => Option(job.properties.getProperty(SQLExecution.EXECUTION_ROOT_ID_KEY)))
@@ -926,7 +922,7 @@ class SparkSession private(
    scala.util.Try(executionId.toLong).toOption match {
      case Some(executionIdToBeCancelled) =>
        val tagToBeCancelled = SQLExecution.executionIdJobTag(this, executionIdToBeCancelled)

[Contributor] Nice :)

-       doInterruptTag(tagToBeCancelled, reason = "", tagIsInternal = true)
+       doInterruptTag(tagToBeCancelled, reason = "")
[Contributor] Perhaps set a reason?

[Contributor Author] I tried to let it have the same reason as the APIs in SparkContext. Will update.

      case None =>
        throw new IllegalArgumentException("executionId must be a number in string form.")
    }
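On the session side, interruptTag resolves a user-facing tag through managedJobTags to its session-scoped "real" tag before cancelling, so doInterruptTag no longer needs to know whether a tag is internal. A sketch of how this surfaces to users, assuming the session-level API this PR exposes (addTag, interruptTag) mirrors the Spark Connect one; the tag name and sleep are illustrative.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import org.apache.spark.sql.SparkSession

object InterruptTagSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("interrupt-by-tag").getOrCreate()

    // Start a long-running action on a tagged background thread.
    Future {
      spark.addTag("slow")                  // user tag, mapped to a session-scoped real tag
      spark.range(Long.MaxValue).count()    // job to be cancelled
    }

    Thread.sleep(2000)                      // crude wait for the job to start
    val interrupted = spark.interruptTag("slow") // IDs of the interrupted executions
    println(s"interrupted: $interrupted")
    spark.stop()
  }
}
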
@@ -85,7 +85,7 @@ object SQLExecution extends Logging {
    // And for the root execution, rootExecutionId == executionId.
    if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
      sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
-     sc.addInternalJobTag(executionIdJobTag(sparkSession, executionId))
+     sc.addJobTag(executionIdJobTag(sparkSession, executionId))
    }
    val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
    executionIdToQueryExecution.put(executionId, queryExecution)
@@ -133,7 +133,7 @@ object SQLExecution extends Logging {
        sparkPlanInfo = SparkPlanInfo.EMPTY,
        time = System.currentTimeMillis(),
        modifiedConfigs = redactedConfigs,
-       jobTags = sc.getJobTags() ++ sc.getInternalJobTags(),
+       jobTags = sc.getJobTags(),
        jobGroupId = Option(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
      )
      try {
@@ -217,7 +217,7 @@ object SQLExecution extends Logging {
      // The current execution is the root execution if rootExecutionId == executionId.
      if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId.toString) {
        sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
-       sc.removeInternalJobTag(executionIdJobTag(sparkSession, executionId))
+       sc.removeJobTag(executionIdJobTag(sparkSession, executionId))
      }
      sc.setLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL, originalInterruptOnCancel)
    }
@@ -257,14 +257,14 @@ object SQLExecution extends Logging {
  }

  private[sql] def withSessionTagsApplied[T](sparkSession: SparkSession)(block: => T): T = {
-   sparkSession.sparkContext.addInternalJobTag(sparkSession.sessionJobTag)
+   sparkSession.sparkContext.addJobTag(sparkSession.sessionJobTag)
    val userTags = sparkSession.managedJobTags.values().asScala.toSeq
    userTags.foreach(sparkSession.sparkContext.addJobTag)

    try {
      block
    } finally {
-     sparkSession.sparkContext.removeInternalJobTag(sparkSession.sessionJobTag)
+     sparkSession.sparkContext.removeJobTag(sparkSession.sessionJobTag)
      userTags.foreach(sparkSession.sparkContext.removeJobTag)
    }
  }
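withSessionTagsApplied keeps its apply-in-try, remove-in-finally shape; only the internal-tag variants are gone. The same pattern is useful in user code that wants a tag scoped to a block of work. withJobTag below is a hypothetical helper, not a Spark API.

import org.apache.spark.{SparkConf, SparkContext}

object WithJobTagSketch {
  // Tag all jobs started by this thread for the duration of `body`,
  // then remove the tag even if `body` throws.
  def withJobTag[T](sc: SparkContext, tag: String)(body: => T): T = {
    sc.addJobTag(tag)
    try body
    finally sc.removeJobTag(tag)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("with-tag").setMaster("local[2]"))
    val n = withJobTag(sc, "batch-2024-08-30") { sc.range(0, 100).count() }
    println(s"counted $n rows under tag batch-2024-08-30")
    sc.stop()
  }
}
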
@@ -171,20 +171,18 @@ class SparkSessionJobTaggingAndCancellationSuite
      val tags = job.head.properties.get(SparkContext.SPARK_JOB_TAGS).asInstanceOf[String]
        .split(SparkContext.SPARK_JOB_TAGS_SEP)

-     val sessionTag = s"${SparkContext.SPARK_JOB_TAGS_INTERNAL_PREFIX}${ss.sessionJobTag}"
-     val executionRootIdTag = SparkContext.SPARK_JOB_TAGS_INTERNAL_PREFIX +
-       SQLExecution.executionIdJobTag(
+     val executionRootIdTag = SQLExecution.executionIdJobTag(
        ss,
        job.head.properties.get(SQLExecution.EXECUTION_ROOT_ID_KEY).asInstanceOf[String].toLong)
      val userTagsPrefix = s"spark-session-${ss.sessionUUID}-"

      ss match {
        case s if s == sessionA => assert(tags.toSet == Set(
-         sessionTag, executionRootIdTag, s"${userTagsPrefix}one"))
+         s.sessionJobTag, executionRootIdTag, s"${userTagsPrefix}one"))
        case s if s == sessionB => assert(tags.toSet == Set(
-         sessionTag, executionRootIdTag, s"${userTagsPrefix}one", s"${userTagsPrefix}two"))
+         s.sessionJobTag, executionRootIdTag, s"${userTagsPrefix}one", s"${userTagsPrefix}two"))
        case s if s == sessionC => assert(tags.toSet == Set(
-         sessionTag, executionRootIdTag, s"${userTagsPrefix}boo"))
+         s.sessionJobTag, executionRootIdTag, s"${userTagsPrefix}boo"))
      }
    }

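With the internal prefix gone, every tag a job carries, whether session, execution-root, or user-supplied, lives in one comma-separated job property, which is what the updated assertions above check. Below is a sketch of reading the tags back from a listener; the "spark.job.tags" property key is an assumption here, since the SPARK_JOB_TAGS constant is private[spark].

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Logs the tags attached to each submitted job.
class TagLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // "spark.job.tags" is assumed to match SparkContext.SPARK_JOB_TAGS;
    // "," matches SPARK_JOB_TAGS_SEP defined earlier in this diff.
    val tags = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.job.tags")))
      .map(_.split(",").toSet.filter(_.nonEmpty))
      .getOrElse(Set.empty[String])
    println(s"Job ${jobStart.jobId} tags: ${tags.mkString(", ")}")
  }
}

Register it with sc.addSparkListener(new TagLoggingListener()), or through the spark.extraListeners configuration.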