From 8118894e7aace29220d0a683cdccfeb368bfd770 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 4 Jun 2020 10:38:10 +0200 Subject: [PATCH 1/6] fix: use unidirectional streams (#78) * fix: use unidirectional streams * chore: use libp2p-pubsub@0.5.2 --- package.json | 2 +- test/2-nodes.spec.js | 45 +++++++-- test/floodsub.spec.js | 56 ++++++++++- test/gossip.js | 4 +- test/mesh.spec.js | 20 +++- test/multiple-nodes.spec.js | 195 ++++++++++++++++++++++++++++++++---- test/pubsub.spec.js | 32 ------ test/utils/index.js | 23 ++++- 8 files changed, 302 insertions(+), 75 deletions(-) diff --git a/package.json b/package.json index 51f1d9eb..8dd064be 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,7 @@ "err-code": "^2.0.0", "it-length-prefixed": "^3.0.0", "it-pipe": "^1.0.1", - "libp2p-pubsub": "^0.5.0", + "libp2p-pubsub": "~0.5.2", "p-map": "^4.0.0", "peer-id": "~0.13.12", "protons": "^1.0.1", diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index a0a558b7..7c584efc 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -303,10 +303,8 @@ describe('2 nodes', () => { } = await createGossipsubNodes(2, true)) }) - after(() => Promise.all(nodes.map((n) => n.stop()))) - - it('existing subscriptions are sent upon peer connection', async function () { - this.timeout(5000) + // Make subscriptions prior to new nodes + before(() => { nodes[0].subscribe('Za') nodes[1].subscribe('Zb') @@ -314,17 +312,42 @@ describe('2 nodes', () => { expectSet(nodes[0].subscriptions, ['Za']) expect(nodes[1].peers.size).to.equal(0) expectSet(nodes[1].subscriptions, ['Zb']) + }) - // Connect nodes - const onConnect0 = registrarRecords[0][multicodec].onConnect - const onConnect1 = registrarRecords[1][multicodec].onConnect + after(() => Promise.all(nodes.map((n) => n.stop()))) - // Notice peers of connection - const [d0, d1] = ConnectionPair() - onConnect0(nodes[1].peerId, d0) - onConnect1(nodes[0].peerId, d1) + it('existing subscriptions are sent upon peer connection', async function () { + this.timeout(5000) + + const dial = async () => { + // Connect nodes + const onConnect0 = registrarRecords[0][multicodec].onConnect + const onConnect1 = registrarRecords[1][multicodec].onConnect + const handle0 = registrarRecords[0][multicodec].handler + const handle1 = registrarRecords[1][multicodec].handler + + // Notice peers of connection + const [d0, d1] = ConnectionPair() + await onConnect0(nodes[1].peerId, d0) + await handle1({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: nodes[0].peerId + } + }) + await onConnect1(nodes[0].peerId, d1) + await handle0({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: nodes[1].peerId + } + }) + } await Promise.all([ + dial(), new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)), new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve)) ]) diff --git a/test/floodsub.spec.js b/test/floodsub.spec.js index 75e6ed16..831c1919 100644 --- a/test/floodsub.spec.js +++ b/test/floodsub.spec.js @@ -112,11 +112,27 @@ describe('gossipsub fallbacks to floodsub', () => { const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + const handleGs = registrarRecords[0][floodsubMulticodec].handler + const handleFs = registrarRecords[1][floodsubMulticodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectGs(nodeFs.peerId, d0) - onConnectFs(nodeGs.peerId, d1) + await onConnectGs(nodeFs.peerId, d0) + await handleFs({ + protocol: floodsubMulticodec, + stream: d1.stream, + connection: { + remotePeer: nodeGs.peerId + } + }) + await onConnectFs(nodeGs.peerId, d1) + await handleGs({ + protocol: floodsubMulticodec, + stream: d0.stream, + connection: { + remotePeer: nodeFs.peerId + } + }) }) after(async function () { @@ -167,11 +183,27 @@ describe('gossipsub fallbacks to floodsub', () => { const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + const handleGs = registrarRecords[0][floodsubMulticodec].handler + const handleFs = registrarRecords[1][floodsubMulticodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectGs(nodeFs.peerId, d0) - onConnectFs(nodeGs.peerId, d1) + await onConnectGs(nodeFs.peerId, d0) + await handleFs({ + protocol: floodsubMulticodec, + stream: d1.stream, + connection: { + remotePeer: nodeGs.peerId + } + }) + await onConnectFs(nodeGs.peerId, d1) + await handleGs({ + protocol: floodsubMulticodec, + stream: d0.stream, + connection: { + remotePeer: nodeFs.peerId + } + }) nodeGs.subscribe(topic) nodeFs.subscribe(topic) @@ -288,11 +320,27 @@ describe('gossipsub fallbacks to floodsub', () => { const onConnectGs = registrarRecords[0][floodsubMulticodec].onConnect const onConnectFs = registrarRecords[1][floodsubMulticodec].onConnect + const handleGs = registrarRecords[0][floodsubMulticodec].handler + const handleFs = registrarRecords[1][floodsubMulticodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() await onConnectGs(nodeFs.peerId, d0) + await handleFs({ + protocol: floodsubMulticodec, + stream: d1.stream, + connection: { + remotePeer: nodeGs.peerId + } + }) await onConnectFs(nodeGs.peerId, d1) + await handleGs({ + protocol: floodsubMulticodec, + stream: d0.stream, + connection: { + remotePeer: nodeFs.peerId + } + }) nodeGs.subscribe(topic) nodeFs.subscribe(topic) diff --git a/test/gossip.js b/test/gossip.js index 3a6f0aec..cf001f67 100644 --- a/test/gossip.js +++ b/test/gossip.js @@ -32,7 +32,7 @@ describe('gossip', () => { // add subscriptions to each node nodes.forEach((n) => n.subscribe(topic)) - connectGossipsubNodes(nodes, registrarRecords, multicodec) + await connectGossipsubNodes(nodes, registrarRecords, multicodec) await new Promise((resolve) => setTimeout(resolve, 1000)) @@ -69,7 +69,7 @@ describe('gossip', () => { nodes.forEach((n) => n.subscribe(topic)) // every node connected to every other - connectGossipsubNodes(nodes, registrarRecords, multicodec) + await connectGossipsubNodes(nodes, registrarRecords, multicodec) await new Promise((resolve) => setTimeout(resolve, 500)) // await mesh rebalancing await Promise.all(nodes.map((n) => new Promise((resolve) => n.once('gossipsub:heartbeat', resolve)))) diff --git a/test/mesh.spec.js b/test/mesh.spec.js index b3de12fe..78ab89d2 100644 --- a/test/mesh.spec.js +++ b/test/mesh.spec.js @@ -35,15 +35,31 @@ describe('mesh overlay', () => { // connect N (< GossipsubD) nodes to node0 const N = 4 const onConnect0 = registrarRecords[0][multicodec].onConnect + const handle0 = registrarRecords[0][multicodec].handler for (let i = nodes.length; i > nodes.length - N; i--) { const n = i - 1 const onConnectN = registrarRecords[n][multicodec].onConnect + const handleN = registrarRecords[n][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnect0(nodes[n].peerId, d0) - onConnectN(nodes[0].peerId, d1) + await onConnect0(nodes[n].peerId, d0) + await handleN({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: nodes[0].peerId + } + }) + await onConnectN(nodes[0].peerId, d1) + await handle0({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: nodes[n].peerId + } + }) } // await mesh rebalancing diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js index 7159eb09..9b7bc6e1 100644 --- a/test/multiple-nodes.spec.js +++ b/test/multiple-nodes.spec.js @@ -38,15 +38,46 @@ describe('multiple nodes (more than 2)', () => { const onConnectA = registrarRecords[0][multicodec].onConnect const onConnectB = registrarRecords[1][multicodec].onConnect const onConnectC = registrarRecords[2][multicodec].onConnect + const handleA = registrarRecords[0][multicodec].handler + const handleB = registrarRecords[1][multicodec].handler + const handleC = registrarRecords[2][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectA(b.peerId, d0) - onConnectB(a.peerId, d1) + await onConnectA(b.peerId, d0) + await handleB({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: a.peerId + } + }) + await onConnectB(a.peerId, d1) + await handleA({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: b.peerId + } + }) const [d2, d3] = ConnectionPair() - onConnectB(c.peerId, d2) - onConnectC(b.peerId, d3) + await onConnectB(c.peerId, d2) + await handleC({ + protocol: multicodec, + stream: d3.stream, + connection: { + remotePeer: b.peerId + } + }) + await onConnectC(b.peerId, d3) + await handleB({ + protocol: multicodec, + stream: d2.stream, + connection: { + remotePeer: c.peerId + } + }) }) after(() => Promise.all(nodes.map((n) => n.stop()))) @@ -103,15 +134,46 @@ describe('multiple nodes (more than 2)', () => { const onConnectA = registrarRecords[0][multicodec].onConnect const onConnectB = registrarRecords[1][multicodec].onConnect const onConnectC = registrarRecords[2][multicodec].onConnect + const handleA = registrarRecords[0][multicodec].handler + const handleB = registrarRecords[1][multicodec].handler + const handleC = registrarRecords[2][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectA(b.peerId, d0) - onConnectB(a.peerId, d1) + await onConnectA(b.peerId, d0) + await handleB({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: a.peerId + } + }) + await onConnectB(a.peerId, d1) + await handleA({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: b.peerId + } + }) const [d2, d3] = ConnectionPair() - onConnectB(c.peerId, d2) - onConnectC(b.peerId, d3) + await onConnectB(c.peerId, d2) + await handleC({ + protocol: multicodec, + stream: d3.stream, + connection: { + remotePeer: b.peerId + } + }) + await onConnectC(b.peerId, d3) + await handleB({ + protocol: multicodec, + stream: d2.stream, + connection: { + remotePeer: c.peerId + } + }) a.subscribe(topic) b.subscribe(topic) @@ -198,15 +260,46 @@ describe('multiple nodes (more than 2)', () => { const onConnectA = registrarRecords[0][multicodec].onConnect const onConnectB = registrarRecords[1][multicodec].onConnect const onConnectC = registrarRecords[2][multicodec].onConnect + const handleA = registrarRecords[0][multicodec].handler + const handleB = registrarRecords[1][multicodec].handler + const handleC = registrarRecords[2][multicodec].handler // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectA(b.peerId, d0) - onConnectB(a.peerId, d1) + await onConnectA(b.peerId, d0) + await handleB({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: a.peerId + } + }) + await onConnectB(a.peerId, d1) + await handleA({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: b.peerId + } + }) const [d2, d3] = ConnectionPair() - onConnectB(c.peerId, d2) - onConnectC(b.peerId, d3) + await onConnectB(c.peerId, d2) + await handleC({ + protocol: multicodec, + stream: d3.stream, + connection: { + remotePeer: b.peerId + } + }) + await onConnectC(b.peerId, d3) + await handleB({ + protocol: multicodec, + stream: d2.stream, + connection: { + remotePeer: c.peerId + } + }) a.subscribe(topic) b.subscribe(topic) @@ -264,22 +357,84 @@ describe('multiple nodes (more than 2)', () => { const onConnectD = registrarRecords[3][multicodec].onConnect const onConnectE = registrarRecords[4][multicodec].onConnect + const handleA = registrarRecords[0][multicodec].handler + const handleB = registrarRecords[1][multicodec].handler + const handleC = registrarRecords[2][multicodec].handler + const handleD = registrarRecords[3][multicodec].handler + const handleE = registrarRecords[4][multicodec].handler + // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectA(b.peerId, d0) - onConnectB(a.peerId, d1) + await onConnectA(b.peerId, d0) + await handleB({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: a.peerId + } + }) + await onConnectB(a.peerId, d1) + await handleA({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: b.peerId + } + }) const [d2, d3] = ConnectionPair() - onConnectB(c.peerId, d2) - onConnectC(b.peerId, d3) + await onConnectB(c.peerId, d2) + await handleC({ + protocol: multicodec, + stream: d3.stream, + connection: { + remotePeer: b.peerId + } + }) + await onConnectC(b.peerId, d3) + await handleB({ + protocol: multicodec, + stream: d2.stream, + connection: { + remotePeer: c.peerId + } + }) const [d4, d5] = ConnectionPair() - onConnectC(d.peerId, d4) - onConnectD(c.peerId, d5) + await onConnectC(d.peerId, d4) + await handleD({ + protocol: multicodec, + stream: d5.stream, + connection: { + remotePeer: c.peerId + } + }) + await onConnectD(c.peerId, d5) + await handleC({ + protocol: multicodec, + stream: d4.stream, + connection: { + remotePeer: d.peerId + } + }) const [d6, d7] = ConnectionPair() - onConnectD(e.peerId, d6) - onConnectE(d.peerId, d7) + await onConnectD(e.peerId, d6) + await handleE({ + protocol: multicodec, + stream: d7.stream, + connection: { + remotePeer: d.peerId + } + }) + await onConnectE(d.peerId, d7) + await handleD({ + protocol: multicodec, + stream: d6.stream, + connection: { + remotePeer: e.peerId + } + }) a.subscribe(topic) b.subscribe(topic) diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index d9991353..4b5bd93b 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,12 +6,10 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const sinon = require('sinon') -const pWaitFor = require('p-wait-for') const { utils } = require('libp2p-pubsub') const { createGossipsub, - createPeerId, mockRegistrar } = require('./utils') @@ -123,36 +121,6 @@ describe('Pubsub', () => { }) }) - describe('process', () => { - it('should disconnect peer on stream error', async () => { - sinon.spy(gossipsub, '_onPeerDisconnected') - - const peerId = await createPeerId() - const mockConn = { - newStream () { - return { - stream: { - sink: async source => { - for await (const _ of source) { // eslint-disable-line no-unused-vars - // mock stream just swallows any data sent - } - }, - source: (async function * () { // eslint-disable-line require-yield - // throw in a bit - await new Promise(resolve => setTimeout(resolve, 100)) - throw new Error('boom') - })() - } - } - } - } - - gossipsub._onPeerConnected(peerId, mockConn) - - await pWaitFor(() => gossipsub._onPeerDisconnected.calledWith(peerId), { timeout: 1000 }) - }) - }) - describe('topic validators', () => { it('should filter messages by topic validator', async () => { // use processRpcMessage.callCount to see if a message is valid or not diff --git a/test/utils/index.js b/test/utils/index.js index f6881941..0e74c8af 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -54,17 +54,34 @@ const createGossipsubNodes = async (n, shouldStart, options) => { exports.createGossipsubNodes = createGossipsubNodes -const connectGossipsubNodes = (nodes, registrarRecords, multicodec) => { +const connectGossipsubNodes = async (nodes, registrarRecords, multicodec) => { // connect all nodes for (let i = 0; i < nodes.length; i++) { for (let j = i + 1; j < nodes.length; j++) { const onConnectI = registrarRecords[i][multicodec].onConnect const onConnectJ = registrarRecords[j][multicodec].onConnect + const handleI = registrarRecords[i][multicodec].handler + const handleJ = registrarRecords[j][multicodec].handler + // Notice peers of connection const [d0, d1] = ConnectionPair() - onConnectI(nodes[j].peerId, d0) - onConnectJ(nodes[i].peerId, d1) + await onConnectI(nodes[j].peerId, d0) + await handleJ({ + protocol: multicodec, + stream: d1.stream, + connection: { + remotePeer: nodes[i].peerId + } + }) + await onConnectJ(nodes[i].peerId, d1) + await handleI({ + protocol: multicodec, + stream: d0.stream, + connection: { + remotePeer: nodes[j].peerId + } + }) } } From 2f1fdaa935aa242d060ffb2b95867e96bbece15b Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 4 Jun 2020 10:43:46 +0200 Subject: [PATCH 2/6] chore: update contributors --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 8dd064be..dc1b9b9d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "libp2p-gossipsub", - "version": "0.4.4", + "version": "0.4.5", "description": "A typescript implementation of gossipsub", "leadMaintainer": "Cayman Nava ", "main": "src/index.js", From 97943feb72b1cf992c99c9ecb0efe01c5d1d5a57 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 4 Jun 2020 10:43:47 +0200 Subject: [PATCH 3/6] chore: release version v0.4.5 --- CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d7477e0..7c3ff9e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ + +## [0.4.5](https://github.com/ChainSafe/gossipsub-js/compare/v0.4.4...v0.4.5) (2020-06-04) + + +### Bug Fixes + +* use unidirectional streams ([#78](https://github.com/ChainSafe/gossipsub-js/issues/78)) ([8118894](https://github.com/ChainSafe/gossipsub-js/commit/8118894)) + + + ## [0.4.4](https://github.com/ChainSafe/gossipsub-js/compare/v0.4.3...v0.4.4) (2020-06-03) From 32db62a5ced2f6638f2aab97fe2b87dcefa11c09 Mon Sep 17 00:00:00 2001 From: Cayman Date: Wed, 3 Jun 2020 16:23:58 -0500 Subject: [PATCH 4/6] chore: add peer score machinery --- package.json | 1 + test/peerScore.spec.js | 719 ++++++++++++++++++++++++++++++++++ test/scoreParams.spec.js | 419 ++++++++++++++++++++ ts/constants.ts | 2 + ts/score/computeScore.ts | 92 +++++ ts/score/index.ts | 3 + ts/score/messageDeliveries.ts | 92 +++++ ts/score/peerScore.ts | 596 ++++++++++++++++++++++++++++ ts/score/peerStats.ts | 114 ++++++ ts/score/scoreParams.ts | 376 ++++++++++++++++++ 10 files changed, 2414 insertions(+) create mode 100644 test/peerScore.spec.js create mode 100644 test/scoreParams.spec.js create mode 100644 ts/score/computeScore.ts create mode 100644 ts/score/index.ts create mode 100644 ts/score/messageDeliveries.ts create mode 100644 ts/score/peerScore.ts create mode 100644 ts/score/peerStats.ts create mode 100644 ts/score/scoreParams.ts diff --git a/package.json b/package.json index dc1b9b9d..b1a0fd6d 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "dependencies": { "buffer": "^5.6.0", "debug": "^4.1.1", + "denque": "^1.4.1", "err-code": "^2.0.0", "it-length-prefixed": "^3.0.0", "it-pipe": "^1.0.1", diff --git a/test/peerScore.spec.js b/test/peerScore.spec.js new file mode 100644 index 00000000..7b1d4373 --- /dev/null +++ b/test/peerScore.spec.js @@ -0,0 +1,719 @@ +const { expect } = require('chai') +const PeerId = require('peer-id') +const { utils } = require('libp2p-pubsub') +const { PeerScore, createPeerScoreParams, createTopicScoreParams } = require('../src/score') + +const addrBook = new Map() +addrBook.getMultiaddrsForPeer = () => ([]) + +const makeTestMessage = (i) => { + return { + seqno: Buffer.alloc(8, i), + data: Buffer.from([i]), + from: "test", + topicIDs: [] + } +} + +describe('PeerScore', () => { + it('should score based on time in mesh', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 0.5, + timeInMeshWeight: 1, + timeInMeshQuantum: 1, + timeInMeshCap: 3600, + invalidMessageDeliveriesDecay: 0.1, + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + // Peer score should start at 0 + const ps = new PeerScore(params, addrBook, utils.msgId) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + const elapsed = tparams.timeInMeshQuantum * 100 + await new Promise(resolve => setTimeout(resolve, elapsed)) + + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.be.gte( + tparams.topicWeight * tparams.timeInMeshWeight / tparams.timeInMeshQuantum * elapsed + ) + }) + it('should cap time in mesh score', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 0.5, + timeInMeshWeight: 1, + timeInMeshQuantum: 1, + timeInMeshCap: 10, + invalidMessageDeliveriesDecay: 0.1, + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + // Peer score should start at 0 + const ps = new PeerScore(params, addrBook, utils.msgId) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + const elapsed = tparams.timeInMeshQuantum * 40 + await new Promise(resolve => setTimeout(resolve, elapsed)) + + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.be.gt( + tparams.topicWeight * tparams.timeInMeshWeight * tparams.timeInMeshCap * 0.5 + ) + expect(aScore).to.be.lt( + tparams.topicWeight * tparams.timeInMeshWeight * tparams.timeInMeshCap * 1.5 + ) + }) + it('should score first message deliveries', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.9, + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesCap: 50000, + timeInMeshQuantum: 1000 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + // Peer score should start at 0 + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i) + msg.topicIDs = [mytopic] + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.be.equal( + tparams.topicWeight * tparams.firstMessageDeliveriesWeight * nMessages * tparams.firstMessageDeliveriesDecay + ) + }) + it('should cap first message deliveries score', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.9, + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesCap: 50, + timeInMeshQuantum: 1000 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + // Peer score should start at 0 + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i) + msg.topicIDs = [mytopic] + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.be.equal( + tparams.topicWeight * tparams.firstMessageDeliveriesWeight * tparams.firstMessageDeliveriesCap * tparams.firstMessageDeliveriesDecay + ) + }) + it('should decay first message deliveries score', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.9, // decay 10% per decay interval + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesCap: 50, + timeInMeshQuantum: 1000 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + // Peer score should start at 0 + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i) + msg.topicIDs = [mytopic] + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + + ps._refreshScores() + aScore = ps.score(peerA) + let expected = tparams.topicWeight * tparams.firstMessageDeliveriesWeight * tparams.firstMessageDeliveriesCap * tparams.firstMessageDeliveriesDecay + expect(aScore).to.be.equal(expected) + + // refreshing the scores applies the decay param + const decayInterals = 10 + for (let i = 0; i < decayInterals; i++) { + ps._refreshScores() + expected *= tparams.firstMessageDeliveriesDecay + } + aScore = ps.score(peerA) + expect(aScore).to.be.equal(expected) + }) + it('should score mesh message deliveries', async function () { + this.timeout(5000) + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesActivation: 1000, + meshMessageDeliveriesWindow: 10, + meshMessageDeliveriesThreshold: 20, + meshMessageDeliveriesCap: 100, + meshMessageDeliveriesDecay: 0.9, + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesWeight: 0, + timeInMeshQuantum: 1000 + }) + // peer A always delivers the message first + // peer B delivers next (within the delivery window) + // peer C delivers outside the delivery window + // we expect peers A and B to have a score of zero, since all other param weights are zero + // peer C should have a negative score + const peerA = await PeerId.create({keyType: 'secp256k1'}) + const peerB = await PeerId.create({keyType: 'secp256k1'}) + const peerC = await PeerId.create({keyType: 'secp256k1'}) + const peers = [peerA, peerB, peerC] + // Peer score should start at 0 + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + peers.forEach(p => { + ps.addPeer(p) + ps.graft(p, mytopic) + }) + + // assert that nobody has been penalized yet for not delivering messages before activation time + ps._refreshScores() + peers.forEach(p => { + const score = ps.score(p) + expect( + score, + 'expected no mesh delivery penalty before activation time' + ).to.equal(0) + }) + // wait for the activation time to kick in + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation)) + + // deliver a bunch of messages from peers + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i) + msg.topicIDs = [mytopic] + + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + + ps.duplicateMessage(peerB, msg) + + // deliver duplicate from peer C after the window + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesWindow + 5)) + ps.duplicateMessage(peerC, msg) + } + ps._refreshScores() + const aScore = ps.score(peerA) + const bScore = ps.score(peerB) + const cScore = ps.score(peerC) + expect(aScore).to.be.gte(0) + expect(bScore).to.be.gte(0) + + // the penalty is the difference between the threshold and the actual mesh deliveries, squared. + // since we didn't deliver anything, this is just the value of the threshold + const penalty = tparams.meshMessageDeliveriesThreshold * tparams.meshMessageDeliveriesThreshold + const expected = tparams.topicWeight * tparams.meshMessageDeliveriesWeight * penalty + expect(cScore).to.be.equal(expected) + }) + it('should decay mesh message deliveries score', async function () { + this.timeout(5000) + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesActivation: 1000, + meshMessageDeliveriesWindow: 10, + meshMessageDeliveriesThreshold: 20, + meshMessageDeliveriesCap: 100, + meshMessageDeliveriesDecay: 0.9, + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesWeight: 0, + timeInMeshQuantum: 1000 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + // Peer score should start at 0 + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // wait for the activation time to kick in + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation)) + + // deliver a bunch of messages from peer A + const nMessages = 40 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i) + msg.topicIDs = [mytopic] + + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + ps._refreshScores() + let aScore = ps.score(peerA) + expect(aScore).to.be.gte(0) + + // we need to refresh enough times for the decay to bring us below the threshold + let decayedDeliveryCount = nMessages * tparams.meshMessageDeliveriesDecay + for (let i = 0; i < 20; i++) { + ps._refreshScores() + decayedDeliveryCount *= tparams.meshMessageDeliveriesDecay + } + aScore = ps.score(peerA) + // the penalty is the difference between the threshold and the (decayed) mesh deliveries, squared. + const deficit = tparams.meshMessageDeliveriesThreshold - decayedDeliveryCount + const penalty = deficit * deficit + const expected = tparams.topicWeight * tparams.meshMessageDeliveriesWeight * penalty + expect(aScore).to.be.equal(expected) + }) + it('should score mesh message failures', async function () { + this.timeout(5000) + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + // the mesh failure penalty is applied when a peer is pruned while their + // mesh deliveries are under the threshold. + // for this test, we set the mesh delivery threshold, but set + // meshMessageDeliveriesWeight to zero, so the only affect on the score + // is from the mesh failure penalty + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.9, + + meshMessageDeliveriesWeight: 0, + meshMessageDeliveriesActivation: 1000, + meshMessageDeliveriesWindow: 10, + meshMessageDeliveriesThreshold: 20, + meshMessageDeliveriesCap: 100, + meshMessageDeliveriesDecay: 0.9, + + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesWeight: 0, + timeInMeshQuantum: 1000 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + const peerB = await PeerId.create({keyType: 'secp256k1'}) + const peers = [peerA, peerB] + // Peer score should start at 0 + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + peers.forEach(p => { + ps.addPeer(p) + ps.graft(p, mytopic) + }) + + // wait for the activation time to kick in + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation)) + + // deliver a bunch of messages from peer A. peer B does nothing + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i) + msg.topicIDs = [mytopic] + + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + // peers A and B should both have zero scores, since the failure penalty hasn't been applied yet + ps._refreshScores() + let aScore = ps.score(peerA) + let bScore = ps.score(peerB) + expect(aScore).to.be.equal(0) + expect(bScore).to.be.equal(0) + + // prune peer B to apply the penalty + ps.prune(peerB, mytopic) + ps._refreshScores() + aScore = ps.score(peerA) + bScore = ps.score(peerB) + expect(aScore).to.be.equal(0) + + // penalty calculation is the same as for meshMessageDeliveries, but multiplied by meshFailurePenaltyWeight + // instead of meshMessageDeliveriesWeight + const penalty = tparams.meshMessageDeliveriesThreshold * tparams.meshMessageDeliveriesThreshold + const expected = tparams.topicWeight * tparams.meshFailurePenaltyWeight * penalty * tparams.meshFailurePenaltyDecay + expect(bScore).to.be.equal(expected) + }) + it('should score invalid message deliveries', async function () { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.9, + timeInMeshQuantum: 1000 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i) + msg.topicIDs = [mytopic] + + ps.rejectMessage(peerA, msg) + } + ps._refreshScores() + let aScore = ps.score(peerA) + + const expected = tparams.topicWeight * tparams.invalidMessageDeliveriesWeight * (nMessages * tparams.invalidMessageDeliveriesDecay) ** 2 + expect(aScore).to.be.equal(expected) + }) + it('should decay invalid message deliveries score', async function () { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.9, + timeInMeshQuantum: 1000 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i) + msg.topicIDs = [mytopic] + + ps.rejectMessage(peerA, msg) + } + ps._refreshScores() + let aScore = ps.score(peerA) + + let expected = tparams.topicWeight * tparams.invalidMessageDeliveriesWeight * (nMessages * tparams.invalidMessageDeliveriesDecay) ** 2 + expect(aScore).to.be.equal(expected) + + // refresh scores a few times to apply decay + for (let i = 0; i < 10; i++) { + ps._refreshScores() + expected *= tparams.invalidMessageDeliveriesDecay ** 2 + } + aScore = ps.score(peerA) + expect(aScore).to.be.equal(expected) + }) + it('should score invalid/ignored messages', async function () { + // this test adds coverage for the dark corners of message rejection + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.9, + timeInMeshQuantum: 1000 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + const peerB = await PeerId.create({keyType: 'secp256k1'}) + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.addPeer(peerB) + + const msg = makeTestMessage(0) + msg.topicIDs = [mytopic] + + // insert a record + ps.validateMessage(peerA, msg) + + // this should have no effect in the score, and subsequent duplicate messages should have no effect either + ps.ignoreMessage(peerA, msg) + ps.duplicateMessage(peerB, msg) + + let aScore = ps.score(peerA) + let bScore = ps.score(peerB) + let expected = 0 + expect(aScore).to.equal(expected) + expect(bScore).to.equal(expected) + + // now clear the delivery record + ps.deliveryRecords.queue.peekFront().expire = Date.now() + await new Promise(resolve => setTimeout(resolve, 1)) + ps.deliveryRecords.gc() + + // insert a new record in the message deliveries + ps.validateMessage(peerA, msg) + + // and reject the message to make sure duplicates are also penalized + ps.rejectMessage(peerA, msg) + ps.duplicateMessage(peerB, msg) + + aScore = ps.score(peerA) + bScore = ps.score(peerB) + expected = -1 + expect(aScore).to.equal(expected) + expect(bScore).to.equal(expected) + + // now clear the delivery record again + ps.deliveryRecords.queue.peekFront().expire = Date.now() + await new Promise(resolve => setTimeout(resolve, 1)) + ps.deliveryRecords.gc() + + // insert a new record in the message deliveries + ps.validateMessage(peerA, msg) + + // and reject the message after a duplicate has arrived + ps.duplicateMessage(peerB, msg) + ps.rejectMessage(peerA, msg) + + aScore = ps.score(peerA) + bScore = ps.score(peerB) + expected = -4 + expect(aScore).to.equal(expected) + expect(bScore).to.equal(expected) + }) + it('should score w/ application score', async function () { + const mytopic = 'mytopic' + let appScoreValue = 0 + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + appSpecificScore: () => appScoreValue, + appSpecificWeight: 0.5, + decayToZero: 0.1 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + for (let i = -100; i < 100; i++) { + appScoreValue = i + ps._refreshScores() + const aScore = ps.score(peerA) + const expected = i * params.appSpecificWeight + expect(aScore).to.equal(expected) + } + }) + it('should score w/ IP colocation', async function () { + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + invalidMessageDeliveriesDecay: 0.1, + IPColocationFactorThreshold: 1, + IPColocationFactorWeight: -1, + decayToZero: 0.1 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + const peerB = await PeerId.create({keyType: 'secp256k1'}) + const peerC = await PeerId.create({keyType: 'secp256k1'}) + const peerD = await PeerId.create({keyType: 'secp256k1'}) + const peers = [peerA, peerB, peerC, peerD] + + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + peers.forEach(p => { + ps.addPeer(p) + ps.graft(p, mytopic) + }) + + const setIPsForPeer = (p, ips) => { + ps._setIPs(p, ips, []) + const pstats = ps.peerStats.get(p) + pstats.ips = ips + } + // peerA should have no penalty, but B, C, and D should be penalized for sharing an IP + setIPsForPeer(peerA, ['1.2.3.4']) + setIPsForPeer(peerB, ['2.3.4.5']) + setIPsForPeer(peerC, ['2.3.4.5', '3.4.5.6']) + setIPsForPeer(peerD, ['2.3.4.5']) + + ps._refreshScores() + const aScore = ps.score(peerA) + const bScore = ps.score(peerB) + const cScore = ps.score(peerC) + const dScore = ps.score(peerD) + + expect(aScore).to.equal(0) + + const nShared = 3 + const ipSurplus = nShared - params.IPColocationFactorThreshold + const penalty = ipSurplus ** 2 + const expected = params.IPColocationFactorWeight * penalty + expect(bScore).to.equal(expected) + expect(cScore).to.equal(expected) + expect(dScore).to.equal(expected) + }) + it('should score w/ behavior penalty', async function () { + const params = createPeerScoreParams({ + decayInterval: 1000, + behaviourPenaltyWeight: -1, + behaviourPenaltyDecay: 0.99, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1 + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + + // add penalty on a non-existent peer + ps.addPenalty(peerA, 1) + let aScore = ps.score(peerA) + expect(aScore).to.equal(0) + + // add the peer and test penalties + ps.addPeer(peerA) + + aScore = ps.score(peerA) + expect(aScore).to.equal(0) + + ps.addPenalty(peerA, 1) + aScore = ps.score(peerA) + expect(aScore).to.equal(-1) + + ps.addPenalty(peerA, 1) + aScore = ps.score(peerA) + expect(aScore).to.equal(-4) + + ps._refreshScores() + + aScore = ps.score(peerA) + expect(aScore).to.equal(-3.9204) + }) + it('should handle score retention', async function () { + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + decayInterval: 1000, + appSpecificScore: () => -1000, + appSpecificWeight: 1, + invalidMessageDeliveriesDecay: 0.1, + decayToZero: 0.1, + retainScore: 800, + }) + const peerA = await PeerId.create({keyType: 'secp256k1'}) + + const ps = new PeerScore(params, addrBook, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // score should equal -1000 (app-specific score) + const expected = -1000 + ps._refreshScores() + let aScore = ps.score(peerA) + expect(aScore).to.equal(expected) + + // disconnect & wait half of the retainScoreTime + // should still have negative score + ps.removePeer(peerA) + const delay = params.retainScore / 2 + await new Promise(resolve => setTimeout(resolve, delay)) + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.equal(expected) + + // wait remaining time (plus a little slop) and the score should reset to 0 + await new Promise(resolve => setTimeout(resolve, delay + 5)) + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.equal(0) + }) +}) diff --git a/test/scoreParams.spec.js b/test/scoreParams.spec.js new file mode 100644 index 00000000..d7c9e8ce --- /dev/null +++ b/test/scoreParams.spec.js @@ -0,0 +1,419 @@ +const { expect } = require('chai') +const { + createPeerScoreThresholds, validatePeerScoreThresholds, + createTopicScoreParams, validateTopicScoreParams, + createPeerScoreParams, validatePeerScoreParams +} = require('../src/score/scoreParams') + +describe('PeerScoreThresholds validation', () => { + it('should throw on invalid PeerScoreThresholds', () => { + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + gossipThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + publishThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + gossipThreshold: -1, + publishThreshold: 0 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + gossipThreshold: -1, + publishThreshold: -2 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + acceptPXThreshold: -1 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + opportunisticGraftThreshold: -1 + }) + )).to.throw + }) + it('should not throw on valid PeerScoreThresholds', () => { + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + gossipThreshold: -1, + publishThreshold: -2, + graylistThreshold: -3, + acceptPXThreshold: 1, + opportunisticGraftThreshold: 2 + }) + )).to.not.throw + }) +}) + +describe('TopicScoreParams validation', () => { + it('should throw on invalid TopicScoreParams', () => { + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + topicWeight: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshWeight: -1, + timeInMeshQuantum: 1000 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshWeight: 1, + timeInMeshQuantum: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshWeight: 1, + timeInMeshQuantum: 1000, + timeInMeshCap: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + firstMessageDeliveriesWeight: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 2 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: 1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 2 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesDecay: 5, + meshMessageDeliveriesThreshold: -3 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesDecay: 5, + meshMessageDeliveriesThreshold: 3, + meshMessageDeliveriesWindow: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesDecay: 5, + meshMessageDeliveriesThreshold: 3, + meshMessageDeliveriesWindow: 1, + meshMessageDeliveriesActivation: 1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshFailurePenaltyWeight: 1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 2 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + invalidMessageDeliveriesWeight: 1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 2 + }) + )).to.throw + }) + it('should not throw on valid TopicScoreParams', () => { + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + topicWeight: 2, + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1000, + timeInMeshCap: 10, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 5, + meshMessageDeliveriesWindow: 1, + meshMessageDeliveriesActivation: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.5 + }) + )).to.not.throw + }) +}) + +describe('PeerScoreParams validation', () => { + const appScore = () => 0 + + it('should throw on invalid PeerScoreParams', () => { + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: -1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + decayInterval: 1000, + decayToZero: 0.01 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: -1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: -1, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 2, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + behaviourPenaltyWeight: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + behaviourPenaltyWeight: -1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + behaviourPenaltyWeight: -1, + behaviourPenaltyDecay: 2 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1, + topics: { + test: { + topicWeight: -1, + timeInMeshWeight: 0.01, + timeInMeshQuantum: time.Second, + timeInMeshCap: 10, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 5, + meshMessageDeliveriesWindow: 1, + meshMessageDeliveriesActivation: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.5 + } + } + }) + )).to.throw + }) + it('should not throw on valid PeerScoreParams', () => { + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1, + behaviourPenaltyWeight: -1, + behaviourPenaltyDecay: 0.999 + }) + )).to.not.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1, + behaviourPenaltyWeight: -1, + behaviourPenaltyDecay: 0.999, + }) + )).to.not.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: time.Second, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1, + topics: { + test: { + topicWeight: 1, + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1000, + timeInMeshCap: 10, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 5, + meshMessageDeliveriesWindow: 1, + meshMessageDeliveriesActivation: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.5, + }, + }, + }) + )).to.not.throw + }) +}) \ No newline at end of file diff --git a/ts/constants.ts b/ts/constants.ts index 7d68de34..b21f329d 100644 --- a/ts/constants.ts +++ b/ts/constants.ts @@ -22,3 +22,5 @@ export const GossipsubHeartbeatInterval = second // Fanout ttl export const GossipsubFanoutTTL = minute + +export const TimeCacheDuration = 120 * 1000 diff --git a/ts/score/computeScore.ts b/ts/score/computeScore.ts new file mode 100644 index 00000000..e76f5145 --- /dev/null +++ b/ts/score/computeScore.ts @@ -0,0 +1,92 @@ +import { PeerStats } from './peerStats' +import { PeerScoreParams } from './scoreParams' +import PeerId = require('peer-id') + +export function computeScore ( + peer: PeerId, + pstats: PeerStats, + params: PeerScoreParams, + peerIPs: Map> +): number { + let score = 0 + + // topic stores + Object.entries(pstats.topics).forEach(([topic, tstats]) => { + // the topic parameters + const topicParams = params.topics[topic] + if (!topicParams) { + // we are not scoring this topic + return + } + + let topicScore = 0 + + // P1: time in Mesh + if (tstats.inMesh) { + let p1 = tstats.meshTime / topicParams.timeInMeshQuantum + if (p1 > topicParams.timeInMeshCap) { + p1 = topicParams.timeInMeshCap + } + topicScore += p1 * topicParams.timeInMeshWeight + } + + // P2: first message deliveries + const p2 = tstats.firstMessageDeliveries + topicScore += p2 * topicParams.firstMessageDeliveriesWeight + + // P3: mesh message deliveries + if (tstats.meshMessageDeliveriesActive) { + if (tstats.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold) { + const deficit = topicParams.meshMessageDeliveriesThreshold - tstats.meshMessageDeliveries + const p3 = deficit * deficit + topicScore += p3 * topicParams.meshMessageDeliveriesWeight + } + } + + // P3b: + // NOTE: the weight of P3b is negative (validated in validateTopicScoreParams) so this detracts + const p3b = tstats.meshFailurePenalty + topicScore += p3b * topicParams.meshFailurePenaltyWeight + + // P4: invalid messages + // NOTE: the weight of P4 is negative (validated in validateTopicScoreParams) so this detracts + const p4 = tstats.invalidMessageDeliveries * tstats.invalidMessageDeliveries + topicScore += p4 * topicParams.invalidMessageDeliveriesWeight + + // update score, mixing with topic weight + score += topicScore * topicParams.topicWeight + }) + + // apply the topic score cap, if any + if (params.topicScoreCap > 0 && score > params.topicScoreCap) { + score = params.topicScoreCap + } + + // P5: application-specific score + const p5 = params.appSpecificScore(peer) + score += p5 * params.appSpecificWeight + + // P6: IP colocation factor + pstats.ips.forEach(ip => { + if (params.IPColocationFactorWhitelist.has(ip)) { + return + } + + // P6 has a cliff (IPColocationFactorThreshold) + // It's only applied iff at least that many peers are connected to us from that source IP addr. + // It is quadratic, and the weight is negative (validated in validatePeerScoreParams) + const peersInIP = peerIPs.get(ip) + const numPeersInIP = peersInIP ? peersInIP.size : 0 + if (numPeersInIP > params.IPColocationFactorThreshold) { + const surplus = numPeersInIP - params.IPColocationFactorThreshold + const p6 = surplus * surplus + score += p6 * params.IPColocationFactorWeight + } + }) + + // P7: behavioural pattern penalty + const p7 = pstats.behaviourPenalty * pstats.behaviourPenalty + score += p7 * params.behaviourPenaltyWeight + + return score +} diff --git a/ts/score/index.ts b/ts/score/index.ts new file mode 100644 index 00000000..1eea1395 --- /dev/null +++ b/ts/score/index.ts @@ -0,0 +1,3 @@ +export * from './scoreParams' +export * from './peerStats' +export * from './peerScore' diff --git a/ts/score/messageDeliveries.ts b/ts/score/messageDeliveries.ts new file mode 100644 index 00000000..1315ab19 --- /dev/null +++ b/ts/score/messageDeliveries.ts @@ -0,0 +1,92 @@ +import { TimeCacheDuration } from '../constants' +import Denque from 'denque' +import PeerId = require('peer-id') + +export enum DeliveryRecordStatus { + /** + * we don't know (yet) if the message is valid + */ + unknown, + /** + * we know the message is valid + */ + valid, + /** + * we know the message is invalid + */ + invalid, + /** + * we were instructed by the validator to ignore the message + */ + ignored +} + +export interface DeliveryRecord { + status: DeliveryRecordStatus + firstSeen: number + validated: number + peers: Set +} + +interface DeliveryQueueEntry { + msgId: string + expire: number +} + +/** + * Map of message ID to DeliveryRecord + * + * Maintains an internal queue for efficient gc of old messages + */ +export class MessageDeliveries { + private records: Map + private queue: Denque + + constructor () { + this.records = new Map() + this.queue = new Denque() + } + + ensureRecord (msgId: string): DeliveryRecord { + let drec = this.records.get(msgId) + if (drec) { + return drec + } + + // record doesn't exist yet + // create record + drec = { + status: DeliveryRecordStatus.unknown, + firstSeen: Date.now(), + validated: 0, + peers: new Set() + } + this.records.set(msgId, drec) + + // and add msgId to the queue + const entry: DeliveryQueueEntry = { + msgId, + expire: Date.now() + TimeCacheDuration + } + this.queue.push(entry) + + return drec + } + + gc (): void { + const now = Date.now() + // queue is sorted by expiry time + // remove expired messages, remove from queue until first un-expired message found + let head = this.queue.peekFront() + while (head && head.expire < now) { + this.records.delete(head.msgId) + this.queue.shift() + head = this.queue.peekFront() + } + } + + clear (): void { + this.records.clear() + this.queue.clear() + } +} diff --git a/ts/score/peerScore.ts b/ts/score/peerScore.ts new file mode 100644 index 00000000..513d51de --- /dev/null +++ b/ts/score/peerScore.ts @@ -0,0 +1,596 @@ +import { Message } from '../message' +import { PeerScoreParams, validatePeerScoreParams } from './scoreParams' +import { PeerStats, createPeerStats, ensureTopicStats } from './peerStats' +import { computeScore } from './computeScore' +import { MessageDeliveries, DeliveryRecordStatus } from './messageDeliveries' +import PeerId = require('peer-id') +import Multiaddr = require('multiaddr') +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore +import debug = require('debug') + +const log = debug('libp2p:gossipsub:score') + +interface AddressBook { + getMultiaddrsForPeer(id: PeerId): Multiaddr[] + // eslint-disable-next-line @typescript-eslint/ban-types + on(evt: string, fn: Function): void + // eslint-disable-next-line @typescript-eslint/ban-types + off(evt: string, fn: Function): void +} + +export class PeerScore { + /** + * The score parameters + */ + params: PeerScoreParams + /** + * Per-peer stats for score calculation + */ + peerStats: Map + /** + * IP colocation tracking; maps IP => set of peers. + */ + peerIPs: Map> + /** + * Recent message delivery timing/participants + */ + deliveryRecords: MessageDeliveries + /** + * Message ID function + */ + msgId: (message: Message) => string + _addressBook: AddressBook + _backgroundInterval: NodeJS.Timeout + + constructor (params: PeerScoreParams, addressBook: AddressBook, msgId: (message: Message) => string) { + validatePeerScoreParams(params) + this.params = params + this._addressBook = addressBook + this.peerStats = new Map() + this.peerIPs = new Map() + this.deliveryRecords = new MessageDeliveries() + this.msgId = msgId + } + + /** + * Start PeerScore instance + * @returns {void} + */ + start (): void { + if (this._backgroundInterval) { + throw new Error('Peer score already running') + } + this._backgroundInterval = setInterval(() => this.background(), this.params.decayInterval) + this._addressBook.on('change:multiaddrs', this._updateIPs) + } + + /** + * Stop PeerScore instance + * @returns {void} + */ + stop (): void { + if (!this._backgroundInterval) { + throw new Error('Peer store already stopped') + } + clearInterval(this._backgroundInterval) + delete this._backgroundInterval + this._addressBook.off('change:multiaddrs', this._updateIPs) + } + + /** + * Periodic maintenance + * @returns {void} + */ + background (): void { + this._refreshScores() + this.deliveryRecords.gc() + } + + /** + * Decays scores, and purges score records for disconnected peers once their expiry has elapsed. + * @returns {void} + */ + _refreshScores (): void { + const now = Date.now() + const decayToZero = this.params.decayToZero + + this.peerStats.forEach((pstats, id) => { + if (!pstats.connected) { + // has the retention perious expired? + if (now > pstats.expire) { + // yes, throw it away (but clean up the IP tracking first) + this._removeIPs(id, pstats.ips) + this.peerStats.delete(id) + } + + // we don't decay retained scores, as the peer is not active. + // this way the peer cannot reset a negative score by simply disconnecting and reconnecting, + // unless the retention period has ellapsed. + // similarly, a well behaved peer does not lose its score by getting disconnected. + return + } + + Object.entries(pstats.topics).forEach(([topic, tstats]) => { + const tparams = this.params.topics[topic] + if (!tparams) { + // we are not scoring this topic + // should be unreachable, we only add scored topics to pstats + return + } + + // decay counters + tstats.firstMessageDeliveries *= tparams.firstMessageDeliveriesDecay + if (tstats.firstMessageDeliveries < decayToZero) { + tstats.firstMessageDeliveries = 0 + } + tstats.meshMessageDeliveries *= tparams.meshMessageDeliveriesDecay + if (tstats.meshMessageDeliveries < decayToZero) { + tstats.meshMessageDeliveries = 0 + } + tstats.meshFailurePenalty *= tparams.meshFailurePenaltyDecay + if (tstats.meshFailurePenalty < decayToZero) { + tstats.meshFailurePenalty = 0 + } + tstats.invalidMessageDeliveries *= tparams.invalidMessageDeliveriesDecay + if (tstats.invalidMessageDeliveries < decayToZero) { + tstats.invalidMessageDeliveries = 0 + } + // update mesh time and activate mesh message delivery parameter if need be + if (tstats.inMesh) { + tstats.meshTime = now - tstats.graftTime + if (tstats.meshTime > tparams.meshMessageDeliveriesActivation) { + tstats.meshMessageDeliveriesActive = true + } + } + }) + // decay P7 counter + pstats.behaviourPenalty *= this.params.behaviourPenaltyDecay + if (pstats.behaviourPenalty < decayToZero) { + pstats.behaviourPenalty = 0 + } + }) + } + + /** + * @param {PeerId} id + * @returns {Number} + */ + score (id: PeerId): number { + const pstats = this.peerStats.get(id) + if (!pstats) { + return 0 + } + return computeScore(id, pstats, this.params, this.peerIPs) + } + + /** + * @param {PeerId} id + * @param {Number} penalty + * @returns {void} + */ + addPenalty (id: PeerId, penalty: number): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + pstats.behaviourPenalty += penalty + } + + /** + * @param {PeerId} id + * @returns {void} + */ + addPeer (id: PeerId): void { + // create peer stats (not including topic stats for each topic to be scored) + // topic stats will be added as needed + const pstats = createPeerStats({ + connected: true + }) + this.peerStats.set(id, pstats) + + // get + update peer IPs + const ips = this._getIPs(id) + this._setIPs(id, ips, pstats.ips) + pstats.ips = ips + } + + /** + * @param {PeerId} id + * @returns {void} + */ + removePeer (id: PeerId): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + // decide whether to retain the score; this currently only retains non-positive scores + // to dissuade attacks on the score function. + if (this.score(id) > 0) { + this._removeIPs(id, pstats.ips) + this.peerStats.delete(id) + return + } + + // furthermore, when we decide to retain the score, the firstMessageDelivery counters are + // reset to 0 and mesh delivery penalties applied. + Object.entries(pstats.topics).forEach(([topic, tstats]) => { + tstats.firstMessageDeliveries = 0 + + const threshold = this.params.topics[topic].meshMessageDeliveriesThreshold + if (tstats.inMesh && tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold) { + const deficit = threshold - tstats.meshMessageDeliveries + tstats.meshFailurePenalty += deficit * deficit + } + + tstats.inMesh = false + }) + + pstats.connected = false + pstats.expire = Date.now() + this.params.retainScore + } + + /** + * @param {PeerId} id + * @param {String} topic + * @returns {void} + */ + graft (id: PeerId, topic: string): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + tstats.inMesh = true + tstats.graftTime = Date.now() + tstats.meshTime = 0 + tstats.meshMessageDeliveriesActive = false + } + + /** + * @param {PeerId} id + * @param {String} topic + * @returns {void} + */ + prune (id: PeerId, topic: string): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + // sticky mesh delivery rate failure penalty + const threshold = this.params.topics[topic].meshMessageDeliveriesThreshold + if (tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold) { + const deficit = threshold - tstats.meshMessageDeliveries + tstats.meshFailurePenalty += deficit * deficit + } + tstats.inMesh = false + } + + /** + * @param {PeerId} id + * @param {Message} message + * @returns {void} + */ + validateMessage (id: PeerId, message: Message): void { + this.deliveryRecords.ensureRecord(this.msgId(message)) + } + + /** + * @param {PeerId} id + * @param {Message} message + * @returns {void} + */ + deliverMessage (id: PeerId, message: Message): void { + this._markFirstMessageDelivery(id, message) + + const drec = this.deliveryRecords.ensureRecord(this.msgId(message)) + const now = Date.now() + + // defensive check that this is the first delivery trace -- delivery status should be unknown + if (drec.status !== DeliveryRecordStatus.unknown) { + log( + 'unexpected delivery: message from %s was first seen %s ago and has delivery status %d', + id.toB58String(), now - drec.firstSeen, DeliveryRecordStatus[drec.status] + ) + return + } + + // mark the message as valid and reward mesh peers that have already forwarded it to us + drec.status = DeliveryRecordStatus.valid + drec.validated = now + if (drec.peers.has(id)) { + // this check is to make sure a peer can't send us a message twice and get a double count + // if it is a first delivery. + this._markDuplicateMessageDelivery(id, message) + } + } + + /** + * @param {PeerId} id + * @param {Message} message + * @returns {void} + */ + rejectMessage (id: PeerId, message: Message): void { + const drec = this.deliveryRecords.ensureRecord(this.msgId(message)) + + // defensive check that this is the first rejection -- delivery status should be unknown + if (drec.status !== DeliveryRecordStatus.unknown) { + log( + 'unexpected rejection: message from %s was first seen %s ago and has delivery status %d', + id.toB58String(), Date.now() - drec.firstSeen, DeliveryRecordStatus[drec.status] + ) + return + } + + // mark the message as invalid and penalize peers that have already forwarded it. + drec.status = DeliveryRecordStatus.invalid + + this._markInvalidMessageDelivery(id, message) + drec.peers.forEach(p => { + this._markInvalidMessageDelivery(p, message) + }) + } + + /** + * @param {PeerId} id + * @param {Message} message + * @returns {void} + */ + ignoreMessage (id: PeerId, message: Message): void { + const drec = this.deliveryRecords.ensureRecord(this.msgId(message)) + + // defensive check that this is the first ignore -- delivery status should be unknown + if (drec.status !== DeliveryRecordStatus.unknown) { + log( + 'unexpected ignore: message from %s was first seen %s ago and has delivery status %d', + id.toB58String(), Date.now() - drec.firstSeen, DeliveryRecordStatus[drec.status] + ) + return + } + + // mark the message as invalid and penalize peers that have already forwarded it. + drec.status = DeliveryRecordStatus.ignored + } + + /** + * @param {PeerId} id + * @param {Message} message + * @returns {void} + */ + duplicateMessage (id: PeerId, message: Message): void { + const drec = this.deliveryRecords.ensureRecord(this.msgId(message)) + + if (drec.peers.has(id)) { + // we have already seen this duplicate + return + } + + switch (drec.status) { + case DeliveryRecordStatus.unknown: + // the message is being validated; track the peer delivery and wait for + // the Deliver/Reject/Ignore notification. + drec.peers.add(id) + break + case DeliveryRecordStatus.valid: + // mark the peer delivery time to only count a duplicate delivery once. + drec.peers.add(id) + this._markDuplicateMessageDelivery(id, message, drec.validated) + break + case DeliveryRecordStatus.invalid: + // we no longer track delivery time + this._markInvalidMessageDelivery(id, message) + break + } + } + + /** + * Increments the "invalid message deliveries" counter for all scored topics the message is published in. + * @param {PeerId} id + * @param {Message} message + * @returns {void} + */ + _markInvalidMessageDelivery (id: PeerId, message: Message): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + message.topicIDs.forEach(topic => { + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + tstats.invalidMessageDeliveries += 1 + }) + } + + /** + * Increments the "first message deliveries" counter for all scored topics the message is published in, + * as well as the "mesh message deliveries" counter, if the peer is in the mesh for the topic. + * @param {PeerId} id + * @param {Message} message + * @returns {void} + */ + _markFirstMessageDelivery (id: PeerId, message: Message): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + message.topicIDs.forEach(topic => { + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + let cap = this.params.topics[topic].firstMessageDeliveriesCap + tstats.firstMessageDeliveries += 1 + if (tstats.firstMessageDeliveries > cap) { + tstats.firstMessageDeliveries = cap + } + + if (!tstats.inMesh) { + return + } + + cap = this.params.topics[topic].meshMessageDeliveriesCap + tstats.meshMessageDeliveries += 1 + if (tstats.meshMessageDeliveries > cap) { + tstats.meshMessageDeliveries = cap + } + }) + } + + /** + * Increments the "mesh message deliveries" counter for messages we've seen before, + * as long the message was received within the P3 window. + * @param {PeerId} id + * @param {Message} message + * @param {number} validatedTime + * @returns {void} + */ + _markDuplicateMessageDelivery (id: PeerId, message: Message, validatedTime = 0): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + const now = validatedTime ? Date.now() : 0 + + message.topicIDs.forEach(topic => { + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + if (!tstats.inMesh) { + return + } + + const tparams = this.params.topics[topic] + + // check against the mesh delivery window -- if the validated time is passed as 0, then + // the message was received before we finished validation and thus falls within the mesh + // delivery window. + if (validatedTime && now > validatedTime + tparams.meshMessageDeliveriesWindow) { + return + } + + const cap = tparams.meshMessageDeliveriesCap + tstats.meshMessageDeliveries += 1 + if (tstats.meshMessageDeliveries > cap) { + tstats.meshMessageDeliveries = cap + } + }) + } + + /** + * Gets the current IPs for a peer. + * @param {PeerId} id + * @returns {Array} + */ + _getIPs (id: PeerId): string[] { + return this._addressBook.getMultiaddrsForPeer(id) + .map(ma => { + return ma.toOptions().host + }) + } + + /** + * Called as a callback to addressbook updates + * @param {PeerId} id + * @param {Array} multiaddrs + * @returns {void} + */ + _updateIPs = (id: PeerId, multiaddrs: Multiaddr[]): void => { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + this._setIPs(id, multiaddrs.map(ma => ma.toOptions().host), pstats.ips) + } + + /** + * Adds tracking for the new IPs in the list, and removes tracking from the obsolete IPs. + * @param {PeerId} id + * @param {Array} newIPs + * @param {Array} oldIPs + * @returns {void} + */ + _setIPs (id: PeerId, newIPs: string[], oldIPs: string[]): void { + // add the new IPs to the tracking + // eslint-disable-next-line no-labels + addNewIPs: + for (const ip of newIPs) { + // check if it is in the old ips list + for (const xip of oldIPs) { + if (ip === xip) { + // eslint-disable-next-line no-labels + continue addNewIPs + } + } + // no, it's a new one -- add it to the tracker + let peers = this.peerIPs.get(ip) + if (!peers) { + peers = new Set() + this.peerIPs.set(ip, peers) + } + peers.add(id) + } + // remove the obsolete old IPs from the tracking + // eslint-disable-next-line no-labels + removeOldIPs: + for (const ip of oldIPs) { + // check if it is in the new ips list + for (const xip of newIPs) { + if (ip === xip) { + // eslint-disable-next-line no-labels + continue removeOldIPs + } + } + // no, its obselete -- remove it from the tracker + const peers = this.peerIPs.get(ip) + if (!peers) { + continue + } + peers.delete(id) + if (!peers.size) { + this.peerIPs.delete(ip) + } + } + } + + /** + * Removes an IP list from the tracking list for a peer. + * @param {PeerId} id + * @param {Array} ips + * @returns {void} + */ + _removeIPs (id: PeerId, ips: string[]): void { + ips.forEach(ip => { + const peers = this.peerIPs.get(ip) + if (!peers) { + return + } + + peers.delete(id) + if (!peers.size) { + this.peerIPs.delete(ip) + } + }) + } +} diff --git a/ts/score/peerStats.ts b/ts/score/peerStats.ts new file mode 100644 index 00000000..532ecf4b --- /dev/null +++ b/ts/score/peerStats.ts @@ -0,0 +1,114 @@ +import { PeerScoreParams } from './scoreParams' + +export interface PeerStats { + /** + * true if the peer is currently connected + */ + connected: boolean + + /** + * expiration time of the score stats for disconnected peers + */ + expire: number + + /** + * per topic stats + */ + topics: Record + + /** + * IP tracking; store as string for easy processing + */ + ips: string[] + + /** + * behavioural pattern penalties (applied by the router) + */ + behaviourPenalty: number +} + +export interface TopicStats { + /** + * true if the peer is in the mesh + */ + inMesh: boolean + + /** + * time when the peer was (last) GRAFTed; valid only when in mesh + */ + graftTime: number + + /** + * time in mesh (updated during refresh/decay to avoid calling gettimeofday on + * every score invocation) + */ + meshTime: number + + /** + * first message deliveries + */ + firstMessageDeliveries: number + + /** + * mesh message deliveries + */ + meshMessageDeliveries: number + + /** + * true if the peer has been enough time in the mesh to activate mess message deliveries + */ + meshMessageDeliveriesActive: boolean + + /** + * sticky mesh rate failure penalty counter + */ + meshFailurePenalty: number + + /** + * invalid message counter + */ + invalidMessageDeliveries: number +} + +export function createPeerStats (ps: Partial = {}): PeerStats { + return { + connected: false, + expire: 0, + ips: [], + behaviourPenalty: 0, + ...ps, + topics: ps.topics + ? Object.entries(ps.topics) + .reduce((topics, [topic, topicStats]) => { + topics[topic] = createTopicStats(topicStats) + return topics + }, {} as Record) + : {} + } +} + +export function createTopicStats (ts: Partial = {}): TopicStats { + return { + inMesh: false, + graftTime: 0, + meshTime: 0, + firstMessageDeliveries: 0, + meshMessageDeliveries: 0, + meshMessageDeliveriesActive: false, + meshFailurePenalty: 0, + invalidMessageDeliveries: 0, + ...ts + } +} + +export function ensureTopicStats (topic: string, ps: PeerStats, params: PeerScoreParams): TopicStats | undefined { + let ts = ps.topics[topic] + if (ts) { + return ts + } + if (!params.topics[topic]) { + return undefined + } + ps.topics[topic] = ts = createTopicStats() + return ts +} diff --git a/ts/score/scoreParams.ts b/ts/score/scoreParams.ts new file mode 100644 index 00000000..4e7c4870 --- /dev/null +++ b/ts/score/scoreParams.ts @@ -0,0 +1,376 @@ +import PeerId = require('peer-id') + +export interface PeerScoreThresholds { + /** + * gossipThreshold is the score threshold below which gossip propagation is supressed; + * should be negative. + */ + gossipThreshold: number + + /** + * publishThreshold is the score threshold below which we shouldn't publish when using flood + * publishing (also applies to fanout and floodsub peers); should be negative and <= GossipThreshold. + */ + publishThreshold: number + + /** + * graylistThreshold is the score threshold below which message processing is supressed altogether, + * implementing an effective graylist according to peer score; should be negative and <= PublisThreshold. + */ + graylistThreshold: number + + /** + * acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive + * and limited to scores attainable by bootstrappers and other trusted nodes. + */ + acceptPXThreshold: number + + /** + * opportunisticGraftThreshold is the median mesh score threshold before triggering opportunistic + * grafting; this should have a small positive value. + */ + opportunisticGraftThreshold: number +} + +export function createPeerScoreThresholds (p: Partial): PeerScoreThresholds { + return { + gossipThreshold: 0, + publishThreshold: 0, + graylistThreshold: 0, + acceptPXThreshold: 0, + opportunisticGraftThreshold: 0, + ...p + } +} + +export function validatePeerScoreThresholds (p: PeerScoreThresholds): void { + if (p.gossipThreshold > 0) { + throw new Error('invalid gossip threshold; it must be <= 0') + } + if (p.publishThreshold > 0 || p.publishThreshold > p.gossipThreshold) { + throw new Error('invalid publish threshold; it must be <= 0 and <= gossip threshold') + } + if (p.graylistThreshold > 0 || p.graylistThreshold > p.publishThreshold) { + throw new Error('invalid graylist threshold; it must be <= 0 and <= publish threshold') + } + if (p.acceptPXThreshold < 0) { + throw new Error('invalid accept PX threshold; it must be >= 0') + } + if (p.opportunisticGraftThreshold < 0) { + throw new Error('invalid opportunistic grafting threshold; it must be >= 0') + } +} + +export interface PeerScoreParams { + /** + * Score parameters per topic. + */ + topics: Record + + /** + * Aggregate topic score cap; this limits the total contribution of topics towards a positive + * score. It must be positive (or 0 for no cap). + */ + topicScoreCap: number + + /** + * P5: Application-specific peer scoring + */ + appSpecificScore: (p: PeerId) => number + appSpecificWeight: number + + /** + * P6: IP-colocation factor. + * The parameter has an associated counter which counts the number of peers with the same IP. + * If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value + * is the square of the difference, ie (PeersInSameIP - IPColocationThreshold)^2. + * If the number of peers in the same IP is less than the threshold, then the value is 0. + * The weight of the parameter MUST be negative, unless you want to disable for testing. + * Note: In order to simulate many IPs in a managable manner when testing, you can set the weight to 0 + * thus disabling the IP colocation penalty. + */ + IPColocationFactorWeight: number + IPColocationFactorThreshold: number + IPColocationFactorWhitelist: Set + + /** + * P7: behavioural pattern penalties. + * This parameter has an associated counter which tracks misbehaviour as detected by the + * router. The router currently applies penalties for the following behaviors: + * - attempting to re-graft before the prune backoff time has elapsed. + * - not following up in IWANT requests for messages advertised with IHAVE. + * + * The value of the parameter is the square of the counter, which decays with BehaviourPenaltyDecay. + * The weight of the parameter MUST be negative (or zero to disable). + */ + behaviourPenaltyWeight: number + behaviourPenaltyDecay: number + + /** + * the decay interval for parameter counters. + */ + decayInterval: number + + /** + * counter value below which it is considered 0. + */ + decayToZero: number + + /** + * time to remember counters for a disconnected peer. + */ + retainScore: number +} + +export interface TopicScoreParams { + /** + * The weight of the topic. + */ + topicWeight: number + + /** + * P1: time in the mesh + * This is the time the peer has ben grafted in the mesh. + * The value of of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap + * The weight of the parameter MUST be positive (or zero to disable). + */ + timeInMeshWeight: number + timeInMeshQuantum: number + timeInMeshCap: number + + /** + * P2: first message deliveries + * This is the number of message deliveries in the topic. + * The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay, and capped + * by FirstMessageDeliveriesCap. + * The weight of the parameter MUST be positive (or zero to disable). + */ + firstMessageDeliveriesWeight: number + firstMessageDeliveriesDecay: number + firstMessageDeliveriesCap: number + + /** + * P3: mesh message deliveries + * This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of + * message validation; deliveries during validation also count and are retroactively applied + * when validation succeeds. + * This window accounts for the minimum time before a hostile mesh peer trying to game the score + * could replay back a valid message we just sent them. + * It effectively tracks first and near-first deliveries, ie a message seen from a mesh peer + * before we have forwarded it to them. + * The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay. + * If the counter exceeds the threshold, its value is 0. + * If the counter is below the MeshMessageDeliveriesThreshold, the value is the square of + * the deficit, ie (MessageDeliveriesThreshold - counter)^2 + * The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh. + * The weight of the parameter MUST be negative (or zero to disable). + */ + meshMessageDeliveriesWeight: number + meshMessageDeliveriesDecay: number + meshMessageDeliveriesCap: number + meshMessageDeliveriesThreshold: number + meshMessageDeliveriesWindow: number + meshMessageDeliveriesActivation: number + + /** + * P3b: sticky mesh propagation failures + * This is a sticky penalty that applies when a peer gets pruned from the mesh with an active + * mesh message delivery penalty. + * The weight of the parameter MUST be negative (or zero to disable) + */ + meshFailurePenaltyWeight: number + meshFailurePenaltyDecay: number + + /** + * P4: invalid messages + * This is the number of invalid messages in the topic. + * The value of the parameter is the square of the counter, decaying with + * InvalidMessageDeliveriesDecay. + * The weight of the parameter MUST be negative (or zero to disable). + */ + invalidMessageDeliveriesWeight: number + invalidMessageDeliveriesDecay: number +} + +export function createPeerScoreParams (p: Partial): PeerScoreParams { + return { + topicScoreCap: 0, + appSpecificScore: (): number => 0, + appSpecificWeight: 0, + IPColocationFactorWeight: 0, + IPColocationFactorThreshold: 0, + IPColocationFactorWhitelist: new Set(p.IPColocationFactorWhitelist), + behaviourPenaltyWeight: 0, + behaviourPenaltyDecay: 0, + decayInterval: 0, + decayToZero: 0, + retainScore: 0, + ...p, + topics: p.topics + ? Object.entries(p.topics) + .reduce((topics, [topic, topicScoreParams]) => { + topics[topic] = createTopicScoreParams(topicScoreParams) + return topics + }, {} as Record) + : {} + } +} + +export function createTopicScoreParams (p: Partial): TopicScoreParams { + return { + topicWeight: 0, + timeInMeshWeight: 0, + timeInMeshQuantum: 0, + timeInMeshCap: 0, + firstMessageDeliveriesWeight: 0, + firstMessageDeliveriesDecay: 0, + firstMessageDeliveriesCap: 0, + meshMessageDeliveriesWeight: 0, + meshMessageDeliveriesDecay: 0, + meshMessageDeliveriesCap: 0, + meshMessageDeliveriesThreshold: 0, + meshMessageDeliveriesWindow: 0, + meshMessageDeliveriesActivation: 0, + meshFailurePenaltyWeight: 0, + meshFailurePenaltyDecay: 0, + invalidMessageDeliveriesWeight: 0, + invalidMessageDeliveriesDecay: 0, + ...p + } +} + +// peer score parameter validation +export function validatePeerScoreParams (p: PeerScoreParams): void { + for (const [topic, params] of Object.entries(p.topics)) { + try { + validateTopicScoreParams(params) + } catch (e) { + throw new Error(`invalid score parameters for topic ${topic}: ${e.message}`) + } + } + + // check that the topic score is 0 or something positive + if (p.topicScoreCap < 0) { + throw new Error('invalid topic score cap; must be positive (or 0 for no cap)') + } + + // check that we have an app specific score; the weight can be anything (but expected positive) + if (p.appSpecificScore === null || p.appSpecificScore === undefined) { + throw new Error('missing application specific score function') + } + + // check the IP colocation factor + if (p.IPColocationFactorWeight > 0) { + throw new Error('invalid IPColocationFactorWeight; must be negative (or 0 to disable)') + } + if (p.IPColocationFactorWeight !== 0 && p.IPColocationFactorThreshold < 1) { + throw new Error('invalid IPColocationFactorThreshold; must be at least 1') + } + + // check the behaviour penalty + if (p.behaviourPenaltyWeight > 0) { + throw new Error('invalid BehaviourPenaltyWeight; must be negative (or 0 to disable)') + } + if (p.behaviourPenaltyWeight !== 0 && (p.behaviourPenaltyDecay <= 0 || p.behaviourPenaltyDecay >= 1)) { + throw new Error('invalid BehaviourPenaltyDecay; must be between 0 and 1') + } + + // check the decay parameters + if (p.decayInterval < 1000) { + throw new Error('invalid DecayInterval; must be at least 1s') + } + if (p.decayToZero <= 0 || p.decayToZero >= 1) { + throw new Error('invalid DecayToZero; must be between 0 and 1') + } + + // no need to check the score retention; a value of 0 means that we don't retain scores +} + +export function validateTopicScoreParams (p: TopicScoreParams): void { + // make sure we have a sane topic weight + if (p.topicWeight < 0) { + throw new Error('invalid topic weight; must be >= 0') + } + + // check P1 + if (p.timeInMeshQuantum === 0) { + throw new Error('invalid TimeInMeshQuantum; must be non zero') + } + if (p.timeInMeshWeight < 0) { + throw new Error('invalid TimeInMeshWeight; must be positive (or 0 to disable)') + } + if (p.timeInMeshWeight !== 0 && p.timeInMeshQuantum <= 0) { + throw new Error('invalid TimeInMeshQuantum; must be positive') + } + if (p.timeInMeshWeight !== 0 && p.timeInMeshCap <= 0) { + throw new Error('invalid TimeInMeshCap; must be positive') + } + + // check P2 + if (p.firstMessageDeliveriesWeight < 0) { + throw new Error('invallid FirstMessageDeliveriesWeight; must be positive (or 0 to disable)') + } + if (p.firstMessageDeliveriesWeight !== 0 && (p.firstMessageDeliveriesDecay <= 0 || p.firstMessageDeliveriesDecay >= 1)) { + throw new Error('invalid FirstMessageDeliveriesDecay; must be between 0 and 1') + } + if (p.firstMessageDeliveriesWeight !== 0 && p.firstMessageDeliveriesCap <= 0) { + throw new Error('invalid FirstMessageDeliveriesCap; must be positive') + } + + // check P3 + if (p.meshMessageDeliveriesWeight > 0) { + throw new Error('invalid MeshMessageDeliveriesWeight; must be negative (or 0 to disable)') + } + if (p.meshMessageDeliveriesWeight !== 0 && (p.meshMessageDeliveriesDecay <= 0 || p.meshMessageDeliveriesDecay >= 1)) { + throw new Error('invalid MeshMessageDeliveriesDecay; must be between 0 and 1') + } + if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesCap <= 0) { + throw new Error('invalid MeshMessageDeliveriesCap; must be positive') + } + if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesThreshold <= 0) { + throw new Error('invalid MeshMessageDeliveriesThreshold; must be positive') + } + if (p.meshMessageDeliveriesWindow < 0) { + throw new Error('invalid MeshMessageDeliveriesWindow; must be non-negative') + } + if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesActivation < 1000) { + throw new Error('invalid MeshMessageDeliveriesActivation; must be at least 1s') + } + + // check P3b + if (p.meshFailurePenaltyWeight > 0) { + throw new Error('invalid MeshFailurePenaltyWeight; must be negative (or 0 to disable)') + } + if (p.meshFailurePenaltyWeight !== 0 && (p.meshFailurePenaltyDecay <= 0 || p.meshFailurePenaltyDecay >= 1)) { + throw new Error('invalid MeshFailurePenaltyDecay; must be between 0 and 1') + } + + // check P4 + if (p.invalidMessageDeliveriesWeight > 0) { + throw new Error('invalid InvalidMessageDeliveriesWeight; must be negative (or 0 to disable)') + } + if (p.invalidMessageDeliveriesDecay <= 0 || p.invalidMessageDeliveriesDecay >= 1) { + throw new Error('invalid InvalidMessageDeliveriesDecay; must be between 0 and 1') + } +} + +const DefaultDecayInterval = 1000 +const DefaultDecayToZero = 0.01 + +/** + * ScoreParameterDecay computes the decay factor for a parameter, assuming the DecayInterval is 1s + * and that the value decays to zero if it drops below 0.01 + */ +export function scoreParameterDecay (decay: number): number { + return scoreParameterDecayWithBase(decay, DefaultDecayInterval, DefaultDecayToZero) +} + +/** + * ScoreParameterDecay computes the decay factor for a parameter using base as the DecayInterval + */ +export function scoreParameterDecayWithBase (decay: number, base: number, decayToZero: number): number { + // the decay is linear, so after n ticks the value is factor^n + // so factor^n = decayToZero => factor = decayToZero^(1/n) + const ticks = decay / base + return decayToZero ** (1 / ticks) +} From 9a340911f2be2fe10cd94667670439a6fe8a292f Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 4 Jun 2020 13:30:38 -0500 Subject: [PATCH 5/6] chore: increase test timeouts --- test/peerScore.spec.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/peerScore.spec.js b/test/peerScore.spec.js index 7b1d4373..1903be72 100644 --- a/test/peerScore.spec.js +++ b/test/peerScore.spec.js @@ -222,7 +222,7 @@ describe('PeerScore', () => { expect(aScore).to.be.equal(expected) }) it('should score mesh message deliveries', async function () { - this.timeout(5000) + this.timeout(10000) // Create parameters with reasonable default values const mytopic = 'mytopic' const params = createPeerScoreParams({ @@ -299,7 +299,7 @@ describe('PeerScore', () => { expect(cScore).to.be.equal(expected) }) it('should decay mesh message deliveries score', async function () { - this.timeout(5000) + this.timeout(10000) // Create parameters with reasonable default values const mytopic = 'mytopic' const params = createPeerScoreParams({ @@ -355,7 +355,7 @@ describe('PeerScore', () => { expect(aScore).to.be.equal(expected) }) it('should score mesh message failures', async function () { - this.timeout(5000) + this.timeout(10000) // Create parameters with reasonable default values const mytopic = 'mytopic' const params = createPeerScoreParams({ From 2303125579314722e0d4f97f7640469a9316d074 Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 4 Jun 2020 17:33:36 -0500 Subject: [PATCH 6/6] chore: tweak test timeouts --- test/peerScore.spec.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/peerScore.spec.js b/test/peerScore.spec.js index 1903be72..bdd98ee6 100644 --- a/test/peerScore.spec.js +++ b/test/peerScore.spec.js @@ -326,7 +326,7 @@ describe('PeerScore', () => { ps.graft(peerA, mytopic) // wait for the activation time to kick in - await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation)) + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation + 10)) // deliver a bunch of messages from peer A const nMessages = 40 @@ -395,7 +395,7 @@ describe('PeerScore', () => { }) // wait for the activation time to kick in - await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation)) + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation + 10)) // deliver a bunch of messages from peer A. peer B does nothing const nMessages = 100 @@ -538,7 +538,7 @@ describe('PeerScore', () => { // now clear the delivery record ps.deliveryRecords.queue.peekFront().expire = Date.now() - await new Promise(resolve => setTimeout(resolve, 1)) + await new Promise(resolve => setTimeout(resolve, 5)) ps.deliveryRecords.gc() // insert a new record in the message deliveries @@ -556,7 +556,7 @@ describe('PeerScore', () => { // now clear the delivery record again ps.deliveryRecords.queue.peekFront().expire = Date.now() - await new Promise(resolve => setTimeout(resolve, 1)) + await new Promise(resolve => setTimeout(resolve, 5)) ps.deliveryRecords.gc() // insert a new record in the message deliveries