Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(connector): make SplitEnumerator/Reader dyn #20098

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 72 additions & 18 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#[macro_export]
macro_rules! for_all_classified_sources {
($macro:path $(,$extra_args:tt)*) => {
($macro:path $(, $extra_args:tt)*) => {
$macro! {
// cdc sources
{
Expand Down Expand Up @@ -67,7 +67,7 @@ macro_rules! for_all_connections {
#[macro_export]
macro_rules! for_all_sources_inner {
(
{$({ $cdc_source_type:ident }),* },
{ $({ $cdc_source_type:ident }),* },
{ $({ $source_variant:ident, $prop_name:ty, $split:ty }),* },
$macro:tt $(, $extra_args:tt)*
) => {
Expand All @@ -79,13 +79,14 @@ macro_rules! for_all_sources_inner {
[< $cdc_source_type Cdc >],
$crate::source::cdc::[< $cdc_source_type CdcProperties >],
$crate::source::cdc::DebeziumCdcSplit<$crate::source::cdc::$cdc_source_type>
},
)*
}
),*
,
$(
{ $source_variant, $prop_name, $split }
),*
}
$(,$extra_args)*
$(, $extra_args)*
}
}
};
Expand All @@ -98,22 +99,55 @@ macro_rules! for_all_sources {
};
}

/// The invocation:
/// ```ignore
/// dispatch_source_enum_inner!(
/// {
/// {A1,B1,C1},
/// {A2,B2,C2}
/// },
/// EnumType, enum_value, inner_ident, body
/// );
/// ```
/// expands to:
/// ```ignore
/// match enum_value {
/// EnumType::A1(inner_ident) => {
/// #[allow(dead_code)]
/// type PropType = B1;
/// #[allow(dead_code)]
/// type SplitType = C1;
/// {
/// body
/// }
/// }
/// EnumType::A2(inner_ident) => {
/// #[allow(dead_code)]
/// type PropType = B2;
/// #[allow(dead_code)]
/// type SplitType = C2;
/// {
/// body
/// }
/// }
/// }
/// ```
#[macro_export]
macro_rules! dispatch_source_enum_inner {
(
{$({$source_variant:ident, $prop_name:ty, $split:ty }),*},
$enum_name:ident,
$impl:tt,
{$inner_name:ident, $prop_type_name:ident, $split_type_name:ident},
$enum_type:ident,
$enum_value:expr,
$inner_name:ident,
$body:expr
) => {{
match $impl {
match $enum_value {
$(
$enum_name::$source_variant($inner_name) => {
$enum_type::$source_variant($inner_name) => {
#[allow(dead_code)]
type $prop_type_name = $prop_name;
type PropType = $prop_name;
#[allow(dead_code)]
type $split_type_name = $split;
type SplitType = $split;
{
$body
}
Expand All @@ -123,10 +157,28 @@ macro_rules! dispatch_source_enum_inner {
}}
}

/// Usage: `dispatch_source_enum!(EnumType, enum_value, |inner_ident| body)`.
///
/// Inside `body`:
/// - use `inner_ident` to represent the matched variant.
/// - use `PropType` to represent the concrete property type.
/// - use `SplitType` to represent the concrete split type.
///
/// Expands to:
/// ```ignore
/// match enum_value {
/// EnumType::Variant1(inner_ident) => {
/// body
/// }
/// ...
/// }
/// ```
///
/// Note: `inner_ident` must be passed as an argument due to macro hygiene.
#[macro_export]
macro_rules! dispatch_source_enum {
($enum_name:ident, $impl:expr, $inner_name:tt, $body:expr) => {{
$crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_name, { $impl }, $inner_name, $body}
($enum_type:ident, $enum_value:expr, |$inner_name:ident| $body:expr) => {{
$crate::for_all_sources! {$crate::dispatch_source_enum_inner, $enum_type, { $enum_value }, $inner_name, $body}
}};
}

Expand Down Expand Up @@ -167,11 +219,12 @@ macro_rules! match_source_name_str {
}};
}

/// [`dispatch_source_enum`] with `SplitImpl` as the enum type.
#[macro_export]
macro_rules! dispatch_split_impl {
($impl:expr, $inner_name:ident, $prop_type_name:ident, $body:expr) => {{
($impl:expr, | $inner_name:ident | $body:expr) => {{
use $crate::source::SplitImpl;
$crate::dispatch_source_enum! {SplitImpl, { $impl }, {$inner_name, $prop_type_name, IgnoreSplitType}, $body}
$crate::dispatch_source_enum! {SplitImpl, { $impl }, |$inner_name| $body}
}};
}

Expand Down Expand Up @@ -290,11 +343,12 @@ macro_rules! impl_split {
}
}

/// [`dispatch_source_enum`] with `ConnectorProperties` as the enum type.
#[macro_export]
macro_rules! dispatch_source_prop {
($impl:expr, $source_prop:tt, $body:expr) => {{
($connector_properties:expr, |$inner_ident:ident| $body:expr) => {{
use $crate::source::ConnectorProperties;
$crate::dispatch_source_enum! {ConnectorProperties, { $impl }, {$source_prop, IgnorePropType, IgnoreSplitType}, {$body}}
$crate::dispatch_source_enum! {ConnectorProperties, { $connector_properties }, |$inner_ident| {$body}}
}};
}

Expand Down
147 changes: 124 additions & 23 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use async_trait::async_trait;
use aws_sdk_s3::types::Object;
use bytes::Bytes;
use enum_as_inner::EnumAsInner;
use futures::future::try_join_all;
use futures::stream::BoxStream;
use futures::Stream;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use risingwave_common::array::StreamChunk;
use risingwave_common::bail;
Expand All @@ -31,6 +32,7 @@ use risingwave_common::types::{JsonbVal, Scalar};
use risingwave_pb::catalog::{PbSource, PbStreamSourceInfo};
use risingwave_pb::plan_common::ExternalTableDesc;
use risingwave_pb::source::ConnectorSplit;
use rw_futures_util::select_all;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -113,21 +115,77 @@ impl<P: DeserializeOwned + UnknownFields> TryFromBTreeMap for P {
}
}

pub async fn create_split_reader<P: SourceProperties>(
/// Options controlling how split readers are created in `create_split_readers`.
#[derive(Default)]
pub struct CreateSplitReaderOpt {
// When `true`, a single reader is created for all splits; otherwise one
// reader is created per split and their streams are merged.
pub support_multiple_splits: bool,
// When `true`, each reader is asked to `seek_to_latest()` after creation,
// and the resulting latest splits are reported back to the caller.
pub seek_to_latest: bool,
}

/// Auxiliary information produced while creating split readers in
/// `create_split_readers`, alongside the merged chunk stream.
#[derive(Default)]
pub struct CreateSplitReaderResult {
// `Some` iff `CreateSplitReaderOpt::seek_to_latest` was set; contains the
// splits returned by the readers' `seek_to_latest()` calls.
pub latest_splits: Option<Vec<SplitImpl>>,
// Backfill metadata collected from each reader's `backfill_info()`.
pub backfill_info: HashMap<SplitId, BackfillInfo>,
}

pub async fn create_split_readers<P: SourceProperties>(
prop: P,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
) -> Result<P::SplitReader> {
opt: CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, CreateSplitReaderResult)> {
let splits = splits.into_iter().map(P::Split::try_from).try_collect()?;
P::SplitReader::new(prop, splits, parser_config, source_ctx, columns).await
let mut res = CreateSplitReaderResult {
backfill_info: HashMap::new(),
latest_splits: None,
};
if opt.support_multiple_splits {
let mut reader = P::SplitReader::new(
prop.clone(),
splits,
parser_config.clone(),
source_ctx.clone(),
columns.clone(),
)
.await?;
if opt.seek_to_latest {
res.latest_splits = Some(reader.seek_to_latest().await?);
}
res.backfill_info = reader.backfill_info();
Ok((reader.into_stream().boxed(), res))
} else {
let mut readers = try_join_all(splits.into_iter().map(|split| {
// TODO: is this reader split across multiple threads...? Realistically, we want
// source_ctx to live in a single actor.
P::SplitReader::new(
prop.clone(),
vec![split],
parser_config.clone(),
source_ctx.clone(),
columns.clone(),
)
}))
.await?;
if opt.seek_to_latest {
let mut latest_splits = vec![];
for reader in &mut readers {
latest_splits.extend(reader.seek_to_latest().await?);
}
res.latest_splits = Some(latest_splits);
}
res.backfill_info = readers.iter().flat_map(|r| r.backfill_info()).collect();
Ok((
select_all(readers.into_iter().map(|r| r.into_stream())).boxed(),
res,
))
}
}

/// [`SplitEnumerator`] fetches the split metadata from the external source service.
/// NOTE: It runs in the meta server, so probably it should be moved to the `meta` crate.
#[async_trait]
pub trait SplitEnumerator: Sized {
pub trait SplitEnumerator: Sized + Send {
type Split: SplitMetaData + Send;
type Properties;

Expand All @@ -139,6 +197,21 @@ pub trait SplitEnumerator: Sized {
pub type SourceContextRef = Arc<SourceContext>;
pub type SourceEnumeratorContextRef = Arc<SourceEnumeratorContext>;

/// Dyn-compatible [`SplitEnumerator`].
///
/// Unlike [`SplitEnumerator`], this trait has no associated types: splits are
/// returned as the type-erased [`SplitImpl`], so trait objects
/// (`Box<dyn AnySplitEnumerator>`) can be used where the concrete connector
/// type is not known at compile time.
#[async_trait]
pub trait AnySplitEnumerator: Send {
// List the currently available splits, erased to `SplitImpl`.
async fn list_splits(&mut self) -> Result<Vec<SplitImpl>>;
}

/// Blanket implementation: any [`SplitEnumerator`] whose split type converts
/// into [`SplitImpl`] automatically works as a type-erased
/// [`AnySplitEnumerator`].
#[async_trait]
impl<T: SplitEnumerator<Split: Into<SplitImpl>>> AnySplitEnumerator for T {
    async fn list_splits(&mut self) -> Result<Vec<SplitImpl>> {
        // Delegate to the statically-typed enumerator, then erase each
        // connector-specific split into `SplitImpl`.
        let splits = SplitEnumerator::list_splits(self).await?;
        Ok(splits.into_iter().map(Into::into).collect())
    }
}

/// The max size of a chunk yielded by source stream.
pub const MAX_CHUNK_SIZE: usize = 1024;

Expand Down Expand Up @@ -484,12 +557,13 @@ impl ConnectorProperties {

/// Load additional info from `PbSource`. Currently only used by CDC.
pub fn init_from_pb_source(&mut self, source: &PbSource) {
dispatch_source_prop!(self, prop, prop.init_from_pb_source(source))
dispatch_source_prop!(self, |prop| prop.init_from_pb_source(source))
}

/// Load additional info from `ExternalTableDesc`. Currently only used by CDC.
pub fn init_from_pb_cdc_table_desc(&mut self, cdc_table_desc: &ExternalTableDesc) {
dispatch_source_prop!(self, prop, prop.init_from_pb_cdc_table_desc(cdc_table_desc))
dispatch_source_prop!(self, |prop| prop
.init_from_pb_cdc_table_desc(cdc_table_desc))
}

pub fn support_multiple_splits(&self) -> bool {
Expand All @@ -498,16 +572,52 @@ impl ConnectorProperties {
|| matches!(self, ConnectorProperties::Gcs(_))
|| matches!(self, ConnectorProperties::Azblob(_))
}

/// Creates the connector-specific [`SplitEnumerator`] for these properties and
/// returns it as a type-erased [`AnySplitEnumerator`] trait object.
///
/// Dispatches on the `ConnectorProperties` variant via
/// `dispatch_source_prop!`; inside the macro body, `PropType` names the
/// concrete property type of the matched variant.
pub async fn create_split_enumerator(
self,
context: crate::source::base::SourceEnumeratorContextRef,
) -> crate::error::ConnectorResult<Box<dyn AnySplitEnumerator>> {
let enumerator: Box<dyn AnySplitEnumerator> = dispatch_source_prop!(self, |prop| Box::new(
<PropType as SourceProperties>::SplitEnumerator::new(*prop, context).await?
));
Ok(enumerator)
}

/// Creates split reader(s) for the given `splits` and returns the merged
/// chunk stream together with auxiliary [`CreateSplitReaderResult`] info.
///
/// `opt.support_multiple_splits` is overwritten from
/// `self.support_multiple_splits()` before dispatching, so callers do not
/// need to set it. The per-connector work is delegated to
/// `create_split_readers` via `dispatch_source_prop!`.
pub async fn create_split_reader(
self,
splits: Vec<SplitImpl>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
columns: Option<Vec<Column>>,
mut opt: crate::source::CreateSplitReaderOpt,
) -> Result<(BoxSourceChunkStream, crate::source::CreateSplitReaderResult)> {
// The connector decides whether one reader can serve multiple splits.
opt.support_multiple_splits = self.support_multiple_splits();
tracing::debug!(
?splits,
support_multiple_splits = opt.support_multiple_splits,
"spawning connector split reader",
);

dispatch_source_prop!(self, |prop| create_split_readers(
*prop,
splits,
parser_config,
source_ctx,
columns,
opt
)
.await)
}
}

for_all_sources!(impl_split);
for_all_connections!(impl_connection);

impl From<&SplitImpl> for ConnectorSplit {
fn from(split: &SplitImpl) -> Self {
dispatch_split_impl!(split, inner, SourcePropType, {
dispatch_split_impl!(split, |inner| {
ConnectorSplit {
split_type: String::from(SourcePropType::SOURCE_NAME),
split_type: String::from(PropType::SOURCE_NAME),
encoded_split: inner.encode_to_bytes().to_vec(),
}
})
Expand Down Expand Up @@ -564,7 +674,7 @@ impl SplitImpl {

impl SplitMetaData for SplitImpl {
fn id(&self) -> SplitId {
dispatch_split_impl!(self, inner, IgnoreType, inner.id())
dispatch_split_impl!(self, |inner| inner.id())
}

fn encode_to_json(&self) -> JsonbVal {
Expand All @@ -587,31 +697,22 @@ impl SplitMetaData for SplitImpl {
}

fn update_offset(&mut self, last_seen_offset: String) -> Result<()> {
dispatch_split_impl!(
self,
inner,
IgnoreType,
inner.update_offset(last_seen_offset)
)
dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset))
}
}

impl SplitImpl {
pub fn get_type(&self) -> String {
dispatch_split_impl!(self, _ignored, PropType, {
PropType::SOURCE_NAME.to_owned()
})
dispatch_split_impl!(self, |_inner| PropType::SOURCE_NAME.to_owned())
}

pub fn update_in_place(&mut self, last_seen_offset: String) -> Result<()> {
dispatch_split_impl!(self, inner, IgnoreType, {
inner.update_offset(last_seen_offset)?
});
dispatch_split_impl!(self, |inner| inner.update_offset(last_seen_offset)?);
Ok(())
}

pub fn encode_to_json_inner(&self) -> JsonbVal {
dispatch_split_impl!(self, inner, IgnoreType, inner.encode_to_json())
dispatch_split_impl!(self, |inner| inner.encode_to_json())
}
}

Expand Down
Loading
Loading