diff --git a/docker/conda/environments/cuda11.5_dev.yml b/docker/conda/environments/cuda11.5_dev.yml
index f1cd8899f4..213316f765 100644
--- a/docker/conda/environments/cuda11.5_dev.yml
+++ b/docker/conda/environments/cuda11.5_dev.yml
@@ -35,7 +35,6 @@ dependencies:
   - cuda-python<=11.7.0 # Remove when Issue #251 is closed
   - cudatoolkit=11.5
   - cudf 22.08
-  - cudf_kafka 22.08.*
   - cupy=9.5.0
   - cython=0.29.24
   - datacompy=0.8
@@ -57,6 +56,7 @@ dependencies:
   - gxx_linux-64=9.4
   - include-what-you-use=0.18
   - isort
+  - librdkafka=1.7.0
   - mlflow>=1.23
   - myst-parser==0.17
   - networkx=2.8
@@ -74,6 +74,7 @@ dependencies:
   - pytest
   - pytest-benchmark>=4.0
   - pytest-cov
+  - python-confluent-kafka=1.7.0
   - python-graphviz
   - python=3.8
   - rapidjson=1.1.0
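Note on the environment change above: the cudf_kafka dependency is dropped and the stage now consumes directly through python-confluent-kafka, pinned alongside a matching librdkafka build. A quick way to confirm that an environment resolved matching client and C library versions — illustrative only, not part of the patch:

    # Illustrative check, not part of the patch: confirm the Python client and the
    # underlying librdkafka build that conda resolved. Both calls return a
    # (version_string, version_int) tuple.
    import confluent_kafka

    print("confluent_kafka:", confluent_kafka.version())
    print("librdkafka:", confluent_kafka.libversion())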
diff --git a/morpheus/_lib/include/morpheus/stages/kafka_source.hpp b/morpheus/_lib/include/morpheus/stages/kafka_source.hpp
index 33bbb19484..4cbd64deb3 100644
--- a/morpheus/_lib/include/morpheus/stages/kafka_source.hpp
+++ b/morpheus/_lib/include/morpheus/stages/kafka_source.hpp
@@ -57,7 +57,8 @@ class KafkaSourceStage : public srf::pysrf::PythonSource<std::shared_ptr<MessageMeta>>
                      std::map<std::string, std::string> config,
                      bool disable_commit        = false,
                      bool disable_pre_filtering = false,
-                     size_t stop_after          = 0);
+                     size_t stop_after          = 0,
+                     bool async_commits         = true);
 
     ~KafkaSourceStage() override = default;
 
@@ -107,6 +108,7 @@ class KafkaSourceStage : public srf::pysrf::PythonSource<std::shared_ptr<MessageMeta>>
                                              std::map<std::string, std::string> config,
                                              bool disable_commits,
                                              bool disable_pre_filtering,
-                                             size_t stop_after = 0);
+                                             size_t stop_after  = 0,
+                                             bool async_commits = true);
 };
 #pragma GCC visibility pop
 }  // namespace morpheus
diff --git a/morpheus/_lib/src/python_modules/stages.cpp b/morpheus/_lib/src/python_modules/stages.cpp
index c12b2e5402..ae75797062 100755
--- a/morpheus/_lib/src/python_modules/stages.cpp
+++ b/morpheus/_lib/src/python_modules/stages.cpp
@@ -132,7 +132,8 @@ PYBIND11_MODULE(stages, m)
             py::arg("config"),
             py::arg("disable_commits")       = false,
             py::arg("disable_pre_filtering") = false,
-            py::arg("stop_after")            = 0);
+            py::arg("stop_after")            = 0,
+            py::arg("async_commits")         = true);
 
     py::class_<srf::segment::Object<KafkaSourceStage>,
                srf::segment::ObjectProperties,
diff --git a/morpheus/_lib/src/stages/kafka_source.cpp b/morpheus/_lib/src/stages/kafka_source.cpp
index 801e0151e4..874c3fdfd3 100644
--- a/morpheus/_lib/src/stages/kafka_source.cpp
+++ b/morpheus/_lib/src/stages/kafka_source.cpp
@@ -257,7 +257,8 @@ KafkaSourceStage::KafkaSourceStage(std::size_t max_batch_size,
                                    std::map<std::string, std::string> config,
                                    bool disable_commit,
                                    bool disable_pre_filtering,
-                                   size_t stop_after) :
+                                   size_t stop_after,
+                                   bool async_commits) :
   PythonSource(build()),
   m_max_batch_size(max_batch_size),
   m_topic(std::move(topic)),
@@ -265,7 +266,8 @@ KafkaSourceStage::KafkaSourceStage(std::size_t max_batch_size,
   m_config(std::move(config)),
   m_disable_commit(disable_commit),
   m_disable_pre_filtering(disable_pre_filtering),
-  m_stop_after{stop_after}
+  m_stop_after{stop_after},
+  m_async_commits(async_commits)
 {}
 
 KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build()
@@ -334,7 +336,14 @@ KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build()
 
         if (should_commit)
         {
-            CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync");
+            if (m_async_commits)
+            {
+                CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync");
+            }
+            else
+            {
+                CHECK_KAFKA(consumer->commitSync(), RdKafka::ERR_NO_ERROR, "Error during commit");
+            }
         }
     }
 
@@ -571,10 +580,18 @@ std::shared_ptr<srf::segment::Object<KafkaSourceStage>> KafkaSourceStageInterfac
     std::map<std::string, std::string> config,
     bool disable_commits,
     bool disable_pre_filtering,
-    size_t stop_after)
+    size_t stop_after,
+    bool async_commits)
 {
-    auto stage = builder.construct_object<KafkaSourceStage>(
-        name, max_batch_size, topic, batch_timeout_ms, config, disable_commits, disable_pre_filtering, stop_after);
+    auto stage = builder.construct_object<KafkaSourceStage>(name,
+                                                            max_batch_size,
+                                                            topic,
+                                                            batch_timeout_ms,
+                                                            config,
+                                                            disable_commits,
+                                                            disable_pre_filtering,
+                                                            stop_after,
+                                                            async_commits);
 
     return stage;
 }
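The C++ source stage now chooses between commitAsync() and commitSync() based on the new async_commits flag (default true, which preserves the previous behaviour). The rewritten Python stage in the next file expresses the same choice through confluent-kafka's Consumer.commit(asynchronous=...). A minimal sketch of that pattern follows; the broker address, group id, and topic here are placeholders, not values taken from the patch:

    # Sketch of the commit behaviour the async_commits flag selects, using the
    # confluent_kafka API directly. Connection settings are placeholders.
    import confluent_kafka as ck

    consumer = ck.Consumer({
        'bootstrap.servers': 'localhost:9092',  # placeholder
        'group.id': 'morpheus',
        'enable.auto.commit': 'false',
        'auto.offset.reset': 'earliest',
    })
    consumer.subscribe(['test_pcap'])

    async_commits = False  # False mirrors the C++ commitSync() branch
    msg = consumer.poll(timeout=1.0)
    if msg is not None and msg.error() is None:
        # ... process the payload here ...
        consumer.commit(message=msg, asynchronous=async_commits)

    consumer.close()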
diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py
index 6c42c67a86..e2bf528415 100644
--- a/morpheus/stages/input/kafka_source_stage.py
+++ b/morpheus/stages/input/kafka_source_stage.py
@@ -14,12 +14,12 @@
 
 import logging
 import time
-import weakref
 from enum import Enum
+from io import StringIO
 
+import confluent_kafka as ck
 import pandas as pd
 import srf
-from cudf_kafka._lib.kafka import KafkaDatasource
 
 import cudf
 
@@ -57,6 +57,8 @@ class KafkaSourceStage(SingleOutputSource):
         Input kafka topic.
     group_id : str
         Specifies the name of the consumer group a Kafka consumer belongs to.
+    client_id : str, default = None
+        An optional identifier of the consumer.
     poll_interval : str
         Seconds that elapse between polling Kafka for new messages. Follows the pandas interval format.
     disable_commit : bool, default = False
@@ -70,69 +72,54 @@
         information on the effects of each value."
     stop_after: int, default = 0
         Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0`
+    async_commits: bool, default = True
+        Enable commits to be performed asynchronously. Ignored if `disable_commit` is `True`.
     """
 
     def __init__(self,
                  c: Config,
                  bootstrap_servers: str,
-                 input_topic: str,
+                 input_topic: str = "test_pcap",
                  group_id: str = "morpheus",
                  client_id: str = None,
                  poll_interval: str = "10millis",
                  disable_commit: bool = False,
                  disable_pre_filtering: bool = False,
                  auto_offset_reset: AutoOffsetReset = "latest",
-                 stop_after: int = 0):
+                 stop_after: int = 0,
+                 async_commits: bool = True):
         super().__init__(c)
 
         if isinstance(auto_offset_reset, AutoOffsetReset):
             auto_offset_reset = auto_offset_reset.value
 
-        self._consumer_conf = {
+        self._consumer_params = {
             'bootstrap.servers': bootstrap_servers,
             'group.id': group_id,
             'session.timeout.ms': "60000",
             "auto.offset.reset": auto_offset_reset
         }
         if client_id is not None:
-            self._consumer_conf['client.id'] = client_id
+            self._consumer_params['client.id'] = client_id
 
-        self._input_topic = input_topic
-        self._poll_interval = poll_interval
+        self._topic = input_topic
         self._max_batch_size = c.pipeline_batch_size
         self._max_concurrent = c.num_threads
         self._disable_commit = disable_commit
         self._disable_pre_filtering = disable_pre_filtering
         self._stop_after = stop_after
+        self._async_commits = async_commits
         self._client = None
 
         # Flag to indicate whether or not we should stop
        self._stop_requested = False
 
-        # What gets passed to streamz kafka
-        topic = self._input_topic
-        consumer_params = self._consumer_conf
-        poll_interval = self._poll_interval
-        npartitions = None
-        refresh_partitions = False
-        max_batch_size = self._max_batch_size
-        keys = False
-
-        self._consumer_params = consumer_params
-        # Override the auto-commit config to enforce custom streamz checkpointing
-        self._consumer_params['enable.auto.commit'] = 'false'
-        if 'auto.offset.reset' not in self._consumer_params.keys():
-            self._consumer_params['auto.offset.reset'] = 'earliest'
-        self._topic = topic
-        self._npartitions = npartitions
-        self._refresh_partitions = refresh_partitions
-        if self._npartitions is not None and self._npartitions <= 0:
-            raise ValueError("Number of Kafka topic partitions must be > 0.")
         self._poll_interval = pd.Timedelta(poll_interval).total_seconds()
-        self._max_batch_size = max_batch_size
-        self._keys = keys
         self._started = False
 
+        self._records_emitted = 0
+        self._num_messages = 0
+
     @property
     def name(self) -> str:
         return "from-kafka"
@@ -147,221 +134,88 @@ def stop(self):
 
         return super().stop()
 
-    def _source_generator(self):
-        # Each invocation of this function makes a new thread so recreate the producers
-
-        # Set some initial values
-        npartitions = self._npartitions
-        consumer = None
-        consumer_params = self._consumer_params
-
-        try:
-
-            # Now begin the script
-            import confluent_kafka as ck
-
-            consumer = ck.Consumer(consumer_params)
-
-            # weakref.finalize(self, lambda c=consumer: _close_consumer(c))
-            tp = ck.TopicPartition(self._topic, 0, 0)
-
-            attempts = 0
-            max_attempts = 5
-            records_emitted = 0
-
-            # Attempt to connect to the cluster. Try 5 times before giving up
-            while attempts < max_attempts:
-                try:
-                    # blocks for consumer thread to come up
-                    consumer.get_watermark_offsets(tp)
+    def _process_batch(self, consumer, batch):
+        message_meta = None
+        if len(batch):
+            buffer = StringIO()
 
-                    logger.debug("Connected to Kafka source at '%s' on attempt #%d/%d",
-                                 self._consumer_conf["bootstrap.servers"],
-                                 attempts + 1,
-                                 max_attempts)
-
-                    break
-                except (RuntimeError, ck.KafkaException):
-                    attempts += 1
-
-                    # Raise the error if we hit the max
-                    if (attempts >= max_attempts):
-                        logger.exception(("Error while getting Kafka watermark offsets. Max attempts (%d) reached. "
-                                          "Check the bootstrap_servers parameter ('%s')"),
-                                         max_attempts,
-                                         self._consumer_conf["bootstrap.servers"])
-                        raise
-                    else:
Attempt #%d/%d", - attempts, - max_attempts, - exc_info=True) - - # Exponential backoff - time.sleep(2.0**attempts) + for msg in batch: + payload = msg.value() + if payload is not None: + buffer.write(payload.decode("utf-8")) + buffer.write("\n") + df = None try: - if npartitions is None: + buffer.seek(0) + df = cudf.io.read_json(buffer, engine='cudf', lines=True, orient='records') + except Exception as e: + logger.error("Error parsing payload into a dataframe : {}".format(e)) + finally: + if (not self._disable_commit): + for msg in batch: + consumer.commit(message=msg, asynchronous=self._async_commits) - kafka_cluster_metadata = consumer.list_topics(self._topic) + if df is not None: + num_records = len(df) + message_meta = MessageMeta(df) + self._records_emitted += num_records + self._num_messages += 1 - npartitions = len(kafka_cluster_metadata.topics[self._topic].partitions) + if self._stop_after > 0 and self._records_emitted >= self._stop_after: + self._stop_requested = True - positions = [0] * npartitions + batch.clear() - tps = [] - for partition in range(npartitions): - tps.append(ck.TopicPartition(self._topic, partition)) + return message_meta - while not self._stop_requested: - try: - committed = consumer.committed(tps, timeout=1) - except ck.KafkaException: - pass + def _source_generator(self): + consumer = None + try: + consumer = ck.Consumer(self._consumer_params) + consumer.subscribe([self._topic]) + + batch = [] + + while not self._stop_requested: + do_process_batch = False + do_sleep = False + + msg = consumer.poll(timeout=1.0) + if msg is None: + do_process_batch = True + do_sleep = True + + else: + msg_error = msg.error() + if msg_error is None: + batch.append(msg) + if len(batch) == self._max_batch_size: + do_process_batch = True + + elif msg_error == ck.KafkaError._PARTITION_EOF: + do_process_batch = True + do_sleep = True else: - for tp in committed: - positions[tp.partition] = tp.offset - break - - while not self._stop_requested: - out = [] - - if self._refresh_partitions: - kafka_cluster_metadata = consumer.list_topics(self._topic) - - new_partitions = len(kafka_cluster_metadata.topics[self._topic].partitions) - - if new_partitions > npartitions: - positions.extend([-1001] * (new_partitions - npartitions)) - npartitions = new_partitions + raise ck.KafkaException(msg_error) - for partition in range(npartitions): + if do_process_batch: + message_meta = self._process_batch(consumer, batch) + if message_meta is not None: + yield message_meta - tp = ck.TopicPartition(self._topic, partition, 0) + if do_sleep and not self._stop_requested: + time.sleep(self._poll_interval) - try: - low, high = consumer.get_watermark_offsets(tp, timeout=0.1) - except (RuntimeError, ck.KafkaException): - continue + message_meta = self._process_batch(consumer, batch) + if message_meta is not None: + yield message_meta - if 'auto.offset.reset' in consumer_params.keys(): - if consumer_params['auto.offset.reset'] == 'latest' and positions[partition] == -1001: - positions[partition] = high - - current_position = positions[partition] - - lowest = max(current_position, low) - - if high > lowest + self._max_batch_size: - high = lowest + self._max_batch_size - if high > lowest: - out.append((consumer_params, self._topic, partition, self._keys, lowest, high - 1)) - positions[partition] = high - - consumer_params['auto.offset.reset'] = 'earliest' - - if (out): - for part in out: - - meta = self._kafka_params_to_messagemeta(part) - - # Once the meta goes out of scope, commit it - def commit(topic, part_no, 
-                        def commit(topic, part_no, keys, lowest, offset):
-                            # topic, part_no, _, _, offset = part[1:]
-                            try:
-                                _tp = ck.TopicPartition(topic, part_no, offset + 1)
-                                consumer.commit(offsets=[_tp], asynchronous=True)
-                            except Exception:
-                                logger.exception(("Error occurred in `from-kafka` stage with "
-                                                  "broker '%s' while committing message: %d"),
-                                                 self._consumer_conf["bootstrap.servers"],
-                                                 offset)
-
-                        if (not self._disable_commit):
-                            weakref.finalize(meta, commit, *part[1:])
-
-                        # Push the message meta
-                        yield meta
-
-                        records_emitted += meta.count
-                        if self._stop_after > 0 and records_emitted >= self._stop_after:
-                            raise StopIteration()
-                else:
-                    time.sleep(self._poll_interval)
-            except StopIteration:
-                raise
-            except Exception:
-                logger.exception(("Error occurred in `from-kafka` stage with broker '%s' while processing messages"),
-                                 self._consumer_conf["bootstrap.servers"])
-                raise
-        except StopIteration:
-            pass
         finally:
             # Close the consumer and call on_completed
             if (consumer):
                 consumer.close()
 
-    def _kafka_params_to_messagemeta(self, x: tuple):
-
-        # Unpack
-        kafka_params, topic, partition, keys, low, high = x
-
-        gdf = self._read_gdf(kafka_params, topic=topic, partition=partition, lines=True, start=low, end=high + 1)
-
-        return MessageMeta(gdf)
-
-    @staticmethod
-    def _read_gdf(kafka_configs,
-                  topic=None,
-                  partition=0,
-                  lines=True,
-                  start=0,
-                  end=0,
-                  batch_timeout=10000,
-                  delimiter="\n",
-                  message_format="json"):
-        """
-        Replicates `custreamz.Consumer.read_gdf` function which does not work for some reason.
-        """
-
-        if topic is None:
-            raise ValueError("ERROR: You must specifiy the topic "
-                             "that you want to consume from")
-
-        kafka_datasource = None
-
-        try:
-            kafka_datasource = KafkaDatasource(
-                kafka_configs,
-                topic.encode(),
-                partition,
-                start,
-                end,
-                batch_timeout,
-                delimiter.encode(),
-            )
-
-            cudf_readers = {
-                "json": cudf.io.read_json,
-                "csv": cudf.io.read_csv,
-                "orc": cudf.io.read_orc,
-                "avro": cudf.io.read_avro,
-                "parquet": cudf.io.read_parquet,
-            }
-
-            result = cudf_readers[message_format](kafka_datasource, engine="cudf", lines=lines)
-
-            return cudf.DataFrame._from_data(data=result._data, index=result._index)
-        except Exception:
-            logger.exception("Error occurred converting KafkaDatasource to Dataframe.")
-        finally:
-            if (kafka_datasource is not None):
-                # Close up the cudf datasource instance
-                # TODO: Ideally the C++ destructor should handle the
-                # unsubscribe and closing the socket connection.
-                kafka_datasource.unsubscribe()
-                kafka_datasource.close(batch_timeout)
-
     def _build_source(self, builder: srf.Builder) -> StreamPair:
 
         if (self._build_cpp_node()):
@@ -373,7 +227,8 @@ def _build_source(self, builder: srf.Builder) -> StreamPair:
                                               self._consumer_params,
                                               self._disable_commit,
                                               self._disable_pre_filtering,
-                                              self._stop_after)
+                                              self._stop_after,
+                                              self._async_commits)
 
         # Only use multiple progress engines with C++. The python implementation will duplicate messages with
        # multiple threads
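With the stage rewritten on top of confluent-kafka, a pipeline can request synchronous commits by passing async_commits=False, exactly as the new commit test further below does. A hedged usage sketch (the bootstrap server address is a placeholder and Config() defaults are assumed):

    # Illustrative wiring only; the broker address is a placeholder.
    from morpheus.config import Config
    from morpheus.pipeline.linear_pipeline import LinearPipeline
    from morpheus.stages.input.kafka_source_stage import KafkaSourceStage

    config = Config()
    pipe = LinearPipeline(config)
    pipe.set_source(
        KafkaSourceStage(config,
                         bootstrap_servers="localhost:9092",
                         input_topic="test_pcap",
                         group_id="morpheus",
                         auto_offset_reset="earliest",
                         poll_interval="1seconds",
                         stop_after=1000,
                         async_commits=False))
    pipe.run()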
diff --git a/tests/test_abp_kafka.py b/tests/test_abp_kafka.py
index 5578069f39..217fcdf7dd 100755
--- a/tests/test_abp_kafka.py
+++ b/tests/test_abp_kafka.py
@@ -105,7 +105,6 @@ def async_infer(callback=None, **k):
     # Fill our topic with the input data
     num_records = write_file_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, val_file_name)
 
-    # Disabling commits due to known issue in Python impl: https://github.com/nv-morpheus/Morpheus/issues/294
     pipe = LinearPipeline(config)
     pipe.set_source(
         KafkaSourceStage(config,
@@ -113,8 +112,8 @@ def async_infer(callback=None, **k):
                          input_topic=kafka_topics.input_topic,
                          auto_offset_reset="earliest",
                          poll_interval="1seconds",
-                         disable_commit=True,
-                         stop_after=num_records))
+                         stop_after=num_records,
+                         client_id="test_abp_no_cpp_reader"))
     pipe.add_stage(DeserializeStage(config))
     pipe.add_stage(PreprocessFILStage(config))
     pipe.add_stage(
@@ -123,7 +122,10 @@ def async_infer(callback=None, **k):
     pipe.add_stage(AddClassificationsStage(config))
     pipe.add_stage(SerializeStage(config))
     pipe.add_stage(
-        WriteToKafkaStage(config, bootstrap_servers=kafka_bootstrap_servers, output_topic=kafka_topics.output_topic))
+        WriteToKafkaStage(config,
+                          bootstrap_servers=kafka_bootstrap_servers,
+                          output_topic=kafka_topics.output_topic,
+                          client_id="test_abp_no_cpp_writer"))
 
     pipe.run()
 
diff --git a/tests/test_cli.py b/tests/test_cli.py
index c9a0144886..cb8baaf240 100755
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -490,8 +490,8 @@ def test_pipeline_fil_all(self, config, callback_values, tmp_path, mlflow_uri):
         assert not file_source._iterative
 
         assert isinstance(from_kafka, KafkaSourceStage)
-        assert from_kafka._consumer_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321'
-        assert from_kafka._input_topic == 'test_topic'
+        assert from_kafka._consumer_params['bootstrap.servers'] == 'kserv1:123,kserv2:321'
+        assert from_kafka._topic == 'test_topic'
 
         assert isinstance(deserialize, DeserializeStage)
         assert isinstance(filter_stage, FilterDetectionsStage)
@@ -703,8 +703,8 @@ def test_pipeline_nlp_all(self, config, callback_values, tmp_path, mlflow_uri):
         assert not file_source._iterative
 
         assert isinstance(from_kafka, KafkaSourceStage)
-        assert from_kafka._consumer_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321'
-        assert from_kafka._input_topic == 'test_topic'
+        assert from_kafka._consumer_params['bootstrap.servers'] == 'kserv1:123,kserv2:321'
+        assert from_kafka._topic == 'test_topic'
 
         assert isinstance(deserialize, DeserializeStage)
         assert isinstance(filter_stage, FilterDetectionsStage)
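The commit test in the next file seeds its input topic through the existing write_file_to_kafka helper. For reference, a rough standalone equivalent using confluent-kafka's Producer — broker address, topic name, and record count here are placeholders, not values the tests depend on:

    # Rough equivalent of seeding the input topic outside the test helpers.
    import json

    import confluent_kafka as ck

    producer = ck.Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder
    for i in range(10):
        producer.produce('test_pcap', value=json.dumps({'v': i}))
    producer.flush()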
diff --git a/tests/test_kafka_source_stage_pipe.py b/tests/test_kafka_source_stage_pipe.py
index 2410817249..db7310e1b0 100644
--- a/tests/test_kafka_source_stage_pipe.py
+++ b/tests/test_kafka_source_stage_pipe.py
@@ -14,15 +14,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 import os
 import typing
 
 import numpy as np
 import pytest
+import srf
 
 from morpheus._lib.file_types import FileTypes
+from morpheus.config import Config
 from morpheus.io.deserializers import read_file_to_df
 from morpheus.pipeline.linear_pipeline import LinearPipeline
+from morpheus.pipeline.single_port_stage import SinglePortStage
+from morpheus.stages.general.trigger_stage import TriggerStage
 from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
 from morpheus.stages.output.write_to_file_stage import WriteToFileStage
 from morpheus.stages.postprocess.serialize_stage import SerializeStage
@@ -67,3 +72,120 @@ def test_kafka_source_stage_pipe(tmp_path, config, kafka_bootstrap_servers: str,
     output_data = np.around(output_data, 2)
 
     assert output_data.tolist() == input_data.tolist()
+
+
+class OffsetChecker(SinglePortStage):
+    """
+    Verifies that the kafka offsets are being updated as a way of verifying that the
+    consumer is performing a commit.
+    """
+
+    def __init__(self, c: Config, bootstrap_servers: str, group_id: str):
+        super().__init__(c)
+
+        # importing here so that running without the --run_kafka flag won't fail due
+        # to not having the kafka libs installed
+        from kafka import KafkaAdminClient
+
+        self._client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
+        self._group_id = group_id
+        self._offsets = None
+
+    @property
+    def name(self) -> str:
+        return "morpheus_offset_checker"
+
+    def accepted_types(self) -> typing.Tuple:
+        """
+        Accepted input types for this stage are returned.
+
+        Returns
+        -------
+        typing.Tuple
+            Accepted input types.
+
+        """
+        return (typing.Any, )
+
+    def supports_cpp_node(self):
+        return False
+
+    def _offset_checker(self, x):
+        at_least_one_gt = False
+        new_offsets = self._client.list_consumer_group_offsets(self._group_id)
+
+        if self._offsets is not None:
+            for (tp, prev_offset) in self._offsets.items():
+                new_offset = new_offsets[tp]
+
+                assert new_offset.offset >= prev_offset.offset
+
+                if new_offset.offset > prev_offset.offset:
+                    at_least_one_gt = True
+
+            print(f"************\n{self._offsets}\n{new_offsets}\n********", flush=True)
+            assert at_least_one_gt
+
+        self._offsets = new_offsets
+
+        return x
+
+    def _build_single(self, builder: srf.Builder, input_stream):
+        node = builder.make_node(self.unique_name, self._offset_checker)
+        builder.make_edge(input_stream[0], node)
+
+        return node, input_stream[1]
+
+
+@pytest.mark.kafka
+@pytest.mark.slow
+@pytest.mark.parametrize('num_records', [10, 100, 1000])
+def test_kafka_source_commit(num_records,
+                             tmp_path,
+                             config,
+                             kafka_bootstrap_servers: str,
+                             kafka_topics: typing.Tuple[str, str]) -> None:
+
+    input_file = os.path.join(tmp_path, "input_data.json")
+    with open(input_file, 'w') as fh:
+        for i in range(num_records):
+            fh.write("{}\n".format(json.dumps({'v': i})))
+
+    num_written = write_file_to_kafka(kafka_bootstrap_servers, kafka_topics.input_topic, input_file)
+    assert num_written == num_records
+
+    out_file = os.path.join(tmp_path, 'results.jsonlines')
+
+    pipe = LinearPipeline(config)
+    pipe.set_source(
+        KafkaSourceStage(config,
+                         bootstrap_servers=kafka_bootstrap_servers,
+                         input_topic=kafka_topics.input_topic,
+                         auto_offset_reset="earliest",
+                         poll_interval="1seconds",
+                         group_id='morpheus',
+                         client_id='morpheus_kafka_source_commit',
+                         stop_after=num_records,
+                         async_commits=False))
+
+    pipe.add_stage(OffsetChecker(config, bootstrap_servers=kafka_bootstrap_servers, group_id='morpheus'))
+    pipe.add_stage(TriggerStage(config))
+    pipe.add_stage(DeserializeStage(config))
+    pipe.add_stage(SerializeStage(config))
+    pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
+    pipe.run()
+
+    assert os.path.exists(out_file)
+
+    input_data = read_file_to_df(input_file, file_type=FileTypes.Auto).values
+    output_data = read_file_to_df(out_file, file_type=FileTypes.Auto).values
+
+    assert len(input_data) == num_records
+    assert len(output_data) == num_records
+
+    # Somehow 0.7 ends up being 0.7000000000000001
+    input_data = np.around(input_data, 2)
+    output_data = np.around(output_data, 2)
+
+    assert output_data.tolist() == input_data.tolist()
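The OffsetChecker stage added above asserts that the consumer group's committed offsets advance while the pipeline runs. The same spot check can be performed after a run with kafka-python's admin client (the library the test already imports); the broker address and group id below are placeholders:

    # Post-run spot check mirroring what OffsetChecker asserts during the pipeline run.
    from kafka import KafkaAdminClient

    client = KafkaAdminClient(bootstrap_servers='localhost:9092')  # placeholder
    offsets = client.list_consumer_group_offsets('morpheus')
    for tp, meta in offsets.items():
        print(f"{tp.topic}[{tp.partition}] committed offset = {meta.offset}")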