fix: serialize query result to parquet
gabriel-suela authored and lucasfcnunes committed Mar 28, 2023
1 parent 7b7eef2 commit 5a90740
Showing 4 changed files with 49 additions and 23 deletions.
@@ -74,7 +74,7 @@ export default function QueryControlDropdown(props) {
   queryResult={props.queryResult}
   embed={props.embed}
   apiKey={props.apiKey}>
-  <FileExcelOutlinedIcon /> Download as Parquet File
+  <FileOutlinedIcon /> Download as Parquet File
 </QueryResultsLink>
 </Menu.Item>
 </Menu>
2 changes: 1 addition & 1 deletion redash/handlers/query_results.py
@@ -457,7 +457,7 @@ def make_excel_response(query_result):
 def make_parquet_response(query_result):
     headers = {
         # https://issues.apache.org/jira/browse/PARQUET-1889
-        "Content-Type": "application/parquet",
+        # "Content-Type": "application/parquet"
     }
     return make_response(
         serialize_query_result_to_parquet(query_result), 200, headers
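Parquet had no registered media type at the time (PARQUET-1889, linked in the comment above, is the request to register one), so the commit comments the header out rather than ship a nonstandard value. Below is a minimal sketch of one possible fallback, assuming Flask's make_response as used above; the function name, import path, and header choice are illustrative, not part of the commit:

# Sketch under assumptions: serialize_query_result_to_parquet is the
# serializer changed below (import path assumed); "application/octet-stream"
# is a generic-binary fallback until an official Parquet media type exists.
from flask import make_response

from redash.serializers import serialize_query_result_to_parquet  # assumed path


def make_parquet_response_fallback(query_result):  # hypothetical variant
    headers = {"Content-Type": "application/octet-stream"}
    return make_response(
        serialize_query_result_to_parquet(query_result), 200, headers
    )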
66 changes: 46 additions & 20 deletions redash/serializers/query_result.py
@@ -1,7 +1,10 @@
 import csv
 import io
+import logging
+from typing import Optional

 import pyarrow
+import pyarrow.compute
 import pyarrow.parquet
 import xlsxwriter
 from dateutil.parser import isoparse as parse_date
@@ -18,6 +21,8 @@
 )
 from redash.utils import UnicodeWriter, json_loads

+logger = logging.getLogger(__name__)
+

 def _convert_format(fmt):
     return (
@@ -139,43 +144,65 @@ def serialize_query_result_to_xlsx(query_result):
 def serialize_query_result_to_parquet(query_result):
     output = io.BytesIO()
     query_data = query_result.data
+
+    def redash_datetime_to_pyarrow_timestamp(
+        table: "pyarrow.Table",
+        field: "pyarrow.Field",
+        conversion: Optional[dict] = None,
+    ) -> "pyarrow.Table":
+        column_index: int = table.schema.get_field_index(field.name)
+        column_data = pyarrow.compute.strptime(
+            table.column(column_index),
+            format=conversion["redash_format"],
+            unit="s",
+        )
+        new_table = table.set_column(column_index, field.name, column_data)
+        return new_table

     conversions = [
-        {"pandas_type": pyarrow.bool_, "redash_type": TYPE_BOOLEAN},
+        {"pyarrow_type": pyarrow.bool_(), "redash_type": TYPE_BOOLEAN},
         {
-            # "pyarrow_type": pyarrow.date64,
-            "pyarrow_type": pyarrow.string,
+            "pyarrow_type": pyarrow.date32(),
             "redash_type": TYPE_DATE,
-            # "to_redash": lambda x: x.strftime("%Y-%m-%d %H:%M:%S"),
-            # "to_pyarrow": lambda x: x,
+            "redash_format": r"%Y-%m-%d",
+            "redash_to_pyarrow": redash_datetime_to_pyarrow_timestamp,
         },
         {
-            # "pyarrow_type": pyarrow.timestamp,
-            "pyarrow_type": pyarrow.string,
+            "pyarrow_type": pyarrow.timestamp("s"),
             "redash_type": TYPE_DATETIME,
-            # "to_redash": lambda x: x.strftime("%Y-%m-%d %H:%M:%S"),
-            # "to_pyarrow": lambda x: x,
+            "redash_format": r"%Y-%m-%d %H:%M:%S",
+            "redash_to_pyarrow": redash_datetime_to_pyarrow_timestamp,
         },
-        {"pyarrow_type": pyarrow.float64, "redash_type": TYPE_FLOAT},
-        {"pyarrow_type": pyarrow.int64, "redash_type": TYPE_INTEGER},
-        {"pyarrow_type": pyarrow.string, "redash_type": TYPE_STRING},
+        {"pyarrow_type": pyarrow.float64(), "redash_type": TYPE_FLOAT},
+        {"pyarrow_type": pyarrow.int64(), "redash_type": TYPE_INTEGER},
+        {"pyarrow_type": pyarrow.string(), "redash_type": TYPE_STRING},
     ]

+    table = pyarrow.Table.from_pylist(query_data["rows"])
     fields = []

     for column in query_data["columns"]:
         for conversion in conversions:
             if column["type"] == conversion["redash_type"]:
-                fields.append(pyarrow.field(column["name"], conversion["pyarrow_type"]))
+                field = pyarrow.field(
+                    name=column["name"],
+                    type=conversion["pyarrow_type"],
+                    metadata={"friendly_name": column["friendly_name"]},
+                )
+                fields.append(field)
+                converter = conversion.get("redash_to_pyarrow")
+                if converter:
+                    table = converter(
+                        table=table,
+                        field=field,
+                        conversion=conversion,
+                    )
+                break

-    table = pyarrow.Table.from_pylist(query_data["rows"])
-    print(table)
+    target_schema = pyarrow.schema(fields)
+    table = table.cast(target_schema=target_schema)
     with pyarrow.parquet.ParquetWriter(
         where=output,
-        schema=pyarrow.schema(
-            fields,
-            # metadata={"friendly_name": "id"},
-        ),
+        schema=target_schema,
     ) as writer:
         writer.write_table(table)

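The heart of the fix: Redash hands the serializer rows as plain Python dicts, with DATE and DATETIME values still rendered as strings, so the new code parses those columns with pyarrow.compute.strptime and then casts the whole table to a schema built from the column metadata. Below is a standalone sketch of that technique; the sample rows and column names are invented for illustration and stand in for query_result.data.

import io

import pyarrow
import pyarrow.compute
import pyarrow.parquet

# Invented stand-in for query_result.data["rows"].
rows = [
    {"id": 1, "created_at": "2023-03-28 09:30:00"},
    {"id": 2, "created_at": "2023-03-28 10:45:00"},
]
table = pyarrow.Table.from_pylist(rows)  # created_at is inferred as a string column

# Parse the datetime column in place, as redash_datetime_to_pyarrow_timestamp does.
index = table.schema.get_field_index("created_at")
parsed = pyarrow.compute.strptime(
    table.column(index), format="%Y-%m-%d %H:%M:%S", unit="s"
)
table = table.set_column(index, "created_at", parsed)

# Cast to the target schema and write Parquet into an in-memory buffer,
# mirroring the serializer's BytesIO output.
target_schema = pyarrow.schema(
    [
        pyarrow.field("id", pyarrow.int64()),
        pyarrow.field("created_at", pyarrow.timestamp("s")),
    ]
)
table = table.cast(target_schema=target_schema)

output = io.BytesIO()
with pyarrow.parquet.ParquetWriter(where=output, schema=target_schema) as writer:
    writer.write_table(table)

output.seek(0)
print(pyarrow.parquet.read_table(output).schema)  # round-trip sanity check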
2 changes: 1 addition & 1 deletion requirements_all_ds.txt
@@ -47,4 +47,4 @@ nzpy>=1.15
 nzalchemy
 python-arango==6.1.0
 pinotdb>=0.4.5
-pyarrow==10.0.0
+pyarrow==10.0.0
