diff --git a/src/datasets/arrow_dataset.py b/src/datasets/arrow_dataset.py
index a42f6d3850a..bb3d067d34e 100644
--- a/src/datasets/arrow_dataset.py
+++ b/src/datasets/arrow_dataset.py
@@ -381,6 +381,20 @@ def to_tf_dataset(
         else:
             raise ImportError("Called a Tensorflow-specific function but Tensorflow is not installed.")

+        if (isinstance(columns, list) and len(columns) == 1) or (
+            isinstance(label_cols, list) and len(label_cols) == 1
+        ):
+            warnings.warn(
+                "The output of `to_tf_dataset` will change when passing a single-element list for `labels` or "
+                "`columns` in the next version of datasets. To return a tuple structure rather than a dict, pass a "
+                "single string.\n"
+                "Old behaviour: columns=['a'], labels=['labels'] -> (tf.Tensor, tf.Tensor)  \n"
+                "             : columns='a', labels='labels' -> (tf.Tensor, tf.Tensor)  \n"
+                "New behaviour: columns=['a'], labels=['labels'] -> ({'a': tf.Tensor}, {'labels': tf.Tensor})  \n"
+                "             : columns='a', labels='labels' -> (tf.Tensor, tf.Tensor) ",
+                FutureWarning,
+            )
+
         if isinstance(tf.distribute.get_strategy(), tf.distribute.TPUStrategy):
             logger.warning(
                 "Note that to_tf_dataset() loads the data with a generator rather than a full tf.data "
@@ -1437,8 +1451,8 @@ def save_to_disk(
                         else:
                             pbar.update(content)
        else:
-            for kwargs in kwargs_per_job:
-                with pbar:
+            with pbar:
+                for kwargs in kwargs_per_job:
                    for job_id, done, content in Dataset._save_to_disk_single(**kwargs):
                        if done:
                            shards_done += 1
diff --git a/src/datasets/builder.py b/src/datasets/builder.py
index 6798ff678c7..eb57d0c66b0 100644
--- a/src/datasets/builder.py
+++ b/src/datasets/builder.py
@@ -1485,13 +1485,14 @@ def _prepare_split(
             result = None
             gen_kwargs = split_generator.gen_kwargs
             job_id = 0
-            for job_id, done, content in self._prepare_split_single(
-                gen_kwargs=gen_kwargs, job_id=job_id, **_prepare_split_args
-            ):
-                if done:
-                    result = content
-                else:
-                    pbar.update(content)
+            with pbar:
+                for job_id, done, content in self._prepare_split_single(
+                    gen_kwargs=gen_kwargs, job_id=job_id, **_prepare_split_args
+                ):
+                    if done:
+                        result = content
+                    else:
+                        pbar.update(content)
             # wrapping everything into lists for consistency with the multiprocessed code path
             assert result is not None, "Failed to retrieve results from prepare_split"
             examples_per_job, bytes_per_job, features_per_job, shards_per_job, shard_lengths_per_job = [
@@ -1513,21 +1514,22 @@
             shard_lengths_per_job = [None] * num_jobs

             with Pool(num_proc) as pool:
-                for job_id, done, content in iflatmap_unordered(
-                    pool, self._prepare_split_single, kwargs_iterable=kwargs_per_job
-                ):
-                    if done:
-                        # the content is the result of the job
-                        (
-                            examples_per_job[job_id],
-                            bytes_per_job[job_id],
-                            features_per_job[job_id],
-                            shards_per_job[job_id],
-                            shard_lengths_per_job[job_id],
-                        ) = content
-                    else:
-                        # the content is the number of examples progress update
-                        pbar.update(content)
+                with pbar:
+                    for job_id, done, content in iflatmap_unordered(
+                        pool, self._prepare_split_single, kwargs_iterable=kwargs_per_job
+                    ):
+                        if done:
+                            # the content is the result of the job
+                            (
+                                examples_per_job[job_id],
+                                bytes_per_job[job_id],
+                                features_per_job[job_id],
+                                shards_per_job[job_id],
+                                shard_lengths_per_job[job_id],
+                            ) = content
+                        else:
+                            # the content is the number of examples progress update
+                            pbar.update(content)

             assert (
                 None not in examples_per_job
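The hunk above adds a FutureWarning to `to_tf_dataset` when `columns` or `label_cols` is a single-element list. A minimal sketch of the two call styles the warning contrasts, assuming a toy in-memory dataset and an installed tensorflow; the snippet is illustrative only and is not part of the patch:

from datasets import Dataset

ds = Dataset.from_dict({"a": [1, 2, 3, 4], "labels": [0, 1, 0, 1]})

# A single-element list currently collapses to bare tensors and now triggers
# the FutureWarning; a later release is expected to keep the dict structure.
tf_ds = ds.to_tf_dataset(columns=["a"], label_cols=["labels"], batch_size=2)

# Passing plain strings keeps the unambiguous tuple-of-tensors output.
tf_ds = ds.to_tf_dataset(columns="a", label_cols="labels", batch_size=2)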
diff --git a/src/datasets/features/features.py b/src/datasets/features/features.py
index e1f709b4f80..a5a0d909f7d 100644
--- a/src/datasets/features/features.py
+++ b/src/datasets/features/features.py
@@ -634,6 +634,9 @@ def __init__(self, shape: tuple, dtype: str):
             raise ValueError("You must instantiate an array type with a value for dim that is > 1")
         if len(shape) != self.ndims:
             raise ValueError(f"shape={shape} and ndims={self.ndims} don't match")
+        for dim in range(1, self.ndims):
+            if shape[dim] is None:
+                raise ValueError(f"Support only dynamic size on first dimension. Got: {shape}")
         self.shape = tuple(shape)
         self.value_type = dtype
         self.storage_dtype = self._generate_dtype(self.value_type)
@@ -711,53 +714,54 @@ def __getitem__(self, i):

     def to_numpy(self, zero_copy_only=True):
         storage: pa.ListArray = self.storage
-        size = 1
-
-        null_indices = np.arange(len(storage))[storage.is_null().to_numpy(zero_copy_only=False)]
-        for i in range(self.type.ndims):
-            size *= self.type.shape[i]
-            storage = storage.flatten()
-        numpy_arr = storage.to_numpy(zero_copy_only=zero_copy_only)
-        numpy_arr = numpy_arr.reshape(len(self) - len(null_indices), *self.type.shape)
+        if self.type.shape[0] is not None:
+            size = 1
+            null_indices = np.arange(len(storage))[storage.is_null().to_numpy(zero_copy_only=False)]

-        if len(null_indices):
-            numpy_arr = np.insert(numpy_arr.astype(np.float64), null_indices, np.nan, axis=0)
-
-        return numpy_arr
+            for i in range(self.type.ndims):
+                size *= self.type.shape[i]
+                storage = storage.flatten()
+            numpy_arr = storage.to_numpy(zero_copy_only=zero_copy_only)
+            numpy_arr = numpy_arr.reshape(len(self) - len(null_indices), *self.type.shape)

-    def to_list_of_numpy(self, zero_copy_only=True):
-        storage: pa.ListArray = self.storage
-        shape = self.type.shape
-        ndims = self.type.ndims
-
-        for dim in range(1, ndims):
-            if shape[dim] is None:
-                raise ValueError(f"Support only dynamic size on first dimension. Got: {shape}")
+            if len(null_indices):
+                numpy_arr = np.insert(numpy_arr.astype(np.float64), null_indices, np.nan, axis=0)

-        arrays = []
-        first_dim_offsets = np.array([off.as_py() for off in storage.offsets])
-        for i, is_null in enumerate(storage.is_null().to_numpy(zero_copy_only=False)):
-            if is_null:
-                arrays.append(np.nan)
+        else:
+            shape = self.type.shape
+            ndims = self.type.ndims
+            arrays = []
+            first_dim_offsets = np.array([off.as_py() for off in storage.offsets])
+            for i, is_null in enumerate(storage.is_null().to_numpy(zero_copy_only=False)):
+                if is_null:
+                    arrays.append(np.nan)
+                else:
+                    storage_el = storage[i : i + 1]
+                    first_dim = first_dim_offsets[i + 1] - first_dim_offsets[i]
+                    # flatten storage
+                    for _ in range(ndims):
+                        storage_el = storage_el.flatten()
+
+                    numpy_arr = storage_el.to_numpy(zero_copy_only=zero_copy_only)
+                    arrays.append(numpy_arr.reshape(first_dim, *shape[1:]))
+
+            if len(np.unique(np.diff(first_dim_offsets))) > 1:
+                # ragged
+                numpy_arr = np.empty(len(arrays), dtype=object)
+                numpy_arr[:] = arrays
             else:
-                storage_el = storage[i : i + 1]
-                first_dim = first_dim_offsets[i + 1] - first_dim_offsets[i]
-                # flatten storage
-                for _ in range(ndims):
-                    storage_el = storage_el.flatten()
+                numpy_arr = np.array(arrays)

-                numpy_arr = storage_el.to_numpy(zero_copy_only=zero_copy_only)
-                arrays.append(numpy_arr.reshape(first_dim, *shape[1:]))
-
-        return arrays
+        return numpy_arr

     def to_pylist(self):
         zero_copy_only = _is_zero_copy_only(self.storage.type, unnest=True)
-        if self.type.shape[0] is None:
-            return self.to_list_of_numpy(zero_copy_only=zero_copy_only)
+        numpy_arr = self.to_numpy(zero_copy_only=zero_copy_only)
+        if self.type.shape[0] is None and numpy_arr.dtype == object:
+            return [arr.tolist() for arr in numpy_arr.tolist()]
         else:
-            return self.to_numpy(zero_copy_only=zero_copy_only).tolist()
+            return numpy_arr.tolist()


 class PandasArrayExtensionDtype(PandasExtensionDtype):
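Net effect of the `ArrayExtensionArray` hunk above: `to_numpy()` now also handles a dynamic first dimension, returning an object-dtype array of per-row arrays when the rows are ragged, and `to_pylist()` builds on it so ragged rows come back as plain nested Python lists. A rough sketch of the user-visible behaviour with a toy `Array3D` column; the column name and values are assumptions, not taken from the patch:

import numpy as np
from datasets import Array3D, Dataset, Features

features = Features({"image": Array3D(shape=(None, 2, 2), dtype="int32")})
ds = Dataset.from_dict(
    {"image": [np.full((n, 2, 2), n, dtype="int32").tolist() for n in (1, 3, 10)]},
    features=features,
)

# With the default python formatter, a ragged row is returned as a nested list.
row = ds[0]["image"]
assert isinstance(row, list) and np.array(row).shape == (1, 2, 2)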
@@ -767,16 +771,10 @@ def __init__(self, value_type: Union["PandasArrayExtensionDtype", np.dtype]):
         self._value_type = value_type

     def __from_arrow__(self, array: Union[pa.Array, pa.ChunkedArray]):
-        if array.type.shape[0] is None:
-            raise NotImplementedError(
-                "Dynamic first dimension is not supported for "
-                f"PandasArrayExtensionDtype, dimension: {array.type.shape}"
-            )
-        zero_copy_only = _is_zero_copy_only(array.type, unnest=True)
         if isinstance(array, pa.ChunkedArray):
-            numpy_arr = np.vstack([chunk.to_numpy(zero_copy_only=zero_copy_only) for chunk in array.chunks])
-        else:
-            numpy_arr = array.to_numpy(zero_copy_only=zero_copy_only)
+            array = array.type.wrap_array(pa.concat_arrays([chunk.storage for chunk in array.chunks]))
+        zero_copy_only = _is_zero_copy_only(array.storage.type, unnest=True)
+        numpy_arr = array.to_numpy(zero_copy_only=zero_copy_only)
         return PandasArrayExtensionArray(numpy_arr)

     @classmethod
@@ -832,12 +830,25 @@ def copy(self, deep: bool = False) -> "PandasArrayExtensionArray":
     def _from_sequence(
         cls, scalars, dtype: Optional[PandasArrayExtensionDtype] = None, copy: bool = False
     ) -> "PandasArrayExtensionArray":
-        data = np.array(scalars, dtype=dtype if dtype is None else dtype.value_type, copy=copy)
+        if len(scalars) > 1 and all(
+            isinstance(x, np.ndarray) and x.shape == scalars[0].shape and x.dtype == scalars[0].dtype for x in scalars
+        ):
+            data = np.array(scalars, dtype=dtype if dtype is None else dtype.value_type, copy=copy)
+        else:
+            data = np.empty(len(scalars), dtype=object)
+            data[:] = scalars
         return cls(data, copy=copy)

     @classmethod
     def _concat_same_type(cls, to_concat: Sequence_["PandasArrayExtensionArray"]) -> "PandasArrayExtensionArray":
-        data = np.vstack([va._data for va in to_concat])
+        if len(to_concat) > 1 and all(
+            va._data.shape == to_concat[0]._data.shape and va._data.dtype == to_concat[0]._data.dtype
+            for va in to_concat
+        ):
+            data = np.vstack([va._data for va in to_concat])
+        else:
+            data = np.empty(len(to_concat), dtype=object)
+            data[:] = [va._data for va in to_concat]
         return cls(data, copy=False)

     @property
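With the `PandasArrayExtensionDtype` and `PandasArrayExtensionArray` hunks above, converting a dataset with a dynamic first dimension to pandas no longer raises NotImplementedError: ragged columns round-trip as an object-dtype numpy array holding one ndarray per row, while equal-shaped rows still stack into a single array. A sketch that mirrors the updated test_to_pandas, using assumed toy data rather than anything from the patch:

import numpy as np
from datasets import Array3D, Dataset, Features

features = Features({"image": Array3D(shape=(None, 2, 2), dtype="int32")})
ds = Dataset.from_dict(
    {"image": [np.ones((n, 2, 2), dtype="int32").tolist() for n in (1, 3, 10)]},
    features=features,
)

df = ds.to_pandas()              # previously raised NotImplementedError for ragged rows
col = df["image"].to_numpy()     # object dtype: one (n, 2, 2) ndarray per row
assert col.dtype == object and col[2].shape == (10, 2, 2)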
diff --git a/src/datasets/formatting/formatting.py b/src/datasets/formatting/formatting.py
index 9749c56859b..bc285e01257 100644
--- a/src/datasets/formatting/formatting.py
+++ b/src/datasets/formatting/formatting.py
@@ -168,16 +168,9 @@ def _arrow_array_to_numpy(self, pa_array: pa.Array) -> np.ndarray:
             if isinstance(pa_array.type, _ArrayXDExtensionType):
                 # don't call to_pylist() to preserve dtype of the fixed-size array
                 zero_copy_only = _is_zero_copy_only(pa_array.type.storage_dtype, unnest=True)
-                if pa_array.type.shape[0] is None:
-                    array: List = [
-                        row
-                        for chunk in pa_array.chunks
-                        for row in chunk.to_list_of_numpy(zero_copy_only=zero_copy_only)
-                    ]
-                else:
-                    array: List = [
-                        row for chunk in pa_array.chunks for row in chunk.to_numpy(zero_copy_only=zero_copy_only)
-                    ]
+                array: List = [
+                    row for chunk in pa_array.chunks for row in chunk.to_numpy(zero_copy_only=zero_copy_only)
+                ]
             else:
                 zero_copy_only = _is_zero_copy_only(pa_array.type) and all(
                     not _is_array_with_nulls(chunk) for chunk in pa_array.chunks
@@ -189,10 +182,7 @@
             if isinstance(pa_array.type, _ArrayXDExtensionType):
                 # don't call to_pylist() to preserve dtype of the fixed-size array
                 zero_copy_only = _is_zero_copy_only(pa_array.type.storage_dtype, unnest=True)
-                if pa_array.type.shape[0] is None:
-                    array: List = pa_array.to_list_of_numpy(zero_copy_only=zero_copy_only)
-                else:
-                    array: List = pa_array.to_numpy(zero_copy_only=zero_copy_only)
+                array: List = pa_array.to_numpy(zero_copy_only=zero_copy_only)
             else:
                 zero_copy_only = _is_zero_copy_only(pa_array.type) and not _is_array_with_nulls(pa_array)
                 array: List = pa_array.to_numpy(zero_copy_only=zero_copy_only).tolist()
diff --git a/src/datasets/load.py b/src/datasets/load.py
index 905907e7f70..e648f8c34ff 100644
--- a/src/datasets/load.py
+++ b/src/datasets/load.py
@@ -701,7 +701,9 @@ def __init__(
         increase_load_count(name, resource_type="dataset")

     def get_module(self) -> DatasetModule:
-        base_path = str(Path(self.data_dir).resolve()) if self.data_dir is not None else str(Path().resolve())
+        base_path = (
+            str(Path(self.data_dir).expanduser().resolve()) if self.data_dir is not None else str(Path().resolve())
+        )
         patterns = (
             sanitize_patterns(self.data_files) if self.data_files is not None else get_data_patterns_locally(base_path)
         )
diff --git a/src/datasets/packaged_modules/json/json.py b/src/datasets/packaged_modules/json/json.py
index c2e935902fe..3a6c98ecfd5 100644
--- a/src/datasets/packaged_modules/json/json.py
+++ b/src/datasets/packaged_modules/json/json.py
@@ -84,10 +84,11 @@ def _generate_tables(self, files):

                 # We accept two format: a list of dicts or a dict of lists
                 if isinstance(dataset, (list, tuple)):
-                    mapping = {col: [dataset[i][col] for i in range(len(dataset))] for col in dataset[0].keys()}
+                    keys = set().union(*[row.keys() for row in dataset])
+                    mapping = {col: [row.get(col) for row in dataset] for col in keys}
                 else:
                     mapping = dataset
-                pa_table = pa.Table.from_pydict(mapping=mapping)
+                pa_table = pa.Table.from_pydict(mapping)
                 yield file_idx, self._cast_table(pa_table)

             # If the file has one json object per line
@@ -137,7 +138,9 @@ def _generate_tables(self, files):
                         # If possible, parse the file as a list of json objects and exit the loop
                         if isinstance(dataset, list):  # list is the only sequence type supported in JSON
                             try:
-                                pa_table = pa.Table.from_pylist(dataset)
+                                keys = set().union(*[row.keys() for row in dataset])
+                                mapping = {col: [row.get(col) for row in dataset] for col in keys}
+                                pa_table = pa.Table.from_pydict(mapping)
                             except (pa.ArrowInvalid, AttributeError) as e:
                                 logger.error(f"Failed to read file '{file}' with error {type(e)}: {e}")
                                 raise ValueError(f"Not able to read records in the JSON file at {file}.") from None
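The json builder hunks above take the union of the keys across all records, so a list of JSON objects with heterogeneous keys now loads with nulls for the missing fields, where the old code keyed everything off the first record (dropping columns it did not have, or failing with a KeyError when a later record missed one). A quick sketch matching the expectations in the updated tests; the file name is arbitrary and the snippet is not part of the patch:

import json
from datasets import load_dataset

records = [{"col_1": -1}, {"col_1": 1, "col_2": 2}, {"col_1": 10, "col_2": 20}]
with open("data.json", "w") as f:
    json.dump(records, f)

ds = load_dataset("json", data_files="data.json", split="train")
# Column order may vary (keys are collected into a set), but the values are:
# col_1 -> [-1, 1, 10], col_2 -> [None, 2, 20]
print(ds.to_dict())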
diff --git a/tests/features/test_array_xd.py b/tests/features/test_array_xd.py
index 4481fab585a..5302dedb2e0 100644
--- a/tests/features/test_array_xd.py
+++ b/tests/features/test_array_xd.py
@@ -5,6 +5,7 @@

 import numpy as np
 import pandas as pd
+import pyarrow as pa
 import pytest
 from absl.testing import parameterized

@@ -267,6 +268,39 @@ def test_to_pylist(self):
         pylist = arr_xd.to_pylist()

         for first_dim, single_arr in zip(first_dim_list, pylist):
+            self.assertIsInstance(single_arr, list)
+            self.assertTupleEqual(np.array(single_arr).shape, (first_dim, *fixed_shape))
+
+    def test_to_numpy(self):
+        fixed_shape = (2, 2)
+
+        # ragged
+        first_dim_list = [1, 3, 10]
+        dataset = self.get_one_col_dataset(first_dim_list, fixed_shape)
+        arr_xd = SimpleArrowExtractor().extract_column(dataset._data)
+        self.assertIsInstance(arr_xd.type, Array3DExtensionType)
+        # replace with arr_xd = arr_xd.combine_chunks() when 12.0.0 will be the minimal required PyArrow version
+        arr_xd = arr_xd.type.wrap_array(pa.concat_arrays([chunk.storage for chunk in arr_xd.chunks]))
+        numpy_arr = arr_xd.to_numpy()
+
+        self.assertIsInstance(numpy_arr, np.ndarray)
+        self.assertEqual(numpy_arr.dtype, object)
+        for first_dim, single_arr in zip(first_dim_list, numpy_arr):
+            self.assertIsInstance(single_arr, np.ndarray)
+            self.assertTupleEqual(single_arr.shape, (first_dim, *fixed_shape))
+
+        # non-ragged
+        first_dim_list = [4, 4, 4]
+        dataset = self.get_one_col_dataset(first_dim_list, fixed_shape)
+        arr_xd = SimpleArrowExtractor().extract_column(dataset._data)
+        self.assertIsInstance(arr_xd.type, Array3DExtensionType)
+        # replace with arr_xd = arr_xd.combine_chunks() when 12.0.0 will be the minimal required PyArrow version
+        arr_xd = arr_xd.type.wrap_array(pa.concat_arrays([chunk.storage for chunk in arr_xd.chunks]))
+        numpy_arr = arr_xd.to_numpy()
+
+        self.assertIsInstance(numpy_arr, np.ndarray)
+        self.assertNotEqual(numpy_arr.dtype, object)
+        for first_dim, single_arr in zip(first_dim_list, numpy_arr):
             self.assertIsInstance(single_arr, np.ndarray)
             self.assertTupleEqual(single_arr.shape, (first_dim, *fixed_shape))

@@ -277,15 +311,37 @@ def test_iter_dataset(self):

         for first_dim, ds_row in zip(first_dim_list, dataset):
             single_arr = ds_row["image"]
-            self.assertIsInstance(single_arr, np.ndarray)
-            self.assertTupleEqual(single_arr.shape, (first_dim, *fixed_shape))
+            self.assertIsInstance(single_arr, list)
+            self.assertTupleEqual(np.array(single_arr).shape, (first_dim, *fixed_shape))

-    def test_to_pandas_fail(self):
+    def test_to_pandas(self):
         fixed_shape = (2, 2)
+
+        # ragged
         first_dim_list = [1, 3, 10]
         dataset = self.get_one_col_dataset(first_dim_list, fixed_shape)
-        with self.assertRaises(NotImplementedError):
-            dataset.to_pandas()
+        df = dataset.to_pandas()
+        self.assertEqual(type(df.image.dtype), PandasArrayExtensionDtype)
+        numpy_arr = df.image.to_numpy()
+
+        self.assertIsInstance(numpy_arr, np.ndarray)
+        self.assertEqual(numpy_arr.dtype, object)
+        for first_dim, single_arr in zip(first_dim_list, numpy_arr):
+            self.assertIsInstance(single_arr, np.ndarray)
+            self.assertTupleEqual(single_arr.shape, (first_dim, *fixed_shape))
+
+        # non-ragged
+        first_dim_list = [4, 4, 4]
+        dataset = self.get_one_col_dataset(first_dim_list, fixed_shape)
+        df = dataset.to_pandas()
+        self.assertEqual(type(df.image.dtype), PandasArrayExtensionDtype)
+        numpy_arr = df.image.to_numpy()
+
+        self.assertIsInstance(numpy_arr, np.ndarray)
+        self.assertNotEqual(numpy_arr.dtype, object)
+        for first_dim, single_arr in zip(first_dim_list, numpy_arr):
+            self.assertIsInstance(single_arr, np.ndarray)
+            self.assertTupleEqual(single_arr.shape, (first_dim, *fixed_shape))

     def test_map_dataset(self):
         fixed_shape = (2, 2)
@@ -297,8 +353,8 @@ def test_map_dataset(self):
         # check also if above function resulted with 2x bigger first dim
         for first_dim, ds_row in zip(first_dim_list, dataset):
             single_arr = ds_row["image"]
-            self.assertIsInstance(single_arr, np.ndarray)
-            self.assertTupleEqual(single_arr.shape, (first_dim * 2, *fixed_shape))
+            self.assertIsInstance(single_arr, list)
+            self.assertTupleEqual(np.array(single_arr).shape, (first_dim * 2, *fixed_shape))


 @pytest.mark.parametrize("dtype, dummy_value", [("int32", 1), ("bool", True), ("float64", 1)])
diff --git a/tests/packaged_modules/test_json.py b/tests/packaged_modules/test_json.py
index 17b7407f617..f935257271e 100644
--- a/tests/packaged_modules/test_json.py
+++ b/tests/packaged_modules/test_json.py
@@ -12,6 +12,7 @@ def jsonl_file(tmp_path):
     filename = tmp_path / "file.jsonl"
     data = textwrap.dedent(
         """\
+        {"col_1": -1}
         {"col_1": 1, "col_2": 2}
         {"col_1": 10, "col_2": 20}
         """
@@ -27,6 +28,7 @@ def json_file_with_list_of_dicts(tmp_path):
     data = textwrap.dedent(
         """\
         [
+            {"col_1": -1},
             {"col_1": 1, "col_2": 2},
             {"col_1": 10, "col_2": 20}
         ]
@@ -46,6 +48,7 @@ def json_file_with_list_of_dicts_field(tmp_path):
             "field1": 1,
             "field2": "aabb",
             "field3": [
+                {"col_1": -1},
                 {"col_1": 1, "col_2": 2},
                 {"col_1": 10, "col_2": 20}
             ]
@@ -69,7 +72,7 @@ def test_json_generate_tables(file_fixture, config_kwargs, request):
     json = Json(**config_kwargs)
     generator = json._generate_tables([[request.getfixturevalue(file_fixture)]])
     pa_table = pa.concat_tables([table for _, table in generator])
-    assert pa_table.to_pydict() == {"col_1": [1, 10], "col_2": [2, 20]}
+    assert pa_table.to_pydict() == {"col_1": [-1, 1, 10], "col_2": [None, 2, 20]}


 @pytest.mark.parametrize(
@@ -98,4 +101,4 @@ def test_json_generate_tables_with_missing_features(file_fixture, config_kwargs,
     json = Json(**config_kwargs)
     generator = json._generate_tables([[request.getfixturevalue(file_fixture)]])
     pa_table = pa.concat_tables([table for _, table in generator])
-    assert pa_table.to_pydict() == {"col_1": [1, 10], "col_2": [2, 20], "missing_col": [None, None]}
+    assert pa_table.to_pydict() == {"col_1": [-1, 1, 10], "col_2": [None, 2, 20], "missing_col": [None, None, None]}