Skip to content

Commit

Permalink
feat: More memmaps through semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 23, 2024
1 parent 5294cb5 commit 292c445
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
3 changes: 2 additions & 1 deletion crates/polars-io/src/csv/read/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use polars_core::prelude::*;
use polars_core::{config, POOL};
use polars_error::feature_gated;
use polars_utils::index::Bounded;
use polars_utils::mmap::MMapSemaphore;
use rayon::prelude::*;

use super::buffer::Buffer;
Expand Down Expand Up @@ -38,7 +39,7 @@ pub fn count_rows(
polars_utils::open_file(path)?
};

let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let mmap = MMapSemaphore::new_from_file(&file).unwrap();
let owned = &mut vec![];
let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;

Expand Down
5 changes: 3 additions & 2 deletions crates/polars-io/src/ipc/ipc_reader_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_core::datatypes::IDX_DTYPE;
use polars_core::frame::DataFrame;
use polars_core::schema::{Schema, SchemaExt};
use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};
use polars_utils::mmap::MMapSemaphore;
use polars_utils::pl_str::PlSmallStr;

use crate::cloud::{
Expand Down Expand Up @@ -136,7 +137,7 @@ impl IpcReaderAsync {
// TODO: Only download what is needed rather than the entire file by
// making use of the projection, row limit, predicate and such.
let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
let bytes = unsafe { memmap::Mmap::map(&file) }.unwrap();
let bytes = MMapSemaphore::new_from_file(&file).unwrap();

let projection = match options.projection.as_deref() {
Some(projection) => {
Expand Down Expand Up @@ -185,7 +186,7 @@ impl IpcReaderAsync {
// TODO: Only download what is needed rather than the entire file by
// making use of the projection, row limit, predicate and such.
let file = tokio::task::block_in_place(|| self.cache_entry.try_open_check_latest())?;
let bytes = unsafe { memmap::Mmap::map(&file) }.unwrap();
let bytes = MMapSemaphore::new_from_file(&file).unwrap();
get_row_count(&mut std::io::Cursor::new(bytes.as_ref()))
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/polars-utils/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,16 @@ pub struct MMapSemaphore {
impl MMapSemaphore {
pub fn new_from_file_with_options(
file: &File,
options: MmapOptions,
#[allow(unused_mut)] mut options: MmapOptions,
) -> PolarsResult<MMapSemaphore> {
// Set mmap size based on seek to end when running under Emscripten
#[cfg(target_os = "emscripten")]
{
let mut file = file;
let size = file.seek(SeekFrom::End(0)).unwrap();
options.len((size - offset) as usize);
}

let mmap = unsafe { options.map(file) }?;

#[cfg(target_family = "unix")]
Expand Down

0 comments on commit 292c445

Please sign in to comment.