Skip to content

Commit 3c70720

Browse files
committed
ck-8803
1 parent 9d201bb commit 3c70720

File tree

5 files changed

+46
-210
lines changed

5 files changed

+46
-210
lines changed

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java

+14-32
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,30 @@
1818
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
1919

2020
import org.apache.seatunnel.api.source.Boundedness;
21-
import org.apache.seatunnel.api.source.SeaTunnelSource;
22-
import org.apache.seatunnel.api.source.SourceReader;
23-
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
24-
import org.apache.seatunnel.api.source.SupportColumnProjection;
25-
import org.apache.seatunnel.api.source.SupportParallelism;
2621
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2722
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
28-
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2924

3025
import com.clickhouse.client.ClickHouseNode;
26+
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
27+
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
28+
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
3129

3230
import java.util.Collections;
3331
import java.util.List;
3432

35-
public class ClickhouseSource
36-
implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, ClickhouseSourceState>,
37-
SupportParallelism,
38-
SupportColumnProjection {
33+
public class ClickhouseSource extends AbstractSingleSplitSource<SeaTunnelRow> {
3934

40-
private List<ClickHouseNode> servers;
41-
private CatalogTable catalogTable;
42-
private String sql;
35+
private final List<ClickHouseNode> servers;
36+
private final CatalogTable catalogTable;
37+
private final String sql;
38+
private final SeaTunnelRowType rowTypeInfo;
4339

44-
public ClickhouseSource(List<ClickHouseNode> servers, CatalogTable catalogTable, String sql) {
40+
public ClickhouseSource(List<ClickHouseNode> servers, CatalogTable catalogTable, String sql, SeaTunnelRowType rowTypeInfo) {
4541
this.servers = servers;
4642
this.catalogTable = catalogTable;
4743
this.sql = sql;
44+
this.rowTypeInfo = rowTypeInfo;
4845
}
4946

5047
@Override
@@ -62,25 +59,10 @@ public List<CatalogTable> getProducedCatalogTables() {
6259
return Collections.singletonList(catalogTable);
6360
}
6461

65-
@Override
66-
public SourceReader<SeaTunnelRow, ClickhouseSourceSplit> createReader(
67-
SourceReader.Context readerContext) throws Exception {
68-
return new ClickhouseSourceReader(
69-
servers, readerContext, this.catalogTable.getSeaTunnelRowType(), sql);
70-
}
71-
72-
@Override
73-
public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> createEnumerator(
74-
SourceSplitEnumerator.Context<ClickhouseSourceSplit> enumeratorContext)
75-
throws Exception {
76-
return new ClickhouseSourceSplitEnumerator(enumeratorContext);
77-
}
7862

7963
@Override
80-
public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> restoreEnumerator(
81-
SourceSplitEnumerator.Context<ClickhouseSourceSplit> enumeratorContext,
82-
ClickhouseSourceState checkpointState)
83-
throws Exception {
84-
return new ClickhouseSourceSplitEnumerator(enumeratorContext);
64+
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
65+
SingleSplitReaderContext readerContext) {
66+
return new ClickhouseSourceReader(servers, readerContext, sql, rowTypeInfo);
8567
}
8668
}

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222
import org.apache.seatunnel.api.configuration.util.OptionRule;
2323
import org.apache.seatunnel.api.source.SeaTunnelSource;
2424
import org.apache.seatunnel.api.source.SourceSplit;
25-
import org.apache.seatunnel.api.table.catalog.CatalogTable;
26-
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
27-
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
28-
import org.apache.seatunnel.api.table.catalog.TableSchema;
25+
import org.apache.seatunnel.api.table.catalog.*;
2926
import org.apache.seatunnel.api.table.connector.TableSource;
3027
import org.apache.seatunnel.api.table.factory.Factory;
3128
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
3229
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
30+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
3331
import org.apache.seatunnel.common.constants.PluginType;
3432
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
3533
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
@@ -67,7 +65,7 @@ public String factoryIdentifier() {
6765
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
6866
ReadonlyConfig readonlyConfig = context.getOptions();
6967
List<ClickHouseNode> nodes = ClickhouseUtil.createNodes(readonlyConfig);
70-
68+
SeaTunnelRowType rowTypeInfo = CatalogTableUtil.buildWithConfig(readonlyConfig).getSeaTunnelRowType();
7169
String sql = readonlyConfig.get(SQL);
7270
ClickHouseNode currentServer = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
7371
try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol());
@@ -103,7 +101,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
103101
catalogName);
104102
return () ->
105103
(SeaTunnelSource<T, SplitT, StateT>)
106-
new ClickhouseSource(nodes, catalogTable, sql);
104+
new ClickhouseSource(nodes, catalogTable, sql, rowTypeInfo);
107105
} catch (ClickHouseException e) {
108106
throw new ClickhouseConnectorException(
109107
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java

+28-54
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.seatunnel.api.source.Boundedness;
2121
import org.apache.seatunnel.api.source.Collector;
22-
import org.apache.seatunnel.api.source.SourceReader;
2322
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2423
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2524
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
@@ -30,36 +29,33 @@
3029
import com.clickhouse.client.ClickHouseRequest;
3130
import com.clickhouse.client.ClickHouseResponse;
3231
import lombok.extern.slf4j.Slf4j;
32+
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
33+
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
3334

3435
import java.io.IOException;
35-
import java.util.ArrayList;
36-
import java.util.Collections;
3736
import java.util.List;
3837
import java.util.Random;
3938

4039
@Slf4j
41-
public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, ClickhouseSourceSplit> {
40+
public class ClickhouseSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
4241

4342
private final List<ClickHouseNode> servers;
4443
private ClickHouseClient client;
4544
private final SeaTunnelRowType rowTypeInfo;
46-
private final SourceReader.Context readerContext;
45+
private final SingleSplitReaderContext readerContext;
4746
private ClickHouseRequest<?> request;
4847
private final String sql;
49-
private volatile boolean noMoreSplit;
5048

51-
private final List<ClickhouseSourceSplit> splits;
5249

5350
ClickhouseSourceReader(
5451
List<ClickHouseNode> servers,
55-
SourceReader.Context readerContext,
56-
SeaTunnelRowType rowTypeInfo,
57-
String sql) {
52+
SingleSplitReaderContext readerContext,
53+
String sql,
54+
SeaTunnelRowType rowTypeInfo) {
5855
this.servers = servers;
5956
this.readerContext = readerContext;
60-
this.rowTypeInfo = rowTypeInfo;
6157
this.sql = sql;
62-
this.splits = new ArrayList<>();
58+
this.rowTypeInfo = rowTypeInfo;
6359
}
6460

6561
@Override
@@ -80,31 +76,28 @@ public void close() throws IOException {
8076
@Override
8177
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
8278
synchronized (output.getCheckpointLock()) {
83-
if (!splits.isEmpty()) {
84-
try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) {
85-
response.stream()
86-
.forEach(
87-
record -> {
88-
Object[] values =
89-
new Object[this.rowTypeInfo.getFieldNames().length];
90-
for (int i = 0; i < record.size(); i++) {
91-
if (record.getValue(i).isNullOrEmpty()) {
92-
values[i] = null;
93-
} else {
94-
values[i] =
95-
TypeConvertUtil.valueUnwrap(
96-
this.rowTypeInfo.getFieldType(i),
97-
record.getValue(i));
98-
}
79+
80+
try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) {
81+
response.stream()
82+
.forEach(
83+
record -> {
84+
Object[] values =
85+
new Object[this.rowTypeInfo.getFieldNames().length];
86+
for (int i = 0; i < record.size(); i++) {
87+
if (record.getValue(i).isNullOrEmpty()) {
88+
values[i] = null;
89+
} else {
90+
values[i] =
91+
TypeConvertUtil.valueUnwrap(
92+
this.rowTypeInfo.getFieldType(i),
93+
record.getValue(i));
9994
}
100-
output.collect(new SeaTunnelRow(values));
101-
});
102-
}
103-
signalNoMoreElement();
95+
}
96+
output.collect(new SeaTunnelRow(values));
97+
});
10498
}
105-
if (noMoreSplit
106-
&& splits.isEmpty()
107-
&& Boundedness.BOUNDED.equals(readerContext.getBoundedness())) {
99+
signalNoMoreElement();
100+
if (Boundedness.BOUNDED.equals(readerContext.getBoundedness())) {
108101
signalNoMoreElement();
109102
}
110103
}
@@ -113,24 +106,5 @@ record -> {
113106
private void signalNoMoreElement() {
114107
log.info("Closed the bounded ClickHouse source");
115108
this.readerContext.signalNoMoreElement();
116-
this.splits.clear();
117109
}
118-
119-
@Override
120-
public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws Exception {
121-
return Collections.emptyList();
122-
}
123-
124-
@Override
125-
public void addSplits(List<ClickhouseSourceSplit> splits) {
126-
this.splits.addAll(splits);
127-
}
128-
129-
@Override
130-
public void handleNoMoreSplits() {
131-
noMoreSplit = true;
132-
}
133-
134-
@Override
135-
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
136110
}

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java

-27
This file was deleted.

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java

-91
This file was deleted.

0 commit comments

Comments
 (0)