Skip to content

Commit

Permalink
Merge pull request #3288 from Veeupup/string-improvement
Browse files Browse the repository at this point in the history
[string performance improvement]
  • Loading branch information
BohuTANG authored Dec 8, 2021
2 parents 9676d1a + 4046f6b commit d0d34ba
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 63 deletions.
2 changes: 2 additions & 0 deletions common/datavalues/src/arrays/string/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod builder;
mod iterator;
mod transform;

pub use builder::*;
use common_arrow::arrow::array::*;
Expand All @@ -23,6 +24,7 @@ use common_arrow::arrow::datatypes::DataType as ArrowDataType;
use common_exception::ErrorCode;
use common_exception::Result;
pub use iterator::*;
pub use transform::*;

use crate::prelude::*;

Expand Down
89 changes: 89 additions & 0 deletions common/datavalues/src/arrays/string/transform.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use common_arrow::arrow::array::*;
use common_arrow::arrow::bitmap::MutableBitmap;
use common_arrow::arrow::buffer::MutableBuffer;

use crate::prelude::*;

/// Builds a new string array by running `f` over every row of `from`, letting `f`
/// write its output directly into the destination value buffer.
///
/// * `from` - source array; rows are visited through `into_no_null_iter`, and the
///   source validity is merged into the result afterwards.
/// * `estimate_bytes` - capacity reserved for all output bytes. The buffer is never
///   grown, so this must be an upper bound on the total bytes `f` will produce.
/// * `f` - called as `f(input, output)` where `output` is the still-unwritten tail of
///   the value buffer. It returns `Some(n)` after writing `n` bytes at the start of
///   `output`, or `None` to mark the row as null.
///
/// # Panics
///
/// Panics if `f` reports more bytes than the `output` slice it was handed.
pub fn transform<F>(from: &DFStringArray, estimate_bytes: usize, mut f: F) -> DFStringArray
where F: FnMut(&[u8], &mut [u8]) -> Option<usize> {
    let mut values: MutableBuffer<u8> = MutableBuffer::with_capacity(estimate_bytes);
    let mut offsets: MutableBuffer<i64> = MutableBuffer::with_capacity(from.len() + 1);
    let mut validity = MutableBitmap::with_capacity(from.len());
    offsets.push(0);

    let mut offset: usize = 0;

    unsafe {
        for x in from.into_no_null_iter() {
            let remaining = values.capacity() - offset;
            // SAFETY: `offset <= capacity` is maintained by the assert below, so the
            // advanced pointer stays inside the allocation and the slice covers only
            // the unwritten tail. Without `.add(offset)` every row would be written
            // at the start of the buffer and overwrite the previous rows.
            let bytes =
                std::slice::from_raw_parts_mut(values.as_mut_ptr().add(offset), remaining);
            if let Some(len) = f(x, bytes) {
                // Guard `set_len` below: a length past the capacity would be UB.
                assert!(
                    len <= remaining,
                    "transform: callback reported more bytes than the output buffer holds"
                );
                offset += len;
                offsets.push(i64::from_isize(offset as isize).unwrap());
                validity.push(true);
            } else {
                // Null row: zero-length entry, offset unchanged.
                offsets.push(offset as i64);
                validity.push(false);
            }
        }
        values.set_len(offset);
        values.shrink_to_fit();
        // A row is valid only if it was valid in the source and `f` produced a value.
        let validity = combine_validities(from.array.validity(), Some(&validity.into()));
        let array = BinaryArray::<i64>::from_data_unchecked(
            BinaryArray::<i64>::default_data_type(),
            offsets.into(),
            values.into(),
            validity,
        );
        DFStringArray::from_arrow_array(&array)
    }
}

/// Builds a new string array by running `f` over every row of `from`, for operators
/// that never turn a value into null.
///
/// * `from` - source array; rows are visited through `into_no_null_iter`, and the
///   source validity is cloned onto the result unchanged.
/// * `estimate_bytes` - capacity reserved for all output bytes. The buffer is never
///   grown, so this must be an upper bound on the total bytes `f` will produce.
/// * `f` - called as `f(input, output)` where `output` is the still-unwritten tail of
///   the value buffer. It returns the number of bytes written at the start of `output`.
///
/// # Panics
///
/// Panics if `f` reports more bytes than the `output` slice it was handed.
pub fn transform_with_no_null<F>(
    from: &DFStringArray,
    estimate_bytes: usize,
    mut f: F,
) -> DFStringArray
where
    F: FnMut(&[u8], &mut [u8]) -> usize,
{
    let mut values: MutableBuffer<u8> = MutableBuffer::with_capacity(estimate_bytes);
    let mut offsets: MutableBuffer<i64> = MutableBuffer::with_capacity(from.len() + 1);
    offsets.push(0);

    let mut offset: usize = 0;

    unsafe {
        for x in from.into_no_null_iter() {
            let remaining = values.capacity() - offset;
            // SAFETY: `offset <= capacity` is maintained by the assert below, so the
            // advanced pointer stays inside the allocation and the slice covers only
            // the unwritten tail. Without `.add(offset)` every row would be written
            // at the start of the buffer and overwrite the previous rows.
            let bytes =
                std::slice::from_raw_parts_mut(values.as_mut_ptr().add(offset), remaining);
            let len = f(x, bytes);
            // Guard `set_len` below: a length past the capacity would be UB.
            assert!(
                len <= remaining,
                "transform_with_no_null: callback reported more bytes than the output buffer holds"
            );

            offset += len;
            offsets.push(i64::from_isize(offset as isize).unwrap());
        }
        values.set_len(offset);
        values.shrink_to_fit();
        let array = BinaryArray::<i64>::from_data_unchecked(
            BinaryArray::<i64>::default_data_type(),
            offsets.into(),
            values.into(),
            from.array.validity().cloned(),
        );
        DFStringArray::from_arrow_array(&array)
    }
}
30 changes: 20 additions & 10 deletions common/functions/src/scalars/strings/base_64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_datavalues::prelude::DFStringArray;

use super::string2string::String2StringFunction;
use super::string2string::StringOperator;

Expand All @@ -22,27 +24,35 @@ pub struct Encode {

impl StringOperator for Encode {
#[inline]
fn apply<'a>(&'a mut self, s: &'a [u8]) -> Option<&'a [u8]> {
fn apply_with_no_null<'a>(&'a mut self, s: &'a [u8], buffer: &mut [u8]) -> usize {
self.buf.resize(s.len() * 4 / 3 + 4, 0);
let bytes_written = base64::encode_config_slice(s, base64::STANDARD, &mut self.buf);
Some(&self.buf[..bytes_written])
base64::encode_config_slice(s, base64::STANDARD, buffer)
}

/// Upper bound on the encoded size of the whole array: four output bytes for every
/// three input bytes, plus four spare bytes per row.
fn estimate_bytes(&self, array: &DFStringArray) -> usize {
    let payload_len = array.inner().values().len();
    let row_count = array.len();
    payload_len * 4 / 3 + row_count * 4
}
}

#[derive(Clone, Default)]
pub struct Decode {
buf: Vec<u8>,
}
pub struct Decode {}

impl StringOperator for Decode {
#[inline]
fn apply<'a>(&'a mut self, s: &'a [u8]) -> Option<&'a [u8]> {
self.buf.resize((s.len() + 3) / 4 * 3, 0);
match base64::decode_config_slice(s, base64::STANDARD, &mut self.buf) {
Ok(bw) => Some(&self.buf[..bw]),
/// Decodes base64 input `s` into `buffer`, returning the number of bytes written,
/// or `None` when the decoder rejects the input.
fn apply<'a>(&'a mut self, s: &'a [u8], buffer: &mut [u8]) -> Option<usize> {
    base64::decode_config_slice(s, base64::STANDARD, buffer).ok()
}

/// Decoding can fail, in which case `apply` returns `None`; report that so the
/// caller uses the nullable transform path.
fn may_turn_to_null(&self) -> bool {
true
}

/// Output capacity to reserve for the whole array: the input byte count scaled by
/// 4/3, plus four spare bytes per row (the same formula the encoder uses).
fn estimate_bytes(&self, array: &DFStringArray) -> usize {
    let payload_len = array.inner().values().len();
    let row_count = array.len();
    payload_len * 4 / 3 + row_count * 4
}
}

pub type Base64EncodeFunction = String2StringFunction<Encode>;
Expand Down
72 changes: 55 additions & 17 deletions common/functions/src/scalars/strings/quote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,71 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_datavalues::prelude::DFStringArray;

use super::string2string::String2StringFunction;
use super::string2string::StringOperator;
#[derive(Clone, Default)]
pub struct Quote {
buffer: Vec<u8>,
}
pub struct Quote {}

impl StringOperator for Quote {
#[inline]
fn apply<'a>(&'a mut self, value: &'a [u8]) -> Option<&'a [u8]> {
self.buffer.clear();

fn apply_with_no_null<'a>(&'a mut self, value: &'a [u8], buffer: &mut [u8]) -> usize {
let mut offset = 0;
for ch in value {
match *ch {
0 => self.buffer.extend_from_slice(&[b'\\', b'0']),
b'\'' => self.buffer.extend_from_slice(&[b'\\', b'\'']),
b'\"' => self.buffer.extend_from_slice(&[b'\\', b'\"']),
8 => self.buffer.extend_from_slice(&[b'\\', b'b']),
b'\n' => self.buffer.extend_from_slice(&[b'\\', b'n']),
b'\r' => self.buffer.extend_from_slice(&[b'\\', b'r']),
b'\t' => self.buffer.extend_from_slice(&[b'\\', b't']),
b'\\' => self.buffer.extend_from_slice(&[b'\\', b'\\']),
_ => self.buffer.push(*ch),
}
0 => {
let x = &mut buffer[offset..offset + 2];
x.copy_from_slice(&[b'\\', b'0']);
offset += 2;
}
b'\'' => {
let x = &mut buffer[offset..offset + 2];
x.copy_from_slice(&[b'\\', b'\'']);
offset += 2;
}
b'\"' => {
let x = &mut buffer[offset..offset + 2];
x.copy_from_slice(&[b'\\', b'\"']);
offset += 2;
}
8 => {
let x = &mut buffer[offset..offset + 2];
x.copy_from_slice(&[b'\\', b'b']);
offset += 2;
}
b'\n' => {
let x = &mut buffer[offset..offset + 2];
x.copy_from_slice(&[b'\\', b'n']);
offset += 2;
}
b'\r' => {
let x = &mut buffer[offset..offset + 2];
x.copy_from_slice(&[b'\\', b'r']);
offset += 2;
}
b'\t' => {
let x = &mut buffer[offset..offset + 2];
x.copy_from_slice(&[b'\\', b't']);
offset += 2;
}
b'\\' => {
let x = &mut buffer[offset..offset + 2];
x.copy_from_slice(&[b'\\', b'\\']);
offset += 2;
}
_ => {
let x = &mut buffer[offset..offset + 1];
x.copy_from_slice(&[*ch]);
offset += 1;
}
};
}
offset
}

Some(&self.buffer[..self.buffer.len()])
/// Output capacity to reserve for the whole array: twice the input byte count,
/// since an escaped byte becomes two bytes and any other byte stays one.
fn estimate_bytes(&self, array: &DFStringArray) -> usize {
    let raw_len = array.inner().values().len();
    raw_len * 2
}
}

Expand Down
15 changes: 7 additions & 8 deletions common/functions/src/scalars/strings/reverse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,20 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::string2string::String2StringFunction;
use super::string2string::StringOperator;

#[derive(Clone, Default)]
pub struct Reverse {
buffer: Vec<u8>,
}
pub struct Reverse {}

impl StringOperator for Reverse {
#[inline]
fn apply<'a>(&'a mut self, s: &'a [u8]) -> Option<&'a [u8]> {
self.buffer.clear();
self.buffer.extend_from_slice(s);
self.buffer.reverse();
Some(&self.buffer[..])
/// Writes the bytes of `s` in reverse order to the front of `buffer` and returns
/// the number of bytes written (always `s.len()`).
fn apply_with_no_null<'a>(&'a mut self, s: &'a [u8], buffer: &mut [u8]) -> usize {
    let written = s.len();
    let target = &mut buffer[..written];
    target.copy_from_slice(s);
    target.reverse();
    written
}
}

Expand Down
45 changes: 31 additions & 14 deletions common/functions/src/scalars/strings/string2string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,24 @@ use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;

// use common_tracing::tracing;
use crate::scalars::function_factory::FunctionDescription;
use crate::scalars::function_factory::FunctionFeatures;
use crate::scalars::Function;

pub trait StringOperator: Send + Sync + Clone + Default + 'static {
fn apply<'a>(&'a mut self, _: &'a [u8]) -> Option<&'a [u8]>;
/// Nullable variant: transforms the input slice into the output slice and returns
/// the number of bytes written, or `None` to produce a null row.
/// The default implementation always returns `None`; operators whose
/// `may_turn_to_null` returns `true` are expected to override it.
fn apply<'a>(&'a mut self, _: &'a [u8], _: &mut [u8]) -> Option<usize> {
None
}
/// Non-nullable variant: transforms the input slice into the output slice and
/// returns the number of bytes written.
/// The default implementation writes nothing and returns 0.
fn apply_with_no_null<'a>(&'a mut self, _: &'a [u8], _: &mut [u8]) -> usize {
0
}
/// Whether the operator can turn a non-null input into null. `eval` uses this to
/// choose between `apply` (when `true`) and `apply_with_no_null` (when `false`).
fn may_turn_to_null(&self) -> bool {
false
}
/// Number of output bytes to reserve for the whole array. Defaults to the byte
/// length of the input values; override when the output can be larger.
fn estimate_bytes(&self, array: &DFStringArray) -> usize {
array.inner().values().len()
}
}

/// A common function template that transform string column into string column
Expand Down Expand Up @@ -74,21 +86,26 @@ impl<T: StringOperator> Function for String2StringFunction<T> {

fn eval(&self, columns: &DataColumnsWithField, input_rows: usize) -> Result<DataColumn> {
let mut op = T::default();
let column: DataColumn = columns[0]

let array = columns[0]
.column()
.cast_with_type(&DataType::String)?
.to_minimal_array()?
.string()?
.into_iter()
.fold(
StringArrayBuilder::with_capacity(columns[0].column().len()),
|mut builder, s| {
builder.append_option(s.and_then(|values| op.apply(values)));
builder
},
)
.finish()
.into();
.to_minimal_array()?;

let estimate_bytes = op.estimate_bytes(array.string()?);

let column: DataColumn = if op.may_turn_to_null() {
transform(array.string()?, estimate_bytes, |val, buffer| {
op.apply(val, buffer)
})
.into()
} else {
transform_with_no_null(array.string()?, estimate_bytes, |val, buffer| {
op.apply_with_no_null(val, buffer)
})
.into()
};

Ok(column.resize_constant(input_rows))
}
}
Expand Down
Loading

0 comments on commit d0d34ba

Please sign in to comment.