[DOP-6701] Add SparkLocalFS connection
dolfinus committed Jul 12, 2023
1 parent f4affb6 commit d91a3e6
Showing 68 changed files with 582 additions and 83 deletions.
1 change: 1 addition & 0 deletions .github/workflows/data/file-df/ignored.txt
@@ -0,0 +1 @@
docs
5 changes: 5 additions & 0 deletions .github/workflows/data/file-df/tracked.txt
@@ -0,0 +1,5 @@
.github/workflows/data/file-df/**
onetl/file_df_connection/spark_file_df_connection.py
onetl/file/file_reader/**
onetl/file/file_writer/**
onetl/file/__init__.py
9 changes: 8 additions & 1 deletion .github/workflows/data/file/tracked.txt
@@ -1,3 +1,10 @@
.github/workflows/data/file/**
onetl/file_connection/file_connection.py
onetl/file/**
onetl/file/file_downloader/**
onetl/file/file_mover/**
onetl/file/file_uploader/**
onetl/file/filter/**
onetl/file/limit/**
onetl/file/file_result.py
onetl/file/file_set.py
onetl/file/__init__.py
1 change: 1 addition & 0 deletions .github/workflows/data/local-fs/ignored.txt
@@ -0,0 +1 @@
docs
27 changes: 27 additions & 0 deletions .github/workflows/data/local-fs/matrix.yml
@@ -0,0 +1,27 @@
min: &min
spark-version: 2.3.1
python-version: '3.7'
java-version: 8
os: ubuntu-latest

max: &max
spark-version: 3.4.1
python-version: '3.11'
java-version: 17
os: ubuntu-latest

latest: &latest
spark-version: latest
python-version: '3.11'
java-version: 20
os: ubuntu-latest

matrix:
small:
- <<: *max
full:
- <<: *min
- <<: *max
nightly:
- <<: *min
- <<: *latest
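
Note: `min`, `max`, and `latest` above are YAML anchors, and `<<: *min` is a merge key that copies the anchored mapping into the list item, so every matrix entry ends up fully populated. As a sketch, the `full` matrix expands to:

matrix:
  full:
    - spark-version: 2.3.1
      python-version: '3.7'
      java-version: 8
      os: ubuntu-latest
    - spark-version: 3.4.1
      python-version: '3.11'
      java-version: 17
      os: ubuntu-latest

get-matrix.yml below serializes this mapping to JSON with yq and selects one of the lists by the computed key.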
1 change: 1 addition & 0 deletions .github/workflows/data/local-fs/tracked.txt
@@ -0,0 +1 @@
**/*local_fs*
44 changes: 44 additions & 0 deletions .github/workflows/get-matrix.yml
@@ -17,6 +17,8 @@ on:
value: ${{ jobs.get-matrix.outputs.matrix-hive }}
matrix-kafka:
value: ${{ jobs.get-matrix.outputs.matrix-kafka }}
matrix-local-fs:
value: ${{ jobs.get-matrix.outputs.matrix-local-fs }}
matrix-mongodb:
value: ${{ jobs.get-matrix.outputs.matrix-mongodb }}
matrix-mssql:
@@ -55,6 +57,7 @@ jobs:
matrix-greenplum: ${{ toJson(fromJson(steps.matrix-greenplum.outputs.result)[steps.key-greenplum.outputs.key]) }}
matrix-hive: ${{ toJson(fromJson(steps.matrix-hive.outputs.result)[steps.key-hive.outputs.key]) }}
matrix-kafka: ${{ toJson(fromJson(steps.matrix-kafka.outputs.result)[steps.key-kafka.outputs.key]) }}
matrix-local-fs: ${{ toJson(fromJson(steps.matrix-local-fs.outputs.result)[steps.key-local-fs.outputs.key]) }}
matrix-mongodb: ${{ toJson(fromJson(steps.matrix-mongodb.outputs.result)[steps.key-mongodb.outputs.key]) }}
matrix-mssql: ${{ toJson(fromJson(steps.matrix-mssql.outputs.result)[steps.key-mssql.outputs.key]) }}
matrix-mysql: ${{ toJson(fromJson(steps.matrix-mysql.outputs.result)[steps.key-mysql.outputs.key]) }}
@@ -111,6 +114,17 @@ jobs:
run: |
echo '${{ steps.changed-file.outputs.all_changed_files }}'
- name: Check if file-df-related files are changed
id: changed-file-df
uses: tj-actions/changed-files@v35
with:
files_from_source_file: .github/workflows/data/file-df/tracked.txt
files_ignore_from_source_file: .github/workflows/data/file-df/ignored.txt

- name: Print file-df-related files changed
run: |
echo '${{ steps.changed-file-df.outputs.all_changed_files }}'
- name: Check if core files are changed
id: changed-core
uses: tj-actions/changed-files@v35
@@ -261,6 +275,36 @@ jobs:
with:
cmd: yq -o=json '.matrix' .github/workflows/data/kafka/matrix.yml

- name: Check if LocalFS files are changed
id: changed-local-fs
uses: tj-actions/changed-files@v35
with:
files_from_source_file: .github/workflows/data/local-fs/tracked.txt
files_ignore_from_source_file: .github/workflows/data/local-fs/ignored.txt

- name: Print LocalFS files changed
run: |
echo '${{ steps.changed-local-fs.outputs.all_changed_files }}'
- name: Calculate LocalFS matrix key
id: key-local-fs
run: |
if ${{ inputs.nightly }}; then
key=nightly
elif ${{ steps.changed-base.outputs.any_changed }} || ${{ steps.changed-file-df.outputs.any_changed }} || ${{ steps.changed-local-fs.outputs.any_changed }}; then
key=full
else
key=small
fi
echo key=$key
echo key=$key >> $GITHUB_OUTPUT
- name: Get LocalFS matrix
id: matrix-local-fs
uses: mikefarah/yq@v4.33.3
with:
cmd: yq -o=json '.matrix' .github/workflows/data/local-fs/matrix.yml

- name: Check if MongoDB files are changed
id: changed-mongodb
uses: tj-actions/changed-files@v35
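
Two details of the new LocalFS steps in get-matrix.yml are easy to miss. First, the `${{ }}` expressions inside the `Calculate LocalFS matrix key` script are substituted by the runner before bash executes, so the boolean outputs become the literal shell commands `true`/`false`. A sketch of the step body after substitution, assuming a non-nightly run in which only LocalFS files changed:

run: |
  # after substitution: inputs.nightly == false, changed-base and
  # changed-file-df report no changes, changed-local-fs reports changes
  if false; then
    key=nightly
  elif false || false || true; then
    key=full
  else
    key=small
  fi
  echo key=$key                    # logs the chosen key ("full" here)
  echo key=$key >> $GITHUB_OUTPUT  # exposed as steps.key-local-fs.outputs.key

Second, `yq -o=json '.matrix'` re-serializes the matrix mapping from local-fs/matrix.yml to JSON, and the job output then applies `fromJson(...)[key]` plus `toJson(...)` to hand exactly one combination list to the calling workflows.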
17 changes: 17 additions & 0 deletions .github/workflows/nightly.yml
@@ -106,6 +106,22 @@ jobs:
os: ${{ matrix.os }}
with-cache: false

tests-local-fs:
name: Run LocalFS tests (spark=${{ matrix.spark-version }}, java=${{ matrix.java-version }}, python=${{ matrix.python-version }}, os=${{ matrix.os }})
needs: [get-matrix]
strategy:
fail-fast: false
matrix:
include: ${{ fromJson(needs.get-matrix.outputs.matrix-local-fs) }}

uses: ./.github/workflows/test-local-fs.yml
with:
spark-version: ${{ matrix.spark-version }}
java-version: ${{ matrix.java-version }}
python-version: ${{ matrix.python-version }}
os: ${{ matrix.os }}
with-cache: false

tests-mongodb:
name: Run MongoDB tests (server=${{ matrix.mongodb-version }}, spark=${{ matrix.spark-version }}, java=${{ matrix.java-version }}, python=${{ matrix.python-version }}, os=${{ matrix.os }})
needs: [get-matrix]
@@ -306,6 +322,7 @@ jobs:
- tests-clickhouse
- tests-hive
- tests-kafka
- tests-local-fs
- tests-mongodb
- tests-mssql
- tests-mysql
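
The new `tests-local-fs` job in nightly.yml mirrors the other nightly jobs: `fromJson` turns the JSON emitted by get-matrix back into a list of include entries. Assuming nightly.yml invokes get-matrix with `nightly: true`, the key resolves to `nightly` and, per local-fs/matrix.yml, the rendered strategy is equivalent to this sketch:

strategy:
  fail-fast: false
  matrix:
    include:
      - spark-version: 2.3.1
        python-version: '3.7'
        java-version: 8
        os: ubuntu-latest
      - spark-version: latest
        python-version: '3.11'
        java-version: 20
        os: ubuntu-latest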
4 changes: 2 additions & 2 deletions .github/workflows/test-clickhouse.yml
@@ -58,9 +58,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-
- name: Cache pip
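
The same two-line change recurs in every per-connection workflow below. `hashFiles()` returns a single hash computed over all files matched by all of its patterns, so adding `onetl/connection/file_df_connection/*.py` invalidates the Ivy cache whenever a file-df connection module changes, while the bare trailing `-tests-clickhouse-` restore key still lets a stale cache seed a fresh run. An illustrative resolved key:

# runner.os is "Linux" on ubuntu-latest; the trailing hash value is made up
key: Linux-ivy-3.4.1-tests-clickhouse-1a2b3c4d5e6f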
4 changes: 2 additions & 2 deletions .github/workflows/test-core.yml
@@ -44,9 +44,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-clickhouse-
- name: Cache pip
4 changes: 2 additions & 2 deletions .github/workflows/test-greenplum.yml
@@ -60,9 +60,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-greenplum-
- name: Cache pip
4 changes: 2 additions & 2 deletions .github/workflows/test-hive.yml
@@ -44,9 +44,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-hive-
- name: Cache pip
4 changes: 2 additions & 2 deletions .github/workflows/test-kafka.yml
@@ -71,9 +71,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-kafka-
- name: Cache pip
80 changes: 80 additions & 0 deletions .github/workflows/test-local-fs.yml
@@ -0,0 +1,80 @@
name: Tests for LocalFS
on:
workflow_call:
inputs:
spark-version:
required: true
type: string
java-version:
required: true
type: string
python-version:
required: true
type: string
os:
required: true
type: string
with-cache:
required: false
type: boolean
default: true

jobs:
test-local-fs:
name: Run LocalFS tests (spark=${{ inputs.spark-version }}, java=${{ inputs.java-version }}, python=${{ inputs.python-version }}, os=${{ inputs.os }})
runs-on: ${{ inputs.os }}

steps:
- name: Checkout code
uses: actions/checkout@v3

- name: Set up Java ${{ inputs.java-version }}
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: ${{ inputs.java-version }}

- name: Set up Python ${{ inputs.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ inputs.python-version }}

- name: Cache Ivy
uses: actions/cache@v3
if: inputs.with-cache
with:
path: ~/.ivy2
key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-local-fs-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-local-fs-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-local-fs-
- name: Cache pip
uses: actions/cache@v3
if: inputs.with-cache
with:
path: ~/.cache/pip
key: ${{ runner.os }}-python-${{ inputs.python-version }}-tests-local-fs-${{ hashFiles('requirements/core.txt', 'requirements/tests/base.txt', 'requirements/tests/spark-*.txt') }}
restore-keys: |
${{ runner.os }}-python-${{ inputs.python-version }}-tests-local-fs-${{ hashFiles('requirements/core.txt', 'requirements/tests/base.txt', 'requirements/tests/spark-*.txt') }}
${{ runner.os }}-python-${{ inputs.python-version }}-tests-local-fs-
- name: Upgrade pip
run: python -m pip install --upgrade pip setuptools wheel

- name: Install dependencies
run: |
pip install -I -r requirements/core.txt -r requirements/tests/base.txt -r requirements/tests/spark-${{ inputs.spark-version }}.txt
- name: Run tests
run: |
mkdir reports/ || echo "Directory exists"
sed '/^$/d' ./.env.local | sed '/^#/d' | sed 's/^/export /' > ./env
source ./env
./pytest_runner.sh -m local_fs
- name: Upload coverage results
uses: actions/upload-artifact@v3
with:
name: local-fs-spark-${{ inputs.spark-version }}-python-${{ inputs.python-version }}-os-${{ inputs.os }}
path: reports/*
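
The `Run tests` step in test-local-fs.yml builds a sourceable env file by stripping blank lines and comments from `.env.local` and prefixing the remaining lines with `export`. The three chained `sed` calls collapse into one invocation; a condensed sketch of the step, assuming `.env.local` holds plain KEY=value pairs:

- name: Run tests
  run: |
    mkdir -p reports/   # no-op when the directory already exists
    # drop blank lines, drop comments, prefix the remainder with "export "
    sed -e '/^$/d' -e '/^#/d' -e 's/^/export /' ./.env.local > ./env
    source ./env
    ./pytest_runner.sh -m local_fs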
4 changes: 2 additions & 2 deletions .github/workflows/test-mongodb.yml
@@ -56,9 +56,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mongodb-
- name: Cache pip
4 changes: 2 additions & 2 deletions .github/workflows/test-mssql.yml
@@ -59,9 +59,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mssql-
- name: Cache pip
4 changes: 2 additions & 2 deletions .github/workflows/test-mysql.yml
@@ -58,9 +58,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-mysql-
- name: Cache pip
4 changes: 2 additions & 2 deletions .github/workflows/test-oracle.yml
@@ -61,9 +61,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-oracle-
- name: Cache pip
4 changes: 2 additions & 2 deletions .github/workflows/test-postgres.yml
@@ -57,9 +57,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-postgres-
- name: Cache pip
4 changes: 2 additions & 2 deletions .github/workflows/test-teradata.yml
@@ -44,9 +44,9 @@ jobs:
if: inputs.with-cache
with:
path: ~/.ivy2
- key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ key: ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
restore-keys: |
- ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-${{ hashFiles('onetl/connection/db_connection/*.py') }}
+ ${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-${{ hashFiles('onetl/connection/db_connection/*.py', 'onetl/connection/file_df_connection/*.py') }}
${{ runner.os }}-ivy-${{ inputs.spark-version }}-tests-teradata-
- name: Cache pip
16 changes: 16 additions & 0 deletions .github/workflows/tests.yml
@@ -102,6 +102,16 @@ jobs:
python-version: ${{ matrix.python-version }}
os: ${{ matrix.os }}

tests-local-fs:
name: Run LocalFS tests (spark=${{ matrix.spark-version }}, java=${{ matrix.java-version }}, python=${{ matrix.python-version }}, os=${{ matrix.os }})
needs: [get-matrix]
strategy:
fail-fast: false
matrix:
include: ${{ fromJson(needs.get-matrix.outputs.matrix-local-fs) }}

uses: ./.github/workflows/test-local-fs.yml
with:
spark-version: ${{ matrix.spark-version }}
java-version: ${{ matrix.java-version }}
python-version: ${{ matrix.python-version }}
os: ${{ matrix.os }}

tests-mongodb:
name: Run MongoDB tests (server=${{ matrix.mongodb-version }}, spark=${{ matrix.spark-version }}, java=${{ matrix.java-version }}, python=${{ matrix.python-version }}, os=${{ matrix.os }})
needs: [get-matrix]
@@ -290,6 +305,7 @@ jobs:
- tests-clickhouse
- tests-hive
- tests-kafka
- tests-local-fs
- tests-mongodb
- tests-mssql
- tests-mysql