Skip to content

Commit

Permalink
Fix memory leaking
Browse files Browse the repository at this point in the history
  • Loading branch information
NV4RE committed Aug 26, 2020
1 parent a7a1ece commit ffbea7c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 33 deletions.
21 changes: 11 additions & 10 deletions src/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,17 @@ const processCommands = async (
};

export const executor = async () => {
try {
const commands: Command.AllCommand[] = await poll(commandConsumerClient);
if (commands.length) {
await processCommands(commands);
commandConsumerClient.commit();
while (true) {
try {
const commands: Command.AllCommand[] = await poll(commandConsumerClient);
if (commands.length) {
await processCommands(commands);
commandConsumerClient.commit();
}
} catch (error) {
// Handle consume error
console.warn('command', error);
await sleep(1000);
}
} catch (error) {
// Handle consume error
console.warn(error);
await sleep(1000);
}
setImmediate(executor);
};
46 changes: 23 additions & 23 deletions src/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -716,30 +716,30 @@ export const processUpdateTasks = async (
};

export const executor = async () => {
try {
const tasksUpdate: Event.ITaskUpdate[] = await poll(
stateConsumerClient,
200,
);
if (tasksUpdate.length) {
// Grouped by workflowId, so it can execute parallely, but same workflowId have to run sequential bacause it can effect each other
const groupedTasks = R.values(
R.groupBy(R.path(['workflowId']), tasksUpdate),
);

await Promise.all(
groupedTasks.map((workflowTasksUpdate: Event.ITaskUpdate[]) =>
processUpdateTasks(workflowTasksUpdate),
),
while (true) {
try {
const tasksUpdate: Event.ITaskUpdate[] = await poll(
stateConsumerClient,
200,
);

stateConsumerClient.commit();
if (tasksUpdate.length) {
// Grouped by workflowId, so it can execute parallely, but same workflowId have to run sequential bacause it can effect each other
const groupedTasks = R.values(
R.groupBy(R.path(['workflowId']), tasksUpdate),
);

await Promise.all(
groupedTasks.map((workflowTasksUpdate: Event.ITaskUpdate[]) =>
processUpdateTasks(workflowTasksUpdate),
),
);

stateConsumerClient.commit();
}
} catch (error) {
// Handle error here
console.warn('state', error);
await sleep(1000);
}
} catch (error) {
// Handle error here
console.warn(error);
await sleep(1000);
} finally {
setImmediate(executor);
}
};

0 comments on commit ffbea7c

Please sign in to comment.