Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix time overlap handling in concatenation #2247

Merged
merged 8 commits into from
Oct 30, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 92 additions & 67 deletions esmvalcore/preprocessor/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import os
from itertools import groupby
from pathlib import Path
from typing import Optional
from typing import Optional, NamedTuple
from warnings import catch_warnings, filterwarnings

import cftime
import iris
import iris.aux_factory
import iris.exceptions
import isodate
import numpy as np
import yaml
from cf_units import suppress_errors
Expand All @@ -22,7 +22,6 @@
from esmvalcore.iris_helpers import merge_cube_attributes

from .._task import write_ncl_settings
from ._time import clip_timerange

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -206,70 +205,96 @@ def _concatenate_cubes(cubes, check_level):
return concatenated


def _check_time_overlaps(cubes):
"""Handle time overlaps."""
times = [cube.coord('time').core_points() for cube in cubes]
for index, _ in enumerate(times[:-1]):
overlap = np.intersect1d(times[index], times[index + 1])
if overlap.size != 0:
overlapping_cubes = cubes[index:index + 2]
time_1 = overlapping_cubes[0].coord('time').core_points()
time_2 = overlapping_cubes[1].coord('time').core_points()

# case 1: both cubes start at the same time -> return longer cube
if time_1[0] == time_2[0]:
if time_1[-1] <= time_2[-1]:
cubes.pop(index)
discarded_cube_index = 0
used_cube_index = 1
else:
cubes.pop(index + 1)
discarded_cube_index = 1
used_cube_index = 0
logger.debug(
"Both cubes start at the same time but cube %s "
"ends before %s",
overlapping_cubes[discarded_cube_index],
overlapping_cubes[used_cube_index],
)
logger.debug(
"Cube %s contains all needed data so using it fully",
overlapping_cubes[used_cube_index],
)

# case 2: cube1 starts before cube2
# case 2.1: cube1 ends after cube2 -> return cube1
elif time_1[-1] > time_2[-1]:
cubes.pop(index + 1)
logger.debug("Using only data from %s", overlapping_cubes[0])

# case 2.2: cube1 ends before cube2 -> use full cube2
# and shorten cube1
else:
new_time = np.delete(
time_1,
np.argwhere(np.in1d(time_1, overlap)),
)
new_dates = overlapping_cubes[0].coord('time').units.num2date(
new_time)
logger.debug(
"Extracting time slice between %s and %s from cube %s "
"to use it for concatenation with cube %s",
new_dates[0],
new_dates[-1],
overlapping_cubes[0],
overlapping_cubes[1],
)

start_point = isodate.datetime_isoformat(
new_dates[0], format=isodate.isostrf.DT_BAS_COMPLETE)
end_point = isodate.datetime_isoformat(
new_dates[-1], format=isodate.isostrf.DT_BAS_COMPLETE)
new_cube = clip_timerange(overlapping_cubes[0],
f'{start_point}/{end_point}')

cubes[index] = new_cube
return cubes
class _TimesHelper:

def __init__(self, time):
self.times = time.core_points()
self.units = str(time.units)

def __getattr__(self, name):
return getattr(self.times, name)

def __len__(self):
return len(self.times)

def __getitem__(self, key):
return self.times[key]


def _check_time_overlaps(cubes: iris.cube.CubeList) -> iris.cube.CubeList:
"""Handle time overlaps.

Parameters
----------
cubes : iris.cube.CubeList
A list of cubes belonging to a single timeseries,
ordered by starting point with possible overlaps.

Returns
-------
iris.cube.CubeList
A list of cubes belonging to a single timeseries,
ordered by starting point with no overlaps.
"""
if len(cubes) < 2:
return cubes

class _TrackedCube(NamedTuple):
cube: iris.cube.Cube
times: iris.coords.DimCoord
start: float
end: float

@classmethod
def from_cube(cls, cube):
"""Construct tracked cube."""
times = cube.coord("time")
start, end = times.core_points()[[0, -1]]
return cls(cube, times, start, end)

new_cubes = iris.cube.CubeList()
current_cube = _TrackedCube.from_cube(cubes[0])
for new_cube in map(_TrackedCube.from_cube, cubes[1:]):
if new_cube.start > current_cube.end:
# no overlap, use current cube and start again from new cube
logger.debug("Using %s", current_cube.cube)
new_cubes.append(current_cube.cube)
current_cube = new_cube
continue
# overlap
if current_cube.end > new_cube.end:
# current cube ends after new one, just forget new cube
logger.debug(
"Discarding %s because the time range "
"is already covered by %s", new_cube.cube, current_cube.cube)
continue
if new_cube.start == current_cube.start:
# new cube completely covers current one
# forget current cube
current_cube = new_cube
logger.debug(
"Discarding %s because the time range is covered by %s",
current_cube.cube, new_cube.cube)
continue
# new cube ends after current one,
# use all of new cube, and shorten current cube to
# eliminate overlap with new cube
cut_index = cftime.time2index(
new_cube.start,
_TimesHelper(current_cube.times),
current_cube.times.units.calendar,
select="before",
) + 1
logger.debug("Using %s shortened to %s due to overlap",
current_cube.cube,
current_cube.times.cell(cut_index).point)
new_cubes.append(current_cube.cube[:cut_index])
current_cube = new_cube

logger.debug("Using %s", current_cube.cube)
new_cubes.append(current_cube.cube)

return new_cubes


def _fix_calendars(cubes):
Expand Down