Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DOCSP-5410: Automatically restart workers #5

Open
wants to merge 1 commit into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions src/TaskWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
'use strict'

const Worker = require('tiny-worker')
const log = require('./log').log

const MAXIMUM_BACKLOG = 20

/** A web worker with a promise-oriented message-call interface. */
class TaskWorker {
    /**
     * Create a new TaskWorker and immediately try to start its worker.
     * @param {string} scriptPath - A path to a JS file to execute.
     */
    constructor(scriptPath) {
        this.scriptPath = scriptPath
        // Number of requests sent to the worker that have not been answered yet
        this.backlog = 0
        // messageId -> [resolve, reject] of the promise returned by send()
        this.pending = new Map()
        this.messageId = 0
        // Date of the most recent successful start(), or null if never started
        this.lastStarted = null
        // Set once the worker needed a restart too soon after its last start
        this.dead = false
        this.worker = null
        this.start()
    }

    /**
     * Send a message to this TaskWorker.
     * @param {map} message - An object to send to the worker.
     * @return {Promise} Resolved with the reply's data by onmessage();
     *   rejected if the reply carries an error, if posting the message
     *   fails, or if the worker is stopped before it replies.
     * @throws {Error} Synchronously, with message 'backlog-exceeded' when
     *   too many requests are outstanding, or 'Worker not running' when
     *   there is currently no worker.
     */
    send(message) {
        if (this.backlog > MAXIMUM_BACKLOG) {
            throw new Error('backlog-exceeded')
        }

        if (!this.worker) {
            throw new Error('Worker not running')
        }

        return new Promise((resolve, reject) => {
            const messageId = this.messageId
            this.messageId += 1
            this.backlog += 1
            this.pending.set(messageId, [resolve, reject])

            try {
                this.worker.postMessage({message: message, messageId: messageId})
            } catch (err) {
                // The request never reached the worker, so no reply will ever
                // arrive to release its backlog slot: release it here instead.
                if (this.pending.delete(messageId)) {
                    this.backlog -= 1
                }

                reject(err)
            }
        })
    }

    /**
     * Handler for messages received from the worker. Settles the promise
     * that send() returned for the matching message ID.
     * @private
     * @param {MessageEvent} event
     * @return {undefined}
     */
    onmessage(event) {
        const pair = this.pending.get(event.data.messageId)
        if (!pair) {
            log.error(`Got unknown message ID ${event.data.messageId}`)
            return
        }

        this.backlog -= 1
        this.pending.delete(event.data.messageId)
        const [resolve, reject] = pair
        if (event.data.error) {
            reject(new Error(event.data.error))
            return
        }

        resolve(event.data)
    }

    /**
     * Start the worker process, unless it is already running or this
     * TaskWorker has been flagged as dead.
     * @return {number} The PID of the worker's child process, or -1 if the
     *   TaskWorker is dead and was not started.
     */
    start() {
        // Do nothing if the child is still running
        if (this.worker && this.worker.child.connected) {
            return this.worker.child.pid
        }

        // If we need a restart less than MIN_RESTART_INTERVAL ms after the
        // last start, don't restart. Something is wrong
        if (this.lastStarted && ((new Date()) - this.lastStarted) < TaskWorker.MIN_RESTART_INTERVAL) {
            this.dead = true
        }

        if (this.dead) {
            return -1
        }

        const worker = new Worker(this.scriptPath)
        worker.onmessage = this.onmessage.bind(this)
        worker.child.addListener('exit', (code, signal) => {
            log.warning(`Worker exited: code=${code} signal=${signal}`)

            // A worker that has already been replaced no longer owns our
            // state; tearing it down here would kill its healthy successor.
            if (this.worker !== null && this.worker !== worker) {
                return
            }

            this.stop()

            // Don't restart if graceful or due to SIGINT
            if (code === 0 || signal === 'SIGINT') {
                return
            }

            // Wait a random interval between MIN_RESTART_TIMEOUT and
            // MAX_RESTART_TIMEOUT ms before restarting.
            // This might help prevent a thundering herd problem
            const spread = TaskWorker.MAX_RESTART_TIMEOUT - TaskWorker.MIN_RESTART_TIMEOUT
            const delay = TaskWorker.MIN_RESTART_TIMEOUT + (Math.random() * spread)
            setTimeout(() => this.start(), delay)
        })

        // Reject anything still addressed to the previous worker before
        // switching over to the new one
        this.stop()
        this.worker = worker

        this.lastStarted = new Date()
        return this.worker.child.pid
    }

    /**
     * Reject every pending request with 'Worker terminated', reset the
     * request bookkeeping, and terminate the worker if its child process is
     * still connected. Afterwards send() throws until start() succeeds.
     */
    stop() {
        for (const pair of this.pending.values()) {
            pair[1](new Error('Worker terminated'))
        }

        this.backlog = 0
        this.pending.clear()
        this.messageId = 0

        if (this.worker && this.worker.child.connected) {
            this.worker.terminate()
        }

        this.worker = null
    }
}

// Configurable knobs (all values are in milliseconds)
// If a restart is needed less than this many ms after the previous start,
// the worker is flagged as dead instead of being restarted.
// Default: 1 hour
TaskWorker.MIN_RESTART_INTERVAL = 1000 * 60 * 60

// Intended lower and upper bounds for the random delay we wait before
// restarting an exited worker. Defaults: 1000 ms (min) and 9000 ms (max)
TaskWorker.MIN_RESTART_TIMEOUT = 1000
TaskWorker.MAX_RESTART_TIMEOUT = 1000 * 9

exports.TaskWorker = TaskWorker
90 changes: 15 additions & 75 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,24 @@ const util = require('util')
const zlib = require('zlib')

const Pool = require('./pool.js').Pool
const TaskWorker = require('./TaskWorker.js').TaskWorker
const log = require('./log.js').log
const dive = require('dive')
const iltorb = require('iltorb')
const Logger = require('basic-logger')
const S3 = require('aws-sdk/clients/s3')
const Worker = require('tiny-worker')

process.title = 'marian'

const MAXIMUM_QUERY_LENGTH = 100

// If a worker's backlog rises above this threshold, reject the request.
// This prevents the server from getting bogged down for unbounded periods of time.
const MAXIMUM_BACKLOG = 20
const WARNING_BACKLOG = 15

const STANDARD_HEADERS = {
'X-Content-Type-Options': 'nosniff'
}

const log = new Logger({
showTimestamp: true,
})

/**
* Find an acceptable compression format for the client, and return a compressed
* version of the content if possible. Otherwise return the original input text.
Expand Down Expand Up @@ -80,66 +75,6 @@ function checkMethod(req, res, method) {
return true
}

/** A web worker with a promise-oriented message-call interface. */
class TaskWorker {
    /**
     * Create a new TaskWorker.
     * @param {string} scriptPath - A path to a JS file to execute.
     */
    constructor(scriptPath) {
        this.worker = new Worker(scriptPath)
        this.worker.onmessage = this.onmessage.bind(this)

        // Number of requests sent to the worker that have not been answered yet
        this.backlog = 0
        // messageId -> [resolve, reject] of the promise returned by send()
        this.pending = new Map()
        this.messageId = 0
    }

    /**
     * Send a message to this TaskWorker.
     * @param {map} message - An object to send to the worker.
     * @return {Promise} Resolved with the reply's data by onmessage();
     *   rejected if the reply carries an error or if posting the message fails.
     * @throws {Error} Synchronously, with message 'backlog-exceeded' when
     *   too many requests are outstanding.
     */
    send(message) {
        if (this.backlog > MAXIMUM_BACKLOG) {
            throw new Error('backlog-exceeded')
        }

        return new Promise((resolve, reject) => {
            const messageId = this.messageId
            this.messageId += 1
            this.backlog += 1
            this.pending.set(messageId, [resolve, reject])

            try {
                this.worker.postMessage({message: message, messageId: messageId})
            } catch (err) {
                // The request never reached the worker, so no reply will ever
                // arrive to release its backlog slot: release it here instead.
                if (this.pending.delete(messageId)) {
                    this.backlog -= 1
                }

                reject(err)
            }
        })
    }

    /**
     * Handler for messages received from the worker. Settles the promise
     * that send() returned for the matching message ID.
     * @private
     * @param {MessageEvent} event
     * @return {undefined}
     */
    onmessage(event) {
        const pair = this.pending.get(event.data.messageId)
        if (!pair) {
            log.error(`Got unknown message ID ${event.data.messageId}`)
            return
        }

        this.backlog -= 1
        this.pending.delete(event.data.messageId)
        const [resolve, reject] = pair
        if (event.data.error) {
            reject(new Error(event.data.error))
            return
        }

        resolve(event.data)
    }
}

class Index {
constructor(manifestSource) {
this.manifestSource = manifestSource
Expand All @@ -150,10 +85,11 @@ class Index {
this.currentlyIndexing = false

const MAX_WORKERS = parseInt(process.env.MAX_WORKERS) || 2
this.workers = new Pool(Math.min(os.cpus().length, MAX_WORKERS), () => new TaskWorker(pathModule.join(__dirname, 'worker-searcher.js')))
const nWorkers = Math.min(os.cpus().length, MAX_WORKERS)
this.workers = new Pool(nWorkers, () => new TaskWorker(pathModule.join(__dirname, 'worker-searcher.js')))

// Suspend all of our workers until we have an index
for (const worker of this.workers.pool) {
for (const worker of this.workers) {
this.workers.suspend(worker)
}
}
Expand Down Expand Up @@ -282,7 +218,7 @@ class Index {

this.errors = []
setTimeout(async () => {
for (const worker of this.workers.pool) {
for (const worker of this.workers) {
this.workers.suspend(worker)
try {
await worker.send({sync: manifests})
Expand Down Expand Up @@ -396,9 +332,15 @@ class Marian {
body = await compress(req, headers, body)

// If all workers are overloaded, return 503
let statusCode = 200
if (status.workers.filter((n) => n <= WARNING_BACKLOG).length === 0) {
statusCode = 503
// If a worker is dead, return 500
let statusCode = 503
for (const workerState of status.workers) {
if (workerState === 'd') {
statusCode = 500
break
} else if (workerState <= WARNING_BACKLOG) {
statusCode = 200
}
}

res.writeHead(statusCode, headers)
Expand Down Expand Up @@ -468,8 +410,6 @@ class Marian {
}

/**
 * Program entry point: configures the logger level, builds a Marian server
 * from the first command-line argument, and starts it with 8080
 * (presumably the port to listen on -- confirm against Marian#start).
 */
async function main() {
Logger.setLevel('info', true)

// process.argv[2] is the first user-supplied command-line argument
const server = new Marian(process.argv[2])
server.start(8080)
}
Expand Down
8 changes: 8 additions & 0 deletions src/log.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

// Logger module: builds one logger instance and exports it as `log` so that
// other modules can require it instead of constructing their own.
const Logger = require('basic-logger')
// Set the level to 'info' on the Logger class itself, before the instance is made.
// NOTE(review): the meaning of the second argument (true) is defined by
// basic-logger and is not visible here -- confirm against its documentation.
Logger.setLevel('info', true)

// The exported logger is configured to show a timestamp
exports.log = new Logger({
showTimestamp: true,
})
8 changes: 8 additions & 0 deletions src/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,21 @@ class Pool {

getStatus() {
return this.pool.map((worker) => {
if (worker.dead) {
return 'd'
}

if (!this.suspended.has(worker)) {
return worker.backlog
}

return 's'
})
}

/**
 * Make the pool itself iterable (e.g. `for (const worker of pool)`), so
 * callers do not need to reach into the `pool` property directly.
 * @return {Iterator} Whatever `this.pool.values()` returns.
 */
[Symbol.iterator]() {
return this.pool.values()
}
}

exports.Pool = Pool
2 changes: 1 addition & 1 deletion test/test_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ describe('Pool', () => {
})

it('Should throw if no elements are available', () => {
for (const worker of pool.pool) {
for (const worker of pool) {
pool.suspend(worker)
}

Expand Down
43 changes: 43 additions & 0 deletions test/test_taskworker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* eslint-env node, mocha */
'use strict'

const assert = require('assert')
const TaskWorker = require('../src/TaskWorker.js').TaskWorker
// Override the TaskWorker restart knobs (values in milliseconds) with small
// numbers so the restart behavior can be exercised within a test run.
TaskWorker.MIN_RESTART_INTERVAL = 200
TaskWorker.MIN_RESTART_TIMEOUT = 10
TaskWorker.MAX_RESTART_TIMEOUT = 10

/**
 * Build a promise that resolves (with no value) once a timer expires.
 * @param {number} time - Delay in milliseconds, passed to setTimeout.
 * @return {Promise<undefined>}
 */
function promiseTimeout(time) {
    const armTimer = (done) => {
        setTimeout(done, time)
    }

    return new Promise(armTimer)
}

// End-to-end tests for TaskWorker against the real test/worker.js script.
// The cases share one worker and depend on running in this order.
describe('TaskWorker', function() {
    this.slow(1000)

    const workerPath = 'test/worker.js'
    const worker = new TaskWorker(workerPath)

    it('Should work', async () => {
        assert.strictEqual((await worker.send('ping')).message, 'pong')
        assert.strictEqual((await worker.send('ping')).message, 'pong')
    })

    it('Should restart and reject stale requests', async () => {
        // Let MIN_RESTART_INTERVAL elapse so the coming restart is permitted
        await promiseTimeout(200)
        await assert.rejects(async () => await worker.send('die'), new Error('Worker terminated'))
        // Give the delayed restart time to happen
        await promiseTimeout(50)
        assert.strictEqual((await worker.send('ping')).message, 'pong')
    })

    it('Should avoid restarting too much', async () => {
        assert.strictEqual(worker.dead, false)
        await assert.rejects(async () => await worker.send('die'), new Error('Worker terminated'))
        // Wait comfortably longer than MAX_RESTART_TIMEOUT so the restart
        // attempt (which flags the worker as dead) has certainly run
        await promiseTimeout(50)
        await assert.rejects(async () => await worker.send('ping'), new Error('Worker not running'))
        assert.strictEqual(worker.dead, true)
    })

    after(() => {
        worker.stop()
    })
})
12 changes: 12 additions & 0 deletions test/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict'

// Test worker script: answers 'ping' with 'pong' (echoing the message ID)
// and exits with status 1 when told to 'die'. Other messages are ignored.
self.onmessage = ({data}) => {
    const {message, messageId} = data

    switch (message) {
    case 'ping':
        self.postMessage({message: 'pong', messageId: messageId})
        break
    case 'die':
        process.exit(1)
    }
}