Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add iwant request tracking
  • Loading branch information
wemeetagain committed Jul 6, 2020
commit b3942e4caa5f1ab9d11fb0e6f34402c3d3b94e6d
32 changes: 29 additions & 3 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { getGossipPeers } from './getGossipPeers'
import { createGossipRpc, shuffle, hasGossipProtocol } from './utils'
import { Peer } from './peer'
import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds } from './score'
import { IWantTracer } from './tracer'
import { AddrInfo, Libp2p } from './interfaces'
// @ts-ignore
import TimeCache = require('time-cache')
Expand Down Expand Up @@ -52,6 +53,7 @@ class Gossipsub extends BasicPubsub {
outbound: Map<Peer, boolean>
score: PeerScore
heartbeatTicks: number
gossipTracer: IWantTracer
_libp2p: Libp2p
_options: GossipOptions

Expand Down Expand Up @@ -197,6 +199,11 @@ class Gossipsub extends BasicPubsub {
*/
this.heartbeatTicks = 0

/**
* Tracks IHAVE/IWANT promises broken by peers
*/
this.gossipTracer = new IWantTracer(this._msgIdFn)

/**
* libp2p
*/
Expand Down Expand Up @@ -341,7 +348,9 @@ class Gossipsub extends BasicPubsub {
* @returns {void}
*/
_publishFrom (peer: Peer, msg: InMessage): void {
this.score.deliverMessage(peer.id.toB58String(), msg as Message)
const id = peer.id.toB58String()
this.score.deliverMessage(id, msg as Message)
this.gossipTracer.deliverMessage(id, msg as Message)
const topics = msg.topicIDs

// If options.gossipIncoming is false, do NOT emit incoming messages to peers
Expand Down Expand Up @@ -401,15 +410,18 @@ class Gossipsub extends BasicPubsub {
*/
_processTopicValidatorResult (topic: string, peer: Peer, message: Message, result: unknown): boolean {
if (typeof result === 'string') {
const id = peer.id.toB58String()
// assume an extended topic validator result
switch (result) {
case ExtendedValidatorResult.accept:
return true
case ExtendedValidatorResult.reject:
this.score.rejectMessage(peer.id.toB58String(), message)
this.score.rejectMessage(id, message)
this.gossipTracer.rejectMessage(id, message)
return false
case ExtendedValidatorResult.ignore:
this.score.ignoreMessage(peer.id.toB58String(), message)
this.score.ignoreMessage(id, message)
this.gossipTracer.rejectMessage(id, message)
return false
}
}
Expand Down Expand Up @@ -492,6 +504,8 @@ class Gossipsub extends BasicPubsub {
iwantList = iwantList.slice(0, iask)
this.iasked.set(id, iasked + iask)

this.gossipTracer.addPromise(id, iwantList)

return {
messageIDs: iwantList
}
Expand Down Expand Up @@ -671,6 +685,17 @@ class Gossipsub extends BasicPubsub {
}
}

/**
* Apply penalties from broken IHAVE/IWANT promises
* @returns {void}
*/
_applyIwantPenalties (): void {
this.gossipTracer.getBrokenPromises().forEach((count, p) => {
this.log('peer %s didn\'t follow up in %d IWANT requests; adding penalty', p, count)
this.score.addPenalty(p, count)
})
}

/**
* Clear expired backoff expiries
* @returns {void}
Expand Down Expand Up @@ -756,6 +781,7 @@ class Gossipsub extends BasicPubsub {
this.iasked = new Map()
this.backoff = new Map()
this.outbound = new Map()
this.gossipTracer.clear()
clearTimeout(this._directPeerInitial)
}

Expand Down
99 changes: 99 additions & 0 deletions ts/tracer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Message } from './message'
import { GossipsubIWantFollowupTime } from './constants'

/**
* IWantTracer is an internal tracer that tracks IWANT requests in order to penalize
* peers who don't follow up on IWANT requests after an IHAVE advertisement.
* The tracking of promises is probabilistic to avoid using too much memory.
*
* Note: Do not confuse these 'promises' with JS Promise objects.
* These 'promises' are merely expectations of a peer's behavior.
*/
export class IWantTracer {
getMsgId: (msg: Message) => string
/**
* Promises to deliver a message
* Map per message id, per peer, promise expiration time
*/
promises: Map<string, Map<string, number>>
constructor (getMsgId: (msg: Message) => string) {
this.getMsgId = getMsgId
this.promises = new Map()
}

/**
* Track a promise to deliver a message from a list of msgIDs we are requesting
* @param {string} p peer id
* @param {string[]} msgIds
* @returns {void}
*/
addPromise (p: string, msgIds: string[]): void {
// pick msgId randomly from the list
const ix = Math.random() * msgIds.length
const msgId = msgIds[ix]

let peers = this.promises.get(msgId)
if (!peers) {
peers = new Map()
this.promises.set(msgId, peers)
}

if (!peers.has(p)) {
peers.set(p, Date.now() + GossipsubIWantFollowupTime)
}
}

/**
* Returns the number of broken promises for each peer who didn't follow up on an IWANT request.
* @returns {Map<string, number>}
*/
getBrokenPromises (): Map<string, number> {
const now = Date.now()
const result = new Map<string, number>()

this.promises.forEach((peers, msgId) => {
peers.forEach((expire, p) => {
// the promise has been broken
if (expire < now) {
// add 1 to result
result.set(p, (result.get(p) || 0) + 1)
// delete from tracked promises
peers.delete(p)
}
})
// clean up empty promises for a msgId
if (!peers.size) {
this.promises.delete(msgId)
}
})

return result
}

/**
* Someone delivered a message, stop tracking promises for it
* @param {string} p peer id
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we are not using the peerID

Copy link
Member Author

@wemeetagain wemeetagain Jul 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to have the same function signature as PeerScore#deliverMessage.
So that we might in the future abstract the different 'tracers' as in go-libp2p-pubsub
See https://github.com/libp2p/go-libp2p-pubsub/blob/master/pubsub.go#L972
Eg: they don't call each one individually:

this.score.deliverMessage(p, msg)
this.gossipTracer.deliverMessage(p, msg)
this.tagTracer.deliverMessage(p, msg)

They have some sort of registration for each of their tracers, then all tracers get called on a single call to deliverMessage.
Note the go-libp2p-pubsub tracer functionality is happening on the pubsub layer, not the gossipsub layer. Aggregating our "tracers" together would be a first step towards moving in that direction. Not saying thats what we should do, but aligning function signatures is a prereq.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, sounds good

* @param {Message} msg
* @returns {void}
*/
deliverMessage (p: string, msg: Message): void {
const msgId = this.getMsgId(msg)
this.promises.delete(msgId)
}

/**
* A message got rejected, so we can stop tracking promises and let the score penalty apply from invalid message delivery,
* unless its an obviously invalid message.
* @param {string} p peer id
* @param {Message} msg
* @returns {void}
*/
rejectMessage (p: string, msg: Message): void {
const msgId = this.getMsgId(msg)
this.promises.delete(msgId)
}

clear (): void {
this.promises.clear()
}
}