diff --git a/packages/beacon-node/src/network/core/events.ts b/packages/beacon-node/src/network/core/events.ts index 31bb0cb5d89..d97341569a9 100644 --- a/packages/beacon-node/src/network/core/events.ts +++ b/packages/beacon-node/src/network/core/events.ts @@ -2,7 +2,7 @@ import EventEmitter from "node:events"; import {ResponseIncoming, ResponseOutgoing} from "@lodestar/reqresp"; import {AsyncIterableEventBus, IteratorEvent, RequestEvent} from "../../util/asyncIterableToEvents.js"; import {StrictEventEmitterSingleArg} from "../../util/strictEvents.js"; -import {EventDirection} from "../../util/workerEvents.js"; +import {EventDirection} from "../events.js"; import {IncomingRequestArgs, OutgoingRequestArgs} from "../reqresp/types.js"; export enum ReqRespBridgeEvent { diff --git a/packages/beacon-node/src/network/events.ts b/packages/beacon-node/src/network/events.ts index a960ade5b8c..20d34b9966e 100644 --- a/packages/beacon-node/src/network/events.ts +++ b/packages/beacon-node/src/network/events.ts @@ -3,7 +3,6 @@ import {PeerId, TopicValidatorResult} from "@libp2p/interface"; import {CustodyIndex, Status} from "@lodestar/types"; import {PeerIdStr} from "../util/peerId.js"; import {StrictEventEmitterSingleArg} from "../util/strictEvents.js"; -import {EventDirection} from "../util/workerEvents.js"; import {PendingGossipsubMessage} from "./processor/types.js"; import {RequestTypedContainer} from "./reqresp/ReqRespBeaconNode.js"; @@ -38,6 +37,13 @@ export type NetworkEventData = { }; }; +export enum EventDirection { + workerToMain, + mainToWorker, + /** Event not emitted through worker boundary */ + none, +} + export const networkEventDirection: Record = { [NetworkEvent.peerConnected]: EventDirection.workerToMain, [NetworkEvent.peerDisconnected]: EventDirection.workerToMain, diff --git a/packages/beacon-node/src/util/workerEvents.ts b/packages/beacon-node/src/util/workerEvents.ts index 807bf7a3061..24941bd3169 100644 --- a/packages/beacon-node/src/util/workerEvents.ts +++ b/packages/beacon-node/src/util/workerEvents.ts @@ -1,9 +1,11 @@ import {MessagePort, Worker} from "node:worker_threads"; +import {Message} from "@libp2p/interface"; import {Thread} from "@chainsafe/threads"; import {Logger} from "@lodestar/logger"; import {sleep} from "@lodestar/utils"; import {Metrics} from "../metrics/metrics.js"; import {NetworkCoreWorkerMetrics} from "../network/core/metrics.js"; +import {EventDirection, NetworkEvent} from "../network/events.js"; import {StrictEventEmitterSingleArg} from "./strictEvents.js"; const NANO_TO_SECOND_CONVERSION = 1e9; @@ -15,13 +17,6 @@ export type WorkerBridgeEvent = { data: EventData[keyof EventData]; }; -export enum EventDirection { - workerToMain, - mainToWorker, - /** Event not emitted through worker boundary */ - none, -} - /** * Bridges events from worker to main thread * Each event can only have one direction: @@ -63,7 +58,13 @@ export function wireEventsOnWorkerThread( posted: process.hrtime(), data, }; - parentPort.postMessage(workerEvent); + let transferList: ArrayBuffer[] | undefined = undefined; + if (eventName === NetworkEvent.pendingGossipsubMessage) { + const payload = data as {msg: Message}; + // Transfer the underlying ArrayBuffer to avoid copy for PendingGossipsubMessage + transferList = [payload.msg.data.buffer as ArrayBuffer]; + } + parentPort.postMessage(workerEvent, transferList); }); } } diff --git a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts index 0411c70a5a8..35fdc57f4d5 100644 --- a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts +++ b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts @@ -9,6 +9,7 @@ import {ZERO_HASH, ZERO_HASH_HEX} from "../../../../src/constants/constants.js"; import {ReqRespBridgeEvent, ReqRespBridgeEventData} from "../../../../src/network/core/events.js"; import {NetworkWorkerApi} from "../../../../src/network/core/index.js"; import { + EventDirection, GossipType, NetworkEvent, NetworkEventData, @@ -18,7 +19,6 @@ import { } from "../../../../src/network/index.js"; import {CommitteeSubscription} from "../../../../src/network/subnets/interface.js"; import {IteratorEventType} from "../../../../src/util/asyncIterableToEvents.js"; -import {EventDirection} from "../../../../src/util/workerEvents.js"; import {getValidPeerId, validPeerIdStr} from "../../../utils/peer.js"; import {EchoWorker, getEchoWorker} from "./workerEchoHandler.js";