Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(messaging): catch send error #29

Merged
merged 1 commit into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/messaging/src/background.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const backgroundPort: Port = {
if (!originMessage) return chrome.runtime.sendMessage(message);
const [, sender] = originMessage;
if (!sender.tab?.id) return chrome.runtime.sendMessage(message);
chrome.tabs.sendMessage(sender.tab.id, message, { documentId: sender.documentId, frameId: sender.frameId });
return chrome.tabs.sendMessage(sender.tab.id, message, { documentId: sender.documentId, frameId: sender.frameId });
},
};

Expand Down
8 changes: 3 additions & 5 deletions packages/messaging/src/client-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ const clientPort: Port = {
};
},
send(message) {
if (typeof message?.d?.to === 'number') {
chrome.tabs.sendMessage(message.d.to, message);
return;
}
chrome.runtime.sendMessage(message);
return typeof message?.d?.to === 'number'
? chrome.tabs.sendMessage(message.d.to, message)
: chrome.runtime.sendMessage(message);
},
};

Expand Down
25 changes: 25 additions & 0 deletions packages/messaging/src/core/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,28 @@ it('should serialize error message', async () => {
expectMessagingIsNotLeaked(receiver);
expectMessagingIsNotLeaked(sender);
});

it('should cleanup after sending error', async () => {
const { port1, port2 } = new MessageChannel();
const { promise, resolve } = withResolvers<void>();

const receiverPort = fromMessagePort(port1);
const receiver = createMessaging(receiverPort, {
async onStream(_message, subscriber) {
const timer = setInterval(() => subscriber.next(null), 30);
return () => {
clearInterval(timer);
resolve();
};
},
});
const sender = createMessaging(fromMessagePort(port2));

sender.stream(null, {
next: () => (receiverPort.send = () => Promise.reject('disconnected')),
});

await promise;

expectMessagingIsNotLeaked(receiver);
});
2 changes: 1 addition & 1 deletion packages/messaging/src/core/__tests__/test-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Messaging, Port } from '../index';

export function fromMessagePort(port: MessagePort): Port {
return {
send: (message) => port.postMessage(message),
send: async (message) => port.postMessage(message),
onMessage(listener) {
const ac = new AbortController();
port.addEventListener('message', (ev) => listener(ev.data), { signal: ac.signal });
Expand Down
99 changes: 62 additions & 37 deletions packages/messaging/src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,14 @@ export type CleanupFunction = VoidFunction;
export interface Port {
name?: string;
onMessage(listener: (message: any, ...rest: unknown[]) => Promisable<any>): VoidFunction;
send(message: any, originMessage?: [message: any, ...unknown[]]): void;
send(message: any, originMessage?: [message: any, ...unknown[]]): Promise<unknown>;
}

/**
* @private
*/
interface Packet {
interface AnyPacket {
/**
* Packet type
*
* - `r`: Request / Response
* - `s`: Stream
* - `e`: Event
*/
t: 'r' | 'R' | 's' | 'S' | 'e';
t: string;

/**
* ID
Expand All @@ -31,9 +24,43 @@ interface Packet {
/**
* Data
*/
d: unknown;
}

interface RequestPacket extends AnyPacket {
t: 'r';
d: any;
}

interface ResponsePacket extends AnyPacket {
t: 'R';
d: { data: unknown } | { error: unknown };
}

interface StreamPacket extends AnyPacket {
t: 's';
d: any;
}

interface StreamDataPacket extends AnyPacket {
t: 'S';
d:
| { next: unknown }
| { error: unknown }
| {
/**
* true: by calling `.complete()`
* false: unsubscribe
*/
complete: boolean;
};
}

/**
* @private
*/
type Packet = RequestPacket | ResponsePacket | StreamPacket | StreamDataPacket;

export type RequestHandler = (message: any) => Promisable<any>;
export type StreamHandler = (message: any, subscriber: Observer<any>) => Promisable<CleanupFunction | void>;

Expand Down Expand Up @@ -77,23 +104,23 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M

if (!isPacket(message)) return;

function reply(message: Packet) {
port.send(message, originMessage);
function reply(message: ResponsePacket | StreamDataPacket) {
return port.send(message, originMessage);
}

switch (message.t) {
case 'r': {
if (!onRequest) return;
const data = intercept(message.d, INTERCEPT_ABORT);
if (data === INTERCEPT_ABORT) return;
let d;
let d: ResponsePacket['d'];
try {
const response = await onRequest(data);
d = { data: response };
} catch (err) {
d = { error: err };
}
reply({ t: 'R', i: message.i, d });
reply({ t: 'R', i: message.i, d }).catch(noop);
break;
}
case 'R': {
Expand All @@ -112,27 +139,30 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M
const data = intercept(message.d, INTERCEPT_ABORT);
if (data === INTERCEPT_ABORT) return;

function terminate(d: { error: unknown } | { complete: boolean }) {
reply({ t: 'S', i: message.i, d });
processingStreamCleanups.get(message.i)?.();
function safelyTerminate(d: { error: unknown } | { complete: boolean }) {
reply({ t: 'S', i: message.i, d }).catch(noop);
const cleanupFn = processingStreamCleanups.get(message.i);
processingStreamCleanups.delete(message.i);
cleanupFn && cleanupFn();
}

// sending again means "unsubscribe"
if (processingStreamCleanups.has(message.i)) {
terminate({ complete: false });
safelyTerminate({ complete: false });
break;
}

const observer: Observer<any> = {
next(value) {
reply({ t: 'S', i: message.i, d: { next: value } });
reply({ t: 'S', i: message.i, d: { next: value } }).catch((error) => {
safelyTerminate({ error });
});
},
error(error) {
terminate({ error });
safelyTerminate({ error });
},
complete() {
terminate({ complete: true });
safelyTerminate({ complete: true });
},
};

Expand All @@ -141,7 +171,7 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M
processingStreamCleanups.set(message.i, cleanup);
} catch (error) {
// Error is not serializable in `chrome.runtime.Port`
terminate({ error: error instanceof Error ? error.message : error });
safelyTerminate({ error: error instanceof Error ? error.message : error });
}
break;
}
Expand All @@ -152,11 +182,11 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M
if ('next' in data) {
observer.next?.(data.next);
} else if ('error' in data) {
observer.error?.(data.error);
ongoingStreamObservers.delete(message.i);
observer.error?.(data.error);
} else if ('complete' in data) {
data.complete && observer.complete?.();
ongoingStreamObservers.delete(message.i);
data.complete && observer.complete?.();
}
break;
}
Expand All @@ -171,28 +201,23 @@ export function createMessaging(port: Port, options?: CreateMessagingOptions): M
const resolvers = withResolvers<T>();
const id = randomID();
ongoingRequestResolvers.set(id, resolvers);
try {
port.send({ t: 'r', i: id, d: data } satisfies Packet);
} catch (err) {
port.send({ t: 'r', i: id, d: data } satisfies Packet).catch((err) => {
resolvers.reject(err);
}
});
return resolvers.promise;
},
stream(data, observer) {
const id = randomID();
ongoingStreamObservers.set(id, observer);
try {
port.send({ t: 's', i: id, d: data } satisfies Packet);
return () => {
if (!ongoingStreamObservers.has(id)) return;
port.send({ t: 's', i: id, d: null } satisfies Packet);
};
} catch (err) {
port.send({ t: 's', i: id, d: data } satisfies Packet).catch((err) => {
queueMicrotask(() => {
observer.error?.(err);
});
return noop;
}
});
return () => {
if (!ongoingStreamObservers.has(id)) return;
port.send({ t: 's', i: id, d: null } satisfies Packet).catch(noop);
};
},
dispose() {
offMessage();
Expand Down
Loading