
#151 Specify number of threads to consume by a task. #156

Merged
jirifilip merged 1 commit into main from feature/151-number-of-threads-per-task on Feb 10, 2023

Conversation

@jirifilip (Collaborator) commented on Feb 9, 2023

Add an option to specify how many "threads" a given task should consume, relative to the total number of threads set by pramen.parallel.tasks.

An example:

pramen {
  pipeline.name = "A testing pipeline"

  parallel.tasks = 4
}

pramen.operations = [
  {
    name = "easy job"
    ...
    # default is 1
    consume.threads = 1 # not a resource-intensive job, so Pramen can run 4 of these at one time (if nothing else is running, of course)
  },
  {
    name = "hard job"
    ...
    consume.threads = 4 # run only one instance of this operation at a time
  },
]

These are not real threads; it is more of a hint to Pramen saying: "This is a resource-intensive operation, so you don't want many of these running at once".

I tried to think of a less confusing name but did not come up with anything useful. I considered pramen.parallelism.available.slots and uses.slots, but those do not sound very good 😄

Some notes

  • I tested this end-to-end. I was also thinking about how to unit test this better, but did not come up with anything beyond what is in the PR.
  • the Try in whenEnoughResourcesAreAvailable is there to ensure that the permits get released, but I call .get() afterwards to let the exception "bubble up". Going through the code, it seems that a task run failure is already handled by the runTask() function, and a Future failure is also handled. I'm still not sure whether I've done this correctly, though.
  • It still spawns all of the threads. The ones that don't have enough "slots" block on the semaphore until enough "threads" are free for them to run (a sketch of this pattern follows these notes).
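
To make these notes concrete, here is a minimal, self-contained sketch of the acquire/Try/release pattern, assuming a semaphore sized to pramen.parallel.tasks. The class name ResourceGate is made up for the example; the method name follows the discussion above, but the PR's actual code may differ in detail:

import java.util.concurrent.Semaphore
import scala.util.Try

// Illustrative sketch, not the PR's exact code.
class ResourceGate(maxResources: Int) {
  // One permit per "thread" from pramen.parallel.tasks.
  private val availableResources = new Semaphore(maxResources)

  def whenEnoughResourcesAreAvailable[T](resourceCount: Int)(action: => T): T = {
    // Block until enough permits ("threads") are free for this task.
    availableResources.acquire(resourceCount)
    // Run the action inside Try so the permits are always released,
    // then rethrow any failure via .get so it bubbles up to the caller.
    val result = Try(action)
    availableResources.release(resourceCount)
    result.get
  }
}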

@jirifilip jirifilip force-pushed the feature/151-number-of-threads-per-task branch 2 times, most recently from b3dadb4 to 43002a0 Compare February 9, 2023 14:50
@jirifilip jirifilip marked this pull request as ready for review February 9, 2023 14:58
@jirifilip jirifilip requested a review from yruslan as a code owner February 9, 2023 14:58
@yruslan (Collaborator) left a comment:


Looks great! Just a couple of tiny suggestions.

Also, the PR description is so good that you could add a small section to the README explaining how to use 'consume.threads'.

Regarding tests - concurrency is never easy to test. You can try (maybe in one of the next PRs) to test the methods separately (whenEnoughResourcesAreAvailable, getTruncatedResourceCount, etc.).

This is just an idea: to test whenEnoughResourcesAreAvailable() you can create a fixed thread pool in the test suite itself and a list of tasks with known execution times. Then you can assert that the wall time and the order of execution are as expected.
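
To make that idea concrete, here is a rough sketch of such a test, reusing the hypothetical ResourceGate from the sketch above; the two-permit gate, the 100 ms sleeps, and the assertion threshold are all made up for illustration:

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical test body: a gate with 2 permits and three tasks that
// each consume 1 permit and sleep 100 ms. Two tasks can run together,
// the third has to wait, so the wall time should be roughly 200 ms.
val gate = new ResourceGate(2)
val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val start = System.nanoTime()
val tasks = (1 to 3).map { _ =>
  Future(gate.whenEnoughResourcesAreAvailable(1)(Thread.sleep(100)))
}
Await.result(Future.sequence(tasks), 5.seconds)
val wallMillis = (System.nanoTime() - start) / 1000000

assert(wallMillis >= 200, s"expected two batches, took $wallMillis ms")
pool.shutdown()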

val maxThreads = appConfig.getInt(Keys.PARALLEL_TASKS)
val consumeThreads = ConfigUtils.getOptionInt(config, CONSUME_THREADS_KEY).getOrElse(DEFAULT_CONSUME_THREADS)

if (consumeThreads < 0) {
@yruslan (Collaborator):
<= 0 ?

@jirifilip (Collaborator, Author):
ah, crap, thank you for spotting this! 😄

@@ -168,7 +168,7 @@ abstract class TaskRunnerBase(conf: Config,
   }

   /**
-   * Does pre-run checks and vask validations.
+   * Does pre-run checks and task validations.
@yruslan (Collaborator):
😆

private val executor: ExecutorService = newFixedThreadPool(runtimeConfig.parallelTasks)
implicit private val executionContext: ExecutionContextExecutorService = fromExecutorService(executor)

private val maxResources = runtimeConfig.parallelTasks
private val availableResources: Semaphore = new Semaphore(maxResources)
@yruslan (Collaborator):
I'm thinking of making the semaphore 'fair'. Otherwise, it could happen that a stream of easy jobs blocks hard jobs for some time.

Suggested change:
- private val availableResources: Semaphore = new Semaphore(maxResources)
+ private val availableResources: Semaphore = new Semaphore(maxResources, true)

val result = Try { action }
availableResources.release(resourceCount)

result.get
@yruslan (Collaborator):
I like the idea of wrapping the dangerous code with Try, and using Try.get at the end!

@jirifilip jirifilip force-pushed the feature/151-number-of-threads-per-task branch from 43002a0 to da15e63 Compare February 10, 2023 11:43
@jirifilip jirifilip force-pushed the feature/151-number-of-threads-per-task branch from da15e63 to e4c01e2 Compare February 10, 2023 11:49
@jirifilip (Collaborator, Author) commented:
Thank you for the feedback, it was fun working on this! I added a description to the README on how to use the parallelism options :) Regarding the tests, it's a very good idea, but I would do it in the next PR if that's OK (the tests will have to move to a different package, and this PR could get too big).

@jirifilip jirifilip merged commit 96e1703 into main Feb 10, 2023
@jirifilip jirifilip deleted the feature/151-number-of-threads-per-task branch February 10, 2023 12:02
@yruslan yruslan mentioned this pull request Mar 23, 2023