[DOP-14042] Improve Hive documentation
dolfinus committed May 22, 2024
1 parent 1f73652 commit a92cf1d
Showing 14 changed files with 558 additions and 127 deletions.
2 changes: 1 addition & 1 deletion docs/connection/db_connection/clickhouse/execute.rst
@@ -69,7 +69,7 @@ Syntax support

This method supports **any** query syntax supported by Clickhouse, like:

* ✅︎ ``CREATE TABLE ...``, ``CREATE VIEW ...``, ``DROP TABLE ...``, and so on
* ✅︎ ``CREATE TABLE ...``, ``CREATE VIEW ...``, and so on
* ✅︎ ``ALTER ...``
* ✅︎ ``INSERT INTO ... SELECT ...``, ``UPDATE ...``, ``DELETE ...``, and so on
* ✅︎ ``DROP TABLE ...``, ``DROP VIEW ...``, and so on
2 changes: 1 addition & 1 deletion docs/connection/db_connection/greenplum/execute.rst
@@ -69,7 +69,7 @@ Syntax support

This method supports **any** query syntax supported by Greenplum, like:

* ✅︎ ``CREATE TABLE ...``, ``CREATE VIEW ...``, ``DROP TABLE ...``, and so on
* ✅︎ ``CREATE TABLE ...``, ``CREATE VIEW ...``, and so on
* ✅︎ ``ALTER ...``
* ✅︎ ``INSERT INTO ... SELECT ...``, ``UPDATE ...``, ``DELETE ...``, and so on
* ✅︎ ``DROP TABLE ...``, ``DROP VIEW ...``, and so on
3 changes: 2 additions & 1 deletion docs/connection/db_connection/greenplum/read.rst
@@ -34,7 +34,8 @@ Supported DBReader features
In case of Greenplum connector, ``DBReader`` does not generate raw ``SELECT`` query. Instead it relies on Spark SQL syntax
which in some cases (using column projection and predicate pushdown) can be converted to Greenplum SQL.

So ``columns``, ``where`` and ``hwm.expression`` should be specified in Spark SQL syntax, not Greenplum SQL.
So ``columns``, ``where`` and ``hwm.expression`` should be specified in `Spark SQL <https://spark.apache.org/docs/latest/sql-ref-syntax.html>`_ syntax,
not Greenplum SQL.

This is OK:

47 changes: 46 additions & 1 deletion docs/connection/db_connection/hive/execute.rst
@@ -1,7 +1,52 @@
.. _hive-execute:

Executing statements in Hive
============================
=============================

Use ``Hive.execute(...)`` to execute DDL and DML operations.

Syntax support
^^^^^^^^^^^^^^

This method supports **any** query syntax supported by Hive, like:

* ✅︎ ``CREATE TABLE ...``, ``CREATE VIEW ...``, and so on
* ✅︎ ``LOAD DATA ...``, and so on
* ✅︎ ``ALTER ...``
* ✅︎ ``INSERT INTO ... SELECT ...``, and so on
* ✅︎ ``DROP TABLE ...``, ``DROP VIEW ...``, and so on
* ✅︎ ``MSCK REPAIR TABLE ...``, and so on
* ✅︎ other statements not mentioned here
* ❌ ``SET ...; SELECT ...;`` - multiple statements are not supported (split them into separate calls, see the example below)

.. warning::

Actually, the query should be written using `SparkSQL <https://spark.apache.org/docs/latest/sql-ref-syntax.html#ddl-statements>`_ syntax, not HiveQL.

Examples
^^^^^^^^

.. code-block:: python

    from onetl.connection import Hive

    hive = Hive(...)

    hive.execute("DROP TABLE schema.table")

    hive.execute(
        """
        CREATE TABLE schema.table (
            id BIGINT,
            key STRING,
            value DOUBLE
        )
        PARTITIONED BY (business_date DATE)
        STORED AS ORC
        """
    )
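
Multiple statements in one call are not supported, so each one has to be executed separately (a minimal sketch; table names are hypothetical):

.. code-block:: python

    from onetl.connection import Hive

    hive = Hive(...)

    # ❌ not supported: several statements in one call
    # hive.execute("DROP TABLE schema.old_table; DROP TABLE schema.other_table")

    # ✅︎ issue each statement in its own call
    hive.execute("DROP TABLE schema.old_table")
    hive.execute("DROP TABLE schema.other_table")
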
Details
-------

.. currentmodule:: onetl.connection.db_connection.hive.connection

2 changes: 2 additions & 0 deletions docs/connection/db_connection/hive/index.rst
@@ -7,13 +7,15 @@ Hive
:maxdepth: 1
:caption: Connection

prerequisites
connection

.. toctree::
:maxdepth: 1
:caption: Operations

read
sql
write
execute

130 changes: 130 additions & 0 deletions docs/connection/db_connection/hive/prerequisites.rst
@@ -0,0 +1,130 @@
.. _hive-prerequisites:

Prerequisites
=============

.. note::

onETL's Hive connection is actually a SparkSession with access to a `Hive Thrift Metastore <https://docs.cloudera.com/cdw-runtime/1.5.0/hive-hms-overview/topics/hive-hms-introduction.html>`_
and HDFS/S3.
All data movement is done by Spark; the Hive Metastore is used only to store table and partition metadata.

This connector does **NOT** require a Hive server. It also does **NOT** use the Hive JDBC connector.

Version Compatibility
---------------------

* Hive Metastore version: 0.12 - 3.1.3 (may require adding the proper .jar file explicitly)
* Spark versions: 2.3.x - 3.5.x
* Java versions: 8 - 20

See `official documentation <https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html>`_.
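
If the Metastore version differs from the one bundled with Spark, the client version and jars can usually be pinned via Spark config, as described in the documentation above (a minimal sketch; the version number is illustrative):

.. code-block:: python

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # pin the Hive Metastore client version and fetch matching jars from Maven
        .config("spark.sql.hive.metastore.version", "2.3.9")
        .config("spark.sql.hive.metastore.jars", "maven")
        .enableHiveSupport()
        .getOrCreate()
    )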


Installing PySpark
------------------

To use the Hive connector you should have PySpark installed (or injected into ``sys.path``)
BEFORE creating the connector instance.

See :ref:`install-spark` installation instructions for more details.
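
A minimal sketch of creating a Hive-enabled SparkSession and the connection itself (the cluster name is a placeholder, see the connection documentation for exact parameters):

.. code-block:: python

    from pyspark.sql import SparkSession

    from onetl.connection import Hive

    # Hive support must be enabled on the SparkSession used by the connection
    spark = SparkSession.builder.appName("my-app").enableHiveSupport().getOrCreate()

    hive = Hive(cluster="rnd-dwh", spark=spark)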

Connecting to Hive Metastore
----------------------------

.. note::

If you're using a managed Hadoop cluster, skip this step, as all Spark configs should already be present on the host.

Create ``$SPARK_CONF_DIR/hive-site.xml`` with Hive Metastore URL:

.. code:: xml

    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
            <name>hive.metastore.uris</name>
            <value>thrift://metastore.host.name:9083</value>
        </property>
    </configuration>

Create ``$SPARK_CONF_DIR/core-site.xml`` with the warehouse location, e.g. HDFS IPC port of the Hadoop namenode, or S3 bucket address & credentials:

.. tabs::

.. code-tab:: xml HDFS

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://myhadoopcluster:9820</value>
</property>
</configuration>

.. code-tab:: xml S3

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- See https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#General_S3A_Client_configuration -->
<property>
<name>fs.defaultFS</name>
<value>s3a://mys3bucket/</value>
</property>
<property>
<name>fs.s3a.bucket.mybucket.endpoint</name>
<value>http://s3.somain</value>
</property>
<property>
<name>fs.s3a.bucket.mybucket.connection.ssl.enabled</name>
<value>false</value>
</property>
<property>
<name>fs.s3a.bucket.mybucket.path.style.access</name>
<value>true</value>
</property>
<property>
<name>fs.s3a.bucket.mybucket.aws.credentials.provider</name>
<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
</property>
<property>
<name>fs.s3a.bucket.mybucket.access.key</name>
<value>some-user</value>
</property>
<property>
<name>fs.s3a.bucket.mybucket.secret.key</name>
<value>mysecrettoken</value>
</property>
</configuration>
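
The same settings can also be passed directly to the Spark session using the ``spark.hadoop.*`` prefix instead of XML files (a minimal sketch, not part of the original instructions; host names are placeholders):

.. code-block:: python

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # options with the "spark.hadoop." prefix are copied into the Hadoop configuration
        .config("spark.hadoop.hive.metastore.uris", "thrift://metastore.host.name:9083")
        .config("spark.hadoop.fs.defaultFS", "hdfs://myhadoopcluster:9820")
        .enableHiveSupport()
        .getOrCreate()
    )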

Using Kerberos
--------------

Some managed Hadoop clusters use Kerberos authentication. In this case, you should call the `kinit <https://web.mit.edu/kerberos/krb5-1.12/doc/user/user_commands/kinit.html>`_ command
**BEFORE** starting the Spark session to generate a Kerberos ticket. See :ref:`install-kerberos`.
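
For example, a ticket can be obtained from a keytab right before creating the Spark session (a minimal sketch; the principal and keytab path are placeholders):

.. code-block:: python

    import subprocess

    # generate a Kerberos ticket for the given principal using a keytab file
    subprocess.run(["kinit", "-kt", "/path/to/keytab", "user@DOMAIN.COM"], check=True)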

Sometimes it is also required to pass a keytab file to the Spark config, allowing Spark executors to generate their own Kerberos tickets:

.. tabs::

.. code-tab:: python Spark 3

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("spark.kerberos.access.hadoopFileSystems", "hdfs://namenode1.domain.com:9820,hdfs://namenode2.domain.com:9820")
        .config("spark.kerberos.principal", "user")
        .config("spark.kerberos.keytab", "/path/to/keytab")
        .getOrCreate()
    )

.. code-tab:: python Spark 2

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("spark.yarn.access.hadoopFileSystems", "hdfs://namenode1.domain.com:9820,hdfs://namenode2.domain.com:9820")
        .config("spark.yarn.principal", "user")
        .config("spark.yarn.keytab", "/path/to/keytab")
        .getOrCreate()
    )

See `Spark security documentation <https://spark.apache.org/docs/latest/security.html#kerberos>`_
for more details.
101 changes: 94 additions & 7 deletions docs/connection/db_connection/hive/read.rst
@@ -1,13 +1,100 @@
.. _hive-read:

Reading from Hive
=================
Reading from Hive using ``DBReader``
====================================

There are 2 ways of distributed data reading from Hive:
:obj:`DBReader <onetl.db.db_reader.db_reader.DBReader>` supports :ref:`strategy` for incremental data reading,
but does not support custom queries, like ``JOIN``.

* Using :obj:`DBReader <onetl.db.db_reader.db_reader.DBReader>` with different :ref:`strategy`
* Using :obj:`Hive.sql <onetl.connection.db_connection.hive.connection.Hive.sql>`
Supported DBReader features
---------------------------

.. currentmodule:: onetl.connection.db_connection.hive.connection
* ✅︎ ``columns``
* ✅︎ ``where``
* ✅︎ ``hwm``, supported strategies:
* * ✅︎ :ref:`snapshot-strategy`
* * ✅︎ :ref:`incremental-strategy`
* * ✅︎ :ref:`snapshot-batch-strategy`
* * ✅︎ :ref:`incremental-batch-strategy`
* ❌ ``hint`` (is not supported by Hive)
* ❌ ``df_schema``
* ❌ ``options`` (only Spark config params are used)

.. automethod:: Hive.sql
.. warning::

Actually, ``columns``, ``where`` and ``hwm.expression`` should be written using `SparkSQL <https://spark.apache.org/docs/latest/sql-ref-syntax.html#data-retrieval-statements>`_ syntax,
not HiveQL.

Examples
--------

Snapshot strategy:

.. code-block:: python

    from onetl.connection import Hive
    from onetl.db import DBReader

    hive = Hive(...)

    reader = DBReader(
        connection=hive,
        source="schema.table",
        columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
        where="key = 'something'",
    )
    df = reader.run()

Incremental strategy:

.. code-block:: python

    from onetl.connection import Hive
    from onetl.db import DBReader
    from onetl.strategy import IncrementalStrategy

    hive = Hive(...)

    reader = DBReader(
        connection=hive,
        source="schema.table",
        columns=["id", "key", "CAST(value AS string) value", "updated_dt"],
        where="key = 'something'",
        hwm=DBReader.AutoDetectHWM(name="hive_hwm", expression="updated_dt"),
    )

    with IncrementalStrategy():
        df = reader.run()

Recommendations
---------------

Use column-based file formats
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Prefer these file formats:

* `ORC <https://spark.apache.org/docs/latest/sql-data-sources-orc.html>`_
* `Parquet <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html>`_
* `Iceberg <https://iceberg.apache.org/spark-quickstart/>`_
* `Hudi <https://hudi.apache.org/docs/quick-start-guide/>`_
* `Delta <https://docs.delta.io/latest/quick-start.html#set-up-apache-spark-with-delta-lake>`_

For column-based file formats, each file contains separate sections where column data is stored. The file footer contains
the location of each column section/group. Spark can use this information to load only the sections required by a specific query, e.g. only the selected columns,
which drastically speeds up the query.

Another advantage is a high compression ratio, e.g. 10x-100x in comparison to JSON or CSV.

Select only required columns
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Instead of passing ``"*"`` in ``DBReader(columns=[...])`` prefer passing exact column names.
This drastically reduces the amount of data read by Spark, **if column-based file formats are used**.

Use partition columns in ``where`` clause
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Queries should include a ``WHERE`` clause with filters on Hive partitioning columns.
This allows Spark to read only a small set of files (*partition pruning*) instead of scanning the entire table, which drastically increases performance.

Supported operators are ``=``, ``>``, ``<`` and ``BETWEEN``, and only when comparing against a **static** value.
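
For instance, a sketch following the examples above, assuming ``business_date`` is the table's partitioning column:

.. code-block:: python

    from onetl.connection import Hive
    from onetl.db import DBReader

    hive = Hive(...)

    # filtering on the partitioning column lets Spark prune partitions instead of scanning the whole table
    reader = DBReader(
        connection=hive,
        source="schema.table",
        columns=["id", "key", "value"],
        where="business_date BETWEEN '2024-01-01' AND '2024-01-31'",
    )
    df = reader.run()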