Skip to content

映射规则配置

agapple edited this page Sep 26, 2013 · 1 revision

背景

因为alibaba的特殊业务,比如:

  1. 同步数据同时,需要同步数据关联的文件 (需要做数据join)
  2. 同步会员数据,敏感字段信息不能同步到美国站点. (需要做数据过滤)
  3. 两地机房的数据库可能为异构数据库,(需要做字段类型,名字等转化.)

为解决这些业务,otter引入了映射规则这一概念,用于描述这样的一种同步业务的关系,其粒度可以精确到一张表,或者是一整个库.

映射规则

表映射

otter中每个pipeline可以设置多个映射规则,每个映射规则描述一个数据库同步的内容,比如源表是哪张,同步到哪张目标表。

权重的概念

可以先看下:Otter数据入库算法 , 因为otter采用了pk hash的并行载入算法,会将原先binlog中的事务进行打散做并行处理提升同步性能。原先事务被拆散后,通过这个权重概念,来提供“业务上类事务功能".

举个实际点的例子来看:

  • 比如有两张表,product(记录产品的属性信息)和product_detail(记录产品的详情信息),product_detail上有个字段为product_id与product进行关联. 目前为1:1的关系
  • 业务上插入数据可以通过事务,先插入product,然后插入product_detail,最后一起提交到数据库. 正常,页面上通过查询用户的product表的数据,发现有产品记录,然后通过product_id去查询product_detail表,查到后才显示产品页面
  • 假如同步到目标库后,打散事务后,同步过程如果先插入了product表,后插入product_detail表,这时如果有用户访问该产品记录,可能就会遇到product_detail不存在的情况,从而导致页面出错.
  • 所以,我们通过权重定义,比如将product_detail权重定义为5,将product定义为10。 otter会优先同步权重低的表数据,最终可以保证查询product有数据后,product_detail一定有数据,避免业务出错.

视图映射

如何进入视图编辑:

点击下一步后,进入视图编辑页面:



说明:

  • 映射规则配置页面,可以选择视图模式为:include或exclude,代表正向匹配或者逆向排除.
  • 视图配置页面,只支持存在的数据表(因为要获取数据表结构,所以.*等正则的数据表不支持配置该功能)
  • 视图配置列表,左右选中列表会按照顺序进行对应,做映射时需按照顺序进行选择.

举个例子:

如果要排除表字段A的同步,则只需要选择为exclude模式,然后视图编辑页面选择左右皆选择A字段即可,点击保存.

字段组映射

首先解释一下,需要字段组同步的需求.

  1. 文件同步. 一条记录对应的图片,可能会有一个或者多个字段,比如会有image_path,image_version来决定图片,所以我们可以定义这两个字段为一组,只要满足组内任意一个字段的变更,就会认为需要文件同步.
  2. 数据上的组同步,比如国家,省份,城市,可能在数据库为三个字段. 如果是双A同步,两地同时修改这些字段,但业务上可能在A地修改了国家为美国,在B地修改为省份为浙江,然后一同步,最终就会变成美国,浙江这样的情况. 这种情况可以通过group来解决,将国家,省份,城市做一个group,组内任何一个字段发生了变更,其余字段会做为整体一起变更.

再来看一下配置:(点击视图编辑页面的下一步,即可进入)


说明:

  1. 也可不配置视图,单独配置字段组,此时可选择的字段即为当前所有字段(映射规则按照同名映射).

高级映射

主要分为两类:

  1. 文件同步
  2. 自定义数据同步

具体代码扩展方式和配置可参见: Otter扩展性

配置方式:



文件同步

首先解释一下文件同步的需求,阿里巴巴国际站业务,主要模式为对外贸易,卖家基本在国内,买家在国外. 所以,目前我们的机房部署为杭州和美国各一个,卖家访问杭州机房,买家访问美国机房。卖家在国内发布产品和图片,通过otter同步到美国,同步产品数据记录的同时,同样需要把图片同步过去,保证买家用户的访问体验. 所以,基于这个业务场景,衍生出了文件同步的需求.

所以,做文件同步的几个前提:

  1. 异地同步 (需要部署为两个node,S/E和T/L分为两地. )
  2. 数据触发文件同步 (数据库记录做为类似文件索引信息,不支持单独同步文件)
  3. 本地文件同步 (需要同步的文件需要和node部署在一台机器上或者通过nfs mount,如果要同步 公司自带的分布式文件系统的数据,otter需要做额外扩展)

文件同步准备工作:

  1. 准备两台机器,分别部署上两个node
  2. 配置channel/pipeline同步,配置映射规则
  3. 编写FileResolver解析类,根据binlog中的变更数据,转化为文件的路径信息. 例子:TestFileResolver
  4. public class TestFileResolver extends AbstractFileResolver {
    
    public FileInfo[] getFileInfo(Map<String, String> rowMap) {
        // 基本步骤:
        // 1. 获取binlog中的变更字段,比如组成文件有多个字段组成version+path
        // 2. 基于字段内容,构造一个文件路径,目前开源版本只支持本地文件的同步.(如果是网络文件,建议进行NFS mount到ndde机器).
        // 3. 返回FileInfo数组,(目前不支持目录同步,如果是目录需要展开为多个FileInfo的子文件),如果不需要同步,则返回null.
        String path = rowMap.get("FIELD"); //注意为大写
        FileInfo fileInfo = null;
        if (StringUtils.isNotEmpty(path)) {
            fileInfo = new FileInfo(path);
            return new FileInfo[] { fileInfo };
        } else {
            return null;
        }
    }
    

    }

自定义数据同步

通过前面的字段视图映射,或许可以解决80%的需求,但总会有一小撮的特殊用户,希望可以自定义自己的同步数据内容,所以otter引入了自定义数据同步为EventProcessor,允许你任意改变整个同步过程中的数据内容.

可以支持的需求:

  1. 根据字段内容,判断是否需要屏蔽本记录同步
  2. 动态增加/减少字段
  3. 动态修改字段内容
  4. 动态改变事件类型(Insert/Update/Delete)

几点注意:

  1. EventProcessor主要是在E模块进行数据处理,也就是EventProcessor处理后的数据,会再次经过视图配置,字段组映射,文件同步,最后进入Transform处理.
  2. EventProcessor修改数据中的schema/table name需要谨慎,因为会继续后续的E/T/L流程,所以需要保证修改后的name在映射规则列表里有配置,否则同步会出错.

一个例子:(比如我想将源库的每条binlog变更,记录到一个日志表binlog,映射规则配置为.*所有表的同步)

代码:TestEventProcessor

public class TestEventProcessor extends AbstractEventProcessor {
public boolean process(EventData eventData) {
    // 基本步骤:
    // 1. 获取binlog中的变更字段
    // 2. 根据业务逻辑进行判断,如果需要忽略本条数据同步,直接返回false,否则返回true
    // 3. 根据业务逻辑进行逻辑转化,比如可以修改整个EventData数据.  

    // 本文例子:源库的每条binlog变更,记录到一个日志表binlog
    // create table test.binlog(
    //        id bigint(20) auto_increment,
    //        oschema varchar(256),
    //        otable varchar(256),
    //        gtime varchar(32)
    //        ovalue text,
    //        primary key(id);
    //    )
    // 在process处理中,可以修改EventData的任何数据,达到数据转换的效果, just have fun.
    JSONObject col = new JSONObject();
    JSONArray array = new JSONArray();
    for (EventColumn column : eventData.getColumns()) {
        JSONObject obj = this.doColumn(column);
        array.add(obj);
    }

    for (EventColumn key : eventData.getKeys()) {
        JSONObject obj = this.doColumn(key);
        array.add(obj);
    }
    // 记录原始的表信息
    col.put("schema", eventData.getSchemaName());
    col.put("table", eventData.getTableName());
    col.put("columns", array);
    col.put("dml", eventData.getEventType());
    col.put("exectime", eventData.getExecuteTime());

    // 构造新的主键
    EventColumn id = new EventColumn();
    id.setColumnValue(eventData.getSchemaName());
    id.setColumnType(Types.BIGINT);
    id.setColumnName("id");
    // 构造新的字段
    EventColumn schema = new EventColumn();
    schema.setColumnValue(eventData.getSchemaName());
    schema.setColumnType(Types.VARCHAR);
    schema.setColumnName("oschema");

    EventColumn table = new EventColumn();
    table.setColumnValue(eventData.getTableName());
    table.setColumnType(Types.VARCHAR);
    table.setColumnName("otable");

    EventColumn ovalue = new EventColumn();
    ovalue.setColumnValue(col.toJSONString());
    ovalue.setColumnType(Types.VARCHAR);
    ovalue.setColumnName("ovalue");

    EventColumn gtime = new EventColumn();
    gtime.setColumnValue(eventData.getExecuteTime() + "");
    gtime.setColumnType(Types.VARCHAR);
    gtime.setColumnName("gtime");

    // 替换为新的字段和主键信息
    List<EventColumn> cols = new ArrayList<EventColumn>();
    cols.add(schema);
    cols.add(table);
    cols.add(gtime);
    cols.add(ovalue);
    eventData.setColumns(cols);

    List<EventColumn> keys = new ArrayList<EventColumn>();
    keys.add(id);
    eventData.setKeys(keys);

    //修改数据meta信息
    eventData.setEventType(EventType.INSERT);
    eventData.setSchemaName("test");
    eventData.setTableName("binlog");
    return true;
}

private JSONObject doColumn(EventColumn column) {
    JSONObject obj = new JSONObject();
    obj.put("name", column.getColumnName());
    obj.put("update", column.isUpdate());
    obj.put("key", column.isKey());
    if (column.getColumnType() != Types.BLOB && column.getColumnType() != Types.CLOB) {
        obj.put("value", column.getColumnValue());
    } else {
        obj.put("value", "");
    }
    return obj;
}

}