Skip to content

Commit

Permalink
Modify PostgreSQL.import_data() in sql.py
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeqfu committed Feb 26, 2021
1 parent f648636 commit e6b0a13
Showing 1 changed file with 30 additions and 30 deletions.
60 changes: 30 additions & 30 deletions pyhelpers/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,7 +1103,7 @@ def psql_insert_copy(sql_table, sql_db_engine, column_name_list, data_iter):
sql_query = 'COPY {} ({}) FROM STDIN WITH CSV'.format(sql_table_name, sql_column_names)
con_cur.copy_expert(sql=sql_query, file=io_buffer)

def import_data(self, data, table_name, schema_name='public', if_exists='replace',
def import_data(self, data, table_name, schema_name='public', if_exists='fail',
force_replace=False, chunk_size=None, col_type=None, method='multi',
index=False, confirmation_required=True, verbose=False, **kwargs):
"""
Expand All @@ -1114,14 +1114,14 @@ def import_data(self, data, table_name, schema_name='public', if_exists='replace
and [`SQL-P-DD-2
<https://www.postgresql.org/docs/current/sql-copy.html/>`_].
:param data: data frame to be dumped into a database
:type data: pandas.DataFrame
:param data: tabular data to be dumped into a database
:type data: pandas.DataFrame or pandas.io.parsers.TextFileReader or list or tuple
:param table_name: name of a table
:type table_name: str
:param schema_name: name of a schema, defaults to ``'public'``
:type schema_name: str
:param if_exists: if the table already exists,
to ``'replace'`` (default), ``'append'`` or ``'fail'``
:param if_exists: if the table already exists, to ``'replace'``, ``'append'``
or, by default, ``'fail'`` and do nothing but raise a ValueError.
:type if_exists: str
:param force_replace: whether to force to replace existing table, defaults to ``False``
:type force_replace: bool
Expand Down Expand Up @@ -1174,34 +1174,34 @@ def import_data(self, data, table_name, schema_name='public', if_exists='replace
self.drop_table(table_name, schema_name,
confirmation_required=confirmation_required, verbose=verbose)

try:
if verbose:
if confirmation_required:
log_msg = "Importing data into {}".format(table_name_)
log_end_msg = " ... "
else:
log_msg = "Importing the data into table {} ... " \
"\n\tat {}".format(table_name_, self.address)
log_end_msg = " ... \n"
print(log_msg, end=log_end_msg)

if isinstance(data, pandas.io.parsers.TextFileReader):
for chunk in data:
chunk.to_sql(table_name, self.engine, schema=schema_name, if_exists=if_exists,
index=index, dtype=col_type, method=method, **kwargs)
del chunk
gc.collect()

else:
data.to_sql(table_name, self.engine, schema=schema_name, if_exists=if_exists,
index=index, chunksize=chunk_size, dtype=col_type, method=method,
**kwargs)
# try:
# if verbose:
# if confirmation_required:
# log_msg = "Importing data into {}".format(table_name_)
# log_end_msg = " ... "
# else:
# log_msg = "Importing the data into table {} ... " \
# "\n\tat {}".format(table_name_, self.address)
# log_end_msg = " ... \n"
# print(log_msg, end=log_end_msg)

if isinstance(data, (pandas.io.parsers.TextFileReader, list, tuple)):
for chunk in data:
chunk.to_sql(table_name, self.engine, schema=schema_name, if_exists=if_exists,
index=index, dtype=col_type, method=method, **kwargs)
del chunk
gc.collect()

print("Done.") if verbose else ""
else:
data.to_sql(table_name, self.engine, schema=schema_name, if_exists=if_exists,
index=index, chunksize=chunk_size, dtype=col_type, method=method,
**kwargs)
gc.collect()

except Exception as e:
print("Failed. {}".format(e))
# print("Done.") if verbose else ""
#
# except Exception as e:
# print("Failed. {}".format(e))

def read_table(self, table_name, schema_name='public', condition=None, chunk_size=None,
sorted_by=None, **kwargs):
Expand Down

0 comments on commit e6b0a13

Please sign in to comment.