Commit

Test Python 3.13
dolfinus committed Jul 27, 2024
1 parent b657322 commit dcacfae
Showing 19 changed files with 146 additions and 84 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/data/core/matrix.yml
@@ -8,7 +8,7 @@ min: &min
max: &max
spark-version: 3.5.1
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4-dev'
java-version: 20
os: ubuntu-latest

2 changes: 1 addition & 1 deletion .github/workflows/data/ftp/matrix.yml
@@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
os: ubuntu-latest

latest: &latest
2 changes: 1 addition & 1 deletion .github/workflows/data/ftps/matrix.yml
@@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
os: ubuntu-latest

latest: &latest
2 changes: 1 addition & 1 deletion .github/workflows/data/hdfs/matrix.yml
@@ -10,7 +10,7 @@ max: &max
hadoop-version: hadoop3-hdfs
spark-version: 3.5.1
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
java-version: 20
os: ubuntu-latest

2 changes: 1 addition & 1 deletion .github/workflows/data/s3/matrix.yml
@@ -12,7 +12,7 @@ max: &max
minio-version: 2024.4.18
spark-version: 3.5.1
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
java-version: 20
os: ubuntu-latest

2 changes: 1 addition & 1 deletion .github/workflows/data/sftp/matrix.yml
@@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
os: ubuntu-latest

latest: &latest
2 changes: 1 addition & 1 deletion .github/workflows/data/webdav/matrix.yml
@@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
os: ubuntu-latest

latest: &latest
2 changes: 1 addition & 1 deletion .github/workflows/test-sftp.yml
@@ -58,7 +58,7 @@ jobs:

- name: Install dependencies
run: |
pip install -I -r requirements/core.txt -r requirements/sftp.txt -r requirements/tests/base.txt -r requirements/tests/pydantic-${{ inputs.pydantic-version }}.txt
pip install -I -r requirements/core.txt -r requirements/sftp.txt -r requirements/tests/base.txt -r requirements/tests/pydantic-${{ inputs.pydantic-version }}.txt cffi==1.17.0rc1
- name: Run tests
run: |
6 changes: 6 additions & 0 deletions .github/workflows/test-webdav.yml
@@ -33,6 +33,12 @@ jobs:
with:
python-version: ${{ inputs.python-version }}

- name: Set up lxml libs
if: runner.os == 'Linux'
run: |
sudo apt-get update
sudo apt-get install --no-install-recommends libxml2-dev libxslt-dev
- name: Cache pip
uses: actions/cache@v4
if: inputs.with-cache
14 changes: 3 additions & 11 deletions onetl/base/base_file_format.py
@@ -3,7 +3,7 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ContextManager
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pyspark.sql import DataFrameReader, DataFrameWriter, SparkSession
@@ -30,7 +30,7 @@ def check_if_supported(self, spark: SparkSession) -> None:
"""

@abstractmethod
def apply_to_reader(self, reader: DataFrameReader) -> DataFrameReader | ContextManager[DataFrameReader]:
def apply_to_reader(self, reader: DataFrameReader) -> DataFrameReader:
"""
Apply provided format to :obj:`pyspark.sql.DataFrameReader`. |support_hooks|
@@ -40,10 +40,6 @@ def apply_to_reader(self, reader: DataFrameReader) -> DataFrameReader | ContextM
-------
:obj:`pyspark.sql.DataFrameReader`
DataFrameReader with options applied.
``ContextManager[DataFrameReader]``
If returned context manager, it will be entered before reading data and exited after creating a DataFrame.
Context manager's ``__enter__`` method should return :obj:`pyspark.sql.DataFrameReader` instance.
"""


@@ -68,7 +64,7 @@ def check_if_supported(self, spark: SparkSession) -> None:
"""

@abstractmethod
def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter | ContextManager[DataFrameWriter]:
def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter:
"""
Apply provided format to :obj:`pyspark.sql.DataFrameWriter`. |support_hooks|
@@ -78,8 +74,4 @@ def apply_to_writer(self, writer: DataFrameWriter) -> DataFrameWriter | ContextM
-------
:obj:`pyspark.sql.DataFrameWriter`
DataFrameWriter with options applied.
``ContextManager[DataFrameWriter]``
If returned context manager, it will be entered before writing and exited after writing a DataFrame.
Context manager's ``__enter__`` method should return :obj:`pyspark.sql.DataFrameWriter` instance.
"""
21 changes: 15 additions & 6 deletions onetl/connection/db_connection/greenplum/connection.py
@@ -5,17 +5,18 @@
import logging
import os
import textwrap
import threading
import warnings
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, Optional

from etl_entities.instance import Host

from onetl.connection.db_connection.jdbc_connection.options import JDBCReadOptions

try:
from pydantic.v1 import validator
from pydantic.v1 import PrivateAttr, SecretStr, validator
except (ImportError, AttributeError):
from pydantic import validator # type: ignore[no-redef, assignment]
from pydantic import validator, SecretStr, PrivateAttr # type: ignore[no-redef, assignment]

from onetl._util.classproperty import classproperty
from onetl._util.java import try_import_java_class
@@ -39,7 +40,9 @@
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
JDBCOptions,
)
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCOptions as JDBCMixinOptions,
)
from onetl.exception import MISSING_JVM_CLASS_MSG, TooManyParallelJobsError
from onetl.hooks import slot, support_hooks
@@ -69,11 +72,11 @@ class GreenplumExtra(GenericOptions):

class Config:
extra = "allow"
prohibited_options = JDBCOptions.Config.prohibited_options
prohibited_options = JDBCMixinOptions.Config.prohibited_options


@support_hooks
class Greenplum(JDBCMixin, DBConnection):
class Greenplum(JDBCMixin, DBConnection): # noqa: WPS338
"""Greenplum connection. |support_hooks|
Based on package ``io.pivotal:greenplum-spark:2.2.0``
@@ -157,6 +160,8 @@ class Greenplum(JDBCMixin, DBConnection):
"""

host: Host
user: str
password: SecretStr
database: str
port: int = 5432
extra: GreenplumExtra = GreenplumExtra()
@@ -166,6 +171,7 @@ class Greenplum(JDBCMixin, DBConnection):
SQLOptions = GreenplumSQLOptions
FetchOptions = GreenplumFetchOptions
ExecuteOptions = GreenplumExecuteOptions
JDBCOptions = JDBCMixinOptions

Extra = GreenplumExtra
Dialect = GreenplumDialect
@@ -174,6 +180,9 @@ class Greenplum(JDBCMixin, DBConnection):
CONNECTIONS_WARNING_LIMIT: ClassVar[int] = 31
CONNECTIONS_EXCEPTION_LIMIT: ClassVar[int] = 100

_CHECK_QUERY: ClassVar[str] = "SELECT 1"
_last_connection_and_options: Optional[threading.local] = PrivateAttr(default=None)

@slot
@classmethod
def get_packages(
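
Declaring `user` and `password` directly on the class (with `password` typed as pydantic's `SecretStr`) means credentials are validated up front and masked in `repr()` and logs. A rough usage sketch — the host, credentials, and the pre-built `spark` session are placeholders, not values from this commit:

```python
from onetl.connection import Greenplum

greenplum = Greenplum(
    host="greenplum.internal",   # placeholder host
    port=5432,
    database="dwh",
    user="onetl_reader",
    password="change_me",        # coerced to SecretStr on assignment
    spark=spark,                 # assumed pre-built SparkSession with the Greenplum connector package
)

print(greenplum.password)                      # '**********' -- masked
print(greenplum.password.get_secret_value())   # explicit opt-in to the raw value
```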
7 changes: 4 additions & 3 deletions onetl/connection/db_connection/hive/connection.py
@@ -15,6 +15,7 @@

from onetl._util.spark import inject_spark_param
from onetl._util.sql import clear_statement
from onetl.base import BaseWritableFileFormat
from onetl.connection.db_connection.db_connection import DBConnection
from onetl.connection.db_connection.hive.dialect import HiveDialect
from onetl.connection.db_connection.hive.options import (
@@ -23,7 +24,7 @@
HiveWriteOptions,
)
from onetl.connection.db_connection.hive.slots import HiveSlots
from onetl.file.format.file_format import WriteOnlyFileFormat
from onetl.file.format.file_format import ReadWriteFileFormat, WriteOnlyFileFormat
from onetl.hooks import slot, support_hooks
from onetl.hwm import Window
from onetl.log import log_lines, log_with_indent
@@ -456,7 +457,7 @@ def _format_write_options(self, write_options: HiveWriteOptions) -> dict:
exclude={"if_exists"},
)

if isinstance(write_options.format, WriteOnlyFileFormat):
if isinstance(write_options.format, (WriteOnlyFileFormat, ReadWriteFileFormat)):
options_dict["format"] = write_options.format.name
options_dict.update(write_options.format.dict(exclude={"name"}))

@@ -485,7 +486,7 @@ def _save_as_table(
writer = writer.option(method, value)

# deserialize passed ORC(), Parquet(), CSV(), etc. file formats
if isinstance(write_options.format, WriteOnlyFileFormat):
if isinstance(write_options.format, BaseWritableFileFormat):
writer = write_options.format.apply_to_writer(writer)
elif isinstance(write_options.format, str):
writer = writer.format(write_options.format)
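
Because the checks now accept any `BaseWritableFileFormat` (including `ReadWriteFileFormat` subclasses such as CSV or ORC), read-write formats passed through `HiveWriteOptions.format` are no longer ignored on write. A hedged sketch of what this enables — the cluster name, table name, and CSV options are illustrative assumptions following the usual Spark CSV writer options:

```python
from onetl.connection import Hive
from onetl.db import DBWriter
from onetl.file.format import CSV

hive = Hive(cluster="rnd-dwh", spark=spark)  # placeholder cluster, pre-built SparkSession

writer = DBWriter(
    connection=hive,
    target="schema.table",
    options=Hive.WriteOptions(
        # CSV is a ReadWriteFileFormat; before this commit only
        # WriteOnlyFileFormat instances passed the isinstance check
        format=CSV(header=True, delimiter=";"),
    ),
)
writer.run(df)  # df is any Spark DataFrame prepared earlier
```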
4 changes: 2 additions & 2 deletions onetl/connection/db_connection/hive/options.py
@@ -13,7 +13,7 @@

from typing_extensions import deprecated

from onetl.file.format.file_format import WriteOnlyFileFormat
from onetl.base import BaseWritableFileFormat
from onetl.impl import GenericOptions


@@ -199,7 +199,7 @@ class Config:
does not affect behavior.
"""

format: Union[str, WriteOnlyFileFormat] = "orc"
format: Union[str, BaseWritableFileFormat] = "orc"
"""Format of files which should be used for storing table data.
Examples
58 changes: 53 additions & 5 deletions onetl/connection/db_connection/jdbc_connection/connection.py
@@ -4,9 +4,16 @@

import logging
import secrets
import threading
import warnings
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar, Optional

try:
from pydantic.v1 import PrivateAttr, SecretStr, validator
except (ImportError, AttributeError):
from pydantic import PrivateAttr, SecretStr, validator # type: ignore[no-redef, assignment]

from onetl._util.java import try_import_java_class
from onetl._util.sql import clear_statement
from onetl.connection.db_connection.db_connection import DBConnection
from onetl.connection.db_connection.jdbc_connection.dialect import JDBCDialect
@@ -19,7 +26,14 @@
JDBCWriteOptions,
)
from onetl.connection.db_connection.jdbc_mixin import JDBCMixin
from onetl.connection.db_connection.jdbc_mixin.options import JDBCFetchOptions
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
)
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCOptions as JDBCMixinOptions,
)
from onetl.exception import MISSING_JVM_CLASS_MSG
from onetl.hooks import slot, support_hooks
from onetl.hwm import Window
from onetl.log import log_lines, log_with_indent
@@ -44,13 +58,38 @@


@support_hooks
class JDBCConnection(JDBCMixin, DBConnection):
class JDBCConnection(JDBCMixin, DBConnection): # noqa: WPS338
user: str
password: SecretStr

DRIVER: ClassVar[str]
_CHECK_QUERY: ClassVar[str] = "SELECT 1"
_last_connection_and_options: Optional[threading.local] = PrivateAttr(default=None)

JDBCOptions = JDBCMixinOptions
FetchOptions = JDBCFetchOptions
ExecuteOptions = JDBCExecuteOptions
Dialect = JDBCDialect
ReadOptions = JDBCReadOptions
SQLOptions = JDBCSQLOptions
WriteOptions = JDBCWriteOptions
Options = JDBCLegacyOptions

@validator("spark")
def _check_java_class_imported(cls, spark):
try:
try_import_java_class(spark, cls.DRIVER)
except Exception as e:
msg = MISSING_JVM_CLASS_MSG.format(
java_class=cls.DRIVER,
package_source=cls.__name__,
args="",
)
if log.isEnabledFor(logging.DEBUG):
log.debug("Missing Java class", exc_info=e, stack_info=True)
raise ValueError(msg) from e
return spark

@slot
def sql(
self,
@@ -109,11 +148,16 @@ def read_source_as_df(
limit: int | None = None,
options: JDBCReadOptions | None = None,
) -> DataFrame:
if isinstance(options, JDBCLegacyOptions):
raw_options = self.ReadOptions.parse(options.dict(exclude_unset=True))
else:
raw_options = self.ReadOptions.parse(options)

read_options = self._set_lower_upper_bound(
table=source,
where=where,
hint=hint,
options=self.ReadOptions.parse(options),
options=raw_options,
)

new_columns = columns or ["*"]
@@ -170,7 +214,11 @@ def write_df_to_target(
target: str,
options: JDBCWriteOptions | None = None,
) -> None:
write_options = self.WriteOptions.parse(options)
if isinstance(options, JDBCLegacyOptions):
write_options = self.WriteOptions.parse(options.dict(exclude_unset=True))
else:
write_options = self.WriteOptions.parse(options)

jdbc_properties = self._get_jdbc_properties(write_options, exclude={"if_exists"}, exclude_none=True)

mode = (
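
The new branches keep the deprecated `Options` class usable: a legacy options object is unpacked with `exclude_unset=True` and re-parsed into `ReadOptions`/`WriteOptions`, so only explicitly set fields carry over. A rough illustration with Postgres as the concrete JDBC connection — host, credentials, and the pre-built `spark` session are placeholders:

```python
from onetl.connection import Postgres

postgres = Postgres(
    host="pg.internal",      # placeholder
    database="app",
    user="onetl",
    password="change_me",
    spark=spark,             # assumed pre-built SparkSession with the JDBC driver package
)

# Deprecated since 0.5.0: emits a UserWarning, then is converted internally via
# self.ReadOptions.parse(options.dict(exclude_unset=True))
legacy = Postgres.Options(partitionColumn="id", numPartitions=8, fetchsize=50_000)

# Preferred spelling with the split option classes
read_options = Postgres.ReadOptions(partition_column="id", num_partitions=8, fetchsize=50_000)
```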
14 changes: 13 additions & 1 deletion onetl/connection/db_connection/jdbc_connection/options.py
@@ -672,7 +672,19 @@ def _check_partition_fields(cls, values):
"Deprecated in 0.5.0 and will be removed in 1.0.0. Use 'ReadOptions' or 'WriteOptions' instead",
category=UserWarning,
)
class JDBCLegacyOptions(JDBCReadOptions, JDBCWriteOptions):
class JDBCLegacyOptions(GenericOptions):
class Config:
prohibited_options = GENERIC_PROHIBITED_OPTIONS
known_options = READ_OPTIONS | WRITE_OPTIONS | READ_WRITE_OPTIONS
extra = "allow"

partition_column: Optional[str] = Field(default=None, alias="partitionColumn")
num_partitions: PositiveInt = Field(default=1, alias="numPartitions")
lower_bound: Optional[int] = Field(default=None, alias="lowerBound")
upper_bound: Optional[int] = Field(default=None, alias="upperBound")
session_init_statement: Optional[str] = Field(default=None, alias="sessionInitStatement")
query_timeout: Optional[int] = Field(default=None, alias="queryTimeout")
if_exists: JDBCTableExistBehavior = Field(default=JDBCTableExistBehavior.APPEND, alias="mode")
isolation_level: str = Field(default="READ_UNCOMMITTED", alias="isolationLevel")
fetchsize: int = 100_000
batchsize: int = 20_000
(remaining changed files are not rendered here)
