Skip to content

Commit a13f35f

Browse files
fix: ensure the order of the commands
Before this change, the broadcast() method would send the BROADCAST command and then apply it locally (which is required to retrieve the offset of the message, when connection state recovery is enabled), while the other commands like disconnectSockets() would first apply it locally and then send the command to the other nodes. So, for example: ```js io.emit("bye"); io.disconnectSockets(); ``` In that case, the clients connected to the io instance would not receive the "bye" event, while the clients connected to the other server instances would receive it before being disconnected. Related: - socketio/socket.io-redis-streams-adapter#13 - socketio/socket.io-postgres-adapter#12
1 parent 207c0db commit a13f35f

File tree

3 files changed

+74
-37
lines changed

3 files changed

+74
-37
lines changed

lib/cluster-adapter.ts

+45-36
Original file line numberDiff line numberDiff line change
@@ -503,55 +503,64 @@ export abstract class ClusterAdapter extends Adapter {
503503
super.broadcastWithAck(packet, opts, clientCountCallback, ack);
504504
}
505505

506-
override addSockets(opts: BroadcastOptions, rooms: Room[]) {
507-
super.addSockets(opts, rooms);
508-
506+
override async addSockets(opts: BroadcastOptions, rooms: Room[]) {
509507
const onlyLocal = opts.flags?.local;
510-
if (onlyLocal) {
511-
return;
508+
509+
if (!onlyLocal) {
510+
try {
511+
await this.publishAndReturnOffset({
512+
type: MessageType.SOCKETS_JOIN,
513+
data: {
514+
opts: encodeOptions(opts),
515+
rooms,
516+
},
517+
});
518+
} catch (e) {
519+
debug("[%s] error while publishing message: %s", this.uid, e.message);
520+
}
512521
}
513522

514-
this.publish({
515-
type: MessageType.SOCKETS_JOIN,
516-
data: {
517-
opts: encodeOptions(opts),
518-
rooms,
519-
},
520-
});
523+
super.addSockets(opts, rooms);
521524
}
522525

523-
override delSockets(opts: BroadcastOptions, rooms: Room[]) {
524-
super.delSockets(opts, rooms);
525-
526+
override async delSockets(opts: BroadcastOptions, rooms: Room[]) {
526527
const onlyLocal = opts.flags?.local;
527-
if (onlyLocal) {
528-
return;
528+
529+
if (!onlyLocal) {
530+
try {
531+
await this.publishAndReturnOffset({
532+
type: MessageType.SOCKETS_LEAVE,
533+
data: {
534+
opts: encodeOptions(opts),
535+
rooms,
536+
},
537+
});
538+
} catch (e) {
539+
debug("[%s] error while publishing message: %s", this.uid, e.message);
540+
}
529541
}
530542

531-
this.publish({
532-
type: MessageType.SOCKETS_LEAVE,
533-
data: {
534-
opts: encodeOptions(opts),
535-
rooms,
536-
},
537-
});
543+
super.delSockets(opts, rooms);
538544
}
539545

540-
override disconnectSockets(opts: BroadcastOptions, close: boolean) {
541-
super.disconnectSockets(opts, close);
542-
546+
override async disconnectSockets(opts: BroadcastOptions, close: boolean) {
543547
const onlyLocal = opts.flags?.local;
544-
if (onlyLocal) {
545-
return;
548+
549+
if (!onlyLocal) {
550+
try {
551+
await this.publishAndReturnOffset({
552+
type: MessageType.DISCONNECT_SOCKETS,
553+
data: {
554+
opts: encodeOptions(opts),
555+
close,
556+
},
557+
});
558+
} catch (e) {
559+
debug("[%s] error while publishing message: %s", this.uid, e.message);
560+
}
546561
}
547562

548-
this.publish({
549-
type: MessageType.DISCONNECT_SOCKETS,
550-
data: {
551-
opts: encodeOptions(opts),
552-
close,
553-
},
554-
});
563+
super.disconnectSockets(opts, close);
555564
}
556565

557566
async fetchSockets(opts: BroadcastOptions): Promise<any[]> {

test/cluster-adapter.ts

+25-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Server, Socket as ServerSocket } from "socket.io";
33
import { io as ioc, Socket as ClientSocket } from "socket.io-client";
44
import expect = require("expect.js");
55
import type { AddressInfo } from "net";
6-
import { times, shouldNotHappen } from "./util";
6+
import { times, shouldNotHappen, sleep } from "./util";
77
import {
88
ClusterAdapterWithHeartbeat,
99
type ClusterMessage,
@@ -243,6 +243,8 @@ describe("cluster adapter", () => {
243243
it("makes all socket instances join the specified room", async () => {
244244
servers[0].socketsJoin("room1");
245245

246+
await sleep();
247+
246248
expect(serverSockets[0].rooms.has("room1")).to.be(true);
247249
expect(serverSockets[1].rooms.has("room1")).to.be(true);
248250
expect(serverSockets[2].rooms.has("room1")).to.be(true);
@@ -254,6 +256,8 @@ describe("cluster adapter", () => {
254256

255257
servers[0].in("room1").socketsJoin("room2");
256258

259+
await sleep();
260+
257261
expect(serverSockets[0].rooms.has("room2")).to.be(true);
258262
expect(serverSockets[1].rooms.has("room2")).to.be(false);
259263
expect(serverSockets[2].rooms.has("room2")).to.be(true);
@@ -275,6 +279,8 @@ describe("cluster adapter", () => {
275279

276280
servers[0].socketsLeave("room1");
277281

282+
await sleep();
283+
278284
expect(serverSockets[0].rooms.has("room1")).to.be(false);
279285
expect(serverSockets[1].rooms.has("room1")).to.be(false);
280286
expect(serverSockets[2].rooms.has("room1")).to.be(false);
@@ -287,6 +293,8 @@ describe("cluster adapter", () => {
287293

288294
servers[0].in("room1").socketsLeave("room2");
289295

296+
await sleep();
297+
290298
expect(serverSockets[0].rooms.has("room2")).to.be(false);
291299
expect(serverSockets[1].rooms.has("room2")).to.be(false);
292300
expect(serverSockets[2].rooms.has("room2")).to.be(true);
@@ -318,6 +326,22 @@ describe("cluster adapter", () => {
318326

319327
servers[0].disconnectSockets(true);
320328
});
329+
330+
it("sends a packet before all socket instances disconnect", (done) => {
331+
const partialDone = times(3, done);
332+
333+
clientSockets.forEach((clientSocket) => {
334+
clientSocket.on("disconnect", shouldNotHappen(done));
335+
336+
clientSocket.on("bye", () => {
337+
clientSocket.off("disconnect");
338+
clientSocket.on("disconnect", partialDone);
339+
});
340+
});
341+
342+
servers[0].emit("bye");
343+
servers[0].disconnectSockets(true);
344+
});
321345
});
322346

323347
describe("fetchSockets", () => {

test/util.ts

+4
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,7 @@ export function times(count: number, fn: () => void) {
1313
export function shouldNotHappen(done) {
1414
return () => done(new Error("should not happen"));
1515
}
16+
17+
export function sleep() {
18+
return new Promise<void>((resolve) => process.nextTick(resolve));
19+
}

0 commit comments

Comments
 (0)