
Commit

Apply refactor to gcp.py
nweires committed Dec 11, 2023
1 parent a65239d commit b1b3d42
Showing 3 changed files with 4 additions and 113 deletions.
2 changes: 1 addition & 1 deletion buildstockbatch/aws/aws.py
@@ -1706,7 +1706,7 @@ def run_job(cls, job_id, bucket, prefix, job_name, region):
logger.debug("Extracting {}".format(epw_filename))
f_out.write(gzip.decompress(f_gz.getvalue()))

- cls.run_simulations(cfg, jobs_d, job_id, sim_dir, S3FileSystem(), bucket, prefix)
+ cls.run_simulations(cfg, jobs_d, job_id, sim_dir, S3FileSystem(), f"{bucket}/{prefix}")


@log_error_details()
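The only change in this file is the tail of the run_simulations call: the shared helper now takes one combined output location instead of separate bucket and prefix arguments. fsspec filesystems such as s3fs.S3FileSystem and gcsfs.GCSFileSystem address objects as "bucket/key" paths, so a single f"{bucket}/{prefix}" string is enough for every downstream read and write. A minimal sketch of that usage, with made-up bucket and prefix values:

    import gzip
    import json
    from s3fs import S3FileSystem

    fs = S3FileSystem()
    output_path = "my-bucket/my-prefix"  # i.e. f"{bucket}/{prefix}"; hypothetical values
    # Write a gzipped JSON results file under the combined path, the way the
    # simulation output code addresses cloud storage through fsspec.
    with fs.open(f"{output_path}/results/simulation_output/results_job0.json.gz", "wb") as f1:
        with gzip.open(f1, "wt", encoding="utf-8") as f2:
            json.dump([], f2)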
2 changes: 1 addition & 1 deletion buildstockbatch/cloud/docker_base.py
@@ -380,7 +380,7 @@ def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path):
dpouts = []
simulation_output_tar_filename = sim_dir.parent / "simulation_outputs.tar.gz"
asset_dirs = os.listdir(sim_dir)
- ts_output_dir = (f"{output_path}/results/simulation_output/timeseries",)
+ ts_output_dir = f"{output_path}/results/simulation_output/timeseries"

with tarfile.open(str(simulation_output_tar_filename), "w:gz") as simout_tar:
for building_id, upgrade_idx in jobs_d["batch"]:
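The fix here is small but real: the parentheses and trailing comma in the old line made ts_output_dir a one-element tuple rather than a string, so any path later built from it would come out mangled. A quick illustration with a made-up output_path (the downstream filename shown is hypothetical):

    output_path = "my-bucket/my-prefix"  # hypothetical value

    ts_output_dir = (f"{output_path}/results/simulation_output/timeseries",)  # before: a 1-tuple
    print(f"{ts_output_dir}/some_file.parquet")
    # ('my-bucket/my-prefix/results/simulation_output/timeseries',)/some_file.parquet

    ts_output_dir = f"{output_path}/results/simulation_output/timeseries"     # after: a plain string
    print(f"{ts_output_dir}/some_file.parquet")
    # my-bucket/my-prefix/results/simulation_output/timeseries/some_file.parquet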
113 changes: 2 additions & 111 deletions buildstockbatch/gcp/gcp.py
@@ -17,10 +17,8 @@
"""
import argparse
import collections
- import csv
from dask.distributed import Client as DaskClient
from datetime import datetime
- from fsspec.implementations.local import LocalFileSystem
from gcsfs import GCSFileSystem
import gzip
from joblib import Parallel, delayed
@@ -31,7 +29,6 @@
import pathlib
import re
import shutil
- import subprocess
import tarfile
import time
import tqdm
@@ -724,33 +721,7 @@ def run_task(cls, task_index, job_name, gcs_bucket, gcs_prefix):
weather_dir = sim_dir / "weather"
os.makedirs(weather_dir, exist_ok=True)

- # Make a lookup of which parameter points to the weather file from options_lookup.tsv
- with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", "r", encoding="utf-8") as f:
- tsv_reader = csv.reader(f, delimiter="\t")
- next(tsv_reader) # skip headers
- param_name = None
- epws_by_option = {}
- for row in tsv_reader:
- row_has_epw = [x.endswith(".epw") for x in row[2:]]
- if sum(row_has_epw):
- if row[0] != param_name and param_name is not None:
- raise RuntimeError(
- "The epw files are specified in options_lookup.tsv under more than one parameter "
- f"type: {param_name}, {row[0]}"
- ) # noqa: E501
- epw_filename = row[row_has_epw.index(True) + 2].split("=")[1]
- param_name = row[0]
- option_name = row[1]
- epws_by_option[option_name] = epw_filename
-
- # Look through the buildstock.csv to find the appropriate location and epw
- epws_to_download = set()
- building_ids = [x[0] for x in jobs_d["batch"]]
- with open(sim_dir / "lib" / "housing_characteristics" / "buildstock.csv", "r", encoding="utf-8") as f:
- csv_reader = csv.DictReader(f)
- for row in csv_reader:
- if int(row["Building"]) in building_ids:
- epws_to_download.add(epws_by_option[row[param_name]])
+ epws_to_download = cls.get_epws_to_download(sim_dir, jobs_d)

# Download and unzip the epws needed for these simulations
for epw_filename in epws_to_download:
@@ -761,88 +732,8 @@ def run_task(cls, task_index, job_name, gcs_bucket, gcs_prefix):
with open(weather_dir / epw_filename, "wb") as f_out:
logger.debug("Extracting {}".format(epw_filename))
f_out.write(gzip.decompress(f_gz.getvalue()))
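The options_lookup.tsv and buildstock.csv scanning removed above is now hidden behind cls.get_epws_to_download. A condensed sketch of that helper, reconstructed from the deleted lines and presumably living in the shared docker_base.py (the duplicate-parameter RuntimeError check is dropped here for brevity):

    import csv

    def get_epws_to_download(sim_dir, jobs_d):
        """Reconstructed sketch: which EPW weather files this batch of buildings needs.

        sim_dir is a pathlib.Path; jobs_d is the job dict with a "batch" list of
        (building_id, upgrade_idx) pairs.
        """
        # Map each option name to its EPW filename from options_lookup.tsv
        epws_by_option, param_name = {}, None
        with open(sim_dir / "lib" / "resources" / "options_lookup.tsv", encoding="utf-8") as f:
            reader = csv.reader(f, delimiter="\t")
            next(reader)  # skip header
            for row in reader:
                has_epw = [x.endswith(".epw") for x in row[2:]]
                if any(has_epw):
                    param_name = row[0]  # the parameter that carries the weather file
                    epws_by_option[row[1]] = row[has_epw.index(True) + 2].split("=")[1]

        # Collect the EPWs used by the buildings in this job's batch via buildstock.csv
        building_ids = {x[0] for x in jobs_d["batch"]}
        epws_to_download = set()
        with open(sim_dir / "lib" / "housing_characteristics" / "buildstock.csv", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                if int(row["Building"]) in building_ids:
                    epws_to_download.add(epws_by_option[row[param_name]])
        return epws_to_download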
- asset_dirs = os.listdir(sim_dir)
-
- gcs_fs = GCSFileSystem()
- local_fs = LocalFileSystem()
- reporting_measures = cls.get_reporting_measures(cfg)
- dpouts = []
- simulation_output_tar_filename = sim_dir.parent / "simulation_outputs.tar.gz"
- with tarfile.open(str(simulation_output_tar_filename), "w:gz") as simout_tar:
- for building_id, upgrade_idx in jobs_d["batch"]:
- upgrade_id = 0 if upgrade_idx is None else upgrade_idx + 1
- sim_id = f"bldg{building_id:07d}up{upgrade_id:02d}"
-
- # Create OSW
- osw = cls.create_osw(cfg, jobs_d["n_datapoints"], sim_id, building_id, upgrade_idx)
- with open(os.path.join(sim_dir, "in.osw"), "w") as f:
- json.dump(osw, f, indent=4)
-
- # Run Simulation
- with open(sim_dir / "os_stdout.log", "w") as f_out:
- try:
- logger.debug("Running {}".format(sim_id))
- subprocess.run(
- ["openstudio", "run", "-w", "in.osw"],
- check=True,
- stdout=f_out,
- stderr=subprocess.STDOUT,
- cwd=str(sim_dir),
- )
- except subprocess.CalledProcessError:
- logger.debug(f"Simulation failed: see {sim_id}/os_stdout.log")
-
- # Clean Up simulation directory
- cls.cleanup_sim_dir(
- sim_dir,
- gcs_fs,
- f"{gcs_bucket}/{gcs_prefix}/results/simulation_output/timeseries",
- upgrade_id,
- building_id,
- )
-
- # Read data_point_out.json
- dpout = postprocessing.read_simulation_outputs(
- local_fs, reporting_measures, str(sim_dir), upgrade_id, building_id
- )
- dpouts.append(dpout)
-
- # Add the rest of the simulation outputs to the tar archive
- logger.info("Archiving simulation outputs")
- for dirpath, dirnames, filenames in os.walk(sim_dir):
- if dirpath == str(sim_dir):
- for dirname in set(dirnames).intersection(asset_dirs):
- dirnames.remove(dirname)
- for filename in filenames:
- abspath = os.path.join(dirpath, filename)
- relpath = os.path.relpath(abspath, sim_dir)
- simout_tar.add(abspath, os.path.join(sim_id, relpath))
-
- # Clear directory for next simulation
- logger.debug("Clearing out simulation directory")
- for item in set(os.listdir(sim_dir)).difference(asset_dirs):
- if os.path.isdir(item):
- shutil.rmtree(item)
- elif os.path.isfile(item):
- os.remove(item)
-
- blob = bucket.blob(f"{gcs_prefix}/results/simulation_output/simulations_job{task_index}.tar.gz")
- blob.upload_from_filename(simulation_output_tar_filename)
-
- # Upload aggregated dpouts as a json file
- with gcs_fs.open(
- f"{gcs_bucket}/{gcs_prefix}/results/simulation_output/results_job{task_index}.json.gz", "wb"
- ) as f1:
- with gzip.open(f1, "wt", encoding="utf-8") as f2:
- json.dump(dpouts, f2)
-
- # Remove files (it helps docker if we don't leave a bunch of files laying around)
- os.remove(simulation_output_tar_filename)
- for item in os.listdir(sim_dir):
- if os.path.isdir(item):
- shutil.rmtree(item)
- elif os.path.isfile(item):
- os.remove(item)
+ cls.run_simulations(cfg, task_index, jobs_d, sim_dir, GCSFileSystem(), f"{gcs_bucket}/{gcs_prefix}")

# todo: Do cleanup (which the aws script does, in the 'nrel/aws_batch' branch)
# todo: aws-shared (see file comment): Such cleanup should be shared with the aws script.
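Everything else deleted above (writing in.osw, invoking openstudio run, cleaning and archiving the simulation directory, uploading results) is now delegated to the shared run_simulations in docker_base.py, with GCSFileSystem() and f"{gcs_bucket}/{gcs_prefix}" supplying the generic fs and output_path. One consequence of that generalization: the old code pushed the tarball with the google-cloud-storage client (bucket.blob(...).upload_from_filename(...)), whereas a generic fsspec filesystem can do the equivalent portably. A minimal sketch of that idea, not confirmed by this diff, with made-up values:

    from gcsfs import GCSFileSystem  # or s3fs.S3FileSystem on AWS

    fs = GCSFileSystem()
    output_path = "my-bucket/my-prefix"  # i.e. f"{gcs_bucket}/{gcs_prefix}"; hypothetical values
    task_index = 0                       # hypothetical value
    # Copy the local tarball produced by the simulation loop up to cloud storage.
    fs.put(
        "simulation_outputs.tar.gz",
        f"{output_path}/results/simulation_output/simulations_job{task_index}.tar.gz",
    )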
