Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 40 additions & 33 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ const expect = chai.expect
const sinon = require('sinon')

const { utils } = require('libp2p-pubsub')
const Peer = require('libp2p-pubsub/src/peer')
const { signMessage } = require('libp2p-pubsub/src/message/sign')
const PeerId = require('peer-id')
const {
createGossipsub,
mockRegistrar,
Expand Down Expand Up @@ -42,27 +45,28 @@ describe('Pubsub', () => {
})

describe('validate', () => {
it('should drop unsigned messages', () => {
it('should drop unsigned messages', async () => {
sinon.spy(gossipsub, '_processRpcMessage')
sinon.spy(gossipsub, 'validate')
sinon.stub(gossipsub.peers, 'get').returns({})

const topic = 'my-topic'
const peer = new Peer({ id: await PeerId.create() })
const rpc = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
from: peer.id.toBytes(),
data: Buffer.from('an unsigned message'),
seqno: utils.randomSeqno(),
topicIDs: [topic]
}]
}

gossipsub._processRpc('QmAnotherPeer', {}, rpc)
gossipsub._processRpc(peer.id.toB58String(), peer, rpc)

return new Promise(resolve => setTimeout(() => {
return new Promise(resolve => setTimeout(async () => {
expect(gossipsub.validate.callCount).to.eql(1)
expect(gossipsub._processRpcMessage.called).to.eql(false)
expect(await gossipsub.validate.getCall(0).returnValue).to.eql(false)
resolve()
}, 500))
})
Expand All @@ -73,65 +77,67 @@ describe('Pubsub', () => {
sinon.stub(gossipsub.peers, 'get').returns({})

const topic = 'my-topic'
const signedMessage = await gossipsub._buildMessage({
from: gossipsub.peerId.id,
data: Buffer.from('an unsigned message'),
const peer = new Peer({ id: await PeerId.create() })
let signedMessage = {
from: peer.id.toBytes(),
data: Buffer.from('a signed message'),
seqno: utils.randomSeqno(),
topicIDs: [topic]
})
}
signedMessage = await signMessage(peer.id, signedMessage)

const rpc = {
subscriptions: [],
msgs: [signedMessage]
}

gossipsub._processRpc('QmAnotherPeer', {}, rpc)
gossipsub._processRpc(peer.id.toB58String(), peer, rpc)

return new Promise(resolve => setTimeout(() => {
return new Promise(resolve => setTimeout(async () => {
expect(gossipsub.validate.callCount).to.eql(1)
expect(gossipsub._processRpcMessage.callCount).to.eql(1)
expect(await gossipsub.validate.getCall(0).returnValue).to.be.eql(true)
resolve()
}, 500))
})

it('should not drop unsigned messages if strict signing is disabled', () => {
it('should not drop unsigned messages if strict signing is disabled', async () => {
sinon.spy(gossipsub, '_processRpcMessage')
sinon.spy(gossipsub, 'validate')
sinon.stub(gossipsub.peers, 'get').returns({})
// Disable strict signing
sinon.stub(gossipsub, 'strictSigning').value(false)

const topic = 'my-topic'
const peer = new Peer({ id: await PeerId.create() })
const rpc = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
from: peer.id.toBytes(),
data: Buffer.from('an unsigned message'),
seqno: utils.randomSeqno(),
topicIDs: [topic]
}]
}

gossipsub._processRpc('QmAnotherPeer', {}, rpc)
gossipsub._processRpc(peer.id.toB58String(), peer, rpc)

return new Promise(resolve => setTimeout(() => {
return new Promise(resolve => setTimeout(async () => {
expect(gossipsub.validate.callCount).to.eql(1)
expect(gossipsub._processRpcMessage.callCount).to.eql(1)
expect(await gossipsub.validate.getCall(0).returnValue).to.eql(true)
resolve()
}, 500))
})
})

describe('topic validators', () => {
it('should filter messages by topic validator', async () => {
// use processRpcMessage.callCount to see if a message is valid or not
// a valid message will trigger processRpcMessage
sinon.stub(gossipsub, '_processRpcMessage')
// use validate.getCall(0).returnValue to see if a message is valid or not
sinon.spy(gossipsub, 'validate')
// Disable strict signing
sinon.stub(gossipsub, 'strictSigning').value(false)
sinon.stub(gossipsub.peers, 'get').returns({})
const filteredTopic = 't'
const peerStr = 'QmAnotherPeer'
gossipsub.peers.set(peerStr, {})
const peer = new Peer({ id: await PeerId.create() })

// Set a trivial topic validator
gossipsub.topicValidators.set(filteredTopic, (topic, peer, message) => {
Expand All @@ -142,33 +148,35 @@ describe('Pubsub', () => {
const validRpc = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
from: peer.id.toBytes(),
data: Buffer.from('a message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process valid message
gossipsub._processRpc(peerStr, {}, validRpc)
gossipsub._processRpc(peer.id.toB58String(), peer, validRpc)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(1)
expect(gossipsub.validate.callCount).to.eql(1)
expect(await gossipsub.validate.getCall(0).returnValue).to.eql(true)

// invalid case
const invalidRpc = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
from: peer.id.toBytes(),
data: Buffer.from('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process invalid message
gossipsub._processRpc(peerStr, {}, invalidRpc)
gossipsub._processRpc(peer.id.toB58String(), peer, invalidRpc)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(1)
expect(gossipsub.validate.callCount).to.eql(2)
expect(await gossipsub.validate.getCall(1).returnValue).to.eql(false)

// remove topic validator
gossipsub.topicValidators.delete(filteredTopic)
Expand All @@ -177,19 +185,18 @@ describe('Pubsub', () => {
const invalidRpc2 = {
subscriptions: [],
msgs: [{
from: gossipsub.peerId.id,
from: peer.id.toB58String(),
data: Buffer.from('a different message'),
seqno: utils.randomSeqno(),
topicIDs: [filteredTopic]
}]
}

// process previously invalid message, now is valid
gossipsub._processRpc(peerStr, {}, invalidRpc2)
gossipsub._processRpc(peer.id.toB58String(), peer, invalidRpc2)
await new Promise(resolve => setTimeout(resolve, 500))
expect(gossipsub._processRpcMessage.callCount).to.eql(2)
// cleanup
gossipsub.peers.delete(peerStr)
expect(gossipsub.validate.callCount).to.eql(3)
expect(await gossipsub.validate.getCall(2).returnValue).to.eql(true)
})
})
})
6 changes: 6 additions & 0 deletions ts/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,9 @@ export const GossipsubMaxIHaveMessages = 10
export const GossipsubIWantFollowupTime = 3 * second

export const TimeCacheDuration = 120 * 1000

export const enum ExtendedValidatorResult {
accept = 'accept',
reject = 'reject',
ignore = 'ignore'
}
101 changes: 72 additions & 29 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
ControlMessage, ControlIHave, ControlGraft, ControlIWant, ControlPrune
} from './message'
import * as constants from './constants'
import { ExtendedValidatorResult } from './constants'
import { Heartbeat } from './heartbeat'
import { getGossipPeers } from './getGossipPeers'
import { createGossipRpc, shuffle, hasGossipProtocol } from './utils'
Expand Down Expand Up @@ -280,49 +281,92 @@ class Gossipsub extends BasicPubsub {
* @param {Peer} peer
* @param {Message} msg
*/
_processRpcMessage (peer: Peer, msg: InMessage): void {
async _processRpcMessage (peer: Peer, msg: InMessage): Promise<void> {
const msgID = this.getMsgId(msg)

// Ignore if we've already seen the message
if (this.seenCache.has(msgID)) {
this.score.duplicateMessage(peer.id.toB58String(), msg as Message)
return
}
this.seenCache.put(msgID)

super._processRpcMessage(peer, msg)
this.score.validateMessage(peer.id.toB58String(), msg as Message)
await super._processRpcMessage(peer, msg)
}

/**
* Publish a message sent from a peer
* @override
* @param {Peer} peer
* @param {InMessage} msg
* @returns {void}
*/
_publishFrom (peer: Peer, msg: InMessage): void {
this.score.deliverMessage(peer.id.toB58String(), msg as Message)
const topics = msg.topicIDs

// If options.gossipIncoming is false, do NOT emit incoming messages to peers
if (!this._options.gossipIncoming) {
return
}

// Emit to floodsub peers
this.peers.forEach((peer) => {
if (peer.protocols.includes(constants.FloodsubID) &&
peer.id.toB58String() !== msg.from &&
utils.anyMatch(peer.topics, topics) &&
peer.isWritable
) {
peer.sendMessages(utils.normalizeOutRpcMessages([msg]))
this.log('publish msg on topics - floodsub', topics, peer.id.toB58String())
}
})
if (this._options.gossipIncoming) {
// Emit to floodsub peers
this.peers.forEach((peer) => {
if (peer.protocols.includes(constants.FloodsubID) &&
peer.id.toB58String() !== msg.from &&
utils.anyMatch(peer.topics, topics) &&
peer.isWritable
) {
peer.sendMessages(utils.normalizeOutRpcMessages([msg]))
this.log('publish msg on topics - floodsub', topics, peer.id.toB58String())
}
})

// Emit to peers in the mesh
topics.forEach((topic) => {
const meshPeers = this.mesh.get(topic)
if (!meshPeers) {
return
}
meshPeers.forEach((peer) => {
if (!peer.isWritable || peer.id.toB58String() === msg.from) {
// Emit to peers in the mesh
topics.forEach((topic) => {
const meshPeers = this.mesh.get(topic)
if (!meshPeers) {
return
}
peer.sendMessages(utils.normalizeOutRpcMessages([msg]))
this.log('publish msg on topic - meshsub', topic, peer.id.toB58String())
meshPeers.forEach((peer) => {
if (!peer.isWritable || peer.id.toB58String() === msg.from) {
return
}
peer.sendMessages(utils.normalizeOutRpcMessages([msg]))
this.log('publish msg on topic - meshsub', topic, peer.id.toB58String())
})
})
})
}

// Emit to self
super._publishFrom(peer, msg)
}

/**
* Coerse topic validator result to valid/invalid boolean
* Provide extended validator support
*
* @override
* @param {string} topic
* @param {Peer} peer
* @param {Message} message
* @param {unknown} result
* @returns {boolean}
*/
_processTopicValidatorResult (topic: string, peer: Peer, message: Message, result: unknown): boolean {
if (typeof result === 'string') {
// assume an extended topic validator result
switch (result) {
case ExtendedValidatorResult.accept:
return true
case ExtendedValidatorResult.reject:
this.score.rejectMessage(peer.id.toB58String(), message)
return false
case ExtendedValidatorResult.ignore:
this.score.ignoreMessage(peer.id.toB58String(), message)
return false
}
}
// assume a basic topic validator result
return super._processTopicValidatorResult(topic, peer, message, result)
}

/**
Expand Down Expand Up @@ -593,7 +637,6 @@ class Gossipsub extends BasicPubsub {
/**
* Publish messages
*
* Note: this function assumes all messages are well-formed RPC objects
* @override
* @param {Array<Message>} msgs
* @returns {void}
Expand Down
Loading