Merge pull request #376 from open-AIMS/multiple-rcps-refactor
Refactor run_scenarios to improve when running multiple rcps
ConnectedSystems committed Jun 26, 2023
2 parents b79ef42 + 307089d commit e8e1200
Showing 3 changed files with 110 additions and 96 deletions.
79 changes: 49 additions & 30 deletions src/io/result_io.jl
@@ -1,3 +1,6 @@
const COMPRESSOR = Zarr.BloscCompressor(cname="zstd", clevel=2, shuffle=true)
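The compressor, previously created locally inside `setup_result_store!`, is hoisted to a module-level constant so every Zarr store in a result set shares the same compression settings. A minimal sketch of writing a compressed on-disk array with it, using only the `zcreate` keywords already exercised in this file (the path is hypothetical):

```julia
using Zarr

z = zcreate(Float64, 10, 5;
    path=joinpath(mktempdir(), "demo"),  # hypothetical location
    chunks=(10, 1),
    fill_value=-9999.0, fill_as_missing=false,
    compressor=Zarr.BloscCompressor(cname="zstd", clevel=2, shuffle=true))
z[:, 1] = rand(10)  # writes are compressed chunk-by-chunk on disk
```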


function get_geometry(df::DataFrame)
if columnindex(df, :geometry) > 0
return df.geometry
@@ -162,7 +165,10 @@ Sets up an on-disk result store.
│ ├───seed
│ └───shade
├───results
│ ├───relative_cover
│ ├───juvenile_indicator
│ ├───relative_juveniles
│ ├───relative_taxa_cover
│ ├───total_absolute_cover
│ ├───relative_shelter_volume
│ └───absolute_shelter_volume
├───site_data
@@ -180,30 +186,25 @@ Sets up an on-disk result store.
# Arguments
- `domain` : ADRIA scenario domain
- `param_df` : ADRIA scenario specification
- `scen_spec` : ADRIA scenario specification
# Returns
domain, (relative_cover, relative_shelter_volume, absolute_shelter_volume, site_ranks, seed_log, fog_log, shade_log)
domain, (total_absolute_cover, relative_shelter_volume, absolute_shelter_volume, relative_juveniles,
juvenile_indicator, relative_taxa_cover, site_ranks, seed_log, fog_log, shade_log)
"""
function setup_result_store!(domain::Domain, param_df::DataFrame)::Tuple
if "RCP" in names(param_df)
param_df = param_df[:, Not("RCP")] # Ignore RCP column if it exists
end

# TODO: Support setting up a combined result store.

# Insert RCP column and populate with this dataset's RCP
insertcols!(param_df, 1, :RCP => parse(Float64, domain.RCP))

function setup_result_store!(domain::Domain, scen_spec::DataFrame)::Tuple
@set! domain.scenario_invoke_time = replace(string(now()), "T" => "_", ":" => "_", "." => "_")
log_location::String = joinpath(ENV["ADRIA_OUTPUT_DIR"], "$(domain.name)__RCPs$(domain.RCP)__$(domain.scenario_invoke_time)")

# Collect defined RCPs
rcps = string.(unique(scen_spec, "RCP")[!, "RCP"])
log_location::String = _result_location(domain, rcps)

z_store = DirectoryStore(log_location)

# Store copy of inputs
input_loc::String = joinpath(z_store.folder, INPUTS)
input_dims::Tuple{Int64,Int64} = size(param_df)
attrs::Dict = scenario_attributes(domain, param_df)
input_dims::Tuple{Int64,Int64} = size(scen_spec)
attrs::Dict = scenario_attributes(domain, scen_spec)

# Write a copy of spatial data to the result set
mkdir(joinpath(log_location, "site_data"))
@@ -218,17 +219,18 @@ function setup_result_store!(domain::Domain, param_df::DataFrame)::Tuple
GDF.write(geo_fn, domain.site_data; geom_columns=(:geom,), driver="geojson")
end

inputs = zcreate(Float64, input_dims...; fill_value=-9999.0, fill_as_missing=false, path=input_loc, chunks=input_dims, attrs=attrs)

# Store copy of model specification as CSV
mkdir(joinpath(log_location, "model_spec"))
model_spec(domain, joinpath(log_location, "model_spec", "model_spec.csv"))

# Create store for scenario spec
inputs = zcreate(Float64, input_dims...; fill_value=-9999.0, fill_as_missing=false, path=input_loc, chunks=input_dims, attrs=attrs)

# Store post-processed table of input parameters.
# +1 skips the RCP column
integer_params = findall(domain.model[:ptype] .== "integer")
map_to_discrete!(param_df[:, integer_params.+1], getindex.(domain.model[:bounds], 2)[integer_params])
inputs[:, :] = Matrix(param_df)
map_to_discrete!(scen_spec[:, integer_params.+1], getindex.(domain.model[:bounds], 2)[integer_params])
inputs[:, :] = Matrix(scen_spec)

tf, n_sites, _ = size(domain.dhw_scens)

@@ -243,15 +245,13 @@ function setup_result_store!(domain::Domain, param_df::DataFrame)::Tuple
elseif d == "sites"
append!(dl, n_sites)
elseif d == "scenarios"
append!(dl, nrow(param_df))
append!(dl, nrow(scen_spec))
end
end

return (dl...,)
end

compressor = Zarr.BloscCompressor(cname="zstd", clevel=2, shuffle=true)

met_names = [:total_absolute_cover, :relative_shelter_volume,
:absolute_shelter_volume, :relative_juveniles, :juvenile_indicator]
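# (editor's note) each metric name here becomes a subdirectory under results/
# in the store layout shown in the docstring above, via
# path=joinpath(z_store.folder, RESULTS, string(m_name)).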

@@ -267,7 +267,7 @@ function setup_result_store!(domain::Domain, param_df::DataFrame)::Tuple
fill_value=nothing, fill_as_missing=false,
path=joinpath(z_store.folder, RESULTS, string(m_name)), chunks=(result_dims[1:end-1]..., 1),
attrs=dim_struct,
compressor=compressor)
compressor=COMPRESSOR)
for m_name in met_names
]

@@ -280,16 +280,27 @@ function setup_result_store!(domain::Domain, param_df::DataFrame)::Tuple
attrs=Dict(
:structure => string.(ADRIA.metrics.relative_taxa_cover.dims)
),
compressor=compressor))
compressor=COMPRESSOR))
push!(met_names, :relative_taxa_cover)

dhw_stats_store = store_env_summary(domain.dhw_scens, "dhw_scenario", joinpath(z_store.folder, ENV_STATS, "dhw"), domain.RCP, compressor)
wave_stats_store = store_env_summary(domain.wave_scens, "wave_scenario", joinpath(z_store.folder, ENV_STATS, "wave"), domain.RCP, compressor)
# dhw and wave zarrays
dhw_stats = []
wave_stats = []
dhw_stat_names = []
wave_stat_names = []
for rcp in rcps
push!(dhw_stats, store_env_summary(domain.dhw_scens, "dhw_scenario", joinpath(z_store.folder, ENV_STATS, "dhw"), rcp, COMPRESSOR))
push!(wave_stats, store_env_summary(domain.wave_scens, "wave_scenario", joinpath(z_store.folder, ENV_STATS, "wave"), rcp, COMPRESSOR))

push!(dhw_stat_names, Symbol("dhw_stat_$rcp"))
push!(wave_stat_names, Symbol("wave_stat_$rcp"))
end
stat_store_names = vcat(dhw_stat_names, wave_stat_names)
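# (editor's illustration) for rcps = ["45", "60"] this yields
# [:dhw_stat_45, :dhw_stat_60, :wave_stat_45, :wave_stat_60], the keys
# zipped into the returned NamedTuple below.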

# Group all data stores
stores = [stores..., dhw_stats_store, wave_stats_store, setup_logs(z_store, unique_sites(domain), nrow(param_df), tf, n_sites)...]
stores = [stores..., dhw_stats..., wave_stats..., setup_logs(z_store, unique_sites(domain), nrow(scen_spec), tf, n_sites)...]

return domain, (; zip((met_names..., :dhw_stats, :wave_stats, :site_ranks, :seed_log, :fog_log, :shade_log,), stores)...)
return domain, (; zip((met_names..., stat_store_names..., :site_ranks, :seed_log, :fog_log, :shade_log,), stores)...)
end
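To make the new contract concrete: `setup_result_store!` now expects the scenario table to already include an `RCP` column (one row per scenario and RCP pair, as built by `run_scenarios` below) and returns a NamedTuple keyed by metric, per-RCP stat store, and log names. A hedged usage sketch; `dom` and `scens` are hypothetical stand-ins for a loaded Domain and scenario specification:

```julia
# scens carries an :RCP column, e.g. via the crossjoin performed in run_scenarios
dom, stores = ADRIA.setup_result_store!(dom, scens)

stores.total_absolute_cover  # Zarr store for a metric, one slice per scenario
stores.seed_log              # one of the intervention logs from setup_logs()

# the NamedTuple is assembled with the (; zip(keys, values)...) idiom used above
nt = (; zip((:a, :b), (1, 2))...)  # (a = 1, b = 2)
```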

"""
@@ -316,7 +327,6 @@ function _recreate_stats_from_store(zarr_store_path::String)::Dict{String,AbstractArray}
return stat_d
end


"""
load_results(result_loc::String)::ResultSet
load_results(domain::Domain)::ResultSet
@@ -433,3 +443,12 @@ Generate path to the data store of results for the given Domain.
function result_location(d::Domain)::String
return joinpath(ENV["ADRIA_OUTPUT_DIR"], "$(d.name)__RCPs$(d.RCP)__$(d.scenario_invoke_time)")
end

"""
_result_location(d::Domain, rcps::Vector{String})::String
Generate the path to the result data store for the given Domain and RCP names.
"""
function _result_location(d::Domain, rcps::Vector{String})::String
return joinpath(ENV["ADRIA_OUTPUT_DIR"], "$(d.name)__RCPs_$(join(rcps, "_"))__$(d.scenario_invoke_time)")
end
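For concreteness, a hedged example of the path `_result_location` produces; the domain name and timestamp below are hypothetical:

```julia
# with ENV["ADRIA_OUTPUT_DIR"] = "outputs", d.name = "Example_domain",
# scenario_invoke_time = "2023-06-26_12_00_00_000" and rcps = ["45", "60"]:
_result_location(d, ["45", "60"])
# "outputs/Example_domain__RCPs_45_60__2023-06-26_12_00_00_000"
```

Note the underscore-joined RCP list, in contrast to the single-RCP `result_location` above.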
125 changes: 60 additions & 65 deletions src/scenario.jl
@@ -37,9 +37,8 @@ end


"""
run_scenarios(param_df::DataFrame, domain::Domain; remove_workers=true)
run_scenarios(param_df::DataFrame, domain::Domain, rcp::String; remove_workers=true)
run_scenarios(param_df::DataFrame, domain::Domain, rcp::Array{String}; remove_workers=true)
run_scenarios(param_df::DataFrame, domain::Domain, RCP::String; show_progress=true, remove_workers=true)
run_scenarios(param_df::DataFrame, domain::Domain, RCP::Vector{String}; show_progress=true, remove_workers=true)
Run scenarios defined by the parameter table storing results to disk.
Scenarios are run in parallel where the number of scenarios > 256.
@@ -54,97 +53,93 @@ Scenarios are run in parallel where the number of scenarios > 256.
...
julia> rs_45 = ADRIA.run_scenarios(p_df, dom, "45")
julia> rs_45_60 = ADRIA.run_scenarios(p_df, dom, ["45", "60"])
julia> rs_all = ADRIA.run_scenarios(p_df, dom)
```
# Arguments
- param_df : DataFrame of scenarios to run
- domain : Domain, to run scenarios with
- rcp : ID of RCP(s) to run scenarios under.
- remove_workers : if running in parallel, removes workers after completion
- RCP : ID or list of RCP(s) to run scenarios under.
- show_progress : Display progress
- remove_workers : If running in parallel, removes workers after completion
# Returns
ResultSet
"""
function run_scenarios(param_df::DataFrame, domain::Domain; remove_workers=true)::ResultSet
# Identify available data
avail_data::Vector{String} = readdir(joinpath(domain.env_layer_md.dpkg_path, "DHWs"))
RCP_ids = replace.(avail_data, "dhwRCP" => "", ".nc" => "")

@info "Running scenarios for RCPs: $(RCP_ids)"
return run_scenarios(param_df, domain, RCP_ids::Array{String}; remove_workers=remove_workers)
end
function run_scenarios(param_df::DataFrame, domain::Domain, RCP::String; show_progress=true, remove_workers=true)::ResultSet
return run_scenarios(param_df, domain, [RCP]; show_progress, remove_workers)
end
function run_scenarios(param_df::DataFrame, domain::Domain, RCP::Vector{String}; show_progress=true, remove_workers=true)::ResultSet
# Initialize ADRIA configuration options
setup()
parallel = (nrow(param_df) > 128) && (parse(Bool, ENV["ADRIA_DEBUG"]) == false)
if parallel
_setup_workers()
sleep(2) # wait a bit while workers spin-up
@eval @everywhere using ADRIA
end

# Sort RCPs so the dataframe order matches the output filepath
RCP = sort(RCP)

domain = switch_RCPs!(domain, RCP)
domain, data_store = ADRIA.setup_result_store!(domain, param_df)
@info "Running $(nrow(param_df)) scenarios over $(length(RCP)) RCPs: $RCP"

# Cross product between RCPs and param_df so every row of param_df appears once per RCP (see the sketch after the sort below)
rcps_df = DataFrame(RCP=parse.(Int64, RCP))
scenarios_df = crossjoin(param_df, rcps_df)
sort!(scenarios_df, :RCP)
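# (editor's sketch) e.g. crossjoin(DataFrame(x=[0.1, 0.2]), DataFrame(RCP=[45.0, 60.0]))
# yields four rows, one per (scenario, RCP) pair; sorting by :RCP then keeps each
# RCP's scenarios contiguous for the per-RCP loop below.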

@info "Setting up Result Set"
domain, data_store = ADRIA.setup_result_store!(domain, scenarios_df)

# Convert DataFrame to named matrix for faster iteration
scenarios_matrix = NamedDimsArray(
Matrix(scenarios_df);
scenarios=1:nrow(scenarios_df),
factors=names(scenarios_df)
)

# Setup cache to reuse for each scenario run
cache = setup_cache(domain)
run_msg = "Running $(nrow(param_df)) scenarios for RCP $RCP"

# Convert to named matrix for faster iteration
scen_mat = NamedDimsArray(Matrix(param_df); scenarios=1:nrow(param_df), factors=names(param_df))
parallel = (nrow(param_df) >= 4096) && (parse(Bool, ENV["ADRIA_DEBUG"]) == false)
if parallel && nworkers() == 1
@info "Setting up parallel processing..."
spinup_time = @elapsed begin
_setup_workers()
sleep(2) # wait a bit while workers spin-up

# Batch run scenarios
func = (dfx) -> run_scenario(dfx..., domain, data_store, cache)
if parallel
if show_progress
@showprogress run_msg 4 pmap(func, enumerate(eachrow(scen_mat)))
else
pmap(func, enumerate(eachrow(scen_mat)))
# load ADRIA on workers and define helper function
@everywhere @eval using ADRIA
@everywhere @eval func = (dfx) -> run_scenario(dfx..., domain, data_store, cache)
end

if remove_workers
_remove_workers()
end
@info "Time taken to spin up workers: $(spinup_time) seconds"

# Define number of scenarios to run before returning results to main
# https://discourse.julialang.org/t/parallelism-understanding-pmap-and-the-batch-size-parameter/15604/2
# https://techytok.com/lesson-parallel-computing/
b_size = 4
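# (editor's note) Distributed.pmap's batch_size keyword ships work to workers
# in groups (here four scenarios per message) to amortise scheduling overhead
# when individual scenario runs are short.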
else
if show_progress
@showprogress run_msg 4 map(func, enumerate(eachrow(scen_mat)))
else
map(func, enumerate(eachrow(scen_mat)))
end
b_size = 1
end

return load_results(domain)
end
function run_scenarios(param_df::DataFrame, domain::Domain, RCP_ids::Array{String}; show_progress=true, remove_workers=true)::ResultSet
@info "Running $(nrow(param_df)) scenarios across $(length(RCP_ids)) RCPs"

setup()
output_dir = ENV["ADRIA_OUTPUT_DIR"]
# Define local helper
func = (dfx) -> run_scenario(dfx..., domain, data_store, cache)

result_sets::Vector{ResultSet} = Vector{ResultSet}(undef, length(RCP_ids))
for (i, RCP) in enumerate(RCP_ids)
tmp_dir = mktempdir(prefix="ADRIA_")
ENV["ADRIA_OUTPUT_DIR"] = tmp_dir
for rcp in RCP
run_msg = "Running $(nrow(param_df)) scenarios for RCP $rcp"

result_sets[i] = run_scenarios(param_df, domain, RCP; show_progress=show_progress, remove_workers=false)
# Switch RCPs so correct data is loaded
domain = switch_RCPs!(domain, rcp)
target_rows = findall(scenarios_matrix("RCP") .== parse(Float64, rcp))
if show_progress
@showprogress run_msg 4 pmap(func, zip(target_rows, eachrow(scenarios_matrix[target_rows, :])), batch_size=b_size)
else
pmap(func, zip(target_rows, eachrow(scenarios_matrix[target_rows, :])), batch_size=b_size)
end
end

if remove_workers
if parallel && remove_workers
_remove_workers()
end

ENV["ADRIA_OUTPUT_DIR"] = output_dir

rs = combine_results(result_sets...)

# Remove temporary result dirs
for t in result_sets
rm(result_location(t); force=true, recursive=true)
end

return rs
return load_results(_result_location(domain, RCP))
end


"""
run_scenario(idx::Int64, param_set::Union{AbstractVector, DataFrameRow}, domain::Domain, data_store::NamedTuple, cache::NamedTuple)::NamedTuple
run_scenario(idx::Int64, param_set::Union{AbstractVector, DataFrameRow}, domain::Domain, data_store::NamedTuple)::NamedTuple
@@ -458,7 +453,7 @@ function run_model(domain::Domain, param_set::NamedDimsArray, corals::DataFrame,

env_horizon = zeros(Int64(param_set("plan_horizon") + 1), n_sites) # temporary cache for planning horizon

# basal_area_per_settler is the area in m^2 of a size class one coral
# basal_area_per_settler is the area in m^2 of a size class one coral
basal_area_per_settler = corals.colony_area_cm2[corals.class_id.==1] ./ 100 .^ 2
@inbounds for tstep::Int64 in 2:tf
p_step::Int64 = tstep - 1
2 changes: 1 addition & 1 deletion src/utils/setup.jl
@@ -63,7 +63,7 @@ end

"""Remove workers and free up memory."""
function _remove_workers()::Nothing
if nprocs() > 1
if nworkers() > 1
rmprocs(workers()...)
end
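The one-line change here is subtler than it looks: `nprocs()` counts the master process plus all workers, while `nworkers()` counts workers only, returning 1 when none exist (the master then acts as the sole worker). A quick illustration using the Distributed stdlib:

```julia
using Distributed

nprocs()    # 1 with no workers added (master only)
nworkers()  # also 1: the master doubles as the worker

addprocs(2)
nprocs()    # 3 (master plus two workers)
nworkers()  # 2
rmprocs(workers()...)
```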
