Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.List;

import com.codahale.metrics.MetricSet;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -117,6 +118,12 @@ public void fetchBlocks(
}
}

@Override
public MetricSet shuffleMetrics() {
checkInit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related to the change here -- but should we also checkInit in the close() function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it should be, but looks like we never touch this issue before.

return clientFactory.getAllMetrics();
}

/**
* Registers this executor with an external shuffle server. This registration is required to
* inform the shuffle server about where and how we store our shuffle files.
Expand All @@ -140,6 +147,7 @@ public void registerWithShuffleServer(

@Override
public void close() {
checkInit();
clientFactory.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.spark.network.shuffle;

import java.io.Closeable;
import java.util.Collections;

import com.codahale.metrics.MetricSet;

/** Provides an interface for reading shuffle files, either from an Executor or external service. */
public abstract class ShuffleClient implements Closeable {
Expand Down Expand Up @@ -52,4 +55,13 @@ public abstract void fetchBlocks(
String[] blockIds,
BlockFetchingListener listener,
TempShuffleFileManager tempShuffleFileManager);

/**
* Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to
* get the Shuffle related metrics.
*/
public MetricSet shuffleMetrics() {
// Return an empty MetricSet by default.
return () -> Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana

private var server: TransportServer = _

private val shuffleServiceSource = new ExternalShuffleServiceSource(blockHandler)
private val shuffleServiceSource = new ExternalShuffleServiceSource

/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
Expand All @@ -83,6 +83,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
}
server = transportContext.createServer(port, bootstraps.asJava)

shuffleServiceSource.registerMetricSet(server.getAllMetrics)
shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics)
masterMetricsSystem.registerSource(shuffleServiceSource)
masterMetricsSystem.start()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ package org.apache.spark.deploy

import javax.annotation.concurrent.ThreadSafe

import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.{MetricRegistry, MetricSet}

import org.apache.spark.metrics.source.Source
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler

/**
* Provides metrics source for external shuffle service
*/
@ThreadSafe
private class ExternalShuffleServiceSource
(blockHandler: ExternalShuffleBlockHandler) extends Source {
private class ExternalShuffleServiceSource extends Source {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for my own understanding, not directly related to this change --

I hadn't realized that the ExternalShuffleBlockHandler had its own ShuffleMetrics already. Some of those metrics really seem like they should be part of regular shuffle server, in the executor. Eg., openBlockRequestLatencyMillis. Do you know why its separate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, maybe it should a part of regular shuffle server.

override val metricRegistry = new MetricRegistry()
override val sourceName = "shuffleService"

metricRegistry.registerAll(blockHandler.getAllMetrics)
def registerMetricSet(metricSet: MetricSet): Unit = {
metricRegistry.registerAll(metricSet)
}
}
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ private[spark] class Executor(
private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()

if (!isLocal) {
env.metricsSystem.registerSource(executorSource)
env.blockManager.initialize(conf.getAppId)
env.metricsSystem.registerSource(executorSource)
env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
}

// Whether to load classes in user jars before those in Spark jars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package org.apache.spark.network.netty

import java.nio.ByteBuffer
import java.util.{HashMap => JHashMap, Map => JMap}

import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
import scala.reflect.ClassTag

import com.codahale.metrics.{Metric, MetricSet}

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
Expand Down Expand Up @@ -83,6 +86,19 @@ private[spark] class NettyBlockTransferService(
Utils.startServiceOnPort(_port, startService, conf, getClass.getName)._1
}

override def shuffleMetrics(): MetricSet = {
require(server != null && clientFactory != null, "NettyBlockTransferServer is not initialized")

new MetricSet {
val allMetrics = new JHashMap[String, Metric]()
override def getMetrics: JMap[String, Metric] = {
allMetrics.putAll(clientFactory.getAllMetrics.getMetrics)
allMetrics.putAll(server.getAllMetrics.getMetrics)
allMetrics
}
}
}

override def fetchBlocks(
host: String,
port: Int,
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@ import scala.reflect.ClassTag
import scala.util.Random
import scala.util.control.NonFatal

import com.codahale.metrics.{MetricRegistry, MetricSet}

import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.metrics.source.Source
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.netty.SparkTransportConf
Expand Down Expand Up @@ -248,6 +251,16 @@ private[spark] class BlockManager(
logInfo(s"Initialized BlockManager: $blockManagerId")
}

def shuffleMetricsSource: Source = {
import BlockManager._

if (externalShuffleServiceEnabled) {
new ShuffleMetricsSource("ExternalShuffle", shuffleClient.shuffleMetrics())
} else {
new ShuffleMetricsSource("NettyBlockTransfer", shuffleClient.shuffleMetrics())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think we really need to distinguish these two cases? whether or not you have the external shuffle service, this memory is still owned by the executor JVM (its really only external on the remote end).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the external shuffle, we only have Transport client in the executor side, while for NettyBlockTransfer each executor will both server as transport client as well as server. So from my thought I explicitly distinguish those two cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I guess I haven't seen enough setups to know how users subscribe to these and whether they'd actually want that distinction. (especially since it seems like some of the distinction probably shouldn't be there, eg. the openBlockLatency metric.) But I don't think there is any clear right answer here, if you think this is the best naming, that is fine with me.

}
}

private def registerWithExternalShuffleServer() {
logInfo("Registering executor with local external shuffle service.")
val shuffleConfig = new ExecutorShuffleInfo(
Expand Down Expand Up @@ -1526,4 +1539,12 @@ private[spark] object BlockManager {
}
blockManagers.toMap
}

private class ShuffleMetricsSource(
override val sourceName: String,
metricSet: MetricSet) extends Source {

override val metricRegistry = new MetricRegistry
metricRegistry.registerAll(metricSet)
}
}