diff --git a/.aegir.js b/.aegir.js new file mode 100644 index 0000000..924bd21 --- /dev/null +++ b/.aegir.js @@ -0,0 +1,22 @@ +'use strict' + +const Utils = require('./test/utils.peer') + +let Server + +async function pre (done) { + Server = await Utils.createServer(require('./test/server.id.json')) + done() +} + +function post (done) { + Server.stop() + Server.swarm.stop(done) +} + +module.exports = { + hooks: { + pre, + post + } +} diff --git a/package.json b/package.json index 8e3af76..a5abc40 100644 --- a/package.json +++ b/package.json @@ -3,9 +3,15 @@ "version": "0.0.0", "description": "A javascript implementation of the rendezvous protocol for libp2p", "leadMaintainer": "Vasco Santos ", - "main": "index.js", + "main": "src/index.js", "scripts": { - "test": "aegir test" + "test": "aegir test", + "test:node": "aegir test -t node", + "test:browser": "aegir test -t browser", + "test:webworker": "aegir test -t webworker" + }, + "browser": { + "test/utils.js": "./test/utils.browser.js" }, "keywords": [ "libp2p", @@ -18,16 +24,20 @@ "dependencies": { "chai": "^4.1.2", "dirty-chai": "^2.0.1", + "immutable": "4.0.0-rc.9", + "promisify-es6": "^1.0.3", "protons": "^1.0.1", - "pull-protocol-buffers": "^0.1.2" + "pull-protocol-buffers": "^0.1.2", + "pull-through": "^1.0.18" }, "devDependencies": { - "aegir": "^13.1.0", - "libp2p": "^0.20.2", + "aegir": "^14.0.0", + "libp2p": "^0.20.4", "libp2p-mplex": "^0.7.0", "libp2p-secio": "^0.10.0", "libp2p-spdy": "^0.12.1", - "libp2p-tcp": "^0.12.0" + "libp2p-tcp": "^0.12.0", + "libp2p-websockets": "^0.12.0" }, "repository": { "type": "git", diff --git a/src/client.js b/src/client.js new file mode 100644 index 0000000..59bd01a --- /dev/null +++ b/src/client.js @@ -0,0 +1,204 @@ +'use strict' + +const Sync = require('./sync') +const RPC = require('./rpc') +const pull = require('pull-stream') + +const debug = require('debug') +const log = debug('libp2p:rendezvous:client') +const {parallel, map} = require('async') + +class Client { + constructor (swarm) { + this.swarm = swarm + this.store = Sync.create() + this._dialLock = {} + this._failedCache = {} + } + + dial (peer) { + const id = peer.id.toB58String() + this.sync() + + // check if we need to dial + if (this._failedCache[id]) return log('not dialing %s because dial previously failed', id) + if (this._dialLock[id]) return log('not dialing %s because dial is already in progress', id) + if (Sync.getPoint(this.store, id)) return log('not dialing %s because peer is already connected', id) + + this._dialLock[id] = true // prevent race + log('dialing %s', id) + + const cb = (err) => { + delete this._dialLock[id] + + if (err) { + log('dialing %s failed: %s', err) + this._failedCache[id] = true + return + } + + log('dialing %s succeeded', id) + this.sync() + } + + // do the actual dialing + this.swarm.dialProtocol(peer, '/p2p/rendezvous/1.0.0', (err, conn) => { + if (err) return cb(err) + + conn.getPeerInfo((err, pi) => { + if (err) return cb(err) + + let rpc = RPC(pi, this) + + pull( + conn, + rpc, + conn + ) + + this.store = Sync.addPoint(this.store, id, rpc.rpc) + cb() + }) + }) + } + + sync () { + if (this._syncLock) { + this._needResync = true + return + } + this._syncLock = true + log('syncing') + this.store = Sync.clearPoints(this.store) + let actions = [] // async rpc calls + + // adds register / unregsiter actions to "actions" array + /* + pseudo-code: + + for all store.points as point: + for all point.registrations as pReg: + if store.registrations does not contain pReg: + actions push "unregister pReg @ 
point" + delete point.registrations[pReg] + for all store.registrations as reg: + if point.registrations does not contain reg: + actions push "register reg @ point" + set point.registrations[reg] + */ + + let points = this.store.get('points') + let registrations = this.store.get('registrations') + + this.store = this.store.set('points', points.reduce((points, point, id) => { + let regs = point.get('registrations') + + regs = regs.reduce((regs, pReg, pRegId) => { + if (!registrations.get(pRegId)) { + log('sync: unregister@%s: %s', id, pRegId) + actions.push(cb => point.toJS().rpc().unregister(pRegId, pReg.peer.id.toBytes(), cb)) + return regs.delete(pRegId) + } + + return regs + }, regs) + + regs = registrations.reduce((regs, reg, regId) => { + if (!regs.get(regId)) { + log('sync: register@%s: %s', id, regId) + actions.push(cb => point.toJS().rpc().register(regId, reg.peer, reg.ttl, cb)) + return regs.set(regId, reg) + } + + return regs + }, regs) + + return points.set(id, point.set('registrations', regs)) + }, points)) + + log('do sync') + parallel(actions, (err) => { + log('done sync') + delete this._syncLock + + if (err) { + log(err) // ??? + } + + if (this._needResync) { + delete this._needResync + this.sync() + } + }) + } + + register (ns, peer, ttl, cb) { + if (typeof ttl === 'function') { + cb = ttl + ttl = 0 + } + if (typeof peer === 'function') { + ttl = 0 + cb = peer + peer = this.swarm.peerInfo + } + + this.store = this.store.set('registrations', this.store.get('registrations').set(ns, {peer, ttl})) + this.sync() + } + + _discover (peerID, ns, limit, cb) { + if (typeof limit === 'function') { + cb = limit + limit = 0 + } + if (typeof ns === 'function') { + limit = 0 + cb = ns + ns = null + } + + log('discover@%s: %s limit=%s', peerID, ns || '', limit) + + let point = this.store.get('points').get(peerID) + if (!point || !point.get('rpc')().online()) { + return cb(new Error('Point not connected!')) + } + + point.get('rpc')().discover(ns, limit, point.get('cookies').get(ns) || Buffer.from(''), (err, res) => { + if (err) return cb(err) + this.store.set('points', + this.store.get('points').set(peerID, + this.store.get('points').get(peerID).set('cookies', + this.store.get('points').get(peerID).get('cookies').set(ns, res.cookie)))) + return cb(null, res.peers) + }) + } + + discover (ns, cb) { + if (typeof ns === 'function') { + cb = ns + } + + let ids = this.store.get('points').toArray().map(p => p[0]) + + map(ids, + (peerID, cb) => this._discover(peerID, ns, 0, (err, res) => err ? cb(null, []) : cb(null, res)), + (err, res) => err ? 
cb(err) : cb(null, res.reduce((a, b) => a.concat(b), []))) + } + + unregister (ns, id) { + if (!ns) { + id = this.swarm.peerInfo.id.toBytes() + ns = null + } + if (!id) { + id = this.swarm.peerInfo.id.toBytes() + } + + this.store = this.store.set('registrations', this.store.get('registrations').delete(ns)) + this.sync() + } +} + +module.exports = Client diff --git a/src/index.js b/src/index.js index fcbf075..72169a9 100644 --- a/src/index.js +++ b/src/index.js @@ -1,77 +1,64 @@ 'use strict' -const RPC = require('./rpc') +const debug = require('debug') +const log = debug('libp2p:rendezvous') const noop = () => {} -class RendezvousDiscovery { - constructor (swarm) { - this.swarm = swarm - this.peers = [] - } - - _dial (pi, cb) { - if (!cb) cb = noop - this.swarm.dialProtocol(pi, '/rendezvous/1.0.0', (err, conn) => { - if (err) return cb(err) - const rpc = new RPC() - rpc.setup(conn, err => { - if (err) return cb(err) - this.peers.push(rpc) - cb() - }) - }) - } - - _rpc (cmd, ...a) { // TODO: add. round-robin / multicast / anycast? - this.peers[0][cmd](...a) - } +const Client = require('./client') +const EE = require('events').EventEmitter - register (ns, peer, cb) { - this._rpc('register', ns, peer, 0, cb) // TODO: interface does not expose ttl option?! +class RendezvousDiscovery extends EE { + constructor (swarm, opt) { + super() + this._client = new Client(swarm, opt) + this._discover = {} + this.swarm = swarm + this.tag = 'rendezvous' } - - discover (ns, limit, cookie, cb) { - if (typeof cookie === 'function') { - cb = cookie - cookie = Buffer.from('') - } - if (typeof limit === 'function') { - cookie = Buffer.from('') - cb = limit - limit = 0 + start (cb) { + log('start') + this._loop = setInterval(this._discoverLoop.bind(this), 10 * 1000) + this.swarm.on('peer:connect', (peer) => this._client.dial(peer)) + this.swarm.on('peer:disconnect', () => this._client.sync()) + if (cb) { + cb() } - if (typeof ns === 'function') { - cookie = Buffer.from('') - limit = 0 - cb = ns - ns = null + } + stop (cb) { + log('stop') + clearInterval(this._loop) + this._client.stop() + if (cb) { + cb() } - - this._rpc('discover', ns, limit, cookie, cb) } - - unregister (ns, id) { + // TODO: https://github.com/libp2p/specs/issues/47 + register (ns) { if (!ns) { - id = this.swarm.peerInfo.id.toBytes() - ns = null - } - if (!id) { - id = this.swarm.peerInfo.id.toBytes() + ns = null // need cannonical form of "empty" } - - this._rpc('unregister', ns, id) + log('register', ns) + this._discover[ns] = true + this._client.register(ns, noop) } - - start (cb) { - this.swarm.on('peer:connect', peer => { - this._dial(peer) - }) - cb() + unregister (ns) { + if (!ns) { + ns = null // need cannonical form of "empty" + } + log('unregister', ns) + delete this._discover[ns] + this._client.unregister(ns, noop) } - - stop (cb) { - // TODO: shutdown all conns - cb() + _discoverLoop() { + log('discover loop') + for (const ns in this._discover) { + this._client.discover(ns, (err, peers) => { + peers.forEach(peer => { + this.emit('peer', peer) + this.emit(ns ? 
'ns:' + ns : 'global', peer) + }) + }) + } } } diff --git a/src/rpc.js b/src/rpc.js index 5d0781b..f069277 100644 --- a/src/rpc.js +++ b/src/rpc.js @@ -5,7 +5,7 @@ const ppb = require('pull-protocol-buffers') const {Message, MessageType} = require('./proto') const Pushable = require('pull-pushable') const debug = require('debug') -const log = debug('libp2p-rendezvous:rpc') +const log = debug('libp2p:rendezvous:rpc') const Peer = require('peer-info') const Id = require('peer-id') const once = require('once') @@ -22,133 +22,125 @@ function wrap (f, t) { return cb } -class RPC { - constructor () { - this.source = Pushable() - this.cbs = { - discover: [], - register: [] +function sendRequest (input, type, data) { + input.push({ + type: MessageType[type.toUpperCase()], + [type.toLowerCase()]: data + }) +} + +const rpcCommands = { + register: (input, cbs, ns, peer, ttl, cb) => { + sendRequest(input, 'register', { + ns, + peer: { + id: peer.id.toBytes(), + addrs: peer.multiaddrs.toArray().map(a => a.buffer) + }, + ttl + }) + + cbs.push(wrap(cb, TIMEOUT)) + }, + discover: (input, cbs, ns, limit, cookie, cb) => { + sendRequest(input, 'discover', { + ns, + limit, + cookie + }) + + cbs.push(wrap(cb, TIMEOUT)) + }, + unregister: (input, cbs, ns, id, cb) => { + sendRequest(input, 'unregister', { + ns, + id + }) + + if (typeof cb === 'function') { // simulate cb + setImmediate(() => cb()) } } - sink (read) { - const next = (end, msg, doend) => { - if (doend) { - log('crash@%s: %s', this.id, doend) - return read(doend, next) - } - if (end) { - this.online = false - log('end@%s: %s', this.id, end) - this.source.end() - return - } - let f - let pi - switch (msg.type) { - case MessageType.REGISTER_RESPONSE: - f = this.cbs.register.shift() - if (typeof f !== 'function') { - log('register@%s: response ignored, no cb found!', this.id) - return read(null, next) - } else { - let e - if (msg.registerResponse.status) { - e = new Error('Server returned error: ' + (msg.registerResponse.statusText || '(unknown code)')) - } - f(e) - } - break - case MessageType.DISCOVER_RESPONSE: - try { - f = this.cbs.discover.shift() - if (typeof f !== 'function') { - log('discover@%s: response ignored, no cb found!', this.id) - return read(null, next) - } else { - if (msg.discoverResponse.status) { - return setImmediate(() => f(new Error('Server returned error: ' + (msg.discoverResponse.statusText || '(unknown code)')))) - } - pi = msg.discoverResponse.registrations.map(p => { - try { - // TODO: use other values like ttl/ns in peer-info? - const pi = new Peer(new Id(p.peer.id)) - p.peer.addrs.forEach(a => pi.multiaddrs.add(a)) - return pi - } catch (e) { - log('discover@%s: invalid pi returned: %s', this.id, e) - } - }).filter(Boolean) - setImmediate(() => f(null, { - cookie: msg.discoverResponse.cookie, - peers: pi - })) - } - } catch (e) { - f(e) - return next(null, null, e) - } - break - default: // should that disconnect or just get ignored? 
- log('error@%s: sent wrong msg type %s', this.id, msg.type) - return next(null, null, true) - } - read(null, next) +} + +const handlers = { + [MessageType.REGISTER_RESPONSE]: (msg) => { + let res = [] + const {status, statusText} = msg.registerResponse + if (status) { + res.push(new Error('Server returned error: ' + (statusText || '(unknown error)'))) } - read(null, next) - } - setup (conn, cb) { - conn.getPeerInfo((err, pi) => { - if (err) return cb(err) - this.pi = pi - this.id = pi.id.toB58String() - pull( - conn, - ppb.decode(Message), - this, - ppb.encode(Message), - conn - ) - - this.online = true - cb() - }) - } + return res + }, + [MessageType.DISCOVER_RESPONSE]: (msg) => { + let res = [] + const {cookie, status, statusText, registrations} = msg.discoverResponse - register (ns, peer, ttl, cb) { - this.source.push({ - type: MessageType.REGISTER, - register: { - ns, - peer: { - id: peer.id.toBytes(), - addrs: peer.multiaddrs.toArray().map(a => a.buffer) - }, - ttl - } - }) - this.cbs.register.push(wrap(cb, TIMEOUT)) + if (status) { + res.push(new Error('Server returned error: ' + (statusText || '(unknown error)'))) + } else { + res.push(null) + + let peers = registrations.map(p => { + try { + const pi = new Peer(new Id(p.peer.id)) + p.peer.addrs.forEach(a => pi.multiaddrs.add(a)) + return pi + } catch (e) { + log('discover: invalid pi ignored: %s', e) + } + }).filter(Boolean) + + res.push({ + cookie, + peers + }) + } + + return res } +} + +const RPC = () => { + const input = Pushable() + let cbs = [] - discover (ns, limit, cookie, cb) { - this.source.push({ - type: MessageType.DISCOVER, - discover: { - ns, - limit, - cookie + const methods = {} + for (const p in rpcCommands) { + methods[p] = (...a) => { + if (!online) { + let f = a.pop() + if (typeof f === 'function') { + f(new Error('Offline!')) + } + return } - }) - this.cbs.discover.push(wrap(cb, TIMEOUT)) + rpcCommands[p](input, cbs, ...a) + } } - unregister (ns, id) { - this.source.push({ - type: MessageType.UNREGISTER, - unregister: { - ns, - id - } - }) + let online = true + methods.online = () => online + + return { + source: pull( + input, + ppb.encode(Message) + ), + sink: pull( + ppb.decode(Message), + pull.drain(data => { + let cb = cbs.shift() + if (!cb) return log('ignore rpc, no cb') + let handler = handlers[data.type] + if (handler) { + cb(...handler(data)) + } else { + log('no response handler for %s', data.type) + } + }, () => (online = false)) + ), + rpc: () => methods } } diff --git a/src/server/index.js b/src/server/index.js index e7d7ca7..85c7a40 100644 --- a/src/server/index.js +++ b/src/server/index.js @@ -1,34 +1,34 @@ 'use strict' -// const {waterfall} = require('async') -const RPC = require('./rpc') const debug = require('debug') const log = debug('libp2p:rendezvous:server') -const AsyncQueue = require('./queue') -const BasicStore = require('./store/basic') +const pull = require('pull-stream') + +const ImmutableStore = require('./store/immutable') +const RPC = require('./rpc') class Server { constructor (opt) { if (!opt) opt = {} - this.node = opt.node - this.config = opt.config - this.que = new AsyncQueue() - this.table = { - NS: {}, - RPC: {} - } - const Store = opt.store || BasicStore - this.store = new Store(this) - this._stubNS = this.store.create(Buffer.alloc(256, '0').toString()) + this.swarm = opt.swarm + this.Store = opt.store || ImmutableStore + this.store = this.Store.createStore(opt.storeConfig || {}) + this.gcTime = opt.gcIntv || 60 * 1000 } start () { - this.gcIntv = 
setInterval(this.gc.bind(this), 60 * 1000) - this.node.handle('/rendezvous/1.0.0', (proto, conn) => { - const rpc = new RPC(this) - rpc.setup(conn, err => { + this.gcIntv = setInterval(this.gc.bind(this), this.gcTime) + this.swarm.handle('/p2p/rendezvous/1.0.0', (proto, conn) => { + conn.getPeerInfo((err, pi) => { if (err) return log(err) - this.storeRPC(rpc) + + log('rpc from %s', pi.id.toB58String()) + + pull( + conn, + RPC(pi, this), + conn + ) }) }) } @@ -36,37 +36,11 @@ class Server { stop () { clearInterval(this.gcIntv) // TODO: clear vars, shutdown conns, etc. - this.node.unhandle('/rendezvous/1.0.0') - } - - storeRPC (rpc) { - // TODO: should a peer that's connected twice be overriden or rejected? - this.table.RPC[rpc.id] = rpc - // TODO: remove on disconnect - } - - getNS (name, create) { - if (!this.table.NS[name]) { - if (create) { - return (this.table.NS[name] = this.store.create(name)) - } else { - return this._stubNS - } - } - return this.table.NS[name] + this.swarm.unhandle('/p2p/rendezvous/1.0.0') } gc () { - Object.keys(this.table.NS).forEach(ns => { - const n = this.table.NS[ns] - const removed = n.gc() - if (n.useless) { - log('drop NS %s because it is empty', n.name) - delete this.table.NS[ns] - } else { - if (removed) n.update() - } - }) + this.store = this.Store.clearEmptyNamespaces(this.Store.clearExpiredAll(this.store, Date.now())) } } diff --git a/src/server/queue.js b/src/server/queue.js deleted file mode 100644 index f12b5c6..0000000 --- a/src/server/queue.js +++ /dev/null @@ -1,33 +0,0 @@ -'use strict' - -const debug = require('debug') -const log = debug('libp2p:rendezvous:queue') - -class AsyncQueue { - constructor () { - this.tasks = [] - this.taskIds = {} - this.triggered = false - } - add (name, fnc) { - if (this.taskIds[name]) return - log('queueing %s', name) - this.taskIds[name] = true - this.tasks.push(fnc) - this.trigger() - } - trigger () { - if (this.triggered) return - this.triggered = true - setTimeout(() => { - log('exec') - this.tasks.forEach(f => f()) - this.tasks = [] - this.taskIds = {} - this.triggered = false - log('exec done') - }, 100).unref() - } -} - -module.exports = AsyncQueue diff --git a/src/server/rpc.js b/src/server/rpc.js index fd49d2d..e53a179 100644 --- a/src/server/rpc.js +++ b/src/server/rpc.js @@ -5,14 +5,16 @@ const ppb = require('pull-protocol-buffers') const {Message, MessageType, ResponseStatus} = require('../proto') const Pushable = require('pull-pushable') const debug = require('debug') -const log = debug('libp2p-rendezvous:server:rpc') +const log = debug('libp2p:rendezvous:server:rpc') const Peer = require('peer-info') const Id = require('peer-id') +const through = require('pull-through') const MAX_NS_LENGTH = 255 // TODO: spec this -const MAX_LIMIT = 1000 // TODO: spec this +const MAX_DISCOVER_LIMIT = 1000 // TODO: spec this const registerErrors = { + 0: 'OK', 100: 'Invalid namespace provided', 101: 'Invalid peer-info provided', 102: 'Invalid TTL provided', @@ -21,139 +23,154 @@ const registerErrors = { 300: 'Internal Server Error' } -const craftStatus = (status) => { +const makeStatus = (status) => { return { status, statusText: registerErrors[status] } } -class RPC { - constructor (main) { - this.main = main - this.source = Pushable() - } - sink (read) { - const next = (end, msg, doend) => { - if (doend) { - log('crash@%s: %s', this.id, doend) - return read(doend, next) +const makeResponse = (type, data) => { + let o = { type: MessageType[type.toUpperCase() + '_RESPONSE'] } + o[type.toLowerCase() + 'Response'] = data 
+ return o +} + +const handlers = { // a handler takes (peerInfo, peerIdAsB58String, StoreClass, store, msg, peerIsOnline) and returns [newStore, responseOrNull] + [MessageType.REGISTER]: (pi, id, Store, store, msg, isOnline) => { + let {ns, peer, ttl} = msg.register + log('register@%s: trying register on %s', id, ns || '') + if (peer.id && new Id(peer.id).toB58String() !== id) { // check if this peer really owns address (TODO: get rid of that) + log('register@%s: auth err (want %s)', id, new Id(peer.id).toB58String()) + return [store, makeResponse('request', makeStatus(ResponseStatus.E_NOT_AUTHORIZED))] + } else if (!peer.id) { + peer.id = pi.id.toBytes() // field is optional so add it before creating the record + } + + if (ns > MAX_NS_LENGTH) { + log('register@%s: ns invalid', id) + return [store, makeResponse('register', makeStatus(ResponseStatus.E_INVALID_NAMESPACE))] + } + + pi = new Peer(new Id(peer.id)) + peer.addrs.forEach(a => pi.multiaddrs.add(a)) + + if (!ttl) { + ttl = isOnline + } + + let record = { + peer: pi, + ttl, + received_at: Date.now() + } + + if (ns) { + store = Store.addPeerToNamespace(Store.createNamespace(store, ns), ns, record) // TODO: should this add to global ns too? + } else { + store = Store.addPeer(store, record) + } + + log('register@%s: registered on %s', id, ns || '') + + return [store, makeResponse('register', makeStatus(ResponseStatus.OK))] + }, + [MessageType.UNREGISTER]: (pi, id, Store, store, msg) => { + let ns = msg.unregister.ns + log('unregister@%s: unregister from %s', id, ns || '') + + if (ns) { + store = Store.removePeerFromNamespace(store, ns, id) + } else { + store = Store.removePeer(store, id) + } + + return [store] + }, + [MessageType.DISCOVER]: (pi, id, Store, store, msg) => { + let {ns, limit, cookie} = msg.discover + if (limit <= 0 || limit > MAX_DISCOVER_LIMIT) limit = MAX_DISCOVER_LIMIT + log('discover@%s: discover on %s (%s peers)', id, ns || '', limit) + + let nsStore + let registrations = [] + + if (ns) { + nsStore = Store.utils.getNamespaces(store).get(ns) + } else { + nsStore = store.get('global_namespace') + } + + if (nsStore) { + if (cookie && cookie.length) { // if client gave us a cookie, try to parse it + cookie = parseInt(String(cookie), 10) } - if (end) { - this.online = false - log('end@%s: %s', this.id, end) - this.source.end() - return + if (Number.isNaN(cookie) || typeof cookie !== 'number') { // if cookie is invalid, set it to 0 + cookie = 0 } - switch (msg.type) { - case MessageType.REGISTER: - try { - log('register@%s: trying register on %s', this.id, msg.register.ns) - if (msg.register.peer.id && new Id(msg.register.peer.id).toB58String() !== this.id) { - log('register@%s: auth err (want %s)', this.id, new Id(msg.register.peer.id).toB58String()) - this.source.push({ - type: MessageType.REGISTER_RESPONSE, - registerResponse: craftStatus(ResponseStatus.E_NOT_AUTHORIZED) - }) - return read(null, next) - } else if (!msg.register.peer.id) { - msg.register.peer.id = this.pi.id.toBytes() - } - if (msg.register.ns > MAX_NS_LENGTH) { - log('register@%s: ns err', this.id) - this.source.push({ - type: MessageType.REGISTER_RESPONSE, - registerResponse: craftStatus(ResponseStatus.E_INVALID_NAMESPACE) - }) - return read(null, next) - } - const pi = new Peer(new Id(msg.register.peer.id)) - msg.register.peer.addrs.forEach(a => pi.multiaddrs.add(a)) - this.main.getNS(msg.register.ns, true).addPeer(pi, Date.now(), msg.register.ttl, () => this.online) - log('register@%s: ok', this.id) - this.source.push({ - type: 
MessageType.REGISTER_RESPONSE, - registerResponse: craftStatus(ResponseStatus.OK) - }) - } catch (e) { - log('register@%s: internal error', this.id) - log(e) - this.source.push({ - type: MessageType.REGISTER_RESPONSE, - registerResponse: craftStatus(ResponseStatus.E_INTERNAL_ERROR) - }) - return read(null, next) - } - break - case MessageType.UNREGISTER: - try { - log('unregister@%s: unregister from %s', this.id, msg.unregister.ns) - // TODO: currently ignores id since there is no ownership error. change? - this.main.getNS(msg.unregister.ns).removePeer(this.id) - } catch (e) { - return next(null, null, e) - } - break - case MessageType.DISCOVER: - try { - // TODO: add more errors - log('discover@%s: discover on %s', this.id, msg.discover.ns) - if (msg.discover.limit <= 0 || msg.discover.limit > MAX_LIMIT) msg.discover.limit = MAX_LIMIT - const {peers, cookie} = this.main.getNS(msg.discover.ns).getPeers(msg.discover.cookie || Buffer.from(''), msg.discover.limit, this.id) - log('discover@%s: got %s peers', this.id, peers.length) - this.source.push({ - type: MessageType.DISCOVER_RESPONSE, - discoverResponse: { - registrations: peers.map(p => { - return { - ns: msg.discover.ns, - peer: { - id: p.pi.id.toBytes(), - addrs: p.pi.multiaddrs.toArray().map(a => a.buffer) - }, - ttl: p.ttl - } - }), - cookie - } - }) - } catch (e) { - log('discover@%s: internal error', this.id) - log(e) - this.source.push({ - type: MessageType.DISCOVER_RESPONSE, - registerResponse: craftStatus(ResponseStatus.E_INTERNAL_ERROR) - }) - return read(null, next) - } - break - // case MessageType.REGISTER_RESPONSE: - // case MessageType.DISCOVER_RESPONSE: - default: // should that disconnect or just get ignored? - log('error@%s: sent wrong msg type %s', this.id, msg.type) - return next(null, null, true) + registrations = nsStore.toArray() + .map(r => r[1].toJS()) // get only value without key + .filter(e => e.received_at >= cookie) // filter out previous peers + .slice(0, limit + 1) + .filter(e => e.peer.id.toB58String() !== id) // filter out own peer-id + + let next = registrations[limit] + registrations = registrations.slice(0, limit) + let last = registrations[registrations.length - 1] + + if (next) { + if (next.recieved_at === last.received_at) { + cookie = last.received_at + } else { + cookie = last.received_at + 1 + } + } else if (last) { + cookie = last.received_at } - read(null, next) + cookie = Buffer.from(String(cookie)) + } else { + cookie = Buffer.from('0') } - read(null, next) - } - setup (conn, cb) { - conn.getPeerInfo((err, pi) => { - if (err) return cb(err) - this.pi = pi - this.id = pi.id.toB58String() - pull( - conn, - ppb.decode(Message), - this, - ppb.encode(Message), - conn - ) - - this.online = true - cb() - }) + + if (registrations.length) { + registrations = registrations.map(p => { + return { + ns, + peer: { + id: p.peer.id.toBytes(), + addrs: p.peer.multiaddrs.toArray().map(a => a.buffer) + } + } + }) + } + + return [store, makeResponse('discover', Object.assign(makeStatus(ResponseStatus.OK), { + registrations, + cookie + }))] } } +const RPC = (pi, main) => { + let id = pi.id.toB58String() + + let online = true + + return pull( + ppb.decode(Message), + through(function (data) { + let handler = handlers[data.type] + if (!handler) return log('ignore@%s: invalid/unknown type %s', id, data.type) // ignore msg + let [store, resp] = handler(pi, id, main.Store, main.store, data, () => online) + if (resp) this.queue(resp) + main.store = store // update store + main.gc() + }, end => { + online = false + 
log('end@%s: %s', id, end) + }), + ppb.encode(Message) + ) +} + module.exports = RPC diff --git a/src/server/store/basic/index.js b/src/server/store/basic/index.js deleted file mode 100644 index 0095090..0000000 --- a/src/server/store/basic/index.js +++ /dev/null @@ -1,60 +0,0 @@ -'use strict' - -class NS { - constructor (name, que) { // name is a utf8 string - this.name = name - this.hexName = Buffer.from(name).toString('hex') // needed to prevent queue-dos attacks - this.que = que - this.id = {} - this.sorted = [] - } - addPeer (pi, ts, ttl, isOnline) { // isOnline returns a bool if the rpc connection still exists - const id = pi.id.toB58String() - this.id[id] = {pi, ts, ttl} - if (ttl) { - let expireAt = ts + ttl * 1000 - this.id[id].online = () => Date.now() >= expireAt - } else { - this.id[id].online = isOnline - } - this.update() - } - removePeer (pid) { - delete this.id[pid] - this.update() - } - update () { - this.que.add('sort@' + this.hexName, () => { - this.sorted = Object.keys(this.id).map(id => { return {id, ts: this.id[id].ts} }).sort((a, b) => a.ts - b.ts) - }) - } - getPeers (cookie, limit, ownId) { - cookie = cookie.length ? parseInt(cookie.toString(), 10) : 0 - let p = this.sorted.filter(p => p.ts > cookie && p.id !== ownId).slice(0, limit).map(p => this.id[p.id]) - let newCookie - if (p.length) { - newCookie = Buffer.from(p[p.length - 1].ts.toString()) - } else { - newCookie = Buffer.from('') - } - return {cookie: newCookie, peers: p} - } - gc () { - return Object.keys(this.id).filter(k => !this.id[k].online()).map(k => delete this.id[k]).length - } - get useless () { - return !Object.keys(this.id).length - } -} - -class BasicStore { - constructor (main) { - this.main = main - } - create (name) { - return new NS(name, this.main.que) - } -} - -module.exports = BasicStore -module.exports.NS = NS diff --git a/src/server/store/immutable/index.js b/src/server/store/immutable/index.js new file mode 100644 index 0000000..e2d0520 --- /dev/null +++ b/src/server/store/immutable/index.js @@ -0,0 +1,215 @@ +'use strict' + +const { Map } = require('immutable') + +// Helper for checking if a peer has the neccessary properties +const validatePeerRecord = (peerRecord) => { + // Should validate that this is a PeerInfo instead + if (!peerRecord.peer.id.toB58String) { + return new Error('Missing `peerRecord.peer.id._id`') + } + if (!peerRecord.peer.multiaddrs) { + return new Error('Missing `peerRecord.addrs`') + } + if (!peerRecord.ttl) { + return new Error('Missing `peerRecord.ttl`') + } + if (!peerRecord.received_at) { + return new Error('Missing `peerRecord.received_at`') + } +} + +// Creates the default revision store +const createRevisionStore = () => { + return Map({ + _rev: 0, + global_namespace: Map(), + namespaces: Map() + }) +} + +// Helper for incrementing the revision in a store +const incrementRevision = (store) => { + return store.set('_rev', store.get('_rev') + 1) +} + +// Helper for getting the namespace of a store +const getNamespaces = (store) => { + return store.get('namespaces') +} + +// Helper for setting the namespace of a store +const setNamespaces = (store, value) => { + return store.set('namespaces', value) +} + +// Creates a peer table within a store +const createNamespace = (store, name) => { + // Check if namespace already exists + if (getNamespaces(store).get(name)) { + return store + } else { + // Didn't exists, let's create it with a empty Map + return setNamespaces(store, getNamespaces(store).set(name, Map({}))) + } +} + +// Adds a peer to a peer table 
within a namespace +const addPeerToNamespace = (store, peerTableName, peerRecord) => { + const peerErr = validatePeerRecord(peerRecord) + if (peerErr) { + throw new Error('Peer was not valid for adding to rendezvous namespace. ' + peerErr) + } + // Get a version of the peer table we can modify + let newPeerTable = getNamespaces(store).get(peerTableName) + // Add the new peerRecord to the peer table + newPeerTable = newPeerTable.set(peerRecord.peer.id.toB58String(), Map(peerRecord)) + // We made a modification, lets increment the revision + store = incrementRevision(store) + // Return the new store with the new values + return setNamespaces(store, getNamespaces(store).set(peerTableName, newPeerTable)) +} + +// Add a peer to the global namespace +const addPeer = (store, peerRecord) => { + const peerErr = validatePeerRecord(peerRecord) + if (peerErr) { + throw new Error('Peer was not valid for adding to rendezvous namespace. ' + peerErr) + } + // We made a modification, lets increment the revision + store = incrementRevision(store) + // Return the new store with the new values + return store.set('global_namespace', store.get('global_namespace').set(peerRecord.peer.id.toB58String(), Map(peerRecord))) +} + +// Removes a peer from a peer table within a namespace +const removePeerFromNamespace = (store, peerTableName, peerID) => { + // Get a version of the peer table we can modify + let newPeerTable = getNamespaces(store).get(peerTableName) + // remove the Peer from it + newPeerTable = newPeerTable.remove(peerID) + // We made a modification, lets increment the revision + store = incrementRevision(store) + // Return the new store with new values + return setNamespaces(store, getNamespaces(store).set(peerTableName, newPeerTable)) +} + +// Removes a peer from the global namespace +const removePeer = (store, peerID) => { + // We made a modification, lets increment the revision + store = incrementRevision(store) + // Return the new store with new values + return store.set('global_namespace', store.get('global_namespace').delete(peerID)) +} + +// Removes a namespace +const removeNamespace = (store, peerTableName) => { + // We made a modification, lets increment the revision + store = incrementRevision(store) + // Return the new store with new values + return setNamespaces(store, getNamespaces(store).delete(peerTableName)) +} + +// Checks all the ttls and removes peers that are expired +const clearExpiredFromNamespace = (store, peerTableName, currentTime) => { + // Get the peer table + const peerTable = getNamespaces(store).get(peerTableName) + // Go through all peers + const newStore = peerTable.reduce((accStore, v) => { + // Check if ttl is function (clears peer after disconnect) + if (typeof v.get('ttl') === 'function') { + if (!v.get('ttl')()) { + return removePeer(accStore, v.get('id')) + } + + return accStore + } + + const expiresAt = new Date(v.get('received_at')) + + // Add TTL seconds to date to get when it should expire + expiresAt.setSeconds(expiresAt.getSeconds() + v.get('ttl')) + + // Get amount of seconds diff with current time + const diffInSeconds = (expiresAt - currentTime) / 1000 + + // If it's less than zero, peer has expired and we should remove it + if (diffInSeconds < 0) { + return removePeerFromNamespace(accStore, peerTableName, v.get('peer').id.toB58String()) + } + return accStore + }, store) + // Return the new store with new values + return newStore +} + +// Checks all the ttls and removes peers that are expired +const clearExpiredFromGlobalNamespace = (store, currentTime) => { 
+ // Go through all peers + const newStore = store.get('global_namespace').reduce((accStore, v) => { + // Check if ttl is function (clears peer after disconnect) + if (typeof v.get('ttl') === 'function') { + if (!v.get('ttl')()) { + return removePeer(accStore, v.get('id')) + } + + return accStore + } + + const expiresAt = new Date(v.get('received_at')) + + // Add TTL seconds to date to get when it should expire + expiresAt.setSeconds(expiresAt.getSeconds() + v.get('ttl')) + + // Get amount of seconds diff with current time + const diffInSeconds = (expiresAt - currentTime) / 1000 + + // If it's less than zero, peer has expired and we should remove it + if (diffInSeconds < 0) { + return removePeer(accStore, v.get('id')) + } + return accStore + }, store) + // Return the new store with new values + return newStore +} + +const clearExpired = (store, peerTableName, currentTime) => { + const newStore = clearExpiredFromGlobalNamespace(store, currentTime) + return clearExpiredFromNamespace(newStore, peerTableName, currentTime) +} + +const clearExpiredAll = (store, currentTime) => { + const newStore = clearExpiredFromGlobalNamespace(store, currentTime) + // return clearExpiredFromNamespace(newStore, peerTableName, currentTime) + return getNamespaces(newStore).reduce((store, _, peerTableName) => { + return clearExpiredFromNamespace(newStore, peerTableName, currentTime) + }, newStore) +} + +const clearEmptyNamespaces = (store) => { + return getNamespaces(store).reduce((store, ns, id) => { + if (!ns.size) { + return removeNamespace(store, id) + } + + return store + }, store) +} + +module.exports = { + createStore: createRevisionStore, + createNamespace, + addPeer, + addPeerToNamespace, + removePeer, + removePeerFromNamespace, + removeNamespace, + clearEmptyNamespaces, + clearExpired, + clearExpiredAll, + utils: { + getNamespaces, + setNamespaces + } +} diff --git a/src/sync.js b/src/sync.js new file mode 100644 index 0000000..b70cc7a --- /dev/null +++ b/src/sync.js @@ -0,0 +1,57 @@ +'use strict' + +const { Map } = require('immutable') + +const createSyncState = () => { + return Map({ + _rev: 0, + registrations: Map(), + points: Map() + }) +} + +// Helper for incrementing the revision in a store +const incrementRevision = (store) => { + return store.set('_rev', store.get('_rev') + 1) +} + +// Add a point together with it's rpc client +const addPoint = (store, id, rpc) => { + if (getPoint(store, id)) throw new Error('Trying to override ' + id) + store = incrementRevision(store) + return store.set('points', store.get('points').set(id, Map({ + registrations: Map(), + cookies: Map(), + rpc + }))) +} + +// Get a point and the current registrations +const getPoint = (store, id) => { + return store.get(id) +} + +// Remove a point +const removePoint = (store, id) => { + store = incrementRevision(store) + return store.set('points', store.get('points').delete(id)) +} + +// Clears offline points +const clearPoints = (store) => { + return store.get('points').reduce((store, point, id) => { + if (!point.get('rpc')().online()) { + return removePoint(store, id) + } + + return store + }, store) +} + +module.exports = { + create: createSyncState, + addPoint, + getPoint, + removePoint, + clearPoints +} diff --git a/test/client.id.json b/test/client1.id.json similarity index 100% rename from test/client.id.json rename to test/client1.id.json diff --git a/test/discovery.spec.js b/test/discovery.spec.js index 687b40f..43c2d9f 100644 --- a/test/discovery.spec.js +++ b/test/discovery.spec.js @@ -3,66 +3,85 @@ /* eslint-env 
mocha */ const {parallel} = require('async') -const Utils = require('./utils') +const Utils = require('./utils.peer') +const pull = require('pull-stream') +const proto = require('../src/proto') +const promisify = require('promisify-es6') const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect +const wait = () => new Promise((resolve) => setTimeout(() => resolve(), 100)) chai.use(dirtyChai) +const discover = (client, ns) => promisify(client._client.discover.bind(client._client))(ns || null) + describe('discovery', () => { - let client + let client1 let client2 - let server - before(done => { - Utils.default((err, _client, _server, _client2) => { - if (err) return done(err) - client = _client - client2 = _client2 - server = _server - parallel([client, client2].map(c => cb => c._dial(server.node.peerInfo, cb)), done) - }) + before(async () => { + client1 = await Utils.createRendezvousPeer(require('./client1.id.json')) + client2 = await Utils.createRendezvousPeer(require('./client2.id.json')) }) - it('register', done => { - parallel( - [client, client2].map(c => cb => c.register('hello', c.swarm.peerInfo, cb)), - (...a) => setTimeout(() => done(...a), 100) // Queue is being processed every 100ms - ) + it('register client1@hello', async () => { + client1.register('hello') + await wait() }) - - it('discover', done => { - client.discover('hello', (err, res) => { - if (err) return done(err) - expect(err).to.not.exist() - expect(res.peers).to.have.lengthOf(1) - expect(res.peers[0].id.toB58String()).to.equal(client2.swarm.peerInfo.id.toB58String()) - done() - }) + it('discover client1@hello from client2', async () => { + const res = await discover(client2, 'hello') + expect(res).to.have.lengthOf(1) + expect(res[0].id.toB58String()).to.equal(client1.swarm.peerInfo.id.toB58String()) + }) + it('can\'t discover client1@ from client2', async () => { + const res = await discover(client2) + expect(res).to.have.lengthOf(0) + }) + it('can\'t discover client1@hello from client1', async () => { + const res = await discover(client1, 'hello') + expect(res).to.have.lengthOf(0) }) - it('unregister', done => { - client2.unregister('hello') - setTimeout(() => done(), 100) // Queue is being processed every 100ms + it('dial client2->client1', async () => { + const res = await discover(client2, 'hello') + await promisify(client2.swarm.dial.bind(client2.swarm, res[0]))() }) - it('discover (after unregister)', done => { - client.discover('hello', (err, res) => { - if (err) return done(err) - expect(err).to.not.exist() - expect(res.peers).to.have.lengthOf(0) - done() - }) + it('register client2@', async () => { + client2.register() + await wait() + }) + it('discover client2@ from client1', async () => { + const res = await discover(client1) + expect(res).to.have.lengthOf(1) + expect(res[0].id.toB58String()).to.equal(client2.swarm.peerInfo.id.toB58String()) + }) + it('can\'t discover client2@hello from client2', async () => { + const res = await discover(client2, 'hello') + expect(res).to.have.lengthOf(1) + expect(res[0].id.toB58String()).to.equal(client1.swarm.peerInfo.id.toB58String()) // check if id is NOT from client2 + }) + it('can\'t discover client2@ from client2', async () => { + const res = await discover(client2) + expect(res).to.have.lengthOf(0) }) - it('unregister other client', done => { - client.unregister('hello') - setTimeout(() => done(), 100) // Queue is being processed every 100ms + it('unregister client1@hello', async () => { + client1.unregister('hello') + await 
wait() + }) + it('can\'t discover client1@hello from client2 anymore', async () => { + const res = await discover(client2, 'hello') + expect(res).to.have.lengthOf(0) }) - it('gc', () => { - server.gc() - expect(Object.keys(server.table.NS)).to.have.lengthOf(0) + it('unregister client2@', async () => { + client2.unregister() + await wait() + }) + it('can\'t discover client2@ from client1 anymore', async () => { + const res = await discover(client1) + expect(res).to.have.lengthOf(0) }) }) diff --git a/test/store.spec.js b/test/store.spec.js new file mode 100644 index 0000000..b1e6c63 --- /dev/null +++ b/test/store.spec.js @@ -0,0 +1,159 @@ +'use strict' + +/* eslint-env mocha */ + +const assert = require('assert') +const PeerInfo = require('peer-info') +const PeerID = require('peer-id') +const multiaddr = require('multiaddr') + +const { + createStore, + createNamespace, + utils, + addPeer, + addPeerToNamespace, + removePeer, + removePeerFromNamespace, + clearExpired, + clearEmptyNamespaces, + clearEmpty +} = require('../src/server/store/immutable') + +const Utils = require('./utils.store') + +const { + assertNumberOfNamespaces, + assertNumberOfPeersInNamespace, + assertNumberOfPeers, + assertRevisionNumber, + createPeerRecord +} = Utils + +const DateNow = Utils.DateNow = () => new Date('2018-05-17T13:00:00.000Z') // mock date + +describe('immutable store', () => { + it('starts with no revisions', () => { + const store = createStore() + + assertRevisionNumber(store, 0) + assertNumberOfNamespaces(store, 0) + }) + + it('add namespace', () => { + const store = createNamespace(createStore(), 'my-app') + + assertRevisionNumber(store, 0) + assertNumberOfNamespaces(store, 1) + assertNumberOfPeersInNamespace(store, 'my-app', 0) + }) + + it('add multiple namespaces', () => { + let store = createStore() + store = createNamespace(store, 'my-app-1') + store = createNamespace(store, 'my-app-2') + store = createNamespace(store, 'my-app-3') + + assertRevisionNumber(store, 0) + assertNumberOfNamespaces(store, 3) + }) + + it('add duplicated namespace', () => { + let store = createStore() + store = createNamespace(store, 'my-app') + store = createNamespace(store, 'my-app') + + assertRevisionNumber(store, 0) + assertNumberOfNamespaces(store, 1) + assertNumberOfPeersInNamespace(store, 'my-app', 0) + }) + + it('add duplicate namespace wont clear existing peers', async () => { + const peerRecord = await createPeerRecord() + let store = createNamespace(createStore(), 'my-app') + store = addPeerToNamespace(store, 'my-app', peerRecord) + store = createNamespace(store, 'my-app') + + assertRevisionNumber(store, 1) + assertNumberOfNamespaces(store, 1) + assertNumberOfPeersInNamespace(store, 'my-app', 1) + }) + + it('can add peer to global namespace', async () => { + const peerRecord = await createPeerRecord() + let store = createStore() + store = addPeer(store, peerRecord) + + assertRevisionNumber(store, 1) + assertNumberOfNamespaces(store, 0) + assertNumberOfPeers(store, 1) + }) + + it('can remove peer from global namespace', async () => { + const peerRecord = await createPeerRecord() + let store = createStore() + store = addPeer(store, peerRecord) + store = removePeer(store, peerRecord.peer.id.toB58String()) + + assertRevisionNumber(store, 2) + assertNumberOfNamespaces(store, 0) + assertNumberOfPeers(store, 0) + }) + + it('can add peer to namespace', async () => { + const peerRecord = await createPeerRecord() + let store = createNamespace(createStore(), 'my-app') + store = addPeerToNamespace(store, 'my-app', 
peerRecord) + + assertRevisionNumber(store, 1) + assertNumberOfNamespaces(store, 1) + assertNumberOfPeersInNamespace(store, 'my-app', 1) + }) + + it('can remove peer from namespace', async () => { + const peerRecord = await createPeerRecord() + let store = createNamespace(createStore(), 'my-app') + store = addPeerToNamespace(store, 'my-app', peerRecord) + store = removePeerFromNamespace(store, 'my-app', peerRecord.peer.id.toB58String()) + + assertRevisionNumber(store, 2) + assertNumberOfNamespaces(store, 1) + assertNumberOfPeersInNamespace(store, 'my-app', 0) + }) + + it('gc clears expired peers', async () => { + const peerRecord = await createPeerRecord() + let store = createNamespace(createStore(), 'my-app') + store = addPeerToNamespace(store, 'my-app', peerRecord) + const dateAfterExpired = new Date('2018-05-17T13:02:00.000Z') + store = clearExpired(store, 'my-app', dateAfterExpired) + + assertRevisionNumber(store, 2) + assertNumberOfNamespaces(store, 1) + assertNumberOfPeersInNamespace(store, 'my-app', 0) + assertNumberOfPeers(store, 0) + }) + + it('gc leaves non-expired peers in store', async () => { + const peerRecord = await createPeerRecord() + let store = createNamespace(createStore(), 'my-app') + store = addPeerToNamespace(store, 'my-app', peerRecord) + const dateBeforeExpired = new Date('2018-05-17T13:00:00.000Z') + store = clearExpired(store, 'my-app', dateBeforeExpired) + + assertRevisionNumber(store, 1) + assertNumberOfNamespaces(store, 1) + assertNumberOfPeersInNamespace(store, 'my-app', 1) + // console.log(JSON.stringify(store.toJSON(), null, 2)) + }) + + it('gc clears empty namespace', async () => { + const peerRecord = await createPeerRecord() + let store = createNamespace(createNamespace(createStore(), 'my-app'), 'my-app-2') + store = addPeerToNamespace(store, 'my-app', peerRecord) + store = clearEmptyNamespaces(store) + + assertRevisionNumber(store, 2) + assertNumberOfNamespaces(store, 1) + }) +}) diff --git a/test/utils.js b/test/utils.js deleted file mode 100644 index 177aef9..0000000 --- a/test/utils.js +++ /dev/null @@ -1,87 +0,0 @@ -'use strict' - -const Libp2p = require('libp2p') -const TCP = require('libp2p-tcp') -const MPLEX = require('libp2p-mplex') -const SPDY = require('libp2p-spdy') -const SECIO = require('libp2p-secio') - -const Id = require('peer-id') -const Peer = require('peer-info') - -const Server = require('../src/server') -const Client = require('../src') - -const Utils = module.exports = (id, addrs, cb) => { - Id.createFromJSON(require(id), (err, id) => { - if (err) return cb(err) - const peer = new Peer(id) - addrs.forEach(a => peer.multiaddrs.add(a)) - - const swarm = new Libp2p({ - transport: [ - new TCP() - ], - connection: { - muxer: [ - MPLEX, - SPDY - ], - crypto: [SECIO] - } - }, peer, null, { - relay: { - enabled: true, - hop: { - enabled: true, - active: false - } - } - }) - - swarm.start(err => { - if (err) return cb(err) - cb(null, swarm) - }) - }) -} - -Utils.id = (id, addrs, cb) => { - Id.createFromJSON(require(id), (err, id) => { - if (err) return cb(err) - const peer = new Peer(id) - addrs.forEach(a => peer.multiaddrs.add(a)) - cb(null, peer) - }) -} - -Utils.createServer = (id, addrs, opt, cb) => { - Utils(id, addrs, (err, swarm) => { - if (err) return cb(err) - const server = new Server(Object.assign(opt || {}, {node: swarm})) - server.start() - return cb(null, server, swarm) - }) -} - -Utils.createClient = (id, addrs, cb) => { - Utils(id, addrs, (err, swarm) => { - if (err) return cb(err) - const client = new Client(swarm) - 
client.start(err => { - if (err) return cb(err) - return cb(null, client, swarm) - }) - }) -} - -Utils.default = cb => Utils.createServer('./server.id.json', ['/ip4/0.0.0.0/tcp/0'], {}, (err, server) => { - if (err) return cb(err) - Utils.createClient('./client.id.json', ['/ip4/0.0.0.0/tcp/0'], (err, client) => { - if (err) return cb(err) - Utils.createClient('./client2.id.json', ['/ip4/0.0.0.0/tcp/0'], (err, client2) => { - if (err) return cb(err) - return cb(null, client, server, client2) - }) - }) -}) diff --git a/test/utils.peer.js b/test/utils.peer.js new file mode 100644 index 0000000..0c56816 --- /dev/null +++ b/test/utils.peer.js @@ -0,0 +1,77 @@ +'use strict' + +const Libp2p = require('libp2p') +const Id = require('peer-id') +const Peer = require('peer-info') +const promisify = require('promisify-es6') + +const WS = require('libp2p-websockets') +const MPLEX = require('libp2p-mplex') +const SECIO = require('libp2p-secio') + +const defaultAddrs = process.toString() === '[object process]' ? ['/ip4/127.0.0.1/tcp/0/ws'] : [] // don't try to create ws-server in browser +const defaultServerAddrs = ['/ip4/127.0.0.1/tcp/5334/ws'] +const Rendezvous = require('../src') +const Server = require('../src/server') + +const Utils = module.exports = { + createSwarm: (id, addrs, lp2pOpt, post) => new Promise((resolve, reject) => { + Id.createFromJSON(id, (err, peerID) => { + if (err) return reject(err) + const peer = new Peer(peerID) + addrs.forEach(a => peer.multiaddrs.add(a)) + + const swarm = new Libp2p({ + transport: [ + new WS() + ], + connection: { + muxer: [ + MPLEX + ], + crypto: [SECIO] + } + }, peer, null, lp2pOpt || {}) + if (post) { + post(swarm) + } + swarm.start(err => { + if (err) return reject(err) + resolve(swarm) + }) + }) + }), + createServer: async (id, conf, addrs) => { + const swarm = await Utils.createSwarm(id, addrs || defaultServerAddrs, { + relay: { + enabled: true, + hop: { + enabled: true, + active: true + } + } + }) + const server = new Server(Object.assign(Object.assign({}, conf || {}), {swarm})) + server.start() + return server + }, + createRendezvousPeer: async (id, conf, addrs) => { + let rendezvous + const swarm = await Utils.createSwarm(id, addrs || defaultAddrs, { + relay: { + enabled: true + } + }, (swarm) => { + rendezvous = new Rendezvous(swarm, conf || {}) + }) + rendezvous.start() + await promisify(swarm.dial.bind(swarm, await Utils.createServerPeerInfo()))() + await new Promise((resolve) => setTimeout(() => resolve(), 500)) + return rendezvous + }, + createServerPeerInfo: async () => { + const peer = new Peer(await promisify(Id.createFromJSON)(require('./server.id.json'))) + defaultServerAddrs.forEach(a => peer.multiaddrs.add(a)) + return peer + } +} diff --git a/test/utils.store.js b/test/utils.store.js new file mode 100644 index 0000000..2844155 --- /dev/null +++ b/test/utils.store.js @@ -0,0 +1,48 @@ +'use strict' + +const Id = require('peer-id') +const Peer = require('peer-info') +const multiaddr = require('multiaddr') +const { getNamespaces } = require('../src/server/store/immutable').utils +const assert = require('assert') + +// Helper for asserting the number of namespaces +const assertNumberOfNamespaces = (store, numberOfNamespaces) => { + assert.equal(Object.keys(getNamespaces(store).toJSON()).length, numberOfNamespaces) +} + +// Helper for asserting the number of peers in a namespace +const assertNumberOfPeersInNamespace = (store, namespace, numberOfPeers) => { + assert.equal(Object.keys(getNamespaces(store).get(namespace).toJSON()).length, 
numberOfPeers) +} + +// Helper for asserting the number of peers in the global namespace +const assertNumberOfPeers = (store, numberOfPeers) => { + assert.equal(Object.keys(store.get('global_namespace').toJSON()).length, numberOfPeers) +} + +// Helper for asserting which revision we're currently at +const assertRevisionNumber = (store, numberOfRevision) => { + assert.equal(store.get('_rev'), numberOfRevision) +} + +// Helper for creating a peer record for the store tests (DateNow can be mocked by the spec) +const createPeerRecord = () => new Promise((resolve, reject) => { + Id.create({bits: 512}, (err, id) => { + if (err) return reject(err) + const peer = new Peer(id) + peer.multiaddrs.add(multiaddr('/ip4/127.0.0.1/tcp/0')) + resolve({ + peer, + ttl: 60, + received_at: (module.exports.DateNow || Date.now)() + }) + }) +}) + +module.exports = { + assertNumberOfNamespaces, + assertNumberOfPeersInNamespace, + assertNumberOfPeers, + assertRevisionNumber, + createPeerRecord +}
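
// A rough end-to-end sketch of how the pieces introduced in this patch fit
// together, mirroring test/utils.peer.js. `createNode()` and `serverPeerInfo`
// are placeholders for the application's own libp2p setup (the tests use
// libp2p-websockets + mplex + secio) and for the rendezvous point's PeerInfo;
// the require paths assume the repository root.
'use strict'

const RendezvousDiscovery = require('./src')      // client-side discovery (src/index.js)
const RendezvousServer = require('./src/server')  // rendezvous point (src/server/index.js)

async function example () {
  // Rendezvous point: any started swarm; it handles /p2p/rendezvous/1.0.0
  // and periodically garbage-collects expired registrations from the store.
  const serverSwarm = await createNode('server')  // placeholder libp2p node
  const server = new RendezvousServer({ swarm: serverSwarm })
  server.start()

  // Client: wrap a swarm with the discovery module and start it. The
  // discover loop runs every 10 s and re-emits found peers as events.
  const clientSwarm = await createNode('client')  // placeholder libp2p node
  const discovery = new RendezvousDiscovery(clientSwarm)
  discovery.on('peer', (peer) => console.log('found', peer.id.toB58String()))
  discovery.on('ns:my-app', (peer) => console.log('found in my-app', peer.id.toB58String()))
  discovery.start()

  // Registrations are queued locally; dialing the rendezvous point fires
  // 'peer:connect', which makes the client open the RPC stream and sync
  // pending register/unregister calls to the server.
  discovery.register('my-app')
  clientSwarm.dial(serverPeerInfo, (err) => {  // placeholder PeerInfo of the server
    if (err) throw err
  })
}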