From 77993a058346f3b399105ad000c4f7525e73774a Mon Sep 17 00:00:00 2001 From: Alexander Butler <41213451+z3z1ma@users.noreply.github.com> Date: Fri, 24 Jun 2022 08:27:13 -0700 Subject: [PATCH] feat: Add end of pipe clean up hook to Sinks (#750) * add clean up hook to sinks called at end of pipe drain * make message more clear for devs and add decorator * unwrap the comprehension so mypy doesn't get mad * remove abstract method * fix flake8 errors Co-authored-by: Edgar R. M --- singer_sdk/sinks/core.py | 9 +++++++++ singer_sdk/target_base.py | 16 ++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 693596799d..1b5a50f0f2 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -401,3 +401,12 @@ def activate_version(self, new_version: int) -> None: "ACTIVATE_VERSION message received but not implemented by this target. " "Ignoring." ) + + def clean_up(self) -> None: + """Perform any clean up actions required at end of a stream. + + Implementations should ensure that clean up does not affect resources + that may be in use from other instances of the same sink. Stream name alone + should not be relied on; it's recommended to use a uuid as well. + """ + pass diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index fa400fae7c..dad17f09cd 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -273,7 +273,7 @@ def _process_lines(self, file_input: IO[str]) -> Counter[str]: def _process_endofpipe(self) -> None: """Called after all input lines have been read.""" - self.drain_all() + self.drain_all(is_endofpipe=True) def _process_record_message(self, message_dict: dict) -> None: """Process a RECORD message. 
@@ -403,15 +403,27 @@ def _process_activate_version_message(self, message_dict: dict) -> None: # Sink drain methods @final - def drain_all(self) -> None: + def drain_all(self, is_endofpipe: bool = False) -> None: """Drains all sinks, starting with those cleared due to changed schema. This method is internal to the SDK and should not need to be overridden. + + Args: + is_endofpipe: This is passed by the + :meth:`~singer_sdk.Target._process_endofpipe()` which + is called after the target instance has finished + listening to the stdin """ state = copy.deepcopy(self._latest_state) self._drain_all(self._sinks_to_clear, 1) + if is_endofpipe: + for sink in self._sinks_to_clear: + sink.clean_up() self._sinks_to_clear = [] self._drain_all(list(self._sinks_active.values()), self.max_parallelism) + if is_endofpipe: + for sink in self._sinks_active.values(): + sink.clean_up() self._write_state_message(state) self._reset_max_record_age()