Simplify Python impl for KafkaSourceStage #300

Merged
33 commits merged into from
Nov 15, 2022
8f18e47
Simplify Python impl of KafkaSourceStage
dagardner-nv Jul 22, 2022
a230e78
Handle mal-formed payloads
dagardner-nv Jul 22, 2022
faf0375
Re-enable cpp impl
dagardner-nv Jul 22, 2022
3e5e87f
Remove unused imports
dagardner-nv Jul 22, 2022
c77fc4c
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Oct 6, 2022
b1939b6
Fix merge error
dagardner-nv Oct 6, 2022
18cd021
Update kafka tests
dagardner-nv Oct 6, 2022
044a941
Implement stop_after functionality
dagardner-nv Oct 6, 2022
3904f0f
Set client_ids and don't disable commits
dagardner-nv Oct 6, 2022
d560490
Use auto-commit when disable_commit is true
dagardner-nv Oct 6, 2022
5934838
Remove call to seek
dagardner-nv Oct 6, 2022
511fb6d
Bump SRF version
dagardner-nv Oct 12, 2022
dca8d2c
Merge branch 'david-srf-22.11' into david-from-kafka-py
dagardner-nv Oct 12, 2022
761c74f
wip
dagardner-nv Oct 12, 2022
3acec6e
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Oct 12, 2022
7668eb1
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Oct 21, 2022
3ce2bf4
wip
dagardner-nv Oct 21, 2022
5d7d03c
Restructure so we are only calling self._process_batch from a single …
dagardner-nv Oct 21, 2022
3892912
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Oct 21, 2022
59d2d2c
Remove unused import
dagardner-nv Oct 24, 2022
4f5c028
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Oct 24, 2022
9139fef
Optionally commit messages synchronously, makes testing easier
dagardner-nv Oct 24, 2022
197adb7
Add unittest to verify kafka messages are being committed prior to be…
dagardner-nv Oct 24, 2022
738082b
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Oct 27, 2022
289beba
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Oct 31, 2022
19cc38d
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Nov 1, 2022
6b5296f
Replace dep on cudf_kafka with librdkafka
dagardner-nv Nov 3, 2022
4bd3955
Add explicit dep on python-confluent-kafka which previously was pulle…
dagardner-nv Nov 3, 2022
6b66c4e
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Nov 7, 2022
a0b8161
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Nov 7, 2022
e9f0267
Merge branch 'branch-22.11' into david-from-kafka-py
mdemoret-nv Nov 11, 2022
580b69c
Merge branch 'branch-22.11' into david-from-kafka-py
dagardner-nv Nov 14, 2022
499e4f5
Remove out of date comments
dagardner-nv Nov 14, 2022
3 changes: 2 additions & 1 deletion docker/conda/environments/cuda11.5_dev.yml
@@ -35,7 +35,6 @@ dependencies:
- cuda-python<=11.7.0 # Remove when Issue #251 is closed
- cudatoolkit=11.5
- cudf 22.08
-  - cudf_kafka 22.08.*
- cupy=9.5.0
- cython=0.29.24
- datacompy=0.8
@@ -57,6 +56,7 @@ dependencies:
- gxx_linux-64=9.4
- include-what-you-use=0.18
- isort
+  - librdkafka=1.7.0
- mlflow>=1.23
- myst-parser==0.17
- networkx=2.8
@@ -74,6 +74,7 @@ dependencies:
- pytest
- pytest-benchmark>=4.0
- pytest-cov
+  - python-confluent-kafka=1.7.0
- python-graphviz
- python=3.8
- rapidjson=1.1.0
7 changes: 5 additions & 2 deletions morpheus/_lib/include/morpheus/stages/kafka_source.hpp
@@ -57,7 +57,8 @@ class KafkaSourceStage : public srf::pysrf::PythonSource<std::shared_ptr<Message
std::map<std::string, std::string> config,
bool disable_commit = false,
bool disable_pre_filtering = false,
-size_t stop_after = 0);
+size_t stop_after = 0,
+bool async_commits = true);

~KafkaSourceStage() override = default;

@@ -107,6 +108,7 @@ class KafkaSourceStage : public srf::pysrf::PythonSource<std::shared_ptr<Message
bool m_disable_commit{false};
bool m_disable_pre_filtering{false};
bool m_requires_commit{false}; // Whether or not manual committing is required
+bool m_async_commits{true};
size_t m_stop_after{0};

void *m_rebalancer;
@@ -129,7 +131,8 @@ struct KafkaSourceStageInterfaceProxy
std::map<std::string, std::string> config,
bool disable_commits,
bool disable_pre_filtering,
-size_t stop_after = 0);
+size_t stop_after = 0,
+bool async_commits = true);
};
#pragma GCC visibility pop
} // namespace morpheus
3 changes: 2 additions & 1 deletion morpheus/_lib/src/python_modules/stages.cpp
@@ -132,7 +132,8 @@ PYBIND11_MODULE(stages, m)
py::arg("config"),
py::arg("disable_commits") = false,
py::arg("disable_pre_filtering") = false,
-py::arg("stop_after") = 0);
+py::arg("stop_after") = 0,
+py::arg("async_commits") = true);

py::class_<srf::segment::Object<PreprocessFILStage>,
srf::segment::ObjectProperties,
29 changes: 23 additions & 6 deletions morpheus/_lib/src/stages/kafka_source.cpp
@@ -257,15 +257,17 @@ KafkaSourceStage::KafkaSourceStage(std::size_t max_batch_size,
std::map<std::string, std::string> config,
bool disable_commit,
bool disable_pre_filtering,
-size_t stop_after) :
+size_t stop_after,
+bool async_commits) :
PythonSource(build()),
m_max_batch_size(max_batch_size),
m_topic(std::move(topic)),
m_batch_timeout_ms(batch_timeout_ms),
m_config(std::move(config)),
m_disable_commit(disable_commit),
m_disable_pre_filtering(disable_pre_filtering),
-m_stop_after{stop_after}
+m_stop_after{stop_after},
+m_async_commits(async_commits)
{}

KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build()
@@ -334,7 +336,14 @@ KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build()

if (should_commit)
{
-CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync");
+if (m_async_commits)
+{
+    CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync");
+}
+else
+{
+    CHECK_KAFKA(consumer->commitSync(), RdKafka::ERR_NO_ERROR, "Error during commit");
+}
}
}

@@ -571,10 +580,18 @@ std::shared_ptr<srf::segment::Object<KafkaSourceStage>> KafkaSourceStageInterfac
std::map<std::string, std::string> config,
bool disable_commits,
bool disable_pre_filtering,
-size_t stop_after)
+size_t stop_after,
+bool async_commits)
{
-auto stage = builder.construct_object<KafkaSourceStage>(
-    name, max_batch_size, topic, batch_timeout_ms, config, disable_commits, disable_pre_filtering, stop_after);
+auto stage = builder.construct_object<KafkaSourceStage>(name,
+                                                        max_batch_size,
+                                                        topic,
+                                                        batch_timeout_ms,
+                                                        config,
+                                                        disable_commits,
+                                                        disable_pre_filtering,
+                                                        stop_after,
+                                                        async_commits);

return stage;
}
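
The `async_commits` flag in the diff above selects between `commitAsync()` and `commitSync()` in the C++ stage. A minimal Python sketch of the same switch is shown here, assuming a consumer object that exposes a confluent-kafka-style `commit(asynchronous=...)` method. `commit_offsets` and `RecordingConsumer` are hypothetical names used only for illustration; they are not part of this PR.

```python
class RecordingConsumer:
    """Hypothetical stand-in for a Kafka consumer; records each commit call."""

    def __init__(self):
        self.calls = []

    def commit(self, asynchronous=True):
        # Uses the same keyword name as confluent_kafka.Consumer.commit()
        self.calls.append(asynchronous)


def commit_offsets(consumer, async_commits=True):
    """Commit current offsets, blocking only when async_commits is False."""
    consumer.commit(asynchronous=async_commits)


consumer = RecordingConsumer()
commit_offsets(consumer)                       # non-blocking, like commitAsync()
commit_offsets(consumer, async_commits=False)  # blocking, like commitSync()
print(consumer.calls)
```

A blocking commit lets a test check the committed offsets as soon as the call returns, which is likely why the commit message for 9139fef describes the synchronous option as making testing easier.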