
Threaded Dispatcher #142

Merged: 19 commits merged from dispatcher into main on Sep 11, 2024

Conversation

@shikokuchuo (Owner) commented on Sep 2, 2024

Closes #74. Addresses the key remaining issue of #97.

To use:

daemons(dispatcher = NA)

Requires the dispatcher branch of dev nanonext.
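
A minimal usage sketch (assuming the dispatcher branch of dev nanonext is already installed; the daemon count and the task are arbitrary):

library(mirai)

daemons(4, dispatcher = NA)   # NA selects the threaded dispatcher
#> [1] 4

m <- mirai(Sys.getpid())      # evaluate a task on a daemon
m[]                           # wait for and return the result

daemons(0)                    # reset daemons when done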

The implementation currently benefits from:

  • stack-allocated objects, ensuring memory safety
  • higher efficiency through re-use of Aios, etc.

This is a straight port of the existing dispatcher using inproc sockets, only somewhat optimised. To keep things as light as possible, there is currently only limited information retrieval from the dispatcher thread.
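
For orientation, below is an illustrative sketch of the inproc transport pattern in nanonext that the port relies on; the protocols, variable names and URL are assumptions chosen for this example, not the actual dispatcher internals:

library(nanonext)

# inproc sockets connect endpoints within the same process
host <- socket("req", listen = "inproc://demo")
thread <- socket("rep", dial = "inproc://demo")

send(host, "task", block = TRUE)   # one endpoint sends
recv(thread, block = TRUE)         # the other receives, without leaving the process
#> [1] "task"

close(host)
close(thread)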

Resultant performance is good, closer to the non-dispatcher case than to the dispatcher case.

Does not yet support the interfaces required for crew autoscaling.

A re-design is still possible prior to merge.

shikokuchuo self-assigned this on Sep 2, 2024
shikokuchuo added the enhancement (New feature or request) label on Sep 2, 2024
@codecov-commenter commented on Sep 2, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 99.71%. Comparing base (ab4bde1) to head (2d81ac4).

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #142      +/-   ##
==========================================
+ Coverage   99.70%   99.71%   +0.01%     
==========================================
  Files           9        9              
  Lines         667      704      +37     
==========================================
+ Hits          665      702      +37     
  Misses          2        2              

@shikokuchuo (Owner, Author) commented:

Initial overhead benchmarking shows the threaded dispatcher at ~1.2x the non-dispatcher case. Not a bad result at all, given this is prior to any optimization.

library(mirai)
library(future)

daemons(8, dispatcher = FALSE, .compute = "non-dispatcher")
#> [1] 8
daemons(8, dispatcher = NA, .compute = "threaded-dispatcher")
#> [1] 8
daemons(8, dispatcher = TRUE, .compute = "dispatcher")
#> [1] 8
plan("multisession", workers = 8)

bench::mark(
  mirai(1, .compute = "non-dispatcher")[],
  mirai(1, .compute = "threaded-dispatcher")[],
  mirai(1, .compute = "dispatcher")[],
  value(future(1)),
  relative = TRUE
)
#> # A tibble: 4 × 6
#>   expression                             min median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>                           <dbl>  <dbl>     <dbl>     <dbl>    <dbl>
#> 1 "mirai(1, .compute = \"non-dispatc… 1   e0   1         782.      5.37      Inf
#> 2 "mirai(1, .compute = \"threaded-di… 1.26e0   1.16      699.      1         Inf
#> 3 "mirai(1, .compute = \"dispatcher\… 1.57e0   1.57      470.      1         Inf
#> 4 "value(future(1))"                  1.08e3 893.          1     278.        NaN

Created on 2024-09-03 with reprex v2.1.1

shikokuchuo force-pushed the dispatcher branch 2 times, most recently from 953ed72 to 2fe3c8a, on September 6, 2024
@shikokuchuo (Owner, Author) commented:

FYI @wlandau this is now ready to merge from my side as experimental. I still have to add documentation and tests.

It doesn't have the same interfaces to support autoscaling, but otherwise works similarly to the existing dispatcher. I'll open another thread so we can track {crew}-related discussions; it may be that we find more efficient ways of doing things.

@wlandau commented on Sep 10, 2024

Very exciting work! Looking forward to digging into specifics.

@shikokuchuo (Owner, Author) commented:

Dispatcher branch is merged at nanonext.

@shikokuchuo (Owner, Author) commented:

For the record, at the point of merge, threaded dispatcher overhead is at ~15%. This is really quite good, as it has kept up with improvements in base performance over this period.

library(mirai)
library(future)

daemons(8, dispatcher = FALSE, .compute = "non-dispatcher")
#> [1] 8
daemons(8, dispatcher = NA, .compute = "threaded-dispatcher")
#> [1] 8
daemons(8, dispatcher = TRUE, .compute = "dispatcher")
#> [1] 8
plan("multisession", workers = 8)

bench::mark(
  mirai(1, .compute = "non-dispatcher")[],
  mirai(1, .compute = "threaded-dispatcher")[],
  mirai(1, .compute = "dispatcher")[],
  value(future(1)),
  relative = TRUE
)
#> # A tibble: 4 × 6
#>   expression                             min median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>                           <dbl>  <dbl>     <dbl>     <dbl>    <dbl>
#> 1 "mirai(1, .compute = \"non-dispatc… 1   e0   1         791.      8.66      Inf
#> 2 "mirai(1, .compute = \"threaded-di… 1.21e0   1.15      700.      1         Inf
#> 3 "mirai(1, .compute = \"dispatcher\… 1.53e0   1.59      506.      1         NaN
#> 4 "value(future(1))"                  1.02e3 945.          1     278.        NaN

Created on 2024-09-11 with reprex v2.1.1

shikokuchuo merged commit 1a7e45f into main on Sep 11, 2024
7 checks passed
shikokuchuo deleted the dispatcher branch on September 11, 2024
Review comment on R/daemons.R (resolved)
@shikokuchuo (Owner, Author) commented:

If we use collect_mirai(), which is recommended for production contexts over the interruptible [] method, we are now at ~1000x the performance of the future equivalent.

library(mirai)
library(future)

daemons(8, dispatcher = "none", .compute = "none")
#> [1] 8
daemons(8, dispatcher = "thread", .compute = "thread")
#> [1] 8
daemons(8, dispatcher = "process", .compute = "process")
#> [1] 8
plan("multisession", workers = 8)

bench::mark(
  collect_mirai(mirai(1, .compute = "none")),
  collect_mirai(mirai(1, .compute = "thread")),
  collect_mirai(mirai(1, .compute = "process")),
  value(future(1)),
  relative = TRUE
)
#> # A tibble: 4 × 6
#>   expression                             min median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr>                           <dbl>  <dbl>     <dbl>     <dbl>    <dbl>
#> 1 "collect_mirai(mirai(1, .compute =… 1   e0   1         894.      8.07      NaN
#> 2 "collect_mirai(mirai(1, .compute =… 1.14e0   1.13      788.      1         NaN
#> 3 "collect_mirai(mirai(1, .compute =… 1.53e0   1.54      539.      1         Inf
#> 4 "value(future(1))"                  1.08e3 999.          1     278.        NaN

Created on 2024-11-05 with reprex v2.1.1
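
For comparison, a minimal sketch contrasting the two collection methods (assuming a current mirai installation; the daemon settings are arbitrary):

library(mirai)

daemons(1, dispatcher = "thread")

m <- mirai(1 + 1)
m[]                  # interruptible wait: returns the value, can be broken with Ctrl+C
#> [1] 2

m2 <- mirai(2 + 2)
collect_mirai(m2)    # waits and collects; lower overhead, suited to production
#> [1] 4

daemons(0)           # reset daemons when done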
