Skip to content

Commit 779b28b

Browse files
committed
Queue now respects max items.
1 parent 521a334 commit 779b28b

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

packages/core/src/queue/DefaultEventQueue.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ export class DefaultEventQueue implements IEventQueue {
5050
private _queue: EventQueueItem[] = [];
5151
private _loadPersistedEvents = true;
5252

53-
// TODO: Implement support for max queue items.
5453
constructor(
5554
private config: Configuration,
5655
private maxItems: number = 250
@@ -277,12 +276,21 @@ export class DefaultEventQueue implements IEventQueue {
277276
this._lastFileTimestamp = Math.max(Date.now(), this._lastFileTimestamp + 1);
278277
const file = `${this.QUEUE_PREFIX}${this._lastFileTimestamp}.json`;
279278

280-
this._queue.push({ file, event });
281-
if (this.config.usePersistedQueueStorage) {
279+
const { storage, log } = this.config.services;
280+
const useStorage: boolean = this.config.usePersistedQueueStorage;
281+
if (this._queue.push({ file, event }) > this.maxItems) {
282+
log.trace("Removing oldest queue entry: maxItems exceeded");
283+
const item = this._queue.shift();
284+
if (useStorage) {
285+
await storage.removeItem(item.file);
286+
}
287+
}
288+
289+
if (useStorage) {
282290
try {
283-
await this.config.services.storage.setItem(file, JSON.stringify(event));
291+
await storage.setItem(file, JSON.stringify(event));
284292
} catch (ex) {
285-
this.config.services.log.error(`Error saving queue item to storage: ${ex.message}`)
293+
log.error(`Error saving queue item to storage: ${ex.message}`)
286294
}
287295
}
288296

packages/core/test/queue/DefaultEventQueue.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Configuration } from "../../src/configuration/Configuration.js";
22
import { Event } from "../../src/models/Event.js";
3+
import { DefaultEventQueue } from "../../src/queue/DefaultEventQueue.js";
34
import { delay } from "../helpers.js";
45

56
describe("DefaultEventQueue", () => {
@@ -60,4 +61,13 @@ describe("DefaultEventQueue", () => {
6061
expect(await config.services.storage.length()).toBe(1);
6162
}
6263
});
64+
65+
test("should respect max items", async () => {
66+
config.services.queue = new DefaultEventQueue(config, 1);
67+
const event: Event = { type: "log", reference_id: "123454321" };
68+
for (let index = 0; index < 2; index++) {
69+
await config.services.queue.enqueue(event);
70+
expect(await config.services.storage.length()).toBe(1);
71+
}
72+
});
6373
});

0 commit comments

Comments
 (0)