
Commit c6ad275

Merge branch 'master' into to-file-max-fields
2 parents 9acdf9d + 0523f5e

File tree: 47 files changed, +794 −123 lines. (Large commits hide some content by default; only a subset of the changed files is shown below.)

common/network-common/src/main/java/org/apache/spark/network/TransportContext.java

Lines changed: 8 additions & 1 deletion
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import com.codahale.metrics.Counter;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
@@ -66,6 +67,8 @@ public class TransportContext {
   private final RpcHandler rpcHandler;
   private final boolean closeIdleConnections;
   private final boolean isClientOnly;
+  // Number of registered connections to the shuffle service
+  private Counter registeredConnections = new Counter();
 
   /**
    * Force to create MessageEncoder and MessageDecoder so that we can make sure they will be created
@@ -221,7 +224,7 @@ private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler
     TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
       rpcHandler, conf.maxChunksBeingTransferred());
     return new TransportChannelHandler(client, responseHandler, requestHandler,
-      conf.connectionTimeoutMs(), closeIdleConnections);
+      conf.connectionTimeoutMs(), closeIdleConnections, this);
   }
 
   /**
@@ -234,4 +237,8 @@ private ChunkFetchRequestHandler createChunkFetchHandler(TransportChannelHandler
   }
 
   public TransportConf getConf() { return conf; }
+
+  public Counter getRegisteredConnections() {
+    return registeredConnections;
+  }
 }

common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java

Lines changed: 17 additions & 1 deletion
@@ -21,6 +21,7 @@
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
+import org.apache.spark.network.TransportContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,18 +58,21 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message
   private final TransportRequestHandler requestHandler;
   private final long requestTimeoutNs;
   private final boolean closeIdleConnections;
+  private final TransportContext transportContext;
 
   public TransportChannelHandler(
       TransportClient client,
       TransportResponseHandler responseHandler,
       TransportRequestHandler requestHandler,
       long requestTimeoutMs,
-      boolean closeIdleConnections) {
+      boolean closeIdleConnections,
+      TransportContext transportContext) {
     this.client = client;
     this.responseHandler = responseHandler;
     this.requestHandler = requestHandler;
     this.requestTimeoutNs = requestTimeoutMs * 1000L * 1000;
     this.closeIdleConnections = closeIdleConnections;
+    this.transportContext = transportContext;
   }
 
   public TransportClient getClient() {
@@ -176,4 +180,16 @@ public TransportResponseHandler getResponseHandler() {
     return responseHandler;
   }
 
+  @Override
+  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+    transportContext.getRegisteredConnections().inc();
+    super.channelRegistered(ctx);
+  }
+
+  @Override
+  public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+    transportContext.getRegisteredConnections().dec();
+    super.channelUnregistered(ctx);
+  }
+
 }

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 5 additions & 0 deletions
@@ -22,6 +22,7 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.MetricSet;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -159,4 +160,8 @@ public void close() {
     }
     bootstrap = null;
   }
+
+  public Counter getRegisteredConnections() {
+    return context.getRegisteredConnections();
+  }
 }
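
Taken together, the three network-common changes above implement one pattern: TransportContext owns a codahale Counter, each TransportChannelHandler built from that context bumps the counter on Netty's channelRegistered/channelUnregistered lifecycle events, and TransportServer exposes the context's counter to callers. A minimal standalone sketch of the same pattern (hypothetical ConnectionTracker/TrackingHandler names, not Spark's classes):

    import com.codahale.metrics.Counter
    import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}

    // Plays the role of TransportContext: owns the shared counter.
    class ConnectionTracker {
      val registeredConnections = new Counter()
      def newHandler(): TrackingHandler = new TrackingHandler(this)
    }

    // Plays the role of TransportChannelHandler: increments on channel
    // registration, decrements on unregistration, so the counter always
    // reflects the number of currently registered connections.
    class TrackingHandler(tracker: ConnectionTracker) extends ChannelInboundHandlerAdapter {
      override def channelRegistered(ctx: ChannelHandlerContext): Unit = {
        tracker.registeredConnections.inc()
        super.channelRegistered(ctx)
      }
      override def channelUnregistered(ctx: ChannelHandlerContext): Unit = {
        tracker.registeredConnections.dec()
        super.channelUnregistered(ctx)
      }
    }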

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java

Lines changed: 22 additions & 2 deletions
@@ -29,6 +29,7 @@
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Timer;
+import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -173,22 +174,29 @@ private void checkAuth(TransportClient client, String appId) {
   /**
    * A simple class to wrap all shuffle service wrapper metrics
    */
-  private class ShuffleMetrics implements MetricSet {
+  @VisibleForTesting
+  public class ShuffleMetrics implements MetricSet {
     private final Map<String, Metric> allMetrics;
     // Time latency for open block request in ms
     private final Timer openBlockRequestLatencyMillis = new Timer();
     // Time latency for executor registration latency in ms
     private final Timer registerExecutorRequestLatencyMillis = new Timer();
     // Block transfer rate in byte per second
     private final Meter blockTransferRateBytes = new Meter();
+    // Number of active connections to the shuffle service
+    private Counter activeConnections = new Counter();
+    // Number of registered connections to the shuffle service
+    private Counter registeredConnections = new Counter();
 
-    private ShuffleMetrics() {
+    public ShuffleMetrics() {
       allMetrics = new HashMap<>();
       allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
       allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
       allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
       allMetrics.put("registeredExecutorsSize",
         (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
+      allMetrics.put("numActiveConnections", activeConnections);
+      allMetrics.put("numRegisteredConnections", registeredConnections);
     }
 
     @Override
@@ -244,4 +252,16 @@ public ManagedBuffer next() {
     }
   }
 
+  @Override
+  public void channelActive(TransportClient client) {
+    metrics.activeConnections.inc();
+    super.channelActive(client);
+  }
+
+  @Override
+  public void channelInactive(TransportClient client) {
+    metrics.activeConnections.dec();
+    super.channelInactive(client);
+  }
+
 }
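
Note the two counters measure different things: numRegisteredConnections (incremented in TransportChannelHandler above) tracks Netty channel registrations, while numActiveConnections is driven by the RpcHandler's channelActive/channelInactive callbacks overridden here. A self-contained sketch, not Spark code, of how counters published through a codahale MetricSet become readable once the set is folded into a registry:

    import java.util.{HashMap => JHashMap, Map => JMap}
    import com.codahale.metrics.{Counter, Metric, MetricRegistry, MetricSet}

    val activeConnections = new Counter()
    val metricSet = new MetricSet {
      override def getMetrics: JMap[String, Metric] = {
        val m = new JHashMap[String, Metric]()
        m.put("numActiveConnections", activeConnections)  // same key the handler uses
        m
      }
    }

    val registry = new MetricRegistry()
    registry.registerAll(metricSet)       // flattens the set into the registry
    activeConnections.inc()               // e.g. a connection became active
    println(registry.counter("numActiveConnections").getCount)  // prints 1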

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 12 additions & 9 deletions
@@ -170,15 +170,6 @@ protected void serviceInit(Configuration conf) throws Exception {
     TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
     blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
 
-    // register metrics on the block handler into the Node Manager's metrics system.
-    YarnShuffleServiceMetrics serviceMetrics =
-      new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
-
-    MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
-    metricsSystem.register(
-      "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
-    logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
-
     // If authentication is enabled, set up the shuffle server to use a
     // special RPC handler that filters out unauthenticated fetch requests
     List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
@@ -199,6 +190,18 @@
     port = shuffleServer.getPort();
     boundPort = port;
     String authEnabledString = authEnabled ? "enabled" : "not enabled";
+
+    // register metrics on the block handler into the Node Manager's metrics system.
+    blockHandler.getAllMetrics().getMetrics().put("numRegisteredConnections",
+        shuffleServer.getRegisteredConnections());
+    YarnShuffleServiceMetrics serviceMetrics =
+        new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
+
+    MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
+    metricsSystem.register(
+        "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
+    logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
+
     logger.info("Started YARN shuffle service for Spark on port {}. " +
       "Authentication is {}. Registered executor file is {}", port, authEnabledString,
       registeredExecutorFile);

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java

Lines changed: 5 additions & 0 deletions
@@ -107,6 +107,11 @@ public static void collectMetric(
         throw new IllegalStateException(
           "Not supported class type of metric[" + name + "] for value " + gaugeValue);
       }
+    } else if (metric instanceof Counter) {
+      Counter c = (Counter) metric;
+      long counterValue = c.getCount();
+      metricsRecordBuilder.addGauge(new ShuffleServiceMetricsInfo(name, "Number of " +
+        "connections to shuffle service " + name), counterValue);
     }
   }
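
The new branch makes collectMetric translate a codahale Counter into a long-valued Hadoop metrics2 gauge. A sketch of the equivalent mapping (hypothetical reportCounter helper; Spark builds the MetricsInfo via its own ShuffleServiceMetricsInfo class rather than Interns):

    import com.codahale.metrics.Counter
    import org.apache.hadoop.metrics2.MetricsRecordBuilder
    import org.apache.hadoop.metrics2.lib.Interns

    // A codahale Counter has no direct Hadoop equivalent, so its
    // current count is reported as a gauge with a descriptive info.
    def reportCounter(builder: MetricsRecordBuilder, name: String, c: Counter): Unit = {
      builder.addGauge(
        Interns.info(name, "Number of connections to shuffle service " + name),
        c.getCount)
    }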

core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala

Lines changed: 2 additions & 0 deletions
@@ -84,6 +84,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
     server = transportContext.createServer(port, bootstraps.asJava)
 
     shuffleServiceSource.registerMetricSet(server.getAllMetrics)
+    blockHandler.getAllMetrics.getMetrics.put("numRegisteredConnections",
+      server.getRegisteredConnections)
     shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics)
     masterMetricsSystem.registerSource(shuffleServiceSource)
     masterMetricsSystem.start()

core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala

Lines changed: 89 additions & 0 deletions

@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.util.{AccumulatorV2, DoubleAccumulator, LongAccumulator}
+
+/**
+ * AccumulatorSource is a Spark metric Source that reports the current value
+ * of the accumulator as a gauge.
+ *
+ * It is restricted to the LongAccumulator and the DoubleAccumulator, as those
+ * are the current built-in numerical accumulators with Spark, and excludes
+ * the CollectionAccumulator, as that is a List of values (hard to report,
+ * to a metrics system)
+ */
+private[spark] class AccumulatorSource extends Source {
+  private val registry = new MetricRegistry
+  protected def register[T](accumulators: Map[String, AccumulatorV2[_, T]]): Unit = {
+    accumulators.foreach {
+      case (name, accumulator) =>
+        val gauge = new Gauge[T] {
+          override def getValue: T = accumulator.value
+        }
+        registry.register(MetricRegistry.name(name), gauge)
+    }
+  }
+
+  override def sourceName: String = "AccumulatorSource"
+  override def metricRegistry: MetricRegistry = registry
+}
+
+@Experimental
+class LongAccumulatorSource extends AccumulatorSource
+
+@Experimental
+class DoubleAccumulatorSource extends AccumulatorSource
+
+/**
+ * :: Experimental ::
+ * Metrics source specifically for LongAccumulators. Accumulators
+ * are only valid on the driver side, so these metrics are reported
+ * only by the driver.
+ * Register LongAccumulators using:
+ * LongAccumulatorSource.register(sc, {"name" -> longAccumulator})
+ */
+@Experimental
+object LongAccumulatorSource {
+  def register(sc: SparkContext, accumulators: Map[String, LongAccumulator]): Unit = {
+    val source = new LongAccumulatorSource
+    source.register(accumulators)
+    sc.env.metricsSystem.registerSource(source)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Metrics source specifically for DoubleAccumulators. Accumulators
+ * are only valid on the driver side, so these metrics are reported
+ * only by the driver.
+ * Register DoubleAccumulators using:
+ * DoubleAccumulatorSource.register(sc, {"name" -> doubleAccumulator})
+ */
+@Experimental
+object DoubleAccumulatorSource {
+  def register(sc: SparkContext, accumulators: Map[String, DoubleAccumulator]): Unit = {
+    val source = new DoubleAccumulatorSource
+    source.register(accumulators)
+    sc.env.metricsSystem.registerSource(source)
+  }
+}
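
The scaladoc above sketches the intended driver-side usage; a concrete example, assuming a live SparkContext named sc and a metrics sink already configured:

    import org.apache.spark.metrics.source.LongAccumulatorSource

    // Accumulators are created and read on the driver.
    val recordsProcessed = sc.longAccumulator("recordsProcessed")

    // Expose the accumulator's current value as a gauge under the
    // "AccumulatorSource" source in the driver's MetricsSystem.
    LongAccumulatorSource.register(sc, Map("recordsProcessed" -> recordsProcessed))

    // Tasks update the accumulator as usual; configured sinks
    // (console, JMX, Graphite, ...) report its latest value.
    sc.parallelize(1 to 100).foreach(_ => recordsProcessed.add(1))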

core/src/test/scala/org/apache/spark/metrics/source/AccumulatorSourceSuite.scala

Lines changed: 91 additions & 0 deletions

@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{mock, never, spy, times, verify, when}
+
+import org.apache.spark.{SparkContext, SparkEnv, SparkFunSuite}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}
+
+class AccumulatorSourceSuite extends SparkFunSuite {
+  test("that that accumulators register against the metric system's register") {
+    val acc1 = new LongAccumulator()
+    val acc2 = new LongAccumulator()
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map("my-accumulator-1" -> acc1,
+      "my-accumulator-2" -> acc2)
+    LongAccumulatorSource.register(mockContext, accs)
+    val captor = new ArgumentCaptor[AccumulatorSource]()
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges()
+    assert (gauges.size == 2)
+    assert (gauges.firstKey == "my-accumulator-1")
+    assert (gauges.lastKey == "my-accumulator-2")
+  }
+
+  test("the accumulators value property is checked when the gauge's value is requested") {
+    val acc1 = new LongAccumulator()
+    acc1.add(123)
+    val acc2 = new LongAccumulator()
+    acc2.add(456)
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map("my-accumulator-1" -> acc1,
+      "my-accumulator-2" -> acc2)
+    LongAccumulatorSource.register(mockContext, accs)
+    val captor = new ArgumentCaptor[AccumulatorSource]()
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges()
+    assert(gauges.get("my-accumulator-1").getValue() == 123)
+    assert(gauges.get("my-accumulator-2").getValue() == 456)
+  }
+
+  test("the double accumulators value propety is checked when the gauge's value is requested") {
+    val acc1 = new DoubleAccumulator()
+    acc1.add(123.123)
+    val acc2 = new DoubleAccumulator()
+    acc2.add(456.456)
+    val mockContext = mock(classOf[SparkContext])
+    val mockEnvironment = mock(classOf[SparkEnv])
+    val mockMetricSystem = mock(classOf[MetricsSystem])
+    when(mockEnvironment.metricsSystem) thenReturn (mockMetricSystem)
+    when(mockContext.env) thenReturn (mockEnvironment)
+    val accs = Map(
+      "my-accumulator-1" -> acc1,
+      "my-accumulator-2" -> acc2)
+    DoubleAccumulatorSource.register(mockContext, accs)
+    val captor = new ArgumentCaptor[AccumulatorSource]()
+    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
+    val source = captor.getValue()
+    val gauges = source.metricRegistry.getGauges()
+    assert(gauges.get("my-accumulator-1").getValue() == 123.123)
+    assert(gauges.get("my-accumulator-2").getValue() == 456.456)
+  }
+}
