Skip to content

Commit

Permalink
Support config column/primaryKey/constraintKey in schema (apache#5564)
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun authored Oct 8, 2023
1 parent a6e5850 commit eac76b4
Show file tree
Hide file tree
Showing 85 changed files with 2,233 additions and 328 deletions.
153 changes: 150 additions & 3 deletions docs/en/concept/schema-feature.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,55 @@

## Why we need schema

Some NoSQL databases or message queue are not strongly limited schema, so the schema cannot be obtained through the api. At this time, a schema needs to be defined to convert to SeaTunnelRowType and obtain data.
Some NoSQL databases or message queue are not strongly limited schema, so the schema cannot be obtained through the api.
At this time, a schema needs to be defined to convert to TableSchema and obtain data.

## What type supported at now
## SchemaOptions

We can use SchemaOptions to define schema, the SchemaOptions contains some config to define the schema. e.g. columns, primaryKey, constraintKeys.

```
schema = {
columns = [
...
]
primaryKey {
...
}
constraintKeys {
...
}
}
```

### Columns

Columns is a list of config used to define the column in schema, each column can contains name, type, nullable, defaultValue, comment field.

```
columns = [
{
name = id
type = bigint
nullable = false
columnLength = 20
defaultValue = 0
comment = "primary key id"
}
]
```

| Field | Required | Default Value | Description |
|:-------------|:---------|:--------------|----------------------------------------------------------------------------------|
| name | Yes | - | The name of the column |
| type | Yes | - | The data type of the column |
| nullable | No | true | If the column can be nullable |
| columnLength | No | 0 | The length of the column which will be useful when you need to define the length |
| defaultValue | No | null | The default value of the column |
| comment | No | null | The comment of the column |

#### What type supported at now

| Data type | Value type in Java | Description |
|:----------|:---------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
Expand All @@ -26,9 +72,110 @@ Some NoSQL databases or message queue are not strongly limited schema, so the sc
| map | `java.util.Map` | A Map is an object that maps keys to values. The key type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` , and the value type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `decimal` `date` `time` `timestamp` `null` `array` `map`. |
| array | `ValueType[]` | A array is a data type that represents a collection of elements. The element type includes `int` `string` `boolean` `tinyint` `smallint` `bigint` `float` `double` `array` `map`. |

### PrimaryKey

Primary key is a config used to define the primary key in schema, it contains name, columns field.

```
primaryKey {
name = id
columns = [id]
}
```

| Field | Required | Default Value | Description |
|:--------|:---------|:--------------|-----------------------------------|
| name | Yes | - | The name of the primaryKey |
| columns | Yes | - | The column list in the primaryKey |

### ConstraintKeys

Constraint keys is a list of config used to define the constraint keys in schema, it contains constraintName, constraintType, constraintColumns field.

```
constraintKeys = [
{
constraintName = "id_index"
constraintType = KEY
constraintColumns = [
{
columnName = "id"
sortType = ASC
}
]
},
]
```

| Field | Required | Default Value | Description |
|:------------------|:---------|:--------------|-------------------------------------------------------------------------------------------------------------------------------------------|
| constraintName | Yes | - | The name of the constraintKey |
| constraintType | No | KEY | The type of the constraintKey |
| constraintColumns | Yes | - | The column list in the primaryKey, each column should contains constraintType and sortType, sortType support ASC and DESC, default is ASC |

#### What constraintType supported at now

| ConstraintType | Description |
|:---------------|:------------|
| INDEX_KEY | key |
| UNIQUE_KEY | unique key |

## How to use schema

`schema` defines the format of the data,it contains`fields` properties. `fields` define the field properties,it's a K-V key-value pair, the Key is the field name and the value is field type. Here is an example.
### Recommended

```
source {
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema {
columns = [
{
name = id
type = bigint
nullable = false
defaultValue = 0
comment = "primary key id"
},
{
name = name
type = "string"
nullable = true
comment = "name"
},
{
name = age
type = int
nullable = true
comment = "age"
}
]
primaryKey {
name = "id"
columnNames = [id]
}
constraintKeys = [
{
constraintName = "unique_name"
constraintType = UNIQUE_KEY
constraintColumns = [
{
columnName = "name"
sortType = ASC
}
]
},
]
}
}
}
```

### Deprecated

If you only need to define the column, you can use fields to define the column, this is a simple way but will be remove in the future.

```
source {
Expand Down
79 changes: 66 additions & 13 deletions docs/en/connector-v2/sink/Assert.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,36 @@ A flink sink plugin which can assert illegal data by user defined rules

## Options

| name | type | required | default value |
|------------------------------------------|------------|----------|---------------|
| rules | ConfigMap | yes | - |
| rules.field_rules | string | yes | - |
| rules.field_rules.field_name | string | yes | - |
| rules.field_rules.field_type | string | no | - |
| rules.field_rules.field_value | ConfigList | no | - |
| rules.field_rules.field_value.rule_type | string | no | - |
| rules.field_rules.field_value.rule_value | double | no | - |
| rules.row_rules | string | yes | - |
| rules.row_rules.rule_type | string | no | - |
| rules.row_rules.rule_value | string | no | - |
| common-options | | no | - |
| name | type | required | default value |
|------------------------------------------------------------------------------------------------|------------|----------|---------------|
| rules | ConfigMap | yes | - |
| rules.field_rules | string | yes | - |
| rules.field_rules.field_name | string | yes | - |
| rules.field_rules.field_type | string | no | - |
| rules.field_rules.field_value | ConfigList | no | - |
| rules.field_rules.field_value.rule_type | string | no | - |
| rules.field_rules.field_value.rule_value | double | no | - |
| rules.row_rules | string | yes | - |
| rules.row_rules.rule_type | string | no | - |
| rules.row_rules.rule_value | string | no | - |
| rules.catalog_table_rule | ConfigMap | no | - |
| rules.catalog_table_rule.primary_key_rule | ConfigMap | no | - |
| rules.catalog_table_rule.primary_key_rule.primary_key_name | string | no | - |
| rules.catalog_table_rule.primary_key_rule.primary_key_columns | list | no | - |
| rules.catalog_table_rule.constraint_key_rule | ConfigList | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_name | string | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_type | string | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns | ConfigList | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name | string | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type | string | no | - |
| rules.catalog_table_rule.column_rule | ConfigList | no | - |
| rules.catalog_table_rule.column_rule.name | string | no | - |
| rules.catalog_table_rule.column_rule.type | string | no | - |
| rules.catalog_table_rule.column_rule.column_length | int | no | - |
| rules.catalog_table_rule.column_rule.nullable | boolean | no | - |
| rules.catalog_table_rule.column_rule.default_value | string | no | - |
| rules.catalog_table_rule.column_rule.comment | comment | no | - |
| common-options | | no | - |

### rules [ConfigMap]

Expand Down Expand Up @@ -61,6 +78,10 @@ The following rules are supported for now

the value related to rule type

### catalog_table_rule [ConfigMap]

Used to assert the catalog table is same with the user defined table.

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
Expand Down Expand Up @@ -117,6 +138,38 @@ Assert {
]
}
]
catalog_table_rule {
primary_key_rule = {
primary_key_name = "primary key"
primary_key_columns = ["id"]
}
constraint_key_rule = [
{
constraint_key_name = "unique_name"
constraint_key_type = UNIQUE_KEY
constraint_key_columns = [
{
constraint_key_column_name = "id"
constraint_key_sort_type = ASC
}
]
}
]
column_rule = [
{
name = "id"
type = bigint
},
{
name = "name"
type = string
},
{
name = "age"
type = int
}
]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -72,21 +73,50 @@ public <T> T get(Option<T> option, boolean flatten) {
}

public Map<String, String> toMap() {
return toMap(true);
}

public Config toConfig() {
return toConfig(true);
}

/**
* Transform to Config todo: This method should be removed after we remove Config
*
* @return Config
*/
public Config toConfig(boolean flatten) {
if (flatten) {
return ConfigFactory.parseMap(flatteningMap(confData));
}
return ConfigFactory.parseMap(confData);
}

public Map<String, String> toMap(boolean flatten) {
if (confData.isEmpty()) {
return Collections.emptyMap();
}

Map<String, String> result = new LinkedHashMap<>();
toMap(result);
toMap(result, flatten);
return result;
}

public void toMap(Map<String, String> result) {
toMap(result, true);
}

public void toMap(Map<String, String> result, boolean flatten) {
if (confData.isEmpty()) {
return;
}
Map<String, Object> flatteningMap = flatteningMap(confData);
for (Map.Entry<String, Object> entry : flatteningMap.entrySet()) {
Map<String, Object> map;
if (flatten) {
map = flatteningMap(confData);
} else {
map = confData;
}
for (Map.Entry<String, Object> entry : map.entrySet()) {
result.put(entry.getKey(), convertToJsonString(entry.getValue()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -41,8 +42,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA;

@Slf4j
public class ConfigUtil {
private static final JavaPropsMapper PROPERTIES_MAPPER = new JavaPropsMapper();
Expand Down Expand Up @@ -72,8 +71,10 @@ public static Map<String, Object> treeMap(Map<String, Object> rawMap) {
Map<String, Object> result = loadPropertiesStyleMap(properties);
// Special case, we shouldn't change key in schema config.
// TODO we should not hard code it, it should be as a config.
if (rawMap.containsKey(SCHEMA.key())) {
result.put(SCHEMA.key(), rawMap.get(SCHEMA.key()));
if (rawMap.containsKey(TableSchemaOptions.SCHEMA.key())) {
result.put(
TableSchemaOptions.SCHEMA.key(),
rawMap.get(TableSchemaOptions.SCHEMA.key()));
}
return result;
} catch (JsonProcessingException e) {
Expand Down
Loading

0 comments on commit eac76b4

Please sign in to comment.