Skip to content

Commit

Permalink
timestamp serialization with tz
Browse files Browse the repository at this point in the history
Signed-off-by: Veeupup <code@tanweime.com>
  • Loading branch information
Veeupup committed May 5, 2022
1 parent db78d7a commit 47c25eb
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 3 deletions.
5 changes: 5 additions & 0 deletions common/datavalues/src/types/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use chrono_tz::Tz;
use std::collections::BTreeMap;

use common_arrow::arrow::datatypes::DataType as ArrowType;
Expand Down Expand Up @@ -125,6 +126,10 @@ pub trait DataType: std::fmt::Debug + Sync + Send + DynClone {

fn create_mutable(&self, capacity: usize) -> Box<dyn MutableColumn>;
fn create_serializer(&self) -> TypeSerializerImpl;
/// work for timestamp serializer
fn create_serializer_with_tz(&self, _tz: Tz) -> TypeSerializerImpl {
unimplemented!()
}
fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl;
}

Expand Down
4 changes: 4 additions & 0 deletions common/datavalues/src/types/serializations/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl Default for TimestampSerializer {
}

impl TimestampSerializer {
pub fn new_with_tz(tz: Tz) -> Self {
Self {tz}
}

pub fn to_timestamp(&self, value: &i64) -> DateTime<Tz> {
value.to_timestamp(&self.tz)
}
Expand Down
4 changes: 4 additions & 0 deletions common/datavalues/src/types/type_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ impl DataType for TimestampType {
TimestampSerializer::default().into()
}

fn create_serializer_with_tz(&self, tz: Tz) -> TypeSerializerImpl {
TimestampSerializer::new_with_tz(tz).into()
}

fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl {
let tz = "UTC".parse::<Tz>().unwrap();
TimestampDeserializer {
Expand Down
12 changes: 11 additions & 1 deletion query/src/servers/clickhouse/writers/query_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::borrow::Cow;
use chrono_tz::Tz;

use common_io::prelude::FormatSettings;
use common_base::ProgressValues;
Expand Down Expand Up @@ -143,7 +144,16 @@ pub fn to_clickhouse_block(block: DataBlock, format: &FormatSettings) -> Result<
let column = block.column(column_index);
let field = block.schema().field(column_index);
let name = field.name();
let serializer = field.data_type().create_serializer();
let serializer = if field.data_type().data_type_id() == TypeID::Timestamp {
let tz =
String::from_utf8(format.timezone.clone()).map_err(|_| ErrorCode::LogicalError("timezone must be set"))?;
let tz = tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
field.data_type().create_serializer_with_tz(tz)
}else {
field.data_type().create_serializer()
};
result.append_column(column::new_column(
name,
serializer.serialize_clickhouse_format(&column.convert_full_column(), format)?,
Expand Down
2 changes: 1 addition & 1 deletion query/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ pub(crate) async fn query_handler(
.try_create_query(&query_id, ctx, req)
.await;

// TODO(veeupup): get query_ctx's format_settings here
// TODO(veeupup): get global query_ctx's format_settings, because we cann't set session settings now
let format = FormatSettings::default();
match query {
Ok(query) => {
Expand Down
6 changes: 5 additions & 1 deletion query/src/servers/mysql/writers/query_result_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> {
}

let block = blocks[0].clone();
let tz: Tz = "UTC".parse().unwrap();
let tz =
String::from_utf8(format.timezone.clone()).map_err(|_| ErrorCode::LogicalError("timezone must be set"))?;
let tz = tz.parse::<Tz>().map_err(|_| {
ErrorCode::InvalidTimezone("Timezone has been checked and should be valid")
})?;
match convert_schema(block.schema()) {
Err(error) => Self::err(&error, dataset_writer),
Ok(columns) => {
Expand Down
1 change: 1 addition & 0 deletions query/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ impl QueryContextShared {
format.field_delimiter = settings.get_field_delimiter()?;
format.empty_as_default = settings.get_empty_as_default()? > 0;
format.skip_header = settings.get_skip_header()? > 0;
format.timezone = settings.get_timezone()?;
}
Ok(format)
}
Expand Down

0 comments on commit 47c25eb

Please sign in to comment.