Deprecate chunksize from dask_cudf.read_csv (#12394)
Deprecates `chunksize` in favor of `blocksize` to be consistent with `dask.dataframe.read_csv`.

Closes #11847
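The user-facing change is only the keyword name. A minimal sketch of the call pattern after this PR (the CSV path is a placeholder):

```python
import dask_cudf

# New keyword, consistent with dask.dataframe.read_csv:
# `blocksize` sets the target partition size (int or str like "256 MiB").
df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")

# blocksize=None reads each file as a single partition.
df_single = dask_cudf.read_csv("largefile.csv", blocksize=None)

# The old keyword still works for now, but emits a FutureWarning
# and is mapped onto `blocksize` internally.
df_legacy = dask_cudf.read_csv("largefile.csv", chunksize="256 MiB")
```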

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #12394
rjzamora authored Jan 13, 2023
1 parent 126829c commit ec7f8c6
Showing 3 changed files with 46 additions and 33 deletions.
10 changes: 1 addition & 9 deletions python/dask_cudf/dask_cudf/backends.py
@@ -517,15 +517,7 @@ def read_orc(*args, **kwargs):
     def read_csv(*args, **kwargs):
         from dask_cudf.io import read_csv
 
-        chunksize = kwargs.pop("chunksize", None)
-        blocksize = kwargs.pop("blocksize", "default")
-        if chunksize is None and blocksize != "default":
-            chunksize = blocksize
-        return read_csv(
-            *args,
-            chunksize=chunksize,
-            **kwargs,
-        )
+        return read_csv(*args, **kwargs)
 
     @staticmethod
     def read_hdf(*args, **kwargs):
52 changes: 34 additions & 18 deletions python/dask_cudf/dask_cudf/io/csv.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2023, NVIDIA CORPORATION.
 
 import os
 from glob import glob
@@ -14,7 +14,7 @@
 import cudf
 
 
-def read_csv(path, chunksize="256 MiB", **kwargs):
+def read_csv(path, blocksize="default", **kwargs):
     """
     Read CSV files into a dask_cudf.DataFrame
@@ -27,7 +27,7 @@ def read_csv(path, chunksize="256 MiB", **kwargs):
     In some cases it can break up large files:
 
-    >>> df = dask_cudf.read_csv("largefile.csv", chunksize="256 MiB")
+    >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")
 
     It can read CSV files from external resources (e.g. S3, HTTP, FTP)
@@ -46,7 +46,7 @@ def read_csv(path, chunksize="256 MiB", **kwargs):
         py._path.local.LocalPath), URL (including http, ftp, and S3 locations),
         or any object with a read() method (such as builtin open() file
         handler function or StringIO).
-    chunksize : int or str, default "256 MiB"
+    blocksize : int or str, default "256 MiB"
         The target task partition size. If `None`, a single block
         is used for each file.
     **kwargs : dict
@@ -62,16 +62,32 @@ def read_csv(path, chunksize="256 MiB", **kwargs):
     1  2  hello
     2  3     ai
     """
 
+    # Handle `chunksize` deprecation
+    if "chunksize" in kwargs:
+        chunksize = kwargs.pop("chunksize", "default")
+        warn(
+            "`chunksize` is deprecated and will be removed in the future. "
+            "Please use `blocksize` instead.",
+            FutureWarning,
+        )
+        if blocksize == "default":
+            blocksize = chunksize
+
+    # Set default `blocksize`
+    if blocksize == "default":
+        blocksize = "256 MiB"
+
     if "://" in str(path):
         func = make_reader(cudf.read_csv, "read_csv", "CSV")
-        return func(path, blocksize=chunksize, **kwargs)
+        return func(path, blocksize=blocksize, **kwargs)
     else:
-        return _internal_read_csv(path=path, chunksize=chunksize, **kwargs)
+        return _internal_read_csv(path=path, blocksize=blocksize, **kwargs)
 
 
-def _internal_read_csv(path, chunksize="256 MiB", **kwargs):
-    if isinstance(chunksize, str):
-        chunksize = parse_bytes(chunksize)
+def _internal_read_csv(path, blocksize="256 MiB", **kwargs):
+    if isinstance(blocksize, str):
+        blocksize = parse_bytes(blocksize)
 
     if isinstance(path, list):
         filenames = path
@@ -96,19 +112,19 @@ def _internal_read_csv(path, chunksize="256 MiB", **kwargs):
         # Infer compression from first path by default
         compression = infer_compression(filenames[0])
 
-    if compression and chunksize:
+    if compression and blocksize:
         # compressed CSVs reading must read the entire file
         kwargs.pop("byte_range", None)
         warn(
             "Warning %s compression does not support breaking apart files\n"
             "Please ensure that each individual file can fit in memory and\n"
-            "use the keyword ``chunksize=None to remove this message``\n"
-            "Setting ``chunksize=(size of file)``" % compression
+            "use the keyword ``blocksize=None to remove this message``\n"
+            "Setting ``blocksize=(size of file)``" % compression
         )
-        chunksize = None
+        blocksize = None
 
-    if chunksize is None:
-        return read_csv_without_chunksize(path, **kwargs)
+    if blocksize is None:
+        return read_csv_without_blocksize(path, **kwargs)
 
     # Let dask.dataframe generate meta
     dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV")
@@ -128,11 +144,11 @@ def _internal_read_csv(path, chunksize="256 MiB", **kwargs):
 
     for fn in filenames:
         size = os.path.getsize(fn)
-        for start in range(0, size, chunksize):
+        for start in range(0, size, blocksize):
             kwargs2 = kwargs.copy()
             kwargs2["byte_range"] = (
                 start,
-                chunksize,
+                blocksize,
             )  # specify which chunk of the file we care about
             if start != 0:
                 kwargs2["names"] = names  # no header in the middle of the file
@@ -149,7 +165,7 @@ def _read_csv(fn, dtypes=None, **kwargs):
     return cudf.read_csv(fn, **kwargs)
 
 
-def read_csv_without_chunksize(path, **kwargs):
+def read_csv_without_blocksize(path, **kwargs):
     """Read entire CSV with optional compression (gzip/zip)
 
     Parameters
17 changes: 11 additions & 6 deletions python/dask_cudf/dask_cudf/io/tests/test_csv.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2019-2022, NVIDIA CORPORATION.
+# Copyright (c) 2019-2023, NVIDIA CORPORATION.
 
 import gzip
 import os
@@ -85,7 +85,7 @@ def test_read_csv_w_bytes(tmp_path):
     df = pd.DataFrame(dict(x=np.arange(20), y=np.arange(20)))
     df.to_csv(tmp_path / "data-*.csv", index=False)
 
-    df2 = dask_cudf.read_csv(tmp_path / "*.csv", chunksize="50 B")
+    df2 = dask_cudf.read_csv(tmp_path / "*.csv", blocksize="50 B")
     assert df2.npartitions == 3
     dd.assert_eq(df2, df, check_index=False)
 
@@ -95,7 +95,7 @@ def test_read_csv_compression(tmp_path):
     df.to_csv(tmp_path / "data.csv.gz", index=False)
 
     with pytest.warns(UserWarning) as w:
-        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", chunksize="50 B")
+        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", blocksize="50 B")
 
     assert len(w) == 1
     msg = str(w[0].message)
@@ -105,7 +105,7 @@ def test_read_csv_compression(tmp_path):
     dd.assert_eq(df2, df, check_index=False)
 
     with warnings.catch_warnings(record=True) as record:
-        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", chunksize=None)
+        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", blocksize=None)
 
     assert not record
 
@@ -130,7 +130,7 @@ def test_read_csv_compression_file_list(tmp_path):
 
 @pytest.mark.parametrize("size", [0, 3, 20])
 @pytest.mark.parametrize("compression", ["gzip", None])
-def test_read_csv_chunksize_none(tmp_path, compression, size):
+def test_read_csv_blocksize_none(tmp_path, compression, size):
     df = pd.DataFrame(dict(x=np.arange(size), y=np.arange(size)))
 
     path = (
@@ -146,9 +146,14 @@ def test_read_csv_chunksize_none(tmp_path, compression, size):
         typ = None
 
     df.to_csv(path, index=False, compression=compression)
-    df2 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
+    df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ)
     dd.assert_eq(df, df2)
 
+    # Test chunksize deprecation
+    with pytest.warns(FutureWarning, match="deprecated"):
+        df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
+    dd.assert_eq(df, df3)
+
 
 @pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None])
 def test_csv_reader_usecols(tmp_path, dtype):
