From 292c4459e8c6d20ed9f84d4366b2d4c2215ab0c6 Mon Sep 17 00:00:00 2001 From: ritchie Date: Mon, 23 Dec 2024 16:08:02 +0100 Subject: [PATCH 1/2] feat: More memmaps through semaphore --- crates/polars-io/src/csv/read/parser.rs | 3 ++- crates/polars-io/src/ipc/ipc_reader_async.rs | 5 +++-- crates/polars-utils/src/mmap.rs | 10 +++++++++- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/crates/polars-io/src/csv/read/parser.rs b/crates/polars-io/src/csv/read/parser.rs index fe46429917bf..c296e055e56e 100644 --- a/crates/polars-io/src/csv/read/parser.rs +++ b/crates/polars-io/src/csv/read/parser.rs @@ -6,6 +6,7 @@ use polars_core::prelude::*; use polars_core::{config, POOL}; use polars_error::feature_gated; use polars_utils::index::Bounded; +use polars_utils::mmap::MMapSemaphore; use rayon::prelude::*; use super::buffer::Buffer; @@ -38,7 +39,7 @@ pub fn count_rows( polars_utils::open_file(path)? }; - let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; + let mmap = MMapSemaphore::new_from_file(&file).unwrap(); let owned = &mut vec![]; let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?; diff --git a/crates/polars-io/src/ipc/ipc_reader_async.rs b/crates/polars-io/src/ipc/ipc_reader_async.rs index a6626bd3a04f..58810f239ef1 100644 --- a/crates/polars-io/src/ipc/ipc_reader_async.rs +++ b/crates/polars-io/src/ipc/ipc_reader_async.rs @@ -7,6 +7,7 @@ use polars_core::datatypes::IDX_DTYPE; use polars_core::frame::DataFrame; use polars_core::schema::{Schema, SchemaExt}; use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult}; +use polars_utils::mmap::MMapSemaphore; use polars_utils::pl_str::PlSmallStr; use crate::cloud::{ @@ -136,7 +137,7 @@ impl IpcReaderAsync { // TODO: Only download what is needed rather than the entire file by // making use of the projection, row limit, predicate and such. let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?; - let bytes = unsafe { memmap::Mmap::map(&file) }.unwrap(); + let bytes = MMapSemaphore::new_from_file(&file).unwrap(); let projection = match options.projection.as_deref() { Some(projection) => { @@ -185,7 +186,7 @@ impl IpcReaderAsync { // TODO: Only download what is needed rather than the entire file by // making use of the projection, row limit, predicate and such. let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?; - let bytes = unsafe { memmap::Mmap::map(&file) }.unwrap(); + let bytes = MMapSemaphore::new_from_file(&file).unwrap(); get_row_count(&mut std::io::Cursor::new(bytes.as_ref())) } } diff --git a/crates/polars-utils/src/mmap.rs b/crates/polars-utils/src/mmap.rs index bdac6e095a94..188dda93a277 100644 --- a/crates/polars-utils/src/mmap.rs +++ b/crates/polars-utils/src/mmap.rs @@ -282,8 +282,16 @@ pub struct MMapSemaphore { impl MMapSemaphore { pub fn new_from_file_with_options( file: &File, - options: MmapOptions, + #[allow(unused_mut)] mut options: MmapOptions, ) -> PolarsResult { + // Set mmap size based on seek to end when running under Emscripten + #[cfg(target_os = "emscripten")] + { + let mut file = file; + let size = file.seek(SeekFrom::End(0)).unwrap(); + options.len((size - offset) as usize); + } + let mmap = unsafe { options.map(file) }?; #[cfg(target_family = "unix")] From 4e130dd8351cd26d74ab7e66008cf9c227520392 Mon Sep 17 00:00:00 2001 From: ritchie Date: Mon, 23 Dec 2024 16:18:10 +0100 Subject: [PATCH 2/2] imports --- crates/polars-utils/src/mmap.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/polars-utils/src/mmap.rs b/crates/polars-utils/src/mmap.rs index 188dda93a277..c3331e3213ed 100644 --- a/crates/polars-utils/src/mmap.rs +++ b/crates/polars-utils/src/mmap.rs @@ -287,6 +287,7 @@ impl MMapSemaphore { // Set mmap size based on seek to end when running under Emscripten #[cfg(target_os = "emscripten")] { + use std::io::{Seek, SeekFrom}; let mut file = file; let size = file.seek(SeekFrom::End(0)).unwrap(); options.len((size - offset) as usize);