
Commit b8be59f

Performance test improvements, measure eachBatch rate, time and lag, avg and max memory usage
1 parent 890f9ec commit b8be59f

8 files changed: +399 -160 lines changed

8 files changed

+399
-160
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
@@ -21,3 +21,5 @@ coverage
 .nyc_output/
 *lcov.info
 **/lcov-report
+examples/performance/*.json
+*.log

.semaphore/semaphore.yml

Lines changed: 1 addition & 1 deletion
@@ -191,7 +191,7 @@ blocks:
           - export NODE_OPTIONS='--max-old-space-size=1536'
           - cd examples/performance
           - npm install
-          - bash -c '../../ci/tests/run_perf_test.sh'
+          - node '../../ci/tests/run_perf_test.js'
           - rm -rf ./node_modules

       - name: "Linux amd64: Release"

ci/tests/run_perf_test.js

Lines changed: 151 additions & 0 deletions
@@ -0,0 +1,151 @@
#!/usr/bin/env node

const { execSync } = require('child_process');

function runCommand(command) {
  try {
    const output = execSync(command, { encoding: 'utf8', stdio: 'pipe' });
    console.log(output);
    return output;
  } catch (error) {
    const errorOutput = error.stdout || error.stderr || error.message;
    console.log(errorOutput);
    return errorOutput;
  }
}

function extractValue(content, pattern) {
  try {
    const lines = content.split('\n');
    const matchingLine = lines.find(line => line.includes(pattern));
    if (matchingLine) {
      const value = matchingLine.split(':')[1]?.trim();
      return value || '';
    }
    return '';
  } catch (error) {
    return '';
  }
}

function compareNumbers(a, b, threshold = 0.7) {
  const numA = parseFloat(a);
  const numB = parseFloat(b);
  if (isNaN(numA) || isNaN(numB)) return false;
  return numA < (numB * threshold);
}

function isBelow(value, target) {
  const numValue = parseFloat(value);
  const numTarget = parseFloat(target);
  if (isNaN(numValue) || isNaN(numTarget)) return false;
  return numValue < numTarget;
}

// Run performance tests and store outputs in memory
console.log('Running Confluent Producer/Consumer test...');
const outputConfluentProducerConsumer = runCommand('MODE=confluent MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer');

console.log('Running KafkaJS Producer/Consumer test...');
const outputKjsProducerConsumer = runCommand('MODE=kafkajs MESSAGE_COUNT=50000 node performance-consolidated.js --create-topics --consumer --producer');

console.log('Running Confluent CTP test...');
const outputConfluentCtp = runCommand('MODE=confluent MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');

console.log('Running KafkaJS CTP test...');
const outputKjsCtp = runCommand('MODE=kafkajs MESSAGE_COUNT=5000 node performance-consolidated.js --create-topics --ctp');

// Extract Confluent results
const producerConfluent = extractValue(outputConfluentProducerConsumer, '=== Producer Rate:');
const consumerConfluentMessage = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate (eachMessage):');
const consumerConfluentTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachMessage):');
const consumerConfluentBatch = extractValue(outputConfluentProducerConsumer, '=== Consumer Rate (eachBatch):');
const consumerConfluentBatchTime = extractValue(outputConfluentProducerConsumer, '=== Consumption time (eachBatch):');
const consumerConfluentBatchAverageLag = extractValue(outputConfluentProducerConsumer, '=== Average eachBatch lag:');
const consumerConfluentBatchMaxLag = extractValue(outputConfluentProducerConsumer, '=== Max eachBatch lag:');
const consumerConfluentAverageRSS = extractValue(outputConfluentProducerConsumer, '=== Max Average RSS across tests:');
const consumerConfluentMaxRSS = extractValue(outputConfluentProducerConsumer, '=== Max RSS across tests:');
const ctpConfluent = extractValue(outputConfluentCtp, '=== Consume-Transform-Produce Rate:');

// Extract KafkaJS results
const producerKjs = extractValue(outputKjsProducerConsumer, '=== Producer Rate:');
const consumerKjsMessage = extractValue(outputKjsProducerConsumer, '=== Consumer Rate (eachMessage):');
const consumerKjsTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachMessage):');
const consumerKjsBatch = extractValue(outputKjsProducerConsumer, '=== Consumer Rate (eachBatch):');
const consumerKjsBatchTime = extractValue(outputKjsProducerConsumer, '=== Consumption time (eachBatch):');
const consumerKjsBatchAverageLag = extractValue(outputKjsProducerConsumer, '=== Average eachBatch lag:');
const consumerKjsBatchMaxLag = extractValue(outputKjsProducerConsumer, '=== Max eachBatch lag:');
const consumerKjsAverageRSS = extractValue(outputKjsProducerConsumer, '=== Max Average RSS across tests:');
const consumerKjsMaxRSS = extractValue(outputKjsProducerConsumer, '=== Max RSS across tests:');
const ctpKjs = extractValue(outputKjsCtp, '=== Consume-Transform-Produce Rate:');

// Print results
console.log(`Producer rates: confluent ${producerConfluent}, kafkajs ${producerKjs}`);
console.log(`Consumer rates (eachMessage): confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
console.log(`Consumption time (eachMessage): confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
console.log(`Consumer rates (eachBatch): confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
console.log(`Consumption time (eachBatch): confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
console.log(`Average eachBatch lag: confluent ${consumerConfluentBatchAverageLag}, kafkajs ${consumerKjsBatchAverageLag}`);
console.log(`Max eachBatch lag: confluent ${consumerConfluentBatchMaxLag}, kafkajs ${consumerKjsBatchMaxLag}`);
console.log(`Average RSS: confluent ${consumerConfluentAverageRSS}, kafkajs ${consumerKjsAverageRSS}`);
console.log(`Max RSS: confluent ${consumerConfluentMaxRSS}, kafkajs ${consumerKjsMaxRSS}`);
console.log(`CTP rates: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`);

let errcode = 0;
const maxPerformanceDifference = 0.7;

// Compare against KJS (30% threshold)
if (compareNumbers(producerConfluent, producerKjs, maxPerformanceDifference)) {
  console.log(`Producer rates differ by more than 30%: confluent ${producerConfluent}, kafkajs ${producerKjs}`);
  errcode = 1;
}

if (compareNumbers(consumerConfluentMessage, consumerKjsMessage, maxPerformanceDifference)) {
  console.log(`Consumer rates (eachMessage) differ by more than 30%: confluent ${consumerConfluentMessage}, kafkajs ${consumerKjsMessage}`);
  // FIXME: improve consumer performance at least to KafkaJS level
  errcode = 0;
}

// Lower is better for time
if (compareNumbers(consumerKjsTime, consumerConfluentTime, maxPerformanceDifference)) {
  console.log(`Consumption time (eachMessage) differ by more than 30%: confluent ${consumerConfluentTime}, kafkajs ${consumerKjsTime}`);
  errcode = 0;
}

if (compareNumbers(consumerConfluentBatch, consumerKjsBatch, maxPerformanceDifference)) {
  console.log(`Consumer rates (eachBatch) differ by more than 30%: confluent ${consumerConfluentBatch}, kafkajs ${consumerKjsBatch}`);
  errcode = 0;
}

// Lower is better for time
if (compareNumbers(consumerKjsBatchTime, consumerConfluentBatchTime, maxPerformanceDifference)) {
  console.log(`Consumption time (eachBatch) differ by more than 30%: confluent ${consumerConfluentBatchTime}, kafkajs ${consumerKjsBatchTime}`);
  errcode = 0;
}

if (compareNumbers(ctpConfluent, ctpKjs, maxPerformanceDifference)) {
  console.log(`CTP rates differ by more than 30%: confluent ${ctpConfluent}, kafkajs ${ctpKjs}`);
  errcode = 1;
}

// Compare against target numbers
const TARGET_PRODUCE = process.env.TARGET_PRODUCE_PERFORMANCE || '35';
const TARGET_CONSUME = process.env.TARGET_CONSUME_PERFORMANCE || '18';
const TARGET_CTP = process.env.TARGET_CTP_PERFORMANCE || '0.02';

if (isBelow(producerConfluent, TARGET_PRODUCE)) {
  console.log(`Confluent producer rate is below target: ${producerConfluent}`);
  errcode = 1;
}

if (isBelow(consumerConfluentMessage, TARGET_CONSUME)) {
  console.log(`Confluent consumer rate is below target: ${consumerConfluentMessage}`);
  errcode = 1;
}

if (isBelow(ctpConfluent, TARGET_CTP)) {
  console.log(`Confluent CTP rate is below target: ${ctpConfluent}`);
  errcode = 1;
}

process.exit(errcode);
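As a reading aid, here is a minimal sketch of how the compareNumbers threshold behaves; the 0.7 factor mirrors maxPerformanceDifference above, while the rates below are made-up illustrative numbers, not measured results. A Confluent rate only trips the check when it falls below 70% of the KafkaJS baseline, i.e. a regression of more than 30%.

// Illustrative only: the same inequality compareNumbers() evaluates, with made-up rates.
const threshold = 0.7; // same value as maxPerformanceDifference

console.log(20 < 30 * threshold); // true:  20 is below 70% of 30 (= 21), so the check fires
console.log(25 < 30 * threshold); // false: 25 is within 30% of 30, so the check passes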

ci/tests/run_perf_test.sh

Lines changed: 0 additions & 64 deletions
This file was deleted.

examples/performance/performance-consolidated.js

Lines changed: 71 additions & 3 deletions
@@ -1,3 +1,4 @@
+const fs = require('fs');
 const mode = process.env.MODE ? process.env.MODE : 'confluent';

 let runProducer, runConsumer, runConsumeTransformProduce, runCreateTopics, runProducerConsumerTogether;
@@ -27,6 +28,44 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
 const produceConsumeLatency = process.argv.includes('--latency');
 const all = process.argv.includes('--all');
 const createTopics = process.argv.includes('--create-topics');
+let maxAverageRSSKB, maxMaxRSSKB;
+const stats = {};
+
+let measures = [];
+let interval;
+const startTrackingMemory = () => {
+  interval = setInterval(() => {
+    const rss = BigInt(process.memoryUsage().rss);
+    measures.push({ rss, timestamp: Date.now() });
+  }, 100);
+};
+
+const datapointToJSON = (m) =>
+  ({ rss: m.rss.toString(), timestamp: m.timestamp.toString() });
+
+const endTrackingMemory = (fileName) => {
+  clearInterval(interval);
+  interval = null;
+  const averageRSS = measures.reduce((sum, m) => sum + m.rss, 0n) / BigInt(measures.length);
+  const averageRSSKB = averageRSS / 1024n;
+  maxAverageRSSKB = !maxAverageRSSKB || averageRSSKB > maxAverageRSSKB ? averageRSSKB : maxAverageRSSKB;
+  console.log(`=== Average RSS: ${averageRSSKB} KB`);
+  const max = measures.reduce((prev, current) => (prev.rss > current.rss) ? prev : current);
+  const maxRSSKB = max.rss / 1024n;
+  maxMaxRSSKB = !maxMaxRSSKB || maxRSSKB > maxMaxRSSKB ? maxRSSKB : maxMaxRSSKB;
+  console.log(`=== Max RSS: ${maxRSSKB} KB at ${new Date(max.timestamp).toISOString()}`);
+  if (fileName) {
+    const measuresJSON = JSON.stringify({
+      measures: measures.map(datapointToJSON),
+      averageRSS: averageRSS.toString(),
+      maxRSS: datapointToJSON(max)
+    }, null, 2);
+    fs.writeFileSync(fileName, measuresJSON);
+  }
+  measures = [];
+}
+
+console.log(`=== Starting Performance Tests - Mode ${mode} ===`);

 if (createTopics || all) {
 console.log("=== Creating Topics (deleting if they exist already):");
@@ -45,18 +84,38 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
 console.log(` Batch Size: ${batchSize}`);
 console.log(` Compression: ${compression}`);
 console.log(` Warmup Messages: ${warmupMessages}`);
+startTrackingMemory();
 const producerRate = await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
+endTrackingMemory(`producer-memory-${mode}.json`);
 console.log("=== Producer Rate: ", producerRate);
 }

 if (consumer || all) {
 // If user runs this without --producer then they are responsible for seeding the topic.
-console.log("=== Running Basic Consumer Performance Test:")
+console.log("=== Running Basic Consumer Performance Test (eachMessage):")
 console.log(` Brokers: ${brokers}`);
 console.log(` Topic: ${topic}`);
 console.log(` Message Count: ${messageCount}`);
-const consumerRate = await runConsumer(brokers, topic, messageCount);
-console.log("=== Consumer Rate: ", consumerRate);
+startTrackingMemory();
+const consumerRate = await runConsumer(brokers, topic, warmupMessages, messageCount, false, stats);
+endTrackingMemory(`consumer-memory-message-${mode}.json`);
+console.log("=== Consumer Rate (eachMessage): ", consumerRate);
+console.log("=== Consumption time (eachMessage): ", stats.durationSeconds);
+}
+
+if (consumer || all) {
+  // If user runs this without --producer then they are responsible for seeding the topic.
+  console.log("=== Running Basic Consumer Performance Test (eachBatch):")
+  console.log(` Brokers: ${brokers}`);
+  console.log(` Topic: ${topic}`);
+  console.log(` Message Count: ${messageCount}`);
+  startTrackingMemory();
+  const consumerRate = await runConsumer(brokers, topic, warmupMessages, messageCount, true, stats);
+  endTrackingMemory(`consumer-memory-batch-${mode}.json`);
+  console.log("=== Consumer Rate (eachBatch): ", consumerRate);
+  console.log("=== Average eachBatch lag: ", stats.averageOffsetLag);
+  console.log("=== Max eachBatch lag: ", stats.maxOffsetLag);
+  console.log("=== Consumption time (eachBatch): ", stats.durationSeconds);
 }

 if (ctp || all) {
@@ -67,7 +126,9 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
 console.log(` Message Count: ${messageCount}`);
 // Seed the topic with messages
 await runProducer(brokers, topic, batchSize, warmupMessages, messageCount, messageSize, compression);
+startTrackingMemory();
 const ctpRate = await runConsumeTransformProduce(brokers, topic, topic2, warmupMessages, messageCount, messageProcessTimeMs, ctpConcurrency);
+endTrackingMemory(`consume-transform-produce-${mode}.json`);
 console.log("=== Consume-Transform-Produce Rate: ", ctpRate);
 }

@@ -78,7 +139,9 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
 console.log(` Message Count: ${messageCount}`);
 console.log(` Consumer Processing Time: ${consumerProcessingTime}`);
 console.log(` Producer Processing Time: ${producerProcessingTime}`);
+startTrackingMemory();
 const { mean, p50, p90, p95, outliers } = await runProducerConsumerTogether(brokers, topic, messageCount, messageSize, producerProcessingTime, consumerProcessingTime);
+endTrackingMemory(`producer-consumer-together-${mode}.json`);
 console.log(`=== Produce-To-Consume Latency (ms): Mean: ${mean}, P50: ${p50}, P90: ${p90}, P95: ${p95}`);

 // The presence of outliers invalidates the mean measurement, and rasies concerns as to why there are any.
@@ -87,4 +150,9 @@ const producerProcessingTime = process.env.PRODUCER_PROCESSING_TIME ? +process.e
 console.log("=== Outliers (ms): ", outliers);
 }
 }
+
+if (maxAverageRSSKB !== undefined && maxMaxRSSKB !== undefined) {
+  console.log(`=== Max Average RSS across tests: ${maxAverageRSSKB}`);
+  console.log(`=== Max RSS across tests: ${maxMaxRSSKB}`);
+}
 })();
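The per-test memory reports that endTrackingMemory writes (producer-memory-&lt;mode&gt;.json, consumer-memory-message-&lt;mode&gt;.json, and so on, now covered by the new examples/performance/*.json ignore rule) can be inspected after a run. A minimal sketch of reading one back, assuming the { measures, averageRSS, maxRSS } layout serialized above; the file name is only an example:

// Sketch only: summarize a memory report produced by endTrackingMemory().
// Assumes the { measures, averageRSS, maxRSS } shape shown above; the path is an example.
const fs = require('fs');

const report = JSON.parse(fs.readFileSync('producer-memory-confluent.json', 'utf8'));
const samples = report.measures.length;           // one RSS sample roughly every 100 ms
const avgKB = BigInt(report.averageRSS) / 1024n;  // values were serialized as strings
const maxKB = BigInt(report.maxRSS.rss) / 1024n;

console.log(`${samples} samples, average RSS ${avgKB} KB, max RSS ${maxKB} KB`);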
