Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ export class Heartbeat {
// clean up expired backoffs
this.gossipsub._clearBackoff()

// ensure direct peers are connected
this.gossipsub._directConnect()

// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
Expand Down Expand Up @@ -151,8 +154,8 @@ export class Heartbeat {
const ineed = constants.GossipsubD - peers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, p => {
const id = p.id.toB58String()
// filter out mesh peers, peers we are backing off, peers with negative score
return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
// filter out mesh peers, direct peers, peers we are backing off, peers with negative score
return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
})

peersSet.forEach(graftPeer)
Expand Down Expand Up @@ -229,8 +232,8 @@ export class Heartbeat {
const backoff = this.gossipsub.backoff.get(topic)
getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
const id = p.id.toB58String()
// filter our current mesh peers, peers we are backing off, peers with negative score
return !peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
// filter our current mesh peers, direct peers, peers we are backing off, peers with negative score
return !peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) >= 0
}).forEach(graftPeer)
}
}
Expand All @@ -255,8 +258,8 @@ export class Heartbeat {
const backoff = this.gossipsub.backoff.get(topic)
const peersToGraft = getGossipPeers(this.gossipsub, topic, constants.GossipsubOpportunisticGraftPeers, (p: Peer): boolean => {
const id = p.id.toB58String()
// filter out current mesh peers, peres we are backing off, peers below or at threshold
return peers.has(p) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore
// filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold
return peers.has(p) && !this.gossipsub.direct.has(id) && (!backoff || !backoff.has(id)) && getScore(id) > medianScore
})
peersToGraft.forEach(p => {
this.gossipsub.log(
Expand Down Expand Up @@ -299,9 +302,11 @@ export class Heartbeat {
if (fanoutPeers.size < constants.GossipsubD) {
const ineed = constants.GossipsubD - fanoutPeers.size
const peersSet = getGossipPeers(this.gossipsub, topic, ineed, (p: Peer): boolean => {
// filter out existing fanout peers and peers with score above the publish threshold
const id = p.id.toB58String()
// filter out existing fanout peers, direct peers, and peers with score above the publish threshold
return !fanoutPeers.has(p) &&
getScore(p.id.toB58String()) >= this.gossipsub._options.scoreThresholds.publishThreshold
!this.gossipsub.direct.has(id) &&
getScore(id) >= this.gossipsub._options.scoreThresholds.publishThreshold
})
peersSet.forEach(p => {
fanoutPeers.add(p)
Expand Down
119 changes: 111 additions & 8 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { getGossipPeers } from './getGossipPeers'
import { createGossipRpc, shuffle, hasGossipProtocol } from './utils'
import { Peer } from './peer'
import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds } from './score'
import { Libp2p } from './interfaces'
import { AddrInfo, Libp2p } from './interfaces'
// @ts-ignore
import TimeCache = require('time-cache')
import PeerId = require('peer-id')
Expand All @@ -29,6 +29,7 @@ interface GossipInputOptions {
messageCache: MessageCache
scoreParams: Partial<PeerScoreParams>
scoreThresholds: Partial<PeerScoreThresholds>
directPeers: AddrInfo[]
}

interface GossipOptions extends GossipInputOptions {
Expand All @@ -38,6 +39,7 @@ interface GossipOptions extends GossipInputOptions {

class Gossipsub extends BasicPubsub {
peers: Map<string, Peer>
direct: Set<string>
topics: Map<string, Set<Peer>>
mesh: Map<string, Set<Peer>>
fanout: Map<string, Set<Peer>>
Expand All @@ -64,6 +66,7 @@ class Gossipsub extends BasicPubsub {
* @param {Object} [options.messageCache] override the default MessageCache
* @param {Object} [options.scoreParams] peer score parameters
* @param {Object} [options.scoreThresholds] peer score thresholds
* @param {AddrInfo[]} [options.directPeers] peers with which we will maintain direct connections
* @constructor
*/
constructor (
Expand All @@ -75,6 +78,7 @@ class Gossipsub extends BasicPubsub {
gossipIncoming: true,
fallbackToFloodsub: true,
floodPublish: true,
directPeers: [],
...options,
scoreParams: createPeerScoreParams(options.scoreParams),
scoreThresholds: createPeerScoreThresholds(options.scoreThresholds)
Expand All @@ -92,6 +96,17 @@ class Gossipsub extends BasicPubsub {
options: _options
})

/**
* Direct peers
* @type {Set<string>}
*/
this.direct = new Set(_options.directPeers.map(p => p.id.toB58String()))

// set direct peer addresses in the address book
_options.directPeers.forEach(p => {
p.addrs.forEach(ma => libp2p.peerStore.addressBook.add(p.id, ma))
})

/**
* Cache of seen messages
*
Expand Down Expand Up @@ -349,6 +364,16 @@ class Gossipsub extends BasicPubsub {
super._publishFrom(peer, msg)
}

/**
* Whether to accept a message from a peer
* @override
* @param {string} id
* @returns {boolean}
*/
_acceptFrom (id: string): boolean {
return this.direct.has(id) || this.score.score(id) >= this._options.scoreThresholds.graylistThreshold
}

/**
* Coerse topic validator result to valid/invalid boolean
* Provide extended validator support
Expand Down Expand Up @@ -467,6 +492,14 @@ class Gossipsub extends BasicPubsub {
return
}

// we don't GRAFT to/from direct peers; complain loudly if this happens
if (this.direct.has(id)) {
this.log('GRAFT: ignoring request from direct peer %s', id)
// this is possibly a bug from a non-reciprical configuration; send a PRUNE
prune.push(topicID)
return
}

// make sure we are not backing off that peer
const expire = this.backoff.get(topicID)?.get(id)
if (typeof expire === 'number' && now < expire) {
Expand Down Expand Up @@ -600,6 +633,31 @@ class Gossipsub extends BasicPubsub {
})
}

/**
* Maybe reconnect to direct peers
* @returns {void}
*/
_directConnect (): void {
// we only do this every few ticks to allow pending connections to complete and account for
// restarts/downtime
if (this.heartbeatTicks % constants.GossipsubDirectConnectTicks !== 0) {
return
}

const toconnect: string[] = []
this.direct.forEach(id => {
const peer = this.peers.get(id)
if (!peer || !peer.isConnected) {
toconnect.push(id)
}
})
if (toconnect.length) {
toconnect.forEach(id => {
this._connect(id)
})
}
}

/**
* Mounts the gossipsub protocol onto the libp2p node and sends our
* our subscriptions to every peer connected
Expand All @@ -610,6 +668,12 @@ class Gossipsub extends BasicPubsub {
await super.start()
this.heartbeat.start()
this.score.start()
// connect to direct peers
this._directPeerInitial = setTimeout(() => {
this.direct.forEach(id => {
this._connect(id)
})
}, constants.GossipsubDirectConnectInitialDelay)
}

/**
Expand All @@ -629,6 +693,16 @@ class Gossipsub extends BasicPubsub {
this.control = new Map()
this.backoff = new Map()
this.outbound = new Map()
clearTimeout(this._directPeerInitial)
}

/**
* Connect to a peer using the gossipsub protocol
* @param {string} id
* @returns {void}
*/
_connect (id: string): void {
this._libp2p.dialProtocol(id, this.multicodecs)
}

/**
Expand Down Expand Up @@ -669,14 +743,32 @@ class Gossipsub extends BasicPubsub {
this.log('JOIN %s', topics)

;(topics as string[]).forEach((topic) => {
// Send GRAFT to mesh peers
const fanoutPeers = this.fanout.get(topic)
if (fanoutPeers) {
// these peers have a score above the publish threshold, which may be negative
// so drop the ones with a negative score
fanoutPeers.forEach(p => {
if (this.score.score(p.id.toB58String()) < 0) {
fanoutPeers.delete(p)
}
})
if (fanoutPeers.size < constants.GossipsubD) {
// we need more peers; eager, as this would get fixed in the next heartbeat
getGossipPeers(this, topic, constants.GossipsubD - fanoutPeers.size, (p: Peer): boolean => {
const id = p.id.toB58String()
// filter our current peers, direct peers, and peers with negative scores
return !fanoutPeers.has(p) && !this.direct.has(id) && this.score.score(id) >= 0
}).forEach(p => fanoutPeers.add(p))
}
this.mesh.set(topic, fanoutPeers)
this.fanout.delete(topic)
this.lastpub.delete(topic)
} else {
const peers = getGossipPeers(this, topic, constants.GossipsubD)
const peers = getGossipPeers(this, topic, constants.GossipsubD, (p: Peer): boolean => {
const id = p.id.toB58String()
// filter direct peers and peers with negative score
return !this.direct.has(id) && this.score.score(id) >= 0
})
this.mesh.set(topic, peers)
}
this.mesh.get(topic)!.forEach((peer) => {
Expand Down Expand Up @@ -744,18 +836,26 @@ class Gossipsub extends BasicPubsub {

if (this._options.floodPublish) {
// flood-publish behavior
// send to _all_ peers meeting the publishThreshold
// send to direct peers and _all_ peers meeting the publishThreshold
peersInTopic.forEach(peer => {
const score = this.score.score(peer.id.toB58String())
if (score >= this._options.scoreThresholds.publishThreshold) {
const id = peer.id.toB58String()
if (this.direct.has(id) || this.score.score(id) >= this._options.scoreThresholds.publishThreshold) {
tosend.add(peer)
}
})
} else {
// non-flood-publish behavior
// send to subscribed floodsub peers
// send to direct peers, subscribed floodsub peers
// and some mesh peers above publishThreshold

// direct peers
this.direct.forEach(id => {
const peer = this.peers.get(id)
if (peer) {
tosend.add(peer)
}
})

// floodsub peers
peersInTopic.forEach((peer) => {
if (peer.protocols.includes(constants.FloodsubID)) {
Expand Down Expand Up @@ -929,17 +1029,20 @@ class Gossipsub extends BasicPubsub {
// Send gossip to GossipFactor peers above threshold with a minimum of D_lazy
// First we collect the peers above gossipThreshold that are not in the exclude set
// and then randomly select from that set
// We also exclude direct peers, as there is no reason to emit gossip to them
const peersToGossip: Peer[] = []
const topicPeers = this.topics.get(topic)
if (!topicPeers) {
// no topic peers, no gossip
return
}
topicPeers.forEach(p => {
const id = p.id.toB58String()
if (
!exclude.has(p) &&
!this.direct.has(id) &&
hasGossipProtocol(p.protocols) &&
this.score.score(p.id.toB58String()) >= this._options.scoreThresholds.gossipThreshold
this.score.score(id) >= this._options.scoreThresholds.gossipThreshold
) {
peersToGossip.push(p)
}
Expand Down