prometheus metrics for external stream sink (#485)
Co-authored-by: haohang <113408135+yokofly@users.noreply.github.com>
qijun-niu-timeplus and yokofly authored Jan 16, 2024
1 parent c64ec3d commit ab22690
Showing 4 changed files with 44 additions and 3 deletions.
12 changes: 12 additions & 0 deletions src/Storages/ExternalStream/ExternalStreamCounter.h
@@ -11,24 +11,36 @@ class ExternalStreamCounter
     inline uint64_t getReadBytes() const { return read_bytes.load(); }
     inline uint64_t getReadCounts() const { return read_counts.load(); }
     inline uint64_t getReadFailed() const { return read_failed.load(); }
+    inline uint64_t getWriteBytes() const { return write_bytes.load(); }
+    inline uint64_t getWriteCounts() const { return write_counts.load(); }
+    inline uint64_t getWriteFailed() const { return write_failed.load(); }
 
     inline void addToReadBytes(uint64_t bytes) { read_bytes.fetch_add(bytes); }
     inline void addToReadCounts(uint64_t counts) { read_counts.fetch_add(counts); }
     inline void addToReadFailed(uint64_t amount) { read_failed.fetch_add(amount); }
+    inline void addToWriteBytes(uint64_t bytes) { write_bytes.fetch_add(bytes); }
+    inline void addToWriteCounts(uint64_t counts) { write_counts.fetch_add(counts); }
+    inline void addToWriteFailed(uint64_t amount) { write_failed.fetch_add(amount); }
 
     std::map<String, uint64_t> getCounters() const
     {
         return {
             {"ReadBytes", read_bytes.load()},
             {"ReadCounts", read_counts.load()},
             {"ReadFailed", read_failed.load()},
+            {"WriteBytes", write_bytes.load()},
+            {"WriteCounts", write_counts.load()},
+            {"WriteFailed", write_failed.load()},
         };
     }
 
 private:
     std::atomic<uint64_t> read_bytes;
     std::atomic<uint64_t> read_counts;
     std::atomic<uint64_t> read_failed;
+    std::atomic<uint64_t> write_bytes;
+    std::atomic<uint64_t> write_counts;
+    std::atomic<uint64_t> write_failed;
 };
 
 using ExternalStreamCounterPtr = std::shared_ptr<ExternalStreamCounter>;
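The diff above only extends the counter map with the three Write* entries; the Prometheus wiring itself is not shown in this commit. As a minimal sketch of how such a name/value map could be surfaced, assuming a hypothetical helper and metric prefix (neither appears in this commit), the Prometheus text exposition format needs little more than one line per counter:

```cpp
#include <cstdint>
#include <map>
#include <sstream>
#include <string>

/// Hypothetical helper, not part of this commit: render the name/value map
/// returned by ExternalStreamCounter::getCounters() in Prometheus text
/// exposition format. The "external_stream_" prefix is an assumption.
std::string toPrometheusText(const std::map<std::string, uint64_t> & counters)
{
    std::ostringstream out;
    for (const auto & [name, value] : counters)
    {
        out << "# TYPE external_stream_" << name << " counter\n";
        out << "external_stream_" << name << ' ' << value << '\n';
    }
    return out.str();  /// e.g. "external_stream_WriteBytes 1048576\n..."
}
```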
3 changes: 2 additions & 1 deletion src/Storages/ExternalStream/Kafka/Kafka.cpp
@@ -314,6 +314,7 @@ SinkToStoragePtr Kafka::write(const ASTPtr & /*query*/, const StorageMetadataPtr
 {
     /// always validate before actual use
     validate();
-    return std::make_shared<KafkaSink>(this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger);
+    return std::make_shared<KafkaSink>(
+        this, metadata_snapshot->getSampleBlock(), shards, message_key_ast, context, logger, external_stream_counter);
 }
 }
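One detail worth noting in write(): the sink receives the storage's ExternalStreamCounterPtr, a shared_ptr, so every sink created for the stream feeds the same counter object. A simplified sketch of that ownership pattern, with toy Counter/Sink/Storage types standing in for the real classes (all names here are illustrative, not from the commit):

```cpp
#include <atomic>
#include <cstdint>
#include <memory>

struct Counter { std::atomic<uint64_t> write_bytes{0}; };  /// stand-in for ExternalStreamCounter

struct Sink
{
    explicit Sink(std::shared_ptr<Counter> c) : counter(std::move(c)) {}
    std::shared_ptr<Counter> counter;  /// shared with the storage and sibling sinks
};

struct Storage
{
    std::shared_ptr<Counter> counter = std::make_shared<Counter>();

    /// Mirrors Kafka::write above: every sink gets the same counter instance,
    /// so metrics from concurrent inserts aggregate in one place.
    std::unique_ptr<Sink> write() { return std::make_unique<Sink>(counter); }
};
```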
21 changes: 20 additions & 1 deletion src/Storages/ExternalStream/Kafka/KafkaSink.cpp
@@ -123,11 +123,19 @@ IColumn::Selector ChunkSharder::createSelector(Block block, Int32 shard_cnt) con
     }
 }
 
-KafkaSink::KafkaSink(const Kafka * kafka, const Block & header, Int32 initial_partition_cnt, const ASTPtr & message_key_ast, ContextPtr context, Poco::Logger * logger_)
+KafkaSink::KafkaSink(
+    const Kafka * kafka,
+    const Block & header,
+    Int32 initial_partition_cnt,
+    const ASTPtr & message_key_ast,
+    ContextPtr context,
+    Poco::Logger * logger_,
+    ExternalStreamCounterPtr external_stream_counter_)
     : SinkToStorage(header, ProcessorID::ExternalTableDataSinkID)
     , partition_cnt(initial_partition_cnt)
     , one_message_per_row(kafka->produceOneMessagePerRow())
     , logger(logger_)
+    , external_stream_counter(external_stream_counter_)
 {
     /// default values
     std::vector<std::pair<String, String>> producer_params{
@@ -292,6 +300,7 @@ void KafkaSink::consume(Chunk chunk)
     if (!chunk.hasRows())
         return;
 
+    auto total_rows = chunk.rows();
     auto block = getHeader().cloneWithColumns(chunk.detachColumns());
     auto blocks = partitioner->shard(std::move(block), partition_cnt);
 
@@ -371,10 +380,18 @@

     rd_kafka_resp_err_t err {RD_KAFKA_RESP_ERR_NO_ERROR};
     for (size_t i = 0; i < current_batch.size(); ++i)
+    {
         if (current_batch[i].err)
+        {
             err = current_batch[i].err;
+            external_stream_counter->addToWriteFailed(1);
+        }
         else
+        {
             batch_payload[i].release(); /// payload of messages which are successfully handled by rd_kafka_produce_batch will be freed by librdkafka
+            external_stream_counter->addToWriteBytes(current_batch[i].len);
+        }
+    }
 
     /// Clean up all the bookkeeping for the batch.
     std::vector<rd_kafka_message_t> batch;
@@ -391,6 +408,8 @@

     if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
         throw Exception(klog::mapErrorCode(err), rd_kafka_err2str(err));
+    else
+        external_stream_counter->addToWriteCounts(total_rows);
 }
 
 void KafkaSink::onFinish()
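To summarize the accounting added to consume(): each failed delivery report increments WriteFailed, each accepted message adds its payload length to WriteBytes, and WriteCounts grows by the chunk's row count only when the entire batch succeeds (a failure raises an exception instead). A distilled sketch of that logic, with a hypothetical ToyMessage replacing librdkafka's rd_kafka_message_t:

```cpp
#include <cstdint>
#include <vector>

struct ToyMessage { bool failed = false; uint64_t len = 0; };  /// stand-in for rd_kafka_message_t
struct ToyCounter { uint64_t write_bytes = 0, write_counts = 0, write_failed = 0; };

/// Returns true when every message in the batch was accepted.
bool accountBatch(const std::vector<ToyMessage> & batch, uint64_t total_rows, ToyCounter & counter)
{
    bool all_ok = true;
    for (const auto & msg : batch)
    {
        if (msg.failed)
        {
            ++counter.write_failed;          /// one WriteFailed per failed message
            all_ok = false;
        }
        else
            counter.write_bytes += msg.len;  /// WriteBytes counts delivered payload bytes
    }
    if (all_ok)
        counter.write_counts += total_rows;  /// WriteCounts is rows, bumped only on full success
    return all_ok;
}
```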
11 changes: 10 additions & 1 deletion src/Storages/ExternalStream/Kafka/KafkaSink.h
@@ -3,6 +3,7 @@
 #include <Core/BlockWithShard.h>
 #include <Formats/FormatFactory.h>
 #include <Processors/Sinks/SinkToStorage.h>
+#include <Storages/ExternalStream/ExternalStreamCounter.h>
 #include <Storages/ExternalStream/Kafka/Kafka.h>
 #include <Storages/ExternalStream/Kafka/WriteBufferFromKafkaSink.h>
 #include <Common/ThreadPool.h>
@@ -46,7 +47,14 @@ class ChunkSharder
 class KafkaSink final : public SinkToStorage
 {
 public:
-    KafkaSink(const Kafka * kafka, const Block & header, Int32 initial_partition_cnt, const ASTPtr & message_key, ContextPtr context, Poco::Logger * logger_);
+    KafkaSink(
+        const Kafka * kafka,
+        const Block & header,
+        Int32 initial_partition_cnt,
+        const ASTPtr & message_key,
+        ContextPtr context,
+        Poco::Logger * logger_,
+        ExternalStreamCounterPtr external_stream_counter_);
     ~KafkaSink() override;
 
     String getName() const override { return "KafkaSink"; }
@@ -113,5 +121,6 @@ class KafkaSink final : public SinkToStorage
     State state;
 
     Poco::Logger * logger;
+    ExternalStreamCounterPtr external_stream_counter;
 };
 }
