From 47c25eb5256bb15e6da5205ffb9152d235c23b6e Mon Sep 17 00:00:00 2001 From: Veeupup Date: Thu, 5 May 2022 11:33:22 +0800 Subject: [PATCH] timestamp serialization with tz Signed-off-by: Veeupup --- common/datavalues/src/types/data_type.rs | 5 +++++ .../datavalues/src/types/serializations/timestamp.rs | 4 ++++ common/datavalues/src/types/type_timestamp.rs | 4 ++++ query/src/servers/clickhouse/writers/query_writer.rs | 12 +++++++++++- query/src/servers/http/v1/http_query_handlers.rs | 2 +- .../src/servers/mysql/writers/query_result_writer.rs | 6 +++++- query/src/sessions/query_ctx_shared.rs | 1 + 7 files changed, 31 insertions(+), 3 deletions(-) diff --git a/common/datavalues/src/types/data_type.rs b/common/datavalues/src/types/data_type.rs index f24b27235c70..29683d772491 100644 --- a/common/datavalues/src/types/data_type.rs +++ b/common/datavalues/src/types/data_type.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use chrono_tz::Tz; use std::collections::BTreeMap; use common_arrow::arrow::datatypes::DataType as ArrowType; @@ -125,6 +126,10 @@ pub trait DataType: std::fmt::Debug + Sync + Send + DynClone { fn create_mutable(&self, capacity: usize) -> Box; fn create_serializer(&self) -> TypeSerializerImpl; + /// work for timestamp serializer + fn create_serializer_with_tz(&self, _tz: Tz) -> TypeSerializerImpl { + unimplemented!() + } fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl; } diff --git a/common/datavalues/src/types/serializations/timestamp.rs b/common/datavalues/src/types/serializations/timestamp.rs index 3ed7c28b380d..f619e4bdbb43 100644 --- a/common/datavalues/src/types/serializations/timestamp.rs +++ b/common/datavalues/src/types/serializations/timestamp.rs @@ -37,6 +37,10 @@ impl Default for TimestampSerializer { } impl TimestampSerializer { + pub fn new_with_tz(tz: Tz) -> Self { + Self {tz} + } + pub fn to_timestamp(&self, value: &i64) -> DateTime { value.to_timestamp(&self.tz) } diff --git 
a/common/datavalues/src/types/type_timestamp.rs b/common/datavalues/src/types/type_timestamp.rs index 5b21d91aa409..826016c7823b 100644 --- a/common/datavalues/src/types/type_timestamp.rs +++ b/common/datavalues/src/types/type_timestamp.rs @@ -150,6 +150,10 @@ impl DataType for TimestampType { TimestampSerializer::default().into() } + fn create_serializer_with_tz(&self, tz: Tz) -> TypeSerializerImpl { + TimestampSerializer::new_with_tz(tz).into() + } + fn create_deserializer(&self, capacity: usize) -> TypeDeserializerImpl { let tz = "UTC".parse::().unwrap(); TimestampDeserializer { diff --git a/query/src/servers/clickhouse/writers/query_writer.rs b/query/src/servers/clickhouse/writers/query_writer.rs index b2364e8b5366..872feb6f87fa 100644 --- a/query/src/servers/clickhouse/writers/query_writer.rs +++ b/query/src/servers/clickhouse/writers/query_writer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::borrow::Cow; +use chrono_tz::Tz; use common_io::prelude::FormatSettings; use common_base::ProgressValues; @@ -143,7 +144,16 @@ pub fn to_clickhouse_block(block: DataBlock, format: &FormatSettings) -> Result< let column = block.column(column_index); let field = block.schema().field(column_index); let name = field.name(); - let serializer = field.data_type().create_serializer(); + let serializer = if field.data_type().data_type_id() == TypeID::Timestamp { + let tz = + String::from_utf8(format.timezone.clone()).map_err(|_| ErrorCode::LogicalError("timezone must be set"))?; + let tz = tz.parse::().map_err(|_| { + ErrorCode::InvalidTimezone("Timezone has been checked and should be valid") + })?; + field.data_type().create_serializer_with_tz(tz) + }else { + field.data_type().create_serializer() + }; result.append_column(column::new_column( name, serializer.serialize_clickhouse_format(&column.convert_full_column(), format)?, diff --git a/query/src/servers/http/v1/http_query_handlers.rs b/query/src/servers/http/v1/http_query_handlers.rs index 
15d0ac20ef16..1eb617214270 100644 --- a/query/src/servers/http/v1/http_query_handlers.rs +++ b/query/src/servers/http/v1/http_query_handlers.rs @@ -202,7 +202,7 @@ pub(crate) async fn query_handler( .try_create_query(&query_id, ctx, req) .await; - // TODO(veeupup): get query_ctx's format_settings here + // TODO(veeupup): get global query_ctx's format_settings, because we can't set session settings now let format = FormatSettings::default(); match query { Ok(query) => { diff --git a/query/src/servers/mysql/writers/query_result_writer.rs b/query/src/servers/mysql/writers/query_result_writer.rs index 71a6658a1875..05aabf96f9b9 100644 --- a/query/src/servers/mysql/writers/query_result_writer.rs +++ b/query/src/servers/mysql/writers/query_result_writer.rs @@ -111,7 +111,11 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> { } let block = blocks[0].clone(); - let tz: Tz = "UTC".parse().unwrap(); + let tz = + String::from_utf8(format.timezone.clone()).map_err(|_| ErrorCode::LogicalError("timezone must be set"))?; + let tz = tz.parse::().map_err(|_| { + ErrorCode::InvalidTimezone("Timezone has been checked and should be valid") + })?; match convert_schema(block.schema()) { Err(error) => Self::err(&error, dataset_writer), Ok(columns) => { diff --git a/query/src/sessions/query_ctx_shared.rs b/query/src/sessions/query_ctx_shared.rs index 3da38e24efbe..b77f5a8b6cdd 100644 --- a/query/src/sessions/query_ctx_shared.rs +++ b/query/src/sessions/query_ctx_shared.rs @@ -263,6 +263,7 @@ impl QueryContextShared { format.field_delimiter = settings.get_field_delimiter()?; format.empty_as_default = settings.get_empty_as_default()? > 0; format.skip_header = settings.get_skip_header()? > 0; + format.timezone = settings.get_timezone()?; } Ok(format) }