Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NebulaGraph-DataX:ReaderAndWriter Development #1

Merged
merged 18 commits into from
Oct 28, 2022
60 changes: 60 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# nebula支持DataX

## 1 方案思路

本项目架构主要参考DataX官方代码库,项目主要分为Writer和Reader两部分进行开发和测试。由于DataX官方代码库中尚未接入其他图数据库插件,所以本项目开发中也考虑到图数据库与其他数据库开发的差异。

***从原理角度解释项目:***

我们以从关系型数据库MySQL中的数据同步插入到NebulaGraph中为例。之所以能够把关系型数据库中的表数据同步到NebulaGraph图数据库中,是因为nebula中存在Tag和Edge Type这两个类表概念,Tag作为标签类型,标识着某一类点的属性,可以近似为关系型数据库的表结构,而Edge Type同理,也可以近似为关系型数据中的关系表结构。因此当我们想同步关系型数据中某些实体表时,则对应的就是nebula中对应的Tag。而某一Tag下会对应多个Vertex,其相当于关系型数据中的表项,而Edge对于Edge Type的关系同样类似Tag和Vertex的关系。因此无论nebula作为Reader还是Writer都可以从结构角度匹配上关系型数据库中的表结构。

**Reader插件开发的实现思路:**

通过nebula nGql的查询语句 match 查询Tag标签下对应的所有Vertex节点,并返回节点集合组装成record包集合,通过DataX发送给关系型数据库,注意关系型数据库的Table表需要和Tag标签的名称完全一致,否则会出现匹配失败。同理,利用match查询语句查询某个制定edge_type类型下的所有边,然后通过关系型数据库的插入语句写入目标数据库中。需要注意的是,当使用match语句时,需要确保Tag和Edge Type已经建立索引。

**Writer插件开发的实现思路:**

在开发Writer插件时,我们需要利用配置文件中的column字段,利用元信息获得哪些字段属于哪些标签和边类型,然后进行匹配。在获取这些信息后,我们利用从reader插件端获取到的record包集合,通过Java8中stream的filter,map等操作组装插入语句。由于我们规定关系型数据库的表名称和图数据库的标签和边类型必须一致,所以我们可以直接利用tag_name和edgeType_name进行字段的匹配,通过insert vertex <tag_name>和insert edge <edge_type>语句插入到nebula中。

综上所述,我们可以采用table来代替tag和edge_type,并作为tableMeta中的tableType字段。

## 2 项目架构

## 代码架构

***以NebulaGraphWriter开发为例,其代码开发主要分为五大部分:***

第一部分为继承Writer抽象类的NebulaGraphWriter的主程序开发,其中需要实现Job和Task的各个方法,完成Writer插件与DataX框架的接入。其最核心的部分为startWrite方法的开发,为降低整个项目的耦合性,将业务逻辑独立出来,归入到DataHandler部分进行实现。

第二部分为DataHandler的开发与实现,也就是整个Writer插件业务逻辑部分的具体实现,其中最核心的为handle方法的实现。handle方法中首先通过DriverManager获取数据库连接对象,连接到NebulaGraph中。然后初始化SchemaManager对象,并加载数据库元信息,主要包括标签元信息TagMeta,边类型元信息EdgeTypeMeta (此处将TagMeta和EdgeTypeMeta均合并到TableMeta中),以及字段元信息ColumnMeta,完成数据库静态信息的匹配。之后构造一个ArrayList用于记录从reader端通过DataX channel发送到writer端的Record包集合。整个写入逻辑中需要封装到writeBatch方法中,需要根据规定的Batch大小限定每次写入的批量大小,写入方法通过组装拼接nebula nGql的DDL insert语句实现。

第三部分则是对应的各种元信息的定义,需要明确各种元信息对应的属性参数。元信息需要与用户配置的参数对应匹配。在DataHandler的构造函数中需要通过用户配置的json参数文件,读取出其中各项配置Key,尤其是Column字段和Tag以及EdgeType字段。

第四部分为配置信息,主要是Key和Constants,其包括对应的配置文件的各种参数,还有对应的常量信息,如批量大小的size设置。

第五部分则是SchemaManager,用于实现meta元信息的匹配,加载和创建。

## 3 项目数据链路及流程图

用户需要配置的job.json中有很多重要的参数,其中最主要的就是columns字段和tables字段,其中包含数据库对应的字段信息以及需要同步的表信息。

以Mysql为Reader,nebula为Writer:

首先我们需要创建好两个数据库,MySQL则构建多对多型数据库,实体类为论文,作者,类别,连接类为书籍的著作所有权,类别所属情况。nebula则需要构建标签论文,作者以及类别,边类型包括著作所有权和类别所属表,论文和作者的对应关系,论文和类别的对应关系都是多对多关系),这种情况是需要单独开设额外的表来存储实体类之间的关系的。

![image-20221028141133870](/Users/eldinzhou/Library/Application Support/typora-user-images/image-20221028141133870.png)

## 4 项目文档

项目Reader和Writer插件的详细文档说明请参考如下链接:

***NebulaGraphReader***: [NebulaGraphReader插件说明文档](./nebulagraphreader/doc/nebulagraphreader.md)

***NebulaGraphWriter***: [NebulaGraphWriter插件说明文档](./nebulagraphwriter/doc/nebulagraphwriter-CN.md)

项目GitHub链接:https://github.com/nebula-contrib/nebula-datax-plugin/pull/1

可参考链接:https://github.com/nebula-contrib/DataX



214 changes: 214 additions & 0 deletions nebulagraphreader/doc/nebulagraphreader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
# DataX NebulaGraphReader

## 1 快速介绍

NebulaGraphReader插件实现了从NebulaGraph中读取数据。在底层实现上,NebulaGraphReader通过JDBC连接远程NebulaGraph数据库,并执行nGql语句(默认采用LOOKUP语句,后续可考虑扩展MATCH FETCH GO等更复杂功能强大的nGql查询语句)从NebulaGraph同空间中查询出来。

## 2 实现原理

简而言之,NebulaGraphReader通过JDBC连接器连接到远程的NebulaGraph数据库,并通过用户配置信息生成查询nGql语句,发送给远程NebulaGraph数据库,并将该nGql执行返回结果使用DataX框架中支持的数据类型封装成抽象数据集(record形式),并传递给下游Writer端处理。

对于用于配置的table, column, where信息,NebulaGraphReader将其拼接为nGql语句发送到NebulaGraph数据库中;而对于用户配置的querySql信息,则经过校验后直接发送给NebulaGraph数据库中。

## 3 功能说明

### 3.1 配置样例

- 配置一个从NebulaGraph数据库同步数据到本地内存的作业:

```json
{
"job": {
"content": [
{
"reader": {
"name": "nebulagraphreader",
"parameter": {
"username": "root",
"password": "nebula",
"connection": [
{
"table": [
"player"
],
"jdbcUrl": [
"jdbc:nebula://cba"
]
}
],
"column": [
"name",
"age"
],
"where": "player.age >= 22"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```

- 配置一个用户定义的nGql语句从NebulaGraph数据库同步数据到本地内存的作业:

```json
{
"job": {
"content": [
{
"reader": {
"name": "nebulagraphreader",
"parameter": {
"username": "root",
"password": "nebula",
"connection": [
{
"querySql": [
"lookup on player where player.age >= 22 yield properties(vertex).name, properties(vertex).age"
],
"jdbcUrl": [
"jdbc:nebula://cba"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
```



### 3.2 参数说明

- **jdbcUrl**
- 描述:目标数据源的JDBC连接信息,注意,jdbcUrl必须包含在connection配置单元中.NebulaGraph的JDBC信息请参考:[nebula-jdbc连接器的使用](https://github.com/nebula-contrib/nebula-jdbc)
- 必选:是
- 默认值:无
- **username**
- 描述:数据库用户名
- 必选:是
- 默认值:无
- **password**
- 描述:用户名密码
- 必选:是
- 默认值:无
- **table**
- 描述:表名的集合,图数据库NebulaGraph在DataX数据同步语境中的表概念可以理解成标签和边类型,table应当包含column参数重的所有列,使用 JSON 的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一 schema 结构, NebulaGraphReader不予检查表是否同一逻辑表。注意,table必须包含在 connection 配置单元中。
- 必选:是(除非使用querySql,否则为必选项)
- 默认值:无
- **querySql**
- 描述:在某些业务场景中,where不足以描述所要求筛选的条件,用户可以通过自定义配置querySql中的nGql语句,供插件直接使用,并向NebulaGraph发送查询语句。当用户配置querySql后,table,column和where字段就会被自动忽略。例如需要同时操作多个标签或者边类型中的数据时,则可以使用该配置。
- 必选:否
- 默认值:无
- **column**
- 描述:需同步字段的集合,字段的顺序应与record中的column的顺序一致,即需要与writer端的column字段顺序和名称一一对应。
- 必选:是
- 默认值:无
- **where**
- 描述:筛选条件中的where子句,NebulaGraphReader根据指定的column, table, where条件拼接nGql,并根据该nGql进行筛选数据。
- 必选:否
- 默认值:无


### 3.3 类型转换

DataX中的数据类型与NebulaGraph中数据类型的映射转换关系

| DataX内部类型 | NebulaGraph数据类型 |
| :------------ | :----------------------------------------- |
| LONG | INT INT64 INT32 INT16 INT8 |
| DOUBLE | FLOAT DOUBLE |
| STRING | FIXED_STRING(N) STRING |
| BOOLEAN | BOOL |
| BYTES | 暂无对应数据类型 |
| DATE | DATE TIME DATETIME(暂不支持,后续完善支持) |

### 3.4 关系型数据库到NebulaGraph的参考示例

| 数据迁移示例 | 配置的示例 |
| ------------------ | ------------------------------------------------------------ |
| MySQL到NebulaGraph | [NebulaGraph到关系型数据库 标签->点表](../src/test/resources/n2mysql.json) |
| 待补充 | |

## 4. 性能报告

### 4.1 环境准备

#### 4.1.1 数据特征

建表语句:

单行记录类似于:

#### 4.1.2 机器参数

* 执行DataX的机器参数为:
1. cpu:
2. mem:
3. net: 千兆双网卡
4. disc: DataX 数据不落磁盘,不统计此项

* NebulaGraph数据库机器参数为:
1. cpu:
2. mem:
3. net: 千兆双网卡
4. disc:

#### 4.1.3 DataX jvm 参数

```
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
```

### 4.2 测试报告

#### 4.2.1 单表测试报告

| 通道数 | DataX速度(Rec/s) | DataX流量(MB/s) | DataX机器网卡流出流量(MB/s) | DataX机器运行负载 | DB网卡进入流量(MB/s) | DB运行负载 | DB TPS |
| ------ | ---------------- | --------------- | --------------------------- | ----------------- | -------------------- | ---------- | ------ |
| 1 | | | | | | | |
| 4 | | | | | | | |
| 8 | | | | | | | |
| 16 | | | | | | | |
| 32 | | | | | | | |

说明:

1.

#### 4.2.4 性能测试小结



## 5 约束限制



## FAQ
108 changes: 108 additions & 0 deletions nebulagraphreader/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>com.alibaba.datax.nebulagraphreader</groupId>
<artifactId>nebulagraphreader</artifactId>
<name>nebulagraphreader</name>
<version>0.0.1-SNAPSHOT</version>

<packaging>jar</packaging>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.nebula-contrib</groupId>
<artifactId>nebula-jdbc</artifactId>
<version>3.0.0</version>
</dependency>

<dependency>
<groupId>com.vesoft</groupId>
<artifactId>client</artifactId>
<version>3.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${jdk-version}</source>
<target>${jdk-version}</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors> <!--描述文件路径-->
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase> <!-- 绑定到package生命周期阶段上 -->
<goals>
<goal>single</goal> <!-- 只运行一次 -->
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<!-- 包含哪些测试用例 -->
<includes>
<include>**/*Test.java</include>
</includes>
<!-- 不包含哪些测试用例 -->
<excludes>
</excludes>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>

</plugins>
</build>

</project>
Loading