Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/circuit/auto-relay.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ class AutoRelay {

try {
await this._transportManager.listen([multiaddr(listenAddr)])
// TODO: push announce multiaddrs update
// await this._libp2p.identifyService.pushToPeerStore()
// Announce multiaddrs will update on listen success by TransportManager event being triggered
} catch (err) {
log.error(err)
this._listenRelays.delete(id)
Expand Down
4 changes: 2 additions & 2 deletions src/circuit/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ module.exports = (libp2p) => {
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())

if (deleted) {
// TODO push announce multiaddrs update
// libp2p.identifyService.pushToPeerStore()
// Announce listen addresses change
listener.emit('close')
}
})

Expand Down
50 changes: 15 additions & 35 deletions src/identify/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ class IdentifyService {
*/
this.connectionManager = libp2p.connectionManager

this.connectionManager.on('peer:connect', (connection) => {
const peerId = connection.remotePeer

this.identify(connection, peerId).catch(log.error)
})

/**
* @property {PeerId}
*/
Expand All @@ -82,6 +76,19 @@ class IdentifyService {
this._protocols = protocols

this.handleMessage = this.handleMessage.bind(this)

this.connectionManager.on('peer:connect', (connection) => {
const peerId = connection.remotePeer

this.identify(connection, peerId).catch(log.error)
})

// When self multiaddrs change, trigger identify-push
this.peerStore.on('change:multiaddrs', ({ peerId }) => {
if (peerId.toString() === this.peerId.toString()) {
this.pushToPeerStore()
}
})
}

/**
Expand All @@ -90,7 +97,7 @@ class IdentifyService {
* @returns {Promise<void>}
*/
async push (connections) {
const signedPeerRecord = await this._getSelfPeerRecord()
const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)
const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes)
const protocols = Array.from(this._protocols.keys())

Expand Down Expand Up @@ -239,7 +246,7 @@ class IdentifyService {
publicKey = this.peerId.pubKey.bytes
}

const signedPeerRecord = await this._getSelfPeerRecord()
const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)

const message = Message.encode({
protocolVersion: PROTOCOL_VERSION,
Expand Down Expand Up @@ -308,33 +315,6 @@ class IdentifyService {
// Update the protocols
this.peerStore.protoBook.set(id, message.protocols)
}

/**
* Get self signed peer record raw envelope.
* @return {Uint8Array}
*/
async _getSelfPeerRecord () {
const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId)

// TODO: support invalidation when dynamic multiaddrs are supported
if (selfSignedPeerRecord) {
return selfSignedPeerRecord
}

try {
const peerRecord = new PeerRecord({
peerId: this.peerId,
multiaddrs: this._libp2p.multiaddrs
})
const envelope = await Envelope.seal(peerRecord, this.peerId)
this.peerStore.addressBook.consumePeerRecord(envelope)

return this.peerStore.addressBook.getRawEnvelope(this.peerId)
} catch (err) {
log.error('failed to get self peer record')
}
return null
}
}

module.exports.IdentifyService = IdentifyService
Expand Down
2 changes: 1 addition & 1 deletion src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ class Libp2p extends EventEmitter {
log('libp2p is stopping')

try {
this._isStarted = false
for (const service of this._discovery.values()) {
service.removeListener('peer', this._onDiscoveryPeer)
}
Expand Down Expand Up @@ -274,7 +275,6 @@ class Libp2p extends EventEmitter {
this.emit('error', err)
}
}
this._isStarted = false
log('libp2p has stopped')
}

Expand Down
20 changes: 20 additions & 0 deletions src/record/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
'use strict'

const Envelope = require('./envelope')
const PeerRecord = require('./peer-record')

/**
* Create (or update if existing) self peer record and store it in the AddressBook.
* @param {libp2p} libp2p
* @returns {Promise<void>}
*/
async function updateSelfPeerRecord (libp2p) {
const peerRecord = new PeerRecord({
peerId: libp2p.peerId,
multiaddrs: libp2p.multiaddrs
})
const envelope = await Envelope.seal(peerRecord, libp2p.peerId)
libp2p.peerStore.addressBook.consumePeerRecord(envelope)
}

module.exports.updateSelfPeerRecord = updateSelfPeerRecord
10 changes: 10 additions & 0 deletions src/transport-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const debug = require('debug')
const log = debug('libp2p:transports')
log.error = debug('libp2p:transports:error')

const { updateSelfPeerRecord } = require('./record/utils')

class TransportManager {
/**
* @constructor
Expand Down Expand Up @@ -62,6 +64,8 @@ class TransportManager {
log('closing listeners for %s', key)
while (listeners.length) {
const listener = listeners.pop()
listener.removeAllListeners('listening')
listener.removeAllListeners('close')
tasks.push(listener.close())
}
}
Expand Down Expand Up @@ -150,6 +154,10 @@ class TransportManager {
const listener = transport.createListener({}, this.onConnection)
this._listeners.get(key).push(listener)

// Track listen/close events
listener.on('listening', () => updateSelfPeerRecord(this.libp2p))
listener.on('close', () => updateSelfPeerRecord(this.libp2p))

// We need to attempt to listen on everything
tasks.push(listener.listen(addr))
}
Expand Down Expand Up @@ -194,6 +202,8 @@ class TransportManager {
if (this._listeners.has(key)) {
// Close any running listeners
for (const listener of this._listeners.get(key)) {
listener.removeAllListeners('listening')
listener.removeAllListeners('close')
await listener.close()
}
}
Expand Down
23 changes: 15 additions & 8 deletions test/dialing/direct.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,28 @@ describe('Dialing (direct, TCP)', () => {
let peerStore
let remoteAddr

before(async () => {
const [remotePeerId] = await Promise.all([
PeerId.createFromJSON(Peers[0])
beforeEach(async () => {
const [localPeerId, remotePeerId] = await Promise.all([
PeerId.createFromJSON(Peers[0]),
PeerId.createFromJSON(Peers[1])
])

peerStore = new PeerStore({ peerId: remotePeerId })
remoteTM = new TransportManager({
libp2p: {
addressManager: new AddressManager({ listen: [listenAddr] })
addressManager: new AddressManager({ listen: [listenAddr] }),
peerId: remotePeerId,
peerStore
},
upgrader: mockUpgrader
})
remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport)

peerStore = new PeerStore({ peerId: remotePeerId })
localTM = new TransportManager({
libp2p: {},
libp2p: {
peerId: localPeerId,
peerStore: new PeerStore({ peerId: localPeerId })
},
upgrader: mockUpgrader
})
localTM.add(Transport.prototype[Symbol.toStringTag], Transport)
Expand All @@ -64,7 +71,7 @@ describe('Dialing (direct, TCP)', () => {
remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
})

after(() => remoteTM.close())
afterEach(() => remoteTM.close())

afterEach(() => {
sinon.restore()
Expand Down Expand Up @@ -110,7 +117,7 @@ describe('Dialing (direct, TCP)', () => {
peerStore
})

peerStore.addressBook.set(peerId, [remoteAddr])
peerStore.addressBook.set(peerId, remoteTM.getAddrs())

const connection = await dialer.connectToPeer(peerId)
expect(connection).to.exist()
Expand Down
3 changes: 0 additions & 3 deletions test/dialing/direct.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ describe('Dialing (direct, WebSockets)', () => {
const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()

sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(libp2p.peerStore.protoBook, 'set')

// Wait for onConnection to be called
Expand All @@ -363,8 +362,6 @@ describe('Dialing (direct, WebSockets)', () => {
expect(libp2p.identifyService.identify.callCount).to.equal(1)
await libp2p.identifyService.identify.firstCall.returnValue

// Self + New peer
expect(libp2p.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2)
expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1)
})

Expand Down
51 changes: 45 additions & 6 deletions test/identify/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const { expect } = chai
const sinon = require('sinon')

const { EventEmitter } = require('events')
const delay = require('delay')
const PeerId = require('peer-id')
const duplexPair = require('it-pair/duplex')
const multiaddr = require('multiaddr')
Expand All @@ -22,6 +21,7 @@ const Libp2p = require('../../src')
const Envelope = require('../../src/record/envelope')
const PeerStore = require('../../src/peer-store')
const baseOptions = require('../utils/base-options.browser')
const { updateSelfPeerRecord } = require('../../src/record/utils')
const pkg = require('../../package.json')

const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
Expand Down Expand Up @@ -78,6 +78,9 @@ describe('Identify', () => {
sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(localIdentify.peerStore.protoBook, 'set')

// Transport Manager creates signed peer record
await updateSelfPeerRecord(remoteIdentify._libp2p)

// Run identify
await Promise.all([
localIdentify.identify(localConnectionMock),
Expand Down Expand Up @@ -239,6 +242,10 @@ describe('Identify', () => {
sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(remoteIdentify.peerStore.protoBook, 'set')

// Transport Manager creates signed peer record
await updateSelfPeerRecord(localIdentify._libp2p)
await updateSelfPeerRecord(remoteIdentify._libp2p)

// Run identify
await Promise.all([
localIdentify.push([localConnectionMock]),
Expand All @@ -249,7 +256,7 @@ describe('Identify', () => {
})
])

expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(1)
expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2)
expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1)

const addresses = localIdentify.peerStore.addressBook.get(localPeer)
Expand Down Expand Up @@ -359,8 +366,8 @@ describe('Identify', () => {
expect(connection).to.exist()

// Wait for peer store to be updated
// Dialer._createDialTarget (add), Identify (consume), Create self (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 2 && peerStoreSpyAdd.callCount === 1)
// Dialer._createDialTarget (add), Identify (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
expect(libp2p.identifyService.identify.callCount).to.equal(1)

// The connection should have no open streams
Expand All @@ -381,8 +388,6 @@ describe('Identify', () => {

const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
// Wait for nextTick to trigger the identify call
await delay(1)

// Wait for identify to finish
await libp2p.identifyService.identify.firstCall.returnValue
Expand All @@ -404,5 +409,39 @@ describe('Identify', () => {
// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})

it('should push multiaddr updates to an already connected peer', async () => {
libp2p = new Libp2p({
...baseOptions,
peerId
})

await libp2p.start()

sinon.spy(libp2p.identifyService, 'identify')
sinon.spy(libp2p.identifyService, 'push')

const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()

// Wait for identify to finish
await libp2p.identifyService.identify.firstCall.returnValue
sinon.stub(libp2p, 'isStarted').returns(true)

libp2p.peerStore.addressBook.add(libp2p.peerId, [multiaddr('/ip4/180.0.0.1/tcp/15001/ws')])

// Verify the remote peer is notified of change
expect(libp2p.identifyService.push.callCount).to.equal(1)
for (const call of libp2p.identifyService.push.getCalls()) {
const [connections] = call.args
expect(connections.length).to.equal(1)
expect(connections[0].remotePeer.toB58String()).to.equal(remoteAddr.getPeerId())
const results = await call.returnValue
expect(results.length).to.equal(1)
}

// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})
})
})
11 changes: 8 additions & 3 deletions test/relay/auto-relay.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ describe('auto-relay', () => {
it('should not listen on a relayed address if peer disconnects', async () => {
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length

// Spy if identify push is fired on adding/removing listen addr
sinon.spy(relayLibp2p1.identifyService, 'pushToPeerStore')

// Discover one relay and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
await relayLibp2p1.dial(relayLibp2p2.peerId)
Expand All @@ -268,16 +271,18 @@ describe('auto-relay', () => {
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())

// Spy if identify push is fired
sinon.spy(relayLibp2p1.identifyService, 'pushToPeerStore')
// Identify push for adding listen relay multiaddr
expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(1)

// Disconnect from peer used for relay
await relayLibp2p1.hangUp(relayLibp2p2.peerId)

// Wait for removed listening on the relay
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length)
expect(autoRelay1._listenRelays.size).to.equal(0)
// TODO: identify-push expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(1)

// Identify push for removing listen relay multiaddr
expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(2)
})

it('should try to listen on other connected peers relayed address if one used relay disconnects', async () => {
Expand Down
Loading