Skip to content

Commit

Permalink
add enable_compression
Browse files Browse the repository at this point in the history
  • Loading branch information
jshinonome committed Aug 8, 2024
1 parent 3821cc9 commit 6ddb5f1
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
14 changes: 11 additions & 3 deletions crates/kola/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use polars::frame::DataFrame;
use xxhash_rust::xxh32;

use crate::errors::KolaError;
use crate::serde::{deserialize, serialize};
use crate::serde::{compress, deserialize, serialize};
use crate::types::{MsgType, K};

pub fn read_binary_table(path: &str) -> Result<DataFrame, KolaError> {
Expand Down Expand Up @@ -74,13 +74,21 @@ pub fn unzip_lz4(buf: &Vec<u8>, footer_index: usize, block_num: usize) -> Vec<u8
unzipped_bytes
}

pub fn generate_ipc_msg(msg_type: MsgType, k: K) -> Result<Vec<u8>, KolaError> {
pub fn generate_ipc_msg(
msg_type: MsgType,
enable_compression: bool,
k: K,
) -> Result<Vec<u8>, KolaError> {
let length = k.len()?;
let mut vec: Vec<u8> = Vec::with_capacity(length + 8);
vec.write(&[1, msg_type as u8, 0, 0]).unwrap();
vec.write(&(length as u32 + 8).to_le_bytes()).unwrap();
vec.write(&serialize(&k)?).unwrap();
Ok(vec)
if enable_compression {
Ok(compress(vec))
} else {
Ok(vec)
}
}

#[cfg(test)]
Expand Down
12 changes: 10 additions & 2 deletions py-kola/kola/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@ def read_binary(filepath: str) -> pl.DataFrame:
return read_binary_table(filepath)


def generate_ipc(msg_type: Literal["async", "sync", "response"], any: object) -> bytes:
def generate_ipc(
msg_type: Literal["async", "sync", "response"],
enable_compression: bool,
any: object,
) -> bytes:
if msg_type not in ["async", "sync", "response"]:
raise Exception("Expect async|sync|response msg type, but got %s", msg_type)
return generate_ipc_msg(["async", "sync", "response"].index(msg_type), any)
return generate_ipc_msg(
["async", "sync", "response"].index(msg_type),
enable_compression,
any,
)


__all__ = [read_binary]
3 changes: 2 additions & 1 deletion py-kola/src/q.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ pub fn read_binary_table(filepath: &str) -> PyResult<PyDataFrame> {
pub fn generate_ipc_msg<'a>(
py: Python<'a>,
msg_type: u8,
enable_compression: bool,
any: Bound<PyAny>,
) -> PyResult<Bound<'a, PyBytes>> {
let msg_type = if msg_type == 0 {
Expand All @@ -272,7 +273,7 @@ pub fn generate_ipc_msg<'a>(
} else {
MsgType::Response
};
match kola::io::generate_ipc_msg(msg_type, cast_to_k(any)?) {
match kola::io::generate_ipc_msg(msg_type, enable_compression, cast_to_k(any)?) {
Ok(bytes) => Ok(PyBytes::new_bound(py, &bytes)),
Err(e) => Err(PyKolaError::from(e).into()),
}
Expand Down

0 comments on commit 6ddb5f1

Please sign in to comment.