From 4180531c6f921b59bff13a0948bfb485b6c059ad Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Tue, 23 May 2023 14:05:47 +0800 Subject: [PATCH] [refactor](routineload)Refactored routineload to improve scalability (#19834) - The data source parameters are sunk into the specific data source class - Simplify some code logic to reduce code complexity - Provide a data source factory class to extract common logic - Remove test-only code from production code. We should not include code for testing purposes in any production code. --- fe/fe-core/src/main/cup/sql_parser.cup | 9 +- .../doris/analysis/AlterRoutineLoadStmt.java | 36 +- .../doris/analysis/CreateRoutineLoadStmt.java | 45 +- .../RoutineLoadDataSourceProperties.java | 419 ------------------ .../AbstractDataSourceProperties.java | 106 +++++ .../load/routineload/KafkaRoutineLoadJob.java | 168 +++---- .../RoutineLoadDataSourcePropertyFactory.java | 42 ++ .../load/routineload/RoutineLoadJob.java | 27 +- .../load/routineload/RoutineLoadManager.java | 7 +- .../routineload/kafka/KafkaConfiguration.java | 70 +++ .../kafka/KafkaDataSourceProperties.java | 282 ++++++++++++ .../AlterRoutineLoadJobOperationLog.java | 10 +- .../apache/doris/persist/gson/GsonUtils.java | 10 +- .../analysis/AlterRoutineLoadStmtTest.java | 104 +++-- .../analysis/CreateRoutineLoadStmtTest.java | 19 +- .../RoutineLoadDataSourcePropertiesTest.java | 348 --------------- .../routineload/KafkaRoutineLoadJobTest.java | 15 +- .../routineload/RoutineLoadManagerTest.java | 9 +- .../AlterRoutineLoadOperationLogTest.java | 26 +- .../persist/DataSourcePropertiesTest.java | 108 +++++ 20 files changed, 855 insertions(+), 1005 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/routineload/AbstractDataSourceProperties.java create mode 100644 
fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadDataSourcePropertyFactory.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java delete mode 100644 fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/persist/DataSourcePropertiesTest.java diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 605e92e448d84e..152694a3698500 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -912,7 +912,7 @@ nonterminal ExplainOptions opt_explain_options; nonterminal Boolean opt_tmp; nonterminal OutFileClause opt_outfile; -nonterminal RoutineLoadDataSourceProperties opt_datasource_properties; +nonterminal Map opt_datasource_properties; nonterminal Boolean opt_signed_unsigned; @@ -1345,15 +1345,14 @@ alter_stmt ::= opt_datasource_properties ::= // empty {: - RESULT = new RoutineLoadDataSourceProperties(); + RESULT = new HashMap(); :} | KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN {: - // the 3rd parameter "true" means this is for AlterRoutineLoad operation. 
- RESULT = new RoutineLoadDataSourceProperties(type, customProperties, true); + Map properties = new HashMap(customProperties); + RESULT = properties; :} ; - quantity ::= INTEGER_LITERAL:number {: diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java index 96a800b9bceefc..edf2800fc934b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -19,15 +19,18 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.FeConstants; import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; import org.apache.doris.load.routineload.RoutineLoadJob; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import lombok.Getter; +import org.apache.commons.collections.MapUtils; import java.util.Map; import java.util.Optional; @@ -62,17 +65,17 @@ public class AlterRoutineLoadStmt extends DdlStmt { private final LabelName labelName; private final Map jobProperties; - private final RoutineLoadDataSourceProperties dataSourceProperties; + private final Map dataSourceMapProperties; // save analyzed job properties. // analyzed data source properties are saved in dataSourceProperties. private Map analyzedJobProperties = Maps.newHashMap(); public AlterRoutineLoadStmt(LabelName labelName, Map jobProperties, - RoutineLoadDataSourceProperties dataSourceProperties) { + Map dataSourceProperties) { this.labelName = labelName; this.jobProperties = jobProperties != null ? 
jobProperties : Maps.newHashMap(); - this.dataSourceProperties = dataSourceProperties; + this.dataSourceMapProperties = dataSourceProperties != null ? dataSourceProperties : Maps.newHashMap(); } public String getDbName() { @@ -88,13 +91,16 @@ public Map getAnalyzedJobProperties() { } public boolean hasDataSourceProperty() { - return dataSourceProperties.hasAnalyzedProperties(); + return MapUtils.isNotEmpty(dataSourceMapProperties); } - public RoutineLoadDataSourceProperties getDataSourceProperties() { - return dataSourceProperties; + public Map getDataSourceMapProperties() { + return dataSourceMapProperties; } + @Getter + public AbstractDataSourceProperties dataSourceProperties; + @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); @@ -106,7 +112,7 @@ public void analyze(Analyzer analyzer) throws UserException { // check data source properties checkDataSourceProperties(); - if (analyzedJobProperties.isEmpty() && !dataSourceProperties.hasAnalyzedProperties()) { + if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties)) { throw new AnalysisException("No properties are specified"); } } @@ -200,13 +206,15 @@ private void checkJobProperties() throws UserException { } private void checkDataSourceProperties() throws UserException { - if (!FeConstants.runningUnitTest) { - RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() - .checkPrivAndGetJob(getDbName(), getLabel()); - dataSourceProperties.setTimezone(job.getTimezone()); - } else { - dataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); + if (MapUtils.isEmpty(dataSourceMapProperties)) { + return; } + RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager() + .getJob(getDbName(), getLabel()); + this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory + .createDataSource(job.getDataSourceType().name(), dataSourceMapProperties); + dataSourceProperties.setAlter(true); + 
dataSourceProperties.setTimezone(job.getTimezone()); dataSourceProperties.analyze(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 31df6592160056..442dfa3a0e441d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -25,12 +25,13 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeNameFormat; -import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.qe.ConnectContext; @@ -104,20 +105,14 @@ public class CreateRoutineLoadStmt extends DdlStmt { public static final String NUM_AS_STRING = "num_as_string"; public static final String FUZZY_PARSE = "fuzzy_parse"; - // kafka type properties - public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list"; - public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic"; - // optional - public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions"; - public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets"; - public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets"; - public static final String KAFKA_ORIGIN_DEFAULT_OFFSETS = "kafka_origin_default_offsets"; - private static final String NAME_TYPE = "ROUTINE LOAD NAME"; public static final String ENDPOINT_REGEX = 
"[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism"; public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet"; + private AbstractDataSourceProperties dataSourceProperties; + + private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(DESIRED_CONCURRENT_NUMBER_PROPERTY) .add(MAX_ERROR_NUMBER_PROPERTY) @@ -142,7 +137,6 @@ public class CreateRoutineLoadStmt extends DdlStmt { private final List loadPropertyList; private final Map jobProperties; private final String typeName; - private final RoutineLoadDataSourceProperties dataSourceProperties; // the following variables will be initialized after analyze // -1 as unset, the default value will set in RoutineLoadJob @@ -193,7 +187,8 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List> getKafkaPartitionOffsets() { - return this.dataSourceProperties.getKafkaPartitionOffsets(); - } - - public Map getCustomKafkaProperties() { - return this.dataSourceProperties.getCustomKafkaProperties(); - } - public LoadTask.MergeType getMergeType() { return mergeType; } - public boolean isOffsetsForTimes() { - return this.dataSourceProperties.isOffsetsForTimes(); + public AbstractDataSourceProperties getDataSourceProperties() { + return dataSourceProperties; } public String getComment() { @@ -474,9 +453,9 @@ private void checkJobProperties() throws UserException { format = "json"; jsonPaths = jobProperties.getOrDefault(JSONPATHS, ""); jsonRoot = jobProperties.getOrDefault(JSONROOT, ""); - stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false")); - numAsString = Boolean.valueOf(jobProperties.getOrDefault(NUM_AS_STRING, "false")); - fuzzyParse = Boolean.valueOf(jobProperties.getOrDefault(FUZZY_PARSE, "false")); + stripOuterArray = Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false")); + numAsString = 
Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false")); + fuzzyParse = Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false")); } else { throw new UserException("Format type is invalid. format=`" + format + "`"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java deleted file mode 100644 index 95ff5519c1153d..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RoutineLoadDataSourceProperties.java +++ /dev/null @@ -1,419 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.analysis; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Pair; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.load.routineload.KafkaProgress; -import org.apache.doris.load.routineload.LoadDataSourceType; - -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.base.Strings; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.gson.annotations.SerializedName; -import org.apache.commons.lang3.math.NumberUtils; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TimeZone; -import java.util.regex.Pattern; - -public class RoutineLoadDataSourceProperties { - - private static final ImmutableSet DATA_SOURCE_PROPERTIES_SET = new ImmutableSet.Builder() - .add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY) - .add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY) - .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY) - .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY) - .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS) - .build(); - - private static final ImmutableSet CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET - = new ImmutableSet.Builder() - .add(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY) - .add(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY) - .add(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY) - .add(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY) - .add(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS) - .build(); - - // origin properties, no need to persist - private Map properties = Maps.newHashMap(); - private boolean isAlter = false; - - @SerializedName(value = "type") - private String type = "KAFKA"; - @SerializedName(value = "kafkaPartitionOffsets") - private List> kafkaPartitionOffsets = Lists.newArrayList(); - @SerializedName(value = 
"customKafkaProperties") - private Map customKafkaProperties = Maps.newHashMap(); - @SerializedName(value = "isOffsetsForTimes") - private boolean isOffsetsForTimes = false; - @SerializedName(value = "kafkaBrokerList") - private String kafkaBrokerList; - @SerializedName(value = "KafkaTopic") - private String kafkaTopic; - @SerializedName(value = "timezone") - private String timezone; - - public RoutineLoadDataSourceProperties() { - // for unit test, and empty data source properties when altering routine load - this.isAlter = true; - } - - public RoutineLoadDataSourceProperties(String type, Map properties, boolean isAlter) { - this.type = type.toUpperCase(); - this.properties = properties; - this.isAlter = isAlter; - } - - public void analyze() throws UserException { - if (properties.isEmpty()) { - if (!isAlter) { - throw new AnalysisException("No data source properties"); - } else { - // for alter routine load stmt, the datasource property can by null - return; - } - } - Preconditions.checkState(!Strings.isNullOrEmpty(timezone), "timezone must be set before analyzing"); - checkDataSourceProperties(); - } - - public boolean hasAnalyzedProperties() { - return !kafkaPartitionOffsets.isEmpty() || !customKafkaProperties.isEmpty(); - } - - public String getType() { - return type; - } - - public List> getKafkaPartitionOffsets() { - return kafkaPartitionOffsets; - } - - public void setKafkaPartitionOffsets(List> kafkaPartitionOffsets) { - this.kafkaPartitionOffsets = kafkaPartitionOffsets; - } - - public Map getCustomKafkaProperties() { - return customKafkaProperties; - } - - public void setTimezone(String timezone) { - this.timezone = timezone; - } - - public String getKafkaBrokerList() { - return kafkaBrokerList; - } - - public String getKafkaTopic() { - return kafkaTopic; - } - - public boolean isOffsetsForTimes() { - return isOffsetsForTimes; - } - - private void checkDataSourceProperties() throws UserException { - LoadDataSourceType sourceType; - try { - sourceType = 
LoadDataSourceType.valueOf(type); - } catch (IllegalArgumentException e) { - throw new AnalysisException("routine load job does not support this type " + type); - } - switch (sourceType) { - case KAFKA: - checkKafkaProperties(); - break; - default: - break; - } - } - - /* - * Kafka properties includes follows: - * 1. broker list - * 2. topic - * 3. partition offset info - * 4. other properties start with "property." - */ - private void checkKafkaProperties() throws UserException { - ImmutableSet propertySet = isAlter - ? CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET : DATA_SOURCE_PROPERTIES_SET; - Optional optional = properties.keySet().stream() - .filter(entity -> !propertySet.contains(entity)) - .filter(entity -> !entity.startsWith("property.")) - .findFirst(); - if (optional.isPresent()) { - throw new AnalysisException(optional.get() + " is invalid kafka property or can not be set"); - } - - // check broker list - kafkaBrokerList = Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY)) - .replaceAll(" ", ""); - if (!isAlter && Strings.isNullOrEmpty(kafkaBrokerList)) { - throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + " is a required property"); - } - if (!Strings.isNullOrEmpty(kafkaBrokerList)) { - String[] kafkaBrokerList = this.kafkaBrokerList.split(","); - for (String broker : kafkaBrokerList) { - if (!Pattern.matches(CreateRoutineLoadStmt.ENDPOINT_REGEX, broker)) { - throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY + ":" + broker - + " not match pattern " + CreateRoutineLoadStmt.ENDPOINT_REGEX); - } - } - } - - // check topic - kafkaTopic = Strings.nullToEmpty(properties.get(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY)) - .replaceAll(" ", ""); - if (!isAlter && Strings.isNullOrEmpty(kafkaTopic)) { - throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY + " is a required property"); - } - - // check custom kafka property - // This should be done before check 
partition and offsets, because we need KAFKA_DEFAULT_OFFSETS, - // which is in custom properties. - analyzeCustomProperties(this.properties, this.customKafkaProperties); - - // The partition offset properties are all optional, - // and there are 5 valid cases for specifying partition offsets: - // A. partition, offset and default offset are not set - // Doris will set default offset to OFFSET_END - // B. partition and offset are set, default offset is not set - // fill the "kafkaPartitionOffsets" with partition and offset - // C. partition and default offset are set, offset is not set - // fill the "kafkaPartitionOffsets" with partition and default offset - // D. partition is set, offset and default offset are not set - // this is only valid when doing create routine load operation, - // fill the "kafkaPartitionOffsets" with partition and OFFSET_END - // E. only default offset is set. - // this is only valid when doing alter routine load operation. - // Other cases are illegal. - - // check partitions - String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY); - if (kafkaPartitionsString != null) { - analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets); - } - - // check offset - String kafkaOffsetsString = properties.get(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY); - String kafkaDefaultOffsetString = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); - if (kafkaOffsetsString != null && kafkaDefaultOffsetString != null) { - throw new AnalysisException("Only one of " + CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY - + " and " + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS + " can be set."); - } - if (isAlter && kafkaPartitionsString != null - && kafkaOffsetsString == null && kafkaDefaultOffsetString == null) { - // if this is an alter operation, the partition and (default)offset must be set together. 
- throw new AnalysisException("Must set offset or default offset with partition property"); - } - - if (kafkaOffsetsString != null) { - this.isOffsetsForTimes = analyzeKafkaOffsetProperty(kafkaOffsetsString, - this.kafkaPartitionOffsets, this.timezone); - } else { - // offset is not set, check default offset. - this.isOffsetsForTimes = analyzeKafkaDefaultOffsetProperty(this.customKafkaProperties, this.timezone); - if (!this.kafkaPartitionOffsets.isEmpty()) { - // Case C - kafkaDefaultOffsetString = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); - setDefaultOffsetForPartition(this.kafkaPartitionOffsets, - kafkaDefaultOffsetString, this.isOffsetsForTimes); - } - } - } - - private static void setDefaultOffsetForPartition(List> kafkaPartitionOffsets, - String kafkaDefaultOffsetString, boolean isOffsetsForTimes) { - if (isOffsetsForTimes) { - for (Pair pair : kafkaPartitionOffsets) { - pair.second = Long.valueOf(kafkaDefaultOffsetString); - } - } else { - for (Pair pair : kafkaPartitionOffsets) { - if (kafkaDefaultOffsetString.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { - pair.second = KafkaProgress.OFFSET_BEGINNING_VAL; - } else { - pair.second = KafkaProgress.OFFSET_END_VAL; - } - } - } - } - - // If the default offset is not set, set the default offset to OFFSET_END. - // If the offset is in datetime format, convert it to a timestamp, - // and also save the origin datatime formatted offset - // in "customKafkaProperties" - // return true if the offset is in datetime format. 
- private static boolean analyzeKafkaDefaultOffsetProperty( - Map customKafkaProperties, String timeZoneStr) - throws AnalysisException { - customKafkaProperties.putIfAbsent(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, KafkaProgress.OFFSET_END); - String defaultOffsetStr = customKafkaProperties.get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); - TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr); - long defaultOffset = TimeUtils.timeStringToLong(defaultOffsetStr, timeZone); - if (defaultOffset != -1) { - // this is a datetime format offset - customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, String.valueOf(defaultOffset)); - // we convert datetime to timestamp, and save the origin datetime formatted offset for further use. - customKafkaProperties.put(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS, defaultOffsetStr); - return true; - } else { - if (!defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING) - && !defaultOffsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) { - throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS - + " can only be set to OFFSET_BEGINNING, OFFSET_END or date time"); - } - return false; - } - } - - // init "kafkaPartitionOffsets" with partition property. - // The offset will be set to OFFSET_END for now, and will be changed in later analysis process. 
- private static void analyzeKafkaPartitionProperty(String kafkaPartitionsString, - List> kafkaPartitionOffsets) throws AnalysisException { - kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", ""); - if (kafkaPartitionsString.isEmpty()) { - throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY - + " could not be a empty string"); - } - String[] kafkaPartitionsStringList = kafkaPartitionsString.split(","); - for (String s : kafkaPartitionsStringList) { - try { - kafkaPartitionOffsets.add(Pair.of(getIntegerValueFromString( - s, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY), KafkaProgress.OFFSET_END_VAL)); - } catch (AnalysisException e) { - throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY - + " must be a number string with comma-separated"); - } - } - } - - // Fill the partition's offset with given kafkaOffsetsString, - // Return true if offset is specified by timestamp. - private static boolean analyzeKafkaOffsetProperty(String kafkaOffsetsString, - List> kafkaPartitionOffsets, String timeZoneStr) - throws UserException { - if (Strings.isNullOrEmpty(kafkaOffsetsString)) { - throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY + " could not be a empty string"); - } - List kafkaOffsetsStringList = Splitter.on(",").trimResults().splitToList(kafkaOffsetsString); - if (kafkaOffsetsStringList.size() != kafkaPartitionOffsets.size()) { - throw new AnalysisException("Partitions number should be equals to offsets number"); - } - - // We support two ways to specify the offset, - // one is to specify the offset directly, the other is to specify a timestamp. - // Doris will get the offset of the corresponding partition through the timestamp. - // The user can only choose one of these methods. 
- boolean foundTime = false; - boolean foundOffset = false; - for (String kafkaOffsetsStr : kafkaOffsetsStringList) { - if (TimeUtils.timeStringToLong(kafkaOffsetsStr) != -1) { - foundTime = true; - } else { - foundOffset = true; - } - } - if (foundTime && foundOffset) { - throw new AnalysisException("The offset of the partition cannot be specified by the timestamp " - + "and the offset at the same time"); - } - - if (foundTime) { - // convert all datetime strs to timestamps - // and set them as the partition's offset. - // These timestamps will be converted to real offset when job is running. - TimeZone timeZone = TimeUtils.getOrSystemTimeZone(timeZoneStr); - for (int i = 0; i < kafkaOffsetsStringList.size(); i++) { - String kafkaOffsetsStr = kafkaOffsetsStringList.get(i); - long timestamp = TimeUtils.timeStringToLong(kafkaOffsetsStr, timeZone); - Preconditions.checkState(timestamp != -1); - kafkaPartitionOffsets.get(i).second = timestamp; - } - } else { - for (int i = 0; i < kafkaOffsetsStringList.size(); i++) { - String kafkaOffsetsStr = kafkaOffsetsStringList.get(i); - if (kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { - kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL; - } else if (kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) { - kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL; - } else if (NumberUtils.isDigits(kafkaOffsetsStr)) { - kafkaPartitionOffsets.get(i).second = Long.valueOf(NumberUtils.toLong(kafkaOffsetsStr)); - } else { - throw new AnalysisException(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY - + " must be an integer or a date time"); - } - } - } - - return foundTime; - } - - private static void analyzeCustomProperties(Map dataSourceProperties, - Map customKafkaProperties) throws AnalysisException { - for (Map.Entry dataSourceProperty : dataSourceProperties.entrySet()) { - if (dataSourceProperty.getKey().startsWith("property.")) { - String propertyKey = 
dataSourceProperty.getKey(); - String propertyValue = dataSourceProperty.getValue(); - String[] propertyValueArr = propertyKey.split("\\."); - if (propertyValueArr.length < 2) { - throw new AnalysisException("kafka property value could not be a empty string"); - } - customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1), propertyValue); - } - // can be extended in the future which other prefix - } - } - - private static int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException { - if (valueString.isEmpty()) { - throw new AnalysisException(propertyName + " could not be a empty string"); - } - int value; - try { - value = Integer.valueOf(valueString); - } catch (NumberFormatException e) { - throw new AnalysisException(propertyName + " must be a integer"); - } - return value; - } - - @Override - public String toString() { - if (!hasAnalyzedProperties()) { - return "empty"; - } - - StringBuilder sb = new StringBuilder(); - sb.append("type: ").append(type); - sb.append(", kafka partition offsets: ").append(kafkaPartitionOffsets); - sb.append(", custome properties: ").append(customKafkaProperties); - return sb.toString(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/AbstractDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/AbstractDataSourceProperties.java new file mode 100644 index 00000000000000..7a5ffcd7d37c07 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/AbstractDataSourceProperties.java @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.routineload; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.gson.annotations.SerializedName; +import lombok.Data; +import lombok.Getter; +import org.apache.commons.collections.MapUtils; + +import java.util.List; +import java.util.Map; + +/** + * Abstract class for data source properties + * All routine load data source properties should extend this class + */ +@Data +public abstract class AbstractDataSourceProperties { + + /** + * Original data source properties + * we can use this map to get all original properties + * and this is only a temporary parameter and will be of no use after convert ends + */ + @Getter + protected Map originalDataSourceProperties; + + @SerializedName(value = "type") + private String dataSourceType; + + /** + * Is it an ALTER operation + */ + private boolean isAlter = false; + + @SerializedName(value = "timezone") + protected String timezone; + + + public AbstractDataSourceProperties(Map dataSourceProperties) { + this.originalDataSourceProperties = dataSourceProperties; + } + + protected abstract String getDataSourceType(); + + protected abstract List getRequiredProperties() throws UserException; + + /** + * Verify that every required property is present with a non-null value (skipped for ALTER) + * we can check for optional mutex parameters, and whether the concrete type is null in the future + * + * @throws UserException if a required property is missing + */ + protected void checkRequiredProperties() throws UserException { + 
if (isAlter) { + return; + } + List requiredProperties = getRequiredProperties(); + for (String requiredProperty : requiredProperties) { + if (!originalDataSourceProperties.containsKey(requiredProperty) + || null == originalDataSourceProperties.get(requiredProperty)) { + throw new AnalysisException("Required property " + requiredProperty + " is missing"); + } + } + } + + public void analyze() throws UserException { + if (isAlter && MapUtils.isEmpty(originalDataSourceProperties)) { + throw new AnalysisException("No data source properties"); + } + Preconditions.checkState(!Strings.isNullOrEmpty(timezone), "timezone must be set before analyzing"); + checkRequiredProperties(); + this.dataSourceType = getDataSourceType(); + this.convertAndCheckDataSourceProperties(); + } + + /** + * Convert and check data source properties + * This method should be implemented by sub class + * It will be called in analyze method + * It will convert data source properties to correct type + * + * @throws UserException if any error occurs + */ + public abstract void convertAndCheckDataSourceProperties() throws UserException; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index d20f32bac4d20a..51f27eb107d73a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -19,7 +19,6 @@ import org.apache.doris.analysis.AlterRoutineLoadStmt; import org.apache.doris.analysis.CreateRoutineLoadStmt; -import org.apache.doris.analysis.RoutineLoadDataSourceProperties; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -38,6 +37,8 @@ import org.apache.doris.common.util.SmallFileMgr; import org.apache.doris.common.util.SmallFileMgr.SmallFile; import 
org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.routineload.kafka.KafkaConfiguration; +import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.persist.AlterRoutineLoadJobOperationLog; import org.apache.doris.thrift.TFileCompressType; import org.apache.doris.transaction.TransactionState; @@ -49,6 +50,8 @@ import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.parquet.Strings; @@ -59,6 +62,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.TimeZone; @@ -78,7 +82,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { private String topic; // optional, user want to load partitions. private List customKafkaPartitions = Lists.newArrayList(); - // current kafka partitions is the actually partition which will be fetched + // current kafka partitions is the actual partition which will be fetched private List currentKafkaPartitions = Lists.newArrayList(); // optional, user want to set default offset when new partition add or offset not set. // kafkaDefaultOffSet has two formats, one is the time format, eg: "2021-10-10 11:00:00", @@ -187,10 +191,13 @@ private void convertCustomProperties(boolean rebuild) throws DdlException { // KAFKA_DEFAULT_OFFSETS, and this attribute will be converted into a timestamp during the analyzing phase, // thus losing some information. 
So we use KAFKA_ORIGIN_DEFAULT_OFFSETS to store the original datetime // formatted KAFKA_DEFAULT_OFFSETS value - if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)) { - kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS); - } else if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) { - kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS); + if (convertedCustomProperties.containsKey(KafkaConfiguration.KAFKA_ORIGIN_DEFAULT_OFFSETS.getName())) { + kafkaDefaultOffSet = convertedCustomProperties + .remove(KafkaConfiguration.KAFKA_ORIGIN_DEFAULT_OFFSETS.getName()); + return; + } + if (convertedCustomProperties.containsKey(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName())) { + kafkaDefaultOffSet = convertedCustomProperties.remove(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName()); } } @@ -323,38 +330,19 @@ private void updateKafkaPartitions() throws UserException { // else if kafka partitions of topic has been changed, return true. // else return false // update current kafka partition at the same time - // current kafka partitions = customKafkaPartitions == 0 ? all of partition of kafka topic : customKafkaPartitions + // current kafka partitions = customKafkaPartitions == 0 ? 
all partition of kafka topic : customKafkaPartitions @Override protected boolean unprotectNeedReschedule() throws UserException { // only running and need_schedule job need to be changed current kafka partitions if (this.state == JobState.RUNNING || this.state == JobState.NEED_SCHEDULE) { - if (customKafkaPartitions != null && customKafkaPartitions.size() != 0) { + if (CollectionUtils.isNotEmpty(customKafkaPartitions)) { currentKafkaPartitions = customKafkaPartitions; return false; - } else { - // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler() - Preconditions.checkNotNull(this.newCurrentKafkaPartition); - if (currentKafkaPartitions.containsAll(this.newCurrentKafkaPartition)) { - if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) { - currentKafkaPartitions = this.newCurrentKafkaPartition; - if (LOG.isDebugEnabled()) { - LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) - .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) - .add("msg", "current kafka partitions has been change") - .build()); - } - return true; - } else { - // if the partitions of currentKafkaPartitions and progress are inconsistent, - // We should also update the progress - for (Integer kafkaPartition : currentKafkaPartitions) { - if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { - return true; - } - } - return false; - } - } else { + } + // the newCurrentKafkaPartition should be already updated in preCheckNeedScheduler() + Preconditions.checkNotNull(this.newCurrentKafkaPartition); + if (new HashSet<>(currentKafkaPartitions).containsAll(this.newCurrentKafkaPartition)) { + if (currentKafkaPartitions.size() > this.newCurrentKafkaPartition.size()) { currentKafkaPartitions = this.newCurrentKafkaPartition; if (LOG.isDebugEnabled()) { LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) @@ -363,13 +351,33 @@ protected boolean unprotectNeedReschedule() throws UserException { .build()); } return true; 
+ } else { + // if the partitions of currentKafkaPartitions and progress are inconsistent, + // We should also update the progress + for (Integer kafkaPartition : currentKafkaPartitions) { + if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) { + return true; + } + } + return false; + } + } else { + currentKafkaPartitions = this.newCurrentKafkaPartition; + if (LOG.isDebugEnabled()) { + LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id) + .add("current_kafka_partitions", Joiner.on(",").join(currentKafkaPartitions)) + .add("msg", "current kafka partitions has been change") + .build()); } + return true; } - } else if (this.state == JobState.PAUSED) { + + } + if (this.state == JobState.PAUSED) { return ScheduleRule.isNeedAutoSchedule(this); - } else { - return false; } + return false; + } @Override @@ -393,9 +401,10 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr // init kafka routine load job long id = Env.getCurrentEnv().getNextId(); + KafkaDataSourceProperties kafkaProperties = (KafkaDataSourceProperties) stmt.getDataSourceProperties(); KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(), db.getClusterName(), db.getId(), tableId, - stmt.getKafkaBrokerList(), stmt.getKafkaTopic(), stmt.getUserInfo()); + kafkaProperties.getBrokerList(), kafkaProperties.getTopic(), stmt.getUserInfo()); kafkaRoutineLoadJob.setOptional(stmt); kafkaRoutineLoadJob.checkCustomProperties(); kafkaRoutineLoadJob.checkCustomPartition(); @@ -484,27 +493,28 @@ private List> getNewPartitionOffsetsFromDefaultOffset(List> kafkaPartitionOffsets = stmt.getKafkaPartitionOffsets(); - boolean isForTimes = stmt.isOffsetsForTimes(); + // this is an unprotected method which is called in the initialization function + private void setCustomKafkaPartitions(KafkaDataSourceProperties kafkaDataSourceProperties) throws LoadException { + + List> kafkaPartitionOffsets = kafkaDataSourceProperties.getKafkaPartitionOffsets(); + 
boolean isForTimes = kafkaDataSourceProperties.isOffsetsForTimes(); if (isForTimes) { // the offset is set by date time, we need to get the real offset by time - kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(stmt.getKafkaBrokerList(), stmt.getKafkaTopic(), - convertedCustomProperties, stmt.getKafkaPartitionOffsets()); + kafkaPartitionOffsets = KafkaUtil.getOffsetsForTimes(kafkaDataSourceProperties.getBrokerList(), + kafkaDataSourceProperties.getTopic(), + convertedCustomProperties, kafkaDataSourceProperties.getKafkaPartitionOffsets()); } for (Pair partitionOffset : kafkaPartitionOffsets) { @@ -590,8 +600,8 @@ public void readFields(DataInput in) throws IOException { @Override public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException { Map jobProperties = stmt.getAnalyzedJobProperties(); - RoutineLoadDataSourceProperties dataSourceProperties = stmt.getDataSourceProperties(); - if (dataSourceProperties.isOffsetsForTimes()) { + KafkaDataSourceProperties dataSourceProperties = (KafkaDataSourceProperties) stmt.getDataSourceProperties(); + if (null != dataSourceProperties && dataSourceProperties.isOffsetsForTimes()) { // if the partition offset is set by timestamp, convert it to real offset convertTimestampToOffset(dataSourceProperties); } @@ -612,7 +622,7 @@ public void modifyProperties(AlterRoutineLoadStmt stmt) throws UserException { } } - private void convertTimestampToOffset(RoutineLoadDataSourceProperties dataSourceProperties) throws UserException { + private void convertTimestampToOffset(KafkaDataSourceProperties dataSourceProperties) throws UserException { List> partitionOffsets = dataSourceProperties.getKafkaPartitionOffsets(); if (partitionOffsets.isEmpty()) { return; @@ -623,42 +633,40 @@ private void convertTimestampToOffset(RoutineLoadDataSourceProperties dataSource } private void modifyPropertiesInternal(Map jobProperties, - RoutineLoadDataSourceProperties dataSourceProperties) + KafkaDataSourceProperties dataSourceProperties) 
throws DdlException { + if (null != dataSourceProperties) { + List> kafkaPartitionOffsets = Lists.newArrayList(); + Map customKafkaProperties = Maps.newHashMap(); - List> kafkaPartitionOffsets = Lists.newArrayList(); - Map customKafkaProperties = Maps.newHashMap(); - - if (dataSourceProperties.hasAnalyzedProperties()) { - kafkaPartitionOffsets = dataSourceProperties.getKafkaPartitionOffsets(); - customKafkaProperties = dataSourceProperties.getCustomKafkaProperties(); - } + if (MapUtils.isNotEmpty(dataSourceProperties.getOriginalDataSourceProperties())) { + kafkaPartitionOffsets = dataSourceProperties.getKafkaPartitionOffsets(); + customKafkaProperties = dataSourceProperties.getCustomKafkaProperties(); + } - // modify partition offset first - if (!kafkaPartitionOffsets.isEmpty()) { - // we can only modify the partition that is being consumed - ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets); - } + // modify partition offset first + if (!kafkaPartitionOffsets.isEmpty()) { + // we can only modify the partition that is being consumed + ((KafkaProgress) progress).modifyOffset(kafkaPartitionOffsets); + } - if (!customKafkaProperties.isEmpty()) { - this.customProperties.putAll(customKafkaProperties); - convertCustomProperties(true); + if (!customKafkaProperties.isEmpty()) { + this.customProperties.putAll(customKafkaProperties); + convertCustomProperties(true); + } + // modify broker list and topic + if (!Strings.isNullOrEmpty(dataSourceProperties.getBrokerList())) { + this.brokerList = dataSourceProperties.getBrokerList(); + } + if (!Strings.isNullOrEmpty(dataSourceProperties.getTopic())) { + this.topic = dataSourceProperties.getTopic(); + } } - if (!jobProperties.isEmpty()) { Map copiedJobProperties = Maps.newHashMap(jobProperties); modifyCommonJobProperties(copiedJobProperties); this.jobProperties.putAll(copiedJobProperties); } - - // modify broker list and topic - if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaBrokerList())) { - this.brokerList 
= dataSourceProperties.getKafkaBrokerList();
-        }
-        if (!Strings.isNullOrEmpty(dataSourceProperties.getKafkaTopic())) {
-            this.topic = dataSourceProperties.getKafkaTopic();
-        }
-
         LOG.info("modify the properties of kafka routine load job: {}, jobProperties: {}, datasource properties: {}",
                 this.id, jobProperties, dataSourceProperties);
     }
@@ -666,7 +674,7 @@ private void modifyPropertiesInternal(Map jobProperties,
     @Override
     public void replayModifyProperties(AlterRoutineLoadJobOperationLog log) {
         try {
-            modifyPropertiesInternal(log.getJobProperties(), log.getDataSourceProperties());
+            modifyPropertiesInternal(log.getJobProperties(), (KafkaDataSourceProperties) log.getDataSourceProperties());
         } catch (DdlException e) {
             // should not happen
             LOG.error("failed to replay modify kafka routine load job: {}", id, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadDataSourcePropertyFactory.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadDataSourcePropertyFactory.java
new file mode 100644
index 00000000000000..10f20a001280fa
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadDataSourcePropertyFactory.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.routineload;
+
+import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
+
+import java.util.Map;
+
+/**
+ * RoutineLoadDataSourcePropertyFactory is used to create data source properties
+ * for routine load job.
+ * <p>
+ * Currently, we only support kafka data source; the type name is matched case-insensitively,
+ * and a null or unknown type is rejected with an IllegalArgumentException.
+ * If we want to support more data source, we can add more data source properties here.
+ * And we can add more data source type in LoadDataSourceType.
+ * </p>
+ */
+public class RoutineLoadDataSourcePropertyFactory {
+
+    public static AbstractDataSourceProperties createDataSource(String type, Map<String, String> parameters) {
+        if (LoadDataSourceType.KAFKA.name().equalsIgnoreCase(type)) {
+            return new KafkaDataSourceProperties(parameters);
+        }
+        throw new IllegalArgumentException("Unknown routine load data source type: " + type);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 6937c9ad019658..5d4d2ecf8cf4e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -48,6 +48,7 @@
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.RoutineLoadDesc;
 import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.load.routineload.kafka.KafkaConfiguration;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
@@ -74,6 +75,7 @@
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import lombok.Getter;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -162,6 +164,7 @@ public boolean isFinalState() {
     protected Separator lineDelimiter;
     protected int desireTaskConcurrentNum; // optional
     protected JobState state = JobState.NEED_SCHEDULE;
+    @Getter
     protected LoadDataSourceType dataSourceType;
     // max number of error data in max batch rows * 10
     // maxErrorNum / (maxBatchRows * 10) = max error rate of routine load job
@@ -486,7 +489,7 @@ public boolean isStrictMode() {
         if (value == null) {
             return DEFAULT_STRICT_MODE;
         }
-        return Boolean.valueOf(value);
+        return Boolean.parseBoolean(value);
     }
 
     @Override
@@ -543,17 +546,17 @@ public String getFormat() {
 
     @Override
     public boolean
isStripOuterArray() { - return Boolean.valueOf(jobProperties.get(PROPS_STRIP_OUTER_ARRAY)); + return Boolean.parseBoolean(jobProperties.get(PROPS_STRIP_OUTER_ARRAY)); } @Override public boolean isNumAsString() { - return Boolean.valueOf(jobProperties.get(PROPS_NUM_AS_STRING)); + return Boolean.parseBoolean(jobProperties.get(PROPS_NUM_AS_STRING)); } @Override public boolean isFuzzyParse() { - return Boolean.valueOf(jobProperties.get(PROPS_FUZZY_PARSE)); + return Boolean.parseBoolean(jobProperties.get(PROPS_FUZZY_PARSE)); } @Override @@ -1422,11 +1425,11 @@ public String getShowCreateInfo() { getCustomProperties().forEach((k, v) -> appendProperties(sb, k, v, false)); if (progress instanceof KafkaProgress) { // append partitions and offsets. - // the offsets is the next offset to be consumed. + // the offsets are the next offset to be consumed. List> pairs = ((KafkaProgress) progress).getPartitionOffsetPairs(false); - appendProperties(sb, CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, + appendProperties(sb, KafkaConfiguration.KAFKA_PARTITIONS.getName(), Joiner.on(", ").join(pairs.stream().map(p -> p.first).toArray()), false); - appendProperties(sb, CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, + appendProperties(sb, KafkaConfiguration.KAFKA_OFFSETS.getName(), Joiner.on(", ").join(pairs.stream().map(p -> p.second).toArray()), false); } // remove the last "," @@ -1673,27 +1676,27 @@ protected void readFields(DataInput in) throws IOException { // for ALTER ROUTINE LOAD protected void modifyCommonJobProperties(Map jobProperties) { if (jobProperties.containsKey(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)) { - this.desireTaskConcurrentNum = Integer.valueOf( + this.desireTaskConcurrentNum = Integer.parseInt( jobProperties.remove(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)); } if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)) { - this.maxErrorNum = Long.valueOf( + this.maxErrorNum = Long.parseLong( 
jobProperties.remove(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)); } if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)) { - this.maxBatchIntervalS = Long.valueOf( + this.maxBatchIntervalS = Long.parseLong( jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY)); } if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)) { - this.maxBatchRows = Long.valueOf( + this.maxBatchRows = Long.parseLong( jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)); } if (jobProperties.containsKey(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)) { - this.maxBatchSizeBytes = Long.valueOf( + this.maxBatchSizeBytes = Long.parseLong( jobProperties.remove(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 2c9cf2e5cbe9b2..581b0375f2fab7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -549,7 +549,7 @@ public RoutineLoadJob getJob(String dbFullName, String jobName) throws MetaNotFo else return all of result */ public List getJob(String dbFullName, String jobName, - boolean includeHistory, PatternMatcher matcher) + boolean includeHistory, PatternMatcher matcher) throws MetaNotFoundException { Preconditions.checkArgument(jobName == null || matcher == null, "jobName and matcher cannot be not null at the same time"); @@ -739,8 +739,9 @@ public void replayChangeRoutineLoadJob(RoutineLoadOperation operation) { public void alterRoutineLoadJob(AlterRoutineLoadStmt stmt) throws UserException { RoutineLoadJob job = checkPrivAndGetJob(stmt.getDbName(), stmt.getLabel()); if (stmt.hasDataSourceProperty() - && 
!stmt.getDataSourceProperties().getType().equalsIgnoreCase(job.dataSourceType.name())) { - throw new DdlException("The specified job type is not: " + stmt.getDataSourceProperties().getType()); + && !stmt.getDataSourceProperties().getDataSourceType().equalsIgnoreCase(job.dataSourceType.name())) { + throw new DdlException("The specified job type is not: " + + stmt.getDataSourceProperties().getDataSourceType()); } job.modifyProperties(stmt); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java new file mode 100644 index 00000000000000..8fb65fbe0cacdd --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaConfiguration.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload.kafka; + +import com.google.common.base.Splitter; + +import java.util.Arrays; +import java.util.function.Function; +import java.util.stream.Collectors; + +public enum KafkaConfiguration { + + KAFKA_BROKER_LIST("kafka_broker_list", null, value -> value.replace(" ", "")), + + KAFKA_TOPIC("kafka_topic", null, value -> value.replace(" ", "")), + + KAFKA_PARTITIONS("kafka_partitions", null, partitionsString -> + Arrays.stream(partitionsString.replace(" ", "").split(",")) + .map(Integer::parseInt) + .collect(Collectors.toList())), + + KAFKA_OFFSETS("kafka_offsets", null, offsetsString -> Splitter.on(",").trimResults().splitToList(offsetsString)), + + KAFKA_DEFAULT_OFFSETS("kafka_default_offsets", "OFFSET_END", offset -> offset), + KAFKA_ORIGIN_DEFAULT_OFFSETS("kafka_origin_default_offsets", null, offset -> offset); + + private final String name; + + public String getName() { + return name; + } + + private final Object defaultValue; + + private final Function converter; + + KafkaConfiguration(String name, T defaultValue, Function converter) { + this.name = name; + this.defaultValue = defaultValue; + this.converter = (Function) converter; + } + + KafkaConfiguration getByName(String name) { + return Arrays.stream(KafkaConfiguration.values()) + .filter(config -> config.getName().equals(name)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("Unknown configuration " + name)); + } + + + public T getParameterValue(String param) { + Object value = param != null ? 
converter.apply(param) : defaultValue; + return (T) value; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java new file mode 100644 index 00000000000000..42e5f3e03ff7c6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/kafka/KafkaDataSourceProperties.java @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.routineload.kafka; + +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.KafkaProgress; +import org.apache.doris.load.routineload.LoadDataSourceType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TimeZone; +import java.util.regex.Pattern; + +/** + * Kafka data source properties + */ +public class KafkaDataSourceProperties extends AbstractDataSourceProperties { + + private static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]"; + + private static final String CUSTOM_KAFKA_PROPERTY_PREFIX = "property."; + + @Getter + @Setter + @SerializedName(value = "kafkaPartitionOffsets") + private List> kafkaPartitionOffsets = Lists.newArrayList(); + + @Getter + @SerializedName(value = "customKafkaProperties") + private Map customKafkaProperties; + + @Getter + @SerializedName(value = "isOffsetsForTimes") + private boolean isOffsetsForTimes = false; + + @Getter + @SerializedName(value = "brokerList") + private String brokerList; + + @Getter + @SerializedName(value = "topic") + private String topic; + + private static final ImmutableSet CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET = + new ImmutableSet.Builder().add(KafkaConfiguration.KAFKA_BROKER_LIST.getName()) + 
.add(KafkaConfiguration.KAFKA_TOPIC.getName()) + .add(KafkaConfiguration.KAFKA_PARTITIONS.getName()) + .add(KafkaConfiguration.KAFKA_OFFSETS.getName()) + .add(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName()).build(); + + public KafkaDataSourceProperties(Map dataSourceProperties) { + super(dataSourceProperties); + } + + @Override + protected String getDataSourceType() { + return LoadDataSourceType.KAFKA.name(); + } + + @Override + protected List getRequiredProperties() { + return Arrays.asList(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), KafkaConfiguration.KAFKA_TOPIC.getName()); + } + + @Override + public void convertAndCheckDataSourceProperties() throws UserException { + Optional optional = originalDataSourceProperties.keySet() + .stream().filter(entity -> !CONFIGURABLE_DATA_SOURCE_PROPERTIES_SET.contains(entity)) + .filter(entity -> !entity.startsWith(CUSTOM_KAFKA_PROPERTY_PREFIX)).findFirst(); + if (optional.isPresent()) { + throw new AnalysisException(optional.get() + " is invalid kafka property or can not be set"); + } + + this.brokerList = KafkaConfiguration.KAFKA_BROKER_LIST.getParameterValue(originalDataSourceProperties + .get(KafkaConfiguration.KAFKA_BROKER_LIST.getName())); + if (!isAlter() && StringUtils.isBlank(brokerList)) { + throw new AnalysisException(KafkaConfiguration.KAFKA_BROKER_LIST.getName() + " is a required property"); + } + //check broker list + if (StringUtils.isNotBlank(brokerList)) { + for (String broker : brokerList.split(",")) { + if (!Pattern.matches(ENDPOINT_REGEX, broker)) { + throw new AnalysisException(KafkaConfiguration.KAFKA_BROKER_LIST + + ":" + broker + " not match pattern " + ENDPOINT_REGEX); + } + } + } + //check topic + this.topic = KafkaConfiguration.KAFKA_TOPIC.getParameterValue(originalDataSourceProperties + .get(KafkaConfiguration.KAFKA_TOPIC.getName())); + if (!isAlter() && StringUtils.isBlank(topic)) { + throw new AnalysisException(KafkaConfiguration.KAFKA_TOPIC.getName() + " is a required property"); + } + // 
check custom kafka property + // This should be done before check partition and offsets, because we need KAFKA_DEFAULT_OFFSETS, + // which is in custom properties. + analyzeCustomProperties(); + + List partitions = KafkaConfiguration.KAFKA_PARTITIONS.getParameterValue(originalDataSourceProperties + .get(KafkaConfiguration.KAFKA_PARTITIONS.getName())); + if (CollectionUtils.isNotEmpty(partitions)) { + analyzeKafkaPartitionProperty(partitions); + } + //check offset + List offsets = KafkaConfiguration.KAFKA_OFFSETS.getParameterValue(originalDataSourceProperties + .get(KafkaConfiguration.KAFKA_OFFSETS.getName())); + String defaultOffsetString = originalDataSourceProperties + .get(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName()); + if (CollectionUtils.isNotEmpty(offsets) && StringUtils.isNotBlank(defaultOffsetString)) { + throw new AnalysisException("Only one of " + KafkaConfiguration.KAFKA_OFFSETS.getName() + " and " + + KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName() + " can be set."); + } + if (isAlter() && CollectionUtils.isNotEmpty(partitions) && CollectionUtils.isEmpty(offsets) + && StringUtils.isBlank(defaultOffsetString)) { + // if this is an alter operation, the partition and (default)offset must be set together. 
+ throw new AnalysisException("Must set offset or default offset with partition property"); + } + if (CollectionUtils.isNotEmpty(offsets)) { + this.isOffsetsForTimes = analyzeKafkaOffsetProperty(offsets); + return; + } + this.isOffsetsForTimes = analyzeKafkaDefaultOffsetProperty(); + if (CollectionUtils.isNotEmpty(kafkaPartitionOffsets)) { + defaultOffsetString = customKafkaProperties.get(KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName()); + setDefaultOffsetForPartition(this.kafkaPartitionOffsets, defaultOffsetString, this.isOffsetsForTimes); + } + + } + + private static void setDefaultOffsetForPartition(List> kafkaPartitionOffsets, + String kafkaDefaultOffsetString, boolean isOffsetsForTimes) { + if (isOffsetsForTimes) { + for (Pair pair : kafkaPartitionOffsets) { + pair.second = Long.valueOf(kafkaDefaultOffsetString); + } + } else { + for (Pair pair : kafkaPartitionOffsets) { + if (kafkaDefaultOffsetString.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) { + pair.second = KafkaProgress.OFFSET_BEGINNING_VAL; + } else { + pair.second = KafkaProgress.OFFSET_END_VAL; + } + } + } + } + + // init "kafkaPartitionOffsets" with partition property. + // The offset will be set to OFFSET_END for now, and will be changed in later analysis process. 
+    private void analyzeKafkaPartitionProperty(List<Integer> partitions) {
+        // Every partition starts at OFFSET_END_VAL; analyzeKafkaOffsetProperty may overwrite it afterwards.
+        for (Integer partition : partitions) {
+            this.kafkaPartitionOffsets.add(Pair.of(partition, KafkaProgress.OFFSET_END_VAL));
+        }
+    }
+
+    // Rebuilds "customKafkaProperties" from the original data source properties.
+    // Only keys starting with CUSTOM_KAFKA_PROPERTY_PREFIX are kept; each is stored under the part of
+    // the key that follows its first ".".
+    // Throws AnalysisException when such a key has fewer than two "."-separated segments.
+    private void analyzeCustomProperties() throws AnalysisException {
+        this.customKafkaProperties = new HashMap<>();
+        for (Map.Entry<String, String> dataSourceProperty : originalDataSourceProperties.entrySet()) {
+            String propertyKey = dataSourceProperty.getKey();
+            if (!propertyKey.startsWith(CUSTOM_KAFKA_PROPERTY_PREFIX)) {
+                // can be extended in the future with other prefixes
+                continue;
+            }
+            // NOTE: the check below is on the key segments, although the message talks about the value.
+            String[] keySegments = propertyKey.split("\\.");
+            if (keySegments.length < 2) {
+                throw new AnalysisException("kafka property value could not be a empty string");
+            }
+            this.customKafkaProperties.put(propertyKey.substring(propertyKey.indexOf(".") + 1),
+                    dataSourceProperty.getValue());
+        }
+    }
+
+    // Fill the partitions' offsets from kafkaOffsetsStringList; the i-th string is applied to the
+    // i-th entry of "kafkaPartitionOffsets".
+    // Return true if the offsets are specified by timestamp.
+    // Throws AnalysisException if the number of offsets differs from the number of partitions,
+    // if datetime and non-datetime offsets are mixed, or if a non-datetime offset is neither
+    // KafkaProgress.OFFSET_BEGINNING, KafkaProgress.OFFSET_END, nor a digit string that fits in a long.
+    private boolean analyzeKafkaOffsetProperty(List<String> kafkaOffsetsStringList) throws UserException {
+        if (kafkaOffsetsStringList.size() != kafkaPartitionOffsets.size()) {
+            throw new AnalysisException("Partitions number should be equals to offsets number");
+        }
+
+        // We support two ways to specify the offset,
+        // one is to specify the offset directly, the other is to specify a timestamp.
+        // Doris will get the offset of the corresponding partition through the timestamp.
+        // The user can only choose one of these methods.
+        boolean foundTime = false;
+        boolean foundOffset = false;
+        for (String kafkaOffsetsStr : kafkaOffsetsStringList) {
+            if (TimeUtils.timeStringToLong(kafkaOffsetsStr) != -1) {
+                foundTime = true;
+            } else {
+                foundOffset = true;
+            }
+        }
+        if (foundTime && foundOffset) {
+            throw new AnalysisException("The offset of the partition cannot be specified by the timestamp "
+                    + "and the offset at the same time");
+        }
+
+        if (foundTime) {
+            // convert all datetime strs to timestamps
+            // and set them as the partition's offset.
+            // These timestamps will be converted to real offset when job is running.
+            TimeZone timeZone = TimeUtils.getOrSystemTimeZone(getTimezone());
+            for (int i = 0; i < kafkaOffsetsStringList.size(); i++) {
+                long timestamp = TimeUtils.timeStringToLong(kafkaOffsetsStringList.get(i), timeZone);
+                Preconditions.checkState(timestamp != -1);
+                kafkaPartitionOffsets.get(i).second = timestamp;
+            }
+            return true;
+        }
+
+        for (int i = 0; i < kafkaOffsetsStringList.size(); i++) {
+            String kafkaOffsetsStr = kafkaOffsetsStringList.get(i);
+            Pair<Integer, Long> partitionOffset = kafkaPartitionOffsets.get(i);
+            if (kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                partitionOffset.second = KafkaProgress.OFFSET_BEGINNING_VAL;
+            } else if (kafkaOffsetsStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                partitionOffset.second = KafkaProgress.OFFSET_END_VAL;
+            } else if (NumberUtils.isDigits(kafkaOffsetsStr)) {
+                // isDigits accepts digit strings of any length, and NumberUtils.toLong would silently
+                // turn an out-of-range value into 0; parse strictly and reject it instead.
+                try {
+                    partitionOffset.second = Long.parseLong(kafkaOffsetsStr);
+                } catch (NumberFormatException e) {
+                    throw new AnalysisException(KafkaConfiguration.KAFKA_OFFSETS.getName()
+                            + " value " + kafkaOffsetsStr + " is out of range for a long");
+                }
+            } else {
+                throw new AnalysisException(KafkaConfiguration.KAFKA_OFFSETS.getName()
+                        + " must be an integer or a date time");
+            }
+        }
+        return false;
+    }
+
+    // If the default offset is not set, set the default offset to OFFSET_END.
+    // If the offset is in datetime format, convert it to a timestamp,
+    // and also save the original datetime formatted offset
+    // in "customKafkaProperties".
+    // Return true if the offset is in datetime format.
+    private boolean analyzeKafkaDefaultOffsetProperty() throws AnalysisException {
+        final String defaultOffsetsKey = KafkaConfiguration.KAFKA_DEFAULT_OFFSETS.getName();
+        // Fall back to OFFSET_END when no default offset has been configured.
+        customKafkaProperties.putIfAbsent(defaultOffsetsKey, KafkaProgress.OFFSET_END);
+        final String configuredOffset = customKafkaProperties.get(defaultOffsetsKey);
+        final long parsedTimestamp = TimeUtils.timeStringToLong(configuredOffset,
+                TimeUtils.getOrSystemTimeZone(this.getTimezone()));
+
+        if (parsedTimestamp == -1) {
+            // Not a datetime: only OFFSET_BEGINNING and OFFSET_END are accepted (case-insensitive).
+            if (configuredOffset.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)
+                    || configuredOffset.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                return false;
+            }
+            throw new AnalysisException(defaultOffsetsKey
+                    + " can only be set to OFFSET_BEGINNING, OFFSET_END or date time");
+        }
+
+        // Datetime offset: store the converted timestamp and keep the original datetime string for further use.
+        customKafkaProperties.put(defaultOffsetsKey, String.valueOf(parsedTimestamp));
+        customKafkaProperties.put(KafkaConfiguration.KAFKA_ORIGIN_DEFAULT_OFFSETS.getName(), configuredOffset);
+        return true;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java index 78b1d5f1c982a5..4729882f7927fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AlterRoutineLoadJobOperationLog.java @@ -17,9 +17,9 @@ package org.apache.doris.persist; -import org.apache.doris.analysis.RoutineLoadDataSourceProperties; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; import org.apache.doris.persist.gson.GsonUtils; import com.google.gson.annotations.SerializedName; @@ -29,17 
+29,17 @@ import java.io.IOException; import java.util.Map; -public class AlterRoutineLoadJobOperationLog implements Writable { +public class AlterRoutineLoadJobOperationLog implements Writable { @SerializedName(value = "jobId") private long jobId; @SerializedName(value = "jobProperties") private Map jobProperties; @SerializedName(value = "dataSourceProperties") - private RoutineLoadDataSourceProperties dataSourceProperties; + private AbstractDataSourceProperties dataSourceProperties; public AlterRoutineLoadJobOperationLog(long jobId, Map jobProperties, - RoutineLoadDataSourceProperties dataSourceProperties) { + AbstractDataSourceProperties dataSourceProperties) { this.jobId = jobId; this.jobProperties = jobProperties; this.dataSourceProperties = dataSourceProperties; @@ -53,7 +53,7 @@ public Map getJobProperties() { return jobProperties; } - public RoutineLoadDataSourceProperties getDataSourceProperties() { + public AbstractDataSourceProperties getDataSourceProperties() { return dataSourceProperties; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 60fd334b7f899a..d90a9de9997a8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -62,6 +62,8 @@ import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog; import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.load.sync.SyncJob; import org.apache.doris.load.sync.canal.CanalSyncJob; import org.apache.doris.policy.Policy; @@ -188,6 +190,11 @@ public class GsonUtils { .registerSubtype(IcebergRestExternalCatalog.class, 
IcebergRestExternalCatalog.class.getSimpleName()) .registerSubtype(IcebergDLFExternalCatalog.class, IcebergDLFExternalCatalog.class.getSimpleName()) .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName()); + // routine load data source + private static RuntimeTypeAdapterFactory rdsTypeAdapterFactory = + RuntimeTypeAdapterFactory.of( + AbstractDataSourceProperties.class, "clazz") + .registerSubtype(KafkaDataSourceProperties.class, KafkaDataSourceProperties.class.getSimpleName()); private static RuntimeTypeAdapterFactory dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of( DatabaseIf.class, "clazz") @@ -229,6 +236,7 @@ public class GsonUtils { .registerTypeAdapterFactory(policyTypeAdapterFactory).registerTypeAdapterFactory(dsTypeAdapterFactory) .registerTypeAdapterFactory(dbTypeAdapterFactory).registerTypeAdapterFactory(tblTypeAdapterFactory) .registerTypeAdapterFactory(hbResponseTypeAdapterFactory) + .registerTypeAdapterFactory(rdsTypeAdapterFactory) .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()); @@ -460,7 +468,7 @@ public JsonElement serialize(AtomicBoolean atomicBoolean, Type type, public static final class ImmutableMapDeserializer implements JsonDeserializer> { @Override public ImmutableMap deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) throws JsonParseException { + final JsonDeserializationContext context) throws JsonParseException { final Type type2 = TypeUtils.parameterize(Map.class, ((ParameterizedType) type).getActualTypeArguments()); final Map map = context.deserialize(json, type2); return ImmutableMap.copyOf(map); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java index e3405b4096af10..9a4b7d19e037b2 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/AlterRoutineLoadStmtTest.java @@ -17,9 +17,14 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.load.routineload.KafkaRoutineLoadJob; +import org.apache.doris.load.routineload.kafka.KafkaConfiguration; +import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -31,13 +36,9 @@ import org.junit.Before; import org.junit.Test; +import java.util.HashMap; import java.util.Map; -/* - * Author: Chenmingyu - * Date: Jul 20, 2020 - */ - public class AlterRoutineLoadStmtTest { private Analyzer analyzer; @@ -46,7 +47,7 @@ public class AlterRoutineLoadStmtTest { private AccessControllerManager accessManager; @Before - public void setUp() { + public void setUp() throws MetaNotFoundException { analyzer = AccessTestUtil.fetchAdminAnalyzer(false); FeConstants.runningUnitTest = true; new Expectations() { @@ -67,52 +68,61 @@ public void setUp() { } @Test - public void testNormal() { + public void testNormal() throws UserException { + new Expectations() { + { + Env.getCurrentEnv().getRoutineLoadManager() + .getJob(anyString, anyString); + minTimes = 0; + result = new KafkaRoutineLoadJob(); + } + }; { // CHECKSTYLE IGNORE THIS LINE Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY, "200000"); - String typeName = "kafka"; Map dataSourceProperties = Maps.newHashMap(); 
dataSourceProperties.put("property.client.id", "101"); dataSourceProperties.put("property.group.id", "mygroup"); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000"); - RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties, true); + dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "1,2,3"); + dataSourceProperties.put(KafkaConfiguration.KAFKA_OFFSETS.getName(), "10000, 20000, 30000"); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), - jobProperties, routineLoadDataSourceProperties); - try { - stmt.analyze(analyzer); - } catch (UserException e) { - Assert.fail(); - } - + jobProperties, dataSourceProperties); + stmt.analyze(analyzer); Assert.assertEquals(2, stmt.getAnalyzedJobProperties().size()); Assert.assertTrue(stmt.getAnalyzedJobProperties().containsKey(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY)); Assert.assertTrue(stmt.getAnalyzedJobProperties().containsKey(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY)); Assert.assertTrue(stmt.hasDataSourceProperty()); - Assert.assertEquals(2, stmt.getDataSourceProperties().getCustomKafkaProperties().size()); - Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("group.id")); - Assert.assertTrue(stmt.getDataSourceProperties().getCustomKafkaProperties().containsKey("client.id")); - Assert.assertEquals(3, stmt.getDataSourceProperties().getKafkaPartitionOffsets().size()); + KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) stmt.getDataSourceProperties(); + Assert.assertEquals(2, kafkaDataSourceProperties.getCustomKafkaProperties().size()); + Assert.assertTrue(kafkaDataSourceProperties.getCustomKafkaProperties().containsKey("group.id")); + 
Assert.assertTrue(kafkaDataSourceProperties.getCustomKafkaProperties().containsKey("client.id")); + Assert.assertEquals(3, kafkaDataSourceProperties.getKafkaPartitionOffsets().size()); } // CHECKSTYLE IGNORE THIS LINE } @Test(expected = AnalysisException.class) - public void testNoProperties() throws AnalysisException, UserException { + public void testNoProperties() throws UserException { AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), - Maps.newHashMap(), new RoutineLoadDataSourceProperties()); + Maps.newHashMap(), new HashMap<>()); stmt.analyze(analyzer); } @Test - public void testUnsupportedProperties() { + public void testUnsupportedProperties() throws MetaNotFoundException { + new Expectations() { + { + Env.getCurrentEnv().getRoutineLoadManager() + .getJob(anyString, anyString); + minTimes = 0; + result = new KafkaRoutineLoadJob(); + } + }; { // CHECKSTYLE IGNORE THIS LINE Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.FORMAT, "csv"); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), - jobProperties, new RoutineLoadDataSourceProperties()); + jobProperties, new HashMap<>()); try { stmt.analyze(analyzer); Assert.fail(); @@ -127,13 +137,11 @@ public void testUnsupportedProperties() { { // CHECKSTYLE IGNORE THIS LINE Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); - String typeName = "kafka"; Map dataSourceProperties = Maps.newHashMap(); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "new_topic"); - RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties, true); + dataSourceProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), "new_topic"); + //fixme: should be failed AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), - jobProperties, 
routineLoadDataSourceProperties); + jobProperties, dataSourceProperties); try { stmt.analyze(analyzer); @@ -148,13 +156,10 @@ public void testUnsupportedProperties() { { // CHECKSTYLE IGNORE THIS LINE Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); - String typeName = "kafka"; Map dataSourceProperties = Maps.newHashMap(); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); - RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties, true); + dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "1,2,3"); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), - jobProperties, routineLoadDataSourceProperties); + jobProperties, dataSourceProperties); try { stmt.analyze(analyzer); Assert.fail(); @@ -168,14 +173,11 @@ public void testUnsupportedProperties() { { // CHECKSTYLE IGNORE THIS LINE Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); - String typeName = "kafka"; Map dataSourceProperties = Maps.newHashMap(); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000"); - RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties, true); + dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "1,2,3"); + dataSourceProperties.put(KafkaConfiguration.KAFKA_OFFSETS.getName(), "1000, 2000"); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), - jobProperties, routineLoadDataSourceProperties); + jobProperties, dataSourceProperties); try { stmt.analyze(analyzer); Assert.fail(); @@ -189,13 +191,10 @@ public void testUnsupportedProperties() { { // 
CHECKSTYLE IGNORE THIS LINE Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); - String typeName = "kafka"; Map dataSourceProperties = Maps.newHashMap(); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1000, 2000, 3000"); - RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties, true); + dataSourceProperties.put(KafkaConfiguration.KAFKA_OFFSETS.getName(), "1000, 2000, 3000"); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), - jobProperties, routineLoadDataSourceProperties); + jobProperties, dataSourceProperties); try { stmt.analyze(analyzer); Assert.fail(); @@ -210,16 +209,13 @@ public void testUnsupportedProperties() { Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY, "100"); jobProperties.put(CreateRoutineLoadStmt.MAX_BATCH_SIZE_PROPERTY, "200000"); - String typeName = "kafka"; Map dataSourceProperties = Maps.newHashMap(); dataSourceProperties.put("property.client.id", "101"); dataSourceProperties.put("property.group.id", "mygroup"); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3"); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000, 30000"); - RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties( - typeName, dataSourceProperties, true); + dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "1,2,3"); + dataSourceProperties.put(KafkaConfiguration.KAFKA_OFFSETS.getName(), "10000, 20000, 30000"); AlterRoutineLoadStmt stmt = new AlterRoutineLoadStmt(new LabelName("db1", "label1"), - jobProperties, routineLoadDataSourceProperties); + jobProperties, dataSourceProperties); try { stmt.analyze(analyzer); Assert.fail(); diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java index e2b16f17df3b65..a59314952ad0e7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java @@ -25,6 +25,8 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.LoadDataSourceType; +import org.apache.doris.load.routineload.kafka.KafkaConfiguration; +import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; @@ -116,9 +118,9 @@ public void testAnalyzeWithDuplicateProperty(@Injectable Analyzer analyzer) thro String typeName = LoadDataSourceType.KAFKA.name(); Map customProperties = Maps.newHashMap(); - customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); - customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); - customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName); + customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress); + customProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), kafkaPartitionString); CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, @@ -166,9 +168,9 @@ public void testAnalyze(@Injectable Analyzer analyzer, String typeName = LoadDataSourceType.KAFKA.name(); Map customProperties = Maps.newHashMap(); - customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); - customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); - 
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName); + customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress); + customProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), kafkaPartitionString); CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, @@ -197,8 +199,9 @@ public void analyze(Analyzer analyzer1) { Assert.assertEquals(partitionNames.getPartitionNames(), createRoutineLoadStmt.getRoutineLoadDesc().getPartitionNames().getPartitionNames()); Assert.assertEquals(2, createRoutineLoadStmt.getDesiredConcurrentNum()); Assert.assertEquals(0, createRoutineLoadStmt.getMaxErrorNum()); - Assert.assertEquals(serverAddress, createRoutineLoadStmt.getKafkaBrokerList()); - Assert.assertEquals(topicName, createRoutineLoadStmt.getKafkaTopic()); + KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) createRoutineLoadStmt.getDataSourceProperties(); + Assert.assertEquals(serverAddress, kafkaDataSourceProperties.getBrokerList()); + Assert.assertEquals(topicName, kafkaDataSourceProperties.getTopic()); Assert.assertEquals("+08:00", createRoutineLoadStmt.getTimezone()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java deleted file mode 100644 index 6ceac2c71c8e2a..00000000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/RoutineLoadDataSourcePropertiesTest.java +++ /dev/null @@ -1,348 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.analysis; - -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Pair; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.load.routineload.KafkaProgress; - -import com.google.common.collect.Maps; -import org.junit.Assert; -import org.junit.Test; - -import java.util.List; -import java.util.Map; - -public class RoutineLoadDataSourcePropertiesTest { - - @Test - public void testCreateNormal() throws UserException { - // normal - Map properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100, 101, 102"); - RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); - Assert.assertEquals("test", dsProperties.getKafkaTopic()); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(3, partitinOffsets.size()); - Assert.assertEquals(Integer.valueOf(0), 
partitinOffsets.get(0).first); - Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); - Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); - Assert.assertEquals(Long.valueOf(100), partitinOffsets.get(0).second); - Assert.assertEquals(Long.valueOf(101), partitinOffsets.get(1).second); - Assert.assertEquals(Long.valueOf(102), partitinOffsets.get(2).second); - Assert.assertFalse(dsProperties.isOffsetsForTimes()); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - - // normal, with datetime - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); - Assert.assertEquals("test", dsProperties.getKafkaTopic()); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(3, partitinOffsets.size()); - Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); - Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); - Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); - Assert.assertEquals(Long.valueOf(1633834800000L), partitinOffsets.get(0).second); - Assert.assertEquals(Long.valueOf(1633834800000L), partitinOffsets.get(1).second); - Assert.assertEquals(Long.valueOf(1633838400000L), partitinOffsets.get(2).second); - Assert.assertTrue(dsProperties.isOffsetsForTimes()); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - - // normal, with default 
offset as datetime - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); - Assert.assertEquals("test", dsProperties.getKafkaTopic()); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(3, partitinOffsets.size()); - Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); - Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); - Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); - Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(0).second); - Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(1).second); - Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(2).second); - Assert.assertEquals(2, dsProperties.getCustomKafkaProperties().size()); - Assert.assertEquals("1578585600000", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); - Assert.assertEquals("2020-01-10 00:00:00", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)); - Assert.assertTrue(dsProperties.isOffsetsForTimes()); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - - // normal, only set default offset as datetime - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); - 
properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); - Assert.assertEquals("test", dsProperties.getKafkaTopic()); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(0, partitinOffsets.size()); - Assert.assertEquals(2, dsProperties.getCustomKafkaProperties().size()); - Assert.assertEquals("1578585600000", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); - Assert.assertEquals("2020-01-10 00:00:00", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - - // normal, only set default offset as integer - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); - properties.put("property." 
+ CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, KafkaProgress.OFFSET_END); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.assertEquals("127.0.0.1:8080", dsProperties.getKafkaBrokerList()); - Assert.assertEquals("test", dsProperties.getKafkaTopic()); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(0, partitinOffsets.size()); - Assert.assertEquals(1, dsProperties.getCustomKafkaProperties().size()); - Assert.assertEquals(KafkaProgress.OFFSET_END, dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - } - - @Test - public void testCreateAbnormal() { - // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS together - Map properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 1"); - properties.put("property." 
+ CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1"); - RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.fail(); - } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains("Only one of kafka_offsets and kafka_default_offsets can be set.")); - } - - // can not set datetime formatted offset and integer offset together - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 2020-10-10 12:11:11, 1"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.fail(); - } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains( - "The offset of the partition cannot be specified by the timestamp " - + "and the offset at the same time")); - } - - // no partitions but has offset - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "test"); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 1"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, false); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.fail(); - } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains("Partitions number should be equals to offsets number")); - } - } - - @Test - public void testAlterNormal() throws UserException { - // normal - Map properties = Maps.newHashMap(); - 
properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "100, 101, 102"); - RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.assertEquals("", dsProperties.getKafkaBrokerList()); - Assert.assertEquals("", dsProperties.getKafkaTopic()); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(3, partitinOffsets.size()); - Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); - Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); - Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); - Assert.assertEquals(Long.valueOf(100), partitinOffsets.get(0).second); - Assert.assertEquals(Long.valueOf(101), partitinOffsets.get(1).second); - Assert.assertEquals(Long.valueOf(102), partitinOffsets.get(2).second); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - - // normal, with datetime - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "2021-10-10 11:00:00, 2021-10-10 11:00:00, 2021-10-10 12:00:00"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS togather - dsProperties.analyze(); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(3, partitinOffsets.size()); - Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); - Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); - Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); - 
Assert.assertEquals(Long.valueOf(1633834800000L), partitinOffsets.get(0).second); - Assert.assertEquals(Long.valueOf(1633834800000L), partitinOffsets.get(1).second); - Assert.assertEquals(Long.valueOf(1633838400000L), partitinOffsets.get(2).second); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - - // normal, with default offset as datetime - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put("property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS togather - dsProperties.analyze(); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(3, partitinOffsets.size()); - Assert.assertEquals(Integer.valueOf(0), partitinOffsets.get(0).first); - Assert.assertEquals(Integer.valueOf(1), partitinOffsets.get(1).first); - Assert.assertEquals(Integer.valueOf(2), partitinOffsets.get(2).first); - Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(0).second); - Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(1).second); - Assert.assertEquals(Long.valueOf(1578585600000L), partitinOffsets.get(2).second); - Assert.assertEquals(2, dsProperties.getCustomKafkaProperties().size()); - Assert.assertEquals("1578585600000", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); - Assert.assertEquals("2020-01-10 00:00:00", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - - // normal, only set default offset, with utc timezone - properties = Maps.newHashMap(); - properties.put("property." 
+ CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "2020-01-10 00:00:00"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); - dsProperties.setTimezone(TimeUtils.UTC_TIME_ZONE); - try { - // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS togather - dsProperties.analyze(); - List> partitinOffsets = dsProperties.getKafkaPartitionOffsets(); - Assert.assertEquals(0, partitinOffsets.size()); - Assert.assertEquals(2, dsProperties.getCustomKafkaProperties().size()); - Assert.assertEquals("1578614400000", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)); - Assert.assertEquals("2020-01-10 00:00:00", dsProperties.getCustomKafkaProperties().get(CreateRoutineLoadStmt.KAFKA_ORIGIN_DEFAULT_OFFSETS)); - } catch (AnalysisException e) { - Assert.fail(e.getMessage()); - } - } - - @Test - public void testAlterAbnormal() { - // now support set KAFKA_BROKER_LIST_PROPERTY - Map properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080"); - properties.put("property." 
+ CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS, "-1"); - RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - dsProperties.analyze(); - Assert.fail(); - } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains("kafka_default_offsets can only be set to OFFSET_BEGINNING, OFFSET_END or date time")); - } - - // can not set datetime formatted offset and integer offset together - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1, 2"); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 2020-10-10 12:11:11, 1"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS togather - dsProperties.analyze(); - Assert.fail(); - } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains( - "The offset of the partition cannot be specified by the timestamp " - + "and the offset at the same time")); - } - - // no partitions but has offset - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "1, 1, 1"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try { - // can not set KAFKA_OFFSETS_PROPERTY and KAFKA_DEFAULT_OFFSETS togather - dsProperties.analyze(); - Assert.fail(); - } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains("Partitions number should be equals to offsets number")); - } - - // only set partition - properties = Maps.newHashMap(); - properties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1, 1, 1"); - dsProperties = new RoutineLoadDataSourceProperties("KAFKA", properties, true); - dsProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); - try 
{ - dsProperties.analyze(); - Assert.fail(); - } catch (UserException e) { - Assert.assertTrue(e.getMessage().contains("Must set offset or default offset with partition property")); - } - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 54021122edd36e..2c0a1d13d45b50 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.ParseNode; import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.analysis.RoutineLoadDataSourceProperties; import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; @@ -39,6 +38,8 @@ import org.apache.doris.common.util.KafkaUtil; import org.apache.doris.load.RoutineLoadDesc; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.load.routineload.kafka.KafkaConfiguration; +import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TResourceInfo; @@ -255,10 +256,10 @@ public void testFromCreateStmt(@Mocked Env env, PartitionInfo partitionInfo = new PartitionInfo(topicName, Integer.valueOf(s), null, null, null); kafkaPartitionInfoList.add(partitionInfo); } - RoutineLoadDataSourceProperties dsProperties = new RoutineLoadDataSourceProperties(); + KafkaDataSourceProperties dsProperties = new KafkaDataSourceProperties(null); dsProperties.setKafkaPartitionOffsets(partitionIdToOffset); - Deencapsulation.setField(dsProperties, "kafkaBrokerList", serverAddress); - Deencapsulation.setField(dsProperties, "kafkaTopic", topicName); + 
Deencapsulation.setField(dsProperties, "brokerList", serverAddress); + Deencapsulation.setField(dsProperties, "topic", topicName); Deencapsulation.setField(createRoutineLoadStmt, "dataSourceProperties", dsProperties); long dbId = 1L; @@ -309,9 +310,9 @@ private CreateRoutineLoadStmt initCreateRoutineLoadStmt() { String typeName = LoadDataSourceType.KAFKA.name(); Map customProperties = Maps.newHashMap(); - customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); - customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); - customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, kafkaPartitionString); + customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName); + customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress); + customProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), kafkaPartitionString); CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 8b4fc8910c0c8d..7bd776df3eb832 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -40,6 +40,7 @@ import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.loadv2.LoadTask; +import org.apache.doris.load.routineload.kafka.KafkaConfiguration; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.EditLog; @@ -91,9 +92,9 @@ public void testAddJobByStmt(@Injectable AccessControllerManager accessManager, String typeName = 
LoadDataSourceType.KAFKA.name(); Map customProperties = Maps.newHashMap(); String topicName = "topic1"; - customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName); String serverAddress = "http://127.0.0.1:8080"; - customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); + customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress); CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties, @@ -160,9 +161,9 @@ public void testCreateJobAuthDeny(@Injectable AccessControllerManager accessMana String typeName = LoadDataSourceType.KAFKA.name(); Map customProperties = Maps.newHashMap(); String topicName = "topic1"; - customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, topicName); + customProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), topicName); String serverAddress = "http://127.0.0.1:8080"; - customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, serverAddress); + customProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), serverAddress); CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, properties, typeName, customProperties, diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java index 7572cff364af42..a93b4b3a3d2832 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/AlterRoutineLoadOperationLogTest.java @@ -18,9 +18,10 @@ package org.apache.doris.persist; import org.apache.doris.analysis.CreateRoutineLoadStmt; -import 
org.apache.doris.analysis.RoutineLoadDataSourceProperties; import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.routineload.kafka.KafkaConfiguration; +import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; import com.google.common.collect.Maps; import org.junit.Assert; @@ -49,13 +50,13 @@ public void testSerializeAlterRoutineLoadOperationLog() throws IOException, User Map jobProperties = Maps.newHashMap(); jobProperties.put(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY, "5"); - String typeName = "kafka"; Map dataSourceProperties = Maps.newHashMap(); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "0, 1"); - dataSourceProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "10000, 20000"); + dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "0, 1"); + dataSourceProperties.put(KafkaConfiguration.KAFKA_OFFSETS.getName(), "10000, 20000"); dataSourceProperties.put("property.group.id", "mygroup"); - RoutineLoadDataSourceProperties routineLoadDataSourceProperties = new RoutineLoadDataSourceProperties(typeName, - dataSourceProperties, true); + KafkaDataSourceProperties routineLoadDataSourceProperties = new KafkaDataSourceProperties( + dataSourceProperties); + routineLoadDataSourceProperties.setAlter(true); routineLoadDataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE); routineLoadDataSourceProperties.analyze(); @@ -71,14 +72,15 @@ public void testSerializeAlterRoutineLoadOperationLog() throws IOException, User AlterRoutineLoadJobOperationLog log2 = AlterRoutineLoadJobOperationLog.read(in); Assert.assertEquals(1, log2.getJobProperties().size()); Assert.assertEquals("5", log2.getJobProperties().get(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY)); - Assert.assertEquals("", log2.getDataSourceProperties().getKafkaBrokerList()); - Assert.assertEquals("", log2.getDataSourceProperties().getKafkaTopic()); - 
Assert.assertEquals(1, log2.getDataSourceProperties().getCustomKafkaProperties().size()); - Assert.assertEquals("mygroup", log2.getDataSourceProperties().getCustomKafkaProperties().get("group.id")); + KafkaDataSourceProperties kafkaDataSourceProperties = (KafkaDataSourceProperties) log2.getDataSourceProperties(); + Assert.assertEquals(null, kafkaDataSourceProperties.getBrokerList()); + Assert.assertEquals(null, kafkaDataSourceProperties.getTopic()); + Assert.assertEquals(1, kafkaDataSourceProperties.getCustomKafkaProperties().size()); + Assert.assertEquals("mygroup", kafkaDataSourceProperties.getCustomKafkaProperties().get("group.id")); Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(0), - log2.getDataSourceProperties().getKafkaPartitionOffsets().get(0)); + kafkaDataSourceProperties.getKafkaPartitionOffsets().get(0)); Assert.assertEquals(routineLoadDataSourceProperties.getKafkaPartitionOffsets().get(1), - log2.getDataSourceProperties().getKafkaPartitionOffsets().get(1)); + kafkaDataSourceProperties.getKafkaPartitionOffsets().get(1)); in.close(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/persist/DataSourcePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/persist/DataSourcePropertiesTest.java new file mode 100644 index 00000000000000..ed4852ee6a2c0d --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/persist/DataSourcePropertiesTest.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.persist; + +import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; +import org.apache.doris.load.routineload.AbstractDataSourceProperties; +import org.apache.doris.load.routineload.kafka.KafkaConfiguration; +import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties; +import org.apache.doris.persist.gson.GsonUtils; + +import com.google.common.collect.Maps; +import com.google.gson.JsonParseException; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; +import java.util.Map; + +public class DataSourcePropertiesTest { + + @Test + public void testKafkaDataSourceSerializatio() throws IOException, UserException { + Map dataSourceProperties = Maps.newHashMap(); + dataSourceProperties.put(KafkaConfiguration.KAFKA_TOPIC.getName(), "mytopic"); + dataSourceProperties.put(KafkaConfiguration.KAFKA_PARTITIONS.getName(), "0, 1"); + dataSourceProperties.put(KafkaConfiguration.KAFKA_OFFSETS.getName(), "10000, 20000"); + dataSourceProperties.put(KafkaConfiguration.KAFKA_BROKER_LIST.getName(), "127.0.0.1:9080"); + dataSourceProperties.put("property.group.id", "mygroup"); + KafkaDataSourceProperties kafkaDataSourceProperties = new KafkaDataSourceProperties(dataSourceProperties); + kafkaDataSourceProperties.convertAndCheckDataSourceProperties(); + File file = new File("./kafka_datasource_properties"); + 
file.createNewFile(); + try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file.toPath()))) { + String json = GsonUtils.GSON.toJson(kafkaDataSourceProperties); + Text.writeString(out, json); + out.flush(); + } + try (DataInputStream dis = new DataInputStream(Files.newInputStream(file.toPath()))) { + String json = Text.readString(dis); + KafkaDataSourceProperties properties = (KafkaDataSourceProperties) + GsonUtils.GSON.fromJson(json, AbstractDataSourceProperties.class); + Assertions.assertEquals(properties.getTopic(), + dataSourceProperties.get(KafkaConfiguration.KAFKA_TOPIC.getName())); + } + } + + @Test(expected = JsonParseException.class) + public void testNotSupportDataSourceSerializatio() throws IOException { + TestDataSourceProperties testDataSourceProperties = new TestDataSourceProperties(Maps.newHashMap()); + File file = new File("./test_datasource_properties"); + + file.createNewFile(); + try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file.toPath()))) { + String json = GsonUtils.GSON.toJson(testDataSourceProperties); + Text.writeString(out, json); + out.flush(); + } + + try (DataInputStream dis = new DataInputStream(Files.newInputStream(file.toPath()))) { + String json = Text.readString(dis); + GsonUtils.GSON.fromJson(json, AbstractDataSourceProperties.class); + } + } + + class TestDataSourceProperties extends AbstractDataSourceProperties { + + public TestDataSourceProperties(Map originalDataSourceProperties) { + super(originalDataSourceProperties); + } + + @Override + protected String getDataSourceType() { + return "test"; + } + + @Override + protected List getRequiredProperties() throws UserException { + return null; + } + + @Override + public void convertAndCheckDataSourceProperties() throws UserException { + + } + + } +}