[DOP-14059] Add Troubleshooting guide #275

Merged 1 commit on May 20, 2024
1 change: 1 addition & 0 deletions docs/changelog/next_release/275.improvement.rst
@@ -0,0 +1 @@
Add generic ``Troubleshooting`` guide.
20 changes: 5 additions & 15 deletions docs/connection/file_df_connection/spark_s3/troubleshooting.rst
@@ -3,6 +3,10 @@
Spark S3 Troubleshooting
========================

.. note::

General guide: :ref:`troubleshooting`.

More details:

* `Hadoop AWS Troubleshooting Guide <https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/troubleshooting_s3a.html>`_
@@ -34,12 +38,7 @@ How to determine reason
Make logging more verbose
^^^^^^^^^^^^^^^^^^^^^^^^^

Change Spark session log level to ``DEBUG`` to print result of each attempt:

.. code:: python

spark.sparkContext.setLogLevel("debug")

Change Spark session log level to :ref:`DEBUG <spark-troubleshooting>` to print the result of each attempt.
Resulting logs will look like this:

.. dropdown:: See log
@@ -171,15 +170,6 @@ Resulting logs will look like this
23/08/03 11:25:10 DEBUG request: Retrying Request: GET https://test-bucket.localhost:9000 / Parameters: ({"list-type":["2"],"delimiter":["/"],"max-keys":["2"],"prefix":["fake/"],"fetch-owner":["false"]}Headers: (amz-sdk-invocation-id: e6d62603-96e4-a80f-10a1-816e0822bc71, Content-Type: application/octet-stream, User-Agent: Hadoop 3.3.4, aws-sdk-java/1.12.262 Linux/6.4.7-1-MANJARO OpenJDK_64-Bit_Server_VM/25.292-b10 java/1.8.0_292 scala/2.12.17 vendor/AdoptOpenJDK cfg/retry-mode/legacy, )
23/08/03 11:25:10 DEBUG AmazonHttpClient: Retriable error detected, will retry in 49ms, attempt number: 0

After getting all information you need, make logs less verbose:

.. code:: python

spark.sparkContext.setLogLevel("info")

# or
spark.sparkContext.setLogLevel("warn")

Change number of retries
^^^^^^^^^^^^^^^^^^^^^^^^

9 changes: 2 additions & 7 deletions docs/index.rst
@@ -17,6 +17,8 @@
install/index
quickstart
concepts
logging
troubleshooting/index

.. toctree::
:maxdepth: 3
@@ -62,13 +64,6 @@
hooks/index
plugins

.. toctree::
:maxdepth: 2
:caption: Misc
:hidden:

logging

.. toctree::
:maxdepth: 2
:caption: Development
152 changes: 151 additions & 1 deletion docs/logging.rst
@@ -1,7 +1,157 @@
.. _logging:

Logging
=========
=======

Logging is quite important to understand what's going on under the hood of onETL.

Default logging level for Python interpreters is ``WARNING``,
but most onETL logs are emitted at ``INFO`` level, so users usually don't see much.

To change the logging level, there is a function :obj:`setup_logging <onetl.log.setup_logging>`
which should be called at the top of the script:

.. code:: python

from onetl.log import setup_logging
from other.lib import some, more, imports

setup_logging()

# rest of code
...

This changes both log level and log formatting to something like this:

.. dropdown:: See logs

.. code:: text

2024-04-12 10:12:10,834 [INFO ] MainThread: |onETL| Using IncrementalStrategy as a strategy
2024-04-12 10:12:10,835 [INFO ] MainThread: =================================== DBReader.run() starts ===================================
2024-04-12 10:12:10,835 [INFO ] MainThread: |DBReader| Getting Spark type for HWM expression: 'updated_at'
2024-04-12 10:12:10,836 [INFO ] MainThread: |MSSQL| Fetching schema of table 'source_schema.table' ...
2024-04-12 10:12:11,636 [INFO ] MainThread: |MSSQL| Schema fetched.
2024-04-12 10:12:11,642 [INFO ] MainThread: |DBReader| Got Spark field: StructField('updated_at', TimestampType(), True)
2024-04-12 10:12:11,642 [INFO ] MainThread: |DBReader| Detected HWM type: 'ColumnDateTimeHWM'
2024-04-12 10:12:11,643 [INFO ] MainThread: |IncrementalStrategy| Fetching HWM from HorizonHWMStore:
2024-04-12 10:12:11,643 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb'
2024-04-12 10:12:12,181 [INFO ] MainThread: |IncrementalStrategy| Fetched HWM:
2024-04-12 10:12:12,182 [INFO ] MainThread: hwm = ColumnDateTimeHWM(
2024-04-12 10:12:12,182 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
2024-04-12 10:12:12,182 [INFO ] MainThread: entity = 'source_schema.table',
2024-04-12 10:12:12,182 [INFO ] MainThread: expression = 'updated_at',
2024-04-12 10:12:12,184 [INFO ] MainThread: value = datetime.datetime(2024, 4, 11, 18, 10, 2, 120000),
2024-04-12 10:12:12,184 [INFO ] MainThread: )
2024-04-12 10:12:12,184 [INFO ] MainThread: |MSSQL| -> |Spark| Reading DataFrame from source using parameters:
2024-04-12 10:12:12,185 [INFO ] MainThread: source = 'source_schema.table'
2024-04-12 10:12:12,185 [INFO ] MainThread: columns = [
2024-04-12 10:12:12,185 [INFO ] MainThread: 'id',
2024-04-12 10:12:12,186 [INFO ] MainThread: 'new_value',
2024-04-12 10:12:12,186 [INFO ] MainThread: 'old_value',
2024-04-12 10:12:12,186 [INFO ] MainThread: 'updated_at',
2024-04-12 10:12:12,186 [INFO ] MainThread: ]
2024-04-12 10:12:12,187 [INFO ] MainThread: where = "field = 'some'"
2024-04-12 10:12:12,187 [INFO ] MainThread: hwm = AutoDetectHWM(
2024-04-12 10:12:12,187 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
2024-04-12 10:12:12,187 [INFO ] MainThread: entity = 'source_schema.table',
2024-04-12 10:12:12,187 [INFO ] MainThread: expression = 'updated_at',
2024-04-12 10:12:12,188 [INFO ] MainThread: )
2024-04-12 10:12:12,188 [INFO ] MainThread: options = {
2024-04-12 10:12:12,188 [INFO ] MainThread: 'fetchsize': 100000,
2024-04-12 10:12:12,188 [INFO ] MainThread: 'numPartitions': 1,
2024-04-12 10:12:12,189 [INFO ] MainThread: 'partitioningMode': 'range',
2024-04-12 10:12:12,189 [INFO ] MainThread: }
2024-04-12 10:12:12,189 [INFO ] MainThread: |MSSQL| Checking connection availability...
2024-04-12 10:12:12,189 [INFO ] MainThread: |MSSQL| Using connection parameters:
2024-04-12 10:12:12,190 [INFO ] MainThread: user = 'db_user'
2024-04-12 10:12:12,190 [INFO ] MainThread: password = SecretStr('**********')
2024-04-12 10:12:12,190 [INFO ] MainThread: host = 'mssql.host'
2024-04-12 10:12:12,190 [INFO ] MainThread: port = 1433
2024-04-12 10:12:12,191 [INFO ] MainThread: database = 'somedb'
2024-04-12 10:12:12,191 [INFO ] MainThread: extra = {'ApplicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'}
2024-04-12 10:12:12,191 [INFO ] MainThread: jdbc_url = 'jdbc:sqlserver:/mssql.host:1433'
2024-04-12 10:12:12,579 [INFO ] MainThread: |MSSQL| Connection is available.
2024-04-12 10:12:12,581 [INFO ] MainThread: |MSSQL| Executing SQL query (on driver):
2024-04-12 10:12:12,581 [INFO ] MainThread: SELECT
2024-04-12 10:12:12,581 [INFO ] MainThread: MIN(updated_at) AS "min",
2024-04-12 10:12:12,582 [INFO ] MainThread: MAX(updated_at) AS "max"
2024-04-12 10:12:12,582 [INFO ] MainThread: FROM
2024-04-12 10:12:12,582 [INFO ] MainThread: source_schema.table
2024-04-12 10:12:12,582 [INFO ] MainThread: WHERE
2024-04-12 10:12:12,582 [INFO ] MainThread: (field = 'some')
2024-04-12 10:12:12,583 [INFO ] MainThread: AND
2024-04-12 10:12:12,583 [INFO ] MainThread: (updated_at >= CAST('2024-04-11T18:10:02.120000' AS datetime2))
2024-04-12 10:16:22,537 [INFO ] MainThread: |MSSQL| Received values:
2024-04-12 10:16:22,538 [INFO ] MainThread: MIN(updated_at) = datetime.datetime(2024, 4, 11, 21, 10, 7, 397000)
2024-04-12 10:16:22,538 [INFO ] MainThread: MAX(updated_at) = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000)
2024-04-12 10:16:22,540 [INFO ] MainThread: |MSSQL| Executing SQL query (on executor):
2024-04-12 10:16:22,540 [INFO ] MainThread: SELECT
2024-04-12 10:16:22,540 [INFO ] MainThread: id,
2024-04-12 10:16:22,541 [INFO ] MainThread: new_value,
2024-04-12 10:16:22,541 [INFO ] MainThread: old_value,
2024-04-12 10:16:22,541 [INFO ] MainThread: updated_at
2024-04-12 10:16:22,541 [INFO ] MainThread: FROM
2024-04-12 10:16:22,541 [INFO ] MainThread: source_schema.table
2024-04-12 10:16:22,542 [INFO ] MainThread: WHERE
2024-04-12 10:16:22,542 [INFO ] MainThread: (field = 'some')
2024-04-12 10:16:22,542 [INFO ] MainThread: AND
2024-04-12 10:16:22,542 [INFO ] MainThread: (updated_at > CAST('2024-04-11T18:10:02.120000' AS datetime2))
2024-04-12 10:16:22,542 [INFO ] MainThread: AND
2024-04-12 10:16:22,542 [INFO ] MainThread: (updated_at <= CAST('2024-04-12T13:12:02.123000' AS datetime2))
2024-04-12 10:16:22,892 [INFO ] MainThread: |Spark| DataFrame successfully created from SQL statement
2024-04-12 10:16:22,892 [INFO ] MainThread: ------------------------------------ DBReader.run() ends ------------------------------------
2024-04-12 10:40:42,409 [INFO ] MainThread: =================================== DBWriter.run() starts ===================================
2024-04-12 10:40:42,409 [INFO ] MainThread: |Spark| -> |Hive| Writing DataFrame to target using parameters:
2024-04-12 10:40:42,410 [INFO ] MainThread: target = 'target_source_schema.table'
2024-04-12 10:40:42,410 [INFO ] MainThread: options = {
2024-04-12 10:40:42,410 [INFO ] MainThread: 'mode': 'append',
2024-04-12 10:40:42,410 [INFO ] MainThread: 'format': 'orc',
2024-04-12 10:40:42,410 [INFO ] MainThread: 'partitionBy': 'part_dt',
2024-04-12 10:40:42,410 [INFO ] MainThread: }
2024-04-12 10:40:42,411 [INFO ] MainThread: df_schema:
2024-04-12 10:40:42,412 [INFO ] MainThread: root
2024-04-12 10:40:42,412 [INFO ] MainThread: |-- id: integer (nullable = true)
2024-04-12 10:40:42,413 [INFO ] MainThread: |-- new_value: string (nullable = true)
2024-04-12 10:40:42,413 [INFO ] MainThread: |-- old_value: string (nullable = true)
2024-04-12 10:40:42,413 [INFO ] MainThread: |-- updated_at: timestamp (nullable = true)
2024-04-12 10:40:42,413 [INFO ] MainThread: |-- part_dt: date (nullable = true)
2024-04-12 10:40:42,414 [INFO ] MainThread:
2024-04-12 10:40:42,421 [INFO ] MainThread: |Hive| Checking connection availability...
2024-04-12 10:40:42,421 [INFO ] MainThread: |Hive| Using connection parameters:
2024-04-12 10:40:42,421 [INFO ] MainThread: cluster = 'dwh'
2024-04-12 10:40:42,475 [INFO ] MainThread: |Hive| Connection is available.
2024-04-12 10:40:42,476 [INFO ] MainThread: |Hive| Fetching schema of table 'target_source_schema.table' ...
2024-04-12 10:40:43,518 [INFO ] MainThread: |Hive| Schema fetched.
2024-04-12 10:40:43,521 [INFO ] MainThread: |Hive| Table 'target_source_schema.table' already exists
2024-04-12 10:40:43,521 [WARNING ] MainThread: |Hive| User-specified options {'partitionBy': 'part_dt'} are ignored while inserting into existing table. Using only table parameters from Hive metastore
2024-04-12 10:40:43,782 [INFO ] MainThread: |Hive| Inserting data into existing table 'target_source_schema.table' ...
2024-04-12 11:06:07,396 [INFO ] MainThread: |Hive| Data is successfully inserted into table 'target_source_schema.table'.
2024-04-12 11:06:07,397 [INFO ] MainThread: ------------------------------------ DBWriter.run() ends ------------------------------------
2024-04-12 11:06:07,397 [INFO ] MainThread: |onETL| Exiting IncrementalStrategy
2024-04-12 11:06:07,397 [INFO ] MainThread: |IncrementalStrategy| Saving HWM to 'HorizonHWMStore':
2024-04-12 11:06:07,397 [INFO ] MainThread: hwm = ColumnDateTimeHWM(
2024-04-12 11:06:07,397 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql:/mssql.host:1433/somedb',
2024-04-12 11:06:07,397 [INFO ] MainThread: entity = 'source_source_schema.table',
2024-04-12 11:06:07,397 [INFO ] MainThread: expression = 'updated_at',
2024-04-12 11:06:07,397 [INFO ] MainThread: value = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000),
2024-04-12 11:06:07,397 [INFO ] MainThread: )
2024-04-12 11:06:07,495 [INFO ] MainThread: |IncrementalStrategy| HWM has been saved

Each step performed by onETL is extensively logged, which should help with debugging.

You can make logs even more verbose by changing level to ``DEBUG``:

.. code:: python

from onetl.log import setup_logging

setup_logging(level="DEBUG", enable_clients=True)

# rest of code
...

This also changes the log level of all underlying Python libraries, e.g. showing each HTTP request being made.
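
If a particular underlying library becomes too noisy at ``DEBUG`` level, its logger can usually be tuned individually
via the standard ``logging`` module. A minimal sketch (the logger names below are illustrative examples and may differ
in your environment):

.. code:: python

import logging

# quiet down individual third-party loggers while keeping onETL logs verbose;
# the logger names here are examples only
logging.getLogger("py4j").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.INFO)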

.. currentmodule:: onetl.log

25 changes: 25 additions & 0 deletions docs/troubleshooting/index.rst
@@ -0,0 +1,25 @@
.. _troubleshooting:

Troubleshooting
===============

In case of an error, please follow the instructions below:

* Read the logs or exception messages you've encountered.
* If Python logs are not verbose enough, :ref:`increase the log level <logging>`.
* If Spark logs are not verbose enough, :ref:`increase the log level <troubleshooting-spark>`.
* Read documentation related to the class or method you are using.
* `Google <https://google.com>`_ the error message, and carefully read the search results:
* `StackOverflow <https://stackoverflow.com/>`_ answers.
* `Spark <https://spark.apache.org/docs/latest/>`_ documentation.
* Documentation of database or filesystem you are connecting to.
* Documentation of underlying connector.
* Search for known `issues <https://github.com/MobileTeleSystems/onetl/issues>`_, or create a new one.
* Always use the most recent versions of onETL, PySpark and connector packages, :ref:`compatible with your environment <install-spark>`.

.. toctree::
:maxdepth: 3
:caption: Troubleshooting
:hidden:

spark
79 changes: 79 additions & 0 deletions docs/troubleshooting/spark.rst
@@ -0,0 +1,79 @@
.. _troubleshooting-spark:

Spark Troubleshooting
=====================

Restarting Spark session
------------------------

Sometimes it is required to stop the current Spark session and start a new one, e.g. to add some ``.jar`` packages or change the session config.
But PySpark not only starts a Spark session, it also starts a Java virtual machine (JVM) process in the background.
Calling ``sparkSession.stop()`` `does not shut down the JVM <https://issues.apache.org/jira/browse/SPARK-47740>`_,
and this can cause issues.

Also, apart from JVM properties, stopping the Spark session does not clear the Spark context, which is a global object. So new
Spark sessions are created using the same context object, and thus the same Spark config options.

To properly stop a Spark session, it is **required** to:

* Stop the Spark session by calling ``sparkSession.stop()``.
* **Stop the Python interpreter**, e.g. by calling ``sys.exit()``.
* Start a new Python interpreter.
* Start a new Spark session with the config options you need (see the sketch below).

Skipping any of these steps can lead to issues with creating a new Spark session.
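
A minimal sketch of this workflow (the package coordinates in the final, commented-out step are just an example):

.. code:: python

import sys

from pyspark.sql import SparkSession

# 1. Stop the current Spark session
spark = SparkSession.builder.getOrCreate()
spark.stop()

# 2. Stop the Python interpreter: the JVM started by PySpark
# is shut down only when the interpreter process exits
sys.exit()

# 3. In a NEW interpreter process, start a fresh session with the config you need, e.g.:
# spark = (
#     SparkSession.builder.config(
#         "spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.1"
#     ).getOrCreate()
# )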

Driver log level
----------------

Default logging level for Spark session is ``WARN``. To show more verbose logs, use:

.. code:: python

spark.sparkContext.setLogLevel("INFO")

or increase verbosity even more:

.. code:: python

spark.sparkContext.setLogLevel("DEBUG")

After getting all the information you need, you can restore the previous log level:

.. code:: python

spark.sparkContext.setLogLevel("WARN")

Executors log level
-------------------

``sparkContext.setLogLevel`` changes only the log level of the Spark session on the Spark **driver**.
To make Spark executor logs more verbose, perform the following steps:

* Create a ``log4j.properties`` file with content like this:

.. code-block:: jproperties

log4j.rootCategory=DEBUG, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

* Stop the existing Spark session and create a new one with the following options:

.. code-block:: python

from pyspark.sql import SparkSession

spark = (
SparkSession.builder.config("spark.files", "file:log4j.properties").config(
"spark.executor.extraJavaOptions", "-Dlog4j.configuration=file:log4j.properties"
)
# you can apply the same logging settings to Spark driver, by uncommenting the line below
# .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:log4j.properties")
.getOrCreate()
)

Each Spark executor will receive a copy of the ``log4j.properties`` file during startup, and load it to change its own log level.
The same approach can be used for the Spark driver as well, to investigate issues when the Spark session cannot start properly.