Skip to content

Commit

Permalink
fix(messaging): catch send error
Browse files Browse the repository at this point in the history
  • Loading branch information
tmkx committed Feb 4, 2024
1 parent 11b6e75 commit 206ff6e
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 44 deletions.
2 changes: 1 addition & 1 deletion packages/messaging/src/background.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const backgroundPort: Port = {
if (!originMessage) return chrome.runtime.sendMessage(message);
const [, sender] = originMessage;
if (!sender.tab?.id) return chrome.runtime.sendMessage(message);
chrome.tabs.sendMessage(sender.tab.id, message, { documentId: sender.documentId, frameId: sender.frameId });
return chrome.tabs.sendMessage(sender.tab.id, message, { documentId: sender.documentId, frameId: sender.frameId });
},
};

Expand Down
8 changes: 3 additions & 5 deletions packages/messaging/src/client-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ const clientPort: Port = {
};
},
send(message) {
if (typeof message?.d?.to === 'number') {
chrome.tabs.sendMessage(message.d.to, message);
return;
}
chrome.runtime.sendMessage(message);
return typeof message?.d?.to === 'number'
? chrome.tabs.sendMessage(message.d.to, message)
: chrome.runtime.sendMessage(message);
},
};

Expand Down
25 changes: 25 additions & 0 deletions packages/messaging/src/core/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,28 @@ it('should serialize error message', async () => {
expectMessagingIsNotLeaked(receiver);
expectMessagingIsNotLeaked(sender);
});

it('should cleanup after sending error', async () => {
const { port1, port2 } = new MessageChannel();
const { promise, resolve } = withResolvers<void>();

const receiverPort = fromMessagePort(port1);
const receiver = createMessaging(receiverPort, {
async onStream(_message, subscriber) {
const timer = setInterval(() => subscriber.next(null), 30);
return () => {
clearInterval(timer);
resolve();
};
},
});
const sender = createMessaging(fromMessagePort(port2));

sender.stream(null, {
next: () => (receiverPort.send = () => Promise.reject('disconnected')),
});

await promise;

expectMessagingIsNotLeaked(receiver);
});
2 changes: 1 addition & 1 deletion packages/messaging/src/core/__tests__/test-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Messaging, Port } from '../index';

export function fromMessagePort(port: MessagePort): Port {
return {
send: (message) => port.postMessage(message),
send: async (message) => port.postMessage(message),
onMessage(listener) {
const ac = new AbortController();
port.addEventListener('message', (ev) => listener(ev.data), { signal: ac.signal });
Expand Down
99 changes: 62 additions & 37 deletions packages/messaging/src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,14 @@ export type CleanupFunction = VoidFunction;
export interface Port {
name?: string;
onMessage(listener: (message: any, ...rest: unknown[]) => Promisable<any>): VoidFunction;
send(message: any, originMessage?: [message: any, ...unknown[]]): void;
send(message: any, originMessage?: [message: any, ...unknown[]]): Promise<unknown>;
}

/**
* @private
*/
interface Packet {
interface AnyPacket {
/**
* Packet type
*
* - `r`: Request / Response
* - `s`: Stream
* - `e`: Event
*/
t: 'r' | 'R' | 's' | 'S' | 'e';
t: string;

/**
* ID
Expand All @@ -31,9 +24,43 @@ interface Packet {
/**
* Data
*/
d: unknown;
}

interface RequestPacket extends AnyPacket {
t: 'r';
d: any;
}

interface ResponsePacket extends AnyPacket {
t: 'R';
d: { data: unknown } | { error: unknown };
}

interface StreamPacket extends AnyPacket {
t: 's';
d: any;
}

interface StreamDataPacket extends AnyPacket {
t: 'S';
d:
| { next: unknown }
| { error: unknown }
| {
/**
* true: by calling `.complete()`
* false: unsubscribe
*/
complete: boolean;
};
}

/**
* @private
*/
type Packet = RequestPacket | ResponsePacket | StreamPacket | StreamDataPacket;

export type RequestHandler = (message: any) => Promisable<any>;
export type StreamHandler = (message: any, subscriber: Observer<any>) => Promisable<CleanupFunction | void>;

Expand Down Expand Up @@ -77,23 +104,23 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M

if (!isPacket(message)) return;

function reply(message: Packet) {
port.send(message, originMessage);
function reply(message: ResponsePacket | StreamDataPacket) {
return port.send(message, originMessage);
}

switch (message.t) {
case 'r': {
if (!onRequest) return;
const data = intercept(message.d, INTERCEPT_ABORT);
if (data === INTERCEPT_ABORT) return;
let d;
let d: ResponsePacket['d'];
try {
const response = await onRequest(data);
d = { data: response };
} catch (err) {
d = { error: err };
}
reply({ t: 'R', i: message.i, d });
reply({ t: 'R', i: message.i, d }).catch(noop);
break;
}
case 'R': {
Expand All @@ -112,27 +139,30 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M
const data = intercept(message.d, INTERCEPT_ABORT);
if (data === INTERCEPT_ABORT) return;

function terminate(d: { error: unknown } | { complete: boolean }) {
reply({ t: 'S', i: message.i, d });
processingStreamCleanups.get(message.i)?.();
function safelyTerminate(d: { error: unknown } | { complete: boolean }) {
reply({ t: 'S', i: message.i, d }).catch(noop);
const cleanupFn = processingStreamCleanups.get(message.i);
processingStreamCleanups.delete(message.i);
cleanupFn && cleanupFn();
}

// sending again means "unsubscribe"
if (processingStreamCleanups.has(message.i)) {
terminate({ complete: false });
safelyTerminate({ complete: false });
break;
}

const observer: Observer<any> = {
next(value) {
reply({ t: 'S', i: message.i, d: { next: value } });
reply({ t: 'S', i: message.i, d: { next: value } }).catch((error) => {
safelyTerminate({ error });
});
},
error(error) {
terminate({ error });
safelyTerminate({ error });
},
complete() {
terminate({ complete: true });
safelyTerminate({ complete: true });
},
};

Expand All @@ -141,7 +171,7 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M
processingStreamCleanups.set(message.i, cleanup);
} catch (error) {
// Error is not serializable in `chrome.runtime.Port`
terminate({ error: error instanceof Error ? error.message : error });
safelyTerminate({ error: error instanceof Error ? error.message : error });
}
break;
}
Expand All @@ -152,11 +182,11 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M
if ('next' in data) {
observer.next?.(data.next);
} else if ('error' in data) {
observer.error?.(data.error);
ongoingStreamObservers.delete(message.i);
observer.error?.(data.error);
} else if ('complete' in data) {
data.complete && observer.complete?.();
ongoingStreamObservers.delete(message.i);
data.complete && observer.complete?.();
}
break;
}
Expand All @@ -171,28 +201,23 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M
const resolvers = withResolvers<T>();
const id = randomID();
ongoingRequestResolvers.set(id, resolvers);
try {
port.send({ t: 'r', i: id, d: data } satisfies Packet);
} catch (err) {
port.send({ t: 'r', i: id, d: data } satisfies Packet).catch((err) => {
resolvers.reject(err);
}
});
return resolvers.promise;
},
stream(data, observer) {
const id = randomID();
ongoingStreamObservers.set(id, observer);
try {
port.send({ t: 's', i: id, d: data } satisfies Packet);
return () => {
if (!ongoingStreamObservers.has(id)) return;
port.send({ t: 's', i: id, d: null } satisfies Packet);
};
} catch (err) {
port.send({ t: 's', i: id, d: data } satisfies Packet).catch((err) => {
queueMicrotask(() => {
observer.error?.(err);
});
return noop;
}
});
return () => {
if (!ongoingStreamObservers.has(id)) return;
port.send({ t: 's', i: id, d: null } satisfies Packet).catch(noop);
};
},
dispose() {
offMessage();
Expand Down

0 comments on commit 206ff6e

Please sign in to comment.