From 2fd355d141f81bd36eb4441e1cd8e1990174fae2 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Sun, 13 Nov 2022 21:47:20 +0800 Subject: [PATCH 01/10] [Connector-V2] [Clickhouse] Improve Clickhouse File Connector --- .../clickhouse/config/ClickhouseConfig.java | 5 + .../clickhouse/config/FileReaderOption.java | 75 +------- .../sink/file/ClickhouseFileSink.java | 33 +++- .../file/ClickhouseFileSinkAggCommitter.java | 94 ++++++++++ .../sink/file/ClickhouseFileSinkFactory.java | 3 +- .../sink/file/ClickhouseFileSinkWriter.java | 160 ++++++++++-------- .../clickhouse/state/CKFileAggCommitInfo.java | 35 ++++ .../clickhouse/state/CKFileCommitInfo.java | 35 ++++ 8 files changed, 293 insertions(+), 147 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java create mode 100644 seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java index 62f7bd398bd..61ef2b4d705 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java @@ -97,6 +97,11 @@ public class ClickhouseConfig { .defaultValue(ClickhouseFileCopyMethod.SCP).withDescription("The method of copy Clickhouse file"); public 
static final String NODE_ADDRESS = "node_address"; + + public static final Option NODE_FREE_PASSWORD = Options.key("node_free_password").booleanType() + .defaultValue(false).withDescription("Because seatunnel need to use scp or rsync for file transfer, " + + "seatunnel need clickhouse server-side access. If each spark node and clickhouse server are configured with password-free login, " + + "you can configure this option to true, otherwise you need to configure the corresponding node password in the node_pass configuration"); /** * The password of Clickhouse server node */ diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java index 283c1c4f975..7ef25fffe06 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java @@ -20,10 +20,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; +import lombok.Data; + import java.io.Serializable; import java.util.List; import java.util.Map; +@Data public class FileReaderOption implements Serializable { private ShardMetadata shardMetadata; @@ -40,6 +43,7 @@ public FileReaderOption(ShardMetadata shardMetadata, Map tableSc List fields, String clickhouseLocalPath, ClickhouseFileCopyMethod copyMethod, Map nodeUser, + boolean nodeFreePass, Map nodePassword) { this.shardMetadata = shardMetadata; this.tableSchema = tableSchema; @@ -47,78 +51,7 @@ public FileReaderOption(ShardMetadata shardMetadata, Map tableSc this.clickhouseLocalPath = clickhouseLocalPath; 
this.copyMethod = copyMethod; this.nodeUser = nodeUser; - this.nodePassword = nodePassword; - } - - public SeaTunnelRowType getSeaTunnelRowType() { - return seaTunnelRowType; - } - - public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; - } - - public boolean isNodeFreePass() { - return nodeFreePass; - } - - public void setNodeFreePass(boolean nodeFreePass) { this.nodeFreePass = nodeFreePass; - } - - public String getClickhouseLocalPath() { - return clickhouseLocalPath; - } - - public void setClickhouseLocalPath(String clickhouseLocalPath) { - this.clickhouseLocalPath = clickhouseLocalPath; - } - - public ClickhouseFileCopyMethod getCopyMethod() { - return copyMethod; - } - - public void setCopyMethod(ClickhouseFileCopyMethod copyMethod) { - this.copyMethod = copyMethod; - } - - public Map getNodeUser() { - return nodeUser; - } - - public void setNodeUser(Map nodeUser) { - this.nodeUser = nodeUser; - } - - public Map getNodePassword() { - return nodePassword; - } - - public void setNodePassword(Map nodePassword) { this.nodePassword = nodePassword; } - - public ShardMetadata getShardMetadata() { - return shardMetadata; - } - - public void setShardMetadata(ShardMetadata shardMetadata) { - this.shardMetadata = shardMetadata; - } - - public Map getTableSchema() { - return tableSchema; - } - - public void setTableSchema(Map tableSchema) { - this.tableSchema = tableSchema; - } - - public List getFields() { - return fields; - } - - public void setFields(List fields) { - this.fields = fields; - } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java index e32ecd2ef04..c11b3d297c1 100644 --- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -23,6 +23,7 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_ADDRESS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY; @@ -30,7 +31,10 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; @@ -43,8 +47,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo; -import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil; @@ -59,10 +63,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; @AutoService(SeaTunnelSink.class) -public class ClickhouseFileSink implements SeaTunnelSink { +public class ClickhouseFileSink implements SeaTunnelSink { private FileReaderOption readerOption; @@ -78,7 +83,8 @@ public void prepare(Config config) throws PrepareFailException { throw new PrepareFailException(getPluginName(), PluginType.SINK, checkResult.getMsg()); } Map defaultConfigs = ImmutableMap.builder() - .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName()) + .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName()) + .put(NODE_FREE_PASSWORD.key(), NODE_FREE_PASSWORD.defaultValue()) .build(); config = config.withFallback(ConfigFactory.parseMap(defaultConfigs)); @@ -121,7 +127,7 @@ public void prepare(Config config) throws PrepareFailException { proxy.close(); this.readerOption = new FileReaderOption(shardMetadata, tableSchema, fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()), - ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, nodePassword); + ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, config.getBoolean(NODE_FREE_PASSWORD.key()), nodePassword); } @Override @@ -135,7 +141,22 @@ public SeaTunnelDataType getConsumedType() { } @Override - public SinkWriter createWriter(SinkWriter.Context context) throws IOException { + public SinkWriter createWriter(SinkWriter.Context context) throws IOException { return new 
ClickhouseFileSinkWriter(readerOption, context); } + + @Override + public Optional> getCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional> createAggregatedCommitter() throws IOException { + return Optional.of(new ClickhouseFileSinkAggCommitter(this.readerOption)); + } + + @Override + public Optional> getAggregatedCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java new file mode 100644 index 00000000000..80ce83aa554 --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; + +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileAggCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; + +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClickhouseFileSinkAggCommitter implements SinkAggregatedCommitter { + + private final ClickhouseProxy proxy; + private final ClickhouseTable clickhouseTable; + + public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) { + proxy = new ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode()); + clickhouseTable = proxy.getClickhouseTable(readerOption.getShardMetadata().getDatabase(), + readerOption.getShardMetadata().getTable()); + } + + @Override + public List commit(List aggregatedCommitInfo) throws IOException { + aggregatedCommitInfo.forEach(commitInfo -> commitInfo.getDetachedFiles().forEach((shard, files) -> { + try { + this.attachFileToClickhouse(shard, files); + } catch (ClickHouseException e) { + throw new SeaTunnelException("failed commit file to clickhouse", e); + } + })); + return new ArrayList<>(); + } + + @Override + public CKFileAggCommitInfo combine(List commitInfos) { + Map> files = new HashMap<>(); + commitInfos.forEach(infos -> infos.getDetachedFiles().forEach((shard, file) -> { + if 
(files.containsKey(shard)) { + files.get(shard).addAll(file); + } else { + files.put(shard, file); + } + })); + return new CKFileAggCommitInfo(files); + } + + @Override + public void abort(List aggregatedCommitInfo) throws Exception { + + } + + @Override + public void close() throws IOException { + proxy.close(); + } + + private void attachFileToClickhouse(Shard shard, List clickhouseLocalFiles) throws ClickHouseException { + ClickHouseRequest request = proxy.getClickhouseConnection(shard); + for (String clickhouseLocalFile : clickhouseLocalFiles) { + ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'", + clickhouseTable.getLocalTableName(), + clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1))).executeAndWait(); + response.close(); + } + } + +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java index 2d9ccf220e3..550cc3a7080 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java @@ -22,6 +22,7 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD; import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY; @@ -44,6 +45,6 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH) - .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_PASS).build(); + .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS).build(); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java index ac590091586..c69fd63d437 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java @@ -20,16 +20,14 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption; import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter; -import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo; +import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo; import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState; -import com.clickhouse.client.ClickHouseException; -import com.clickhouse.client.ClickHouseRequest; -import com.clickhouse.client.ClickHouseResponse; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -45,34 +43,39 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @Slf4j -public class ClickhouseFileSinkWriter implements SinkWriter { +public class ClickhouseFileSinkWriter implements SinkWriter { private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file"; + private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX = "/local_data.log"; private static final int UUID_LENGTH = 10; private final FileReaderOption readerOption; private final ShardRouter shardRouter; private final ClickhouseProxy proxy; private final ClickhouseTable clickhouseTable; private final Map> shardLocalDataPaths; - private final Map> rowCache; + private final Map rowCache; + + private final Map shardTempFile; + + private final int random = new Random().nextInt(100000); public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Context context) { this.readerOption = readerOption; proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode()); shardRouter = new ShardRouter(proxy, this.readerOption.getShardMetadata()); clickhouseTable = proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(), - this.readerOption.getShardMetadata().getTable()); + this.readerOption.getShardMetadata().getTable()); rowCache = new 
HashMap<>(Common.COLLECTION_SIZE); - + shardTempFile = new HashMap<>(); nodePasswordCheck(); // find file local save path of each node @@ -87,7 +90,20 @@ public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Contex @Override public void write(SeaTunnelRow element) throws IOException { Shard shard = shardRouter.getShard(element); - rowCache.computeIfAbsent(shard, k -> new ArrayList<>()).add(element); + FileChannel channel = rowCache.computeIfAbsent(shard, k -> { + try { + String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_"); + String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid); + FileUtils.forceMkdir(new File(clickhouseLocalFile)); + String clickhouseLocalFileTmpFile = clickhouseLocalFile + CLICKHOUSE_LOCAL_FILE_SUFFIX; + shardTempFile.put(shard, clickhouseLocalFileTmpFile); + return FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE, + StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); + } catch (IOException e) { + throw new SeaTunnelException("can't create new file to save tmp data", e); + } + }); + saveDataToFile(channel, element); } private void nodePasswordCheck() { @@ -102,56 +118,53 @@ private void nodePasswordCheck() { } @Override - public Optional prepareCommit() throws IOException { - return Optional.empty(); + public Optional prepareCommit() throws IOException { + for (FileChannel channel : rowCache.values()) { + channel.close(); + } + Map> detachedFiles = new HashMap<>(); + shardTempFile.forEach((shard, path) -> { + try { + List clickhouseLocalFiles = generateClickhouseLocalFiles(path); + // move file to server + moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles); + detachedFiles.put(shard, clickhouseLocalFiles); + // clear local file + clearLocalFileDirectory(clickhouseLocalFiles); + } catch (Exception e) { + throw new SeaTunnelException("handle with file failed.", e); + } + }); + rowCache.clear(); + 
shardTempFile.clear(); + return Optional.of(new CKFileCommitInfo(detachedFiles)); } @Override public void abortPrepare() { - } @Override public void close() throws IOException { - rowCache.forEach(this::flush); + for (FileChannel channel : rowCache.values()) { + channel.close(); + } } - private void flush(Shard shard, List rows) { - try { - // generate clickhouse local file - // TODO generate file by sub rows to save memory - List clickhouseLocalFiles = generateClickhouseLocalFiles(rows); - // move file to server - attachClickhouseLocalFileToServer(shard, clickhouseLocalFiles); - // clear local file - clearLocalFileDirectory(clickhouseLocalFiles); - } catch (Exception e) { - throw new RuntimeException("Flush data into clickhouse file error", e); - } + private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException { + String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString()) + .collect(Collectors.joining("\t")) + "\n"; + MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), + data.getBytes(StandardCharsets.UTF_8).length); + buffer.put(data.getBytes(StandardCharsets.UTF_8)); } - private List generateClickhouseLocalFiles(List rows) throws IOException, - InterruptedException { - if (rows.isEmpty()) { - return Collections.emptyList(); - } + private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpFile) throws IOException, + InterruptedException { String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_"); - String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid); - FileUtils.forceMkdir(new File(clickhouseLocalFile)); - String clickhouseLocalFileTmpFile = clickhouseLocalFile + "/local_data.log"; - try (FileChannel fileChannel = FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE, - StandardOpenOption.READ, 
StandardOpenOption.CREATE_NEW)) { - String data = rows.stream() - .map(row -> this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString()) - .collect(Collectors.joining("\t"))) - .collect(Collectors.joining("\n")); - MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), - data.getBytes(StandardCharsets.UTF_8).length); - buffer.put(data.getBytes(StandardCharsets.UTF_8)); - } - List localPaths = Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" ")) - .collect(Collectors.toList()); + .collect(Collectors.toList()); + String clickhouseLocalFile = clickhouseLocalFileTmpFile.substring(0, clickhouseLocalFileTmpFile.length() - CLICKHOUSE_LOCAL_FILE_SUFFIX.length()); List command = new ArrayList<>(localPaths); if (localPaths.size() == 1) { command.add("local"); @@ -164,17 +177,17 @@ private List generateClickhouseLocalFiles(List rows) throw command.add("\"" + "temp_table" + uuid + "\""); command.add("-q"); command.add(String.format( - "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"", - clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""), - clickhouseTable.getLocalTableName(), - readerOption.getTableSchema().keySet().stream().map(s -> { - if (readerOption.getFields().contains(s)) { - return s; - } else { - return "NULL"; - } - }).collect(Collectors.joining(",")), - uuid)); + "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"", + clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + ".", "").replaceAll("`", ""), + clickhouseTable.getLocalTableName(), + readerOption.getTableSchema().keySet().stream().map(s -> { + if (readerOption.getFields().contains(s)) { + return s; + } else { + return "NULL"; + } + }).collect(Collectors.joining(",")), + uuid)); command.add("--path"); command.add("\"" + clickhouseLocalFile + "\""); log.info("Generate clickhouse local 
file command: {}", String.join(" ", command)); @@ -189,6 +202,14 @@ private List generateClickhouseLocalFiles(List rows) throw log.info(line); } } + try (InputStream inputStream = start.getErrorStream(); + InputStreamReader inputStreamReader = new InputStreamReader(inputStream); + BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { + String line; + while ((line = bufferedReader.readLine()) != null) { + log.error(line); + } + } start.waitFor(); File file = new File(clickhouseLocalFile + "/data/_local/" + clickhouseTable.getLocalTableName()); if (!file.exists()) { @@ -199,12 +220,20 @@ private List generateClickhouseLocalFiles(List rows) throw throw new RuntimeException("clickhouse local file not exists"); } return Arrays.stream(files) - .filter(File::isDirectory) - .filter(f -> !"detached".equals(f.getName())) - .map(File::getAbsolutePath).collect(Collectors.toList()); + .filter(File::isDirectory) + .filter(f -> !"detached".equals(f.getName())) + .map(f -> { + File newFile = new File(f.getParent() + "/" + f.getName() + "_" + random); + if (f.renameTo(newFile)) { + return newFile; + } else { + log.warn("rename file failed, will continue move file, but maybe cause file conflict"); + return f; + } + }).map(File::getAbsolutePath).collect(Collectors.toList()); } - private void attachClickhouseLocalFileToServer(Shard shard, List clickhouseLocalFiles) throws ClickHouseException { + private void moveClickhouseLocalFileToServer(Shard shard, List clickhouseLocalFiles) { String hostAddress = shard.getNode().getAddress().getHostName(); String user = readerOption.getNodeUser().getOrDefault(hostAddress, "root"); String password = readerOption.getNodePassword().getOrDefault(hostAddress, null); @@ -212,13 +241,6 @@ private void attachClickhouseLocalFileToServer(Shard shard, List clickho fileTransfer.init(); fileTransfer.transferAndChown(clickhouseLocalFiles, shardLocalDataPaths.get(shard).get(0) + "detached/"); fileTransfer.close(); - ClickHouseRequest request 
= proxy.getClickhouseConnection(shard); - for (String clickhouseLocalFile : clickhouseLocalFiles) { - ClickHouseResponse response = request.query(String.format("ALTER TABLE %s ATTACH PART '%s'", - clickhouseTable.getLocalTableName(), - clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 1))).executeAndWait(); - response.close(); - } } private void clearLocalFileDirectory(List clickhouseLocalFiles) { @@ -227,7 +249,7 @@ private void clearLocalFileDirectory(List clickhouseLocalFiles) { try { File file = new File(localFileDir); if (file.exists()) { - FileUtils.deleteDirectory(new File(localFileDir)); + FileUtils.deleteDirectory(file); } } catch (IOException e) { throw new RuntimeException("Unable to delete directory " + localFileDir, e); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java new file mode 100644 index 00000000000..9962b9dcb0d --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileAggCommitInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.state; + +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@Data +@AllArgsConstructor +public class CKFileAggCommitInfo implements Serializable { + + private Map> detachedFiles; + +} diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java new file mode 100644 index 00000000000..1b5399b7c8b --- /dev/null +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/state/CKFileCommitInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.clickhouse.state; + +import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +@Data +@AllArgsConstructor +public class CKFileCommitInfo implements Serializable { + + private Map> detachedFiles; + +} From 007d8986eead63fc4089ee6c734948c9944b369d Mon Sep 17 00:00:00 2001 From: Hisoka Date: Sun, 13 Nov 2022 21:52:00 +0800 Subject: [PATCH 02/10] [Connector-V2] [Clickhouse] Improve Clickhouse File Connector --- docs/en/connector-v2/sink/ClickhouseFile.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md b/docs/en/connector-v2/sink/ClickhouseFile.md index 86f762a9cdc..eb064cc46f0 100644 --- a/docs/en/connector-v2/sink/ClickhouseFile.md +++ b/docs/en/connector-v2/sink/ClickhouseFile.md @@ -122,3 +122,7 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti ### 2.2.0-beta 2022-09-26 - Support write data to ClickHouse File and move to ClickHouse data dir + +## Next version + +- Fix generated data part name conflict and improve file commit logic [3416](https://github.com/apache/incubator-seatunnel/pull/3416) \ No newline at end of file From eb6733756581c30089150cee5c72574484b6d1b6 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Sun, 13 Nov 2022 21:52:15 +0800 Subject: [PATCH 03/10] [Connector-V2] [Clickhouse] Improve Clickhouse File Connector --- docs/en/connector-v2/sink/ClickhouseFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md b/docs/en/connector-v2/sink/ClickhouseFile.md index eb064cc46f0..1afe019d87f 100644 --- a/docs/en/connector-v2/sink/ClickhouseFile.md +++ b/docs/en/connector-v2/sink/ClickhouseFile.md @@ -123,6 +123,6 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti - Support write 
data to ClickHouse File and move to ClickHouse data dir -## Next version +### Next version - Fix generated data part name conflict and improve file commit logic [3416](https://github.com/apache/incubator-seatunnel/pull/3416) \ No newline at end of file From 97d36f4320db5792c3ff00fb48f75cfb41a3821a Mon Sep 17 00:00:00 2001 From: Hisoka Date: Mon, 14 Nov 2022 10:36:13 +0800 Subject: [PATCH 04/10] Update docs/en/connector-v2/sink/ClickhouseFile.md Co-authored-by: Eric --- docs/en/connector-v2/sink/ClickhouseFile.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md b/docs/en/connector-v2/sink/ClickhouseFile.md index 1afe019d87f..252fe74d4d7 100644 --- a/docs/en/connector-v2/sink/ClickhouseFile.md +++ b/docs/en/connector-v2/sink/ClickhouseFile.md @@ -125,4 +125,4 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti ### Next version -- Fix generated data part name conflict and improve file commit logic [3416](https://github.com/apache/incubator-seatunnel/pull/3416) \ No newline at end of file +- [BugFix] Fix generated data part name conflict and improve file commit logic [3416](https://github.com/apache/incubator-seatunnel/pull/3416) \ No newline at end of file From 8a0993b145201c98b3a4cf30b5b3efc1481f3d38 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 15 Nov 2022 11:03:10 +0800 Subject: [PATCH 05/10] [Clickhouse] Change Random number to Context index --- .../clickhouse/sink/file/ClickhouseFileSinkWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java index c69fd63d437..b2e473dcad8 100644 --- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java @@ -47,7 +47,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Random; import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @@ -66,10 +65,11 @@ public class ClickhouseFileSinkWriter implements SinkWriter shardTempFile; - private final int random = new Random().nextInt(100000); + private final SinkWriter.Context context; public ClickhouseFileSinkWriter(FileReaderOption readerOption, SinkWriter.Context context) { this.readerOption = readerOption; + this.context = context; proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode()); shardRouter = new ShardRouter(proxy, this.readerOption.getShardMetadata()); clickhouseTable = proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(), @@ -223,7 +223,7 @@ private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpF .filter(File::isDirectory) .filter(f -> !"detached".equals(f.getName())) .map(f -> { - File newFile = new File(f.getParent() + "/" + f.getName() + "_" + random); + File newFile = new File(f.getParent() + "/" + f.getName() + "_" + context.getIndexOfSubtask()); if (f.renameTo(newFile)) { return newFile; } else { From 903839e7f31dab7f9291005f6f4bb3ad45dd546f Mon Sep 17 00:00:00 2001 From: Hisoka Date: Tue, 22 Nov 2022 15:28:52 +0800 Subject: [PATCH 06/10] [Connector-V2] [Clickhouse] Improve Clickhouse add compatible mode --- docs/en/connector-v2/sink/ClickhouseFile.md | 11 ++++++-- .../clickhouse/config/ClickhouseConfig.java | 4 +++ .../clickhouse/config/FileReaderOption.java | 5 +++- .../sink/file/ClickhouseFileSink.java | 5 +++- 
.../sink/file/ClickhouseFileSinkFactory.java | 3 ++- .../sink/file/ClickhouseFileSinkWriter.java | 25 ++++++++++++++++--- 6 files changed, 44 insertions(+), 9 deletions(-) diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md b/docs/en/connector-v2/sink/ClickhouseFile.md index 252fe74d4d7..a98d64e265e 100644 --- a/docs/en/connector-v2/sink/ClickhouseFile.md +++ b/docs/en/connector-v2/sink/ClickhouseFile.md @@ -22,7 +22,7 @@ Write data to Clickhouse can also be done using JDBC ## Options | name | type | required | default value | -| ---------------------- | ------- | -------- | ------------- | +|------------------------|---------|----------|---------------| | host | string | yes | - | | database | string | yes | - | | table | string | yes | - | @@ -36,6 +36,7 @@ Write data to Clickhouse can also be done using JDBC | node_pass.node_address | string | no | - | | node_pass.username | string | no | "root" | | node_pass.password | string | no | - | +| compatible_mode | boolean | no | false | | common-options | | no | - | ### host [string] @@ -94,6 +95,11 @@ The username corresponding to the clickhouse server, default root user. The password corresponding to the clickhouse server. 
+### compatible_mode [boolean] + +In the lower version of Clickhouse, the ClickhouseLocal program does not support the `--path` parameter, +you need to use this mode to take other ways to realize the `--path` parameter function + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details @@ -125,4 +131,5 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti ### Next version -- [BugFix] Fix generated data part name conflict and improve file commit logic [3416](https://github.com/apache/incubator-seatunnel/pull/3416) \ No newline at end of file +- [BugFix] Fix generated data part name conflict and improve file commit logic [3416](https://github.com/apache/incubator-seatunnel/pull/3416) +- [Feature] Support compatible_mode compatible with lower version Clickhouse [3416](https://github.com/apache/incubator-seatunnel/pull/3416) \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java index 61ef2b4d705..857eb629bd4 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java @@ -96,6 +96,10 @@ public class ClickhouseConfig { public static final Option COPY_METHOD = Options.key("copy_method").enumType(ClickhouseFileCopyMethod.class) .defaultValue(ClickhouseFileCopyMethod.SCP).withDescription("The method of copy Clickhouse file"); + public static final Option COMPATIBLE_MODE = Options.key("compatible_mode").booleanType() + .defaultValue(false).withDescription("In the lower version of 
Clickhouse, the ClickhouseLocal program does not support the `--path` parameter, " + + "you need to use this mode to take other ways to realize the --path parameter function"); + public static final String NODE_ADDRESS = "node_address"; public static final Option NODE_FREE_PASSWORD = Options.key("node_free_password").booleanType() diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java index 7ef25fffe06..4d4cf119a1e 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java @@ -38,13 +38,15 @@ public class FileReaderOption implements Serializable { private Map nodeUser; private Map nodePassword; private SeaTunnelRowType seaTunnelRowType; + private boolean compatibleMode; public FileReaderOption(ShardMetadata shardMetadata, Map tableSchema, List fields, String clickhouseLocalPath, ClickhouseFileCopyMethod copyMethod, Map nodeUser, boolean nodeFreePass, - Map nodePassword) { + Map nodePassword, + boolean compatibleMode) { this.shardMetadata = shardMetadata; this.tableSchema = tableSchema; this.fields = fields; @@ -53,5 +55,6 @@ public FileReaderOption(ShardMetadata shardMetadata, Map tableSc this.nodeUser = nodeUser; this.nodeFreePass = nodeFreePass; this.nodePassword = nodePassword; + this.compatibleMode = compatibleMode; } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java index c11b3d297c1..3f733a6c24a 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; @@ -85,6 +86,7 @@ public void prepare(Config config) throws PrepareFailException { Map defaultConfigs = ImmutableMap.builder() .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName()) .put(NODE_FREE_PASSWORD.key(), NODE_FREE_PASSWORD.defaultValue()) + .put(COMPATIBLE_MODE.key(), COMPATIBLE_MODE.defaultValue()) .build(); config = config.withFallback(ConfigFactory.parseMap(defaultConfigs)); @@ -127,7 +129,8 @@ public void prepare(Config config) throws PrepareFailException { proxy.close(); this.readerOption = new FileReaderOption(shardMetadata, tableSchema, fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()), - ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, config.getBoolean(NODE_FREE_PASSWORD.key()), nodePassword); + ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, config.getBoolean(NODE_FREE_PASSWORD.key()), 
nodePassword, + config.getBoolean(COMPATIBLE_MODE.key())); } @Override diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java index 550cc3a7080..998552b9da0 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_LOCAL_PATH; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; @@ -45,6 +46,6 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH) - .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS).build(); + .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE).build(); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java index b2e473dcad8..1a9d69955c9 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java @@ -33,6 +33,7 @@ import java.io.BufferedReader; import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -53,7 +54,10 @@ @Slf4j public class ClickhouseFileSinkWriter implements SinkWriter { - private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file"; + private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/seatunnel/clickhouse-local/seatunnel-file"; + + private static final String CK_LOCAL_CONFIG_TEMPLATE = " %s default default" + + "1"; private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX = "/local_data.log"; private static final int UUID_LENGTH = 10; private final FileReaderOption readerOption; @@ -161,7 +165,9 @@ private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IO private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpFile) throws IOException, InterruptedException { - String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_"); + // temp file path format prefix//suffix + String[] tmpStrArr = clickhouseLocalFileTmpFile.split("/"); + String uuid = tmpStrArr[tmpStrArr.length - 2]; List localPaths = Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" ")) .collect(Collectors.toList()); String clickhouseLocalFile = clickhouseLocalFileTmpFile.substring(0, clickhouseLocalFileTmpFile.length() - CLICKHOUSE_LOCAL_FILE_SUFFIX.length()); @@ 
-188,8 +194,19 @@ private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpF } }).collect(Collectors.joining(",")), uuid)); - command.add("--path"); - command.add("\"" + clickhouseLocalFile + "\""); + if (readerOption.isCompatibleMode()) { + String ckLocalConfigPath = String.format("%s/%s/config.xml", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid); + try (FileWriter writer = new FileWriter(ckLocalConfigPath)) { + writer.write(String.format(CK_LOCAL_CONFIG_TEMPLATE, clickhouseLocalFile)); + } catch (IOException e) { + throw new RuntimeException("Error occurs when create ck local config", e); + } + command.add("--config-file"); + command.add("\"" + ckLocalConfigPath + "\""); + } else { + command.add("--path"); + command.add("\"" + clickhouseLocalFile + "\""); + } log.info("Generate clickhouse local file command: {}", String.join(" ", command)); ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command)); Process start = processBuilder.start(); From 27096306df5e26aee9f11a24d8dc7b29cff655ac Mon Sep 17 00:00:00 2001 From: Hisoka Date: Mon, 5 Dec 2022 17:22:54 +0800 Subject: [PATCH 07/10] [Connector-V2] [Clickhouse] Improve Clickhouse File Connector --- .../clickhouse/sink/file/ClickhouseFileSink.java | 2 +- .../sink/file/ClickhouseFileSinkWriter.java | 13 ++++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java index 34083932786..12296e2641b 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java +++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -32,9 +32,9 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME; import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java index 8af128802ad..2b13336cacd 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java @@ -21,7 +21,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.exception.CommonErrorCode; -import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException; @@ -107,7 +106,7 @@ public void write(SeaTunnelRow 
element) throws IOException { return FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW); } catch (IOException e) { - throw new SeaTunnelException("can't create new file to save tmp data", e); + throw new ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "can't create new file to save tmp data", e); } }); saveDataToFile(channel, element); @@ -202,7 +201,7 @@ private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpF try (FileWriter writer = new FileWriter(ckLocalConfigPath)) { writer.write(String.format(CK_LOCAL_CONFIG_TEMPLATE, clickhouseLocalFile)); } catch (IOException e) { - throw new RuntimeException("Error occurs when create ck local config", e); + throw new ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Error occurs when create ck local config"); } command.add("--config-file"); command.add("\"" + ckLocalConfigPath + "\""); @@ -222,6 +221,14 @@ private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpF log.info(line); } } + try (InputStream inputStream = start.getErrorStream(); + InputStreamReader inputStreamReader = new InputStreamReader(inputStream); + BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) { + String line; + while ((line = bufferedReader.readLine()) != null) { + log.error(line); + } + } start.waitFor(); File file = new File(clickhouseLocalFile + "/data/_local/" + clickhouseTable.getLocalTableName()); if (!file.exists()) { From 6babe9a4808ec55773319a93d61f64581648b1f6 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Thu, 8 Dec 2022 16:52:01 +0800 Subject: [PATCH 08/10] [Improve][ClickhouseFile] Add ClickhouseFile config --- docs/en/connector-v2/sink/ClickhouseFile.md | 52 ++++++++++++------- .../clickhouse/config/ClickhouseConfig.java | 7 +++ .../clickhouse/config/FileReaderOption.java | 8 ++- .../sink/file/ClickhouseFileSink.java | 6 ++- 
.../sink/file/ClickhouseFileSinkFactory.java | 4 +- .../sink/file/ClickhouseFileSinkWriter.java | 21 +++++--- .../clickhouse/ClickhouseFactoryTest.java | 2 + 7 files changed, 69 insertions(+), 31 deletions(-) diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md b/docs/en/connector-v2/sink/ClickhouseFile.md index a98d64e265e..f4d18c040ab 100644 --- a/docs/en/connector-v2/sink/ClickhouseFile.md +++ b/docs/en/connector-v2/sink/ClickhouseFile.md @@ -21,23 +21,25 @@ Write data to Clickhouse can also be done using JDBC ## Options -| name | type | required | default value | -|------------------------|---------|----------|---------------| -| host | string | yes | - | -| database | string | yes | - | -| table | string | yes | - | -| username | string | yes | - | -| password | string | yes | - | -| clickhouse_local_path | string | yes | - | -| sharding_key | string | no | - | -| copy_method | string | no | scp | -| node_free_password | boolean | no | false | -| node_pass | list | no | - | -| node_pass.node_address | string | no | - | -| node_pass.username | string | no | "root" | -| node_pass.password | string | no | - | -| compatible_mode | boolean | no | false | -| common-options | | no | - | +| name | type | required | default value | +|------------------------|---------|----------|----------------------------------------| +| host | string | yes | - | +| database | string | yes | - | +| table | string | yes | - | +| username | string | yes | - | +| password | string | yes | - | +| clickhouse_local_path | string | yes | - | +| sharding_key | string | no | - | +| copy_method | string | no | scp | +| node_free_password | boolean | no | false | +| node_pass | list | no | - | +| node_pass.node_address | string | no | - | +| node_pass.username | string | no | "root" | +| node_pass.password | string | no | - | +| compatible_mode | boolean | no | false | +| file_fields_delimiter | string | no | "\t" | +| file_temp_path | string | no | "/tmp/seatunnel/clickhouse-local/file" | 
+| common-options | | no | - | ### host [string] @@ -97,9 +99,19 @@ The password corresponding to the clickhouse server. ### compatible_mode [boolean] -In the lower version of Clickhouse, the ClickhouseLocal program does not support the `--path` parameter, +In the lower version of Clickhouse, the ClickhouseLocal program does not support the `--path` parameter, you need to use this mode to take other ways to realize the `--path` parameter function +### file_fields_delimiter [string] + +ClickhouseFile uses csv format to temporarily save data. If the data in the row contains the delimiter value +of csv, it may cause program exceptions. +Avoid this with this configuration. The value must be exactly one character long + +### file_temp_path [string] + +The directory where ClickhouseFile stores temporary files locally. + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details @@ -108,8 +120,8 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti ```hocon ClickhouseFile { - host = "192.168.0.1:8123" - database = "default" + host = "192.168.0.1:8123" + database = "default" table = "fake_all" username = "default" password = "" diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java index 857eb629bd4..3f48df99085 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java @@ -115,4 +115,11 @@ public class ClickhouseConfig { public static final Option> CLICKHOUSE_PREFIX =
Options.key("clickhouse").mapType() .defaultValue(Collections.emptyMap()).withDescription("Clickhouse custom config"); + public static final Option FILE_FIELDS_DELIMITER = Options.key("file_fields_delimiter").stringType() + .defaultValue("\t").withDescription("ClickhouseFile uses csv format to temporarily save data. If the data in the row contains the delimiter value of csv," + + " it may cause program exceptions. Avoid this with this configuration. Value string has to be an exactly one character long"); + + public static final Option FILE_TEMP_PATH = Options.key("file_temp_path").stringType() + .defaultValue("/tmp/seatunnel/clickhouse-local/file").withDescription("The directory where ClickhouseFile stores temporary files locally."); + } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java index 4d4cf119a1e..fb00ee995ed 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java @@ -39,6 +39,8 @@ public class FileReaderOption implements Serializable { private Map nodePassword; private SeaTunnelRowType seaTunnelRowType; private boolean compatibleMode; + private String fileTempPath; + private String fileFieldsDelimiter; public FileReaderOption(ShardMetadata shardMetadata, Map tableSchema, List fields, String clickhouseLocalPath, @@ -46,7 +48,9 @@ public FileReaderOption(ShardMetadata shardMetadata, Map tableSc Map nodeUser, boolean nodeFreePass, Map nodePassword, - boolean compatibleMode) { + boolean compatibleMode, + String fileTempPath, + String fileFieldsDelimiter) { this.shardMetadata = 
shardMetadata; this.tableSchema = tableSchema; this.fields = fields; @@ -56,5 +60,7 @@ public FileReaderOption(ShardMetadata shardMetadata, Map tableSc this.nodeFreePass = nodeFreePass; this.nodePassword = nodePassword; this.compatibleMode = compatibleMode; + this.fileFieldsDelimiter = fileFieldsDelimiter; + this.fileTempPath = fileTempPath; } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java index 12296e2641b..d7c25ad8eef 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -22,6 +22,8 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_ADDRESS; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD; @@ -93,6 +95,8 @@ public void prepare(Config config) throws PrepareFailException { .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName()) 
.put(NODE_FREE_PASSWORD.key(), NODE_FREE_PASSWORD.defaultValue()) .put(COMPATIBLE_MODE.key(), COMPATIBLE_MODE.defaultValue()) + .put(FILE_TEMP_PATH.key(), FILE_TEMP_PATH.defaultValue()) + .put(FILE_FIELDS_DELIMITER.key(), FILE_FIELDS_DELIMITER.defaultValue()) .build(); config = config.withFallback(ConfigFactory.parseMap(defaultConfigs)); @@ -136,7 +140,7 @@ public void prepare(Config config) throws PrepareFailException { proxy.close(); this.readerOption = new FileReaderOption(shardMetadata, tableSchema, fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()), ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, config.getBoolean(NODE_FREE_PASSWORD.key()), nodePassword, - config.getBoolean(COMPATIBLE_MODE.key())); + config.getBoolean(COMPATIBLE_MODE.key()), config.getString(FILE_TEMP_PATH.key()), config.getString(FILE_FIELDS_DELIMITER.key())); } @Override diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java index 998552b9da0..829d8e005cf 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java @@ -22,6 +22,8 @@ import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.COPY_METHOD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS; +import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER; +import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD; import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS; @@ -46,6 +48,6 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder().required(HOST, TABLE, DATABASE, USERNAME, PASSWORD, CLICKHOUSE_LOCAL_PATH) - .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE).build(); + .optional(COPY_METHOD, SHARDING_KEY, FIELDS, NODE_FREE_PASSWORD, NODE_PASS, COMPATIBLE_MODE, FILE_FIELDS_DELIMITER, FILE_TEMP_PATH).build(); } } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java index 2b13336cacd..14b5c181cb3 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java @@ -56,7 +56,6 @@ @Slf4j public class ClickhouseFileSinkWriter implements SinkWriter { - private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/seatunnel/clickhouse-local/seatunnel-file"; private static final String CK_LOCAL_CONFIG_TEMPLATE = " %s default default" + "1"; @@ -99,7 +98,7 @@ public void write(SeaTunnelRow element) throws 
IOException { FileChannel channel = rowCache.computeIfAbsent(shard, k -> { try { String uuid = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_"); - String clickhouseLocalFile = String.format("%s/%s", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid); + String clickhouseLocalFile = String.format("%s/%s", readerOption.getFileTempPath(), uuid); FileUtils.forceMkdir(new File(clickhouseLocalFile)); String clickhouseLocalFileTmpFile = clickhouseLocalFile + CLICKHOUSE_LOCAL_FILE_SUFFIX; shardTempFile.put(shard, clickhouseLocalFileTmpFile); @@ -130,15 +129,19 @@ public Optional prepareCommit() throws IOException { } Map<Shard, List<String>> detachedFiles = new HashMap<>(); shardTempFile.forEach((shard, path) -> { + List<String> clickhouseLocalFiles = null; try { - List<String> clickhouseLocalFiles = generateClickhouseLocalFiles(path); + clickhouseLocalFiles = generateClickhouseLocalFiles(path); // move file to server moveClickhouseLocalFileToServer(shard, clickhouseLocalFiles); detachedFiles.put(shard, clickhouseLocalFiles); - // clear local file - clearLocalFileDirectory(clickhouseLocalFiles); } catch (Exception e) { throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e); + } finally { + if (clickhouseLocalFiles != null && !clickhouseLocalFiles.isEmpty()) { + // clear local file + clearLocalFileDirectory(clickhouseLocalFiles); + } } }); rowCache.clear(); @@ -159,7 +162,7 @@ public void close() throws IOException { private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow row) throws IOException { String data = this.readerOption.getFields().stream().map(field -> row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString()) - .collect(Collectors.joining("\t")) + "\n"; + .collect(Collectors.joining(readerOption.getFileFieldsDelimiter())) + "\n"; MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), data.getBytes(StandardCharsets.UTF_8).length);
buffer.put(data.getBytes(StandardCharsets.UTF_8)); @@ -179,6 +182,8 @@ private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpF } command.add("--file"); command.add(clickhouseLocalFileTmpFile); + command.add("--format_csv_delimiter"); + command.add("\"" + readerOption.getFileFieldsDelimiter() + "\""); command.add("-S"); command.add("\"" + this.readerOption.getFields().stream().map(field -> field + " " + readerOption.getTableSchema().get(field)).collect(Collectors.joining(",")) + "\""); command.add("-N"); @@ -197,7 +202,7 @@ private List generateClickhouseLocalFiles(String clickhouseLocalFileTmpF }).collect(Collectors.joining(",")), uuid)); if (readerOption.isCompatibleMode()) { - String ckLocalConfigPath = String.format("%s/%s/config.xml", CLICKHOUSE_LOCAL_FILE_PREFIX, uuid); + String ckLocalConfigPath = String.format("%s/%s/config.xml", readerOption.getFileTempPath(), uuid); try (FileWriter writer = new FileWriter(ckLocalConfigPath)) { writer.write(String.format(CK_LOCAL_CONFIG_TEMPLATE, clickhouseLocalFile)); } catch (IOException e) { @@ -264,7 +269,7 @@ private void moveClickhouseLocalFileToServer(Shard shard, List clickhous private void clearLocalFileDirectory(List clickhouseLocalFiles) { String clickhouseLocalFile = clickhouseLocalFiles.get(0); - String localFileDir = clickhouseLocalFile.substring(0, CLICKHOUSE_LOCAL_FILE_PREFIX.length() + UUID_LENGTH + 1); + String localFileDir = clickhouseLocalFile.substring(0, readerOption.getFileTempPath().length() + UUID_LENGTH + 1); try { File file = new File(localFileDir); if (file.exists()) { diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java index 2a4205016ed..e6c50b0611a 100644 --- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse; import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.ClickhouseSinkFactory; +import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkFactory; import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceFactory; import org.junit.jupiter.api.Assertions; @@ -29,5 +30,6 @@ public class ClickhouseFactoryTest { public void testOptionRule() { Assertions.assertNotNull((new ClickhouseSourceFactory()).optionRule()); Assertions.assertNotNull((new ClickhouseSinkFactory()).optionRule()); + Assertions.assertNotNull((new ClickhouseFileSinkFactory()).optionRule()); } } From 63be4c8256fee9b783521523c24b5416503718ab Mon Sep 17 00:00:00 2001 From: Hisoka Date: Thu, 8 Dec 2022 16:59:44 +0800 Subject: [PATCH 09/10] [Improve][ClickhouseFile] Add ClickhouseFile config --- .../seatunnel/clickhouse/sink/file/ClickhouseFileSink.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java index d7c25ad8eef..7aa5010f23d 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java @@ -138,6 +138,10 @@ public void prepare(Config config) 
throws PrepareFailException { configObject -> configObject.toConfig().getString(PASSWORD.key()))); proxy.close(); + + if (config.getString(FILE_FIELDS_DELIMITER.key()).length() != 1) { + throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, FILE_FIELDS_DELIMITER.key() + " must be a single character"); + } this.readerOption = new FileReaderOption(shardMetadata, tableSchema, fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()), ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, config.getBoolean(NODE_FREE_PASSWORD.key()), nodePassword, config.getBoolean(COMPATIBLE_MODE.key()), config.getString(FILE_TEMP_PATH.key()), config.getString(FILE_FIELDS_DELIMITER.key())); From 436747dcd3d8f9d75ff6ea9ce45f0ce58ba2736f Mon Sep 17 00:00:00 2001 From: Hisoka Date: Fri, 9 Dec 2022 15:05:01 +0800 Subject: [PATCH 10/10] Update docs/en/connector-v2/sink/ClickhouseFile.md Co-authored-by: hailin0 --- docs/en/connector-v2/sink/ClickhouseFile.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/connector-v2/sink/ClickhouseFile.md b/docs/en/connector-v2/sink/ClickhouseFile.md index f4d18c040ab..1eb2458d081 100644 --- a/docs/en/connector-v2/sink/ClickhouseFile.md +++ b/docs/en/connector-v2/sink/ClickhouseFile.md @@ -120,8 +120,8 @@ Sink plugin common parameters, please refer to [Sink Common Options](common-opti ```hocon ClickhouseFile { - host = "192.168.0.1:8123" - database = "default" + host = "192.168.0.1:8123" + database = "default" table = "fake_all" username = "default" password = ""