-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
to_rust.rs
106 lines (92 loc) · 3.84 KB
/
to_rust.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use polars_core::export::rayon::prelude::*;
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_core::utils::arrow::ffi;
use polars_core::POOL;
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;
use pyo3::types::PyList;
use crate::error::PyPolarsErr;
/// Converts a Python Arrow field object into a polars [`Field`].
///
/// `obj` is asked to export itself via its `_export_to_c` method into an
/// `ArrowSchema` struct owned by this function; that struct is then imported on
/// the Rust side and converted into a polars [`Field`].
///
/// # Errors
///
/// Returns an error if the `_export_to_c` call raises in Python or if the
/// exported schema cannot be imported.
pub fn field_to_rust(obj: &PyAny) -> PyResult<Field> {
    let mut schema = Box::new(ffi::ArrowSchema::empty());
    // Python writes the exported schema through this address, so the pointer must
    // come from a mutable borrow: writing through a pointer that was derived from
    // a shared reference is undefined behaviour.
    let schema_ptr = &mut *schema as *mut ffi::ArrowSchema;

    // make the conversion through PyArrow's private API
    obj.call_method1("_export_to_c", (schema_ptr as Py_uintptr_t,))?;

    // SAFETY: relies on the successful `_export_to_c` call above having filled
    // `schema` with a valid Arrow C schema (assumption about the Python side).
    let field = unsafe { ffi::import_field_from_c(schema.as_ref()) }.map_err(PyPolarsErr::from)?;
    Ok((&field).into())
}
/// Builds a polars [`Schema`] from a Python list of Arrow field objects.
///
/// `obj` is the list you get by calling `list(schema)` on the Python side. Each
/// element is converted with [`field_to_rust`]; the first failing element aborts
/// the conversion and its error is returned.
pub fn pyarrow_schema_to_rust(obj: &PyList) -> PyResult<Schema> {
    let converted_fields = obj.iter().map(field_to_rust);
    converted_fields.collect::<PyResult<Schema>>()
}
/// Imports a Python Arrow array into a Rust [`ArrayRef`].
///
/// `obj` is asked to export itself via its `_export_to_c` method into an
/// `ArrowArray` / `ArrowSchema` pair owned by this function. The schema is
/// imported first to learn the data type, then the array itself is imported
/// with that type.
///
/// # Errors
///
/// Returns an error if the `_export_to_c` call raises in Python or if either
/// the exported schema or the exported array cannot be imported.
pub fn array_to_rust(obj: &Bound<PyAny>) -> PyResult<ArrayRef> {
    // prepare the structs that receive the exported array and its schema
    let mut array = Box::new(ffi::ArrowArray::empty());
    let mut schema = Box::new(ffi::ArrowSchema::empty());

    // Python writes through both addresses, so the pointers must come from
    // mutable borrows: writing through a pointer that was derived from a shared
    // reference is undefined behaviour.
    let array_ptr = &mut *array as *mut ffi::ArrowArray;
    let schema_ptr = &mut *schema as *mut ffi::ArrowSchema;

    // make the conversion through PyArrow's private API
    // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
    obj.call_method1(
        "_export_to_c",
        (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
    )?;

    // SAFETY: relies on the successful `_export_to_c` call above having filled
    // `schema` and `array` with a valid, matching Arrow C schema/array pair
    // (assumption about the Python side).
    let field = unsafe { ffi::import_field_from_c(schema.as_ref()) }.map_err(PyPolarsErr::from)?;
    let imported =
        unsafe { ffi::import_array_from_c(*array, field.data_type) }.map_err(PyPolarsErr::from)?;
    Ok(imported)
}
pub fn to_rust_df(rb: &[Bound<PyAny>]) -> PyResult<DataFrame> {
let schema = rb
.first()
.ok_or_else(|| PyPolarsErr::Other("empty table".into()))?
.getattr("schema")?;
let names = schema.getattr("names")?.extract::<Vec<String>>()?;
let dfs = rb
.iter()
.map(|rb| {
let mut run_parallel = false;
let columns = (0..names.len())
.map(|i| {
let array = rb.call_method1("column", (i,))?;
let arr = array_to_rust(&array)?;
run_parallel |= matches!(
arr.data_type(),
ArrowDataType::Utf8 | ArrowDataType::Dictionary(_, _, _)
);
Ok(arr)
})
.collect::<PyResult<Vec<_>>>()?;
// we parallelize this part because we can have dtypes that are not zero copy
// for instance string -> large-utf8
// dict encoded to categorical
let columns = if run_parallel {
POOL.install(|| {
columns
.into_par_iter()
.enumerate()
.map(|(i, arr)| {
let s = Series::try_from((names[i].as_str(), arr))
.map_err(PyPolarsErr::from)?;
Ok(s)
})
.collect::<PyResult<Vec<_>>>()
})
} else {
columns
.into_iter()
.enumerate()
.map(|(i, arr)| {
let s = Series::try_from((names[i].as_str(), arr))
.map_err(PyPolarsErr::from)?;
Ok(s)
})
.collect::<PyResult<Vec<_>>>()
}?;
// no need to check as a record batch has the same guarantees
Ok(unsafe { DataFrame::new_no_checks(columns) })
})
.collect::<PyResult<Vec<_>>>()?;
Ok(accumulate_dataframes_vertical_unchecked(dfs))
}