-
Notifications
You must be signed in to change notification settings - Fork 130
/
deserializers.py
117 lines (94 loc) · 4.04 KB
/
deserializers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""DataFrame deserializers."""
import io
import typing
import pandas as pd
import cudf
from morpheus.common import FileTypes
from morpheus.common import determine_file_type
from morpheus.common import read_file_to_df as read_file_to_df_cpp
from morpheus.config import CppConfig
from morpheus.io.utils import filter_null_data
from morpheus.utils.type_aliases import DataFrameType
def read_file_to_df(file_name: typing.Union[str, io.IOBase],
                    file_type: FileTypes = FileTypes.Auto,
                    parser_kwargs: typing.Optional[dict] = None,
                    filter_nulls: bool = True,
                    df_type: typing.Literal["cudf", "pandas"] = "pandas") -> DataFrameType:
    """
    Reads a file into a dataframe and performs any of the necessary cleanup.

    Parameters
    ----------
    file_name : str or io.IOBase
        File to read. Either a path, or an already opened file-like object. When a file-like object is passed and
        `file_type` is Auto, the object must expose a `path` attribute so the type can be determined from it.
    file_type : `morpheus.common.FileTypes`
        Type of file. Leave as Auto to determine from the extension.
    parser_kwargs : dict, optional
        Any argument to pass onto the parser, by default {}. Ignored when C++ execution is enabled and
        `df_type="cudf"`. For JSON files `lines=True` is applied as a default, which these values can override.
    filter_nulls : bool, optional
        Whether to filter null rows after loading, by default True.
    df_type : typing.Literal["cudf", "pandas"], optional
        What type of parser to use. Options are 'cudf' and 'pandas', by default "pandas".

    Returns
    -------
    DataFrameType
        A parsed DataFrame.

    Raises
    ------
    ValueError
        If `file_type` is Auto and `file_name` is a file-like object without a `path` attribute, or if the resolved
        file type is not one of JSON, CSV or PARQUET.
    """

    # The C++ reader only supports cudf dataframes
    if (CppConfig.get_should_use_cpp() and df_type == "cudf"):
        df = read_file_to_df_cpp(file_name, file_type)
        if (filter_nulls):
            df = filter_null_data(df)
        return df

    if (parser_kwargs is None):
        parser_kwargs = {}

    mode = file_type

    if (mode == FileTypes.Auto):
        # The DFPFileToDataFrameStage passes an instance of an fsspec file opener instead of a filename to this method.
        # The opener objects are subclasses of io.IOBase, which avoids introducing fsspec to this part of the API
        if (isinstance(file_name, io.IOBase)):
            if (hasattr(file_name, 'path')):  # This attr is not in the base
                fp = file_name.path
            else:
                raise ValueError("Unable to determine file type from instance of io.IOBase,"
                                 " set `file_type` to a value other than Auto")
        else:
            fp = file_name

        mode = determine_file_type(fp)

    # Special args for JSON
    kwargs = {}
    if (mode == FileTypes.JSON):
        kwargs["lines"] = True

    # Update with any args set by the user. User values overwrite defaults
    kwargs.update(parser_kwargs)

    # Both modules expose the same read_json/read_csv/read_parquet entry points used below
    df_class = cudf if df_type == "cudf" else pd

    if (mode == FileTypes.JSON):
        df = df_class.read_json(file_name, **kwargs)

    elif (mode == FileTypes.CSV):
        df = df_class.read_csv(file_name, **kwargs)

        # A leading integer column named "Unnamed: 0" is promoted to the (unnamed) index and the frame is sorted by
        # it. NOTE(review): presumably this restores an index that was written out as the first CSV column - confirm.
        if (len(df.columns) > 1 and df.columns[0] == "Unnamed: 0" and df.iloc[:, 0].dtype == cudf.dtype(int)):
            df.set_index("Unnamed: 0", drop=True, inplace=True)
            df.index.name = ""
            df.sort_index(inplace=True)

    elif (mode == FileTypes.PARQUET):
        df = df_class.read_parquet(file_name, **kwargs)

    else:
        # Raise explicitly instead of asserting: assertions are removed under `python -O`, which would otherwise let
        # an unsupported type fall through without ever loading a dataframe.
        raise ValueError(f"Unsupported file type mode: {mode}")

    if (filter_nulls):
        df = filter_null_data(df)

    return df