This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

refactor(BUX-411): taskmanager simplification & tasq with redis fixes #510

Merged: 20 commits, Dec 20, 2023

chris-4chain (Contributor) commented Dec 19, 2023

Pull Request Checklist

  • 📖 I created my PR using the provided CODE_STANDARDS
  • 📖 I have read the short Code of Conduct: CODE_OF_CONDUCT
  • 🏠 I tested my changes locally.
  • ✅ I have provided tests for my changes.
  • 📝 I have used conventional commits.
  • 📗 I have updated any related documentation.
  • 💾 PR was issued based on the GitHub or Jira issue.

Initially, the goal of this task was to simplify the taskmanager by fixing taskq and localCron and by making some refactoring and simplification changes. All of these are done:

  • Renamed all taskmanager.client-related names/filenames to more precise ones;
  • Removed unnecessary taskmanager.With functional options;
  • Simplified the Redis configuration: the taskmanager.WithRedis function;
  • Removed our localCron structure; instead, github.com/robfig/cron/v3 (cronService) is used directly.

After my changes, a lot of test files had to be adjusted - that's why so many files are modified.

I tested it locally running different examples and bux-server with backend/frontend. Tasks were correctly scheduled.

TL;DR

Additionally, I tested bux by running several parallel instances with the Redis configuration for taskq. Apparently, the codebase before my changes didn't allow running it that way. So I fixed the issue, according to task-doc, by adding:

if factoryType == FactoryRedis {
	if err := q.Consumer().Start(ctx); err != nil {
		return err
	}
}

After that, the taskq instances on different nodes started to communicate via Redis.

But another problem appeared: the number of cronJob executions in a given period was roughly equal to the number of active nodes.

For example, for a cronJob with a 30-second period and 5 active instances, the job was executed 5 times per period across all nodes.

In this PR I'm proposing a fix using distributed Redis locks with a TTL.

  • Every node has local cron enabled.
  • At the beginning of a cron handler, I try to lock the task like this:
tryLock = func() bool {
	boolCmd := c.options.taskq.config.Redis.SetNX(ctx, key, "1", lockTime)
	return boolCmd.Val()
}
  • If no node has executed the job within the lockTime period (that is, tryLock succeeds), the job is added to the taskq queue.
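
The locking scheme above can be sketched without a real Redis. In this minimal example, `memLocker` is a hypothetical in-memory stand-in for Redis "set if not exists" with a TTL. The names `memLocker`, `runTick`, `simulate` and the key `cron:example` are assumptions made for illustration and are not taken from the PR. The sketch also assumes the lock TTL equals the cron period, which the description does not state.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// memLocker is a hypothetical in-memory stand-in for Redis SETNX with a TTL.
// It only mimics "set if absent, expire after ttl".
type memLocker struct {
	mu      sync.Mutex
	expires map[string]time.Time
}

func newMemLocker() *memLocker {
	return &memLocker{expires: make(map[string]time.Time)}
}

// tryLock returns true only for the first caller per key until the TTL elapses.
func (l *memLocker) tryLock(key string, ttl time.Duration, now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if exp, ok := l.expires[key]; ok && now.Before(exp) {
		return false // another node already holds the lock for this period
	}
	l.expires[key] = now.Add(ttl)
	return true
}

// runTick simulates one cron tick firing on every node at the same moment and
// returns how many nodes actually enqueued the job.
func runTick(l *memLocker, nodes int, ttl time.Duration, now time.Time) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	enqueued := 0
	for i := 0; i < nodes; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if l.tryLock("cron:example", ttl, now) {
				mu.Lock()
				enqueued++ // stands in for adding the job to the taskq queue
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return enqueued
}

// simulate fires `ticks` cron ticks, one per period, on `nodes` nodes and
// returns how many nodes enqueued the job on each tick.
func simulate(nodes, ticks, periodSeconds int) []int {
	l := newMemLocker()
	period := time.Duration(periodSeconds) * time.Second
	start := time.Unix(0, 0)
	results := make([]int, 0, ticks)
	for i := 0; i < ticks; i++ {
		now := start.Add(time.Duration(i) * period)
		results = append(results, runTick(l, nodes, period, now))
	}
	return results
}

func main() {
	fmt.Println(simulate(5, 3, 30)) // [1 1 1]
}
```

With 5 simulated nodes and a 30-second period, each tick enqueues the job exactly once, whichever node wins the lock.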

Diagrams for better understanding:

Previous solution

image

New solution

image

I tested it with 6 instances while collecting local metrics. My example task had a 2-second period, and every time it was executed it sent a 1 (one) to my local InfluxDB.

I analysed the results by grouping the metrics into 2-second aggregation windows and then summing the number of points per window.
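
The aggregation described above amounts to bucketing execution timestamps into fixed 2-second windows and counting the points in each window. Below is a minimal sketch of that arithmetic. The helper name `countPerWindow` is hypothetical and the timestamps are made-up sample values, not the measured data.

```go
package main

import (
	"fmt"
	"sort"
)

// countPerWindow groups execution timestamps (milliseconds since the start of
// the measurement) into fixed windows and returns the number of executions
// per window index. Hypothetical helper, not the query used for the PR.
func countPerWindow(timestampsMs []int64, windowMs int64) map[int64]int {
	counts := make(map[int64]int)
	for _, ts := range timestampsMs {
		counts[ts/windowMs]++
	}
	return counts
}

func main() {
	// Made-up sample: three executions in window 0, one in window 1, two in window 2.
	sample := []int64{10, 40, 1900, 2100, 4000, 5999}
	counts := countPerWindow(sample, 2000)

	// Print the windows in ascending order (map iteration order is random).
	windows := make([]int64, 0, len(counts))
	for w := range counts {
		windows = append(windows, w)
	}
	sort.Slice(windows, func(i, j int) bool { return windows[i] < windows[j] })
	for _, w := range windows {
		fmt.Printf("window %d: %d executions\n", w, counts[w])
	}
}
```

A count above one in any window means the job ran on more than one node in that period.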

Without cronlocks

We can see that the number of tasks executed per period is not constant, and it is definitely not one.
image

With cronlocks

Regardless of how many instances are active, only one cronJob handler is executed per time window.
image

Note:

Of course, the solution can be developed further, but for now:

  • it doesn't break the single-node architecture
  • in general, it allows running the app in a multi-node setup

In the future, we should rethink whether using taskq is a good idea for us:

A good alternative might be asynq.

@chris-4chain chris-4chain requested a review from a team as a code owner December 19, 2023 11:32
mergify bot (Contributor) commented Dec 19, 2023

Welcome to our open-source project @chris-4chain! 💘

codecov bot commented Dec 19, 2023

Codecov Report

Attention: 44 lines in your changes are missing coverage. Please review.

Comparison is base (1d3bbb2) 53.18% compared to head (12a41f8) 53.04%.
Report is 8 commits behind head on master.

Additional details and impacted files

@@            Coverage Diff             @@
##           master     #510      +/-   ##
==========================================
- Coverage   53.18%   53.04%   -0.15%     
==========================================
  Files         110      107       -3     
  Lines       11251    11146     -105     
==========================================
- Hits         5984     5912      -72     
+ Misses       4815     4787      -28     
+ Partials      452      447       -5     
Flag        Coverage Δ
unittests   53.04% <75.14%> (-0.15%) ⬇️

Flags with carried forward coverage won't be shown.

Files                        Coverage Δ
client.go                    60.50% <100.00%> (-0.59%) ⬇️
client_internal.go           72.04% <100.00%> (ø)
client_options.go            72.85% <100.00%> (-0.46%) ⬇️
taskmanager/options.go       100.00% <100.00%> (ø)
taskmanager/cron_jobs.go     51.85% <57.14%> (-4.82%) ⬇️
taskmanager/taskmanager.go   70.76% <70.76%> (ø)
taskmanager/taskq.go         77.19% <73.17%> (+7.28%) ⬆️

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 1d3bbb2...12a41f8.

taskmanager/taskmanager.go (resolved)
taskmanager/taskq.go (resolved)
taskmanager/taskq.go (outdated, resolved)
taskmanager/taskq.go (resolved)
Comment on lines 139 to 141
// There are two ways to run a task:
// Option 1: Run the task immediately
if options.RunEveryPeriod == 0 {
Contributor

Suggested change
- // There are two ways to run a task:
- // Option 1: Run the task immediately
- if options.RunEveryPeriod == 0 {
+ if runImmediately(options) {

Contributor Author

I added a method to the TaskRunOptions struct. Now the line looks like this:

if options.runImmediately() {
	return c.options.taskq.queue.Add(taskMessage)
}
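
A minimal sketch of what such a method could look like, assuming `TaskRunOptions` carries the `RunEveryPeriod` field seen in the reviewed line and that a zero period means "queue once, right away". The struct below is reduced to that single field for illustration and is not the repository's definition.

```go
package main

import (
	"fmt"
	"time"
)

// TaskRunOptions is reduced here to the one field the check needs;
// the real struct in the repository presumably has more fields.
type TaskRunOptions struct {
	RunEveryPeriod time.Duration
}

// runImmediately reports whether the task should be queued once right away
// instead of being scheduled with cron (assumption: zero period means no cron).
func (o *TaskRunOptions) runImmediately() bool {
	return o.RunEveryPeriod == 0
}

func main() {
	once := &TaskRunOptions{}
	periodic := &TaskRunOptions{RunEveryPeriod: 30 * time.Second}
	fmt.Println(once.runImmediately(), periodic.runImmediately()) // true false
}
```

Putting the check on the options type keeps the call site readable without the explanatory comment the reviewer wanted to drop.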


// Debugging
c.DebugLog(fmt.Sprintf("registering task: %s...", c.options.taskq.tasks[task.Name].Name()))
// Option 2: Run the task periodically using cron
Contributor

IMHO can be removed

Contributor Author

Removed. The function names are readable enough, so the comments were redundant. But I kept the note comment about cron: "The first scheduled run"

))
// The runEveryPeriod should be greater than 1 second
if runEveryPeriod < 1*time.Second {
runEveryPeriod = 1 * time.Second
Contributor

Maybe instead of falling back to a default, we could return an error here.

Contributor Author

Done.
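
One way the requested change could look, sketched with hypothetical names (`validatePeriod`, `errPeriodTooShort`). The actual error value and message in the repository may differ. The check mirrors the reviewed condition `runEveryPeriod < 1*time.Second`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errPeriodTooShort is a hypothetical sentinel error for this sketch.
var errPeriodTooShort = errors.New("runEveryPeriod should be at least 1 second")

// validatePeriod rejects periods below one second instead of silently
// raising them to the one-second minimum.
func validatePeriod(runEveryPeriod time.Duration) error {
	if runEveryPeriod < 1*time.Second {
		return errPeriodTooShort
	}
	return nil
}

func main() {
	fmt.Println(validatePeriod(500 * time.Millisecond)) // runEveryPeriod should be at least 1 second
	fmt.Println(validatePeriod(2 * time.Second))        // <nil>
}
```

Returning an error surfaces a misconfigured period to the caller, whereas the previous default silently changed the schedule.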

taskmanager/interface.go (outdated, resolved)
taskmanager/taskmanager.go (resolved)
taskmanager/taskmanager.go (outdated, resolved)
taskmanager/taskmanager.go (resolved)
taskmanager/taskmanager.go (outdated, resolved)
taskmanager/taskmanager.go (outdated, resolved)
taskmanager/taskq.go (outdated, resolved)
taskmanager/taskq.go (outdated, resolved)
@mergify mergify bot merged commit 61b5f56 into master Dec 20, 2023
9 of 10 checks passed
@mergify mergify bot deleted the refactor-411-taskmanager-simplification branch December 20, 2023 12:53
3 participants