Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
support handling invalidation push messages
  • Loading branch information
sjpotter committed Mar 14, 2024
commit 319ab87e2e7985644177d7fa6ff2625d24fb6403
26 changes: 24 additions & 2 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types';
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply } from '../errors';
import { MonitorCallback } from '.';
Expand Down Expand Up @@ -56,6 +56,8 @@ export default class RedisCommandsQueue {
return this.#pubSub.isActive;
}

#invalidateCallback?: (key: RedisArgument | null) => unknown;

constructor(
respVersion: RespVersions,
maxLength: number | null | undefined,
Expand Down Expand Up @@ -109,13 +111,33 @@ export default class RedisCommandsQueue {
onErrorReply: err => this.#onErrorReply(err),
onPush: push => {
if (!this.#onPush(push)) {

switch (push[0].toString()) {
case "invalidate": {
console.log("invalidate push message");
if (this.#invalidateCallback) {
if (push[1] !== null) {
for (const key of push[1]) {
console.log(`invalidating key ${key}`);
this.#invalidateCallback(key);
}
} else {
console.log(`invalidating all keys`);
this.#invalidateCallback(null);
}
}
break;
}
}
}
},
getTypeMapping: () => this.#getTypeMapping()
});
}

setInvalidateCallback(callback?: (key: RedisArgument | null) => unknown) {
this.#invalidateCallback = callback;
}

addCommand<T>(
args: CommandArguments,
options?: CommandOptions
Expand Down