diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index e2ddd01832c86..14f68f386782a 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -363,6 +363,7 @@ pub type DataType = risingwave_common::types::DataType; pub struct Column { pub name: String, pub data_type: DataType, + pub is_visible: bool, } /// Split id resides in every source message, use `Arc` to avoid copying. diff --git a/src/connector/src/source/datagen/source/generator.rs b/src/connector/src/source/datagen/source/generator.rs index 3d747921aef02..8c4bf59a7cd05 100644 --- a/src/connector/src/source/datagen/source/generator.rs +++ b/src/connector/src/source/datagen/source/generator.rs @@ -26,10 +26,16 @@ use risingwave_common::util::iter_util::ZipEqFast; use crate::source::{SourceFormat, SourceMessage, SourceMeta, SplitId, StreamChunkWithState}; +pub enum FieldDesc { + // field is invisible, generate None + Invisible, + Visible(FieldGeneratorImpl), +} + pub struct DatagenEventGenerator { // fields_map: HashMap, field_names: Vec, - fields_vec: Vec, + fields_vec: Vec, source_format: SourceFormat, data_types: Vec, offset: u64, @@ -46,7 +52,7 @@ pub struct DatagenMeta { impl DatagenEventGenerator { #[allow(clippy::too_many_arguments)] pub fn new( - fields_vec: Vec, + fields_vec: Vec, field_names: Vec, source_format: SourceFormat, data_types: Vec, @@ -96,16 +102,23 @@ impl DatagenEventGenerator { .iter() .zip_eq_fast(self.fields_vec.iter_mut()) { - let value = field_generator.generate_json(self.offset); - if value.is_null() { - reach_end = true; - tracing::info!( - "datagen split {} stop generate, offset {}", - self.split_id, - self.offset - ); - break 'outer; - } + let value = match field_generator { + FieldDesc::Invisible => continue, + FieldDesc::Visible(field_generator) => { + let value = field_generator.generate_json(self.offset); + if value.is_null() { + reach_end = true; + tracing::info!( + "datagen split {} stop generate, offset {}", + self.split_id, + self.offset + ); + break 'outer; + } + value + } + }; + map.insert(name.clone(), value); } Bytes::from(serde_json::Value::from(map).to_string()) @@ -159,16 +172,24 @@ impl DatagenEventGenerator { 'outer: for _ in 0..num_rows_to_generate { let mut row = Vec::with_capacity(self.fields_vec.len()); for field_generator in &mut self.fields_vec { - let datum = field_generator.generate_datum(self.offset); - if datum.is_none() { - reach_end = true; - tracing::info!( - "datagen split {} stop generate, offset {}", - self.split_id, - self.offset - ); - break 'outer; - } + let datum = match field_generator { + FieldDesc::Invisible => None, + FieldDesc::Visible(field_generator) => { + let datum = field_generator.generate_datum(self.offset); + if datum.is_none() { + reach_end = true; + tracing::info!( + "datagen split {} stop generate, offset {}", + self.split_id, + self.offset + ); + break 'outer; + }; + + datum + } + }; + row.push(datum); } @@ -214,22 +235,26 @@ mod tests { let data_types = vec![DataType::Int32, DataType::Float32]; let fields_vec = vec![ - FieldGeneratorImpl::with_number_sequence( - data_types[0].clone(), - Some(start.to_string()), - Some(end.to_string()), - split_index, - split_num, - ) - .unwrap(), - FieldGeneratorImpl::with_number_sequence( - data_types[1].clone(), - Some(start.to_string()), - Some(end.to_string()), - split_index, - split_num, - ) - .unwrap(), + FieldDesc::Visible( + FieldGeneratorImpl::with_number_sequence( + data_types[0].clone(), + Some(start.to_string()), + Some(end.to_string()), + split_index, + split_num, + ) + .unwrap(), + ), + FieldDesc::Visible( + FieldGeneratorImpl::with_number_sequence( + data_types[1].clone(), + Some(start.to_string()), + Some(end.to_string()), + split_index, + split_num, + ) + .unwrap(), + ), ]; let generator = DatagenEventGenerator::new( diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index c0780c5604622..a3f17c1ca4b35 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -26,7 +26,7 @@ use crate::impl_common_split_reader_logic; use crate::parser::{ParserConfig, SpecificParserConfig}; use crate::source::data_gen_util::spawn_data_generation_stream; use crate::source::datagen::source::SEQUENCE_FIELD_KIND; -use crate::source::datagen::{DatagenProperties, DatagenSplit}; +use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc}; use crate::source::{ BoxSourceStream, BoxSourceWithStateStream, Column, DataType, SourceContextRef, SplitId, SplitImpl, SplitMetaData, SplitReader, @@ -106,13 +106,18 @@ impl SplitReader for DatagenSplitReader { for column in columns { // let name = column.name.clone(); let data_type = column.data_type.clone(); - let gen = generator_from_data_type( - column.data_type, - &fields_option_map, - &column.name, - split_index, - split_num, - )?; + + let gen = if column.is_visible { + FieldDesc::Visible(generator_from_data_type( + column.data_type, + &fields_option_map, + &column.name, + split_index, + split_num, + )?) + } else { + FieldDesc::Invisible + }; fields_vec.push(gen); data_types.push(data_type); field_names.push(column.name); @@ -284,14 +289,17 @@ mod tests { Column { name: "random_int".to_string(), data_type: DataType::Int32, + is_visible: true, }, Column { name: "random_float".to_string(), data_type: DataType::Float32, + is_visible: true, }, Column { name: "sequence_int".to_string(), data_type: DataType::Int32, + is_visible: true, }, Column { name: "struct".to_string(), @@ -299,6 +307,7 @@ mod tests { fields: vec![DataType::Int32], field_names: vec!["random_int".to_string()], })), + is_visible: true, }, ]; let state = vec![SplitImpl::Datagen(DatagenSplit { @@ -364,10 +373,12 @@ mod tests { Column { name: "_".to_string(), data_type: DataType::Int64, + is_visible: true, }, Column { name: "random_int".to_string(), data_type: DataType::Int32, + is_visible: true, }, ]; let state = vec![SplitImpl::Datagen(DatagenSplit { diff --git a/src/connector/src/source/manager.rs b/src/connector/src/source/manager.rs index 25afb4f55917c..79df615980a4e 100644 --- a/src/connector/src/source/manager.rs +++ b/src/connector/src/source/manager.rs @@ -48,6 +48,11 @@ impl SourceColumnDesc { is_meta: false, } } + + #[inline] + pub fn is_visible(&self) -> bool { + !self.is_row_id && !self.is_meta + } } impl From<&ColumnDesc> for SourceColumnDesc { @@ -75,3 +80,19 @@ impl From<&SourceColumnDesc> for ColumnDesc { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_visible() { + let mut c = SourceColumnDesc::simple("a", DataType::Int32, ColumnId::new(0)); + assert!(c.is_visible()); + c.is_row_id = true; + assert!(!c.is_visible()); + c.is_row_id = false; + c.is_meta = true; + assert!(!c.is_visible()); + } +} diff --git a/src/source/src/connector_source.rs b/src/source/src/connector_source.rs index a131e4c9a3928..979e905b0e596 100644 --- a/src/source/src/connector_source.rs +++ b/src/source/src/connector_source.rs @@ -101,10 +101,10 @@ impl ConnectorSource { let data_gen_columns = Some( columns .iter() - .cloned() .map(|col| Column { - name: col.name, - data_type: col.data_type, + name: col.name.clone(), + data_type: col.data_type.clone(), + is_visible: col.is_visible(), }) .collect_vec(), );