Skip to content

Commit

Permalink
[Improve][Connector-v2-Fake]Supports direct definition of data values…
Browse files Browse the repository at this point in the history
…(row) (#2839)

* [Improve][Connector-v2]Supports direct definition of data values(row)
  • Loading branch information
laglangyue authored Sep 26, 2022
1 parent b6f7e77 commit b7d9dde
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 3 deletions.
8 changes: 8 additions & 0 deletions docs/en/connector-v2/source/FakeSource.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,21 @@ just for testing, such as type conversion and feature testing
|-------------------|--------|----------|---------------|
| result_table_name | string | yes | - |
| schema | config | yes | - |
| row.num | long | no | 10 |

### result_table_name [string]

The table name.

### type [string]

Table structure description ,you should assign schema option to tell connector how to parse data to the row you want.
**Tips**: Most of Unstructured-Datasource contain this param, such as LocalFile,HdfsFile.
**Example**:

### row.num
Number of additional rows of generated data

```hocon
schema = {
fields {
Expand All @@ -55,7 +61,9 @@ schema = {
```

## Example

Simple source for FakeSource which contains enough datatype

```hocon
source {
FakeSource {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

public class FakeOptions implements Serializable {

private static final String ROW_NUM = "row.num";
private static final Long DEFAULT_ROW_NUM = 10L;
@Getter
@Setter
private Long rowNum;

public static FakeOptions parse(Config config) {
FakeOptions fakeOptions = new FakeOptions();
fakeOptions.setRowNum(config.hasPath(ROW_NUM) ? config.getLong(ROW_NUM) : DEFAULT_ROW_NUM);
return fakeOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class FakeSource extends AbstractSingleSplitSource<SeaTunnelRow> {
private Config pluginConfig;
private JobContext jobContext;
private SeaTunnelSchema schema;
private FakeOptions fakeOptions;

@Override
public Boundedness getBoundedness() {
Expand All @@ -51,7 +52,7 @@ public SeaTunnelRowType getProducedType() {

@Override
public AbstractSingleSplitReader<SeaTunnelRow> createReader(SingleSplitReaderContext readerContext) throws Exception {
return new FakeSourceReader(readerContext, new FakeRandomData(schema));
return new FakeSourceReader(readerContext, new FakeRandomData(schema), fakeOptions);
}

@Override
Expand All @@ -64,6 +65,7 @@ public void prepare(Config pluginConfig) {
this.pluginConfig = pluginConfig;
assert pluginConfig.hasPath(FakeRandomData.SCHEMA);
this.schema = SeaTunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA));
this.fakeOptions = FakeOptions.parse(pluginConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private final SingleSplitReaderContext context;

private final FakeRandomData fakeRandomData;
private final FakeOptions options;

public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData) {
public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData, FakeOptions options) {
this.context = context;
this.fakeRandomData = randomData;
this.options = options;
}

@Override
Expand All @@ -53,7 +55,7 @@ public void close() {
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
// Generate a random number of rows to emit.
for (int i = 0; i < 10; i++) {
for (int i = 0; i < options.getRowNum(); i++) {
SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow();
output.collect(seaTunnelRow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ env {

source {
FakeSource {
row.num = 16
result_table_name = "fake"
schema = {
fields {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ env {
source {
# This is a example input plugin **only for test and demonstrate the feature input plugin**
FakeSource {
row.num = 16
schema = {
fields {
c_map = "map<string, string>"
Expand Down

0 comments on commit b7d9dde

Please sign in to comment.