Deprecate chunksize from dask_cudf.read_csv #12394

Merged (15 commits) on Jan 13, 2023
python/dask_cudf/dask_cudf/backends.py (10 changes: 1 addition & 9 deletions)
@@ -517,15 +517,7 @@ def read_orc(*args, **kwargs):
     def read_csv(*args, **kwargs):
         from dask_cudf.io import read_csv

-        chunksize = kwargs.pop("chunksize", None)
-        blocksize = kwargs.pop("blocksize", "default")
-        if chunksize is None and blocksize != "default":
-            chunksize = blocksize
-        return read_csv(
-            *args,
-            chunksize=chunksize,
-            **kwargs,
-        )
+        return read_csv(*args, **kwargs)

     @staticmethod
     def read_hdf(*args, **kwargs):
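With the keyword translation removed, this backend entry point forwards everything verbatim and the chunksize deprecation is owned by dask_cudf.io.read_csv. The sketch below is illustrative only, not part of the PR: it assumes dask's "dataframe.backend" config option (present in recent dask releases) and uses a throwaway file name.

import dask
import dask.dataframe as dd
import pandas as pd

# Write a tiny CSV so the sketch can run end to end
pd.DataFrame({"x": range(10), "y": range(10)}).to_csv("data.csv", index=False)

with dask.config.set({"dataframe.backend": "cudf"}):
    # Keywords, including blocksize, now pass straight through to
    # dask_cudf.io.read_csv; any chunksize FutureWarning is emitted there,
    # not in this backend shim.
    ddf = dd.read_csv("data.csv", blocksize="1 MiB")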
python/dask_cudf/dask_cudf/io/csv.py (52 changes: 34 additions & 18 deletions)
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2023, NVIDIA CORPORATION.

 import os
 from glob import glob
@@ -14,7 +14,7 @@
 import cudf


-def read_csv(path, chunksize="256 MiB", **kwargs):
+def read_csv(path, blocksize="default", **kwargs):
     """
     Read CSV files into a dask_cudf.DataFrame

@@ -27,7 +27,7 @@ def read_csv(path, chunksize="256 MiB", **kwargs):

     In some cases it can break up large files:

-    >>> df = dask_cudf.read_csv("largefile.csv", chunksize="256 MiB")
+    >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")

     It can read CSV files from external resources (e.g. S3, HTTP, FTP)

@@ -46,7 +46,7 @@ def read_csv(path, chunksize="256 MiB", **kwargs):
         py._path.local.LocalPath), URL (including http, ftp, and S3 locations),
         or any object with a read() method (such as builtin open() file
         handler function or StringIO).
-    chunksize : int or str, default "256 MiB"
+    blocksize : int or str, default "256 MiB"
         The target task partition size. If `None`, a single block
         is used for each file.
     **kwargs : dict
@@ -62,16 +62,32 @@ def read_csv(path, chunksize="256 MiB", **kwargs):
     1 2 hello
     2 3 ai
     """
+
+    # Handle `chunksize` deprecation
+    if "chunksize" in kwargs:
+        chunksize = kwargs.pop("chunksize", "default")
+        warn(
+            "`chunksize` is deprecated and will be removed in the future. "
+            "Please use `blocksize` instead.",
+            FutureWarning,
+        )
+        if blocksize == "default":
+            blocksize = chunksize
+
+    # Set default `blocksize`
+    if blocksize == "default":
+        blocksize = "256 MiB"
+
     if "://" in str(path):
         func = make_reader(cudf.read_csv, "read_csv", "CSV")
-        return func(path, blocksize=chunksize, **kwargs)
+        return func(path, blocksize=blocksize, **kwargs)
     else:
-        return _internal_read_csv(path=path, chunksize=chunksize, **kwargs)
+        return _internal_read_csv(path=path, blocksize=blocksize, **kwargs)


-def _internal_read_csv(path, chunksize="256 MiB", **kwargs):
-    if isinstance(chunksize, str):
-        chunksize = parse_bytes(chunksize)
+def _internal_read_csv(path, blocksize="256 MiB", **kwargs):
+    if isinstance(blocksize, str):
+        blocksize = parse_bytes(blocksize)

     if isinstance(path, list):
         filenames = path
@@ -96,19 +112,19 @@ def _internal_read_csv(path, chunksize="256 MiB", **kwargs):
     # Infer compression from first path by default
     compression = infer_compression(filenames[0])

-    if compression and chunksize:
+    if compression and blocksize:
         # compressed CSVs reading must read the entire file
         kwargs.pop("byte_range", None)
         warn(
             "Warning %s compression does not support breaking apart files\n"
             "Please ensure that each individual file can fit in memory and\n"
-            "use the keyword ``chunksize=None to remove this message``\n"
-            "Setting ``chunksize=(size of file)``" % compression
+            "use the keyword ``blocksize=None to remove this message``\n"
+            "Setting ``blocksize=(size of file)``" % compression
         )
-        chunksize = None
+        blocksize = None

-    if chunksize is None:
-        return read_csv_without_chunksize(path, **kwargs)
+    if blocksize is None:
+        return read_csv_without_blocksize(path, **kwargs)

     # Let dask.dataframe generate meta
     dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV")
@@ -128,11 +144,11 @@ def _internal_read_csv(path, chunksize="256 MiB", **kwargs):

     for fn in filenames:
         size = os.path.getsize(fn)
-        for start in range(0, size, chunksize):
+        for start in range(0, size, blocksize):
             kwargs2 = kwargs.copy()
             kwargs2["byte_range"] = (
                 start,
-                chunksize,
+                blocksize,
             )  # specify which chunk of the file we care about
             if start != 0:
                 kwargs2["names"] = names  # no header in the middle of the file
@@ -149,7 +165,7 @@ def _read_csv(fn, dtypes=None, **kwargs):
     return cudf.read_csv(fn, **kwargs)


-def read_csv_without_chunksize(path, **kwargs):
+def read_csv_without_blocksize(path, **kwargs):
     """Read entire CSV with optional compression (gzip/zip)

     Parameters
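For downstream users the migration is a keyword rename: blocksize is the new spelling, and the old chunksize keyword keeps working for now but emits a FutureWarning. A short usage sketch, with a throwaway file name rather than anything from the PR:

import warnings

import pandas as pd

import dask_cudf

# Write a small CSV so the sketch is self-contained
pd.DataFrame({"a": [1, 2, 3], "b": ["hi", "hello", "ai"]}).to_csv(
    "example.csv", index=False
)

# New spelling: control the partition size with `blocksize`
ddf = dask_cudf.read_csv("example.csv", blocksize="256 MiB")

# Deprecated spelling: still honored, but warns
with warnings.catch_warnings(record=True) as record:
    warnings.simplefilter("always")
    ddf = dask_cudf.read_csv("example.csv", chunksize="256 MiB")
assert any(issubclass(w.category, FutureWarning) for w in record)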
python/dask_cudf/dask_cudf/io/tests/test_csv.py (17 changes: 11 additions & 6 deletions)
@@ -1,4 +1,4 @@
-# Copyright (c) 2019-2022, NVIDIA CORPORATION.
+# Copyright (c) 2019-2023, NVIDIA CORPORATION.

 import gzip
 import os
@@ -85,7 +85,7 @@ def test_read_csv_w_bytes(tmp_path):
     df = pd.DataFrame(dict(x=np.arange(20), y=np.arange(20)))
     df.to_csv(tmp_path / "data-*.csv", index=False)

-    df2 = dask_cudf.read_csv(tmp_path / "*.csv", chunksize="50 B")
+    df2 = dask_cudf.read_csv(tmp_path / "*.csv", blocksize="50 B")
     assert df2.npartitions == 3
     dd.assert_eq(df2, df, check_index=False)

@@ -95,7 +95,7 @@ def test_read_csv_compression(tmp_path):
     df.to_csv(tmp_path / "data.csv.gz", index=False)

     with pytest.warns(UserWarning) as w:
-        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", chunksize="50 B")
+        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", blocksize="50 B")

     assert len(w) == 1
     msg = str(w[0].message)
@@ -105,7 +105,7 @@ def test_read_csv_compression(tmp_path):
     dd.assert_eq(df2, df, check_index=False)

     with warnings.catch_warnings(record=True) as record:
-        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", chunksize=None)
+        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", blocksize=None)

     assert not record

@@ -130,7 +130,7 @@ def test_read_csv_compression_file_list(tmp_path):

 @pytest.mark.parametrize("size", [0, 3, 20])
 @pytest.mark.parametrize("compression", ["gzip", None])
-def test_read_csv_chunksize_none(tmp_path, compression, size):
+def test_read_csv_blocksize_none(tmp_path, compression, size):
     df = pd.DataFrame(dict(x=np.arange(size), y=np.arange(size)))

     path = (
@@ -146,9 +146,14 @@ def test_read_csv_chunksize_none(tmp_path, compression, size):
         typ = None

     df.to_csv(path, index=False, compression=compression)
-    df2 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
+    df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ)
     dd.assert_eq(df, df2)
+
+    # Test chunksize deprecation
+    with pytest.warns(FutureWarning, match="deprecated"):
+        df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
+    dd.assert_eq(df, df3)


 @pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None])
 def test_csv_reader_usecols(tmp_path, dtype):
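The partition count asserted in test_read_csv_w_bytes is just the byte-range math from _internal_read_csv: for a single uncompressed file, npartitions equals ceil(file_size / blocksize). A small sketch of that relationship (file name and sizes are invented for illustration, not taken from the test suite, and a working cudf/dask_cudf environment is assumed):

import math
import os

import pandas as pd

import dask_cudf

# Write a CSV and pick a blocksize well below its on-disk size
pd.DataFrame({"x": range(1000), "y": range(1000)}).to_csv("sizes.csv", index=False)
blocksize = 2048  # bytes

expected = math.ceil(os.path.getsize("sizes.csv") / blocksize)
ddf = dask_cudf.read_csv("sizes.csv", blocksize=blocksize)
assert ddf.npartitions == expected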