Add more documentation about supported types (JDASoftwareGroup#415)
fjetter authored Feb 18, 2021
1 parent 6514c1f commit 28939c7
Showing 13 changed files with 369 additions and 55 deletions.
3 changes: 2 additions & 1 deletion docs/guide/dask_indexing.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.. _dask_index:

(Re-)Store a dask index
~~~~~~~~~~~~~~~~~~~~~~~
=======================

Calculating a dask index is usually a very expensive operation which requires data to be shuffled around. To (re-)store the dask index we can use the `dask_index_on` keyword.
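
A minimal sketch, assuming the ``dask_index_on`` keyword of :func:`~kartothek.io.dask.dataframe.read_dataset_as_ddf` (``store`` and the dataset uuid are placeholders):

.. code:: python

    from kartothek.io.dask.dataframe import read_dataset_as_ddf

    # Restore the dask index from the stored values of the given column
    # without triggering a shuffle.
    ddf = read_dataset_as_ddf(
        dataset_uuid="my_dataset",
        store=store,
        dask_index_on="IndexedField",
    )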

Expand Down
7 changes: 7 additions & 0 deletions docs/guide/partitioning.rst
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,10 @@ When investigating the index, we can also see that a query for a given value in
.. _spill-to-disk: https://distributed.dask.org/en/latest/worker.html#memory-management


See Also
--------

* :doc:`../spec/indexing`
* :doc:`../spec/efficient_querying`
6 changes: 3 additions & 3 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ these datasets in data pipelines. Kartothek handles I/O, tracks dataset
partitions and selects subsets of data transparently.

To get started, have a look at our :doc:`guide/getting_started` guide,
head to the description of the :doc:`spec/index` or head straight to the API documentation :doc:`api`.
head to the description of the :doc:`spec/format_specification` or head straight to the API documentation :doc:`api`.

What is a (real) Kartothek?
---------------------------
Expand Down Expand Up @@ -73,12 +73,12 @@ A Kartothek (or more modern: Zettelkasten/Katalogkasten) is a tool to organize
:caption: Background
:hidden:

Specification <spec/index>
Specification <spec/format_specification>
Type System <spec/type_system>
DataFrame Serialization <spec/serialization>
KeyValueStore Interface <spec/store_interface>
Storage Layout <spec/storage_layout>
Partition Indices <spec/partition_indices>
Indexing <spec/indexing>
Efficient Querying <spec/efficient_querying>
Parallel Execution with Dask <spec/parallel_dask>

Expand Down
10 changes: 8 additions & 2 deletions docs/spec/efficient_querying.rst
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,15 @@ refine them based on your results.
* Look for the right sizing of the parquet files.
* Check your row group sizing. Row groups allow for partial loading of a Parquet file.
* Sort column contents to produce compact and disjoint value ranges between row
groups. This allows for better selections of row groups.
groups. This allows for better selections of row groups.
* Avoid string columns. Try to use a more specific datatype. See
:doc:`type_system` for more information on possible column types.

See :func:`kartothek.io.dask.dataframe.update_dataset_from_ddf` for more
information.
information.
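
As a rough sketch of how the row group hints above translate into code, assuming the ``chunk_size`` argument of :class:`~kartothek.serialization.ParquetSerializer` controls the number of rows per row group (column names and sizes are placeholders):

.. code:: python

    import pandas as pd

    from kartothek.core.utils import ensure_store
    from kartothek.serialization import ParquetSerializer

    store = ensure_store("hmemory://")
    df = pd.DataFrame({"query_column": range(1_000_000), "payload": 0})

    # Sort by the most frequently queried column so that row groups cover
    # compact, disjoint value ranges, then cap the rows per row group.
    df = df.sort_values("query_column")
    serializer = ParquetSerializer(chunk_size=100_000)
    key = serializer.store(store, "storage_key", df)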


See also
--------
* :doc:`indexing`
* :doc:`../guide/partitioning`
2 changes: 1 addition & 1 deletion docs/spec/index.rst → docs/spec/format_specification.rst
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,4 @@ can be seen in the example below.
all rows in a partition.
* The value of the indices map is the name of the Parquet file storing the
index.
* For a storage specification of the indices, see :ref:`partition_indices`
* For a description of the indices, see :ref:`indexing`
142 changes: 142 additions & 0 deletions docs/spec/indexing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
.. _indexing:

Indexing
========

Kartothek uses different types of `inverted file indices`_ to enable efficient partition pruning and improve query performance; see also :doc:`efficient_querying` for more hints on how to optimize performance. This section describes the different types of indices, how to create them, and how to interact with them.


Principle in-memory representation
----------------------------------

All currently supported kartothek index types are inverted indices which map the observed values of a given field to the list of partitions in which they were observed.

.. ipython:: python

    index_dct = {1: ["table/12345"], 2: ["table/12345", "table/6789"]}

In this example, the value ``1`` is found in exactly one partition, labeled ``table/12345``, while the value ``2`` is found in the two partitions ``table/12345`` and ``table/6789``.

Users typically do not interact with indices directly since querying a dataset will automatically load and interact with the indices. For some applications it is still quite useful to interact with them directly.

All indices implement :class:`~kartothek.core.index.IndexBase` which allows the user to interact with the indices in some useful ways.

.. ipython:: python

    from kartothek.core.index import IndexBase

    index = IndexBase(column="FieldName", index_dct=index_dct)
    index.dtype
    index.observed_values()
    index.eval_operator(">=", 2)
    index.as_flat_series()

Partition Indices
-----------------

The first index type kartothek offers is a partition index. The partition index is created by partitioning a dataset in a hive-like partition scheme.

.. ipython:: python
    :suppress:

    import string
    import pandas as pd
    from kartothek.core.utils import ensure_store
    from kartothek.io.eager import store_dataframes_as_dataset

    store = ensure_store("hmemory://")

.. ipython:: python

    df = pd.DataFrame(
        {
            "PartField": ["A"] * 5 + ["B"] * 5,
            "IndexedField": list(range(5)) + list(range(3, 8)),
            "Payload": [string.ascii_letters[i] for i in range(10)],
        }
    )
    dm = store_dataframes_as_dataset(
        store=store,
        dataset_uuid="indexing_docs",
        dfs=[df],
        partition_on=["PartField"],
        secondary_indices=["IndexedField"],
    ).load_all_indices(store)
    sorted(store.keys())
    part_index = dm.indices["PartField"]
    part_index

This kind of index is also called a `primary index`. This implies the property that a given file is guaranteed to only contain **one** unique value of the given field. This can also be observed when investigating the flat structure of the index.

.. ipython:: python

    part_index.as_flat_series()

This property makes this kind of index very powerful if used correctly since it prunes the partitions exactly to the user query and enables exact removal of data when mutating datasets (see :doc:`../guide/mutating_datasets`).

For data with high cardinality this kind of index is not well suited since it would result in a highly fragmented dataset with too many small files.
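
When reading with predicates on the partitioned column, this index is exploited automatically. A minimal sketch, assuming the eager backend and the dataset created above (the exact signature of :func:`~kartothek.io.eager.read_table` may vary between versions):

.. code:: python

    from kartothek.io.eager import read_table

    # Only the files below the "PartField=A" prefix are read; all other
    # partitions are pruned using the partition index alone.
    df_a = read_table(
        dataset_uuid="indexing_docs",
        store=store,
        predicates=[[("PartField", "==", "A")]],
    )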


Secondary indices
-----------------

Secondary indices are the most powerful type of index since they allow us to reference files without having to encode any values in the keys. They can be created by supplying the `secondary_indices` keyword argument as shown above. The user interaction works similarly to the partition index described above.


Persistence
~~~~~~~~~~~

A secondary index is persisted as a Parquet file whose (Parquet) schema consists
of two columns, the indexed field and the list of partitions in which each value
occurs:

* The field name corresponds to the name of the column in the persisted
  DataFrame.
* The partition column holds a list of partition identifiers, as used in the keys of
  the partitions map and the data filename. (Note: the partition identifier
  is used instead of the data filename since a single partition can span multiple
  files containing different column sets over the same row selection.)
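
As a rough illustration, the persisted file can be thought of as having an Arrow schema along the following lines (a sketch only; the exact column name for the partition list is an implementation detail and the value type follows the indexed column):

.. code:: python

    import pyarrow as pa

    # Hypothetical sketch: observed values of "IndexedField" (int64 here) mapped
    # to the list of partition identifiers in which each value occurs. The column
    # names are illustrative, not a normative part of the format.
    index_schema = pa.schema(
        [
            pa.field("IndexedField", pa.int64()),
            pa.field("partition", pa.list_(pa.string())),
        ]
    )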


Typing
------

Every index has a well-defined Arrow data type which is usually inferred automatically and ensured to be consistent with the overall dataset schema.

.. ipython:: python

    part_index.dtype

Supported data types for indices include:

* ``bool``
* ``(u)int{8,16,32,64}``
* ``float{32,64}``
* ``str``
* ``bytes``
* ``pd.Timestamp`` (with and without timezones)
* ``datetime.date``

.. important::

Nullable fields are not properly supported and, depending on the API used, the behaviour may differ slightly.

In particular, the plain dataset API will usually drop nan/nulls silently while the Cube API will raise an exception.


See also
--------
* :doc:`efficient_querying` for some general hints for faster querying
* :doc:`storage_layout`
* :doc:`../guide/partitioning` for some guidance on how to partition a dataset
* :doc:`../guide/dask_indexing`



.. _inverted file indices: https://en.wikipedia.org/wiki/Inverted_index
16 changes: 0 additions & 16 deletions docs/spec/partition_indices.rst

This file was deleted.

80 changes: 70 additions & 10 deletions docs/spec/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,92 @@ Serialise Pandas DataFrames to/from bytes
Serialisation to bytes
----------------------

For the serialsation, we need to pick a format serialiser, you either use
:func:`~kartothek.serialization.default_serializer` or explicitly select a serialiser,
For the serialisation, we need to pick a format serializer; you can either use
:func:`~kartothek.serialization.default_serializer` or explicitly select a serializer,
e.g. :class:`~kartothek.serialization.ParquetSerializer`.

.. code:: python
from kartothek.serialization import ParquetSerializer
serialiser = ParquetSerializer()
serializer = ParquetSerializer()
df = ...
serialiser.store(store, "storage_key", df)
serializer.store(store, "storage_key", df)
Deserialisation
---------------

For deserialisation, you don't have to instantiate any serialiser as the correct
For deserialisation, you don't have to instantiate any serializer as the correct
one is determined from the filename.

.. code:: python
from kartothek.serialization import DataFrameSerializer
df = DataFrameSerializer.restore_dataframe(store, "file.parquet")
# You can also supply a filter on the loaded DataFrame, e.g.
df = DataFrameSerializer.restore_dataframe(store, "file.parquet", "c_id > 42000")
# Currently these filter queries are passed to pandas.DataFrame.query but in
# future they could be further passed on to the file format depending on whether
# the format supports predicate pushdown (currently this is only Parquet)
Supported data types
--------------------

Kartothek generally does not impose any restrictions on the data types to be used as long as they are compatible and in alignment with the :doc:`pyarrow pandas integration<python/pandas>`.

For a detailed explanation about how types are handled, please consult :doc:`type_system`.

.. _predicate_pushdown:

Filtering / Predicate pushdown
------------------------------

You can provide a filter expression in `DNF`_ as a nested list: every inner list is interpreted as a logical `conjunction` (``AND``), while the outer list is interpreted as a `disjunction` (``OR``).

.. code:: python
predicates = [
[("ColumnA", "==", 5),],
[("ColumnA", ">", 5), ("ColumnB", "<=", datetime.date(2021, 1, 1)),],
]
The above list of predicates can be interpreted as the following ``WHERE`` clause::

ColumnA = 5 OR (ColumnA > 5 AND ColumnB <= '2021-01-01')


The predicate expression can be provided to the `predicates` keyword argument of the serializer and/or full dataset read interfaces.
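
As an illustrative sketch, the same predicate list can be passed either to the serializer or to a dataset read function (``store``, the storage key and the dataset uuid are placeholders):

.. code:: python

    import datetime

    from kartothek.io.eager import read_table
    from kartothek.serialization import DataFrameSerializer

    predicates = [
        [("ColumnA", "==", 5)],
        [("ColumnA", ">", 5), ("ColumnB", "<=", datetime.date(2021, 1, 1))],
    ]

    # Filter a single Parquet file while reading it ...
    df = DataFrameSerializer.restore_dataframe(store, "file.parquet", predicates=predicates)

    # ... or filter (and prune) an entire dataset.
    df = read_table(dataset_uuid="my_dataset", store=store, predicates=predicates)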


.. note::

All kartothek reading pipelines expose this `predicates` argument as well, where it is used not only for predicate pushdown but also for partition pruning. See :doc:`efficient_querying` for details.

The kartothek cube interface, see :doc:`../guide/cube/kartothek_cubes`, exposes a similar mechanism via the :class:`~kartothek.core.cube.conditions.Condition` and :class:`~kartothek.core.cube.conditions.Conjunction` classes.


Literals, operators and typing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The literals used for building the predicates are tuples with three elements.

.. code::
(<FieldName: str>, <Operator: str>, <Value: Any>)
* ``FieldName`` is a str identifying the column this literal describes.
* ``Operator`` is a string for the logical operation applied to the field. Available operators are ``==``, ``!=``, ``<=``, ``>=``, ``<``, ``>``, ``in``.
* ``Value`` is the actual value for the query. The type of this value is always required to be identical to the field's data type. We apply the same type normalization for the predicates as described in :doc:`type_system`.


Filtering for missing values / nulls is supported with the operators ``==``, ``!=`` and ``in``, using the values ``np.nan`` and ``None`` for float and string columns, respectively.
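
A minimal sketch of such a null filter (assuming a float column ``x`` and a string column ``name``; both names are placeholders):

.. code:: python

    import numpy as np

    # Select rows where the float column is NaN or the string column is missing.
    null_predicates = [
        [("x", "==", np.nan)],
        [("name", "==", None)],
    ]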


See also
--------
* :class:`~kartothek.serialization.DataFrameSerializer`
* :class:`~kartothek.serialization.ParquetSerializer`
* :doc:`efficient_querying`
* :doc:`../guide/cube/query_system`
* :doc:`type_system`


.. _DNF: https://en.wikipedia.org/wiki/Disjunctive_normal_form
35 changes: 35 additions & 0 deletions docs/spec/type_system.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ This is the colleague of `Date`_ and stores the time at a given day.
The normalization of ``time32[U]`` and ``time64[U]`` (where ``U`` is either ``"s"`` for seconds or ``"ms"`` for
milliseconds) is currently not implemented. This might change in a future metadata version.

.. _timestamp:

Timestamp
~~~~~~~~~
A combination of `Date`_ and `Time`_ and is particularly useful to store when an event occurred without the need to
Expand All @@ -244,6 +246,35 @@ We cannot treat timestamps for different timezones as the same time because the
semantic meaning. We also cannot treat timestamps with different unit types as same since they all have very different
ranges. So, no normalization is implemented for timestamps.

.. note::

For compatibility reasons, `kartothek` coerces timestamps to `us` accuracy, effectively truncating the timestamp. If the timestamp actually has a higher accuracy, arrow raises an exception and rejects it.

.. ipython:: python
    :suppress:

    import pandas as pd
    from kartothek.serialization import ParquetSerializer
    from kartothek.core.utils import ensure_store

    store = ensure_store("memory://")
    ser = ParquetSerializer()

.. ipython:: python
    :okexcept:

    # nanosecond resolution
    df = pd.DataFrame({"nano": [pd.Timestamp("2021-01-01 00:00:00.0000001")]})
    ser.store(store, "key", df)

One possibility to deal with this is to set the appropriate accuracy using
`pandas.Timestamp.ceil`_ or `pandas.Timestamp.floor`_:

.. ipython:: python

    df.nano = df.nano.dt.ceil("us")
    ser.restore_dataframe(store, ser.store(store, "key", df))

Lists
~~~~~
They are used to store a set of elements in a fixed order, like a list of cities to visit, or a plan how to connect
Expand Down Expand Up @@ -392,6 +423,8 @@ also have meant to be a ``string`` or ``date32`` column, but pyarrow cannot know

To keep things pragmatic, we ignore ``null`` during type checks.

.. _`Dictionary Encoding`:

Dictionary Encoding
-------------------
Dictionary encoded data is normally produced by Pandas categoricals:
Expand Down Expand Up @@ -525,3 +558,5 @@ Kartothek aims to be as compatible as possible with them.
.. _Turbodbc: https://github.com/blue-yonder/turbodbc
.. _Unicode: https://unicode.org/
.. _UNIX Epoch: https://en.wikipedia.org/wiki/Unix_time
.. _pandas.Timestamp.ceil: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.ceil.html#pandas.Timestamp.ceil
.. _pandas.Timestamp.floor: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.floor.html