Skip to content

Commit

Permalink
Merge branch 'dev-2.0.0-rc' into dev-2.0.0-hetero-nn-fedpass
Browse files Browse the repository at this point in the history
  • Loading branch information
talkingwallace committed Nov 23, 2023
2 parents 26a3167 + aa7b18c commit 26f96e3
Show file tree
Hide file tree
Showing 236 changed files with 8,030 additions and 4,998 deletions.
Binary file added doc/2.0/images/union.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
229 changes: 127 additions & 102 deletions doc/2.0/osx/osx.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,21 @@ FATE1.X维护了多套通信架构,包括eggroll、spark+pulsar+nginx 、spark

## 新组件特性:

- 站点之间通信同时支持grpc、http1.X协议
- 传输接口兼容FATE1.X版本 FATE2.X版本

- 按照《金融业隐私计算互联互通平台技术规范》实现传输接口

- 支持多种计算引擎传输,包括eggroll、spark
- 支持在FATE1.X中无缝替代rollsite组件
- 同时支持同步rpc调用+消息队列

- 传输模式支持rpc、消息队列

- 传输协议支持grpc(rpc、消息队列模式)、http1.1(消息队列模式)

- 支持作为exchange中心节点部署 ,支持FATE1.X 、FATE2.X 接入
- 支持集群流量控制,可以针对不同参与方制定流控策略

- 路由配置与原eggroll基本一致,降低了移植难度
- 支持集群模式与standalone两种模式(默认为standalone模式,standalone已可满足大部分场景)
- 可根据接口中厂商编码可针对不同技术提供商做自定义开发


## 组件设计:

Expand All @@ -50,63 +56,105 @@ FATE1.X维护了多套通信架构,包括eggroll、spark+pulsar+nginx 、spark

上图为采用spark作为计算引擎时的部署架构:







![osx_on_spark.drawio](../images/osx_on_spark.drawio.png)



与其他厂商互联互通互联互通



![union](../images/union.png)



## 配置:

以下为osx最简配置,对应部署文件中的broker.properties
以下为osx最简配置,配置文件位于 {部署目录}/conf/broker/broker.properties

```properties
#grpc端口
grpc.port= 9370
#是否开启http server
open.http.server=false
# http端口
http.port=8080
# 是否开启grpc+TLS传输
open.grpc.tls.server=false
#grpc+TLS传输时使用的端口
grpc.tls.port=9883
#本方partyId,可以以逗号分隔并填写多个
self.party=10000
#部署方式 standalone/cluster,standalone代表单机模式 cluster代表集群模式
deploy.model=standalone
#集群模式下需要接入zookeeper,以下为zookeeper地址
zk.url=localhost:2181
#若要使用eggroll作为计算引擎,需要知悉eggroll cluster-manager组件的ip与端口
eggroll.cluster.manager.ip = localhost
grpc.port= 9377 (服务监听的grpc端口)
# eg: 9999,10000,10001 (本方partyId, 对应于互联互通协议中的nodeId)
self.party=10000
# (若使用eggroll作为计算引擎,此处填入eggroll cluster-manager 的ip)
eggroll.cluster.manager.ip = localhost
# (若使用eggroll作为计算引擎,此处填入eggroll cluster-manager 的端口)
eggroll.cluster.manager.port = 4670
```

全部配置:

| 名称 | 含义 | 默认值 | 是否必须配置 | 说明 |
| ---------------------------------------------- | ------------------------------------------------------------ | -------------------- | ----------------------------------- | ------------------------------------------------------------ |
| grpc.port | 服务监听grpc端口(非TLS) | 9370 || 该端口用做默认的集群内部通信端口 ,若是用于非生产环境,出于方便测试以及调试考虑,可以将此端口作为集群间通信端口。若是生产环境使用,出于安全考虑,不应该将此对外暴露,而是将使用TLS 的端口对外暴露 ,参考配置open.grpc.tls.server grpc.tls.port |
| self.party | 本方partyId ||| **此配置非常重要,需要在部署集群前确定己方partyId,此处的配置会影响请求是否能正确路由** |
| eggroll.cluster.manager.ip | 若使用eggroll作为计算引擎,此处填入eggroll cluster-manager 的ip ||| |
| eggroll.cluster.manager.port | 若使用eggroll作为计算引擎,此处填入eggroll cluster-manager 的端口 ||| |
| open.grpc.tls.server | 是否开启使用TLS的grpc端口 | false || 开启之后,服务将会监听一个使用TLS的grpc端口 |
| grpc.tls.port | 服务监听grpc端口(使用TLS) || 若open.grpc.tls.server =true 则必填 | 出于安全考虑,在生产上一般将此端口用做对外通信。而通过grpc.port配置的端口,则用于集群内部组件之间的通信。 |
| open.http.server | 是否开启http1.x协议端口(非TLS) | false || http协议目前只适用于队列模式传输,且FATE1.X版本接口不支持http协议,若使用了其他厂家提供的使用http协议的算法容器(FATE算法默认使用grpc),则可以开启httpServer,该配置默认关闭 |
| http.port | httpServer端口(非TLS) || 若open.http.server =true 则必填 | |
| open.https.server | 是否开启http1.x协议端口(TLS) | false || |
| http.context.path | http服务端配置 | /v1 || eg: http://127.0.0.1:9370/v1/interconn/chan/invoke 中的v1字符串 |
| http.servlet.path | http服务端配置 | /* || eg: http://127.0.0.1:9370/v1/interconn/chan/invoke 中v1/后的内容 |
| https.port | httpServer端口(使用TLS) || 若open.https.server=true 则必填 | |
| bind.host | 绑定本地ip(适用于http 与grpc server) | 0.0.0.0 || |
| grpc.server.max.concurrent.call.per.connection | 服务端单个grpc链接最大并发 | 1000 || |
| grpc.server.max.inbound.message.size | 服务端单个grpc包最大大小 | 2147483647 || |
| grpc.server.max.inbound.metadata.size | 服务端单个grpc包最大 metadata 大小 | 134217728 || |
| grpc.server.flow.control.window | 服务端grpc流控窗口大小 | 134217728 || |
| grpc.server.keepalive.without.calls.enabled | 服务端grpc是否允许连接没有调用是保持存活 | true || |
| grpc.client.max.inbound.message.size | 客户端单个grpc包最大大小 | 2147483647 || |
| grpc.client.flow.control.window | 客户端grpc流控窗口大小 | 134217728 || |
| | | | | |
| queue.max.free.time | 队列最大空闲时间 | 43200000(单位毫秒) || 空闲时间超过该配置的队列,将会被回收,释放本地资源 |
| queue.check.interval | 检查队列空闲定时任务间隔 | 60000(单位毫秒) || |
| consume.msg.waiting.timeout | 消费阻塞最大时间 | 3600000 || 若不同厂商算法组件消费接口中未指定超时时间,则使用配置作为超时时间 |
| grpc.oncompleted.wait.timeout | grpc流式传输中当一方已经完成传输后,等待另一方完成的时间 | 600(单位秒) || grpc流式传输接口中使用 |



## 路由:

路由配置相关文件为route_table.json ,与eggroll组件rollsite保持一致:
路由配置相关文件为{部署目录}/conf/broker/route_table.json ,与eggroll组件rollsite保持一致,下面介绍在不使用证书的情况下的操作步骤:

先检查{部署目录}/conf/broker/broker.properties 中配置 self.party,如下所示,则代表本方partyId为9999,(若是与遵循互联互通协议的其他厂商隐私计算产品对接,此处对应于互联互通协议的nodeId)

```
self.party=9999
```



**若发现该配置不符合预期,则需要修改成预期的partyId,并重启应用。**

本方partyId :9999 (若是与遵循互联互通协议的其他厂商隐私计算产品对接,此处对应于互联互通协议的nodeId)

若对方partyId 为10000 (若是与遵循互联互通协议的其他厂商隐私计算产品对接,此处对应于互联互通协议的nodeId),则按照如下配置

```json
{
"route_table":
{
"9999":
"9999": //己方partyId 9999 ,
{
"default":[
"fateflow":[ //配置己方路由只需要配置fateflow 地址就可以,需要注意这里需要配置fateflow的grpc端口,默认是9360
{
"port": 9370,
"ip": "localhost"
}
],
"fateflow":[
{
"port": 9360,
"port": 9360,
"ip": "localhost"
}
]
},
"10000":{
"default":[{
"port": 9889,
"ip": "localhost"
"10000":{ //对方partyId 10000
"default":[{ //配置对方路由,只需要配置default 地址就可以 , 地址为对方的osx grpc端口
"port": 9370,
"ip": "192.168.xx.xx"
}]

}
Expand All @@ -118,77 +166,17 @@ eggroll.cluster.manager.port = 4670
}
```

**路由表修改之后不需要重启应用,系统会自动读取。**


## 接口:
##

```protobuf
message Message{
string msgId = 1;//消息ID
bytes head = 2;//消息头部
bytes body = 3;//消息体
}
message TopicInfo{
string topic=1;
string ip = 2;
int32 port = 3;
int64 createTimestamp = 4;
int32 status = 5;
}
// PTP Private transfer protocol
// 通用报头名称编码,4层无Header以二进制填充到报头,7层以Header传输
enum Header {
Version = 0; // 协议版本 对应7层协议头x-ptp-version
TechProviderCode = 1; // 厂商编码 对应7层协议头x-ptp-tech-provider-code
TraceID = 4; // 链路追踪ID 对应7层协议头x-ptp-trace-id
Token = 5; // 认证令牌 对应7层协议头x-ptp-token
SourceNodeID = 6; // 发送端节点编号 对应7层协议头x-ptp-source-node-id
TargetNodeID = 7; // 接收端节点编号 对应7层协议头x-ptp-target-node-id
SourceInstID = 8; // 发送端机构编号 对应7层协议头x-ptp-source-inst-id
TargetInstID = 9; // 接收端机构编号 对应7层协议头x-ptp-target-inst-id
SessionID = 10; // 通信会话号,全网唯一 对应7层协议头x-ptp-session-id
}
// 通信扩展元数据编码,扩展信息均在metadata扩展
enum Metadata {
MessageTopic = 0; // 消息话题,异步场景
MessageCode = 1; // 消息编码,异步场景
SourceComponentName = 2; // 源组件名称
TargetComponentName = 3; // 目标组件名称
TargetMethod = 4; // 目标方法
MessageOffSet = 5; // 消息序列号
InstanceId = 6; // 实例ID
Timestamp = 7; // 时间戳
}
// 通信传输层输入报文编码
message Inbound {
map<string, string> metadata = 1; // 报头,可选,预留扩展,Dict,序列化协议由通信层统一实现
bytes payload = 2; // 报文,上层通信内容承载,序列化协议由上层基于SPI可插拔
}
// 通信传输层输出报文编码
message Outbound {
map<string, string> metadata = 1; // 报头,可选,预留扩展,Dict,序列化协议由通信层统一实现
bytes payload = 2; // 报文,上层通信内容承载,序列化协议由上层基于SPI可插拔
string code = 3; // 状态码
string message = 4; // 状态说明
}
// 互联互通如果使用异步传输协议作为标准参考,Header会复用metadata传输互联互通协议报头,且metadata中会传输异步场景下的消息相关属性
// 互联互通如果使用其他协议作为参考标准,Header会复用metadata传输互联互通协议报头
// 互联互通如果使用GRPC作为参考标准,Header会复用HTTP2的报头传输互联互通协议报头
service PrivateTransferProtocol {
rpc transport (stream Inbound) returns (stream Outbound);
rpc invoke (Inbound) returns (Outbound);
}
```



## 源码打包:
## 部署教程

1. 下载源码,打包机器需要安装好maven + jdk
2. 进入源码目录/deploy, 执行sh auto-package.sh, 执行完之后会在当前目录出现osx.tar.gz。
Expand All @@ -198,3 +186,40 @@ service PrivateTransferProtocol {
1. 部署机器需要安装jdk1.8+
2. 解压osx.tar.gz
3. 进入部署目录,执行sh service.sh start



### 日志分析:







### 证书相关:





###

### 常见问题:

















12 changes: 6 additions & 6 deletions doc/2.0/quick_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ base_path = os.path.abspath(os.path.join(__file__, os.path.pardir))
guest_data_path = os.path.join(base_path, "breast_hetero_guest.csv")
host_data_path = os.path.join(base_path, "breast_hetero_host.csv")

data_pipeline = FateFlowPipeline().set_roles(local="0")
data_pipeline = FateFlowPipeline().set_parties(local="0")
guest_meta = {
"delimiter": ",", "dtype": "float64", "label_type": "int64","label_name": "y", "match_id_name": "id"
}
Expand All @@ -59,14 +59,14 @@ from fate_client.pipeline.interface import DataWarehouseChannel


# create pipeline for training
pipeline = FateFlowPipeline().set_roles(guest="9999", host="10000")
pipeline = FateFlowPipeline().set_parties(guest="9999", host="10000")

# create psi component_desc
psi_0 = PSI("psi_0")
psi_0.guest.component_setting(
psi_0.guest.task_setting(
input_data=DataWarehouseChannel(name="breast_hetero_guest", namespace="experiment")
)
psi_0.hosts[0].component_setting(
psi_0.hosts[0].task_setting(
input_data=DataWarehouseChannel(name="breast_hetero_host", namespace="experiment")
)

Expand Down Expand Up @@ -103,10 +103,10 @@ predict_pipeline = FateFlowPipeline()

# add input to deployed_pipeline
deployed_pipeline = pipeline.get_deployed_pipeline()
deployed_pipeline.psi_0.guest.component_setting(
deployed_pipeline.psi_0.guest.task_setting(
input_data=DataWarehouseChannel(name="breast_hetero_guest", namespace=f"experiment")
)
deployed_pipeline.psi_0.hosts[0].component_setting(
deployed_pipeline.psi_0.hosts[0].task_setting(
input_data=DataWarehouseChannel(name="breast_hetero_host", namespace=f"experiment")
)

Expand Down
10 changes: 5 additions & 5 deletions doc/tutorial/pipeline_tutorial_hetero.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
"metadata": {},
"outputs": [],
"source": [
"pipeline = FateFlowPipeline().set_roles(guest='9999', host='10000', arbiter='10000')"
"pipeline = FateFlowPipeline().set_parties(guest='9999', host='10000', arbiter='10000')"
]
},
{
Expand All @@ -146,9 +146,9 @@
"outputs": [],
"source": [
"psi_0 = PSI(\"psi_0\")\n",
"psi_0.guest.component_setting(input_data=DataWarehouseChannel(name=\"breast_hetero_guest\",\n",
"psi_0.guest.task_setting(input_data=DataWarehouseChannel(name=\"breast_hetero_guest\",\n",
" namespace=\"experiment\"))\n",
"psi_0.hosts[0].component_setting(input_data=DataWarehouseChannel(name=\"breast_hetero_host\",\n",
"psi_0.hosts[0].task_setting(input_data=DataWarehouseChannel(name=\"breast_hetero_host\",\n",
" namespace=\"experiment\"))\n"
]
},
Expand Down Expand Up @@ -582,9 +582,9 @@
"metadata": {},
"outputs": [],
"source": [
"deployed_pipeline.psi_0.guest.component_setting(input_data=DataWarehouseChannel(name=\"breast_hetero_guest\",\n",
"deployed_pipeline.psi_0.guest.task_setting(input_data=DataWarehouseChannel(name=\"breast_hetero_guest\",\n",
" namespace=\"experiment\"))\n",
"deployed_pipeline.psi_0.hosts[0].component_setting(input_data=DataWarehouseChannel(name=\"breast_hetero_host\",\n",
"deployed_pipeline.psi_0.hosts[0].task_setting(input_data=DataWarehouseChannel(name=\"breast_hetero_host\",\n",
" namespace=\"experiment\"))"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
"metadata": {},
"outputs": [],
"source": [
"pipeline = FateFlowPipeline().set_roles(local=\"0\")\n",
"pipeline = FateFlowPipeline().set_parties(local=\"0\")\n",
"pipeline.set_site_role(\"local\")\n",
"pipeline.set_site_party_id(\"0\");"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ data:
role: host_0
tasks:
normal-lr:
script: test_lr_sid.py
script: test_lr.py
conf: "./breast_config.yaml"
Loading

0 comments on commit 26f96e3

Please sign in to comment.