Skip to content

Commit

Permalink
switch to asyncmap for fetching results from multiple workers
Browse files Browse the repository at this point in the history
(this reduces the amount of waiting for network communication)

Closes #603
  • Loading branch information
exaexa committed May 2, 2022
1 parent e3f89fd commit 581400d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/analysis/sampling/warmup_variability.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ function warmup_from_variability(
end
)

map(fetch, save_at.(workers, :cobrexa_sampling_warmup_optmodel, Ref(save_model)))
asyncmap(fetch, save_at.(workers, :cobrexa_sampling_warmup_optmodel, Ref(save_model)))

fluxes = hcat(
dpmap(
Expand All @@ -92,7 +92,7 @@ function warmup_from_variability(
)

# free the data on workers
map(fetch, remove_from.(workers, :cobrexa_sampling_warmup_optmodel))
asyncmap(fetch, remove_from.(workers, :cobrexa_sampling_warmup_optmodel))

return fluxes, lbs, ubs
end
Expand Down
21 changes: 12 additions & 9 deletions src/analysis/screening.jl
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ function _screen_impl(
workers = [myid()],
)::Array where {V<:AbstractVector,A,N}

map(fetch, save_at.(workers, :cobrexa_screen_variants_model, Ref(model)))
map(fetch, save_at.(workers, :cobrexa_screen_variants_analysis_fn, Ref(analysis)))
map(fetch, get_from.(workers, Ref(:(precache!(cobrexa_screen_variants_model)))))
asyncmap(fetch, save_at.(workers, :cobrexa_screen_variants_model, Ref(model)))
asyncmap(fetch, save_at.(workers, :cobrexa_screen_variants_analysis_fn, Ref(analysis)))
asyncmap(fetch, get_from.(workers, Ref(:(precache!(cobrexa_screen_variants_model)))))

res = pmap(
(vars, args)::Tuple -> screen_variant(
Expand All @@ -144,8 +144,8 @@ function _screen_impl(
zip(variants, args),
)

map(fetch, remove_from.(workers, :cobrexa_screen_variants_model))
map(fetch, remove_from.(workers, :cobrexa_screen_variants_analysis_fn))
asyncmap(fetch, remove_from.(workers, :cobrexa_screen_variants_model))
asyncmap(fetch, remove_from.(workers, :cobrexa_screen_variants_analysis_fn))

return res
end
Expand Down Expand Up @@ -276,7 +276,7 @@ function _screen_optmodel_modifications_impl(
workers = [myid()],
)::Array where {V<:AbstractVector,VF<:AbstractVector,A,N}

map(
asyncmap(
fetch,
save_at.(
workers,
Expand All @@ -290,12 +290,15 @@ function _screen_optmodel_modifications_impl(
),
),
)
map(fetch, save_at.(workers, :cobrexa_screen_optmodel_modifications_fn, Ref(analysis)))
asyncmap(
fetch,
save_at.(workers, :cobrexa_screen_optmodel_modifications_fn, Ref(analysis)),
)

res = pmap(_screen_optmodel_item, CachingPool(workers), zip(modifications, args))

map(fetch, remove_from.(workers, :cobrexa_screen_optmodel_modifications_data))
map(fetch, remove_from.(workers, :cobrexa_screen_optmodel_modifications_fn))
asyncmap(fetch, remove_from.(workers, :cobrexa_screen_optmodel_modifications_data))
asyncmap(fetch, remove_from.(workers, :cobrexa_screen_optmodel_modifications_fn))

return res
end

0 comments on commit 581400d

Please sign in to comment.