Skip to content

Commit

Permalink
feat: add support for inter-server communication
Browse files Browse the repository at this point in the history
Syntax:

```js
// server A
io.serverSideEmit("hello", "world");

// server B
io.on("hello", (arg) => {
  console.log(arg); // prints "world"
});
```

With acknowledgements:

```js
// server A
io.serverSideEmit("hello", "world", (err, responses) => {
  console.log(responses); // prints ["hi"]
});

// server B
io.on("hello", (arg, callback) => {
  callback("hi");
});
```

This feature replaces the customHook/customRequest API from the Redis
adapter: socketio/socket.io-redis-adapter#370
  • Loading branch information
darrachequesne committed May 10, 2021
1 parent dc381b7 commit 93cce05
Showing 6 changed files with 176 additions and 43 deletions.
28 changes: 21 additions & 7 deletions lib/client.ts
Original file line number Diff line number Diff line change
@@ -18,16 +18,23 @@ interface WriteOptions {

export class Client<
ListenEvents extends EventsMap,
EmitEvents extends EventsMap
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap
> {
public readonly conn;

private readonly id: string;
private readonly server: Server<ListenEvents, EmitEvents>;
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
private readonly encoder: Encoder;
private readonly decoder: Decoder;
private sockets: Map<SocketId, Socket<ListenEvents, EmitEvents>> = new Map();
private nsps: Map<string, Socket<ListenEvents, EmitEvents>> = new Map();
private sockets: Map<
SocketId,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
private nsps: Map<
string,
Socket<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
private connectTimeout?: NodeJS.Timeout;

/**
@@ -37,7 +44,10 @@ export class Client<
* @param conn
* @package
*/
constructor(server: Server<ListenEvents, EmitEvents>, conn: any) {
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
conn: any
) {
this.server = server;
this.conn = conn;
this.encoder = server.encoder;
@@ -98,7 +108,11 @@ export class Client<
this.server._checkNamespace(
name,
auth,
(dynamicNspName: Namespace<ListenEvents, EmitEvents> | false) => {
(
dynamicNspName:
| Namespace<ListenEvents, EmitEvents, ServerSideEvents>
| false
) => {
if (dynamicNspName) {
debug("dynamic namespace %s was created", dynamicNspName);
this.doConnect(name, auth);
@@ -156,7 +170,7 @@ export class Client<
*
* @private
*/
_remove(socket: Socket<ListenEvents, EmitEvents>): void {
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
if (this.sockets.has(socket.id)) {
const nsp = this.sockets.get(socket.id)!.nsp.name;
this.sockets.delete(socket.id);
49 changes: 38 additions & 11 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import {
DefaultEventsMap,
EventParams,
StrictEventEmitter,
EventNames,
} from "./typed-events";

const debug = debugModule("socket.io:server");
@@ -170,13 +171,18 @@ interface ServerOptions extends EngineAttachOptions {

export class Server<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = {}
> extends StrictEventEmitter<
{},
ServerSideEvents,
EmitEvents,
NamespaceReservedEventsMap<ListenEvents, EmitEvents>
NamespaceReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
> {
public readonly sockets: Namespace<ListenEvents, EmitEvents>;
public readonly sockets: Namespace<
ListenEvents,
EmitEvents,
ServerSideEvents
>;
/**
* A reference to the underlying Engine.IO server.
*
@@ -197,10 +203,13 @@ export class Server<
/**
* @private
*/
_nsps: Map<string, Namespace<ListenEvents, EmitEvents>> = new Map();
_nsps: Map<
string,
Namespace<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
private parentNsps: Map<
ParentNspNameMatchFn,
ParentNamespace<ListenEvents, EmitEvents>
ParentNamespace<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();
private _adapter?: typeof Adapter;
private _serveClient: boolean;
@@ -280,7 +289,9 @@ export class Server<
_checkNamespace(
name: string,
auth: { [key: string]: any },
fn: (nsp: Namespace<ListenEvents, EmitEvents> | false) => void
fn: (
nsp: Namespace<ListenEvents, EmitEvents, ServerSideEvents> | false
) => void
): void {
if (this.parentNsps.size === 0) return fn(false);

@@ -589,8 +600,8 @@ export class Server<
*/
public of(
name: string | RegExp | ParentNspNameMatchFn,
fn?: (socket: Socket<ListenEvents, EmitEvents>) => void
): Namespace<ListenEvents, EmitEvents> {
fn?: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
if (typeof name === "function" || name instanceof RegExp) {
const parentNsp = new ParentNamespace(this);
debug("initializing parent namespace %s", parentNsp.name);
@@ -649,7 +660,7 @@ export class Server<
*/
public use(
fn: (
socket: Socket<ListenEvents, EmitEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
next: (err?: ExtendedError) => void
) => void
): this {
@@ -686,7 +697,9 @@ export class Server<
* @return self
* @public
*/
public except(name: Room | Room[]): Server<ListenEvents, EmitEvents> {
public except(
name: Room | Room[]
): Server<ListenEvents, EmitEvents, ServerSideEvents> {
this.sockets.except(name);
return this;
}
@@ -713,6 +726,20 @@ export class Server<
return this;
}

/**
* Emit a packet to other Socket.IO servers
*
* @param ev - the event name
* @param args - an array of arguments, which may include an acknowledgement callback at the end
* @public
*/
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: EventParams<ServerSideEvents, Ev>
): boolean {
return this.sockets.serverSideEmit(ev, ...args);
}

/**
* Gets a list of socket ids.
*
72 changes: 57 additions & 15 deletions lib/namespace.ts
Original file line number Diff line number Diff line change
@@ -20,35 +20,43 @@ export interface ExtendedError extends Error {

export interface NamespaceReservedEventsMap<
ListenEvents extends EventsMap,
EmitEvents extends EventsMap
EmitEvents extends EventsMap,
ServerSideEvents extends EventsMap
> {
connect: (socket: Socket<ListenEvents, EmitEvents>) => void;
connection: (socket: Socket<ListenEvents, EmitEvents>) => void;
connect: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void;
connection: (
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>
) => void;
}

export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
keyof NamespaceReservedEventsMap<never, never, never>
>(<const>["connect", "connection"]);

export class Namespace<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = {}
> extends StrictEventEmitter<
{},
ServerSideEvents,
EmitEvents,
NamespaceReservedEventsMap<ListenEvents, EmitEvents>
NamespaceReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
> {
public readonly name: string;
public readonly sockets: Map<
SocketId,
Socket<ListenEvents, EmitEvents>
Socket<ListenEvents, EmitEvents, ServerSideEvents>
> = new Map();

public adapter: Adapter;

/** @private */
readonly server: Server<ListenEvents, EmitEvents>;
readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;

/** @private */
_fns: Array<
(
socket: Socket<ListenEvents, EmitEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
next: (err?: ExtendedError) => void
) => void
> = [];
@@ -62,7 +70,10 @@ export class Namespace<
* @param server instance
* @param name
*/
constructor(server: Server<ListenEvents, EmitEvents>, name: string) {
constructor(
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
name: string
) {
super();
this.server = server;
this.name = name;
@@ -88,7 +99,7 @@ export class Namespace<
*/
public use(
fn: (
socket: Socket<ListenEvents, EmitEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
next: (err?: ExtendedError) => void
) => void
): this {
@@ -104,7 +115,7 @@ export class Namespace<
* @private
*/
private run(
socket: Socket<ListenEvents, EmitEvents>,
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
fn: (err: ExtendedError | null) => void
) {
const fns = this._fns.slice(0);
@@ -166,10 +177,10 @@ export class Namespace<
* @private
*/
_add(
client: Client<ListenEvents, EmitEvents>,
client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
query,
fn?: () => void
): Socket<ListenEvents, EmitEvents> {
): Socket<ListenEvents, EmitEvents, ServerSideEvents> {
debug("adding socket to nsp %s", this.name);
const socket = new Socket(this, client, query);
this.run(socket, (err) => {
@@ -212,7 +223,7 @@ export class Namespace<
*
* @private
*/
_remove(socket: Socket<ListenEvents, EmitEvents>): void {
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
if (this.sockets.has(socket.id)) {
this.sockets.delete(socket.id);
} else {
@@ -255,6 +266,37 @@ export class Namespace<
return this;
}

/**
* Emit a packet to other Socket.IO servers
*
* @param ev - the event name
* @param args - an array of arguments, which may include an acknowledgement callback at the end
* @public
*/
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
ev: Ev,
...args: EventParams<ServerSideEvents, Ev>
): boolean {
if (RESERVED_EVENTS.has(ev)) {
throw new Error(`"${ev}" is a reserved event name`);
}
args.unshift(ev);
this.adapter.serverSideEmit(args);
return true;
}

/**
* Called when a packet is received from another Socket.IO server
*
* @param args - an array of arguments, which may include an acknowledgement callback at the end
*
* @private
*/
_onServerSideEmit(args: any[]) {
const event = args.shift();
this.emitUntyped(event, args);
}

/**
* Gets a list of clients.
*
15 changes: 10 additions & 5 deletions lib/parent-namespace.ts
Original file line number Diff line number Diff line change
@@ -10,12 +10,15 @@ import type { BroadcastOptions } from "socket.io-adapter";

export class ParentNamespace<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents
> extends Namespace<ListenEvents, EmitEvents> {
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = {}
> extends Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
private static count: number = 0;
private children: Set<Namespace<ListenEvents, EmitEvents>> = new Set();
private children: Set<
Namespace<ListenEvents, EmitEvents, ServerSideEvents>
> = new Set();

constructor(server: Server<ListenEvents, EmitEvents>) {
constructor(server: Server<ListenEvents, EmitEvents, ServerSideEvents>) {
super(server, "/_" + ParentNamespace.count++);
}

@@ -43,7 +46,9 @@ export class ParentNamespace<
return true;
}

createChild(name: string): Namespace<ListenEvents, EmitEvents> {
createChild(
name: string
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
const namespace = new Namespace(this.server, name);
namespace._fns = this._fns.slice(0);
this.listeners("connect").forEach((listener) =>
11 changes: 6 additions & 5 deletions lib/socket.ts
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ export interface EventEmitterReservedEventsMap {

export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
| ClientReservedEvents
| keyof NamespaceReservedEventsMap<never, never>
| keyof NamespaceReservedEventsMap<never, never, never>
| keyof SocketReservedEventsMap
| keyof EventEmitterReservedEventsMap
>(<const>[
@@ -110,7 +110,8 @@ export interface Handshake {

export class Socket<
ListenEvents extends EventsMap = DefaultEventsMap,
EmitEvents extends EventsMap = ListenEvents
EmitEvents extends EventsMap = ListenEvents,
ServerSideEvents extends EventsMap = {}
> extends StrictEventEmitter<
ListenEvents,
EmitEvents,
@@ -126,7 +127,7 @@ export class Socket<
public connected: boolean;
public disconnected: boolean;

private readonly server: Server<ListenEvents, EmitEvents>;
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
private readonly adapter: Adapter;
private acks: Map<number, () => void> = new Map();
private fns: Array<
@@ -144,8 +145,8 @@ export class Socket<
* @package
*/
constructor(
readonly nsp: Namespace<ListenEvents, EmitEvents>,
readonly client: Client<ListenEvents, EmitEvents>,
readonly nsp: Namespace<ListenEvents, EmitEvents, ServerSideEvents>,
readonly client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
auth: object
) {
super();
Loading

0 comments on commit 93cce05

Please sign in to comment.