diff --git a/.aegir.js b/.aegir.js index ce11caa..dd3f99b 100644 --- a/.aegir.js +++ b/.aegir.js @@ -65,7 +65,7 @@ const before = async () => { interval: 5000 }) // Some more time waiting - await delay(10e3) + await delay(12e3) } const after = async () => { diff --git a/package.json b/package.json index 4a71f77..3d4347e 100644 --- a/package.json +++ b/package.json @@ -76,6 +76,7 @@ "ipfs-utils": "^5.0.1", "is-ci": "^2.0.0", "p-defer": "^3.0.0", + "p-retry": "^4.2.0", "p-times": "^3.0.0", "p-wait-for": "^3.1.0", "sinon": "^9.0.3" diff --git a/src/server/index.js b/src/server/index.js index f9b8608..2d4b46a 100644 --- a/src/server/index.js +++ b/src/server/index.js @@ -26,6 +26,7 @@ const { GC_MAX_REGISTRATIONS, PROTOCOL_MULTICODEC } = require('./constants') +const { fallbackNullish } = require('./utils') /** * @typedef {import('./datastores/interface').Datastore} Datastore @@ -65,23 +66,21 @@ class RendezvousServer extends Libp2p { constructor (libp2pOptions, options) { super(libp2pOptions) - this._gcDelay = options.gcDelay || 3e5 - this._gcInterval = options.gcInterval || 7.2e6 - this._minTtl = options.minTtl || MIN_TTL - this._maxTtl = options.maxTtl || MAX_TTL - this._maxNsLength = options.maxNsLength || MAX_NS_LENGTH - this._maxDiscoveryLimit = options.maxDiscoveryLimit || MAX_DISCOVER_LIMIT - this._maxPeerRegistrations = options.maxPeerRegistrations || MAX_PEER_REGISTRATIONS + this._minTtl = fallbackNullish(options.minTtl, MIN_TTL) + this._maxTtl = fallbackNullish(options.maxTtl, MAX_TTL) + this._maxNsLength = fallbackNullish(options.maxNsLength, MAX_NS_LENGTH) + this._maxDiscoveryLimit = fallbackNullish(options.maxDiscoveryLimit, MAX_DISCOVER_LIMIT) + this._maxPeerRegistrations = fallbackNullish(options.maxPeerRegistrations, MAX_PEER_REGISTRATIONS) this.rendezvousDatastore = options.datastore this._registrationsCount = 0 this._lastGcTs = 0 - this._gcDelay = options.gcBootDelay || GC_BOOT_DELAY - this._gcInterval = options.gcInterval || GC_INTERVAL - this._gcMinInterval = options.gcMinInterval || GC_MIN_INTERVAL - this._gcMinRegistrations = options.gcMinRegistrations || GC_MIN_REGISTRATIONS - this._gcMaxRegistrations = options.gcMaxRegistrations || GC_MAX_REGISTRATIONS + this._gcDelay = fallbackNullish(options.gcBootDelay, GC_BOOT_DELAY) + this._gcInterval = fallbackNullish(options.gcInterval, GC_INTERVAL) + this._gcMinInterval = fallbackNullish(options.gcMinInterval, GC_MIN_INTERVAL) + this._gcMinRegistrations = fallbackNullish(options.gcMinRegistrations, GC_MIN_REGISTRATIONS) + this._gcMaxRegistrations = fallbackNullish(options.gcMaxRegistrations, GC_MAX_REGISTRATIONS) this._gcJob = this._gcJob.bind(this) } diff --git a/src/server/utils.js b/src/server/utils.js index 0d5dfbd..becdf70 100644 --- a/src/server/utils.js +++ b/src/server/utils.js @@ -46,7 +46,23 @@ function getListenAddresses (argv) { return listenAddresses } +/** + * Nullish coalescing operator implementation + * + * @template T + * @param {any} value + * @param {T} d - default value + * @returns {T} + */ +function fallbackNullish (value, d) { + if (value === null || value === undefined) { + return d + } + return value +} + module.exports = { getAnnounceAddresses, - getListenAddresses + getListenAddresses, + fallbackNullish } diff --git a/test/server.spec.js b/test/server.spec.js index 8449372..c7acddd 100644 --- a/test/server.spec.js +++ b/test/server.spec.js @@ -3,6 +3,9 @@ const { expect } = require('aegir/utils/chai') const delay = require('delay') +const sinon = require('sinon') +const pRetry = require('p-retry') +const pWaitFor = require('p-wait-for') const multiaddr = require('multiaddr') const Envelope = require('libp2p/src/record/envelope') @@ -41,6 +44,7 @@ describe('rendezvous server', () => { afterEach(async () => { await datastore.reset() rServer && await rServer.stop() + sinon.reset() }) it('can start a rendezvous server', async () => { @@ -299,52 +303,135 @@ describe('rendezvous server', () => { .and.to.have.property('code', errCodes.INVALID_COOKIE) }) - it.skip('gc expired records', async () => { + it('gc expired records on regular interval', async function () { + this.timeout(35e3) + rServer = new RendezvousServer({ ...defaultLibp2pConfig, peerId: peerIds[0] - }, { datastore, gcInterval: 300 }) + }, { + datastore, + gcInterval: 1000, + gcBootDelay: 1000, + gcMinInterval: 0, + gcMinRegistrations: 0 + }) + const spy = sinon.spy(rServer, '_gc') await rServer.start() - // Add registration for peer 1 in test namespace - await rServer.addRegistration(testNamespace, peerIds[1], signedPeerRecords[1], 500) - await rServer.addRegistration(testNamespace, peerIds[2], signedPeerRecords[2], 1000) + // Add registrations in test namespace + await rServer.addRegistration(testNamespace, peerIds[1], signedPeerRecords[1], 1500) + await rServer.addRegistration(testNamespace, peerIds[2], signedPeerRecords[2], 3200) let r = await rServer.getRegistrations(testNamespace) expect(r.registrations).to.have.lengthOf(2) - // wait for firt record to be removed - await delay(650) + // wait for firt record to be removed (2nd gc) + await pWaitFor(() => spy.callCount >= 2) + r = await rServer.getRegistrations(testNamespace) expect(r.registrations).to.have.lengthOf(1) - await delay(400) - r = await rServer.getRegistrations(testNamespace) - expect(r.registrations).to.have.lengthOf(0) + // wait for second record to be removed + await pRetry(async () => { + r = await rServer.getRegistrations(testNamespace) + expect(r.registrations).to.have.lengthOf(0) + }) }) - it.skip('garbage collector should remove cookies of discarded records', async () => { + it('gc expired records when maximum threshold', async function () { + this.timeout(35e3) + rServer = new RendezvousServer({ ...defaultLibp2pConfig, peerId: peerIds[0] - }, { datastore, gcDelay: 300, gcInterval: 300 }) + }, { + datastore, + // gcMinInterval: 0, + gcMaxRegistrations: 2 + }) + const spy = sinon.spy(rServer, '_gc') await rServer.start() - // Add registration for peer 1 in test namespace + // Add registrations in test namespace await rServer.addRegistration(testNamespace, peerIds[1], signedPeerRecords[1], 500) - // Get current registrations - const { registrations } = await rServer.getRegistrations(testNamespace) - expect(registrations).to.exist() - expect(registrations).to.have.lengthOf(1) + let r = await rServer.getRegistrations(testNamespace) + expect(r.registrations).to.have.lengthOf(1) + + // Validate peer + let envelope = await Envelope.openAndCertify(r.registrations[0].signedPeerRecord, PeerRecord.DOMAIN) + expect(envelope.peerId.toString()).to.eql(peerIds[1].toString()) + + // Wait for previous record to be expired + await delay(500) + + // Add registrations in test namespace exceending the max number for gc trigger + await rServer.addRegistration(testNamespace, peerIds[2], signedPeerRecords[2], 3200) + + await pWaitFor(() => spy.callCount === 1) - // Verify internal state - // expect(rServer.datastore.nsRegistrations.get(testNamespace).size).to.eql(1) - // expect(rServer.datastore.cookieRegistrations.get(cookie)).to.exist() + // retry as rServer._gc is async and it can be removing + await pRetry(async () => { + r = await rServer.getRegistrations(testNamespace) + expect(r.registrations).to.have.lengthOf(1) - await delay(800) + envelope = await Envelope.openAndCertify(r.registrations[0].signedPeerRecord, PeerRecord.DOMAIN) + expect(envelope.peerId.toString()).to.eql(peerIds[2].toString()) + }) + }) + + it('gc expired records when maximum threshold only if gc min interval', async function () { + this.timeout(45e3) + + rServer = new RendezvousServer({ + ...defaultLibp2pConfig, + peerId: peerIds[0] + }, { + datastore, + gcMaxRegistrations: 2 + }) + const spy = sinon.spy(rServer, '_gc') + await rServer.start() + + // Add registrations in test namespace + await rServer.addRegistration(testNamespace, peerIds[1], signedPeerRecords[1], 500) + + let r = await rServer.getRegistrations(testNamespace) + expect(r.registrations).to.have.lengthOf(1) + + // Wait for previous record to be expired + await delay(500) + + // Add registrations in test namespace exceending the max number for gc trigger + await rServer.addRegistration(testNamespace, peerIds[2], signedPeerRecords[2], 3000) + + // Wait for gc + await pWaitFor(() => spy.callCount === 1) + + // retry as rServer._gc is async and it can take longer to finish + await pRetry(async () => { + r = await rServer.getRegistrations(testNamespace) + expect(r.registrations).to.have.lengthOf(1) + }) + + // Wait for second record to be expired + await delay(3000) + + // Add a new registration + await rServer.addRegistration(testNamespace, peerIds[1], signedPeerRecords[1], 1000) - // expect(rServer.datastore.nsRegistrations.get(testNamespace).size).to.eql(0) - // expect(rServer.datastore.cookieRegistrations.get(cookie)).to.not.exist() + await Promise.race([ + async () => { + // GC should not be triggered, even with max registrations as minInterval was not reached + await pWaitFor(() => spy.callCount === 2) + throw new Error('should not call gc') + }, + // It should return 0 records, even without gc, as expired records are not returned + await pRetry(async () => { + r = await rServer.getRegistrations(testNamespace) + expect(r.registrations).to.have.lengthOf(0) + }) + ]) }) })