forked from databendlabs/databend
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransform_source.rs
53 lines (46 loc) · 1.34 KB
/
transform_source.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Copyright 2020 The FuseQuery Authors.
//
// Code is licensed under AGPL License, Version 3.0.
use async_trait::async_trait;
use std::sync::Arc;
use crate::contexts::FuseQueryContext;
use crate::datasources::Partition;
use crate::datastreams::SendableDataBlockStream;
use crate::error::{FuseQueryError, FuseQueryResult};
use crate::processors::IProcessor;
/// Pipeline processor that reads a set of partitions from a single table.
///
/// The table is looked up through `ctx` by database and table name each time
/// `execute` runs. `connect_to` always fails for this processor.
pub struct SourceTransform {
    /// Shared query context, used in `execute` to look up the table.
    ctx: Arc<FuseQueryContext>,
    /// Database name passed to the table lookup.
    db: String,
    /// Table name passed to the table lookup.
    table: String,
    /// Partitions handed (as a clone) to the table's `read` call.
    partitions: Vec<Partition>,
}
impl SourceTransform {
pub fn try_create(
ctx: Arc<FuseQueryContext>,
db: &str,
table: &str,
partitions: Vec<Partition>,
) -> FuseQueryResult<Self> {
Ok(SourceTransform {
ctx,
db: db.to_string(),
table: table.to_string(),
partitions,
})
}
}
#[async_trait]
impl IProcessor for SourceTransform {
    /// Returns the fixed processor name, `"SourceTransform"`.
    fn name(&self) -> &str {
        "SourceTransform"
    }

    /// Always fails with `FuseQueryError::Internal`; the argument is ignored
    /// and `self` is left unchanged.
    fn connect_to(&mut self, _: Arc<dyn IProcessor>) -> FuseQueryResult<()> {
        let reason = String::from("Cannot call SourceTransform connect_to");
        Err(FuseQueryError::Internal(reason))
    }

    /// Looks up the table through the query context and reads this
    /// processor's partitions from it.
    ///
    /// Returns the error from `get_table` if the lookup fails; otherwise
    /// returns whatever the table's `read` call yields.
    async fn execute(&self) -> FuseQueryResult<SendableDataBlockStream> {
        let (db, table_name) = (self.db.as_str(), self.table.as_str());
        let source = self.ctx.get_table(db, table_name)?;
        // `read` takes the partitions by value, so pass a clone and keep ours.
        let partitions = self.partitions.clone();
        source.read(partitions).await
    }
}