Merged
Changes from 1 commit
Commits
52 commits
82fd38d
[SPARK-5200] Disable web UI in Hive ThriftServer tests
JoshRosen Jan 12, 2015
ef9224e
[SPARK-5102][Core]subclass of MapStatus needs to be registered with Kryo
lianhuiwang Jan 12, 2015
13e610b
SPARK-4159 [BUILD] Addendum: improve running of single test after ena…
srowen Jan 12, 2015
a3978f3
[SPARK-5078] Optionally read from SPARK_LOCAL_HOSTNAME
marmbrus Jan 12, 2015
aff49a3
SPARK-5172 [BUILD] spark-examples-***.jar shades a wrong Hadoop distr…
srowen Jan 12, 2015
3aed305
[SPARK-4999][Streaming] Change storeInBlockManager to false by default
jerryshao Jan 12, 2015
5d9fa55
[SPARK-5049][SQL] Fix ordering of partition columns in ParquetTableScan
marmbrus Jan 12, 2015
1e42e96
[SPARK-5138][SQL] Ensure schema can be inferred from a namedtuple
mulby Jan 13, 2015
f7741a9
[SPARK-5006][Deploy]spark.port.maxRetries doesn't work
WangTaoTheTonic Jan 13, 2015
9dea64e
[SPARK-4697][YARN]System properties should override environment varia…
WangTaoTheTonic Jan 13, 2015
39e333e
[SPARK-5131][Streaming][DOC]: There is a discrepancy in WAL implement…
uncleGen Jan 13, 2015
8ead999
[SPARK-5223] [MLlib] [PySpark] fix MapConverter and ListConverter in …
Jan 13, 2015
6463e0b
[SPARK-4912][SQL] Persistent tables for the Spark SQL data sources api
yhuai Jan 13, 2015
14e3f11
[SPARK-5168] Make SQLConf a field rather than mixin in SQLContext
rxin Jan 13, 2015
f996909
[SPARK-5123][SQL] Reconcile Java/Scala API for data types.
rxin Jan 14, 2015
d5eeb35
[SPARK-5167][SQL] Move Row into sql package and make it usable for Java.
rxin Jan 14, 2015
a3f7421
[SPARK-5248] [SQL] move sql.types.decimal.Decimal to sql.types.Decimal
adrian-wang Jan 14, 2015
81f72a0
[SPARK-5211][SQL]Restore HiveMetastoreTypes.toDataType
yhuai Jan 14, 2015
38bdc99
[SQL] some comments fix for GROUPING SETS
adrian-wang Jan 14, 2015
5840f54
[SPARK-2909] [MLlib] [PySpark] SparseVector in pyspark now supports i…
MechCoder Jan 14, 2015
9d4449c
[SPARK-5228][WebUI] Hide tables for "Active Jobs/Completed Jobs/Faile…
sarutak Jan 14, 2015
259936b
[SPARK-4014] Add TaskContext.attemptNumber and deprecate TaskContext.…
JoshRosen Jan 14, 2015
2fd7f72
[SPARK-5235] Make SQLConf Serializable
alexbaretta Jan 14, 2015
76389c5
[SPARK-5234][ml]examples for ml don't have sparkContext.stop
Jan 14, 2015
13d2406
[SPARK-5254][MLLIB] Update the user guide to position spark.ml better
mengxr Jan 15, 2015
cfa397c
[SPARK-5193][SQL] Tighten up SQLContext API
rxin Jan 15, 2015
6abc45e
[SPARK-5254][MLLIB] remove developers section from spark.ml guide
mengxr Jan 15, 2015
4b325c7
[SPARK-5193][SQL] Tighten up HiveContext API
rxin Jan 15, 2015
3c8650c
[SPARK-5224] [PySpark] improve performance of parallelize list/ndarray
Jan 15, 2015
1881431
[SPARK-5274][SQL] Reconcile Java and Scala UDFRegistration.
rxin Jan 16, 2015
65858ba
[Minor] Fix tiny typo in BlockManager
sarutak Jan 16, 2015
96c2c71
[SPARK-4857] [CORE] Adds Executor membership events to SparkListener
Jan 16, 2015
a79a9f9
[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
Jan 16, 2015
2be82b1
[SPARK-1507][YARN]specify # cores for ApplicationMaster
WangTaoTheTonic Jan 16, 2015
e200ac8
[SPARK-5201][CORE] deal with int overflow in the ParallelCollectionRD…
advancedxy Jan 16, 2015
f6b852a
[DOCS] Fix typo in return type of cogroup
srowen Jan 16, 2015
e8422c5
[SPARK-5231][WebUI] History Server shows wrong job submission time.
sarutak Jan 16, 2015
ecf943d
[WebUI] Fix collapse of WebUI layout
sarutak Jan 16, 2015
d05c9ee
[SPARK-4923][REPL] Add Developer API to REPL to allow re-publishing t…
Jan 16, 2015
fd3a8a1
[SPARK-733] Add documentation on use of accumulators in lazy transfor…
Jan 16, 2015
ee1c1f3
[SPARK-4937][SQL] Adding optimization to simplify the And, Or condit…
scwf Jan 16, 2015
61b427d
[SPARK-5193][SQL] Remove Spark SQL Java-specific API.
rxin Jan 17, 2015
f3bfc76
[SQL][minor] Improved Row documentation.
rxin Jan 17, 2015
c1f3c27
[SPARK-4937][SQL] Comment for the newly optimization rules in `Boolea…
scwf Jan 17, 2015
6999910
[SPARK-5096] Use sbt tasks instead of vals to get hadoop version
marmbrus Jan 18, 2015
e7884bc
[SQL][Minor] Added comments and examples to explain BooleanSimplifica…
rxin Jan 18, 2015
e12b5b6
MAINTENANCE: Automated closing of pull requests.
pwendell Jan 18, 2015
ad16da1
[HOTFIX]: Minor clean up regarding skipped artifacts in build files.
pwendell Jan 18, 2015
1727e08
[SPARK-5279][SQL] Use java.math.BigDecimal as the exposed Decimal type.
rxin Jan 18, 2015
1a200a3
[SQL][Minor] Update sql doc according to data type APIs changes
scwf Jan 18, 2015
1955645
[SQL][minor] Put DataTypes.java in java dir.
rxin Jan 19, 2015
7dbf1fd
[SQL] fix typo in class description
Jan 19, 2015
[SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
When calculating input metrics, there was an assumption that a task only reads from one block; this is not true for some operations, including coalesce. This patch simply increments the task's existing input metrics when a previous read used the same read method.

A limitation of this patch is that if a task reads from two blocks with different read methods, one set of metrics will override the other.

Author: Kostas Sakellis <[email protected]>

Closes apache#3120 from ksakellis/kostas-spark-4092 and squashes the following commits:

54e6658 [Kostas Sakellis] Drops metrics if conflicting read methods exist
f0e0cc5 [Kostas Sakellis] Add bytesReadCallback to InputMetrics
a2a36d4 [Kostas Sakellis] CR feedback
5a0c770 [Kostas Sakellis] [SPARK-4092] [CORE] Fix InputMetrics for coalesce'd Rdds
Kostas Sakellis authored and pwendell committed Jan 16, 2015
commit a79a9f923c47f2ce7da93cf0ecfe2b66fcb9fdd4
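A minimal sketch of the accumulation semantics described above, written against the TaskMetrics API this commit introduces. The object name is illustrative, and the sketch declares itself in the org.apache.spark package only because getInputMetricsForReadMethod is private[spark].

```scala
// Illustrative sketch only; assumes the TaskMetrics/InputMetrics API added in this commit.
package org.apache.spark

import org.apache.spark.executor.{DataReadMethod, TaskMetrics}

object InputMetricsAccumulationSketch {
  def main(args: Array[String]): Unit = {
    val taskMetrics = new TaskMetrics

    // Two reads with the same read method accumulate into one InputMetrics.
    val first = taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
    first.addBytesRead(100L)
    val second = taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
    second.addBytesRead(50L)
    assert(taskMetrics.inputMetrics.get.bytesRead == 150L)

    // A different read method gets a fresh InputMetrics that is NOT recorded on
    // the task -- the limitation called out in the commit message.
    val memory = taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Memory)
    memory.addBytesRead(10L)
    assert(taskMetrics.inputMetrics.get.readMethod == DataReadMethod.Hadoop)
  }
}
```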
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -379,6 +379,7 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
metrics.updateInputMetrics()
metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
if (isLocal) {
// JobProgressListener will hold an reference of it during
75 changes: 73 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,6 +17,11 @@

package org.apache.spark.executor

import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.DataReadMethod.DataReadMethod

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
@@ -80,7 +85,17 @@ class TaskMetrics extends Serializable {
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
* are stored here.
*/
var inputMetrics: Option[InputMetrics] = None
private var _inputMetrics: Option[InputMetrics] = None

def inputMetrics = _inputMetrics

/**
* This should only be used when recreating TaskMetrics, not when updating input metrics in
* executors
*/
private[spark] def setInputMetrics(inputMetrics: Option[InputMetrics]) {
_inputMetrics = inputMetrics
}

/**
* If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
@@ -133,6 +148,30 @@ class TaskMetrics extends Serializable {
readMetrics
}

/**
* Returns the input metrics object that the task should use. Currently, if
* there exists an input metric with the same readMethod, we return that one
* so the caller can accumulate bytes read. If the readMethod is different
* than previously seen by this task, we return a new InputMetric but don't
* record it.
*
* Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
* we can store all the different inputMetrics (one per readMethod).
*/
private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod):
InputMetrics = synchronized {
_inputMetrics match {
case None =>
val metrics = new InputMetrics(readMethod)
_inputMetrics = Some(metrics)
metrics
case Some(metrics @ InputMetrics(method)) if method == readMethod =>
metrics
case Some(InputMetrics(method)) =>
new InputMetrics(readMethod)
}
}

/**
* Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics.
*/
@@ -146,6 +185,10 @@
}
_shuffleReadMetrics = Some(merged)
}

private[spark] def updateInputMetrics() = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}
}

private[spark] object TaskMetrics {
@@ -179,10 +222,38 @@ object DataWriteMethod extends Enumeration with Serializable {
*/
@DeveloperApi
case class InputMetrics(readMethod: DataReadMethod.Value) {

private val _bytesRead: AtomicLong = new AtomicLong()

/**
* Total bytes read.
*/
var bytesRead: Long = 0L
def bytesRead: Long = _bytesRead.get()
@volatile @transient var bytesReadCallback: Option[() => Long] = None

/**
* Adds additional bytes read for this read method.
*/
def addBytesRead(bytes: Long) = {
_bytesRead.addAndGet(bytes)
}

/**
* Invoke the bytesReadCallback and mutate bytesRead.
*/
def updateBytesRead() {
bytesReadCallback.foreach { c =>
_bytesRead.set(c())
}
}

/**
* Register a function that can be called to get up-to-date information on how many bytes the task
* has read from an input source.
*/
def setBytesReadCallback(f: Option[() => Long]) {
bytesReadCallback = f
}
}

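The reworked InputMetrics above keeps its running total in an AtomicLong and can also poll a registered callback for the authoritative byte count. Below is a small sketch of that mechanism, with an AtomicLong standing in for the Hadoop FileSystem statistics that the real callback reads.

```scala
// Sketch of the bytesReadCallback mechanism; the AtomicLong is a stand-in for
// Hadoop FileSystem thread-local statistics.
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.executor.{DataReadMethod, InputMetrics}

object BytesReadCallbackSketch {
  def main(args: Array[String]): Unit = {
    val fsBytesRead = new AtomicLong(0L)

    val metrics = new InputMetrics(DataReadMethod.Hadoop)
    metrics.setBytesReadCallback(Some(() => fsBytesRead.get()))

    fsBytesRead.set(4096L)
    metrics.updateBytesRead()            // pulls the latest value from the callback
    assert(metrics.bytesRead == 4096L)

    fsBytesRead.set(8192L)
    metrics.updateBytesRead()            // overwrites, rather than adds to, the total
    assert(metrics.bytesRead == 8192L)
  }
}
```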
39 changes: 13 additions & 26 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,18 +213,19 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.inputSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.inputSplit.value.asInstanceOf[FileSplit].getPath, jobConf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
split.inputSplit.value match {
case split: FileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, jobConf)
case _ => None
}
)
inputMetrics.setBytesReadCallback(bytesReadCallback)

var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
@@ -237,40 +238,26 @@
val key: K = reader.createKey()
val value: V = reader.createValue()

var recordsSinceMetricsUpdate = 0

override def getNext() = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
}

// Update bytes read metric every few records
if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
} else {
recordsSinceMetricsUpdate += 1
}
(key, value)
}

override def close() {
try {
reader.close()
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.inputSplit.value.getLength
context.taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.addBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
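The key change in HadoopRDD.compute above is that the task-level InputMetrics, and any callback already registered on it, is reused across the splits a coalesced task reads instead of being replaced per split. Here is a sketch of that reuse; fsBytesReadCallbackForSplit is a hypothetical stand-in for SparkHadoopUtil.get.getFSBytesReadOnThreadCallback.

```scala
// Sketch of the callback-reuse pattern; fsBytesReadCallbackForSplit is hypothetical.
import org.apache.spark.executor.{DataReadMethod, InputMetrics}

object CallbackReuseSketch {
  // Hypothetical stand-in for SparkHadoopUtil.get.getFSBytesReadOnThreadCallback.
  def fsBytesReadCallbackForSplit(path: String): Option[() => Long] =
    Some(() => 0L)

  def main(args: Array[String]): Unit = {
    val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)

    // First split read by this task: no callback registered yet, so derive one.
    val callback = inputMetrics.bytesReadCallback.orElse(
      fsBytesReadCallbackForSplit("/data/part-00000"))
    inputMetrics.setBytesReadCallback(callback)

    // A later split read by the same (coalesced) task keeps the existing callback,
    // so the running byte total is not reset between splits.
    val reused = inputMetrics.bytesReadCallback.orElse(
      fsBytesReadCallbackForSplit("/data/part-00001"))
    assert(reused.get eq callback.get)
  }
}
```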
40 changes: 13 additions & 27 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -109,18 +109,19 @@ class NewHadoopRDD[K, V](
logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value

val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
val inputMetrics = context.taskMetrics
.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(
split.serializableHadoopSplit.value.asInstanceOf[FileSplit].getPath, conf)
} else {
None
}
if (bytesReadCallback.isDefined) {
context.taskMetrics.inputMetrics = Some(inputMetrics)
}
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse(
split.serializableHadoopSplit.value match {
case split: FileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(split.getPath, conf)
case _ => None
}
)
inputMetrics.setBytesReadCallback(bytesReadCallback)

val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
@@ -153,34 +154,19 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false

// Update bytes read metric every few records
if (recordsSinceMetricsUpdate == HadoopRDD.RECORDS_BETWEEN_BYTES_READ_METRIC_UPDATES
&& bytesReadCallback.isDefined) {
recordsSinceMetricsUpdate = 0
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
} else {
recordsSinceMetricsUpdate += 1
}

(reader.getCurrentKey, reader.getCurrentValue)
}

private def close() {
try {
reader.close()

// Update metrics with final amount
if (bytesReadCallback.isDefined) {
val bytesReadFn = bytesReadCallback.get
inputMetrics.bytesRead = bytesReadFn()
inputMetrics.updateBytesRead()
} else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength
context.taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -53,7 +53,7 @@ private[spark] class BlockResult(
readMethod: DataReadMethod.Value,
bytes: Long) {
val inputMetrics = new InputMetrics(readMethod)
inputMetrics.bytesRead = bytes
inputMetrics.addBytesRead(bytes)
}

/**
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -637,8 +637,8 @@ private[spark] object JsonProtocol {
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson))
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.inputMetrics =
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
metrics.setInputMetrics(
Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson))
metrics.outputMetrics =
Utils.jsonOption(json \ "Output Metrics").map(outputMetricsFromJson)
metrics.updatedBlocks =
@@ -671,7 +671,7 @@
def inputMetricsFromJson(json: JValue): InputMetrics = {
val metrics = new InputMetrics(
DataReadMethod.withName((json \ "Data Read Method").extract[String]))
metrics.bytesRead = (json \ "Bytes Read").extract[Long]
metrics.addBytesRead((json \ "Bytes Read").extract[Long])
metrics
}

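The JsonProtocol change above only swaps direct assignment for setInputMetrics/addBytesRead; the wire format is unchanged. A sketch of the JSON shape inputMetricsFromJson expects, placed in the org.apache.spark package because JsonProtocol is private[spark]:

```scala
// Sketch only; the JSON field names ("Data Read Method", "Bytes Read") come from the diff above.
package org.apache.spark

import org.json4s.JValue
import org.json4s.JsonDSL._

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.util.JsonProtocol

object InputMetricsJsonSketch {
  def main(args: Array[String]): Unit = {
    val json: JValue =
      ("Data Read Method" -> DataReadMethod.Hadoop.toString) ~
      ("Bytes Read" -> 1024L)

    val metrics = JsonProtocol.inputMetricsFromJson(json)
    assert(metrics.readMethod == DataReadMethod.Hadoop)
    assert(metrics.bytesRead == 1024L)  // applied via addBytesRead internally
  }
}
```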