Skip to content

Commit

Permalink
chore: add is_visible in column for connector (risingwavelabs#8592)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Mar 16, 2023
1 parent 075d50a commit 14bfc62
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 49 deletions.
1 change: 1 addition & 0 deletions src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ pub type DataType = risingwave_common::types::DataType;
pub struct Column {
pub name: String,
pub data_type: DataType,
pub is_visible: bool,
}

/// Split id resides in every source message, use `Arc` to avoid copying.
Expand Down
101 changes: 63 additions & 38 deletions src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ use risingwave_common::util::iter_util::ZipEqFast;

use crate::source::{SourceFormat, SourceMessage, SourceMeta, SplitId, StreamChunkWithState};

pub enum FieldDesc {
// field is invisible, generate None
Invisible,
Visible(FieldGeneratorImpl),
}

pub struct DatagenEventGenerator {
// fields_map: HashMap<String, FieldGeneratorImpl>,
field_names: Vec<String>,
fields_vec: Vec<FieldGeneratorImpl>,
fields_vec: Vec<FieldDesc>,
source_format: SourceFormat,
data_types: Vec<DataType>,
offset: u64,
Expand All @@ -46,7 +52,7 @@ pub struct DatagenMeta {
impl DatagenEventGenerator {
#[allow(clippy::too_many_arguments)]
pub fn new(
fields_vec: Vec<FieldGeneratorImpl>,
fields_vec: Vec<FieldDesc>,
field_names: Vec<String>,
source_format: SourceFormat,
data_types: Vec<DataType>,
Expand Down Expand Up @@ -96,16 +102,23 @@ impl DatagenEventGenerator {
.iter()
.zip_eq_fast(self.fields_vec.iter_mut())
{
let value = field_generator.generate_json(self.offset);
if value.is_null() {
reach_end = true;
tracing::info!(
"datagen split {} stop generate, offset {}",
self.split_id,
self.offset
);
break 'outer;
}
let value = match field_generator {
FieldDesc::Invisible => continue,
FieldDesc::Visible(field_generator) => {
let value = field_generator.generate_json(self.offset);
if value.is_null() {
reach_end = true;
tracing::info!(
"datagen split {} stop generate, offset {}",
self.split_id,
self.offset
);
break 'outer;
}
value
}
};

map.insert(name.clone(), value);
}
Bytes::from(serde_json::Value::from(map).to_string())
Expand Down Expand Up @@ -159,16 +172,24 @@ impl DatagenEventGenerator {
'outer: for _ in 0..num_rows_to_generate {
let mut row = Vec::with_capacity(self.fields_vec.len());
for field_generator in &mut self.fields_vec {
let datum = field_generator.generate_datum(self.offset);
if datum.is_none() {
reach_end = true;
tracing::info!(
"datagen split {} stop generate, offset {}",
self.split_id,
self.offset
);
break 'outer;
}
let datum = match field_generator {
FieldDesc::Invisible => None,
FieldDesc::Visible(field_generator) => {
let datum = field_generator.generate_datum(self.offset);
if datum.is_none() {
reach_end = true;
tracing::info!(
"datagen split {} stop generate, offset {}",
self.split_id,
self.offset
);
break 'outer;
};

datum
}
};

row.push(datum);
}

Expand Down Expand Up @@ -214,22 +235,26 @@ mod tests {

let data_types = vec![DataType::Int32, DataType::Float32];
let fields_vec = vec![
FieldGeneratorImpl::with_number_sequence(
data_types[0].clone(),
Some(start.to_string()),
Some(end.to_string()),
split_index,
split_num,
)
.unwrap(),
FieldGeneratorImpl::with_number_sequence(
data_types[1].clone(),
Some(start.to_string()),
Some(end.to_string()),
split_index,
split_num,
)
.unwrap(),
FieldDesc::Visible(
FieldGeneratorImpl::with_number_sequence(
data_types[0].clone(),
Some(start.to_string()),
Some(end.to_string()),
split_index,
split_num,
)
.unwrap(),
),
FieldDesc::Visible(
FieldGeneratorImpl::with_number_sequence(
data_types[1].clone(),
Some(start.to_string()),
Some(end.to_string()),
split_index,
split_num,
)
.unwrap(),
),
];

let generator = DatagenEventGenerator::new(
Expand Down
27 changes: 19 additions & 8 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::impl_common_split_reader_logic;
use crate::parser::{ParserConfig, SpecificParserConfig};
use crate::source::data_gen_util::spawn_data_generation_stream;
use crate::source::datagen::source::SEQUENCE_FIELD_KIND;
use crate::source::datagen::{DatagenProperties, DatagenSplit};
use crate::source::datagen::{DatagenProperties, DatagenSplit, FieldDesc};
use crate::source::{
BoxSourceStream, BoxSourceWithStateStream, Column, DataType, SourceContextRef, SplitId,
SplitImpl, SplitMetaData, SplitReader,
Expand Down Expand Up @@ -106,13 +106,18 @@ impl SplitReader for DatagenSplitReader {
for column in columns {
// let name = column.name.clone();
let data_type = column.data_type.clone();
let gen = generator_from_data_type(
column.data_type,
&fields_option_map,
&column.name,
split_index,
split_num,
)?;

let gen = if column.is_visible {
FieldDesc::Visible(generator_from_data_type(
column.data_type,
&fields_option_map,
&column.name,
split_index,
split_num,
)?)
} else {
FieldDesc::Invisible
};
fields_vec.push(gen);
data_types.push(data_type);
field_names.push(column.name);
Expand Down Expand Up @@ -284,21 +289,25 @@ mod tests {
Column {
name: "random_int".to_string(),
data_type: DataType::Int32,
is_visible: true,
},
Column {
name: "random_float".to_string(),
data_type: DataType::Float32,
is_visible: true,
},
Column {
name: "sequence_int".to_string(),
data_type: DataType::Int32,
is_visible: true,
},
Column {
name: "struct".to_string(),
data_type: DataType::Struct(Arc::new(StructType {
fields: vec![DataType::Int32],
field_names: vec!["random_int".to_string()],
})),
is_visible: true,
},
];
let state = vec![SplitImpl::Datagen(DatagenSplit {
Expand Down Expand Up @@ -364,10 +373,12 @@ mod tests {
Column {
name: "_".to_string(),
data_type: DataType::Int64,
is_visible: true,
},
Column {
name: "random_int".to_string(),
data_type: DataType::Int32,
is_visible: true,
},
];
let state = vec![SplitImpl::Datagen(DatagenSplit {
Expand Down
21 changes: 21 additions & 0 deletions src/connector/src/source/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ impl SourceColumnDesc {
is_meta: false,
}
}

#[inline]
pub fn is_visible(&self) -> bool {
!self.is_row_id && !self.is_meta
}
}

impl From<&ColumnDesc> for SourceColumnDesc {
Expand Down Expand Up @@ -75,3 +80,19 @@ impl From<&SourceColumnDesc> for ColumnDesc {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_is_visible() {
let mut c = SourceColumnDesc::simple("a", DataType::Int32, ColumnId::new(0));
assert!(c.is_visible());
c.is_row_id = true;
assert!(!c.is_visible());
c.is_row_id = false;
c.is_meta = true;
assert!(!c.is_visible());
}
}
6 changes: 3 additions & 3 deletions src/source/src/connector_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ impl ConnectorSource {
let data_gen_columns = Some(
columns
.iter()
.cloned()
.map(|col| Column {
name: col.name,
data_type: col.data_type,
name: col.name.clone(),
data_type: col.data_type.clone(),
is_visible: col.is_visible(),
})
.collect_vec(),
);
Expand Down

0 comments on commit 14bfc62

Please sign in to comment.