22 commits
2a2df72  Palantir Build infrastructure, Palantir Hadoop, Palantir Parquet (rshkv, Feb 26, 2021)
5075e0a  [SPARK-33770][SQL][TESTS][3.1][3.0] Fix the `ALTER TABLE .. DROP PART… (MaxGekk, Dec 14, 2020)
397184c  [SPARK-25200][YARN] Allow specifying HADOOP_CONF_DIR as spark property (rshkv, Feb 23, 2021)
bbd8d9b  [SPARK-26626][SQL] Maximum size for repeatedly substituted aliases in… (rshkv, Feb 23, 2021)
4ba513b  [SPARK-20952] ParquetFileFormat should forward TaskContext to its for… (rshkv, Feb 23, 2021)
ce81128  [SPARK-18079][SQL] CollectLimitExec.executeToIterator should perform… (rshkv, Feb 23, 2021)
edd1779  [SPARK-33089][CORE] Enhance ExecutorPlugin API to include callbacks o… (fsamuel-bs, Oct 16, 2020)
2996136  Allow custom external catalogs (#127) (rshkv, Feb 26, 2021)
89c98b3  Custom CatalogFileIndex (#364) (rshkv, Feb 26, 2021)
0c99eb0  Implement a Docker image generator gradle plugin for Spark applications (jdcasale, Feb 16, 2021)
f830c35  Palantir SafeLogging (jdcasale, Feb 16, 2021)
36d6e37  K8s local file mounting (rshkv, Feb 23, 2021)
1209fb7  K8s local deploy mode (rshkv, Feb 23, 2021)
bfa8d60  Infer Pandas string columns in Arrow conversion on Python 2 (rshkv, Feb 25, 2021)
16cb2a4  Palantir Conda Runner & R-Test infrastructure (rshkv, Feb 25, 2021)
1879255  [SPARK-33984][PYTHON] Upgrade to Py4J 0.10.9.1 (HyukjinKwon, Jan 4, 2021)
02e67e0  [SPARK-21195][CORE] MetricSystem should pick up dynamically registere… (rshkv, Feb 19, 2021)
efe361f  Update contributing-to-spark.md (jdcasale, Mar 4, 2021)
f395b15  Default spark.sql.parquet.outputTimestampType to TIMESTAMP_MICROS (rshkv, Mar 8, 2021)
6d71135  [SPARK-33504][CORE][3.0] The application log in the Spark history ser… (viirya, Feb 24, 2021)
e46dbe7  [SPARK-34232][CORE][3.0] Redact SparkListenerEnvironmentUpdate event … (viirya, Feb 24, 2021)
a2bd672  [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch… (Mar 3, 2021)
[SPARK-33504][CORE][3.0] Sensitive attributes in the application log of the Spark history server should be redacted

### What changes were proposed in this pull request?

Make sure sensitive attributes are redacted in the history server log. This is a backport of the original PR apache#30446.

### Why are the changes needed?

We found that sensitive attributes such as passwords in SparkListenerJobStart and SparkListenerStageSubmitted events were not redacted, so they could be viewed directly in the application log.

A screenshot can be viewed in the attachments of JIRA ticket SPARK-33504.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

Closes apache#31631 from viirya/SPARK-33504-3.0.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
viirya authored and rshkv committed Mar 9, 2021
commit 6d71135dea0029a649a3fc3da2ab270bb7d47dac
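
For background, the patch leans on `Utils.redact`, which masks values whose keys match `spark.redaction.regex`. A minimal sketch of the behavior it relies on, assuming the default pattern (keys containing "secret", "password", or "token"); the `RedactDemo` object and its package are illustrative, the sub-package of `org.apache.spark` only being needed because `Utils` is `private[spark]`:

```scala
package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

object RedactDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val kvs = Seq(
      "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD" -> "secret_password",
      "spark.app.name" -> "demo")
    // Pairs whose key matches spark.redaction.regex have their value replaced
    // with "*********(redacted)"; other pairs pass through unchanged.
    Utils.redact(conf, kvs).foreach { case (k, v) => println(s"$k=$v") }
  }
}
```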
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.scheduler
 
 import java.net.URI
+import java.util.Properties
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
@@ -103,7 +105,7 @@ private[spark] class EventLoggingListener(
 
   // Events that do not trigger a flush
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
-    logEvent(event)
+    logEvent(event.copy(properties = redactProperties(event.properties)))
     if (shouldLogStageExecutorMetrics) {
       // record the peak metrics for the new stage
       liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
@@ -156,7 +158,9 @@ private[spark] class EventLoggingListener(
     logEvent(event, flushLogger = true)
   }
 
-  override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true)
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    logEvent(event.copy(properties = redactProperties(event.properties)), flushLogger = true)
+  }
 
   override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)
 
@@ -246,6 +250,22 @@ private[spark] class EventLoggingListener(
     logWriter.stop()
   }
 
+  private def redactProperties(properties: Properties): Properties = {
+    if (properties == null) {
+      return properties
+    }
+    val redactedProperties = new Properties
+    // properties may contain some custom local properties such as stage/job description
+    // only properties in sparkConf need to be redacted.
+    val (globalProperties, localProperties) = properties.asScala.toSeq.partition {
+      case (key, _) => sparkConf.contains(key)
+    }
+    (Utils.redact(sparkConf, globalProperties) ++ localProperties).foreach {
+      case (key, value) => redactedProperties.setProperty(key, value)
+    }
+    redactedProperties
+  }
+
   private[spark] def redactEvent(
       event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
     // environmentDetails maps a string descriptor to a set of properties
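
Two details of `redactProperties` are worth noting. First, the events are cloned via `copy(...)` rather than mutated: `SparkListenerJobStart` and `SparkListenerStageSubmitted` are case classes, so only the logged copy is redacted while other listeners on the bus still see the original properties. Second, only keys that also exist in the `SparkConf` are candidates for redaction; custom local properties (a job description, say) pass through untouched. A self-contained sketch of that partition-then-redact shape (the regex and replacement text mirror Spark's defaults but are hard-coded here, and `confKeys` is a hypothetical stand-in for `sparkConf.contains`):

```scala
import java.util.Properties
import scala.collection.JavaConverters._

object RedactPropertiesSketch {
  // Stand-in for Utils.redact using the default spark.redaction.regex.
  private val pattern = "(?i)secret|password|token".r
  private val Redacted = "*********(redacted)"

  private def maskSensitive(kvs: Seq[(String, String)]): Seq[(String, String)] =
    kvs.map {
      case (k, _) if pattern.findFirstIn(k).isDefined => (k, Redacted)
      case kv => kv
    }

  def main(args: Array[String]): Unit = {
    // Keys assumed to be present in the SparkConf.
    val confKeys = Set("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD")

    val properties = new Properties()
    properties.setProperty("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD", "secret_password")
    properties.setProperty("parse_token", "secret_password") // custom local property

    // Same shape as redactProperties: only keys also present in the conf are
    // candidates for redaction; local properties pass through untouched.
    val (global, local) = properties.asScala.toSeq.partition { case (k, _) => confKeys(k) }
    (maskSensitive(global) ++ local).foreach { case (k, v) => println(s"$k -> $v") }
    // Prints:
    //   spark.executorEnv.HADOOP_CREDSTORE_PASSWORD -> *********(redacted)
    //   parse_token -> secret_password   (local property, left intact)
  }
}
```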
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, InputStream}
-import java.util.Arrays
+import java.util.{Arrays, Properties}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable
@@ -96,6 +96,67 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     assert(redactedProps(key) == "*********(redacted)")
   }
 
+  test("Spark-33504 sensitive attributes redaction in properties") {
+    val (secretKey, secretPassword) = ("spark.executorEnv.HADOOP_CREDSTORE_PASSWORD",
+      "secret_password")
+    val (customKey, customValue) = ("parse_token", "secret_password")
+
+    val conf = getLoggingConf(testDirPath, None).set(secretKey, secretPassword)
+
+    val properties = new Properties()
+    properties.setProperty(secretKey, secretPassword)
+    properties.setProperty(customKey, customValue)
+
+    val logName = "properties-reaction-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    val stageId = 1
+    val jobId = 1
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details")
+
+    val events = Array(SparkListenerStageSubmitted(stageInfo, properties),
+      SparkListenerJobStart(jobId, 0, Seq(stageInfo), properties))
+
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    events.foreach(event => listenerBus.post(event))
+    listenerBus.stop()
+    eventLogger.stop()
+
+    val logData = EventLogFileReader.openEventLog(new Path(eventLogger.logWriter.logPath),
+      fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 3)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerStageSubmitted"))
+      assert(lines(2).contains("SparkListenerJobStart"))
+
+      lines.foreach { line =>
+        JsonProtocol.sparkEventFromJson(parse(line)) match {
+          case logStartEvent: SparkListenerLogStart =>
+            assert(logStartEvent == logStart)
+
+          case stageSubmittedEvent: SparkListenerStageSubmitted =>
+            assert(stageSubmittedEvent.properties.getProperty(secretKey) == "*********(redacted)")
+            assert(stageSubmittedEvent.properties.getProperty(customKey) == customValue)
+
+          case jobStartEvent: SparkListenerJobStart =>
+            assert(jobStartEvent.properties.getProperty(secretKey) == "*********(redacted)")
+            assert(jobStartEvent.properties.getProperty(customKey) == customValue)
+
+          case _ => assert(false)
+        }
+      }
+    } finally {
+      logData.close()
+    }
+  }
+
   test("Executor metrics update") {
     testStageExecutorMetricsEventLogging()
   }
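
With the patch applied, the `SparkListenerJobStart` line written by the test above would carry the masked value, along the lines of (abbreviated and illustrative; field names follow JsonProtocol):

    {"Event":"SparkListenerJobStart","Job ID":1,"Properties":{"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD":"*********(redacted)","parse_token":"secret_password"}}

Note that `parse_token` keeps its raw value even though it matches the default redaction pattern: it is a custom local property rather than a SparkConf entry, so `redactProperties` deliberately leaves it intact.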