Skip to content

Commit

Permalink
Refactor MVIRI dataset access
Browse files Browse the repository at this point in the history
  • Loading branch information
sfinkens committed Oct 18, 2024
1 parent d6d4406 commit 61533c5
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 137 deletions.
215 changes: 119 additions & 96 deletions satpy/readers/mviri_l1b_fiduceo_nc.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,113 +454,151 @@ def is_high_resol(resolution):
return resolution == HIGH_RESOL


class DatasetWrapper:
"""Helper class for accessing the dataset."""

def __init__(self, nc):
"""Wrap the given dataset."""
self.nc = nc

self._decode_cf()
self._fix_duplicate_dimensions(self.nc)


@property
def attrs(self):
"""Exposes dataset attributes."""
return self.nc.attrs

def __getitem__(self, item):
"""Get a variable from the dataset."""
ds = self.nc[item]
if self._should_dims_be_renamed(ds):
ds = self._rename_dims(ds)
elif self._coordinates_not_assigned(ds):
ds = self._reassign_coords(ds)
class DatasetPreprocessor:
"""Helper class for preprocessing the dataset."""

def preprocess(self, ds):
"""Preprocess the given dataset."""
ds = self._rename_vars(ds)
ds = self._decode_cf(ds)
ds = self._fix_duplicate_dimensions(ds)
self._reassign_coords(ds)
self._cleanup_attrs(ds)
return ds

def _should_dims_be_renamed(self, ds):
"""Determine whether dataset dimensions need to be renamed."""
return "y_ir_wv" in ds.dims or "y_tie" in ds.dims

def _rename_dims(self, ds):
"""Rename dataset dimensions to match satpy's expectations."""
def _rename_vars(self, ds):
"""Rename variables to match satpy's expectations."""
new_names = {
"y_ir_wv": "y",
"x_ir_wv": "x",
"y_tie": "y",
"x_tie": "x"
"time_ir_wv": "time",
}
new_names_avail = {
old: new
for old, new in new_names.items()
if old in ds
}
for old_name, new_name in new_names.items():
if old_name in ds.dims:
ds = ds.rename({old_name: new_name})
return ds.rename(new_names_avail)

def _decode_cf(self, ds):
"""Decode data according to CF conventions."""
# CF decoding fails because time coordinate contains fill values.
# Decode time separately, then decode rest using decode_cf().
time = self._decode_time(ds)
ds = ds.drop_vars(time.name)
ds = xr.decode_cf(ds)
ds[time.name] = (time.dims, time.values)
return ds

def _coordinates_not_assigned(self, ds):
return "y" in ds.dims and "y" not in ds.coords
def _decode_time(self, ds):
"""Decode time using fill value and offset.
Replace fill values with NaT.
"""
time = ds["time"]
time_dec = (time + time.attrs["add_offset"]).astype("datetime64[s]").astype("datetime64[ns]")
is_fill_value = time == time.attrs["_FillValue"]
return xr.where(is_fill_value, np.datetime64("NaT"), time_dec)

def _fix_duplicate_dimensions(self, ds):
"""Rename dimensions as duplicate dimensions names are not supported by xarray."""
ds = ds.copy()
ds.variables["covariance_spectral_response_function_vis"].dims = ("srf_size_1", "srf_size_2")
ds.variables["channel_correlation_matrix_independent"].dims = ("channel_1", "channel_2")
ds.variables["channel_correlation_matrix_structured"].dims = ("channel_1", "channel_2")
return ds.drop_dims(["channel", "srf_size"])

def _reassign_coords(self, ds):
"""Re-assign coordinates.
For some reason xarray doesn't assign coordinates to all high
resolution data variables.
resolution data variables. In that case ds["varname"] doesn't
have coords, but they're still in ds.coords.
"""
return ds.assign_coords({"y": self.nc.coords["y"],
"x": self.nc.coords["x"]})
for var_name, data_array in ds.data_vars.items():
if self._coordinates_not_assigned(data_array):
ds[var_name] = data_array.assign_coords(
{
"y": ds.coords["y"],
"x": ds.coords["x"]
}
)

def _coordinates_not_assigned(self, data_array):
return "y" in data_array.dims and "y" not in data_array.coords

def _cleanup_attrs(self, ds):
"""Cleanup dataset attributes."""
# Remove ancillary_variables attribute to avoid downstream
# satpy warnings.
ds.attrs.pop("ancillary_variables", None)

def _decode_cf(self):
"""Decode data."""
# time decoding with decode_cf results in error - decode separately!
time_dims, time = self._decode_time()
self.nc = self.nc.drop_vars(time.name)
self.nc = xr.decode_cf(self.nc)
self.nc[time.name] = (time_dims, time.values)

def _decode_time(self):
"""Decode time using fill value and offset."""
time = self.get_time()
time_dims = self.nc[time.name].dims
time = xr.where(time == time.attrs["_FillValue"], np.datetime64("NaT"),
(time + time.attrs["add_offset"]).astype("datetime64[s]").astype("datetime64[ns]"))

return (time_dims, time)

def _fix_duplicate_dimensions(self, nc):
"""Rename dimensions as duplicate dimensions names are not supported by xarray."""
nc.variables["covariance_spectral_response_function_vis"].dims = ("srf_size_1", "srf_size_2")
self.nc = nc.drop_dims("srf_size")
nc.variables["channel_correlation_matrix_independent"].dims = ("channel_1", "channel_2")
nc.variables["channel_correlation_matrix_structured"].dims = ("channel_1", "channel_2")
self.nc = nc.drop_dims("channel")
for data_array in ds.data_vars.values():
data_array.attrs.pop("ancillary_variables", None)

def get_time(self):
"""Get time coordinate.

Variable is sometimes named "time" and sometimes "time_ir_wv".
"""
try:
return self["time_ir_wv"]
except KeyError:
return self["time"]
class DatasetAccessor:
"""Helper class for accessing the dataset."""

def __init__(self, ds):
"""Wrap the given dataset."""
self.ds = ds

@property
def attrs(self):
"""Exposes dataset attributes."""
return self.ds.attrs

def __getitem__(self, item):
"""Get a variable from the dataset."""
data_array = self.ds[item]
if self._should_dims_be_renamed(data_array):
return self._rename_dims(data_array)
return data_array

def _should_dims_be_renamed(self, data_array):
"""Determine whether dataset dimensions need to be renamed."""
return "y_ir_wv" in data_array.dims or "y_tie" in data_array.dims

def _rename_dims(self, data_array):
"""Rename dataset dimensions to match satpy's expectations."""
new_names = {
"y_ir_wv": "y",
"x_ir_wv": "x",
"y_tie": "y",
"x_tie": "x"
}
new_names_avail = {
old: new
for old, new in new_names.items()
if old in data_array.dims
}
return data_array.rename(new_names_avail)

def get_xy_coords(self, resolution):
"""Get x and y coordinates for the given resolution."""
if is_high_resol(resolution):
return self.nc.coords["x"], self.nc.coords["y"]
return self.nc.coords["x_ir_wv"], self.nc.coords["x_ir_wv"]
return self.ds.coords["x"], self.ds.coords["y"]
return self.ds.coords["x_ir_wv"], self.ds.coords["x_ir_wv"]

def get_image_size(self, resolution):
"""Get image size for the given resolution."""
if is_high_resol(resolution):
return self.nc.coords["y"].size
return self.nc.coords["y_ir_wv"].size
return self.ds.coords["y"].size
return self.ds.coords["y_ir_wv"].size


def open_dataset(filename):
"""Load dataset from the given file."""
nc_raw = xr.open_dataset(
filename,
chunks={"x": CHUNK_SIZE,
"y": CHUNK_SIZE,
"x_ir_wv": CHUNK_SIZE,
"y_ir_wv": CHUNK_SIZE},
# see dataset preprocessor for why decoding is disabled
decode_cf=False,
decode_times=False,
mask_and_scale=False,
)
nc_preproc = DatasetPreprocessor().preprocess(nc_raw)
return DatasetAccessor(nc_preproc)


class FiduceoMviriBase(BaseFileHandler):
Expand All @@ -584,24 +622,9 @@ def __init__(self, filename, filename_info, filetype_info, # noqa: D417
super(FiduceoMviriBase, self).__init__(
filename, filename_info, filetype_info)
self.mask_bad_quality = mask_bad_quality
nc_raw = xr.open_dataset(
filename,
chunks={"x": CHUNK_SIZE,
"y": CHUNK_SIZE,
"x_ir_wv": CHUNK_SIZE,
"y_ir_wv": CHUNK_SIZE},
# see dataset wrapper for why decoding is disabled
decode_cf=False,
decode_times=False,
mask_and_scale=False,
)

self.nc = DatasetWrapper(nc_raw)

self.nc = open_dataset(filename)
self.projection_longitude = self._get_projection_longitude(filename_info)

self.calib_coefs = self._get_calib_coefs()

self._get_angles = functools.lru_cache(maxsize=8)(
self._get_angles_uncached
)
Expand Down Expand Up @@ -745,7 +768,7 @@ def _get_acq_time_uncached(self, resolution):
Note that the acquisition time does not increase monotonically
with the scanline number due to the scan pattern and rectification.
"""
time2d = self.nc.get_time()
time2d = self.nc["time"]
_, target_y = self.nc.get_xy_coords(resolution)
return Interpolator.interp_acq_time(time2d, target_y=target_y.values)

Expand Down
Loading

0 comments on commit 61533c5

Please sign in to comment.