diff --git a/src/croniter/croniter.py b/src/croniter/croniter.py index 50fecf6..3b94337 100644 --- a/src/croniter/croniter.py +++ b/src/croniter/croniter.py @@ -157,10 +157,11 @@ class croniter(object): def __init__(self, expr_format, start_time=None, ret_type=float, day_or=True, max_years_between_matches=None, is_prev=False, - hash_id=None, implement_cron_bug=False): + hash_id=None, implement_cron_bug=False, expand_from_start_time=False): self._ret_type = ret_type self._day_or = day_or self._implement_cron_bug = implement_cron_bug + self._expand_from_start_time = expand_from_start_time if hash_id: if not isinstance(hash_id, (bytes, str)): @@ -184,7 +185,9 @@ def __init__(self, expr_format, start_time=None, ret_type=float, self.cur = None self.set_current(start_time, force=False) - self.expanded, self.nth_weekday_of_month = self.expand(expr_format, hash_id=hash_id) + self.expanded, self.nth_weekday_of_month = self.expand(expr_format, + hash_id=hash_id, + from_timestamp=self.dst_start_time if self._expand_from_start_time else None) self.expressions = EXPRESSIONS[(expr_format, hash_id)] self._is_prev = is_prev @@ -197,6 +200,8 @@ def _alphaconv(cls, index, key, expressions): "[{0}] is not acceptable".format(" ".join(expressions))) def get_next(self, ret_type=None, start_time=None): + if start_time and self._expand_from_start_time: + raise ValueError("start_time is not supported when using expand_from_start_time = True.") self.set_current(start_time, force=True) return self._get_next(ret_type or self._ret_type, is_prev=False) @@ -632,7 +637,7 @@ def is_leap(self, year): return False @classmethod - def _expand(cls, expr_format, hash_id=None): + def _expand(cls, expr_format, hash_id=None, from_timestamp: int = None): # Split the expression in components, and normalize L -> l, MON -> mon, # etc. Keep expr_format untouched so we can use it in the exception # messages. @@ -663,7 +668,7 @@ def _expand(cls, expr_format, hash_id=None): for i, expr in enumerate(expressions): for expanderid, expander in EXPANDERS.items(): - expr = expander(cls).expand(efl, i, expr, hash_id=hash_id) + expr = expander(cls).expand(efl, i, expr, hash_id=hash_id, from_timestamp=from_timestamp) e_list = expr.split(',') res = [] @@ -738,6 +743,10 @@ def _expand(cls, expr_format, hash_id=None): ): raise CroniterBadCronError( "{0} is out of bands".format(expr_format)) + + if from_timestamp: + low = cls._get_low_from_current_date_number(i, step, from_timestamp) + try: rng = range(low, high + 1, step) except ValueError as exc: @@ -815,10 +824,10 @@ def _expand(cls, expr_format, hash_id=None): return expanded, nth_weekday_of_month @classmethod - def expand(cls, expr_format, hash_id=None): + def expand(cls, expr_format, hash_id=None, from_timestamp: int = None): """Shallow non Croniter ValueError inside a nice CroniterBadCronError""" try: - return cls._expand(expr_format, hash_id=hash_id) + return cls._expand(expr_format, hash_id=hash_id, from_timestamp=from_timestamp) except (ValueError,) as exc: error_type, error_instance, traceback = sys.exc_info() if isinstance(exc, CroniterError): @@ -830,6 +839,22 @@ def expand(cls, expr_format, hash_id=None): else: raise CroniterBadCronError("{0}".format(exc)) + @classmethod + def _get_low_from_current_date_number(cls, i: int, step: int, from_timestamp: int) -> int: + dt = datetime.datetime.fromtimestamp(from_timestamp, tz=datetime.timezone.utc) + if i == 0: + return dt.minute % step + if i == 1: + return dt.hour % step + if i == 2: + return ((dt.day - 1) % step) + 1 + if i == 3: + return dt.month % step + if i == 4: + return (dt.weekday() + 1) % step + + raise ValueError("Can't get current date number for index larger than 4") + @classmethod def is_valid(cls, expression, hash_id=None, encoding='UTF-8'): if hash_id: @@ -862,7 +887,7 @@ def match_range(cls, cron_expression, from_datetime, to_datetime, day_or=True): def croniter_range(start, stop, expr_format, ret_type=None, day_or=True, exclude_ends=False, - _croniter=None): + expand_from_start_time=False, _croniter=None): """ Generator that provides all times from start to stop matching the given cron expression. If the cron expression matches either 'start' and/or 'stop', those times will be returned as @@ -897,7 +922,7 @@ def croniter_range(start, stop, expr_format, ret_type=None, day_or=True, exclude stop -= ms1 year_span = math.floor(abs(stop.year - start.year)) + 1 ic = _croniter(expr_format, start, ret_type=datetime.datetime, day_or=day_or, - max_years_between_matches=year_span) + max_years_between_matches=year_span, expand_from_start_time=expand_from_start_time) # define a continue (cont) condition function and step function for the main while loop if start < stop: # Forward def cont(v): diff --git a/src/croniter/tests/test_croniter.py b/src/croniter/tests/test_croniter.py index 2e03c2d..0642089 100755 --- a/src/croniter/tests/test_croniter.py +++ b/src/croniter/tests/test_croniter.py @@ -1655,6 +1655,80 @@ def test_bug_62_leap(self): ret = croniter("15 22 29 2 *", datetime(2024, 2, 29)).get_prev(datetime) self.assertEqual(ret, datetime(2020, 2, 29, 22, 15)) + def test_expand_from_start_time_minute(self): + seven_seconds_interval_pattern = "*/7 * * * *" + ret1 = croniter(seven_seconds_interval_pattern, start_time=datetime(2024, 7, 11, 10, 11), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret1, datetime(2024, 7, 11, 10, 18)) + + ret2 = croniter(seven_seconds_interval_pattern, start_time=datetime(2024, 7, 11, 10, 12), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret2, datetime(2024, 7, 11, 10, 19)) + + ret3 = croniter(seven_seconds_interval_pattern, start_time=datetime(2024, 7, 11, 10, 11), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret3, datetime(2024, 7, 11, 10, 4)) + + ret4 = croniter(seven_seconds_interval_pattern, start_time=datetime(2024, 7, 11, 10, 12), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret4, datetime(2024, 7, 11, 10, 5)) + + def test_expand_from_start_time_hour(self): + seven_hours_interval_pattern = "0 */7 * * *" + ret1 = croniter(seven_hours_interval_pattern, start_time=datetime(2024, 7, 11, 15, 0), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret1, datetime(2024, 7, 11, 22, 0)) + + ret2 = croniter(seven_hours_interval_pattern, start_time=datetime(2024, 7, 11, 16, 0), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret2, datetime(2024, 7, 11, 23, 0)) + + ret3 = croniter(seven_hours_interval_pattern, start_time=datetime(2024, 7, 11, 15, 0), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret3, datetime(2024, 7, 11, 8, 0)) + + ret4 = croniter(seven_hours_interval_pattern, start_time=datetime(2024, 7, 11, 16, 0), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret4, datetime(2024, 7, 11, 9, 0)) + + def test_expand_from_start_time_date(self): + five_days_interval_pattern = "0 0 */5 * *" + ret1 = croniter(five_days_interval_pattern, start_time=datetime(2024, 7, 12), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret1, datetime(2024, 7, 17)) + + ret2 = croniter(five_days_interval_pattern, start_time=datetime(2024, 7, 13), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret2, datetime(2024, 7, 18)) + + ret3 = croniter(five_days_interval_pattern, start_time=datetime(2024, 7, 12), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret3, datetime(2024, 7, 7)) + + ret4 = croniter(five_days_interval_pattern, start_time=datetime(2024, 7, 13), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret4, datetime(2024, 7, 8)) + + def test_expand_from_start_time_month(self): + three_monts_interval_pattern = "0 0 1 */3 *" + ret1 = croniter(three_monts_interval_pattern, start_time=datetime(2024, 7, 1), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret1, datetime(2024, 10, 1)) + + ret2 = croniter(three_monts_interval_pattern, start_time=datetime(2024, 8, 1), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret2, datetime(2024, 11, 1)) + + ret3 = croniter(three_monts_interval_pattern, start_time=datetime(2024, 7, 1), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret3, datetime(2024, 4, 1)) + + ret4 = croniter(three_monts_interval_pattern, start_time=datetime(2024, 8, 1), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret4, datetime(2024, 5, 1)) + + def test_expand_from_start_time_day_of_week(self): + three_monts_interval_pattern = "0 0 * * */2" + ret1 = croniter(three_monts_interval_pattern, start_time=datetime(2024, 7, 10), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret1, datetime(2024, 7, 12)) + + ret2 = croniter(three_monts_interval_pattern, start_time=datetime(2024, 7, 11), expand_from_start_time=True).get_next(datetime) + self.assertEqual(ret2, datetime(2024, 7, 13)) + + ret3 = croniter(three_monts_interval_pattern, start_time=datetime(2024, 7, 10), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret3, datetime(2024, 7, 8)) + + ret4 = croniter(three_monts_interval_pattern, start_time=datetime(2024, 7, 11), expand_from_start_time=True).get_prev(datetime) + self.assertEqual(ret4, datetime(2024, 7, 9)) + + def test_get_next_fails_with_expand_from_start_time_true(self): + expanded_croniter = croniter("0 0 */5 * *", expand_from_start_time=True) + self.assertRaises(ValueError, expanded_croniter.get_next, datetime, start_time=datetime(2024, 7, 12)) + if __name__ == '__main__': unittest.main()