Skip to content

Commit

Permalink
[Imporve][Fake-Connector-V2]support user-defined-schmea and random da…
Browse files Browse the repository at this point in the history
…ta for fake-table (apache#2406)

* [Connector-V2][JDBC-connector] optimization fake
Co-authored-by: tangjiafu <tangjiafu@corp.netease.com>
  • Loading branch information
laglangyue authored and TyrantLucifer committed Sep 18, 2022
1 parent ddb5513 commit ebb90ba
Show file tree
Hide file tree
Showing 34 changed files with 604 additions and 111 deletions.
12 changes: 5 additions & 7 deletions docs/en/connector-v2/sink/Assert.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,11 @@ A list value rule define the data value validation
### rule_type [string]

The following rules are supported for now
`
NOT_NULL, // value can't be null
MIN, // define the minimum value of data
MAX, // define the maximum value of data
MIN_LENGTH, // define the minimum string length of a string data
MAX_LENGTH // define the maximum string length of a string data
`
- NOT_NULL `value can't be null`
- MIN `define the minimum value of data`
- MAX `define the maximum value of data`
- MIN_LENGTH `define the minimum string length of a string data`
- MAX_LENGTH `define the maximum string length of a string data`

### rule_value [double]

Expand Down
76 changes: 76 additions & 0 deletions docs/en/connector-v2/source/FakeSource.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# FakeSource

> FakeSource connector
## Description

The FakeSource is a virtual data source, which randomly generates the number of rows according to the data structure of the user-defined schema,
just for testing, such as type conversion and feature testing

## Options

| name | type | required | default value |
|-------------------|--------|----------|---------------|
| result_table_name | string | yes | - |
| schema | config | yes | - |

### result_table_name [string]

The table name.

### type [string]
Table structure description ,you should assign schema option to tell connector how to parse data to the row you want.
**Tips**: Most of Unstructured-Datasource contain this param, such as LocalFile,HdfsFile.
**Example**:
```hocon
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_time = time
c_timestamp = timestamp
}
}
```

## Example
Simple source for FakeSource which contains enough datatype
```hocon
source {
FakeSource {
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_time = time
c_timestamp = timestamp
}
}
result_table_name = "fake"
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

import java.io.Serializable;
import java.util.Map;

public class SeatunnelSchema {
public class SeatunnelSchema implements Serializable {
public static final String SCHEMA = "schema";
private static final String FIELD_KEY = "fields";
private static final String SIMPLE_SCHEMA_FILED = "content";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.fake.source;

import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class FakeRandomData {
public static final String SCHEMA = "schema";
private final SeatunnelSchema schema;

public FakeRandomData(SeatunnelSchema schema) {
this.schema = schema;
}

public SeaTunnelRow randomRow() {
SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
String[] fieldNames = seaTunnelRowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
List<Object> randomRow = new ArrayList<>(fieldNames.length);
for (SeaTunnelDataType<?> fieldType : fieldTypes) {
randomRow.add(randomColumnValue(fieldType));
}
return new SeaTunnelRow(randomRow.toArray());
}

@SuppressWarnings("magicnumber")
private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
if (BOOLEAN_TYPE.equals(fieldType)) {
return RandomUtils.nextInt(0, 2) == 1;
} else if (BYTE_TYPE.equals(fieldType)) {
return (byte) RandomUtils.nextInt(0, 255);
} else if (SHORT_TYPE.equals(fieldType)) {
return (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE);
} else if (INT_TYPE.equals(fieldType)) {
return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE);
} else if (LONG_TYPE.equals(fieldType)) {
return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE);
} else if (FLOAT_TYPE.equals(fieldType)) {
return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE);
} else if (DOUBLE_TYPE.equals(fieldType)) {
return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE);
} else if (STRING_TYPE.equals(fieldType)) {
return RandomStringUtils.randomAlphabetic(10);
} else if (LocalTimeType.LOCAL_DATE_TYPE.equals(fieldType)) {
return randomLocalDateTime().toLocalDate();
} else if (LocalTimeType.LOCAL_TIME_TYPE.equals(fieldType)) {
return randomLocalDateTime().toLocalTime();
} else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(fieldType)) {
return randomLocalDateTime();
} else if (fieldType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) fieldType;
return new BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() - decimalType.getScale()) + "." +
RandomStringUtils.randomNumeric(decimalType.getScale()));
} else if (fieldType instanceof ArrayType) {
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
BasicType<?> elementType = arrayType.getElementType();
Object value = randomColumnValue(elementType);
Object arr = Array.newInstance(elementType.getTypeClass(), 1);
Array.set(arr, 0, value);
return arr;
} else if (fieldType instanceof MapType) {
MapType<?, ?> mapType = (MapType<?, ?>) fieldType;
SeaTunnelDataType<?> keyType = mapType.getKeyType();
Object key = randomColumnValue(keyType);
SeaTunnelDataType<?> valueType = mapType.getValueType();
Object value = randomColumnValue(valueType);
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put(key, value);
return objectObjectHashMap;
} else if (fieldType instanceof PrimitiveByteArrayType) {
return RandomUtils.nextBytes(100);
} else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
return Void.TYPE;
} else {
throw new UnsupportedOperationException("Unexpected value: " + fieldType);
}
}

@SuppressWarnings("magicnumber")
private LocalDateTime randomLocalDateTime() {
return LocalDateTime.of(
LocalDateTime.now().getYear(),
RandomUtils.nextInt(1, 12),
RandomUtils.nextInt(1, LocalDateTime.now().getDayOfMonth()),
RandomUtils.nextInt(0, 24),
RandomUtils.nextInt(0, 59)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
Expand All @@ -38,6 +37,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {

private Config pluginConfig;
private SeaTunnelContext seaTunnelContext;
private SeatunnelSchema schema;

@Override
public Boundedness getBoundedness() {
Expand All @@ -46,14 +46,12 @@ public Boundedness getBoundedness() {

@Override
public SeaTunnelRowType getProducedType() {
return new SeaTunnelRowType(
new String[]{"name", "age", "timestamp"},
new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.LONG_TYPE});
return schema.getSeaTunnelRowType();
}

@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
return new FakeSourceReader(readerContext);
return new FakeSourceReader(readerContext, new FakeRandomData(schema));
}

@Override
Expand All @@ -64,6 +62,8 @@ public String getPluginName() {
@Override
public void prepare(Config pluginConfig) {
this.pluginConfig = pluginConfig;
assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
this.schema = SeatunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {

private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);

private final SingleSplitReaderContext context;

private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"};
private final int[] ages = {11, 22, 33, 44};
private final FakeRandomData fakeRandomData;

public FakeSourceReader(SingleSplitReaderContext context) {
public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData) {
this.context = context;
this.fakeRandomData = randomData;
}

@Override
Expand All @@ -56,11 +53,8 @@ public void close() {
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
// Generate a random number of rows to emit.
Random random = ThreadLocalRandom.current();
int size = random.nextInt(10) + 1;
for (int i = 0; i < size; i++) {
int randomIndex = random.nextInt(names.length);
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[randomIndex], ages[randomIndex], System.currentTimeMillis()});
for (int i = 0; i < 10; i++) {
SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
output.collect(seaTunnelRow);
}
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
Expand Down
Loading

0 comments on commit ebb90ba

Please sign in to comment.