Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle sync abort, reduce duplicate STATE messages, rename _MAX_RECORD_LIMIT as ABORT_AT_RECORD_COUNT #1436

Merged
merged 41 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
843388f
set max_records_limit during tap testing
Feb 7, 2023
3047aa0
make default max_records_limit None.
Feb 7, 2023
c688d76
unblock ci
Feb 7, 2023
f591152
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Feb 7, 2023
9234a1c
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Feb 7, 2023
e5d4576
apply suite_config to runners
Feb 7, 2023
24a6163
syntax
Feb 7, 2023
01ffdc2
unused import
Feb 7, 2023
ce0adb6
use partitioned stream state when checking max records
Feb 8, 2023
63a6051
exclude testing from coverage
Feb 8, 2023
ece1a55
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Feb 8, 2023
9128238
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Feb 8, 2023
6a08468
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Feb 9, 2023
ac15c7f
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Feb 9, 2023
11ca3fe
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Feb 14, 2023
936ffeb
Update pyproject.toml
Feb 14, 2023
5655ba8
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Feb 15, 2023
c82e9e4
feat: add exception classes
aaronsteers Feb 21, 2023
a9e3cce
chore: add helper fn `is_state_non_resumable()`
aaronsteers Feb 21, 2023
4b3a776
feat: handle abort exceptions
aaronsteers Feb 21, 2023
f4748e3
fix: add missing kw `metaclass=`
aaronsteers Feb 21, 2023
fffe64d
chore: cleanup _abort_sync() method
aaronsteers Feb 21, 2023
173d7dc
fix: new_tap() and new_target() refs
aaronsteers Feb 21, 2023
c3d27a7
fix: remove pyarrow workaround for python 3.11, poetry lock
aaronsteers Feb 21, 2023
4942bb6
chore: update jsonl recordings
aaronsteers Feb 21, 2023
6ba12e9
chore: remove pyarrow from extras
aaronsteers Feb 21, 2023
06b1e75
revert: pyarrow changes
aaronsteers Feb 21, 2023
9785b1c
chore: no need to skip extra state message
aaronsteers Feb 21, 2023
37988d7
fix: batch messages should reset state flush status
aaronsteers Feb 21, 2023
8b6926a
chore: remove dupes from expected state counter
aaronsteers Feb 21, 2023
5ea0a75
trigger drain on new record rather than state message
Mar 30, 2023
6f1b1d8
lint
Mar 30, 2023
cb68120
use max_records_limit in test SuiteConfig
Mar 30, 2023
9219d6a
Merge branch 'main' into kgpayne/apply-max-records-limit-during-testing
Mar 30, 2023
a92f23b
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 30, 2023
2005847
Merge remote-tracking branch 'origin/kgpayne/apply-max-records-limit-…
Mar 30, 2023
f21dc8d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 30, 2023
91be971
tidy up merge
Mar 30, 2023
f989bb4
Merge branch 'feat-handle-sync-abort' of github.com:meltano/sdk into …
Mar 30, 2023
3ae02e8
Merge branch 'main' into feat-handle-sync-abort
Mar 30, 2023
245c4fa
Merge branch 'main' into feat-handle-sync-abort
Mar 30, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 54 additions & 2 deletions singer_sdk/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import abc
import typing as t

if t.TYPE_CHECKING:
Expand All @@ -24,8 +25,59 @@ class MapExpressionError(Exception):
"""Failed map expression evaluation."""


class MaxRecordsLimitException(Exception):
"""Exception to raise if the maximum number of allowable records is exceeded."""
class RequestedAbortException(Exception):
"""Base class for abort and interrupt requests.

Whenever this exception is raised, streams will attempt to shut down gracefully and
will emit a final resumable `STATE` message if it is possible to do so.
"""


class MaxRecordsLimitException(RequestedAbortException):
"""Exception indicating the sync aborted due to too many records."""


class AbortedSyncExceptionBase(Exception, metaclass=abc.ABCMeta):
"""Base exception to raise when a stream sync is aborted.

Developers should not raise this directly, and instead should use:
1. `FatalAbortedSyncException` - Indicates the stream aborted abnormally and was not
able to reach a stable and resumable state.
2. `PausedSyncException` - Indicates the stream aborted abnormally and successfully
reached a 'paused' and resumable state.

Notes:
- `FULL_TABLE` sync operations cannot be paused and will always trigger a fatal
exception if aborted.
- `INCREMENTAL` and `LOG_BASED` streams are able to be paused only if a number of
preconditions are met, specifically, `state_partitioning_keys` cannot be
overridden and the stream must be declared with `is_sorted=True`.
"""


class AbortedSyncFailedException(AbortedSyncExceptionBase):
"""Exception to raise when sync is aborted and unable to reach a stable state.

This signifies that `FULL_TABLE` streams (if applicable) were successfully
completed, and any bookmarks from `INCREMENTAL` and `LOG_BASED` streams were
advanced and finalized successfully.
"""


class AbortedSyncPausedException(AbortedSyncExceptionBase):
"""Exception to raise when an aborted sync operation is paused successfully.

This exception indicates the stream aborted abnormally and successfully
reached a 'paused' status, and emitted a resumable state artifact before exiting.

Streams synced with `FULL_TABLE` replication can never have partial success or
'paused' status.

If this exception is raised, this signifies that additional records were left
on the source system and the sync operation aborted before reaching the end of the
stream. This exception signifies that bookmarks from `INCREMENTAL`
and `LOG_BASED` streams were successfully emitted and are resumable.
"""


class RecordsWithoutSchemaException(Exception):
Expand Down
16 changes: 14 additions & 2 deletions singer_sdk/helpers/_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,24 @@ def _greater_than_signpost(
return new_value > signpost


def is_state_non_resumable(stream_or_partition_state: dict) -> bool:
"""Return True when state is non-resumable.

This is determined by checking for a "progress marker" tag in the state artifact.
"""
return PROGRESS_MARKERS in stream_or_partition_state


def finalize_state_progress_markers(stream_or_partition_state: dict) -> dict | None:
"""Promote or wipe progress markers once sync is complete."""
"""Promote or wipe progress markers once sync is complete.

This marks any non-resumable progress markers as finalized. If there are
valid bookmarks present, they will be promoted to be resumable.
"""
signpost_value = stream_or_partition_state.pop(SIGNPOST_MARKER, None)
stream_or_partition_state.pop(STARTING_MARKER, None)
if (
PROGRESS_MARKERS in stream_or_partition_state
is_state_non_resumable(stream_or_partition_state)
and "replication_key" in stream_or_partition_state[PROGRESS_MARKERS]
):
# Replication keys valid (only) after sync is complete
Expand Down
Loading