diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6d7477e0..7c3ff9e6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,13 @@
+
+## [0.4.5](https://github.com/ChainSafe/gossipsub-js/compare/v0.4.4...v0.4.5) (2020-06-04)
+
+
+### Bug Fixes
+
+* use unidirectional streams ([#78](https://github.com/ChainSafe/gossipsub-js/issues/78)) ([8118894](https://github.com/ChainSafe/gossipsub-js/commit/8118894))
+
+
+
## [0.4.4](https://github.com/ChainSafe/gossipsub-js/compare/v0.4.3...v0.4.4) (2020-06-03)
diff --git a/package.json b/package.json
index 51f1d9eb..b1a0fd6d 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "libp2p-gossipsub",
- "version": "0.4.4",
+ "version": "0.4.5",
"description": "A typescript implementation of gossipsub",
"leadMaintainer": "Cayman Nava ",
"main": "src/index.js",
@@ -38,10 +38,11 @@
"dependencies": {
"buffer": "^5.6.0",
"debug": "^4.1.1",
+ "denque": "^1.4.1",
"err-code": "^2.0.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.0.1",
- "libp2p-pubsub": "^0.5.0",
+ "libp2p-pubsub": "~0.5.2",
"p-map": "^4.0.0",
"peer-id": "~0.13.12",
"protons": "^1.0.1",
diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js
index a0a558b7..7c584efc 100644
--- a/test/2-nodes.spec.js
+++ b/test/2-nodes.spec.js
@@ -303,10 +303,8 @@ describe('2 nodes', () => {
} = await createGossipsubNodes(2, true))
})
- after(() => Promise.all(nodes.map((n) => n.stop())))
-
- it('existing subscriptions are sent upon peer connection', async function () {
- this.timeout(5000)
+ // Make subscriptions prior to connecting the nodes
+ before(() => {
nodes[0].subscribe('Za')
nodes[1].subscribe('Zb')
@@ -314,17 +312,42 @@ describe('2 nodes', () => {
expectSet(nodes[0].subscriptions, ['Za'])
expect(nodes[1].peers.size).to.equal(0)
expectSet(nodes[1].subscriptions, ['Zb'])
+ })
- // Connect nodes
- const onConnect0 = registrarRecords[0][multicodec].onConnect
- const onConnect1 = registrarRecords[1][multicodec].onConnect
+ after(() => Promise.all(nodes.map((n) => n.stop())))
- // Notice peers of connection
- const [d0, d1] = ConnectionPair()
- onConnect0(nodes[1].peerId, d0)
- onConnect1(nodes[0].peerId, d1)
+ it('existing subscriptions are sent upon peer connection', async function () {
+ this.timeout(5000)
+
+ const dial = async () => {
+ // Connect nodes
+ const onConnect0 = registrarRecords[0][multicodec].onConnect
+ const onConnect1 = registrarRecords[1][multicodec].onConnect
+ const handle0 = registrarRecords[0][multicodec].handler
+ const handle1 = registrarRecords[1][multicodec].handler
+
+ // Notice peers of connection
+ const [d0, d1] = ConnectionPair()
+ await onConnect0(nodes[1].peerId, d0)
+ await handle1({
+ protocol: multicodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: nodes[0].peerId
+ }
+ })
+ await onConnect1(nodes[0].peerId, d1)
+ await handle0({
+ protocol: multicodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: nodes[1].peerId
+ }
+ })
+ }
await Promise.all([
+ dial(),
new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)),
new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve))
])
diff --git a/test/floodsub.spec.js b/test/floodsub.spec.js
index 75e6ed16..831c1919 100644
--- a/test/floodsub.spec.js
+++ b/test/floodsub.spec.js
@@ -112,11 +112,27 @@ describe('gossipsub fallbacks to floodsub', () => {
const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
+ const handleGs = registrarRecords[0][floodsubMulticodec].handler
+ const handleFs = registrarRecords[1][floodsubMulticodec].handler
// Notice peers of connection
const [d0, d1] = ConnectionPair()
- onConnectGs(nodeFs.peerId, d0)
- onConnectFs(nodeGs.peerId, d1)
+ await onConnectGs(nodeFs.peerId, d0)
+ await handleFs({
+ protocol: floodsubMulticodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: nodeGs.peerId
+ }
+ })
+ await onConnectFs(nodeGs.peerId, d1)
+ await handleGs({
+ protocol: floodsubMulticodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: nodeFs.peerId
+ }
+ })
})
after(async function () {
@@ -167,11 +183,27 @@ describe('gossipsub fallbacks to floodsub', () => {
const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
+ const handleGs = registrarRecords[0][floodsubMulticodec].handler
+ const handleFs = registrarRecords[1][floodsubMulticodec].handler
// Notice peers of connection
const [d0, d1] = ConnectionPair()
- onConnectGs(nodeFs.peerId, d0)
- onConnectFs(nodeGs.peerId, d1)
+ await onConnectGs(nodeFs.peerId, d0)
+ await handleFs({
+ protocol: floodsubMulticodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: nodeGs.peerId
+ }
+ })
+ await onConnectFs(nodeGs.peerId, d1)
+ await handleGs({
+ protocol: floodsubMulticodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: nodeFs.peerId
+ }
+ })
nodeGs.subscribe(topic)
nodeFs.subscribe(topic)
@@ -288,11 +320,27 @@ describe('gossipsub fallbacks to floodsub', () => {
const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect
const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect
+ const handleGs = registrarRecords[0][floodsubMulticodec].handler
+ const handleFs = registrarRecords[1][floodsubMulticodec].handler
// Notice peers of connection
const [d0, d1] = ConnectionPair()
await onConnectGs(nodeFs.peerId, d0)
+ await handleFs({
+ protocol: floodsubMulticodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: nodeGs.peerId
+ }
+ })
await onConnectFs(nodeGs.peerId, d1)
+ await handleGs({
+ protocol: floodsubMulticodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: nodeFs.peerId
+ }
+ })
nodeGs.subscribe(topic)
nodeFs.subscribe(topic)
diff --git a/test/gossip.js b/test/gossip.js
index 3a6f0aec..cf001f67 100644
--- a/test/gossip.js
+++ b/test/gossip.js
@@ -32,7 +32,7 @@ describe('gossip', () => {
// add subscriptions to each node
nodes.forEach((n) => n.subscribe(topic))
- connectGossipsubNodes(nodes, registrarRecords, multicodec)
+ await connectGossipsubNodes(nodes, registrarRecords, multicodec)
await new Promise((resolve) => setTimeout(resolve, 1000))
@@ -69,7 +69,7 @@ describe('gossip', () => {
nodes.forEach((n) => n.subscribe(topic))
// every node connected to every other
- connectGossipsubNodes(nodes, registrarRecords, multicodec)
+ await connectGossipsubNodes(nodes, registrarRecords, multicodec)
await new Promise((resolve) => setTimeout(resolve, 500))
// await mesh rebalancing
await Promise.all(nodes.map((n) => new Promise((resolve) => n.once('gossipsub:heartbeat', resolve))))
diff --git a/test/mesh.spec.js b/test/mesh.spec.js
index b3de12fe..78ab89d2 100644
--- a/test/mesh.spec.js
+++ b/test/mesh.spec.js
@@ -35,15 +35,31 @@ describe('mesh overlay', () => {
// connect N (< GossipsubD) nodes to node0
const N = 4
const onConnect0 = registrarRecords[0][multicodec].onConnect
+ const handle0 = registrarRecords[0][multicodec].handler
for (let i = nodes.length; i > nodes.length - N; i--) {
const n = i - 1
const onConnectN = registrarRecords[n][multicodec].onConnect
+ const handleN = registrarRecords[n][multicodec].handler
// Notice peers of connection
const [d0, d1] = ConnectionPair()
- onConnect0(nodes[n].peerId, d0)
- onConnectN(nodes[0].peerId, d1)
+ await onConnect0(nodes[n].peerId, d0)
+ await handleN({
+ protocol: multicodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: nodes[0].peerId
+ }
+ })
+ await onConnectN(nodes[0].peerId, d1)
+ await handle0({
+ protocol: multicodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: nodes[n].peerId
+ }
+ })
}
// await mesh rebalancing
diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js
index 7159eb09..9b7bc6e1 100644
--- a/test/multiple-nodes.spec.js
+++ b/test/multiple-nodes.spec.js
@@ -38,15 +38,46 @@ describe('multiple nodes (more than 2)', () => {
const onConnectA = registrarRecords[0][multicodec].onConnect
const onConnectB = registrarRecords[1][multicodec].onConnect
const onConnectC = registrarRecords[2][multicodec].onConnect
+ const handleA = registrarRecords[0][multicodec].handler
+ const handleB = registrarRecords[1][multicodec].handler
+ const handleC = registrarRecords[2][multicodec].handler
// Notice peers of connection
const [d0, d1] = ConnectionPair()
- onConnectA(b.peerId, d0)
- onConnectB(a.peerId, d1)
+ await onConnectA(b.peerId, d0)
+ await handleB({
+ protocol: multicodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: a.peerId
+ }
+ })
+ await onConnectB(a.peerId, d1)
+ await handleA({
+ protocol: multicodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: b.peerId
+ }
+ })
const [d2, d3] = ConnectionPair()
- onConnectB(c.peerId, d2)
- onConnectC(b.peerId, d3)
+ await onConnectB(c.peerId, d2)
+ await handleC({
+ protocol: multicodec,
+ stream: d3.stream,
+ connection: {
+ remotePeer: b.peerId
+ }
+ })
+ await onConnectC(b.peerId, d3)
+ await handleB({
+ protocol: multicodec,
+ stream: d2.stream,
+ connection: {
+ remotePeer: c.peerId
+ }
+ })
})
after(() => Promise.all(nodes.map((n) => n.stop())))
@@ -103,15 +134,46 @@ describe('multiple nodes (more than 2)', () => {
const onConnectA = registrarRecords[0][multicodec].onConnect
const onConnectB = registrarRecords[1][multicodec].onConnect
const onConnectC = registrarRecords[2][multicodec].onConnect
+ const handleA = registrarRecords[0][multicodec].handler
+ const handleB = registrarRecords[1][multicodec].handler
+ const handleC = registrarRecords[2][multicodec].handler
// Notice peers of connection
const [d0, d1] = ConnectionPair()
- onConnectA(b.peerId, d0)
- onConnectB(a.peerId, d1)
+ await onConnectA(b.peerId, d0)
+ await handleB({
+ protocol: multicodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: a.peerId
+ }
+ })
+ await onConnectB(a.peerId, d1)
+ await handleA({
+ protocol: multicodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: b.peerId
+ }
+ })
const [d2, d3] = ConnectionPair()
- onConnectB(c.peerId, d2)
- onConnectC(b.peerId, d3)
+ await onConnectB(c.peerId, d2)
+ await handleC({
+ protocol: multicodec,
+ stream: d3.stream,
+ connection: {
+ remotePeer: b.peerId
+ }
+ })
+ await onConnectC(b.peerId, d3)
+ await handleB({
+ protocol: multicodec,
+ stream: d2.stream,
+ connection: {
+ remotePeer: c.peerId
+ }
+ })
a.subscribe(topic)
b.subscribe(topic)
@@ -198,15 +260,46 @@ describe('multiple nodes (more than 2)', () => {
const onConnectA = registrarRecords[0][multicodec].onConnect
const onConnectB = registrarRecords[1][multicodec].onConnect
const onConnectC = registrarRecords[2][multicodec].onConnect
+ const handleA = registrarRecords[0][multicodec].handler
+ const handleB = registrarRecords[1][multicodec].handler
+ const handleC = registrarRecords[2][multicodec].handler
// Notice peers of connection
const [d0, d1] = ConnectionPair()
- onConnectA(b.peerId, d0)
- onConnectB(a.peerId, d1)
+ await onConnectA(b.peerId, d0)
+ await handleB({
+ protocol: multicodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: a.peerId
+ }
+ })
+ await onConnectB(a.peerId, d1)
+ await handleA({
+ protocol: multicodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: b.peerId
+ }
+ })
const [d2, d3] = ConnectionPair()
- onConnectB(c.peerId, d2)
- onConnectC(b.peerId, d3)
+ await onConnectB(c.peerId, d2)
+ await handleC({
+ protocol: multicodec,
+ stream: d3.stream,
+ connection: {
+ remotePeer: b.peerId
+ }
+ })
+ await onConnectC(b.peerId, d3)
+ await handleB({
+ protocol: multicodec,
+ stream: d2.stream,
+ connection: {
+ remotePeer: c.peerId
+ }
+ })
a.subscribe(topic)
b.subscribe(topic)
@@ -264,22 +357,84 @@ describe('multiple nodes (more than 2)', () => {
const onConnectD = registrarRecords[3][multicodec].onConnect
const onConnectE = registrarRecords[4][multicodec].onConnect
+ const handleA = registrarRecords[0][multicodec].handler
+ const handleB = registrarRecords[1][multicodec].handler
+ const handleC = registrarRecords[2][multicodec].handler
+ const handleD = registrarRecords[3][multicodec].handler
+ const handleE = registrarRecords[4][multicodec].handler
+
// Notice peers of connection
const [d0, d1] = ConnectionPair()
- onConnectA(b.peerId, d0)
- onConnectB(a.peerId, d1)
+ await onConnectA(b.peerId, d0)
+ await handleB({
+ protocol: multicodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: a.peerId
+ }
+ })
+ await onConnectB(a.peerId, d1)
+ await handleA({
+ protocol: multicodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: b.peerId
+ }
+ })
const [d2, d3] = ConnectionPair()
- onConnectB(c.peerId, d2)
- onConnectC(b.peerId, d3)
+ await onConnectB(c.peerId, d2)
+ await handleC({
+ protocol: multicodec,
+ stream: d3.stream,
+ connection: {
+ remotePeer: b.peerId
+ }
+ })
+ await onConnectC(b.peerId, d3)
+ await handleB({
+ protocol: multicodec,
+ stream: d2.stream,
+ connection: {
+ remotePeer: c.peerId
+ }
+ })
const [d4, d5] = ConnectionPair()
- onConnectC(d.peerId, d4)
- onConnectD(c.peerId, d5)
+ await onConnectC(d.peerId, d4)
+ await handleD({
+ protocol: multicodec,
+ stream: d5.stream,
+ connection: {
+ remotePeer: c.peerId
+ }
+ })
+ await onConnectD(c.peerId, d5)
+ await handleC({
+ protocol: multicodec,
+ stream: d4.stream,
+ connection: {
+ remotePeer: d.peerId
+ }
+ })
const [d6, d7] = ConnectionPair()
- onConnectD(e.peerId, d6)
- onConnectE(d.peerId, d7)
+ await onConnectD(e.peerId, d6)
+ await handleE({
+ protocol: multicodec,
+ stream: d7.stream,
+ connection: {
+ remotePeer: d.peerId
+ }
+ })
+ await onConnectE(d.peerId, d7)
+ await handleD({
+ protocol: multicodec,
+ stream: d6.stream,
+ connection: {
+ remotePeer: e.peerId
+ }
+ })
a.subscribe(topic)
b.subscribe(topic)
diff --git a/test/peerScore.spec.js b/test/peerScore.spec.js
new file mode 100644
index 00000000..bdd98ee6
--- /dev/null
+++ b/test/peerScore.spec.js
@@ -0,0 +1,719 @@
+const { expect } = require('chai')
+const PeerId = require('peer-id')
+const { utils } = require('libp2p-pubsub')
+const { PeerScore, createPeerScoreParams, createTopicScoreParams } = require('../src/score')
+
+const addrBook = new Map()
+addrBook.getMultiaddrsForPeer = () => ([])
+
+const makeTestMessage = (i) => {
+ return {
+ seqno: Buffer.alloc(8, i),
+ data: Buffer.from([i]),
+ from: "test",
+ topicIDs: []
+ }
+}
+
+describe('PeerScore', () => {
+ it('should score based on time in mesh', async () => {
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 0.5,
+ timeInMeshWeight: 1,
+ timeInMeshQuantum: 1,
+ timeInMeshCap: 3600,
+ invalidMessageDeliveriesDecay: 0.1,
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ // Peer score should start at 0
+ const ps = new PeerScore(params, addrBook, utils.msgId)
+ ps.addPeer(peerA)
+
+ let aScore = ps.score(peerA)
+ expect(aScore, 'expected score to start at zero').to.equal(0)
+
+ // The time in mesh depends on how long the peer has been grafted
+ ps.graft(peerA, mytopic)
+ const elapsed = tparams.timeInMeshQuantum * 100
+ await new Promise(resolve => setTimeout(resolve, elapsed))
+
+ ps._refreshScores()
+ aScore = ps.score(peerA)
+ expect(aScore).to.be.gte(
+ tparams.topicWeight * tparams.timeInMeshWeight / tparams.timeInMeshQuantum * elapsed
+ )
+ })
+ it('should cap time in mesh score', async () => {
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 0.5,
+ timeInMeshWeight: 1,
+ timeInMeshQuantum: 1,
+ timeInMeshCap: 10,
+ invalidMessageDeliveriesDecay: 0.1,
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ // Peer score should start at 0
+ const ps = new PeerScore(params, addrBook, utils.msgId)
+ ps.addPeer(peerA)
+
+ let aScore = ps.score(peerA)
+ expect(aScore, 'expected score to start at zero').to.equal(0)
+
+ // The time in mesh depends on how long the peer has been grafted
+ ps.graft(peerA, mytopic)
+ const elapsed = tparams.timeInMeshQuantum * 40
+ await new Promise(resolve => setTimeout(resolve, elapsed))
+
+ ps._refreshScores()
+ aScore = ps.score(peerA)
+ expect(aScore).to.be.gt(
+ tparams.topicWeight * tparams.timeInMeshWeight * tparams.timeInMeshCap * 0.5
+ )
+ expect(aScore).to.be.lt(
+ tparams.topicWeight * tparams.timeInMeshWeight * tparams.timeInMeshCap * 1.5
+ )
+ })
+ it('should score first message deliveries', async () => {
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: 0.9,
+ invalidMessageDeliveriesDecay: 0.9,
+ firstMessageDeliveriesCap: 50000,
+ timeInMeshQuantum: 1000
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ // Peer score should start at 0
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+
+ let aScore = ps.score(peerA)
+ expect(aScore, 'expected score to start at zero').to.equal(0)
+
+ // The time in mesh depends on how long the peer has been grafted
+ ps.graft(peerA, mytopic)
+
+ // deliver a bunch of messages from peer A
+ const nMessages = 100
+ for (let i = 0; i < nMessages; i++) {
+ const msg = makeTestMessage(i)
+ msg.topicIDs = [mytopic]
+ ps.validateMessage(peerA, msg)
+ ps.deliverMessage(peerA, msg)
+ }
+
+ ps._refreshScores()
+ aScore = ps.score(peerA)
+ expect(aScore).to.be.equal(
+ tparams.topicWeight * tparams.firstMessageDeliveriesWeight * nMessages * tparams.firstMessageDeliveriesDecay
+ )
+ })
+ it('should cap first message deliveries score', async () => {
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: 0.9,
+ invalidMessageDeliveriesDecay: 0.9,
+ firstMessageDeliveriesCap: 50,
+ timeInMeshQuantum: 1000
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ // Peer score should start at 0
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+
+ let aScore = ps.score(peerA)
+ expect(aScore, 'expected score to start at zero').to.equal(0)
+
+ // The time in mesh depends on how long the peer has been grafted
+ ps.graft(peerA, mytopic)
+
+ // deliver a bunch of messages from peer A
+ const nMessages = 100
+ for (let i = 0; i < nMessages; i++) {
+ const msg = makeTestMessage(i)
+ msg.topicIDs = [mytopic]
+ ps.validateMessage(peerA, msg)
+ ps.deliverMessage(peerA, msg)
+ }
+
+ ps._refreshScores()
+ aScore = ps.score(peerA)
+ expect(aScore).to.be.equal(
+ tparams.topicWeight * tparams.firstMessageDeliveriesWeight * tparams.firstMessageDeliveriesCap * tparams.firstMessageDeliveriesDecay
+ )
+ })
+ it('should decay first message deliveries score', async () => {
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: 0.9, // decay 10% per decay interval
+ invalidMessageDeliveriesDecay: 0.9,
+ firstMessageDeliveriesCap: 50,
+ timeInMeshQuantum: 1000
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ // Peer score should start at 0
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+
+ let aScore = ps.score(peerA)
+ expect(aScore, 'expected score to start at zero').to.equal(0)
+
+ // The time in mesh depends on how long the peer has been grafted
+ ps.graft(peerA, mytopic)
+
+ // deliver a bunch of messages from peer A
+ const nMessages = 100
+ for (let i = 0; i < nMessages; i++) {
+ const msg = makeTestMessage(i)
+ msg.topicIDs = [mytopic]
+ ps.validateMessage(peerA, msg)
+ ps.deliverMessage(peerA, msg)
+ }
+
+ ps._refreshScores()
+ aScore = ps.score(peerA)
+ let expected = tparams.topicWeight * tparams.firstMessageDeliveriesWeight * tparams.firstMessageDeliveriesCap * tparams.firstMessageDeliveriesDecay
+ expect(aScore).to.be.equal(expected)
+
+ // refreshing the scores applies the decay param
+ const decayIntervals = 10
+ for (let i = 0; i < decayIntervals; i++) {
+ ps._refreshScores()
+ expected *= tparams.firstMessageDeliveriesDecay
+ }
+ aScore = ps.score(peerA)
+ expect(aScore).to.be.equal(expected)
+ })
+ it('should score mesh message deliveries', async function () {
+ this.timeout(10000)
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesActivation: 1000,
+ meshMessageDeliveriesWindow: 10,
+ meshMessageDeliveriesThreshold: 20,
+ meshMessageDeliveriesCap: 100,
+ meshMessageDeliveriesDecay: 0.9,
+ invalidMessageDeliveriesDecay: 0.9,
+ firstMessageDeliveriesWeight: 0,
+ timeInMeshQuantum: 1000
+ })
+ // peer A always delivers the message first
+ // peer B delivers next (within the delivery window)
+ // peer C delivers outside the delivery window
+ // we expect peers A and B to have a score of zero, since all other param weights are zero
+ // peer C should have a negative score
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ const peerB = await PeerId.create({keyType: 'secp256k1'})
+ const peerC = await PeerId.create({keyType: 'secp256k1'})
+ const peers = [peerA, peerB, peerC]
+ // Peer score should start at 0
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ peers.forEach(p => {
+ ps.addPeer(p)
+ ps.graft(p, mytopic)
+ })
+
+ // assert that nobody has been penalized yet for not delivering messages before activation time
+ ps._refreshScores()
+ peers.forEach(p => {
+ const score = ps.score(p)
+ expect(
+ score,
+ 'expected no mesh delivery penalty before activation time'
+ ).to.equal(0)
+ })
+ // wait for the activation time to kick in
+ await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation))
+
+ // deliver a bunch of messages from peers
+ const nMessages = 100
+ for (let i = 0; i < nMessages; i++) {
+ const msg = makeTestMessage(i)
+ msg.topicIDs = [mytopic]
+
+ ps.validateMessage(peerA, msg)
+ ps.deliverMessage(peerA, msg)
+
+ ps.duplicateMessage(peerB, msg)
+
+ // deliver duplicate from peer C after the window
+ await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesWindow + 5))
+ ps.duplicateMessage(peerC, msg)
+ }
+ ps._refreshScores()
+ const aScore = ps.score(peerA)
+ const bScore = ps.score(peerB)
+ const cScore = ps.score(peerC)
+ expect(aScore).to.be.gte(0)
+ expect(bScore).to.be.gte(0)
+
+ // the penalty is the difference between the threshold and the actual mesh deliveries, squared.
+ // since we didn't deliver anything, this is just the value of the threshold
+ const penalty = tparams.meshMessageDeliveriesThreshold * tparams.meshMessageDeliveriesThreshold
+ const expected = tparams.topicWeight * tparams.meshMessageDeliveriesWeight * penalty
+ expect(cScore).to.be.equal(expected)
+ })
+ it('should decay mesh message deliveries score', async function () {
+ this.timeout(10000)
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesActivation: 1000,
+ meshMessageDeliveriesWindow: 10,
+ meshMessageDeliveriesThreshold: 20,
+ meshMessageDeliveriesCap: 100,
+ meshMessageDeliveriesDecay: 0.9,
+ invalidMessageDeliveriesDecay: 0.9,
+ firstMessageDeliveriesWeight: 0,
+ timeInMeshQuantum: 1000
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ // Peer score should start at 0
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+ ps.graft(peerA, mytopic)
+
+ // wait for the activation time to kick in
+ await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation + 10))
+
+ // deliver a bunch of messages from peer A
+ const nMessages = 40
+ for (let i = 0; i < nMessages; i++) {
+ const msg = makeTestMessage(i)
+ msg.topicIDs = [mytopic]
+
+ ps.validateMessage(peerA, msg)
+ ps.deliverMessage(peerA, msg)
+ }
+ ps._refreshScores()
+ let aScore = ps.score(peerA)
+ expect(aScore).to.be.gte(0)
+
+ // we need to refresh enough times for the decay to bring us below the threshold
+ let decayedDeliveryCount = nMessages * tparams.meshMessageDeliveriesDecay
+ for (let i = 0; i < 20; i++) {
+ ps._refreshScores()
+ decayedDeliveryCount *= tparams.meshMessageDeliveriesDecay
+ }
+ aScore = ps.score(peerA)
+ // the penalty is the difference between the threshold and the (decayed) mesh deliveries, squared.
+ const deficit = tparams.meshMessageDeliveriesThreshold - decayedDeliveryCount
+ const penalty = deficit * deficit
+ const expected = tparams.topicWeight * tparams.meshMessageDeliveriesWeight * penalty
+ expect(aScore).to.be.equal(expected)
+ })
+ it('should score mesh message failures', async function () {
+ this.timeout(10000)
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ // the mesh failure penalty is applied when a peer is pruned while their
+ // mesh deliveries are under the threshold.
+ // for this test, we set the mesh delivery threshold, but set
+ // meshMessageDeliveriesWeight to zero, so the only effect on the score
+ // is from the mesh failure penalty
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ meshFailurePenaltyWeight: -1,
+ meshFailurePenaltyDecay: 0.9,
+
+ meshMessageDeliveriesWeight: 0,
+ meshMessageDeliveriesActivation: 1000,
+ meshMessageDeliveriesWindow: 10,
+ meshMessageDeliveriesThreshold: 20,
+ meshMessageDeliveriesCap: 100,
+ meshMessageDeliveriesDecay: 0.9,
+
+ invalidMessageDeliveriesDecay: 0.9,
+ firstMessageDeliveriesWeight: 0,
+ timeInMeshQuantum: 1000
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ const peerB = await PeerId.create({keyType: 'secp256k1'})
+ const peers = [peerA, peerB]
+ // Peer score should start at 0
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ peers.forEach(p => {
+ ps.addPeer(p)
+ ps.graft(p, mytopic)
+ })
+
+ // wait for the activation time to kick in
+ await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation + 10))
+
+ // deliver a bunch of messages from peer A. peer B does nothing
+ const nMessages = 100
+ for (let i = 0; i < nMessages; i++) {
+ const msg = makeTestMessage(i)
+ msg.topicIDs = [mytopic]
+
+ ps.validateMessage(peerA, msg)
+ ps.deliverMessage(peerA, msg)
+ }
+ // peers A and B should both have zero scores, since the failure penalty hasn't been applied yet
+ ps._refreshScores()
+ let aScore = ps.score(peerA)
+ let bScore = ps.score(peerB)
+ expect(aScore).to.be.equal(0)
+ expect(bScore).to.be.equal(0)
+
+ // prune peer B to apply the penalty
+ ps.prune(peerB, mytopic)
+ ps._refreshScores()
+ aScore = ps.score(peerA)
+ bScore = ps.score(peerB)
+ expect(aScore).to.be.equal(0)
+
+ // penalty calculation is the same as for meshMessageDeliveries, but multiplied by meshFailurePenaltyWeight
+ // instead of meshMessageDeliveriesWeight
+ const penalty = tparams.meshMessageDeliveriesThreshold * tparams.meshMessageDeliveriesThreshold
+ const expected = tparams.topicWeight * tparams.meshFailurePenaltyWeight * penalty * tparams.meshFailurePenaltyDecay
+ expect(bScore).to.be.equal(expected)
+ })
+ it('should score invalid message deliveries', async function () {
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ invalidMessageDeliveriesWeight: -1,
+ invalidMessageDeliveriesDecay: 0.9,
+ timeInMeshQuantum: 1000
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+ ps.graft(peerA, mytopic)
+
+ // deliver a bunch of messages from peer A
+ const nMessages = 100
+ for (let i = 0; i < nMessages; i++) {
+ const msg = makeTestMessage(i)
+ msg.topicIDs = [mytopic]
+
+ ps.rejectMessage(peerA, msg)
+ }
+ ps._refreshScores()
+ let aScore = ps.score(peerA)
+
+ const expected = tparams.topicWeight * tparams.invalidMessageDeliveriesWeight * (nMessages * tparams.invalidMessageDeliveriesDecay) ** 2
+ expect(aScore).to.be.equal(expected)
+ })
+ it('should decay invalid message deliveries score', async function () {
+ // Create parameters with reasonable default values
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ invalidMessageDeliveriesWeight: -1,
+ invalidMessageDeliveriesDecay: 0.9,
+ timeInMeshQuantum: 1000
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+ ps.graft(peerA, mytopic)
+
+ // deliver a bunch of messages from peer A
+ const nMessages = 100
+ for (let i = 0; i < nMessages; i++) {
+ const msg = makeTestMessage(i)
+ msg.topicIDs = [mytopic]
+
+ ps.rejectMessage(peerA, msg)
+ }
+ ps._refreshScores()
+ let aScore = ps.score(peerA)
+
+ let expected = tparams.topicWeight * tparams.invalidMessageDeliveriesWeight * (nMessages * tparams.invalidMessageDeliveriesDecay) ** 2
+ expect(aScore).to.be.equal(expected)
+
+ // refresh scores a few times to apply decay
+ for (let i = 0; i < 10; i++) {
+ ps._refreshScores()
+ expected *= tparams.invalidMessageDeliveriesDecay ** 2
+ }
+ aScore = ps.score(peerA)
+ expect(aScore).to.be.equal(expected)
+ })
+ it('should score invalid/ignored messages', async function () {
+ // this test adds coverage for the dark corners of message rejection
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const tparams = params.topics[mytopic] = createTopicScoreParams({
+ topicWeight: 1,
+ invalidMessageDeliveriesWeight: -1,
+ invalidMessageDeliveriesDecay: 0.9,
+ timeInMeshQuantum: 1000
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ const peerB = await PeerId.create({keyType: 'secp256k1'})
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+ ps.addPeer(peerB)
+
+ const msg = makeTestMessage(0)
+ msg.topicIDs = [mytopic]
+
+ // insert a record
+ ps.validateMessage(peerA, msg)
+
+ // this should have no effect on the score, and subsequent duplicate messages should have no effect either
+ ps.ignoreMessage(peerA, msg)
+ ps.duplicateMessage(peerB, msg)
+
+ let aScore = ps.score(peerA)
+ let bScore = ps.score(peerB)
+ let expected = 0
+ expect(aScore).to.equal(expected)
+ expect(bScore).to.equal(expected)
+
+ // now clear the delivery record
+ ps.deliveryRecords.queue.peekFront().expire = Date.now()
+ await new Promise(resolve => setTimeout(resolve, 5))
+ ps.deliveryRecords.gc()
+
+ // insert a new record in the message deliveries
+ ps.validateMessage(peerA, msg)
+
+ // and reject the message to make sure duplicates are also penalized
+ ps.rejectMessage(peerA, msg)
+ ps.duplicateMessage(peerB, msg)
+
+ aScore = ps.score(peerA)
+ bScore = ps.score(peerB)
+ expected = -1
+ expect(aScore).to.equal(expected)
+ expect(bScore).to.equal(expected)
+
+ // now clear the delivery record again
+ ps.deliveryRecords.queue.peekFront().expire = Date.now()
+ await new Promise(resolve => setTimeout(resolve, 5))
+ ps.deliveryRecords.gc()
+
+ // insert a new record in the message deliveries
+ ps.validateMessage(peerA, msg)
+
+ // and reject the message after a duplicate has arrived
+ ps.duplicateMessage(peerB, msg)
+ ps.rejectMessage(peerA, msg)
+
+ aScore = ps.score(peerA)
+ bScore = ps.score(peerB)
+ expected = -4
+ expect(aScore).to.equal(expected)
+ expect(bScore).to.equal(expected)
+ })
+ it('should score w/ application score', async function () {
+ const mytopic = 'mytopic'
+ let appScoreValue = 0
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ appSpecificScore: () => appScoreValue,
+ appSpecificWeight: 0.5,
+ decayToZero: 0.1
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+ ps.graft(peerA, mytopic)
+
+ for (let i = -100; i < 100; i++) {
+ appScoreValue = i
+ ps._refreshScores()
+ const aScore = ps.score(peerA)
+ const expected = i * params.appSpecificWeight
+ expect(aScore).to.equal(expected)
+ }
+ })
+ it('should score w/ IP colocation', async function () {
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ invalidMessageDeliveriesDecay: 0.1,
+ IPColocationFactorThreshold: 1,
+ IPColocationFactorWeight: -1,
+ decayToZero: 0.1
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+ const peerB = await PeerId.create({keyType: 'secp256k1'})
+ const peerC = await PeerId.create({keyType: 'secp256k1'})
+ const peerD = await PeerId.create({keyType: 'secp256k1'})
+ const peers = [peerA, peerB, peerC, peerD]
+
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ peers.forEach(p => {
+ ps.addPeer(p)
+ ps.graft(p, mytopic)
+ })
+
+ const setIPsForPeer = (p, ips) => {
+ ps._setIPs(p, ips, [])
+ const pstats = ps.peerStats.get(p)
+ pstats.ips = ips
+ }
+ // peerA should have no penalty, but B, C, and D should be penalized for sharing an IP
+ setIPsForPeer(peerA, ['1.2.3.4'])
+ setIPsForPeer(peerB, ['2.3.4.5'])
+ setIPsForPeer(peerC, ['2.3.4.5', '3.4.5.6'])
+ setIPsForPeer(peerD, ['2.3.4.5'])
+
+ ps._refreshScores()
+ const aScore = ps.score(peerA)
+ const bScore = ps.score(peerB)
+ const cScore = ps.score(peerC)
+ const dScore = ps.score(peerD)
+
+ expect(aScore).to.equal(0)
+
+ const nShared = 3
+ const ipSurplus = nShared - params.IPColocationFactorThreshold
+ const penalty = ipSurplus ** 2
+ const expected = params.IPColocationFactorWeight * penalty
+ expect(bScore).to.equal(expected)
+ expect(cScore).to.equal(expected)
+ expect(dScore).to.equal(expected)
+ })
+ it('should score w/ behavior penalty', async function () {
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ behaviourPenaltyWeight: -1,
+ behaviourPenaltyDecay: 0.99,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+
+ // add penalty on a non-existent peer
+ ps.addPenalty(peerA, 1)
+ let aScore = ps.score(peerA)
+ expect(aScore).to.equal(0)
+
+ // add the peer and test penalties
+ ps.addPeer(peerA)
+
+ aScore = ps.score(peerA)
+ expect(aScore).to.equal(0)
+
+ ps.addPenalty(peerA, 1)
+ aScore = ps.score(peerA)
+ expect(aScore).to.equal(-1)
+
+ ps.addPenalty(peerA, 1)
+ aScore = ps.score(peerA)
+ expect(aScore).to.equal(-4)
+
+ ps._refreshScores()
+
+ aScore = ps.score(peerA)
+ expect(aScore).to.equal(-3.9204)
+ })
+ it('should handle score retention', async function () {
+ const mytopic = 'mytopic'
+ const params = createPeerScoreParams({
+ decayInterval: 1000,
+ appSpecificScore: () => -1000,
+ appSpecificWeight: 1,
+ invalidMessageDeliveriesDecay: 0.1,
+ decayToZero: 0.1,
+ retainScore: 800,
+ })
+ const peerA = await PeerId.create({keyType: 'secp256k1'})
+
+ const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno))
+ ps.addPeer(peerA)
+ ps.graft(peerA, mytopic)
+
+ // score should equal -1000 (app-specific score)
+ const expected = -1000
+ ps._refreshScores()
+ let aScore = ps.score(peerA)
+ expect(aScore).to.equal(expected)
+
+ // disconnect & wait half of the retainScoreTime
+ // should still have negative score
+ ps.removePeer(peerA)
+ const delay = params.retainScore / 2
+ await new Promise(resolve => setTimeout(resolve, delay))
+ ps._refreshScores()
+ aScore = ps.score(peerA)
+ expect(aScore).to.equal(expected)
+
+ // wait remaining time (plus a little slop) and the score should reset to 0
+ await new Promise(resolve => setTimeout(resolve, delay + 5))
+ ps._refreshScores()
+ aScore = ps.score(peerA)
+ expect(aScore).to.equal(0)
+ })
+})
diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js
index d9991353..4b5bd93b 100644
--- a/test/pubsub.spec.js
+++ b/test/pubsub.spec.js
@@ -6,12 +6,10 @@ const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const sinon = require('sinon')
-const pWaitFor = require('p-wait-for')
const { utils } = require('libp2p-pubsub')
const {
createGossipsub,
- createPeerId,
mockRegistrar
} = require('./utils')
@@ -123,36 +121,6 @@ describe('Pubsub', () => {
})
})
- describe('process', () => {
- it('should disconnect peer on stream error', async () => {
- sinon.spy(gossipsub, '_onPeerDisconnected')
-
- const peerId = await createPeerId()
- const mockConn = {
- newStream () {
- return {
- stream: {
- sink: async source => {
- for await (const _ of source) { // eslint-disable-line no-unused-vars
- // mock stream just swallows any data sent
- }
- },
- source: (async function * () { // eslint-disable-line require-yield
- // throw in a bit
- await new Promise(resolve => setTimeout(resolve, 100))
- throw new Error('boom')
- })()
- }
- }
- }
- }
-
- gossipsub._onPeerConnected(peerId, mockConn)
-
- await pWaitFor(() => gossipsub._onPeerDisconnected.calledWith(peerId), { timeout: 1000 })
- })
- })
-
describe('topic validators', () => {
it('should filter messages by topic validator', async () => {
// use processRpcMessage.callCount to see if a message is valid or not
diff --git a/test/scoreParams.spec.js b/test/scoreParams.spec.js
new file mode 100644
index 00000000..d7c9e8ce
--- /dev/null
+++ b/test/scoreParams.spec.js
@@ -0,0 +1,419 @@
+const { expect } = require('chai')
+const {
+ createPeerScoreThresholds, validatePeerScoreThresholds,
+ createTopicScoreParams, validateTopicScoreParams,
+ createPeerScoreParams, validatePeerScoreParams
+} = require('../src/score/scoreParams')
+
+describe('PeerScoreThresholds validation', () => {
+ it('should throw on invalid PeerScoreThresholds', () => {
+ expect(() => validatePeerScoreThresholds(
+ createPeerScoreThresholds({
+ gossipThreshold: 1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreThresholds(
+ createPeerScoreThresholds({
+ publishThreshold: 1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreThresholds(
+ createPeerScoreThresholds({
+ gossipThreshold: -1,
+ publishThreshold: 0
+ })
+ )).to.throw
+ expect(() => validatePeerScoreThresholds(
+ createPeerScoreThresholds({
+ gossipThreshold: -1,
+ publishThreshold: -2
+ })
+ )).to.throw
+ expect(() => validatePeerScoreThresholds(
+ createPeerScoreThresholds({
+ acceptPXThreshold: -1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreThresholds(
+ createPeerScoreThresholds({
+ opportunisticGraftThreshold: -1
+ })
+ )).to.throw
+ })
+ it('should not throw on valid PeerScoreThresholds', () => {
+ expect(() => validatePeerScoreThresholds(
+ createPeerScoreThresholds({
+ gossipThreshold: -1,
+ publishThreshold: -2,
+ graylistThreshold: -3,
+ acceptPXThreshold: 1,
+ opportunisticGraftThreshold: 2
+ })
+ )).to.not.throw
+ })
+})
+
+describe('TopicScoreParams validation', () => {
+ it('should throw on invalid TopicScoreParams', () => {
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ topicWeight: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshWeight: -1,
+ timeInMeshQuantum: 1000
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshWeight: 1,
+ timeInMeshQuantum: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshWeight: 1,
+ timeInMeshQuantum: 1000,
+ timeInMeshCap: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ firstMessageDeliveriesWeight: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: 2
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: 0.5,
+ firstMessageDeliveriesCap: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshMessageDeliveriesWeight: 1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: 2
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: 0.5,
+ meshMessageDeliveriesCap: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: 0.5,
+        meshMessageDeliveriesCap: 5,
+ meshMessageDeliveriesThreshold: -3
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: 0.5,
+        meshMessageDeliveriesCap: 5,
+ meshMessageDeliveriesThreshold: 3,
+ meshMessageDeliveriesWindow: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: 0.5,
+        meshMessageDeliveriesCap: 5,
+ meshMessageDeliveriesThreshold: 3,
+ meshMessageDeliveriesWindow: 1,
+ meshMessageDeliveriesActivation: 1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshFailurePenaltyWeight: 1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshFailurePenaltyWeight: -1,
+ meshFailurePenaltyDecay: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ meshFailurePenaltyWeight: -1,
+ meshFailurePenaltyDecay: 2
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ invalidMessageDeliveriesWeight: 1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ invalidMessageDeliveriesWeight: -1,
+ invalidMessageDeliveriesDecay: -1
+ })
+ )).to.throw
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ timeInMeshQuantum: 1000,
+ invalidMessageDeliveriesWeight: -1,
+ invalidMessageDeliveriesDecay: 2
+ })
+ )).to.throw
+ })
+ it('should not throw on valid TopicScoreParams', () => {
+ expect(() => validateTopicScoreParams(
+ createTopicScoreParams({
+ topicWeight: 2,
+ timeInMeshWeight: 0.01,
+ timeInMeshQuantum: 1000,
+ timeInMeshCap: 10,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: 0.5,
+ firstMessageDeliveriesCap: 10,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: 0.5,
+ meshMessageDeliveriesCap: 10,
+ meshMessageDeliveriesThreshold: 5,
+ meshMessageDeliveriesWindow: 1,
+ meshMessageDeliveriesActivation: 1000,
+ meshFailurePenaltyWeight: -1,
+ meshFailurePenaltyDecay: 0.5,
+ invalidMessageDeliveriesWeight: -1,
+ invalidMessageDeliveriesDecay: 0.5
+ })
+ )).to.not.throw
+ })
+})
+
+describe('PeerScoreParams validation', () => {
+ const appScore = () => 0
+
+ it('should throw on invalid PeerScoreParams', () => {
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: -1,
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ decayInterval: 1000,
+ decayToZero: 0.01
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ IPColocationFactorWeight: 1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ IPColocationFactorWeight: -1,
+ IPColocationFactorThreshold: -1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ IPColocationFactorWeight: -1,
+ IPColocationFactorThreshold: 1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: -1,
+ IPColocationFactorWeight: -1,
+ IPColocationFactorThreshold: 1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 2,
+ IPColocationFactorWeight: -1,
+ IPColocationFactorThreshold: 1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ behaviourPenaltyWeight: 1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ behaviourPenaltyWeight: -1
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ behaviourPenaltyWeight: -1,
+ behaviourPenaltyDecay: 2
+ })
+ )).to.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ IPColocationFactorWeight: -1,
+ IPColocationFactorThreshold: 1,
+ topics: {
+ test: {
+ topicWeight: -1,
+ timeInMeshWeight: 0.01,
+          timeInMeshQuantum: 1000,
+ timeInMeshCap: 10,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: 0.5,
+ firstMessageDeliveriesCap: 10,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: 0.5,
+ meshMessageDeliveriesCap: 10,
+ meshMessageDeliveriesThreshold: 5,
+ meshMessageDeliveriesWindow: 1,
+ meshMessageDeliveriesActivation: 1000,
+ meshFailurePenaltyWeight: -1,
+ meshFailurePenaltyDecay: 0.5,
+ invalidMessageDeliveriesWeight: -1,
+ invalidMessageDeliveriesDecay: 0.5
+ }
+ }
+ })
+ )).to.throw
+ })
+ it('should not throw on valid PeerScoreParams', () => {
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ IPColocationFactorWeight: -1,
+ IPColocationFactorThreshold: 1,
+ behaviourPenaltyWeight: -1,
+ behaviourPenaltyDecay: 0.999
+ })
+ )).to.not.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ appSpecificScore: appScore,
+ decayInterval: 1000,
+ decayToZero: 0.01,
+ IPColocationFactorWeight: -1,
+ IPColocationFactorThreshold: 1,
+ behaviourPenaltyWeight: -1,
+ behaviourPenaltyDecay: 0.999,
+ })
+ )).to.not.throw
+ expect(() => validatePeerScoreParams(
+ createPeerScoreParams({
+ topicScoreCap: 1,
+ appSpecificScore: appScore,
+      decayInterval: 1000,
+ decayToZero: 0.01,
+ IPColocationFactorWeight: -1,
+ IPColocationFactorThreshold: 1,
+ topics: {
+ test: {
+ topicWeight: 1,
+ timeInMeshWeight: 0.01,
+ timeInMeshQuantum: 1000,
+ timeInMeshCap: 10,
+ firstMessageDeliveriesWeight: 1,
+ firstMessageDeliveriesDecay: 0.5,
+ firstMessageDeliveriesCap: 10,
+ meshMessageDeliveriesWeight: -1,
+ meshMessageDeliveriesDecay: 0.5,
+ meshMessageDeliveriesCap: 10,
+ meshMessageDeliveriesThreshold: 5,
+ meshMessageDeliveriesWindow: 1,
+ meshMessageDeliveriesActivation: 1000,
+ meshFailurePenaltyWeight: -1,
+ meshFailurePenaltyDecay: 0.5,
+ invalidMessageDeliveriesWeight: -1,
+ invalidMessageDeliveriesDecay: 0.5,
+ },
+ },
+ })
+ )).to.not.throw
+ })
+})
\ No newline at end of file
diff --git a/test/utils/index.js b/test/utils/index.js
index f6881941..0e74c8af 100644
--- a/test/utils/index.js
+++ b/test/utils/index.js
@@ -54,17 +54,34 @@ const createGossipsubNodes = async (n, shouldStart, options) => {
exports.createGossipsubNodes = createGossipsubNodes
-const connectGossipsubNodes = (nodes, registrarRecords, multicodec) => {
+const connectGossipsubNodes = async (nodes, registrarRecords, multicodec) => {
// connect all nodes
for (let i = 0; i < nodes.length; i++) {
for (let j = i + 1; j < nodes.length; j++) {
const onConnectI = registrarRecords[i][multicodec].onConnect
const onConnectJ = registrarRecords[j][multicodec].onConnect
+ const handleI = registrarRecords[i][multicodec].handler
+ const handleJ = registrarRecords[j][multicodec].handler
+
// Notice peers of connection
const [d0, d1] = ConnectionPair()
- onConnectI(nodes[j].peerId, d0)
- onConnectJ(nodes[i].peerId, d1)
+ await onConnectI(nodes[j].peerId, d0)
+ await handleJ({
+ protocol: multicodec,
+ stream: d1.stream,
+ connection: {
+ remotePeer: nodes[i].peerId
+ }
+ })
+ await onConnectJ(nodes[i].peerId, d1)
+ await handleI({
+ protocol: multicodec,
+ stream: d0.stream,
+ connection: {
+ remotePeer: nodes[j].peerId
+ }
+ })
}
}
diff --git a/ts/constants.ts b/ts/constants.ts
index 7d68de34..b21f329d 100644
--- a/ts/constants.ts
+++ b/ts/constants.ts
@@ -22,3 +22,5 @@ export const GossipsubHeartbeatInterval = second
// Fanout ttl
export const GossipsubFanoutTTL = minute
+
+export const TimeCacheDuration = 120 * 1000
diff --git a/ts/score/computeScore.ts b/ts/score/computeScore.ts
new file mode 100644
index 00000000..e76f5145
--- /dev/null
+++ b/ts/score/computeScore.ts
@@ -0,0 +1,92 @@
+import { PeerStats } from './peerStats'
+import { PeerScoreParams } from './scoreParams'
+import PeerId = require('peer-id')
+
+export function computeScore (
+ peer: PeerId,
+ pstats: PeerStats,
+ params: PeerScoreParams,
+  peerIPs: Map<string, Set<PeerId>>
+): number {
+ let score = 0
+
+ // topic stores
+ Object.entries(pstats.topics).forEach(([topic, tstats]) => {
+ // the topic parameters
+ const topicParams = params.topics[topic]
+ if (!topicParams) {
+ // we are not scoring this topic
+ return
+ }
+
+ let topicScore = 0
+
+ // P1: time in Mesh
+ if (tstats.inMesh) {
+ let p1 = tstats.meshTime / topicParams.timeInMeshQuantum
+ if (p1 > topicParams.timeInMeshCap) {
+ p1 = topicParams.timeInMeshCap
+ }
+ topicScore += p1 * topicParams.timeInMeshWeight
+ }
+
+ // P2: first message deliveries
+ const p2 = tstats.firstMessageDeliveries
+ topicScore += p2 * topicParams.firstMessageDeliveriesWeight
+
+ // P3: mesh message deliveries
+ if (tstats.meshMessageDeliveriesActive) {
+ if (tstats.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold) {
+ const deficit = topicParams.meshMessageDeliveriesThreshold - tstats.meshMessageDeliveries
+ const p3 = deficit * deficit
+ topicScore += p3 * topicParams.meshMessageDeliveriesWeight
+ }
+ }
+
+ // P3b:
+ // NOTE: the weight of P3b is negative (validated in validateTopicScoreParams) so this detracts
+ const p3b = tstats.meshFailurePenalty
+ topicScore += p3b * topicParams.meshFailurePenaltyWeight
+
+ // P4: invalid messages
+ // NOTE: the weight of P4 is negative (validated in validateTopicScoreParams) so this detracts
+ const p4 = tstats.invalidMessageDeliveries * tstats.invalidMessageDeliveries
+ topicScore += p4 * topicParams.invalidMessageDeliveriesWeight
+
+ // update score, mixing with topic weight
+ score += topicScore * topicParams.topicWeight
+ })
+
+ // apply the topic score cap, if any
+ if (params.topicScoreCap > 0 && score > params.topicScoreCap) {
+ score = params.topicScoreCap
+ }
+
+ // P5: application-specific score
+ const p5 = params.appSpecificScore(peer)
+ score += p5 * params.appSpecificWeight
+
+ // P6: IP colocation factor
+ pstats.ips.forEach(ip => {
+ if (params.IPColocationFactorWhitelist.has(ip)) {
+ return
+ }
+
+ // P6 has a cliff (IPColocationFactorThreshold)
+ // It's only applied iff at least that many peers are connected to us from that source IP addr.
+ // It is quadratic, and the weight is negative (validated in validatePeerScoreParams)
+ const peersInIP = peerIPs.get(ip)
+ const numPeersInIP = peersInIP ? peersInIP.size : 0
+ if (numPeersInIP > params.IPColocationFactorThreshold) {
+ const surplus = numPeersInIP - params.IPColocationFactorThreshold
+ const p6 = surplus * surplus
+ score += p6 * params.IPColocationFactorWeight
+ }
+ })
+
+ // P7: behavioural pattern penalty
+ const p7 = pstats.behaviourPenalty * pstats.behaviourPenalty
+ score += p7 * params.behaviourPenaltyWeight
+
+ return score
+}
diff --git a/ts/score/index.ts b/ts/score/index.ts
new file mode 100644
index 00000000..1eea1395
--- /dev/null
+++ b/ts/score/index.ts
@@ -0,0 +1,3 @@
+export * from './scoreParams'
+export * from './peerStats'
+export * from './peerScore'
diff --git a/ts/score/messageDeliveries.ts b/ts/score/messageDeliveries.ts
new file mode 100644
index 00000000..1315ab19
--- /dev/null
+++ b/ts/score/messageDeliveries.ts
@@ -0,0 +1,92 @@
+import { TimeCacheDuration } from '../constants'
+import Denque from 'denque'
+import PeerId = require('peer-id')
+
+export enum DeliveryRecordStatus {
+ /**
+ * we don't know (yet) if the message is valid
+ */
+ unknown,
+ /**
+ * we know the message is valid
+ */
+ valid,
+ /**
+ * we know the message is invalid
+ */
+ invalid,
+ /**
+ * we were instructed by the validator to ignore the message
+ */
+ ignored
+}
+
+export interface DeliveryRecord {
+ status: DeliveryRecordStatus
+ firstSeen: number
+ validated: number
+  peers: Set<PeerId>
+}
+
+interface DeliveryQueueEntry {
+ msgId: string
+ expire: number
+}
+
+/**
+ * Map of message ID to DeliveryRecord
+ *
+ * Maintains an internal queue for efficient gc of old messages
+ */
+export class MessageDeliveries {
+  private records: Map<string, DeliveryRecord>
+  private queue: Denque<DeliveryQueueEntry>
+
+ constructor () {
+ this.records = new Map()
+ this.queue = new Denque()
+ }
+
+ ensureRecord (msgId: string): DeliveryRecord {
+ let drec = this.records.get(msgId)
+ if (drec) {
+ return drec
+ }
+
+ // record doesn't exist yet
+ // create record
+ drec = {
+ status: DeliveryRecordStatus.unknown,
+ firstSeen: Date.now(),
+ validated: 0,
+ peers: new Set()
+ }
+ this.records.set(msgId, drec)
+
+ // and add msgId to the queue
+ const entry: DeliveryQueueEntry = {
+ msgId,
+ expire: Date.now() + TimeCacheDuration
+ }
+ this.queue.push(entry)
+
+ return drec
+ }
+
+ gc (): void {
+ const now = Date.now()
+ // queue is sorted by expiry time
+ // remove expired messages, remove from queue until first un-expired message found
+ let head = this.queue.peekFront()
+ while (head && head.expire < now) {
+ this.records.delete(head.msgId)
+ this.queue.shift()
+ head = this.queue.peekFront()
+ }
+ }
+
+ clear (): void {
+ this.records.clear()
+ this.queue.clear()
+ }
+}
diff --git a/ts/score/peerScore.ts b/ts/score/peerScore.ts
new file mode 100644
index 00000000..513d51de
--- /dev/null
+++ b/ts/score/peerScore.ts
@@ -0,0 +1,596 @@
+import { Message } from '../message'
+import { PeerScoreParams, validatePeerScoreParams } from './scoreParams'
+import { PeerStats, createPeerStats, ensureTopicStats } from './peerStats'
+import { computeScore } from './computeScore'
+import { MessageDeliveries, DeliveryRecordStatus } from './messageDeliveries'
+import PeerId = require('peer-id')
+import Multiaddr = require('multiaddr')
+// eslint-disable-next-line @typescript-eslint/ban-ts-comment
+// @ts-ignore
+import debug = require('debug')
+
+const log = debug('libp2p:gossipsub:score')
+
+interface AddressBook {
+ getMultiaddrsForPeer(id: PeerId): Multiaddr[]
+ // eslint-disable-next-line @typescript-eslint/ban-types
+ on(evt: string, fn: Function): void
+ // eslint-disable-next-line @typescript-eslint/ban-types
+ off(evt: string, fn: Function): void
+}
+
+export class PeerScore {
+ /**
+ * The score parameters
+ */
+ params: PeerScoreParams
+ /**
+ * Per-peer stats for score calculation
+ */
+  peerStats: Map<PeerId, PeerStats>
+ /**
+ * IP colocation tracking; maps IP => set of peers.
+ */
+  peerIPs: Map<string, Set<PeerId>>
+ /**
+ * Recent message delivery timing/participants
+ */
+ deliveryRecords: MessageDeliveries
+ /**
+ * Message ID function
+ */
+ msgId: (message: Message) => string
+ _addressBook: AddressBook
+ _backgroundInterval: NodeJS.Timeout
+
+ constructor (params: PeerScoreParams, addressBook: AddressBook, msgId: (message: Message) => string) {
+ validatePeerScoreParams(params)
+ this.params = params
+ this._addressBook = addressBook
+ this.peerStats = new Map()
+ this.peerIPs = new Map()
+ this.deliveryRecords = new MessageDeliveries()
+ this.msgId = msgId
+ }
+
+ /**
+ * Start PeerScore instance
+ * @returns {void}
+ */
+ start (): void {
+ if (this._backgroundInterval) {
+ throw new Error('Peer score already running')
+ }
+ this._backgroundInterval = setInterval(() => this.background(), this.params.decayInterval)
+ this._addressBook.on('change:multiaddrs', this._updateIPs)
+ }
+
+ /**
+ * Stop PeerScore instance
+ * @returns {void}
+ */
+ stop (): void {
+ if (!this._backgroundInterval) {
+      throw new Error('Peer score already stopped')
+ }
+ clearInterval(this._backgroundInterval)
+ delete this._backgroundInterval
+ this._addressBook.off('change:multiaddrs', this._updateIPs)
+ }
+
+ /**
+ * Periodic maintenance
+ * @returns {void}
+ */
+ background (): void {
+ this._refreshScores()
+ this.deliveryRecords.gc()
+ }
+
+ /**
+ * Decays scores, and purges score records for disconnected peers once their expiry has elapsed.
+ * @returns {void}
+ */
+ _refreshScores (): void {
+ const now = Date.now()
+ const decayToZero = this.params.decayToZero
+
+ this.peerStats.forEach((pstats, id) => {
+ if (!pstats.connected) {
+        // has the retention period expired?
+ if (now > pstats.expire) {
+ // yes, throw it away (but clean up the IP tracking first)
+ this._removeIPs(id, pstats.ips)
+ this.peerStats.delete(id)
+ }
+
+ // we don't decay retained scores, as the peer is not active.
+ // this way the peer cannot reset a negative score by simply disconnecting and reconnecting,
+      // unless the retention period has elapsed.
+ // similarly, a well behaved peer does not lose its score by getting disconnected.
+ return
+ }
+
+ Object.entries(pstats.topics).forEach(([topic, tstats]) => {
+ const tparams = this.params.topics[topic]
+ if (!tparams) {
+ // we are not scoring this topic
+ // should be unreachable, we only add scored topics to pstats
+ return
+ }
+
+ // decay counters
+ tstats.firstMessageDeliveries *= tparams.firstMessageDeliveriesDecay
+ if (tstats.firstMessageDeliveries < decayToZero) {
+ tstats.firstMessageDeliveries = 0
+ }
+ tstats.meshMessageDeliveries *= tparams.meshMessageDeliveriesDecay
+ if (tstats.meshMessageDeliveries < decayToZero) {
+ tstats.meshMessageDeliveries = 0
+ }
+ tstats.meshFailurePenalty *= tparams.meshFailurePenaltyDecay
+ if (tstats.meshFailurePenalty < decayToZero) {
+ tstats.meshFailurePenalty = 0
+ }
+ tstats.invalidMessageDeliveries *= tparams.invalidMessageDeliveriesDecay
+ if (tstats.invalidMessageDeliveries < decayToZero) {
+ tstats.invalidMessageDeliveries = 0
+ }
+ // update mesh time and activate mesh message delivery parameter if need be
+ if (tstats.inMesh) {
+ tstats.meshTime = now - tstats.graftTime
+ if (tstats.meshTime > tparams.meshMessageDeliveriesActivation) {
+ tstats.meshMessageDeliveriesActive = true
+ }
+ }
+ })
+ // decay P7 counter
+ pstats.behaviourPenalty *= this.params.behaviourPenaltyDecay
+ if (pstats.behaviourPenalty < decayToZero) {
+ pstats.behaviourPenalty = 0
+ }
+ })
+ }
+
+ /**
+ * @param {PeerId} id
+ * @returns {Number}
+ */
+ score (id: PeerId): number {
+ const pstats = this.peerStats.get(id)
+ if (!pstats) {
+ return 0
+ }
+ return computeScore(id, pstats, this.params, this.peerIPs)
+ }
+
+ /**
+ * @param {PeerId} id
+ * @param {Number} penalty
+ * @returns {void}
+ */
+ addPenalty (id: PeerId, penalty: number): void {
+ const pstats = this.peerStats.get(id)
+ if (!pstats) {
+ return
+ }
+ pstats.behaviourPenalty += penalty
+ }
+
+ /**
+ * @param {PeerId} id
+ * @returns {void}
+ */
+ addPeer (id: PeerId): void {
+ // create peer stats (not including topic stats for each topic to be scored)
+ // topic stats will be added as needed
+ const pstats = createPeerStats({
+ connected: true
+ })
+ this.peerStats.set(id, pstats)
+
+ // get + update peer IPs
+ const ips = this._getIPs(id)
+ this._setIPs(id, ips, pstats.ips)
+ pstats.ips = ips
+ }
+
+ /**
+ * @param {PeerId} id
+ * @returns {void}
+ */
+ removePeer (id: PeerId): void {
+ const pstats = this.peerStats.get(id)
+ if (!pstats) {
+ return
+ }
+
+ // decide whether to retain the score; this currently only retains non-positive scores
+ // to dissuade attacks on the score function.
+ if (this.score(id) > 0) {
+ this._removeIPs(id, pstats.ips)
+ this.peerStats.delete(id)
+ return
+ }
+
+ // furthermore, when we decide to retain the score, the firstMessageDelivery counters are
+ // reset to 0 and mesh delivery penalties applied.
+ Object.entries(pstats.topics).forEach(([topic, tstats]) => {
+ tstats.firstMessageDeliveries = 0
+
+ const threshold = this.params.topics[topic].meshMessageDeliveriesThreshold
+ if (tstats.inMesh && tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold) {
+ const deficit = threshold - tstats.meshMessageDeliveries
+ tstats.meshFailurePenalty += deficit * deficit
+ }
+
+ tstats.inMesh = false
+ })
+
+ pstats.connected = false
+ pstats.expire = Date.now() + this.params.retainScore
+ }
+
+ /**
+ * @param {PeerId} id
+ * @param {String} topic
+ * @returns {void}
+ */
+ graft (id: PeerId, topic: string): void {
+ const pstats = this.peerStats.get(id)
+ if (!pstats) {
+ return
+ }
+
+ const tstats = ensureTopicStats(topic, pstats, this.params)
+ if (!tstats) {
+ return
+ }
+
+ tstats.inMesh = true
+ tstats.graftTime = Date.now()
+ tstats.meshTime = 0
+ tstats.meshMessageDeliveriesActive = false
+ }
+
+  /**
+   * Mark a peer as pruned from the mesh for a topic, applying the sticky
+   * mesh delivery failure penalty if its delivery rate was deficient.
+   * @param {PeerId} id
+   * @param {String} topic
+   * @returns {void}
+   */
+  prune (id: PeerId, topic: string): void {
+    const pstats = this.peerStats.get(id)
+    if (!pstats) {
+      return
+    }
+    const tstats = ensureTopicStats(topic, pstats, this.params)
+    if (!tstats) {
+      return
+    }
+
+    // sticky mesh delivery rate failure penalty
+    const { meshMessageDeliveriesThreshold } = this.params.topics[topic]
+    if (tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < meshMessageDeliveriesThreshold) {
+      const deficit = meshMessageDeliveriesThreshold - tstats.meshMessageDeliveries
+      tstats.meshFailurePenalty += deficit * deficit
+    }
+    tstats.inMesh = false
+  }
+
+  /**
+   * Begin tracking the delivery record for a message entering validation.
+   * @param {PeerId} id
+   * @param {Message} message
+   * @returns {void}
+   */
+  validateMessage (id: PeerId, message: Message): void {
+    const msgId = this.msgId(message)
+    this.deliveryRecords.ensureRecord(msgId)
+  }
+
+  /**
+   * Called when a message is fully validated and delivered for the first time.
+   * Rewards the sender with a first-delivery mark, then rewards the mesh peers
+   * that forwarded the message to us while it was still being validated.
+   * @param {PeerId} id
+   * @param {Message} message
+   * @returns {void}
+   */
+  deliverMessage (id: PeerId, message: Message): void {
+    this._markFirstMessageDelivery(id, message)
+
+    const drec = this.deliveryRecords.ensureRecord(this.msgId(message))
+    const now = Date.now()
+
+    // defensive check that this is the first delivery trace -- delivery status should be unknown
+    if (drec.status !== DeliveryRecordStatus.unknown) {
+      log(
+        'unexpected delivery: message from %s was first seen %s ago and has delivery status %d',
+        id.toB58String(), now - drec.firstSeen, DeliveryRecordStatus[drec.status]
+      )
+      return
+    }
+
+    // mark the message as valid and reward mesh peers that have already forwarded it to us
+    drec.status = DeliveryRecordStatus.valid
+    drec.validated = now
+    // BUGFIX: previously only the sender was rewarded, and only if it had sent the
+    // message twice -- the inverse of the intended behavior. Reward every recorded
+    // forwarder except the sender, matching the reference (go) implementation.
+    drec.peers.forEach(p => {
+      // this check is to make sure a peer can't send us a message twice and get a double count
+      // if it is a first delivery.
+      if (p !== id) {
+        this._markDuplicateMessageDelivery(p, message)
+      }
+    })
+  }
+
+  /**
+   * Called when a message fails validation.
+   * Penalizes the sender and every peer recorded as having forwarded the
+   * message while it was still being validated.
+   * @param {PeerId} id
+   * @param {Message} message
+   * @returns {void}
+   */
+  rejectMessage (id: PeerId, message: Message): void {
+    const drec = this.deliveryRecords.ensureRecord(this.msgId(message))
+
+    // defensive check that this is the first rejection -- delivery status should be unknown
+    if (drec.status !== DeliveryRecordStatus.unknown) {
+      log(
+        'unexpected rejection: message from %s was first seen %s ago and has delivery status %d',
+        id.toB58String(), Date.now() - drec.firstSeen, DeliveryRecordStatus[drec.status]
+      )
+      return
+    }
+
+    // mark the message as invalid and penalize peers that have already forwarded it.
+    drec.status = DeliveryRecordStatus.invalid
+
+    this._markInvalidMessageDelivery(id, message)
+    drec.peers.forEach(p => {
+      this._markInvalidMessageDelivery(p, message)
+    })
+  }
+
+  /**
+   * Called when validation ignores a message (neither accepted nor rejected).
+   * Records the outcome; unlike rejection, no penalty is applied.
+   * @param {PeerId} id
+   * @param {Message} message
+   * @returns {void}
+   */
+  ignoreMessage (id: PeerId, message: Message): void {
+    const drec = this.deliveryRecords.ensureRecord(this.msgId(message))
+
+    // defensive check that this is the first ignore -- delivery status should be unknown
+    if (drec.status !== DeliveryRecordStatus.unknown) {
+      log(
+        'unexpected ignore: message from %s was first seen %s ago and has delivery status %d',
+        id.toB58String(), Date.now() - drec.firstSeen, DeliveryRecordStatus[drec.status]
+      )
+      return
+    }
+
+    // mark the message as ignored -- no peer penalties are applied for ignored messages
+    drec.status = DeliveryRecordStatus.ignored
+  }
+
+  /**
+   * Called when a duplicate of a known message is received; the effect depends
+   * on the message's current validation status.
+   * NOTE(review): drec.peers is keyed by PeerId object identity -- this assumes
+   * the router canonicalizes PeerId instances; confirm against callers.
+   * @param {PeerId} id
+   * @param {Message} message
+   * @returns {void}
+   */
+  duplicateMessage (id: PeerId, message: Message): void {
+    const drec = this.deliveryRecords.ensureRecord(this.msgId(message))
+
+    if (drec.peers.has(id)) {
+      // we have already seen this duplicate
+      return
+    }
+
+    switch (drec.status) {
+      case DeliveryRecordStatus.unknown:
+        // the message is being validated; track the peer delivery and wait for
+        // the Deliver/Reject/Ignore notification.
+        drec.peers.add(id)
+        break
+      case DeliveryRecordStatus.valid:
+        // mark the peer delivery time to only count a duplicate delivery once.
+        drec.peers.add(id)
+        this._markDuplicateMessageDelivery(id, message, drec.validated)
+        break
+      case DeliveryRecordStatus.invalid:
+        // we no longer track delivery time
+        this._markInvalidMessageDelivery(id, message)
+        break
+      // DeliveryRecordStatus.ignored: duplicates of ignored messages are
+      // deliberately not counted either way
+    }
+  }
+
+  /**
+   * Increments the "invalid message deliveries" counter for all scored topics the message is published in.
+   * @param {PeerId} id
+   * @param {Message} message
+   * @returns {void}
+   */
+  _markInvalidMessageDelivery (id: PeerId, message: Message): void {
+    const pstats = this.peerStats.get(id)
+    if (!pstats) {
+      return
+    }
+
+    for (const topic of message.topicIDs) {
+      const tstats = ensureTopicStats(topic, pstats, this.params)
+      if (tstats) {
+        tstats.invalidMessageDeliveries += 1
+      }
+    }
+  }
+
+  /**
+   * Increments the "first message deliveries" counter for all scored topics the message is published in,
+   * as well as the "mesh message deliveries" counter, if the peer is in the mesh for the topic.
+   * @param {PeerId} id
+   * @param {Message} message
+   * @returns {void}
+   */
+  _markFirstMessageDelivery (id: PeerId, message: Message): void {
+    const pstats = this.peerStats.get(id)
+    if (!pstats) {
+      return
+    }
+
+    for (const topic of message.topicIDs) {
+      const tstats = ensureTopicStats(topic, pstats, this.params)
+      if (!tstats) {
+        continue
+      }
+
+      const tparams = this.params.topics[topic]
+
+      // count the first delivery, saturating at the configured cap
+      tstats.firstMessageDeliveries = Math.min(
+        tparams.firstMessageDeliveriesCap,
+        tstats.firstMessageDeliveries + 1
+      )
+
+      // mesh deliveries only accrue while the peer is in the mesh for the topic
+      if (tstats.inMesh) {
+        tstats.meshMessageDeliveries = Math.min(
+          tparams.meshMessageDeliveriesCap,
+          tstats.meshMessageDeliveries + 1
+        )
+      }
+    }
+  }
+
+  /**
+   * Increments the "mesh message deliveries" counter for messages we've seen before,
+   * as long the message was received within the P3 window.
+   * @param {PeerId} id
+   * @param {Message} message
+   * @param {number} validatedTime - timestamp the message passed validation; 0 means
+   * the duplicate arrived before validation finished (always within the window)
+   * @returns {void}
+   */
+  _markDuplicateMessageDelivery (id: PeerId, message: Message, validatedTime = 0): void {
+    const pstats = this.peerStats.get(id)
+    if (!pstats) {
+      return
+    }
+
+    // the current time is only needed when there is a validation time to compare against
+    const now = validatedTime ? Date.now() : 0
+
+    message.topicIDs.forEach(topic => {
+      const tstats = ensureTopicStats(topic, pstats, this.params)
+      if (!tstats) {
+        return
+      }
+
+      // duplicates only count towards mesh deliveries while the peer is in the mesh
+      if (!tstats.inMesh) {
+        return
+      }
+
+      const tparams = this.params.topics[topic]
+
+      // check against the mesh delivery window -- if the validated time is passed as 0, then
+      // the message was received before we finished validation and thus falls within the mesh
+      // delivery window.
+      if (validatedTime && now > validatedTime + tparams.meshMessageDeliveriesWindow) {
+        return
+      }
+
+      // saturate the counter at the configured cap
+      const cap = tparams.meshMessageDeliveriesCap
+      tstats.meshMessageDeliveries += 1
+      if (tstats.meshMessageDeliveries > cap) {
+        tstats.meshMessageDeliveries = cap
+      }
+    })
+  }
+
+  /**
+   * Gets the current IPs for a peer, as reported by the address book.
+   * @param {PeerId} id
+   * @returns {Array}
+   */
+  _getIPs (id: PeerId): string[] {
+    const multiaddrs = this._addressBook.getMultiaddrsForPeer(id)
+    return multiaddrs.map(ma => ma.toOptions().host)
+  }
+
+  /**
+   * Called as a callback to addressbook updates.
+   * Kept as an arrow-function property so it can be registered without binding.
+   * @param {PeerId} id
+   * @param {Array} multiaddrs
+   * @returns {void}
+   */
+  _updateIPs = (id: PeerId, multiaddrs: Multiaddr[]): void => {
+    const pstats = this.peerStats.get(id)
+    if (pstats) {
+      const hosts = multiaddrs.map(ma => ma.toOptions().host)
+      this._setIPs(id, hosts, pstats.ips)
+    }
+  }
+
+  /**
+   * Adds tracking for the new IPs in the list, and removes tracking from the obsolete IPs.
+   * Uses Array.includes membership tests instead of the previous labeled loops,
+   * removing the need for eslint no-labels suppressions.
+   * @param {PeerId} id
+   * @param {Array} newIPs
+   * @param {Array} oldIPs
+   * @returns {void}
+   */
+  _setIPs (id: PeerId, newIPs: string[], oldIPs: string[]): void {
+    // add tracking for IPs that were not previously known
+    for (const ip of newIPs) {
+      if (oldIPs.includes(ip)) {
+        continue
+      }
+      // it's a new one -- add it to the tracker
+      let peers = this.peerIPs.get(ip)
+      if (!peers) {
+        peers = new Set()
+        this.peerIPs.set(ip, peers)
+      }
+      peers.add(id)
+    }
+    // remove tracking for IPs that are no longer in use
+    for (const ip of oldIPs) {
+      if (newIPs.includes(ip)) {
+        continue
+      }
+      // it's obsolete -- remove it from the tracker
+      const peers = this.peerIPs.get(ip)
+      if (!peers) {
+        continue
+      }
+      peers.delete(id)
+      // drop the IP entry entirely once no peers remain on it
+      if (!peers.size) {
+        this.peerIPs.delete(ip)
+      }
+    }
+  }
+
+  /**
+   * Removes an IP list from the tracking list for a peer.
+   * @param {PeerId} id
+   * @param {Array} ips
+   * @returns {void}
+   */
+  _removeIPs (id: PeerId, ips: string[]): void {
+    for (const ip of ips) {
+      const peers = this.peerIPs.get(ip)
+      if (!peers) {
+        continue
+      }
+
+      peers.delete(id)
+      // drop the IP entry entirely once no peers remain on it
+      if (peers.size === 0) {
+        this.peerIPs.delete(ip)
+      }
+    }
+  }
+}
diff --git a/ts/score/peerStats.ts b/ts/score/peerStats.ts
new file mode 100644
index 00000000..532ecf4b
--- /dev/null
+++ b/ts/score/peerStats.ts
@@ -0,0 +1,114 @@
+import { PeerScoreParams } from './scoreParams'
+
+export interface PeerStats {
+  /**
+   * true if the peer is currently connected
+   */
+  connected: boolean
+
+  /**
+   * expiration time of the score stats for disconnected peers
+   */
+  expire: number
+
+  /**
+   * per-topic stats, keyed by topic ID
+   * (generic arguments restored -- they were stripped, leaving a bare `Record`)
+   */
+  topics: Record<string, TopicStats>
+
+  /**
+   * IP tracking; store as string for easy processing
+   */
+  ips: string[]
+
+  /**
+   * behavioural pattern penalties (applied by the router)
+   */
+  behaviourPenalty: number
+}
+
+export interface TopicStats {
+  /**
+   * true if the peer is in the mesh
+   */
+  inMesh: boolean
+
+  /**
+   * time when the peer was (last) GRAFTed; valid only when in mesh
+   */
+  graftTime: number
+
+  /**
+   * time in mesh (updated during refresh/decay to avoid calling gettimeofday on
+   * every score invocation)
+   */
+  meshTime: number
+
+  /**
+   * first message deliveries
+   */
+  firstMessageDeliveries: number
+
+  /**
+   * mesh message deliveries
+   */
+  meshMessageDeliveries: number
+
+  /**
+   * true if the peer has been enough time in the mesh to activate mesh message deliveries
+   */
+  meshMessageDeliveriesActive: boolean
+
+  /**
+   * sticky mesh rate failure penalty counter
+   */
+  meshFailurePenalty: number
+
+  /**
+   * invalid message counter
+   */
+  invalidMessageDeliveries: number
+}
+
+/**
+ * Creates a PeerStats with zeroed defaults, overridden by any provided fields.
+ * Provided topic stats are normalized through createTopicStats.
+ * (Generic arguments restored -- they were stripped, leaving bare `Partial`/`Record`.)
+ */
+export function createPeerStats (ps: Partial<PeerStats> = {}): PeerStats {
+  return {
+    connected: false,
+    expire: 0,
+    ips: [],
+    behaviourPenalty: 0,
+    ...ps,
+    topics: ps.topics
+      ? Object.entries(ps.topics)
+        .reduce((topics, [topic, topicStats]) => {
+          topics[topic] = createTopicStats(topicStats)
+          return topics
+        }, {} as Record<string, TopicStats>)
+      : {}
+  }
+}
+
+/**
+ * Creates a TopicStats with zeroed defaults, overridden by any provided fields.
+ * (Generic argument restored -- it was stripped, leaving a bare `Partial`.)
+ */
+export function createTopicStats (ts: Partial<TopicStats> = {}): TopicStats {
+  return {
+    inMesh: false,
+    graftTime: 0,
+    meshTime: 0,
+    firstMessageDeliveries: 0,
+    meshMessageDeliveries: 0,
+    meshMessageDeliveriesActive: false,
+    meshFailurePenalty: 0,
+    invalidMessageDeliveries: 0,
+    ...ts
+  }
+}
+
+export function ensureTopicStats (topic: string, ps: PeerStats, params: PeerScoreParams): TopicStats | undefined {
+ let ts = ps.topics[topic]
+ if (ts) {
+ return ts
+ }
+ if (!params.topics[topic]) {
+ return undefined
+ }
+ ps.topics[topic] = ts = createTopicStats()
+ return ts
+}
diff --git a/ts/score/scoreParams.ts b/ts/score/scoreParams.ts
new file mode 100644
index 00000000..4e7c4870
--- /dev/null
+++ b/ts/score/scoreParams.ts
@@ -0,0 +1,376 @@
+import PeerId = require('peer-id')
+
+export interface PeerScoreThresholds {
+  /**
+   * gossipThreshold is the score threshold below which gossip propagation is suppressed;
+   * should be negative.
+   */
+  gossipThreshold: number
+
+  /**
+   * publishThreshold is the score threshold below which we shouldn't publish when using flood
+   * publishing (also applies to fanout and floodsub peers); should be negative and <= GossipThreshold.
+   */
+  publishThreshold: number
+
+  /**
+   * graylistThreshold is the score threshold below which message processing is suppressed altogether,
+   * implementing an effective graylist according to peer score; should be negative and <= PublishThreshold.
+   */
+  graylistThreshold: number
+
+  /**
+   * acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive
+   * and limited to scores attainable by bootstrappers and other trusted nodes.
+   */
+  acceptPXThreshold: number
+
+  /**
+   * opportunisticGraftThreshold is the median mesh score threshold before triggering opportunistic
+   * grafting; this should have a small positive value.
+   */
+  opportunisticGraftThreshold: number
+}
+
+/**
+ * Creates a PeerScoreThresholds with zeroed defaults, overridden by any provided fields.
+ * (Generic argument restored -- it was stripped, leaving a bare `Partial`;
+ * a `= {}` default is added so all-default thresholds need no argument.)
+ */
+export function createPeerScoreThresholds (p: Partial<PeerScoreThresholds> = {}): PeerScoreThresholds {
+  return {
+    gossipThreshold: 0,
+    publishThreshold: 0,
+    graylistThreshold: 0,
+    acceptPXThreshold: 0,
+    opportunisticGraftThreshold: 0,
+    ...p
+  }
+}
+
+/**
+ * Validates a PeerScoreThresholds, throwing on the first violated constraint.
+ * Negative thresholds must be ordered: graylist <= publish <= gossip <= 0;
+ * the PX and opportunistic-graft thresholds must be non-negative.
+ */
+export function validatePeerScoreThresholds (p: PeerScoreThresholds): void {
+  const { gossipThreshold, publishThreshold, graylistThreshold, acceptPXThreshold, opportunisticGraftThreshold } = p
+
+  if (gossipThreshold > 0) {
+    throw new Error('invalid gossip threshold; it must be <= 0')
+  }
+  if (publishThreshold > 0 || publishThreshold > gossipThreshold) {
+    throw new Error('invalid publish threshold; it must be <= 0 and <= gossip threshold')
+  }
+  if (graylistThreshold > 0 || graylistThreshold > publishThreshold) {
+    throw new Error('invalid graylist threshold; it must be <= 0 and <= publish threshold')
+  }
+  if (acceptPXThreshold < 0) {
+    throw new Error('invalid accept PX threshold; it must be >= 0')
+  }
+  if (opportunisticGraftThreshold < 0) {
+    throw new Error('invalid opportunistic grafting threshold; it must be >= 0')
+  }
+}
+
+export interface PeerScoreParams {
+  /**
+   * Score parameters per topic.
+   * (Generic arguments restored -- they were stripped, leaving a bare `Record`.)
+   */
+  topics: Record<string, TopicScoreParams>
+
+  /**
+   * Aggregate topic score cap; this limits the total contribution of topics towards a positive
+   * score. It must be positive (or 0 for no cap).
+   */
+  topicScoreCap: number
+
+  /**
+   * P5: Application-specific peer scoring
+   */
+  appSpecificScore: (p: PeerId) => number
+  appSpecificWeight: number
+
+  /**
+   * P6: IP-colocation factor.
+   * The parameter has an associated counter which counts the number of peers with the same IP.
+   * If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value
+   * is the square of the difference, ie (PeersInSameIP - IPColocationThreshold)^2.
+   * If the number of peers in the same IP is less than the threshold, then the value is 0.
+   * The weight of the parameter MUST be negative, unless you want to disable for testing.
+   * Note: In order to simulate many IPs in a manageable manner when testing, you can set the weight to 0
+   * thus disabling the IP colocation penalty.
+   */
+  IPColocationFactorWeight: number
+  IPColocationFactorThreshold: number
+  IPColocationFactorWhitelist: Set<string>
+
+  /**
+   * P7: behavioural pattern penalties.
+   * This parameter has an associated counter which tracks misbehaviour as detected by the
+   * router. The router currently applies penalties for the following behaviors:
+   * - attempting to re-graft before the prune backoff time has elapsed.
+   * - not following up in IWANT requests for messages advertised with IHAVE.
+   *
+   * The value of the parameter is the square of the counter, which decays with BehaviourPenaltyDecay.
+   * The weight of the parameter MUST be negative (or zero to disable).
+   */
+  behaviourPenaltyWeight: number
+  behaviourPenaltyDecay: number
+
+  /**
+   * the decay interval for parameter counters.
+   */
+  decayInterval: number
+
+  /**
+   * counter value below which it is considered 0.
+   */
+  decayToZero: number
+
+  /**
+   * time to remember counters for a disconnected peer.
+   */
+  retainScore: number
+}
+
+export interface TopicScoreParams {
+  /**
+   * The weight of the topic.
+   */
+  topicWeight: number
+
+  /**
+   * P1: time in the mesh
+   * This is the time the peer has been grafted in the mesh.
+   * The value of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap
+   * The weight of the parameter MUST be positive (or zero to disable).
+   */
+  timeInMeshWeight: number
+  timeInMeshQuantum: number
+  timeInMeshCap: number
+
+  /**
+   * P2: first message deliveries
+   * This is the number of message deliveries in the topic.
+   * The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay, and capped
+   * by FirstMessageDeliveriesCap.
+   * The weight of the parameter MUST be positive (or zero to disable).
+   */
+  firstMessageDeliveriesWeight: number
+  firstMessageDeliveriesDecay: number
+  firstMessageDeliveriesCap: number
+
+  /**
+   * P3: mesh message deliveries
+   * This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of
+   * message validation; deliveries during validation also count and are retroactively applied
+   * when validation succeeds.
+   * This window accounts for the minimum time before a hostile mesh peer trying to game the score
+   * could replay back a valid message we just sent them.
+   * It effectively tracks first and near-first deliveries, ie a message seen from a mesh peer
+   * before we have forwarded it to them.
+   * The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay.
+   * If the counter exceeds the threshold, its value is 0.
+   * If the counter is below the MeshMessageDeliveriesThreshold, the value is the square of
+   * the deficit, ie (MessageDeliveriesThreshold - counter)^2
+   * The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh.
+   * The weight of the parameter MUST be negative (or zero to disable).
+   */
+  meshMessageDeliveriesWeight: number
+  meshMessageDeliveriesDecay: number
+  meshMessageDeliveriesCap: number
+  meshMessageDeliveriesThreshold: number
+  meshMessageDeliveriesWindow: number
+  meshMessageDeliveriesActivation: number
+
+  /**
+   * P3b: sticky mesh propagation failures
+   * This is a sticky penalty that applies when a peer gets pruned from the mesh with an active
+   * mesh message delivery penalty.
+   * The weight of the parameter MUST be negative (or zero to disable)
+   */
+  meshFailurePenaltyWeight: number
+  meshFailurePenaltyDecay: number
+
+  /**
+   * P4: invalid messages
+   * This is the number of invalid messages in the topic.
+   * The value of the parameter is the square of the counter, decaying with
+   * InvalidMessageDeliveriesDecay.
+   * The weight of the parameter MUST be negative (or zero to disable).
+   */
+  invalidMessageDeliveriesWeight: number
+  invalidMessageDeliveriesDecay: number
+}
+
+/**
+ * Creates a PeerScoreParams with zeroed defaults, overridden by any provided fields.
+ * Provided topic params are normalized through createTopicScoreParams; the
+ * whitelist is defensively copied into a fresh Set.
+ * (Generic arguments restored -- they were stripped, leaving bare `Partial`/`Record`;
+ * a `= {}` default is added so all-default params need no argument.)
+ */
+export function createPeerScoreParams (p: Partial<PeerScoreParams> = {}): PeerScoreParams {
+  return {
+    topicScoreCap: 0,
+    appSpecificScore: (): number => 0,
+    appSpecificWeight: 0,
+    IPColocationFactorWeight: 0,
+    IPColocationFactorThreshold: 0,
+    IPColocationFactorWhitelist: new Set<string>(p.IPColocationFactorWhitelist),
+    behaviourPenaltyWeight: 0,
+    behaviourPenaltyDecay: 0,
+    decayInterval: 0,
+    decayToZero: 0,
+    retainScore: 0,
+    ...p,
+    topics: p.topics
+      ? Object.entries(p.topics)
+        .reduce((topics, [topic, topicScoreParams]) => {
+          topics[topic] = createTopicScoreParams(topicScoreParams)
+          return topics
+        }, {} as Record<string, TopicScoreParams>)
+      : {}
+  }
+}
+
+/**
+ * Creates a TopicScoreParams with zeroed defaults, overridden by any provided fields.
+ * (Generic argument restored -- it was stripped, leaving a bare `Partial`.)
+ */
+export function createTopicScoreParams (p: Partial<TopicScoreParams> = {}): TopicScoreParams {
+  return {
+    topicWeight: 0,
+    timeInMeshWeight: 0,
+    timeInMeshQuantum: 0,
+    timeInMeshCap: 0,
+    firstMessageDeliveriesWeight: 0,
+    firstMessageDeliveriesDecay: 0,
+    firstMessageDeliveriesCap: 0,
+    meshMessageDeliveriesWeight: 0,
+    meshMessageDeliveriesDecay: 0,
+    meshMessageDeliveriesCap: 0,
+    meshMessageDeliveriesThreshold: 0,
+    meshMessageDeliveriesWindow: 0,
+    meshMessageDeliveriesActivation: 0,
+    meshFailurePenaltyWeight: 0,
+    meshFailurePenaltyDecay: 0,
+    invalidMessageDeliveriesWeight: 0,
+    invalidMessageDeliveriesDecay: 0,
+    ...p
+  }
+}
+
+// peer score parameter validation
+/**
+ * Validates a PeerScoreParams, throwing on the first violated constraint.
+ * Each topic's params are validated individually; a failure is rethrown with
+ * the offending topic name attached.
+ */
+export function validatePeerScoreParams (p: PeerScoreParams): void {
+  for (const [topic, params] of Object.entries(p.topics)) {
+    try {
+      validateTopicScoreParams(params)
+    } catch (e) {
+      throw new Error(`invalid score parameters for topic ${topic}: ${e.message}`)
+    }
+  }
+
+  // check that the topic score is 0 or something positive
+  if (p.topicScoreCap < 0) {
+    throw new Error('invalid topic score cap; must be positive (or 0 for no cap)')
+  }
+
+  // check that we have an app specific score; the weight can be anything (but expected positive)
+  if (p.appSpecificScore === null || p.appSpecificScore === undefined) {
+    throw new Error('missing application specific score function')
+  }
+
+  // check the IP colocation factor
+  if (p.IPColocationFactorWeight > 0) {
+    throw new Error('invalid IPColocationFactorWeight; must be negative (or 0 to disable)')
+  }
+  // the threshold only matters when the penalty is enabled
+  if (p.IPColocationFactorWeight !== 0 && p.IPColocationFactorThreshold < 1) {
+    throw new Error('invalid IPColocationFactorThreshold; must be at least 1')
+  }
+
+  // check the behaviour penalty
+  if (p.behaviourPenaltyWeight > 0) {
+    throw new Error('invalid BehaviourPenaltyWeight; must be negative (or 0 to disable)')
+  }
+  if (p.behaviourPenaltyWeight !== 0 && (p.behaviourPenaltyDecay <= 0 || p.behaviourPenaltyDecay >= 1)) {
+    throw new Error('invalid BehaviourPenaltyDecay; must be between 0 and 1')
+  }
+
+  // check the decay parameters
+  if (p.decayInterval < 1000) {
+    throw new Error('invalid DecayInterval; must be at least 1s')
+  }
+  if (p.decayToZero <= 0 || p.decayToZero >= 1) {
+    throw new Error('invalid DecayToZero; must be between 0 and 1')
+  }
+
+  // no need to check the score retention; a value of 0 means that we don't retain scores
+}
+
+/**
+ * Validates a TopicScoreParams, throwing on the first violated constraint.
+ * Decay/cap/threshold constraints for a parameter are only enforced when its
+ * weight is non-zero (i.e. the parameter is enabled).
+ */
+export function validateTopicScoreParams (p: TopicScoreParams): void {
+  // make sure we have a sane topic weight
+  if (p.topicWeight < 0) {
+    throw new Error('invalid topic weight; must be >= 0')
+  }
+
+  // check P1
+  if (p.timeInMeshQuantum === 0) {
+    throw new Error('invalid TimeInMeshQuantum; must be non zero')
+  }
+  if (p.timeInMeshWeight < 0) {
+    throw new Error('invalid TimeInMeshWeight; must be positive (or 0 to disable)')
+  }
+  if (p.timeInMeshWeight !== 0 && p.timeInMeshQuantum <= 0) {
+    throw new Error('invalid TimeInMeshQuantum; must be positive')
+  }
+  if (p.timeInMeshWeight !== 0 && p.timeInMeshCap <= 0) {
+    throw new Error('invalid TimeInMeshCap; must be positive')
+  }
+
+  // check P2
+  // (fixed typo in the error message: 'invallid' -> 'invalid')
+  if (p.firstMessageDeliveriesWeight < 0) {
+    throw new Error('invalid FirstMessageDeliveriesWeight; must be positive (or 0 to disable)')
+  }
+  if (p.firstMessageDeliveriesWeight !== 0 && (p.firstMessageDeliveriesDecay <= 0 || p.firstMessageDeliveriesDecay >= 1)) {
+    throw new Error('invalid FirstMessageDeliveriesDecay; must be between 0 and 1')
+  }
+  if (p.firstMessageDeliveriesWeight !== 0 && p.firstMessageDeliveriesCap <= 0) {
+    throw new Error('invalid FirstMessageDeliveriesCap; must be positive')
+  }
+
+  // check P3
+  if (p.meshMessageDeliveriesWeight > 0) {
+    throw new Error('invalid MeshMessageDeliveriesWeight; must be negative (or 0 to disable)')
+  }
+  if (p.meshMessageDeliveriesWeight !== 0 && (p.meshMessageDeliveriesDecay <= 0 || p.meshMessageDeliveriesDecay >= 1)) {
+    throw new Error('invalid MeshMessageDeliveriesDecay; must be between 0 and 1')
+  }
+  if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesCap <= 0) {
+    throw new Error('invalid MeshMessageDeliveriesCap; must be positive')
+  }
+  if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesThreshold <= 0) {
+    throw new Error('invalid MeshMessageDeliveriesThreshold; must be positive')
+  }
+  if (p.meshMessageDeliveriesWindow < 0) {
+    throw new Error('invalid MeshMessageDeliveriesWindow; must be non-negative')
+  }
+  if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesActivation < 1000) {
+    throw new Error('invalid MeshMessageDeliveriesActivation; must be at least 1s')
+  }
+
+  // check P3b
+  if (p.meshFailurePenaltyWeight > 0) {
+    throw new Error('invalid MeshFailurePenaltyWeight; must be negative (or 0 to disable)')
+  }
+  if (p.meshFailurePenaltyWeight !== 0 && (p.meshFailurePenaltyDecay <= 0 || p.meshFailurePenaltyDecay >= 1)) {
+    throw new Error('invalid MeshFailurePenaltyDecay; must be between 0 and 1')
+  }
+
+  // check P4
+  if (p.invalidMessageDeliveriesWeight > 0) {
+    throw new Error('invalid InvalidMessageDeliveriesWeight; must be negative (or 0 to disable)')
+  }
+  if (p.invalidMessageDeliveriesDecay <= 0 || p.invalidMessageDeliveriesDecay >= 1) {
+    throw new Error('invalid InvalidMessageDeliveriesDecay; must be between 0 and 1')
+  }
+}
+
+// defaults for scoreParameterDecay: a 1s decay interval, with counters
+// considered zero once they drop below 0.01
+const DefaultDecayInterval = 1000
+const DefaultDecayToZero = 0.01
+
+/**
+ * scoreParameterDecay computes the decay factor for a parameter, assuming the DecayInterval is 1s
+ * and that the value decays to zero if it drops below 0.01
+ * @param {number} decay - presumably the intended decay-to-zero time in milliseconds
+ * (same units as DefaultDecayInterval) -- TODO confirm against callers
+ */
+export function scoreParameterDecay (decay: number): number {
+  return scoreParameterDecayWithBase(decay, DefaultDecayInterval, DefaultDecayToZero)
+}
+
+/**
+ * scoreParameterDecayWithBase computes the decay factor for a parameter using base as the DecayInterval
+ */
+export function scoreParameterDecayWithBase (decay: number, base: number, decayToZero: number): number {
+  // the decay is exponential: after n ticks the value is factor^n
+  // so factor^n = decayToZero => factor = decayToZero^(1/n)
+  const ticks = decay / base
+  return decayToZero ** (1 / ticks)
+}