Skip to content

Commit

Permalink
Rollback to v3.20.0 (JDASoftwareGroup#471)
Browse files Browse the repository at this point in the history
Revert "Bump codecov/codecov-action from v1.4.1 to v1.5.0 (JDASoftwareGroup#466)"
This reverts commit fdc9779.

Revert "fix mistakes in documentation"
This reverts commit 4e4b5e0.

Revert "Bump pre-commit/action from v2.0.0 to v2.0.3 (JDASoftwareGroup#460)"
This reverts commit d027ca2.

Revert "Bump codecov/codecov-action from v1.4.0 to v1.4.1 (JDASoftwareGroup#461)"
This reverts commit 97cd553.

Revert "Bump codecov/codecov-action from v1.3.1 to v1.4.0 (JDASoftwareGroup#458)"
This reverts commit e48d67a.

Revert "Fix bug when loading few columns of a dataset with many primary indices (JDASoftwareGroup#446)"
This reverts commit 90ee486.

Revert "Prepare release 4.0.1"
This reverts commit b278503.

Revert "Fix tests for dask dataframe and delayed backends"
This reverts commit 5520f74.

Revert "Add end-to-end regression test"
This reverts commit 8a3e6ae.

Revert "Fix dataset corruption after updates (JDASoftwareGroup#445)"
This reverts commit a26e840.

Revert "Set release date for 4.0"
This reverts commit 08a8094.

Revert "Return dask scalar for store and update from ddf (JDASoftwareGroup#437)"
This reverts commit 494732d.

Revert "Add tests for non-default table (JDASoftwareGroup#440)"
This reverts commit 3807a02.

Revert "Bump codecov/codecov-action from v1.2.2 to v1.3.1 (JDASoftwareGroup#441)"
This reverts commit f7615ec.

Revert "Set default for dates_as_object to True (JDASoftwareGroup#436)"
This reverts commit 75ffdb5.

Revert "Remove inferred indices (JDASoftwareGroup#438)"
This reverts commit b1e2535.

Revert "fix typo: 'KTK_CUBE_UUID_SEPERATOR' -> 'KTK_CUBE_UUID_SEPARATOR' (JDASoftwareGroup#422)"
This reverts commit b349cee.

Revert "Remove all deprecated arguments (JDASoftwareGroup#434)"
This reverts commit 74f0790.

Revert "Remove multi table feature (JDASoftwareGroup#431)"
This reverts commit 032856a.
  • Loading branch information
ilia-zaitcev-by committed Jun 11, 2021
1 parent db840b5 commit cbee12d
Show file tree
Hide file tree
Showing 81 changed files with 4,888 additions and 1,580 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-pre-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ jobs:
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- uses: pre-commit/action@v2.0.3
- uses: pre-commit/action@v2.0.0
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ jobs:
run: python setup.py sdist bdist_wheel

- name: Codecov
uses: codecov/codecov-action@v1.5.0
uses: codecov/codecov-action@v1.2.2
with:
# NOTE: `token` is not required, because the kartothek repo is public
file: ./coverage.xml
Expand Down
12 changes: 12 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,23 @@
Changelog
=========

Version 5.0.0 (2021-05-xx)
==========================

This release rolls all the changes introduced with 4.x back to 3.20.0.

As the incompatibility between 4.0 and 5.0 will be an issue for some customers, we encourage you to use the very stable
kartothek 3.20.0 and not version 4.x.

Please refer the Issue #471 for further information.


Kartothek 4.0.3 (2021-06-10)
============================

* Pin dask to not use 2021.5.1 and 2020.6.0 (#475)


Kartothek 4.0.2 (2021-06-07)
============================

Expand Down
4 changes: 3 additions & 1 deletion asv_bench/benchmarks/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ def setup(self, cardinality, num_values, partitions_to_merge):
unique_vals = ["{:010d}".format(n) for n in range(cardinality)]
array = [unique_vals[x % len(unique_vals)] for x in range(num_values)]
self.df = pd.DataFrame({self.column: array})
self.mp = MetaPartition(label=self.table, data=self.df, metadata_version=4)
self.mp = MetaPartition(
label=self.table, data={"core": self.df}, metadata_version=4
)
self.mp_indices = self.mp.build_indices([self.column])
self.merge_indices.append(self.mp_indices)

Expand Down
6 changes: 4 additions & 2 deletions asv_bench/benchmarks/metapartition.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ def setup(self, num_rows, dtype):
self.mp = MetaPartition(
label="primary_key={}/base_label".format(dtype[0]),
metadata_version=4,
schema=self.schema,
table_meta={"table": self.schema},
)

def time_reconstruct_index(self, num_rows, dtype):

self.mp._reconstruct_index_columns(
df=self.df,
key_indices=[("primary_key", str(dtype[1]))],
table="table",
columns=None,
categories=None,
date_as_object=False,
Expand All @@ -50,7 +51,8 @@ def time_reconstruct_index_categorical(self, num_rows, dtype):
self.mp._reconstruct_index_columns(
df=self.df,
key_indices=[("primary_key", str(dtype[1]))],
table="table",
columns=None,
categories="primary_key",
categories={"table": ["primary_key"]},
date_as_object=False,
)
16 changes: 8 additions & 8 deletions asv_bench/benchmarks/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
from .config import AsvBenchmarkConfig


def generate_mp():
def generate_mp(dataset_metadata=None):
return MetaPartition(
label=uuid.uuid4().hex,
schema=make_meta(get_dataframe_alltypes(), origin="alltypes"),
file="fakefile",
table_meta={"table": make_meta(get_dataframe_alltypes(), origin="alltypes")},
files={"table": "fakefile"},
dataset_metadata=dataset_metadata,
)


Expand Down Expand Up @@ -49,7 +50,8 @@ class TimeStoreDataset(AsvBenchmarkConfig):

def setup(self, num_partitions, max_depth, num_leafs):
self.store = get_store_from_url("hfs://{}".format(tempfile.mkdtemp()))
self.partitions = [generate_mp() for _ in range(num_partitions)]
dataset_metadata = generate_metadata(max_depth, num_leafs)
self.partitions = [generate_mp(dataset_metadata) for _ in range(num_partitions)]
self.dataset_uuid = "dataset_uuid"
self.user_dataset_metadata = {}

Expand All @@ -68,10 +70,8 @@ class TimePersistMetadata(AsvBenchmarkConfig):

def setup(self, num_partitions):
self.store = get_store_from_url("hfs://{}".format(tempfile.mkdtemp()))
self.schemas = [generate_mp().schema for _ in range(num_partitions)]
self.partitions = [generate_mp() for _ in range(num_partitions)]
self.dataset_uuid = "dataset_uuid"

def time_persist_common_metadata(self, num_partitions):
persist_common_metadata(
self.schemas, None, self.store, self.dataset_uuid, "name"
)
persist_common_metadata(self.partitions, None, self.store, self.dataset_uuid)
5 changes: 5 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,8 @@
"kartothek.serialization._generic": "kartothek.serialization",
"kartothek.serialization._parquet": "kartothek.serialization",
}

# In particular the deprecation warning in DatasetMetadata.table_schema is
# raising too many warning to handle sensibly using ipython directive pseudo
# decorators. Remove this with 4.X again
ipython_warning_is_error = False
58 changes: 45 additions & 13 deletions docs/guide/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Setup a store
# Load your data
# By default the single dataframe is stored in the 'core' table
df_from_store = read_table(store=store_url, dataset_uuid=dataset_uuid)
df_from_store = read_table(store=store_url, dataset_uuid=dataset_uuid, table="table")
df_from_store
Expand All @@ -58,8 +58,14 @@ Write
# We'll define two partitions which both have two tables
input_list_of_partitions = [
pd.DataFrame({"A": range(10)}),
pd.DataFrame({"A": range(10, 20)}),
{
"label": "FirstPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
{
"label": "SecondPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
]
# The pipeline will return a :class:`~kartothek.core.dataset.DatasetMetadata` object
Expand Down Expand Up @@ -90,10 +96,17 @@ Read
# In case you were using the dataset created in the Write example
for d1, d2 in zip(
list_of_partitions,
[pd.DataFrame({"A": range(10)}), pd.DataFrame({"A": range(10, 20)}),],
[
# FirstPartition
{"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
# SecondPartition
{"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
],
):
for k1, k2 in zip(d1, d2):
assert k1 == k2
for kv1, kv2 in zip(d1.items(), d2.items()):
k1, v1 = kv1
k2, v2 = kv2
assert k1 == k2 and all(v1 == v2)
Iter
Expand All @@ -107,8 +120,14 @@ Write
from kartothek.api.dataset import store_dataframes_as_dataset__iter
input_list_of_partitions = [
pd.DataFrame({"A": range(10)}),
pd.DataFrame({"A": range(10, 20)}),
{
"label": "FirstPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
{
"label": "SecondPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
]
# The pipeline will return a :class:`~kartothek.core.dataset.DatasetMetadata` object
Expand Down Expand Up @@ -141,10 +160,17 @@ Read
# In case you were using the dataset created in the Write example
for d1, d2 in zip(
list_of_partitions,
[pd.DataFrame({"A": range(10)}), pd.DataFrame({"A": range(10, 20)}),],
[
# FirstPartition
{"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
# SecondPartition
{"FirstCategory": pd.DataFrame(), "SecondCategory": pd.DataFrame()},
],
):
for k1, k2 in zip(d1, d2):
assert k1 == k2
for kv1, kv2 in zip(d1.items(), d2.items()):
k1, v1 = kv1
k2, v2 = kv2
assert k1 == k2 and all(v1 == v2)
Dask
````
Expand All @@ -158,8 +184,14 @@ Write
from kartothek.api.dataset import store_delayed_as_dataset
input_list_of_partitions = [
pd.DataFrame({"A": range(10)}),
pd.DataFrame({"A": range(10, 20)}),
{
"label": "FirstPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
{
"label": "SecondPartition",
"data": [("FirstCategory", pd.DataFrame()), ("SecondCategory", pd.DataFrame())],
},
]
# This will return a :class:`~dask.delayed`. The figure below
Expand Down
84 changes: 65 additions & 19 deletions docs/guide/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ Getting Started
===============


Kartothek manages datasets that consist of files that contain tables. It does so by offering
a metadata definition to handle these datasets efficiently.

Datasets in Kartothek are made up of one or more ``tables``, each with a unique schema.
When working with Kartothek tables as a Python user, we will use :class:`~pandas.DataFrame`
as the user-facing type.

Expand Down Expand Up @@ -127,25 +131,33 @@ This class holds information about the structure and schema of the dataset.

.. ipython:: python
dm.table_name
dm.tables
sorted(dm.partitions.keys())
dm.schema.remove_metadata()
dm.table_meta["table"].remove_metadata() # Arrow schema
For this guide, two attributes that are noteworthy are ``tables`` and ``partitions``:

For this guide we want to take a closer look at the ``partitions`` attribute.
``partitions`` are the physical "pieces" of data which together constitute the
contents of a dataset. Data is written to storage on a per-partition basis. See
the section on partitioning for further details: :ref:`partitioning_section`.
- Each dataset has one or more ``tables``, where each table is a logical collection of data,
bound together by a common schema.
- ``partitions`` are the physical "pieces" of data which together constitute the
contents of a dataset. Data is written to storage on a per-partition basis.
See the section on partitioning for further details: :ref:`partitioning_section`.

The attribute ``schema`` can be accessed to see the underlying schema of the dataset.
The attribute ``table_meta`` can be accessed to see the underlying schema of the dataset.
See :ref:`type_system` for more information.

To store multiple dataframes into a dataset, it is possible to pass a collection of
dataframes; the exact format will depend on the I/O backend used.

Kartothek assumes these dataframes are different chunks of the same table and
will therefore be required to have the same schema. A ``ValueError`` will be
thrown otherwise.
Additionally, Kartothek supports several data input formats,
it does not need to always be a plain ``pd.DataFrame``.
See :func:`~kartothek.io_components.metapartition.parse_input_to_metapartition` for
further details.

If table names are not specified when passing an iterator of dataframes,
Kartothek assumes these dataframes are different chunks of the same table
and expects their schemas to be identical. A ``ValueError`` will be thrown otherwise.
For example,

.. ipython:: python
Expand Down Expand Up @@ -182,6 +194,39 @@ For example,
.. note:: Read these sections for more details: :ref:`type_system`, :ref:`dataset_spec`


When we do not explicitly define the name of the table and partition, Kartothek uses the
default table name ``table`` and generates a UUID for the partition name.

.. admonition:: A more complex example: multiple named tables

Sometimes it may be useful to write multiple dataframes with different schemas into
a single dataset. This can be achieved by creating a dataset with multiple tables.

In this example, we create a dataset with two tables: ``core-table`` and ``aux-table``.
The schemas of the tables are identical across partitions (each dictionary in the
``dfs`` list argument represents a partition).

.. ipython:: python
dfs = [
{
"data": {
"core-table": pd.DataFrame({"id": [22, 23], "f": [1.1, 2.4]}),
"aux-table": pd.DataFrame({"id": [22], "col1": ["x"]}),
}
},
{
"data": {
"core-table": pd.DataFrame({"id": [29, 31], "f": [3.2, 0.6]}),
"aux-table": pd.DataFrame({"id": [31], "col1": ["y"]}),
}
},
]
dm = store_dataframes_as_dataset(store_url, dataset_uuid="two-tables", dfs=dfs)
dm.tables
Reading data from storage
=========================

Expand All @@ -193,24 +238,24 @@ table of the dataset as a pandas DataFrame.
from kartothek.api.dataset import read_table
read_table("a_unique_dataset_identifier", store_url)
read_table("a_unique_dataset_identifier", store_url, table="table")
We can also read a dataframe iteratively, using
:func:`~kartothek.io.iter.read_dataset_as_dataframes__iterator`. This will return a generator of :class:`pandas.DataFrame` where every element represents one file. For example,
:func:`~kartothek.io.iter.read_dataset_as_dataframes__iterator`. This will return a generator
of dictionaries (one dictionary for each `partition`), where the keys of each dictionary
represent the `tables` of the dataset. For example,

.. ipython:: python
from kartothek.api.dataset import read_dataset_as_dataframes__iterator
for partition_index, df in enumerate(
read_dataset_as_dataframes__iterator(
dataset_uuid="a_unique_dataset_identifier", store=store_url
)
for partition_index, df_dict in enumerate(
read_dataset_as_dataframes__iterator(dataset_uuid="two-tables", store=store_url)
):
# Note: There is no guarantee on the ordering
print(f"Partition #{partition_index}")
print(f"Data: \n{df}")
for table_name, table_df in df_dict.items():
print(f"Table: {table_name}. Data: \n{table_df}")
Respectively, the ``dask.delayed`` back-end provides the function
:func:`~kartothek.io.dask.delayed.read_dataset_as_delayed`, which has a very similar
Expand All @@ -230,7 +275,8 @@ function but returns a collection of ``dask.delayed`` objects.

.. ipython:: python
read_table("a_unique_dataset_identifier", store_url, predicates=[[("A", "<", 2.5)]])
# Read only values table `core-table` where `f` < 2.5
read_table("two-tables", store_url, table="core-table", predicates=[[("f", "<", 2.5)]])
.. _storefact: https://github.com/blue-yonder/storefact
.. _dask: https://docs.dask.org/en/latest/
Loading

0 comments on commit cbee12d

Please sign in to comment.