diff --git a/.github/labeler.yml b/.github/labeler.yml
new file mode 100644
index 00000000..1b2a9700
--- /dev/null
+++ b/.github/labeler.yml
@@ -0,0 +1,13 @@
+#
+# Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
+
+# Add label to any changes within the folder or any subfolders
+xinfra-monitor:
+ - src/main/java/com/linkedin/xinfra/monitor/**/*
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 00000000..8b410884
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,27 @@
+name: CI
+
+on:
+ pull_request:
+ types: ['opened', 'synchronize']
+
+jobs:
+ ci:
+ name: ci
+ strategy:
+ matrix:
+ version: ['11.0.13']
+ dist: ['microsoft']
+
+ runs-on: ubuntu-latest
+ steps:
+ - name: checkout code
+ uses: actions/checkout@v3
+ with:
+ fetch-depth: 0
+ - name: set up JDK ${{matrix.version}} (${{matrix.dist}})
+ uses: actions/setup-java@v3
+ with:
+ java-version: ${{ matrix.version }}
+ distribution: ${{ matrix.dist }}
+ - name: test
+ run: ./gradlew --info test --no-daemon
diff --git a/.github/workflows/greetings.yml b/.github/workflows/greetings.yml
new file mode 100644
index 00000000..aa455d62
--- /dev/null
+++ b/.github/workflows/greetings.yml
@@ -0,0 +1,17 @@
+name: Greetings
+
+on:
+ pull_request:
+ types: ['opened']
+ issues:
+ types: ['opened']
+
+jobs:
+ greeting:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/first-interaction@v1
+ with:
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
+        issue-message: 'This is your first issue in the repository. Thank you for raising this issue.'
+        pr-message: 'This is your first pull request in the repository. Thank you for this patch. Please review the Wiki page in the repository before submitting a PR.'
diff --git a/.github/workflows/label.yml b/.github/workflows/label.yml
new file mode 100644
index 00000000..96317667
--- /dev/null
+++ b/.github/workflows/label.yml
@@ -0,0 +1,25 @@
+# This workflow will triage pull requests and apply a label based on the
+# paths that are modified in the pull request.
+#
+# To use this workflow, you will need to set up a .github/labeler.yml
+# file with configuration. For more information, see:
+# https://github.com/actions/labeler/blob/master/README.md
+
+name: "Pull Request Labeler"
+on:
+ - pull_request
+
+jobs:
+ triage:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/labeler@v3-preview
+ with:
+ repo-token: "${{ secrets.GITHUB_TOKEN }}"
+
+
+
+
+
+
+
diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml
new file mode 100644
index 00000000..34045177
--- /dev/null
+++ b/.github/workflows/stale.yml
@@ -0,0 +1,19 @@
+name: Mark stale issues and pull requests
+
+on:
+ schedule:
+ - cron: "30 1 * * *"
+
+jobs:
+ stale:
+
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/stale@v1
+ with:
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
+ stale-issue-message: 'Stale issue message'
+ stale-pr-message: 'Stale pull request message'
+ stale-issue-label: 'no-issue-activity'
+ stale-pr-label: 'no-pr-activity'
diff --git a/.github/workflows/tag.yml b/.github/workflows/tag.yml
new file mode 100644
index 00000000..e5356484
--- /dev/null
+++ b/.github/workflows/tag.yml
@@ -0,0 +1,35 @@
+name: tag (release) flow
+
+on:
+ create:
+ tags:
+ - '*'
+
+jobs:
+ gradle-java8:
+ name: Java 8 release
+ runs-on: ubuntu-latest
+ steps:
+ - name: checkout code
+ uses: actions/checkout@v3
+ with:
+ # bring in all history because the gradle versions plugin needs to "walk back" to the closest ancestor tag
+ # to figure out what version this is. optimizing this is left as a challenge to future committers
+ fetch-depth: 0
+ - name: Set up JDK
+ uses: actions/setup-java@v3
+ with:
+ java-version: 11
+ distribution: microsoft
+ - name: Build with Gradle
+ # add --info or --debug below for more details when trying to understand issues
+ run: ./gradlew clean build javadoc --stacktrace --warning-mode all --no-daemon
+ - name: Branch tag
+ id: branch_tag
+ run: echo "RELEASE_TAG=${GITHUB_REF#refs/tags/}" >> $GITHUB_OUTPUT
+ - name: Publish to Jfrog
+ env:
+ JFROG_USER: ${{ secrets.JFROG_USER }}
+ JFROG_KEY: ${{ secrets.JFROG_KEY }}
+ RELEASE_TAG: ${{ steps.branch_tag.outputs.RELEASE_TAG }}
+ run: ./scripts/publishToJfrog.sh
diff --git a/.gitignore b/.gitignore
index b8ae435e..5b4afd4a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,15 @@
.DS_Store
build/
logs/
+.classpath
+.idea/
+.project
+.settings/
+src/test/java/com/linkedin/xinfra/monitor/RandomTests.java
+
+config/andrew-choi.properties
+config/andrew-multi-cluster-monitor.properties
+
+kafka-monitor.iml
+kafka-monitor.ipr
+kafka-monitor.iws
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 0d5889e9..00000000
--- a/.travis.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-language: java
-
-jdk:
- - oraclejdk7
- - openjdk7
- - oraclejdk8
diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md
new file mode 100644
index 00000000..118ac0fd
--- /dev/null
+++ b/CODE_OF_CONDUCT.md
@@ -0,0 +1,80 @@
+# Contributor Covenant Code of Conduct
+
+## Our Pledge
+
+In the interest of fostering an open and welcoming environment, we as
+contributors and maintainers pledge to making participation in our project and
+our community a harassment-free experience for everyone, regardless of age, body
+size, disability, ethnicity, sex characteristics, gender identity and expression,
+level of experience, education, socio-economic status, nationality, personal
+appearance, race, religion, or sexual identity and orientation.
+
+## Our Standards
+
+Examples of behavior that contributes to creating a positive environment
+include:
+
+* Using welcoming and inclusive language
+* Being respectful of differing viewpoints and experiences
+* Gracefully accepting constructive criticism
+* Focusing on what is best for the community
+* Showing empathy towards other community members
+
+Examples of unacceptable behavior by participants include:
+
+* The use of sexualized language or imagery and unwelcome sexual attention or
+ advances
+* Trolling, insulting/derogatory comments, and personal or political attacks
+* Public or private harassment
+* Publishing others' private information, such as a physical or electronic
+ address, without explicit permission
+* Other conduct which could reasonably be considered inappropriate in a
+ professional setting
+
+## Our Responsibilities
+
+Project maintainers are responsible for clarifying the standards of acceptable
+behavior and are expected to take appropriate and fair corrective action in
+response to any instances of unacceptable behavior.
+
+Project maintainers have the right and responsibility to remove, edit, or
+reject comments, commits, code, wiki edits, issues, and other contributions
+that are not aligned to this Code of Conduct, or to ban temporarily or
+permanently any contributor for other behaviors that they deem inappropriate,
+threatening, offensive, or harmful.
+
+## Scope
+
+This Code of Conduct applies both within project spaces and in public spaces
+when an individual is representing the project or its community. Examples of
+representing a project or community include using an official project e-mail
+address, posting via an official social media account, or acting as an appointed
+representative at an online or offline event. Representation of a project may be
+further defined and clarified by project maintainers.
+
+## Enforcement
+
+Instances of abusive, harassing, or otherwise unacceptable behavior may be
+reported by contacting the project team at andchoi@linkedin.com. All
+complaints will be reviewed and investigated and will result in a response that
+is deemed necessary and appropriate to the circumstances. The project team is
+obligated to maintain confidentiality with regard to the reporter of an incident.
+Further details of specific enforcement policies may be posted separately.
+
+Project maintainers who do not follow or enforce the Code of Conduct in good
+faith may face temporary or permanent repercussions as determined by other
+members of the project's leadership.
+
+## Attribution
+
+This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
+available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
+
+[homepage]: https://www.contributor-covenant.org
+
+## FAQ
+
+For LinkedIn Code of Conduct (OSS Code of Conduct) issues or inquiries, please email the Global Compliance & Integrity inbox at integrity@linkedin.com.
+
+
+
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 00000000..7d1d7f31
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,27 @@
+Contribution Agreement
+======================
+
+As a contributor, you represent that the code you submit is your
+original work or that of your employer (in which case you represent you
+have the right to bind your employer). By submitting code, you (and, if
+applicable, your employer) are licensing the submitted code to LinkedIn
+and the open source community subject to the Apache 2.0 license.
+
+Responsible Disclosure of Security Vulnerabilities
+==================================================
+
+Please do not file reports on GitHub for security issues.
+Please review the guidelines at
+https://www.linkedin.com/help/linkedin/answer/62924/security-vulnerabilities?lang=en
+
+Tips for Getting Your Pull Request Accepted
+===========================================
+
+1. Make sure all new features are tested and the tests pass.
+2. Bug fixes must include a test case demonstrating the error being fixed.
+
+Reporting Issues
+===============
+Please use the [link](https://github.com/linkedin/kafka-monitor/issues/new) for reporting any issues.
+
+
diff --git a/LICENSE b/LICENSE
index 02c5bb4d..0d5476d3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,3 @@
-
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
@@ -187,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2016 LinkedIn Corp. All rights reserved.
+ Copyright 2016, 2017, 2018, 2019, 2020, 2021 LinkedIn Corp. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/NOTICE b/NOTICE
index ee9a4345..d02fd50c 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,4 +1,4 @@
-Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+Copyright 2016, 2017, 2018, 2019, 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
@@ -34,18 +34,3 @@ License: http://www.json.org/license.html
This product includes/uses JUnit (https://http://junit.org/)
Copyright 2002-2016 JUnit
License: Eclipse Public License 1.0
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
diff --git a/README.md b/README.md
index 7b5ba295..8313ba42 100644
--- a/README.md
+++ b/README.md
@@ -1,95 +1,186 @@
-# Kafka Monitor
+
+
+
+# Xinfra Monitor
[](https://travis-ci.org/linkedin/kafka-monitor)
+
+
+
-Kafka Monitor is a framework to implement and execute long-running kafka
+Xinfra Monitor (formerly Kafka Monitor) is a framework to implement and execute long-running kafka
system tests in a real cluster. It complements Kafka’s existing system
tests by capturing potential bugs or regressions that are only likely to occur
after prolonged period of time or with low probability. Moreover, it allows you to monitor Kafka
cluster using end-to-end pipelines to obtain a number of derived vital stats
-such as end-to-end latency, service availability and message loss rate. You can easily
-deploy Kafka Monitor to test and monitor your Kafka cluster without requiring
+such as:
+
+- End-to-end latency
+- Service availability
+- Produce and Consume availability
+- Consumer offset commit availability
+- Consumer offset commit latency
+- Kafka message loss rate
+- And many, many more.
+
+You can easily
+deploy Xinfra Monitor to test and monitor your Kafka cluster without requiring
any change to your application.
-Kafka Monitor can automatically create the monitor topic with the specified config
+Xinfra Monitor can automatically create the monitor topic with the specified config
and increase partition count of the monitor topic to ensure partition# >=
broker#. It can also reassign partition and trigger preferred leader election
to ensure that each broker acts as leader of at least one partition of the
-monitor topic. This allows Kafka Monitor to detect performance issue on every
+monitor topic. This allows Xinfra Monitor to detect performance issue on every
broker without requiring users to manually manage the partition assignment of
the monitor topic.
+Xinfra Monitor is used in conjunction with different middle-layer services such as li-apache-kafka-clients in order to monitor single clusters, pipeline destination clusters, and other types of clusters, as done in LinkedIn engineering for real-time cluster health checks.
+
+These are some of the metrics emitted from a Xinfra Monitor instance.
+
+```
+kmf:type=kafka-monitor:offline-runnable-count
+kmf.services:type=produce-service,name=*:produce-availability-avg
+kmf.services:type=consume-service,name=*:consume-availability-avg
+kmf.services:type=produce-service,name=*:records-produced-total
+kmf.services:type=consume-service,name=*:records-consumed-total
+kmf.services:type=produce-service,name=*:records-produced-rate
+kmf.services:type=produce-service,name=*:produce-error-rate
+kmf.services:type=consume-service,name=*:consume-error-rate
+kmf.services:type=consume-service,name=*:records-lost-total
+kmf.services:type=consume-service,name=*:records-lost-rate
+kmf.services:type=consume-service,name=*:records-duplicated-total
+kmf.services:type=consume-service,name=*:records-delay-ms-avg
+kmf.services:type=commit-availability-service,name=*:offsets-committed-avg
+kmf.services:type=commit-availability-service,name=*:offsets-committed-total
+kmf.services:type=commit-availability-service,name=*:failed-commit-offsets-avg
+kmf.services:type=commit-availability-service,name=*:failed-commit-offsets-total
+kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-avg
+kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-max
+kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-99th
+kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-999th
+kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-9999th
+```
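+
+For example, with the `jolokia-service` enabled (Jolokia listens on port 8778 by default), any of these metrics can be read over HTTP:
+
+```
+curl localhost:8778/jolokia/read/kmf.services:type=produce-service,name=*/produce-availability-avg
+```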
+
## Getting Started
### Prerequisites
-Kafka Monitor requires Gradle 2.0 or higher. Java 7 should be used for
+Xinfra Monitor requires Gradle 2.0 or higher. Java 7 should be used for
building in order to support both Java 7 and Java 8 at runtime.
-Kafka Monitor supports Apache Kafka 0.8 and 0.9. Use branch 0.8.2.2 to monitor Apache
-Kafka cluster 0.8. Use branch 0.9.0.1 to compile with Kafka 0.9. Use master
-branch to compile with Kafka 0.10.
+Xinfra Monitor supports Apache Kafka 0.8 to 2.0:
+- Use branch 0.8.2.2 to work with Apache Kafka 0.8
+- Use branch 0.9.0.1 to work with Apache Kafka 0.9
+- Use branch 0.10.2.1 to work with Apache Kafka 0.10
+- Use branch 0.11.x to work with Apache Kafka 0.11
+- Use branch 1.0.x to work with Apache Kafka 1.0
+- Use branch 1.1.x to work with Apache Kafka 1.1
+- Use master branch to work with Apache Kafka 2.0
+
### Configuration Tips
-- We advise advanced users to run Kafka Monitor with
-`./bin/kafka-monitor-start.sh config/kafka-monitor.properties`. The default
-kafka-monitor.properties in the repo provides an simple example of how to
+
+
We advise advanced users to run Xinfra Monitor with
+./bin/xinfra-monitor-start.sh config/xinfra-monitor.properties. The default
+xinfra-monitor.properties in the repo provides a simple example of how to
monitor a single cluster. You probably need to change the value of
-`zookeeper.connect` and `bootstrap.servers` to point to your cluster.
-
-- The full list of configs and their documentation can be found in the code of
+zookeeper.connect and bootstrap.servers to point to your cluster.
+
+
+
The full list of configs and their documentation can be found in the code of
Config class for respective service, e.g. ProduceServiceConfig.java and
-ConsumeServiceConfig.java.
-
-- You can specify multiple SingleClusterMonitor in the kafka-monitor.properties to
-monitor multiple Kafka clusters in one Kafka Monitor process. As another
-advanced use-cse, you can point ProduceService and ConsumeService to two
-different Kafka clusters that are connected by MirrorMaker to monitor their
-end-to-end latency.
-
-- Kafka Monitor by default will automatically create the monitor topic based on
-the e.g. `topic-management.replicationFactor` and `topic-management.partitionsToBrokersRatio`
+ConsumeServiceConfig.java.
+
+
You can specify multiple SingleClusterMonitor in the xinfra-monitor.properties to
+monitor multiple Kafka clusters in one Xinfra Monitor process. As another
+advanced use-case, you can point ProduceService and ConsumeService to two different Kafka clusters that are connected by MirrorMaker to monitor their end-to-end latency.
+
+
Xinfra Monitor by default will automatically create the monitor topic based on
+the e.g. topic-management.replicationFactor and topic-management.partitionsToBrokersRatio
specified in the config. replicationFactor is 1 by default and you probably
want to change it to the same replication factor as used for your existing
-topics. You can disable auto topic creation by setting `produce.topic.topicCreationEnabled` to false.
-
-- Kafka Monitor can automatically increase partition count of the monitor topic
+topics. You can disable auto topic creation by setting produce.topic.topicCreationEnabled to false.
+
+
+
Xinfra Monitor can automatically increase partition count of the monitor topic
to ensure partition# >= broker#. It can also reassign partition and trigger
preferred leader election to ensure that each broker acts as leader of at least
one partition of the monitor topic. To use this feature, use either
-EndToEndTest or TopicManagementService in the properties file.
-
-
-### Build Kafka Monitor
+EndToEndTest or TopicManagementService in the properties file.
+
+
When using Secure Sockets Layer (SSL) or any non-plaintext security protocol for AdminClient, please configure the following entries in the single-cluster-monitor props, produce.producer.props, as well as consume.consumer.props. https://docs.confluent.io/current/installation/configuration/admin-configs.html
+
+
ssl.key.password
+
ssl.keystore.location
+
ssl.keystore.password
+
ssl.truststore.location
+
ssl.truststore.password
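+
+A minimal sketch of how these could look inside `produce.producer.props` (the same keys go in `consume.consumer.props` and the single-cluster-monitor props; the paths and passwords below are placeholders):
+
+```
+"produce.producer.props": {
+  "security.protocol": "SSL",
+  "ssl.key.password": "<key password>",
+  "ssl.keystore.location": "/path/to/client.keystore.jks",
+  "ssl.keystore.password": "<keystore password>",
+  "ssl.truststore.location": "/path/to/client.truststore.jks",
+  "ssl.truststore.password": "<truststore password>"
+}
+```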
+
+
+
+
+### Build Xinfra Monitor
```
$ git clone https://github.com/linkedin/kafka-monitor.git
$ cd kafka-monitor
$ ./gradlew jar
```
-### Start KafkaMonitor to run tests/services specified in the config file
+### Start XinfraMonitor to run tests/services specified in the config file
+```
+$ ./bin/xinfra-monitor-start.sh config/xinfra-monitor.properties
+```
+
+### Run Xinfra Monitor with arbitrary producer/consumer configuration (e.g. SASL enabled client)
+Edit `config/xinfra-monitor.properties` to specify custom producer configurations in the
+key/value map `produce.producer.props`, and custom consumer configurations in `consume.consumer.props`.
+The documentation for the individual producer and consumer configuration keys can be found in the Apache Kafka documentation.
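+
+For illustration, a SASL_SSL client override might be sketched as follows (these are standard Kafka client configs; the mechanism and JAAS values are placeholders to adapt to your setup):
+
+```
+"produce.producer.props": {
+  "client.id": "kmf-client-id",
+  "security.protocol": "SASL_SSL",
+  "sasl.mechanism": "PLAIN",
+  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<user>\" password=\"<password>\";"
+}
+```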
+
```
-$ ./bin/kafka-monitor-start.sh config/kafka-monitor.properties
+$ ./bin/xinfra-monitor-start.sh config/xinfra-monitor.properties
```
### Run SingleClusterMonitor app to monitor kafka cluster
+
+Metrics `produce-availability-avg` and `consume-availability-avg` demonstrate
+whether messages can be properly produced to and consumed from this cluster.
+See Service Overview wiki for how these metrics are derived.
+
```
$ ./bin/single-cluster-monitor.sh --topic test --broker-list localhost:9092 --zookeeper localhost:2181
```
-### Get metric values (e.g. service availability, message loss rate) in real-time as time series graphs
-Open ```localhost:8000/index.html``` in your web browser
+### Run MultiClusterMonitor app to monitor a pipeline of Kafka clusters connected by MirrorMaker
+Edit `config/multi-cluster-monitor.properties` to specify the right broker and
+ZooKeeper URLs, as suggested by the comments in the properties file.
-You can edit webapp/index.html to easily add new metrics to be displayed.
+Metrics `produce-availability-avg` and `consume-availability-avg` demonstrate
+whether messages can be properly produced to the source cluster and consumed
+from the destination cluster. See config/multi-cluster-monitor.properties for
+the full jmx path for these metrics.
-### Query metric value (e.g. service availability) via HTTP request
```
-curl localhost:8778/jolokia/read/kmf.services:type=produce-service,name=*/produce-availability-avg
+$ ./bin/xinfra-monitor-start.sh config/multi-cluster-monitor.properties
```
-You can query other JMX metric value as well by substituting object-name and
-attribute-name of the JMX metric in the query above.
-
### Run checkstyle on the java code
```
./gradlew checkstyleMain checkstyleTest
@@ -105,8 +196,6 @@ attribute-name of the JMX metric in the query above.
- [Motivation](https://github.com/linkedin/kafka-monitor/wiki/Motivation)
- [Design Overview](https://github.com/linkedin/kafka-monitor/wiki/Design-Overview)
-- [Service Design](https://github.com/linkedin/kafka-monitor/wiki/Service-Design)
+- [Service and App Overview](https://github.com/linkedin/kafka-monitor/wiki)
- [Future Work](https://github.com/linkedin/kafka-monitor/wiki/Future-Work)
-
-
-
+- [Application Configuration](https://github.com/linkedin/kafka-monitor/wiki/App-Configuration)
diff --git a/SECURITY.md b/SECURITY.md
new file mode 100644
index 00000000..f645bac2
--- /dev/null
+++ b/SECURITY.md
@@ -0,0 +1,17 @@
+# Security Policy
+
+## Supported Versions
+
+
+
+| Version | Supported |
+| ------- | ------------------ |
+| 1.1.x | :white_check_mark: |
+
+
+
+## Reporting a Vulnerability
+
+Please report a vulnerability by opening an issue at
+https://github.com/linkedin/kafka-monitor/issues/new.
diff --git a/bin/kmf-run-class.sh b/bin/kmf-run-class.sh
index d694ee0b..0fb1f8a2 100755
--- a/bin/kmf-run-class.sh
+++ b/bin/kmf-run-class.sh
@@ -50,7 +50,7 @@ fi
# Log4j settings
if [ -z "$KAFKA_LOG4J_OPTS" ]; then
- KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/config/log4j.properties"
+ KAFKA_LOG4J_OPTS="-Dlog4j.configurationFile=$base_dir/config/log4j2.properties"
fi
KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=$LOG_DIR $KAFKA_LOG4J_OPTS"
@@ -74,7 +74,7 @@ fi
# JVM performance options
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
- KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
+ KAFKA_JVM_PERFORMANCE_OPTS="-server -Djava.awt.headless=true"
fi
diff --git a/bin/single-cluster-monitor.sh b/bin/single-cluster-monitor.sh
index 7c83636f..79f9eacb 100755
--- a/bin/single-cluster-monitor.sh
+++ b/bin/single-cluster-monitor.sh
@@ -9,4 +9,4 @@
base_dir=$(dirname $0)
-exec $base_dir/kmf-run-class.sh com/linkedin/kmf/apps/SingleClusterMonitor $@
+exec $base_dir/kmf-run-class.sh com/linkedin/xinfra/monitor/apps/SingleClusterMonitor $@
diff --git a/bin/windows/kafka-monitor-start.bat b/bin/windows/kafka-monitor-start.bat
index abba5640..45eedad7 100644
--- a/bin/windows/kafka-monitor-start.bat
+++ b/bin/windows/kafka-monitor-start.bat
@@ -15,11 +15,11 @@ popd
IF [%1] EQU [] (
- echo USAGE: %0 config/kafka-monitor.properties
+ echo USAGE: %0 config/xinfra-monitor.properties
EXIT /B 1
)
-set COMMAND=%BASE_DIR%\kmf-run-class.bat com.linkedin.kmf.KafkaMonitor %*
+set COMMAND=%BASE_DIR%\kmf-run-class.bat com.linkedin.xinfra.monitor.XinfraMonitor %*
rem echo basedir: %BASE_DIR%
diff --git a/bin/windows/kmf-run-class.bat b/bin/windows/kmf-run-class.bat
index 559a965d..caddf261 100644
--- a/bin/windows/kmf-run-class.bat
+++ b/bin/windows/kmf-run-class.bat
@@ -10,12 +10,12 @@ REM an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either expre
setlocal enabledelayedexpansion
IF [%1] EQU [] (
- echo USAGE: %0 com.linkedin.kmf.KafkaMonitor config/kafka-monitor.properties
+ echo USAGE: %0 com.linkedin.xinfra.monitor.XinfraMonitor config/xinfra-monitor.properties
EXIT /B 1
)
IF [%2] EQU [] (
- echo USAGE: %0 %1 config/kafka-monitor.properties
+ echo USAGE: %0 %1 config/xinfra-monitor.properties
EXIT /B 1
)
@@ -60,7 +60,7 @@ IF ["%LOG_DIR%"] EQU [""] (
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
- set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%\config\log4j.properties
+ set KAFKA_LOG4J_OPTS=-Dlog4j.configurationFile=%BASE_DIR%\config\log4j2.properties
) ELSE (
# create logs directory
IF not exist %LOG_DIR% (
@@ -170,7 +170,7 @@ REM fi
REM Launch mode
REM if [ "x$DAEMON_MODE" = "xtrue" ]; then
-REM nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH REM $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &REM
-REM elseREM
+REM nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH REM $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &REM
+REM elseREM
REM exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS REM $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
REM fi
diff --git a/bin/kafka-monitor-start.sh b/bin/xinfra-monitor-start.sh
similarity index 86%
rename from bin/kafka-monitor-start.sh
rename to bin/xinfra-monitor-start.sh
index eb4863dc..2a341a16 100755
--- a/bin/kafka-monitor-start.sh
+++ b/bin/xinfra-monitor-start.sh
@@ -9,4 +9,4 @@
base_dir=$(dirname $0)
-exec $base_dir/kmf-run-class.sh com/linkedin/kmf/KafkaMonitor $@
+exec $base_dir/kmf-run-class.sh com/linkedin/xinfra/monitor/XinfraMonitor $@
diff --git a/build.gradle b/build.gradle
index 9db41aca..230cba17 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,51 +1,134 @@
+
+def configDocDir = "${buildDir}/configDocs"
+
+apply plugin: 'maven-publish'
+apply plugin: 'distribution'
+
+
allprojects {
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'java'
apply plugin: 'checkstyle'
- version = "1.0.0"
+ sourceCompatibility = 8
+ targetCompatibility = 8
- sourceCompatibility = 1.7
+ group = 'com.linkedin.kmf'
repositories {
mavenCentral()
+ maven {
+ url "https://linkedin.jfrog.io/artifactory/avro-util/"
+ }
}
dependencies {
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
- compile 'org.slf4j:slf4j-log4j12:1.7.6'
- compile 'org.apache.avro:avro:1.4.0'
- compile 'org.apache.kafka:kafka_2.11:0.10.1.1'
- compile 'org.apache.kafka:kafka-clients:0.10.1.1'
- compile 'org.testng:testng:6.8.8'
- compile 'org.eclipse.jetty:jetty-server:8.1.19.v20160209'
+ compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'
+ compile 'org.apache.avro:avro:1.9.2'
compile 'org.json:json:20140107'
- compile 'com.fasterxml.jackson.core:jackson-databind:2.7.1'
- compile 'org.jolokia:jolokia-jvm:1.3.3'
+ compile 'org.jolokia:jolokia-jvm:1.6.2'
compile 'net.savantly:graphite-client:1.1.0-RELEASE'
compile 'com.timgroup:java-statsd-client:3.0.1'
-
+ compile 'com.signalfx.public:signalfx-codahale:0.0.47'
+ compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.8.2'
+ compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.2'
+ compile 'org.apache.commons:commons-lang3:3.12.0'
+ compile 'com.linkedin.avroutil1:helper-all:0.2.118'
+ compile 'org.apache.zookeeper:zookeeper:3.8.0'
+ testCompile 'org.mockito:mockito-core:2.24.0'
testCompile 'org.testng:testng:6.8.8'
}
tasks.create(name: "copyDependantLibs", type: Copy) {
- from (configurations.testRuntime) {
+ from(configurations.testRuntime) {
include('slf4j-log4j12*')
}
- from (configurations.runtime) {
- }
+ from(configurations.runtime) {}
into "build/dependant-libs"
duplicatesStrategy 'exclude'
}
jar {
+ doFirst {
+ manifest {
+ // embed version information into jar manifests
+ attributes('Name': "${project.name}",
+ 'Specification-Title': "${project.name}",
+ 'Specification-Version': "${project.version}",
+ 'Specification-Vendor': "LinkedIn",
+ 'Implementation-Title': "${project.name}",
+ 'Implementation-Version': "${project.version}",
+ 'Implementation-Vendor': "LinkedIn")
+ }
+ }
+
dependsOn 'copyDependantLibs'
}
+ task sourceJar(type: Jar) {
+ from sourceSets.main.allJava
+ classifier "sources"
+ }
+
+ task javadocJar(type: Jar) {
+ from javadoc
+ classifier = 'javadoc'
+ }
+
task testJar(type: Jar) {
- classifier = 'test'
- from sourceSets.test.output
+ from sourceSets.test.allJava
+ classifier = 'tests'
+ }
+
+ publishing {
+ publications {
+ MyPublication(MavenPublication) {
+ groupId project.group
+ artifactId project.name
+ version project.version
+
+ from components.java
+ artifact sourceJar
+ artifact javadocJar
+ artifact testJar
+ artifact distZip
+ artifact distTar
+
+ pom {
+ name = 'kafka-monitor'
+ description = 'kafka monitor'
+ url = 'https://github.com/linkedin/kafka-monitor'
+
+ licenses {
+ license {
+ name = 'The Apache Software License, Version 2.0'
+ url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
+ }
+ }
+ scm {
+ connection = 'scm:git:git://github.com:linkedin/kafka-monitor.git'
+ developerConnection = 'scm:git:ssh://github.com:linkedin/kafka-monitor.git'
+ url = 'https://github.com/linkedin/kafka-monitor'
+ }
+ }
+
+ repositories {
+ mavenLocal()
+ maven {
+ name "LinkedInJfrog"
+ url "https://linkedin.jfrog.io/artifactory/kafka-monitor"
+ credentials {
+ if (System.getenv('JFROG_USER') != null && System.getenv('JFROG_KEY') != null) {
+ username System.getenv('JFROG_USER')
+ password System.getenv('JFROG_KEY')
+ }
+ }
+ }
+ }
+ }
+ }
}
artifacts {
@@ -54,8 +137,24 @@ allprojects {
checkstyle {
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+ configProperties = ["suppressionFile": new File(rootDir, "checkstyle/suppressions.xml")]
+ }
+
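+  // Generates config documentation under build/configDocs; runs as part of build (see build.dependsOn below) or manually via ./gradlew createConfigDocs.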
+ task createConfigDocs( dependsOn : compileJava, type : JavaExec) {
+ outputs.dir configDocDir
+ classpath sourceSets.main.runtimeClasspath
+ main = 'com.linkedin.xinfra.monitor.common.ConfigDocumentationGenerator'
+ args = [configDocDir,
+ 'com.linkedin.xinfra.monitor.services.configs.ConsumeServiceConfig',
+ 'com.linkedin.xinfra.monitor.services.configs.DefaultMetricsReporterServiceConfig',
+ 'com.linkedin.xinfra.monitor.services.configs.JettyServiceConfig',
+ 'com.linkedin.xinfra.monitor.services.configs.ProduceServiceConfig',
+ 'com.linkedin.xinfra.monitor.services.configs.TopicManagementServiceConfig',
+ 'com.linkedin.xinfra.monitor.apps.configs.MultiClusterMonitorConfig']
}
+ build.dependsOn createConfigDocs
+
test.dependsOn('checkstyleMain', 'checkstyleTest')
test {
@@ -66,8 +165,43 @@ allprojects {
exceptionFormat = 'full'
}
}
+
+ distributions {
+ main {
+ contents {
+ into('bin') {
+ from 'bin'
+ }
+ into('build/libs') {
+ from jar
+ }
+ into('build/dependant-libs') {
+ from copyDependantLibs
+ }
+ into('config') {
+ from 'config'
+ }
+ into('build/configDocs') {
+ from createConfigDocs
+ }
+ into('webapp') {
+ from 'webapp'
+ }
+ from('.') {
+ include 'README.md'
+ }
+ }
+ }
+ }
+ tasks.withType(Tar){
+ compression = Compression.GZIP
+ extension = 'tar.gz'
+ }
+
}
-task wrapper(type: Wrapper) {
- gradleVersion = '2.11'
+wrapper {
+ gradleVersion = '5.2.1'
+ distributionType = Wrapper.DistributionType.ALL
+
}
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index f7edb531..da75cff1 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -18,11 +18,14 @@
-
+
-
+
+
+
+
@@ -32,6 +35,7 @@
+
@@ -77,4 +81,6 @@
+
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
new file mode 100644
index 00000000..b5062103
--- /dev/null
+++ b/checkstyle/suppressions.xml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/config/kafka-monitor.properties b/config/kafka-monitor.properties
deleted file mode 100644
index 91779669..00000000
--- a/config/kafka-monitor.properties
+++ /dev/null
@@ -1,95 +0,0 @@
-# Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-
-# This properties file specifies the tests/services that KafkaMonitor
-# should instantiate and run, together with the key/value pairs used to
-# configure these tests/services. It should have the following format:
-#
-# {
-# "name1" : {
-# "type": TestClassName
-# "key1": value1,
-# "key2": value2,
-# ...
-# },
-# "name2" : {
-# "type": ServiceClassName
-# "key1": value1,
-# "key2": value2,
-# ...
-# },
-# ...
-# }
-#
-# TestClassName can be canonical name or simple name of any class that implements
-# interface com.linkedin.kmf.services.Test. These classes should be under
-# package com.linkedin.kmf.tests.
-#
-# ServiceClassName can be canonical name or simple name of any class that implements
-# interface com.linkedin.kmf.services.Service. These classes should be under
-# package com.linkedin.kmf.services.
-#
-# Each test/service should be configured with class.name which can be either TestClassName
-# or ServiceClassName. The key for the test/service in the json map is used as name to
-# identify the test/service in the log or JMX metrics, which is useful if multiple
-# test/service with the same class.name are run in the same Kafka Monitor process.
-#
-
-{
- "single-cluster-monitor": {
- "class.name": "com.linkedin.kmf.apps.SingleClusterMonitor",
- "topic": "kafka-monitor-topic",
- "zookeeper.connect": "localhost:2181",
- "bootstrap.servers": "localhost:9092",
- "produce.record.delay.ms": 100,
- "topic-management.topicCreationEnabled": true,
- "topic-management.replicationFactor" : 1,
- "topic-management.partitionsToBrokersRatio" : 2.0,
- "topic-management.rebalance.interval.ms" : 600000,
- "topic-management.topicFactory.props": {
- },
- "produce.producer.props": {
- "client.id": "kmf-client-id"
- },
-
- "consume.latency.sla.ms": "20000",
- "consume.consumer.props": {
-
- }
-
- },
-
- "reporter-service": {
- "class.name": "com.linkedin.kmf.services.DefaultMetricsReporterService",
- "report.interval.sec": 1,
- "report.metrics.list": [
- "kmf:type=kafka-monitor:offline-runnable-count",
- "kmf.services:type=produce-service,name=*:produce-availability-avg",
- "kmf.services:type=consume-service,name=*:consume-availability-avg",
- "kmf.services:type=produce-service,name=*:records-produced-total",
- "kmf.services:type=consume-service,name=*:records-consumed-total",
- "kmf.services:type=consume-service,name=*:records-lost-total",
- "kmf.services:type=consume-service,name=*:records-duplicated-total",
- "kmf.services:type=consume-service,name=*:records-delay-ms-avg",
- "kmf.services:type=produce-service,name=*:records-produced-rate",
- "kmf.services:type=produce-service,name=*:produce-error-rate",
- "kmf.services:type=consume-service,name=*:consume-error-rate"
- ]
- },
-
- "jetty-service": {
- "class.name": "com.linkedin.kmf.services.JettyService",
- "jetty.port": 8000
- },
-
- "jolokia-service": {
- "class.name": "com.linkedin.kmf.services.JolokiaService"
- }
-}
-
-
diff --git a/config/log4j.properties b/config/log4j.properties
deleted file mode 100644
index 8d4b79a9..00000000
--- a/config/log4j.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-# Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
-# file except in compliance with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-
-log4j.rootLogger=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.appender.kafkaClientAppender=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.kafkaClientAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.kafkaClientAppender.File=${kafka.logs.dir}/kafka-client.log
-log4j.appender.kafkaClientAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.kafkaClientAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
-
-log4j.logger.com.linkedin.kmf.core.KafkaMonitor=INFO, stdout
-log4j.additivity.com.linkedin.kmf.core.KafkaMonitor=false
-
-log4j.logger.org.apache.kafka=WARN, kafkaClientAppender
-log4j.additivity.org.apache.kafka=false
-
-log4j.logger.kafka=WARN, kafkaClientAppender
-log4j.additivity.kafka=false
-
diff --git a/config/log4j2.properties b/config/log4j2.properties
new file mode 100644
index 00000000..4896697d
--- /dev/null
+++ b/config/log4j2.properties
@@ -0,0 +1,47 @@
+# Copyright 2020 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+status = error
+dest = err
+name = PropertiesConfig
+
+filter.threshold.type = ThresholdFilter
+filter.threshold.level = debug
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%d] %p %m (%c)%n
+
+appender.kafka.type = RollingFile
+appender.kafka.name = KAFKA
+appender.kafka.filename = ${sys:kafka.logs.dir}/kafka-client.log
+appender.kafka.filePattern = ${sys:kafka.logs.dir}/kafka-client.log.%d{yyyy-MM-dd-HH}
+appender.kafka.layout.type = PatternLayout
+appender.kafka.layout.pattern = [%d] %p %m (%c)%n
+appender.kafka.policies.type = Policies
+appender.kafka.policies.time.type = TimeBasedTriggeringPolicy
+
+# Modify this as needed when working on dev box. Trace -> Debug -> Info -> Warn -> Error -> Fatal
+rootLogger.level = info
+rootLogger.appenderRef.console.ref = STDOUT
+
+logger.kmf.name = com.linkedin.kmf.core.KafkaMonitor
+logger.kmf.level = info
+logger.kmf.additivity = false
+logger.kmf.appenderRef.console.ref = STDOUT
+
+logger.kafkaClient.name = org.apache.kafka
+logger.kafkaClient.level = warn
+logger.kafkaClient.additivity = false
+logger.kafkaClient.appenderRef.kafka.ref = KAFKA
+
+logger.kafka.name = kafka
+logger.kafka.level = warn
+logger.kafka.additivity = false
+logger.kafka.appenderRef.kafka.ref = KAFKA
diff --git a/config/multi-cluster-monitor.properties b/config/multi-cluster-monitor.properties
index 6c35ec19..dd40b035 100644
--- a/config/multi-cluster-monitor.properties
+++ b/config/multi-cluster-monitor.properties
@@ -12,22 +12,23 @@
# each cluster in the pipeline. The "produce.service.props" should use the first cluster and
# the "consume.service.props" should use the last cluster in the pipeline.
+# Produce service: Configure Produce Service to produce to the first cluster of the pipeline
+# Consume service: Configure Consume Service to consume from the last cluster of the pipeline
+# Last cluster: If there are more than two clusters in the pipeline, add one property map for each one of them.
{
"multi-cluster-monitor": {
"class.name": "com.linkedin.kmf.apps.MultiClusterMonitor",
"topic": "kafka-monitor-topic",
-
"produce.service.props": {
- "zookeeper.connect": "localhost:2181/cluster1",
+ "zookeeper.connect": "localhost:2181/first_cluster",
"bootstrap.servers": "localhost:9092",
"produce.record.delay.ms": 100,
"produce.producer.props": {
"client.id": "kafka-monitor-client-id"
}
},
-
"consume.service.props": {
- "zookeeper.connect": "localhost:2181/cluster2",
+ "zookeeper.connect": "localhost:2181/last_cluster",
"bootstrap.servers": "localhost:9095",
"consume.latency.sla.ms": "20000",
"consume.consumer.props": {
@@ -37,7 +38,8 @@
"topic.management.props.per.cluster" : {
"first-cluster" : {
- "zookeeper.connect": "localhost:2181/cluster1",
+ "bootstrap.servers": "localhost:9092",
+ "zookeeper.connect": "localhost:2181/first_cluster",
"topic-management.topicCreationEnabled": true,
"topic-management.replicationFactor" : 1,
"topic-management.partitionsToBrokersRatio" : 2.0,
@@ -47,7 +49,8 @@
},
"last-cluster" : {
- "zookeeper.connect": "localhost:2181/cluster2",
+ "bootstrap.servers": "localhost:9095",
+ "zookeeper.connect": "localhost:2181/last_cluster",
"topic-management.topicCreationEnabled": true,
"topic-management.replicationFactor" : 1,
"topic-management.partitionsToBrokersRatio" : 2.0,
@@ -68,22 +71,22 @@
"kmf.services:type=produce-service,name=*:records-produced-total",
"kmf.services:type=consume-service,name=*:records-consumed-total",
"kmf.services:type=consume-service,name=*:records-lost-total",
+ "kmf.services:type=consume-service,name=*:records-lost-rate",
"kmf.services:type=consume-service,name=*:records-duplicated-total",
"kmf.services:type=consume-service,name=*:records-delay-ms-avg",
"kmf.services:type=produce-service,name=*:records-produced-rate",
"kmf.services:type=produce-service,name=*:produce-error-rate",
- "kmf.services:type=consume-service,name=*:consume-error-rate"
+ "kmf.services:type=consume-service,name=*:consume-error-rate",
+ "kmf.services:type=commit-availability-service,name=*:offsets-committed-avg",
+ "kmf.services:type=commit-availability-service,name=*:commit-latency-avg",
+ "kmf.services:type=commit-availability-service,name=*:commit-availability-avg",
+ "kmf.services:type=commit-availability-service,name=*:failed-commit-offsets-avg",
+ "kmf.services:type=commit-availability-service,name=*:offsets-committed-total",
+ "kmf.services:type=commit-availability-service,name=*:failed-commit-offsets-total"
]
},
- "jetty-service": {
- "class.name": "com.linkedin.kmf.services.JettyService",
- "jetty.port": 8000
- },
-
"jolokia-service": {
"class.name": "com.linkedin.kmf.services.JolokiaService"
}
}
-
-
diff --git a/config/prometheus-exporter.yaml b/config/prometheus-exporter.yaml
new file mode 100644
index 00000000..7401e3a4
--- /dev/null
+++ b/config/prometheus-exporter.yaml
@@ -0,0 +1,7 @@
+---
+lowercaseOutputName: true
+rules:
+- pattern : kmf<type=(.+)><>([\w\d-]+)
+  name: kmf_$1_$2
+- pattern : kmf.services<type=(.+), name=(.+)><>([\w\d-]+)
+  name: kmf_services_$1_$2_$3
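+
+# Example usage (an assumption; the agent jar is not shipped with this repo): attach the Prometheus JMX
+# exporter java agent to the monitor JVM and point it at this file, assuming the launcher honors KAFKA_OPTS:
+#   export KAFKA_OPTS="-javaagent:/path/to/jmx_prometheus_javaagent.jar=8080:config/prometheus-exporter.yaml"
+#   ./bin/xinfra-monitor-start.sh config/xinfra-monitor.properties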
diff --git a/config/xinfra-monitor.properties b/config/xinfra-monitor.properties
new file mode 100644
index 00000000..6993bf47
--- /dev/null
+++ b/config/xinfra-monitor.properties
@@ -0,0 +1,197 @@
+# Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+# This properties file specifies the tests/services that XinfraMonitor
+# should instantiate and run, together with the key/value pairs used to
+# configure these tests/services. It should have the following format:
+#
+# {
+# "name1" : {
+# "type": TestClassName
+# "key1": value1,
+# "key2": value2,
+# ...
+# },
+# "name2" : {
+# "type": ServiceClassName
+# "key1": value1,
+# "key2": value2,
+# ...
+# },
+# ...
+# }
+#
+# TestClassName can be canonical name or simple name of any class that implements
+# interface com.linkedin.kmf.services.Test. These classes should be under
+# package com.linkedin.kmf.tests.
+#
+# ServiceClassName can be canonical name or simple name of any class that implements
+# interface com.linkedin.kmf.services.Service. These classes should be under
+# package com.linkedin.kmf.services.
+#
+# Each test/service should be configured with class.name which can be either TestClassName
+# or ServiceClassName. The key for the test/service in the json map is used as name to
+# identify the test/service in the log or JMX metrics, which is useful if multiple
+# test/service with the same class.name are run in the same Kafka Monitor process.
+#
+# If using Secure Socket Layer for security protocol, SSL properties must be defined under
+# produce.producer.props, consume.consumer.props, as well as single-cluster-monitor props
+
+{
+ "single-cluster-monitor": {
+ "class.name": "com.linkedin.xinfra.monitor.apps.SingleClusterMonitor",
+ "topic": "xinfra-monitor-topic",
+ "zookeeper.connect": "localhost:2181",
+ "bootstrap.servers": "localhost:9092,localhost:9093",
+ "request.timeout.ms": 9000,
+ "produce.record.delay.ms": 100,
+ "topic-management.topicManagementEnabled": true,
+ "topic-management.topicCreationEnabled": true,
+ "topic-management.replicationFactor" : 1,
+ "topic-management.partitionsToBrokersRatio" : 2.0,
+ "topic-management.rebalance.interval.ms" : 600000,
+ "topic-management.preferred.leader.election.check.interval.ms" : 300000,
+ "topic-management.topicFactory.props": {
+ },
+ "topic-management.topic.props": {
+ "retention.ms": "3600000"
+ },
+ "produce.producer.props": {
+ "client.id": "kmf-client-id"
+ },
+
+ "consume.latency.sla.ms": "20000",
+ "consume.consumer.props": {
+ }
+ },
+
+ "offset-commit-service": {
+ "class.name": "com.linkedin.xinfra.monitor.services.OffsetCommitService",
+ "zookeeper.connect": "localhost:2181",
+ "bootstrap.servers": "localhost:9092,localhost:9093",
+ "consumer.props": {
+ "group.id": "target-consumer-group"
+ }
+ },
+
+ "jolokia-service": {
+ "class.name": "com.linkedin.xinfra.monitor.services.JolokiaService"
+ },
+
+ "reporter-service": {
+ "class.name": "com.linkedin.xinfra.monitor.services.DefaultMetricsReporterService",
+ "report.interval.sec": 1,
+ "report.metrics.list": [
+ "kmf:type=kafka-monitor:offline-runnable-count",
+ "kmf.services:type=produce-service,name=*:produce-availability-avg",
+ "kmf.services:type=consume-service,name=*:consume-availability-avg",
+ "kmf.services:type=produce-service,name=*:records-produced-total",
+ "kmf.services:type=consume-service,name=*:records-consumed-total",
+ "kmf.services:type=produce-service,name=*:records-produced-rate",
+ "kmf.services:type=produce-service,name=*:produce-error-rate",
+ "kmf.services:type=consume-service,name=*:consume-error-rate",
+ "kmf.services:type=consume-service,name=*:records-lost-total",
+ "kmf.services:type=consume-service,name=*:records-lost-rate",
+ "kmf.services:type=consume-service,name=*:records-duplicated-total",
+ "kmf.services:type=consume-service,name=*:records-delay-ms-avg",
+ "kmf.services:type=commit-availability-service,name=*:offsets-committed-avg",
+ "kmf.services:type=commit-availability-service,name=*:offsets-committed-total",
+ "kmf.services:type=commit-availability-service,name=*:failed-commit-offsets-avg",
+ "kmf.services:type=commit-availability-service,name=*:failed-commit-offsets-total",
+ "kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-avg",
+ "kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-max",
+ "kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-99th",
+ "kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-999th",
+ "kmf.services:type=commit-latency-service,name=*:commit-offset-latency-ms-9999th",
+ "kmf.services:type=cluster-topic-manipulation-service,name=*:topic-creation-metadata-propagation-ms-avg",
+ "kmf.services:type=cluster-topic-manipulation-service,name=*:topic-creation-metadata-propagation-ms-max",
+ "kmf.services:type=cluster-topic-manipulation-service,name=*:topic-deletion-metadata-propagation-ms-avg",
+ "kmf.services:type=cluster-topic-manipulation-service,name=*:topic-deletion-metadata-propagation-ms-max",
+ "kmf.services:type=offset-commit-service,name=*:offset-commit-availability-avg",
+ "kmf.services:type=offset-commit-service,name=*:offset-commit-service-success-rate",
+ "kmf.services:type=offset-commit-service,name=*:offset-commit-service-success-total",
+ "kmf.services:type=offset-commit-service,name=*:offset-commit-service-failure-rate",
+ "kmf.services:type=offset-commit-service,name=*:offset-commit-service-failure-total"
+ ]
+ },
+
+ "cluster-topic-manipulation-service":{
+ "class.name":"com.linkedin.xinfra.monitor.services.ClusterTopicManipulationService",
+ "zookeeper.connect": "localhost:2181",
+ "bootstrap.servers":"localhost:9092,localhost:9093",
+ "topic": "xinfra-monitor-topic"
+ },
+
+# Example produce-service to produce messages to cluster
+# "produce-service": {
+# "class.name": "com.linkedin.xinfra.monitor.services.ProduceService",
+# "topic": "xinfra-monitor-topic",
+# "zookeeper.connect": "localhost:2181",
+# "bootstrap.servers": "localhost:9092",
+# "produce.record.delay.ms": 100,
+# "produce.producer.props": {
+# }
+# },
+
+# Example consume-service to consume messages
+# "consume-service": {
+# "class.name": "com.linkedin.xinfra.monitor.services.ConsumeService",
+# "topic": "xinfra-monitor-topic",
+# "zookeeper.connect": "localhost:2181",
+# "bootstrap.servers": "localhost:9092",
+# "consume.latency.sla.ms": "20000",
+# "consume.consumer.props": {
+# }
+# },
+
+# Example statsd-service to report metrics
+# "statsd-service": {
+# "class.name": "com.linkedin.xinfra.monitor.services.StatsdMetricsReporterService",
+# "report.statsd.host": "localhost",
+# "report.statsd.port": "8125",
+# "report.statsd.prefix": "xinfra-monitor",
+# "report.interval.sec": 1,
+# "report.metrics.list": [
+# "kmf.services:type=produce-service,name=*:produce-availability-avg",
+# "kmf.services:type=consume-service,name=*:consume-availability-avg"
+# ]
+# },
+
+# Example kafka-service to report metrics
+ "reporter-kafka-service": {
+ "class.name": "com.linkedin.xinfra.monitor.services.KafkaMetricsReporterService",
+ "report.interval.sec": 3,
+ "zookeeper.connect": "localhost:2181",
+ "bootstrap.servers": "localhost:9092",
+ "topic": "xinfra-monitor-topic-metrics",
+ "report.kafka.topic.replication.factor": 1,
+ "report.metrics.list": [
+ "kmf.services:type=produce-service,name=*:produce-availability-avg",
+ "kmf.services:type=consume-service,name=*:consume-availability-avg",
+ "kmf.services:type=produce-service,name=*:records-produced-total",
+ "kmf.services:type=consume-service,name=*:records-consumed-total",
+ "kmf.services:type=consume-service,name=*:records-lost-total",
+ "kmf.services:type=consume-service,name=*:records-duplicated-total",
+ "kmf.services:type=consume-service,name=*:records-delay-ms-avg",
+ "kmf.services:type=produce-service,name=*:records-produced-rate",
+ "kmf.services:type=produce-service,name=*:produce-error-rate",
+ "kmf.services:type=consume-service,name=*:consume-error-rate"
+ ]
+ }
+
+# Example signalfx-service to report metrics
+# "signalfx-service": {
+# "class.name": "com.linkedin.xinfra.monitor.services.SignalFxMetricsReporterService",
+# "report.interval.sec": 1,
+# "report.metric.dimensions": {
+# },
+# "report.signalfx.url": "",
+# "report.signalfx.token" : ""
+# }
+
+}
diff --git a/docker/Dockerfile b/docker/Dockerfile
index fcd49e39..baff1023 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -17,14 +17,11 @@ MAINTAINER coffeepac@gmail.com
WORKDIR /opt/kafka-monitor
ADD build/ build/
-ADD core/build/ core/build/
-ADD services/build/ services/build/
-ADD tests/build/ tests/build/
-ADD bin/kafka-monitor-start.sh bin/kafka-monitor-start.sh
+ADD bin/xinfra-monitor-start.sh bin/xinfra-monitor-start.sh
ADD bin/kmf-run-class.sh bin/kmf-run-class.sh
-ADD config/kafka-monitor.properties config/kafka-monitor.properties
-ADD config/log4j.properties config/log4j.properties
+ADD config/xinfra-monitor.properties config/xinfra-monitor.properties
+ADD config/log4j2.properties config/log4j2.properties
ADD docker/kafka-monitor-docker-entry.sh kafka-monitor-docker-entry.sh
ADD webapp/ webapp/
-CMD ["/opt/kafka-monitor/kafka-monitor-docker-entry.sh"]
\ No newline at end of file
+CMD ["/opt/kafka-monitor/kafka-monitor-docker-entry.sh"]
diff --git a/docker/kafka-monitor-docker-entry.sh b/docker/kafka-monitor-docker-entry.sh
index 3abc003b..97554bb0 100755
--- a/docker/kafka-monitor-docker-entry.sh
+++ b/docker/kafka-monitor-docker-entry.sh
@@ -15,7 +15,13 @@
set -x
+# SIGTERM-handler
+trap 'pkill java; exit 130' SIGINT
+trap 'pkill java; exit 143' SIGTERM
+
# wait for DNS services to be available
sleep 10
-bin/kafka-monitor-start.sh config/kafka-monitor.properties
+bin/xinfra-monitor-start.sh config/xinfra-monitor.properties &
+
+wait $!
\ No newline at end of file
diff --git a/docs/images/xinfra_monitor.png b/docs/images/xinfra_monitor.png
new file mode 100644
index 00000000..d0dfce61
Binary files /dev/null and b/docs/images/xinfra_monitor.png differ
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index 05ef575b..51288f9c 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index c0abcf1d..842c8c5a 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Tue Dec 06 22:38:25 EST 2016
+#Mon Apr 01 18:19:43 PDT 2019
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-3.2.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-all.zip
diff --git a/gradlew b/gradlew
index 9d82f789..2477741a 100755
--- a/gradlew
+++ b/gradlew
@@ -1,4 +1,4 @@
-#!/usr/bin/env bash
+#!/usr/bin/env sh
##############################################################################
##
@@ -6,12 +6,30 @@
##
##############################################################################
-# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-DEFAULT_JVM_OPTS=""
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+# Need this for relative symlinks.
+while [ -h "$PRG" ] ; do
+ ls=`ls -ld "$PRG"`
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ PRG=`dirname "$PRG"`"/$link"
+ fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/" >/dev/null
+APP_HOME="`pwd -P`"
+cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS=""
+
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
@@ -30,6 +48,7 @@ die ( ) {
cygwin=false
msys=false
darwin=false
+nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
@@ -40,26 +59,11 @@ case "`uname`" in
MINGW* )
msys=true
;;
+ NONSTOP* )
+ nonstop=true
+ ;;
esac
-# Attempt to set APP_HOME
-# Resolve links: $0 may be a link
-PRG="$0"
-# Need this for relative symlinks.
-while [ -h "$PRG" ] ; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`"/$link"
- fi
-done
-SAVED="`pwd`"
-cd "`dirname \"$PRG\"`/" >/dev/null
-APP_HOME="`pwd -P`"
-cd "$SAVED" >/dev/null
-
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
@@ -85,7 +89,7 @@ location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
-if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
+if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
@@ -100,18 +104,18 @@ if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then
fi
fi
-# For Darwin, add options to specify how the application appears in the dock
+# For Darwin, add options to specify how the application appears in the dock.
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
-# For Cygwin, switch paths to Windows format before running java
+# For Cygwin, switch paths to Windows format before running java.
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
- # We build the pattern for arguments to be converted via cygpath
+ # We build the pattern for arguments to be converted via cygpath.
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
@@ -119,7 +123,7 @@ if $cygwin ; then
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
- # Add a user-defined pattern to the cygpath arguments
+ # Add a user-defined pattern to the cygpath arguments.
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
@@ -150,11 +154,19 @@ if $cygwin ; then
esac
fi
-# Split up the JVM_OPTS And GRADLE_OPTS values into an array, following the shell quoting and substitution rules
-function splitJvmOpts() {
- JVM_OPTS=("$@")
+# Escape application args
+save ( ) {
+ for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
+ echo " "
}
-eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS
-JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME"
+APP_ARGS=$(save "$@")
+
+# Collect all arguments for the java command, following the shell quoting and substitution rules
+eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
+
+# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
+if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
+ cd "$(dirname "$0")"
+fi
-exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@"
+exec "$JAVACMD" "$@"
diff --git a/gradlew.bat b/gradlew.bat
index aec99730..e95643d6 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -8,14 +8,14 @@
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
-@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
-set DEFAULT_JVM_OPTS=
-
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS=
+
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
@@ -46,10 +46,9 @@ echo location of your Java installation.
goto fail
:init
-@rem Get command-line arguments, handling Windowz variants
+@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
-if "%@eval[2+2]" == "4" goto 4NT_args
:win9xME_args
@rem Slurp the command line arguments.
@@ -60,11 +59,6 @@ set _SKIP=2
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
-goto execute
-
-:4NT_args
-@rem Get arguments from the 4NT Shell from JP Software
-set CMD_LINE_ARGS=%$
:execute
@rem Setup the command line
diff --git a/scripts/publishToJfrog.sh b/scripts/publishToJfrog.sh
new file mode 100755
index 00000000..cc12364d
--- /dev/null
+++ b/scripts/publishToJfrog.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+result=${PWD##*/}
+if [[ "$result" = "scripts" ]]
+then
+ echo "script must be run from root project folder, not $PWD"
+ exit 1
+else
+ echo "we are in $PWD and tag is $RELEASE_TAG"
+
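+  # Publish only when RELEASE_TAG is a bare semantic version such as 1.2.3 (no "v" prefix, no pre-release suffix).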
+ if [[ $RELEASE_TAG =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]
+ then
+ echo "publishing: tag $RELEASE_TAG looks like a semver"
+ git status
+ git describe --tags
+ ./gradlew printVersion
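+    # Note: the publication and repository names in the task below are assumed to come from the project's Gradle publishing configuration.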
+ ./gradlew publishMyPublicationPublicationToLinkedInJfrogRepository
+ else
+ echo "not publishing: tag $RELEASE_TAG is NOT a valid semantic version (x.y.z)"
+ fi
+fi
diff --git a/semantic-build-versioning.gradle b/semantic-build-versioning.gradle
new file mode 100644
index 00000000..bee379f7
--- /dev/null
+++ b/semantic-build-versioning.gradle
@@ -0,0 +1,2 @@
+
+/* This is used by vivin:gradle-semantic-build-versioning plugin to generate versioned jar files. */
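+/* An empty file is enough to enable versioning for this project; per-project versioning options, if any, would be configured here. */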
diff --git a/settings.gradle b/settings.gradle
index e69de29b..f37e0228 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -0,0 +1,15 @@
+buildscript {
+ repositories {
+ maven {
+ url 'https://plugins.gradle.org/m2/'
+ }
+ }
+ dependencies {
+ classpath 'gradle.plugin.net.vivin:gradle-semantic-build-versioning:4.0.0'
+ }
+}
+
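+// The plugin is applied from settings.gradle so the project version is derived from git tags before the build itself is configured.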
+apply plugin: 'net.vivin.gradle-semantic-build-versioning'
+
+// otherwise it defaults to the folder name
+rootProject.name = 'kafka-monitor'
diff --git a/src/main/java/com/linkedin/kmf/common/Utils.java b/src/main/java/com/linkedin/kmf/common/Utils.java
deleted file mode 100644
index e1c8494b..00000000
--- a/src/main/java/com/linkedin/kmf/common/Utils.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
- * file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-package com.linkedin.kmf.common;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Properties;
-import java.util.Set;
-
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.server.KafkaConfig;
-import kafka.utils.ZkUtils;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.JsonEncoder;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.security.JaasUtils;
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-
-/**
- * Kafka monitoring utilities.
- */
-public class Utils {
- private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
-
- public static final int ZK_CONNECTION_TIMEOUT_MS = 30_000;
- public static final int ZK_SESSION_TIMEOUT_MS = 30_000;
-
- /**
- * Read the number of partitions for the given topic on the specified zookeeper
- * @param zkUrl zookeeper connection url
- * @param topic topic name
- *
- * @return the number of partitions of the given topic
- */
- public static int getPartitionNumForTopic(String zkUrl, String topic) {
- ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
- try {
- Seq<String> topics = scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(topic));
- return zkUtils.getPartitionsForTopics(topics).apply(topic).size();
- } catch (NoSuchElementException e) {
- return 0;
- } finally {
- zkUtils.close();
- }
- }
-
- /**
- * Create the topic that the monitor uses to monitor the cluster. This method attempts to create a topic so that all
- * the brokers in the cluster will have partitionToBrokerRatio partitions. If the topic exists, but has different parameters,
- * then this does nothing to update the parameters.
- *
- * TODO: Do we care about rack aware mode? I would think no because we want to spread the topic over all brokers.
- * @param zkUrl zookeeper connection url
- * @param topic topic name
- * @param replicationFactor the replication factor for the topic
- * @param partitionToBrokerRatio This is multiplied by the number of brokers to compute the number of partitions in the topic.
- * @param topicConfig additional parameters for the topic for example min.insync.replicas
- * @return the number of partitions created
- */
- public static int createMonitoringTopicIfNotExists(String zkUrl, String topic, int replicationFactor,
- double partitionToBrokerRatio, Properties topicConfig) {
- ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
- try {
- if (AdminUtils.topicExists(zkUtils, topic)) {
- return getPartitionNumForTopic(zkUrl, topic);
- }
-
- int brokerCount = zkUtils.getAllBrokersInCluster().size();
-
- int partitionCount = (int) Math.ceil(brokerCount * partitionToBrokerRatio);
-
- int defaultMinIsr = Math.max(replicationFactor - 1, 1);
- if (!topicConfig.containsKey(KafkaConfig.MinInSyncReplicasProp())) {
- topicConfig.setProperty(KafkaConfig.MinInSyncReplicasProp(), Integer.toString(defaultMinIsr));
- }
-
- try {
- AdminUtils.createTopic(zkUtils, topic, partitionCount, replicationFactor, topicConfig, RackAwareMode.Enforced$.MODULE$);
- } catch (TopicExistsException e) {
- //There is a race condition with the consumer.
- LOG.debug("Monitoring topic " + topic + " already exists in cluster " + zkUrl, e);
- return getPartitionNumForTopic(zkUrl, topic);
- }
- LOG.info("Created monitoring topic " + topic + " in cluster " + zkUrl + " with " + partitionCount + " partitions, min ISR of "
- + topicConfig.get(KafkaConfig.MinInSyncReplicasProp()) + " and replication factor of " + replicationFactor + ".");
-
- return partitionCount;
- } finally {
- zkUtils.close();
- }
- }
-
- /**
- * @param zkUrl zookeeper connection url
- * @return number of brokers in this cluster
- */
- public static int getBrokerCount(String zkUrl) {
- ZkUtils zkUtils = ZkUtils.apply(zkUrl, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
- try {
- return zkUtils.getAllBrokersInCluster().size();
- } finally {
- zkUtils.close();
- }
- }
-
- /**
- * @param timestamp time in Ms when this message is generated
- * @param topic topic this message is sent to
- * @param idx consecutive index used by KafkaMonitor to determine duplicate or lost messages
- * @param msgSize size of the message
- * @return string that encodes the above fields
- */
- public static String jsonFromFields(String topic, long idx, long timestamp, String producerId, int msgSize) {
- GenericRecord record = new GenericData.Record(DefaultTopicSchema.MESSAGE_V0);
- record.put(DefaultTopicSchema.TOPIC_FIELD.name(), topic);
- record.put(DefaultTopicSchema.INDEX_FIELD.name(), idx);
- record.put(DefaultTopicSchema.TIME_FIELD.name(), timestamp);
- record.put(DefaultTopicSchema.PRODUCER_ID_FIELD.name(), producerId);
- // CONTENT_FIELD is composed of #msgSize number of character 'x', e.g. xxxxxxxxxx
- record.put(DefaultTopicSchema.CONTENT_FIELD.name(), String.format("%1$-" + msgSize + "s", "").replace(' ', 'x'));
- return jsonFromGenericRecord(record);
- }
-
- /**
- * @param message kafka message in the string format
- * @return GenericRecord that is deserialized from kafka message w.r.t. expected schema
- */
- public static GenericRecord genericRecordFromJson(String message) {
- GenericRecord record = new GenericData.Record(DefaultTopicSchema.MESSAGE_V0);
- JSONObject jsonObject = new JSONObject(message);
- record.put(DefaultTopicSchema.TOPIC_FIELD.name(), jsonObject.getString(DefaultTopicSchema.TOPIC_FIELD.name()));
- record.put(DefaultTopicSchema.INDEX_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.INDEX_FIELD.name()));
- record.put(DefaultTopicSchema.TIME_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.TIME_FIELD.name()));
- record.put(DefaultTopicSchema.PRODUCER_ID_FIELD.name(), jsonObject.getString(DefaultTopicSchema.PRODUCER_ID_FIELD.name()));
- record.put(DefaultTopicSchema.CONTENT_FIELD.name(), jsonObject.getString(DefaultTopicSchema.CONTENT_FIELD.name()));
- return record;
- }
-
- public static String jsonFromGenericRecord(GenericRecord record) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(DefaultTopicSchema.MESSAGE_V0);
-
- try {
- Encoder encoder = new JsonEncoder(DefaultTopicSchema.MESSAGE_V0, out);
- writer.write(record, encoder);
- encoder.flush();
- } catch (IOException e) {
- LOG.error("Unable to serialize avro record due to error " + e);
- }
- return out.toString();
- }
-
- public static List<MbeanAttributeValue> getMBeanAttributeValues(String mbeanExpr, String attributeExpr) {
- List<MbeanAttributeValue> values = new ArrayList<>();
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
- try {
- Set<ObjectName> mbeanNames = server.queryNames(new ObjectName(mbeanExpr), null);
- for (ObjectName mbeanName: mbeanNames) {
- MBeanInfo mBeanInfo = server.getMBeanInfo(mbeanName);
- MBeanAttributeInfo[] attributeInfos = mBeanInfo.getAttributes();
- for (MBeanAttributeInfo attributeInfo: attributeInfos) {
- if (attributeInfo.getName().equals(attributeExpr) || attributeExpr.length() == 0 || attributeExpr.equals("*")) {
- double value = (Double) server.getAttribute(mbeanName, attributeInfo.getName());
- values.add(new MbeanAttributeValue(mbeanName.getCanonicalName(), attributeInfo.getName(), value));
- }
- }
- }
- } catch (Exception e) {
- LOG.error("fail to retrieve value for " + mbeanExpr + ":" + attributeExpr, e);
- }
- return values;
- }
-
-}
diff --git a/src/main/java/com/linkedin/kmf/consumer/NewConsumer.java b/src/main/java/com/linkedin/kmf/consumer/NewConsumer.java
deleted file mode 100644
index 0d827606..00000000
--- a/src/main/java/com/linkedin/kmf/consumer/NewConsumer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
- * file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-package com.linkedin.kmf.consumer;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Properties;
-
-/*
- * Wrap around the new consumer from Apache Kafka and implement the #KMBaseConsumer interface
- */
-public class NewConsumer implements KMBaseConsumer {
-
- private final KafkaConsumer<String, String> _consumer;
- private Iterator<ConsumerRecord<String, String>> _recordIter;
-
- public NewConsumer(String topic, Properties consumerProperties) {
- _consumer = new KafkaConsumer<>(consumerProperties);
- _consumer.subscribe(Arrays.asList(topic));
- }
-
- @Override
- public BaseConsumerRecord receive() {
- if (_recordIter == null || !_recordIter.hasNext())
- _recordIter = _consumer.poll(Long.MAX_VALUE).iterator();
-
- ConsumerRecord<String, String> record = _recordIter.next();
- return new BaseConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value());
- }
-
- @Override
- public void close() {
- _consumer.close();
- }
-
-}
diff --git a/src/main/java/com/linkedin/kmf/consumer/OldConsumer.java b/src/main/java/com/linkedin/kmf/consumer/OldConsumer.java
deleted file mode 100644
index fdd842cc..00000000
--- a/src/main/java/com/linkedin/kmf/consumer/OldConsumer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
- * file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-package com.linkedin.kmf.consumer;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.StringDecoder;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/*
- * Wrap around the old consumer from Apache Kafka and implement the #KMBaseConsumer interface
- */
-public class OldConsumer implements KMBaseConsumer {
-
- private final ConsumerConnector _connector;
- private final ConsumerIterator<String, String> _iter;
-
- public OldConsumer(String topic, Properties consumerProperties) {
- _connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));
- Map<String, Integer> topicCountMap = new HashMap<>();
- topicCountMap.put(topic, 1);
- Map<String, List<KafkaStream<String, String>>> kafkaStreams = _connector.createMessageStreams(topicCountMap, new StringDecoder(null), new StringDecoder(null));
- _iter = kafkaStreams.get(topic).get(0).iterator();
- }
-
- @Override
- public BaseConsumerRecord receive() {
- if (!_iter.hasNext())
- return null;
- MessageAndMetadata<String, String> record = _iter.next();
- return new BaseConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.message());
- }
-
- @Override
- public void close() {
- _connector.shutdown();
- }
-
-}
diff --git a/src/main/java/com/linkedin/kmf/partitioner/OldKMPartitioner.java b/src/main/java/com/linkedin/kmf/partitioner/OldKMPartitioner.java
deleted file mode 100644
index fa356eff..00000000
--- a/src/main/java/com/linkedin/kmf/partitioner/OldKMPartitioner.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
- * file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-package com.linkedin.kmf.partitioner;
-
-public class OldKMPartitioner implements KMPartitioner {
-
- public int partition(String key, int partitionNum) {
- return Math.abs(key.hashCode()) % partitionNum;
- }
-}
diff --git a/src/main/java/com/linkedin/kmf/services/ConsumeService.java b/src/main/java/com/linkedin/kmf/services/ConsumeService.java
deleted file mode 100644
index e2d07f77..00000000
--- a/src/main/java/com/linkedin/kmf/services/ConsumeService.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
- * file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-package com.linkedin.kmf.services;
-
-import com.linkedin.kmf.common.DefaultTopicSchema;
-import com.linkedin.kmf.common.Utils;
-import com.linkedin.kmf.consumer.BaseConsumerRecord;
-import com.linkedin.kmf.consumer.KMBaseConsumer;
-import com.linkedin.kmf.consumer.NewConsumer;
-import com.linkedin.kmf.consumer.OldConsumer;
-import com.linkedin.kmf.services.configs.ConsumeServiceConfig;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Percentile;
-import org.apache.kafka.common.metrics.stats.Percentiles;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Total;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.SystemTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumeService implements Service {
- private static final Logger LOG = LoggerFactory.getLogger(ConsumeService.class);
- private static final String METRIC_GROUP_NAME = "consume-service";
- private static final String[] NONOVERRIDABLE_PROPERTIES =
- new String[] {ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG,
- ConsumeServiceConfig.ZOOKEEPER_CONNECT_CONFIG};
-
- private final String _name;
- private final ConsumeMetrics _sensors;
- private final KMBaseConsumer _consumer;
- private final Thread _thread;
- private final int _latencyPercentileMaxMs;
- private final int _latencyPercentileGranularityMs;
- private final AtomicBoolean _running;
- private final int _latencySlaMs;
-
- public ConsumeService(Map<String, Object> props, String name) throws Exception {
- _name = name;
- Map consumerPropsOverride = props.containsKey(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG)
- ? (Map) props.get(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG) : new HashMap<>();
- ConsumeServiceConfig config = new ConsumeServiceConfig(props);
- String topic = config.getString(ConsumeServiceConfig.TOPIC_CONFIG);
- String zkConnect = config.getString(ConsumeServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
- String brokerList = config.getString(ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
- String consumerClassName = config.getString(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG);
- _latencySlaMs = config.getInt(ConsumeServiceConfig.LATENCY_SLA_MS_CONFIG);
- _latencyPercentileMaxMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_MAX_MS_CONFIG);
- _latencyPercentileGranularityMs = config.getInt(ConsumeServiceConfig.LATENCY_PERCENTILE_GRANULARITY_MS_CONFIG);
- _running = new AtomicBoolean(false);
-
- for (String property: NONOVERRIDABLE_PROPERTIES) {
- if (consumerPropsOverride.containsKey(property)) {
- throw new ConfigException("Override must not contain " + property + " config.");
- }
- }
-
- Properties consumerProps = new Properties();
-
- // Assign default config. This has the lowest priority.
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "kmf-consumer");
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kmf-consumer-group-" + new Random().nextInt());
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-
- if (consumerClassName.equals(NewConsumer.class.getCanonicalName()) || consumerClassName.equals(NewConsumer.class.getSimpleName())) {
- consumerClassName = NewConsumer.class.getCanonicalName();
- } else if (consumerClassName.equals(OldConsumer.class.getCanonicalName()) || consumerClassName.equals(OldConsumer.class.getSimpleName())) {
- consumerClassName = OldConsumer.class.getCanonicalName();
- // The name/value of these configs are changed in the new consumer.
- consumerProps.put("auto.commit.enable", "false");
- consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "largest");
- }
-
- // Assign config specified for ConsumeService.
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
- consumerProps.put("zookeeper.connect", zkConnect);
-
- // Assign config specified for consumer. This has the highest priority.
- consumerProps.putAll(consumerPropsOverride);
-
- _consumer = (KMBaseConsumer) Class.forName(consumerClassName).getConstructor(String.class, Properties.class).newInstance(topic, consumerProps);
-
- _thread = new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- consume();
- } catch (Exception e) {
- LOG.error(_name + "/ConsumeService failed", e);
- }
- }
- }, _name + " consume-service");
- _thread.setDaemon(true);
-
- MetricConfig metricConfig = new MetricConfig().samples(60).timeWindow(1000, TimeUnit.MILLISECONDS);
- List<MetricsReporter> reporters = new ArrayList<>();
- reporters.add(new JmxReporter(JMX_PREFIX));
- Metrics metrics = new Metrics(metricConfig, reporters, new SystemTime());
- Map<String, String> tags = new HashMap<>();
- tags.put("name", _name);
- _sensors = new ConsumeMetrics(metrics, tags);
- }
-
- private void consume() throws Exception {
- // Delay 1 second to reduce the chance that consumer creates topic before TopicManagementService
- Thread.sleep(1000);
-
- Map<Integer, Long> nextIndexes = new HashMap<>();
-
- while (_running.get()) {
- BaseConsumerRecord record;
- try {
- record = _consumer.receive();
- } catch (Exception e) {
- _sensors._consumeError.record();
- LOG.warn(_name + "/ConsumeService failed to receive record", e);
- // Avoid busy while loop
- Thread.sleep(100);
- continue;
- }
-
- if (record == null)
- continue;
-
- GenericRecord avroRecord = Utils.genericRecordFromJson(record.value());
- if (avroRecord == null) {
- _sensors._consumeError.record();
- continue;
- }
- int partition = record.partition();
- long index = (Long) avroRecord.get(DefaultTopicSchema.INDEX_FIELD.name());
- long currMs = System.currentTimeMillis();
- long prevMs = (Long) avroRecord.get(DefaultTopicSchema.TIME_FIELD.name());
- _sensors._recordsConsumed.record();
- _sensors._bytesConsumed.record(record.value().length());
- _sensors._recordsDelay.record(currMs - prevMs);
-
- if (currMs - prevMs > _latencySlaMs)
- _sensors._recordsDelayed.record();
-
- if (index == -1L || !nextIndexes.containsKey(partition)) {
- nextIndexes.put(partition, -1L);
- continue;
- }
-
- long nextIndex = nextIndexes.get(partition);
- if (nextIndex == -1 || index == nextIndex) {
- nextIndexes.put(partition, index + 1);
- } else if (index < nextIndex) {
- _sensors._recordsDuplicated.record();
- } else if (index > nextIndex) {
- nextIndexes.put(partition, index + 1);
- _sensors._recordsLost.record(index - nextIndex);
- }
- }
- }
-
- @Override
- public synchronized void start() {
- if (_running.compareAndSet(false, true)) {
- _thread.start();
- LOG.info("{}/ConsumeService started", _name);
- }
- }
-
- @Override
- public synchronized void stop() {
- if (_running.compareAndSet(true, false)) {
- try {
- _consumer.close();
- } catch (Exception e) {
- LOG.warn(_name + "/ConsumeService while trying to close consumer.", e);
- }
- LOG.info("{}/ConsumeService stopped", _name);
- }
- }
-
- @Override
- public void awaitShutdown() {
- LOG.info("{}/ConsumeService shutdown completed", _name);
- }
-
- @Override
- public boolean isRunning() {
- return _running.get() && _thread.isAlive();
- }
-
- private class ConsumeMetrics {
- public final Metrics metrics;
- private final Sensor _bytesConsumed;
- private final Sensor _consumeError;
- private final Sensor _recordsConsumed;
- private final Sensor _recordsDuplicated;
- private final Sensor _recordsLost;
- private final Sensor _recordsDelay;
- private final Sensor _recordsDelayed;
-
- public ConsumeMetrics(Metrics metrics, final Map<String, String> tags) {
- this.metrics = metrics;
-
- _bytesConsumed = metrics.sensor("bytes-consumed");
- _bytesConsumed.add(new MetricName("bytes-consumed-rate", METRIC_GROUP_NAME, "The average number of bytes per second that are consumed", tags), new Rate());
-
- _consumeError = metrics.sensor("consume-error");
- _consumeError.add(new MetricName("consume-error-rate", METRIC_GROUP_NAME, "The average number of errors per second", tags), new Rate());
- _consumeError.add(new MetricName("consume-error-total", METRIC_GROUP_NAME, "The total number of errors", tags), new Total());
-
- _recordsConsumed = metrics.sensor("records-consumed");
- _recordsConsumed.add(new MetricName("records-consumed-rate", METRIC_GROUP_NAME, "The average number of records per second that are consumed", tags), new Rate());
- _recordsConsumed.add(new MetricName("records-consumed-total", METRIC_GROUP_NAME, "The total number of records that are consumed", tags), new Total());
-
- _recordsDuplicated = metrics.sensor("records-duplicated");
- _recordsDuplicated.add(new MetricName("records-duplicated-rate", METRIC_GROUP_NAME, "The average number of records per second that are duplicated", tags), new Rate());
- _recordsDuplicated.add(new MetricName("records-duplicated-total", METRIC_GROUP_NAME, "The total number of records that are duplicated", tags), new Total());
-
- _recordsLost = metrics.sensor("records-lost");
- _recordsLost.add(new MetricName("records-lost-rate", METRIC_GROUP_NAME, "The average number of records per second that are lost", tags), new Rate());
- _recordsLost.add(new MetricName("records-lost-total", METRIC_GROUP_NAME, "The total number of records that are lost", tags), new Total());
-
- _recordsDelayed = metrics.sensor("records-delayed");
- _recordsDelayed.add(new MetricName("records-delayed-rate", METRIC_GROUP_NAME, "The average number of records per second that are either lost or arrive after maximum allowed latency under SLA", tags), new Rate());
- _recordsDelayed.add(new MetricName("records-delayed-total", METRIC_GROUP_NAME, "The total number of records that are either lost or arrive after maximum allowed latency under SLA", tags), new Total());
-
- _recordsDelay = metrics.sensor("records-delay");
- _recordsDelay.add(new MetricName("records-delay-ms-avg", METRIC_GROUP_NAME, "The average latency of records from producer to consumer", tags), new Avg());
- _recordsDelay.add(new MetricName("records-delay-ms-max", METRIC_GROUP_NAME, "The maximum latency of records from producer to consumer", tags), new Max());
-
- // There are 2 extra buckets used for values smaller than 0.0 or larger than max, respectively.
- int bucketNum = _latencyPercentileMaxMs / _latencyPercentileGranularityMs + 2;
- int sizeInBytes = 4 * bucketNum;
- _recordsDelay.add(new Percentiles(sizeInBytes, _latencyPercentileMaxMs, Percentiles.BucketSizing.CONSTANT,
- new Percentile(new MetricName("records-delay-ms-99th", METRIC_GROUP_NAME, "The 99th percentile latency of records from producer to consumer", tags), 99.0),
- new Percentile(new MetricName("records-delay-ms-999th", METRIC_GROUP_NAME, "The 999th percentile latency of records from producer to consumer", tags), 99.9)));
-
- metrics.addMetric(new MetricName("consume-availability-avg", METRIC_GROUP_NAME, "The average consume availability", tags),
- new Measurable() {
- @Override
- public double measure(MetricConfig config, long now) {
- double recordsConsumedRate = _sensors.metrics.metrics().get(new MetricName("records-consumed-rate", METRIC_GROUP_NAME, tags)).value();
- double recordsLostRate = _sensors.metrics.metrics().get(new MetricName("records-lost-rate", METRIC_GROUP_NAME, tags)).value();
- double recordsDelayedRate = _sensors.metrics.metrics().get(new MetricName("records-delayed-rate", METRIC_GROUP_NAME, tags)).value();
-
- if (new Double(recordsLostRate).isNaN())
- recordsLostRate = 0;
- if (new Double(recordsDelayedRate).isNaN())
- recordsDelayedRate = 0;
-
- double consumeAvailability = recordsConsumedRate + recordsLostRate > 0
- ? (recordsConsumedRate - recordsDelayedRate) / (recordsConsumedRate + recordsLostRate) : 0;
-
- return consumeAvailability;
- }
- }
- );
- }
-
- }
-
-}
\ No newline at end of file
diff --git a/src/main/java/com/linkedin/kmf/services/JettyService.java b/src/main/java/com/linkedin/kmf/services/JettyService.java
deleted file mode 100644
index 42b74ae5..00000000
--- a/src/main/java/com/linkedin/kmf/services/JettyService.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
- * file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-package com.linkedin.kmf.services;
-
-import com.linkedin.kmf.services.configs.JettyServiceConfig;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.ResourceHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-// Jetty server that serves html files.
-public class JettyService implements Service {
- private static final Logger LOG = LoggerFactory.getLogger(JettyService.class);
-
- private final String _name;
- private final Server _jettyServer;
- private final int _port;
-
- public JettyService(Map<String, Object> props, String name) {
- _name = name;
- JettyServiceConfig config = new JettyServiceConfig(props);
- _port = config.getInt(JettyServiceConfig.PORT_CONFIG);
- _jettyServer = new Server(_port);
- ResourceHandler resourceHandler = new ResourceHandler();
- resourceHandler.setDirectoriesListed(true);
- resourceHandler.setWelcomeFiles(new String[]{"index.html"});
- resourceHandler.setResourceBase("webapp");
- _jettyServer.setHandler(resourceHandler);
- }
-
- public synchronized void start() {
- try {
- _jettyServer.start();
- LOG.info("{}/JettyService started at port {}", _name, _port);
- } catch (Exception e) {
- LOG.error(_name + "/JettyService failed to start", e);
- }
- }
-
- public synchronized void stop() {
- try {
- _jettyServer.stop();
- LOG.info("{}/JettyService stopped", _name);
- } catch (Exception e) {
- LOG.error(_name + "/JettyService failed to stop", e);
- }
- }
-
- public boolean isRunning() {
- return _jettyServer.isRunning();
- }
-
- public void awaitShutdown() {
-
- }
-
-}
diff --git a/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java b/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java
deleted file mode 100644
index d869d41c..00000000
--- a/src/main/java/com/linkedin/kmf/services/MultiClusterTopicManagementService.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Copyright 2016 LinkedIn Corp. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
- * file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-
-package com.linkedin.kmf.services;
-
-import com.linkedin.kmf.common.Utils;
-import com.linkedin.kmf.services.configs.CommonServiceConfig;
-import com.linkedin.kmf.services.configs.MultiClusterTopicManagementServiceConfig;
-import com.linkedin.kmf.services.configs.TopicManagementServiceConfig;
-import com.linkedin.kmf.topicfactory.TopicFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import kafka.admin.AdminOperationException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import kafka.admin.AdminUtils;
-import kafka.admin.BrokerMetadata;
-import kafka.admin.PreferredReplicaLeaderElectionCommand;
-import kafka.admin.RackAwareMode;
-import kafka.cluster.Broker;
-import kafka.common.TopicAndPartition;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.security.JaasUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-import static com.linkedin.kmf.common.Utils.ZK_CONNECTION_TIMEOUT_MS;
-import static com.linkedin.kmf.common.Utils.ZK_SESSION_TIMEOUT_MS;
-
-/**
- * This service periodically checks and rebalances the monitor topics across a pipeline of Kafka clusters so that
- * leadership of the partitions of the monitor topic in each cluster is distributed evenly across brokers in the cluster.
- *
- * More specifically, this service may do some or all of the following tasks depending on the config:
- *
- * - Create the monitor topic using the user-specified replication factor and partition number
- * - Increase partition number of the monitor topic if either partitionsToBrokersRatio or minPartitionNum is not satisfied
- * - Increase replication factor of the monitor topic if the user-specified replicationFactor is not satisfied
- * - Reassign partition across brokers to make sure each broker acts as preferred leader of at least one partition of the monitor topic
- * - Trigger preferred leader election to make sure each broker acts as leader of at least one partition of the monitor topic.
- * - Make sure the number of partitions of the monitor topic is the same across all monitored clusters.
- *
- */
-public class MultiClusterTopicManagementService implements Service {
- private static final Logger LOG = LoggerFactory.getLogger(MultiClusterTopicManagementService.class);
-
- private final AtomicBoolean _isRunning = new AtomicBoolean(false);
- private final String _serviceName;
- private final Map<String, TopicManagementHelper> _topicManagementByCluster;
- private final int _scheduleIntervalMs;
- private final ScheduledExecutorService _executor;
-
- public MultiClusterTopicManagementService(Map<String, Object> props, String serviceName) throws Exception {
- _serviceName = serviceName;
- MultiClusterTopicManagementServiceConfig config = new MultiClusterTopicManagementServiceConfig(props);
- String topic = config.getString(CommonServiceConfig.TOPIC_CONFIG);
- Map<String, Map> propsByCluster = props.containsKey(MultiClusterTopicManagementServiceConfig.PROPS_PER_CLUSTER_CONFIG)
- ? (Map) props.get(MultiClusterTopicManagementServiceConfig.PROPS_PER_CLUSTER_CONFIG) : new HashMap<>();
- _topicManagementByCluster = initializeTopicManagementHelper(propsByCluster, topic);
- _scheduleIntervalMs = config.getInt(MultiClusterTopicManagementServiceConfig.REBALANCE_INTERVAL_MS_CONFIG);
- _executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, _serviceName + "-multi-cluster-topic-management-service");
- }
- });
- }
-
- private Map<String, TopicManagementHelper> initializeTopicManagementHelper(Map<String, Map> propsByCluster, String topic) throws Exception {
- Map<String, TopicManagementHelper> topicManagementByCluster = new HashMap<>();
- for (Map.Entry<String, Map> entry: propsByCluster.entrySet()) {
- String clusterName = entry.getKey();
- Map serviceProps = entry.getValue();
- if (serviceProps.containsKey(MultiClusterTopicManagementServiceConfig.TOPIC_CONFIG))
- throw new ConfigException("The raw per-cluster config for MultiClusterTopicManagementService must not contain " +
- MultiClusterTopicManagementServiceConfig.TOPIC_CONFIG);
- serviceProps.put(MultiClusterTopicManagementServiceConfig.TOPIC_CONFIG, topic);
- topicManagementByCluster.put(clusterName, new TopicManagementHelper(serviceProps));
- }
- return topicManagementByCluster;
- }
-
- @Override
- public synchronized void start() {
- if (_isRunning.compareAndSet(false, true)) {
- Runnable r = new TopicManagementRunnable();
- _executor.scheduleWithFixedDelay(r, 0, _scheduleIntervalMs, TimeUnit.MILLISECONDS);
- LOG.info("{}/MultiClusterTopicManagementService started.", _serviceName);
- }
- }
-
- @Override
- public synchronized void stop() {
- if (_isRunning.compareAndSet(true, false)) {
- _executor.shutdown();
- LOG.info("{}/MultiClusterTopicManagementService stopped.", _serviceName);
- }
- }
-
- @Override
- public boolean isRunning() {
- return _isRunning.get() && !_executor.isShutdown();
- }
-
- @Override
- public void awaitShutdown() {
- try {
- _executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.info("Thread interrupted when waiting for {}/MultiClusterTopicManagementService to shutdown", _serviceName);
- }
- LOG.info("{}/MultiClusterTopicManagementService shutdown completed", _serviceName);
- }
-
- private class TopicManagementRunnable implements Runnable {
- @Override
- public void run() {
- try {
- for (TopicManagementHelper helper : _topicManagementByCluster.values()) {
- helper.maybeCreateTopic();
- }
-
- /*
- * The partition number of the monitor topics should be the minimum partition number that satisfies the following conditions:
- * - partition number of the monitor topics across all monitored clusters should be the same
- * - partitionNum / brokerNum >= user-configured partitionsToBrokersRatio.
- * - partitionNum >= user-configured minPartitionNum
- */
-
- int minPartitionNum = 0;
- for (TopicManagementHelper helper : _topicManagementByCluster.values()) {
- minPartitionNum = Math.max(minPartitionNum, helper.minPartitionNum());
- }
- for (TopicManagementHelper helper : _topicManagementByCluster.values()) {
- helper.maybeAddPartitions(minPartitionNum);
- }
-
- for (Map.Entry<String, TopicManagementHelper> entry : _topicManagementByCluster.entrySet()) {
- String clusterName = entry.getKey();
- TopicManagementHelper helper = entry.getValue();
- try {
- helper.maybeReassignPartitionAndElectLeader();
- } catch (IOException | ZkNodeExistsException | AdminOperationException e) {
- LOG.warn(_serviceName + "/MultiClusterTopicManagementService will retry later in cluster " + clusterName, e);
- }
- }
- } catch (Exception e) {
- LOG.error(_serviceName + "/MultiClusterTopicManagementService will stop due to error.", e);
- stop();
- }
- }
- }
-
- static class TopicManagementHelper {
-
- private final boolean _topicCreationEnabled;
- private final String _topic;
- private final String _zkConnect;
- private final int _replicationFactor;
- private final double _minPartitionsToBrokersRatio;
- private final int _minPartitionNum;
- private final TopicFactory _topicFactory;
- private final Properties _topicProperties;
-
- TopicManagementHelper(Map props) throws Exception {
- TopicManagementServiceConfig config = new TopicManagementServiceConfig(props);
- _topicCreationEnabled = config.getBoolean(TopicManagementServiceConfig.TOPIC_CREATION_ENABLED_CONFIG);
- _topic = config.getString(TopicManagementServiceConfig.TOPIC_CONFIG);
- _zkConnect = config.getString(TopicManagementServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
- _replicationFactor = config.getInt(TopicManagementServiceConfig.TOPIC_REPLICATION_FACTOR_CONFIG);
- _minPartitionsToBrokersRatio = config.getDouble(TopicManagementServiceConfig.PARTITIONS_TO_BROKERS_RATIO_CONFIG);
- _minPartitionNum = config.getInt(TopicManagementServiceConfig.MIN_PARTITION_NUM_CONFIG);
- String topicFactoryClassName = config.getString(TopicManagementServiceConfig.TOPIC_FACTORY_CLASS_CONFIG);
- _topicProperties = new Properties();
- if (props.containsKey(TopicManagementServiceConfig.TOPIC_PROPS_CONFIG))
- _topicProperties.putAll((Map) props.get(TopicManagementServiceConfig.TOPIC_PROPS_CONFIG));
-
- Map topicFactoryConfig = props.containsKey(TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) ?
- (Map) props.get(TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap();
- _topicFactory = (TopicFactory) Class.forName(topicFactoryClassName).getConstructor(Map.class).newInstance(topicFactoryConfig);
- }
-
- void maybeCreateTopic() throws Exception {
- if (_topicCreationEnabled) {
- _topicFactory.createTopicIfNotExist(_zkConnect, _topic, _replicationFactor, _minPartitionsToBrokersRatio, _topicProperties);
- }
- }
-
- int minPartitionNum() {
- int brokerCount = Utils.getBrokerCount(_zkConnect);
- return Math.max((int) Math.ceil(_minPartitionsToBrokersRatio * brokerCount), _minPartitionNum);
- }
-
- void maybeAddPartitions(int minPartitionNum) {
- ZkUtils zkUtils = ZkUtils.apply(_zkConnect, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
- try {
- int partitionNum = getPartitionInfo(zkUtils, _topic).size();
- if (partitionNum < minPartitionNum) {
- LOG.info("MultiClusterTopicManagementService will increase partition of the topic {} "
- + "in cluster {} from {} to {}.", _topic, _zkConnect, partitionNum, minPartitionNum);
- AdminUtils.addPartitions(zkUtils, _topic, minPartitionNum, null, false, RackAwareMode.Enforced$.MODULE$);
- }
- } finally {
- zkUtils.close();
- }
- }
-
- void maybeReassignPartitionAndElectLeader() throws Exception {
- ZkUtils zkUtils = ZkUtils.apply(_zkConnect, ZK_SESSION_TIMEOUT_MS, ZK_CONNECTION_TIMEOUT_MS, JaasUtils.isZkSecurityEnabled());
-
- try {
- List<PartitionInfo> partitionInfoList = getPartitionInfo(zkUtils, _topic);
- Collection<Broker> brokers = scala.collection.JavaConversions.asJavaCollection(zkUtils.getAllBrokersInCluster());
-
- if (partitionInfoList.size() == 0)
- throw new IllegalStateException("Topic " + _topic + " does not exist in cluster " + _zkConnect);
-
- int currentReplicationFactor = getReplicationFactor(partitionInfoList);
-
- if (_replicationFactor < currentReplicationFactor)
- throw new RuntimeException(String.format("Configured replication factor %d "
- + "is smaller than the current replication factor %d of the topic %s in cluster %s",
- _replicationFactor, currentReplicationFactor, _topic, _zkConnect));
-
- if (_replicationFactor > currentReplicationFactor && zkUtils.getPartitionsBeingReassigned().isEmpty()) {
- LOG.info("MultiClusterTopicManagementService will increase the replication factor of the topic {} in cluster {}", _topic, _zkConnect);
- reassignPartitions(zkUtils, brokers, _topic, partitionInfoList.size(), _replicationFactor);
- }
-
- if (partitionInfoList.size() >= brokers.size() &&
- someBrokerNotPreferredLeader(partitionInfoList, brokers) &&
- zkUtils.getPartitionsBeingReassigned().isEmpty()) {
- LOG.info("MultiClusterTopicManagementService will reassign partitions of the topic {} in cluster {}", _topic, _zkConnect);
- reassignPartitions(zkUtils, brokers, _topic, partitionInfoList.size(), _replicationFactor);
- }
-
- if (partitionInfoList.size() >= brokers.size() &&
- someBrokerNotElectedLeader(partitionInfoList, brokers)) {
- LOG.info("MultiClusterTopicManagementService will trigger preferred leader election for the topic {} in cluster {}", _topic, _zkConnect);
- triggerPreferredLeaderElection(zkUtils, partitionInfoList);
- }
- } finally {
- zkUtils.close();
- }
- }
-
- private static void triggerPreferredLeaderElection(ZkUtils zkUtils, List<PartitionInfo> partitionInfoList) {
- scala.collection.mutable.HashSet<TopicAndPartition> scalaPartitionInfoSet = new scala.collection.mutable.HashSet<>();
- for (PartitionInfo javaPartitionInfo : partitionInfoList) {
- scalaPartitionInfoSet.add(new TopicAndPartition(javaPartitionInfo.topic(), javaPartitionInfo.partition()));
- }
- PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, scalaPartitionInfoSet);
- }
-
- private static void reassignPartitions(ZkUtils zkUtils, Collection<Broker> brokers, String topic, int partitionCount, int replicationFactor) {
- scala.collection.mutable.ArrayBuffer<BrokerMetadata> brokersMetadata = new scala.collection.mutable.ArrayBuffer<>(brokers.size());
- for (Broker broker : brokers) {
- brokersMetadata.$plus$eq(new BrokerMetadata(broker.id(), broker.rack()));
- }
- scala.collection.Map