feat: Set `Stream._MAX_RECORDS_LIMIT` during tap testing #1399

Conversation
Codecov Report

```diff
@@            Coverage Diff             @@
##             main    #1399      +/-   ##
==========================================
- Coverage   85.73%   85.67%   -0.06%
==========================================
  Files          57       57
  Lines        4689     4691       +2
  Branches      801      803       +2
==========================================
- Hits         4020     4019       -1
- Misses        481      483       +2
- Partials      188      189       +1
```
Testing in tap-snowflake#15. Reduced total test run time from ~20m to ~2m 👏

More testing in MeltanoLabs/tap-stackexchange#196. Reduced test run time from ~8m to ~1m ⏰

This is accomplished by adding an early exit to `get_records`:

```python
# overridden to test _MAX_RECORDS_LIMIT
def get_records(self, context: dict | None) -> Iterable[dict[str, Any]]:
    """Return a generator of record-type dictionary objects.

    Each record emitted should be a dictionary of property names to their values.

    Args:
        context: Stream partition or context dictionary.

    Yields:
        One item per (possibly processed) record in the API.
    """
    index = 1
    for record in self.request_records(context):
        transformed_record = self.post_process(record, context)
        if transformed_record is None:
            # Record filtered out during post_process()
            continue
        if self._MAX_RECORDS_LIMIT is not None and index > self._MAX_RECORDS_LIMIT:
            # Max records limit reached: stop early.
            break
        yield transformed_record
        index += 1
```
I may be missing something, but I don't see `get_records` using the limit or handling the exception.
That change isn't in this PR 😅 All I am doing here is setting `Stream._MAX_RECORDS_LIMIT` during tap testing. I didn't want to conflate too many similar but separate changes in one PR.

*(PR title changed from "feat: Set `max_records_limit` during tap testing" to "feat: Set `Stream._MAX_RECORDS_LIMIT` during tap testing".)*

Oh, ok. I've changed the title of the PR to better reflect that. Is there an issue to make…
Requesting hold to allow time to review.
@aaronsteers nudge 🙏 Would be good to get this out before it goes stale 🚢
I spent more time reviewing this, and I don't think we can merge it in its current state. Specifically, this doesn't seem to fit the requirements of a stable increment: without additional handling, this change would force exceptions to be raised during test functions. Perhaps we're presuming a future change we haven't agreed to, but that again is not a stable increment until/unless the exception specifically, and the DX issues broadly, are addressed. I'd suggest 2 changes:
As I explain in #1349 and #1366, aborting the stream without reaching the end has to generate an error, and that error needs to be caught. Otherwise, state messages for the stream are not finalized - and cannot be finalized - without creating data corruption and violating Singer Spec requirements.

It's important to note that developers may run these codepaths without a detailed understanding of the implications, and we need to be careful that all options presented by the SDK still promise valid taps that comply with the Singer Spec. Exposing a records limit variable will break compliance with the Singer Spec whenever that max limit is reached.

There's a path forward here if we want to add this to a "partial success" return code, but the partial success return code needs to distinguish interruptions between these two categories: (1) streams that validly incremented state but had more records remaining, and (2) streams which did not successfully reach a bookmark and therefore represent the start of an infinite loop, where no progress will ever be made towards syncing the stream successfully.
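The "raise and catch" handling described above might look roughly like this self-contained sketch. `MaxRecordsLimitReached`, `get_records_with_limit`, and `run_sync` are hypothetical names for illustration, not the SDK's actual API; the point is that the early exit surfaces as an exception the caller must catch, so the run can end in a valid (partial-success) state rather than silently emitting a finalized STATE message:

```python
class MaxRecordsLimitReached(Exception):
    """Hypothetical signal that a stream was cut off before its natural end."""


def get_records_with_limit(records, max_records_limit=None):
    """Yield records; raise if the limit interrupts the stream early."""
    for index, record in enumerate(records, start=1):
        if max_records_limit is not None and index > max_records_limit:
            # More records remain in the source: this is an abort, not completion.
            raise MaxRecordsLimitReached(
                f"aborted after {max_records_limit} records"
            )
        yield record


def run_sync(records, max_records_limit=None):
    """Consume the stream, treating the limit as a partial-success outcome."""
    emitted = []
    finalized = True
    try:
        for record in get_records_with_limit(records, max_records_limit):
            emitted.append(record)
    except MaxRecordsLimitReached:
        # Partial success: state bookmarks for this stream must NOT be
        # finalized, or downstream consumers would see corrupted state.
        finalized = False
    return emitted, finalized
```

Note that a stream that naturally exhausts exactly at the limit completes normally and may finalize state; only a genuinely interrupted stream raises.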
I've addressed my own feedback above in this (currently draft) PR: #1436. That PR combined with this one makes the increment "stable", specifically in that we deal with the exceptions that are raised as a result of a max record count limit - including proper handling of streams according to the…

Related, this proposes non-zero exit code options for taps "aborted" which were able to be successfully paused and left in a valid state despite leaving records in the source.

We don't need to tackle non-zero return codes as of now; this PR and #1436 both ensure that:
Closing in favour of #1436 |
This currently only works with `SQLStream` instances, which apply the `max_records_limit` in `get_records()` by default. For other types of `Stream`, setting a `max_records_limit` in `SuiteConfig` will cause the tests to fail. See #1349 for more details.

📚 Documentation preview 📚: https://meltano-sdk--1399.org.readthedocs.build/en/1399/
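For illustration, wiring a suite-level record cap onto streams under test might look like the sketch below. The dataclass is a stand-in modeled on the `SuiteConfig` mentioned above; the `max_records_limit` field name is taken from this discussion, but treat the exact API as an assumption rather than the SDK's real interface:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class SuiteConfig:
    """Stand-in for the SDK's test suite configuration object."""

    max_records_limit: int | None = None  # cap records synced per stream


def apply_suite_config(streams, config: SuiteConfig):
    """Propagate the suite-level limit onto each stream before syncing."""
    for stream in streams:
        if config.max_records_limit is not None:
            # Mirrors what this PR does: set Stream._MAX_RECORDS_LIMIT
            # on every stream under test.
            stream._MAX_RECORDS_LIMIT = config.max_records_limit
    return streams
```

As the comment above notes, this only yields passing tests for streams whose `get_records()` actually honors the limit (e.g. `SQLStream`); other stream types would need the exception handling from #1436.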