diff --git a/.github/workflows/data/file-df/ignored.txt b/.github/workflows/data/file-df/ignored.txt new file mode 100644 index 000000000..d8f8d4692 --- /dev/null +++ b/.github/workflows/data/file-df/ignored.txt @@ -0,0 +1 @@ +docs diff --git a/.github/workflows/data/file-df/tracked.txt b/.github/workflows/data/file-df/tracked.txt new file mode 100644 index 000000000..6aa7c2165 --- /dev/null +++ b/.github/workflows/data/file-df/tracked.txt @@ -0,0 +1,5 @@ +.github/workflows/data/file-df/** +onetl/file_df_connection/spark_file_df_connection.py +onetl/file/file_reader/** +onetl/file/file_writer/** +onetl/file/__init__.py diff --git a/.github/workflows/data/file/tracked.txt b/.github/workflows/data/file/tracked.txt index 94d33b151..b5772adf4 100644 --- a/.github/workflows/data/file/tracked.txt +++ b/.github/workflows/data/file/tracked.txt @@ -1,3 +1,10 @@ .github/workflows/data/file/** onetl/file_connection/file_connection.py -onetl/file/** +onetl/file/file_downloader/** +onetl/file/file_mover/** +onetl/file/file_uploader/** +onetl/file/filter/** +onetl/file/limit/** +onetl/file/file_result.py +onetl/file/file_set.py +onetl/file/__init__.py diff --git a/.github/workflows/data/local-fs/ignored.txt b/.github/workflows/data/local-fs/ignored.txt new file mode 100644 index 000000000..d8f8d4692 --- /dev/null +++ b/.github/workflows/data/local-fs/ignored.txt @@ -0,0 +1 @@ +docs diff --git a/.github/workflows/data/local-fs/matrix.yml b/.github/workflows/data/local-fs/matrix.yml new file mode 100644 index 000000000..567832c59 --- /dev/null +++ b/.github/workflows/data/local-fs/matrix.yml @@ -0,0 +1,27 @@ +min: &min + spark-version: 2.3.1 + python-version: '3.7' + java-version: 8 + os: ubuntu-latest + +max: &max + spark-version: 3.4.1 + python-version: '3.11' + java-version: 17 + os: ubuntu-latest + +latest: &latest + spark-version: latest + python-version: '3.11' + java-version: 20 + os: ubuntu-latest + +matrix: + small: + - <<: *max + full: + - <<: *min + - <<: *max + nightly: + - <<: *min + - <<: *latest diff --git a/.github/workflows/data/local-fs/tracked.txt b/.github/workflows/data/local-fs/tracked.txt new file mode 100644 index 000000000..c763aed1e --- /dev/null +++ b/.github/workflows/data/local-fs/tracked.txt @@ -0,0 +1 @@ +**/*local_fs* diff --git a/.github/workflows/get-matrix.yml b/.github/workflows/get-matrix.yml index 25dff6f98..d487e64e9 100644 --- a/.github/workflows/get-matrix.yml +++ b/.github/workflows/get-matrix.yml @@ -17,6 +17,8 @@ on: value: ${{ jobs.get-matrix.outputs.matrix-hive }} matrix-kafka: value: ${{ jobs.get-matrix.outputs.matrix-kafka }} + matrix-local-fs: + value: ${{ jobs.get-matrix.outputs.matrix-local-fs }} matrix-mongodb: value: ${{ jobs.get-matrix.outputs.matrix-mongodb }} matrix-mssql: @@ -55,6 +57,7 @@ jobs: matrix-greenplum: ${{ toJson(fromJson(steps.matrix-greenplum.outputs.result)[steps.key-greenplum.outputs.key]) }} matrix-hive: ${{ toJson(fromJson(steps.matrix-hive.outputs.result)[steps.key-hive.outputs.key]) }} matrix-kafka: ${{ toJson(fromJson(steps.matrix-kafka.outputs.result)[steps.key-kafka.outputs.key]) }} + matrix-local-fs: ${{ toJson(fromJson(steps.matrix-local-fs.outputs.result)[steps.key-local-fs.outputs.key]) }} matrix-mongodb: ${{ toJson(fromJson(steps.matrix-mongodb.outputs.result)[steps.key-mongodb.outputs.key]) }} matrix-mssql: ${{ toJson(fromJson(steps.matrix-mssql.outputs.result)[steps.key-mssql.outputs.key]) }} matrix-mysql: ${{ toJson(fromJson(steps.matrix-mysql.outputs.result)[steps.key-mysql.outputs.key]) }} @@ -111,6 +114,17 @@ jobs: 
run: | echo '${{ steps.changed-file.outputs.all_changed_files }}' + - name: Check if file-df-related files are changed + id: changed-file-df + uses: tj-actions/changed-files@v35 + with: + files_from_source_file: .github/workflows/data/file-df/tracked.txt + files_ignore_from_source_file: .github/workflows/data/file-df/ignored.txt + + - name: Print file-df-related files changed + run: | + echo '${{ steps.changed-file-df.outputs.all_changed_files }}' + - name: Check if core files are changed id: changed-core uses: tj-actions/changed-files@v35 @@ -261,6 +275,36 @@ jobs: with: cmd: yq -o=json '.matrix' .github/workflows/data/kafka/matrix.yml + - name: Check if LocalFS files are changed + id: changed-local-fs + uses: tj-actions/changed-files@v35 + with: + files_from_source_file: .github/workflows/data/local-fs/tracked.txt + files_ignore_from_source_file: .github/workflows/data/local-fs/ignored.txt + + - name: Print LocalFS files changed + run: | + echo '${{ steps.changed-local-fs.outputs.all_changed_files }}' + + - name: Calculate LocalFS matrix key + id: key-local-fs + run: | + if ${{ inputs.nightly }}; then + key=nightly + elif ${{ steps.changed-base.outputs.any_changed }} || ${{ steps.changed-file-df.outputs.any_changed }} || ${{ steps.changed-local-fs.outputs.any_changed }}; then + key=full + else + key=small + fi + echo key=$key + echo key=$key >> $GITHUB_OUTPUT + + - name: Get LocalFS matrix + id: matrix-local-fs + uses: mikefarah/yq@v4.33.3 + with: + cmd: yq -o=json '.matrix' .github/workflows/data/local-fs/matrix.yml + - name: Check if MongoDB files are changed id: changed-mongodb uses: tj-actions/changed-files@v35 diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index e3111135a..b95dbf69e 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -106,6 +106,22 @@ jobs: os: ${{ matrix.os }} with-cache: false + tests-local-fs: + name: Run LocalFS tests (spark=${{ matrix.spark-version }}, java=${{ matrix.java-version }}, python=${{ matrix.python-version }}, os=${{ matrix.os }}) + needs: [get-matrix] + strategy: + fail-fast: false + matrix: + include: ${{ fromJson(needs.get-matrix.outputs.matrix-local-fs) }} + + uses: ./.github/workflows/test-local-fs.yml + with: + spark-version: ${{ matrix.spark-version }} + java-version: ${{ matrix.java-version }} + python-version: ${{ matrix.python-version }} + os: ${{ matrix.os }} + with-cache: false + tests-mongodb: name: Run MongoDB tests (server=${{ matrix.mongodb-version }}, spark=${{ matrix.spark-version }}, java=${{ matrix.java-version }}, python=${{ matrix.python-version }}, os=${{ matrix.os }}) needs: [get-matrix] @@ -306,6 +322,7 @@ jobs: - tests-clickhouse - tests-hive - tests-kafka + - tests-local-fs - tests-mongodb - tests-mssql - tests-mysql diff --git a/.github/workflows/test-clickhouse.yml b/.github/workflows/test-clickhouse.yml index 21c6adb7d..bb36c5b0d 100644 --- a/.github/workflows/test-clickhouse.yml +++ b/.github/workflows/test-clickhouse.yml @@ -58,9 +58,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ 
inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse- - name: Cache pip diff --git a/.github/workflows/test-core.yml b/.github/workflows/test-core.yml index b3fb61c2d..11caaa854 100644 --- a/.github/workflows/test-core.yml +++ b/.github/workflows/test-core.yml @@ -44,9 +44,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse- - name: Cache pip diff --git a/.github/workflows/test-greenplum.yml b/.github/workflows/test-greenplum.yml index 289512ead..0a7812ec9 100644 --- a/.github/workflows/test-greenplum.yml +++ b/.github/workflows/test-greenplum.yml @@ -60,9 +60,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum- - name: Cache pip diff --git a/.github/workflows/test-hive.yml b/.github/workflows/test-hive.yml index 3c578908a..939fee079 100644 --- a/.github/workflows/test-hive.yml +++ b/.github/workflows/test-hive.yml @@ -44,9 +44,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive- - name: Cache pip diff --git a/.github/workflows/test-kafka.yml b/.github/workflows/test-kafka.yml index 7ce51789f..480f7224e 100644 --- a/.github/workflows/test-kafka.yml +++ b/.github/workflows/test-kafka.yml @@ -71,9 +71,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-${{ 
hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka- - name: Cache pip diff --git a/.github/workflows/test-local-fs.yml b/.github/workflows/test-local-fs.yml new file mode 100644 index 000000000..57873a5c9 --- /dev/null +++ b/.github/workflows/test-local-fs.yml @@ -0,0 +1,80 @@ +name: Tests for LocalFS +on: + workflow_call: + inputs: + spark-version: + required: true + type: string + java-version: + required: true + type: string + python-version: + required: true + type: string + os: + required: true + type: string + with-cache: + required: false + type: boolean + default: true + +jobs: + test-local-fs: + name: Run LocalFS tests (spark=${{ inputs.spark-version }}, java=${{ inputs.java-version }}, python=${{ inputs.python-version }}, os=${{ inputs.os }}) + runs-on: ${{ inputs.os }} + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Java ${{ inputs.java-version }} + uses: actions/setup-java@v3 + with: + distribution: temurin + java-version: ${{ inputs.java-version }} + + - name: Set up Python ${{ inputs.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ inputs.python-version }} + + - name: Cache Ivy + uses: actions/cache@v3 + if: inputs.with-cache + with: + path: ~/.ivy2 + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-local-fs-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} + restore-keys: | + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-local-fs-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-local-fs- + + - name: Cache pip + uses: actions/cache@v3 + if: inputs.with-cache + with: + path: ~/.cache/pip + key: ${{ runner.os }}-python-${{ inputs.python-version }}-tests-local-fs-${{ hashFiles('requirements/core.txt', 'requirements/tests/base.txt', 'requirements/tests/spark-*.txt') }} + restore-keys: | + ${{ runner.os }}-python-${{ inputs.python-version }}-tests-local-fs-${{ hashFiles('requirements/core.txt', 'requirements/tests/base.txt', 'requirements/tests/spark-*.txt') }} + ${{ runner.os }}-python-${{ inputs.python-version }}-tests-local-fs- + + - name: Upgrade pip + run: python -m pip install --upgrade pip setuptools wheel + + - name: Install dependencies + run: | + pip install -I -r requirements/core.txt -r requirements/tests/base.txt -r requirements/tests/spark-${{ inputs.spark-version }}.txt + + - name: Run tests + run: | + mkdir reports/ || echo "Directory exists" + sed '/^$/d' ./.env.local | sed '/^#/d' | sed 's/^/export /' > ./env + source ./env + ./pytest_runner.sh -m local_fs + + - name: Upload coverage results + uses: actions/upload-artifact@v3 + with: + name: local-fs-spark-${{ inputs.spark-version }}-python-${{ inputs.python-version }}-os-${{ inputs.os }} + path: reports/* diff --git a/.github/workflows/test-mongodb.yml b/.github/workflows/test-mongodb.yml index 51ee269d7..0d9d80cca 100644 --- a/.github/workflows/test-mongodb.yml +++ b/.github/workflows/test-mongodb.yml @@ -56,9 +56,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ 
runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb- - name: Cache pip diff --git a/.github/workflows/test-mssql.yml b/.github/workflows/test-mssql.yml index a50ba363f..b84c2f7f1 100644 --- a/.github/workflows/test-mssql.yml +++ b/.github/workflows/test-mssql.yml @@ -59,9 +59,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql- - name: Cache pip diff --git a/.github/workflows/test-mysql.yml b/.github/workflows/test-mysql.yml index 433304c31..c7d4937b5 100644 --- a/.github/workflows/test-mysql.yml +++ b/.github/workflows/test-mysql.yml @@ -58,9 +58,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql- - name: Cache pip diff --git a/.github/workflows/test-oracle.yml b/.github/workflows/test-oracle.yml index 23e74f0c4..213d555cd 100644 --- a/.github/workflows/test-oracle.yml +++ b/.github/workflows/test-oracle.yml @@ -61,9 +61,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle- - name: Cache pip diff --git a/.github/workflows/test-postgres.yml b/.github/workflows/test-postgres.yml 
index cc6cb49f9..819d3f533 100644 --- a/.github/workflows/test-postgres.yml +++ b/.github/workflows/test-postgres.yml @@ -57,9 +57,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres- - name: Cache pip diff --git a/.github/workflows/test-teradata.yml b/.github/workflows/test-teradata.yml index 988816edc..31ec3712b 100644 --- a/.github/workflows/test-teradata.yml +++ b/.github/workflows/test-teradata.yml @@ -44,9 +44,9 @@ jobs: if: inputs.with-cache with: path: ~/.ivy2 - key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-${{ hashFiles('onetl/connection/db_connection/*.py') }} + key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} restore-keys: | - ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-${{ hashFiles('onetl/connection/db_connection/*.py') }} + ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }} ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata- - name: Cache pip diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 20b38a27d..8bb039bf7 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -102,6 +102,21 @@ jobs: python-version: ${{ matrix.python-version }} os: ${{ matrix.os }} + tests-local-fs: + name: Run LocalFS tests (spark=${{ matrix.spark-version }}, java=${{ matrix.java-version }}, python=${{ matrix.python-version }}, os=${{ matrix.os }}) + needs: [get-matrix] + strategy: + fail-fast: false + matrix: + include: ${{ fromJson(needs.get-matrix.outputs.matrix-local-fs) }} + + uses: ./.github/workflows/test-local-fs.yml + with: + spark-version: ${{ matrix.spark-version }} + java-version: ${{ matrix.java-version }} + python-version: ${{ matrix.python-version }} + os: ${{ matrix.os }} + tests-mongodb: name: Run MongoDB tests (server=${{ matrix.mongodb-version }}, spark=${{ matrix.spark-version }}, java=${{ matrix.java-version }}, python=${{ matrix.python-version }}, os=${{ matrix.os }}) needs: [get-matrix] @@ -290,6 +305,7 @@ jobs: - tests-clickhouse - tests-hive - tests-kafka + - tests-local-fs - tests-mongodb - tests-mssql - tests-mysql diff --git a/README.rst b/README.rst index eebb0c62e..a7ad2fb09 100644 --- a/README.rst +++ b/README.rst @@ -58,39 +58,41 @@ Requirements Supported storages ------------------ -+------------+------------+----------------------------------------------------------------------------------------------------------+ -| Type | Storage | Powered by | -+============+============+==========================================================================================================+ -| Database | Clickhouse | Apache Spark `JDBC Data Source `_ | -+ +------------+ + -| | MSSQL | | -+ 
+------------+ + -| | MySQL | | -+ +------------+ + -| | Postgres | | -+ +------------+ + -| | Oracle | | -+ +------------+ + -| | Teradata | | -+ +------------+----------------------------------------------------------------------------------------------------------+ -| | Hive | Apache Spark `Hive integration `_ | -+ +------------+----------------------------------------------------------------------------------------------------------+ -| | Greenplum | Pivotal `Greenplum Spark connector `_ | -+ +------------+----------------------------------------------------------------------------------------------------------+ -| | MongoDB | `MongoDB Spark connector `_ | -+------------+------------+----------------------------------------------------------------------------------------------------------+ -| File | HDFS | `HDFS Python client `_ | -+ +------------+----------------------------------------------------------------------------------------------------------+ -| | S3 | `minio-py client `_ | -+ +------------+----------------------------------------------------------------------------------------------------------+ -| | SFTP | `Paramiko library `_ | -+ +------------+----------------------------------------------------------------------------------------------------------+ -| | FTP | `FTPUtil library `_ | -+ +------------+ + -| | FTPS | | -+ +------------+----------------------------------------------------------------------------------------------------------+ -| | WebDAV | `WebdavClient3 library `_ | -+------------+------------+----------------------------------------------------------------------------------------------------------+ ++--------------------+--------------+--------------------------------------------------------------------------------------------------------------+ +| Type | Storage | Powered by | ++====================+==============+==============================================================================================================+ +| Database | Clickhouse | Apache Spark `JDBC Data Source `_ | ++ +--------------+ + +| | MSSQL | | ++ +--------------+ + +| | MySQL | | ++ +--------------+ + +| | Postgres | | ++ +--------------+ + +| | Oracle | | ++ +--------------+ + +| | Teradata | | ++ +--------------+--------------------------------------------------------------------------------------------------------------+ +| | Hive | Apache Spark `Hive integration `_ | ++ +--------------+--------------------------------------------------------------------------------------------------------------+ +| | Greenplum | Pivotal `Greenplum Spark connector `_ | ++ +--------------+--------------------------------------------------------------------------------------------------------------+ +| | MongoDB | `MongoDB Spark connector `_ | ++--------------------+--------------+--------------------------------------------------------------------------------------------------------------+ +| File | HDFS | `HDFS Python client `_ | ++ +--------------+--------------------------------------------------------------------------------------------------------------+ +| | S3 | `minio-py client `_ | ++ +--------------+--------------------------------------------------------------------------------------------------------------+ +| | SFTP | `Paramiko library `_ | ++ +--------------+--------------------------------------------------------------------------------------------------------------+ +| | FTP | `FTPUtil library `_ | ++ +--------------+ + +| | FTPS | | ++ 
+--------------+--------------------------------------------------------------------------------------------------------------+ +| | WebDAV | `WebdavClient3 library `_ | ++--------------------+--------------+--------------------------------------------------------------------------------------------------------------+ +| Files as DataFrame | SparkLocalFS | Apache Spark `File Data Source `_ | ++--------------------+--------------+--------------------------------------------------------------------------------------------------------------+ .. documentation diff --git a/conftest.py b/conftest.py index f9c9e4e9d..f18970697 100644 --- a/conftest.py +++ b/conftest.py @@ -433,6 +433,24 @@ def spark_mock() -> SparkSession: return spark +@pytest.fixture( + scope="function", + params=[ + pytest.param("yarn", marks=[pytest.mark.db_connection, pytest.mark.connection]), + pytest.param("k8s", marks=[pytest.mark.db_connection, pytest.mark.connection]), + ], +) +def spark_cluster_mock(request) -> SparkSession: + from pyspark.sql import SparkSession + + spark = Mock(spec=SparkSession) + spark.sparkContext = Mock() + spark.sparkContext.appName = "abc" + spark.conf = Mock() + spark.conf.get = Mock(return_value=request.param) + return spark + + @pytest.fixture() def processing(request, spark): processing_classes = { diff --git a/docs/changelog/next_release/67.improvement.rst b/docs/changelog/next_release/67.improvement.rst new file mode 100644 index 000000000..76cd7bdaf --- /dev/null +++ b/docs/changelog/next_release/67.improvement.rst @@ -0,0 +1 @@ +Remove redundant checks for driver availability in Greenplum and MongoDB connections. diff --git a/docs/db_connection/clickhouse.rst b/docs/connection/db_connection/clickhouse.rst similarity index 100% rename from docs/db_connection/clickhouse.rst rename to docs/connection/db_connection/clickhouse.rst diff --git a/docs/db_connection/greenplum/greenplum.rst b/docs/connection/db_connection/greenplum/greenplum.rst similarity index 100% rename from docs/db_connection/greenplum/greenplum.rst rename to docs/connection/db_connection/greenplum/greenplum.rst diff --git a/docs/db_connection/greenplum/index.rst b/docs/connection/db_connection/greenplum/index.rst similarity index 100% rename from docs/db_connection/greenplum/index.rst rename to docs/connection/db_connection/greenplum/index.rst diff --git a/docs/db_connection/greenplum/prerequisites.rst b/docs/connection/db_connection/greenplum/prerequisites.rst similarity index 100% rename from docs/db_connection/greenplum/prerequisites.rst rename to docs/connection/db_connection/greenplum/prerequisites.rst diff --git a/docs/db_connection/hive.rst b/docs/connection/db_connection/hive.rst similarity index 100% rename from docs/db_connection/hive.rst rename to docs/connection/db_connection/hive.rst diff --git a/docs/db_connection/index.rst b/docs/connection/db_connection/index.rst similarity index 100% rename from docs/db_connection/index.rst rename to docs/connection/db_connection/index.rst diff --git a/docs/db_connection/mongodb.rst b/docs/connection/db_connection/mongodb.rst similarity index 100% rename from docs/db_connection/mongodb.rst rename to docs/connection/db_connection/mongodb.rst diff --git a/docs/db_connection/mssql.rst b/docs/connection/db_connection/mssql.rst similarity index 100% rename from docs/db_connection/mssql.rst rename to docs/connection/db_connection/mssql.rst diff --git a/docs/db_connection/mysql.rst b/docs/connection/db_connection/mysql.rst similarity index 100% rename from 
docs/db_connection/mysql.rst rename to docs/connection/db_connection/mysql.rst diff --git a/docs/db_connection/oracle.rst b/docs/connection/db_connection/oracle.rst similarity index 100% rename from docs/db_connection/oracle.rst rename to docs/connection/db_connection/oracle.rst diff --git a/docs/db_connection/postgres.rst b/docs/connection/db_connection/postgres.rst similarity index 100% rename from docs/db_connection/postgres.rst rename to docs/connection/db_connection/postgres.rst diff --git a/docs/db_connection/teradata.rst b/docs/connection/db_connection/teradata.rst similarity index 100% rename from docs/db_connection/teradata.rst rename to docs/connection/db_connection/teradata.rst diff --git a/docs/file_connection/ftp.rst b/docs/connection/file_connection/ftp.rst similarity index 100% rename from docs/file_connection/ftp.rst rename to docs/connection/file_connection/ftp.rst diff --git a/docs/file_connection/ftps.rst b/docs/connection/file_connection/ftps.rst similarity index 100% rename from docs/file_connection/ftps.rst rename to docs/connection/file_connection/ftps.rst diff --git a/docs/file_connection/hdfs.rst b/docs/connection/file_connection/hdfs.rst similarity index 100% rename from docs/file_connection/hdfs.rst rename to docs/connection/file_connection/hdfs.rst diff --git a/docs/file_connection/index.rst b/docs/connection/file_connection/index.rst similarity index 100% rename from docs/file_connection/index.rst rename to docs/connection/file_connection/index.rst diff --git a/docs/file_connection/s3.rst b/docs/connection/file_connection/s3.rst similarity index 100% rename from docs/file_connection/s3.rst rename to docs/connection/file_connection/s3.rst diff --git a/docs/file_connection/sftp.rst b/docs/connection/file_connection/sftp.rst similarity index 100% rename from docs/file_connection/sftp.rst rename to docs/connection/file_connection/sftp.rst diff --git a/docs/file_connection/webdav.rst b/docs/connection/file_connection/webdav.rst similarity index 100% rename from docs/file_connection/webdav.rst rename to docs/connection/file_connection/webdav.rst diff --git a/docs/connection/file_df_connection/index.rst b/docs/connection/file_df_connection/index.rst new file mode 100644 index 000000000..62775c398 --- /dev/null +++ b/docs/connection/file_df_connection/index.rst @@ -0,0 +1,10 @@ +.. _file-df-connections: + +File DataFrame Connections +========================== + +.. toctree:: + :maxdepth: 1 + :caption: File DataFrame Connections + + Spark LocalFS diff --git a/docs/connection/file_df_connection/spark_local_fs.rst b/docs/connection/file_df_connection/spark_local_fs.rst new file mode 100644 index 000000000..48f81450a --- /dev/null +++ b/docs/connection/file_df_connection/spark_local_fs.rst @@ -0,0 +1,9 @@ +.. _spark-local-fs: + +Spark LocalFS +============= + +.. currentmodule:: onetl.connection.file_df_connection.spark_local_fs + +.. autoclass:: SparkLocalFS + :members: check diff --git a/docs/connection/index.rst b/docs/connection/index.rst new file mode 100644 index 000000000..454f6ad1a --- /dev/null +++ b/docs/connection/index.rst @@ -0,0 +1,19 @@ +.. _connection: + +.. toctree:: + :maxdepth: 2 + :caption: DB Connection + + db_connection/index + +.. toctree:: + :maxdepth: 2 + :caption: File Connection + + file_connection/index + +.. 
toctree:: + :maxdepth: 2 + :caption: File DataFrame Connection + + file_df_connection/index diff --git a/docs/index.rst b/docs/index.rst index acb051af8..f2693eaa7 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,13 +19,11 @@ concepts .. toctree:: - :maxdepth: 2 + :maxdepth: 3 :caption: Connection - :glob: :hidden: - db_connection/index - file_connection/index + connection/index .. toctree:: :maxdepth: 3 diff --git a/onetl/base/base_connection.py b/onetl/base/base_connection.py index 6094bb4e6..e67af2ae0 100644 --- a/onetl/base/base_connection.py +++ b/onetl/base/base_connection.py @@ -13,6 +13,9 @@ # limitations under the License. from abc import ABC, abstractmethod +from typing import TypeVar + +T = TypeVar("T") class BaseConnection(ABC): @@ -21,7 +24,7 @@ class BaseConnection(ABC): """ @abstractmethod - def check(self): + def check(self: T) -> T: """Check source availability. |support_hooks| If not, an exception will be raised. diff --git a/onetl/connection/__init__.py b/onetl/connection/__init__.py index a267a9d8c..d334cb8f7 100644 --- a/onetl/connection/__init__.py +++ b/onetl/connection/__init__.py @@ -18,6 +18,9 @@ from onetl.connection.db_connection.db_connection import DBConnection from onetl.connection.file_connection.file_connection import FileConnection +from onetl.connection.file_df_connection.spark_file_df_connection import ( + SparkFileDFConnection, +) if TYPE_CHECKING: from onetl.connection.db_connection.clickhouse import Clickhouse @@ -36,6 +39,7 @@ from onetl.connection.file_connection.s3 import S3 from onetl.connection.file_connection.sftp import SFTP from onetl.connection.file_connection.webdav import WebDAV + from onetl.connection.file_df_connection.spark_local_fs import SparkLocalFS db_connection_modules = { "Clickhouse": "clickhouse", @@ -59,6 +63,10 @@ "WebDAV": "webdav", } +file_df_connections_modules = { + "SparkLocalFS": "spark_local_fs", +} + def __getattr__(name: str): if name in db_connection_modules: @@ -69,4 +77,8 @@ def __getattr__(name: str): submodule = file_connections_modules[name] return getattr(import_module(f"onetl.connection.file_connection.{submodule}"), name) + if name in file_df_connections_modules: + submodule = file_df_connections_modules[name] + return getattr(import_module(f"onetl.connection.file_df_connection.{submodule}"), name) + raise ImportError(f"cannot import name {name!r} from {__name__!r}") diff --git a/onetl/connection/db_connection/greenplum.py b/onetl/connection/db_connection/greenplum.py index 2651fdfe6..61ea887b6 100644 --- a/onetl/connection/db_connection/greenplum.py +++ b/onetl/connection/db_connection/greenplum.py @@ -500,7 +500,6 @@ def read_source_as_df( end_at: Statement | None = None, options: ReadOptions | dict | None = None, ) -> DataFrame: - self._check_driver_imported() read_options = self.ReadOptions.parse(options).dict(by_alias=True, exclude_none=True) log.info("|%s| Executing SQL query (on executor):", self.__class__.__name__) where = self.Dialect._condition_assembler(condition=where, start_from=start_from, end_at=end_at) @@ -526,7 +525,6 @@ def write_df_to_target( target: str, options: WriteOptions | dict | None = None, ) -> None: - self._check_driver_imported() write_options = self.WriteOptions.parse(options) options_dict = write_options.dict(by_alias=True, exclude_none=True, exclude={"mode"}) @@ -606,10 +604,10 @@ def get_min_max_bounds( def _check_driver_imported(self): gateway = self.spark._sc._gateway # type: ignore class_name = "io.pivotal.greenplum.spark.GreenplumRelationProvider" - missing_class = 
getattr(gateway.jvm, class_name) + missing_class = getattr(gateway.jvm, class_name) # type: ignore try: - gateway.help(missing_class, display=False) + gateway.help(missing_class, display=False) # type: ignore except Exception: spark_version = "_".join(self.spark.version.split(".")[:2]) log.error( diff --git a/onetl/connection/db_connection/jdbc_mixin.py b/onetl/connection/db_connection/jdbc_mixin.py index f3e56e62e..0a31775a8 100644 --- a/onetl/connection/db_connection/jdbc_mixin.py +++ b/onetl/connection/db_connection/jdbc_mixin.py @@ -384,10 +384,10 @@ def execute( def _check_driver_imported(self): gateway = self.spark._sc._gateway # type: ignore - driver_class = getattr(gateway.jvm, self.driver) + driver_class = getattr(gateway.jvm, self.driver) # type: ignore try: - gateway.help(driver_class, display=False) + gateway.help(driver_class, display=False) # type: ignore except Exception: log.error( MISSING_JVM_CLASS_MSG, diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py index 1d52ef8a0..06e7069f0 100644 --- a/onetl/connection/db_connection/kafka/connection.py +++ b/onetl/connection/db_connection/kafka/connection.py @@ -155,7 +155,7 @@ def instance_url(self): return "kafka://" + self.cluster def check(self): - ... + return self def read_table( # type: ignore self, diff --git a/onetl/connection/db_connection/mongodb.py b/onetl/connection/db_connection/mongodb.py index 172a0371c..dc1f50d7e 100644 --- a/onetl/connection/db_connection/mongodb.py +++ b/onetl/connection/db_connection/mongodb.py @@ -701,8 +701,6 @@ def get_min_max_bounds( where: dict | None = None, options: ReadOptions | dict | None = None, ) -> tuple[Any, Any]: - self._check_driver_imported() - log.info("|Spark| Getting min and max values for column %r", column) read_options = self.ReadOptions.parse(options).dict(by_alias=True, exclude_none=True) @@ -749,8 +747,6 @@ def read_source_as_df( end_at: Statement | None = None, options: ReadOptions | dict | None = None, ) -> DataFrame: - self._check_driver_imported() - read_options = self.ReadOptions.parse(options).dict(by_alias=True, exclude_none=True) final_where = self.Dialect._condition_assembler( condition=where, @@ -792,7 +788,6 @@ def write_df_to_target( target: str, options: WriteOptions | dict | None = None, ) -> None: - self._check_driver_imported() write_options = self.WriteOptions.parse(options) mode = write_options.mode write_options = write_options.dict(by_alias=True, exclude_none=True, exclude={"mode"}) @@ -816,10 +811,10 @@ def _check_driver_imported(self): gateway = self.spark._sc._gateway # type: ignore class_name = "com.mongodb.spark.sql.connector.MongoTableProvider" - missing_class = getattr(gateway.jvm, class_name) + missing_class = getattr(gateway.jvm, class_name) # type: ignore try: - gateway.help(missing_class, display=False) + gateway.help(missing_class, display=False) # type: ignore except Exception as e: log.error( MISSING_JVM_CLASS_MSG, diff --git a/onetl/connection/file_connection/mixins/__init__.py b/onetl/connection/file_connection/mixins/__init__.py index c2a1297b1..7a01169aa 100644 --- a/onetl/connection/file_connection/mixins/__init__.py +++ b/onetl/connection/file_connection/mixins/__init__.py @@ -1 +1,15 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from onetl.connection.file_connection.mixins.rename_dir_mixin import RenameDirMixin diff --git a/onetl/connection/file_df_connection/__init__.py b/onetl/connection/file_df_connection/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/onetl/connection/file_df_connection/spark_file_df_connection.py b/onetl/connection/file_df_connection/spark_file_df_connection.py new file mode 100644 index 000000000..4b85bd3ba --- /dev/null +++ b/onetl/connection/file_df_connection/spark_file_df_connection.py @@ -0,0 +1,161 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import textwrap +from abc import abstractmethod +from logging import getLogger +from typing import TYPE_CHECKING + +from pydantic import Field + +from onetl.base import BaseFileDFConnection, BaseFileFormat, PurePathProtocol +from onetl.hooks import slot, support_hooks +from onetl.impl import FrozenModel +from onetl.log import log_with_indent + +if TYPE_CHECKING: + from pyspark.sql import DataFrame, SparkSession + from pyspark.sql.types import StructType + +log = getLogger(__name__) + + +@support_hooks +class SparkFileDFConnection(BaseFileDFConnection, FrozenModel): + """ + Generic class for any Spark-based FileDFConnection classes. 
+ """ + + spark: SparkSession = Field(repr=False) + + @slot + def check(self): + self._check_if_schema_supported() + log.info("|%s| Checking connection availability...", self.__class__.__name__) + self._log_parameters() + try: + path = self._get_spark_default_path() + fs = self._get_spark_fs() + fs.getFileStatus(path).isFile() # type: ignore + log.info("|%s| Connection is available.", self.__class__.__name__) + except Exception as e: + raise RuntimeError("Connection is unavailable") from e + return self + + @slot + def read_files_as_df( + self, + path: PurePathProtocol, + format: BaseFileFormat, # noqa: WPS125 + df_schema: StructType | None = None, + ) -> DataFrame: + reader = format.apply_to_reader(self.spark.read) + if df_schema: + reader = reader.schema(df_schema) + url = self._convert_to_url(path) + return reader.load(url) + + @slot + def write_df_as_files( + self, + df: DataFrame, + path: PurePathProtocol, + format: BaseFileFormat, # noqa: WPS125 + ) -> None: + writer = format.apply_to_writer(df.write) + url = self._convert_to_url(path) + writer.save(url) + + def _check_if_schema_supported(self) -> None: + """ + Check if filesystem is supported by Spark + """ + scheme = self._get_spark_default_path().toUri().getScheme() # type: ignore + try: + self._get_spark_fs() + except Exception: + msg = f"Spark session does not support filesystem '{scheme}://'.\n{self._installation_instructions}" + log.error(msg, exc_info=False) + raise + + @property + @abstractmethod + def _installation_instructions(self) -> str: + """ + Return installation instruction to use in :obj:`~check` method. + """ + + @abstractmethod + def _convert_to_url(self, path: PurePathProtocol) -> str: + """ + Return path with Spark-specific schema prefix, like ``file://``, ``hdfs://``, ``s3a://bucket``. + """ + + @abstractmethod + def _get_default_path(self) -> PurePathProtocol: + """ + Return default path. + + Used by :obj:`~check` method to check connection availability. + """ + + def _get_spark_default_path(self): + """ + Return object of ``org.apache.hadoop.fs.Path`` class for :obj:`~_get_default_path`. + """ + path = self._convert_to_url(self._get_default_path()) + jvm = self.spark.sparkContext._jvm + return jvm.org.apache.hadoop.fs.Path(path) # type: ignore + + def _get_spark_fs(self): + """ + Return object of ``org.apache.hadoop.fs.FileSystem`` class for :obj:`~_get_default_path`. + """ + path = self._get_spark_default_path() + conf = self.spark.sparkContext._jsc.hadoopConfiguration() # type: ignore + return path.getFileSystem(conf) # type: ignore + + @classmethod + def _forward_refs(cls) -> dict[str, type]: + # avoid importing pyspark unless user called the constructor, + # as we allow user to use `Connection.package` for creating Spark session + + refs = super()._forward_refs() + try: + from pyspark.sql import SparkSession # noqa: WPS442 + except (ImportError, NameError) as e: + raise ImportError( + textwrap.dedent( + f""" + Cannot import module "pyspark". + + You should install package as follows: + pip install onetl[spark] + + or inject PySpark to sys.path in some other way BEFORE creating {cls.__name__} instance. 
+ """, + ).strip(), + ) from e + + refs["SparkSession"] = SparkSession + return refs + + def _log_parameters(self): + log.info("|Spark| Using connection parameters:") + log_with_indent("type = %s", self.__class__.__name__) + parameters = self.dict(by_alias=True, exclude_none=True, exclude={"spark"}) + for attr, value in sorted(parameters.items()): + log_with_indent("%s = %r", attr, value) diff --git a/onetl/connection/file_df_connection/spark_local_fs.py b/onetl/connection/file_df_connection/spark_local_fs.py new file mode 100644 index 000000000..ef26b0771 --- /dev/null +++ b/onetl/connection/file_df_connection/spark_local_fs.py @@ -0,0 +1,91 @@ +# Copyright 2023 MTS (Mobile Telesystems) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import os +from pathlib import Path + +from pydantic import validator + +from onetl.base import BaseFileFormat, PurePathProtocol +from onetl.connection.file_df_connection.spark_file_df_connection import ( + SparkFileDFConnection, +) +from onetl.hooks import slot, support_hooks + + +@support_hooks +class SparkLocalFS(SparkFileDFConnection): + """ + Spark connection to local filesystem. |support_hooks| + + Based on `Spark Generic File Data Source `_. + + .. warning:: + + Requires `Spark with Hadoop libraries `_. + + .. warning:: + + Currently supports only Spark sessions created with option ``spark.master: local``. + + .. note:: + + Supports only reading files as Spark DataFrame and writing DataFrame to files. + + Does NOT support file operations, like create, delete, rename, etc. + + Parameters + ---------- + spark : :class:`pyspark.sql.SparkSession` + Spark session + + Examples + -------- + + .. 
code:: python + + from onetl.connection import SparkLocalFS + from pyspark.sql import SparkSession + + # create Spark session + spark = SparkSession.builder.master("local").appName("spark-app-name").getOrCreate() + + # create connection + local_fs = SparkLocalFS(spark=spark).check() + """ + + @slot + @classmethod + def check_if_format_supported(cls, format: BaseFileFormat) -> None: # noqa: WPS125 + # any format is supported + pass + + @validator("spark") + def _validate_spark(cls, spark): + master = spark.conf.get("spark.master") + if not master.startswith("local"): + raise ValueError(f"Currently supports only spark.master='local', got {master!r}") + return spark + + @property + def _installation_instructions(self) -> str: + return "Please install Spark with Hadoop libraries" + + def _convert_to_url(self, path: PurePathProtocol) -> str: + return "file://" + os.fspath(path) + + def _get_default_path(self): + return Path(os.getcwd()) diff --git a/pytest.ini b/pytest.ini index bbcaee161..063c77567 100644 --- a/pytest.ini +++ b/pytest.ini @@ -5,21 +5,23 @@ cache_dir = .pytest_cache log_cli_level = INFO markers = connection: Tests which calls connection constructor or imports optional requirements - db_connection: Tests which calls DB connection constructor + db_connection: Tests which requires PySpark to be installed + file_df_connection: Tests which requires PySpark to be installed file_connection: Tests which imports optional requirements - webdav: WebDAV tests - hdfs: HDFS tests + clickhouse: Clickhouse tests ftp: FTP tests ftps: FTPS tests - sftp: SFTP tests - s3: S3 tests greenplum: Greenplum tests - postgres: Postgres tests + hdfs: HDFS tests hive: Hive tests - oracle: Oracle tests - clickhouse: Clickhouse tests - mysql: MySQL tests - mssql: MSSQL tests + kafka: Kafka tests + local_fs: LocalFS tests mongodb: MongoDB tests + mssql: MSSQL tests + mysql: MySQL tests + oracle: Oracle tests + postgres: Postgres tests + s3: S3 tests + sftp: SFTP tests teradata: Teradata tests - kafka: Kafka tests + webdav: WebDAV tests diff --git a/tests/tests_integration/test_file_df_connection_integration/test_spark_local_fs_integration.py b/tests/tests_integration/test_file_df_connection_integration/test_spark_local_fs_integration.py new file mode 100644 index 000000000..c5d433b75 --- /dev/null +++ b/tests/tests_integration/test_file_df_connection_integration/test_spark_local_fs_integration.py @@ -0,0 +1,18 @@ +import logging + +import pytest + +from onetl.connection import SparkLocalFS + +pytestmark = [pytest.mark.local_fs, pytest.mark.file_df_connection, pytest.mark.connection] + + +def test_spark_local_fs_check(spark, caplog): + local_fs = SparkLocalFS(spark=spark) + + with caplog.at_level(logging.INFO): + assert local_fs.check() == local_fs + + assert "type = SparkLocalFS" in caplog.text + + assert "Connection is available" in caplog.text diff --git a/tests/tests_unit/test_file_df_connection_unit/test_spark_local_fs_unit.py b/tests/tests_unit/test_file_df_connection_unit/test_spark_local_fs_unit.py new file mode 100644 index 000000000..cefc94197 --- /dev/null +++ b/tests/tests_unit/test_file_df_connection_unit/test_spark_local_fs_unit.py @@ -0,0 +1,18 @@ +import re + +import pytest + +from onetl.connection import SparkLocalFS + +pytestmark = [pytest.mark.local_fs, pytest.mark.file_df_connection, pytest.mark.connection] + + +def test_spark_local_fs_spark_local(spark_mock): + conn = SparkLocalFS(spark=spark_mock) + assert conn.spark == spark_mock + + +def 
test_spark_local_fs_spark_non_local(spark_cluster_mock): + msg = re.escape("Currently supports only spark.master='local'") + with pytest.raises(ValueError, match=msg): + SparkLocalFS(spark=spark_cluster_mock) diff --git a/tests/tests_unit/tests_db_connection_unit/test_clickhouse_unit.py b/tests/tests_unit/tests_db_connection_unit/test_clickhouse_unit.py index 81840ba13..430cde843 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_clickhouse_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_clickhouse_unit.py @@ -2,7 +2,7 @@ from onetl.connection import Clickhouse -pytestmark = pytest.mark.clickhouse +pytestmark = [pytest.mark.clickhouse, pytest.mark.db_connection, pytest.mark.connection] def test_clickhouse_class_attributes(): diff --git a/tests/tests_unit/tests_db_connection_unit/test_db_options_unit.py b/tests/tests_unit/tests_db_connection_unit/test_db_options_unit.py index d8129f3c9..78a715bb2 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_db_options_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_db_options_unit.py @@ -5,6 +5,8 @@ from onetl.connection import Greenplum, Hive, Oracle, Postgres +pytestmark = [pytest.mark.db_connection, pytest.mark.connection] + @pytest.mark.parametrize( "options_class", diff --git a/tests/tests_unit/tests_db_connection_unit/test_greenplum_unit.py b/tests/tests_unit/tests_db_connection_unit/test_greenplum_unit.py index c28457735..858feec93 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_greenplum_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_greenplum_unit.py @@ -3,7 +3,7 @@ from onetl.connection import Greenplum from onetl.connection.db_connection.greenplum import GreenplumWriteMode -pytestmark = pytest.mark.greenplum +pytestmark = [pytest.mark.greenplum, pytest.mark.db_connection, pytest.mark.connection] def test_greenplum_class_attributes(): diff --git a/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py b/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py index 9b401abbd..ceea3f5a4 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_hive_unit.py @@ -9,7 +9,7 @@ from onetl.connection.db_connection.hive import HiveWriteMode from onetl.hooks import hook -pytestmark = pytest.mark.hive +pytestmark = [pytest.mark.hive, pytest.mark.db_connection, pytest.mark.connection] def test_hive_missing_args(spark_mock): diff --git a/tests/tests_unit/tests_db_connection_unit/test_jdbc_options_unit.py b/tests/tests_unit/tests_db_connection_unit/test_jdbc_options_unit.py index fd914568d..c146c97ed 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_jdbc_options_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_jdbc_options_unit.py @@ -4,6 +4,8 @@ from onetl.connection import Oracle, Postgres from onetl.connection.db_connection.jdbc_connection import JDBCWriteMode +pytestmark = [pytest.mark.db_connection, pytest.mark.connection] + def test_jdbc_read_options_default(): options = Oracle.ReadOptions() diff --git a/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py b/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py index ec6e7244b..3bc008c9c 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_kafka_unit.py @@ -6,7 +6,7 @@ from onetl.connection import Kafka -pytestmark = pytest.mark.kafka +pytestmark = [pytest.mark.kafka, pytest.mark.db_connection, pytest.mark.connection] @pytest.mark.parametrize( 
diff --git a/tests/tests_unit/tests_db_connection_unit/test_mongodb_unit.py b/tests/tests_unit/tests_db_connection_unit/test_mongodb_unit.py index 8671efbc0..4fb68c29e 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_mongodb_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_mongodb_unit.py @@ -4,7 +4,7 @@ from onetl.connection import MongoDB -pytestmark = pytest.mark.mongodb +pytestmark = [pytest.mark.mongodb, pytest.mark.db_connection, pytest.mark.connection] def test_mongodb(spark_mock): diff --git a/tests/tests_unit/tests_db_connection_unit/test_mssql_unit.py b/tests/tests_unit/tests_db_connection_unit/test_mssql_unit.py index e57a0a165..77eeea1a5 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_mssql_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_mssql_unit.py @@ -2,7 +2,7 @@ from onetl.connection import MSSQL -pytestmark = pytest.mark.mssql +pytestmark = [pytest.mark.mssql, pytest.mark.db_connection, pytest.mark.connection] def test_mssql_class_attributes(): diff --git a/tests/tests_unit/tests_db_connection_unit/test_mysql_unit.py b/tests/tests_unit/tests_db_connection_unit/test_mysql_unit.py index e48c0215c..75004804b 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_mysql_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_mysql_unit.py @@ -2,7 +2,7 @@ from onetl.connection import MySQL -pytestmark = pytest.mark.mysql +pytestmark = [pytest.mark.mysql, pytest.mark.db_connection, pytest.mark.connection] def test_mysql_class_attributes(): diff --git a/tests/tests_unit/tests_db_connection_unit/test_oracle_unit.py b/tests/tests_unit/tests_db_connection_unit/test_oracle_unit.py index c7f82b16b..9999c95df 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_oracle_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_oracle_unit.py @@ -2,7 +2,7 @@ from onetl.connection import Oracle -pytestmark = pytest.mark.oracle +pytestmark = [pytest.mark.oracle, pytest.mark.db_connection, pytest.mark.connection] def test_oracle_class_attributes(): diff --git a/tests/tests_unit/tests_db_connection_unit/test_postgres_unit.py b/tests/tests_unit/tests_db_connection_unit/test_postgres_unit.py index e16d2aa9a..c29819924 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_postgres_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_postgres_unit.py @@ -2,7 +2,7 @@ from onetl.connection import Postgres -pytestmark = pytest.mark.postgres +pytestmark = [pytest.mark.postgres, pytest.mark.db_connection, pytest.mark.connection] def test_postgres_class_attributes(): diff --git a/tests/tests_unit/tests_db_connection_unit/test_teradata_unit.py b/tests/tests_unit/tests_db_connection_unit/test_teradata_unit.py index 80b758b3c..257147814 100644 --- a/tests/tests_unit/tests_db_connection_unit/test_teradata_unit.py +++ b/tests/tests_unit/tests_db_connection_unit/test_teradata_unit.py @@ -2,7 +2,7 @@ from onetl.connection import Teradata -pytestmark = pytest.mark.teradata +pytestmark = [pytest.mark.teradata, pytest.mark.db_connection, pytest.mark.connection] def test_teradata_class_attributes():
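
Putting the pieces above together, a minimal sketch of how the ``SparkLocalFS`` connection introduced by this patch can be exercised, assuming ``onetl`` with these changes and ``pyspark`` are installed; the ``local[2]`` master and the app name are illustrative choices, the only constraint enforced by the patch's validator is that ``spark.master`` starts with ``local``:

.. code:: python

    from pyspark.sql import SparkSession

    from onetl.connection import SparkLocalFS

    # SparkLocalFS only accepts Spark sessions whose ``spark.master`` starts with
    # "local"; the validator added in this patch rejects e.g. yarn/k8s masters
    # with a ValueError.
    spark = (
        SparkSession.builder
        .master("local[2]")
        .appName("onetl-local-fs-example")
        .getOrCreate()
    )

    # check() logs the connection parameters, verifies that the Hadoop ``file://``
    # filesystem is reachable, and - per the updated BaseConnection.check()
    # signature - returns the connection itself, so the call can be chained.
    local_fs = SparkLocalFS(spark=spark).check()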