Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
b8bf044
feat: rendezvous protocol full implementation
vasco-santos May 20, 2020
d7290df
chore: interface-peer-discovery compliance
vasco-santos Jul 10, 2020
b080670
feat: garbage collector
vasco-santos Jul 13, 2020
ebb22d1
feat: cookie for discovery
vasco-santos Jul 13, 2020
9765a95
chore: cleanup
vasco-santos Jul 15, 2020
b6edaf3
chore: update aegir
vasco-santos Jul 17, 2020
0e304f9
chore: convert to seconds in the wire
vasco-santos Jul 17, 2020
b248924
chore: remove unregister comments for response
vasco-santos Jul 20, 2020
47641f7
feat: use signed peer records to exchange multiaddrs
vasco-santos Jul 22, 2020
b668c8a
chore: tests
vasco-santos Jul 22, 2020
7e3c541
chore: change readme
vasco-santos Jul 27, 2020
1a1590d
chore: update deps
vasco-santos Sep 22, 2020
b357829
chore: remove peer discovery interface as we will be creating libp2p.…
vasco-santos Sep 22, 2020
4abd363
chore: use uint8array instead of buffer
vasco-santos Sep 22, 2020
7763df2
chore: update libp2p integration doc
vasco-santos Sep 28, 2020
63d607b
chore: fix register ttl param return
vasco-santos Sep 28, 2020
5f45c6f
chore: update docs
vasco-santos Sep 29, 2020
640b64f
chore: remove enabled property from libp2p integration doc
vasco-santos Oct 5, 2020
8f6e148
chore: apply suggestions from code review
vasco-santos Nov 16, 2020
894ad2e
chore: separate server and client rendezvous
vasco-santos Nov 17, 2020
ee10d69
chore: add docker
vasco-santos Nov 17, 2020
3798fbb
fix: changed default values and moved them into the server with prope…
vasco-santos Nov 17, 2020
cdf2f6b
chore: update docs and constants
vasco-santos Nov 17, 2020
9fe0691
chore: add tests for protocol with direct connection to server
vasco-santos Nov 18, 2020
52fa2bd
chore: DoS protection with max registrations
vasco-santos Nov 19, 2020
06d53ac
chore: refactor client
vasco-santos Nov 21, 2020
d501d97
chore: add datastore and types
vasco-santos Dec 8, 2020
a2d5f83
chore: fix build
vasco-santos Dec 13, 2020
83cd4b7
chore: run with mysql
vasco-santos Dec 21, 2020
9b294f5
feat: gc
vasco-santos Dec 24, 2020
9bf5bcb
chore: add gc tests
vasco-santos Dec 24, 2020
7a569c7
chore: review docs and binary
vasco-santos Dec 24, 2020
c297156
chore: add datastore docs and model picture
vasco-santos Dec 28, 2020
e1cd224
chore: add library docs
vasco-santos Dec 28, 2020
264ce2a
chore: add docker setup docks
vasco-santos Dec 28, 2020
01ec7bc
chore: remove client code and move server into src
vasco-santos Jan 4, 2021
5f025d3
chore: use connection pool
vasco-santos Jan 11, 2021
2840251
fix: bin stdout addresses and ports correctly
vasco-santos Jan 15, 2021
9da390f
chore: add benchmarks
vasco-santos Jan 11, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chore: refactor client
  • Loading branch information
vasco-santos committed Nov 21, 2020
commit 06d53ac334ad295d82e69080eda004ec2c6fd450
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
"it-length-prefixed": "^3.1.0",
"it-pipe": "^1.1.0",
"libp2p": "libp2p/js-libp2p#0.30.x",
"libp2p-interfaces": "^0.5.1",
"libp2p-mplex": "^0.10.0",
"libp2p-noise": "^2.0.1",
"libp2p-tcp": "^0.15.1",
Expand Down
5 changes: 1 addition & 4 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,5 @@

exports.codes = {
INVALID_NAMESPACE: 'ERR_INVALID_NAMESPACE',
INVALID_TTL: 'ERR_INVALID_TTL',
INVALID_LIMIT: 'ERR_INVALID_LIMIT',
NO_CONNECTED_RENDEZVOUS_SERVERS: 'ERR_NO_CONNECTED_RENDEZVOUS_SERVERS',
INVALID_COOKIE: 'ERR_INVALID_COOKIE'
NO_CONNECTED_RENDEZVOUS_SERVERS: 'ERR_NO_CONNECTED_RENDEZVOUS_SERVERS'
}
112 changes: 61 additions & 51 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { toBuffer } = require('it-buffer')
const fromString = require('uint8arrays/from-string')
const toString = require('uint8arrays/to-string')

const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')
const PeerId = require('peer-id')

const { codes: errCodes } = require('./errors')
const {
Expand All @@ -27,11 +27,9 @@ const MESSAGE_TYPE = Message.MessageType
*/

/**
* Rendezvous point contains the connection to a rendezvous server, as well as,
* the cookies per namespace that the client received.
* Rendezvous point contains the cookies per namespace that the client received.
*
* @typedef {Object} RendezvousPoint
* @property {Connection} connection
* @property {Map<string, string>} cookies
*/

Expand All @@ -44,21 +42,24 @@ class Rendezvous {
* Libp2p Rendezvous. A lightweight mechanism for generalized peer discovery.
*
* @class
* @param {RendezvousProperties} params
* @param {RendezvousProperties & RendezvousOptions} params
*/
constructor ({ libp2p }) {
constructor ({ libp2p, maxRendezvousPoints }) {
this._libp2p = libp2p
this._peerId = libp2p.peerId
this._registrar = libp2p.registrar
this._peerStore = libp2p.peerStore
this._connectionManager = libp2p.connectionManager

this._maxRendezvousPoints = maxRendezvousPoints

this._isStarted = false

/**
* @type {Map<string, RendezvousPoint>}
*/
this._rendezvousPoints = new Map()

this._registrarId = undefined
this._onPeerConnected = this._onPeerConnected.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
this._onProtocolChange = this._onProtocolChange.bind(this)
}

/**
Expand All @@ -67,71 +68,63 @@ class Rendezvous {
* @returns {void}
*/
start () {
if (this._registrarId) {
if (this._isStarted) {
return
}

log('starting')

// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: PROTOCOL_MULTICODEC,
handlers: {
onConnect: this._onPeerConnected,
onDisconnect: this._onPeerDisconnected
}
})
this._registrarId = this._registrar.register(topology)
this._peerStore.on('change:protocols', this._onProtocolChange)
this._isStarted = true

log('started')
}

/**
* Unregister the rendezvous protocol and clear the state.
* Clear the rendezvous state and remove listeners.
*
* @returns {void}
*/
stop () {
if (!this._registrarId) {
if (!this._isStarted) {
return
}

log('stopping')

// unregister protocol and handlers
this._registrar.unregister(this._registrarId)

this._registrarId = undefined
this._peerStore.removeListener('change:protocols', this._onProtocolChange)
this._rendezvousPoints.clear()

this._isStarted = false
log('stopped')
}

/**
* Registrar notifies a connection successfully with rendezvous protocol.
*
* @private
* @param {PeerId} peerId - remote peer-id
* @param {Connection} conn - connection to the peer
*/
_onPeerConnected (peerId, conn) {
const idB58Str = peerId.toB58String()
log('connected', idB58Str)

this._rendezvousPoints.set(idB58Str, { connection: conn })
}

/**
* Registrar notifies a closing connection with rendezvous protocol.
* Check if a peer supports the rendezvous protocol.
* If the protocol is not supported, check if it was supported before and remove it as a rendezvous point.
* If the protocol is supported, add it to the known rendezvous points.
*
* @private
* @param {PeerId} peerId - peerId
* @param {Object} props
* @param {PeerId} props.peerId
* @param {Array<string>} props.protocols
* @returns {void}
*/
_onPeerDisconnected (peerId) {
const idB58Str = peerId.toB58String()
log('disconnected', idB58Str)
_onProtocolChange ({ peerId, protocols }) {
const id = peerId.toB58String()

// Check if it has the protocol
const hasProtocol = protocols.find(protocol => protocol === PROTOCOL_MULTICODEC)
const hasRendezvousPoint = this._rendezvousPoints.has(id)

// If no protocol, check if we were keeping the peer before
if (!hasProtocol && hasRendezvousPoint) {
this._rendezvousPoints.delete(id)
log(`removed ${id} from rendezvous points as it does not suport ${PROTOCOL_MULTICODEC} anymore`)
} else if (hasProtocol && !this._rendezvousPoints.has(id)) {
this._rendezvousPoints.set(id, { cookies: new Map() })
}

this._rendezvousPoints.delete(idB58Str)
// TODO: Hint that connection can be discarded?
}

/**
Expand All @@ -152,6 +145,10 @@ class Rendezvous {
throw errCode(new Error('no rendezvous servers connected'), errCodes.NO_CONNECTED_RENDEZVOUS_SERVERS)
}

// TODO: we should protect from getting to many rendezvous points and sending to all
// Should we have a custom max number of servers and a custom sorter function?
// Default to peers already connected

const message = Message.encode({
type: MESSAGE_TYPE.REGISTER,
register: {
Expand All @@ -163,7 +160,7 @@ class Rendezvous {

const registerTasks = []
const taskFn = async (id) => {
const { connection } = this._rendezvousPoints.get(id)
const connection = await this._libp2p.dial(PeerId.createFromCID(id))
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)

const [response] = await pipe(
Expand All @@ -175,6 +172,10 @@ class Rendezvous {
collect
)

if (!connection.streams.length) {
await connection.close()
}

const recMessage = Message.decode(response)

if (!recMessage.type === MESSAGE_TYPE.REGISTER_RESPONSE) {
Expand All @@ -193,6 +194,7 @@ class Rendezvous {
}

// Return first ttl
// pAny here?
const [returnTtl] = await Promise.all(registerTasks)

return returnTtl
Expand Down Expand Up @@ -224,7 +226,7 @@ class Rendezvous {

const unregisterTasks = []
const taskFn = async (id) => {
const { connection } = this._rendezvousPoints.get(id)
const connection = await this._libp2p.dial(PeerId.createFromCID(id))
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)

await pipe(
Expand All @@ -235,6 +237,10 @@ class Rendezvous {
for await (const _ of source) { } // eslint-disable-line
}
)

if (!connection.streams.length) {
await connection.close()
}
}

for (const id of this._rendezvousPoints.keys()) {
Expand Down Expand Up @@ -279,7 +285,8 @@ class Rendezvous {
})

// Send discover message and wait for response
const { stream } = await rp.connection.newStream(PROTOCOL_MULTICODEC)
const connection = await this._libp2p.dial(PeerId.createFromCID(id))
const { stream } = await connection.newStream(PROTOCOL_MULTICODEC)
const [response] = await pipe(
[message],
lp.encode(),
Expand All @@ -289,6 +296,10 @@ class Rendezvous {
collect
)

if (!connection.streams.length) {
await connection.close()
}

const recMessage = Message.decode(response)

if (!recMessage.type === MESSAGE_TYPE.DISCOVER_RESPONSE) {
Expand All @@ -305,7 +316,6 @@ class Rendezvous {
// Store cookie
rpCookies.set(ns, toString(recMessage.discoverResponse.cookie))
this._rendezvousPoints.set(id, {
connection: rp.connection,
cookies: rpCookies
})

Expand Down
5 changes: 5 additions & 0 deletions src/server/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
'use strict'

exports.codes = {
INVALID_COOKIE: 'ERR_INVALID_COOKIE'
}
2 changes: 1 addition & 1 deletion src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const errCode = require('err-code')
const Libp2p = require('libp2p')
const PeerId = require('peer-id')

const { codes: errCodes } = require('../errors')
const { codes: errCodes } = require('./errors')
const rpc = require('./rpc')
const {
MIN_TTL,
Expand Down
2 changes: 1 addition & 1 deletion src/server/rpc/handlers/discover.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const { Message } = require('../../../proto')
const MESSAGE_TYPE = Message.MessageType
const RESPONSE_STATUS = Message.ResponseStatus

const { codes: errCodes } = require('../../../errors')
const { codes: errCodes } = require('../../errors')

/**
* @typedef {import('peer-id')} PeerId
Expand Down
Loading