Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ describe('2 nodes', () => {
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))
])

expect(first(nodes[0].mesh.get(topic)).id.toB58String()).to.equal(first(nodes[0].peers).id.toB58String())
expect(first(nodes[1].mesh.get(topic)).id.toB58String()).to.equal(first(nodes[1].peers).id.toB58String())
expect(first(nodes[0].mesh.get(topic))).to.equal(first(nodes[0].peers).id.toB58String())
expect(first(nodes[1].mesh.get(topic))).to.equal(first(nodes[1].peers).id.toB58String())
})
})

Expand Down
9 changes: 4 additions & 5 deletions test/gossip.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ describe('gossip', () => {

nodeA._pushGossip.getCalls()
.map((call) => call.args[0])
.forEach((peer) => {
const peerId = peer.id.toB58String()
nodeA.mesh.get(topic).forEach((meshPeer) => {
expect(meshPeer.id.toB58String()).to.not.equal(peerId)
.forEach((peerId) => {
nodeA.mesh.get(topic).forEach((meshPeerId) => {
expect(meshPeerId).to.not.equal(peerId)
})
})

Expand All @@ -74,7 +73,7 @@ describe('gossip', () => {
await delay(500)

const peerB = first(nodeA.mesh.get(topic))
const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB.id.toB58String())
const nodeB = nodes.find((n) => n.peerId.toB58String() === peerB)

// set spy
sinon.spy(nodeA, '_piggybackControl')
Expand Down
13 changes: 6 additions & 7 deletions ts/getGossipPeers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { shuffle, hasGossipProtocol } from './utils'
import { PeerStreams } from './peerStreams'
import Gossipsub = require('./index')

/**
Expand All @@ -10,33 +9,33 @@ import Gossipsub = require('./index')
* @param {String} topic
* @param {Number} count
* @param {Function} [filter] a function to filter acceptable peers
* @returns {Set<Peer>}
* @returns {Set<string>}
*
*/
export function getGossipPeers (
router: Gossipsub,
topic: string,
count: number,
filter: (peerStreams: PeerStreams) => boolean = () => true
): Set<PeerStreams> {
filter: (id: string) => boolean = () => true
): Set<string> {
const peersInTopic = router.topics.get(topic)
if (!peersInTopic) {
return new Set()
}

// Adds all peers using our protocol
// that also pass the filter function
let peers: PeerStreams[] = []
let peers: string[] = []
peersInTopic.forEach((id) => {
const peerStreams = router.peers.get(id)
if (!peerStreams) {
return
}
if (
hasGossipProtocol(peerStreams.protocol) &&
filter(peerStreams)
filter(id)
) {
peers.push(peerStreams)
peers.push(id)
}
})

Expand Down
71 changes: 32 additions & 39 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as constants from './constants'
import { getGossipPeers } from './getGossipPeers'
import { shuffle } from './utils'
import { PeerStreams } from './peerStreams'
import Gossipsub = require('./index')
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
Expand Down Expand Up @@ -84,8 +83,10 @@ export class Heartbeat {
return s
}

const tograft = new Map<PeerStreams, string[]>()
const toprune = new Map<PeerStreams, string[]>()
// peer id => topic[]
const tograft = new Map<string, string[]>()
// peer id => topic[]
const toprune = new Map<string, string[]>()

// clean up expired backoffs
this.gossipsub._clearBackoff()
Expand All @@ -103,8 +104,7 @@ export class Heartbeat {
// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
const prunePeer = (p: PeerStreams): void => {
const id = p.id.toB58String()
const prunePeer = (id: string): void => {
this.gossipsub.log(
'HEARTBEAT: Remove mesh link to %s in %s',
id, topic
Expand All @@ -114,55 +114,52 @@ export class Heartbeat {
// add prune backoff record
this.gossipsub._addBackoff(id, topic)
// remove peer from mesh
peers.delete(p)
peers.delete(id)
// add to toprune
const topics = toprune.get(p)
const topics = toprune.get(id)
if (!topics) {
toprune.set(p, [topic])
toprune.set(id, [topic])
} else {
topics.push(topic)
}
}
const graftPeer = (p: PeerStreams): void => {
const id = p.id.toB58String()
const graftPeer = (id: string): void => {
this.gossipsub.log(
'HEARTBEAT: Add mesh link to %s in %s',
id, topic
)
// update peer score
this.gossipsub.score.graft(id, topic)
// add peer to mesh
peers.add(p)
peers.add(id)
// add to tograft
const topics = tograft.get(p)
const topics = tograft.get(id)
if (!topics) {
tograft.set(p, [topic])
tograft.set(id, [topic])
} else {
topics.push(topic)
}
}

// drop all peers with negative score
peers.forEach(p => {
const id = p.id.toB58String()
peers.forEach(id => {
const score = getScore(id)
if (score < 0) {
this.gossipsub.log(
'HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s',
id, score, topic
)
prunePeer(p)
prunePeer(id)
}
})

// do we have enough peers?
if (peers.size < constants.GossipsubDlo) {
const backoff = this.gossipsub.backoff.get(topic)
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => {
const id = p.id.toB58String()
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, id => {
// filter out mesh peers, direct peers, peers we are backing off, peers with negative score
return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
return !peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
})

peersSet.forEach(graftPeer)
Expand All @@ -172,7 +169,7 @@ export class Heartbeat {
if (peers.size > constants.GossipsubDhi) {
let peersArray = Array.from(peers)
// sort by score
peersArray.sort((a, b) => getScore(b.id.toB58String()) - getScore(a.id.toB58String()))
peersArray.sort((a, b) => getScore(b) - getScore(a))
// We keep the first D_score peers by score and the remaining up to D randomly
// under the constraint that we keep D_out peers in the mesh (if we have that many)
peersArray = peersArray.slice(0, constants.GossipsubDscore).concat(
Expand Down Expand Up @@ -237,10 +234,9 @@ export class Heartbeat {
if (outbound < constants.GossipsubDout) {
const ineed = constants.GossipsubDout - outbound
const backoff = this.gossipsub.backoff.get(topic)
getGossipPeers(this.gossipsub, topic, ineed, (p: PeerStreams): boolean => {
const id = p.id.toB58String()
getGossipPeers(this.gossipsub, topic, ineed, (id: string): boolean => {
// filter our current mesh peers, direct peers, peers we are backing off, peers with negative score
return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
return !peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
}).forEach(graftPeer)
}
}
Expand All @@ -256,24 +252,23 @@ export class Heartbeat {

// now compute the median peer score in the mesh
const peersList = Array.from(peers)
.sort((a, b) => getScore(a.id.toB58String()) - getScore(b.id.toB58String()))
.sort((a, b) => getScore(a) - getScore(b))
const medianIndex = peers.size / 2
const medianScore = getScore(peersList[medianIndex].id.toB58String())
const medianScore = getScore(peersList[medianIndex])

// if the median score is below the threshold, select a better peer (if any) and GRAFT
if (medianScore < this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold) {
const backoff = this.gossipsub.backoff.get(topic)
const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (p: PeerStreams): boolean => {
const id = p.id.toB58String()
const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (id: string): boolean => {
// filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold
return peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore
return peers.has(id) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore
})
peersToGraft.forEach(p => {
peersToGraft.forEach(id => {
this.gossipsub.log(
'HEARTBEAT: Opportunistically graft peer %s on topic %s',
p.id.toB58String(), topic
id, topic
)
graftPeer(p)
graftPeer(id)
})
}
}
Expand All @@ -296,28 +291,26 @@ export class Heartbeat {
this.gossipsub.fanout.forEach((fanoutPeers, topic) => {
// checks whether our peers are still in the topic and have a score above the publish threshold
const topicPeers = this.gossipsub.topics.get(topic)
fanoutPeers.forEach(p => {
const id = p.id.toB58String()
fanoutPeers.forEach(id => {
if (
!topicPeers!.has(id) ||
getScore(id) < this.gossipsub._options.scoreThresholds.publishThreshold
) {
fanoutPeers.delete(p)
fanoutPeers.delete(id)
}
})

// do we need more peers?
if (fanoutPeers.size < constants.GossipsubD) {
const ineed = constants.GossipsubD - fanoutPeers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (p: PeerStreams): boolean => {
const id = p.id.toB58String()
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (id: string): boolean => {
// filter out existing fanout peers, direct peers, and peers with score above the publish threshold
return !fanoutPeers.has(p) &&
return !fanoutPeers.has(id) &&
!this.gossipsub.direct.has(id) &&
getScore(id) >= this.gossipsub._options.scoreThresholds.publishThreshold
})
peersSet.forEach(p => {
fanoutPeers.add(p)
peersSet.forEach(id => {
fanoutPeers.add(id)
})
}

Expand Down
Loading