[FLINK-28908][python] Fix LIST type in Python DataStream API
This closes #20539.
vancior98 authored and dianfu committed Aug 15, 2022
1 parent 95d14ed commit f0545f4
Showing 4 changed files with 19 additions and 7 deletions.
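For context, the fix makes a LIST element type declared with Types.LIST(...) usable end to end in the Python DataStream API. A minimal standalone sketch mirroring the new test added below in test_data_stream.py (the .print() sink and the job name here are illustrative, not part of the change):

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

# Sketch only: a ROW field typed as LIST(INT), as exercised by the new test.
env = StreamExecutionEnvironment.get_execution_environment()
row_type_info = Types.ROW_NAMED(['list'], [Types.LIST(Types.INT())])
ds = env.from_collection([Row(list=[1, 2, 3])], type_info=row_type_info)
ds.map(lambda e: str(e), output_type=Types.STRING()).print()
env.execute('list_type_sketch')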
4 changes: 2 additions & 2 deletions flink-python/pyflink/common/typeinfo.py
@@ -439,7 +439,7 @@ def to_internal_type(self, obj):
zip(self.get_field_names(), self._field_types, self._need_conversion))
elif isinstance(obj, Row) and hasattr(obj, "_fields"):
return (obj.get_row_kind().value,) + tuple(
-                f.to_internal_type(obj.get(n)) if c else obj.get(n)
+                f.to_internal_type(obj[n]) if c else obj[n]
for n, f, c in
zip(self.get_field_names(), self._field_types, self._need_conversion))
elif isinstance(obj, Row):
@@ -463,7 +463,7 @@ def to_internal_type(self, obj):
return (RowKind.INSERT.value,) + tuple(obj.get(n) for n in self.get_field_names())
elif isinstance(obj, Row) and hasattr(obj, "_fields"):
return (obj.get_row_kind().value,) + tuple(
-                obj.get(n) for n in self.get_field_names())
+                obj[n] for n in self.get_field_names())
elif isinstance(obj, Row):
return (obj.get_row_kind().value,) + tuple(obj)
elif isinstance(obj, (list, tuple)):
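The two hunks above switch the named-field branches from obj.get(n) to item access obj[n]. A small standalone illustration of that access pattern, assuming the standard pyflink.common.Row (not part of the commit):

from pyflink.common import Row

# Illustration only: Row supports lookup by field name and by position;
# the fix relies on the name-based item access shown here.
row = Row(list=[1, 2, 3])
print(row['list'])  # -> [1, 2, 3], lookup by field name
print(row[0])       # -> [1, 2, 3], positional lookup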
8 changes: 8 additions & 0 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -814,6 +814,14 @@ def test_session_window_late_merge(self):
expected = ['(hi,3)']
self.assert_equals_sorted(expected, results)

+    def test_java_list_deserialization(self):
+        row_type_info = Types.ROW_NAMED(['list'], [Types.LIST(Types.INT())])
+        ds = self.env.from_collection([Row(list=[1, 2, 3])], type_info=row_type_info)
+        ds.map(lambda e: str(e), Types.STRING()).add_sink(self.test_sink)
+        self.env.execute('test_java_list_deserialization')
+        expected = ['Row(list=[1, 2, 3])']
+        self.assert_equals(self.test_sink.get_results(), expected)


class StreamingModeDataStreamTests(DataStreamTests, PyFlinkStreamingTestCase):
def test_data_stream_name(self):
12 changes: 8 additions & 4 deletions flink-python/pyflink/fn_execution/coders.py
@@ -673,13 +673,17 @@ def from_type_info_proto(type_info):
return RowCoder(
[from_type_info_proto(f.field_type) for f in type_info.row_type_info.fields],
[f.field_name for f in type_info.row_type_info.fields])
-    elif field_type_name == type_info_name.PRIMITIVE_ARRAY:
+    elif field_type_name in (
+        type_info_name.PRIMITIVE_ARRAY,
+        type_info_name.LIST,
+    ):
         if type_info.collection_element_type.type_name == type_info_name.BYTE:
             return BinaryCoder()
         return PrimitiveArrayCoder(from_type_info_proto(type_info.collection_element_type))
-    elif field_type_name in (type_info_name.BASIC_ARRAY,
-                             type_info_name.OBJECT_ARRAY,
-                             type_info_name.LIST):
+    elif field_type_name in (
+        type_info_name.BASIC_ARRAY,
+        type_info_name.OBJECT_ARRAY,
+    ):
return GenericArrayCoder(from_type_info_proto(type_info.collection_element_type))
elif field_type_name == type_info_name.TUPLE:
return TupleCoder([from_type_info_proto(field_type)
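After this change, LIST type info is dispatched together with PRIMITIVE_ARRAY (falling back to BinaryCoder for byte elements), while BASIC_ARRAY and OBJECT_ARRAY keep the generic array coder. A simplified, hypothetical re-statement of that dispatch with stand-in names (the real code works on the proto type-info enums shown above):

# Hypothetical sketch of the coder selection after the fix; names are stand-ins.
def pick_array_coder(type_name: str, element_type_name: str) -> str:
    if type_name in ('PRIMITIVE_ARRAY', 'LIST'):
        if element_type_name == 'BYTE':
            return 'BinaryCoder'          # byte elements use the compact binary coder
        return 'PrimitiveArrayCoder'      # other elements share the primitive-array coder
    if type_name in ('BASIC_ARRAY', 'OBJECT_ARRAY'):
        return 'GenericArrayCoder'        # boxed/object arrays keep the generic coder
    raise ValueError(f'unsupported type: {type_name}')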
2 changes: 1 addition & 1 deletion flink-python/pyflink/table/types.py
@@ -1250,7 +1250,7 @@ def to_sql_type(self, obj):
for n, f, c in zip(self.names, self.fields, self._need_conversion))
elif isinstance(obj, Row) and hasattr(obj, "_fields"):
return (obj.get_row_kind().value,) + tuple(
-                f.to_sql_type(obj.get(n)) if c else obj.get(n)
+                f.to_sql_type(obj[n]) if c else obj[n]
for n, f, c in zip(self.names, self.fields, self._need_conversion))
elif isinstance(obj, Row):
return (obj.get_row_kind().value, ) + tuple(
