diff --git a/server.js b/server.js index 98422a50..4b580b1f 100644 --- a/server.js +++ b/server.js @@ -478,6 +478,7 @@ let unassignCounter = new Map(); let workerAssigned = new WeakMap(); let onlineWorkers = new WeakSet(); +let imapInitialWorkersLoaded = false; let workers = new Map(); let workersMeta = new WeakMap(); let availableIMAPWorkers = new Set(); @@ -635,302 +636,318 @@ let spawnWorker = async type => { workers.get(type).add(worker); - worker.on('online', () => { - if (['smtp', 'imapProxy'].includes(type)) { - updateServerState(type, 'initializing').catch(err => logger.error({ msg: `Failed to update ${type} server state`, err })); - } - onlineWorkers.add(worker); + return new Promise((resolve, reject) => { + let isOnline = false; + let threadId = worker.threadId; - let workerMeta = workersMeta.has(worker) ? workersMeta.get(worker) : {}; - workerMeta.online = Date.now(); - workersMeta.set(worker, workerMeta); - }); + worker.on('online', () => { + if (['smtp', 'imapProxy'].includes(type)) { + updateServerState(type, 'initializing').catch(err => logger.error({ msg: `Failed to update ${type} server state`, err })); + } + onlineWorkers.add(worker); - let exitHandler = async exitCode => { - onlineWorkers.delete(worker); - metrics.threadStops.inc(); + let workerMeta = workersMeta.has(worker) ? workersMeta.get(worker) : {}; + workerMeta.online = Date.now(); + workersMeta.set(worker, workerMeta); - workers.get(type).delete(worker); + if (type !== 'imap') { + // imap workers need to wait until ready to accept accounts + isOnline = true; + resolve(threadId); + } + }); - if (['smtp', 'imapProxy'].includes(type)) { - updateServerState(type, suspendedWorkerTypes.has(type) ? 'suspended' : 'exited'); - } + let exitHandler = async exitCode => { + onlineWorkers.delete(worker); + metrics.threadStops.inc(); - if (type === 'imap') { - availableIMAPWorkers.delete(worker); - - if (workerAssigned.has(worker)) { - let accountList = workerAssigned.get(worker); - workerAssigned.delete(worker); - accountList.forEach(account => { - assigned.delete(account); - let shouldReassign = false; - // graceful reconnect - countUnassignment(account) - .then(sr => { - shouldReassign = sr; - }) - .catch(() => { - shouldReassign = true; - }) - .finally(() => { - unassigned.add(account); - if (shouldReassign) { - assignAccounts().catch(err => logger.error({ msg: 'Failed to assign accounts', n: 1, err })); - } - }); - }); + workers.get(type).delete(worker); + + if (['smtp', 'imapProxy'].includes(type)) { + updateServerState(type, suspendedWorkerTypes.has(type) ? 'suspended' : 'exited'); } - } - if (isClosing) { - return; - } + if (type === 'imap') { + availableIMAPWorkers.delete(worker); + + if (workerAssigned.has(worker)) { + let accountList = workerAssigned.get(worker); + workerAssigned.delete(worker); + accountList.forEach(account => { + assigned.delete(account); + let shouldReassign = false; + // graceful reconnect + countUnassignment(account) + .then(sr => { + shouldReassign = sr; + }) + .catch(() => { + shouldReassign = true; + }) + .finally(() => { + unassigned.add(account); + if (shouldReassign) { + assignAccounts().catch(err => logger.error({ msg: 'Failed to assign accounts', n: 1, err })); + } + }); + }); + } + } - // spawning a new worker trigger reassign - if (suspendedWorkerTypes.has(type)) { - logger.info({ msg: 'Worker thread closed', exitCode, type }); - } else { - logger.error({ msg: 'Worker exited', exitCode, type }); - } + if (isClosing) { + return; + } - // trigger new spawn - await new Promise(r => setTimeout(r, 1000)); - await spawnWorker(type); - }; + // spawning a new worker trigger reassign + if (suspendedWorkerTypes.has(type)) { + logger.info({ msg: 'Worker thread closed', exitCode, type }); + } else { + logger.error({ msg: 'Worker exited', exitCode, type }); + } - worker.on('exit', exitCode => { - exitHandler(exitCode).catch(err => { - logger.error({ msg: 'Failed to handle worker exit', exitCode, worker: worker.threadId, err }); - }); - }); + // trigger new spawn + await new Promise(r => setTimeout(r, 1000)); + await spawnWorker(type); + }; - worker.on('message', message => { - let workerMeta = workersMeta.has(worker) ? workersMeta.get(worker) : {}; - workerMeta.messages = workerMeta.messages ? ++workerMeta.messages : 1; - workersMeta.set(worker, workerMeta); + worker.on('exit', exitCode => { + if (!isOnline) { + let error = new Error(`Failed to start ${type} worker thread on initialization`); - if (!message) { - return; - } + error.workerType = type; + error.exitCode = exitCode; + error.threadId = threadId; - if (message.cmd === 'resp' && message.mid && callQueue.has(message.mid)) { - let { resolve, reject, timer } = callQueue.get(message.mid); - clearTimeout(timer); - callQueue.delete(message.mid); - if (message.error) { - let err = new Error(message.error); - if (message.code) { - err.code = message.code; - } - if (message.statusCode) { - err.statusCode = message.statusCode; - } - if (message.info) { - err.info = message.info; - } - return reject(err); - } else { - return resolve(message.response); + reject(error); } - } - if (message.cmd === 'call' && message.mid) { - return onCommand(worker, message.message) - .then(response => { - let transferList; - if (response && typeof response === 'object' && response._transfer === true) { - if (typeof response._response === 'object' && response._response && response._response.buffer) { - transferList = [response._response.buffer]; - } - response = response._response; - } + exitHandler(exitCode).catch(err => { + logger.error({ msg: 'Failed to handle worker exit', exitCode, type, worker: worker.threadId, err }); + }); + }); - let callPayload = { - cmd: 'resp', - mid: message.mid, - response - }; - - try { - postMessage(worker, callPayload, null, transferList); - } catch (err) { - if (Buffer.isBuffer(callPayload.response)) { - callPayload.response = `Buffer <${callPayload.response.length}B>`; - } + worker.on('message', message => { + let workerMeta = workersMeta.has(worker) ? workersMeta.get(worker) : {}; + workerMeta.messages = workerMeta.messages ? ++workerMeta.messages : 1; + workersMeta.set(worker, workerMeta); - logger.error({ msg: 'Failed to post state change to child', worker: worker.threadId, callPayload, err }); + if (!message) { + return; + } + + if (message.cmd === 'resp' && message.mid && callQueue.has(message.mid)) { + let { resolve, reject, timer } = callQueue.get(message.mid); + clearTimeout(timer); + callQueue.delete(message.mid); + if (message.error) { + let err = new Error(message.error); + if (message.code) { + err.code = message.code; } - }) - .catch(err => { - let callPayload = { - cmd: 'resp', - mid: message.mid, - error: err.message, - code: err.code, - statusCode: err.statusCode, - info: err.info - }; - - try { - postMessage(worker, callPayload); - } catch (err) { - logger.error({ msg: 'Failed to post state change to child', worker: worker.threadId, callPayload, err }); + if (message.statusCode) { + err.statusCode = message.statusCode; } - }); - } + if (message.info) { + err.info = message.info; + } + return reject(err); + } else { + return resolve(message.response); + } + } - switch (message.cmd) { - case 'metrics': { - let statUpdateKey = false; - let accountUpdateKey = false; + if (message.cmd === 'call' && message.mid) { + return onCommand(worker, message.message) + .then(response => { + let transferList; + if (response && typeof response === 'object' && response._transfer === true) { + if (typeof response._response === 'object' && response._response && response._response.buffer) { + transferList = [response._response.buffer]; + } + response = response._response; + } - let { account } = message.meta || {}; + let callPayload = { + cmd: 'resp', + mid: message.mid, + response + }; - switch (message.key) { - // gather for dashboard counter - case 'webhooks': { - let { status } = message.args[0] || {}; - statUpdateKey = `${message.key}:${status}`; - break; - } - - case 'webhookReq': { - break; - } + try { + postMessage(worker, callPayload, null, transferList); + } catch (err) { + if (Buffer.isBuffer(callPayload.response)) { + callPayload.response = `Buffer <${callPayload.response.length}B>`; + } - case 'events': { - let { event } = message.args[0] || {}; - if (account) { - accountUpdateKey = `${message.key}:${event}`; + logger.error({ msg: 'Failed to post state change to child', worker: worker.threadId, callPayload, err }); } + }) + .catch(err => { + let callPayload = { + cmd: 'resp', + mid: message.mid, + error: err.message, + code: err.code, + statusCode: err.statusCode, + info: err.info + }; - switch (event) { - case MESSAGE_NEW_NOTIFY: - case MESSAGE_DELETED_NOTIFY: - case CONNECT_ERROR_NOTIFY: - statUpdateKey = `${message.key}:${event}`; - break; + try { + postMessage(worker, callPayload); + } catch (err) { + logger.error({ msg: 'Failed to post state change to child', worker: worker.threadId, callPayload, err }); } - break; - } + }); + } - case 'apiCall': { - let { statusCode } = message.args[0] || {}; - let success = statusCode >= 200 && statusCode < 300; - statUpdateKey = `${message.key}:${success ? 'success' : 'fail'}`; - break; - } + switch (message.cmd) { + case 'metrics': { + let statUpdateKey = false; + let accountUpdateKey = false; + + let { account } = message.meta || {}; - case 'queuesProcessed': { - let { queue, status } = message.args[0] || {}; - if (['submit'].includes(queue)) { - statUpdateKey = `${queue}:${status === 'completed' ? 'success' : 'fail'}`; + switch (message.key) { + // gather for dashboard counter + case 'webhooks': { + let { status } = message.args[0] || {}; + statUpdateKey = `${message.key}:${status}`; + break; } - break; - } - } - if (statUpdateKey) { - // increment counter in redis + case 'webhookReq': { + break; + } - let now = new Date(); + case 'events': { + let { event } = message.args[0] || {}; + if (account) { + accountUpdateKey = `${message.key}:${event}`; + } - // we keep a separate hash value for each ISO day - let dateStr = `${now - .toISOString() - .substr(0, 10) - .replace(/[^0-9]+/g, '')}`; + switch (event) { + case MESSAGE_NEW_NOTIFY: + case MESSAGE_DELETED_NOTIFY: + case CONNECT_ERROR_NOTIFY: + statUpdateKey = `${message.key}:${event}`; + break; + } + break; + } - // hash key for bucket - let timeStr = `${now - .toISOString() - // bucket includes 1 minute - .substr(0, 16) - .replace(/[^0-9]+/g, '')}`; + case 'apiCall': { + let { statusCode } = message.args[0] || {}; + let success = statusCode >= 200 && statusCode < 300; + statUpdateKey = `${message.key}:${success ? 'success' : 'fail'}`; + break; + } - let hkey = `${REDIS_PREFIX}stats:${statUpdateKey}:${dateStr}`; + case 'queuesProcessed': { + let { queue, status } = message.args[0] || {}; + if (['submit'].includes(queue)) { + statUpdateKey = `${queue}:${status === 'completed' ? 'success' : 'fail'}`; + } + break; + } + } - let update = redis - .multi() - .hincrby(hkey, timeStr, 1) - .sadd(`${REDIS_PREFIX}stats:keys`, statUpdateKey) - // keep alive at most 2 days - .expire(hkey, MAX_DAYS_STATS + 1 * 24 * 3600); + if (statUpdateKey) { + // increment counter in redis + + let now = new Date(); + + // we keep a separate hash value for each ISO day + let dateStr = `${now + .toISOString() + .substr(0, 10) + .replace(/[^0-9]+/g, '')}`; + + // hash key for bucket + let timeStr = `${now + .toISOString() + // bucket includes 1 minute + .substr(0, 16) + .replace(/[^0-9]+/g, '')}`; + + let hkey = `${REDIS_PREFIX}stats:${statUpdateKey}:${dateStr}`; + + let update = redis + .multi() + .hincrby(hkey, timeStr, 1) + .sadd(`${REDIS_PREFIX}stats:keys`, statUpdateKey) + // keep alive at most 2 days + .expire(hkey, MAX_DAYS_STATS + 1 * 24 * 3600); + + if (account && accountUpdateKey) { + // increment account specific counter + let accountKey = `${REDIS_PREFIX}iad:${account}`; + update = update.hincrby(accountKey, `stats:count:${accountUpdateKey}`, 1); + } - if (account && accountUpdateKey) { - // increment account specific counter + update.exec().catch(() => false); + } else if (account && accountUpdateKey) { let accountKey = `${REDIS_PREFIX}iad:${account}`; - update = update.hincrby(accountKey, `stats:count:${accountUpdateKey}`, 1); + redis.hincrby(accountKey, `stats:count:${accountUpdateKey}`, 1).catch(() => false); } - update.exec().catch(() => false); - } else if (account && accountUpdateKey) { - let accountKey = `${REDIS_PREFIX}iad:${account}`; - redis.hincrby(accountKey, `stats:count:${accountUpdateKey}`, 1).catch(() => false); - } + if (message.key && metrics[message.key] && typeof metrics[message.key][message.method] === 'function') { + metrics[message.key][message.method](...message.args); + } - if (message.key && metrics[message.key] && typeof metrics[message.key][message.method] === 'function') { - metrics[message.key][message.method](...message.args); + return; } - return; + case 'settings': + availableIMAPWorkers.forEach(worker => { + try { + postMessage(worker, message); + } catch (err) { + logger.error({ msg: 'Failed to post command to child', worker: worker.threadId, callPayload: message, err }); + } + }); + return; + + case 'change': + switch (message.type) { + case 'smtpServerState': + case 'imapProxyServerState': { + let type = message.type.replace(/ServerState$/, ''); + updateServerState(type, message.key, message.payload).catch(err => + logger.error({ msg: `Failed to update ${type} server state`, err }) + ); + break; + } + default: + // forward all state changes to the API worker + for (let worker of workers.get('api') || []) { + try { + postMessage(worker, message, true); + } catch (err) { + logger.error({ msg: 'Failed to post state change to child', worker: worker.threadId, callPayload: message, err }); + } + } + } + break; } - case 'settings': - availableIMAPWorkers.forEach(worker => { - try { - postMessage(worker, message); - } catch (err) { - logger.error({ msg: 'Failed to post command to child', worker: worker.threadId, callPayload: message, err }); - } - }); - return; + switch (type) { + case 'imap': + if (message.cmd === 'ready') { + availableIMAPWorkers.add(worker); + isOnline = true; + resolve(worker.threadId); - case 'change': - switch (message.type) { - case 'smtpServerState': - case 'imapProxyServerState': { - let type = message.type.replace(/ServerState$/, ''); - updateServerState(type, message.key, message.payload).catch(err => logger.error({ msg: `Failed to update ${type} server state`, err })); - break; - } - default: - // forward all state changes to the API worker - for (let worker of workers.get('api')) { - try { - postMessage(worker, message, true); - } catch (err) { - logger.error({ msg: 'Failed to post state change to child', worker: worker.threadId, callPayload: message, err }); - } + if (imapInitialWorkersLoaded) { + assignAccounts().catch(err => logger.error({ msg: 'Failed to assign accounts', n: 2, err })); } - } - break; - } - - switch (type) { - case 'imap': - return processImapWorkerMessage(worker, message); - } + } + break; + } + }); }); }; -function processImapWorkerMessage(worker, message) { - if (!message || !message.cmd) { - logger.debug({ msg: 'Unexpected message', type: 'imap', message }); - - return; - } - - switch (message.cmd) { - case 'ready': - availableIMAPWorkers.add(worker); - // assign pending accounts - assignAccounts().catch(err => logger.error({ msg: 'Failed to assign accounts', n: 2, err })); - break; - } -} - async function call(worker, message, transferList) { return new Promise((resolve, reject) => { let mid = `${Date.now()}:${++mids}`; @@ -2051,9 +2068,19 @@ const startApplication = async () => { } // multiple IMAP connection handlers + let workerPromises = []; for (let i = 0; i < config.workers.imap; i++) { - await spawnWorker('imap'); + workerPromises.push(spawnWorker('imap')); + } + let threadIds = await Promise.all(workerPromises); + logger.info({ msg: 'IMAP workers started', workers: config.workers.imap, threadIds }); + + try { + await assignAccounts(); + } catch (err) { + logger.error({ msg: 'Failed to assign accounts', n: 2, err }); } + imapInitialWorkersLoaded = true; for (let i = 0; i < config.workers.webhooks; i++) { await spawnWorker('webhooks'); @@ -2076,7 +2103,7 @@ const startApplication = async () => { await spawnWorker('imapProxy'); } - // single worker for HTTP + // single worker for HTTP, start last to avoid running API requests for still-missing targets await spawnWorker('api'); }; diff --git a/workers/imap.js b/workers/imap.js index af8edaea..efa1e8ed 100644 --- a/workers/imap.js +++ b/workers/imap.js @@ -749,6 +749,8 @@ process.on('SIGINT', () => { }); main().catch(err => { - logger.error({ msg: 'Execution failed', err }); - setImmediate(() => process.exit(6)); + logger.fatal({ msg: 'Execution failed', err }); + logger.flush(() => { + process.exit(6); + }); });