Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pool.completed() #110

Merged
merged 4 commits into from
Jun 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,20 +173,21 @@ const pool = Pool(() => spawn(new Worker("./workers/multiplier")), 8 /* optional

pool.events.subscribe(console.log)

await pool.queue(async multiplier => {
pool.queue(async multiplier => {
const multiplied = await multiplier(2, 3)
console.log(`2 * 3 = ${multiplied}`)

// When this async call completes, the worker thread (`multiplier`) will
// be marked as available for new work scheduled via `pool.queue()`
})

await pool.completed()
await pool.terminate()
```

Note that `pool.queue()` will schedule a task to be run in a deferred way. It might execute straight away or it might take a while until a new worker thread becomes available.

The promise returned by `pool.queue()` will resolve once the scheduled callback has been executed and completed. A failing scheduled callback will also make the promise returned by `pool.queue()` reject.
The promise returned by `pool.completed()` will resolve once the scheduled callbacks have been executed and completed. A failing job will also make the promise reject.

</details>

Expand Down
125 changes: 109 additions & 16 deletions src/master/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ let nextPoolID = 1
const hasSymbols = () => typeof Symbol === 'function'
const hasSymbol = (name: keyof typeof Symbol) => hasSymbols() && Boolean(Symbol[name])

function flatMap<In, Out>(array: In[], mapper: ((element: In) => Out[])): Out[] {
return array.reduce<Out[]>(
(flattened, element) => [...flattened, ...mapper(element)],
[]
)
}

function sleep(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms))
}

function slugify(text: string) {
return text.replace(/\W/g, " ").trim().replace(/\s+/g, "-")
}
Expand All @@ -26,6 +37,7 @@ export enum PoolEventType {
taskCompleted = "taskCompleted",
taskFailed = "taskFailed",
taskQueued = "taskQueued",
taskQueueDrained = "taskQueueDrained",
taskStart = "taskStart",
terminated = "terminated"
}
Expand All @@ -43,6 +55,8 @@ export type PoolEvent<ThreadType extends Thread> = {
} | {
type: PoolEventType.taskQueued,
taskID: number
} | {
type: PoolEventType.taskQueueDrained
} | {
type: PoolEventType.taskStart,
taskID: number,
Expand All @@ -64,7 +78,7 @@ export type PoolEvent<ThreadType extends Thread> = {

interface WorkerDescriptor<ThreadType extends Thread> {
init: Promise<ThreadType>
runningTasks: Array<Task<ThreadType, any>>
runningJobs: Array<Promise<any>>
}

function createArray(size: number): number[] {
Expand All @@ -79,7 +93,7 @@ function findIdlingWorker<ThreadType extends Thread>(
workers: Array<WorkerDescriptor<ThreadType>>,
maxConcurrency: number
): WorkerDescriptor<ThreadType> | undefined {
return workers.find(worker => worker.runningTasks.length < maxConcurrency)
return workers.find(worker => worker.runningJobs.length < maxConcurrency)
}

function spawnWorkers<ThreadType extends Thread>(
Expand All @@ -88,19 +102,52 @@ function spawnWorkers<ThreadType extends Thread>(
): Array<WorkerDescriptor<ThreadType>> {
return createArray(count).map((): WorkerDescriptor<ThreadType> => ({
init: spawnWorker(),
runningTasks: []
runningJobs: []
}))
}

/**
* Thread pool implementation managing a set of worker threads.
* Use it to queue jobs that are run on those threads with limited
* concurrency.
*/
export interface Pool<ThreadType extends Thread> {
/**
* Returns a promise that resolves once the job queue is emptied.
*
* @param allowResolvingImmediately Set to `true` to resolve immediately if job queue is currently empty.
*/
completed(allowResolvingImmediately?: boolean): Promise<any>

/**
* Returns an observable that yields pool events.
*/
events(): Observable<PoolEvent<ThreadType>>
queue<Return>(task: TaskRunFunction<ThreadType, Return>): Promise<Return>

/**
* Queue a job and return a promise that resolves once the job has been dequeued,
* started and finished.
*
* @param job An async function that takes a thread instance and invokes it.
*/
queue<Return>(job: TaskRunFunction<ThreadType, Return>): Promise<Return>

/**
* Terminate all pool threads.
*
* @param force Set to `true` to kill the thread even if it cannot be stopped gracefully.
*/
terminate(force?: boolean): Promise<void>
}

export interface PoolOptions {
/** Maximum no. of jobs to run on one worker thread at a time. Defaults to one. */
concurrency?: number

/** Gives that pool a name to be used for debug logging, letting you distinguish between log output of different pools. */
name?: string

/** No. of worker threads to spawn and to be managed by the pool. */
size?: number
}

Expand All @@ -117,7 +164,6 @@ function PoolConstructor<ThreadType extends Thread>(

let isClosing = false
let nextTaskID = 1
let runningTaskJobs: Array<Promise<any>> = []

const taskQueue: Array<Task<ThreadType, any>> = []
const workers = spawnWorkers(spawnWorker, size)
Expand All @@ -137,13 +183,18 @@ function PoolConstructor<ThreadType extends Thread>(
)

const scheduleWork = () => {
debug(`Attempt de-queueing a task to run it...`)
debug(`Attempt de-queueing a task in order to run it...`)

const availableWorker = findIdlingWorker(workers, concurrency)
if (!availableWorker) return

const nextTask = taskQueue.shift()
if (!nextTask) return
if (!nextTask) {
debug(`Task queue is empty`)
eventSubject.next({ type: PoolEventType.taskQueueDrained })
return
}


const workerID = workers.indexOf(availableWorker) + 1
debug(`Running task #${nextTask.id} on worker #${workerID}...`)
Expand All @@ -154,11 +205,20 @@ function PoolConstructor<ThreadType extends Thread>(
workerID
})

const run = async () => {
const run = async (worker: WorkerDescriptor<ThreadType>, task: Task<ThreadType, any>) => {
const removeJobFromWorkersRunningJobs = () => {
worker.runningJobs = worker.runningJobs.filter(someRunPromise => someRunPromise !== runPromise)
}

// Defer job execution by one tick to give handlers time to subscribe
await sleep(0)

try {
const returnValue = await nextTask.run(await availableWorker.init)
const returnValue = await task.run(await availableWorker.init)

debug(`Task #${nextTask.id} completed successfully`)
removeJobFromWorkersRunningJobs()

eventSubject.next({
type: PoolEventType.taskCompleted,
returnValue,
Expand All @@ -167,30 +227,56 @@ function PoolConstructor<ThreadType extends Thread>(
})
} catch(error) {
debug(`Task #${nextTask.id} failed`)
removeJobFromWorkersRunningJobs()

eventSubject.next({
type: PoolEventType.taskFailed,
taskID: nextTask.id,
error,
workerID
})
throw error
} finally {
runningTaskJobs = runningTaskJobs.filter(someRunPromise => someRunPromise !== runPromise)
if (!isClosing) {
scheduleWork()
}
}
}
const runPromise = run()
runningTaskJobs.push(runPromise)
const runPromise = run(availableWorker, nextTask)
availableWorker.runningJobs.push(runPromise)
}

const pool: Pool<ThreadType> = {
async completed(allowResolvingImmediately: boolean = false) {
const getCurrentlyRunningJobs = () => flatMap(workers, worker => worker.runningJobs)

if (allowResolvingImmediately && taskQueue.length === 0) {
return Promise.all(getCurrentlyRunningJobs())
}

const poolEventPromise = new Promise((resolve, reject) => {
const subscription = eventObservable.subscribe(event => {
if (event.type === PoolEventType.taskQueueDrained) {
subscription.unsubscribe()
resolve()
} else if (event.type === PoolEventType.taskFailed) {
subscription.unsubscribe()
reject(event.error)
}
})
})

await Promise.race([
poolEventPromise,
eventObservable // make a pool-wide error reject the completed() result promise
])
await Promise.all(getCurrentlyRunningJobs())
},

events() {
return eventObservable
},

async queue(taskFunction) {
queue(taskFunction) {
if (isClosing) {
throw Error(`Cannot schedule pool tasks after terminate() has been called.`)
}
Expand All @@ -207,7 +293,7 @@ function PoolConstructor<ThreadType extends Thread>(
taskID: task.id
})

return new Promise((resolve, reject) => {
const resultPromise = new Promise<any>((resolve, reject) => {
const eventSubscription = pool.events().subscribe(event => {
if (event.type === PoolEventType.taskCompleted && event.taskID === task.id) {
eventSubscription.unsubscribe()
Expand All @@ -227,12 +313,19 @@ function PoolConstructor<ThreadType extends Thread>(
reject(error)
}
})

// Don't raise an UnhandledPromiseRejection error if not handled
// Reason: Because we just return this promise for convenience, but usually only
// pool.completed() will be used, leaving this quasi-duplicate promise unhandled.
resultPromise.catch(() => undefined)

return resultPromise
},

async terminate(force?: boolean) {
isClosing = true
if (!force) {
await Promise.all(runningTaskJobs)
await pool.completed(true)
}
eventSubject.next({
type: PoolEventType.terminated,
Expand Down
44 changes: 44 additions & 0 deletions test/pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,53 @@ test.serial("thread pool basics work and events are emitted", async t => {
taskID: 1,
workerID: 1
},
{
type: Pool.EventType.taskQueueDrained
},
{
type: Pool.EventType.terminated,
remainingQueue: []
}
])
})

test.serial("pool.completed() works", async t => {
const returned: any[] = []

const spawnHelloWorld = () => spawn(new Worker("./workers/hello-world"))
const pool = Pool(spawnHelloWorld, 2)

for (let i = 0; i < 3; i++) {
pool.queue(async helloWorld => {
returned.push(await helloWorld())
})
}

await pool.completed()

t.deepEqual(returned, [
"Hello World",
"Hello World",
"Hello World"
])
})

test.serial("pool.completed() proxies errors", async t => {
const spawnHelloWorld = () => spawn(new Worker("./workers/hello-world"))
const pool = Pool(spawnHelloWorld, 2)

pool.queue(async () => {
throw Error("Ooopsie")
})

const error = await t.throwsAsync(() => pool.completed())
t.is(error.message, "Ooopsie")
})

test.serial("pool.completed(true) works", async t => {
const spawnHelloWorld = () => spawn(new Worker("./workers/hello-world"))
const pool = Pool(spawnHelloWorld, 2)

await pool.completed(true)
t.pass()
})