From 6ddb5f18528d39bb06832f07df0422b501876306 Mon Sep 17 00:00:00 2001 From: Jo Shinonome Date: Thu, 8 Aug 2024 18:58:15 +0800 Subject: [PATCH] add enable_compression --- crates/kola/src/io.rs | 14 +++++++++++--- py-kola/kola/util.py | 12 ++++++++++-- py-kola/src/q.rs | 3 ++- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/crates/kola/src/io.rs b/crates/kola/src/io.rs index beb68e0..0ecca4c 100644 --- a/crates/kola/src/io.rs +++ b/crates/kola/src/io.rs @@ -8,7 +8,7 @@ use polars::frame::DataFrame; use xxhash_rust::xxh32; use crate::errors::KolaError; -use crate::serde::{deserialize, serialize}; +use crate::serde::{compress, deserialize, serialize}; use crate::types::{MsgType, K}; pub fn read_binary_table(path: &str) -> Result { @@ -74,13 +74,21 @@ pub fn unzip_lz4(buf: &Vec, footer_index: usize, block_num: usize) -> Vec Result, KolaError> { +pub fn generate_ipc_msg( + msg_type: MsgType, + enable_compression: bool, + k: K, +) -> Result, KolaError> { let length = k.len()?; let mut vec: Vec = Vec::with_capacity(length + 8); vec.write(&[1, msg_type as u8, 0, 0]).unwrap(); vec.write(&(length as u32 + 8).to_le_bytes()).unwrap(); vec.write(&serialize(&k)?).unwrap(); - Ok(vec) + if enable_compression { + Ok(compress(vec)) + } else { + Ok(vec) + } } #[cfg(test)] diff --git a/py-kola/kola/util.py b/py-kola/kola/util.py index dd3c72f..08ba55d 100644 --- a/py-kola/kola/util.py +++ b/py-kola/kola/util.py @@ -11,10 +11,18 @@ def read_binary(filepath: str) -> pl.DataFrame: return read_binary_table(filepath) -def generate_ipc(msg_type: Literal["async", "sync", "response"], any: object) -> bytes: +def generate_ipc( + msg_type: Literal["async", "sync", "response"], + enable_compression: bool, + any: object, +) -> bytes: if msg_type not in ["async", "sync", "response"]: raise Exception("Expect async|sync|response msg type, but got %s", msg_type) - return generate_ipc_msg(["async", "sync", "response"].index(msg_type), any) + return generate_ipc_msg( + ["async", "sync", "response"].index(msg_type), + enable_compression, + any, + ) __all__ = [read_binary] diff --git a/py-kola/src/q.rs b/py-kola/src/q.rs index 2e80dbe..9237a99 100644 --- a/py-kola/src/q.rs +++ b/py-kola/src/q.rs @@ -263,6 +263,7 @@ pub fn read_binary_table(filepath: &str) -> PyResult { pub fn generate_ipc_msg<'a>( py: Python<'a>, msg_type: u8, + enable_compression: bool, any: Bound, ) -> PyResult> { let msg_type = if msg_type == 0 { @@ -272,7 +273,7 @@ pub fn generate_ipc_msg<'a>( } else { MsgType::Response }; - match kola::io::generate_ipc_msg(msg_type, cast_to_k(any)?) { + match kola::io::generate_ipc_msg(msg_type, enable_compression, cast_to_k(any)?) { Ok(bytes) => Ok(PyBytes::new_bound(py, &bytes)), Err(e) => Err(PyKolaError::from(e).into()), }