Shutdown Kafka consumers and producer when the external stream is stopped #645

Merged
merged 5 commits into from
Apr 9, 2024
1 change: 1 addition & 0 deletions src/Common/ErrorCodes.cpp
@@ -688,6 +688,7 @@
M(2534, UDA_NOT_APPLICABLE) \
M(2600, CANNOT_WRITE_TO_KAFKA) \
M(2601, NO_AVAILABLE_KAFKA_CONSUMER) \
M(2602, NO_AVAILABLE_KAFKA_PRODUCER) \
M(2610, FORMAT_SCHEMA_ALREADY_EXISTS) \
M(2611, UNKNOWN_FORMAT_SCHEMA) \
M(2612, AMBIGUOUS_FORMAT_SCHEMA) \
6 changes: 3 additions & 3 deletions src/Common/PoolBase.h
@@ -197,15 +197,15 @@ class PoolBase : private boost::noncopyable
/** The maximum size of the pool. */
unsigned max_items;

/** Pool. */
Objects items;

/** Lock to access the pool. */
std::mutex mutex;
std::condition_variable available;

protected:

/** Pool. */
Objects items; /// proton: updated

Poco::Logger * log;

PoolBase(unsigned max_items_, Poco::Logger * log_)
6 changes: 5 additions & 1 deletion src/Storages/ExternalStream/Kafka/Consumer.cpp
@@ -7,6 +7,7 @@ namespace DB

namespace ErrorCodes
{
extern const int NO_AVAILABLE_KAFKA_CONSUMER;
extern const int RESOURCE_NOT_FOUND;
}

@@ -37,7 +38,7 @@ void Consumer::backgroundPoll(UInt64 poll_timeout_ms) const
{
LOG_INFO(logger, "Start consumer poll");

while (!stopped.test())
while (!stopped)
rd_kafka_poll(rk.get(), poll_timeout_ms);

LOG_INFO(logger, "Consumer poll stopped");
@@ -70,6 +71,9 @@ void Consumer::stopConsume(Topic & topic, Int32 parition)

void Consumer::consumeBatch(Topic & topic, Int32 partition, uint32_t count, int32_t timeout_ms, Consumer::Callback callback, ErrorCallback error_callback) const
{
if (stopped)
throw Exception(ErrorCodes::NO_AVAILABLE_KAFKA_CONSUMER, "Cannot consume from stopped consumer");
zliang-min marked this conversation as resolved.

std::unique_ptr<rd_kafka_message_t *, decltype(free) *> rkmessages
{
static_cast<rd_kafka_message_t **>(malloc(sizeof(rd_kafka_message_t *) * count)), free
9 changes: 4 additions & 5 deletions src/Storages/ExternalStream/Kafka/Consumer.h
@@ -16,10 +16,7 @@ class Consumer : boost::noncopyable
{
public:
Consumer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, const String & logger_name_prefix);
~Consumer()
{
stopped.test_and_set();
}
~Consumer() { shutdown(); }

rd_kafka_t * getHandle() const { return rk.get(); }

@@ -33,14 +30,16 @@ class Consumer : boost::noncopyable

void consumeBatch(Topic & topic, Int32 partition, uint32_t count, int32_t timeout_ms, Callback callback, ErrorCallback error_callback) const;

void shutdown() { stopped = true; }

std::string name() const { return rd_kafka_name(rk.get()); }

private:
void backgroundPoll(UInt64 poll_timeout_ms) const;

klog::KafkaPtr rk {nullptr, rd_kafka_destroy};
ThreadPool poller;
std::atomic_flag stopped;
bool stopped {false};
Collaborator

Is this unsafe with multiple threads? One thread calls shutdown while another thread calls consumeBatch.

Collaborator

It appears there is still a race.

Collaborator Author

No, an atomic is not needed here; there won't be a race.

The stopped flag only tells the Consumer (and the Producer in the case below) that it has already been stopped. When we set stopped to true, we don't do anything else (for example, we don't destroy anything), so there is nothing to guard with it, i.e. there is no such logic in our code:

if (!stopped)
{
    stopped = true;
    /// do something
}
else
{
    /// do something else
}

So there is no race.

The purpose is to let operations on the Consumer fail, but they do not have to fail immediately. Take your case as an example: if one thread calls shutdown and another calls consumeBatch, it's fine, and it does not matter which one wins. Even if consumeBatch wins, nothing bad will happen; the next time consumeBatch is called, it will fail as expected, and then KafkaSource will stop reading more data. We also don't need to guard shutdown: it can be called multiple times with no side effects.

Please let me know if this does not make sense to you.

Collaborator Author

Even though it has no impact in the current implementation, I reverted it and now use std::atomic_flag.
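
For readers following the thread, here is a minimal sketch of the stop-flag pattern under discussion. It assumes a hypothetical `StoppableWorker` class standing in for the real Consumer, and it uses `std::atomic<bool>` rather than `std::atomic_flag` so that it compiles before C++20 (`std::atomic_flag::test` is C++20-only). The class name, `doWork`, and the doubling "work" are illustrative and do not come from the PR.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

/// Hypothetical stand-in for the Consumer/Producer in this PR; only the stop flag is modelled.
class StoppableWorker
{
public:
    /// Idempotent: repeated calls leave the flag set and do nothing else.
    void shutdown() noexcept { stopped.store(true, std::memory_order_release); }

    bool isStopped() const noexcept { return stopped.load(std::memory_order_acquire); }

    /// Mirrors the early check added to consumeBatch: refuse to work once stopped.
    int doWork(int x) const
    {
        if (isStopped())
            throw std::runtime_error("Cannot use a stopped worker");
        return x * 2;
    }

private:
    std::atomic<bool> stopped{false};
};

/// Usage sketch: work succeeds before shutdown, and shutdown may be repeated.
inline void demoStopFlag()
{
    StoppableWorker worker;
    assert(worker.doWork(21) == 42);
    worker.shutdown();
    worker.shutdown(); /// safe to call twice
    assert(worker.isStopped());
}
```

The atomic makes concurrent reads and writes of the flag itself well defined; it does not make "check the flag, then operate" a single step.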

Collaborator

The race I meant is not on the stopped data member but at a higher level:

  1. The client calls isStopped(), which returns false; a context switch to 2) follows.
  2. The consumer is then stopped; execution switches back to 1).
  3. The client has already checked that the consumer is not stopped, but it actually is. Does it continue operating on a stopped consumer?
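
The three steps above describe a check-then-act race. One common way to close it, sketched here under assumptions and not taken from the PR, is to perform the stopped check and the operation inside the same critical section, so that shutdown cannot run between them. `GuardedWorker`, `tryWork`, and the doubling "work" are hypothetical names used for illustration only.

```cpp
#include <cassert>
#include <mutex>

/// Hypothetical wrapper, not part of the PR: the stopped check and the operation
/// happen under one lock, so shutdown() cannot slip in between them.
class GuardedWorker
{
public:
    /// Waits for any in-flight operation to finish, then marks the worker stopped.
    void shutdown()
    {
        std::lock_guard<std::mutex> lock(mutex);
        stopped = true;
    }

    /// Returns false (leaving `out` untouched) if already stopped;
    /// otherwise performs the work and stores the result in `out`.
    bool tryWork(int x, int & out)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (stopped)
            return false;
        out = x * 2;
        return true;
    }

private:
    std::mutex mutex;
    bool stopped{false}; /// guarded by mutex
};
```

The trade-off is that operations are serialized and shutdown() blocks while one is in flight, which may be unacceptable for a long blocking call; whether that fits the Kafka consumer and producer here is a design question this sketch does not settle.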

Collaborator

The same applies to the producer, and we have run into this problem before.

Poco::Logger * logger;
};

3 changes: 3 additions & 0 deletions src/Storages/ExternalStream/Kafka/ConsumerPool.h
@@ -59,6 +59,9 @@ class ConsumerPool : public IConsumerPool, private PoolBase<Consumer>
if (stopped.test_and_set())
return;

for (const auto & pooled_consumer : items)
pooled_consumer->object->shutdown();

LOG_INFO(log, "Shutting down consumer pool, waiting for all consumers to be freed");
waitForNoMoreInUse();
LOG_INFO(log, "All consumers are freed");
20 changes: 10 additions & 10 deletions src/Storages/ExternalStream/Kafka/Kafka.cpp
@@ -497,36 +497,36 @@ Pipe Kafka::read(
return pipe;
}

RdKafka::Producer & Kafka::getProducer()
std::shared_ptr<RdKafka::Producer> Kafka::getProducer()
{
if (producer)
return *producer;
return producer;

std::scoped_lock lock(producer_mutex);
/// Check again in case of losing the race
if (producer)
return *producer;
return producer;

auto producer_ptr = std::make_unique<RdKafka::Producer>(*conf, settings->poll_waittime_ms.value, getLoggerName());
auto producer_ptr = std::make_shared<RdKafka::Producer>(*conf, settings->poll_waittime_ms.value, getLoggerName());
producer.swap(producer_ptr);

return *producer;
return producer;
}

RdKafka::Topic & Kafka::getProducerTopic()
std::shared_ptr<RdKafka::Topic> Kafka::getProducerTopic()
{
if (producer_topic)
return *producer_topic;
return producer_topic;

std::scoped_lock lock(producer_mutex);
/// Check again in case of losing the race
if (producer_topic)
return *producer_topic;
return producer_topic;

auto topic_ptr = std::make_unique<RdKafka::Topic>(*getProducer().getHandle(), topicName());
auto topic_ptr = std::make_shared<RdKafka::Topic>(*getProducer()->getHandle(), topicName());
producer_topic.swap(topic_ptr);

return *producer_topic;
return producer_topic;
}

SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
24 changes: 18 additions & 6 deletions src/Storages/ExternalStream/Kafka/Kafka.h
@@ -43,14 +43,26 @@ class Kafka final : public StorageExternalStreamImpl
void startup() override { LOG_INFO(logger, "Starting Kafka External Stream"); }
void shutdown() override {
LOG_INFO(logger, "Shutting down Kafka External Stream");

consumer_pool->shutdown();
if (producer)
producer->shutdown();

/// Must release all resources here rather than relying on the destructor.
/// Because the `Kafka` instance will not be destroyed immediately when the external stream gets dropped.
consumer_pool.reset();
if (producer_topic)
producer_topic.reset();
{
// producer_topic.reset();
std::shared_ptr<RdKafka::Topic> empty_topic_ptr;
producer_topic.swap(empty_topic_ptr);
}
if (producer)
producer.reset();
{
// producer.reset();
std::shared_ptr<RdKafka::Producer> empty_producer_ptr;
producer.swap(empty_producer_ptr);
}
tryRemoveTempDir(logger);
}
bool supportsSubcolumns() const override { return true; }
@@ -76,8 +88,8 @@ class Kafka final : public StorageExternalStreamImpl
const ASTPtr & shardingExprAst() const { assert(!engine_args.empty()); return engine_args[0]; }
bool hasCustomShardingExpr() const;

RdKafka::Producer & getProducer();
RdKafka::Topic & getProducerTopic();
std::shared_ptr<RdKafka::Producer> getProducer();
std::shared_ptr<RdKafka::Topic> getProducerTopic();

RdKafka::ConsumerPool::Entry getConsumer() const
{
@@ -112,8 +124,8 @@ class Kafka final : public StorageExternalStreamImpl
ConfPtr conf;
/// The Producer instance and Topic instance can be used by multiple sinks at the same time, thus we only need one of each.
std::mutex producer_mutex;
std::unique_ptr<RdKafka::Producer> producer;
std::unique_ptr<RdKafka::Topic> producer_topic;
std::shared_ptr<RdKafka::Producer> producer;
std::shared_ptr<RdKafka::Topic> producer_topic;
/// A Consumer can only be used by one source at the same time (technically speaking, it can be used by multiple sources as long as each source reads from a different topic,
/// but we will leave this as an enhancement later, probably when we introduce the `Connection` concept), thus we need a consumer pool.
RdKafka::ConsumerPoolPtr consumer_pool;
20 changes: 12 additions & 8 deletions src/Storages/ExternalStream/Kafka/KafkaSink.cpp
@@ -19,8 +19,9 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_WRITE_TO_KAFKA;
extern const int TYPE_MISMATCH;
extern const int INVALID_SETTING_VALUE;
extern const int NO_AVAILABLE_KAFKA_PRODUCER;
extern const int TYPE_MISMATCH;
}

namespace
@@ -131,11 +132,11 @@ KafkaSink::KafkaSink(
: SinkToStorage(header, ProcessorID::ExternalTableDataSinkID)
, producer(kafka.getProducer())
, topic(kafka.getProducerTopic())
, partition_cnt(topic.getPartitionCount())
, partition_cnt(topic->getPartitionCount())
, one_message_per_row(kafka.produceOneMessagePerRow())
, topic_refresh_interval_ms(kafka.topicRefreshIntervalMs())
, external_stream_counter(external_stream_counter_)
, logger(&Poco::Logger::get(fmt::format("{}.{}", kafka.getLoggerName(), producer.name())))
, logger(&Poco::Logger::get(fmt::format("{}.{}", kafka.getLoggerName(), producer->name())))
{
wb = std::make_unique<WriteBufferFromKafkaSink>([this](char * pos, size_t len) { addMessageToBatch(pos, len); });

@@ -178,7 +179,7 @@ KafkaSink::KafkaSink(
auto metadata_refresh_stopwatch = Stopwatch();
/// Use a small sleep interval to avoid blocking for too long (in case refresh_interval_ms is big).
auto sleep_ms = std::min(UInt64(500), refresh_interval_ms);
while (!is_finished.test())
while (!producer->isStopped() && !is_finished.test())
{
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
/// Fetch topic metadata for partition updates
@@ -189,7 +190,7 @@ KafkaSink::KafkaSink(

try
{
partition_cnt = topic.getPartitionCount();
partition_cnt = topic->getPartitionCount();
}
catch (...) /// do not break the loop until finished
{
@@ -227,6 +228,9 @@ void KafkaSink::consume(Chunk chunk)
if (!chunk.hasRows())
return;

if (producer->isStopped())
throw Exception(ErrorCodes::NO_AVAILABLE_KAFKA_PRODUCER, "Cannot produce messages to a stopped producer");

auto total_rows = chunk.rows();
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
auto blocks = partitioner->shard(std::move(block), partition_cnt);
@@ -299,7 +303,7 @@ void KafkaSink::consume(Chunk chunk)

/// With `wb->setAutoFlush()`, it makes sure that all messages are generated for the chunk at this point.
rd_kafka_produce_batch(
topic.getHandle(),
topic->getHandle(),
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_FREE | RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_BLOCK,
current_batch.data(),
@@ -355,7 +359,7 @@ void KafkaSink::onFinish()
/// Make sure all outstanding requests are transmitted and handled.
/// It should not block for ever here, otherwise, it will block proton from stopping the job
/// or block proton from terminating.
if (auto err = rd_kafka_flush(producer.getHandle(), 15000 /* time_ms */); err)
if (auto err = rd_kafka_flush(producer->getHandle(), 15000 /* time_ms */); err)
LOG_ERROR(logger, "Failed to flush kafka producer, error={}", rd_kafka_err2str(err));

if (auto err = lastSeenError(); err != RD_KAFKA_RESP_ERR_NO_ERROR)
@@ -405,7 +409,7 @@ void KafkaSink::checkpoint(CheckpointContextPtr context)
if (is_finished.test())
{
/// for a final check, it should not wait for too long
if (auto err = rd_kafka_flush(producer.getHandle(), 15000 /* time_ms */); err)
if (auto err = rd_kafka_flush(producer->getHandle(), 15000 /* time_ms */); err)
throw Exception(klog::mapErrorCode(err), "Failed to flush kafka producer, error={}", rd_kafka_err2str(err));

if (auto err = lastSeenError(); err != RD_KAFKA_RESP_ERR_NO_ERROR)
5 changes: 2 additions & 3 deletions src/Storages/ExternalStream/Kafka/KafkaSink.h
@@ -78,9 +78,8 @@ class KafkaSink final : public SinkToStorage
/// for all out-go messages, regardless if a message is successfully delivered or not)
size_t outstandingMessages() const noexcept { return state.outstandings - (state.acked + state.error_count); }

RdKafka::Producer & producer;
RdKafka::Topic & topic;
// std::unique_ptr<RdKafka::Topic> topic;
std::shared_ptr<RdKafka::Producer> producer;
std::shared_ptr<RdKafka::Topic> topic;

Int32 partition_cnt {0};
bool one_message_per_row {false};
3 changes: 1 addition & 2 deletions src/Storages/ExternalStream/Kafka/Producer.cpp
@@ -1,4 +1,3 @@
#include <Common/logger_useful.h>
#include <Storages/ExternalStream/Kafka/Producer.h>

namespace DB
@@ -37,7 +36,7 @@ void Producer::backgroundPoll(UInt64 poll_timeout_ms) const
{
LOG_INFO(logger, "Start producer poll");

while (!stopped.test())
while (!stopped)
rd_kafka_poll(rk.get(), poll_timeout_ms);

LOG_INFO(logger, "Producer poll stopped");
11 changes: 6 additions & 5 deletions src/Storages/ExternalStream/Kafka/Producer.h
@@ -15,21 +15,22 @@ class Producer : boost::noncopyable
{
public:
Producer(const rd_kafka_conf_t & rk_conf, UInt64 poll_timeout_ms, const String & logger_name_prefix);
~Producer()
{
stopped.test_and_set();
}
~Producer() { shutdown(); }

rd_kafka_t * getHandle() const { return rk.get(); }

std::string name() const { return rd_kafka_name(rk.get()); }

void shutdown() { stopped = true; }

bool isStopped() const { return stopped; }

private:
void backgroundPoll(UInt64 poll_timeout_ms) const;

klog::KafkaPtr rk {nullptr, rd_kafka_destroy};
ThreadPool poller;
std::atomic_flag stopped;
bool stopped {false};
zliang-min marked this conversation as resolved.
Poco::Logger * logger;
};
