Skip to content

Commit

Permalink
Test Python 3.13
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Jul 27, 2024
1 parent b657322 commit 2d50707
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 33 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/data/core/matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ min: &min
max: &max
spark-version: 3.5.1
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
java-version: 20
os: ubuntu-latest

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/data/ftp/matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
os: ubuntu-latest

latest: &latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/data/ftps/matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
os: ubuntu-latest

latest: &latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/data/hdfs/matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ max: &max
hadoop-version: hadoop3-hdfs
spark-version: 3.5.1
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
java-version: 20
os: ubuntu-latest

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/data/s3/matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ max: &max
minio-version: 2024.4.18
spark-version: 3.5.1
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
java-version: 20
os: ubuntu-latest

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/data/sftp/matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
os: ubuntu-latest

latest: &latest
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/data/webdav/matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ min: &min

max: &max
pydantic-version: 2
python-version: '3.12'
python-version: '3.13.0-beta.4'
os: ubuntu-latest

latest: &latest
Expand Down
19 changes: 14 additions & 5 deletions onetl/connection/db_connection/greenplum/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
import logging
import os
import textwrap
import threading
import warnings
from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar, Optional

from etl_entities.instance import Host

from onetl.connection.db_connection.jdbc_connection.options import JDBCReadOptions

try:
from pydantic.v1 import validator
from pydantic.v1 import PrivateAttr, SecretStr, validator
except (ImportError, AttributeError):
from pydantic import validator # type: ignore[no-redef, assignment]
from pydantic import validator, SecretStr, PrivateAttr # type: ignore[no-redef, assignment]

from onetl._util.classproperty import classproperty
from onetl._util.java import try_import_java_class
Expand All @@ -39,7 +40,9 @@
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
JDBCOptions,
)
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCOptions as JDBCMixinOptions,
)
from onetl.exception import MISSING_JVM_CLASS_MSG, TooManyParallelJobsError
from onetl.hooks import slot, support_hooks
Expand Down Expand Up @@ -69,7 +72,7 @@ class GreenplumExtra(GenericOptions):

class Config:
extra = "allow"
prohibited_options = JDBCOptions.Config.prohibited_options
prohibited_options = JDBCMixinOptions.Config.prohibited_options


@support_hooks
Expand Down Expand Up @@ -157,6 +160,8 @@ class Greenplum(JDBCMixin, DBConnection):
"""

host: Host
user: str
password: SecretStr
database: str
port: int = 5432
extra: GreenplumExtra = GreenplumExtra()
Expand All @@ -166,6 +171,7 @@ class Greenplum(JDBCMixin, DBConnection):
SQLOptions = GreenplumSQLOptions
FetchOptions = GreenplumFetchOptions
ExecuteOptions = GreenplumExecuteOptions
JDBCOptions = JDBCMixinOptions

Extra = GreenplumExtra
Dialect = GreenplumDialect
Expand All @@ -174,6 +180,9 @@ class Greenplum(JDBCMixin, DBConnection):
CONNECTIONS_WARNING_LIMIT: ClassVar[int] = 31
CONNECTIONS_EXCEPTION_LIMIT: ClassVar[int] = 100

_CHECK_QUERY: ClassVar[str] = "SELECT 1"
_last_connection_and_options: Optional[threading.local] = PrivateAttr(default=None)

@slot
@classmethod
def get_packages(
Expand Down
39 changes: 35 additions & 4 deletions onetl/connection/db_connection/jdbc_connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

import logging
import secrets
import threading
import warnings
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, ClassVar, Optional

try:
from pydantic.v1 import PrivateAttr, SecretStr
except (ImportError, AttributeError):
from pydantic import PrivateAttr, SecretStr # type: ignore[no-redef, assignment]

from onetl._util.sql import clear_statement
from onetl.connection.db_connection.db_connection import DBConnection
Expand All @@ -19,7 +25,13 @@
JDBCWriteOptions,
)
from onetl.connection.db_connection.jdbc_mixin import JDBCMixin
from onetl.connection.db_connection.jdbc_mixin.options import JDBCFetchOptions
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
)
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCOptions as JDBCMixinOptions,
)
from onetl.hooks import slot, support_hooks
from onetl.hwm import Window
from onetl.log import log_lines, log_with_indent
Expand All @@ -45,6 +57,16 @@

@support_hooks
class JDBCConnection(JDBCMixin, DBConnection):
user: str
password: SecretStr

DRIVER: ClassVar[str]
_CHECK_QUERY: ClassVar[str] = "SELECT 1"
_last_connection_and_options: Optional[threading.local] = PrivateAttr(default=None)

JDBCOptions = JDBCMixinOptions
FetchOptions = JDBCFetchOptions
ExecuteOptions = JDBCExecuteOptions
Dialect = JDBCDialect
ReadOptions = JDBCReadOptions
SQLOptions = JDBCSQLOptions
Expand Down Expand Up @@ -109,11 +131,16 @@ def read_source_as_df(
limit: int | None = None,
options: JDBCReadOptions | None = None,
) -> DataFrame:
if isinstance(options, JDBCLegacyOptions):
raw_options = self.ReadOptions.parse(options.dict(exclude_unset=True))
else:
raw_options = self.ReadOptions.parse(options)

read_options = self._set_lower_upper_bound(
table=source,
where=where,
hint=hint,
options=self.ReadOptions.parse(options),
options=raw_options,
)

new_columns = columns or ["*"]
Expand Down Expand Up @@ -170,7 +197,11 @@ def write_df_to_target(
target: str,
options: JDBCWriteOptions | None = None,
) -> None:
write_options = self.WriteOptions.parse(options)
if isinstance(options, JDBCLegacyOptions):
write_options = self.WriteOptions.parse(options.dict(exclude_unset=True))
else:
write_options = self.WriteOptions.parse(options)

jdbc_properties = self._get_jdbc_properties(write_options, exclude={"if_exists"}, exclude_none=True)

mode = (
Expand Down
14 changes: 13 additions & 1 deletion onetl/connection/db_connection/jdbc_connection/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,19 @@ def _check_partition_fields(cls, values):
"Deprecated in 0.5.0 and will be removed in 1.0.0. Use 'ReadOptions' or 'WriteOptions' instead",
category=UserWarning,
)
class JDBCLegacyOptions(JDBCReadOptions, JDBCWriteOptions):
class JDBCLegacyOptions(GenericOptions):
class Config:
prohibited_options = GENERIC_PROHIBITED_OPTIONS
known_options = READ_OPTIONS | WRITE_OPTIONS | READ_WRITE_OPTIONS
extra = "allow"

partition_column: Optional[str] = Field(default=None, alias="partitionColumn")
num_partitions: PositiveInt = Field(default=1, alias="numPartitions")
lower_bound: Optional[int] = Field(default=None, alias="lowerBound")
upper_bound: Optional[int] = Field(default=None, alias="upperBound")
session_init_statement: Optional[str] = Field(default=None, alias="sessionInitStatement")
query_timeout: Optional[int] = Field(default=None, alias="queryTimeout")
if_exists: JDBCTableExistBehavior = Field(default=JDBCTableExistBehavior.APPEND, alias="mode")
isolation_level: str = Field(default="READ_UNCOMMITTED", alias="isolationLevel")
fetchsize: int = 100_000
batchsize: int = 20_000
3 changes: 1 addition & 2 deletions onetl/connection/db_connection/jdbc_mixin/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
)
from onetl.exception import MISSING_JVM_CLASS_MSG
from onetl.hooks import slot, support_hooks
from onetl.impl import FrozenModel
from onetl.log import log_lines

if TYPE_CHECKING:
Expand Down Expand Up @@ -57,7 +56,7 @@ class JDBCStatementType(Enum):


@support_hooks
class JDBCMixin(FrozenModel):
class JDBCMixin:
"""
Compatibility layer between Python and Java SQL Module.
Expand Down
34 changes: 20 additions & 14 deletions onetl/strategy/incremental_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,11 @@

from etl_entities.hwm import HWM

from onetl.impl import BaseModel
from onetl.strategy.batch_hwm_strategy import BatchHWMStrategy
from onetl.strategy.hwm_strategy import HWMStrategy


class OffsetMixin(BaseModel):
    """Mixin that shifts a fetched HWM back by an optional ``offset``.

    ``fetch_hwm`` delegates to the next class in the MRO, so this mixin is
    meant to be combined with a class that actually provides ``fetch_hwm``
    (in this file it is mixed into ``HWMStrategy`` / ``BatchHWMStrategy``
    subclasses).
    """

    # HWM object the offset is applied to; ``None`` until one is available.
    hwm: Optional[HWM] = None
    # Amount subtracted from ``hwm`` after fetching; ``None`` disables the shift.
    offset: Any = None

    def fetch_hwm(self) -> None:
        """Run the parent ``fetch_hwm``, then subtract ``offset`` from ``hwm``.

        The subtraction is skipped unless ``hwm`` is truthy, ``hwm.value`` is
        not ``None`` and ``offset`` is not ``None``.
        """
        super().fetch_hwm()

        # Only shift when there is both a concrete HWM value and an offset.
        if self.hwm and self.hwm.value is not None and self.offset is not None:
            self.hwm -= self.offset


class IncrementalStrategy(OffsetMixin, HWMStrategy):
class IncrementalStrategy(HWMStrategy):
"""Incremental strategy for :ref:`db-reader`/:ref:`file-downloader`.
Used for fetching only new rows/files from a source
Expand Down Expand Up @@ -353,8 +341,17 @@ class IncrementalStrategy(OffsetMixin, HWMStrategy):
# current run will download only files which were not downloaded in previous runs
"""

hwm: Optional[HWM] = None
offset: Any = None

def fetch_hwm(self) -> None:
    """Run the parent ``fetch_hwm``, then subtract ``offset`` from ``hwm``.

    The subtraction is skipped unless ``hwm`` is truthy, ``hwm.value`` is
    not ``None`` and ``offset`` is not ``None``.
    """
    super().fetch_hwm()

    # Only shift when there is both a concrete HWM value and an offset.
    if self.hwm and self.hwm.value is not None and self.offset is not None:
        self.hwm -= self.offset

class IncrementalBatchStrategy(OffsetMixin, BatchHWMStrategy):

class IncrementalBatchStrategy(BatchHWMStrategy):
"""Incremental batch strategy for :ref:`db-reader`.
.. note::
Expand Down Expand Up @@ -669,6 +666,15 @@ class IncrementalBatchStrategy(OffsetMixin, BatchHWMStrategy):
"""

hwm: Optional[HWM] = None
offset: Any = None

def fetch_hwm(self) -> None:
    """Run the parent ``fetch_hwm``, then subtract ``offset`` from ``hwm``.

    The subtraction is skipped unless ``hwm`` is truthy, ``hwm.value`` is
    not ``None`` and ``offset`` is not ``None``.
    """
    super().fetch_hwm()

    # Only shift when there is both a concrete HWM value and an offset.
    if self.hwm and self.hwm.value is not None and self.offset is not None:
        self.hwm -= self.offset

def __next__(self):
    """Call ``save_hwm()`` and then return the parent's ``__next__()`` result."""
    # save_hwm() runs before the parent iterator is advanced.
    self.save_hwm()
    return super().__next__()
Expand Down

0 comments on commit 2d50707

Please sign in to comment.