Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixes for state system
  • Loading branch information
mkg20001 committed May 19, 2018
commit f39b4280ad50f8b6bc912f0209236ee998dc54a6
59 changes: 47 additions & 12 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,68 @@

const RPC = require('./rpc')
const noop = () => {}
const once = require('once')
const debug = require('debug')
const log = debug('libp2p:rendezvous')
const State = require('./state')
const {each, map} = require('async')

class RendezvousDiscovery {
constructor (swarm) {
this.swarm = swarm
this.rpc = []
this.rpcById = {}
this.state = new State(this)
this.swarm.on('peer:connect', peer => {
this._dial(peer)
})
}

_getState (id) {
id = id.toString('hex')
if (!this.state[id]) return (this.state[id] = new State(this, id))
return this.state[id]
}

_dial (pi, cb) {
if (!cb) cb = noop
cb = once(cb)
if (!this.state) return cb()
this._cleanPeers()
if (this.rpcById[pi.id.toB58String()]) {
log('skip reconnecting %s', pi.id.toB58String())
return cb()
}
this.swarm.dialProtocol(pi, '/rendezvous/1.0.0', (err, conn) => {
if (err) return cb(err)
const rpc = new RPC()
rpc.setup(conn, err => {
if (err) return cb(err)
this.state.manage(rpc)
this.state.syncState(cb)

this.rpc.push(rpc)
this.rpcById[rpc.id] = rpc

rpc.cursors = {}
rpc.registrations = {}

log('add new peer %s', rpc.id)
this._syncAll(cb)
})
})
}

_cleanPeers () {
this.rpc = this.rpc.filter(peer => {
if (peer.online) return true
log('drop disconnected peer %s', peer.id)
delete this.rpcById[peer.id]
return false
})
}

_syncAll (cb) {
each(Object.keys(this.state), (s, cb) => this.state[s].syncState(cb), cb)
}

register (ns, peer, ttl, cb) {
if (typeof ttl === 'function') {
cb = ttl
Expand All @@ -40,7 +75,7 @@ class RendezvousDiscovery {
peer = this.swarm.peerInfo
}

this.state.register(ns, peer, ttl, cb)
this._getState(peer.id.toBytes()).register(ns, peer, ttl, cb)
}

discover (ns, limit, /* cookie, */ cb) {
Expand All @@ -60,7 +95,7 @@ class RendezvousDiscovery {
ns = null
}

this.state.discover(ns, limit, cb)
this._cleanPeers()
}

unregister (ns, id) {
Expand All @@ -72,20 +107,20 @@ class RendezvousDiscovery {
id = this.swarm.peerInfo.id.toBytes()
}

this.state.unregister(ns, id)
this._getState(id).unregister(ns)
}

start (cb) {
this.state = new State(this)
this.state = {}
cb()
}

stop (cb) {
this.state.shutdown(err => {
if (err) return cb(err)
this.state = null
cb()
})
this.rpc.filter(rpc => rpc.online).forEach(rpc => rpc.end())
this.state = null
this.rpc = []
this.rpcById = {}
cb()
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class RPC {
register: []
}
}
end () {
if (this.online) {
this.source.end()
}
}
sink (read) {
const next = (end, msg, doend) => {
if (doend) {
Expand Down
44 changes: 22 additions & 22 deletions src/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ const {each, waterfall} = require('async')
const noop = () => {}
const once = require('once')

class State { // TODO: add multiple peer-id logic (maybe use different states per id?)
constructor (main) {
class State {
constructor (main, id) {
this.rpc = main.rpc
this.myId = id
this.byId = main.rpcById
this.cleanPeers = main._cleanPeers.bind(main)
this.registrations = []
this.regById = {}
}
Expand All @@ -20,43 +22,41 @@ class State { // TODO: add multiple peer-id logic (maybe use different states pe
this.registrations.push(this.regById[ns])
this.syncState(cb)
}
unregister (ns /*, id */) {
unregister (ns) {
if (!this.regById[ns]) throw new Error('NS ' + JSON.stringify(ns) + ' not registered!') // TODO: should this throw?
delete this.regById[ns]
this.registrations = this.registrations.filter(r => r.ns !== ns)
this.syncState(noop)
}
manage (rpc) {
this.rpc.push(rpc)
this.byId[rpc.id] = rpc

rpc.cursors = {}
rpc.registrations = []

log('manage peer %s', rpc.id)
rpcReg (rpc, set) {
if (set) {
rpc.registrations[this.myId] = set
}
if (!rpc.registrations[this.myId]) rpc.registrations[this.myId] = []
return rpc.registrations[this.myId]
}
syncState (cb) {
if (!cb) cb = noop
cb = once(cb)
this.rpc = this.rpc.filter(peer => {
if (peer.online) return true
log('drop disconnected peer %s', peer.id)
delete this.byId[peer.id]
return false
})
this.cleanPeers()
log('syncing state with %s peer(s)', this.rpc.length)
each(this.rpc, (rpc, cb) => {
let toRegister = this.registrations.filter(r => rpc.registrations.indexOf(r.ns) === -1)
let toUnregister = rpc.registrations.filter(r => !this.regById[r])
let toRegister = this.registrations.filter(r => this.rpcReg(rpc).indexOf(r.ns) === -1)
let toUnregister = this.rpcReg(rpc).filter(r => !this.regById[r])
waterfall([
cb => each(toRegister, (reg, cb) => {
log('sync@%s: register %s', rpc.id, reg.ns)
rpc.register(reg.ns, reg.peer, reg.ttl, cb)
}, e => cb(e)),
cb => each(toUnregister, (regId, cb) => {
log('sync@%s: unregister', rpc.id, regId)
delete rpc.cursors[regId]
rpc.register(regId, cb)
}, e => cb(e))
rpc.unregister(regId, Buffer.from(this.myId, 'hex')) // TODO: shouldn't this be async?
cb()
}, e => cb(e)),
cb => {
this.rpcReg(rpc, this.rpcReg(rpc).filter(r => toUnregister.indexOf(r) === -1).concat(toRegister.map(r => r.ns)))
cb()
}
], cb)
}, cb)
}
Expand Down
4 changes: 4 additions & 0 deletions test/discovery.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ describe('discovery', () => {
setTimeout(() => done(), 100) // Queue is being processed every 100ms
})

it('stop clients', done => {
parallel([client, client2].map(c => cb => c.stop(cb)), done)
})

it('gc', () => {
server.gc()
expect(Object.keys(server.table.NS)).to.have.lengthOf(0)
Expand Down