[DOP-15764] Remove fetchsize from JDBCWriteOptions #288

Merged: 1 commit, May 27, 2024
5 changes: 3 additions & 2 deletions .github/workflows/data/db/tracked.txt
@@ -1,5 +1,6 @@
.github/workflows/data/db/**
onetl/db_connection/db_connection.py
onetl/db_connection/jdbc*.py
onetl/db_connection/db_connection/*
onetl/db_connection/dialect_mixins/*
onetl/db_connection/jdbc_connection/*
onetl/db_connection/jdbc_mixin/*
onetl/db/**
1 change: 1 addition & 0 deletions docs/changelog/next_release/288.bugfix.rst
@@ -0,0 +1 @@
+ Remove ``fetchsize`` from ``JDBC.WriteOptions``.
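
A hedged sketch of the user-facing effect (illustrative only; `Postgres` is just an example connection class): `fetchsize` stays a read-side option, writes keep using `batchsize`, and since `READ_OPTIONS` are now prohibited in `JDBCWriteOptions` (see `options.py` below), passing `fetchsize` on write should fail at parse time.

```python
from onetl.connection import Postgres

# fetchsize tunes how many rows a cursor pulls per round trip, so it stays on read:
read_options = Postgres.ReadOptions(fetchsize=100_000)

# writes are batched via batchsize; fetchsize is no longer accepted here,
# since READ_OPTIONS are prohibited for JDBCWriteOptions after this change
write_options = Postgres.WriteOptions(if_exists="append", batchsize=20_000)
```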
16 changes: 9 additions & 7 deletions onetl/connection/db_connection/greenplum/connection.py
@@ -10,6 +10,8 @@

from etl_entities.instance import Host

+ from onetl.connection.db_connection.jdbc_connection.options import JDBCReadOptions

try:
from pydantic.v1 import validator
except (ImportError, AttributeError):
@@ -346,17 +348,17 @@ def get_df_schema(
self,
source: str,
columns: list[str] | None = None,
- options: JDBCOptions | None = None,
+ options: JDBCReadOptions | None = None,
) -> StructType:
log.info("|%s| Fetching schema of table %r ...", self.__class__.__name__, source)

query = self.dialect.get_sql_query(source, columns=columns, limit=0, compact=True)
- jdbc_options = self.JDBCOptions.parse(options).copy(update={"fetchsize": 0})
+ jdbc_options = self.ReadOptions.parse(options).copy(update={"fetchsize": 0})

log.debug("|%s| Executing SQL query (on driver):", self.__class__.__name__)
log_lines(log, query, level=logging.DEBUG)

- df = self._query_on_driver(query, jdbc_options)
+ df = self._query_on_driver(query, self.FetchOptions.parse(jdbc_options.dict()))
log.info("|%s| Schema fetched.", self.__class__.__name__)

return df.schema
@@ -368,10 +370,10 @@ def get_min_max_values(
window: Window,
hint: Any | None = None,
where: Any | None = None,
- options: JDBCOptions | None = None,
+ options: JDBCReadOptions | None = None,
) -> tuple[Any, Any]:
log.info("|%s| Getting min and max values for %r ...", self.__class__.__name__, window.expression)
- jdbc_options = self.JDBCOptions.parse(options).copy(update={"fetchsize": 1})
+ jdbc_options = self.ReadOptions.parse(options).copy(update={"fetchsize": 1})

query = self.dialect.get_sql_query(
table=source,
@@ -391,7 +393,7 @@
log.info("|%s| Executing SQL query (on driver):", self.__class__.__name__)
log_lines(log, query)

- df = self._query_on_driver(query, jdbc_options)
+ df = self._query_on_driver(query, self.FetchOptions.parse(jdbc_options.dict()))
row = df.collect()[0]
min_value = row["min"]
max_value = row["max"]
@@ -437,7 +439,7 @@ def _connector_params(
**extra,
}

- def _options_to_connection_properties(self, options: JDBCOptions | JDBCExecuteOptions | JDBCFetchOptions):
+ def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExecuteOptions):
# See https://github.com/pgjdbc/pgjdbc/pull/1252
# Since 42.2.9 Postgres JDBC Driver added new option readOnlyMode=transaction
# Which is not a desired behavior, because `.fetch()` method should always be read-only
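
For readability, the post-merge flow of `get_df_schema` reassembled from the hunks above (type hints abbreviated): options are parsed as `ReadOptions`, tweaked, then explicitly converted to `FetchOptions` before the driver-side query, because `ReadOptions` is no longer a valid input for `_query_on_driver`.

```python
def get_df_schema(self, source, columns=None, options=None):
    # limit=0 returns an empty result set, so fetchsize=0 skips pointless fetching
    query = self.dialect.get_sql_query(source, columns=columns, limit=0, compact=True)
    jdbc_options = self.ReadOptions.parse(options).copy(update={"fetchsize": 0})

    # ReadOptions -> dict -> FetchOptions: the explicit conversion step introduced
    # by this PR, now that read and fetch options are separate classes
    df = self._query_on_driver(query, self.FetchOptions.parse(jdbc_options.dict()))
    return df.schema
```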
12 changes: 7 additions & 5 deletions onetl/connection/db_connection/jdbc_connection/connection.py
@@ -21,7 +21,7 @@
JDBCWriteOptions,
)
from onetl.connection.db_connection.jdbc_mixin import JDBCMixin
- from onetl.connection.db_connection.jdbc_mixin.options import JDBCOptions
+ from onetl.connection.db_connection.jdbc_mixin.options import JDBCFetchOptions
from onetl.hooks import slot, support_hooks
from onetl.hwm import Window
from onetl.log import log_lines, log_with_indent
@@ -265,10 +265,12 @@ def _exclude_partition_options(
self,
options: JDBCReadOptions,
fetchsize: int,
- ) -> JDBCOptions:
- return options.copy(
- update={"fetchsize": fetchsize},
- exclude={"partition_column", "lower_bound", "upper_bound", "num_partitions", "partitioning_mode"},
+ ) -> JDBCFetchOptions:
+ return self.FetchOptions.parse(
+ options.copy(
+ update={"fetchsize": fetchsize},
+ exclude={"partition_column", "lower_bound", "upper_bound", "num_partitions", "partitioning_mode"},
+ ).dict(),
)

def _set_lower_upper_bound(
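
A self-contained sketch of the pydantic v1 `copy(update=..., exclude=...)` call that `_exclude_partition_options` relies on; `ToyReadOptions` is an illustrative stand-in, not the real `JDBCReadOptions`:

```python
from typing import Optional

try:
    from pydantic.v1 import BaseModel
except (ImportError, AttributeError):
    from pydantic import BaseModel  # pydantic v1

class ToyReadOptions(BaseModel):
    fetchsize: int = 100_000
    partition_column: Optional[str] = None
    num_partitions: Optional[int] = None

opts = ToyReadOptions(partition_column="id", num_partitions=4)

# one call drops the partitioning fields and overrides fetchsize;
# .dict() on the result is what feeds FetchOptions.parse() above
trimmed = opts.copy(
    update={"fetchsize": 10},
    exclude={"partition_column", "num_partitions"},
)
print(trimmed.dict())  # {'fetchsize': 10}
```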
54 changes: 44 additions & 10 deletions onetl/connection/db_connection/jdbc_connection/options.py
@@ -6,6 +6,8 @@
from enum import Enum
from typing import Optional

+ from onetl.connection.db_connection.jdbc_mixin.options import JDBCFetchOptions

try:
from pydantic.v1 import Field, PositiveInt, root_validator
except (ImportError, AttributeError):
@@ -14,11 +16,15 @@
from typing_extensions import deprecated

from onetl._internal import to_camel
- from onetl.connection.db_connection.jdbc_mixin.options import JDBCOptions
+ from onetl.impl import GenericOptions

# options from spark.read.jdbc which are populated by JDBCConnection methods
GENERIC_PROHIBITED_OPTIONS = frozenset(
(
"user",
"password",
"driver",
"url",
"table",
"dbtable",
"query",
@@ -104,7 +110,7 @@ def __str__(self):
return str(self.value)


- class JDBCReadOptions(JDBCOptions):
+ class JDBCReadOptions(JDBCFetchOptions):
"""Spark JDBC reading options.

.. note ::
@@ -136,7 +142,8 @@ class JDBCReadOptions(JDBCOptions):

class Config:
known_options = READ_OPTIONS | READ_WRITE_OPTIONS
- prohibited_options = JDBCOptions.Config.prohibited_options | GENERIC_PROHIBITED_OPTIONS | WRITE_OPTIONS
+ prohibited_options = GENERIC_PROHIBITED_OPTIONS | WRITE_OPTIONS
+ extra = "allow"
alias_generator = to_camel

# Options in DataFrameWriter.jdbc() method
@@ -185,6 +192,14 @@ class Config:
"""
'''

+ query_timeout: Optional[int] = Field(default=None, alias="queryTimeout")
+ """The number of seconds the driver will wait for a statement to execute.
+ Zero means there is no limit.
+
+ This option depends on driver implementation,
+ some drivers can check the timeout of each query instead of an entire JDBC batch.
+ """

fetchsize: int = 100_000
"""Fetch N rows from an opened cursor per one read round.

@@ -380,7 +395,7 @@ def _partitioning_mode_actions(cls, values):
return values


- class JDBCWriteOptions(JDBCOptions):
+ class JDBCWriteOptions(GenericOptions):
"""Spark JDBC writing options.

.. note ::
@@ -406,7 +421,8 @@ class JDBCWriteOptions(JDBCOptions):

class Config:
known_options = WRITE_OPTIONS | READ_WRITE_OPTIONS
- prohibited_options = JDBCOptions.Config.prohibited_options | GENERIC_PROHIBITED_OPTIONS | READ_OPTIONS
+ prohibited_options = GENERIC_PROHIBITED_OPTIONS | READ_OPTIONS
+ extra = "allow"
alias_generator = to_camel

if_exists: JDBCTableExistBehavior = Field(default=JDBCTableExistBehavior.APPEND, alias="mode")
@@ -481,6 +497,14 @@ class Config:
Renamed ``mode`` → ``if_exists``
"""

+ query_timeout: Optional[int] = Field(default=None, alias="queryTimeout")
+ """The number of seconds the driver will wait for a statement to execute.
+ Zero means there is no limit.
+
+ This option depends on driver implementation,
+ some drivers can check the timeout of each query instead of an entire JDBC batch.
+ """

batchsize: int = 20_000
"""How many rows can be inserted per round trip.

@@ -531,7 +555,7 @@ def _mode_is_deprecated(cls, values):
return values


- class JDBCSQLOptions(JDBCOptions):
+ class JDBCSQLOptions(GenericOptions):
"""Options specifically for SQL queries

These options allow you to specify configurations for executing SQL queries
@@ -580,10 +604,10 @@ class JDBCSQLOptions(JDBCOptions):
"""Number of jobs created by Spark to read the table content in parallel.""" # noqa: WPS322

lower_bound: Optional[int] = None
"""Defines the starting boundary for partitioning the query's data. Mandatory if :obj:`~partition_column~ is set""" # noqa: WPS322
"""Defines the starting boundary for partitioning the query's data. Mandatory if :obj:`~partition_column` is set""" # noqa: WPS322

upper_bound: Optional[int] = None
"""Sets the ending boundary for data partitioning. Mandatory if :obj:`~partition_column~ is set""" # noqa: WPS322
"""Sets the ending boundary for data partitioning. Mandatory if :obj:`~partition_column` is set""" # noqa: WPS322

session_init_statement: Optional[str] = None
'''After each database session is opened to the remote DB and before starting to read data,
@@ -603,6 +627,14 @@ class JDBCSQLOptions(JDBCOptions):
"""
'''

+ query_timeout: Optional[int] = Field(default=None, alias="queryTimeout")
+ """The number of seconds the driver will wait for a statement to execute.
+ Zero means there is no limit.
+
+ This option depends on driver implementation,
+ some drivers can check the timeout of each query instead of an entire JDBC batch.
+ """

fetchsize: int = 100_000
"""Fetch N rows from an opened cursor per one read round.

@@ -624,7 +656,8 @@

class Config:
known_options = READ_OPTIONS - {"partitioning_mode"}
prohibited_options = JDBCOptions.Config.prohibited_options | {"partitioning_mode"}
prohibited_options = GENERIC_PROHIBITED_OPTIONS | WRITE_OPTIONS | {"partitioning_mode"}
extra = "allow"
alias_generator = to_camel

@root_validator(pre=True)
@@ -645,4 +678,5 @@ def _check_partition_fields(cls, values):
)
class JDBCLegacyOptions(JDBCReadOptions, JDBCWriteOptions):
class Config:
- prohibited_options = JDBCOptions.Config.prohibited_options
+ prohibited_options = GENERIC_PROHIBITED_OPTIONS
+ extra = "allow"
22 changes: 12 additions & 10 deletions onetl/connection/db_connection/jdbc_mixin/connection.py
@@ -9,6 +9,8 @@
from enum import Enum, auto
from typing import TYPE_CHECKING, Callable, ClassVar, Optional, TypeVar

+ from onetl.impl.generic_options import GenericOptions

try:
from pydantic.v1 import Field, PrivateAttr, SecretStr, validator
except (ImportError, AttributeError):
@@ -222,7 +224,7 @@ def fetch(
def execute(
self,
statement: str,
- options: JDBCExecuteOptions | JDBCMixinOptions | dict | None = None,
+ options: JDBCExecuteOptions | dict | None = None,
) -> DataFrame | None:
"""
**Immediately** execute DDL, DML or procedure/function **on Spark driver**. |support_hooks|
@@ -267,7 +269,7 @@ def execute(
log_lines(log, statement)

call_options = (
- self.ExecuteOptions.parse(options.dict())
+ self.ExecuteOptions.parse(options.dict())  # type: ignore
if isinstance(options, JDBCMixinOptions)
else self.ExecuteOptions.parse(options)
)
@@ -302,7 +304,7 @@ def _check_java_class_imported(cls, spark):
def _query_on_driver(
self,
query: str,
- options: JDBCMixinOptions | JDBCFetchOptions | JDBCExecuteOptions,
+ options: JDBCFetchOptions | JDBCExecuteOptions,
) -> DataFrame:
return self._execute_on_driver(
statement=query,
@@ -315,7 +317,7 @@ def _query_on_driver(
def _query_optional_on_driver(
self,
query: str,
- options: JDBCMixinOptions | JDBCFetchOptions,
+ options: JDBCFetchOptions,
) -> DataFrame | None:
return self._execute_on_driver(
statement=query,
@@ -328,7 +330,7 @@ def _query_optional_on_driver(
def _call_on_driver(
self,
query: str,
- options: JDBCMixinOptions | JDBCExecuteOptions,
+ options: JDBCExecuteOptions,
) -> DataFrame | None:
return self._execute_on_driver(
statement=query,
@@ -340,7 +342,7 @@ def _call_on_driver(

def _get_jdbc_properties(
self,
- options: JDBCFetchOptions | JDBCExecuteOptions | JDBCMixinOptions,
+ options: GenericOptions,
**kwargs,
) -> dict[str, str]:
"""
@@ -350,7 +352,7 @@
result.update(options.dict(by_alias=True, **kwargs))
return stringify(result)

- def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExecuteOptions | JDBCMixinOptions):
+ def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExecuteOptions):
"""
Converts human-readable Options class to ``java.util.Properties``.

@@ -371,7 +373,7 @@ def _options_to_connection_properties(self, options: JDBCFetchOptions | JDBCExec
)
return jdbc_options.asConnectionProperties()

- def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions | JDBCMixinOptions):
+ def _get_jdbc_connection(self, options: JDBCFetchOptions | JDBCExecuteOptions):
if not self._last_connection_and_options:
# connection class can be used in multiple threads.
# each Python thread creates its own thread in JVM
@@ -413,7 +415,7 @@ def _execute_on_driver(
statement: str,
statement_type: JDBCStatementType,
callback: Callable[..., T],
- options: JDBCFetchOptions | JDBCExecuteOptions | JDBCMixinOptions,
+ options: JDBCFetchOptions | JDBCExecuteOptions,
read_only: bool,
) -> T:
"""
@@ -435,7 +437,7 @@ def _execute_statement(
self,
jdbc_statement,
statement: str,
- options: JDBCMixinOptions | JDBCFetchOptions | JDBCExecuteOptions,
+ options: JDBCFetchOptions | JDBCExecuteOptions,
callback: Callable[..., T],
read_only: bool,
) -> T:
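
A hedged usage sketch of the narrowed `execute` signature. The connection parameters and the existing `spark` session are illustrative, and `query_timeout` is assumed to be available on `ExecuteOptions` (mirroring the fields shown in `options.py` above); the `isinstance(options, JDBCMixinOptions)` branch keeps legacy callers working.

```python
from onetl.connection import Postgres

postgres = Postgres(
    host="localhost",
    user="onetl",
    password="***",
    database="test",
    spark=spark,  # an already-created SparkSession is assumed
)

# both forms are parsed into JDBCExecuteOptions on the driver:
postgres.execute("TRUNCATE TABLE myschema.mytable", options={"query_timeout": 60})
postgres.execute(
    "CREATE INDEX myindex ON myschema.mytable (id)",
    options=Postgres.ExecuteOptions(query_timeout=60),
)
```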
3 changes: 1 addition & 2 deletions onetl/connection/db_connection/postgres/connection.py
@@ -11,7 +11,6 @@
from onetl.connection.db_connection.jdbc_mixin.options import (
JDBCExecuteOptions,
JDBCFetchOptions,
- JDBCOptions,
)
from onetl.connection.db_connection.postgres.dialect import PostgresDialect
from onetl.connection.db_connection.postgres.options import (
@@ -182,7 +181,7 @@ def instance_url(self) -> str:

def _options_to_connection_properties(
self,
- options: JDBCOptions | JDBCFetchOptions | JDBCExecuteOptions,
+ options: JDBCFetchOptions | JDBCExecuteOptions,
): # noqa: WPS437
# See https://github.com/pgjdbc/pgjdbc/pull/1252
# Since 42.2.9 Postgres JDBC Driver added new option readOnlyMode=transaction
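
Context for the pgjdbc comment above: with driver 42.2.9 and newer, `readOnlyMode=transaction` changes how read-only connections behave, so onetl overrides the property to keep `.fetch()` strictly read-only. A minimal sketch of that kind of override; the exact property value is an assumption, not taken from this diff:

```python
def _options_to_connection_properties(self, options):
    # force a readOnlyMode compatible with always-read-only .fetch() calls,
    # unless the caller already set one ("always" is an assumed value)
    if not getattr(options, "readOnlyMode", None):
        options = options.copy(update={"readOnlyMode": "always"})
    return super()._options_to_connection_properties(options)
```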