Fix use of incorrect start sn after recovery #650

Merged · 1 commit · Apr 9, 2024
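When a query was recovered before any checkpoint existed, `lastProcessedSN()` returned -1 and `ISource::recover()` unconditionally reset the consume offset to -1 + 1 == 0, rewinding the source to the beginning of the log instead of its original start position. This change guards the reset with `last_checkpointed_sn >= 0`, introduces the named constant `ProtonConsts::LogStartSN`, pairs each Kafka result chunk with the last sequence number it covers so `ckpt_data.last_sn` only advances when a chunk is actually emitted, and adds a smoke test that kills and recovers a `subscribe to` query that has no checkpointed sn. Fixes issue 646.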
2 changes: 2 additions & 0 deletions src/Common/ProtonCommon.h
@@ -9,6 +9,8 @@ namespace DB
 {
 namespace ProtonConsts
 {
+constexpr int64_t LogStartSN = 0;
+
 /// Reserved column names / aliases for streaming processing
 const String STREAMING_WINDOW_START = "window_start";
 const String STREAMING_WINDOW_END = "window_end";
2 changes: 1 addition & 1 deletion src/Processors/Streaming/ISource.cpp
@@ -17,7 +17,7 @@ void ISource::recover(CheckpointContextPtr ckpt_ctx_)
     last_checkpointed_sn = lastProcessedSN();
 
     /// Reset consume offset started from the next of last checkpointed sn (if not manually reset before recovery)
-    if (!reseted_start_sn.has_value())
+    if (!reseted_start_sn.has_value() && last_checkpointed_sn >= 0)
         doResetStartSN(last_checkpointed_sn + 1);
 }

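To see why the new guard matters, here is a minimal, self-contained sketch of the recovery path — not code from this PR: `MockSource`, `NoProcessedSN`, and the stubbed `lastProcessedSN()` are illustrative stand-ins for the real checkpoint machinery.

#include <cassert>
#include <cstdint>
#include <optional>

constexpr int64_t LogStartSN = 0;     // first valid sequence number (as added in ProtonCommon.h)
constexpr int64_t NoProcessedSN = -1; // hypothetical name; the real code returns a bare -1

struct MockSource
{
    std::optional<int64_t> reseted_start_sn; // set iff the user reset the start SN manually
    int64_t start_sn = -1;                   // -1 stands for "start from latest"

    int64_t lastProcessedSN() const { return NoProcessedSN; } // fresh query: nothing checkpointed yet

    void doResetStartSN(int64_t sn) { start_sn = sn; }

    void recover()
    {
        auto last_checkpointed_sn = lastProcessedSN();
        // The guard added by this PR: without `last_checkpointed_sn >= 0`,
        // a query recovered before its first checkpoint would be reset to
        // -1 + 1 == 0 == LogStartSN and replay the whole log instead of
        // resuming from "latest".
        if (!reseted_start_sn.has_value() && last_checkpointed_sn >= LogStartSN)
            doResetStartSN(last_checkpointed_sn + 1);
    }
};

int main()
{
    MockSource source;
    source.recover();
    assert(source.start_sn == -1); // still "latest", not rewound to SN 0
}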
32 changes: 20 additions & 12 deletions src/Storages/ExternalStream/Kafka/KafkaSource.cpp
@@ -52,6 +52,9 @@ KafkaSource::KafkaSource(
     , query_context(std::move(query_context_))
     , logger(&Poco::Logger::get(fmt::format("{}.{}", kafka.getLoggerName(), consumer->name())))
 {
+    if (offset > 0)
+        ckpt_data.last_sn = offset - 1;
+
     assert(external_stream_counter);
 
     if (auto batch_count = query_context->getSettingsRef().record_consume_batch_count; batch_count != 0)
@@ -67,7 +70,7 @@ KafkaSource::KafkaSource(
     assert((physical_header.columns() == 1 && !format_executor) || format_executor);
 
     header_chunk = Chunk(header.getColumns(), 0);
-    iter = result_chunks.begin();
+    iter = result_chunks_with_sns.begin();
 }
 
 KafkaSource::~KafkaSource()
@@ -91,33 +94,37 @@ Chunk KafkaSource::generate()
         consume_started = true;
     }
 
-    if (result_chunks.empty() || iter == result_chunks.end())
+    if (result_chunks_with_sns.empty() || iter == result_chunks_with_sns.end())
     {
         readAndProcess();
 
         if (isCancelled())
             return {};
 
         /// After processing blocks, check again to see if there are new results
-        if (result_chunks.empty() || iter == result_chunks.end())
+        if (result_chunks_with_sns.empty() || iter == result_chunks_with_sns.end())
             /// Act as a heart beat
             return header_chunk.clone();
 
         /// result_blocks is not empty, fallthrough
     }
 
-    return std::move(*iter++);
+    ckpt_data.last_sn = iter->second;
+    return std::move((iter++)->first);
 }
 
 void KafkaSource::readAndProcess()
 {
-    result_chunks.clear();
+    result_chunks_with_sns.clear();
     current_batch.clear();
     current_batch.reserve(header.columns());
 
-    auto callback = [this](void * rkmessage, size_t total_count, void * data)
+    Int64 current_batch_last_sn = -1;
+    auto callback = [&current_batch_last_sn, this](void * rkmessage, size_t total_count, void * data)
     {
-        parseMessage(rkmessage, total_count, data);
+        auto current_offset = parseMessage(rkmessage, total_count, data);
+        if (current_offset.has_value()) [[likely]]
+            current_batch_last_sn = *current_offset;
     };
 
     auto error_callback = [this](rd_kafka_resp_err_t err)
@@ -131,22 +138,23 @@ void KafkaSource::readAndProcess()
     if (!current_batch.empty())
    {
         auto rows = current_batch[0]->size();
-        result_chunks.emplace_back(std::move(current_batch), rows);
+        assert(current_batch_last_sn >= 0);
+        result_chunks_with_sns.emplace_back(Chunk{std::move(current_batch), rows}, current_batch_last_sn);
     }
 
-    iter = result_chunks.begin();
+    iter = result_chunks_with_sns.begin();
 }
 
-void KafkaSource::parseMessage(void * rkmessage, size_t /*total_count*/, void * /*data*/)
+std::optional<Int64> KafkaSource::parseMessage(void * rkmessage, size_t /*total_count*/, void * /*data*/)
 {
     auto * message = static_cast<rd_kafka_message_t *>(rkmessage);
 
     if (unlikely(message->offset < offset))
         /// Ignore the message which has lower offset than what clients like to have
-        return;
+        return {};
 
     parseFormat(message);
-    ckpt_data.last_sn = message->offset;
+    return message->offset;
 }
 
 void KafkaSource::parseFormat(const rd_kafka_message_t * kmessage)
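The pattern this file adopts — carrying each batch's last sequence number alongside the chunk, and publishing it to `ckpt_data.last_sn` only at emission time — can be reduced to the following sketch. `KafkaSourceSketch` is written for illustration, with `Chunk` stubbed as a string; only the pairing logic mirrors the diff.

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

using Chunk = std::string; // stand-in for DB::Chunk

struct KafkaSourceSketch
{
    std::vector<std::pair<Chunk, int64_t>> result_chunks_with_sns;
    std::vector<std::pair<Chunk, int64_t>>::iterator iter = result_chunks_with_sns.begin();
    int64_t last_checkpointed_sn = -1; // stands in for ckpt_data.last_sn

    Chunk generate()
    {
        if (result_chunks_with_sns.empty() || iter == result_chunks_with_sns.end())
            return {}; // the real code returns a header-only chunk as a heartbeat

        // Advance the checkpointable SN only at emission time, so a concurrent
        // checkpoint can never cover records that were read but not yet emitted.
        last_checkpointed_sn = iter->second;
        return std::move((iter++)->first);
    }
};

int main()
{
    KafkaSourceSketch source;
    source.result_chunks_with_sns = {{"batch-0", 2}, {"batch-1", 5}};
    source.iter = source.result_chunks_with_sns.begin();

    source.generate(); // emits "batch-0"; a checkpoint taken now resumes at SN 3
    source.generate(); // emits "batch-1"; last_checkpointed_sn is now 5
}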
7 changes: 4 additions & 3 deletions src/Storages/ExternalStream/Kafka/KafkaSource.h
@@ -45,7 +45,8 @@ class KafkaSource final : public Streaming::ISource
     void calculateColumnPositions();
     void initFormatExecutor();
 
-    void parseMessage(void * kmessage, size_t total_count, void * data);
+    /// \brief Parse a Kafka message and return the offset of current message. (return nullopt if it is not a valid message)
+    std::optional<Int64> parseMessage(void * kmessage, size_t total_count, void * data);
     void parseFormat(const rd_kafka_message_s * kmessage);
 
     inline void readAndProcess();
@@ -74,8 +75,8 @@ class KafkaSource final : public Streaming::ISource
     bool request_virtual_columns = false;
 
     std::optional<String> format_error;
-    std::vector<Chunk> result_chunks;
-    std::vector<Chunk>::iterator iter;
+    std::vector<std::pair<Chunk, Int64>> result_chunks_with_sns;
+    std::vector<std::pair<Chunk, Int64>>::iterator iter;
     MutableColumns current_batch;
 
     UInt32 record_consume_batch_count = 1000;
5 changes: 3 additions & 2 deletions src/Storages/Streaming/StreamingStoreSource.cpp
@@ -5,6 +5,7 @@
 
 #include <Interpreters/inplaceBlockConversions.h>
 #include <KafkaLog/KafkaWALPool.h>
+#include <Common/ProtonCommon.h>
 #include <Common/logger_useful.h>
 
 namespace DB
@@ -18,7 +19,7 @@ StreamingStoreSource::StreamingStoreSource(
     Poco::Logger * log_)
     : StreamingStoreSourceBase(header, storage_snapshot_, std::move(context_), log_, ProcessorID::StreamingStoreSourceID)
 {
-    if (sn > 0)
+    if (sn >= ProtonConsts::LogStartSN)
         last_sn = sn - 1;
 
     const auto & settings = query_context->getSettingsRef();
@@ -144,7 +145,7 @@ std::pair<String, Int32> StreamingStoreSource::getStreamShard() const
 
 void StreamingStoreSource::doResetStartSN(Int64 sn)
 {
-    if (sn >= 0)
+    if (sn >= ProtonConsts::LogStartSN)
     {
         if (nativelog_reader)
             nativelog_reader->resetSequenceNumber(sn);
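Both changed comparisons encode the convention that `LogStartSN` names: `sn >= 0` marks a valid, explicitly requested start position, while negative values such as -1 mean "no position". A minimal sketch under that assumption, with the constructor and `doResetStartSN` reduced to the relevant assignments (the real code resets the log readers rather than a plain member):

#include <cassert>
#include <cstdint>

constexpr int64_t LogStartSN = 0;

struct StreamingStoreSourceSketch
{
    int64_t last_sn = -1; // "nothing processed yet"

    explicit StreamingStoreSourceSketch(int64_t sn)
    {
        // The old check `sn > 0` did not treat an explicit request to start
        // at the first record (sn == 0) as a valid start position.
        if (sn >= LogStartSN)
            last_sn = sn - 1; // resume point: the record *before* sn
    }

    void doResetStartSN(int64_t sn)
    {
        if (sn >= LogStartSN) // ignore sentinel values such as -1
            last_sn = sn - 1;
    }
};

int main()
{
    StreamingStoreSourceSketch from_begin{LogStartSN};
    assert(from_begin.last_sn == -1); // start at SN 0 => nothing processed yet

    StreamingStoreSourceSketch from_five{5};
    assert(from_five.last_sn == 4);
}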
59 changes: 59 additions & 0 deletions tests/stream/test_stream_smoke/0098_fixed_issues2.yaml
@@ -102,3 +102,62 @@ tests:
       - query_id: fixed-issues2-2-1
         expected_results:
           - ['1']
+
+  - id: 1
+    tags:
+      - query state
+    name: issue 646
+    description: recover the query with incorrect start sn (0) if there is no checkpointed sn (-1)
+    steps:
+      - statements:
+          - client: python
+            query_type: table
+            query: drop stream if exists fixed_issues2_stream;
+
+          - client: python
+            query_type: table
+            exists: fixed_issues2_stream
+            exists_wait: 2
+            wait: 1
+            query: create stream fixed_issues2_stream(i int);
+
+          - client: python
+            query_type: table
+            depends_on_stream: fixed_issues2_stream
+            wait: 1
+            query: insert into fixed_issues2_stream(i) values(1)(2)(3);
+
+          - client: python
+            query_id: fixed-issues2-1
+            query_type: stream
+            wait: 2
+            query: subscribe to select count() from fixed_issues2_stream emit periodic 1s;
+
+          - client: python
+            query_type: table
+            depends_on: fixed-issues2-1
+            wait: 3
+            query: kill query where query_id='fixed-issues2-1';
+
+          - client: python
+            query_id: fixed-issues2-1-1
+            query_type: stream
+            wait: 1
+            query: recover from 'fixed-issues2-1';
+
+          - client: python
+            query_type: table
+            depends_on: fixed-issues2-1
+            query: insert into fixed_issues2_stream(i) values(4);
+
+          - client: python
+            query_type: table
+            wait: 3
+            query: unsubscribe to 'fixed-issues2-1';
+
+    expected_results:
+      - query_id: fixed-issues2-1
+        expected_results: []
+      - query_id: fixed-issues2-1-1
+        expected_results:
+          - [1]