Skip to content
This repository was archived by the owner on Jun 27, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
"sinon": "^9.0.0"
},
"dependencies": {
"abort-controller": "^3.0.0",
"abortable-iterator": "^3.0.0",
"debug": "^4.1.1",
"err-code": "^2.0.0",
"it-length-prefixed": "^3.0.0",
Expand Down
62 changes: 34 additions & 28 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const PeerId = require('peer-id')
const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

const message = require('./message')
const Peer = require('./peer')
const PeerStreams = require('./peerStreams')
const utils = require('./utils')
const {
signMessage,
Expand Down Expand Up @@ -88,14 +88,14 @@ class PubsubBaseProtocol extends EventEmitter {
/**
* Map of topics to which peers are subscribed to
*
* @type {Map<string, Peer>}
* @type {Map<string, Set<PeerStreams>>}
*/
this.topics = new Map()

/**
* Map of peers.
* Map of peer streams
*
* @type {Map<string, Peer>}
* @type {Map<string, PeerStreams>}
*/
this.peers = new Map()

Expand Down Expand Up @@ -125,9 +125,11 @@ class PubsubBaseProtocol extends EventEmitter {
this.log('starting')

// Incoming streams
// Called after a peer dials us
this.registrar.handle(this.multicodecs, this._onIncomingStream)

// register protocol with topology
// Topology callbacks called on connection manager changes
const topology = new MulticodecTopology({
multicodecs: this.multicodecs,
handlers: {
Expand Down Expand Up @@ -166,15 +168,16 @@ class PubsubBaseProtocol extends EventEmitter {
* @private
* @param {Object} props
* @param {string} props.protocol
* @param {DuplexStream} props.strean
* @param {Stream} props.stream
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @param {Stream} props.stream
* @param {IterableStream} props.stream

Per https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9, we should call this an IterableStream. This is not a node stream and might create confusion

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I called it DuplexIterableStream so its known to be a duplex object ({ sink, source })

* @param {Connection} props.connection connection
*/
_onIncomingStream ({ protocol, stream, connection }) {
const peerId = connection.remotePeer
const idB58Str = peerId.toB58String()
const peer = this._addPeer(peerId, [protocol])
const peer = this._addPeer(peerId, protocol)
peer.attachInboundStream(stream)

this._processMessages(idB58Str, stream, peer)
this._processMessages(idB58Str, peer.inboundStream, peer)
}

/**
Expand All @@ -187,11 +190,10 @@ class PubsubBaseProtocol extends EventEmitter {
const idB58Str = peerId.toB58String()
this.log('connected', idB58Str)

const peer = this._addPeer(peerId, this.multicodecs)

try {
const { stream } = await conn.newStream(this.multicodecs)
peer.attachConnection(stream)
const { stream, protocol } = await conn.newStream(this.multicodecs)
const peer = this._addPeer(peerId, protocol)
await peer.attachOutboundStream(stream)
} catch (err) {
this.log.err(err)
}
Expand All @@ -205,49 +207,50 @@ class PubsubBaseProtocol extends EventEmitter {
*/
_onPeerDisconnected (peerId, err) {
const idB58Str = peerId.toB58String()
const peer = this.peers.get(idB58Str)

this.log('connection ended', idB58Str, err ? err.message : '')
this._removePeer(peer)
this._removePeer(peerId)
}

/**
* Add a new connected peer to the peers map.
* Notifies the router that a peer has been connected
* @private
* @param {PeerId} peerId
* @param {Array<string>} protocols
* @returns {Peer}
* @param {string} protocol
* @returns {PeerStreams}
*/
_addPeer (peerId, protocols) {
_addPeer (peerId, protocol) {
const id = peerId.toB58String()
let existing = this.peers.get(id)

if (!existing) {
this.log('new peer', id)

const peer = new Peer({
const peer = new PeerStreams({
id: peerId,
protocols
protocol
})

this.peers.set(id, peer)
existing = peer

peer.once('close', () => this._removePeer(peer))
peer.once('close', () => this._removePeer(peerId))
}

return existing
}

/**
* Remove a peer from the peers map.
* Notifies the router that a peer has been disconnected.
* @private
* @param {Peer} peer peer state
* @returns {Peer}
* @param {PeerId} peerId
* @returns {PeerStreams | undefined}
*/
_removePeer (peer) {
_removePeer (peerId) {
if (!peerId) return
const id = peerId.toB58String()
const peer = this.peers.get(id)
if (!peer) return
const id = peer.id.toB58String()

this.log('delete peer', id)
this.peers.delete(id)
Expand Down Expand Up @@ -304,8 +307,11 @@ class PubsubBaseProtocol extends EventEmitter {
throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC')
}

return Array.from(this.peers.values())
.filter((peer) => peer.topics.has(topic))
const peersInTopic = this.topics.get(topic)
if (!peersInTopic) {
return []
}
return Array.from(peersInTopic)
.map((peer) => peer.id.toB58String())
}

Expand Down Expand Up @@ -360,7 +366,7 @@ class PubsubBaseProtocol extends EventEmitter {
* @abstract
* @param {string} idB58Str peer id string in base58
* @param {Connection} conn connection
* @param {Peer} peer A Pubsub Peer
* @param {PeerStreams} peer A Pubsub Peer
* @returns {void}
*
*/
Expand Down
202 changes: 0 additions & 202 deletions src/peer.js

This file was deleted.

Loading