diff --git a/package.json b/package.json index dc1b9b9d..b1a0fd6d 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "dependencies": { "buffer": "^5.6.0", "debug": "^4.1.1", + "denque": "^1.4.1", "err-code": "^2.0.0", "it-length-prefixed": "^3.0.0", "it-pipe": "^1.0.1", diff --git a/test/peerScore.spec.js b/test/peerScore.spec.js new file mode 100644 index 00000000..20094a06 --- /dev/null +++ b/test/peerScore.spec.js @@ -0,0 +1,657 @@ +const { expect } = require('chai') +const PeerId = require('peer-id') +const { utils } = require('libp2p-pubsub') +const { PeerScore, createPeerScoreParams, createTopicScoreParams } = require('../src/score') + +const connectionManager = new Map() +connectionManager.getAll = () => ([]) + +const makeTestMessage = (i, topicIDs = []) => { + return { + seqno: Buffer.alloc(8, i), + data: Buffer.from([i]), + from: "test", + topicIDs + } +} + +describe('PeerScore', () => { + it('should score based on time in mesh', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + topicScoreCap: 1000 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 0.5, + timeInMeshWeight: 1, + timeInMeshQuantum: 1, + timeInMeshCap: 3600, + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + // Peer score should start at 0 + const ps = new PeerScore(params, connectionManager, utils.msgId) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + const elapsed = tparams.timeInMeshQuantum * 100 + await new Promise(resolve => setTimeout(resolve, elapsed)) + + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.be.gte( + tparams.topicWeight * tparams.timeInMeshWeight / tparams.timeInMeshQuantum * elapsed + ) + }) + it('should cap time in mesh score', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 0.5, + timeInMeshWeight: 1, + timeInMeshQuantum: 1, + timeInMeshCap: 10, + invalidMessageDeliveriesDecay: 0.1, + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + // Peer score should start at 0 + const ps = new PeerScore(params, connectionManager, utils.msgId) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + const elapsed = tparams.timeInMeshQuantum * 40 + await new Promise(resolve => setTimeout(resolve, elapsed)) + + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.be.gt( + tparams.topicWeight * tparams.timeInMeshWeight * tparams.timeInMeshCap * 0.5 + ) + expect(aScore).to.be.lt( + tparams.topicWeight * tparams.timeInMeshWeight * tparams.timeInMeshCap * 1.5 + ) + }) + it('should score first message deliveries', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + topicScoreCap: 1000 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesCap: 50000, + timeInMeshWeight: 0 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + // Peer score should start at 0 + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i, [mytopic]) + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + + ps._refreshScores() + const aScore = ps.score(peerA) + expect(aScore).to.be.equal( + tparams.topicWeight * tparams.firstMessageDeliveriesWeight * nMessages * tparams.firstMessageDeliveriesDecay + ) + }) + it('should cap first message deliveries score', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + topicScoreCap: 1000 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.9, + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesCap: 50, + timeInMeshWeight: 0 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + // Peer score should start at 0 + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i, [mytopic]) + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.be.equal( + tparams.topicWeight * tparams.firstMessageDeliveriesWeight * tparams.firstMessageDeliveriesCap * tparams.firstMessageDeliveriesDecay + ) + }) + it('should decay first message deliveries score', async () => { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + topicScoreCap: 1000 + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.9, // decay 10% per decay interval + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesCap: 50, + timeInMeshWeight: 0 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + // Peer score should start at 0 + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + + let aScore = ps.score(peerA) + expect(aScore, 'expected score to start at zero').to.equal(0) + + // The time in mesh depends on how long the peer has been grafted + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i, [mytopic]) + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + + ps._refreshScores() + aScore = ps.score(peerA) + let expected = tparams.topicWeight * tparams.firstMessageDeliveriesWeight * tparams.firstMessageDeliveriesCap * tparams.firstMessageDeliveriesDecay + expect(aScore).to.be.equal(expected) + + // refreshing the scores applies the decay param + const decayInterals = 10 + for (let i = 0; i < decayInterals; i++) { + ps._refreshScores() + expected *= tparams.firstMessageDeliveriesDecay + } + aScore = ps.score(peerA) + expect(aScore).to.be.equal(expected) + }) + it('should score mesh message deliveries', async function () { + this.timeout(10000) + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesActivation: 1000, + meshMessageDeliveriesWindow: 10, + meshMessageDeliveriesThreshold: 20, + meshMessageDeliveriesCap: 100, + meshMessageDeliveriesDecay: 0.9, + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesWeight: 0, + timeInMeshWeight: 0 + }) + // peer A always delivers the message first + // peer B delivers next (within the delivery window) + // peer C delivers outside the delivery window + // we expect peers A and B to have a score of zero, since all other param weights are zero + // peer C should have a negative score + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peerB = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peerC = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peers = [peerA, peerB, peerC] + // Peer score should start at 0 + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + peers.forEach(p => { + ps.addPeer(p) + ps.graft(p, mytopic) + }) + + // assert that nobody has been penalized yet for not delivering messages before activation time + ps._refreshScores() + peers.forEach(p => { + const score = ps.score(p) + expect( + score, + 'expected no mesh delivery penalty before activation time' + ).to.equal(0) + }) + // wait for the activation time to kick in + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation)) + + // deliver a bunch of messages from peers + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i, [mytopic]) + + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + + ps.duplicateMessage(peerB, msg) + + // deliver duplicate from peer C after the window + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesWindow + 5)) + ps.duplicateMessage(peerC, msg) + } + ps._refreshScores() + const aScore = ps.score(peerA) + const bScore = ps.score(peerB) + const cScore = ps.score(peerC) + expect(aScore).to.be.gte(0) + expect(bScore).to.be.gte(0) + + // the penalty is the difference between the threshold and the actual mesh deliveries, squared. + // since we didn't deliver anything, this is just the value of the threshold + const penalty = tparams.meshMessageDeliveriesThreshold * tparams.meshMessageDeliveriesThreshold + const expected = tparams.topicWeight * tparams.meshMessageDeliveriesWeight * penalty + expect(cScore).to.be.equal(expected) + }) + it('should decay mesh message deliveries score', async function () { + this.timeout(10000) + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesActivation: 1000, + meshMessageDeliveriesWindow: 10, + meshMessageDeliveriesThreshold: 20, + meshMessageDeliveriesCap: 100, + meshMessageDeliveriesDecay: 0.9, + invalidMessageDeliveriesDecay: 0.9, + firstMessageDeliveriesWeight: 0, + timeInMeshWeight: 0 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + // Peer score should start at 0 + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // wait for the activation time to kick in + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation + 10)) + + // deliver a bunch of messages from peer A + const nMessages = 40 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i, [mytopic]) + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + ps._refreshScores() + let aScore = ps.score(peerA) + expect(aScore).to.be.gte(0) + + // we need to refresh enough times for the decay to bring us below the threshold + let decayedDeliveryCount = nMessages * tparams.meshMessageDeliveriesDecay + for (let i = 0; i < 20; i++) { + ps._refreshScores() + decayedDeliveryCount *= tparams.meshMessageDeliveriesDecay + } + aScore = ps.score(peerA) + // the penalty is the difference between the threshold and the (decayed) mesh deliveries, squared. + const deficit = tparams.meshMessageDeliveriesThreshold - decayedDeliveryCount + const penalty = deficit * deficit + const expected = tparams.topicWeight * tparams.meshMessageDeliveriesWeight * penalty + expect(aScore).to.be.equal(expected) + }) + it('should score mesh message failures', async function () { + this.timeout(10000) + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + }) + // the mesh failure penalty is applied when a peer is pruned while their + // mesh deliveries are under the threshold. + // for this test, we set the mesh delivery threshold, but set + // meshMessageDeliveriesWeight to zero, so the only affect on the score + // is from the mesh failure penalty + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.9, + + meshMessageDeliveriesWeight: 0, + meshMessageDeliveriesActivation: 1000, + meshMessageDeliveriesWindow: 10, + meshMessageDeliveriesThreshold: 20, + meshMessageDeliveriesCap: 100, + meshMessageDeliveriesDecay: 0.9, + + firstMessageDeliveriesWeight: 0, + timeInMeshWeight: 0 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peerB = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peers = [peerA, peerB] + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + + peers.forEach(p => { + ps.addPeer(p) + ps.graft(p, mytopic) + }) + + // wait for the activation time to kick in + await new Promise(resolve => setTimeout(resolve, tparams.meshMessageDeliveriesActivation + 10)) + + // deliver a bunch of messages from peer A. peer B does nothing + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i, [mytopic]) + ps.validateMessage(peerA, msg) + ps.deliverMessage(peerA, msg) + } + // peers A and B should both have zero scores, since the failure penalty hasn't been applied yet + ps._refreshScores() + let aScore = ps.score(peerA) + let bScore = ps.score(peerB) + expect(aScore).to.be.equal(0) + expect(bScore).to.be.equal(0) + + // prune peer B to apply the penalty + ps.prune(peerB, mytopic) + ps._refreshScores() + aScore = ps.score(peerA) + bScore = ps.score(peerB) + expect(aScore).to.be.equal(0) + + // penalty calculation is the same as for meshMessageDeliveries, but multiplied by meshFailurePenaltyWeight + // instead of meshMessageDeliveriesWeight + const penalty = tparams.meshMessageDeliveriesThreshold * tparams.meshMessageDeliveriesThreshold + const expected = tparams.topicWeight * tparams.meshFailurePenaltyWeight * penalty * tparams.meshFailurePenaltyDecay + expect(bScore).to.be.equal(expected) + }) + it('should score invalid message deliveries', async function () { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.9, + timeInMeshWeight: 0 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i, [mytopic]) + ps.rejectMessage(peerA, msg) + } + ps._refreshScores() + let aScore = ps.score(peerA) + + const expected = tparams.topicWeight * tparams.invalidMessageDeliveriesWeight * (nMessages * tparams.invalidMessageDeliveriesDecay) ** 2 + expect(aScore).to.be.equal(expected) + }) + it('should decay invalid message deliveries score', async function () { + // Create parameters with reasonable default values + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.9, + timeInMeshWeight: 0 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // deliver a bunch of messages from peer A + const nMessages = 100 + for (let i = 0; i < nMessages; i++) { + const msg = makeTestMessage(i, [mytopic]) + ps.rejectMessage(peerA, msg) + } + ps._refreshScores() + let aScore = ps.score(peerA) + + let expected = tparams.topicWeight * tparams.invalidMessageDeliveriesWeight * (nMessages * tparams.invalidMessageDeliveriesDecay) ** 2 + expect(aScore).to.be.equal(expected) + + // refresh scores a few times to apply decay + for (let i = 0; i < 10; i++) { + ps._refreshScores() + expected *= tparams.invalidMessageDeliveriesDecay ** 2 + } + aScore = ps.score(peerA) + expect(aScore).to.be.equal(expected) + }) + it('should score invalid/ignored messages', async function () { + // this test adds coverage for the dark corners of message rejection + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + }) + const tparams = params.topics[mytopic] = createTopicScoreParams({ + topicWeight: 1, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.9, + timeInMeshQuantum: 1000 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peerB = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.addPeer(peerB) + + const msg = makeTestMessage(0, [mytopic]) + + // insert a record + ps.validateMessage(peerA, msg) + + // this should have no effect in the score, and subsequent duplicate messages should have no effect either + ps.ignoreMessage(peerA, msg) + ps.duplicateMessage(peerB, msg) + + let aScore = ps.score(peerA) + let bScore = ps.score(peerB) + let expected = 0 + expect(aScore).to.equal(expected) + expect(bScore).to.equal(expected) + + // now clear the delivery record + ps.deliveryRecords.queue.peekFront().expire = Date.now() + await new Promise(resolve => setTimeout(resolve, 5)) + ps.deliveryRecords.gc() + + // insert a new record in the message deliveries + ps.validateMessage(peerA, msg) + + // and reject the message to make sure duplicates are also penalized + ps.rejectMessage(peerA, msg) + ps.duplicateMessage(peerB, msg) + + aScore = ps.score(peerA) + bScore = ps.score(peerB) + expected = -1 + expect(aScore).to.equal(expected) + expect(bScore).to.equal(expected) + + // now clear the delivery record again + ps.deliveryRecords.queue.peekFront().expire = Date.now() + await new Promise(resolve => setTimeout(resolve, 5)) + ps.deliveryRecords.gc() + + // insert a new record in the message deliveries + ps.validateMessage(peerA, msg) + + // and reject the message after a duplicate has arrived + ps.duplicateMessage(peerB, msg) + ps.rejectMessage(peerA, msg) + + aScore = ps.score(peerA) + bScore = ps.score(peerB) + expected = -4 + expect(aScore).to.equal(expected) + expect(bScore).to.equal(expected) + }) + it('should score w/ application score', async function () { + const mytopic = 'mytopic' + let appScoreValue = 0 + const params = createPeerScoreParams({ + appSpecificScore: () => appScoreValue, + appSpecificWeight: 0.5 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + for (let i = -100; i < 100; i++) { + appScoreValue = i + ps._refreshScores() + const aScore = ps.score(peerA) + const expected = i * params.appSpecificWeight + expect(aScore).to.equal(expected) + } + }) + it('should score w/ IP colocation', async function () { + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + IPColocationFactorThreshold: 1, + IPColocationFactorWeight: -1 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peerB = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peerC = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peerD = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + const peers = [peerA, peerB, peerC, peerD] + + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + peers.forEach(p => { + ps.addPeer(p) + ps.graft(p, mytopic) + }) + + const setIPsForPeer = (p, ips) => { + ps._setIPs(p, ips, []) + const pstats = ps.peerStats.get(p) + pstats.ips = ips + } + // peerA should have no penalty, but B, C, and D should be penalized for sharing an IP + setIPsForPeer(peerA, ['1.2.3.4']) + setIPsForPeer(peerB, ['2.3.4.5']) + setIPsForPeer(peerC, ['2.3.4.5', '3.4.5.6']) + setIPsForPeer(peerD, ['2.3.4.5']) + + ps._refreshScores() + const aScore = ps.score(peerA) + const bScore = ps.score(peerB) + const cScore = ps.score(peerC) + const dScore = ps.score(peerD) + + expect(aScore).to.equal(0) + + const nShared = 3 + const ipSurplus = nShared - params.IPColocationFactorThreshold + const penalty = ipSurplus ** 2 + const expected = params.IPColocationFactorWeight * penalty + expect(bScore).to.equal(expected) + expect(cScore).to.equal(expected) + expect(dScore).to.equal(expected) + }) + it('should score w/ behavior penalty', async function () { + const params = createPeerScoreParams({ + behaviourPenaltyWeight: -1, + behaviourPenaltyDecay: 0.99 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + + // add penalty on a non-existent peer + ps.addPenalty(peerA, 1) + let aScore = ps.score(peerA) + expect(aScore).to.equal(0) + + // add the peer and test penalties + ps.addPeer(peerA) + + aScore = ps.score(peerA) + expect(aScore).to.equal(0) + + ps.addPenalty(peerA, 1) + aScore = ps.score(peerA) + expect(aScore).to.equal(-1) + + ps.addPenalty(peerA, 1) + aScore = ps.score(peerA) + expect(aScore).to.equal(-4) + + ps._refreshScores() + + aScore = ps.score(peerA) + expect(aScore).to.equal(-3.9204) + }) + it('should handle score retention', async function () { + const mytopic = 'mytopic' + const params = createPeerScoreParams({ + appSpecificScore: () => -1000, + appSpecificWeight: 1, + retainScore: 800 + }) + const peerA = (await PeerId.create({keyType: 'secp256k1'})).toB58String() + + const ps = new PeerScore(params, connectionManager, (msg) => utils.msgId(msg.from, msg.seqno)) + ps.addPeer(peerA) + ps.graft(peerA, mytopic) + + // score should equal -1000 (app-specific score) + const expected = -1000 + ps._refreshScores() + let aScore = ps.score(peerA) + expect(aScore).to.equal(expected) + + // disconnect & wait half of the retainScoreTime + // should still have negative score + ps.removePeer(peerA) + const delay = params.retainScore / 2 + await new Promise(resolve => setTimeout(resolve, delay)) + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.equal(expected) + + // wait remaining time (plus a little slop) and the score should reset to 0 + await new Promise(resolve => setTimeout(resolve, delay + 5)) + ps._refreshScores() + aScore = ps.score(peerA) + expect(aScore).to.equal(0) + }) +}) diff --git a/test/peerScoreParams.spec.js b/test/peerScoreParams.spec.js new file mode 100644 index 00000000..b4ef1cbb --- /dev/null +++ b/test/peerScoreParams.spec.js @@ -0,0 +1,370 @@ +const { expect } = require('chai') +const { + createTopicScoreParams, validateTopicScoreParams, + createPeerScoreParams, validatePeerScoreParams +} = require('../src/score') + +describe('TopicScoreParams validation', () => { + it('should throw on invalid TopicScoreParams', () => { + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + topicWeight: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshWeight: -1, + timeInMeshQuantum: 1000 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshWeight: 1, + timeInMeshQuantum: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshWeight: 1, + timeInMeshQuantum: 1000, + timeInMeshCap: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + firstMessageDeliveriesWeight: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 2 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: 1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 2 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesDecay: 5, + meshMessageDeliveriesThreshold: -3 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesDecay: 5, + meshMessageDeliveriesThreshold: 3, + meshMessageDeliveriesWindow: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesDecay: 5, + meshMessageDeliveriesThreshold: 3, + meshMessageDeliveriesWindow: 1, + meshMessageDeliveriesActivation: 1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshFailurePenaltyWeight: 1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 2 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + invalidMessageDeliveriesWeight: 1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: -1 + }) + )).to.throw + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + timeInMeshQuantum: 1000, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 2 + }) + )).to.throw + }) + it('should not throw on valid TopicScoreParams', () => { + expect(() => validateTopicScoreParams( + createTopicScoreParams({ + topicWeight: 2, + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1000, + timeInMeshCap: 10, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 5, + meshMessageDeliveriesWindow: 1, + meshMessageDeliveriesActivation: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.5 + }) + )).to.not.throw + }) +}) + +describe('PeerScoreParams validation', () => { + const appScore = () => 0 + + it('should throw on invalid PeerScoreParams', () => { + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: -1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + decayInterval: 1000, + decayToZero: 0.01 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: -1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: -1, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 2, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + behaviourPenaltyWeight: 1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + behaviourPenaltyWeight: -1 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + behaviourPenaltyWeight: -1, + behaviourPenaltyDecay: 2 + }) + )).to.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1, + topics: { + test: { + topicWeight: -1, + timeInMeshWeight: 0.01, + timeInMeshQuantum: time.Second, + timeInMeshCap: 10, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 5, + meshMessageDeliveriesWindow: 1, + meshMessageDeliveriesActivation: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.5 + } + } + }) + )).to.throw + }) + it('should not throw on valid PeerScoreParams', () => { + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1, + behaviourPenaltyWeight: -1, + behaviourPenaltyDecay: 0.999 + }) + )).to.not.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: 1000, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1, + behaviourPenaltyWeight: -1, + behaviourPenaltyDecay: 0.999, + }) + )).to.not.throw + expect(() => validatePeerScoreParams( + createPeerScoreParams({ + topicScoreCap: 1, + appSpecificScore: appScore, + decayInterval: time.Second, + decayToZero: 0.01, + IPColocationFactorWeight: -1, + IPColocationFactorThreshold: 1, + topics: { + test: { + topicWeight: 1, + timeInMeshWeight: 0.01, + timeInMeshQuantum: 1000, + timeInMeshCap: 10, + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 10, + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 10, + meshMessageDeliveriesThreshold: 5, + meshMessageDeliveriesWindow: 1, + meshMessageDeliveriesActivation: 1000, + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.5, + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.5, + }, + }, + }) + )).to.not.throw + }) +}) diff --git a/test/peerScoreThresholds.spec.js b/test/peerScoreThresholds.spec.js new file mode 100644 index 00000000..9559cbb7 --- /dev/null +++ b/test/peerScoreThresholds.spec.js @@ -0,0 +1,53 @@ +const { expect } = require('chai') +const { + createPeerScoreThresholds, validatePeerScoreThresholds, +} = require('../src/score') + +describe('PeerScoreThresholds validation', () => { + it('should throw on invalid PeerScoreThresholds', () => { + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + gossipThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + publishThreshold: 1 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + gossipThreshold: -1, + publishThreshold: 0 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + gossipThreshold: -1, + publishThreshold: -2 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + acceptPXThreshold: -1 + }) + )).to.throw + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + opportunisticGraftThreshold: -1 + }) + )).to.throw + }) + it('should not throw on valid PeerScoreThresholds', () => { + expect(() => validatePeerScoreThresholds( + createPeerScoreThresholds({ + gossipThreshold: -1, + publishThreshold: -2, + graylistThreshold: -3, + acceptPXThreshold: 1, + opportunisticGraftThreshold: 2 + }) + )).to.not.throw + }) +}) + diff --git a/ts/score/computeScore.ts b/ts/score/computeScore.ts new file mode 100644 index 00000000..5a20c6a9 --- /dev/null +++ b/ts/score/computeScore.ts @@ -0,0 +1,91 @@ +import { PeerStats } from './peerStats' +import { PeerScoreParams } from './peerScoreParams' + +export function computeScore ( + peer: string, + pstats: PeerStats, + params: PeerScoreParams, + peerIPs: Map> +): number { + let score = 0 + + // topic stores + Object.entries(pstats.topics).forEach(([topic, tstats]) => { + // the topic parameters + const topicParams = params.topics[topic] + if (!topicParams) { + // we are not scoring this topic + return + } + + let topicScore = 0 + + // P1: time in Mesh + if (tstats.inMesh) { + let p1 = tstats.meshTime / topicParams.timeInMeshQuantum + if (p1 > topicParams.timeInMeshCap) { + p1 = topicParams.timeInMeshCap + } + topicScore += p1 * topicParams.timeInMeshWeight + } + + // P2: first message deliveries + const p2 = tstats.firstMessageDeliveries + topicScore += p2 * topicParams.firstMessageDeliveriesWeight + + // P3: mesh message deliveries + if (tstats.meshMessageDeliveriesActive) { + if (tstats.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold) { + const deficit = topicParams.meshMessageDeliveriesThreshold - tstats.meshMessageDeliveries + const p3 = deficit * deficit + topicScore += p3 * topicParams.meshMessageDeliveriesWeight + } + } + + // P3b: + // NOTE: the weight of P3b is negative (validated in validateTopicScoreParams) so this detracts + const p3b = tstats.meshFailurePenalty + topicScore += p3b * topicParams.meshFailurePenaltyWeight + + // P4: invalid messages + // NOTE: the weight of P4 is negative (validated in validateTopicScoreParams) so this detracts + const p4 = tstats.invalidMessageDeliveries * tstats.invalidMessageDeliveries + topicScore += p4 * topicParams.invalidMessageDeliveriesWeight + + // update score, mixing with topic weight + score += topicScore * topicParams.topicWeight + }) + + // apply the topic score cap, if any + if (params.topicScoreCap > 0 && score > params.topicScoreCap) { + score = params.topicScoreCap + } + + // P5: application-specific score + const p5 = params.appSpecificScore(peer) + score += p5 * params.appSpecificWeight + + // P6: IP colocation factor + pstats.ips.forEach(ip => { + if (params.IPColocationFactorWhitelist.has(ip)) { + return + } + + // P6 has a cliff (IPColocationFactorThreshold) + // It's only applied if at least that many peers are connected to us from that source IP addr. + // It is quadratic, and the weight is negative (validated in validatePeerScoreParams) + const peersInIP = peerIPs.get(ip) + const numPeersInIP = peersInIP ? peersInIP.size : 0 + if (numPeersInIP > params.IPColocationFactorThreshold) { + const surplus = numPeersInIP - params.IPColocationFactorThreshold + const p6 = surplus * surplus + score += p6 * params.IPColocationFactorWeight + } + }) + + // P7: behavioural pattern penalty + const p7 = pstats.behaviourPenalty * pstats.behaviourPenalty + score += p7 * params.behaviourPenaltyWeight + + return score +} diff --git a/ts/score/constants.ts b/ts/score/constants.ts new file mode 100644 index 00000000..66474288 --- /dev/null +++ b/ts/score/constants.ts @@ -0,0 +1,2 @@ +export const ERR_INVALID_PEER_SCORE_PARAMS = 'ERR_INVALID_PEER_SCORE_PARAMS' +export const ERR_INVALID_PEER_SCORE_THRESHOLDS = 'ERR_INVALID_PEER_SCORE_THRESHOLDS' diff --git a/ts/score/index.ts b/ts/score/index.ts new file mode 100644 index 00000000..7a4209aa --- /dev/null +++ b/ts/score/index.ts @@ -0,0 +1,3 @@ +export * from './peerScoreParams' +export * from './peerScoreThresholds' +export * from './peerScore' diff --git a/ts/score/messageDeliveries.ts b/ts/score/messageDeliveries.ts new file mode 100644 index 00000000..7aa591f6 --- /dev/null +++ b/ts/score/messageDeliveries.ts @@ -0,0 +1,91 @@ +import { TimeCacheDuration } from '../constants' +import Denque from 'denque' + +export enum DeliveryRecordStatus { + /** + * we don't know (yet) if the message is valid + */ + unknown, + /** + * we know the message is valid + */ + valid, + /** + * we know the message is invalid + */ + invalid, + /** + * we were instructed by the validator to ignore the message + */ + ignored +} + +export interface DeliveryRecord { + status: DeliveryRecordStatus + firstSeen: number + validated: number + peers: Set +} + +interface DeliveryQueueEntry { + msgId: string + expire: number +} + +/** + * Map of message ID to DeliveryRecord + * + * Maintains an internal queue for efficient gc of old messages + */ +export class MessageDeliveries { + private records: Map + private queue: Denque + + constructor () { + this.records = new Map() + this.queue = new Denque() + } + + ensureRecord (msgId: string): DeliveryRecord { + let drec = this.records.get(msgId) + if (drec) { + return drec + } + + // record doesn't exist yet + // create record + drec = { + status: DeliveryRecordStatus.unknown, + firstSeen: Date.now(), + validated: 0, + peers: new Set() + } + this.records.set(msgId, drec) + + // and add msgId to the queue + const entry: DeliveryQueueEntry = { + msgId, + expire: Date.now() + TimeCacheDuration + } + this.queue.push(entry) + + return drec + } + + gc (): void { + const now = Date.now() + // queue is sorted by expiry time + // remove expired messages, remove from queue until first un-expired message found + let head = this.queue.peekFront() + while (head && head.expire < now) { + this.records.delete(head.msgId) + this.queue.shift() + head = this.queue.peekFront() + } + } + + clear (): void { + this.records.clear() + this.queue.clear() + } +} diff --git a/ts/score/peerScore.ts b/ts/score/peerScore.ts new file mode 100644 index 00000000..a1f9f8d2 --- /dev/null +++ b/ts/score/peerScore.ts @@ -0,0 +1,606 @@ +import { Message } from '../message' +import { PeerScoreParams, validatePeerScoreParams } from './peerScoreParams' +import { PeerStats, createPeerStats, ensureTopicStats } from './peerStats' +import { computeScore } from './computeScore' +import { MessageDeliveries, DeliveryRecordStatus } from './messageDeliveries' +import Multiaddr = require('multiaddr') +import PeerId = require('peer-id') +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore +import debug = require('debug') + +const log = debug('libp2p:gossipsub:score') + +interface Connection { + remoteAddr: Multiaddr + remotePeer: PeerId +} + +interface ConnectionManager { + getAll(id: string): Connection[] + // eslint-disable-next-line @typescript-eslint/ban-types + on(evt: string, fn: Function): void + // eslint-disable-next-line @typescript-eslint/ban-types + off(evt: string, fn: Function): void +} + +export class PeerScore { + /** + * The score parameters + */ + params: PeerScoreParams + /** + * Per-peer stats for score calculation + */ + peerStats: Map + /** + * IP colocation tracking; maps IP => set of peers. + */ + peerIPs: Map> + /** + * Recent message delivery timing/participants + */ + deliveryRecords: MessageDeliveries + /** + * Message ID function + */ + msgId: (message: Message) => string + _connectionManager: ConnectionManager + _backgroundInterval: NodeJS.Timeout + + constructor (params: PeerScoreParams, connectionManager: ConnectionManager, msgId: (message: Message) => string) { + validatePeerScoreParams(params) + this.params = params + this._connectionManager = connectionManager + this.peerStats = new Map() + this.peerIPs = new Map() + this.deliveryRecords = new MessageDeliveries() + this.msgId = msgId + } + + /** + * Start PeerScore instance + * @returns {void} + */ + start (): void { + if (this._backgroundInterval) { + log('Peer score already running') + return + } + this._backgroundInterval = setInterval(() => this.background(), this.params.decayInterval) + log('started') + } + + /** + * Stop PeerScore instance + * @returns {void} + */ + stop (): void { + if (!this._backgroundInterval) { + log('Peer score already stopped') + return + } + clearInterval(this._backgroundInterval) + delete this._backgroundInterval + this.peerIPs.clear() + this.peerStats.clear() + this.deliveryRecords.clear() + log('stopped') + } + + /** + * Periodic maintenance + * @returns {void} + */ + background (): void { + this._refreshScores() + this._updateIPs() + this.deliveryRecords.gc() + } + + /** + * Decays scores, and purges score records for disconnected peers once their expiry has elapsed. + * @returns {void} + */ + _refreshScores (): void { + const now = Date.now() + const decayToZero = this.params.decayToZero + + this.peerStats.forEach((pstats, id) => { + if (!pstats.connected) { + // has the retention perious expired? + if (now > pstats.expire) { + // yes, throw it away (but clean up the IP tracking first) + this._removeIPs(id, pstats.ips) + this.peerStats.delete(id) + } + + // we don't decay retained scores, as the peer is not active. + // this way the peer cannot reset a negative score by simply disconnecting and reconnecting, + // unless the retention period has ellapsed. + // similarly, a well behaved peer does not lose its score by getting disconnected. + return + } + + Object.entries(pstats.topics).forEach(([topic, tstats]) => { + const tparams = this.params.topics[topic] + if (!tparams) { + // we are not scoring this topic + // should be unreachable, we only add scored topics to pstats + return + } + + // decay counters + tstats.firstMessageDeliveries *= tparams.firstMessageDeliveriesDecay + if (tstats.firstMessageDeliveries < decayToZero) { + tstats.firstMessageDeliveries = 0 + } + tstats.meshMessageDeliveries *= tparams.meshMessageDeliveriesDecay + if (tstats.meshMessageDeliveries < decayToZero) { + tstats.meshMessageDeliveries = 0 + } + tstats.meshFailurePenalty *= tparams.meshFailurePenaltyDecay + if (tstats.meshFailurePenalty < decayToZero) { + tstats.meshFailurePenalty = 0 + } + tstats.invalidMessageDeliveries *= tparams.invalidMessageDeliveriesDecay + if (tstats.invalidMessageDeliveries < decayToZero) { + tstats.invalidMessageDeliveries = 0 + } + // update mesh time and activate mesh message delivery parameter if need be + if (tstats.inMesh) { + tstats.meshTime = now - tstats.graftTime + if (tstats.meshTime > tparams.meshMessageDeliveriesActivation) { + tstats.meshMessageDeliveriesActive = true + } + } + }) + // decay P7 counter + pstats.behaviourPenalty *= this.params.behaviourPenaltyDecay + if (pstats.behaviourPenalty < decayToZero) { + pstats.behaviourPenalty = 0 + } + }) + } + + /** + * Return the score for a peer + * @param {string} id + * @returns {Number} + */ + score (id: string): number { + const pstats = this.peerStats.get(id) + if (!pstats) { + return 0 + } + return computeScore(id, pstats, this.params, this.peerIPs) + } + + /** + * Apply a behavioural penalty to a peer + * @param {string} id + * @param {Number} penalty + * @returns {void} + */ + addPenalty (id: string, penalty: number): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + pstats.behaviourPenalty += penalty + } + + /** + * @param {string} id + * @returns {void} + */ + addPeer (id: string): void { + // create peer stats (not including topic stats for each topic to be scored) + // topic stats will be added as needed + const pstats = createPeerStats({ + connected: true + }) + this.peerStats.set(id, pstats) + + // get + update peer IPs + const ips = this._getIPs(id) + this._setIPs(id, ips, pstats.ips) + pstats.ips = ips + } + + /** + * @param {string} id + * @returns {void} + */ + removePeer (id: string): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + // decide whether to retain the score; this currently only retains non-positive scores + // to dissuade attacks on the score function. + if (this.score(id) > 0) { + this._removeIPs(id, pstats.ips) + this.peerStats.delete(id) + return + } + + // furthermore, when we decide to retain the score, the firstMessageDelivery counters are + // reset to 0 and mesh delivery penalties applied. + Object.entries(pstats.topics).forEach(([topic, tstats]) => { + tstats.firstMessageDeliveries = 0 + + const threshold = this.params.topics[topic].meshMessageDeliveriesThreshold + if (tstats.inMesh && tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold) { + const deficit = threshold - tstats.meshMessageDeliveries + tstats.meshFailurePenalty += deficit * deficit + } + + tstats.inMesh = false + }) + + pstats.connected = false + pstats.expire = Date.now() + this.params.retainScore + } + + /** + * @param {string} id + * @param {String} topic + * @returns {void} + */ + graft (id: string, topic: string): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + tstats.inMesh = true + tstats.graftTime = Date.now() + tstats.meshTime = 0 + tstats.meshMessageDeliveriesActive = false + } + + /** + * @param {string} id + * @param {string} topic + * @returns {void} + */ + prune (id: string, topic: string): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + // sticky mesh delivery rate failure penalty + const threshold = this.params.topics[topic].meshMessageDeliveriesThreshold + if (tstats.meshMessageDeliveriesActive && tstats.meshMessageDeliveries < threshold) { + const deficit = threshold - tstats.meshMessageDeliveries + tstats.meshFailurePenalty += deficit * deficit + } + tstats.inMesh = false + } + + /** + * @param {string} id + * @param {Message} message + * @returns {void} + */ + validateMessage (id: string, message: Message): void { + this.deliveryRecords.ensureRecord(this.msgId(message)) + } + + /** + * @param {string} id + * @param {Message} message + * @returns {void} + */ + deliverMessage (id: string, message: Message): void { + this._markFirstMessageDelivery(id, message) + + const drec = this.deliveryRecords.ensureRecord(this.msgId(message)) + const now = Date.now() + + // defensive check that this is the first delivery trace -- delivery status should be unknown + if (drec.status !== DeliveryRecordStatus.unknown) { + log( + 'unexpected delivery: message from %s was first seen %s ago and has delivery status %d', + id, now - drec.firstSeen, DeliveryRecordStatus[drec.status] + ) + return + } + + // mark the message as valid and reward mesh peers that have already forwarded it to us + drec.status = DeliveryRecordStatus.valid + drec.validated = now + drec.peers.forEach(p => { + // this check is to make sure a peer can't send us a message twice and get a double count + // if it is a first delivery. + if (p !== id) { + this._markDuplicateMessageDelivery(p, message) + } + }) + } + + /** + * @param {string} id + * @param {Message} message + * @returns {void} + */ + rejectMessage (id: string, message: Message): void { + const drec = this.deliveryRecords.ensureRecord(this.msgId(message)) + + // defensive check that this is the first rejection -- delivery status should be unknown + if (drec.status !== DeliveryRecordStatus.unknown) { + log( + 'unexpected rejection: message from %s was first seen %s ago and has delivery status %d', + id, Date.now() - drec.firstSeen, DeliveryRecordStatus[drec.status] + ) + return + } + + // mark the message as invalid and penalize peers that have already forwarded it. + drec.status = DeliveryRecordStatus.invalid + + this._markInvalidMessageDelivery(id, message) + drec.peers.forEach(p => { + this._markInvalidMessageDelivery(p, message) + }) + } + + /** + * @param {string} id + * @param {Message} message + * @returns {void} + */ + ignoreMessage (id: string, message: Message): void { + const drec = this.deliveryRecords.ensureRecord(this.msgId(message)) + + // defensive check that this is the first ignore -- delivery status should be unknown + if (drec.status !== DeliveryRecordStatus.unknown) { + log( + 'unexpected ignore: message from %s was first seen %s ago and has delivery status %d', + id, Date.now() - drec.firstSeen, DeliveryRecordStatus[drec.status] + ) + return + } + + // mark the message as invalid and penalize peers that have already forwarded it. + drec.status = DeliveryRecordStatus.ignored + } + + /** + * @param {string} id + * @param {Message} message + * @returns {void} + */ + duplicateMessage (id: string, message: Message): void { + const drec = this.deliveryRecords.ensureRecord(this.msgId(message)) + + if (drec.peers.has(id)) { + // we have already seen this duplicate + return + } + + switch (drec.status) { + case DeliveryRecordStatus.unknown: + // the message is being validated; track the peer delivery and wait for + // the Deliver/Reject/Ignore notification. + drec.peers.add(id) + break + case DeliveryRecordStatus.valid: + // mark the peer delivery time to only count a duplicate delivery once. + drec.peers.add(id) + this._markDuplicateMessageDelivery(id, message, drec.validated) + break + case DeliveryRecordStatus.invalid: + // we no longer track delivery time + this._markInvalidMessageDelivery(id, message) + break + } + } + + /** + * Increments the "invalid message deliveries" counter for all scored topics the message is published in. + * @param {string} id + * @param {Message} message + * @returns {void} + */ + _markInvalidMessageDelivery (id: string, message: Message): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + message.topicIDs.forEach(topic => { + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + tstats.invalidMessageDeliveries += 1 + }) + } + + /** + * Increments the "first message deliveries" counter for all scored topics the message is published in, + * as well as the "mesh message deliveries" counter, if the peer is in the mesh for the topic. + * @param {string} id + * @param {Message} message + * @returns {void} + */ + _markFirstMessageDelivery (id: string, message: Message): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + message.topicIDs.forEach(topic => { + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + let cap = this.params.topics[topic].firstMessageDeliveriesCap + tstats.firstMessageDeliveries += 1 + if (tstats.firstMessageDeliveries > cap) { + tstats.firstMessageDeliveries = cap + } + + if (!tstats.inMesh) { + return + } + + cap = this.params.topics[topic].meshMessageDeliveriesCap + tstats.meshMessageDeliveries += 1 + if (tstats.meshMessageDeliveries > cap) { + tstats.meshMessageDeliveries = cap + } + }) + } + + /** + * Increments the "mesh message deliveries" counter for messages we've seen before, + * as long the message was received within the P3 window. + * @param {string} id + * @param {Message} message + * @param {number} validatedTime + * @returns {void} + */ + _markDuplicateMessageDelivery (id: string, message: Message, validatedTime = 0): void { + const pstats = this.peerStats.get(id) + if (!pstats) { + return + } + + const now = validatedTime ? Date.now() : 0 + + message.topicIDs.forEach(topic => { + const tstats = ensureTopicStats(topic, pstats, this.params) + if (!tstats) { + return + } + + if (!tstats.inMesh) { + return + } + + const tparams = this.params.topics[topic] + + // check against the mesh delivery window -- if the validated time is passed as 0, then + // the message was received before we finished validation and thus falls within the mesh + // delivery window. + if (validatedTime && now > validatedTime + tparams.meshMessageDeliveriesWindow) { + return + } + + const cap = tparams.meshMessageDeliveriesCap + tstats.meshMessageDeliveries += 1 + if (tstats.meshMessageDeliveries > cap) { + tstats.meshMessageDeliveries = cap + } + }) + } + + /** + * Gets the current IPs for a peer. + * @param {string} id + * @returns {Array} + */ + _getIPs (id: string): string[] { + return this._connectionManager.getAll(id) + .map(c => c.remoteAddr.toOptions().host) + } + + /** + * Adds tracking for the new IPs in the list, and removes tracking from the obsolete IPs. + * @param {string} id + * @param {Array} newIPs + * @param {Array} oldIPs + * @returns {void} + */ + _setIPs (id: string, newIPs: string[], oldIPs: string[]): void { + // add the new IPs to the tracking + // eslint-disable-next-line no-labels + addNewIPs: + for (const ip of newIPs) { + // check if it is in the old ips list + for (const xip of oldIPs) { + if (ip === xip) { + // eslint-disable-next-line no-labels + continue addNewIPs + } + } + // no, it's a new one -- add it to the tracker + let peers = this.peerIPs.get(ip) + if (!peers) { + peers = new Set() + this.peerIPs.set(ip, peers) + } + peers.add(id) + } + // remove the obsolete old IPs from the tracking + // eslint-disable-next-line no-labels + removeOldIPs: + for (const ip of oldIPs) { + // check if it is in the new ips list + for (const xip of newIPs) { + if (ip === xip) { + // eslint-disable-next-line no-labels + continue removeOldIPs + } + } + // no, its obselete -- remove it from the tracker + const peers = this.peerIPs.get(ip) + if (!peers) { + continue + } + peers.delete(id) + if (!peers.size) { + this.peerIPs.delete(ip) + } + } + } + + /** + * Removes an IP list from the tracking list for a peer. + * @param {string} id + * @param {Array} ips + * @returns {void} + */ + _removeIPs (id: string, ips: string[]): void { + ips.forEach(ip => { + const peers = this.peerIPs.get(ip) + if (!peers) { + return + } + + peers.delete(id) + if (!peers.size) { + this.peerIPs.delete(ip) + } + }) + } + + /** + * Update all peer IPs to currently open connections + * @returns {void} + */ + _updateIPs (): void { + this.peerStats.forEach((pstats, id) => { + const newIPs = this._getIPs(id) + this._setIPs(id, newIPs, pstats.ips) + pstats.ips = newIPs + }) + } +} diff --git a/ts/score/peerScoreParams.ts b/ts/score/peerScoreParams.ts new file mode 100644 index 00000000..51aa4eb0 --- /dev/null +++ b/ts/score/peerScoreParams.ts @@ -0,0 +1,395 @@ +import { ERR_INVALID_PEER_SCORE_PARAMS } from './constants' +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore +import errcode = require('err-code') + +// This file defines PeerScoreParams and TopicScoreParams interfaces +// as well as constructors, default constructors, and validation functions +// for these interfaces + +export interface PeerScoreParams { + /** + * Score parameters per topic. + */ + topics: Record + + /** + * Aggregate topic score cap; this limits the total contribution of topics towards a positive + * score. It must be positive (or 0 for no cap). + */ + topicScoreCap: number + + /** + * P5: Application-specific peer scoring + */ + appSpecificScore: (p: string) => number + appSpecificWeight: number + + /** + * P6: IP-colocation factor. + * The parameter has an associated counter which counts the number of peers with the same IP. + * If the number of peers in the same IP exceeds IPColocationFactorThreshold, then the value + * is the square of the difference, ie (PeersInSameIP - IPColocationThreshold)^2. + * If the number of peers in the same IP is less than the threshold, then the value is 0. + * The weight of the parameter MUST be negative, unless you want to disable for testing. + * Note: In order to simulate many IPs in a managable manner when testing, you can set the weight to 0 + * thus disabling the IP colocation penalty. + */ + IPColocationFactorWeight: number + IPColocationFactorThreshold: number + IPColocationFactorWhitelist: Set + + /** + * P7: behavioural pattern penalties. + * This parameter has an associated counter which tracks misbehaviour as detected by the + * router. The router currently applies penalties for the following behaviors: + * - attempting to re-graft before the prune backoff time has elapsed. + * - not following up in IWANT requests for messages advertised with IHAVE. + * + * The value of the parameter is the square of the counter, which decays with BehaviourPenaltyDecay. + * The weight of the parameter MUST be negative (or zero to disable). + */ + behaviourPenaltyWeight: number + behaviourPenaltyDecay: number + + /** + * the decay interval for parameter counters. + */ + decayInterval: number + + /** + * counter value below which it is considered 0. + */ + decayToZero: number + + /** + * time to remember counters for a disconnected peer. + */ + retainScore: number +} + +export interface TopicScoreParams { + /** + * The weight of the topic. + */ + topicWeight: number + + /** + * P1: time in the mesh + * This is the time the peer has ben grafted in the mesh. + * The value of the parameter is the time/TimeInMeshQuantum, capped by TimeInMeshCap + * The weight of the parameter MUST be positive (or zero to disable). + */ + timeInMeshWeight: number + timeInMeshQuantum: number + timeInMeshCap: number + + /** + * P2: first message deliveries + * This is the number of message deliveries in the topic. + * The value of the parameter is a counter, decaying with FirstMessageDeliveriesDecay, and capped + * by FirstMessageDeliveriesCap. + * The weight of the parameter MUST be positive (or zero to disable). + */ + firstMessageDeliveriesWeight: number + firstMessageDeliveriesDecay: number + firstMessageDeliveriesCap: number + + /** + * P3: mesh message deliveries + * This is the number of message deliveries in the mesh, within the MeshMessageDeliveriesWindow of + * message validation; deliveries during validation also count and are retroactively applied + * when validation succeeds. + * This window accounts for the minimum time before a hostile mesh peer trying to game the score + * could replay back a valid message we just sent them. + * It effectively tracks first and near-first deliveries, ie a message seen from a mesh peer + * before we have forwarded it to them. + * The parameter has an associated counter, decaying with MeshMessageDeliveriesDecay. + * If the counter exceeds the threshold, its value is 0. + * If the counter is below the MeshMessageDeliveriesThreshold, the value is the square of + * the deficit, ie (MessageDeliveriesThreshold - counter)^2 + * The penalty is only activated after MeshMessageDeliveriesActivation time in the mesh. + * The weight of the parameter MUST be negative (or zero to disable). + */ + meshMessageDeliveriesWeight: number + meshMessageDeliveriesDecay: number + meshMessageDeliveriesCap: number + meshMessageDeliveriesThreshold: number + meshMessageDeliveriesWindow: number + meshMessageDeliveriesActivation: number + + /** + * P3b: sticky mesh propagation failures + * This is a sticky penalty that applies when a peer gets pruned from the mesh with an active + * mesh message delivery penalty. + * The weight of the parameter MUST be negative (or zero to disable) + */ + meshFailurePenaltyWeight: number + meshFailurePenaltyDecay: number + + /** + * P4: invalid messages + * This is the number of invalid messages in the topic. + * The value of the parameter is the square of the counter, decaying with + * InvalidMessageDeliveriesDecay. + * The weight of the parameter MUST be negative (or zero to disable). + */ + invalidMessageDeliveriesWeight: number + invalidMessageDeliveriesDecay: number +} + +export const defaultPeerScoreParams: PeerScoreParams = { + topics: {}, + topicScoreCap: 10, + appSpecificScore: () => 0, + appSpecificWeight: 10, + IPColocationFactorWeight: -5, + IPColocationFactorThreshold: 10, + IPColocationFactorWhitelist: new Set(), + behaviourPenaltyWeight: -10, + behaviourPenaltyDecay: 0.2, + decayInterval: 1000, + decayToZero: 0.1, + retainScore: 3600 * 1000 +} + +export const defaultTopicScoreParams: TopicScoreParams = { + topicWeight: 0.5, + timeInMeshWeight: 1, + timeInMeshQuantum: 1, + timeInMeshCap: 3600, + + firstMessageDeliveriesWeight: 1, + firstMessageDeliveriesDecay: 0.5, + firstMessageDeliveriesCap: 2000, + + meshMessageDeliveriesWeight: -1, + meshMessageDeliveriesDecay: 0.5, + meshMessageDeliveriesCap: 100, + meshMessageDeliveriesThreshold: 20, + meshMessageDeliveriesWindow: 10, + meshMessageDeliveriesActivation: 5000, + + meshFailurePenaltyWeight: -1, + meshFailurePenaltyDecay: 0.5, + + invalidMessageDeliveriesWeight: -1, + invalidMessageDeliveriesDecay: 0.3 +} + +export function createPeerScoreParams (p: Partial = {}): PeerScoreParams { + return { + ...defaultPeerScoreParams, + ...p, + topics: p.topics + ? Object.entries(p.topics) + .reduce((topics, [topic, topicScoreParams]) => { + topics[topic] = createTopicScoreParams(topicScoreParams) + return topics + }, {} as Record) + : {} + } +} + +export function createTopicScoreParams (p: Partial = {}): TopicScoreParams { + return { + ...defaultTopicScoreParams, + ...p + } +} + +// peer score parameter validation +export function validatePeerScoreParams (p: PeerScoreParams): void { + for (const [topic, params] of Object.entries(p.topics)) { + try { + validateTopicScoreParams(params) + } catch (e) { + throw errcode( + new Error(`invalid score parameters for topic ${topic}: ${e.message}`), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + } + + // check that the topic score is 0 or something positive + if (p.topicScoreCap < 0) { + throw errcode( + new Error('invalid topic score cap; must be positive (or 0 for no cap)'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check that we have an app specific score; the weight can be anything (but expected positive) + if (p.appSpecificScore === null || p.appSpecificScore === undefined) { + throw errcode( + new Error('missing application specific score function'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check the IP colocation factor + if (p.IPColocationFactorWeight > 0) { + throw errcode( + new Error('invalid IPColocationFactorWeight; must be negative (or 0 to disable)'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.IPColocationFactorWeight !== 0 && p.IPColocationFactorThreshold < 1) { + throw errcode( + new Error('invalid IPColocationFactorThreshold; must be at least 1'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check the behaviour penalty + if (p.behaviourPenaltyWeight > 0) { + throw errcode( + new Error('invalid BehaviourPenaltyWeight; must be negative (or 0 to disable)'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.behaviourPenaltyWeight !== 0 && (p.behaviourPenaltyDecay <= 0 || p.behaviourPenaltyDecay >= 1)) { + throw errcode( + new Error('invalid BehaviourPenaltyDecay; must be between 0 and 1'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check the decay parameters + if (p.decayInterval < 1000) { + throw errcode( + new Error('invalid DecayInterval; must be at least 1s'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.decayToZero <= 0 || p.decayToZero >= 1) { + throw errcode( + new Error('invalid DecayToZero; must be between 0 and 1'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // no need to check the score retention; a value of 0 means that we don't retain scores +} + +export function validateTopicScoreParams (p: TopicScoreParams): void { + // make sure we have a sane topic weight + if (p.topicWeight < 0) { + throw errcode( + new Error('invalid topic weight; must be >= 0'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check P1 + if (p.timeInMeshQuantum === 0) { + throw errcode( + new Error('invalid TimeInMeshQuantum; must be non zero'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.timeInMeshWeight < 0) { + throw errcode( + new Error('invalid TimeInMeshWeight; must be positive (or 0 to disable)'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.timeInMeshWeight !== 0 && p.timeInMeshQuantum <= 0) { + throw errcode( + new Error('invalid TimeInMeshQuantum; must be positive'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.timeInMeshWeight !== 0 && p.timeInMeshCap <= 0) { + throw errcode( + new Error('invalid TimeInMeshCap; must be positive'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check P2 + if (p.firstMessageDeliveriesWeight < 0) { + throw errcode( + new Error('invallid FirstMessageDeliveriesWeight; must be positive (or 0 to disable)'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.firstMessageDeliveriesWeight !== 0 && (p.firstMessageDeliveriesDecay <= 0 || p.firstMessageDeliveriesDecay >= 1)) { + throw errcode( + new Error('invalid FirstMessageDeliveriesDecay; must be between 0 and 1'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.firstMessageDeliveriesWeight !== 0 && p.firstMessageDeliveriesCap <= 0) { + throw errcode( + new Error('invalid FirstMessageDeliveriesCap; must be positive'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check P3 + if (p.meshMessageDeliveriesWeight > 0) { + throw errcode( + new Error('invalid MeshMessageDeliveriesWeight; must be negative (or 0 to disable)'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.meshMessageDeliveriesWeight !== 0 && (p.meshMessageDeliveriesDecay <= 0 || p.meshMessageDeliveriesDecay >= 1)) { + throw errcode( + new Error('invalid MeshMessageDeliveriesDecay; must be between 0 and 1'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesCap <= 0) { + throw errcode( + new Error('invalid MeshMessageDeliveriesCap; must be positive'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesThreshold <= 0) { + throw errcode( + new Error('invalid MeshMessageDeliveriesThreshold; must be positive'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.meshMessageDeliveriesWindow < 0) { + throw errcode( + new Error('invalid MeshMessageDeliveriesWindow; must be non-negative'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.meshMessageDeliveriesWeight !== 0 && p.meshMessageDeliveriesActivation < 1000) { + throw errcode( + new Error('invalid MeshMessageDeliveriesActivation; must be at least 1s'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check P3b + if (p.meshFailurePenaltyWeight > 0) { + throw errcode( + new Error('invalid MeshFailurePenaltyWeight; must be negative (or 0 to disable)'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.meshFailurePenaltyWeight !== 0 && (p.meshFailurePenaltyDecay <= 0 || p.meshFailurePenaltyDecay >= 1)) { + throw errcode( + new Error('invalid MeshFailurePenaltyDecay; must be between 0 and 1'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + + // check P4 + if (p.invalidMessageDeliveriesWeight > 0) { + throw errcode( + new Error('invalid InvalidMessageDeliveriesWeight; must be negative (or 0 to disable)'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } + if (p.invalidMessageDeliveriesDecay <= 0 || p.invalidMessageDeliveriesDecay >= 1) { + throw errcode( + new Error('invalid InvalidMessageDeliveriesDecay; must be between 0 and 1'), + ERR_INVALID_PEER_SCORE_PARAMS + ) + } +} diff --git a/ts/score/peerScoreThresholds.ts b/ts/score/peerScoreThresholds.ts new file mode 100644 index 00000000..629b6227 --- /dev/null +++ b/ts/score/peerScoreThresholds.ts @@ -0,0 +1,88 @@ +import { ERR_INVALID_PEER_SCORE_THRESHOLDS } from './constants' +// eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore +import errcode = require('err-code') + +// This file defines PeerScoreThresholds interface +// as well as a constructor, default constructor, and validation function +// for this interface + +export interface PeerScoreThresholds { + /** + * gossipThreshold is the score threshold below which gossip propagation is supressed; + * should be negative. + */ + gossipThreshold: number + + /** + * publishThreshold is the score threshold below which we shouldn't publish when using flood + * publishing (also applies to fanout and floodsub peers); should be negative and <= GossipThreshold. + */ + publishThreshold: number + + /** + * graylistThreshold is the score threshold below which message processing is supressed altogether, + * implementing an effective graylist according to peer score; should be negative and <= PublisThreshold. + */ + graylistThreshold: number + + /** + * acceptPXThreshold is the score threshold below which PX will be ignored; this should be positive + * and limited to scores attainable by bootstrappers and other trusted nodes. + */ + acceptPXThreshold: number + + /** + * opportunisticGraftThreshold is the median mesh score threshold before triggering opportunistic + * grafting; this should have a small positive value. + */ + opportunisticGraftThreshold: number +} + +export const defaultPeerScoreThresholds: PeerScoreThresholds = { + gossipThreshold: -10, + publishThreshold: -50, + graylistThreshold: -80, + acceptPXThreshold: 10, + opportunisticGraftThreshold: 20 +} + +export function createPeerScoreThresholds (p: Partial = {}): PeerScoreThresholds { + return { + ...defaultPeerScoreThresholds, + ...p + } +} + +export function validatePeerScoreThresholds (p: PeerScoreThresholds): void { + if (p.gossipThreshold > 0) { + throw errcode( + new Error('invalid gossip threshold; it must be <= 0'), + ERR_INVALID_PEER_SCORE_THRESHOLDS + ) + } + if (p.publishThreshold > 0 || p.publishThreshold > p.gossipThreshold) { + throw errcode( + new Error('invalid publish threshold; it must be <= 0 and <= gossip threshold'), + ERR_INVALID_PEER_SCORE_THRESHOLDS + ) + } + if (p.graylistThreshold > 0 || p.graylistThreshold > p.publishThreshold) { + throw errcode( + new Error('invalid graylist threshold; it must be <= 0 and <= publish threshold'), + ERR_INVALID_PEER_SCORE_THRESHOLDS + ) + } + if (p.acceptPXThreshold < 0) { + throw errcode( + new Error('invalid accept PX threshold; it must be >= 0'), + ERR_INVALID_PEER_SCORE_THRESHOLDS + ) + } + if (p.opportunisticGraftThreshold < 0) { + throw errcode( + new Error('invalid opportunistic grafting threshold; it must be >= 0'), + ERR_INVALID_PEER_SCORE_THRESHOLDS + ) + } +} diff --git a/ts/score/peerStats.ts b/ts/score/peerStats.ts new file mode 100644 index 00000000..319f2973 --- /dev/null +++ b/ts/score/peerStats.ts @@ -0,0 +1,114 @@ +import { PeerScoreParams } from './peerScoreParams' + +export interface PeerStats { + /** + * true if the peer is currently connected + */ + connected: boolean + + /** + * expiration time of the score stats for disconnected peers + */ + expire: number + + /** + * per topic stats + */ + topics: Record + + /** + * IP tracking; store as string for easy processing + */ + ips: string[] + + /** + * behavioural pattern penalties (applied by the router) + */ + behaviourPenalty: number +} + +export interface TopicStats { + /** + * true if the peer is in the mesh + */ + inMesh: boolean + + /** + * time when the peer was (last) GRAFTed; valid only when in mesh + */ + graftTime: number + + /** + * time in mesh (updated during refresh/decay to avoid calling gettimeofday on + * every score invocation) + */ + meshTime: number + + /** + * first message deliveries + */ + firstMessageDeliveries: number + + /** + * mesh message deliveries + */ + meshMessageDeliveries: number + + /** + * true if the peer has been enough time in the mesh to activate mess message deliveries + */ + meshMessageDeliveriesActive: boolean + + /** + * sticky mesh rate failure penalty counter + */ + meshFailurePenalty: number + + /** + * invalid message counter + */ + invalidMessageDeliveries: number +} + +export function createPeerStats (ps: Partial = {}): PeerStats { + return { + connected: false, + expire: 0, + ips: [], + behaviourPenalty: 0, + ...ps, + topics: ps.topics + ? Object.entries(ps.topics) + .reduce((topics, [topic, topicStats]) => { + topics[topic] = createTopicStats(topicStats) + return topics + }, {} as Record) + : {} + } +} + +export function createTopicStats (ts: Partial = {}): TopicStats { + return { + inMesh: false, + graftTime: 0, + meshTime: 0, + firstMessageDeliveries: 0, + meshMessageDeliveries: 0, + meshMessageDeliveriesActive: false, + meshFailurePenalty: 0, + invalidMessageDeliveries: 0, + ...ts + } +} + +export function ensureTopicStats (topic: string, ps: PeerStats, params: PeerScoreParams): TopicStats | undefined { + let ts = ps.topics[topic] + if (ts) { + return ts + } + if (!params.topics[topic]) { + return undefined + } + ps.topics[topic] = ts = createTopicStats() + return ts +} diff --git a/ts/score/scoreParamDecay.ts b/ts/score/scoreParamDecay.ts new file mode 100644 index 00000000..4b433f9a --- /dev/null +++ b/ts/score/scoreParamDecay.ts @@ -0,0 +1,20 @@ +const DefaultDecayInterval = 1000 +const DefaultDecayToZero = 0.01 + +/** + * ScoreParameterDecay computes the decay factor for a parameter, assuming the DecayInterval is 1s + * and that the value decays to zero if it drops below 0.01 + */ +export function scoreParameterDecay (decay: number): number { + return scoreParameterDecayWithBase(decay, DefaultDecayInterval, DefaultDecayToZero) +} + +/** + * ScoreParameterDecay computes the decay factor for a parameter using base as the DecayInterval + */ +export function scoreParameterDecayWithBase (decay: number, base: number, decayToZero: number): number { + // the decay is linear, so after n ticks the value is factor^n + // so factor^n = decayToZero => factor = decayToZero^(1/n) + const ticks = decay / base + return decayToZero ** (1 / ticks) +}