feat: SQLConnector: managing transactions #2338

Open
raulbonet opened this issue Mar 24, 2024 · 0 comments
Labels
kind/Feature New feature or request SQL Support for SQL taps and targets valuestream/SDK

Comments

@raulbonet
Contributor

Feature scope

Targets (data type handling, batching, SQL object generation, tests, etc.)

Description

In this issue, there was a discussion on refactoring connections.

Background
To sum up, if I understand correctly:

  1. There were a lot of dangling connections because SQLConnector did not encapsulate connection management itself; Taps/Streams and other objects were opening connections (and not closing them), which caused problems.

  2. The solution that was agreed upon was to make SQLConnector encapsulate the connection, so that only SQLConnector can use the connection object and connection handling is opaque to other objects.

Problem
The problem I am seeing is that this design does not leave an easy way to manage transactions. Right now, for each method, SQLConnector uses with self._connect() as conn, conn.begin():.

But I think this does not allow us to group several operations into a single transaction. I know not all DBMSs allow DDL operations inside a transaction, but a lot of them do.

Concretely, I am working on the target-postgres here. Right now:

  • SQLConnector is extended and subclassed into PostgresConnector.
  • A lot of methods are overridden with a different signature, mostly copy-pasted from the original method, changing only a couple of lines so that everything runs in one transaction.

Over time, this makes the overridden implementation diverge (a lot) from the Meltano SDK, which is not ideal.

Having a single transaction can make sense in targets, where I might want the ALTER statement for a table and the bulk_insert_records() call in a single transaction.
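
For illustration, here is a minimal sketch (plain SQLAlchemy, not the SDK's current API) of what grouping DDL and a bulk insert into one transaction looks like; the table, column, and engine URL are made up:

import sqlalchemy as sa

engine = sa.create_engine("postgresql://localhost/example")  # hypothetical URL

with engine.connect() as conn, conn.begin():
    # Both statements commit (or roll back) together.
    conn.execute(sa.text("ALTER TABLE my_table ADD COLUMN new_col TEXT"))
    conn.execute(
        sa.text("INSERT INTO my_table (id, new_col) VALUES (:id, :val)"),
        [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}],
    )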

Ideas

  1. View SQLConnector as just a wrapper around SQLAlchemy that encapsulates some simple operations, so it should resemble SQLAlchemy's implementation as much as possible. SQLAlchemy expects commits and connections to be managed by the caller, so maybe this class is not the best place to manage connections. Thinking of targets, maybe SQLSink is a better place? (That is where the current target-postgres implementation manages it.) Therefore, all DDL methods should have a connection or session argument that you can pass to them. If this argument is None, we could default to the current implementation of with self._connect() as conn, conn.begin():. (A small sketch of this approach is included after the example below.)

  2. Create another intermediate class to encapsulate the DDL but not the execution itself. For example, now we have:

class SQLConnector:
    def _create_empty_column(
        self,
        full_table_name: str,
        column_name: str,
        sql_type: sa.types.TypeEngine,
    ) -> None:
        if not self.allow_column_add:
            msg = "Adding columns is not supported."
            raise NotImplementedError(msg)

        column_add_ddl = self.get_column_add_ddl(
            table_name=full_table_name,
            column_name=column_name,
            column_type=sql_type,
        )
        with self._connect() as conn, conn.begin():
            conn.execute(column_add_ddl)
We could have something like:

class SQLConnectorBase:
    def _create_empty_column(
        self,
        full_table_name: str,
        column_name: str,
        sql_type: sa.types.TypeEngine,
    ) -> sa.DDL:
        if not self.allow_column_add:
            msg = "Adding columns is not supported."
            raise NotImplementedError(msg)

        column_add_ddl = self.get_column_add_ddl(
            table_name=full_table_name,
            column_name=column_name,
            column_type=sql_type,
        )
        # Instead of executing here, return the DDL to the caller:
        # with self._connect() as conn, conn.begin():
        #     conn.execute(column_add_ddl)
        return column_add_ddl

class SQLConnector(SQLConnectorBase):
    def _create_empty_column(
        self,
        full_table_name: str,
        column_name: str,
        sql_type: sa.types.TypeEngine,
    ) -> None:
        column_add_ddl = super()._create_empty_column(full_table_name, column_name, sql_type)
        with self._connect() as conn, conn.begin():
            conn.execute(column_add_ddl)

           
# Now, in target-postgres, I could create this:
class PostgresConnector(SQLConnectorBase):
    def _create_empty_column(
        self,
        full_table_name: str,
        column_name: str,
        sql_type: sa.types.TypeEngine,
        # New argument
        connection: sa.engine.Connection,
    ) -> None:
        column_add_ddl = super()._create_empty_column(full_table_name, column_name, sql_type)
        connection.execute(column_add_ddl)
        # ... commit the transaction somewhere else, outside `PostgresConnector`
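
And a minimal sketch of idea 1 (hypothetical, not the SDK's current API): DDL methods would accept an optional connection argument and fall back to opening their own short-lived transaction when none is passed. The parameter name and fallback behavior here are assumptions; get_column_add_ddl and _connect are the existing SQLConnector helpers shown above:

import typing as t

import sqlalchemy as sa


class SQLConnector:
    def _create_empty_column(
        self,
        full_table_name: str,
        column_name: str,
        sql_type: sa.types.TypeEngine,
        connection: t.Optional[sa.engine.Connection] = None,  # new optional argument
    ) -> None:
        if not self.allow_column_add:
            msg = "Adding columns is not supported."
            raise NotImplementedError(msg)

        column_add_ddl = self.get_column_add_ddl(
            table_name=full_table_name,
            column_name=column_name,
            column_type=sql_type,
        )
        if connection is not None:
            # The caller owns the transaction and commits (or rolls back) elsewhere.
            connection.execute(column_add_ddl)
        else:
            # Fall back to the current behavior: one transaction per call.
            with self._connect() as conn, conn.begin():
                conn.execute(column_add_ddl)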

@qbatten, you were the OP of the issue I linked; if you have any ideas, they would be very welcome!

@raulbonet raulbonet added kind/Feature New feature or request valuestream/SDK labels Mar 24, 2024
@edgarrmondragon edgarrmondragon added the SQL Support for SQL taps and targets label Jul 21, 2024