Skip to content

Commit

Permalink
fix: concurrent connection write-write conflict
Browse files Browse the repository at this point in the history
fixes the following when running in dbt:
```
10:25:54    Database Error in seed statementline_current (seeds/fixtures/statementline_current.csv)
  TransactionContext Error: Catalog write-write conflict on alter with "Table\0main\0_fs_users_ext\0Schema\0main\0main"
```
  • Loading branch information
tekumara committed Jul 10, 2024
1 parent 057adab commit 96ba682
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
2 changes: 2 additions & 0 deletions fakesnow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import snowflake.connector.pandas_tools

import fakesnow.fakes as fakes
from fakesnow.global_database import create_global_database


@contextmanager
Expand Down Expand Up @@ -52,6 +53,7 @@ def patch(
assert not isinstance(snowflake.connector.connect, mock.MagicMock), "Snowflake connector is already patched"

duck_conn = duckdb.connect(database=":memory:")
create_global_database(duck_conn)

fake_fns = {
# every time we connect, create a new cursor (ie: connection) so we can isolate each connection's
Expand Down
3 changes: 0 additions & 3 deletions fakesnow/fakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import fakesnow.info_schema as info_schema
import fakesnow.macros as macros
import fakesnow.transforms as transforms
from fakesnow.global_database import create_global_database
from fakesnow.variables import Variables

SCHEMA_UNSET = "schema_unset"
Expand Down Expand Up @@ -533,8 +532,6 @@ def __init__(
self._paramstyle = snowflake.connector.paramstyle
self.variables = Variables()

create_global_database(duck_conn)

# create database if needed
if (
create_database
Expand Down
20 changes: 16 additions & 4 deletions tests/test_connect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

from __future__ import annotations

import concurrent.futures

# ruff: noqa: E501
# pyright: reportOptionalMemberAccess=false
import tempfile
Expand Down Expand Up @@ -43,11 +44,25 @@ def test_connect_different_sessions_use_database(_fakesnow_no_auto_create: None)
assert cur.fetchall() == [(1, "Jenny", "P"), (2, "Jasper", "M")]


def test_connect_concurrently(_fakesnow: None) -> None:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
future_a = executor.submit(snowflake.connector.connect)
future_b = executor.submit(snowflake.connector.connect)

futures = [future_a, future_b]

for future in concurrent.futures.as_completed(futures):
# exceptions if any will be raised here. we want to avoid
# duckdb.duckdb.TransactionException: TransactionContext Error: Catalog write-write conflict
_ = future.result()


def test_connect_db_path_can_create_database() -> None:
with tempfile.TemporaryDirectory(prefix="fakesnow-test") as db_path, fakesnow.patch(db_path=db_path):
cursor = snowflake.connector.connect().cursor()
cursor.execute("CREATE DATABASE db2")


def test_connect_db_path_reuse():
with tempfile.TemporaryDirectory(prefix="fakesnow-test") as db_path:
with (
Expand All @@ -68,8 +83,6 @@ def test_connect_db_path_reuse():
assert cur.execute("select * from example").fetchall() == [(420,)]




def test_connect_without_database(_fakesnow_no_auto_create: None):
with snowflake.connector.connect() as conn, conn.cursor() as cur:
with pytest.raises(snowflake.connector.errors.ProgrammingError) as excinfo:
Expand Down Expand Up @@ -154,7 +167,6 @@ def test_connect_without_schema(_fakesnow: None):
assert conn.schema == "SCHEMA1"



def test_connect_with_non_existent_db_or_schema(_fakesnow_no_auto_create: None):
# can connect with db that doesn't exist
with snowflake.connector.connect(database="marts") as conn, conn.cursor() as cur:
Expand Down

0 comments on commit 96ba682

Please sign in to comment.