Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
"multiformats": "^11.0.1",
"prom-client": "^15.1.0",
"qs": "^6.11.1",
"snappyjs": "^0.7.0",
"@chainsafe/snappy-wasm": "^0.5.0",
"strict-event-emitter-types": "^2.0.0",
"systeminformation": "^5.22.9",
"uint8arraylist": "^2.4.7",
Expand Down
26 changes: 19 additions & 7 deletions packages/beacon-node/src/network/gossip/encoding.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {Message} from "@libp2p/interface";
// snappyjs is better for compression for smaller payloads
import {compress, uncompress} from "snappyjs";
import xxhashFactory from "xxhash-wasm";
import {digest} from "@chainsafe/as-sha256";
import {RPC} from "@chainsafe/libp2p-gossipsub/message";
import {DataTransform} from "@chainsafe/libp2p-gossipsub/types";
import snappyWasm from "@chainsafe/snappy-wasm";
import {ForkName} from "@lodestar/params";
import {intToBytes} from "@lodestar/utils";
import {MESSAGE_DOMAIN_VALID_SNAPPY} from "./constants.js";
Expand All @@ -17,6 +17,10 @@ const xxhash = await xxhashFactory();
// Use salt to prevent msgId from being mined for collisions
const h64Seed = BigInt(Math.floor(Math.random() * 1e9));

// create singleton snappy encoder + decoder
const encoder = new snappyWasm.Encoder();
const decoder = new snappyWasm.Decoder();

// Shared buffer to convert msgId to string
const sharedMsgIdBuf = Buffer.alloc(20);

Expand Down Expand Up @@ -82,11 +86,12 @@ export class DataTransformSnappy implements DataTransform {
* - `outboundTransform()`: compress snappy payload
*/
inboundTransform(topicStr: string, data: Uint8Array): Uint8Array {
const uncompressedData = uncompress(data, this.maxSizePerMessage);
// check uncompressed data length before we actually decompress
const uncompressedDataLength = snappyWasm.decompress_len(data);
if (uncompressedDataLength > this.maxSizePerMessage) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could improve this check to be per topic
as far as I know this 10MB cap should only apply to beacon_block
others may either have fixed size, for example beacon_attestation or have different max size

throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} > ${this.maxSizePerMessage}`);
}

// check uncompressed data length before we extract beacon block root, slot or
// attestation data at later steps
const uncompressedDataLength = uncompressedData.length;
const topic = this.gossipTopicCache.getTopic(topicStr);
const sszType = getGossipSSZType(topic);
this.metrics?.dataTransform.inbound.inc({type: topic.type});
Expand All @@ -98,6 +103,10 @@ export class DataTransformSnappy implements DataTransform {
throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} > ${sszType.maxSize}`);
}

// Only after sanity length checks, we can decompress the data
// Using Buffer.alloc() instead of Buffer.allocUnsafe() to mitigate high GC pressure observed in some environments
const uncompressedData = Buffer.alloc(uncompressedDataLength);
decoder.decompress_into(data, uncompressedData);
return uncompressedData;
}

Expand All @@ -111,7 +120,10 @@ export class DataTransformSnappy implements DataTransform {
if (data.length > this.maxSizePerMessage) {
throw Error(`ssz_snappy encoded data length ${data.length} > ${this.maxSizePerMessage}`);
}
// No need to parse topic, everything is snappy compressed
return compress(data);

// Using Buffer.alloc() instead of Buffer.allocUnsafe() to mitigate high GC pressure observed in some environments
const compressedData = Buffer.alloc(snappyWasm.max_compress_len(data.length));
const compressedLen = encoder.compress_into(data, compressedData);
return compressedData.subarray(0, compressedLen);
}
}
47 changes: 47 additions & 0 deletions packages/beacon-node/test/perf/network/gossip/snappy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {randomBytes} from "node:crypto";
import * as snappyRs from "snappy";
import * as snappyJs from "snappyjs";
import {bench, describe} from "@chainsafe/benchmark";
import snappyWasm from "@chainsafe/snappy-wasm";

describe("network / gossip / snappy", () => {
const msgLens = [
Expand All @@ -15,6 +16,8 @@ describe("network / gossip / snappy", () => {
10000, // 100000,
];
describe("compress", () => {
const encoder = new snappyWasm.Encoder();

for (const msgLen of msgLens) {
const uncompressed = randomBytes(msgLen);
const RUNS_FACTOR = 1000;
Expand All @@ -38,9 +41,33 @@ describe("network / gossip / snappy", () => {
}
},
});

bench({
id: `${msgLen} bytes - compress - snappy-wasm`,
runsFactor: RUNS_FACTOR,
fn: () => {
for (let i = 0; i < RUNS_FACTOR; i++) {
encoder.compress(uncompressed);
}
},
});

bench({
id: `${msgLen} bytes - compress - snappy-wasm - prealloc`,
runsFactor: RUNS_FACTOR,
fn: () => {
for (let i = 0; i < RUNS_FACTOR; i++) {
let out = Buffer.alloc(snappyWasm.max_compress_len(uncompressed.length));
const len = encoder.compress_into(uncompressed, out);
out = out.subarray(0, len);
}
},
});
}
});
describe("uncompress", () => {
const decoder = new snappyWasm.Decoder();

for (const msgLen of msgLens) {
const uncompressed = randomBytes(msgLen);
const compressed = snappyJs.compress(uncompressed);
Expand All @@ -65,6 +92,26 @@ describe("network / gossip / snappy", () => {
}
},
});

bench({
id: `${msgLen} bytes - uncompress - snappy-wasm`,
runsFactor: RUNS_FACTOR,
fn: () => {
for (let i = 0; i < RUNS_FACTOR; i++) {
decoder.decompress(compressed);
}
},
});

bench({
id: `${msgLen} bytes - uncompress - snappy-wasm - prealloc`,
runsFactor: RUNS_FACTOR,
fn: () => {
for (let i = 0; i < RUNS_FACTOR; i++) {
decoder.decompress_into(compressed, Buffer.alloc(snappyWasm.decompress_len(compressed)));
}
},
});
}
});
});
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@
"@chainsafe/pubkey-index-map-win32-arm64-msvc" "3.0.0"
"@chainsafe/pubkey-index-map-win32-x64-msvc" "3.0.0"

"@chainsafe/snappy-wasm@^0.5.0":
version "0.5.0"
resolved "https://registry.yarnpkg.com/@chainsafe/snappy-wasm/-/snappy-wasm-0.5.0.tgz#067e534341ef746706e2dbf255bd7604c849be73"
integrity sha512-ydXvhr9p+JjvzSSEyi6XExq8pHugFnrk70mk17T6mhDsklPvaXc+8K90G7TJF+u51lxI/fpv7MahrA5ayjFcSA==

"@chainsafe/ssz@^0.11.1":
version "0.11.1"
resolved "https://registry.yarnpkg.com/@chainsafe/ssz/-/ssz-0.11.1.tgz#d4aec883af2ec5196ae67b96242c467da20b2476"
Expand Down
Loading