-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
make retry() more flexible #19331
Merged
Merged
make retry() more flexible #19331
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -59,6 +59,7 @@ export | |
EachLine, | ||
Enum, | ||
Enumerate, | ||
ExponentialBackOff, | ||
Factorization, | ||
FileMonitor, | ||
FloatRange, | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ pgenerate(f, c) = pgenerate(default_worker_pool(), f, c) | |
pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...)) | ||
|
||
""" | ||
pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection | ||
pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[]), retry_check=nothing) -> collection | ||
|
||
Transform collection `c` by applying `f` to each element using available | ||
workers and tasks. | ||
|
@@ -59,27 +59,27 @@ you can specify an error handling function via argument `on_error` which takes i | |
the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value | ||
which is then returned inline with the results to the caller. | ||
|
||
Failed computation can also be retried via `retry_on`, `retry_n`, `retry_max_delay`, which are passed through | ||
to `retry` as arguments `retry_on`, `n` and `max_delay` respectively. If batching is specified, and an entire batch fails, | ||
all items in the batch are retried. | ||
Failed computation can also be retried via `retry_delays`, `retry_check`, which | ||
are passed through to `retry` as keyword arguments `delays` and `check`, | ||
respectively. If batching is specified, and an entire batch fails, all items in | ||
the batch are retried. | ||
|
||
The following are equivalent: | ||
|
||
* `pmap(f, c; distributed=false)` and `asyncmap(f,c)` | ||
* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)` | ||
* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)` | ||
* `pmap(f, c; retry_delays=Base.ExponentialBackOff())` and `asyncmap(retry(remote(f)),c)` | ||
* `pmap(f, c; retry_delays=Base.ExponentialBackOff(), on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)` | ||
""" | ||
function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing, | ||
retry_n=0, | ||
retry_max_delay=DEFAULT_RETRY_MAX_DELAY, | ||
retry_on=DEFAULT_RETRY_ON, | ||
retry_delays=[], | ||
retry_check=nothing, | ||
# deprecated keyword args: | ||
err_retry=nothing, err_stop=nothing, pids=nothing) | ||
#15409 | ||
if err_retry !== nothing | ||
depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap) | ||
if err_retry == true | ||
f = retry(f) | ||
f = retry(f, delays=retry_delays, check=retry_check) | ||
end | ||
end | ||
if pids !== nothing | ||
|
@@ -110,8 +110,8 @@ function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_er | |
f = remote(p, f) | ||
end | ||
|
||
if retry_n > 0 | ||
f = wrap_retry(f, retry_on, retry_n, retry_max_delay) | ||
if length(retry_delays) > 0 | ||
f = wrap_retry(f, retry_delays, retry_check) | ||
end | ||
if on_error !== nothing | ||
f = wrap_on_error(f, on_error) | ||
|
@@ -122,18 +122,18 @@ function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_er | |
# During batch processing, We need to ensure that if on_error is set, it is called | ||
# for each element in error, and that we return as many elements as the original list. | ||
# retry, if set, has to be called element wise and we will do a best-effort | ||
# to ensure that we do not call mapped function on the same element more than retry_n. | ||
# to ensure that we do not call mapped function on the same element more than length(retry_delays). | ||
# This guarantee is not possible in case of worker death / network errors, wherein | ||
# we will retry the entire batch on a new worker. | ||
if (on_error !== nothing) || (retry_n > 0) | ||
if (on_error !== nothing) || (length(retry_delays) > 0) | ||
f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true) | ||
end | ||
f = wrap_batch(f, p, on_error) | ||
results = asyncmap(f, c; ntasks=()->nworkers(p), batch_size=batch_size) | ||
|
||
# handle error processing.... | ||
if (on_error !== nothing) || (retry_n > 0) | ||
process_batch_errors!(p, f_orig, results, on_error, retry_on, retry_n, retry_max_delay) | ||
if (on_error !== nothing) || (length(retry_delays) > 0) | ||
process_batch_errors!(p, f_orig, results, on_error, retry_delays, retry_check) | ||
end | ||
|
||
return results | ||
|
@@ -158,7 +158,7 @@ function wrap_on_error(f, on_error; capture_data=false) | |
end | ||
end | ||
|
||
wrap_retry(f, retry_on, n, max_delay) = retry(f, retry_on; n=n, max_delay=max_delay) | ||
wrap_retry(f, retry_delays, retry_check) = retry(f, delays=retry_delays, check=retry_check) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason for this was a |
||
function wrap_batch(f, p, on_error) | ||
f = asyncmap_batch(f) | ||
|
@@ -177,9 +177,9 @@ end | |
|
||
asyncmap_batch(f) = batch -> asyncmap(x->f(x...), batch) | ||
|
||
function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay) | ||
function process_batch_errors!(p, f, results, on_error, retry_delays, retry_check) | ||
# Handle all the ones in error in another pmap, with batch size set to 1 | ||
if (on_error !== nothing) || (retry_n > 0) | ||
if (on_error !== nothing) || (length(retry_delays) > 0) | ||
reprocess = [] | ||
for (idx, v) in enumerate(results) | ||
if isa(v, BatchProcessingError) | ||
|
@@ -190,13 +190,11 @@ function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry | |
if length(reprocess) > 0 | ||
errors = [x[2] for x in reprocess] | ||
exceptions = [x.ex for x in errors] | ||
if (retry_n > 0) && all([retry_on(ex) for ex in exceptions]) | ||
retry_n = retry_n - 1 | ||
state = start(retry_delays) | ||
if (length(retry_delays) > 0) && | ||
(retry_check==nothing || all([retry_check(state,ex)[2] for ex in exceptions])) | ||
error_processed = pmap(p, f, [x.data for x in errors]; | ||
on_error=on_error, | ||
retry_on=retry_on, | ||
retry_n=retry_n, | ||
retry_max_delay=retry_max_delay) | ||
on_error = on_error, retry_delays = collect(retry_delays)[2:end], retry_check = retry_check) | ||
elseif on_error !== nothing | ||
error_processed = map(on_error, exceptions) | ||
else | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this does get exported, then be sure to add it to the stdlib doc index