diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py
index e6247bfd3ff..b6be5ade6ba 100644
--- a/python/dask_cudf/dask_cudf/backends.py
+++ b/python/dask_cudf/dask_cudf/backends.py
@@ -517,15 +517,7 @@ def read_orc(*args, **kwargs):
     def read_csv(*args, **kwargs):
         from dask_cudf.io import read_csv
 
-        chunksize = kwargs.pop("chunksize", None)
-        blocksize = kwargs.pop("blocksize", "default")
-        if chunksize is None and blocksize != "default":
-            chunksize = blocksize
-        return read_csv(
-            *args,
-            chunksize=chunksize,
-            **kwargs,
-        )
+        return read_csv(*args, **kwargs)
 
     @staticmethod
     def read_hdf(*args, **kwargs):
diff --git a/python/dask_cudf/dask_cudf/io/csv.py b/python/dask_cudf/dask_cudf/io/csv.py
index ebb02e3b6d4..b4d080fd182 100644
--- a/python/dask_cudf/dask_cudf/io/csv.py
+++ b/python/dask_cudf/dask_cudf/io/csv.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2023, NVIDIA CORPORATION.
 
 import os
 from glob import glob
@@ -14,7 +14,7 @@
 import cudf
 
 
-def read_csv(path, chunksize="256 MiB", **kwargs):
+def read_csv(path, blocksize="default", **kwargs):
     """
     Read CSV files into a dask_cudf.DataFrame
 
@@ -27,7 +27,7 @@ def read_csv(path, chunksize="256 MiB", **kwargs):
 
     In some cases it can break up large files:
 
-    >>> df = dask_cudf.read_csv("largefile.csv", chunksize="256 MiB")
+    >>> df = dask_cudf.read_csv("largefile.csv", blocksize="256 MiB")
 
     It can read CSV files from external resources (e.g. S3, HTTP, FTP)
 
@@ -46,7 +46,7 @@ def read_csv(path, chunksize="256 MiB", **kwargs):
         py._path.local.LocalPath), URL (including http, ftp, and S3
         locations), or any object with a read() method (such as builtin
         open() file handler function or StringIO).
-    chunksize : int or str, default "256 MiB"
+    blocksize : int or str, default "256 MiB"
         The target task partition size. If `None`, a single block
         is used for each file.
    **kwargs : dict
@@ -62,16 +62,32 @@ def read_csv(path, chunksize="256 MiB", **kwargs):
     1  2  hello
     2  3     ai
     """
+
+    # Handle `chunksize` deprecation
+    if "chunksize" in kwargs:
+        chunksize = kwargs.pop("chunksize", "default")
+        warn(
+            "`chunksize` is deprecated and will be removed in the future. "
" + "Please use `blocksize` instead.", + FutureWarning, + ) + if blocksize == "default": + blocksize = chunksize + + # Set default `blocksize` + if blocksize == "default": + blocksize = "256 MiB" + if "://" in str(path): func = make_reader(cudf.read_csv, "read_csv", "CSV") - return func(path, blocksize=chunksize, **kwargs) + return func(path, blocksize=blocksize, **kwargs) else: - return _internal_read_csv(path=path, chunksize=chunksize, **kwargs) + return _internal_read_csv(path=path, blocksize=blocksize, **kwargs) -def _internal_read_csv(path, chunksize="256 MiB", **kwargs): - if isinstance(chunksize, str): - chunksize = parse_bytes(chunksize) +def _internal_read_csv(path, blocksize="256 MiB", **kwargs): + if isinstance(blocksize, str): + blocksize = parse_bytes(blocksize) if isinstance(path, list): filenames = path @@ -96,19 +112,19 @@ def _internal_read_csv(path, chunksize="256 MiB", **kwargs): # Infer compression from first path by default compression = infer_compression(filenames[0]) - if compression and chunksize: + if compression and blocksize: # compressed CSVs reading must read the entire file kwargs.pop("byte_range", None) warn( "Warning %s compression does not support breaking apart files\n" "Please ensure that each individual file can fit in memory and\n" - "use the keyword ``chunksize=None to remove this message``\n" - "Setting ``chunksize=(size of file)``" % compression + "use the keyword ``blocksize=None to remove this message``\n" + "Setting ``blocksize=(size of file)``" % compression ) - chunksize = None + blocksize = None - if chunksize is None: - return read_csv_without_chunksize(path, **kwargs) + if blocksize is None: + return read_csv_without_blocksize(path, **kwargs) # Let dask.dataframe generate meta dask_reader = make_reader(cudf.read_csv, "read_csv", "CSV") @@ -128,11 +144,11 @@ def _internal_read_csv(path, chunksize="256 MiB", **kwargs): for fn in filenames: size = os.path.getsize(fn) - for start in range(0, size, chunksize): + for start in range(0, size, blocksize): kwargs2 = kwargs.copy() kwargs2["byte_range"] = ( start, - chunksize, + blocksize, ) # specify which chunk of the file we care about if start != 0: kwargs2["names"] = names # no header in the middle of the file @@ -149,7 +165,7 @@ def _read_csv(fn, dtypes=None, **kwargs): return cudf.read_csv(fn, **kwargs) -def read_csv_without_chunksize(path, **kwargs): +def read_csv_without_blocksize(path, **kwargs): """Read entire CSV with optional compression (gzip/zip) Parameters diff --git a/python/dask_cudf/dask_cudf/io/tests/test_csv.py b/python/dask_cudf/dask_cudf/io/tests/test_csv.py index e9c209a4ccb..2eb69b0bd5c 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_csv.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_csv.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2022, NVIDIA CORPORATION. +# Copyright (c) 2019-2023, NVIDIA CORPORATION. 
 import gzip
 import os
@@ -85,7 +85,7 @@ def test_read_csv_w_bytes(tmp_path):
     df = pd.DataFrame(dict(x=np.arange(20), y=np.arange(20)))
     df.to_csv(tmp_path / "data-*.csv", index=False)
 
-    df2 = dask_cudf.read_csv(tmp_path / "*.csv", chunksize="50 B")
+    df2 = dask_cudf.read_csv(tmp_path / "*.csv", blocksize="50 B")
 
     assert df2.npartitions == 3
     dd.assert_eq(df2, df, check_index=False)
@@ -95,7 +95,7 @@ def test_read_csv_compression(tmp_path):
     df.to_csv(tmp_path / "data.csv.gz", index=False)
 
     with pytest.warns(UserWarning) as w:
-        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", chunksize="50 B")
+        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", blocksize="50 B")
 
     assert len(w) == 1
     msg = str(w[0].message)
@@ -105,7 +105,7 @@ def test_read_csv_compression(tmp_path):
     dd.assert_eq(df2, df, check_index=False)
 
     with warnings.catch_warnings(record=True) as record:
-        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", chunksize=None)
+        df2 = dask_cudf.read_csv(tmp_path / "*.csv.gz", blocksize=None)
 
     assert not record
 
@@ -130,7 +130,7 @@ def test_read_csv_compression_file_list(tmp_path):
 
 @pytest.mark.parametrize("size", [0, 3, 20])
 @pytest.mark.parametrize("compression", ["gzip", None])
-def test_read_csv_chunksize_none(tmp_path, compression, size):
+def test_read_csv_blocksize_none(tmp_path, compression, size):
     df = pd.DataFrame(dict(x=np.arange(size), y=np.arange(size)))
 
     path = (
@@ -146,9 +146,14 @@ def test_read_csv_chunksize_none(tmp_path, compression, size):
         typ = None
 
     df.to_csv(path, index=False, compression=compression)
-    df2 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
+    df2 = dask_cudf.read_csv(path, blocksize=None, dtype=typ)
     dd.assert_eq(df, df2)
 
+    # Test chunksize deprecation
+    with pytest.warns(FutureWarning, match="deprecated"):
+        df3 = dask_cudf.read_csv(path, chunksize=None, dtype=typ)
+    dd.assert_eq(df, df3)
+
 
 @pytest.mark.parametrize("dtype", [{"b": str, "c": int}, None])
 def test_csv_reader_usecols(tmp_path, dtype):
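
Below is a minimal usage sketch (not part of the patch) of the caller-facing behavior this diff introduces, assuming a local file named "data.csv" exists: `blocksize` is the supported keyword going forward, while the old `chunksize` keyword is still accepted but now emits a FutureWarning and its value is forwarded to `blocksize`.

    import warnings

    import dask_cudf

    # Preferred spelling after this change: `blocksize` sets the target
    # partition size (byte-size strings such as "256 MiB" are parsed).
    df = dask_cudf.read_csv("data.csv", blocksize="256 MiB")

    # The deprecated `chunksize` keyword still works, but emits a
    # FutureWarning and is mapped onto `blocksize` internally.
    with warnings.catch_warnings(record=True) as record:
        warnings.simplefilter("always")
        df_legacy = dask_cudf.read_csv("data.csv", chunksize="256 MiB")
    assert any(issubclass(w.category, FutureWarning) for w in record)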