Skip to content

Commit

Permalink
Overwrite some inherited methods on PostgresTable
Browse files Browse the repository at this point in the history
Some postgres columns (e.g. if they have a capital letter) need to be
surrounded by double quotes
  • Loading branch information
austinweisgrau committed Sep 18, 2024
1 parent 64bc60b commit 493d3ba
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions parsons/databases/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from parsons.databases.alchemy import Alchemy
from parsons.databases.database_connector import DatabaseConnector
from parsons.etl.table import Table
from typing import Optional
import logging
import os

Expand Down Expand Up @@ -87,7 +88,7 @@ def copy(
self.query_with_connection(sql, connection, commit=False)
logger.info(f"{table_name} created.")

sql = f"COPY {table_name} ({','.join(tbl.columns)}) FROM STDIN CSV HEADER;"
sql = f"""COPY {table_name} ("{'","'.join(tbl.columns)}") FROM STDIN CSV HEADER;"""

with self.cursor(connection) as cursor:
cursor.copy_expert(sql, open(tbl.to_csv(), "r"))
Expand All @@ -102,4 +103,35 @@ def table(self, table_name):
class PostgresTable(BaseTable):
# Postgres table object.

pass
def max_value(self, column: str):
"""Get the max value of this column from the table."""
return self.db.query(
f"""
SELECT "{column}"
FROM {self.table}
ORDER BY "{column}" DESC
LIMIT 1
"""
).first

def get_updated_rows(
self,
updated_at_column: str,
cutoff_value,
offset: int = 0,
chunk_size: Optional[int] = None,
) -> Table:
"""Get rows that have a greater updated_at_column value than the one provided."""
sql = f"""
SELECT *
FROM {self.table}
WHERE "{updated_at_column}" > %s
"""
if chunk_size:
sql += f" LIMIT {chunk_size}"

sql += f" OFFSET {offset}"

result = self.db.query(sql, parameters=[cutoff_value])

return result

0 comments on commit 493d3ba

Please sign in to comment.