Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add participantChannel to isolate keeping count of participants
participantChannel is a special channel used only to keep track of participants. No data goes through it. This prevents bugs associated with having non-participating subscribers on the channels that have data going through them.
  • Loading branch information
doodlesbykumbi authored May 18, 2021
commit 8a94e519419659193ef286295353cacc13cefbd3
18 changes: 13 additions & 5 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export class RedisAdapter extends Adapter {
public readonly requestsTimeout: number;

private readonly channel: string;
private readonly participantChannel: string;
private readonly requestChannel: string;
private readonly responseChannel: string;
private requests: Map<string, Request> = new Map();
Expand Down Expand Up @@ -94,6 +95,7 @@ export class RedisAdapter extends Adapter {
const prefix = opts.key || "socket.io";

this.channel = prefix + "#" + nsp.name + "#";
this.participantChannel = prefix + "-participant#" + this.nsp.name + "#";
this.requestChannel = prefix + "-request#" + this.nsp.name + "#";
this.responseChannel = prefix + "-response#" + this.nsp.name + "#";

Expand All @@ -107,7 +109,7 @@ export class RedisAdapter extends Adapter {
this.subClient.on("pmessageBuffer", this.onmessage.bind(this));

this.subClient.subscribe(
[this.requestChannel, this.responseChannel],
[this.participantChannel, this.requestChannel, this.responseChannel],
onError
);
this.subClient.on("messageBuffer", this.onrequest.bind(this));
Expand Down Expand Up @@ -855,12 +857,17 @@ export class RedisAdapter extends Adapter {
const nodes = this.pubClient.nodes();
return Promise.all(
nodes.map((node) =>
node.send_command("pubsub", ["numsub", this.requestChannel])
node.send_command("pubsub", [
"numsub",
this.requestChannel,
this.participantChannel,
])
)
).then((values) => {
let numSub = 0;
values.map((value) => {
numSub += parseInt(value[1], 10);
// Fall back to requestChannel for backwards compatibility
numSub += parseInt(value[3] || value[1], 10);
});
return numSub;
});
Expand All @@ -869,10 +876,11 @@ export class RedisAdapter extends Adapter {
return new Promise((resolve, reject) => {
this.pubClient.send_command(
"pubsub",
["numsub", this.requestChannel],
["numsub", this.requestChannel, this.participantChannel],
(err, numSub) => {
if (err) return reject(err);
resolve(parseInt(numSub[1], 10));
// Fall back to requestChannel for backwards compatibility
resolve(parseInt(numSub[3] || numSub[1], 10));
}
);
});
Expand Down