Skip to content

Commit

Permalink
fix(worker-close): If worker dies then do not try to update accounts …
Browse files Browse the repository at this point in the history
…statuses to prevent race conditions
  • Loading branch information
andris9 committed Feb 4, 2025
1 parent 861bc23 commit d532365
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 79 deletions.
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version: '3.7'
name: emailengine-bundle

services:
emailengine:
restart: always
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"scripts": {
"start": "node server.js",
"dev": "EE_OPENAPI_VERBOSE=true EENGINE_LOG_RAW=true node --tls-keylog=keylog.txt server --dbs.redis='redis://127.0.0.1:6379/9' --api.port=7003 --api.host=0.0.0.0 | tee $HOME/ee.log.dev.txt | pino-pretty",
"single": "EE_OPENAPI_VERBOSE=true EENGINE_LOG_RAW=true EENGINE_SECRET=your-encryption-key EENGINE_WORKERS=1 node --inspect server --dbs.redis='redis://127.0.0.1:6379/10' --api.port=7003 --api.host=0.0.0.0 | tee $HOME/ee.log.single.txt | pino-pretty",
"single": "EE_OPENAPI_VERBOSE=true EENGINE_LOG_RAW=true EENGINE_SECRET=your-encryption-key EENGINE_WORKERS=1 node --inspect server --dbs.redis='redis://127.0.0.1:9999/10' --api.port=7003 --api.host=0.0.0.0 | tee $HOME/ee.log.single.txt | pino-pretty",
"gmail": "EE_OPENAPI_VERBOSE=true EENGINE_LOG_RAW=true EENGINE_SECRET=your-encryption-key EENGINE_WORKERS=2 node --inspect server --dbs.redis='redis://127.0.0.1:6379/11' --api.port=7003 --api.host=0.0.0.0 | tee $HOME/ee.log.gmail.txt | pino-pretty",
"test": "NODE_ENV=test grunt",
"swagger": "./getswagger.sh",
Expand Down
84 changes: 7 additions & 77 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -526,63 +526,6 @@ const postMessage = (worker, payload, ignoreOffline, transferList) => {
return result;
};

const countUnassignment = async account => {
if (!unassignCounter.has(account)) {
unassignCounter.set(account, []);
}
let arr = unassignCounter.get(account);
arr.push(Date.now());

if (arr.length > 10) {
arr = arr.slice(-10);
unassignCounter.set(account, arr);
}

let delay = 0;
if (arr.length === 1) {
delay = 0;
} else {
let lastDelay = arr[arr.length - 1] - arr[arr.length - 2];
// if last delay was longer than a minute, then reset
if (lastDelay >= 60 * 1000) {
delay = 0;
} else {
delay = Math.min(Math.ceil(lastDelay * 1.5), 60 * 1000);
}
}

redis.hSetExists(`${REDIS_PREFIX}iad:${account}`, 'state', 'disconnected').catch(err => {
logger.error({ msg: 'Failed to post update account state', account, state: 'disconnected', err });
});

for (let worker of workers.get('api')) {
let callPayload = {
cmd: 'change',
account,
type: 'state',
key: 'disconnected',
payload: null
};

try {
postMessage(worker, callPayload, true);
} catch (err) {
logger.error({ msg: 'Failed to post state change to child', worker: worker.threadId, callPayload, err });
}
}

if (delay) {
await new Promise(resolve => setTimeout(resolve, delay));
return true;
} else {
return false;
}
};

let clearUnassignmentCounter = account => {
unassignCounter.delete(account);
};

let updateServerState = async (type, state, payload) => {
await redis.hset(`${REDIS_PREFIX}${type}`, 'state', state);
if (payload) {
Expand Down Expand Up @@ -743,25 +686,13 @@ let spawnWorker = async type => {
if (workerAssigned.has(worker)) {
let accountList = workerAssigned.get(worker);
workerAssigned.delete(worker);
accountList.forEach(account => {

for (let account of accountList) {
assigned.delete(account);
let shouldReassign = false;
// graceful reconnect
countUnassignment(account)
.then(sr => {
shouldReassign = sr;
})
.catch(() => {
shouldReassign = true;
})
.finally(() => {
unassigned.add(account);

if (shouldReassign) {
assignAccounts().catch(err => logger.error({ msg: 'Failed to assign accounts', n: 1, err }));
}
});
});
unassigned.add(account);
}

assignAccounts().catch(err => logger.error({ msg: 'Failed to assign accounts', n: 1, err }));
}
}

Expand Down Expand Up @@ -1880,7 +1811,6 @@ async function onCommand(worker, message) {

case 'delete':
unassigned.delete(message.account); // if set
clearUnassignmentCounter(message.account);
if (assigned.has(message.account)) {
let assignedWorker = assigned.get(message.account);
if (workerAssigned.has(assignedWorker)) {
Expand Down Expand Up @@ -2330,7 +2260,7 @@ const startApplication = async () => {
try {
await assignAccounts();
} catch (err) {
logger.error({ msg: 'Failed to assign accounts', n: 2, err });
logger.error({ msg: 'Failed to assign accounts', n: 4, err });
}
imapInitialWorkersLoaded = true;

Expand Down

0 comments on commit d532365

Please sign in to comment.