-
Notifications
You must be signed in to change notification settings - Fork 2.5k
映射规则配置
因为alibaba的特殊业务,比如:
- 同步数据同时,需要同步数据关联的文件 (需要做数据join)
- 同步会员数据,敏感字段信息不能同步到美国站点. (需要做数据过滤)
- 两地机房的数据库可能为异构数据库,(需要做字段类型,名字等转化.)
为解决这些业务,otter引入了映射规则这一概念,用于描述这样的一种同步业务的关系,其粒度可以精确到一张表,或者是一整个库.
otter中每个pipeline可以设置多个映射规则,每个映射规则描述一个数据库同步的内容,比如源表是哪张,同步到哪张目标表。
可以先看下:Otter数据入库算法 , 因为otter采用了pk hash的并行载入算法,会将原先binlog中的事务进行打散做并行处理提升同步性能。原先事务被拆散后,通过这个权重概念,来提供“业务上类事务功能".
举个实际点的例子来看:
- 比如有两张表,product(记录产品的属性信息)和product_detail(记录产品的详情信息),product_detail上有个字段为product_id与product进行关联. 目前为1:1的关系
- 业务上插入数据可以通过事务,先插入product,然后插入product_detail,最后一起提交到数据库. 正常,页面上通过查询用户的product表的数据,发现有产品记录,然后通过product_id去查询product_detail表,查到后才显示产品页面
- 假如同步到目标库后,打散事务后,同步过程如果先插入了product表,后插入product_detail表,这时如果有用户访问该产品记录,可能就会遇到product_detail不存在的情况,从而导致页面出错.
- 所以,我们通过权重定义,比如将product_detail权重定义为5,将product定义为10。 otter会优先同步权重低的表数据,最终可以保证查询product有数据后,product_detail一定有数据,避免业务出错.
如何进入视图编辑:
点击下一步后,进入视图编辑页面:
说明:
- 映射规则配置页面,可以选择视图模式为:include或exclude,代表正向匹配或者逆向排除.
- 视图配置页面,只支持存在的数据表(因为要获取数据表结构,所以.*等正则的数据表不支持配置该功能)
- 视图配置列表,左右选中列表会按照顺序进行对应,做映射时需按照顺序进行选择.
举个例子:
如果要排除表字段A的同步,则只需要选择为exclude模式,然后视图编辑页面选择左右皆选择A字段即可,点击保存.
首先解释一下,需要字段组同步的需求.
- 文件同步. 一条记录对应的图片,可能会有一个或者多个字段,比如会有image_path,image_version来决定图片,所以我们可以定义这两个字段为一组,只要满足组内任意一个字段的变更,就会认为需要文件同步.
- 数据上的组同步,比如国家,省份,城市,可能在数据库为三个字段. 如果是双A同步,两地同时修改这些字段,但业务上可能在A地修改了国家为美国,在B地修改为省份为浙江,然后一同步,最终就会变成美国,浙江这样的情况. 这种情况可以通过group来解决,将国家,省份,城市做一个group,组内任何一个字段发生了变更,其余字段会做为整体一起变更.
再来看一下配置:(点击视图编辑页面的下一步,即可进入)
说明:
- 也可不配置视图,单独配置字段组,此时可选择的字段即为当前所有字段(映射规则按照同名映射).
主要分为两类:
- 文件同步
- 自定义数据同步
具体代码扩展方式和配置可参见: Otter扩展性
配置方式:
首先解释一下文件同步的需求,阿里巴巴国际站业务,主要模式为对外贸易,卖家基本在国内,买家在国外. 所以,目前我们的机房部署为杭州和美国各一个,卖家访问杭州机房,买家访问美国机房。卖家在国内发布产品和图片,通过otter同步到美国,同步产品数据记录的同时,同样需要把图片同步过去,保证买家用户的访问体验. 所以,基于这个业务场景,衍生出了文件同步的需求.
所以,做文件同步的几个前提:
- 异地同步 (需要部署为两个node,S/E和T/L分为两地. )
- 数据触发文件同步 (数据库记录做为类似文件索引信息,不支持单独同步文件)
- 本地文件同步 (需要同步的文件需要和node部署在一台机器上或者通过nfs mount,如果要同步 公司自带的分布式文件系统的数据,otter需要做额外扩展)
文件同步准备工作:
- 准备两台机器,分别部署上两个node
- 配置channel/pipeline同步,配置映射规则
- 编写FileResolver解析类,根据binlog中的变更数据,转化为文件的路径信息. 例子:TestFileResolver
-
public class TestFileResolver extends AbstractFileResolver {
public FileInfo[] getFileInfo(Map<String, String> rowMap) { // 基本步骤: // 1. 获取binlog中的变更字段,比如组成文件有多个字段组成version+path // 2. 基于字段内容,构造一个文件路径,目前开源版本只支持本地文件的同步.(如果是网络文件,建议进行NFS mount到ndde机器). // 3. 返回FileInfo数组,(目前不支持目录同步,如果是目录需要展开为多个FileInfo的子文件),如果不需要同步,则返回null. String path = rowMap.get("FIELD"); //注意为大写 FileInfo fileInfo = null; if (StringUtils.isNotEmpty(path)) { fileInfo = new FileInfo(path); return new FileInfo[] { fileInfo }; } else { return null; } }
}
通过前面的字段视图映射,或许可以解决80%的需求,但总会有一小撮的特殊用户,希望可以自定义自己的同步数据内容,所以otter引入了自定义数据同步为EventProcessor,允许你任意改变整个同步过程中的数据内容.
可以支持的需求:
- 根据字段内容,判断是否需要屏蔽本记录同步
- 动态增加/减少字段
- 动态修改字段内容
- 动态改变事件类型(Insert/Update/Delete)
几点注意:
- EventProcessor主要是在E模块进行数据处理,也就是EventProcessor处理后的数据,会再次经过视图配置,字段组映射,文件同步,最后进入Transform处理.
- EventProcessor修改数据中的schema/table name需要谨慎,因为会继续后续的E/T/L流程,所以需要保证修改后的name在映射规则列表里有配置,否则同步会出错.
一个例子:(比如我想将源库的每条binlog变更,记录到一个日志表binlog,映射规则配置为.*所有表的同步)
public class TestEventProcessor extends AbstractEventProcessor {public boolean process(EventData eventData) { // 基本步骤: // 1. 获取binlog中的变更字段 // 2. 根据业务逻辑进行判断,如果需要忽略本条数据同步,直接返回false,否则返回true // 3. 根据业务逻辑进行逻辑转化,比如可以修改整个EventData数据. // 本文例子:源库的每条binlog变更,记录到一个日志表binlog // create table test.binlog( // id bigint(20) auto_increment, // oschema varchar(256), // otable varchar(256), // gtime varchar(32) // ovalue text, // primary key(id); // ) // 在process处理中,可以修改EventData的任何数据,达到数据转换的效果, just have fun. JSONObject col = new JSONObject(); JSONArray array = new JSONArray(); for (EventColumn column : eventData.getColumns()) { JSONObject obj = this.doColumn(column); array.add(obj); } for (EventColumn key : eventData.getKeys()) { JSONObject obj = this.doColumn(key); array.add(obj); } // 记录原始的表信息 col.put("schema", eventData.getSchemaName()); col.put("table", eventData.getTableName()); col.put("columns", array); col.put("dml", eventData.getEventType()); col.put("exectime", eventData.getExecuteTime()); // 构造新的主键 EventColumn id = new EventColumn(); id.setColumnValue(eventData.getSchemaName()); id.setColumnType(Types.BIGINT); id.setColumnName("id"); // 构造新的字段 EventColumn schema = new EventColumn(); schema.setColumnValue(eventData.getSchemaName()); schema.setColumnType(Types.VARCHAR); schema.setColumnName("oschema"); EventColumn table = new EventColumn(); table.setColumnValue(eventData.getTableName()); table.setColumnType(Types.VARCHAR); table.setColumnName("otable"); EventColumn ovalue = new EventColumn(); ovalue.setColumnValue(col.toJSONString()); ovalue.setColumnType(Types.VARCHAR); ovalue.setColumnName("ovalue"); EventColumn gtime = new EventColumn(); gtime.setColumnValue(eventData.getExecuteTime() + ""); gtime.setColumnType(Types.VARCHAR); gtime.setColumnName("gtime"); // 替换为新的字段和主键信息 List<EventColumn> cols = new ArrayList<EventColumn>(); cols.add(schema); cols.add(table); cols.add(gtime); cols.add(ovalue); eventData.setColumns(cols); List<EventColumn> keys = new ArrayList<EventColumn>(); keys.add(id); eventData.setKeys(keys); //修改数据meta信息 eventData.setEventType(EventType.INSERT); eventData.setSchemaName("test"); eventData.setTableName("binlog"); return true; } private JSONObject doColumn(EventColumn column) { JSONObject obj = new JSONObject(); obj.put("name", column.getColumnName()); obj.put("update", column.isUpdate()); obj.put("key", column.isKey()); if (column.getColumnType() != Types.BLOB && column.getColumnType() != Types.CLOB) { obj.put("value", column.getColumnValue()); } else { obj.put("value", ""); } return obj; }
}