Replies: 2 comments
-
That's amazing! It's an interesting idea. |
Beta Was this translation helpful? Give feedback.
0 replies
-
Oof import type { EventEmitter } from 'node:stream'
import { MessageChannel } from 'node:worker_threads'
import type { MessagePort as _MessagePort } from 'node:worker_threads'
import Piscina from 'piscina'
import type TypedEmitter from 'typed-emitter'
type MessagePortBase<OutgoingMessage> = {
[K in keyof _MessagePort as K extends keyof EventEmitter ? never : K]:
K extends 'postMessage' ? (
(value: OutgoingMessage, transferList?: NonNullable<Parameters<_MessagePort['postMessage']>[1]>) => void
) : _MessagePort[K]
}
export interface MessagePort<OutgoingMessage = never, IncomingMessage = never> extends TypedEmitter<{
close: () => void
message: (value: IncomingMessage) => void
messageerror: (error: Error) => void
}>, MessagePortBase<OutgoingMessage> {}
type Pure<T> = { [K in keyof T]: T[K] } & unknown
type PickValues<T, V> = { [K in keyof T as T[K] extends V ? K : never]: T[K] } & unknown
type PiscinaOptions = NonNullable<ConstructorParameters<typeof Piscina>[0]>
export type SimOptions = Pure<{
[K in keyof PiscinaOptions]: PiscinaOptions[K]
} & {
url: URL
}>
export interface ClientPort<WorkerMessage = never, ClientMessage = never>
extends MessagePort<ClientMessage, WorkerMessage> {}
export interface WorkerPort<WorkerMessage = never, ClientMessage = never>
extends MessagePort<WorkerMessage, ClientMessage> {}
// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type WithPort<WorkerMessage = never, ClientMessage = never> = {
messagePort: WorkerPort<WorkerMessage, ClientMessage>
signal: AbortSignal
}
interface AbortablePromise<T>
extends Promise<Awaited<T>> {
abortController: AbortController
}
interface AbortablePromiseWithPort<T, WorkerMessage = never, ClientMessage = never>
extends AbortablePromise<T> {
abortController: AbortController
messagePort: ClientPort<WorkerMessage, ClientMessage>
}
type ToCall<T> =
| T extends (this: infer F extends WithPort<any, any>, ...a: infer A) => infer R ? (
F extends WithPort<infer WorkerMessage, infer ClientMessage> ? (
(...a: A) => AbortablePromiseWithPort<Awaited<R>, WorkerMessage, ClientMessage>
) : never
) : T extends (...a: infer A) => infer R ? (
(...a: A) => AbortablePromise<Awaited<R>>
) : never
type SimPoolModule<T> = {
[K in keyof T as T[K] extends (...a: any) => any ? K : never]: ToCall<T[K]>
} & unknown
type KeepGenericModule<T> = {
[K in keyof T as T[K] extends (...a: any) => any ? K : never]:
T[K] extends (this: WithPort<infer WorkerMessage, infer ClientMessage>, ...a: infer A) => infer R ? (
(callback: (fn: T[K]) => any) => AbortablePromiseWithPort<Awaited<R>, WorkerMessage, ClientMessage>
) : T[K] extends (...a: infer A) => infer R ? (
(callback: (fn: T[K]) => any) => AbortablePromise<Awaited<R>>
) : never
}
type RunOptions = Pure<NonNullable<Parameters<Piscina['run']>[1]>>
export class SimPool<Module> extends Piscina {
#modulePromise: Promise<Module>
#awaitedModule?: Module = undefined
constructor(options: SimOptions) {
super({ filename: options.url.href })
this.#modulePromise = import(options.url.href)
.then((module: Module) => {
this.#awaitedModule = module
return module
})
}
async initSync() {
return this.#modulePromise.then(() => this)
}
/**
* @deprecated use `pool.async.<function>()` or `pool.runExt('<function>', [...args], [...tranferList])`
*/
run(task: any, options?: RunOptions | undefined): Promise<any> {
return super.run(task, options)
}
/**
* Runs the module export in the pool thread
*/
runExt<K extends keyof SimPoolModule<Module>>(
name: K,
args: Parameters<SimPoolModule<Module>[K]>,
options?: {
transferList?: RunOptions['transferList']
abortController?: AbortController
signal?: AbortSignal
thread?: 'pool' | 'current'
thisArg?: Pure<Partial<ThisParameterType<Module[K & keyof Module]>>>
},
): ReturnType<SimPoolModule<Module>[K]> {
const abortController = options?.abortController ?? new AbortController()
const channel = new MessageChannel()
const thisArg = { messagePort: channel.port1, ...options?.thisArg ?? {} }
const signal = options?.signal ?? abortController.signal
const task = { name, args, thisArg, signal }
const runOptions = {
name: 'call',
signal,
transferList: [channel.port1, ...options?.transferList ?? []],
}
const result = options?.thread === 'current'
? (this.module as any as { call(task: object): Promise<any> }).call(task)
: super.run(task, runOptions)
return Object.assign(result, {
abortController,
channel,
messagePort: channel.port2,
}) as never
}
#proxyWithGetter<T extends object>(get: (property: keyof T) => unknown) {
return new Proxy({} as any as T, {
get(target, property) {
return get(property as keyof T)
},
})
}
/**
* The module itself
* - allows generics
* - has constants
* - requires `await this.initSync()`
*/
get module(): { [K in keyof Module]: Module[K] } {
if (!this.#awaitedModule)
throw new Error('should `await pool.initSync()` before using sync calls!')
return this.#awaitedModule
}
/**
* allows to call module exports in current thread
* - is limited to async functions
* - allows generics
* - requires `export const call = callWrap(import('./<current module>'))`
*/
get asyncModule() {
type AsyncModule = PickValues<Module, ((...a: any[]) => Promise<any>)>
return this.#proxyWithGetter<AsyncModule>(
property => (...args: never) => this.runExt(property as any, args),
)
}
/**
* allows to call module exports in current thread
* - requires `export const call = callWrap(import('./<current module>'))`
* - is not compatible with **generics**
*/
get sync(): SimPoolModule<Module> {
return this.#proxyWithGetter<SimPoolModule<Module>>(
property => (...args: never) => this.runExt(property as any, args, { thread: 'current' }),
)
}
/**
* allows to call module exports in pool
* - requires `export const call = callWrap(import('./<current module>'))`
* - is not compatible with **generics**
*/
get async(): SimPoolModule<Module> {
return this.#proxyWithGetter<SimPoolModule<Module>>(
property => (...args: never) => this.runExt(property as any, args),
)
}
/**
* allows to call generic module exports in pool
* - requires `export const call = callWrap(import('./<current module>'))`
* - feels a beet weird
*/
get generic(): KeepGenericModule<Module> {
return this.#proxyWithGetter<KeepGenericModule<Module>>(
(property) => {
// returns (callback: (fn: T[K]) => any) => AbortablePromise<Awaited<R>>
return (callback: (call: Function) => any) => {
let args: any[] = []
callback((...a) => {
args = a
})
return this.runExt(property as any, args as never)
}
})
}
}
export function callWrap<Module extends object>(self: Promise<Module>) {
type SimModule = SimPoolModule<Omit<Module, 'call'>>
return async function call<K extends keyof SimModule>(task: {
name: K & string
args: Parameters<SimModule[K]>[]
thisArg: ThisParameterType<SimModule[K]>
}): Promise<Awaited<ReturnType<Extract<Module[K & keyof Module], (...a: any[]) => any>>>> {
const module = await self
if (!(task.name in module))
throw new Error(`No "${task.name}" exported`)
const fn = module[task.name as keyof Module] as Function
if (typeof fn !== 'function')
throw new Error(`"${task.name}" export was not a function`)
console.log('simcall', fn, task.args)
// eslint-disable-next-line @typescript-eslint/no-unsafe-return
return fn.apply(task.thisArg, task.args)
}
} import type { WithPort } from '../src/worker'
import { callWrap } from '../src/worker'
// REQUIRED
export const call = callWrap(import('./worker.test.2'))
export const cooonst = 'long'
export function summ(...a: number[]): number {
return a.reduce((v, e) => v + e, 0)
}
export function add(a: number, b: number): number
export function add(a: string, b: string): string
export function add(a, b) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/restrict-plus-operands
return a + b
}
// eslint-disable-next-line @typescript-eslint/require-await
export async function aidentity<T>(v: T) {
return v
}
export async function read<T>(this: WithPort<never, T>) {
return await new Promise((resolve) => {
this.messagePort.once('message', resolve)
})
}
// eslint-disable-next-line @typescript-eslint/require-await
export async function write<T>(this: WithPort<T, never>, value: T) {
this.messagePort.postMessage(value)
}
export async function ping<I, O>(this: WithPort<O, I>, ping: I, pong: O) {
await new Promise((resolve, reject) => {
this.messagePort.once('message', (value) => {
if (value !== ping) {
reject(new Error(`ping message should be ${JSON.stringify(ping)}}`))
}
else {
this.messagePort.postMessage(pong)
resolve('ok')
}
})
})
} const pool = new SimPool<typeof import('./worker.test.2')>({
url: new URL('./worker.test.2', import.meta.url),
})
await pool.initSync()
console.log(pool.sync.cooonst)
// await pool.runExt('test1', [{ foo: 'bar' }], [])
console.log(pool.async.add)
console.log({
summn: await pool.generic.add(add => add(2, 2)),
summs: await pool.generic.add(add => add('a', 'b')),
})
void pool.runExt('add', ['a', 'b'])
void pool.asyncModule.aidentity(123) |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
I'm trying to implement a typed wrapper
Beta Was this translation helpful? Give feedback.
All reactions