diff --git a/docs/changelog/next_release/275.improvement.rst b/docs/changelog/next_release/275.improvement.rst
new file mode 100644
index 000000000..5985618a0
--- /dev/null
+++ b/docs/changelog/next_release/275.improvement.rst
@@ -0,0 +1 @@
+Add generic ``Troubleshooting`` guide.
diff --git a/docs/connection/file_df_connection/spark_s3/troubleshooting.rst b/docs/connection/file_df_connection/spark_s3/troubleshooting.rst
index 2f3017547..e3474c20e 100644
--- a/docs/connection/file_df_connection/spark_s3/troubleshooting.rst
+++ b/docs/connection/file_df_connection/spark_s3/troubleshooting.rst
@@ -3,6 +3,10 @@
 Spark S3 Troubleshooting
 ========================

+.. note::
+
+    General guide: :ref:`troubleshooting`.
+
 More details:

 * `Hadoop AWS Troubleshooting Guide `_
@@ -34,12 +38,7 @@ How to determine reason
 Make logging more verbose
 ^^^^^^^^^^^^^^^^^^^^^^^^^

-Change Spark session log level to ``DEBUG`` to print result of each attempt:
-
-.. code:: python
-
-    spark.sparkContext.setLogLevel("debug")
-
+Change Spark session log level to :ref:`DEBUG <troubleshooting-spark>` to print the result of each attempt.
 Resulting logs will look like this

 .. dropdown:: See log
@@ -171,15 +170,6 @@ Resulting logs will look like this
     23/08/03 11:25:10 DEBUG request: Retrying Request: GET https://test-bucket.localhost:9000 / Parameters: ({"list-type":["2"],"delimiter":["/"],"max-keys":["2"],"prefix":["fake/"],"fetch-owner":["false"]}Headers: (amz-sdk-invocation-id: e6d62603-96e4-a80f-10a1-816e0822bc71, Content-Type: application/octet-stream, User-Agent: Hadoop 3.3.4, aws-sdk-java/1.12.262 Linux/6.4.7-1-MANJARO OpenJDK_64-Bit_Server_VM/25.292-b10 java/1.8.0_292 scala/2.12.17 vendor/AdoptOpenJDK cfg/retry-mode/legacy, )
     23/08/03 11:25:10 DEBUG AmazonHttpClient: Retriable error detected, will retry in 49ms, attempt number: 0

-After getting all information you need, make logs less verbose:
-
-.. code:: python
-
-    spark.sparkContext.setLogLevel("info")
-
-    # or
-    spark.sparkContext.setLogLevel("warn")
-
 Change number of retries
 ^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/docs/index.rst b/docs/index.rst
index 71d3fc250..479a7c72c 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -17,6 +17,8 @@
     install/index
     quickstart
     concepts
+    logging
+    troubleshooting/index

 .. toctree::
     :maxdepth: 3
@@ -62,13 +64,6 @@
     hooks/index
     plugins

-.. toctree::
-    :maxdepth: 2
-    :caption: Misc
-    :hidden:
-
-    logging
-
 .. toctree::
     :maxdepth: 2
     :caption: Development
diff --git a/docs/logging.rst b/docs/logging.rst
index 6fc4eda41..1198a64d1 100644
--- a/docs/logging.rst
+++ b/docs/logging.rst
@@ -1,7 +1,157 @@
 .. _logging:

 Logging
-=========
+=======
+
+Logging is quite important to understand what's going on under the hood of onETL.
+
+The default logging level for Python interpreters is ``WARNING``,
+but most onETL logs use the ``INFO`` level, so by default users don't see much.
+
+To change the logging level, there is a function :obj:`setup_logging <onetl.log.setup_logging>`,
+which should be called at the top of the script:
+
+.. code:: python
+
+    from onetl.log import setup_logging
+    from other.lib import some, more, imports
+
+    setup_logging()
+
+    # rest of code
+    ...
+
+This changes both the log level and the log formatting to something like this:
+
+.. dropdown:: See logs
+
+    .. code:: text
+
+        2024-04-12 10:12:10,834 [INFO ] MainThread: |onETL| Using IncrementalStrategy as a strategy
+        2024-04-12 10:12:10,835 [INFO ] MainThread: =================================== DBReader.run() starts ===================================
+        2024-04-12 10:12:10,835 [INFO ] MainThread: |DBReader| Getting Spark type for HWM expression: 'updated_at'
+        2024-04-12 10:12:10,836 [INFO ] MainThread: |MSSQL| Fetching schema of table 'source_schema.table' ...
+        2024-04-12 10:12:11,636 [INFO ] MainThread: |MSSQL| Schema fetched.
+        2024-04-12 10:12:11,642 [INFO ] MainThread: |DBReader| Got Spark field: StructField('updated_at', TimestampType(), True)
+        2024-04-12 10:12:11,642 [INFO ] MainThread: |DBReader| Detected HWM type: 'ColumnDateTimeHWM'
+        2024-04-12 10:12:11,643 [INFO ] MainThread: |IncrementalStrategy| Fetching HWM from HorizonHWMStore:
+        2024-04-12 10:12:11,643 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql://mssql.host:1433/somedb'
+        2024-04-12 10:12:12,181 [INFO ] MainThread: |IncrementalStrategy| Fetched HWM:
+        2024-04-12 10:12:12,182 [INFO ] MainThread: hwm = ColumnDateTimeHWM(
+        2024-04-12 10:12:12,182 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql://mssql.host:1433/somedb',
+        2024-04-12 10:12:12,182 [INFO ] MainThread: entity = 'source_schema.table',
+        2024-04-12 10:12:12,182 [INFO ] MainThread: expression = 'updated_at',
+        2024-04-12 10:12:12,184 [INFO ] MainThread: value = datetime.datetime(2024, 4, 11, 18, 10, 2, 120000),
+        2024-04-12 10:12:12,184 [INFO ] MainThread: )
+        2024-04-12 10:12:12,184 [INFO ] MainThread: |MSSQL| -> |Spark| Reading DataFrame from source using parameters:
+        2024-04-12 10:12:12,185 [INFO ] MainThread: source = 'source_schema.table'
+        2024-04-12 10:12:12,185 [INFO ] MainThread: columns = [
+        2024-04-12 10:12:12,185 [INFO ] MainThread: 'id',
+        2024-04-12 10:12:12,186 [INFO ] MainThread: 'new_value',
+        2024-04-12 10:12:12,186 [INFO ] MainThread: 'old_value',
+        2024-04-12 10:12:12,186 [INFO ] MainThread: 'updated_at',
+        2024-04-12 10:12:12,186 [INFO ] MainThread: ]
+        2024-04-12 10:12:12,187 [INFO ] MainThread: where = "field = 'some'"
+        2024-04-12 10:12:12,187 [INFO ] MainThread: hwm = AutoDetectHWM(
+        2024-04-12 10:12:12,187 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql://mssql.host:1433/somedb',
+        2024-04-12 10:12:12,187 [INFO ] MainThread: entity = 'source_schema.table',
+        2024-04-12 10:12:12,187 [INFO ] MainThread: expression = 'updated_at',
+        2024-04-12 10:12:12,188 [INFO ] MainThread: )
+        2024-04-12 10:12:12,188 [INFO ] MainThread: options = {
+        2024-04-12 10:12:12,188 [INFO ] MainThread: 'fetchsize': 100000,
+        2024-04-12 10:12:12,188 [INFO ] MainThread: 'numPartitions': 1,
+        2024-04-12 10:12:12,189 [INFO ] MainThread: 'partitioningMode': 'range',
+        2024-04-12 10:12:12,189 [INFO ] MainThread: }
+        2024-04-12 10:12:12,189 [INFO ] MainThread: |MSSQL| Checking connection availability...
+        2024-04-12 10:12:12,189 [INFO ] MainThread: |MSSQL| Using connection parameters:
+        2024-04-12 10:12:12,190 [INFO ] MainThread: user = 'db_user'
+        2024-04-12 10:12:12,190 [INFO ] MainThread: password = SecretStr('**********')
+        2024-04-12 10:12:12,190 [INFO ] MainThread: host = 'mssql.host'
+        2024-04-12 10:12:12,190 [INFO ] MainThread: port = 1433
+        2024-04-12 10:12:12,191 [INFO ] MainThread: database = 'somedb'
+        2024-04-12 10:12:12,191 [INFO ] MainThread: extra = {'ApplicationIntent': 'ReadOnly', 'trustServerCertificate': 'true'}
+        2024-04-12 10:12:12,191 [INFO ] MainThread: jdbc_url = 'jdbc:sqlserver://mssql.host:1433'
+        2024-04-12 10:12:12,579 [INFO ] MainThread: |MSSQL| Connection is available.
+        2024-04-12 10:12:12,581 [INFO ] MainThread: |MSSQL| Executing SQL query (on driver):
+        2024-04-12 10:12:12,581 [INFO ] MainThread: SELECT
+        2024-04-12 10:12:12,581 [INFO ] MainThread: MIN(updated_at) AS "min",
+        2024-04-12 10:12:12,582 [INFO ] MainThread: MAX(updated_at) AS "max"
+        2024-04-12 10:12:12,582 [INFO ] MainThread: FROM
+        2024-04-12 10:12:12,582 [INFO ] MainThread: source_schema.table
+        2024-04-12 10:12:12,582 [INFO ] MainThread: WHERE
+        2024-04-12 10:12:12,582 [INFO ] MainThread: (field = 'some')
+        2024-04-12 10:12:12,583 [INFO ] MainThread: AND
+        2024-04-12 10:12:12,583 [INFO ] MainThread: (updated_at >= CAST('2024-04-11T18:10:02.120000' AS datetime2))
+        2024-04-12 10:16:22,537 [INFO ] MainThread: |MSSQL| Received values:
+        2024-04-12 10:16:22,538 [INFO ] MainThread: MIN(updated_at) = datetime.datetime(2024, 4, 11, 21, 10, 7, 397000)
+        2024-04-12 10:16:22,538 [INFO ] MainThread: MAX(updated_at) = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000)
+        2024-04-12 10:16:22,540 [INFO ] MainThread: |MSSQL| Executing SQL query (on executor):
+        2024-04-12 10:16:22,540 [INFO ] MainThread: SELECT
+        2024-04-12 10:16:22,540 [INFO ] MainThread: id,
+        2024-04-12 10:16:22,541 [INFO ] MainThread: new_value,
+        2024-04-12 10:16:22,541 [INFO ] MainThread: old_value,
+        2024-04-12 10:16:22,541 [INFO ] MainThread: updated_at
+        2024-04-12 10:16:22,541 [INFO ] MainThread: FROM
+        2024-04-12 10:16:22,541 [INFO ] MainThread: source_schema.table
+        2024-04-12 10:16:22,542 [INFO ] MainThread: WHERE
+        2024-04-12 10:16:22,542 [INFO ] MainThread: (field = 'some')
+        2024-04-12 10:16:22,542 [INFO ] MainThread: AND
+        2024-04-12 10:16:22,542 [INFO ] MainThread: (updated_at > CAST('2024-04-11T18:10:02.120000' AS datetime2))
+        2024-04-12 10:16:22,542 [INFO ] MainThread: AND
+        2024-04-12 10:16:22,542 [INFO ] MainThread: (updated_at <= CAST('2024-04-12T13:12:02.123000' AS datetime2))
+        2024-04-12 10:16:22,892 [INFO ] MainThread: |Spark| DataFrame successfully created from SQL statement
+        2024-04-12 10:16:22,892 [INFO ] MainThread: ------------------------------------ DBReader.run() ends ------------------------------------
+        2024-04-12 10:40:42,409 [INFO ] MainThread: =================================== DBWriter.run() starts ===================================
+        2024-04-12 10:40:42,409 [INFO ] MainThread: |Spark| -> |Hive| Writing DataFrame to target using parameters:
+        2024-04-12 10:40:42,410 [INFO ] MainThread: target = 'target_source_schema.table'
+        2024-04-12 10:40:42,410 [INFO ] MainThread: options = {
+        2024-04-12 10:40:42,410 [INFO ] MainThread: 'mode': 'append',
+        2024-04-12 10:40:42,410 [INFO ] MainThread: 'format': 'orc',
+        2024-04-12 10:40:42,410 [INFO ] MainThread: 'partitionBy': 'part_dt',
+        2024-04-12 10:40:42,410 [INFO ] MainThread: }
+        2024-04-12 10:40:42,411 [INFO ] MainThread: df_schema:
+        2024-04-12 10:40:42,412 [INFO ] MainThread: root
+        2024-04-12 10:40:42,412 [INFO ] MainThread: |-- id: integer (nullable = true)
+        2024-04-12 10:40:42,413 [INFO ] MainThread: |-- new_value: string (nullable = true)
+        2024-04-12 10:40:42,413 [INFO ] MainThread: |-- old_value: string (nullable = true)
+        2024-04-12 10:40:42,413 [INFO ] MainThread: |-- updated_at: timestamp (nullable = true)
+        2024-04-12 10:40:42,413 [INFO ] MainThread: |-- part_dt: date (nullable = true)
+        2024-04-12 10:40:42,414 [INFO ] MainThread:
+        2024-04-12 10:40:42,421 [INFO ] MainThread: |Hive| Checking connection availability...
+        2024-04-12 10:40:42,421 [INFO ] MainThread: |Hive| Using connection parameters:
+        2024-04-12 10:40:42,421 [INFO ] MainThread: cluster = 'dwh'
+        2024-04-12 10:40:42,475 [INFO ] MainThread: |Hive| Connection is available.
+        2024-04-12 10:40:42,476 [INFO ] MainThread: |Hive| Fetching schema of table 'target_source_schema.table' ...
+        2024-04-12 10:40:43,518 [INFO ] MainThread: |Hive| Schema fetched.
+        2024-04-12 10:40:43,521 [INFO ] MainThread: |Hive| Table 'target_source_schema.table' already exists
+        2024-04-12 10:40:43,521 [WARNING ] MainThread: |Hive| User-specified options {'partitionBy': 'part_dt'} are ignored while inserting into existing table. Using only table parameters from Hive metastore
+        2024-04-12 10:40:43,782 [INFO ] MainThread: |Hive| Inserting data into existing table 'target_source_schema.table' ...
+        2024-04-12 11:06:07,396 [INFO ] MainThread: |Hive| Data is successfully inserted into table 'target_source_schema.table'.
+        2024-04-12 11:06:07,397 [INFO ] MainThread: ------------------------------------ DBWriter.run() ends ------------------------------------
+        2024-04-12 11:06:07,397 [INFO ] MainThread: |onETL| Exiting IncrementalStrategy
+        2024-04-12 11:06:07,397 [INFO ] MainThread: |IncrementalStrategy| Saving HWM to 'HorizonHWMStore':
+        2024-04-12 11:06:07,397 [INFO ] MainThread: hwm = ColumnDateTimeHWM(
+        2024-04-12 11:06:07,397 [INFO ] MainThread: name = 'updated_at#source_schema.table@mssql://mssql.host:1433/somedb',
+        2024-04-12 11:06:07,397 [INFO ] MainThread: entity = 'source_schema.table',
+        2024-04-12 11:06:07,397 [INFO ] MainThread: expression = 'updated_at',
+        2024-04-12 11:06:07,397 [INFO ] MainThread: value = datetime.datetime(2024, 4, 12, 13, 12, 2, 123000),
+        2024-04-12 11:06:07,397 [INFO ] MainThread: )
+        2024-04-12 11:06:07,495 [INFO ] MainThread: |IncrementalStrategy| HWM has been saved
+
+Each step performed by onETL is extensively logged, which should help with debugging.
+
+You can make logs even more verbose by changing the level to ``DEBUG``:
+
+.. code:: python
+
+    from onetl.log import setup_logging
+
+    setup_logging(level="DEBUG", enable_clients=True)
+
+    # rest of code
+    ...
+
+This also changes the log level of all underlying Python client libraries, e.g. showing each HTTP request being made, and so on.
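+
+If some client library becomes too chatty with ``enable_clients=True``, individual loggers can be
+tuned back down using the standard ``logging`` module. A minimal sketch (the logger names below are
+illustrative; check the documentation of the specific library for the actual names):
+
+.. code:: python
+
+    import logging
+
+    # keep onETL logs verbose, but silence specific noisy client libraries
+    logging.getLogger("paramiko").setLevel(logging.WARNING)
+    logging.getLogger("urllib3").setLevel(logging.WARNING)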

 .. currentmodule:: onetl.log
diff --git a/docs/troubleshooting/index.rst b/docs/troubleshooting/index.rst
new file mode 100644
index 000000000..536170561
--- /dev/null
+++ b/docs/troubleshooting/index.rst
@@ -0,0 +1,25 @@
+.. _troubleshooting:
+
+Troubleshooting
+===============
+
+In case of an error, please follow the instructions below:
+
+* Read the logs or the exception message you are facing.
+
+  * If Python logs are not verbose enough, :ref:`increase the log level <logging>`.
+  * If Spark logs are not verbose enough, :ref:`increase the log level <troubleshooting-spark>`.
+
+* Read the documentation related to the class or method you are using.
+* `Google <https://google.com>`_ the error message, and carefully read the search results:
+
+  * `StackOverflow <https://stackoverflow.com>`_ answers.
+  * `Spark <https://spark.apache.org/docs/latest/>`_ documentation.
+  * Documentation of the database or filesystem you are connecting to.
+  * Documentation of the underlying connector.
+
+* Search for known `issues <https://github.com/MobileTeleSystems/onetl/issues>`_, or create a new one (including exact package versions, as shown below).
+* Always use the most recent versions of onETL, PySpark and connector packages, :ref:`compatible with your environment `.
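+
+When reporting an issue, include the exact versions of the packages involved. A quick way to collect
+them (a sketch using only the standard library; extend the package list as needed):
+
+.. code:: python
+
+    from importlib.metadata import version
+
+    # print installed versions of the packages relevant to the issue
+    for package in ["onetl", "pyspark"]:
+        print(package, version(package))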
+
+.. toctree::
+    :maxdepth: 3
+    :caption: Troubleshooting
+    :hidden:
+
+    spark
diff --git a/docs/troubleshooting/spark.rst b/docs/troubleshooting/spark.rst
new file mode 100644
index 000000000..0c1a20eb9
--- /dev/null
+++ b/docs/troubleshooting/spark.rst
@@ -0,0 +1,79 @@
+.. _troubleshooting-spark:
+
+Spark Troubleshooting
+=====================
+
+Restarting Spark session
+------------------------
+
+Sometimes it is required to stop the current Spark session and start a new one, e.g. to add some ``.jar`` packages or to change the session config.
+But PySpark not only starts the Spark session, it also starts a Java virtual machine (JVM) process in the background.
+Calling ``sparkSession.stop()`` `does not shutdown JVM `_, and this can cause issues.
+
+Apart from JVM properties, stopping the Spark session also does not clear the Spark context, which is a global object.
+So new Spark sessions are created using the same context object, and thus with the same Spark config options.
+
+To properly stop a Spark session, it is **required** to:
+
+* Stop the Spark session by calling ``sparkSession.stop()``.
+* **STOP THE PYTHON INTERPRETER**, e.g. by calling ``sys.exit()``.
+* Start a new Python interpreter.
+* Start a new Spark session with the config options you need.
+
+Skipping some of these steps can lead to issues with creating a new Spark session; see the sketch below.
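+
+A minimal sketch of this procedure, assuming ``spark`` is the existing ``SparkSession`` object:
+
+.. code:: python
+
+    import sys
+
+    spark.stop()  # stop the Spark session; the background JVM process is still running
+    sys.exit(0)  # stop the Python interpreter, and the JVM process with it
+
+    # then, in a NEW Python interpreter:
+    # from pyspark.sql import SparkSession
+    # spark = SparkSession.builder.config(...).getOrCreate()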
+
+Driver log level
+----------------
+
+The default logging level for a Spark session is ``WARN``. To show more verbose logs, use:
+
+.. code:: python
+
+    spark.sparkContext.setLogLevel("INFO")
+
+or increase verbosity even more:
+
+.. code:: python
+
+    spark.sparkContext.setLogLevel("DEBUG")
+
+After getting all the information you need, you can return to the previous log level:
+
+.. code:: python
+
+    spark.sparkContext.setLogLevel("WARN")
+
+Executors log level
+-------------------
+
+``sparkContext.setLogLevel`` changes only the log level of the Spark session on the Spark **driver**.
+To make Spark **executor** logs more verbose, perform the following steps:
+
+* Create a ``log4j.properties`` file with content like this:
+
+  .. code-block:: jproperties
+
+      log4j.rootCategory=DEBUG, console
+
+      log4j.appender.console=org.apache.log4j.ConsoleAppender
+      log4j.appender.console.target=System.err
+      log4j.appender.console.layout=org.apache.log4j.PatternLayout
+      log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+* Stop the existing Spark session and create a new one with the following options:
+
+  .. code-block:: python
+
+      from pyspark.sql import SparkSession
+
+      spark = (
+          SparkSession.builder.config("spark.files", "file:log4j.properties")
+          .config("spark.executor.extraJavaOptions", "-Dlog4j.configuration=file:log4j.properties")
+          # you can apply the same logging settings to the Spark driver by uncommenting the line below
+          # .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:log4j.properties")
+          .getOrCreate()
+      )
+
+Each Spark executor will receive a copy of the ``log4j.properties`` file during start, and will load it to change its own log level.
+The same approach can be used for the Spark driver as well, to investigate issues when the Spark session cannot start properly.
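+
+.. note::
+
+    The ``log4j.properties`` format above applies to Spark builds using Log4j 1.x (Spark 3.2 and older).
+    Spark 3.3+ switched to Log4j 2: the file is conventionally named ``log4j2.properties``,
+    it is passed via ``-Dlog4j.configurationFile=...``, and it uses a different syntax.
+    A rough Log4j 2 equivalent of the config above (a sketch, adjust to your Spark distribution):
+
+    .. code-block:: jproperties
+
+        rootLogger.level = debug
+        rootLogger.appenderRef.stdout.ref = console
+
+        appender.console.type = Console
+        appender.console.name = console
+        appender.console.target = SYSTEM_ERR
+        appender.console.layout.type = PatternLayout
+        appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n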