Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b8bf044
feat: rendezvous protocol full implementation
vasco-santos May 20, 2020
d7290df
chore: interface-peer-discovery compliance
vasco-santos Jul 10, 2020
b080670
feat: garbage collector
vasco-santos Jul 13, 2020
ebb22d1
feat: cookie for discovery
vasco-santos Jul 13, 2020
9765a95
chore: cleanup
vasco-santos Jul 15, 2020
b6edaf3
chore: update aegir
vasco-santos Jul 17, 2020
0e304f9
chore: convert to seconds in the wire
vasco-santos Jul 17, 2020
b248924
chore: remove unregister comments for response
vasco-santos Jul 20, 2020
47641f7
feat: use signed peer records to exchange multiaddrs
vasco-santos Jul 22, 2020
b668c8a
chore: tests
vasco-santos Jul 22, 2020
7e3c541
chore: change readme
vasco-santos Jul 27, 2020
1a1590d
chore: update deps
vasco-santos Sep 22, 2020
b357829
chore: remove peer discovery interface as we will be creating libp2p.…
vasco-santos Sep 22, 2020
4abd363
chore: use uint8array instead of buffer
vasco-santos Sep 22, 2020
7763df2
chore: update libp2p integration doc
vasco-santos Sep 28, 2020
63d607b
chore: fix register ttl param return
vasco-santos Sep 28, 2020
5f45c6f
chore: update docs
vasco-santos Sep 29, 2020
640b64f
chore: remove enabled property from libp2p integration doc
vasco-santos Oct 5, 2020
8f6e148
chore: apply suggestions from code review
vasco-santos Nov 16, 2020
894ad2e
chore: separate server and client rendezvous
vasco-santos Nov 17, 2020
ee10d69
chore: add docker
vasco-santos Nov 17, 2020
3798fbb
fix: changed default values and moved them into the server with prope…
vasco-santos Nov 17, 2020
cdf2f6b
chore: update docs and constants
vasco-santos Nov 17, 2020
9fe0691
chore: add tests for protocol with direct connection to server
vasco-santos Nov 18, 2020
52fa2bd
chore: DoS protection with max registrations
vasco-santos Nov 19, 2020
06d53ac
chore: refactor client
vasco-santos Nov 21, 2020
d501d97
chore: add datastore and types
vasco-santos Dec 8, 2020
a2d5f83
chore: fix build
vasco-santos Dec 13, 2020
83cd4b7
chore: run with mysql
vasco-santos Dec 21, 2020
9b294f5
feat: gc
vasco-santos Dec 24, 2020
9bf5bcb
chore: add gc tests
vasco-santos Dec 24, 2020
7a569c7
chore: review docs and binary
vasco-santos Dec 24, 2020
c297156
chore: add datastore docs and model picture
vasco-santos Dec 28, 2020
e1cd224
chore: add library docs
vasco-santos Dec 28, 2020
264ce2a
chore: add docker setup docks
vasco-santos Dec 28, 2020
01ec7bc
chore: remove client code and move server into src
vasco-santos Jan 4, 2021
5f025d3
chore: use connection pool
vasco-santos Jan 11, 2021
2840251
fix: bin stdout addresses and ports correctly
vasco-santos Jan 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: garbage collector
  • Loading branch information
vasco-santos committed Jul 13, 2020
commit b0806705ae69e4bd3865b94f9f78f32032190e75
2 changes: 1 addition & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

exports.PROTOCOL_MULTICODEC = '/rendezvous/1.0.0'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This spec doesn't mention this protocol at all, we should clarify this there and that this protocol runs over libp2p streams.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exports.MAX_NS_LENGTH = 255 // TODO: spec this
exports.MAX_LIMIT = 1000 // TODO: spec this
exports.MAX_LIMIT = 1000

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Max of what?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to MAX_DISCOVERY_LIMIT

11 changes: 9 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const { Message } = require('./proto')
const MESSAGE_TYPE = Message.MessageType

const defaultServerOptions = {
enabled: true
enabled: true,
gcInterval: 3e5
}

/**
Expand All @@ -40,6 +41,7 @@ class Rendezvous {
* @param {number} [params.discovery.interval = 5000]
* @param {object} [params.server]
* @param {boolean} [params.server.enabled = true]
* @param {number} [params.server.gcInterval = 3e5]
*/
constructor ({ libp2p, options = {} }) {
this._libp2p = libp2p
Expand Down Expand Up @@ -79,7 +81,8 @@ class Rendezvous {

// Create Rendezvous point if enabled
if (this._serverOptions.enabled) {
this._server = new Server({ registrar: this._registrar })
this._server = new Server(this._registrar, this._serverOptions)
this._server.start()
}

// register protocol with topology
Expand Down Expand Up @@ -109,8 +112,12 @@ class Rendezvous {
log('stopping')

clearInterval(this._interval)

// unregister protocol and handlers
await this._registrar.unregister(this._registrarId)
if (this._serverOptions.enabled) {
this._server.stop()
}

this._registrarId = undefined
log('stopped')
Expand Down
55 changes: 50 additions & 5 deletions src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,69 @@ const rpc = require('./rpc')
class RendezvousServer {
/**
* @constructor
* @param {object} params
* @param {Registrar} params.registrar
* @param {Registrar} registrar
* @param {object} options
* @param {number} options.gcInterval
*/
constructor ({ registrar }) {
constructor (registrar, { gcInterval = 3e5 } = {}) {
this._registrar = registrar
this._gcInterval = gcInterval

/**
* Registrations per namespace.
* @type {Map<string, Map<string, Registration>>}
*/
this.registrations = new Map()
}

/**
* Start rendezvous server for handling rendezvous streams and gc.
* @returns {void}
*/
start () {
if (this._interval) {
return
}

log('starting')

// Garbage collection
this._interval = setInterval(this._gc, this._gcInterval)

// Incoming streams handling
this._registrar.handle(PROTOCOL_MULTICODEC, rpc(this))

log('started')
}

/**
* Stops rendezvous server gc and clears registrations
*/
stop () {
clearInterval(this._interval)
this._interval = undefined
this.registrations.clear()

log('stopped')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unhandle is not called on the registrar.

}

// TODO: Should we have a start method to gv the expired registrations?
// I am removing them on discover, but it should be useful to have a gc too
/**
* Garbage collector to removed outdated registrations.
* @returns {void}
*/
_gc () {
const now = Date.now()

// Iterate namespaces
this.registrations.forEach((nsRegistrations) => {
// Iterate registrations for namespaces
nsRegistrations.forEach((reg, idStr) => {
if (now >= reg.expiration) {
nsRegistrations.delete(idStr)
}
})
})
}

/**
* Add a peer registration to a namespace.
Expand Down
167 changes: 167 additions & 0 deletions test/server.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai

const delay = require('delay')
const sinon = require('sinon')
const multiaddr = require('multiaddr')

const RendezvousServer = require('../src/server')

const { createPeerId } = require('./utils')

const registrar = {
handle: () => { }
}
const testNamespace = 'test-namespace'
const multiaddrs = [multiaddr('/ip4/127.0.0.1/tcp/0')].map((m) => m.buffer)

describe('rendezvous server', () => {
let rServer
let peerIds

before(async () => {
peerIds = await createPeerId({ number: 3 })
})

afterEach(() => {
rServer && rServer.stop()
})

it('calls registrar handle on start once', () => {
rServer = new RendezvousServer(registrar)

// Spy for handle
const spyHandle = sinon.spy(registrar, 'handle')

rServer.start()
expect(spyHandle).to.have.property('callCount', 1)

rServer.start()
expect(spyHandle).to.have.property('callCount', 1)
})

it('can add registrations to multiple namespaces', () => {
const otherNamespace = 'other-namespace'
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)
// Add registration for peer 1 in a different namespace
rServer.addRegistration(otherNamespace, peerIds[0], multiaddrs, 1000)

// Add registration for peer 2 in test namespace
rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000)

const testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(2)

const otherNsRegistrations = rServer.getRegistrations(otherNamespace)
expect(otherNsRegistrations).to.have.lengthOf(1)
})

it('should be able to limit registrations to get', () => {
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)
// Add registration for peer 2 in test namespace
rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace, 1)
expect(testNsRegistrations).to.have.lengthOf(1)

testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(2)
})

it('can remove registrations from a peer in a given namespace', () => {
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)
// Add registration for peer 2 in test namespace
rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(2)

// Remove registration for peer0
rServer.removeRegistration(testNamespace, peerIds[0])

testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)
})

it('can remove all registrations from a peer', () => {
const otherNamespace = 'other-namespace'
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)
// Add registration for peer 1 in a different namespace
rServer.addRegistration(otherNamespace, peerIds[0], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)

let otherNsRegistrations = rServer.getRegistrations(otherNamespace)
expect(otherNsRegistrations).to.have.lengthOf(1)

// Remove all registrations for peer0
rServer.removePeerRegistrations(peerIds[0])

testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(0)

otherNsRegistrations = rServer.getRegistrations(otherNamespace)
expect(otherNsRegistrations).to.have.lengthOf(0)
})

it('can attempt to remove a registration for a non existent namespace', () => {
const otherNamespace = 'other-namespace'
rServer = new RendezvousServer(registrar)

rServer.removeRegistration(otherNamespace, peerIds[0])
})

it('can attempt to remove a registration for a non existent peer', () => {
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)

// Remove registration for peer0
rServer.removeRegistration(testNamespace, peerIds[1])

testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)
})

it('gc expired records', async () => {
rServer = new RendezvousServer(registrar, { gcInterval: 300 })

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 500)
rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(2)

// wait for firt record to be removed
await delay(650)
testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)

await delay(400)
testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(0)
})
})
14 changes: 14 additions & 0 deletions test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ const defaultConfig = {
}
}

/**
* Create Perr Id.
* @param {Object} [properties]
* @param {number} [properties.number] number of peers (default: 1).
* @return {Promise<Array<PeerId>>}
*/
async function createPeerId ({ number = 1 }) {
const peerIds = await pTimes(number, (i) => PeerId.createFromJSON(Peers[i]))

return peerIds
}

module.exports.createPeerId = createPeerId

/**
* Create libp2p nodes.
* @param {Object} [properties]
Expand Down