diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index 5f058fa1..1ec5c93d 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -113,8 +113,8 @@ describe('2 nodes', () => { new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve)) ]) - expect(first(nodes[0].mesh.get(topic)).id.toB58String()).to.equal(first(nodes[0].peers).id.toB58String()) - expect(first(nodes[1].mesh.get(topic)).id.toB58String()).to.equal(first(nodes[1].peers).id.toB58String()) + expect(first(nodes[0].mesh.get(topic))).to.equal(first(nodes[0].peers).id.toB58String()) + expect(first(nodes[1].mesh.get(topic))).to.equal(first(nodes[1].peers).id.toB58String()) }) }) diff --git a/test/gossip.spec.js b/test/gossip.spec.js index 2304ac12..72948c8b 100644 --- a/test/gossip.spec.js +++ b/test/gossip.spec.js @@ -47,10 +47,9 @@ describe('gossip', () => { nodeA._pushGossip.getCalls() .map((call) => call.args[0]) - .forEach((peer) => { - const peerId = peer.id.toB58String() - nodeA.mesh.get(topic).forEach((meshPeer) => { - expect(meshPeer.id.toB58String()).to.not.equal(peerId) + .forEach((peerId) => { + nodeA.mesh.get(topic).forEach((meshPeerId) => { + expect(meshPeerId).to.not.equal(peerId) }) }) @@ -74,7 +73,7 @@ describe('gossip', () => { await delay(500) const peerB = first(nodeA.mesh.get(topic)) - const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB.id.toB58String()) + const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB) // set spy sinon.spy(nodeA, '_piggybackControl') diff --git a/ts/getGossipPeers.ts b/ts/getGossipPeers.ts index 10207e73..4d6181bd 100644 --- a/ts/getGossipPeers.ts +++ b/ts/getGossipPeers.ts @@ -1,5 +1,4 @@ import { shuffle, hasGossipProtocol } from './utils' -import { PeerStreams } from './peerStreams' import Gossipsub = require('./index') /** @@ -10,15 +9,15 @@ import Gossipsub = require('./index') * @param {String} topic * @param {Number} count * @param {Function} [filter] a function to filter acceptable peers - * @returns {Set} + * @returns {Set} * */ export function getGossipPeers ( router: Gossipsub, topic: string, count: number, - filter: (peerStreams: PeerStreams) => boolean = () => true -): Set { + filter: (id: string) => boolean = () => true +): Set { const peersInTopic = router.topics.get(topic) if (!peersInTopic) { return new Set() @@ -26,7 +25,7 @@ export function getGossipPeers ( // Adds all peers using our protocol // that also pass the filter function - let peers: PeerStreams[] = [] + let peers: string[] = [] peersInTopic.forEach((id) => { const peerStreams = router.peers.get(id) if (!peerStreams) { @@ -34,9 +33,9 @@ export function getGossipPeers ( } if ( hasGossipProtocol(peerStreams.protocol) && - filter(peerStreams) + filter(id) ) { - peers.push(peerStreams) + peers.push(id) } }) diff --git a/ts/heartbeat.ts b/ts/heartbeat.ts index 6a08b362..33d6cc07 100644 --- a/ts/heartbeat.ts +++ b/ts/heartbeat.ts @@ -1,7 +1,6 @@ import * as constants from './constants' import { getGossipPeers } from './getGossipPeers' import { shuffle } from './utils' -import { PeerStreams } from './peerStreams' import Gossipsub = require('./index') // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore @@ -84,8 +83,10 @@ export class Heartbeat { return s } - const tograft = new Map() - const toprune = new Map() + // peer id => topic[] + const tograft = new Map() + // peer id => topic[] + const toprune = new Map() // clean up expired backoffs this.gossipsub._clearBackoff() @@ -103,8 +104,7 @@ export class Heartbeat { // maintain the mesh for topics we have joined this.gossipsub.mesh.forEach((peers, topic) => { // prune/graft helper functions (defined per topic) - const prunePeer = (p: PeerStreams): void => { - const id = p.id.toB58String() + const prunePeer = (id: string): void => { this.gossipsub.log( 'HEARTBEAT: Remove mesh link to %s in %s', id, topic @@ -114,17 +114,16 @@ export class Heartbeat { // add prune backoff record this.gossipsub._addBackoff(id, topic) // remove peer from mesh - peers.delete(p) + peers.delete(id) // add to toprune - const topics = toprune.get(p) + const topics = toprune.get(id) if (!topics) { - toprune.set(p, [topic]) + toprune.set(id, [topic]) } else { topics.push(topic) } } - const graftPeer = (p: PeerStreams): void => { - const id = p.id.toB58String() + const graftPeer = (id: string): void => { this.gossipsub.log( 'HEARTBEAT: Add mesh link to %s in %s', id, topic @@ -132,26 +131,25 @@ export class Heartbeat { // update peer score this.gossipsub.score.graft(id, topic) // add peer to mesh - peers.add(p) + peers.add(id) // add to tograft - const topics = tograft.get(p) + const topics = tograft.get(id) if (!topics) { - tograft.set(p, [topic]) + tograft.set(id, [topic]) } else { topics.push(topic) } } // drop all peers with negative score - peers.forEach(p => { - const id = p.id.toB58String() + peers.forEach(id => { const score = getScore(id) if (score < 0) { this.gossipsub.log( 'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s', id, score, topic ) - prunePeer(p) + prunePeer(id) } }) @@ -159,10 +157,9 @@ export class Heartbeat { if (peers.size < constants.GossipsubDlo) { const backoff = this.gossipsub.backoff.get(topic) const ineed = constants.GossipsubD - peers.size - const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => { - const id = p.id.toB58String() + const peersSet = getGossipPeers(this.gossipsub, topic, ineed, id => { // filter out mesh peers, direct peers, peers we are backing off, peers with negative score - return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 + return !peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 }) peersSet.forEach(graftPeer) @@ -172,7 +169,7 @@ export class Heartbeat { if (peers.size > constants.GossipsubDhi) { let peersArray = Array.from(peers) // sort by score - peersArray.sort((a, b) => getScore(b.id.toB58String()) - getScore(a.id.toB58String())) + peersArray.sort((a, b) => getScore(b) - getScore(a)) // We keep the first D_score peers by score and the remaining up to D randomly // under the constraint that we keep D_out peers in the mesh (if we have that many) peersArray = peersArray.slice(0, constants.GossipsubDscore).concat( @@ -237,10 +234,9 @@ export class Heartbeat { if (outbound < constants.GossipsubDout) { const ineed = constants.GossipsubDout - outbound const backoff = this.gossipsub.backoff.get(topic) - getGossipPeers(this.gossipsub, topic, ineed, (p: PeerStreams): boolean => { - const id = p.id.toB58String() + getGossipPeers(this.gossipsub, topic, ineed, (id: string): boolean => { // filter our current mesh peers, direct peers, peers we are backing off, peers with negative score - return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 + return !peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0 }).forEach(graftPeer) } } @@ -256,24 +252,23 @@ export class Heartbeat { // now compute the median peer score in the mesh const peersList = Array.from(peers) - .sort((a, b) => getScore(a.id.toB58String()) - getScore(b.id.toB58String())) + .sort((a, b) => getScore(a) - getScore(b)) const medianIndex = peers.size / 2 - const medianScore = getScore(peersList[medianIndex].id.toB58String()) + const medianScore = getScore(peersList[medianIndex]) // if the median score is below the threshold, select a better peer (if any) and GRAFT if (medianScore < this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold) { const backoff = this.gossipsub.backoff.get(topic) - const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (p: PeerStreams): boolean => { - const id = p.id.toB58String() + const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (id: string): boolean => { // filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold - return peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore + return peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore }) - peersToGraft.forEach(p => { + peersToGraft.forEach(id => { this.gossipsub.log( 'HEARTBEAT: Opportunistically graft peer %s on topic %s', - p.id.toB58String(), topic + id, topic ) - graftPeer(p) + graftPeer(id) }) } } @@ -296,28 +291,26 @@ export class Heartbeat { this.gossipsub.fanout.forEach((fanoutPeers, topic) => { // checks whether our peers are still in the topic and have a score above the publish threshold const topicPeers = this.gossipsub.topics.get(topic) - fanoutPeers.forEach(p => { - const id = p.id.toB58String() + fanoutPeers.forEach(id => { if ( !topicPeers!.has(id) || getScore(id) < this.gossipsub._options.scoreThresholds.publishThreshold ) { - fanoutPeers.delete(p) + fanoutPeers.delete(id) } }) // do we need more peers? if (fanoutPeers.size < constants.GossipsubD) { const ineed = constants.GossipsubD - fanoutPeers.size - const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (p: PeerStreams): boolean => { - const id = p.id.toB58String() + const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (id: string): boolean => { // filter out existing fanout peers, direct peers, and peers with score above the publish threshold - return !fanoutPeers.has(p) && + return !fanoutPeers.has(id) && !this.gossipsub.direct.has(id) && getScore(id) >= this.gossipsub._options.scoreThresholds.publishThreshold }) - peersSet.forEach(p => { - fanoutPeers.add(p) + peersSet.forEach(id => { + fanoutPeers.add(id) }) } diff --git a/ts/index.ts b/ts/index.ts index ae688511..5e58cb6a 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -42,15 +42,15 @@ class Gossipsub extends BasicPubsub { peers: Map direct: Set topics: Map> - mesh: Map> - fanout: Map> + mesh: Map> + fanout: Map> lastpub: Map - gossip: Map - control: Map + gossip: Map + control: Map peerhave:Map iasked:Map backoff: Map> - outbound: Map + outbound: Map score: PeerScore heartbeatTicks: number gossipTracer: IWantTracer @@ -120,36 +120,41 @@ class Gossipsub extends BasicPubsub { /** * Map of topic meshes + * topic => peer id set * - * @type {Map>} + * @type {Map>} */ this.mesh = new Map() /** * Map of topics to set of peers. These mesh peers are the ones to which we are publishing without a topic membership + * topic => peer id set * - * @type {Map>} + * @type {Map>} */ this.fanout = new Map() /** * Map of last publish time for fanout topics + * topic => last publish time * - * @type {Map} + * @type {Map} */ this.lastpub = new Map() /** * Map of pending messages to gossip + * peer id => control messages * - * @type {Map> } + * @type {Map> } */ this.gossip = new Map() /** * Map of control messages + * peer id => control message * - * @type {Map} + * @type {Map} */ this.control = new Map() @@ -172,8 +177,9 @@ class Gossipsub extends BasicPubsub { /** * Connection direction cache, marks peers with outbound connections + * peer id => direction * - * @type {Map} + * @type {Map} */ this.outbound = new Map() @@ -250,28 +256,29 @@ class Gossipsub extends BasicPubsub { */ _removePeer (peerId: PeerId): PeerStreams { const peerStreams = super._removePeer(peerId) + const id = peerId.toB58String() // Remove this peer from the mesh // eslint-disable-next-line no-unused-vars for (const peers of this.mesh.values()) { - peers.delete(peerStreams) + peers.delete(id) } // Remove this peer from the fanout // eslint-disable-next-line no-unused-vars for (const peers of this.fanout.values()) { - peers.delete(peerStreams) + peers.delete(id) } // Remove from gossip mapping - this.gossip.delete(peerStreams) + this.gossip.delete(id) // Remove from control mapping - this.control.delete(peerStreams) + this.control.delete(id) // Remove from backoff mapping - this.outbound.delete(peerStreams) + this.outbound.delete(id) // Remove from peer scoring - this.score.removePeer(peerId.toB58String()) + this.score.removePeer(id) return peerStreams } @@ -285,10 +292,10 @@ class Gossipsub extends BasicPubsub { * @param {RPC} rpc * @returns {boolean} */ - _processRpc (idB58Str: string, peerStreams: PeerStreams, rpc: RPC): boolean { - if (super._processRpc(idB58Str, peerStreams, rpc)) { + _processRpc (id: string, peerStreams: PeerStreams, rpc: RPC): boolean { + if (super._processRpc(id, peerStreams, rpc)) { if (rpc.control) { - this._processRpcControlMessage(peerStreams, rpc.control) + this._processRpcControlMessage(id, rpc.control) } return true } @@ -297,26 +304,26 @@ class Gossipsub extends BasicPubsub { /** * Handles an rpc control message from a peer - * @param {PeerStreams} peerStreams + * @param {string} id peer id * @param {ControlMessage} controlMsg * @returns {void} */ - _processRpcControlMessage (peerStreams: PeerStreams, controlMsg: ControlMessage): void { + _processRpcControlMessage (id: string, controlMsg: ControlMessage): void { if (!controlMsg) { return } - const iwant = this._handleIHave(peerStreams, controlMsg.ihave) - const ihave = this._handleIWant(peerStreams, controlMsg.iwant) - const prune = this._handleGraft(peerStreams, controlMsg.graft) - this._handlePrune(peerStreams, controlMsg.prune) + const iwant = this._handleIHave(id, controlMsg.ihave) + const ihave = this._handleIWant(id, controlMsg.iwant) + const prune = this._handleGraft(id, controlMsg.graft) + this._handlePrune(id, controlMsg.prune) if (!iwant || !ihave || !prune) { return } const outRpc = createGossipRpc(ihave, { iwant: [iwant], prune }) - this._sendRpc(peerStreams, outRpc) + this._sendRpc(id, outRpc) } /** @@ -356,17 +363,16 @@ class Gossipsub extends BasicPubsub { // If options.gossipIncoming is false, do NOT emit incoming messages to peers if (this._options.gossipIncoming) { // Emit to floodsub peers - this.peers.forEach((peer) => { - const id = peer.id.toB58String() - if (peer.protocol === constants.FloodsubID && + this.peers.forEach((peer, id) => { + if ( + peer.protocol === constants.FloodsubID && id !== msg.from && topics.some(topic => { const t = this.topics.get(topic) return t && t.has(id) - }) && - peer.isWritable + }) ) { - this._sendRpc(peer, rpc) + this._sendRpc(id, rpc) this.log('publish msg on topics - floodsub', topics, id) } }) @@ -377,12 +383,12 @@ class Gossipsub extends BasicPubsub { if (!meshPeers) { return } - meshPeers.forEach((peer) => { - if (!peer.isWritable || peer.id.toB58String() === msg.from) { + meshPeers.forEach((id) => { + if (id === msg.from) { return } - this._sendRpc(peer, rpc) - this.log('publish msg on topic - meshsub', topic, peer.id.toB58String()) + this._sendRpc(id, rpc) + this.log('publish msg on topic - meshsub', topic, id) }) }) } @@ -427,13 +433,12 @@ class Gossipsub extends BasicPubsub { /** * Handles IHAVE messages - * @param {PeerStreams} peerStreams + * @param {string} id peer id * @param {Array} ihave * @returns {ControlIWant} */ - _handleIHave (peerStreams: PeerStreams, ihave: ControlIHave[]): ControlIWant | undefined { + _handleIHave (id: string, ihave: ControlIHave[]): ControlIWant | undefined { // we ignore IHAVE gossip from any peer whose score is below the gossips threshold - const id = peerStreams.id.toB58String() const score = this.score.score(id) if (score < this._options.scoreThresholds.gossipThreshold) { this.log( @@ -510,13 +515,13 @@ class Gossipsub extends BasicPubsub { /** * Handles IWANT messages * Returns messages to send back to peer - * @param {PeerStreams} peerStreams + * @param {string} id peer id * @param {Array} iwant * @returns {Array} */ - _handleIWant (peerStreams: PeerStreams, iwant: ControlIWant[]): Message[] | undefined { + _handleIWant (id: string, iwant: ControlIWant[]): Message[] | undefined { // @type {Map} - const ihave = new Map() + const ihave = new Map() iwant.forEach(({ messageIDs }) => { messageIDs.forEach((msgID) => { @@ -531,20 +536,19 @@ class Gossipsub extends BasicPubsub { return } - this.log('IWANT: Sending %d messages to %s', ihave.size, peerStreams.id.toB58String()) + this.log('IWANT: Sending %d messages to %s', ihave.size, id) - return Array.from(ihave.values()) + return Array.from(ihave.values()).map(utils.normalizeOutRpcMessage) } /** * Handles Graft messages - * @param {PeerStreams} peerStreams + * @param {string} id peer id * @param {Array} graft * @return {Array} */ - _handleGraft (peerStreams: PeerStreams, graft: ControlGraft[]): ControlPrune[] | undefined { + _handleGraft (id: string, graft: ControlGraft[]): ControlPrune[] | undefined { const prune: string[] = [] - const id = peerStreams.id.toB58String() const score = this.score.score(id) const now = this._now() @@ -560,7 +564,7 @@ class Gossipsub extends BasicPubsub { } // check if peer is already in the mesh; if so do nothing - if (peersInMesh.has(peerStreams)) { + if (peersInMesh.has(id)) { return } @@ -607,14 +611,14 @@ class Gossipsub extends BasicPubsub { // check the number of mesh peers; if it is at (or over) Dhi, we only accept grafts // from peers with outbound connections; this is a defensive check to restrict potential // mesh takeover attacks combined with love bombing - if (peersInMesh.size >= constants.GossipsubDhi && !this.outbound.get(peerStreams)) { + if (peersInMesh.size >= constants.GossipsubDhi && !this.outbound.get(id)) { prune.push(topicID) this._addBackoff(id, topicID) return } this.log('GRAFT: Add mesh link from %s in %s', id, topicID) - peersInMesh.add(peerStreams) + peersInMesh.add(id) peersInTopic.add(id) }) @@ -627,12 +631,11 @@ class Gossipsub extends BasicPubsub { /** * Handles Prune messages - * @param {PeerStreams} peerStreams + * @param {string} id peer id * @param {Array} prune * @returns {void} */ - _handlePrune (peerStreams: PeerStreams, prune: ControlPrune[]): void { - const id = peerStreams.id.toB58String() + _handlePrune (id: string, prune: ControlPrune[]): void { prune.forEach(({ topicID, backoff }) => { if (!topicID) { return @@ -643,7 +646,7 @@ class Gossipsub extends BasicPubsub { return } this.log('PRUNE: Remove mesh link to %s in %s', id, topicID) - peersInMesh.delete(peerStreams) + peersInMesh.delete(id) peersInTopic.delete(id) // is there a backoff specified by the peer? if so obey it if (typeof backoff === 'number' && backoff > 0) { @@ -835,33 +838,31 @@ class Gossipsub extends BasicPubsub { if (fanoutPeers) { // these peers have a score above the publish threshold, which may be negative // so drop the ones with a negative score - fanoutPeers.forEach(p => { - if (this.score.score(p.id.toB58String()) < 0) { - fanoutPeers.delete(p) + fanoutPeers.forEach(id => { + if (this.score.score(id) < 0) { + fanoutPeers.delete(id) } }) if (fanoutPeers.size < constants.GossipsubD) { // we need more peers; eager, as this would get fixed in the next heartbeat - getGossipPeers(this, topic, constants.GossipsubD - fanoutPeers.size, (p: PeerStreams): boolean => { - const id = p.id.toB58String() + getGossipPeers(this, topic, constants.GossipsubD - fanoutPeers.size, (id: string): boolean => { // filter our current peers, direct peers, and peers with negative scores - return !fanoutPeers.has(p) && !this.direct.has(id) && this.score.score(id) >= 0 - }).forEach(p => fanoutPeers.add(p)) + return !fanoutPeers.has(id) && !this.direct.has(id) && this.score.score(id) >= 0 + }).forEach(id => fanoutPeers.add(id)) } this.mesh.set(topic, fanoutPeers) this.fanout.delete(topic) this.lastpub.delete(topic) } else { - const peers = getGossipPeers(this, topic, constants.GossipsubD, (p: PeerStreams): boolean => { - const id = p.id.toB58String() + const peers = getGossipPeers(this, topic, constants.GossipsubD, (id: string): boolean => { // filter direct peers and peers with negative score return !this.direct.has(id) && this.score.score(id) >= 0 }) this.mesh.set(topic, peers) } - this.mesh.get(topic)!.forEach((peer) => { - this.log('JOIN: Add mesh link to %s in %s', peer.id.toB58String(), topic) - this._sendGraft(peer, topic) + this.mesh.get(topic)!.forEach((id) => { + this.log('JOIN: Add mesh link to %s in %s', id, topic) + this._sendGraft(id, topic) }) }) } @@ -880,9 +881,9 @@ class Gossipsub extends BasicPubsub { // Send PRUNE to mesh peers const meshPeers = this.mesh.get(topic) if (meshPeers) { - meshPeers.forEach((peer) => { - this.log('LEAVE: Remove mesh link to %s in %s', peer.id.toB58String(), topic) - this._sendPrune(peer, topic) + meshPeers.forEach((id) => { + this.log('LEAVE: Remove mesh link to %s in %s', id, topic) + this._sendPrune(id, topic) }) this.mesh.delete(topic) } @@ -914,7 +915,7 @@ class Gossipsub extends BasicPubsub { this.messageCache.put(msg) - const tosend = new Set() + const tosend = new Set() msg.topicIDs.forEach((topic) => { const peersInTopic = this.topics.get(topic) if (!peersInTopic) { @@ -926,10 +927,7 @@ class Gossipsub extends BasicPubsub { // send to direct peers and _all_ peers meeting the publishThreshold peersInTopic.forEach(id => { if (this.direct.has(id) || this.score.score(id) >= this._options.scoreThresholds.publishThreshold) { - const peerStreams = this.peers.get(id) - if (peerStreams) { - tosend.add(peerStreams) - } + tosend.add(id) } }) } else { @@ -939,10 +937,7 @@ class Gossipsub extends BasicPubsub { // direct peers this.direct.forEach(id => { - const peerStreams = this.peers.get(id) - if (peerStreams) { - tosend.add(peerStreams) - } + tosend.add(id) }) // floodsub peers @@ -953,7 +948,7 @@ class Gossipsub extends BasicPubsub { return } if (peerStreams.protocol === constants.FloodsubID && score >= this._options.scoreThresholds.publishThreshold) { - tosend.add(peerStreams) + tosend.add(id) } }) @@ -964,8 +959,8 @@ class Gossipsub extends BasicPubsub { meshPeers = this.fanout.get(topic) if (!meshPeers) { // If we are not in the fanout, then pick peers in topic above the publishThreshold - const peers = getGossipPeers(this, topic, constants.GossipsubD, peer => { - return this.score.score(peer.id.toB58String()) >= this._options.scoreThresholds.publishThreshold + const peers = getGossipPeers(this, topic, constants.GossipsubD, id => { + return this.score.score(id) >= this._options.scoreThresholds.publishThreshold }) if (peers.size > 0) { @@ -988,71 +983,75 @@ class Gossipsub extends BasicPubsub { const rpc = createGossipRpc([ await this._buildMessage(msg) ]) - tosend.forEach((peer) => { - if (peer.id.toB58String() === msg.from) { + tosend.forEach((id) => { + if (id === msg.from) { return } - this._sendRpc(peer, rpc) + this._sendRpc(id, rpc) }) } /** * Sends a GRAFT message to a peer - * @param {PeerStreams} peerStreams - * @param {String} topic + * @param {string} id peer id + * @param {string} topic * @returns {void} */ - _sendGraft (peerStreams: PeerStreams, topic: string): void { + _sendGraft (id: string, topic: string): void { const graft = [{ topicID: topic }] const out = createGossipRpc([], { graft }) - this._sendRpc(peerStreams, out) + this._sendRpc(id, out) } /** * Sends a PRUNE message to a peer - * @param {PeerStreams} peerStreams - * @param {String} topic + * @param {string} id peer id + * @param {string} topic * @returns {void} */ - _sendPrune (peerStreams: PeerStreams, topic: string): void { + _sendPrune (id: string, topic: string): void { const prune = [ - this._makePrune(peerStreams.id.toB58String(), topic) + this._makePrune(id, topic) ] const out = createGossipRpc([], { prune }) - this._sendRpc(peerStreams, out) + this._sendRpc(id, out) } - _sendRpc (peerStreams: PeerStreams, outRpc: RPC): void { + /** + * @override + */ + _sendRpc (id: string, outRpc: RPC): void { + const peerStreams = this.peers.get(id) if (!peerStreams || !peerStreams.isWritable) { return } // piggyback control message retries - const ctrl = this.control.get(peerStreams) + const ctrl = this.control.get(id) if (ctrl) { - this._piggybackControl(peerStreams, outRpc, ctrl) - this.control.delete(peerStreams) + this._piggybackControl(id, outRpc, ctrl) + this.control.delete(id) } // piggyback gossip - const ihave = this.gossip.get(peerStreams) + const ihave = this.gossip.get(id) if (ihave) { - this._piggybackGossip(peerStreams, outRpc, ihave) - this.gossip.delete(peerStreams) + this._piggybackGossip(id, outRpc, ihave) + this.gossip.delete(id) } peerStreams.write(RPCCodec.encode(outRpc)) } - _piggybackControl (peerStreams: PeerStreams, outRpc: RPC, ctrl: ControlMessage): void { + _piggybackControl (id: string, outRpc: RPC, ctrl: ControlMessage): void { const tograft = (ctrl.graft || []) - .filter(({ topicID }) => (topicID && this.mesh.get(topicID) || new Set()).has(peerStreams)) + .filter(({ topicID }) => (topicID && this.mesh.get(topicID) || new Set()).has(id)) const toprune = (ctrl.prune || []) - .filter(({ topicID }) => !(topicID && this.mesh.get(topicID) || new Set()).has(peerStreams)) + .filter(({ topicID }) => !(topicID && this.mesh.get(topicID) || new Set()).has(id)) if (!tograft.length && !toprune.length) { return @@ -1066,7 +1065,7 @@ class Gossipsub extends BasicPubsub { } } - _piggybackGossip (peerStreams: PeerStreams, outRpc: RPC, ihave: ControlIHave[]): void { + _piggybackGossip (id: string, outRpc: RPC, ihave: ControlIHave[]): void { if (!outRpc.control) { outRpc.control = { ihave: [], iwant: [], graft: [], prune: [] } } @@ -1075,39 +1074,37 @@ class Gossipsub extends BasicPubsub { /** * Send graft and prune messages - * @param {Map>} tograft - * @param {Map>} toprune + * @param {Map>} tograft peer id => topic[] + * @param {Map>} toprune peer id => topic[] */ - _sendGraftPrune (tograft: Map, toprune: Map): void { - for (const [p, topics] of tograft) { - const id = p.id.toB58String() + _sendGraftPrune (tograft: Map, toprune: Map): void { + for (const [id, topics] of tograft) { const graft = topics.map((topicID) => ({ topicID })) let prune: ControlPrune[] = [] // If a peer also has prunes, process them now - const pruning = toprune.get(p) + const pruning = toprune.get(id) if (pruning) { prune = pruning.map((topicID) => this._makePrune(id, topicID)) - toprune.delete(p) + toprune.delete(id) } const outRpc = createGossipRpc([], { graft, prune }) - this._sendRpc(p, outRpc) + this._sendRpc(id, outRpc) } - for (const [p, topics] of toprune) { - const id = p.id.toB58String() + for (const [id, topics] of toprune) { const prune = topics.map((topicID) => this._makePrune(id, topicID)) const outRpc = createGossipRpc([], { prune }) - this._sendRpc(p, outRpc) + this._sendRpc(id, outRpc) } } /** * Emits gossip to peers in a particular topic - * @param {String} topic - * @param {Set} exclude peers to exclude + * @param {string} topic + * @param {Set} exclude peers to exclude * @returns {void} */ - _emitGossip (topic: string, exclude: Set): void { + _emitGossip (topic: string, exclude: Set): void { const messageIDs = this.messageCache.getGossipIDs(topic) if (!messageIDs.length) { return @@ -1126,7 +1123,7 @@ class Gossipsub extends BasicPubsub { // First we collect the peers above gossipThreshold that are not in the exclude set // and then randomly select from that set // We also exclude direct peers, as there is no reason to emit gossip to them - const peersToGossip: PeerStreams[] = [] + const peersToGossip: string[] = [] const topicPeers = this.topics.get(topic) if (!topicPeers) { // no topic peers, no gossip @@ -1138,12 +1135,12 @@ class Gossipsub extends BasicPubsub { return } if ( - !exclude.has(peerStreams) && + !exclude.has(id) && !this.direct.has(id) && hasGossipProtocol(peerStreams.protocol) && this.score.score(id) >= this._options.scoreThresholds.gossipThreshold ) { - peersToGossip.push(peerStreams) + peersToGossip.push(id) } }) @@ -1158,7 +1155,7 @@ class Gossipsub extends BasicPubsub { shuffle(peersToGossip) } // Emit the IHAVE gossip to the selected peers up to the target - peersToGossip.slice(0, target).forEach(p => { + peersToGossip.slice(0, target).forEach(id => { let peerMessageIDs = messageIDs if (messageIDs.length > constants.GossipsubMaxIHaveLength) { // shuffle and slice message IDs per peer so that we emit a different set for each peer @@ -1166,7 +1163,7 @@ class Gossipsub extends BasicPubsub { // coverage when we do truncate peerMessageIDs = shuffle(peerMessageIDs.slice()).slice(0, constants.GossipsubMaxIHaveLength) } - this._pushGossip(p, { + this._pushGossip(id, { topicID: topic, messageIDs: peerMessageIDs }) @@ -1197,10 +1194,10 @@ class Gossipsub extends BasicPubsub { * @param {Array} controlIHaveMsgs * @returns {void} */ - _pushGossip (peerStreams: PeerStreams, controlIHaveMsgs: ControlIHave): void { - this.log('Add gossip to %s', peerStreams.id.toB58String()) - const gossip = this.gossip.get(peerStreams) || [] - this.gossip.set(peerStreams, gossip.concat(controlIHaveMsgs)) + _pushGossip (id: string, controlIHaveMsgs: ControlIHave): void { + this.log('Add gossip to %s', id) + const gossip = this.gossip.get(id) || [] + this.gossip.set(id, gossip.concat(controlIHaveMsgs)) } /** diff --git a/ts/pubsub.js b/ts/pubsub.js index f369d137..1184a5b5 100644 --- a/ts/pubsub.js +++ b/ts/pubsub.js @@ -80,13 +80,10 @@ class BasicPubSub extends Pubsub { */ async _onPeerConnected (peerId, conn) { await super._onPeerConnected(peerId, conn) - const idB58Str = peerId.toB58String() - const peerStreams = this.peers.get(idB58Str) + const id = peerId.toB58String() - if (peerStreams && peerStreams.isWritable) { - // Immediately send my own subscriptions to the newly established conn - this._sendSubscriptions(peerStreams, Array.from(this.subscriptions), true) - } + // Immediately send my own subscriptions to the newly established conn + this._sendSubscriptions(id, Array.from(this.subscriptions), true) } /** @@ -312,7 +309,7 @@ class BasicPubSub extends Pubsub { }) // Broadcast SUBSCRIBE to all peers - this.peers.forEach((peer) => this._sendSubscriptions(peer, topics, true)) + this.peers.forEach((_, id) => this._sendSubscriptions(id, topics, true)) } /** @@ -348,7 +345,7 @@ class BasicPubSub extends Pubsub { }) // Broadcast UNSUBSCRIBE to all peers ready - this.peers.forEach((peer) => this._sendSubscriptions(peer, topics, false)) + this.peers.forEach((_, id) => this._sendSubscriptions(id, topics, false)) } /** @@ -417,11 +414,12 @@ class BasicPubSub extends Pubsub { /** * Send an rpc object to a peer - * @param {PeerStreams} peerStreams + * @param {string} id peer id * @param {RPC} rpc * @returns {void} */ - _sendRpc (peerStreams, rpc) { + _sendRpc (id, rpc) { + const peerStreams = this.peers.get(id) if (!peerStreams || !peerStreams.isWritable) { return } @@ -430,13 +428,13 @@ class BasicPubSub extends Pubsub { /** * Send subscroptions to a peer - * @param {PeerStreams} peerStreams + * @param {string} id peer id * @param {string[]} topics * @param {boolean} subscribe set to false for unsubscriptions * @returns {void} */ - _sendSubscriptions (peerStreams, topics, subscribe) { - return this._sendRpc(peerStreams, { + _sendSubscriptions (id, topics, subscribe) { + return this._sendRpc(id, { subscriptions: topics.map(t => ({ topicID: t, subscribe: subscribe })) }) }