From 4419e352bae4461809e9c22d2d68cf01ea57640a Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 09:08:40 +0700 Subject: [PATCH 1/9] feat: add snappyjs implementation in ts --- .../src/network/gossip/snappyjs/compressor.ts | 212 ++++++++++++++++++ .../network/gossip/snappyjs/decompressor.ts | 107 +++++++++ .../src/network/gossip/snappyjs/error.ts | 14 ++ .../src/network/gossip/snappyjs/index.ts | 56 +++++ 4 files changed, 389 insertions(+) create mode 100644 packages/beacon-node/src/network/gossip/snappyjs/compressor.ts create mode 100644 packages/beacon-node/src/network/gossip/snappyjs/decompressor.ts create mode 100644 packages/beacon-node/src/network/gossip/snappyjs/error.ts create mode 100644 packages/beacon-node/src/network/gossip/snappyjs/index.ts diff --git a/packages/beacon-node/src/network/gossip/snappyjs/compressor.ts b/packages/beacon-node/src/network/gossip/snappyjs/compressor.ts new file mode 100644 index 00000000000..703e59ea4a6 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappyjs/compressor.ts @@ -0,0 +1,212 @@ +const BLOCK_LOG = 16; +const BLOCK_SIZE = 1 << BLOCK_LOG; + +const MAX_HASH_TABLE_BITS = 14; +const globalHashTables = new Array(MAX_HASH_TABLE_BITS + 1); + +export class SnappyCompressor { + constructor(private readonly array: Uint8Array) {} + + maxCompressedLength(): number { + const sourceLen = this.array.length; + return 32 + sourceLen + Math.floor(sourceLen / 6); + } + + compressToBuffer(outBuffer: Uint8Array): number { + const array = this.array; + const length = array.length; + let pos = 0; + let outPos = 0; + + let fragmentSize: number; + + outPos = putVarint(length, outBuffer, outPos); + while (pos < length) { + fragmentSize = Math.min(length - pos, BLOCK_SIZE); + outPos = compressFragment(array, pos, fragmentSize, outBuffer, outPos); + pos += fragmentSize; + } + + return outPos; + } +} + +function hashFunc(key: number, hashFuncShift: number): number { + return (key * 0x1e35a7bd) >>> hashFuncShift; +} + +function load32(array: Uint8Array, pos: number): number { + return array[pos] + (array[pos + 1] << 8) + (array[pos + 2] << 16) + (array[pos + 3] << 24); +} + +function equals32(array: Uint8Array, pos1: number, pos2: number): boolean { + return ( + array[pos1] === array[pos2] && + array[pos1 + 1] === array[pos2 + 1] && + array[pos1 + 2] === array[pos2 + 2] && + array[pos1 + 3] === array[pos2 + 3] + ); +} + +function copyBytes(fromArray: Uint8Array, fromPos: number, toArray: Uint8Array, toPos: number, length: number): void { + for (let i = 0; i < length; i++) { + toArray[toPos + i] = fromArray[fromPos + i]; + } +} + +function emitLiteral(input: Uint8Array, ip: number, len: number, output: Uint8Array, op: number): number { + if (len <= 60) { + output[op] = (len - 1) << 2; + op += 1; + } else if (len < 256) { + output[op] = 60 << 2; + output[op + 1] = len - 1; + op += 2; + } else { + output[op] = 61 << 2; + output[op + 1] = (len - 1) & 0xff; + output[op + 2] = (len - 1) >>> 8; + op += 3; + } + copyBytes(input, ip, output, op, len); + return op + len; +} + +function emitCopyLessThan64(output: Uint8Array, op: number, offset: number, len: number): number { + if (len < 12 && offset < 2048) { + output[op] = 1 + ((len - 4) << 2) + ((offset >>> 8) << 5); + output[op + 1] = offset & 0xff; + return op + 2; + } + output[op] = 2 + ((len - 1) << 2); + output[op + 1] = offset & 0xff; + output[op + 2] = offset >>> 8; + return op + 3; +} + +function emitCopy(output: Uint8Array, op: number, offset: number, len: number): number { + while (len >= 68) { + op = emitCopyLessThan64(output, op, offset, 64); + len -= 64; + } + if (len > 64) { + op = emitCopyLessThan64(output, op, offset, 60); + len -= 60; + } + return emitCopyLessThan64(output, op, offset, len); +} + +function compressFragment(input: Uint8Array, ip: number, inputSize: number, output: Uint8Array, op: number): number { + let hashTableBits = 1; + while (1 << hashTableBits <= inputSize && hashTableBits <= MAX_HASH_TABLE_BITS) { + hashTableBits += 1; + } + hashTableBits -= 1; + const hashFuncShift = 32 - hashTableBits; + + if (typeof globalHashTables[hashTableBits] === "undefined") { + globalHashTables[hashTableBits] = new Uint16Array(1 << hashTableBits); + } + const hashTable = globalHashTables[hashTableBits]; + for (let i = 0; i < hashTable.length; i++) { + hashTable[i] = 0; + } + + const ipEnd = ip + inputSize; + let ipLimit: number; + const baseIp = ip; + let nextEmit = ip; + + let hash: number; + let nextHash: number; + let nextIp: number; + let candidate = 0; + let skip: number; + let bytesBetweenHashLookups: number; + let base: number; + let matched: number; + let offset: number; + let prevHash: number; + let curHash: number; + let flag = true; + + const INPUT_MARGIN = 15; + if (inputSize >= INPUT_MARGIN) { + ipLimit = ipEnd - INPUT_MARGIN; + + ip += 1; + nextHash = hashFunc(load32(input, ip), hashFuncShift); + + while (flag) { + skip = 32; + nextIp = ip; + do { + ip = nextIp; + hash = nextHash; + bytesBetweenHashLookups = skip >>> 5; + skip += 1; + nextIp = ip + bytesBetweenHashLookups; + if (ip > ipLimit) { + flag = false; + break; + } + nextHash = hashFunc(load32(input, nextIp), hashFuncShift); + candidate = baseIp + hashTable[hash]; + hashTable[hash] = ip - baseIp; + } while (!equals32(input, ip, candidate)); + + if (!flag) { + break; + } + + op = emitLiteral(input, nextEmit, ip - nextEmit, output, op); + + do { + base = ip; + matched = 4; + while (ip + matched < ipEnd && input[ip + matched] === input[candidate + matched]) { + matched += 1; + } + ip += matched; + offset = base - candidate; + op = emitCopy(output, op, offset, matched); + + nextEmit = ip; + if (ip >= ipLimit) { + flag = false; + break; + } + prevHash = hashFunc(load32(input, ip - 1), hashFuncShift); + hashTable[prevHash] = ip - 1 - baseIp; + curHash = hashFunc(load32(input, ip), hashFuncShift); + candidate = baseIp + hashTable[curHash]; + hashTable[curHash] = ip - baseIp; + } while (equals32(input, ip, candidate)); + + if (!flag) { + break; + } + + ip += 1; + nextHash = hashFunc(load32(input, ip), hashFuncShift); + } + } + + if (nextEmit < ipEnd) { + op = emitLiteral(input, nextEmit, ipEnd - nextEmit, output, op); + } + + return op; +} + +function putVarint(value: number, output: Uint8Array, op: number): number { + do { + output[op] = value & 0x7f; + value = value >>> 7; + if (value > 0) { + output[op] += 0x80; + } + op += 1; + } while (value > 0); + return op; +} diff --git a/packages/beacon-node/src/network/gossip/snappyjs/decompressor.ts b/packages/beacon-node/src/network/gossip/snappyjs/decompressor.ts new file mode 100644 index 00000000000..7ecba7cb8fb --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappyjs/decompressor.ts @@ -0,0 +1,107 @@ +const WORD_MASK = [0, 0xff, 0xffff, 0xffffff, 0xffffffff]; +function copyBytes(fromArray: Uint8Array, fromPos: number, toArray: Uint8Array, toPos: number, length: number): void { + for (let i = 0; i < length; i++) { + toArray[toPos + i] = fromArray[fromPos + i]; + } +} + +function selfCopyBytes(array: Uint8Array, pos: number, offset: number, length: number): void { + for (let i = 0; i < length; i++) { + array[pos + i] = array[pos - offset + i]; + } +} + +export class SnappyDecompressor { + private pos = 0; + constructor(private readonly array: Uint8Array) {} + + readUncompressedLength(): number { + let result = 0; + let shift = 0; + let c: number; + let val: number; + + while (shift < 32 && this.pos < this.array.length) { + c = this.array[this.pos]; + this.pos += 1; + val = c & 0x7f; + if ((val << shift) >>> shift !== val) { + return -1; + } + result |= val << shift; + if (c < 128) { + return result; + } + shift += 7; + } + return -1; + } + + uncompressToBuffer(outBuffer: Uint8Array): boolean { + const array = this.array; + const arrayLength = array.length; + let pos = this.pos; + let outPos = 0; + + let c: number; + let len = 0; + let smallLen: number; + let offset = 0; + + while (pos < array.length) { + c = array[pos]; + pos += 1; + if ((c & 0x3) === 0) { + // Literal + len = (c >>> 2) + 1; + if (len > 60) { + if (pos + 3 >= arrayLength) { + return false; + } + smallLen = len - 60; + len = array[pos] + (array[pos + 1] << 8) + (array[pos + 2] << 16) + (array[pos + 3] << 24); + len = (len & WORD_MASK[smallLen]) + 1; + pos += smallLen; + } + if (pos + len > arrayLength) { + return false; + } + copyBytes(array, pos, outBuffer, outPos, len); + pos += len; + outPos += len; + } else { + switch (c & 0x3) { + case 1: + len = ((c >>> 2) & 0x7) + 4; + offset = array[pos] + ((c >>> 5) << 8); + pos += 1; + break; + case 2: + if (pos + 1 >= arrayLength) { + return false; + } + len = (c >>> 2) + 1; + offset = array[pos] + (array[pos + 1] << 8); + pos += 2; + break; + case 3: + if (pos + 3 >= arrayLength) { + return false; + } + len = (c >>> 2) + 1; + offset = array[pos] + (array[pos + 1] << 8) + (array[pos + 2] << 16) + (array[pos + 3] << 24); + pos += 4; + break; + default: + break; + } + if (offset === 0 || offset > outPos) { + return false; + } + selfCopyBytes(outBuffer, outPos, offset, len); + outPos += len; + } + } + return true; + } +} diff --git a/packages/beacon-node/src/network/gossip/snappyjs/error.ts b/packages/beacon-node/src/network/gossip/snappyjs/error.ts new file mode 100644 index 00000000000..84188e1b17f --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappyjs/error.ts @@ -0,0 +1,14 @@ +import {LodestarError} from "@lodestar/utils"; + +export enum SnappyErrorCode { + UNCOMPRESS_EXCEED_MAX_LENGTH = "SNAPPY_ERROR_UNCOMPRESS_EXCEED_MAX_LENGTH", + UNCOMPRESS_CANNOT_EXTRACT_LENGTH = "SNAPPY_ERROR_UNCOMPRESS_CANNOT_EXTRACT_LENGTH", + UNCOMPRESS_BUFFER_TOO_SMALL = "SNAPPY_ERROR_UNCOMPRESS_BUFFER_TOO_SMALL", + UNCOMPRESS_INVALID_BITSTREAM = "SNAPPY_ERROR_UNCOMPRESS_INVALID_BITSTREAM", +} + +export class SnappyError extends LodestarError { + constructor(type: T) { + super(type); + } +} diff --git a/packages/beacon-node/src/network/gossip/snappyjs/index.ts b/packages/beacon-node/src/network/gossip/snappyjs/index.ts new file mode 100644 index 00000000000..25bca7cb005 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappyjs/index.ts @@ -0,0 +1,56 @@ +import {SnappyCompressor} from "./compressor.js"; +import {SnappyDecompressor} from "./decompressor.js"; +import {SnappyError, SnappyErrorCode} from "./error.js"; + +function isNode(): boolean { + if ( + typeof process === "object" && + typeof process.versions === "object" && + typeof process.versions.node !== "undefined" + ) { + return true; + } + return false; +} + +function isUint8Array(object: Uint8Array | Buffer): object is Uint8Array { + return object instanceof Uint8Array && (!isNode() || !Buffer.isBuffer(object)); +} + +export function uncompress(compressed: T, maxLength: number, outBuf?: Uint8Array): T { + const decompressor = new SnappyDecompressor(compressed); + const length = decompressor.readUncompressedLength(); + if (length === -1) { + throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_CANNOT_EXTRACT_LENGTH}); + } + if (maxLength !== undefined && length > maxLength) { + throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_EXCEED_MAX_LENGTH}); + } + if (outBuf !== undefined && outBuf.length < length) { + throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_BUFFER_TOO_SMALL}); + } + + const uncompressed = + outBuf !== undefined + ? outBuf.subarray(0, length) + : isUint8Array(compressed) + ? new Uint8Array(length) + : Buffer.allocUnsafe(length); + + if (!decompressor.uncompressToBuffer(uncompressed)) { + throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_INVALID_BITSTREAM}); + } + return uncompressed as T; +} + +export function compress(uncompressed: T): T { + const compressor = new SnappyCompressor(uncompressed); + const maxLength = compressor.maxCompressedLength(); + const uint8Mode = isUint8Array(uncompressed); + const compressed = uint8Mode ? new Uint8Array(maxLength) : Buffer.allocUnsafe(maxLength); + const length = compressor.compressToBuffer(compressed); + if (uint8Mode) { + return compressed.subarray(0, length) as T; + } + return compressed.slice(0, length) as T; +} From cbe7cfa33e3b1bb258cc602f561a226ab48b6b69 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 09:53:43 +0700 Subject: [PATCH 2/9] feat: implement ISnappyDecompressor interface --- packages/beacon-node/package.json | 1 + .../src/network/gossip/snappy/index.ts | 13 ++++++++++ .../src/network/gossip/snappy/interface.ts | 4 ++++ .../snappy-js}/compressor.ts | 0 .../snappy-js}/decompressor.ts | 6 +++-- .../{snappyjs => snappy/snappy-js}/error.ts | 0 .../{snappyjs => snappy/snappy-js}/index.ts | 2 +- .../src/network/gossip/snappy/snappy-wasm.ts | 18 ++++++++++++++ .../test/unit/network/gossip/snappy.test.ts | 24 +++++++++++++++++++ 9 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 packages/beacon-node/src/network/gossip/snappy/index.ts create mode 100644 packages/beacon-node/src/network/gossip/snappy/interface.ts rename packages/beacon-node/src/network/gossip/{snappyjs => snappy/snappy-js}/compressor.ts (100%) rename packages/beacon-node/src/network/gossip/{snappyjs => snappy/snappy-js}/decompressor.ts (94%) rename packages/beacon-node/src/network/gossip/{snappyjs => snappy/snappy-js}/error.ts (100%) rename packages/beacon-node/src/network/gossip/{snappyjs => snappy/snappy-js}/index.ts (97%) create mode 100644 packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts create mode 100644 packages/beacon-node/test/unit/network/gossip/snappy.test.ts diff --git a/packages/beacon-node/package.json b/packages/beacon-node/package.json index 3c92b38b191..8b204247db9 100644 --- a/packages/beacon-node/package.json +++ b/packages/beacon-node/package.json @@ -166,6 +166,7 @@ "multiformats": "^11.0.1", "prom-client": "^15.1.0", "qs": "^6.11.1", + "snappyjs": "^0.7.0", "@chainsafe/snappy-wasm": "^0.5.0", "strict-event-emitter-types": "^2.0.0", "systeminformation": "^5.22.9", diff --git a/packages/beacon-node/src/network/gossip/snappy/index.ts b/packages/beacon-node/src/network/gossip/snappy/index.ts new file mode 100644 index 00000000000..9794d959323 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/index.ts @@ -0,0 +1,13 @@ +import {GossipType} from "../interface.js"; +import {ISnappyDecompressor} from "./interface.js"; +import {SnappyDecompressor} from "./snappy-js/decompressor.js"; +import {SnappyWasmDecompressor} from "./snappy-wasm.js"; + +export function getSnappyDecompressor(topicType: GossipType, data: Uint8Array): ISnappyDecompressor { + switch (topicType) { + case GossipType.beacon_attestation: + return new SnappyDecompressor(data); + default: + return new SnappyWasmDecompressor(data); + } +} diff --git a/packages/beacon-node/src/network/gossip/snappy/interface.ts b/packages/beacon-node/src/network/gossip/snappy/interface.ts new file mode 100644 index 00000000000..6bebb37be59 --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/interface.ts @@ -0,0 +1,4 @@ +export interface ISnappyDecompressor { + readUncompressedLength(): number; + uncompressInto(outBuffer: Uint8Array): boolean; +} diff --git a/packages/beacon-node/src/network/gossip/snappyjs/compressor.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/compressor.ts similarity index 100% rename from packages/beacon-node/src/network/gossip/snappyjs/compressor.ts rename to packages/beacon-node/src/network/gossip/snappy/snappy-js/compressor.ts diff --git a/packages/beacon-node/src/network/gossip/snappyjs/decompressor.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/decompressor.ts similarity index 94% rename from packages/beacon-node/src/network/gossip/snappyjs/decompressor.ts rename to packages/beacon-node/src/network/gossip/snappy/snappy-js/decompressor.ts index 7ecba7cb8fb..4135c502119 100644 --- a/packages/beacon-node/src/network/gossip/snappyjs/decompressor.ts +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-js/decompressor.ts @@ -1,3 +1,5 @@ +import {ISnappyDecompressor} from "../interface.js"; + const WORD_MASK = [0, 0xff, 0xffff, 0xffffff, 0xffffffff]; function copyBytes(fromArray: Uint8Array, fromPos: number, toArray: Uint8Array, toPos: number, length: number): void { for (let i = 0; i < length; i++) { @@ -11,7 +13,7 @@ function selfCopyBytes(array: Uint8Array, pos: number, offset: number, length: n } } -export class SnappyDecompressor { +export class SnappyDecompressor implements ISnappyDecompressor { private pos = 0; constructor(private readonly array: Uint8Array) {} @@ -37,7 +39,7 @@ export class SnappyDecompressor { return -1; } - uncompressToBuffer(outBuffer: Uint8Array): boolean { + uncompressInto(outBuffer: Uint8Array): boolean { const array = this.array; const arrayLength = array.length; let pos = this.pos; diff --git a/packages/beacon-node/src/network/gossip/snappyjs/error.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/error.ts similarity index 100% rename from packages/beacon-node/src/network/gossip/snappyjs/error.ts rename to packages/beacon-node/src/network/gossip/snappy/snappy-js/error.ts diff --git a/packages/beacon-node/src/network/gossip/snappyjs/index.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts similarity index 97% rename from packages/beacon-node/src/network/gossip/snappyjs/index.ts rename to packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts index 25bca7cb005..10ef35d7208 100644 --- a/packages/beacon-node/src/network/gossip/snappyjs/index.ts +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts @@ -37,7 +37,7 @@ export function uncompress(compressed: T, maxLeng ? new Uint8Array(length) : Buffer.allocUnsafe(length); - if (!decompressor.uncompressToBuffer(uncompressed)) { + if (!decompressor.uncompressInto(uncompressed)) { throw new SnappyError({code: SnappyErrorCode.UNCOMPRESS_INVALID_BITSTREAM}); } return uncompressed as T; diff --git a/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts new file mode 100644 index 00000000000..2eb612f850f --- /dev/null +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts @@ -0,0 +1,18 @@ +import snappyWasm from "@chainsafe/snappy-wasm"; +import {ISnappyDecompressor} from "./interface.js"; + +// create singleton snappy encoder + decoder +const decoder = new snappyWasm.Decoder(); + +export class SnappyWasmDecompressor implements ISnappyDecompressor { + constructor(private readonly data: Uint8Array) {} + + readUncompressedLength(): number { + return snappyWasm.decompress_len(this.data); + } + + uncompressInto(outBuffer: Uint8Array): boolean { + decoder.decompress_into(this.data, outBuffer); + return true; + } +} diff --git a/packages/beacon-node/test/unit/network/gossip/snappy.test.ts b/packages/beacon-node/test/unit/network/gossip/snappy.test.ts new file mode 100644 index 00000000000..3ed95e5a830 --- /dev/null +++ b/packages/beacon-node/test/unit/network/gossip/snappy.test.ts @@ -0,0 +1,24 @@ +import {randomBytes} from "crypto"; +import {compress as snappyJsCompress} from "snappyjs"; +import {describe, expect, it} from "vitest"; +import {getSnappyDecompressor} from "../../../../src/network/gossip/snappy/index.js"; +import {GossipType} from "../../../../src/network/index.js"; + +describe("snappy", () => { + const lengths = [0, 1, 10, 100, 1000, 10000, 100000]; + for (const length of lengths) { + it(`should decompress data of length ${length} compressed by snappyjs`, () => { + const buffer = randomBytes(length); + const compressed = snappyJsCompress(buffer); + for (const gossipType of [GossipType.beacon_attestation, GossipType.beacon_block]) { + const decompressor = getSnappyDecompressor(gossipType, compressed); + const uncompressedLength = decompressor.readUncompressedLength(); + expect(uncompressedLength).toBe(length); + const out = new Uint8Array(length); + const success = decompressor.uncompressInto(out); + expect(success).toBe(true); + expect(out).toEqual(new Uint8Array(buffer)); + } + }); + } +}); From e717d37fd42fb142fa32076064c812fc7d062669 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 10:05:01 +0700 Subject: [PATCH 3/9] feat: use different uncompressor based on topic type --- .../src/network/gossip/encoding.ts | 35 +++++++++++++------ .../src/network/gossip/snappy/index.ts | 4 +++ 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/packages/beacon-node/src/network/gossip/encoding.ts b/packages/beacon-node/src/network/gossip/encoding.ts index 32cc36d9a2f..c76cf06ec7d 100644 --- a/packages/beacon-node/src/network/gossip/encoding.ts +++ b/packages/beacon-node/src/network/gossip/encoding.ts @@ -9,6 +9,7 @@ import {ForkName} from "@lodestar/params"; import {intToBytes} from "@lodestar/utils"; import {MESSAGE_DOMAIN_VALID_SNAPPY} from "./constants.js"; import {Eth2GossipsubMetrics} from "./metrics.js"; +import {getSnappyDecompressor} from "./snappy/index.js"; import {GossipTopicCache, getGossipSSZType} from "./topic.js"; // Load WASM @@ -17,9 +18,8 @@ const xxhash = await xxhashFactory(); // Use salt to prevent msgId from being mined for collisions const h64Seed = BigInt(Math.floor(Math.random() * 1e9)); -// create singleton snappy encoder + decoder +// to compress outgoing data, we always go with snappy-wasm, this is singleton encoder const encoder = new snappyWasm.Encoder(); -const decoder = new snappyWasm.Decoder(); // Shared buffer to convert msgId to string const sharedMsgIdBuf = Buffer.alloc(20); @@ -86,26 +86,39 @@ export class DataTransformSnappy implements DataTransform { * - `outboundTransform()`: compress snappy payload */ inboundTransform(topicStr: string, data: Uint8Array): Uint8Array { - // check uncompressed data length before we actually decompress - const uncompressedDataLength = snappyWasm.decompress_len(data); - if (uncompressedDataLength > this.maxSizePerMessage) { - throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} > ${this.maxSizePerMessage}`); - } - const topic = this.gossipTopicCache.getTopic(topicStr); const sszType = getGossipSSZType(topic); this.metrics?.dataTransform.inbound.inc({type: topic.type}); + // check uncompressed data length before we actually decompress + const decompressor = getSnappyDecompressor(topic.type, data); + const uncompressedDataLength = decompressor.readUncompressedLength(); + if (uncompressedDataLength > this.maxSizePerMessage) { + throw Error( + `ssz_snappy decoded data length ${uncompressedDataLength} > ${this.maxSizePerMessage} for topic ${topicStr}` + ); + } + if (uncompressedDataLength < sszType.minSize) { - throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} < ${sszType.minSize}`); + throw Error( + `ssz_snappy decoded data length ${uncompressedDataLength} < ${sszType.minSize} for topic ${topicStr}` + ); } + if (uncompressedDataLength > sszType.maxSize) { - throw Error(`ssz_snappy decoded data length ${uncompressedDataLength} > ${sszType.maxSize}`); + throw Error( + `ssz_snappy decoded data length ${uncompressedDataLength} > ${sszType.maxSize} for topic ${topicStr}` + ); } // Only after sanity length checks, we can decompress the data const uncompressedData = Buffer.allocUnsafe(uncompressedDataLength); - decoder.decompress_into(data, uncompressedData); + if (!decompressor.uncompressInto(uncompressedData)) { + throw Error( + `ssz_snappy failed to decompress data for topic ${topicStr}, compressed length ${data.length}, expected uncompressed length ${uncompressedDataLength}` + ); + } + return uncompressedData; } diff --git a/packages/beacon-node/src/network/gossip/snappy/index.ts b/packages/beacon-node/src/network/gossip/snappy/index.ts index 9794d959323..2ebde8d6c06 100644 --- a/packages/beacon-node/src/network/gossip/snappy/index.ts +++ b/packages/beacon-node/src/network/gossip/snappy/index.ts @@ -3,6 +3,10 @@ import {ISnappyDecompressor} from "./interface.js"; import {SnappyDecompressor} from "./snappy-js/decompressor.js"; import {SnappyWasmDecompressor} from "./snappy-wasm.js"; +/** + * for decompression, we use different implementations based on topic type + * snappy-wasm is generally better for larger payloads and snappyjs is better for smaller payloads + */ export function getSnappyDecompressor(topicType: GossipType, data: Uint8Array): ISnappyDecompressor { switch (topicType) { case GossipType.beacon_attestation: From 3f091e8addaaa7ddceb35794ad55c1be39f90c85 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 10:29:44 +0700 Subject: [PATCH 4/9] fix: remove snappy-js --- packages/beacon-node/package.json | 1 - .../test/unit/network/gossip/snappy.test.ts | 14 +++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/beacon-node/package.json b/packages/beacon-node/package.json index 8b204247db9..3c92b38b191 100644 --- a/packages/beacon-node/package.json +++ b/packages/beacon-node/package.json @@ -166,7 +166,6 @@ "multiformats": "^11.0.1", "prom-client": "^15.1.0", "qs": "^6.11.1", - "snappyjs": "^0.7.0", "@chainsafe/snappy-wasm": "^0.5.0", "strict-event-emitter-types": "^2.0.0", "systeminformation": "^5.22.9", diff --git a/packages/beacon-node/test/unit/network/gossip/snappy.test.ts b/packages/beacon-node/test/unit/network/gossip/snappy.test.ts index 3ed95e5a830..3ff33c7cf7a 100644 --- a/packages/beacon-node/test/unit/network/gossip/snappy.test.ts +++ b/packages/beacon-node/test/unit/network/gossip/snappy.test.ts @@ -1,15 +1,23 @@ import {randomBytes} from "crypto"; -import {compress as snappyJsCompress} from "snappyjs"; import {describe, expect, it} from "vitest"; +import snappyWasm from "@chainsafe/snappy-wasm"; import {getSnappyDecompressor} from "../../../../src/network/gossip/snappy/index.js"; import {GossipType} from "../../../../src/network/index.js"; +const encoder = new snappyWasm.Encoder(); + +function compress(data: Uint8Array): Uint8Array { + const compressedData = Buffer.allocUnsafe(snappyWasm.max_compress_len(data.length)); + const compressedLen = encoder.compress_into(data, compressedData); + return compressedData.subarray(0, compressedLen); +} + describe("snappy", () => { const lengths = [0, 1, 10, 100, 1000, 10000, 100000]; for (const length of lengths) { - it(`should decompress data of length ${length} compressed by snappyjs`, () => { + it(`should decompress data of length ${length} compressed by snappy-wasm`, () => { const buffer = randomBytes(length); - const compressed = snappyJsCompress(buffer); + const compressed = compress(buffer); for (const gossipType of [GossipType.beacon_attestation, GossipType.beacon_block]) { const decompressor = getSnappyDecompressor(gossipType, compressed); const uncompressedLength = decompressor.readUncompressedLength(); From c94ede7d452b29547a74f725949f95f45545d384 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 10:36:48 +0700 Subject: [PATCH 5/9] fix: make consistent behaviour across implementations --- .../beacon-node/src/network/gossip/encoding.ts | 6 ++++++ .../src/network/gossip/snappy/snappy-wasm.ts | 14 +++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/packages/beacon-node/src/network/gossip/encoding.ts b/packages/beacon-node/src/network/gossip/encoding.ts index c76cf06ec7d..dd011fd95df 100644 --- a/packages/beacon-node/src/network/gossip/encoding.ts +++ b/packages/beacon-node/src/network/gossip/encoding.ts @@ -93,6 +93,12 @@ export class DataTransformSnappy implements DataTransform { // check uncompressed data length before we actually decompress const decompressor = getSnappyDecompressor(topic.type, data); const uncompressedDataLength = decompressor.readUncompressedLength(); + if (uncompressedDataLength < 0) { + throw Error( + `ssz_snappy failed to read uncompressed length for topic ${topicStr}, compressed length ${data.length}` + ); + } + if (uncompressedDataLength > this.maxSizePerMessage) { throw Error( `ssz_snappy decoded data length ${uncompressedDataLength} > ${this.maxSizePerMessage} for topic ${topicStr}` diff --git a/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts index 2eb612f850f..c3887cb3215 100644 --- a/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-wasm.ts @@ -8,11 +8,19 @@ export class SnappyWasmDecompressor implements ISnappyDecompressor { constructor(private readonly data: Uint8Array) {} readUncompressedLength(): number { - return snappyWasm.decompress_len(this.data); + try { + return snappyWasm.decompress_len(this.data); + } catch { + return -1; + } } uncompressInto(outBuffer: Uint8Array): boolean { - decoder.decompress_into(this.data, outBuffer); - return true; + try { + decoder.decompress_into(this.data, outBuffer); + return true; + } catch { + return false; + } } } From 2d4de6aed95f14bf24f263593dca86901471ecf0 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 11:44:21 +0700 Subject: [PATCH 6/9] fix: use snappy-wasm for beacon_block and data_column_sidecar only --- packages/beacon-node/src/network/gossip/snappy/index.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/beacon-node/src/network/gossip/snappy/index.ts b/packages/beacon-node/src/network/gossip/snappy/index.ts index 2ebde8d6c06..73706f279fb 100644 --- a/packages/beacon-node/src/network/gossip/snappy/index.ts +++ b/packages/beacon-node/src/network/gossip/snappy/index.ts @@ -9,9 +9,11 @@ import {SnappyWasmDecompressor} from "./snappy-wasm.js"; */ export function getSnappyDecompressor(topicType: GossipType, data: Uint8Array): ISnappyDecompressor { switch (topicType) { - case GossipType.beacon_attestation: - return new SnappyDecompressor(data); - default: + case GossipType.beacon_block: + case GossipType.blob_sidecar: + case GossipType.data_column_sidecar: return new SnappyWasmDecompressor(data); + default: + return new SnappyDecompressor(data); } } From a74d3164cae1f32ed0bd05a380ffb1e90e045ae2 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 13:45:41 +0700 Subject: [PATCH 7/9] fix: use Buffer.alloc() --- packages/beacon-node/src/network/gossip/encoding.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/beacon-node/src/network/gossip/encoding.ts b/packages/beacon-node/src/network/gossip/encoding.ts index dd011fd95df..8dc84e17b57 100644 --- a/packages/beacon-node/src/network/gossip/encoding.ts +++ b/packages/beacon-node/src/network/gossip/encoding.ts @@ -118,7 +118,8 @@ export class DataTransformSnappy implements DataTransform { } // Only after sanity length checks, we can decompress the data - const uncompressedData = Buffer.allocUnsafe(uncompressedDataLength); + // using Buffer.allocUnsafe() caused huge MarkSweepCompact gc on the main thread of sas nodes + const uncompressedData = Buffer.alloc(uncompressedDataLength); if (!decompressor.uncompressInto(uncompressedData)) { throw Error( `ssz_snappy failed to decompress data for topic ${topicStr}, compressed length ${data.length}, expected uncompressed length ${uncompressedDataLength}` @@ -139,7 +140,8 @@ export class DataTransformSnappy implements DataTransform { throw Error(`ssz_snappy encoded data length ${data.length} > ${this.maxSizePerMessage}`); } - const compressedData = Buffer.allocUnsafe(snappyWasm.max_compress_len(data.length)); + // using Buffer.allocUnsafe() caused huge MarkSweepCompact gc on the main thread of sas nodes + const compressedData = Buffer.alloc(snappyWasm.max_compress_len(data.length)); const compressedLen = encoder.compress_into(data, compressedData); return compressedData.subarray(0, compressedLen); } From 95416e84e041858e24d058cce333c3cd4803f73d Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 14:11:46 +0700 Subject: [PATCH 8/9] fix: update snappy benchmark using ISnappyDecompressor interface --- .../test/perf/network/gossip/snappy.test.ts | 50 ++++++------------- 1 file changed, 15 insertions(+), 35 deletions(-) diff --git a/packages/beacon-node/test/perf/network/gossip/snappy.test.ts b/packages/beacon-node/test/perf/network/gossip/snappy.test.ts index 514ab7fe99f..5900cee65d3 100644 --- a/packages/beacon-node/test/perf/network/gossip/snappy.test.ts +++ b/packages/beacon-node/test/perf/network/gossip/snappy.test.ts @@ -1,8 +1,9 @@ import {randomBytes} from "node:crypto"; -import * as snappyRs from "snappy"; import * as snappyJs from "snappyjs"; import {bench, describe} from "@chainsafe/benchmark"; import snappyWasm from "@chainsafe/snappy-wasm"; +import {SnappyDecompressor} from "../../../../src/network/gossip/snappy/snappy-js/decompressor.js"; +import {SnappyWasmDecompressor} from "../../../../src/network/gossip/snappy/snappy-wasm.js"; describe("network / gossip / snappy", () => { const msgLens = [ @@ -32,16 +33,6 @@ describe("network / gossip / snappy", () => { }, }); - bench({ - id: `${msgLen} bytes - compress - snappy`, - runsFactor: RUNS_FACTOR, - fn: () => { - for (let i = 0; i < RUNS_FACTOR; i++) { - snappyRs.compressSync(uncompressed); - } - }, - }); - bench({ id: `${msgLen} bytes - compress - snappy-wasm`, runsFactor: RUNS_FACTOR, @@ -65,9 +56,8 @@ describe("network / gossip / snappy", () => { }); } }); - describe("uncompress", () => { - const decoder = new snappyWasm.Decoder(); + describe("uncompress", () => { for (const msgLen of msgLens) { const uncompressed = randomBytes(msgLen); const compressed = snappyJs.compress(uncompressed); @@ -78,17 +68,12 @@ describe("network / gossip / snappy", () => { runsFactor: RUNS_FACTOR, fn: () => { for (let i = 0; i < RUNS_FACTOR; i++) { - snappyJs.uncompress(compressed); - } - }, - }); - - bench({ - id: `${msgLen} bytes - uncompress - snappy`, - runsFactor: RUNS_FACTOR, - fn: () => { - for (let i = 0; i < RUNS_FACTOR; i++) { - snappyRs.uncompressSync(compressed); + const snappyJsDecompressor = new SnappyDecompressor(compressed); + const uncompressedDataLength = snappyJsDecompressor.readUncompressedLength(); + const uncompressedData = Buffer.alloc(uncompressedDataLength); + if (!snappyJsDecompressor.uncompressInto(uncompressedData)) { + throw Error("Decompression failed"); + } } }, }); @@ -98,17 +83,12 @@ describe("network / gossip / snappy", () => { runsFactor: RUNS_FACTOR, fn: () => { for (let i = 0; i < RUNS_FACTOR; i++) { - decoder.decompress(compressed); - } - }, - }); - - bench({ - id: `${msgLen} bytes - uncompress - snappy-wasm - prealloc`, - runsFactor: RUNS_FACTOR, - fn: () => { - for (let i = 0; i < RUNS_FACTOR; i++) { - decoder.decompress_into(compressed, Buffer.allocUnsafe(snappyWasm.decompress_len(compressed))); + const snappyWasmDecompressor = new SnappyWasmDecompressor(compressed); + const uncompressedDataLength = snappyWasmDecompressor.readUncompressedLength(); + const uncompressedData = Buffer.alloc(uncompressedDataLength); + if (!snappyWasmDecompressor.uncompressInto(uncompressedData)) { + throw Error("Decompression failed"); + } } }, }); From 732860ca2094dca8bae05d719f0a023c6564a426 Mon Sep 17 00:00:00 2001 From: Tuyen Nguyen Date: Fri, 5 Dec 2025 15:06:17 +0700 Subject: [PATCH 9/9] chore: link to snappy-js upstream repo --- .../beacon-node/src/network/gossip/snappy/snappy-js/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts b/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts index 10ef35d7208..dcb412596e2 100644 --- a/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts +++ b/packages/beacon-node/src/network/gossip/snappy/snappy-js/index.ts @@ -1,3 +1,4 @@ +/** Based on snappyjs - https://github.com/zhipeng-jia/snappyjs */ import {SnappyCompressor} from "./compressor.js"; import {SnappyDecompressor} from "./decompressor.js"; import {SnappyError, SnappyErrorCode} from "./error.js";