Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add json as allowable file type to copy_s3 #844

Merged
merged 32 commits into main from kasiah-json-copys3
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2fca668
Add json as allowable file type to copy_s3
KasiaHinkson Jun 21, 2023
298becf
Add json to copy statement
KasiaHinkson Jun 22, 2023
efdb033
debug statement
KasiaHinkson Jun 22, 2023
3628c1f
Trying to see what the table looks like
KasiaHinkson Jun 22, 2023
698b622
debug
KasiaHinkson Jun 22, 2023
cb9bbc8
More debugging
KasiaHinkson Jun 22, 2023
75703dd
debug
KasiaHinkson Jun 22, 2023
6248b21
Line delimited
KasiaHinkson Jun 22, 2023
e36b30f
new line
KasiaHinkson Jun 22, 2023
9b8709a
missing quotes
KasiaHinkson Jun 22, 2023
0e69426
debug
KasiaHinkson Jun 22, 2023
e86090f
Remove debug statements
KasiaHinkson Jun 22, 2023
52b5fff
hiding creds for now
KasiaHinkson Jun 22, 2023
ee98c8c
Print redacted query
KasiaHinkson Jun 22, 2023
c63bb3e
debug statement
KasiaHinkson Jun 22, 2023
736dc50
debug
KasiaHinkson Jun 22, 2023
a94f1b4
debug
KasiaHinkson Jun 22, 2023
519b6bc
redacted isn't working
KasiaHinkson Jun 22, 2023
f00b407
debug
KasiaHinkson Jun 22, 2023
bfb4129
add delimiter to json
KasiaHinkson Jun 22, 2023
f1ec317
new line
KasiaHinkson Jun 22, 2023
e9f63b4
single quotes
KasiaHinkson Jun 22, 2023
977487f
remove debug
KasiaHinkson Jun 22, 2023
2141c4b
My changes
KasiaHinkson Jul 25, 2023
086fa61
logs
KasiaHinkson Jul 25, 2023
9870f32
log
KasiaHinkson Jul 25, 2023
e7d0461
more logs
KasiaHinkson Jul 25, 2023
3098d25
more logs
KasiaHinkson Jul 25, 2023
736be1e
more logs
KasiaHinkson Jul 25, 2023
a09e392
Removing logs
KasiaHinkson Jul 25, 2023
11d90e7
Removing more logs
KasiaHinkson Jul 25, 2023
bef4913
Merge branch 'main' into kasiah-json-copys3
KasiaHinkson Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-FROM python:3.7
+FROM --platform=linux/amd64 python:3.7

####################
## Selenium setup ##
Expand Down
4 changes: 4 additions & 0 deletions parsons/databases/redshift/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ def copy_s3(
bucket_region=None,
strict_length=True,
template_table=None,
line_delimited=False,
):
"""
Copy a file from s3 to Redshift.
Expand Down Expand Up @@ -411,6 +412,8 @@ def copy_s3(
local_path = s3.get_file(bucket, key)
if data_type == "csv":
tbl = Table.from_csv(local_path, delimiter=csv_delimiter)
elif data_type == "json":
tbl = Table.from_json(local_path, line_delimited=line_delimited)
else:
raise TypeError("Invalid data type provided")

Expand All @@ -430,6 +433,7 @@ def copy_s3(
logger.info(f"{table_name} created.")

# Copy the table
logger.info(f"Data type is {data_type}")
copy_sql = self.copy_statement(
table_name,
bucket,
Expand Down
12 changes: 4 additions & 8 deletions parsons/databases/redshift/rs_copy_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@


class RedshiftCopyTable(object):

aws_access_key_id = None
aws_secret_access_key = None
iam_role = None
Expand Down Expand Up @@ -42,8 +41,9 @@ def copy_statement(
aws_secret_access_key=None,
compression=None,
bucket_region=None,
json_option="auto",
):

logger.info(f"Data type is {data_type}")
# Source / Destination
source = f"s3://{bucket}/{key}"

Expand Down Expand Up @@ -101,6 +101,8 @@ def copy_statement(
# Data Type
if data_type == "csv":
sql += f"csv delimiter '{csv_delimiter}' \n"
elif data_type == "json":
sql += f"json '{json_option}' \n"
else:
raise TypeError("Invalid data type specified.")

Expand All @@ -112,7 +114,6 @@ def copy_statement(
return sql

def get_creds(self, aws_access_key_id, aws_secret_access_key):

if aws_access_key_id and aws_secret_access_key:
# When we have credentials, then we don't need to set them again
pass
Expand All @@ -122,19 +123,16 @@ def get_creds(self, aws_access_key_id, aws_secret_access_key):
return f"credentials 'aws_iam_role={self.iam_role}'\n"

elif self.aws_access_key_id and self.aws_secret_access_key:

aws_access_key_id = self.aws_access_key_id
aws_secret_access_key = self.aws_secret_access_key

elif (
"AWS_ACCESS_KEY_ID" in os.environ and "AWS_SECRET_ACCESS_KEY" in os.environ
):

aws_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
aws_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]

else:

s3 = S3(use_env_token=self.use_env_token)
creds = s3.aws.session.get_credentials()
aws_access_key_id = creds.access_key
Expand All @@ -151,7 +149,6 @@ def temp_s3_copy(
aws_secret_access_key=None,
csv_encoding="utf-8",
):

if not self.s3_temp_bucket:
raise KeyError(
(
Expand Down Expand Up @@ -184,6 +181,5 @@ def temp_s3_copy(
return key

def temp_s3_delete(self, key):

if key:
self.s3.remove_file(self.s3_temp_bucket, key)
4 changes: 0 additions & 4 deletions parsons/databases/redshift/rs_create_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def create_statement(
columntypes=None,
strict_length=True,
):

# Warn the user if they don't provide a DIST key or a SORT key
self._log_key_warning(distkey=distkey, sortkey=sortkey, method="copy")

Expand Down Expand Up @@ -144,7 +143,6 @@ def vc_max(self, mapping, columns):
# Set the varchar width of a column to the maximum

for c in columns:

try:
idx = mapping["headers"].index(c)
mapping["longest"][idx] = self.VARCHAR_MAX
Expand All @@ -156,13 +154,11 @@ def vc_max(self, mapping, columns):
return mapping["longest"]

def vc_trunc(self, mapping):

return [
self.VARCHAR_MAX if c > self.VARCHAR_MAX else c for c in mapping["longest"]
]

def vc_validate(self, mapping):

return [1 if c == 0 else c for c in mapping["longest"]]

def create_sql(self, table_name, mapping, distkey=None, sortkey=None):
Expand Down