From 8f18e47c3218d755e66ed31b387634b8703c6aea Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 22 Jul 2022 08:11:19 -0700 Subject: [PATCH 01/20] Simplify Python impl of KafkaSourceStage --- morpheus/stages/input/kafka_source_stage.py | 270 +++----------------- 1 file changed, 32 insertions(+), 238 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 54e58869f2..de0e00cad8 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -15,10 +15,11 @@ import logging import time import weakref +from email import message +import confluent_kafka as ck import pandas as pd import srf -from cudf_kafka._lib.kafka import KafkaDatasource import cudf @@ -62,22 +63,22 @@ def __init__(self, c: Config, bootstrap_servers: str, input_topic: str = "test_pcap", - group_id: str = "custreamz", + group_id: str = "morpheus", poll_interval: str = "10millis", disable_commit: bool = False, disable_pre_filtering: bool = False, auto_offset_reset: str = "latest"): super().__init__(c) - self._consumer_conf = { + self._consumer_params = { 'bootstrap.servers': bootstrap_servers, 'group.id': group_id, 'session.timeout.ms': "60000", - "auto.offset.reset": auto_offset_reset + "auto.offset.reset": auto_offset_reset, + 'enable.auto.commit': False } - self._input_topic = input_topic - self._poll_interval = poll_interval + self._topic = input_topic self._max_batch_size = c.pipeline_batch_size self._max_concurrent = c.num_threads self._disable_commit = disable_commit @@ -87,30 +88,7 @@ def __init__(self, # Flag to indicate whether or not we should stop self._stop_requested = False - # What gets passed to streamz kafka - topic = self._input_topic - consumer_params = self._consumer_conf - poll_interval = self._poll_interval - npartitions = None - refresh_partitions = False - max_batch_size = self._max_batch_size - keys = False - engine = None - - self._consumer_params = consumer_params - # Override the auto-commit config to enforce custom streamz checkpointing - self._consumer_params['enable.auto.commit'] = 'false' - if 'auto.offset.reset' not in self._consumer_params.keys(): - self._consumer_params['auto.offset.reset'] = 'earliest' - self._topic = topic - self._npartitions = npartitions - self._refresh_partitions = refresh_partitions - if self._npartitions is not None and self._npartitions <= 0: - raise ValueError("Number of Kafka topic partitions must be > 0.") self._poll_interval = pd.Timedelta(poll_interval).total_seconds() - self._max_batch_size = max_batch_size - self._keys = keys - self._engine = engine self._started = False @property @@ -131,224 +109,40 @@ def _source_generator(self): # Each invocation of this function makes a new thread so recreate the producers # Set some initial values - npartitions = self._npartitions consumer = None - consumer_params = self._consumer_params - try: - - # Now begin the script - import confluent_kafka as ck - - if self._engine == "cudf": # pragma: no cover - from custreamz import kafka - - if self._engine == "cudf": # pragma: no cover - consumer = kafka.Consumer(consumer_params) - else: - consumer = ck.Consumer(consumer_params) - - # weakref.finalize(self, lambda c=consumer: _close_consumer(c)) - tp = ck.TopicPartition(self._topic, 0, 0) - - attempts = 0 - max_attempts = 5 - - # Attempt to connect to the cluster. 
Try 5 times before giving up - while attempts < max_attempts: - try: - # blocks for consumer thread to come up - consumer.get_watermark_offsets(tp) - - logger.debug("Connected to Kafka source at '%s' on attempt #%d/%d", - self._consumer_conf["bootstrap.servers"], - attempts + 1, - max_attempts) - - break - except (RuntimeError, ck.KafkaException): - attempts += 1 - - # Raise the error if we hit the max - if (attempts >= max_attempts): - logger.exception(("Error while getting Kafka watermark offsets. Max attempts (%d) reached. " - "Check the bootstrap_servers parameter ('%s')"), - max_attempts, - self._consumer_conf["bootstrap.servers"]) - raise - else: - logger.warning("Error while getting Kafka watermark offsets. Attempt #%d/%d", - attempts, - max_attempts, - exc_info=True) - - # Exponential backoff - time.sleep(2.0**attempts) - - try: - if npartitions is None: - - kafka_cluster_metadata = consumer.list_topics(self._topic) - - if self._engine == "cudf": # pragma: no cover - npartitions = len(kafka_cluster_metadata[self._topic.encode('utf-8')]) - else: - npartitions = len(kafka_cluster_metadata.topics[self._topic].partitions) - - positions = [0] * npartitions - - tps = [] - for partition in range(npartitions): - tps.append(ck.TopicPartition(self._topic, partition)) - - while not self._stop_requested: - try: - committed = consumer.committed(tps, timeout=1) - except ck.KafkaException: - pass - else: - for tp in committed: - positions[tp.partition] = tp.offset - break - - while not self._stop_requested: - out = [] - - if self._refresh_partitions: - kafka_cluster_metadata = consumer.list_topics(self._topic) - - if self._engine == "cudf": # pragma: no cover - new_partitions = len(kafka_cluster_metadata[self._topic.encode('utf-8')]) - else: - new_partitions = len(kafka_cluster_metadata.topics[self._topic].partitions) - - if new_partitions > npartitions: - positions.extend([-1001] * (new_partitions - npartitions)) - npartitions = new_partitions - - for partition in range(npartitions): - - tp = ck.TopicPartition(self._topic, partition, 0) - - try: - low, high = consumer.get_watermark_offsets(tp, timeout=0.1) - except (RuntimeError, ck.KafkaException): - continue - - if 'auto.offset.reset' in consumer_params.keys(): - if consumer_params['auto.offset.reset'] == 'latest' and positions[partition] == -1001: - positions[partition] = high - - current_position = positions[partition] - - lowest = max(current_position, low) - - if high > lowest + self._max_batch_size: - high = lowest + self._max_batch_size - if high > lowest: - out.append((consumer_params, self._topic, partition, self._keys, lowest, high - 1)) - positions[partition] = high - - consumer_params['auto.offset.reset'] = 'earliest' - - if (out): - for part in out: - - meta = self._kafka_params_to_messagemeta(part) - - # Once the meta goes out of scope, commit it - def commit(topic, part_no, keys, lowest, offset): - # topic, part_no, _, _, offset = part[1:] - try: - _tp = ck.TopicPartition(topic, part_no, offset + 1) - consumer.commit(offsets=[_tp], asynchronous=True) - except Exception: - logger.exception(("Error occurred in `from-kafka` stage with " - "broker '%s' while committing message: %d"), - self._consumer_conf["bootstrap.servers"], - offset) - - if (not self._disable_commit): - weakref.finalize(meta, commit, *part[1:]) - - # Push the message meta - yield meta - else: - time.sleep(self._poll_interval) - except Exception: - logger.exception(("Error occurred in `from-kafka` stage with broker '%s' while processing messages"), - 
self._consumer_conf["bootstrap.servers"]) - raise + consumer = ck.Consumer(self._consumer_params) + consumer.subscribe([self._topic]) + + while not self._stop_requested: + msg = consumer.poll(timeout=1.0) + if msg is None: + time.sleep(self._poll_interval) + continue + + msg_error = msg.error() + if msg_error is None: + payload = msg.value() + if payload is not None: + df = cudf.io.read_json(payload, lines=True) + if (not self._disable_commit): + consumer.commit(message=msg) + + yield MessageMeta(df) + + elif msg_error == ck.KafkaError._PARTITION_EOF: + time.sleep(self._poll_interval) + else: + raise ck.KafkaException(msg_error) finally: # Close the consumer and call on_completed if (consumer): consumer.close() - def _kafka_params_to_messagemeta(self, x: tuple): - - # Unpack - kafka_params, topic, partition, keys, low, high = x - - gdf = self._read_gdf(kafka_params, topic=topic, partition=partition, lines=True, start=low, end=high + 1) - - return MessageMeta(gdf) - - @staticmethod - def _read_gdf(kafka_configs, - topic=None, - partition=0, - lines=True, - start=0, - end=0, - batch_timeout=10000, - delimiter="\n", - message_format="json"): - """ - Replicates `custreamz.Consumer.read_gdf` function which does not work for some reason. - """ - - if topic is None: - raise ValueError("ERROR: You must specifiy the topic " - "that you want to consume from") - - kafka_datasource = None - - try: - kafka_datasource = KafkaDatasource( - kafka_configs, - topic.encode(), - partition, - start, - end, - batch_timeout, - delimiter.encode(), - ) - - cudf_readers = { - "json": cudf.io.read_json, - "csv": cudf.io.read_csv, - "orc": cudf.io.read_orc, - "avro": cudf.io.read_avro, - "parquet": cudf.io.read_parquet, - } - - result = cudf_readers[message_format](kafka_datasource, engine="cudf", lines=lines) - - return cudf.DataFrame._from_data(data=result._data, index=result._index) - except Exception: - logger.exception("Error occurred converting KafkaDatasource to Dataframe.") - finally: - if (kafka_datasource is not None): - # Close up the cudf datasource instance - # TODO: Ideally the C++ destructor should handle the - # unsubscribe and closing the socket connection. 
- kafka_datasource.unsubscribe() - kafka_datasource.close(batch_timeout) - def _build_source(self, builder: srf.Builder) -> StreamPair: - if (self._build_cpp_node()): + if (self._build_cpp_node() and False): source = _stages.KafkaSourceStage(builder, self.unique_name, self._max_batch_size, @@ -362,6 +156,6 @@ def _build_source(self, builder: srf.Builder) -> StreamPair: # multiple threads source.launch_options.pe_count = self._max_concurrent else: - source = builder.make_source(self.unique_name, self._source_generator) + source = builder.make_source(self.unique_name, self._source_generator()) return source, MessageMeta From a230e781e539c69b08f2b29f5c4bd2455ccd3d20 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 22 Jul 2022 08:23:27 -0700 Subject: [PATCH 02/20] Handle mal-formed payloads --- morpheus/stages/input/kafka_source_stage.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index de0e00cad8..eb7879a0f3 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -124,11 +124,17 @@ def _source_generator(self): if msg_error is None: payload = msg.value() if payload is not None: - df = cudf.io.read_json(payload, lines=True) - if (not self._disable_commit): - consumer.commit(message=msg) - - yield MessageMeta(df) + df = None + try: + df = cudf.io.read_json(payload, lines=True) + except Exception as e: + logger.error("Error parsing payload into a dataframe : {}".format(e)) + finally: + if (not self._disable_commit): + consumer.commit(message=msg) + + if df is not None: + yield MessageMeta(df) elif msg_error == ck.KafkaError._PARTITION_EOF: time.sleep(self._poll_interval) From faf03751abbe51425947b0efca785c3bf3c2d3d2 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 22 Jul 2022 08:30:59 -0700 Subject: [PATCH 03/20] Re-enable cpp impl --- morpheus/stages/input/kafka_source_stage.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index eb7879a0f3..28a142fbac 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -75,7 +75,7 @@ def __init__(self, 'group.id': group_id, 'session.timeout.ms': "60000", "auto.offset.reset": auto_offset_reset, - 'enable.auto.commit': False + 'enable.auto.commit': 'false' } self._topic = input_topic @@ -148,7 +148,7 @@ def _source_generator(self): def _build_source(self, builder: srf.Builder) -> StreamPair: - if (self._build_cpp_node() and False): + if (self._build_cpp_node()): source = _stages.KafkaSourceStage(builder, self.unique_name, self._max_batch_size, From 3e5e87f01c58029ee3452edc3439ccec9b6c4b06 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 22 Jul 2022 08:34:39 -0700 Subject: [PATCH 04/20] Remove unused imports --- morpheus/stages/input/kafka_source_stage.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 28a142fbac..73517c4b24 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -14,8 +14,6 @@ import logging import time -import weakref -from email import message import confluent_kafka as ck import pandas as pd From b1939b6d527afc8e670d5e1892aaf8a985b45a84 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 6 Oct 2022 10:11:49 -0700 Subject: 
[PATCH 05/20] Fix merge error --- morpheus/stages/input/kafka_source_stage.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 79fca2dec7..425ba76ca1 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -56,6 +56,8 @@ class KafkaSourceStage(SingleOutputSource): Input kafka topic. group_id : str Specifies the name of the consumer group a Kafka consumer belongs to. + client_id : str, default = None + An optional identifier of the consumer. poll_interval : str Seconds that elapse between polling Kafka for new messages. Follows the pandas interval format. disable_commit : bool, default = False @@ -76,6 +78,7 @@ def __init__(self, bootstrap_servers: str, input_topic: str = "test_pcap", group_id: str = "morpheus", + client_id: str = None, poll_interval: str = "10millis", disable_commit: bool = False, disable_pre_filtering: bool = False, @@ -83,6 +86,9 @@ def __init__(self, stop_after: int = 0): super().__init__(c) + if isinstance(auto_offset_reset, AutoOffsetReset): + auto_offset_reset = auto_offset_reset.value + self._consumer_params = { 'bootstrap.servers': bootstrap_servers, 'group.id': group_id, @@ -91,7 +97,7 @@ def __init__(self, 'enable.auto.commit': 'false' } if client_id is not None: - self._consumer_conf['client.id'] = client_id + self._consumer_params['client.id'] = client_id self._topic = input_topic self._max_batch_size = c.pipeline_batch_size From 18cd0211726e03d655e3d0415b60f0c0e2d95fba Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 6 Oct 2022 10:16:08 -0700 Subject: [PATCH 06/20] Update kafka tests --- tests/test_cli.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index c9a0144886..cb8baaf240 100755 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -490,8 +490,8 @@ def test_pipeline_fil_all(self, config, callback_values, tmp_path, mlflow_uri): assert not file_source._iterative assert isinstance(from_kafka, KafkaSourceStage) - assert from_kafka._consumer_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321' - assert from_kafka._input_topic == 'test_topic' + assert from_kafka._consumer_params['bootstrap.servers'] == 'kserv1:123,kserv2:321' + assert from_kafka._topic == 'test_topic' assert isinstance(deserialize, DeserializeStage) assert isinstance(filter_stage, FilterDetectionsStage) @@ -703,8 +703,8 @@ def test_pipeline_nlp_all(self, config, callback_values, tmp_path, mlflow_uri): assert not file_source._iterative assert isinstance(from_kafka, KafkaSourceStage) - assert from_kafka._consumer_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321' - assert from_kafka._input_topic == 'test_topic' + assert from_kafka._consumer_params['bootstrap.servers'] == 'kserv1:123,kserv2:321' + assert from_kafka._topic == 'test_topic' assert isinstance(deserialize, DeserializeStage) assert isinstance(filter_stage, FilterDetectionsStage) From 044a941e394a1c0c7d65cf01530de4e56ec2f8c4 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 6 Oct 2022 11:35:20 -0700 Subject: [PATCH 07/20] Implement stop_after functionality --- morpheus/stages/input/kafka_source_stage.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 425ba76ca1..c64214bb45 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ 
-136,6 +136,8 @@ def _source_generator(self): consumer = ck.Consumer(self._consumer_params) consumer.subscribe([self._topic]) + records_emitted = 0 + while not self._stop_requested: msg = consumer.poll(timeout=1.0) if msg is None: @@ -156,7 +158,12 @@ def _source_generator(self): consumer.commit(message=msg) if df is not None: + num_records = len(df) yield MessageMeta(df) + records_emitted += num_records + + if self._stop_after > 0 and records_emitted >= self._stop_after: + self._stop_requested = True elif msg_error == ck.KafkaError._PARTITION_EOF: time.sleep(self._poll_interval) From 3904f0fc944c7a61920ae62be637ac85c0d8b651 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 6 Oct 2022 12:24:01 -0700 Subject: [PATCH 08/20] Set client_ids and don't disable commits --- tests/test_abp_kafka.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_abp_kafka.py b/tests/test_abp_kafka.py index 5578069f39..c6c4bde036 100755 --- a/tests/test_abp_kafka.py +++ b/tests/test_abp_kafka.py @@ -113,8 +113,8 @@ def async_infer(callback=None, **k): input_topic=kafka_topics.input_topic, auto_offset_reset="earliest", poll_interval="1seconds", - disable_commit=True, - stop_after=num_records)) + stop_after=num_records, + client_id="test_abp_no_cpp_reader")) pipe.add_stage(DeserializeStage(config)) pipe.add_stage(PreprocessFILStage(config)) pipe.add_stage( @@ -123,7 +123,10 @@ def async_infer(callback=None, **k): pipe.add_stage(AddClassificationsStage(config)) pipe.add_stage(SerializeStage(config)) pipe.add_stage( - WriteToKafkaStage(config, bootstrap_servers=kafka_bootstrap_servers, output_topic=kafka_topics.output_topic)) + WriteToKafkaStage(config, + bootstrap_servers=kafka_bootstrap_servers, + output_topic=kafka_topics.output_topic, + client_id="test_abp_no_cpp_writer")) pipe.run() From d5604902164d9bea50c3782acdee1257e2e596d7 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 6 Oct 2022 13:42:37 -0700 Subject: [PATCH 09/20] Use auto-commit when disable_commit is true --- morpheus/stages/input/kafka_source_stage.py | 4 ++-- tests/test_abp_kafka.py | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index c64214bb45..96fc600878 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -94,7 +94,7 @@ def __init__(self, 'group.id': group_id, 'session.timeout.ms': "60000", "auto.offset.reset": auto_offset_reset, - 'enable.auto.commit': 'false' + 'enable.auto.commit': str(disable_commit).lower() } if client_id is not None: self._consumer_params['client.id'] = client_id @@ -192,6 +192,6 @@ def _build_source(self, builder: srf.Builder) -> StreamPair: # multiple threads source.launch_options.pe_count = self._max_concurrent else: - source = builder.make_source(self.unique_name, self._source_generator()) + source = builder.make_source(self.unique_name, self._source_generator) return source, MessageMeta diff --git a/tests/test_abp_kafka.py b/tests/test_abp_kafka.py index c6c4bde036..a46d5d4c07 100755 --- a/tests/test_abp_kafka.py +++ b/tests/test_abp_kafka.py @@ -113,6 +113,7 @@ def async_infer(callback=None, **k): input_topic=kafka_topics.input_topic, auto_offset_reset="earliest", poll_interval="1seconds", + disable_commit=True, stop_after=num_records, client_id="test_abp_no_cpp_reader")) pipe.add_stage(DeserializeStage(config)) @@ -133,6 +134,7 @@ def async_infer(callback=None, **k): val_df = 
read_file_to_df(val_file_name, file_type=FileTypes.Auto, df_type='pandas') output_buf = StringIO() + kafka_consumer.seek_to_beginning() for rec in kafka_consumer: output_buf.write("{}\n".format(rec.value.decode("utf-8"))) From 59348388775de9588310d58269f87e5a942a7f57 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 6 Oct 2022 13:43:34 -0700 Subject: [PATCH 10/20] Remove call to seek --- tests/test_abp_kafka.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_abp_kafka.py b/tests/test_abp_kafka.py index a46d5d4c07..8faf57b167 100755 --- a/tests/test_abp_kafka.py +++ b/tests/test_abp_kafka.py @@ -134,7 +134,6 @@ def async_infer(callback=None, **k): val_df = read_file_to_df(val_file_name, file_type=FileTypes.Auto, df_type='pandas') output_buf = StringIO() - kafka_consumer.seek_to_beginning() for rec in kafka_consumer: output_buf.write("{}\n".format(rec.value.decode("utf-8"))) From 511fb6d7d2be3de8f749aa7ee743c71e20b0244b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 12 Oct 2022 08:02:41 -0700 Subject: [PATCH 11/20] Bump SRF version --- cmake/dependencies.cmake | 2 +- docker/conda/environments/cuda11.5_dev.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index f015c2c578..8976d8dcc3 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -103,7 +103,7 @@ include(deps/Configure_rdkafka) # SRF (Should come after all third party but before NVIDIA repos) # ===== -set(SRF_VERSION 22.09 CACHE STRING "Which version of SRF to use") +set(SRF_VERSION 22.11 CACHE STRING "Which version of SRF to use") include(deps/Configure_srf) # CuDF diff --git a/docker/conda/environments/cuda11.5_dev.yml b/docker/conda/environments/cuda11.5_dev.yml index 8e67abe68b..d7de2b5ad6 100644 --- a/docker/conda/environments/cuda11.5_dev.yml +++ b/docker/conda/environments/cuda11.5_dev.yml @@ -79,7 +79,7 @@ dependencies: - scikit-build=0.13 - sphinx - sphinx_rtd_theme - - srf 22.09.* + - srf 22.11.* - sysroot_linux-64=2.17 - tqdm=4 - typing_utils=0.1 From 761c74f5ed560fca0afe6831b3f7d4ea03e39234 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 12 Oct 2022 09:28:44 -0700 Subject: [PATCH 12/20] wip --- morpheus/stages/input/kafka_source_stage.py | 9 +++++++++ tests/test_abp_kafka.py | 2 -- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 96fc600878..5c92c933ee 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -13,6 +13,7 @@ # limitations under the License. 
import logging +import sys import time from enum import Enum @@ -128,6 +129,8 @@ def stop(self): return super().stop() def _source_generator(self): + # TODO : Needs to batch records until _stop_requested, _PARTITION_EOF or batch size has been hit + # Each invocation of this function makes a new thread so recreate the producers # Set some initial values @@ -137,6 +140,7 @@ def _source_generator(self): consumer.subscribe([self._topic]) records_emitted = 0 + num_messages = 0 while not self._stop_requested: msg = consumer.poll(timeout=1.0) @@ -161,6 +165,7 @@ def _source_generator(self): num_records = len(df) yield MessageMeta(df) records_emitted += num_records + num_messages += 1 if self._stop_after > 0 and records_emitted >= self._stop_after: self._stop_requested = True @@ -170,6 +175,10 @@ def _source_generator(self): else: raise ck.KafkaException(msg_error) + print("******** completed after {} records in {} messages".format(records_emitted, num_messages), + flush=True, + file=sys.stderr) + finally: # Close the consumer and call on_completed if (consumer): diff --git a/tests/test_abp_kafka.py b/tests/test_abp_kafka.py index 8faf57b167..217fcdf7dd 100755 --- a/tests/test_abp_kafka.py +++ b/tests/test_abp_kafka.py @@ -105,7 +105,6 @@ def async_infer(callback=None, **k): # Fill our topic with the input data num_records = write_file_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, val_file_name) - # Disabling commits due to known issue in Python impl: https://github.com/nv-morpheus/Morpheus/issues/294 pipe = LinearPipeline(config) pipe.set_source( KafkaSourceStage(config, @@ -113,7 +112,6 @@ def async_infer(callback=None, **k): input_topic=kafka_topics.input_topic, auto_offset_reset="earliest", poll_interval="1seconds", - disable_commit=True, stop_after=num_records, client_id="test_abp_no_cpp_reader")) pipe.add_stage(DeserializeStage(config)) From 3ce2bf4c4f7b4de42e06f2c2ae423b4c87a85e6c Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 21 Oct 2022 11:03:31 -0700 Subject: [PATCH 13/20] wip --- morpheus/stages/input/kafka_source_stage.py | 69 ++++++++++++++------- 1 file changed, 45 insertions(+), 24 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 5c92c933ee..24d152c99c 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -114,6 +114,9 @@ def __init__(self, self._poll_interval = pd.Timedelta(poll_interval).total_seconds() self._started = False + self._records_emitted = 0 + self._num_messages = 0 + @property def name(self) -> str: return "from-kafka" @@ -128,6 +131,35 @@ def stop(self): return super().stop() + def _process_batch(self, consumer, batch): + if len(batch): + payloads = [] + for msg in batch: + payload = msg.value() + if payload is not None: + payloads.append(payload) + + df = None + try: + df = cudf.io.read_json(payloads, lines=True) + except Exception as e: + logger.error("Error parsing payload into a dataframe : {}".format(e)) + finally: + if (not self._disable_commit): + for msg in batch: + consumer.commit(message=msg) + + if df is not None: + num_records = len(df) + yield MessageMeta(df) + self._records_emitted += num_records + self._num_messages += 1 + + if self._stop_after > 0 and self._records_emitted >= self._stop_after: + self._stop_requested = True + + batch.clear() + def _source_generator(self): # TODO : Needs to batch records until _stop_requested, _PARTITION_EOF or batch size has been hit @@ -139,43 +171,32 @@ def 
_source_generator(self): consumer = ck.Consumer(self._consumer_params) consumer.subscribe([self._topic]) - records_emitted = 0 - num_messages = 0 + batch = [] while not self._stop_requested: msg = consumer.poll(timeout=1.0) if msg is None: - time.sleep(self._poll_interval) + self._process_batch(consumer, batch) + + if not self._stop_requested: + time.sleep(self._poll_interval) continue msg_error = msg.error() if msg_error is None: - payload = msg.value() - if payload is not None: - df = None - try: - df = cudf.io.read_json(payload, lines=True) - except Exception as e: - logger.error("Error parsing payload into a dataframe : {}".format(e)) - finally: - if (not self._disable_commit): - consumer.commit(message=msg) - - if df is not None: - num_records = len(df) - yield MessageMeta(df) - records_emitted += num_records - num_messages += 1 - - if self._stop_after > 0 and records_emitted >= self._stop_after: - self._stop_requested = True + batch.append(msg) + if len(batch) == self._max_batch_size: + self._process_batch(consumer, batch) elif msg_error == ck.KafkaError._PARTITION_EOF: - time.sleep(self._poll_interval) + self._process_batch(consumer, batch) + if not self._stop_requested: + time.sleep(self._poll_interval) else: raise ck.KafkaException(msg_error) - print("******** completed after {} records in {} messages".format(records_emitted, num_messages), + print("******** completed after {} records in {} messages".format(self._records_emitted, + self._num_messages), flush=True, file=sys.stderr) From 5d7d03c2f92d40501e1ee5d29db4c9684e3ebf50 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 21 Oct 2022 12:50:00 -0700 Subject: [PATCH 14/20] Restructure so we are only calling self._process_batch from a single location --- morpheus/stages/input/kafka_source_stage.py | 68 +++++++++++++-------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 24d152c99c..4ff2681865 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -16,6 +16,7 @@ import sys import time from enum import Enum +from io import StringIO import confluent_kafka as ck import pandas as pd @@ -132,16 +133,20 @@ def stop(self): return super().stop() def _process_batch(self, consumer, batch): + message_meta = None if len(batch): - payloads = [] + buffer = StringIO() + for msg in batch: payload = msg.value() if payload is not None: - payloads.append(payload) + buffer.write(payload.decode("utf-8")) + buffer.write("\n") df = None try: - df = cudf.io.read_json(payloads, lines=True) + buffer.seek(0) + df = cudf.io.read_json(buffer, engine='cudf', lines=True, orient='records') except Exception as e: logger.error("Error parsing payload into a dataframe : {}".format(e)) finally: @@ -151,14 +156,16 @@ def _process_batch(self, consumer, batch): if df is not None: num_records = len(df) - yield MessageMeta(df) + message_meta = MessageMeta(df) self._records_emitted += num_records self._num_messages += 1 if self._stop_after > 0 and self._records_emitted >= self._stop_after: self._stop_requested = True - batch.clear() + batch.clear() + + return message_meta def _source_generator(self): # TODO : Needs to batch records until _stop_requested, _PARTITION_EOF or batch size has been hit @@ -174,31 +181,38 @@ def _source_generator(self): batch = [] while not self._stop_requested: + do_process_batch = False + do_sleep = False + msg = consumer.poll(timeout=1.0) if msg is None: - 
self._process_batch(consumer, batch) - - if not self._stop_requested: - time.sleep(self._poll_interval) - continue - - msg_error = msg.error() - if msg_error is None: - batch.append(msg) - if len(batch) == self._max_batch_size: - self._process_batch(consumer, batch) - - elif msg_error == ck.KafkaError._PARTITION_EOF: - self._process_batch(consumer, batch) - if not self._stop_requested: - time.sleep(self._poll_interval) - else: - raise ck.KafkaException(msg_error) + do_process_batch = True + do_sleep = True - print("******** completed after {} records in {} messages".format(self._records_emitted, - self._num_messages), - flush=True, - file=sys.stderr) + else: + msg_error = msg.error() + if msg_error is None: + batch.append(msg) + if len(batch) == self._max_batch_size: + do_process_batch = True + + elif msg_error == ck.KafkaError._PARTITION_EOF: + do_process_batch = True + do_sleep = True + else: + raise ck.KafkaException(msg_error) + + if do_process_batch: + message_meta = self._process_batch(consumer, batch) + if message_meta is not None: + yield message_meta + + if do_sleep and not self._stop_requested: + time.sleep(self._poll_interval) + + message_meta = self._process_batch(consumer, batch) + if message_meta is not None: + yield message_meta finally: # Close the consumer and call on_completed From 59d2d2ce96850a6f6481f30ffb6b4d9b3d537c30 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 24 Oct 2022 08:31:43 -0700 Subject: [PATCH 15/20] Remove unused import --- morpheus/stages/input/kafka_source_stage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 4ff2681865..f79604c5c3 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -13,7 +13,6 @@ # limitations under the License. 
import logging -import sys import time from enum import Enum from io import StringIO From 9139fefa95b03d6123abca080d74927d78f4be03 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 24 Oct 2022 13:03:22 -0700 Subject: [PATCH 16/20] Optionally commit messages synchronously, makes testing easier --- .../include/morpheus/stages/kafka_source.hpp | 7 +++-- morpheus/_lib/src/python_modules/stages.cpp | 3 +- morpheus/_lib/src/stages/kafka_source.cpp | 29 +++++++++++++++---- morpheus/stages/input/kafka_source_stage.py | 14 +++++---- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/morpheus/_lib/include/morpheus/stages/kafka_source.hpp b/morpheus/_lib/include/morpheus/stages/kafka_source.hpp index 33bbb19484..4cbd64deb3 100644 --- a/morpheus/_lib/include/morpheus/stages/kafka_source.hpp +++ b/morpheus/_lib/include/morpheus/stages/kafka_source.hpp @@ -57,7 +57,8 @@ class KafkaSourceStage : public srf::pysrf::PythonSource config, bool disable_commit = false, bool disable_pre_filtering = false, - size_t stop_after = 0); + size_t stop_after = 0, + bool async_commits = true); ~KafkaSourceStage() override = default; @@ -107,6 +108,7 @@ class KafkaSourceStage : public srf::pysrf::PythonSource config, bool disable_commits, bool disable_pre_filtering, - size_t stop_after = 0); + size_t stop_after = 0, + bool async_commits = true); }; #pragma GCC visibility pop } // namespace morpheus diff --git a/morpheus/_lib/src/python_modules/stages.cpp b/morpheus/_lib/src/python_modules/stages.cpp index c12b2e5402..ae75797062 100644 --- a/morpheus/_lib/src/python_modules/stages.cpp +++ b/morpheus/_lib/src/python_modules/stages.cpp @@ -132,7 +132,8 @@ PYBIND11_MODULE(stages, m) py::arg("config"), py::arg("disable_commits") = false, py::arg("disable_pre_filtering") = false, - py::arg("stop_after") = 0); + py::arg("stop_after") = 0, + py::arg("async_commits") = true); py::class_, srf::segment::ObjectProperties, diff --git a/morpheus/_lib/src/stages/kafka_source.cpp b/morpheus/_lib/src/stages/kafka_source.cpp index a4eeaecfe1..1c1ba56bab 100644 --- a/morpheus/_lib/src/stages/kafka_source.cpp +++ b/morpheus/_lib/src/stages/kafka_source.cpp @@ -256,7 +256,8 @@ KafkaSourceStage::KafkaSourceStage(std::size_t max_batch_size, std::map config, bool disable_commit, bool disable_pre_filtering, - size_t stop_after) : + size_t stop_after, + bool async_commits) : PythonSource(build()), m_max_batch_size(max_batch_size), m_topic(std::move(topic)), @@ -264,7 +265,8 @@ KafkaSourceStage::KafkaSourceStage(std::size_t max_batch_size, m_config(std::move(config)), m_disable_commit(disable_commit), m_disable_pre_filtering(disable_pre_filtering), - m_stop_after{stop_after} + m_stop_after{stop_after}, + m_async_commits(async_commits) {} KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build() @@ -333,7 +335,14 @@ KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build() if (should_commit) { - CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync"); + if (m_async_commits) + { + CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync"); + } + else + { + CHECK_KAFKA(consumer->commitSync(), RdKafka::ERR_NO_ERROR, "Error during commit"); + } } } @@ -594,10 +603,18 @@ std::shared_ptr> KafkaSourceStageInterfac std::map config, bool disable_commits, bool disable_pre_filtering, - size_t stop_after) + size_t stop_after, + bool async_commits) { - auto stage = builder.construct_object( - name, max_batch_size, topic, batch_timeout_ms, config, disable_commits, 
disable_pre_filtering, stop_after); + auto stage = builder.construct_object(name, + max_batch_size, + topic, + batch_timeout_ms, + config, + disable_commits, + disable_pre_filtering, + stop_after, + async_commits); return stage; } diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index f79604c5c3..0daa1df9aa 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -72,6 +72,8 @@ class KafkaSourceStage(SingleOutputSource): information on the effects of each value." stop_after: int, default = 0 Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0` + async_commits: bool, default = True + Enable commits to be performed asynchronously. Ignored if `disable_commit` is `True`. """ def __init__(self, @@ -84,7 +86,8 @@ def __init__(self, disable_commit: bool = False, disable_pre_filtering: bool = False, auto_offset_reset: AutoOffsetReset = "latest", - stop_after: int = 0): + stop_after: int = 0, + async_commits: bool = True): super().__init__(c) if isinstance(auto_offset_reset, AutoOffsetReset): @@ -94,8 +97,7 @@ def __init__(self, 'bootstrap.servers': bootstrap_servers, 'group.id': group_id, 'session.timeout.ms': "60000", - "auto.offset.reset": auto_offset_reset, - 'enable.auto.commit': str(disable_commit).lower() + "auto.offset.reset": auto_offset_reset } if client_id is not None: self._consumer_params['client.id'] = client_id @@ -106,6 +108,7 @@ def __init__(self, self._disable_commit = disable_commit self._disable_pre_filtering = disable_pre_filtering self._stop_after = stop_after + self._async_commits = async_commits self._client = None # Flag to indicate whether or not we should stop @@ -151,7 +154,7 @@ def _process_batch(self, consumer, batch): finally: if (not self._disable_commit): for msg in batch: - consumer.commit(message=msg) + consumer.commit(message=msg, asynchronous=self._async_commits) if df is not None: num_records = len(df) @@ -229,7 +232,8 @@ def _build_source(self, builder: srf.Builder) -> StreamPair: self._consumer_params, self._disable_commit, self._disable_pre_filtering, - self._stop_after) + self._stop_after, + self._async_commits) # Only use multiple progress engines with C++. The python implementation will duplicate messages with # multiple threads From 197adb7187426db77f90261938344c17e1f0039b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 24 Oct 2022 13:05:00 -0700 Subject: [PATCH 17/20] Add unittest to verify kafka messages are being committed prior to being emitted --- tests/test_kafka_source_stage_pipe.py | 122 ++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/tests/test_kafka_source_stage_pipe.py b/tests/test_kafka_source_stage_pipe.py index 2410817249..db7310e1b0 100644 --- a/tests/test_kafka_source_stage_pipe.py +++ b/tests/test_kafka_source_stage_pipe.py @@ -14,15 +14,20 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import json import os import typing import numpy as np import pytest +import srf from morpheus._lib.file_types import FileTypes +from morpheus.config import Config from morpheus.io.deserializers import read_file_to_df from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.pipeline.single_port_stage import SinglePortStage +from morpheus.stages.general.trigger_stage import TriggerStage from morpheus.stages.input.kafka_source_stage import KafkaSourceStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.serialize_stage import SerializeStage @@ -67,3 +72,120 @@ def test_kafka_source_stage_pipe(tmp_path, config, kafka_bootstrap_servers: str, output_data = np.around(output_data, 2) assert output_data.tolist() == input_data.tolist() + + +class OffsetChecker(SinglePortStage): + """ + Verifies that the kafka offsets are being updated as a way of verifying that the + consumer is performing a commit. + """ + + def __init__(self, c: Config, bootstrap_servers: str, group_id: str): + super().__init__(c) + + # importing here so that running without the --run_kafka flag won't fail due + # to not having the kafka libs installed + from kafka import KafkaAdminClient + + self._client = KafkaAdminClient(bootstrap_servers=bootstrap_servers) + self._group_id = group_id + self._offsets = None + + @property + def name(self) -> str: + return "morpheus_offset_checker" + + def accepted_types(self) -> typing.Tuple: + """ + Accepted input types for this stage are returned. + + Returns + ------- + typing.Tuple + Accepted input types. + + """ + return (typing.Any, ) + + def supports_cpp_node(self): + return False + + def _offset_checker(self, x): + at_least_one_gt = False + new_offsets = self._client.list_consumer_group_offsets(self._group_id) + + if self._offsets is not None: + for (tp, prev_offset) in self._offsets.items(): + new_offset = new_offsets[tp] + + assert new_offset.offset >= prev_offset.offset + + if new_offset.offset > prev_offset.offset: + at_least_one_gt = True + + print(f"************\n{self._offsets}\n{new_offsets}\n********", flush=True) + assert at_least_one_gt + + self._offsets = new_offsets + + return x + + def _build_single(self, builder: srf.Builder, input_stream): + node = builder.make_node(self.unique_name, self._offset_checker) + builder.make_edge(input_stream[0], node) + + return node, input_stream[1] + + +@pytest.mark.kafka +@pytest.mark.slow +@pytest.mark.parametrize('num_records', [10, 100, 1000]) +def test_kafka_source_commit(num_records, + tmp_path, + config, + kafka_bootstrap_servers: str, + kafka_topics: typing.Tuple[str, str]) -> None: + + input_file = os.path.join(tmp_path, "input_data.json") + with open(input_file, 'w') as fh: + for i in range(num_records): + fh.write("{}\n".format(json.dumps({'v': i}))) + + num_written = write_file_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, input_file) + assert num_written == num_records + + out_file = os.path.join(tmp_path, 'results.jsonlines') + + pipe = LinearPipeline(config) + pipe.set_source( + KafkaSourceStage(config, + bootstrap_servers=kafka_bootstrap_servers, + input_topic=kafka_topics.input_topic, + auto_offset_reset="earliest", + poll_interval="1seconds", + group_id='morpheus', + client_id='morpheus_kafka_source_commit', + stop_after=num_records, + async_commits=False)) + + pipe.add_stage(OffsetChecker(config, bootstrap_servers=kafka_bootstrap_servers, group_id='morpheus')) + pipe.add_stage(TriggerStage(config)) + + 
pipe.add_stage(DeserializeStage(config)) + pipe.add_stage(SerializeStage(config)) + pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) + pipe.run() + + assert os.path.exists(out_file) + + input_data = read_file_to_df(input_file, file_type=FileTypes.Auto).values + output_data = read_file_to_df(out_file, file_type=FileTypes.Auto).values + + assert len(input_data) == num_records + assert len(output_data) == num_records + + # Somehow 0.7 ends up being 0.7000000000000001 + input_data = np.around(input_data, 2) + output_data = np.around(output_data, 2) + + assert output_data.tolist() == input_data.tolist() From 6b5296fbedaa24093cb6e4e04fe8088033e2c9a5 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 3 Nov 2022 09:02:13 -0700 Subject: [PATCH 18/20] Replace dep on cudf_kafka with librdkafka --- docker/conda/environments/cuda11.5_dev.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/conda/environments/cuda11.5_dev.yml b/docker/conda/environments/cuda11.5_dev.yml index 097b223da7..5c2df645ab 100644 --- a/docker/conda/environments/cuda11.5_dev.yml +++ b/docker/conda/environments/cuda11.5_dev.yml @@ -35,7 +35,6 @@ dependencies: - cuda-python<=11.7.0 # Remove when Issue #251 is closed - cudatoolkit=11.5 - cudf 22.08 - - cudf_kafka 22.08.* - cupy=9.5.0 - cython=0.29.24 - datacompy=0.8 @@ -56,6 +55,7 @@ dependencies: - gxx_linux-64=9.4 - include-what-you-use=0.18 - isort + - librdkafka=1.7.0 - mlflow>=1.23 - myst-parser==0.17 - networkx=2.8 From 4bd3955c2d915caf96409f1a5b66a1fc2bef26ec Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 3 Nov 2022 14:33:53 -0700 Subject: [PATCH 19/20] Add explicit dep on python-confluent-kafka which previously was pulled in by cudf_kafka --- docker/conda/environments/cuda11.5_dev.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker/conda/environments/cuda11.5_dev.yml b/docker/conda/environments/cuda11.5_dev.yml index 5c2df645ab..ba4503d3d3 100644 --- a/docker/conda/environments/cuda11.5_dev.yml +++ b/docker/conda/environments/cuda11.5_dev.yml @@ -73,6 +73,7 @@ dependencies: - pytest - pytest-benchmark>=4.0 - pytest-cov + - python-confluent-kafka=1.7.0 - python-graphviz - python=3.8 - rapidjson=1.1.0 From 499e4f56465774a5ac4766f2876ec78cc1531246 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Mon, 14 Nov 2022 14:05:07 -0800 Subject: [PATCH 20/20] Remove out of date comments --- morpheus/stages/input/kafka_source_stage.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py index 0daa1df9aa..e2bf528415 100644 --- a/morpheus/stages/input/kafka_source_stage.py +++ b/morpheus/stages/input/kafka_source_stage.py @@ -170,11 +170,6 @@ def _process_batch(self, consumer, batch): return message_meta def _source_generator(self): - # TODO : Needs to batch records until _stop_requested, _PARTITION_EOF or batch size has been hit - - # Each invocation of this function makes a new thread so recreate the producers - - # Set some initial values consumer = None try: consumer = ck.Consumer(self._consumer_params)
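
For reference, the polling loop that the final Python implementation converges on (accumulate a batch of messages, build one cudf DataFrame per batch from the JSON payloads, then commit the consumed offsets) can be sketched outside of a Morpheus pipeline roughly as follows. The broker address, topic name and batch size below are placeholder assumptions, and the real stage additionally handles malformed payloads, `stop_after`, `disable_commit` and the async/sync commit toggle:

```python
import time
from io import StringIO

import confluent_kafka as ck
import cudf

# Placeholder connection settings; adjust for a real cluster.
consumer = ck.Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'morpheus',
    'session.timeout.ms': '60000',
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['test_pcap'])

MAX_BATCH_SIZE = 256  # the stage uses Config.pipeline_batch_size here


def flush(batch):
    """Join the buffered JSON payloads, parse them into one cudf DataFrame, then commit."""
    if not batch:
        return None

    buffer = StringIO()
    for msg in batch:
        payload = msg.value()
        if payload is not None:
            buffer.write(payload.decode("utf-8"))
            buffer.write("\n")

    buffer.seek(0)
    df = cudf.io.read_json(buffer, engine='cudf', lines=True, orient='records')

    # Commit only after the batch parsed successfully (synchronous here for clarity).
    for msg in batch:
        consumer.commit(message=msg, asynchronous=False)

    batch.clear()
    return df


batch = []
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Nothing new: emit whatever has accumulated, then back off briefly.
            df = flush(batch)
            if df is not None:
                print(df)
            time.sleep(0.01)
            continue

        msg_error = msg.error()
        if msg_error is None:
            batch.append(msg)
            if len(batch) == MAX_BATCH_SIZE:
                print(flush(batch))
        elif msg_error.code() == ck.KafkaError._PARTITION_EOF:
            # End of partition (only reported when the consumer is configured to do so).
            df = flush(batch)
            if df is not None:
                print(df)
            time.sleep(0.01)
        else:
            raise ck.KafkaException(msg_error)
finally:
    consumer.close()
```

Batching before calling `cudf.io.read_json` means one GPU DataFrame is constructed per batch rather than per Kafka message, which is the main difference between the per-message loop introduced early in this series and the `_process_batch` version it ends with.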
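
A minimal pipeline exercising the new stage options (`client_id`, `stop_after`, `async_commits`) might look like the sketch below, loosely mirroring the Kafka tests touched in this series. The broker address, topic and output filename are placeholders, and the `DeserializeStage` import path is assumed rather than taken from these patches:

```python
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
# Assumed import path; the tests use DeserializeStage but its import is outside these diffs.
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage

config = Config()

pipe = LinearPipeline(config)
pipe.set_source(
    KafkaSourceStage(config,
                     bootstrap_servers="localhost:9092",  # placeholder broker
                     input_topic="test_pcap",
                     group_id="morpheus",
                     client_id="kafka_source_example",    # optional consumer identifier (new)
                     auto_offset_reset="earliest",
                     poll_interval="1seconds",
                     stop_after=1000,                      # stop once 1000 rows have been emitted
                     async_commits=False))                 # commit synchronously, as the new test does
pipe.add_stage(DeserializeStage(config))
pipe.add_stage(SerializeStage(config))
pipe.add_stage(WriteToFileStage(config, filename="results.jsonlines", overwrite=True))
pipe.run()
```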