Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ coverage
.nyc_output/
*lcov.info
**/lcov-report
examples/performance/*.json
*.log
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ blocks:
- export NODE_OPTIONS='--max-old-space-size=1536'
- cd examples/performance
- npm install
- bash -c '../../ci/tests/run_perf_test.sh'
- node '../../ci/tests/run_perf_test.js'
- rm -rf ./node_modules

- name: "Linux amd64: Release"
Expand Down
360 changes: 360 additions & 0 deletions ci/tests/run_perf_test.js

Large diffs are not rendered by default.

64 changes: 0 additions & 64 deletions ci/tests/run_perf_test.sh

This file was deleted.

167 changes: 151 additions & 16 deletions examples/performance/performance-consolidated.js
Original file line number Diff line number Diff line change
@@ -1,84 +1,214 @@
const fs = require('fs');
const mode = process.env.MODE ? process.env.MODE : 'confluent';

let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether;
if (mode === 'confluent') {
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives'));
({ runProducer, runConsumer, runConsumeTransformProduce,
runCreateTopics, runLagMonitoring,
runProducerConsumerTogether } = require('./performance-primitives'));
} else {
({ runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
({ runProducer, runConsumer, runConsumeTransformProduce, runProducerConsumerTogether } = require('./performance-primitives-kafkajs'));
/* createTopics is more reliable in CKJS */
({ runCreateTopics, runLagMonitoring } = require('./performance-primitives'));
}

const brokers = process.env.KAFKA_BROKERS || 'localhost:9092';
const securityProtocol = process.env.SECURITY_PROTOCOL;
const saslUsername = process.env.SASL_USERNAME;
const saslPassword = process.env.SASL_PASSWORD;
const topic = process.env.KAFKA_TOPIC || 'test-topic';
const topic2 = process.env.KAFKA_TOPIC2 || 'test-topic2';
const messageCount = process.env.MESSAGE_COUNT ? +process.env.MESSAGE_COUNT : 1000000;
const messageSize = process.env.MESSAGE_SIZE ? +process.env.MESSAGE_SIZE : 256;
const batchSize = process.env.BATCH_SIZE ? +process.env.BATCH_SIZE : 100;
const compression = process.env.COMPRESSION || 'None';
// Between 0 and 1, percentage of random bytes in each message
const randomness = process.env.RANDOMNESS ? +process.env.RANDOMNESS : 0.5;
const numPartitions = process.env.PARTITIONS ? +process.env.PARTITIONS : 3;
const partitionsConsumedConcurrently = process.env.PARTITIONS_CONSUMED_CONCURRENTLY ? +process.env.PARTITIONS_CONSUMED_CONCURRENTLY : 1;
const warmupMessages = process.env.WARMUP_MESSAGES ? +process.env.WARMUP_MESSAGES : (batchSize * 10);
const messageProcessTimeMs = process.env.MESSAGE_PROCESS_TIME_MS ? +process.env.MESSAGE_PROCESS_TIME_MS : 5;
const ctpConcurrency = process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY ? +process.env.CONSUME_TRANSFORM_PRODUCE_CONCURRENCY : 1;
const consumerProcessingTime = process.env.CONSUMER_PROCESSING_TIME ? +process.env.CONSUMER_PROCESSING_TIME : 100;
const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.env.PRODUCER_PROCESSING_TIME : 100;
const limitRPS = process.env.LIMIT_RPS ? +process.env.LIMIT_RPS : null;
const parameters = {
brokers,
securityProtocol,
saslUsername,
saslPassword,
}

/**
 * Log the broker connection parameters for a test run.
 * Masks the SASL password; prints a fallback line when no complete
 * security configuration (protocol + username + password) is present.
 *
 * @param {{brokers: string, securityProtocol?: string,
 *          saslUsername?: string, saslPassword?: string}} parameters
 */
function logParameters(parameters) {
  console.log(` Brokers: ${parameters.brokers}`);
  if (parameters.securityProtocol && parameters.saslUsername && parameters.saslPassword) {
    // The guard above guarantees username/password are set, so the original
    // `x ? x : 'not set'` ternaries were dead branches — removed.
    console.log(` Security Protocol: ${parameters.securityProtocol}`);
    console.log(` SASL Username: ${parameters.saslUsername}`);
    console.log(` SASL Password: ******`);
  } else {
    console.log(" No security protocol configured");
  }
}

(async function () {
const producer = process.argv.includes('--producer');
const consumer = process.argv.includes('--consumer');
const consumerEachMessage = process.argv.includes('--consumer-each-message');
const consumerEachBatch = process.argv.includes('--consumer-each-batch');
const produceToSecondTopic = process.argv.includes('--produce-to-second-topic');
const ctp = process.argv.includes('--ctp');
const produceConsumeLatency = process.argv.includes('--latency');
const all = process.argv.includes('--all');
const createTopics = process.argv.includes('--create-topics');
const monitorLag = process.argv.includes('--monitor-lag');
let maxAverageRSSKB, maxMaxRSSKB;
const stats = {};

// Rolling buffer of {rss, timestamp} samples for the currently-running test.
let measures = [];
// Handle of the active sampling timer, or null/undefined when idle.
let interval;

// Begin sampling this process's resident set size every 100ms into `measures`.
const startTrackingMemory = () => {
  const takeSample = () => {
    const { rss } = process.memoryUsage();
    measures.push({ rss, timestamp: Date.now() });
  };
  interval = setInterval(takeSample, 100);
};

// Convert one memory sample to a JSON-friendly shape with string-typed
// numeric fields (keeps large byte counts exact in the dumped report).
const datapointToJSON = (m) => ({
  rss: String(m.rss),
  timestamp: String(m.timestamp),
});

// Stop RSS sampling, report the average and peak RSS (in KB) for `name`,
// fold them into the cross-test maxima, optionally dump the raw samples to
// `fileName` as JSON, and clear the sample buffer for the next test.
const endTrackingMemory = (name, fileName) => {
  clearInterval(interval);
  interval = null;

  const averageRSS = measures.reduce((sum, m) => sum + m.rss, 0) / measures.length;
  const averageRSSKB = averageRSS / 1024;
  maxAverageRSSKB = maxAverageRSSKB ? Math.max(averageRSSKB, maxAverageRSSKB) : averageRSSKB;
  console.log(`=== Average ${name} RSS KB: ${averageRSSKB}`);

  // On ties the later sample wins, matching the original `>` comparison.
  const peak = measures.reduce((best, m) => (best.rss > m.rss ? best : m));
  const maxRSSKB = peak.rss / 1024;
  maxMaxRSSKB = maxMaxRSSKB ? Math.max(maxRSSKB, maxMaxRSSKB) : maxRSSKB;
  console.log(`=== Max ${name} RSS KB: ${maxRSSKB}`);

  if (fileName) {
    const report = {
      measures: measures.map(datapointToJSON),
      averageRSS: averageRSS.toString(),
      maxRSS: datapointToJSON(peak)
    };
    fs.writeFileSync(fileName, JSON.stringify(report, null, 2));
  }
  measures = [];
};

console.log(`=== Starting Performance Tests - Mode ${mode} ===`);

if (createTopics || all) {
console.log("=== Creating Topics (deleting if they exist already):");
console.log(` Brokers: ${brokers}`);
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Topic2: ${topic2}`);
await runCreateTopics(brokers, topic, topic2);
console.log(` Partitions: ${numPartitions}`);
await runCreateTopics(parameters, topic, topic2, numPartitions);
}

if (monitorLag) {
console.log("=== Starting Lag Monitoring:");
logParameters(parameters);
console.log(` Topic: ${topic}`);
const {
averageLag,
maxLag,
totalMeasurements
} = await runLagMonitoring(parameters, topic);
const monitoredGroupId = process.env.GROUPID_MONITOR;
console.log(`=== Average broker lag (${monitoredGroupId}): `, averageLag);
console.log(`=== Max broker lag (${monitoredGroupId}): `, maxLag);
console.log(`=== Total broker lag measurements (${monitoredGroupId}): `, totalMeasurements);
}

if (producer || all) {
console.log("=== Running Basic Producer Performance Test:")
console.log(` Brokers: ${brokers}`);
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Message Count: ${messageCount}`);
console.log(` Message Size: ${messageSize}`);
console.log(` Batch Size: ${batchSize}`);
console.log(` Compression: ${compression}`);
console.log(` Limit RPS: ${limitRPS}`);
console.log(` Warmup Messages: ${warmupMessages}`);
const producerRate = await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
startTrackingMemory();
const producerRate = await runProducer(parameters, topic, batchSize,
warmupMessages, messageCount, messageSize, compression,
randomness, limitRPS);
endTrackingMemory('producer', `producer-memory-${mode}.json`);
console.log("=== Producer Rate: ", producerRate);
}

if (consumer || all) {
if (consumer || consumerEachMessage || all) {
// If user runs this without --producer then they are responsible for seeding the topic.
console.log("=== Running Basic Consumer Performance Test (eachMessage):")
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Message Count: ${messageCount}`);
console.log(` Partitions consumed concurrently: ${partitionsConsumedConcurrently}`);
startTrackingMemory();
const consumerRate = await runConsumer(parameters, topic,
warmupMessages, messageCount,
false, partitionsConsumedConcurrently, stats,
produceToSecondTopic ? topic2 : null, compression);
endTrackingMemory('consumer-each-message', `consumer-memory-message-${mode}.json`);
console.log("=== Consumer Rate MB/s (eachMessage): ", consumerRate);
console.log("=== Consumer Rate msg/s (eachMessage): ", stats.messageRate);
console.log("=== Consumer average E2E latency (eachMessage): ", stats.avgLatency);
console.log("=== Consumer max E2E latency (eachMessage): ", stats.maxLatency);
console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
}

if (consumer || consumerEachBatch || all) {
// If user runs this without --producer then they are responsible for seeding the topic.
console.log("=== Running Basic Consumer Performance Test:")
console.log(` Brokers: ${brokers}`);
console.log("=== Running Basic Consumer Performance Test (eachBatch):")
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Message Count: ${messageCount}`);
const consumerRate = await runConsumer(brokers, topic, messageCount);
console.log("=== Consumer Rate: ", consumerRate);
console.log(` Partitions consumed concurrently: ${partitionsConsumedConcurrently}`);
startTrackingMemory();
const consumerRate = await runConsumer(parameters, topic,
warmupMessages, messageCount,
true, partitionsConsumedConcurrently, stats,
produceToSecondTopic ? topic2 : null, compression);
endTrackingMemory('consumer-each-batch', `consumer-memory-batch-${mode}.json`);
console.log("=== Consumer Rate MB/s (eachBatch): ", consumerRate);
console.log("=== Consumer Rate msg/s (eachBatch): ", stats.messageRate);
console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
console.log("=== Average eachBatch size: ", stats.averageBatchSize);
console.log("=== Consumer average E2E latency (eachBatch): ", stats.avgLatency);
console.log("=== Consumer max E2E latency (eachBatch): ", stats.maxLatency);
console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
}

if (ctp || all) {
console.log("=== Running Consume-Transform-Produce Performance Test:")
console.log(` Brokers: ${brokers}`);
logParameters(parameters);
console.log(` ConsumeTopic: ${topic}`);
console.log(` ProduceTopic: ${topic2}`);
console.log(` Message Count: ${messageCount}`);
// Seed the topic with messages
await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
await runProducer(parameters, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
startTrackingMemory();
const ctpRate = await runConsumeTransformProduce(parameters, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
endTrackingMemory('consume-transform-produce', `consume-transform-produce-${mode}.json`);
console.log("=== Consume-Transform-Produce Rate: ", ctpRate);
}

if (produceConsumeLatency || all) {
console.log("=== Running Produce-To-Consume Latency Performance Test:")
console.log(` Brokers: ${brokers}`);
logParameters(parameters);
console.log(` Topic: ${topic}`);
console.log(` Message Count: ${messageCount}`);
console.log(` Consumer Processing Time: ${consumerProcessingTime}`);
console.log(` Producer Processing Time: ${producerProcessingTime}`);
const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(brokers, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
startTrackingMemory();
const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(parameters, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
endTrackingMemory('producer-consumer-together', `producer-consumer-together-${mode}.json`);
console.log(`=== Produce-To-Consume Latency (ms): Mean: ${mean}, P50: ${p50}, P90: ${p90}, P95: ${p95}`);

// The presence of outliers invalidates the mean measurement, and raises concerns as to why there are any.
Expand All @@ -87,4 +217,9 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
console.log("=== Outliers (ms): ", outliers);
}
}

if (maxAverageRSSKB !== undefined && maxMaxRSSKB !== undefined) {
console.log(`=== Max Average RSS across tests: `, maxAverageRSSKB);
console.log(`=== Max RSS across tests: `, maxMaxRSSKB);
}
})();
Loading