From a666cebf07cfeac0e38f5f5a3f6be8fe16113e61 Mon Sep 17 00:00:00 2001 From: Cayman Date: Wed, 15 Jul 2020 10:02:23 -0500 Subject: [PATCH 1/6] feat: refactor peer class --- package.json | 2 + src/index.js | 62 ++++++++------ src/peer.js | 202 -------------------------------------------- src/peerStreams.js | 197 ++++++++++++++++++++++++++++++++++++++++++ test/pubsub.spec.js | 6 +- 5 files changed, 236 insertions(+), 233 deletions(-) delete mode 100644 src/peer.js create mode 100644 src/peerStreams.js diff --git a/package.json b/package.json index 29b41fd..05124a0 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,8 @@ "sinon": "^9.0.0" }, "dependencies": { + "abort-controller": "^3.0.0", + "abortable-iterator": "^3.0.0", "debug": "^4.1.1", "err-code": "^2.0.0", "it-length-prefixed": "^3.0.0", diff --git a/src/index.js b/src/index.js index 96741cb..8459bf5 100644 --- a/src/index.js +++ b/src/index.js @@ -8,7 +8,7 @@ const PeerId = require('peer-id') const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') const message = require('./message') -const Peer = require('./peer') +const PeerStreams = require('./peerStreams') const utils = require('./utils') const { signMessage, @@ -88,14 +88,14 @@ class PubsubBaseProtocol extends EventEmitter { /** * Map of topics to which peers are subscribed to * - * @type {Map} + * @type {Map>} */ this.topics = new Map() /** - * Map of peers. + * Map of peer streams * - * @type {Map} + * @type {Map} */ this.peers = new Map() @@ -125,9 +125,11 @@ class PubsubBaseProtocol extends EventEmitter { this.log('starting') // Incoming streams + // Called after a peer dials us this.registrar.handle(this.multicodecs, this._onIncomingStream) // register protocol with topology + // Topology callbacks called on connection manager changes const topology = new MulticodecTopology({ multicodecs: this.multicodecs, handlers: { @@ -166,15 +168,16 @@ class PubsubBaseProtocol extends EventEmitter { * @private * @param {Object} props * @param {string} props.protocol - * @param {DuplexStream} props.strean + * @param {Stream} props.stream * @param {Connection} props.connection connection */ _onIncomingStream ({ protocol, stream, connection }) { const peerId = connection.remotePeer const idB58Str = peerId.toB58String() - const peer = this._addPeer(peerId, [protocol]) + const peer = this._addPeer(peerId, protocol) + peer.attachInboundStream(stream) - this._processMessages(idB58Str, stream, peer) + this._processMessages(idB58Str, peer.inboundStream, peer) } /** @@ -187,11 +190,10 @@ class PubsubBaseProtocol extends EventEmitter { const idB58Str = peerId.toB58String() this.log('connected', idB58Str) - const peer = this._addPeer(peerId, this.multicodecs) - try { - const { stream } = await conn.newStream(this.multicodecs) - peer.attachConnection(stream) + const { stream, protocol } = await conn.newStream(this.multicodecs) + const peer = this._addPeer(peerId, protocol) + await peer.attachOutboundStream(stream) } catch (err) { this.log.err(err) } @@ -205,49 +207,50 @@ class PubsubBaseProtocol extends EventEmitter { */ _onPeerDisconnected (peerId, err) { const idB58Str = peerId.toB58String() - const peer = this.peers.get(idB58Str) this.log('connection ended', idB58Str, err ? err.message : '') - this._removePeer(peer) + this._removePeer(peerId) } /** - * Add a new connected peer to the peers map. + * Notifies the router that a peer has been connected * @private * @param {PeerId} peerId - * @param {Array} protocols - * @returns {Peer} + * @param {string} protocol + * @returns {PeerStreams} */ - _addPeer (peerId, protocols) { + _addPeer (peerId, protocol) { const id = peerId.toB58String() let existing = this.peers.get(id) if (!existing) { this.log('new peer', id) - const peer = new Peer({ + const peer = new PeerStreams({ id: peerId, - protocols + protocol }) this.peers.set(id, peer) existing = peer - peer.once('close', () => this._removePeer(peer)) + peer.once('close', () => this._removePeer(peerId)) } return existing } /** - * Remove a peer from the peers map. + * Notifies the router that a peer has been disconnected. * @private - * @param {Peer} peer peer state - * @returns {Peer} + * @param {PeerId} peerId + * @returns {PeerStreams | undefined} */ - _removePeer (peer) { + _removePeer (peerId) { + if (!peerId) return + const id = peerId.toB58String() + const peer = this.peers.get(id) if (!peer) return - const id = peer.id.toB58String() this.log('delete peer', id) this.peers.delete(id) @@ -304,8 +307,11 @@ class PubsubBaseProtocol extends EventEmitter { throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC') } - return Array.from(this.peers.values()) - .filter((peer) => peer.topics.has(topic)) + const peersInTopic = this.topics.get(topic) + if (!peersInTopic) { + return [] + } + return Array.from(peersInTopic) .map((peer) => peer.id.toB58String()) } @@ -360,7 +366,7 @@ class PubsubBaseProtocol extends EventEmitter { * @abstract * @param {string} idB58Str peer id string in base58 * @param {Connection} conn connection - * @param {Peer} peer A Pubsub Peer + * @param {PeerStreams} peer A Pubsub Peer * @returns {void} * */ diff --git a/src/peer.js b/src/peer.js deleted file mode 100644 index 6163d85..0000000 --- a/src/peer.js +++ /dev/null @@ -1,202 +0,0 @@ -'use strict' - -const EventEmitter = require('events') - -const lp = require('it-length-prefixed') -const pushable = require('it-pushable') -const pipe = require('it-pipe') -const debug = require('debug') - -const log = debug('libp2p-pubsub:peer') -log.error = debug('libp2p-pubsub:peer:error') - -const { RPC } = require('./message') - -/** - * The known state of a connected peer. - */ -class Peer extends EventEmitter { - /** - * @param {PeerId} id - * @param {Array} protocols - */ - constructor ({ id, protocols }) { - super() - - /** - * @type {PeerId} - */ - this.id = id - /** - * @type {string} - */ - this.protocols = protocols - /** - * @type {Connection} - */ - this.conn = null - /** - * @type {Set} - */ - this.topics = new Set() - /** - * @type {Pushable} - */ - this.stream = null - } - - /** - * Is the peer connected currently? - * - * @type {boolean} - */ - get isConnected () { - return Boolean(this.conn) - } - - /** - * Do we have a connection to write on? - * - * @type {boolean} - */ - get isWritable () { - return Boolean(this.stream) - } - - /** - * Send a message to this peer. - * Throws if there is no `stream` to write to available. - * - * @param {Buffer} msg - * @returns {undefined} - */ - write (msg) { - if (!this.isWritable) { - const id = this.id.toB58String() - throw new Error('No writable connection to ' + id) - } - - this.stream.push(msg) - } - - /** - * Attach the peer to a connection and setup a write stream - * - * @param {Connection} conn - * @returns {void} - */ - async attachConnection (conn) { - const _prevStream = this.stream - if (_prevStream) { - // End the stream without emitting a close event - await _prevStream.end(false) - } - - this.stream = pushable({ - onEnd: (emit) => { - // close readable side of the stream - this.conn.reset && this.conn.reset() - this.conn = null - this.stream = null - if (emit !== false) { - this.emit('close') - } - } - }) - this.conn = conn - - pipe( - this.stream, - lp.encode(), - conn - ).catch(err => { - log.error(err) - }) - - // Only emit if the connection is new - if (!_prevStream) { - this.emit('connection') - } - } - - _sendRawSubscriptions (topics, subscribe) { - if (topics.size === 0) { - return - } - - const subs = [] - topics.forEach((topic) => { - subs.push({ - subscribe: subscribe, - topicID: topic - }) - }) - - this.write(RPC.encode({ - subscriptions: subs - })) - } - - /** - * Send the given subscriptions to this peer. - * @param {Set|Array} topics - * @returns {undefined} - */ - sendSubscriptions (topics) { - this._sendRawSubscriptions(topics, true) - } - - /** - * Send the given unsubscriptions to this peer. - * @param {Set|Array} topics - * @returns {undefined} - */ - sendUnsubscriptions (topics) { - this._sendRawSubscriptions(topics, false) - } - - /** - * Send messages to this peer. - * - * @param {Array} msgs - * @returns {undefined} - */ - sendMessages (msgs) { - this.write(RPC.encode({ - msgs: msgs - })) - } - - /** - * Bulk process subscription updates. - * - * @param {Array} changes - * @returns {undefined} - */ - updateSubscriptions (changes) { - changes.forEach((subopt) => { - if (subopt.subscribe) { - this.topics.add(subopt.topicID) - } else { - this.topics.delete(subopt.topicID) - } - }) - } - - /** - * Closes the open connection to peer - * @returns {void} - */ - close () { - // End the pushable - if (this.stream) { - this.stream.end() - } - - this.conn = null - this.stream = null - this.emit('close') - } -} - -module.exports = Peer diff --git a/src/peerStreams.js b/src/peerStreams.js new file mode 100644 index 0000000..7d2d4a4 --- /dev/null +++ b/src/peerStreams.js @@ -0,0 +1,197 @@ +'use strict' + +const EventEmitter = require('events') + +const lp = require('it-length-prefixed') +const pushable = require('it-pushable') +const pipe = require('it-pipe') +const abortable = require('abortable-iterator') +const AbortController = require('abort-controller') +const debug = require('debug') + +const log = debug('libp2p-pubsub:peer') +log.error = debug('libp2p-pubsub:peer:error') + +/** + * Thin wrapper around a peer's inbound / outbound pubsub streams + */ +class PeerStreams extends EventEmitter { + /** + * @param {PeerId} id + * @param {string} protocol + */ + constructor ({ id, protocol }) { + super() + + /** + * @type {PeerId} + */ + this.id = id + /** + * Established protocol + * @type {string} + */ + this.protocol = protocol + /** + * The raw outbound stream, as retrieved from conn.newStream + * @private + * @type {Stream} + */ + this._rawOutboundStream = null + /** + * The raw inbound stream, as retrieved from the callback from libp2p.handle + * @private + * @type {Stream} + */ + this._rawInboundStream = null + /** + * An AbortController for controlled shutdown of the inbound stream + * @private + * @type {AbortController} + */ + this._inboundAbortController = null + /** + * Write stream -- its preferable to use the write method + * @type {Pushable} + */ + this.outboundStream = null + /** + * Read stream + * @type {Stream} + */ + this.inboundStream = null + } + + /** + * Do we have a connection to read from? + * + * @type {boolean} + */ + get isReadable () { + return Boolean(this.inboundStream) + } + + /** + * Do we have a connection to write on? + * + * @type {boolean} + */ + get isWritable () { + return Boolean(this.outboundStream) + } + + /** + * Send a message to this peer. + * Throws if there is no `stream` to write to available. + * + * @param {Buffer} data + * @returns {undefined} + */ + write (data) { + if (!this.isWritable) { + const id = this.id.toB58String() + throw new Error('No writable connection to ' + id) + } + + this.outboundStream.push(data) + } + + /** + * Attach a raw inbound stream and setup a read stream + * + * @param {Stream} stream + * @returns {void} + */ + attachInboundStream (stream) { + // If an inbound stream already exists, + // use the abort controller to gently close it + const _prevStream = this.inboundStream + if (_prevStream) { + this._inboundAbortController.abort() + } + + // Create and attach a new inbound stream + // The inbound stream is: + // - abortable, set to only return on abort, rather than throw + // - transformed with length-prefix transform + this._inboundAbortController = new AbortController() + this._rawInboundStream = stream + this.inboundStream = abortable( + pipe( + this._rawInboundStream, + lp.decode() + ), + this._inboundAbortController.signal, + { returnOnAbort: true } + ) + + // Only emit if the connection is new + if (_prevStream) { + this.emit('stream:inbound') + } + } + + /** + * Attach a raw outbound stream and setup a write stream + * + * @param {Stream} stream + * @returns {Promise} + */ + async attachOutboundStream (stream) { + // If an outbound stream already exists, + // gently close it + const _prevStream = this.outboundStream + if (_prevStream) { + // End the stream without emitting a close event + await this.outboundStream.end(false) + } + + this._rawOutboundStream = stream + this.outboundStream = pushable({ + onEnd: (shouldEmit) => { + // close writable side of the stream + this._rawOutboundStream.reset && this._rawOutboundStream.reset() + this._rawOutboundStream = null + this.outboundStream = null + if (shouldEmit !== false) { + this.emit('close') + } + } + }) + + pipe( + this.outboundStream, + lp.encode(), + this._rawOutboundStream + ).catch(err => { + log.error(err) + }) + + // Only emit if the connection is new + if (_prevStream) { + this.emit('stream:outbound') + } + } + + /** + * Closes the open connection to peer + * @returns {void} + */ + close () { + // End the pushable + if (this.outboundStream) { + this.outboundStream.end() + } + if (this.inboundStream) { + this._inboundAbortController.abort() + } + + this._rawOutboundStream = null + this.outboundStream = null + this._rawInboundStream = null + this.inboundStream = null + this.emit('close') + } +} + +module.exports = PeerStreams diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 2f40dd4..5d2c5f7 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -8,7 +8,7 @@ const expect = chai.expect const sinon = require('sinon') const PubsubBaseProtocol = require('../src') -const Peer = require('../src/peer') +const PeerStreams = require('../src/peerStreams') const { randomSeqno } = require('../src/utils') const { createPeerId, @@ -340,10 +340,10 @@ describe('pubsub base protocol', () => { expect(peersSubscribed).to.be.empty() // Set mock peer subscribed - const peer = new Peer({ id: peerId }) + const peer = new PeerStreams({ id: peerId }) const id = peer.id.toB58String() - peer.topics.add(topic) + pubsub.topics.set(topic, new Set([peer])) pubsub.peers.set(id, peer) peersSubscribed = pubsub.getSubscribers(topic) From be7ee92a8dde3d8ddaab9f064fe8f4bb1c4b581d Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 16 Jul 2020 11:49:25 -0500 Subject: [PATCH 2/6] chore: rename peer to peer streams --- src/index.js | 34 ++++++++++++++++++---------------- src/peerStreams.js | 4 ++-- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/index.js b/src/index.js index 8459bf5..e364257 100644 --- a/src/index.js +++ b/src/index.js @@ -156,7 +156,7 @@ class PubsubBaseProtocol extends EventEmitter { await this.registrar.unregister(this._registrarId) this.log('stopping') - this.peers.forEach((peer) => peer.close()) + this.peers.forEach((peerStreams) => peerStreams.close()) this.peers = new Map() this.started = false @@ -221,23 +221,25 @@ class PubsubBaseProtocol extends EventEmitter { */ _addPeer (peerId, protocol) { const id = peerId.toB58String() - let existing = this.peers.get(id) + const existing = this.peers.get(id) + // If peer streams already exists, do nothing + if (existing) { + return existing + } - if (!existing) { - this.log('new peer', id) + // else create a new peer streams - const peer = new PeerStreams({ - id: peerId, - protocol - }) + this.log('new peer', id) - this.peers.set(id, peer) - existing = peer + const peerStreams = new PeerStreams({ + id: peerId, + protocol + }) - peer.once('close', () => this._removePeer(peerId)) - } + this.peers.set(id, peerStreams) + peerStreams.once('close', () => this._removePeer(peerId)) - return existing + return peerStreams } /** @@ -249,13 +251,13 @@ class PubsubBaseProtocol extends EventEmitter { _removePeer (peerId) { if (!peerId) return const id = peerId.toB58String() - const peer = this.peers.get(id) - if (!peer) return + const peerStreams = this.peers.get(id) + if (!peerStreams) return this.log('delete peer', id) this.peers.delete(id) - return peer + return peerStreams } /** diff --git a/src/peerStreams.js b/src/peerStreams.js index 7d2d4a4..f55c41d 100644 --- a/src/peerStreams.js +++ b/src/peerStreams.js @@ -9,8 +9,8 @@ const abortable = require('abortable-iterator') const AbortController = require('abort-controller') const debug = require('debug') -const log = debug('libp2p-pubsub:peer') -log.error = debug('libp2p-pubsub:peer:error') +const log = debug('libp2p-pubsub:peer-streams') +log.error = debug('libp2p-pubsub:peer-streams:error') /** * Thin wrapper around a peer's inbound / outbound pubsub streams From 795fe71b21a8747045d06bd692af6ba10802c57f Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 16 Jul 2020 11:51:39 -0500 Subject: [PATCH 3/6] chore: fix stream events emitted --- src/peerStreams.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/peerStreams.js b/src/peerStreams.js index f55c41d..45d76ab 100644 --- a/src/peerStreams.js +++ b/src/peerStreams.js @@ -126,7 +126,7 @@ class PeerStreams extends EventEmitter { ) // Only emit if the connection is new - if (_prevStream) { + if (!_prevStream) { this.emit('stream:inbound') } } @@ -168,7 +168,7 @@ class PeerStreams extends EventEmitter { }) // Only emit if the connection is new - if (_prevStream) { + if (!_prevStream) { this.emit('stream:outbound') } } From 20c62b18e4bec1b13da864dcb18096afac15b98f Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 16 Jul 2020 12:00:15 -0500 Subject: [PATCH 4/6] chore: clarify iterable stream --- src/index.js | 2 +- src/peerStreams.js | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/index.js b/src/index.js index e364257..2b55ae7 100644 --- a/src/index.js +++ b/src/index.js @@ -168,7 +168,7 @@ class PubsubBaseProtocol extends EventEmitter { * @private * @param {Object} props * @param {string} props.protocol - * @param {Stream} props.stream + * @param {DuplexIterableStream} props.stream * @param {Connection} props.connection connection */ _onIncomingStream ({ protocol, stream, connection }) { diff --git a/src/peerStreams.js b/src/peerStreams.js index 45d76ab..162ee69 100644 --- a/src/peerStreams.js +++ b/src/peerStreams.js @@ -35,13 +35,13 @@ class PeerStreams extends EventEmitter { /** * The raw outbound stream, as retrieved from conn.newStream * @private - * @type {Stream} + * @type {DuplexIterableStream} */ this._rawOutboundStream = null /** * The raw inbound stream, as retrieved from the callback from libp2p.handle * @private - * @type {Stream} + * @type {DuplexIterableStream} */ this._rawInboundStream = null /** @@ -57,7 +57,7 @@ class PeerStreams extends EventEmitter { this.outboundStream = null /** * Read stream - * @type {Stream} + * @type {DuplexIterableStream} */ this.inboundStream = null } @@ -99,7 +99,7 @@ class PeerStreams extends EventEmitter { /** * Attach a raw inbound stream and setup a read stream * - * @param {Stream} stream + * @param {DuplexIterableStream} stream * @returns {void} */ attachInboundStream (stream) { From 9d9c3c8006467436a38eada4067a54d1c4118ba0 Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 16 Jul 2020 12:04:20 -0500 Subject: [PATCH 5/6] chore: close peer streams in removePeer --- src/index.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/index.js b/src/index.js index 2b55ae7..2625374 100644 --- a/src/index.js +++ b/src/index.js @@ -254,6 +254,11 @@ class PubsubBaseProtocol extends EventEmitter { const peerStreams = this.peers.get(id) if (!peerStreams) return + // close peer streams + peerStreams.removeAllListeners() + peerStreams.close() + + // delete peer streams this.log('delete peer', id) this.peers.delete(id) From 559a72b8c7ae33e5b342b9da166bf08ff86bffa6 Mon Sep 17 00:00:00 2001 From: Cayman Date: Thu, 16 Jul 2020 12:04:46 -0500 Subject: [PATCH 6/6] chore: improve peerStreams#close comments --- src/peerStreams.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/peerStreams.js b/src/peerStreams.js index 162ee69..ac1ab1f 100644 --- a/src/peerStreams.js +++ b/src/peerStreams.js @@ -178,10 +178,11 @@ class PeerStreams extends EventEmitter { * @returns {void} */ close () { - // End the pushable + // End the outbound stream if (this.outboundStream) { this.outboundStream.end() } + // End the inbound stream if (this.inboundStream) { this._inboundAbortController.abort() }