
[refactor](routineload) Refactored routineload to improve scalability #19834

Merged: 10 commits, May 23, 2023
fe/fe-core/src/main/cup/sql_parser.cup (4 additions & 5 deletions)
@@ -912,7 +912,7 @@ nonterminal ExplainOptions opt_explain_options;
nonterminal Boolean opt_tmp;

nonterminal OutFileClause opt_outfile;
-nonterminal RoutineLoadDataSourceProperties opt_datasource_properties;
+nonterminal Map<String, String> opt_datasource_properties;

nonterminal Boolean opt_signed_unsigned;

@@ -1345,15 +1345,14 @@ alter_stmt ::=
opt_datasource_properties ::=
// empty
{:
-RESULT = new RoutineLoadDataSourceProperties();
+RESULT = new HashMap<String, String>();
:}
| KW_FROM ident:type LPAREN key_value_map:customProperties RPAREN
{:
-// the 3rd parameter "true" means this is for AlterRoutineLoad operation.
-RESULT = new RoutineLoadDataSourceProperties(type, customProperties, true);
+Map<String, String> properties = new HashMap<String, String>(customProperties);
+RESULT = properties;
:}
;

quantity ::=
INTEGER_LITERAL:number
{:
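With the grammar now yielding a plain property map, source-specific construction moves out of the parser and into analysis via a factory. A minimal sketch of that dispatch is below; only the factory's name and the shape of its createDataSource call appear in this diff, so the KAFKA branch and the KafkaDataSourceProperties subclass are assumptions for illustration.

```java
import java.util.Map;

// Sketch of the factory named in this PR; the real implementation lives in
// org.apache.doris.load.routineload. The KAFKA branch and the
// KafkaDataSourceProperties subclass are assumed here, not shown in the diff.
public class RoutineLoadDataSourcePropertyFactory {
    public static AbstractDataSourceProperties createDataSource(
            String type, Map<String, String> properties) {
        if ("KAFKA".equalsIgnoreCase(type)) {
            return new KafkaDataSourceProperties(properties);
        }
        throw new IllegalArgumentException("Unknown data source type: " + type);
    }
}
```

Adding a new data source then means adding a subclass and a factory branch, with no further parser or statement changes, which is the scalability gain the PR title refers to.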
AlterRoutineLoadStmt.java
@@ -19,15 +19,18 @@

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
+import org.apache.doris.load.routineload.AbstractDataSourceProperties;
+import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
import org.apache.doris.load.routineload.RoutineLoadJob;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import lombok.Getter;
+import org.apache.commons.collections.MapUtils;

import java.util.Map;
import java.util.Optional;
@@ -62,17 +65,17 @@ public class AlterRoutineLoadStmt extends DdlStmt {

private final LabelName labelName;
private final Map<String, String> jobProperties;
-private final RoutineLoadDataSourceProperties dataSourceProperties;
+private final Map<String, String> dataSourceMapProperties;

// save analyzed job properties.
// analyzed data source properties are saved in dataSourceProperties.
private Map<String, String> analyzedJobProperties = Maps.newHashMap();

public AlterRoutineLoadStmt(LabelName labelName, Map<String, String> jobProperties,
-RoutineLoadDataSourceProperties dataSourceProperties) {
+Map<String, String> dataSourceProperties) {
this.labelName = labelName;
this.jobProperties = jobProperties != null ? jobProperties : Maps.newHashMap();
-this.dataSourceProperties = dataSourceProperties;
+this.dataSourceMapProperties = dataSourceProperties != null ? dataSourceProperties : Maps.newHashMap();
}

public String getDbName() {
@@ -88,13 +91,16 @@ public Map<String, String> getAnalyzedJobProperties() {
}

public boolean hasDataSourceProperty() {
-return dataSourceProperties.hasAnalyzedProperties();
+return MapUtils.isNotEmpty(dataSourceMapProperties);
}

-public RoutineLoadDataSourceProperties getDataSourceProperties() {
-return dataSourceProperties;
+public Map<String, String> getDataSourceMapProperties() {
+return dataSourceMapProperties;
}

+@Getter
+public AbstractDataSourceProperties dataSourceProperties;
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
@@ -106,7 +112,7 @@ public void analyze(Analyzer analyzer) throws UserException {
// check data source properties
checkDataSourceProperties();

-if (analyzedJobProperties.isEmpty() && !dataSourceProperties.hasAnalyzedProperties()) {
+if (analyzedJobProperties.isEmpty() && MapUtils.isEmpty(dataSourceMapProperties)) {
throw new AnalysisException("No properties are specified");
}
}
@@ -200,13 +206,15 @@ private void checkJobProperties() throws UserException {
}

private void checkDataSourceProperties() throws UserException {
-if (!FeConstants.runningUnitTest) {
-RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
-.checkPrivAndGetJob(getDbName(), getLabel());
-dataSourceProperties.setTimezone(job.getTimezone());
-} else {
-dataSourceProperties.setTimezone(TimeUtils.DEFAULT_TIME_ZONE);
+if (MapUtils.isEmpty(dataSourceMapProperties)) {
+return;
}
+RoutineLoadJob job = Env.getCurrentEnv().getRoutineLoadManager()
+.getJob(getDbName(), getLabel());
+this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
+.createDataSource(job.getDataSourceType().name(), dataSourceMapProperties);
+dataSourceProperties.setAlter(true);
+dataSourceProperties.setTimezone(job.getTimezone());
+dataSourceProperties.analyze();
}
}
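The call sites above (setAlter, setTimezone, analyze) suggest a small template-method contract on the base class: the statement hands raw properties to the factory, then triggers source-specific validation. The following is paraphrased from those call sites only; the real AbstractDataSourceProperties carries more state than this sketch.

```java
import java.util.Map;

import org.apache.doris.common.UserException;

import lombok.Getter;
import lombok.Setter;

// Paraphrased contract, inferred from the calls visible in this diff.
@Getter
@Setter
public abstract class AbstractDataSourceProperties {
    protected Map<String, String> originalDataSourceProperties; // assumed field name
    protected boolean alter;     // true when validating ALTER ROUTINE LOAD
    protected String timezone;   // inherited from the running job

    // Each concrete source (Kafka, etc.) validates its own property map here.
    public abstract void analyze() throws UserException;
}
```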
CreateRoutineLoadStmt.java
@@ -25,12 +25,13 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.load.routineload.AbstractDataSourceProperties;
+import org.apache.doris.load.routineload.RoutineLoadDataSourcePropertyFactory;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.qe.ConnectContext;

@@ -104,20 +105,14 @@ public class CreateRoutineLoadStmt extends DdlStmt {
public static final String NUM_AS_STRING = "num_as_string";
public static final String FUZZY_PARSE = "fuzzy_parse";

-// kafka type properties
-public static final String KAFKA_BROKER_LIST_PROPERTY = "kafka_broker_list";
-public static final String KAFKA_TOPIC_PROPERTY = "kafka_topic";
-// optional
-public static final String KAFKA_PARTITIONS_PROPERTY = "kafka_partitions";
-public static final String KAFKA_OFFSETS_PROPERTY = "kafka_offsets";
-public static final String KAFKA_DEFAULT_OFFSETS = "kafka_default_offsets";
-public static final String KAFKA_ORIGIN_DEFAULT_OFFSETS = "kafka_origin_default_offsets";

private static final String NAME_TYPE = "ROUTINE LOAD NAME";
public static final String ENDPOINT_REGEX = "[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]";
public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";

+private AbstractDataSourceProperties dataSourceProperties;
+
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(DESIRED_CONCURRENT_NUMBER_PROPERTY)
.add(MAX_ERROR_NUMBER_PROPERTY)
@@ -142,7 +137,6 @@ public class CreateRoutineLoadStmt extends DdlStmt {
private final List<ParseNode> loadPropertyList;
private final Map<String, String> jobProperties;
private final String typeName;
-private final RoutineLoadDataSourceProperties dataSourceProperties;

// the following variables will be initialized after analyze
// -1 as unset, the default value will set in RoutineLoadJob
@@ -193,7 +187,8 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNo
this.loadPropertyList = loadPropertyList;
this.jobProperties = jobProperties == null ? Maps.newHashMap() : jobProperties;
this.typeName = typeName.toUpperCase();
-this.dataSourceProperties = new RoutineLoadDataSourceProperties(this.typeName, dataSourceProperties, false);
+this.dataSourceProperties = RoutineLoadDataSourcePropertyFactory
+.createDataSource(typeName, dataSourceProperties);
this.mergeType = mergeType;
if (comment != null) {
this.comment = comment;
@@ -284,28 +279,12 @@ public String getJsonRoot() {
return jsonRoot;
}

-public String getKafkaBrokerList() {
-return this.dataSourceProperties.getKafkaBrokerList();
-}
-
-public String getKafkaTopic() {
-return this.dataSourceProperties.getKafkaTopic();
-}
-
-public List<Pair<Integer, Long>> getKafkaPartitionOffsets() {
-return this.dataSourceProperties.getKafkaPartitionOffsets();
-}
-
-public Map<String, String> getCustomKafkaProperties() {
-return this.dataSourceProperties.getCustomKafkaProperties();
-}
-
public LoadTask.MergeType getMergeType() {
return mergeType;
}

-public boolean isOffsetsForTimes() {
-return this.dataSourceProperties.isOffsetsForTimes();
+public AbstractDataSourceProperties getDataSourceProperties() {
+return dataSourceProperties;
}

public String getComment() {
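Callers that relied on the removed Kafka-specific getters now fetch the polymorphic properties object and narrow the type themselves. An illustrative migration follows; KafkaDataSourceProperties and its getter names are assumed for the example and are not shown in this diff.

```java
// Hypothetical caller migration off the removed stmt-level Kafka getters.
static void applyKafkaSettings(CreateRoutineLoadStmt stmt) {
    AbstractDataSourceProperties props = stmt.getDataSourceProperties();
    if (props instanceof KafkaDataSourceProperties) {
        KafkaDataSourceProperties kafka = (KafkaDataSourceProperties) props;
        String brokerList = kafka.getBrokerList(); // was stmt.getKafkaBrokerList()
        String topic = kafka.getTopic();           // was stmt.getKafkaTopic()
    }
}
```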
@@ -474,9 +453,9 @@ private void checkJobProperties() throws UserException {
format = "json";
jsonPaths = jobProperties.getOrDefault(JSONPATHS, "");
jsonRoot = jobProperties.getOrDefault(JSONROOT, "");
-stripOuterArray = Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
-numAsString = Boolean.valueOf(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
-fuzzyParse = Boolean.valueOf(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
+stripOuterArray = Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
+numAsString = Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
+fuzzyParse = Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
} else {
throw new UserException("Format type is invalid. format=`" + format + "`");
}
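One incidental cleanup in that last hunk: Boolean.parseBoolean returns the primitive directly, while Boolean.valueOf returns the cached boxed Boolean that auto-unboxing then converts back, so parseBoolean is the idiomatic choice when assigning to primitive boolean fields. A minimal illustration:

```java
// Same parse result either way; parseBoolean skips the box/unbox round trip.
boolean viaParse = Boolean.parseBoolean("true"); // primitive boolean
boolean viaBox = Boolean.valueOf("true");        // Boolean.TRUE, auto-unboxed
```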