Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interim save #24

Merged
merged 5 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies:
- xarray
- pip
- pip:
- tqdm
- typhon
- sphinx-autodoc2
- sphinx-book-theme
Expand Down
6 changes: 5 additions & 1 deletion pydropsonde/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .processor import Sonde, Gridded
import configparser
import inspect
from tqdm import tqdm
import os
import xarray as xr

Expand Down Expand Up @@ -275,7 +276,7 @@ def iterate_Sonde_method_over_dict_of_Sondes_objects(

for function_name in functions:
new_dict = {}
for key, value in my_dict.items():
for key, value in tqdm(my_dict.items()):
function = getattr(Sonde, function_name)
result = function(value, **get_args_for_function(config, function))
if result is not None:
Expand Down Expand Up @@ -442,6 +443,7 @@ def run_pipeline(pipeline: dict, config: configparser.ConfigParser):
"intake": "sondes",
"apply": iterate_Sonde_method_over_dict_of_Sondes_objects,
"functions": [
"check_interim_l3",
"get_l2_filename",
"add_l2_ds",
"create_prep_l3",
Expand All @@ -450,6 +452,8 @@ def run_pipeline(pipeline: dict, config: configparser.ConfigParser):
"remove_non_mono_incr_alt",
"interpolate_alt",
"add_attributes_as_var",
"make_prep_interim",
"save_interim_l3",
],
"output": "sondes",
"comment": "This step reads from the saved L2 files and prepares individual sonde datasets before they can be concatenated to create L3.",
Expand Down
44 changes: 43 additions & 1 deletion pydropsonde/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from pydropsonde.helper import rawreader as rr
import pydropsonde.helper as hh
from ._version import __version__

_no_default = object()

Expand All @@ -33,6 +34,7 @@ class Sonde:

sort_index: np.datetime64 = field(init=False, repr=False)
serial_id: str
cont: bool = True
_: KW_ONLY
launch_time: Optional[Any] = None

Expand Down Expand Up @@ -990,6 +992,29 @@ def create_prep_l3(self):
object.__setattr__(self, "_prep_l3_ds", _prep_l3_ds)
return self

def check_interim_l3(
    self, interim_l3_path: str = None, interim_l3_filename: str = None
):
    """
    Load a previously saved interim Level-3 file for this sonde, if one exists.

    Parameters
    ----------
    interim_l3_path : str, optional
        Directory to look in. When omitted it is derived from ``self.l2_dir``
        by replacing "Level_2" with "Level_3_interim" and removing
        ``self.flight_id`` from the path.
    interim_l3_filename : str, optional
        Filename template. It is formatted with ``sonde_id`` (the sonde's
        ``serial_id``) and ``version`` (the package ``__version__``).
        Defaults to "interim_l3_{sonde_id}_{version}.nc".

    Returns
    -------
    self
        Always returned. If the file exists, the opened dataset is stored in
        ``_interim_l3_ds`` and ``cont`` is set to False; otherwise the object
        is left untouched.
        NOTE(review): ``cont`` presumably tells later pipeline steps that
        processing need not continue for this sonde - confirm against the
        pipeline code.
    """
    if interim_l3_path is None:
        interim_l3_path = self.l2_dir.replace("Level_2", "Level_3_interim").replace(
            self.flight_id, ""
        )
    if interim_l3_filename is None:
        interim_l3_filename = "interim_l3_{sonde_id}_{version}.nc"
    interim_l3_filename = interim_l3_filename.format(
        sonde_id=self.serial_id, version=__version__
    )

    # Build the full path once so that the existence check and the open call
    # always refer to the same file, whether or not the directory string ends
    # with a path separator.
    interim_l3_file = os.path.join(interim_l3_path, interim_l3_filename)
    if os.path.exists(interim_l3_file):
        ds = xr.open_dataset(interim_l3_file)
        # object.__setattr__ matches how the other attributes in this hunk are
        # assigned.
        object.__setattr__(self, "_interim_l3_ds", ds)
        object.__setattr__(self, "cont", False)
    return self

def add_q_and_theta_to_l2_ds(self):
"""
Adds potential temperature and specific humidity to the L2 dataset.
Expand Down Expand Up @@ -1118,6 +1143,23 @@ def add_attributes_as_var(self):
object.__setattr__(self, "_prep_l3_ds", _prep_l3_ds)
return self

def make_prep_interim(self):
    """
    Use the prepared Level-3 dataset as this sonde's interim Level-3 dataset.

    The dataset held in ``_prep_l3_ds`` is stored under ``_interim_l3_ds``
    as-is (same object, no copy is made).

    Returns
    -------
    self
    """
    prepared_ds = self._prep_l3_ds
    # Assigned through object.__setattr__, as elsewhere in this hunk.
    # NOTE(review): presumably needed because plain attribute assignment is
    # restricted on this class - confirm.
    object.__setattr__(self, "_interim_l3_ds", prepared_ds)
    return self

def save_interim_l3(self, interim_l3_path: str = None, interim_l3_name: str = None):
    """
    Write this sonde's interim Level-3 dataset to a NetCDF file.

    Parameters
    ----------
    interim_l3_path : str, optional
        Output directory; created if it does not exist. When omitted it is
        derived from ``self.l2_dir`` by replacing "Level_2" with
        "Level_3_interim" and removing ``self.flight_id`` from the path.
    interim_l3_name : str, optional
        Filename template. It is formatted with ``sonde_id`` (the sonde's
        ``serial_id``) and ``version`` (the package ``__version__``); a name
        without placeholders is used unchanged.
        Defaults to "interim_l3_{sonde_id}_{version}.nc".

    Returns
    -------
    self
    """
    if interim_l3_path is None:
        interim_l3_path = self.l2_dir.replace("Level_2", "Level_3_interim").replace(
            self.flight_id, ""
        )
    if interim_l3_name is None:
        interim_l3_name = "interim_l3_{sonde_id}_{version}.nc"
    # Always expand the placeholders so a caller-supplied template resolves to
    # a per-sonde filename rather than being written with literal braces.
    interim_l3_name = interim_l3_name.format(
        sonde_id=self.serial_id, version=__version__
    )

    os.makedirs(interim_l3_path, exist_ok=True)
    self._interim_l3_ds.to_netcdf(os.path.join(interim_l3_path, interim_l3_name))
    return self


@dataclass(order=True)
class Gridded:
Expand All @@ -1127,7 +1169,7 @@ def concat_sondes(self):
"""
function to concatenate all sondes using the combination of all measurement times and launch times
"""
list_of_l2_ds = [sonde._prep_l3_ds for sonde in self.sondes.values()]
list_of_l2_ds = [sonde._interim_l3_ds for sonde in self.sondes.values()]
combined = xr.combine_by_coords(list_of_l2_ds)
combined["iwv"] = combined.iwv.mean("alt")
self._interim_l3_ds = combined
Expand Down
Loading