[DOP-13900] Add note about connecting to Clickhouse cluster
dolfinus committed May 23, 2024
1 parent 1f73652 commit ffe408d
Showing 8 changed files with 79 additions and 44 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/280.improvement.rst
@@ -0,0 +1 @@
Add note about connecting to Clickhouse cluster.
26 changes: 21 additions & 5 deletions docs/connection/db_connection/clickhouse/prerequisites.rst
@@ -21,7 +21,7 @@ BEFORE creating the connector instance.
See :ref:`install-spark` installation instruction for more details.

Connecting to Clickhouse
-----------------------
------------------------

Connection port
~~~~~~~~~~~~~~~
@@ -30,11 +30,27 @@ Connector can only use **HTTP** (usually ``8123`` port) or **HTTPS** (usually ``

TCP and GRPC protocols are NOT supported.
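
For example, connecting over HTTPS would use port ``8443``. A minimal sketch is shown below; the host is a placeholder, and the ``ssl`` option in ``extra`` is an assumption about a property forwarded to the underlying JDBC driver:

.. code:: python

    from onetl.connection import Clickhouse

    # sketch only: "ssl" is assumed to be passed through as a JDBC connection property
    Clickhouse(
        host="some.domain.com",
        port=8443,
        extra={"ssl": True},
    )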

Clickhouse cluster interaction
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Connecting to cluster
~~~~~~~~~~~~~~~~~~~~~

If you're using Clickhouse cluster, it is currently possible to connect only to one specific cluster node.
Connecting to multiple nodes simultaneously is not supported.
It is possible to connect to a Clickhouse cluster and use its load balancing capabilities to read or write data in parallel.
Each Spark executor can connect to a random Clickhouse node, instead of sending all the data to the node specified in the connection params.

This requires all Clickhouse servers to run on different hosts and **listen on the same HTTP port**.
Set ``auto_discovery=True`` to enable this feature (disabled by default):

.. code:: python

    from onetl.connection import Clickhouse

    Clickhouse(
        host="node1.of.cluster",
        port=8123,
        extra={
            "auto_discovery": True,
            "load_balancing_policy": "roundRobin",
        },
    )

See the `official documentation <https://clickhouse.com/docs/en/integrations/java#configuring-node-discovery-load-balancing-and-failover>`_ for details.
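
With auto discovery enabled, the connection is used as usual, e.g. with ``DBReader``; each Spark executor then reads its part of the data from whichever node it connected to.
A minimal sketch is shown below (not part of the original example): it assumes the connection above was created as ``clickhouse`` with credentials and a ``spark`` session, and the table name is a placeholder.

.. code:: python

    from onetl.db import DBReader

    # "clickhouse" is the connection created above (with auto_discovery enabled);
    # "schema.table" is a placeholder table name
    reader = DBReader(connection=clickhouse, source="schema.table")
    df = reader.run()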

Required grants
~~~~~~~~~~~~~~~
68 changes: 43 additions & 25 deletions docs/connection/db_connection/clickhouse/types.rst
@@ -125,9 +125,9 @@ Numeric types
~~~~~~~~~~~~~

+--------------------------------+-----------------------------------+-------------------------------+-------------------------------+
| Clickhouse type (read)         | Spark type                        | Clickhouse type (write)       | Clickhouse type (create)      |
+================================+===================================+===============================+===============================+
| ``Bool`` | ``BooleanType()`` | ``UInt64`` | ``UInt64`` |
| ``Bool`` | ``BooleanType()`` | ``Bool`` | ``UInt64`` |
+--------------------------------+-----------------------------------+-------------------------------+-------------------------------+
| ``Decimal`` | ``DecimalType(P=10, S=0)`` | ``Decimal(P=10, S=0)`` | ``Decimal(P=10, S=0)`` |
+--------------------------------+-----------------------------------+-------------------------------+-------------------------------+
@@ -158,8 +158,8 @@ Numeric types
| ``Int64`` | ``LongType()`` | ``Int64`` | ``Int64`` |
+--------------------------------+-----------------------------------+-------------------------------+-------------------------------+
| ``Int128`` | unsupported [3]_ | | |
+--------------------------------+-----------------------------------+-------------------------------+-------------------------------+
| ``Int256`` | unsupported [3]_ | | |
+--------------------------------+ | | |
| ``Int256`` | | | |
+--------------------------------+-----------------------------------+-------------------------------+-------------------------------+
| ``-`` | ``ByteType()`` | ``Int8`` | ``Int8`` |
+--------------------------------+-----------------------------------+-------------------------------+-------------------------------+
@@ -198,22 +198,27 @@ Notes:
+===================================+======================================+==================================+===============================+
| ``Date`` | ``DateType()`` | ``Date`` | ``Date`` |
+-----------------------------------+--------------------------------------+----------------------------------+-------------------------------+
| ``Date32`` | ``DateType()`` | ``Date`` | ``Date`` |
| | | | **cannot be inserted** [6]_ |
| ``Date32`` | ``DateType()`` | ``Date`` | ``Date``, |
| | | | **cannot insert data** [4]_ |
+-----------------------------------+--------------------------------------+----------------------------------+-------------------------------+
| ``DateTime32``, seconds | ``TimestampType()`` | ``DateTime64(6)``, microseconds | ``DateTime32`` |
+-----------------------------------+--------------------------------------+----------------------------------+ seconds |
| ``DateTime64(3)``, milliseconds | ``TimestampType()`` | ``DateTime64(6)``, microseconds | **precision loss** [4]_ |
+-----------------------------------+--------------------------------------+----------------------------------+ |
| ``DateTime64(6)``, microseconds | ``TimestampType()`` | ``DateTime64(6)``, microseconds | |
+-----------------------------------+--------------------------------------+----------------------------------+ |
| ``DateTime64(7..9)``, nanoseconds | ``TimestampType()`` | ``DateTime64(6)`` | |
| | | microseconds | |
| | | **precision loss** [4]_ | |
| ``DateTime32``, seconds | ``TimestampType()``, microseconds | ``DateTime64(6)``, microseconds | ``DateTime32``, seconds |
+-----------------------------------+--------------------------------------+----------------------------------+-------------------------------+
| ``-`` | ``TimestampNTZType()`` | ``DateTime64(6)`` | |
| ``DateTime64(3)``, milliseconds | ``TimestampType()``, microseconds | ``DateTime64(6)``, microseconds | ``DateTime32``, seconds, |
| | | | **precision loss** [5]_ |
+-----------------------------------+--------------------------------------+----------------------------------+-------------------------------+
| ``IntervalNanosecond`` | ``LongType()`` | ```Int64`` | ``Int64`` |
| ``DateTime64(6)``, microseconds | ``TimestampType()``, microseconds | | ``DateTime32``, seconds, |
+-----------------------------------+--------------------------------------+ | **precision loss** [7]_ |
| ``DateTime64(7..9)``, nanoseconds | ``TimestampType()``, microseconds, | | |
| | **precision loss** [6]_ | | |
| | | | |
+-----------------------------------+--------------------------------------+ | |
| ``-`` | ``TimestampNTZType()``, microseconds | | |
+-----------------------------------+--------------------------------------+----------------------------------+-------------------------------+
| ``DateTime32(TZ)`` | unsupported [7]_ | | |
+-----------------------------------+ | | |
| ``DateTime64(P, TZ)`` | | | |
+-----------------------------------+--------------------------------------+----------------------------------+-------------------------------+
| ``IntervalNanosecond`` | ``LongType()`` | ``Int64`` | ``Int64`` |
+-----------------------------------+ | | |
| ``IntervalMicrosecond`` | | | |
+-----------------------------------+ | | |
@@ -262,17 +267,27 @@ Notes:
* `Spark TimestampType documentation <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/TimestampType.html>`_

.. [4]
Clickhouse support datetime up to nanoseconds precision (``23:59:59.999999999``),
but Spark ``TimestampType()`` supports datetime up to microseconds precision (``23:59:59.999999``).
Nanoseconds will be lost during read or write operations.
``Date32`` has a different byte representation than ``Date``, and inserting a value of type ``Date32`` into a ``Date`` column
leads to errors on the Clickhouse side, e.g. ``Date(106617) should be between 0 and 65535 inclusive of both values``.
Spark does read the ``Date32`` column properly as ``DateType()``, so there should be no difference at all;
this is probably a bug in the Clickhouse driver.
.. [5]
The generic JDBC dialect generates DDL with the Clickhouse type ``TIMESTAMP``, which is an alias for ``DateTime32`` with precision up to seconds (``23:59:59``).
Inserting data with millisecond precision (``23:59:59.999``) will **throw away the milliseconds**.
Solution: create the table manually, with the proper column type.
.. [6]
Clickhouse will raise an exception that data in format ``2001-01-01 23:59:59.999999`` has data ``.999999`` which does not match format ``YYYY-MM-DD hh:mm:ss``.
So you can create Clickhouse table with Spark, but cannot write data to column of this type.
Clickhouse supports datetimes up to nanosecond precision (``23:59:59.999999999``),
but Spark ``TimestampType()`` supports datetimes only up to microsecond precision (``23:59:59.999999``).
Nanoseconds will be lost during read or write operations.
Solution: create the table manually, with the proper column type.
.. [7]
Clickhouse will raise an exception that data in the format ``2001-01-01 23:59:59.999999`` has the part ``.999999`` which does not match the format ``YYYY-MM-DD hh:mm:ss``
of the ``DateTime32`` column type (see [5]_).
So Spark can create a Clickhouse table with such a column, but cannot write data to it.
Solution: create the table manually, with the proper column type (see the sketch below).
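
As a sketch of the "create the table manually" workaround mentioned above, the DDL below declares the timestamp column as ``DateTime64(6)`` so microsecond values written by Spark are preserved.
The database, table and column names are made up for illustration, and ``clickhouse.execute`` is assumed to be the usual onETL way of running DDL statements:

.. code:: python

    # hypothetical target table, created before writing the DataFrame
    clickhouse.execute(
        """
        CREATE TABLE default.target_table (
            id Int32,
            business_dt DateTime64(6)
        )
        ENGINE = MergeTree()
        ORDER BY id
        """,
    )
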
String types
~~~~~~~~~~~~~
@@ -291,6 +306,8 @@ String types
| ``IPv4`` | | | |
+--------------------------------------+ | | |
| ``IPv6`` | | | |
+--------------------------------------+ | | |
| ``UUID`` | | | |
+--------------------------------------+------------------+ | |
| ``-`` | ``BinaryType()`` | | |
+--------------------------------------+------------------+------------------------+--------------------------+
@@ -311,7 +328,6 @@ Columns of these Clickhouse types cannot be read by Spark:
* ``Ring``
* ``SimpleAggregateFunction(func, T)``
* ``Tuple(T1, T2, ...)``
* ``UUID``

Dataframe with these Spark types cannot be written to Clickhouse:
* ``ArrayType(T)``
@@ -359,9 +375,10 @@ For parsing JSON columns in ClickHouse, :obj:`JSON.parse_column <onetl.file.form
# Spark requires all columns to have some specific type, describe it
column_type = ArrayType(IntegerType())
json = JSON()
df = df.select(
    df.id,
    JSON().parse_column("array_column", column_type),
    json.parse_column("array_column", column_type),
)
``DBWriter``
@@ -389,9 +406,10 @@ For writing JSON data to ClickHouse, use the :obj:`JSON.serialize_column <onetl.
""",
)
json = JSON()
df = df.select(
    df.id,
    JSON().serialize_column(df.array_column).alias("array_column_json"),
    json.serialize_column(df.array_column).alias("array_column_json"),
)
writer.run(df)
20 changes: 10 additions & 10 deletions docs/connection/db_connection/kafka/prerequisites.rst
@@ -36,8 +36,8 @@ Connection protocol
~~~~~~~~~~~~~~~~~~~

Kafka can support different connection protocols. List of currently supported protocols:
* :obj:`PLAINTEXT <onetl.connection.db_connection.kafka.kafka_plaintext_protocol.KafkaPlaintextProtocol>` (not secure)
* :obj:`SSL <onetl.connection.db_connection.kafka.kafka_ssl_protocol.KafkaSSLProtocol>` (secure, recommended)

Note that a specific port can listen for only one of these protocols, so it is important to set
the proper port number + protocol combination.
@@ -47,24 +47,24 @@ Authentication mechanism

Kafka can support different authentication mechanisms (also known as `SASL <https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer>`_).
List of currently supported mechanisms:
* :obj:`PLAIN <onetl.connection.db_connection.kafka.kafka_basic_auth.KafkaBasicAuth>`. To avoid confusing this with the ``PLAINTEXT`` connection protocol, onETL uses the name ``BasicAuth``.
* :obj:`GSSAPI <onetl.connection.db_connection.kafka.kafka_kerberos_auth.KafkaKerberosAuth>`. To simplify naming, onETL uses the name ``KerberosAuth``.
* :obj:`SCRAM-SHA-256 or SCRAM-SHA-512 <onetl.connection.db_connection.kafka.kafka_scram_auth.KafkaScramAuth>` (recommended).

Different mechanisms use different types of credentials (login + password, keytab file, and so on).

Note that connection protocol and auth mechanism are set in pairs:
* If you see ``SASL_PLAINTEXT``, this means the ``PLAINTEXT`` connection protocol + some auth mechanism.
* If you see ``SASL_SSL``, this means the ``SSL`` connection protocol + some auth mechanism.
* If you see just ``PLAINTEXT`` or ``SSL`` (**no** ``SASL``), this means that authentication is disabled (anonymous access).

Please contact your Kafka administrator to get details about the auth mechanisms enabled in a specific Kafka instance.
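
For illustration only, a hedged sketch of a ``SASL_PLAINTEXT`` setup (``PLAINTEXT`` protocol + ``SCRAM-SHA-256`` mechanism) is shown below.
The broker address, cluster name and credentials are placeholders, and the nested ``Kafka.PlaintextProtocol`` / ``Kafka.ScramAuth`` shortcuts and their arguments are assumptions to be checked against the class references above.

.. code:: python

    from onetl.connection import Kafka

    kafka = Kafka(
        addresses=["broker1.some.domain:9092"],  # placeholder broker address
        cluster="my-cluster",                    # placeholder cluster name
        protocol=Kafka.PlaintextProtocol(),
        auth=Kafka.ScramAuth(user="someuser", password="***", digest="SHA-256"),
        spark=spark,
    )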

Required grants
~~~~~~~~~~~~~~~

Ask your Kafka administrator to set the following grants for a user, *if the Kafka instance uses ACL*:
* ``Describe`` + ``Read`` for reading data from Kafka (Consumer).
* ``Describe`` + ``Write`` for writing data to Kafka (Producer).

More details can be found in `documentation <https://kafka.apache.org/documentation/#operations_in_kafka>`_.
2 changes: 1 addition & 1 deletion docs/connection/db_connection/postgres/types.rst
@@ -27,7 +27,7 @@ This is how Postgres connector performs this:

* Get names of columns in DataFrame. [1]_
* Perform ``SELECT * FROM table LIMIT 0`` query.
* Take only columns present in DataFrame (by name, case insensitive). For each found column get Clickhouse type.
* Take only columns present in DataFrame (by name, case insensitive) [2]_. For each found column get Postgres type.
* Find corresponding ``Spark type`` -> ``Postgres type (write)`` combination (see below) for each DataFrame column. If no combination is found, raise exception.
* If ``Postgres type (write)`` matches ``Postgres type (read)``, no additional casts will be performed, and the DataFrame column will be written to Postgres as is.
* If ``Postgres type (write)`` does not match ``Postgres type (read)``, the DataFrame column will be cast to the target column type **on the Postgres side** (see the sketch below).
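
As a sketch of this behaviour, assume an existing table ``public.target`` whose ``balance`` column is ``numeric(10, 2)`` while the DataFrame column is ``DoubleType()``; the table and column names here are made up for illustration:

.. code:: python

    from onetl.db import DBWriter

    # "postgres" is an existing Postgres connection, "df" is the DataFrame;
    # the DoubleType() column is written as "double precision" and then
    # cast to numeric(10, 2) on the Postgres side
    writer = DBWriter(
        connection=postgres,
        target="public.target",
    )
    writer.run(df)
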
2 changes: 1 addition & 1 deletion docs/file/file_downloader/result.rst
@@ -6,4 +6,4 @@ File Downloader Result
.. currentmodule:: onetl.file.file_downloader.result

.. autoclass:: DownloadResult
:members: successful, failed, skipped, missing, successful_count, failed_count, skipped_count, missing_count, total_count, successful_size, failed_size, skipped_size, total_size, raise_if_failed, reraise_failed, raise_if_missing, raise_if_skipped, raise_if_empty, is_empty, raise_if_contains_zero_size, details, summary, dict, json
:members: successful, failed, skipped, missing, successful_count, failed_count, skipped_count, missing_count, total_count, successful_size, failed_size, skipped_size, total_size, raise_if_failed, raise_if_missing, raise_if_skipped, raise_if_empty, is_empty, raise_if_contains_zero_size, details, summary, dict, json
2 changes: 1 addition & 1 deletion docs/file/file_mover/result.rst
@@ -6,4 +6,4 @@ File Mover Result
.. currentmodule:: onetl.file.file_mover.result

.. autoclass:: MoveResult
:members: successful, failed, skipped, missing, successful_count, failed_count, skipped_count, missing_count, total_count, successful_size, failed_size, skipped_size, total_size, raise_if_failed, reraise_failed, raise_if_missing, raise_if_skipped, raise_if_empty, is_empty, raise_if_contains_zero_size, details, summary, dict, json
:members: successful, failed, skipped, missing, successful_count, failed_count, skipped_count, missing_count, total_count, successful_size, failed_size, skipped_size, total_size, raise_if_failed, raise_if_missing, raise_if_skipped, raise_if_empty, is_empty, raise_if_contains_zero_size, details, summary, dict, json