Skip to content

Commit

Permalink
update unit
Browse files Browse the repository at this point in the history
  • Loading branch information
laglangyue committed Aug 18, 2022
1 parent 8623bce commit 3cc2d9f
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.seatunnel.connectors.seatunnel.common.schema;

import java.io.Serializable;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
Expand All @@ -33,6 +32,7 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

import java.io.Serializable;
import java.util.Map;

public class SeatunnelSchema implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,27 @@
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE;

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class FakeRandomData {

private final SeatunnelSchema schema;
Expand All @@ -64,6 +67,7 @@ public SeaTunnelRow randomRow() {
return new SeaTunnelRow(randomRow.toArray());
}

@SuppressWarnings("magicnumber")
private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
if (BOOLEAN_TYPE.equals(fieldType)) {
return RandomUtils.nextInt(0, 2) == 1;
Expand Down Expand Up @@ -104,17 +108,19 @@ private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
Object key = randomColumnValue(keyType);
SeaTunnelDataType<?> valueType = mapType.getValueType();
Object value = randomColumnValue(valueType);
return new HashMap<Object, Object>() {{
put(key, value);
}};
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put(key, value);
return objectObjectHashMap;
} else if (fieldType instanceof PrimitiveByteArrayType) {
return RandomUtils.nextBytes(10);
} else if (VOID_TYPE.equals(fieldType) || fieldType == null) {
return Void.TYPE;
} else {
throw new IllegalStateException("Unexpected value: " + fieldType);
}
}


@SuppressWarnings("magicnumber")
private LocalDateTime randomLocalDateTime() {
return LocalDateTime.of(
LocalDateTime.now().getYear(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {

private final FakeRandomData fakeRandomData;

public FakeSourceReader(SingleSplitReaderContext context,FakeRandomData randomData) {
public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData) {
this.context = context;
this.fakeRandomData = randomData;
}
Expand All @@ -55,7 +55,6 @@ public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException
// Generate a random number of rows to emit.
for (int i = 0; i < 10; i++) {
SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
System.out.println(seaTunnelRow);
output.collect(seaTunnelRow);
}
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.io.FileNotFoundException;
import java.lang.reflect.Array;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Map;

public class FakeRandomDataTest {

@ParameterizedTest
@ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"})
public void testComplexSchemaParse(String conf) throws FileNotFoundException, URISyntaxException {
Config testConfigFile = getTestConfigFile(conf);
SeatunnelSchema seatunnelSchema = SeatunnelSchema.buildWithConfig(testConfigFile);
FakeRandomData fakeRandomData = new FakeRandomData(seatunnelSchema);
SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
Assertions.assertNotNull(seaTunnelRow);
Object[] fields = seaTunnelRow.getFields();
Assertions.assertNotNull(fields);
SeaTunnelRowType seaTunnelRowType = seatunnelSchema.getSeaTunnelRowType();
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getSqlType() != SqlType.NULL) {
Assertions.assertNotNull(fields[i]);
} else {
Assertions.assertSame(fields[i], Void.TYPE);
}
if (fieldTypes[i].getSqlType() == SqlType.MAP) {
Assertions.assertTrue(fields[i] instanceof Map);
Map<?, ?> field = (Map) fields[i];
field.forEach((k, v) -> Assertions.assertTrue(k != null && v != null));
}
if (fieldTypes[i].getSqlType() == SqlType.ARRAY) {
Assertions.assertTrue(fields[i].getClass().isArray());
Assertions.assertNotNull(Array.get(fields[i], 0));
}
}
}

private Config getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
if (!configFile.startsWith("/")) {
configFile = "/" + configFile;
}
URL resource = FakeRandomDataTest.class.getResource(configFile);
if (resource == null) {
throw new FileNotFoundException("Can't find config file: " + configFile);
}
String path = Paths.get(resource.toURI()).toString();
Config config = ConfigFactory.parseFile(new File(path));
assert config.hasPath("schema");
return config.getConfig("schema");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

schema {
fields {
map = "map<string, map<string, string>>"
map_array = "map<string, map<string, array<int>>>"
array = "array<tinyint>"
string = string
boolean = boolean
tinyint = tinyint
smallint = smallint
int = int
bigint = bigint
float = float
double = double
decimal = "decimal(30, 8)"
null = "null"
bytes = bytes
date = date
time = time
timestamp = timestamp
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

schema {
fields {
map = "map<string, string>"
array = "array<tinyint>"
string = string
boolean = boolean
tinyint = tinyint
smallint = smallint
int = int
bigint = bigint
float = float
double = double
decimal = "decimal(30, 8)"
null = "null"
bytes = bytes
date = date
time = time
timestamp = timestamp
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ transform {

# you can also use other transform plugins, such as sql
sql {
sql = "select c_string,c_array from fake"
sql = "select c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_null,c_bytes from fake"
result_table_name = "sql"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.spark.unsafe.types.UTF8String;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
Expand Down

0 comments on commit 3cc2d9f

Please sign in to comment.