Skip to content

Commit

Permalink
feat(api): Allow to override EENGINE_TIMEOUT value for a single API r…
Browse files Browse the repository at this point in the history
…equest
  • Loading branch information
andris9 committed Oct 31, 2023
1 parent bacd182 commit 9a3aec3
Show file tree
Hide file tree
Showing 9 changed files with 450 additions and 88 deletions.
202 changes: 175 additions & 27 deletions lib/account.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class Account {

this.secret = options.secret;

this.timeout = options.timeout ? Number(options.timeout) : 0;

this.esClient = options.esClient;

this.call = options.call; // async method to request data from parent
Expand Down Expand Up @@ -531,7 +533,11 @@ class Account {
if (reconnectRequested) {
// changes detected!
this.logger.info({ msg: 'IMAP configuration changed for account', account: this.account });
await this.call({ cmd: 'update', account: this.account });
await this.call({
cmd: 'update',
account: this.account,
timeout: this.timeout
});
}

return {
Expand Down Expand Up @@ -601,10 +607,18 @@ class Account {
if (result[0][1] && result[0][1].account) {
// existing user
state = 'existing';
await this.call({ cmd: 'update', account: this.account });
await this.call({
cmd: 'update',
account: this.account,
timeout: this.timeout
});
} else {
state = 'new';
await this.call({ cmd: 'new', account: this.account });
await this.call({
cmd: 'new',
account: this.account,
timeout: this.timeout
});
}

return { account: this.account, state };
Expand Down Expand Up @@ -677,7 +691,11 @@ class Account {
this.logger.error({ msg: 'Failed to add entry to documents queue', err });
}

await this.call({ cmd: 'delete', account: this.account });
await this.call({
cmd: 'delete',
account: this.account,
timeout: this.timeout
});

return {
account: this.account,
Expand All @@ -691,7 +709,16 @@ class Account {
const { port1, port2 } = new MessageChannel();
const stream = new MessagePortReadable(port1);

let streamCreated = await this.call({ cmd: 'getRawMessage', account: this.account, message, port: port2 }, [port2]);
let streamCreated = await this.call(
{
cmd: 'getRawMessage',
account: this.account,
message,
timeout: this.timeout,
port: port2
},
[port2]
);

if (streamCreated && streamCreated.headers) {
stream.headers = streamCreated.headers;
Expand All @@ -706,7 +733,16 @@ class Account {
const { port1, port2 } = new MessageChannel();
const stream = new MessagePortReadable(port1);

let streamCreated = await this.call({ cmd: 'getAttachment', account: this.account, attachment, port: port2 }, [port2]);
let streamCreated = await this.call(
{
cmd: 'getAttachment',
account: this.account,
attachment,
timeout: this.timeout,
port: port2
},
[port2]
);

if (streamCreated && streamCreated.headers) {
stream.headers = streamCreated.headers;
Expand Down Expand Up @@ -806,13 +842,26 @@ class Account {
async updateMessage(message, updates) {
await this.loadAccountData(this.account, true);

return await this.call({ cmd: 'updateMessage', account: this.account, message, updates });
return await this.call({
cmd: 'updateMessage',
account: this.account,
message,
updates,
timeout: this.timeout
});
}

async updateMessages(path, search, updates) {
await this.loadAccountData(this.account, true);

return await this.call({ cmd: 'updateMessages', account: this.account, path, search, updates });
return await this.call({
cmd: 'updateMessages',
account: this.account,
path,
search,
updates,
timeout: this.timeout
});
}

async listMailboxes(query) {
Expand All @@ -824,47 +873,94 @@ class Account {
};
}

return await this.call({ cmd: 'listMailboxes', account: this.account, options });
return await this.call({
cmd: 'listMailboxes',
account: this.account,
options,
timeout: this.timeout
});
}

async moveMessage(message, target) {
await this.loadAccountData(this.account, true);
return await this.call({ cmd: 'moveMessage', account: this.account, message, target });
return await this.call({
cmd: 'moveMessage',
account: this.account,
message,
target,
timeout: this.timeout
});
}

async moveMessages(source, search, target) {
await this.loadAccountData(this.account, true);
return await this.call({ cmd: 'moveMessages', account: this.account, source, search, target });
return await this.call({
cmd: 'moveMessages',
account: this.account,
source,
search,
target,
timeout: this.timeout
});
}

async deleteMessage(message, force) {
await this.loadAccountData(this.account, true);

return await this.call({ cmd: 'deleteMessage', account: this.account, message, force });
return await this.call({
cmd: 'deleteMessage',
account: this.account,
message,
force,
timeout: this.timeout
});
}

async deleteMessages(path, search, force) {
await this.loadAccountData(this.account, true);

return await this.call({ cmd: 'deleteMessages', account: this.account, path, search, force });
return await this.call({
cmd: 'deleteMessages',
account: this.account,
path,
search,
force,
timeout: this.timeout
});
}

async createMailbox(path) {
await this.loadAccountData(this.account, true);

return await this.call({ cmd: 'createMailbox', account: this.account, path });
return await this.call({
cmd: 'createMailbox',
account: this.account,
path,
timeout: this.timeout
});
}

async renameMailbox(path, newPath) {
await this.loadAccountData(this.account, true);

return await this.call({ cmd: 'renameMailbox', account: this.account, path, newPath });
return await this.call({
cmd: 'renameMailbox',
account: this.account,
path,
newPath,
timeout: this.timeout
});
}

async deleteMailbox(path) {
await this.loadAccountData(this.account, true);

return await this.call({ cmd: 'deleteMailbox', account: this.account, path });
return await this.call({
cmd: 'deleteMailbox',
account: this.account,
path,
timeout: this.timeout
});
}

async getText(text, options) {
Expand Down Expand Up @@ -913,7 +1009,13 @@ class Account {

await this.loadAccountData(this.account, true);

return await this.call({ cmd: 'getText', account: this.account, text, options });
return await this.call({
cmd: 'getText',
account: this.account,
text,
options,
timeout: this.timeout
});
}

async getMessage(message, options) {
Expand Down Expand Up @@ -1081,7 +1183,13 @@ class Account {

await this.loadAccountData(this.account, true);

let messageData = await this.call({ cmd: 'getMessage', account: this.account, message, options });
let messageData = await this.call({
cmd: 'getMessage',
account: this.account,
message,
options,
timeout: this.timeout
});
if (!messageData) {
let message = 'Requested message was not found';
let error = Boom.boomify(new Error(message), { statusCode: 404 });
Expand Down Expand Up @@ -1220,7 +1328,16 @@ class Account {
}

// mailbox seems to exist, so call parent to resolve open connection
return await this.call(Object.assign({ cmd: 'listMessages', account: this.account }, query));
return await this.call(
Object.assign(
{
cmd: 'listMessages',
account: this.account
},
query,
{ timeout: this.timeout }
)
);
}

async searchMessages(query, searchOpts) {
Expand Down Expand Up @@ -1618,13 +1735,27 @@ class Account {
}

// mailbox seems to exist, so call parent to resolve open connection
return await this.call(Object.assign({ cmd: 'listMessages', account: this.account }, query));
return await this.call(
Object.assign(
{
cmd: 'listMessages',
account: this.account
},
query,
{ timeout: this.timeout }
)
);
}

async uploadMessage(data) {
await this.loadAccountData(this.account, true);

let messageData = await this.call({ cmd: 'uploadMessage', account: this.account, data });
let messageData = await this.call({
cmd: 'uploadMessage',
account: this.account,
data,
timeout: this.timeout
});
return messageData;
}

Expand All @@ -1637,7 +1768,7 @@ class Account {
account: this.account,
data,
// extended wait period when sending emails
timeout: 10 * 60 * 1000
timeout: Math.max(this.timeout, 10 * 60 * 1000)
}
//typeof data.raw === 'object' ? [data.raw] : []
);
Expand All @@ -1653,7 +1784,8 @@ class Account {
cmd: 'queueMessage',
account: this.account,
data,
meta
meta,
timeout: this.timeout
}
//typeof data.raw === 'object' ? [data.raw] : []
);
Expand All @@ -1664,7 +1796,11 @@ class Account {
await this.loadAccountData(this.account, true);

if (data.reconnect) {
await this.call({ cmd: 'update', account: this.account });
await this.call({
cmd: 'update',
account: this.account,
timeout: this.timeout
});
return true;
}
return false;
Expand All @@ -1674,7 +1810,11 @@ class Account {
await this.loadAccountData(this.account, true);

if (data.sync) {
await this.call({ cmd: 'sync', account: this.account });
await this.call({
cmd: 'sync',
account: this.account,
timeout: this.timeout
});
return true;
}
return false;
Expand Down Expand Up @@ -1715,7 +1855,11 @@ class Account {
}

try {
await this.call({ cmd: 'pause', account: this.account });
await this.call({
cmd: 'pause',
account: this.account,
timeout: this.timeout
});

let notifyFrom = data.notifyFrom && data.notifyFrom !== 'now' ? data.notifyFrom : new Date();

Expand Down Expand Up @@ -1800,7 +1944,11 @@ class Account {
let finalize = async () => {
// Wait a bit before resuming. Just to be sure all pending processes have been completed.
await new Promise(r => setTimeout(r, 5 * 1000));
await this.call({ cmd: 'resume', account: this.account });
await this.call({
cmd: 'resume',
account: this.account,
timeout: this.timeout
});
};
finalize().catch(err => {
this.logger.error({ msg: 'Failed to finish flushing', account: this.account, err });
Expand Down
Loading

0 comments on commit 9a3aec3

Please sign in to comment.