Skip to content

Commit

Permalink
Merge pull request #4938 from FederatedAI/feature-2.0.0-beta-flow_upload
Browse files Browse the repository at this point in the history
Feature 2.0.0 beta flow upload
  • Loading branch information
mgqa34 authored Jun 20, 2023
2 parents 5399af2 + 5e1b6d3 commit 73db110
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
11 changes: 6 additions & 5 deletions python/fate/arch/dataframe/_frame_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@ def _dense_format_to_frame(self, ctx, table):
from .ops._indexer import get_partition_order_by_raw_table
partition_order_mappings = get_partition_order_by_raw_table(table)
# partition_order_mappings = _get_partition_order(table)
functools.partial(_to_blocks,
data_manager=data_manager,
retrieval_index_dict=retrieval_index_dict,
partition_order_mappings=partition_order_mappings)
table = table.mapValues(lambda value: value.split(self._delimiter, -1))
to_block_func = functools.partial(_to_blocks,
data_manager=data_manager,
retrieval_index_dict=retrieval_index_dict,
partition_order_mappings=partition_order_mappings)
block_table = table.mapPartitions(
_to_blocks,
to_block_func,
use_previous_behavior=False
)

Expand Down
25 changes: 20 additions & 5 deletions python/fate/arch/dataframe/manager/block_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def __init__(self, *args, **kwargs):

@staticmethod
def convert_block(block):
return torch.tensor(block, dtype=torch.int32)
try:
return torch.tensor(block, dtype=torch.int32)
except ValueError:
return torch.tensor(np.array(block, dtype="int32"), dtype=torch.int32)


class Int64Block(Block):
Expand All @@ -230,7 +233,10 @@ def __init__(self, *args, **kwargs):

@staticmethod
def convert_block(block):
return torch.tensor(block, dtype=torch.int64)
try:
return torch.tensor(block, dtype=torch.int64)
except ValueError:
return torch.tensor(np.array(block, dtype="int64"), dtype=torch.int64)


class Float32Block(Block):
Expand All @@ -240,7 +246,10 @@ def __init__(self, *args, **kwargs):

@staticmethod
def convert_block(block):
return torch.tensor(block, dtype=torch.float32)
try:
return torch.tensor(block, dtype=torch.float32)
except ValueError:
return torch.tensor(np.array(block, dtype="float32"), dtype=torch.float32)


class Float64Block(Block):
Expand All @@ -250,7 +259,10 @@ def __init__(self, *args, **kwargs):

@staticmethod
def convert_block(block):
return torch.tensor(block, dtype=torch.float64)
try:
return torch.tensor(block, dtype=torch.float64)
except ValueError:
return torch.tensor(np.array(block, dtype="float64"), dtype=torch.float64)


class BoolBlock(Block):
Expand All @@ -260,7 +272,10 @@ def __init__(self, *args, **kwargs):

@staticmethod
def convert_block(block):
return torch.tensor(block, dtype=torch.bool)
try:
return torch.tensor(block, dtype=torch.bool)
except ValueError:
return torch.tensor(np.array(block, dtype="bool"), dtype=torch.bool)


class IndexBlock(Block):
Expand Down
8 changes: 4 additions & 4 deletions python/fate/components/components/dataframe_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
# limitations under the License.
#
from fate.components.core import LOCAL, Role, cpn
from typing import Union, List, Dict


@cpn.component(roles=[LOCAL])
@cpn.table_input("table", roles=[LOCAL])
@cpn.dataframe_output("dataframe_output", roles=[LOCAL])
@cpn.parameter("namespace", type=str, default=",", optional=True)
@cpn.parameter("name", type=str, default="dense", optional=True)
@cpn.parameter("namespace", type=str, default=None, optional=True)
@cpn.parameter("name", type=str, default=None, optional=True)
@cpn.parameter("anonymous_role", type=str, default=None, optional=True)
@cpn.parameter("anonymous_party_id", type=str, default=None, optional=True)
def dataframe_transformer(
Expand All @@ -35,6 +34,7 @@ def dataframe_transformer(
anonymous_party_id,
):
from fate.arch.dataframe import TableReader

metadata = table.schema
table_reader = TableReader(
sample_id_name=metadata.get("sample_id_name", None),
Expand All @@ -57,4 +57,4 @@ def dataframe_transformer(
)

df = table_reader.to_frame(ctx, table)
ctx.writer(dataframe_output).write(df, namespace=namespace, name=name)
dataframe_output.write(ctx, df, name=name, namespace=namespace)

0 comments on commit 73db110

Please sign in to comment.