fix: fix potential data loss for shared source #19443

Merged
1 commit merged on Nov 20, 2024
6 changes: 0 additions & 6 deletions e2e_test/source_inline/cdc/mysql/mysql_create_drop.slt.serial
@@ -49,12 +49,6 @@ create source s with (

sleep 2s

# At the beginning, the source is paused. It will resume after a downstream is created.
system ok
internal_table.mjs --name s --type '' --count
----
count: 0


statement ok
create table tt1_shared (v1 int,
69 changes: 50 additions & 19 deletions e2e_test/source_inline/kafka/shared_source.slt.serial
@@ -59,11 +59,17 @@ select count(*) from rw_internal_tables where name like '%s0%';

sleep 1s

# SourceExecutor's ingestion does not start (state table is empty), even after sleep
statement ok
flush;

# SourceExecutor starts from latest.
system ok
internal_table.mjs --name s0 --type source
----
(empty)
0,"{""split_info"": {""partition"": 0, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 0, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


statement ok
@@ -72,12 +78,6 @@ create materialized view mv_1 as select * from s0;
# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 2s

# SourceExecutor's ingestion started, but it only starts from latest (offset 1).
system ok
internal_table.mjs --name s0 --type source
----
(empty)


# SourceBackfill starts from offset 0, with backfill_info: HasDataToBackfill { latest_offset: "0" } (decided by kafka high watermark).
# (meaning upstream already consumed offset 0, so we only need to backfill to offset 0)
@@ -144,7 +144,7 @@ EOF

sleep 2s

# SourceExecutor's finally got new data now.
# SourceExecutor's got new data.
system ok
internal_table.mjs --name s0 --type source
----
@@ -185,16 +185,6 @@ select v1, v2 from mv_1;
4 dd


# start_offset changed to 1
system ok
internal_table.mjs --name s0 --type source
----
0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 2, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 3, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"


# Transition from SourceCachingUp to Finished after consuming one upstream message.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
@@ -334,6 +324,47 @@ internal_table.mjs --name s0 --type source
# # risedev psql -c "select name, flags, parallelism from rw_fragments JOIN rw_relations ON rw_fragments.table_id = rw_relations.id order by name;"


# Test: rate limit and resume won't lose data

statement ok
alter source s0 set source_rate_limit to 0;


system ok
cat <<EOF | rpk topic produce shared_source -f "%p %v\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
3 {"v1": 4, "v2": "d"}
EOF

sleep 2s

# no data goes in

query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
2 12
3 13
4 14

statement ok
alter source s0 set source_rate_limit to default;

sleep 3s

# data comes in
query ?? rowsort
select v1, count(*) from mv_1 group by v1;
----
1 13
2 13
3 14
4 15


statement ok
drop source s0 cascade;

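The `backfill_info: HasDataToBackfill { latest_offset: "0" }` comment earlier in this test file hints at how the Kafka high watermark decides whether any backfill is needed at all. The following is only a toy sketch of that decision: the variant names mirror the test comment, while the watermark comparison is an assumption for illustration, not the code changed in this PR.

```rust
// Toy model of the backfill decision referenced in the test comments above.
#[derive(Debug, PartialEq)]
enum BackfillInfo {
    // Data already exists up to `latest_offset`, so backfill must cover it.
    HasDataToBackfill { latest_offset: String },
    // Upstream starts at the head of the partition; nothing to backfill.
    NoDataToBackfill,
}

fn decide_backfill(low_watermark: i64, high_watermark: i64) -> BackfillInfo {
    if high_watermark > low_watermark {
        // The high watermark is the offset the *next* message will get, so
        // the newest existing message sits at `high_watermark - 1`.
        BackfillInfo::HasDataToBackfill {
            latest_offset: (high_watermark - 1).to_string(),
        }
    } else {
        BackfillInfo::NoDataToBackfill
    }
}

fn main() {
    // One message (offset 0) already in the partition, as in the test above:
    // backfill needs to cover exactly offset 0.
    assert_eq!(
        decide_backfill(0, 1),
        BackfillInfo::HasDataToBackfill { latest_offset: "0".to_string() }
    );
}
```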
4 changes: 2 additions & 2 deletions src/batch/src/executor/source.rs
@@ -157,9 +157,9 @@ impl SourceExecutor {
ConnectorProperties::default(),
None,
));
let stream = self
let (stream, _) = self
.source
.build_stream(Some(self.split_list), self.column_ids, source_ctx)
.build_stream(Some(self.split_list), self.column_ids, source_ctx, false)
.await?;

#[for_await]
8 changes: 6 additions & 2 deletions src/connector/src/source/base.rs
@@ -69,7 +69,7 @@ pub trait TryFromBTreeMap: Sized + UnknownFields {
/// Represents `WITH` options for sources.
///
/// Each instance should add a `#[derive(with_options::WithOptions)]` marker.
pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions {
pub trait SourceProperties: TryFromBTreeMap + Clone + WithOptions + std::fmt::Debug {
const SOURCE_NAME: &'static str;
type Split: SplitMetaData
+ TryFrom<SplitImpl, Error = crate::error::ConnectorError>
@@ -108,7 +108,7 @@ impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
}
}

pub async fn create_split_reader<P: SourceProperties + std::fmt::Debug>(
pub async fn create_split_reader<P: SourceProperties>(
prop: P,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
@@ -375,6 +375,10 @@ pub trait SplitReader: Sized + Send {
fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
HashMap::new()
}

async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
Err(anyhow!("seek_to_latest is not supported for this connector").into())
}
}

/// Information used to determine whether we should start and finish source backfill.
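The new `seek_to_latest` above is added as a defaulted trait method: it returns an error unless a connector overrides it, which leaves every non-Kafka `SplitReader` untouched. A self-contained sketch of that default-method pattern, using placeholder types rather than the real `SplitReader`/`SplitImpl` (and omitting `async` for brevity):

```rust
use anyhow::{anyhow, Result};

// Placeholder trait standing in for `SplitReader`: the default body errors,
// so only connectors that can actually seek need to override it.
trait Reader {
    fn seek_to_latest(&mut self) -> Result<Vec<String>> {
        Err(anyhow!("seek_to_latest is not supported for this connector"))
    }
}

struct NonSeekableReader;
impl Reader for NonSeekableReader {} // keeps the default error

struct SeekableReader {
    splits: Vec<String>,
}

impl Reader for SeekableReader {
    fn seek_to_latest(&mut self) -> Result<Vec<String>> {
        // A real implementation repositions its consumer and rewrites the
        // start offsets; here we just hand back the splits we would update.
        Ok(self.splits.clone())
    }
}

fn main() -> Result<()> {
    let mut plain = NonSeekableReader;
    assert!(plain.seek_to_latest().is_err());

    let mut kafka_like = SeekableReader { splits: vec!["partition-0".into()] };
    println!("latest splits: {:?}", kafka_like.seek_to_latest()?);
    Ok(())
}
```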
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/mod.rs
@@ -58,7 +58,7 @@ pub fn build_cdc_table_id(source_id: u32, external_table_name: &str) -> String {
format!("{}.{}", source_id, external_table_name)
}

pub trait CdcSourceTypeTrait: Send + Sync + Clone + 'static {
pub trait CdcSourceTypeTrait: Send + Sync + Clone + std::fmt::Debug + 'static {
const CDC_CONNECTOR_NAME: &'static str;
fn source_type() -> CdcSourceType;
}
2 changes: 0 additions & 2 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -191,7 +191,6 @@ impl SplitEnumerator for KafkaSplitEnumerator {
partition,
start_offset: start_offsets.remove(&partition).unwrap(),
stop_offset: stop_offsets.remove(&partition).unwrap(),
hack_seek_to_latest: false,
})
.collect();

@@ -299,7 +298,6 @@ impl KafkaSplitEnumerator {
partition: *partition,
start_offset: Some(start_offset),
stop_offset: Some(stop_offset),
hack_seek_to_latest:false
}
})
.collect::<Vec<KafkaSplit>>())
34 changes: 29 additions & 5 deletions src/connector/src/source/kafka/source/reader.rs
@@ -37,13 +37,15 @@ use crate::source::kafka::{
};
use crate::source::{
into_chunk_stream, BackfillInfo, BoxChunkSourceStream, Column, SourceContextRef, SplitId,
SplitMetaData, SplitReader,
SplitImpl, SplitMetaData, SplitReader,
};

pub struct KafkaSplitReader {
consumer: StreamConsumer<RwConsumerContext>,
offsets: HashMap<SplitId, (Option<i64>, Option<i64>)>,
backfill_info: HashMap<SplitId, BackfillInfo>,
splits: Vec<KafkaSplit>,
sync_call_timeout: Duration,
bytes_per_second: usize,
max_num_messages: usize,
parser_config: ParserConfig,
@@ -110,12 +112,10 @@ impl SplitReader for KafkaSplitReader {

let mut offsets = HashMap::new();
let mut backfill_info = HashMap::new();
for split in splits {
for split in splits.clone() {
offsets.insert(split.id(), (split.start_offset, split.stop_offset));

if split.hack_seek_to_latest {
tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::End)?;
} else if let Some(offset) = split.start_offset {
if let Some(offset) = split.start_offset {
tpl.add_partition_offset(
split.topic.as_str(),
split.partition,
@@ -168,8 +168,10 @@ impl SplitReader for KafkaSplitReader {
Ok(Self {
consumer,
offsets,
splits,
backfill_info,
bytes_per_second,
sync_call_timeout: properties.common.sync_call_timeout,
max_num_messages,
parser_config,
source_ctx,
@@ -185,6 +187,28 @@ impl SplitReader for KafkaSplitReader {
fn backfill_info(&self) -> HashMap<SplitId, BackfillInfo> {
self.backfill_info.clone()
}

async fn seek_to_latest(&mut self) -> Result<Vec<SplitImpl>> {
let mut latest_splits: Vec<SplitImpl> = Vec::new();
let mut tpl = TopicPartitionList::with_capacity(self.splits.len());
for mut split in self.splits.clone() {
// we can't get latest offset if we use Offset::End, so we just fetch watermark here.
let (_low, high) = self
.consumer
.fetch_watermarks(
split.topic.as_str(),
split.partition,
self.sync_call_timeout,
)
.await?;
tpl.add_partition_offset(split.topic.as_str(), split.partition, Offset::Offset(high))?;
split.start_offset = Some(high - 1);
latest_splits.push(split.into());
}
// replace the previous assignment
self.consumer.assign(&tpl)?;
Ok(latest_splits)
}
}

impl KafkaSplitReader {
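One subtle point in the `seek_to_latest` implementation above: after assigning the consumer to the high watermark, each split is persisted with `start_offset = Some(high - 1)`. That matches the `KafkaSplit` convention (see the next file) where `start_offset` records the last already-consumed offset, i.e. it is exclusive. A small sketch of that bookkeeping, assuming only this convention:

```rust
// `start_offset` stores the last offset already consumed (exclusive
// semantics), so persisting `high_watermark - 1` makes a recovered reader
// resume exactly at the high watermark.
fn start_offset_for_latest(high_watermark: i64) -> Option<i64> {
    Some(high_watermark - 1)
}

fn main() {
    // Partition holding offsets 0..=4: the high watermark is 5 (the offset
    // the next message will receive). Persisting 4 means consumption resumes
    // at 5 after recovery, instead of seeking to `Offset::End` again and
    // skipping anything produced while the job was down.
    assert_eq!(start_offset_for_latest(5), Some(4));
}
```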
12 changes: 0 additions & 12 deletions src/connector/src/source/kafka/split.rs
@@ -32,12 +32,6 @@ pub struct KafkaSplit {
/// A better approach would be to make it **inclusive**. <https://github.com/risingwavelabs/risingwave/pull/16257>
pub(crate) start_offset: Option<i64>,
pub(crate) stop_offset: Option<i64>,
#[serde(skip)]
/// Used by shared source to hackily seek to the latest offset without fetching start offset first.
/// XXX: But why do we fetch low watermark for latest start offset..?
///
/// When this is `true`, `start_offset` will be ignored.
pub(crate) hack_seek_to_latest: bool,
}

impl SplitMetaData for KafkaSplit {
@@ -72,16 +66,10 @@ impl KafkaSplit {
partition,
start_offset,
stop_offset,
hack_seek_to_latest: false,
}
}

pub fn get_topic_and_partition(&self) -> (String, i32) {
(self.topic.clone(), self.partition)
}

/// This should only be used for a fresh split, not persisted in state table yet.
pub fn seek_to_latest_offset(&mut self) {
self.hack_seek_to_latest = true;
}
}
28 changes: 22 additions & 6 deletions src/connector/src/source/reader/reader.rs
@@ -37,8 +37,8 @@ use crate::source::filesystem::opendal_source::{
use crate::source::filesystem::{FsPageItem, OpendalFsSplit};
use crate::source::{
create_split_reader, BackfillInfo, BoxChunkSourceStream, BoxTryStream, Column,
ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitReader,
WaitCheckpointTask,
ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext, SplitId, SplitImpl,
SplitReader, WaitCheckpointTask,
};
use crate::{dispatch_source_prop, WithOptionsSecResolved};

@@ -211,14 +211,17 @@ impl SourceReader {
}

/// Build `SplitReader`s and then `BoxChunkSourceStream` from the given `ConnectorState` (`SplitImpl`s).
///
/// If `seek_to_latest` is true, will also return the latest splits after seek.
[Inline review comment (Member)]: What if we always return the splits?

[Reply (Member Author)]: Sounds OK to me. But ConnectorState is also Option<Vec<SplitImpl>> (which is also a little unnecessary to me), so perhaps we should refactor that together. NTFS

pub async fn build_stream(
&self,
state: ConnectorState,
column_ids: Vec<ColumnId>,
source_ctx: Arc<SourceContext>,
) -> ConnectorResult<BoxChunkSourceStream> {
seek_to_latest: bool,
) -> ConnectorResult<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> {
let Some(splits) = state else {
return Ok(pending().boxed());
return Ok((pending().boxed(), None));
};
let config = self.config.clone();
let columns = self.get_target_columns(column_ids)?;
Expand All @@ -243,7 +246,7 @@ impl SourceReader {

let support_multiple_splits = config.support_multiple_splits();
dispatch_source_prop!(config, prop, {
let readers = if support_multiple_splits {
let mut readers = if support_multiple_splits {
tracing::debug!(
"spawning connector split reader for multiple splits {:?}",
splits
Expand All @@ -268,7 +271,20 @@ impl SourceReader {
.await?
};

Ok(select_all(readers.into_iter().map(|r| r.into_stream())).boxed())
let latest_splits = if seek_to_latest {
let mut latest_splits = Vec::new();
for reader in &mut readers {
latest_splits.extend(reader.seek_to_latest().await?);
}
Some(latest_splits)
} else {
None
};

Ok((
select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
latest_splits,
))
})
}
}
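As the doc comment in the diff above says, `build_stream` now returns the stream together with `Option<Vec<SplitImpl>>`, populated only when `seek_to_latest` is set, so the caller can persist concrete offsets rather than relying on `Offset::End` after recovery. A minimal model of that return shape, with placeholder types standing in for the real RisingWave ones:

```rust
// Placeholder model of the new `build_stream` contract: a stream plus the
// optional splits produced by seeking. Types here are stand-ins, not the
// real `BoxChunkSourceStream`/`SplitImpl`.
type Chunk = String;
type Split = (i32 /* partition */, i64 /* start_offset */);

fn build_stream(seek_to_latest: bool) -> (Vec<Chunk>, Option<Vec<Split>>) {
    let stream = vec![Chunk::from("chunk-1")];
    // Only a seek produces updated splits; otherwise the caller gets `None`,
    // mirroring the `Option<Vec<SplitImpl>>` in the real signature.
    let latest_splits = seek_to_latest.then(|| vec![(0, 4), (1, 7)]);
    (stream, latest_splits)
}

fn main() {
    let (_stream, latest_splits) = build_stream(true);
    if let Some(splits) = latest_splits {
        // The caller persists these offsets to its state table so a recovery
        // resumes from them, which is the data-loss scenario this PR fixes.
        println!("persist splits: {:?}", splits);
    }
}
```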
4 changes: 2 additions & 2 deletions src/stream/src/executor/source/fetch_executor.rs
@@ -160,9 +160,9 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
batch: SplitBatch,
rate_limit_rps: Option<u32>,
) -> StreamExecutorResult<BoxChunkSourceStream> {
let stream = source_desc
let (stream, _) = source_desc
.source
.build_stream(batch, column_ids, Arc::new(source_ctx))
.build_stream(batch, column_ids, Arc::new(source_ctx), false)
.await
.map_err(StreamExecutorError::connector_error)?;
Ok(apply_rate_limit(stream, rate_limit_rps).boxed())
1 change: 0 additions & 1 deletion src/stream/src/executor/source/source_backfill_executor.rs
@@ -609,7 +609,6 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
.await?;

if self.should_report_finished(&backfill_stage.states) {
tracing::debug!("progress finish");
self.progress.finish(
barrier.epoch,
backfill_stage.total_backfilled_rows(),