Skip to content

Commit a1c3ee3

Browse files
pyalex authored and kevjumba committed
fix: Use timestamp type when converting unixtimestamp feature type to arrow (#2593)
* use timestamp

  Signed-off-by: pyalex <moskalenko.alexey@gmail.com>

* add timezone to type definition

  Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
1 parent 3ec943a commit a1c3ee3

File tree

3 files changed: +109 −83 lines changed

go/types/typeconversion.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ func ProtoTypeToArrowType(sample *types.Value) (arrow.DataType, error) {
4040
case *types.Value_DoubleListVal:
4141
return arrow.ListOf(arrow.PrimitiveTypes.Float64), nil
4242
case *types.Value_UnixTimestampVal:
43-
return arrow.FixedWidthTypes.Time32s, nil
43+
return arrow.FixedWidthTypes.Timestamp_s, nil
4444
case *types.Value_UnixTimestampListVal:
45-
return arrow.ListOf(arrow.FixedWidthTypes.Time32s), nil
45+
return arrow.ListOf(arrow.FixedWidthTypes.Timestamp_s), nil
4646
default:
4747
return nil,
4848
fmt.Errorf("unsupported proto type in proto to arrow conversion: %s", sample.Val)
@@ -80,9 +80,9 @@ func ValueTypeEnumToArrowType(t types.ValueType_Enum) (arrow.DataType, error) {
8080
case types.ValueType_DOUBLE_LIST:
8181
return arrow.ListOf(arrow.PrimitiveTypes.Float64), nil
8282
case types.ValueType_UNIX_TIMESTAMP:
83-
return arrow.FixedWidthTypes.Time32s, nil
83+
return arrow.FixedWidthTypes.Timestamp_s, nil
8484
case types.ValueType_UNIX_TIMESTAMP_LIST:
85-
return arrow.ListOf(arrow.FixedWidthTypes.Time32s), nil
85+
return arrow.ListOf(arrow.FixedWidthTypes.Timestamp_s), nil
8686
default:
8787
return nil,
8888
fmt.Errorf("unsupported value type enum in enum to arrow type conversion: %s", t)
@@ -119,9 +119,9 @@ func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) e
119119
for _, v := range values {
120120
fieldBuilder.Append(v.GetDoubleVal())
121121
}
122-
case *array.Time32Builder:
122+
case *array.TimestampBuilder:
123123
for _, v := range values {
124-
fieldBuilder.Append(arrow.Time32(v.GetUnixTimestampVal()))
124+
fieldBuilder.Append(arrow.Timestamp(v.GetUnixTimestampVal()))
125125
}
126126
case *array.ListBuilder:
127127
for _, list := range values {
@@ -157,9 +157,9 @@ func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) e
157157
for _, v := range list.GetDoubleListVal().GetVal() {
158158
valueBuilder.Append(v)
159159
}
160-
case *array.Time32Builder:
160+
case *array.TimestampBuilder:
161161
for _, v := range list.GetUnixTimestampListVal().GetVal() {
162-
valueBuilder.Append(arrow.Time32(v))
162+
valueBuilder.Append(arrow.Timestamp(v))
163163
}
164164
}
165165
}
@@ -227,10 +227,10 @@ func ArrowValuesToProtoValues(arr arrow.Array) ([]*types.Value, error) {
227227
}
228228
values = append(values,
229229
&types.Value{Val: &types.Value_BoolListVal{BoolListVal: &types.BoolList{Val: vals}}})
230-
case arrow.FixedWidthTypes.Time32s:
230+
case arrow.FixedWidthTypes.Timestamp_s:
231231
vals := make([]int64, int(offsets[idx])-pos)
232232
for j := pos; j < int(offsets[idx]); j++ {
233-
vals[j-pos] = int64(listValues.(*array.Time32).Value(j))
233+
vals[j-pos] = int64(listValues.(*array.Timestamp).Value(j))
234234
}
235235

236236
values = append(values,
@@ -278,11 +278,11 @@ func ArrowValuesToProtoValues(arr arrow.Array) ([]*types.Value, error) {
278278
values = append(values,
279279
&types.Value{Val: &types.Value_StringVal{StringVal: arr.(*array.String).Value(idx)}})
280280
}
281-
case arrow.FixedWidthTypes.Time32s:
281+
case arrow.FixedWidthTypes.Timestamp_s:
282282
for idx := 0; idx < arr.Len(); idx++ {
283283
values = append(values,
284284
&types.Value{Val: &types.Value_UnixTimestampVal{
285-
UnixTimestampVal: int64(arr.(*array.Time32).Value(idx))}})
285+
UnixTimestampVal: int64(arr.(*array.Timestamp).Value(idx))}})
286286
}
287287
default:
288288
return nil, fmt.Errorf("unsupported arrow to proto conversion for type %s", arr.DataType())

sdk/python/feast/embedded_go/online_features_service.py

+9-71
Original file line numberDiff line numberDiff line change
@@ -14,59 +14,17 @@
1414
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesResponse
1515
from feast.protos.feast.types import Value_pb2
1616
from feast.repo_config import RepoConfig
17+
from feast.types import from_value_type
1718
from feast.value_type import ValueType
1819

1920
from .lib.embedded import DataTable, NewOnlineFeatureService, OnlineFeatureServiceConfig
2021
from .lib.go import Slice_string
22+
from .type_map import FEAST_TYPE_TO_ARROW_TYPE, arrow_array_to_array_of_proto
2123

2224
if TYPE_CHECKING:
2325
from feast.feature_store import FeatureStore
2426

2527

26-
ARROW_TYPE_TO_PROTO_FIELD = {
27-
pa.int32(): "int32_val",
28-
pa.int64(): "int64_val",
29-
pa.float32(): "float_val",
30-
pa.float64(): "double_val",
31-
pa.bool_(): "bool_val",
32-
pa.string(): "string_val",
33-
pa.binary(): "bytes_val",
34-
pa.time32("s"): "unix_timestamp_val",
35-
}
36-
37-
ARROW_LIST_TYPE_TO_PROTO_FIELD = {
38-
pa.int32(): "int32_list_val",
39-
pa.int64(): "int64_list_val",
40-
pa.float32(): "float_list_val",
41-
pa.float64(): "double_list_val",
42-
pa.bool_(): "bool_list_val",
43-
pa.string(): "string_list_val",
44-
pa.binary(): "bytes_list_val",
45-
pa.time32("s"): "unix_timestamp_list_val",
46-
}
47-
48-
ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = {
49-
pa.int32(): Value_pb2.Int32List,
50-
pa.int64(): Value_pb2.Int64List,
51-
pa.float32(): Value_pb2.FloatList,
52-
pa.float64(): Value_pb2.DoubleList,
53-
pa.bool_(): Value_pb2.BoolList,
54-
pa.string(): Value_pb2.StringList,
55-
pa.binary(): Value_pb2.BytesList,
56-
pa.time32("s"): Value_pb2.Int64List,
57-
}
58-
59-
# used for entity types only
60-
PROTO_TYPE_TO_ARROW_TYPE = {
61-
ValueType.INT32: pa.int32(),
62-
ValueType.INT64: pa.int64(),
63-
ValueType.FLOAT: pa.float32(),
64-
ValueType.DOUBLE: pa.float64(),
65-
ValueType.STRING: pa.string(),
66-
ValueType.BYTES: pa.binary(),
67-
}
68-
69-
7028
class EmbeddedOnlineFeatureServer:
7129
def __init__(
7230
self, repo_path: str, repo_config: RepoConfig, feature_store: "FeatureStore"
@@ -179,8 +137,10 @@ def _to_arrow(value, type_hint: Optional[ValueType]) -> pa.Array:
179137
if isinstance(value, Value_pb2.RepeatedValue):
180138
_proto_to_arrow(value)
181139

182-
if type_hint in PROTO_TYPE_TO_ARROW_TYPE:
183-
return pa.array(value, PROTO_TYPE_TO_ARROW_TYPE[type_hint])
140+
if type_hint:
141+
feast_type = from_value_type(type_hint)
142+
if feast_type in FEAST_TYPE_TO_ARROW_TYPE:
143+
return pa.array(value, FEAST_TYPE_TO_ARROW_TYPE[feast_type])
184144

185145
return pa.array(value)
186146

@@ -263,31 +223,9 @@ def record_batch_to_online_response(record_batch):
263223
[Value_pb2.Value()] * len(record_batch.columns[idx])
264224
)
265225
else:
266-
if isinstance(field.type, pa.ListType):
267-
proto_list_class = ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS[
268-
field.type.value_type
269-
]
270-
proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[field.type.value_type]
271-
272-
column = record_batch.columns[idx]
273-
if field.type.value_type == pa.time32("s"):
274-
column = column.cast(pa.list_(pa.int32()))
275-
276-
for v in column.tolist():
277-
feature_vector.values.append(
278-
Value_pb2.Value(**{proto_field_name: proto_list_class(val=v)})
279-
)
280-
else:
281-
proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[field.type]
282-
283-
column = record_batch.columns[idx]
284-
if field.type == pa.time32("s"):
285-
column = column.cast(pa.int32())
286-
287-
for v in column.tolist():
288-
feature_vector.values.append(
289-
Value_pb2.Value(**{proto_field_name: v})
290-
)
226+
feature_vector.values.extend(
227+
arrow_array_to_array_of_proto(field.type, record_batch.columns[idx])
228+
)
291229

292230
resp.results.append(feature_vector)
293231
resp.metadata.feature_names.val.append(field.name)
sdk/python/feast/embedded_go/type_map.py

+88
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
from typing import List
2+
3+
import pyarrow as pa
4+
import pytz
5+
6+
from feast.protos.feast.types import Value_pb2
7+
from feast.types import Array, PrimitiveFeastType
8+
9+
PA_TIMESTAMP_TYPE = pa.timestamp("s", tz=pytz.UTC)
10+
11+
ARROW_TYPE_TO_PROTO_FIELD = {
12+
pa.int32(): "int32_val",
13+
pa.int64(): "int64_val",
14+
pa.float32(): "float_val",
15+
pa.float64(): "double_val",
16+
pa.bool_(): "bool_val",
17+
pa.string(): "string_val",
18+
pa.binary(): "bytes_val",
19+
PA_TIMESTAMP_TYPE: "unix_timestamp_val",
20+
}
21+
22+
ARROW_LIST_TYPE_TO_PROTO_FIELD = {
23+
pa.int32(): "int32_list_val",
24+
pa.int64(): "int64_list_val",
25+
pa.float32(): "float_list_val",
26+
pa.float64(): "double_list_val",
27+
pa.bool_(): "bool_list_val",
28+
pa.string(): "string_list_val",
29+
pa.binary(): "bytes_list_val",
30+
PA_TIMESTAMP_TYPE: "unix_timestamp_list_val",
31+
}
32+
33+
ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = {
34+
pa.int32(): Value_pb2.Int32List,
35+
pa.int64(): Value_pb2.Int64List,
36+
pa.float32(): Value_pb2.FloatList,
37+
pa.float64(): Value_pb2.DoubleList,
38+
pa.bool_(): Value_pb2.BoolList,
39+
pa.string(): Value_pb2.StringList,
40+
pa.binary(): Value_pb2.BytesList,
41+
PA_TIMESTAMP_TYPE: Value_pb2.Int64List,
42+
}
43+
44+
FEAST_TYPE_TO_ARROW_TYPE = {
45+
PrimitiveFeastType.INT32: pa.int32(),
46+
PrimitiveFeastType.INT64: pa.int64(),
47+
PrimitiveFeastType.FLOAT32: pa.float32(),
48+
PrimitiveFeastType.FLOAT64: pa.float64(),
49+
PrimitiveFeastType.STRING: pa.string(),
50+
PrimitiveFeastType.BYTES: pa.binary(),
51+
PrimitiveFeastType.BOOL: pa.bool_(),
52+
PrimitiveFeastType.UNIX_TIMESTAMP: pa.timestamp("s"),
53+
Array(PrimitiveFeastType.INT32): pa.list_(pa.int32()),
54+
Array(PrimitiveFeastType.INT64): pa.list_(pa.int64()),
55+
Array(PrimitiveFeastType.FLOAT32): pa.list_(pa.float32()),
56+
Array(PrimitiveFeastType.FLOAT64): pa.list_(pa.float64()),
57+
Array(PrimitiveFeastType.STRING): pa.list_(pa.string()),
58+
Array(PrimitiveFeastType.BYTES): pa.list_(pa.binary()),
59+
Array(PrimitiveFeastType.BOOL): pa.list_(pa.bool_()),
60+
Array(PrimitiveFeastType.UNIX_TIMESTAMP): pa.list_(pa.timestamp("s")),
61+
}
62+
63+
64+
def arrow_array_to_array_of_proto(
65+
arrow_type: pa.DataType, arrow_array: pa.Array
66+
) -> List[Value_pb2.Value]:
67+
values = []
68+
if isinstance(arrow_type, pa.ListType):
69+
proto_list_class = ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS[arrow_type.value_type]
70+
proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[arrow_type.value_type]
71+
72+
if arrow_type.value_type == PA_TIMESTAMP_TYPE:
73+
arrow_array = arrow_array.cast(pa.list_(pa.int64()))
74+
75+
for v in arrow_array.tolist():
76+
values.append(
77+
Value_pb2.Value(**{proto_field_name: proto_list_class(val=v)})
78+
)
79+
else:
80+
proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[arrow_type]
81+
82+
if arrow_type == PA_TIMESTAMP_TYPE:
83+
arrow_array = arrow_array.cast(pa.int64())
84+
85+
for v in arrow_array.tolist():
86+
values.append(Value_pb2.Value(**{proto_field_name: v}))
87+
88+
return values

0 commit comments

Comments
 (0)