Skip to content

Commit

Permalink
fix: handle sync abort, reduce duplicate STATE messages, rename `_M…
Browse files Browse the repository at this point in the history
…AX_RECORD_LIMIT` as `ABORT_AT_RECORD_COUNT` (#1436)

* set max_records_limit during tap testing

* make default max_records_limit None.

* unblock ci

* apply suite_config to runners

* syntax

* unused import

* use partitioned stream state when checking max records

* exclude testing from coverage

* Update pyproject.toml

* feat: add exception classes

added: RequestedAbortException, MaxRecordsLimitException, AbortedSyncExceptionBase, AbortedSyncFailedException, AbortedSyncPausedException

* chore: add helper fn `is_state_non_resumable()`

* feat: handle abort exceptions

* fix: add missing kw `metaclass=`

* chore: cleanup _abort_sync() method

* fix: new_tap() and new_target() refs

* fix: remove pyarrow workaround for python 3.11, poetry lock

* chore: update jsonl recordings

* chore: remove pyarrow from extras

* revert: pyarrow changes

* chore: no need to skip extra state message

* fix: batch messages should reset state flush status

* chore: remove dupes from expected state counter

* trigger drain on new record rather than state message

* lint

* use max_records_limit in test SuiteConfig

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* tidy up merge

---------

Co-authored-by: Ken Payne <ken@meltano.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 30, 2023
1 parent 131d701 commit 0411c2c
Show file tree
Hide file tree
Showing 28 changed files with 278 additions and 124 deletions.
52 changes: 26 additions & 26 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 54 additions & 2 deletions singer_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import abc
import typing as t

if t.TYPE_CHECKING:
Expand All @@ -24,8 +25,59 @@ class MapExpressionError(Exception):
"""Failed map expression evaluation."""


class MaxRecordsLimitException(Exception):
    """Raised when a sync produces more records than the permitted maximum."""
class RequestedAbortException(Exception):
    """Base class for abort and interrupt requests.

    Whenever this exception is raised, streams will attempt to shut down
    gracefully and will emit a final resumable `STATE` message if it is possible
    to do so.
    """


class MaxRecordsLimitException(RequestedAbortException):
    """Abort request signalling that the sync stopped due to too many records."""


class AbortedSyncExceptionBase(Exception, metaclass=abc.ABCMeta):
    """Base exception to raise when a stream sync is aborted.

    Developers should not raise this directly, and instead should use:

    1. `AbortedSyncFailedException` - Indicates the stream aborted abnormally and
       was not able to reach a stable and resumable state.
    2. `AbortedSyncPausedException` - Indicates the stream aborted abnormally and
       successfully reached a 'paused' and resumable state.

    Notes:
        - `FULL_TABLE` sync operations cannot be paused and will always trigger a
          fatal exception if aborted.
        - `INCREMENTAL` and `LOG_BASED` streams are able to be paused only if a
          number of preconditions are met, specifically,
          `state_partitioning_keys` cannot be overridden and the stream must be
          declared with `is_sorted=True`.
    """


class AbortedSyncFailedException(AbortedSyncExceptionBase):
    """Exception to raise when sync is aborted and unable to reach a stable state.

    This signifies that `FULL_TABLE` streams (if applicable) were successfully
    completed, and any bookmarks from `INCREMENTAL` and `LOG_BASED` streams were
    advanced and finalized successfully.
    """


class AbortedSyncPausedException(AbortedSyncExceptionBase):
    """Exception to raise when an aborted sync operation is paused successfully.

    This exception indicates the stream aborted abnormally and successfully
    reached a 'paused' status, and emitted a resumable state artifact before
    exiting.

    Streams synced with `FULL_TABLE` replication can never have partial success
    or 'paused' status.

    If this exception is raised, this signifies that additional records were left
    on the source system and the sync operation aborted before reaching the end
    of the stream. This exception signifies that bookmarks from `INCREMENTAL`
    and `LOG_BASED` streams were successfully emitted and are resumable.
    """


class RecordsWithoutSchemaException(Exception):
Expand Down
16 changes: 14 additions & 2 deletions singer_sdk/helpers/_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,24 @@ def _greater_than_signpost(
return new_value > signpost


def is_state_non_resumable(stream_or_partition_state: dict) -> bool:
    """Return True when state is non-resumable.

    This is determined by checking for a "progress marker" tag in the state
    artifact.

    Args:
        stream_or_partition_state: The stream or partition state dictionary to
            inspect.

    Returns:
        True if the `PROGRESS_MARKERS` key is present in the given state,
        otherwise False.
    """
    return PROGRESS_MARKERS in stream_or_partition_state


def finalize_state_progress_markers(stream_or_partition_state: dict) -> dict | None:
"""Promote or wipe progress markers once sync is complete."""
"""Promote or wipe progress markers once sync is complete.
This marks any non-resumable progress markers as finalized. If there are
valid bookmarks present, they will be promoted to be resumable.
"""
signpost_value = stream_or_partition_state.pop(SIGNPOST_MARKER, None)
stream_or_partition_state.pop(STARTING_MARKER, None)
if (
PROGRESS_MARKERS in stream_or_partition_state
is_state_non_resumable(stream_or_partition_state)
and "replication_key" in stream_or_partition_state[PROGRESS_MARKERS]
):
# Replication keys valid (only) after sync is complete
Expand Down
Loading

0 comments on commit 0411c2c

Please sign in to comment.