This repository was archived by the owner on Dec 4, 2024. It is now read-only.
Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -16,6 +16,7 @@ logs/
/spark/
docker-dist
docker-build
statsd-reporter
mesos-spark-integration-tests/
mesos-spark-integration-tests-assembly-*.jar
dcos-spark-scala-tests-assembly-*.jar
12 changes: 10 additions & 2 deletions Makefile
@@ -1,6 +1,7 @@
ROOT_DIR := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
BUILD_DIR := $(ROOT_DIR)/build
DIST_DIR := $(BUILD_DIR)/dist
STATSD_DIR := $(ROOT_DIR)/spark-statsd-reporter
GIT_COMMIT := $(shell git rev-parse HEAD)

SPARK_DEV_DOCKER_IMAGE ?= mesosphere/spark-dev
@@ -43,6 +44,12 @@ prod-dist: $(SPARK_DIR)
mv $${filename} $(DIST_DIR)
echo "Built: $(DIST_DIR)/$${filename}"

statsd-reporter:
pushd $(STATSD_DIR)
docker run -v $(STATSD_DIR):/spark-statsd-reporter -w /spark-statsd-reporter maven:3.6-jdk-8-alpine mvn clean package
popd
echo "$(STATSD_DIR)/target/spark-statsd-reporter.jar" > $@

# If build/dist/ doesn't exist, creates it and downloads the spark build in manifest.json.
# To instead use a locally built version of spark, you must run "make prod-dist".
SPARK_DIST_URI ?= $(shell jq ".default_spark_dist.uri" "$(ROOT_DIR)/manifest.json")
@@ -60,7 +67,7 @@ docker-login:
docker login --username="$(DOCKER_USERNAME)" --password="$(DOCKER_PASSWORD)"

DOCKER_DIST_IMAGE ?= $(SPARK_DEV_DOCKER_IMAGE):$(GIT_COMMIT)
-docker-dist: $(DIST_DIR)
+docker-dist: $(DIST_DIR) statsd-reporter
SPARK_BUILDS=`ls $(DIST_DIR)/spark-*.tgz || exit 0`
if [ `echo "$${SPARK_BUILDS}" | wc -w` == 1 ]; then
echo "Using spark: $${SPARK_BUILDS}"
@@ -76,6 +83,7 @@ docker-dist: $(DIST_DIR)
cp -r $(DIST_DIR)/spark-*/. $(BUILD_DIR)/docker/dist
cp -r conf/* $(BUILD_DIR)/docker/dist/conf
cp -r docker/* $(BUILD_DIR)/docker
cp `cat statsd-reporter` $(BUILD_DIR)/docker

pushd $(BUILD_DIR)/docker
docker build -t $(DOCKER_DIST_IMAGE) .
@@ -178,7 +186,7 @@ clean-cluster:
dcos-launch delete || echo "Error deleting cluster"

clean: clean-dist
-for f in "$(DCOS_SPARK_TEST_JAR_PATH)" "$(UNIVERSE_URL_PATH)" "docker-build"; do
+for f in "$(DCOS_SPARK_TEST_JAR_PATH)" "$(UNIVERSE_URL_PATH)" "docker-build" "statsd-reporter"; do
[ ! -e $$f ] || rm $$f
done

4 changes: 2 additions & 2 deletions conf/metrics.properties.template
@@ -1,7 +1,7 @@

# Enable StatsdSink for all instances by class name
-*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
+*.sink.statsd.class=org.apache.spark.metrics.sink.statsd.StatsdSink
*.sink.statsd.prefix=spark
*.sink.statsd.tags=app_type=spark
*.sink.statsd.host=<STATSD_UDP_HOST>
*.sink.statsd.port=<STATSD_UDP_PORT>

3 changes: 2 additions & 1 deletion docker/Dockerfile
@@ -81,6 +81,7 @@ RUN ln -s /bin/bash /usr/bin/bash \
ENV SPARK_HOME="/opt/spark"
ADD dist ${SPARK_HOME}
ADD krb5.conf.mustache /etc/
ADD spark-statsd-reporter.jar ${SPARK_HOME}/jars/mesosphere-statsd-reporter.jar

# required to run as nobody
RUN addgroup --gid 99 nobody \
@@ -95,4 +96,4 @@ RUN addgroup --gid 99 nobody \
&& chmod -R ugo+rw /var/log/ \
&& chmod -R ugo+rw ${SPARK_HOME}

-WORKDIR ${SPARK_HOME}
+WORKDIR ${SPARK_HOME}
3 changes: 3 additions & 0 deletions spark-statsd-reporter/.gitignore
@@ -0,0 +1,3 @@
.idea
target
*.iml
94 changes: 94 additions & 0 deletions spark-statsd-reporter/README.md
@@ -0,0 +1,94 @@
Spark StatsD Reporter
---

# Overview
Spark StatsD Reporter provides a custom implementation of a Spark metrics sink that supports metric tagging, filtering, and
name formatting. The provided `MetricFormatter` normalizes metric names to a consistent form by removing variable parts and
placing them into tags.
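
The sink is wired in through Spark's `metrics.properties`; the `conf/metrics.properties.template` change in this PR enables it for all instances (the `<STATSD_UDP_HOST>`/`<STATSD_UDP_PORT>` placeholders are presumably filled in at deployment time):
```
*.sink.statsd.class=org.apache.spark.metrics.sink.statsd.StatsdSink
*.sink.statsd.prefix=spark
*.sink.statsd.tags=app_type=spark
*.sink.statsd.host=<STATSD_UDP_HOST>
*.sink.statsd.port=<STATSD_UDP_PORT>
```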

# Motivation
Spark embeds `spark.app.id` and `spark.executor.id` in metric **names**. Because those IDs are unique to each
execution, the number of distinct metric names grows continuously even though the metrics themselves report the
same thing, which makes the ever-changing names problematic to use in dashboards.

For example, `jvm_heap_used` reported by all Spark instances (components):
- `jvm_heap_used` (Dispatcher)
- `<spark.app.id>_driver_jvm_heap_used` (driver)
- `<spark.app.id>_<spark.executor.id>_jvm_heap_used` (executor)

# Tag enrichment
All the metrics reported by Driver or Executor instances are enriched with additional tags:

- `<prefix>_app_name` for Spark application name
- `<prefix>_instance_type` which is either `driver` or `executor`
- `<prefix>_instance_id` which is Mesos Framework ID for Driver and Mesos Task ID for Executor
- `<prefix>_namespace` contains the value of `spark.metrics.namespace` configuration property

# Metric name formatting
## Driver
### Default naming
Spark Driver metric names have the following format:
```
<spark.app.id>_driver_<source>_<metric>
```

If `spark.metrics.namespace` is provided, it replaces `spark.app.id`:
```
<spark.metrics.namespace>_driver_<source>_<metric>
```

`spark.app.id` is set to the Mesos Framework ID and cannot be overridden via configuration.

Examples:
```
<prefix>_a4d898f4_e1cf_4019_9950_c739bf9a3730_0003_driver_20190502210339_0002_driver_jvm_heap_used

# with --conf spark.metrics.namespace=namespace:
<prefix>_namespace_driver_jvm_heap_used
```

### Formatted metrics
Examples:
```
# without spark.metrics.namespace (namespace tag is set to 'default'):
before: <prefix>_a4d898f4_e1cf_4019_9950_c739bf9a3730_0003_driver_20190502210339_0002_driver_jvm_heap_used
after: <prefix>_driver_jvm_heap_used,<prefix>_namespace=default,<prefix>_app_name={value of spark.app.name},<prefix>_instance_type=driver,<prefix>_instance_id=a4d898f4_e1cf_4019_9950_c739bf9a3730_0003_driver_20190502210339_0002

# with --conf spark.metrics.namespace=namespace (namespace tag is set to the value of spark.metrics.namespace):
before: <prefix>_namespace_driver_jvm_heap_used
after: <prefix>_driver_jvm_heap_used,<prefix>_namespace=namespace,<prefix>_app_name={value of spark.app.name},<prefix>_instance_type=driver,<prefix>_instance_id=a4d898f4_e1cf_4019_9950_c739bf9a3730_0003_driver_20190502210339_0002
```

## Executor
### Default naming
Spark Executor metric names have the following format:
```
<spark.app.id>_<spark.executor.id>_<source>_<metric>
```

If `spark.metrics.namespace` is provided, it replaces `spark.app.id`:
```
<spark.metrics.namespace>_<spark.executor.id>_<source>_<metric>
```

`spark.app.id` is set to the Mesos Framework ID and cannot be overridden via configuration.

Examples:
```
<prefix>_a4d898f4_e1cf_4019_9950_c739bf9a3730_0003_driver_20190502210339_0002_aa6ee344_1314_46ea_b346_dcf6a5cfeceb_0_jvm_heap_used

# with --conf spark.metrics.namespace=namespace:
<prefix>_namespace_aa6ee344_1314_46ea_b346_dcf6a5cfeceb_0_jvm_heap_used
```

### Formatted metrics
Examples:
```
# without spark.metrics.namespace (namespace tag is set to 'default'):
before: <prefix>_a4d898f4_e1cf_4019_9950_c739bf9a3730_0003_driver_20190502210339_0002_aa6ee344_1314_46ea_b346_dcf6a5cfeceb_0_jvm_heap_used
after: <prefix>_executor_jvm_heap_used,<prefix>_namespace=default,<prefix>_app_name={value of spark.app.name},<prefix>_instance_type=executor,<prefix>_instance_id=aa6ee344_1314_46ea_b346_dcf6a5cfeceb_0

# with --conf spark.metrics.namespace=namespace (namespace tag is set to the value of spark.metrics.namespace):
before: <prefix>_namespace_aa6ee344_1314_46ea_b346_dcf6a5cfeceb_0_jvm_heap_used
after: <prefix>_executor_jvm_heap_used,<prefix>_namespace=namespace,<prefix>_app_name={value of spark.app.name},<prefix>_instance_type=executor,<prefix>_instance_id=aa6ee344_1314_46ea_b346_dcf6a5cfeceb_0
```
37 changes: 37 additions & 0 deletions spark-statsd-reporter/pom.xml
@@ -0,0 +1,37 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mesosphere</groupId>
<artifactId>spark-statsd-reporter</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>spark-statsd-reporter</name>
<url>http://maven.apache.org</url>

<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<dropwizard.metrics.version>3.1.5</dropwizard.metrics.version>
<spark.version>2.4.0</spark.version>
<spark.scala.version>2.11</spark.scala.version>
</properties>

<build>
<finalName>${project.artifactId}</finalName>
</build>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${spark.scala.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
25 changes: 25 additions & 0 deletions spark-statsd-reporter/src/main/java/org/apache/spark/metrics/sink/statsd/Configuration.java
@@ -0,0 +1,25 @@
package org.apache.spark.metrics.sink.statsd;

final class Configuration {
final static class Keys {
final static String HOST = "host";
final static String PORT = "port";
final static String PREFIX = "prefix";
final static String TAGS = "tags";
final static String POLL_INTERVAL = "poll.interval";
final static String POLL_UNIT = "poll.unit";
final static String RATE_UNIT = "rate.unit";
final static String DURATION_UNIT = "duration.unit";
}

final static class Defaults {
final static String HOST = "127.0.0.1";
final static String PORT = "8125";
final static String TAGS = "";
final static String POLL_INTERVAL = "10";
final static String POLL_UNIT = "SECONDS";
final static String RATE_UNIT = "SECONDS";
final static String DURATION_UNIT = "MILLISECONDS";
final static String PREFIX = "";
}
}
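
A minimal usage sketch (not part of this PR) of how a sink class in the same package might resolve these keys against its `Properties`, falling back to the defaults above; the example class and the override value are hypothetical:
```
package org.apache.spark.metrics.sink.statsd;

import java.util.Properties;

// Hypothetical sketch: resolve sink settings via Configuration.Keys,
// using Configuration.Defaults as fallbacks for unset properties.
final class ConfigurationUsageExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(Configuration.Keys.PORT, "18125"); // override a single key

        // Unset keys fall back to their documented defaults.
        String host = props.getProperty(Configuration.Keys.HOST, Configuration.Defaults.HOST);
        int port = Integer.parseInt(props.getProperty(Configuration.Keys.PORT, Configuration.Defaults.PORT));

        System.out.println(host + ":" + port); // prints 127.0.0.1:18125
    }
}
```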
48 changes: 48 additions & 0 deletions spark-statsd-reporter/src/main/java/org/apache/spark/metrics/sink/statsd/InstanceDetails.java
@@ -0,0 +1,48 @@
package org.apache.spark.metrics.sink.statsd;

class InstanceDetails {
private final String applicationId;
private final String applicationName;
private final InstanceType instanceType;
private final String instanceId;
private final String namespace;

InstanceDetails(String applicationId, String applicationName, InstanceType instanceType, String instanceId, String namespace) {
this.applicationId = applicationId;
this.applicationName = applicationName;
this.instanceType = instanceType;
this.instanceId = instanceId;
this.namespace = namespace;
}

String getApplicationId() {
return applicationId;
}

String getApplicationName() {
return applicationName;
}

InstanceType getInstanceType() {
return instanceType;
}

String getInstanceId() {
return instanceId;
}

String getNamespace() {
return namespace;
}

@Override
public String toString() {
return "InstanceDetails{" +
"applicationId='" + applicationId + '\'' +
", applicationName='" + applicationName + '\'' +
", instanceType=" + instanceType +
", instanceId='" + instanceId + '\'' +
", namespace='" + namespace + '\'' +
'}';
}
}
40 changes: 40 additions & 0 deletions spark-statsd-reporter/src/main/java/org/apache/spark/metrics/sink/statsd/InstanceDetailsProvider.java
@@ -0,0 +1,40 @@
package org.apache.spark.metrics.sink.statsd;

import org.apache.http.annotation.NotThreadSafe;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

/**
* Class used to access the SparkEnv instance and extract relevant tags from the SparkConf
* shared across Drivers and Executors. Since SparkEnv initializes metric sinks in its
* constructor, it is not yet available to the Sink during initialization (for Executors).
*/
@NotThreadSafe
class InstanceDetailsProvider {
private final static Logger logger = LoggerFactory.getLogger(InstanceDetailsProvider.class);

private InstanceDetails instance = null;

Optional<InstanceDetails> getInstanceDetails() {
if (instance == null) {
if (SparkEnv.get() == null) {
logger.warn("SparkEnv is not initialized, instance details unavailable");
} else {
SparkConf sparkConf = SparkEnv.get().conf();
instance =
new InstanceDetails(
sparkConf.getAppId(),
sparkConf.get("spark.app.name"),
InstanceType.valueOf(SparkEnv.get().metricsSystem().instance().toUpperCase()),
sparkConf.get("spark.executor.id"),
sparkConf.get("spark.metrics.namespace", "default")
);
}
}
return Optional.ofNullable(instance);
}
}
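
A hypothetical consumer sketch (not part of this PR) showing how a reporter might turn `InstanceDetails` into the tags described in the README's "Tag enrichment" section, degrading gracefully while SparkEnv is still unavailable; the class and method names are illustrative only:
```
package org.apache.spark.metrics.sink.statsd;

import java.util.Locale;
import java.util.Optional;

// Hypothetical sketch: map InstanceDetails onto tags similar to those in the
// README ("Tag enrichment"); emits no tags while SparkEnv is uninitialized.
final class InstanceDetailsUsageExample {
    static String buildTags(InstanceDetailsProvider provider) {
        Optional<InstanceDetails> details = provider.getInstanceDetails();
        if (!details.isPresent()) {
            return ""; // SparkEnv not initialized yet: report metrics untagged
        }
        InstanceDetails d = details.get();
        return "app_name=" + d.getApplicationName()
                + ",instance_type=" + d.getInstanceType().name().toLowerCase(Locale.ROOT)
                + ",instance_id=" + d.getInstanceId()
                + ",namespace=" + d.getNamespace();
    }
}
```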
6 changes: 6 additions & 0 deletions spark-statsd-reporter/src/main/java/org/apache/spark/metrics/sink/statsd/InstanceType.java
@@ -0,0 +1,6 @@
package org.apache.spark.metrics.sink.statsd;

enum InstanceType {
DRIVER,
EXECUTOR
}