#151 Specify number of threads to consume by a task. #156
Looks great! Just a couple of tiny suggestions.
Also, the description to the PR is so good that you can add a small section to README explaining how to use 'consume.threads'.
Regarding tests: concurrency is never easy to test. You could try (maybe in one of the next PRs) testing the methods separately (`whenEnoughResourcesAreAvailable`, `getTruncatedResourceCount`, etc.).
This is just an idea. To test `whenEnoughResourcesAreAvailable()` you can create a fixed thread pool in the test suite itself, plus a list of tasks with known execution times. Then you can assert that the wall time and the order of execution are as expected.
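The testing idea above could be sketched roughly as follows. This is not Pramen's test code: `GateTimingSketch`, `run`, and all parameter names are hypothetical, and a plain fair `Semaphore` stands in for the runner's resource accounting. It only measures wall time; checking the order of execution would need extra bookkeeping.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical test helper, not part of Pramen.
object GateTimingSketch {
  // Runs `taskCount` tasks that each sleep for `taskMillis` while holding
  // `permitsPerTask` out of `maxPermits` permits.
  // Returns the total wall time in milliseconds.
  def run(maxPermits: Int, permitsPerTask: Int, taskCount: Int, taskMillis: Long): Long = {
    // One pool thread per task, so only the semaphore limits concurrency.
    val pool = Executors.newFixedThreadPool(taskCount)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val gate = new Semaphore(maxPermits, true)
    val start = System.nanoTime()
    try {
      val tasks = (1 to taskCount).map { _ =>
        Future {
          gate.acquire(permitsPerTask)
          try Thread.sleep(taskMillis)
          finally gate.release(permitsPerTask)
        }
      }
      Await.result(Future.sequence(tasks), 30.seconds)
    } finally {
      pool.shutdown()
    }
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
  }
}
```

With 2 permits and 3 tasks that each take both permits, the tasks have to run one after another, so the wall time should be close to three task durations. Asserting only a lower bound keeps such a test less flaky on a loaded build machine.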
val maxThreads = appConfig.getInt(Keys.PARALLEL_TASKS)
val consumeThreads = ConfigUtils.getOptionInt(config, CONSUME_THREADS_KEY).getOrElse(DEFAULT_CONSUME_THREADS)

if (consumeThreads < 0) {
`<= 0`?
ah, crap, thank you for spotting this! 😄
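For illustration, the stricter check could look something like the sketch below. `ConsumeThreadsCheck`, `resolve`, and the default of 1 are hypothetical, and throwing via `require` is an assumption; the PR may handle invalid values differently.

```scala
// Hypothetical helper, not the PR's actual code.
object ConsumeThreadsCheck {
  // Returns the configured value, or the (assumed) default when it is absent.
  // Zero and negative values are rejected with an IllegalArgumentException.
  def resolve(configured: Option[Int], default: Int = 1): Int = {
    val consumeThreads = configured.getOrElse(default)
    require(consumeThreads > 0, s"consume.threads must be positive, got $consumeThreads")
    consumeThreads
  }
}
```

Zero matters because `Semaphore.acquire(0)` returns immediately, so a task configured with zero threads would not be limited at all.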
@@ -168,7 +168,7 @@ abstract class TaskRunnerBase(conf: Config,
  }

  /**
-   * Does pre-run checks and vask validations.
+   * Does pre-run checks and task validations.
😆
private val executor: ExecutorService = newFixedThreadPool(runtimeConfig.parallelTasks)
implicit private val executionContext: ExecutionContextExecutorService = fromExecutorService(executor)

private val maxResources = runtimeConfig.parallelTasks
private val availableResources: Semaphore = new Semaphore(maxResources)
I'm thinking of making the semaphore 'fair'. Otherwise, easy jobs could block hard jobs for some time.
- private val availableResources: Semaphore = new Semaphore(maxResources)
+ private val availableResources: Semaphore = new Semaphore(maxResources, true)
val result = Try { action }
availableResources.release(resourceCount)

result.get
I like the idea of wrapping the dangerous code with `Try`, and using `Try.get` at the end!
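A self-contained sketch of that pattern is shown below. Only `whenEnoughResourcesAreAvailable`, `availableResources`, `maxResources`, and `resourceCount` come from the diff; the `ResourceGate` object, the fixed limit of 4, and capping the request with `math.min` are assumptions made for illustration.

```scala
import java.util.concurrent.Semaphore
import scala.util.Try

// Hypothetical stand-in for the task runner's resource accounting.
object ResourceGate {
  private val maxResources = 4 // assumed limit, for illustration only
  private val availableResources = new Semaphore(maxResources, true)

  def whenEnoughResourcesAreAvailable[T](requestedCount: Int)(action: => T): T = {
    // Assumption: cap the request so a task never waits for more permits than exist.
    val resourceCount = math.min(requestedCount, maxResources)
    availableResources.acquire(resourceCount)
    // Try captures non-fatal exceptions, so the release below still runs.
    val result = Try { action }
    availableResources.release(resourceCount)
    // Returns the value, or rethrows the captured exception.
    result.get
  }

  // Exposed only so the permit count can be inspected.
  def availablePermits: Int = availableResources.availablePermits()
}
```

Note that `Try` only captures non-fatal exceptions. A `try`/`finally` around the action would release the permits on fatal errors as well, at the cost of a slightly different shape.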
Thank you for the feedback, it was fun working on this! I added some description to the README on how to use the parallelism options :) Regarding the tests, it's a very good idea, but I would do it in the next PR if that's OK (I will have to move the tests to a different package and this PR could get too big).
Add an option to specify how many "threads" a certain task should consume, relative to the total number of threads set by `pramen.parallel.tasks`. An example:
These are not real threads; it is more akin to an indication to pramen saying: "This is a resource-intensive operation, so you don't want to have many of these running at once".
I tried thinking of a better name that would be less confusing but did not come up with anything useful. I was thinking about `pramen.parallelism.available.slots` and `uses.slots`, but these do not sound very good 😄
Some notes
`Try` in `whenEnoughResourcesAreAvailable` is there to ensure that the permits get released, but I call `.get()` afterwards to let the exception "bubble up". I was going through the code and it seems that the task run failure is already handled by the `runTask()` function, and the Future failure is also handled. But I am still not sure whether I have done this correctly.