dependency cascade #833

Merged: 3 commits, Jun 6, 2023
Changes from all commits
parsons/databases/redshift/redshift.py (10 changes: 3 additions & 7 deletions)
@@ -221,7 +221,6 @@ def query_with_connection(self, sql, connection, parameters=None, commit=True):
         # rows in the correct order
 
         with self.cursor(connection) as cursor:
-
             if "credentials" not in sql:
                 logger.debug(f"SQL Query: {sql}")
             cursor.execute(sql, parameters)
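
The "credentials" guard in this hunk keeps credential-bearing statements (such as COPY ... CREDENTIALS ...) out of the debug log. A minimal sketch of the effect, with fake key values:

    import logging

    logger = logging.getLogger(__name__)

    # Fake values for illustration; a real COPY statement can embed AWS keys,
    # which is exactly why it must not be logged verbatim.
    sql = (
        "copy schema.table from 's3://bucket/manifest' "
        "credentials 'aws_access_key_id=AKIA...;aws_secret_access_key=...' manifest;"
    )
    if "credentials" not in sql:
        logger.debug(f"SQL Query: {sql}")  # skipped here, so keys stay out of logs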
@@ -235,7 +234,6 @@ def query_with_connection(self, sql, connection, parameters=None, commit=True):
                 return None
 
             else:
-
                 # Fetch the data in batches, and "pickle" the rows to a temp file.
                 # (We pickle rather than writing to, say, a CSV, so that we maintain
                 # all the type information for each field.)
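
The comment above motivates pickling over CSV: pickle round-trips each field's Python type (datetimes, Decimals, None), while a CSV round trip would flatten everything to strings. A minimal sketch of batch-pickling fetched rows to a temp file; the helper name and batch size are illustrative, not the Parsons implementation:

    import pickle
    import tempfile

    def pickle_rows(cursor, batch_size=1000):
        """Illustrative helper, not Parsons' code: write one pickled batch
        per dump() call so each field keeps its Python type."""
        f = tempfile.NamedTemporaryFile(mode="wb", delete=False)
        with f:
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break
                pickle.dump(rows, f)  # datetimes, Decimals, None survive intact
        return f.name

Reading the file back is just repeated pickle.load() calls until EOFError, one batch at a time.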
@@ -397,7 +395,6 @@ def copy_s3(
         """
 
         with self.connection() as connection:
-
             if self._create_table_precheck(connection, table_name, if_exists):
                 if template_table:
                     sql = f"CREATE TABLE {table_name} (LIKE {template_table})"
@@ -620,7 +617,6 @@ def copy(
         cols = None
 
         with self.connection() as connection:
-
             # Check to see if the table exists. If it does not or if_exists = drop, then
             # create the new table.
             if self._create_table_precheck(connection, table_name, if_exists):
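
The comment spells out the precheck's contract: create the table when it does not exist, or when if_exists is "drop". As a one-line sketch (name and signature are illustrative, not the actual _create_table_precheck helper):

    def should_create(table_exists: bool, if_exists: str) -> bool:
        # Mirrors the comment above: create on a missing table or on if_exists="drop".
        return (not table_exists) or if_exists == "drop"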
@@ -859,7 +855,6 @@ def generate_manifest(
         # Generate manifest file
         manifest = {"entries": []}
         for bucket in buckets:
-
             # Retrieve list of files in bucket
             key_list = s3.list_keys(bucket, prefix=prefix)
             for key in key_list:
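
generate_manifest assembles a Redshift COPY manifest: a JSON document listing every S3 object to load. The url/mandatory entry shape below is Amazon's documented manifest format; the bucket and keys are invented examples:

    import json

    manifest = {"entries": []}
    for key in ["exports/part-000.csv", "exports/part-001.csv"]:  # example keys
        manifest["entries"].append(
            {
                "url": f"s3://example-bucket/{key}",  # hypothetical bucket
                "mandatory": True,  # COPY errors out if this object is missing
            }
        )
    print(json.dumps(manifest, indent=2))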
@@ -982,7 +977,6 @@ def upsert(
             raise ValueError("Primary key column contains duplicate values.")
 
         with self.connection() as connection:
-
             try:
                 # Copy to a staging table
                 logger.info(f"Building staging table: {staging_tbl}")
@@ -1087,7 +1081,9 @@ def drop_dependencies_for_cols(self, schema, table, cols):
             tbl = self.query_with_connection(sql_depend, connection)
             dropped_views = [row["table_name"] for row in tbl]
             if dropped_views:
-                sql_drop = "\n".join([f"drop view {view};" for view in dropped_views])
+                sql_drop = "\n".join(
+                    [f"drop view {view} CASCADE;" for view in dropped_views]
+                )
                 tbl = self.query_with_connection(sql_drop, connection)
                 logger.info(f"Dropped the following views: {dropped_views}")
