diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index e8663b52..6cee1eb6 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -8,6 +8,9 @@ const expect = chai.expect const sinon = require('sinon') const { utils } = require('libp2p-pubsub') +const Peer = require('libp2p-pubsub/src/peer') +const { signMessage } = require('libp2p-pubsub/src/message/sign') +const PeerId = require('peer-id') const { createGossipsub, mockRegistrar, @@ -42,27 +45,28 @@ describe('Pubsub', () => { }) describe('validate', () => { - it('should drop unsigned messages', () => { + it('should drop unsigned messages', async () => { sinon.spy(gossipsub, '_processRpcMessage') sinon.spy(gossipsub, 'validate') sinon.stub(gossipsub.peers, 'get').returns({}) const topic = 'my-topic' + const peer = new Peer({ id: await PeerId.create() }) const rpc = { subscriptions: [], msgs: [{ - from: gossipsub.peerId.id, + from: peer.id.toBytes(), data: Buffer.from('an unsigned message'), seqno: utils.randomSeqno(), topicIDs: [topic] }] } - gossipsub._processRpc('QmAnotherPeer', {}, rpc) + gossipsub._processRpc(peer.id.toB58String(), peer, rpc) - return new Promise(resolve => setTimeout(() => { + return new Promise(resolve => setTimeout(async () => { expect(gossipsub.validate.callCount).to.eql(1) - expect(gossipsub._processRpcMessage.called).to.eql(false) + expect(await gossipsub.validate.getCall(0).returnValue).to.eql(false) resolve() }, 500)) }) @@ -73,28 +77,30 @@ describe('Pubsub', () => { sinon.stub(gossipsub.peers, 'get').returns({}) const topic = 'my-topic' - const signedMessage = await gossipsub._buildMessage({ - from: gossipsub.peerId.id, - data: Buffer.from('an unsigned message'), + const peer = new Peer({ id: await PeerId.create() }) + let signedMessage = { + from: peer.id.toBytes(), + data: Buffer.from('a signed message'), seqno: utils.randomSeqno(), topicIDs: [topic] - }) + } + signedMessage = await signMessage(peer.id, signedMessage) const rpc = { subscriptions: [], msgs: [signedMessage] } - gossipsub._processRpc('QmAnotherPeer', {}, rpc) + gossipsub._processRpc(peer.id.toB58String(), peer, rpc) - return new Promise(resolve => setTimeout(() => { + return new Promise(resolve => setTimeout(async () => { expect(gossipsub.validate.callCount).to.eql(1) - expect(gossipsub._processRpcMessage.callCount).to.eql(1) + expect(await gossipsub.validate.getCall(0).returnValue).to.be.eql(true) resolve() }, 500)) }) - it('should not drop unsigned messages if strict signing is disabled', () => { + it('should not drop unsigned messages if strict signing is disabled', async () => { sinon.spy(gossipsub, '_processRpcMessage') sinon.spy(gossipsub, 'validate') sinon.stub(gossipsub.peers, 'get').returns({}) @@ -102,21 +108,22 @@ describe('Pubsub', () => { sinon.stub(gossipsub, 'strictSigning').value(false) const topic = 'my-topic' + const peer = new Peer({ id: await PeerId.create() }) const rpc = { subscriptions: [], msgs: [{ - from: gossipsub.peerId.id, + from: peer.id.toBytes(), data: Buffer.from('an unsigned message'), seqno: utils.randomSeqno(), topicIDs: [topic] }] } - gossipsub._processRpc('QmAnotherPeer', {}, rpc) + gossipsub._processRpc(peer.id.toB58String(), peer, rpc) - return new Promise(resolve => setTimeout(() => { + return new Promise(resolve => setTimeout(async () => { expect(gossipsub.validate.callCount).to.eql(1) - expect(gossipsub._processRpcMessage.callCount).to.eql(1) + expect(await gossipsub.validate.getCall(0).returnValue).to.eql(true) resolve() }, 500)) }) @@ -124,14 +131,13 @@ describe('Pubsub', () => { describe('topic validators', () => { it('should filter messages by topic validator', async () => { - // use processRpcMessage.callCount to see if a message is valid or not - // a valid message will trigger processRpcMessage - sinon.stub(gossipsub, '_processRpcMessage') + // use validate.getCall(0).returnValue to see if a message is valid or not + sinon.spy(gossipsub, 'validate') // Disable strict signing sinon.stub(gossipsub, 'strictSigning').value(false) + sinon.stub(gossipsub.peers, 'get').returns({}) const filteredTopic = 't' - const peerStr = 'QmAnotherPeer' - gossipsub.peers.set(peerStr, {}) + const peer = new Peer({ id: await PeerId.create() }) // Set a trivial topic validator gossipsub.topicValidators.set(filteredTopic, (topic, peer, message) => { @@ -142,7 +148,7 @@ describe('Pubsub', () => { const validRpc = { subscriptions: [], msgs: [{ - from: gossipsub.peerId.id, + from: peer.id.toBytes(), data: Buffer.from('a message'), seqno: utils.randomSeqno(), topicIDs: [filteredTopic] @@ -150,15 +156,16 @@ describe('Pubsub', () => { } // process valid message - gossipsub._processRpc(peerStr, {}, validRpc) + gossipsub._processRpc(peer.id.toB58String(), peer, validRpc) await new Promise(resolve => setTimeout(resolve, 500)) - expect(gossipsub._processRpcMessage.callCount).to.eql(1) + expect(gossipsub.validate.callCount).to.eql(1) + expect(await gossipsub.validate.getCall(0).returnValue).to.eql(true) // invalid case const invalidRpc = { subscriptions: [], msgs: [{ - from: gossipsub.peerId.id, + from: peer.id.toBytes(), data: Buffer.from('a different message'), seqno: utils.randomSeqno(), topicIDs: [filteredTopic] @@ -166,9 +173,10 @@ describe('Pubsub', () => { } // process invalid message - gossipsub._processRpc(peerStr, {}, invalidRpc) + gossipsub._processRpc(peer.id.toB58String(), peer, invalidRpc) await new Promise(resolve => setTimeout(resolve, 500)) - expect(gossipsub._processRpcMessage.callCount).to.eql(1) + expect(gossipsub.validate.callCount).to.eql(2) + expect(await gossipsub.validate.getCall(1).returnValue).to.eql(false) // remove topic validator gossipsub.topicValidators.delete(filteredTopic) @@ -177,7 +185,7 @@ describe('Pubsub', () => { const invalidRpc2 = { subscriptions: [], msgs: [{ - from: gossipsub.peerId.id, + from: peer.id.toB58String(), data: Buffer.from('a different message'), seqno: utils.randomSeqno(), topicIDs: [filteredTopic] @@ -185,11 +193,10 @@ describe('Pubsub', () => { } // process previously invalid message, now is valid - gossipsub._processRpc(peerStr, {}, invalidRpc2) + gossipsub._processRpc(peer.id.toB58String(), peer, invalidRpc2) await new Promise(resolve => setTimeout(resolve, 500)) - expect(gossipsub._processRpcMessage.callCount).to.eql(2) - // cleanup - gossipsub.peers.delete(peerStr) + expect(gossipsub.validate.callCount).to.eql(3) + expect(await gossipsub.validate.getCall(2).returnValue).to.eql(true) }) }) }) diff --git a/ts/constants.ts b/ts/constants.ts index db17b611..c4df9ab1 100644 --- a/ts/constants.ts +++ b/ts/constants.ts @@ -206,3 +206,9 @@ export const GossipsubMaxIHaveMessages = 10 export const GossipsubIWantFollowupTime = 3 * second export const TimeCacheDuration = 120 * 1000 + +export const enum ExtendedValidatorResult { + accept = 'accept', + reject = 'reject', + ignore = 'ignore' +} diff --git a/ts/index.ts b/ts/index.ts index 06daa4a3..66d4a411 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -8,6 +8,7 @@ import { ControlMessage, ControlIHave, ControlGraft, ControlIWant, ControlPrune } from './message' import * as constants from './constants' +import { ExtendedValidatorResult } from './constants' import { Heartbeat } from './heartbeat' import { getGossipPeers } from './getGossipPeers' import { createGossipRpc, shuffle, hasGossipProtocol } from './utils' @@ -280,49 +281,92 @@ class Gossipsub extends BasicPubsub { * @param {Peer} peer * @param {Message} msg */ - _processRpcMessage (peer: Peer, msg: InMessage): void { + async _processRpcMessage (peer: Peer, msg: InMessage): Promise { const msgID = this.getMsgId(msg) // Ignore if we've already seen the message if (this.seenCache.has(msgID)) { + this.score.duplicateMessage(peer.id.toB58String(), msg as Message) return } this.seenCache.put(msgID) - super._processRpcMessage(peer, msg) + this.score.validateMessage(peer.id.toB58String(), msg as Message) + await super._processRpcMessage(peer, msg) + } + + /** + * Publish a message sent from a peer + * @override + * @param {Peer} peer + * @param {InMessage} msg + * @returns {void} + */ + _publishFrom (peer: Peer, msg: InMessage): void { + this.score.deliverMessage(peer.id.toB58String(), msg as Message) const topics = msg.topicIDs // If options.gossipIncoming is false, do NOT emit incoming messages to peers - if (!this._options.gossipIncoming) { - return - } - - // Emit to floodsub peers - this.peers.forEach((peer) => { - if (peer.protocols.includes(constants.FloodsubID) && - peer.id.toB58String() !== msg.from && - utils.anyMatch(peer.topics, topics) && - peer.isWritable - ) { - peer.sendMessages(utils.normalizeOutRpcMessages([msg])) - this.log('publish msg on topics - floodsub', topics, peer.id.toB58String()) - } - }) + if (this._options.gossipIncoming) { + // Emit to floodsub peers + this.peers.forEach((peer) => { + if (peer.protocols.includes(constants.FloodsubID) && + peer.id.toB58String() !== msg.from && + utils.anyMatch(peer.topics, topics) && + peer.isWritable + ) { + peer.sendMessages(utils.normalizeOutRpcMessages([msg])) + this.log('publish msg on topics - floodsub', topics, peer.id.toB58String()) + } + }) - // Emit to peers in the mesh - topics.forEach((topic) => { - const meshPeers = this.mesh.get(topic) - if (!meshPeers) { - return - } - meshPeers.forEach((peer) => { - if (!peer.isWritable || peer.id.toB58String() === msg.from) { + // Emit to peers in the mesh + topics.forEach((topic) => { + const meshPeers = this.mesh.get(topic) + if (!meshPeers) { return } - peer.sendMessages(utils.normalizeOutRpcMessages([msg])) - this.log('publish msg on topic - meshsub', topic, peer.id.toB58String()) + meshPeers.forEach((peer) => { + if (!peer.isWritable || peer.id.toB58String() === msg.from) { + return + } + peer.sendMessages(utils.normalizeOutRpcMessages([msg])) + this.log('publish msg on topic - meshsub', topic, peer.id.toB58String()) + }) }) - }) + } + + // Emit to self + super._publishFrom(peer, msg) + } + + /** + * Coerse topic validator result to valid/invalid boolean + * Provide extended validator support + * + * @override + * @param {string} topic + * @param {Peer} peer + * @param {Message} message + * @param {unknown} result + * @returns {boolean} + */ + _processTopicValidatorResult (topic: string, peer: Peer, message: Message, result: unknown): boolean { + if (typeof result === 'string') { + // assume an extended topic validator result + switch (result) { + case ExtendedValidatorResult.accept: + return true + case ExtendedValidatorResult.reject: + this.score.rejectMessage(peer.id.toB58String(), message) + return false + case ExtendedValidatorResult.ignore: + this.score.ignoreMessage(peer.id.toB58String(), message) + return false + } + } + // assume a basic topic validator result + return super._processTopicValidatorResult(topic, peer, message, result) } /** @@ -593,7 +637,6 @@ class Gossipsub extends BasicPubsub { /** * Publish messages * - * Note: this function assumes all messages are well-formed RPC objects * @override * @param {Array} msgs * @returns {void} diff --git a/ts/pubsub.js b/ts/pubsub.js index 6fb0d518..782109e8 100644 --- a/ts/pubsub.js +++ b/ts/pubsub.js @@ -159,19 +159,6 @@ class BasicPubSub extends Pubsub { if (msgs.length) { msgs.forEach(async message => { const msg = utils.normalizeInRpcMessage(message) - - // Ensure the message is valid before processing it - try { - const isValid = await this.validate(message, peer) - if (!isValid) { - this.log('Message is invalid, dropping it.') - return - } - } catch (err) { - this.log('Error in message validation, dropping it. %O', err) - return - } - this._processRpcMessage(peer, msg) }) } @@ -248,17 +235,43 @@ class BasicPubSub extends Pubsub { * @param {Peer} peer * @param {RPC.Message} msg */ - _processRpcMessage (peer, msg) { + async _processRpcMessage (peer, msg) { if (this.peerId.toB58String() === msg.from && !this._options.emitSelf) { return } + // Ensure the message is valid before processing it + try { + const isValid = await this.validate(utils.normalizeOutRpcMessage(msg), peer) + if (!isValid) { + this.log('Message is invalid, dropping it.') + return + } + } catch (err) { + this.log('Error in message validation, dropping it. %O', err) + return + } + + this._publishFrom(peer, msg) + } + /** + * Publish a message sent from a peer + * @param {Peer} peer + * @param {InMessage} msg + * @returns {void} + */ + _publishFrom (peer, msg) { // Emit to self - this._emitMessage(msg.topicIDs, msg) + this._emitMessage(msg) } - _emitMessage (topics, message) { - topics.forEach((topic) => { + /** + * Emit a message from a peer + * @param {Peer} peer + * @param {InMessage} message + */ + _emitMessage (message) { + message.topicIDs.forEach((topic) => { if (this.subscriptions.has(topic)) { this.emit(topic, message) } @@ -379,7 +392,7 @@ class BasicPubSub extends Pubsub { * @override * @param {Array|string} topics * @param {Array|any} messages - * @returns {void} + * @returns {Promise} */ async publish (topics, messages) { if (!this.started) { @@ -393,23 +406,22 @@ class BasicPubSub extends Pubsub { const from = this.peerId.toB58String() - const buildMessage = (msg, cb) => { - const seqno = utils.randomSeqno() - const msgObj = { + const buildMessage = data => { + return { from: from, - data: msg, - seqno: seqno, + data: data, + seqno: utils.randomSeqno(), topicIDs: topics } - // Emit to self if I'm interested and emitSelf enabled - this._options.emitSelf && this._emitMessages(topics, [msgObj]) - - return this._buildMessage(msgObj) } - const msgObjects = await pMap(messages, buildMessage) + const msgObjects = messages.map(buildMessage) + // Emit to self if I'm interested and emitSelf enabled + this._options.emitSelf && msgObjects.forEach(msg => this._emitMessage(msg)) + + const signMessage = (msg, cb) => this._buildMessage(msg) // send to all the other peers - this._publish(utils.normalizeOutRpcMessages(msgObjects)) + this._publish(await pMap(msgObjects, signMessage)) } /** @@ -434,23 +446,10 @@ class BasicPubSub extends Pubsub { return this.defaultMsgIdFn(msg) } - _emitMessages (topics, messages) { - topics.forEach((topic) => { - if (!this.subscriptions.has(topic)) { - return - } - - messages.forEach((message) => { - this.emit(topic, message) - }) - }) - } - /** * Publish messages * - * Note: this function assumes all messages are well-formed RPC objects - * @param {Array} msgs + * @param {Array} msgs * @returns {void} */ _publish (msgs) {