Skip to content

Commit

Permalink
fix: Close source and target connections after executing a validation (
Browse files Browse the repository at this point in the history
  • Loading branch information
nj1973 authored Jul 23, 2024
1 parent 5506680 commit 3dc9fa7
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 37 deletions.
72 changes: 36 additions & 36 deletions data_validation/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,49 +531,49 @@ def run_validation(config_manager, dry_run=False, verbose=False):
dry_run (bool): Print source and target SQL to stdout in lieu of validation.
verbose (bool): Validation setting to log queries run.
"""
validator = DataValidation(
with DataValidation(
config_manager.config,
validation_builder=None,
result_handler=None,
verbose=verbose,
)

if dry_run:
sql_alchemy_clients = [
"mysql",
"oracle",
"postgres",
"db2",
"mssql",
"redshift",
"snowflake",
]

source_query = validator.validation_builder.get_source_query().compile()
if config_manager.source_client.name in sql_alchemy_clients:
source_query = source_query.compile(
config_manager.source_client.con.engine,
compile_kwargs={"literal_binds": True},
)
) as validator:

if dry_run:
sql_alchemy_clients = [
"mysql",
"oracle",
"postgres",
"db2",
"mssql",
"redshift",
"snowflake",
]

source_query = validator.validation_builder.get_source_query().compile()
if config_manager.source_client.name in sql_alchemy_clients:
source_query = source_query.compile(
config_manager.source_client.con.engine,
compile_kwargs={"literal_binds": True},
)

target_query = validator.validation_builder.get_target_query().compile()
if config_manager.target_client.name in sql_alchemy_clients:
target_query = target_query.compile(
config_manager.target_client.con.engine,
compile_kwargs={"literal_binds": True},
)
target_query = validator.validation_builder.get_target_query().compile()
if config_manager.target_client.name in sql_alchemy_clients:
target_query = target_query.compile(
config_manager.target_client.con.engine,
compile_kwargs={"literal_binds": True},
)

print(
json.dumps(
{
"source_query": str(source_query),
"target_query": str(target_query),
},
indent=4,
print(
json.dumps(
{
"source_query": str(source_query),
"target_query": str(target_query),
},
indent=4,
)
)
)
else:
validator.execute()
else:
validator.execute()


def run_validations(args, config_managers):
Expand Down
12 changes: 12 additions & 0 deletions data_validation/data_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ def __init__(
# Initialize the default Result Handler if None was supplied
self.result_handler = result_handler or self.config_manager.get_result_handler()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
if hasattr(self, "config_manager"):
try:
self.config_manager.source_client.con.dispose()
self.config_manager.target_client.con.dispose()
except Exception as exc:
# No need to reraise, we can silently fail if exiting throws up an issue.
logging.warning("Exception closing connections: %s", str(exc))

# TODO(dhercher) we planned on shifting this to use an Execution Handler.
# Leaving to to swast on the design of how this should look.
def execute(self):
Expand Down
9 changes: 8 additions & 1 deletion third_party/ibis/ibis_oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,15 @@ def do_connect(

self.database_name = sa_url.database
engine = sa.create_engine(
sa_url, poolclass=sa.pool.StaticPool, arraysize=self.arraysize
sa_url,
poolclass=sa.pool.StaticPool,
arraysize=self.arraysize,
)
try:
# Identify the session in Oracle as DVT, no-op if this fails.
engine.raw_connection().connection.module = "DVT"
except Exception:
pass

@sa.event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
Expand Down

0 comments on commit 3dc9fa7

Please sign in to comment.