Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b8bf044
feat: rendezvous protocol full implementation
vasco-santos May 20, 2020
d7290df
chore: interface-peer-discovery compliance
vasco-santos Jul 10, 2020
b080670
feat: garbage collector
vasco-santos Jul 13, 2020
ebb22d1
feat: cookie for discovery
vasco-santos Jul 13, 2020
9765a95
chore: cleanup
vasco-santos Jul 15, 2020
b6edaf3
chore: update aegir
vasco-santos Jul 17, 2020
0e304f9
chore: convert to seconds in the wire
vasco-santos Jul 17, 2020
b248924
chore: remove unregister comments for response
vasco-santos Jul 20, 2020
47641f7
feat: use signed peer records to exchange multiaddrs
vasco-santos Jul 22, 2020
b668c8a
chore: tests
vasco-santos Jul 22, 2020
7e3c541
chore: change readme
vasco-santos Jul 27, 2020
1a1590d
chore: update deps
vasco-santos Sep 22, 2020
b357829
chore: remove peer discovery interface as we will be creating libp2p.…
vasco-santos Sep 22, 2020
4abd363
chore: use uint8array instead of buffer
vasco-santos Sep 22, 2020
7763df2
chore: update libp2p integration doc
vasco-santos Sep 28, 2020
63d607b
chore: fix register ttl param return
vasco-santos Sep 28, 2020
5f45c6f
chore: update docs
vasco-santos Sep 29, 2020
640b64f
chore: remove enabled property from libp2p integration doc
vasco-santos Oct 5, 2020
8f6e148
chore: apply suggestions from code review
vasco-santos Nov 16, 2020
894ad2e
chore: separate server and client rendezvous
vasco-santos Nov 17, 2020
ee10d69
chore: add docker
vasco-santos Nov 17, 2020
3798fbb
fix: changed default values and moved them into the server with prope…
vasco-santos Nov 17, 2020
cdf2f6b
chore: update docs and constants
vasco-santos Nov 17, 2020
9fe0691
chore: add tests for protocol with direct connection to server
vasco-santos Nov 18, 2020
52fa2bd
chore: DoS protection with max registrations
vasco-santos Nov 19, 2020
06d53ac
chore: refactor client
vasco-santos Nov 21, 2020
d501d97
chore: add datastore and types
vasco-santos Dec 8, 2020
a2d5f83
chore: fix build
vasco-santos Dec 13, 2020
83cd4b7
chore: run with mysql
vasco-santos Dec 21, 2020
9b294f5
feat: gc
vasco-santos Dec 24, 2020
9bf5bcb
chore: add gc tests
vasco-santos Dec 24, 2020
7a569c7
chore: review docs and binary
vasco-santos Dec 24, 2020
c297156
chore: add datastore docs and model picture
vasco-santos Dec 28, 2020
e1cd224
chore: add library docs
vasco-santos Dec 28, 2020
264ce2a
chore: add docker setup docks
vasco-santos Dec 28, 2020
01ec7bc
chore: remove client code and move server into src
vasco-santos Jan 4, 2021
5f025d3
chore: use connection pool
vasco-santos Jan 11, 2021
2840251
fix: bin stdout addresses and ports correctly
vasco-santos Jan 15, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: separate server and client rendezvous
  • Loading branch information
vasco-santos committed Nov 17, 2020
commit 894ad2e7986487b67136903059b15cff11e810db
14 changes: 7 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,26 @@
"it-buffer": "^0.1.2",
"it-length-prefixed": "^3.1.0",
"it-pipe": "^1.1.0",
"libp2p": "^0.29.0",
"libp2p-interfaces": "^0.5.1",
"libp2p-mplex": "^0.10.0",
"libp2p-noise": "^2.0.1",
"libp2p-tcp": "^0.15.1",
"libp2p-websockets": "^0.14.0",
"menoetius": "0.0.2",
"minimist": "^1.2.5",
"multiaddr": "^8.0.0",
"peer-id": "^0.14.1",
"protons": "^2.0.0",
"streaming-iterables": "^5.0.2",
"uint8arrays": "^1.1.0"
},
"peerDependencies": {
"libp2p": "^0.29.0"
},
"devDependencies": {
"aegir": "^26.0.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"delay": "^4.4.0",
"dirty-chai": "^2.0.1",
"libp2p": "^0.29.0",
"libp2p-mplex": "^0.10.0",
"libp2p-noise": "^2.0.1",
"libp2p-websockets": "^0.14.0",
"p-defer": "^3.0.0",
"p-times": "^3.0.0",
"p-wait-for": "^3.1.0",
Expand Down
62 changes: 3 additions & 59 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@ const toString = require('uint8arrays/to-string')

const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

const Server = require('./server')
const { codes: errCodes } = require('./errors')
const { PROTOCOL_MULTICODEC } = require('./constants')
const { Message } = require('./proto')
const MESSAGE_TYPE = Message.MessageType

const defaultServerOptions = {
enabled: true,
gcInterval: 3e5
}

/**
* Rendezvous point contains the connection to a rendezvous server, as well as,
* the cookies per namespace that the client received.
Expand All @@ -42,33 +36,17 @@ class Rendezvous {
* @constructor
* @param {object} params
* @param {Libp2p} params.libp2p
* @param {object} [params.server]
* @param {boolean} [params.server.enabled = true]
* @param {number} [params.server.gcInterval = 3e5]
*/
constructor ({ libp2p, server = {} }) {
constructor ({ libp2p }) {
this._libp2p = libp2p
this._peerId = libp2p.peerId
this._registrar = libp2p.registrar

this._serverOptions = {
...defaultServerOptions,
...server
}

/**
* @type {Map<string, RendezvousPoint>}
*/
this._rendezvousPoints = new Map()

/**
* Client cookies per namespace for own server
* @type {Map<string, string>}
*/
this._cookiesSelf = new Map()

this._server = undefined

this._registrarId = undefined
this._onPeerConnected = this._onPeerConnected.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
Expand All @@ -85,12 +63,6 @@ class Rendezvous {

log('starting')

// Create and start Rendezvous server if enabled
if (this._serverOptions.enabled) {
this._server = new Server(this._libp2p, this._serverOptions)
this._server.start()
}

// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: PROTOCOL_MULTICODEC,
Expand All @@ -105,7 +77,7 @@ class Rendezvous {
}

/**
* Unregister the rendezvous protocol and the streams with other peers will be closed.
* Unregister the rendezvous protocol and clear the state.
* @returns {void}
*/
stop () {
Expand All @@ -115,17 +87,11 @@ class Rendezvous {

log('stopping')

clearInterval(this._interval)

// unregister protocol and handlers
this._registrar.unregister(this._registrarId)
if (this._serverOptions.enabled) {
this._server.stop()
}

this._registrarId = undefined
this._rendezvousPoints.clear()
this._cookiesSelf.clear()

log('stopped')
}
Expand Down Expand Up @@ -153,10 +119,6 @@ class Rendezvous {
log('disconnected', idB58Str)

this._rendezvousPoints.delete(idB58Str)

if (this._server) {
this._server.removePeerRegistrations(peerId)
}
}

/**
Expand Down Expand Up @@ -271,7 +233,7 @@ class Rendezvous {
* Discover peers registered under a given namespace
* @param {string} ns
* @param {number} [limit]
* @returns {AsyncIterable<{ signedPeerRecord: Buffer, ns: string, ttl: number }>}
* @returns {AsyncIterable<{ signedPeerRecord: Uint8Array, ns: string, ttl: number }>}
*/
async * discover (ns, limit) {
// Are there available rendezvous servers?
Expand All @@ -285,24 +247,6 @@ class Rendezvous {
ttl: r.ttl * 1e3 // convert to ms
})

// Local search if Server enabled
if (this._server) {
const cookieSelf = this._cookiesSelf.get(ns)
const { cookie: cookieS, registrations: localRegistrations } = this._server.getRegistrations(ns, { limit, cookie: cookieSelf })

for (const r of localRegistrations) {
yield registrationTransformer(r)

limit--
if (limit === 0) {
return
}
}

// Store cookie self
this._cookiesSelf.set(ns, cookieS)
}

// Iterate over all rendezvous points
for (const [id, rp] of this._rendezvousPoints.entries()) {
const rpCookies = rp.cookies || new Map()
Expand Down
93 changes: 93 additions & 0 deletions src/server/bin.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#!/usr/bin/env node

'use strict'

// Usage: $0 [--peerId <jsonFilePath>] [--listenMultiaddrs <ma> ... <ma>] [--announceMultiaddrs <ma> ... <ma>] [--metricsMultiaddr <ma>] [--disableMetrics]

/* eslint-disable no-console */

const debug = require('debug')
const log = debug('libp2p:rendezvous:bin')

const fs = require('fs')
const http = require('http')
const menoetius = require('menoetius')
const argv = require('minimist')(process.argv.slice(2))

const TCP = require('libp2p-tcp')
const Websockets = require('libp2p-websockets')
const Muxer = require('libp2p-mplex')
const { NOISE: Crypto } = require('libp2p-noise')

const multiaddr = require('multiaddr')
const PeerId = require('peer-id')

const RendezvousServer = require('./index')
const { getAnnounceAddresses, getListenAddresses } = require('./utils')

async function main () {
// Metrics
let metricsServer
const metrics = !(argv.disableMetrics || process.env.DISABLE_METRICS)
const metricsMa = multiaddr(argv.metricsMultiaddr || argv.ma || process.env.METRICSMA || '/ip4/127.0.0.1/tcp/8003')
const metricsAddr = metricsMa.nodeAddress()

// Multiaddrs
const listenAddresses = getListenAddresses(argv)
const announceAddresses = getAnnounceAddresses(argv)

// PeerId
let peerId
if (argv.peerId) {
const peerData = fs.readFileSync(argv.peerId)
peerId = await PeerId.createFromJSON(JSON.parse(peerData))
} else {
peerId = await PeerId.create()
log('You are using an automatically generated peer.')
log('If you want to keep the same address for the server you should provide a peerId with --peerId <jsonFilePath>')
}

// Create Rendezvous server
const rendezvousServer = new RendezvousServer({
modules: {
transport: [Websockets, TCP],
streamMuxer: [Muxer],
connEncryption: [Crypto]
},
peerId,
addresses: {
listen: listenAddresses,
announce: announceAddresses
}
})

await rendezvousServer.start()

if (metrics) {
log('enabling metrics')
metricsServer = http.createServer((req, res) => {
if (req.url !== '/metrics') {
res.statusCode = 200
res.end()
}
})

menoetius.instrument(metricsServer)

metricsServer.listen(metricsAddr.port, metricsAddr.address, () => {
console.log(`metrics server listening on ${metricsAddr.port}`)
})
}

const stop = async () => {
console.log('Stopping...', rendezvousServer.multiaddrs)
await rendezvousServer.stop()
metricsServer && await metricsServer.close()
process.exit(0)
}

process.on('SIGTERM', stop)
process.on('SIGINT', stop)
}

main()
21 changes: 13 additions & 8 deletions src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const debug = require('debug')
const log = debug('libp2p:rendezvous-server')
log.error = debug('libp2p:rendezvous-server:error')

const Libp2p = require('libp2p')
const PeerId = require('peer-id')

const { PROTOCOL_MULTICODEC, MAX_LIMIT } = require('../constants')
Expand All @@ -27,16 +28,16 @@ const rpc = require('./rpc')
/**
* Libp2p rendezvous server.
*/
class RendezvousServer {
class RendezvousServer extends Libp2p {
/**
* @constructor
* @param {Libp2p} libp2p
* @param {Libp2pOptions} libp2pOptions
* @param {object} [options]
* @param {number} [options.gcInterval = 3e5]
*/
constructor (libp2p, { gcInterval = 3e5 } = {}) {
this._registrar = libp2p.registrar
this._peerStore = libp2p.peerStore
constructor (libp2pOptions, { gcInterval = 3e5 } = {}) {
super(libp2pOptions)

this._gcInterval = gcInterval

/**
Expand All @@ -59,6 +60,8 @@ class RendezvousServer {
* @returns {void}
*/
start () {
super.start()

if (this._interval) {
return
}
Expand All @@ -69,7 +72,7 @@ class RendezvousServer {
this._interval = setInterval(this._gc, this._gcInterval)

// Incoming streams handling
this._registrar.handle(PROTOCOL_MULTICODEC, rpc(this))
this.registrar.handle(PROTOCOL_MULTICODEC, rpc(this))

log('started')
}
Expand All @@ -79,6 +82,8 @@ class RendezvousServer {
* @returns {void}
*/
stop () {
super.stop()

clearInterval(this._interval)
this._interval = undefined

Expand Down Expand Up @@ -143,7 +148,7 @@ class RendezvousServer {
this.nsRegistrations.set(ns, nsReg)

// Store envelope in the AddressBook
this._peerStore.addressBook.consumePeerRecord(envelope)
this.peerStore.addressBook.consumePeerRecord(envelope)
}

/**
Expand Down Expand Up @@ -213,7 +218,7 @@ class RendezvousServer {
cRegistrations.add(nsReg.id)
registrations.push({
ns,
signedPeerRecord: this._peerStore.addressBook.getRawEnvelope(PeerId.createFromB58String(idStr)),
signedPeerRecord: this.peerStore.addressBook.getRawEnvelope(PeerId.createFromB58String(idStr)),
ttl: Date.now() - nsReg.expiration
})

Expand Down
41 changes: 41 additions & 0 deletions src/server/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict'

const multiaddr = require('multiaddr')

function getAnnounceAddresses(argv) {
const announceAddr = argv.announceMultiaddrs || argv.am
const announceAddresses = announceAddr ? [multiaddr(announceAddr)] : []

if (argv.announceMultiaddrs || argv.am) {
const flagIndex = process.argv.findIndex((e) => e === '--announceMultiaddrs' || e === '--am')
const tmpEndIndex = process.argv.slice(flagIndex + 1).findIndex((e) => e.startsWith('--'))
const endIndex = tmpEndIndex !== -1 ? tmpEndIndex : process.argv.length - flagIndex - 1

for (let i = flagIndex + 1; i < flagIndex + endIndex; i++) {
announceAddresses.push(multiaddr(process.argv[i + 1]))
}
}

return announceAddresses
}

module.exports.getAnnounceAddresses = getAnnounceAddresses

function getListenAddresses(argv) {
const listenAddr = argv.listenMultiaddrs || argv.lm || '/ip4/127.0.0.1/tcp/15002/ws'
const listenAddresses = [multiaddr(listenAddr)]

if (argv.listenMultiaddrs || argv.lm) {
const flagIndex = process.argv.findIndex((e) => e === '--listenMultiaddrs' || e === '--lm')
const tmpEndIndex = process.argv.slice(flagIndex + 1).findIndex((e) => e.startsWith('--'))
const endIndex = tmpEndIndex !== -1 ? tmpEndIndex : process.argv.length - flagIndex - 1

for (let i = flagIndex + 1; i < flagIndex + endIndex; i++) {
listenAddresses.push(multiaddr(process.argv[i + 1]))
}
}

return listenAddresses
}

module.exports.getListenAddresses = getListenAddresses
Loading