Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.0-pick](partial update) Pick some PRs about partial update #28844

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
Expand Down Expand Up @@ -144,6 +145,8 @@ public class NativeInsertStmt extends InsertStmt {

private InsertType insertType = InsertType.NATIVE_INSERT;

boolean hasEmptyTargetColumns = false;

enum InsertType {
NATIVE_INSERT("insert_"),
UPDATE("update_"),
Expand Down Expand Up @@ -475,11 +478,7 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
Set<String> mentionedColumns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
List<String> realTargetColumnNames;
if (targetColumnNames == null) {
if (!isFromDeleteOrUpdateStmt
&& analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
throw new AnalysisException("You must explicitly specify the columns to be updated when "
+ "updating partial columns using the INSERT statement.");
}
hasEmptyTargetColumns = true;
// the mentioned columns are columns which are visible to user, so here we use
// getBaseSchema(), not getFullSchema()
for (Column col : targetTable.getBaseSchema(false)) {
Expand Down Expand Up @@ -581,16 +580,16 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException {
}
}

if (analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
trySetPartialUpdate();
}

// check if size of select item equal with columns mentioned in statement
if (mentionedColumns.size() != queryStmt.getResultExprs().size()) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT,
mentionedColumns.size(), queryStmt.getResultExprs().size());
}

if (analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
trySetPartialUpdate();
}

// Check if all columns mentioned is enough
checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema());

Expand Down Expand Up @@ -941,7 +940,7 @@ public void complete() throws UserException {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
if (!isFromDeleteOrUpdateStmt && isPartialUpdate) {
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
}
}
Expand Down Expand Up @@ -1025,8 +1024,15 @@ private void trySetPartialUpdate() throws UserException {
return;
}
OlapTable olapTable = (OlapTable) targetTable;
if (olapTable.getKeysType() != KeysType.UNIQUE_KEYS) {
return;
}
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Partial update is only allowed in unique table with merge-on-write enabled.");
throw new UserException("Partial update is only allowed on unique table with merge-on-write enabled.");
}
if (hasEmptyTargetColumns) {
throw new AnalysisException("You must explicitly specify the columns to be updated when "
+ "updating partial columns using the INSERT statement.");
}
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
throw new DdlException("txn does not exist: " + txn.getTxnId());
}
state.addTableIndexes(physicalOlapTableSink.getTargetTable());
if (physicalOlapTableSink.isFromNativeInsertStmt() && physicalOlapTableSink.isPartialUpdate()) {
if (physicalOlapTableSink.isPartialUpdate()) {
state.setSchemaForPartialUpdate(physicalOlapTableSink.getTargetTable());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,19 @@
3 3 3 2 3
4 4 4 1 2

-- !sql --
10000 2017-10-01 2017-10-01T08:00:05 北京 20 0 2017-10-01T06:00 20 10 10
10000 2017-10-01 2017-10-01T09:00:05 北京 20 0 2017-10-01T07:00 15 2 2

-- !sql --
10000 2017-10-01 2017-10-01T08:00:05 北京 20 0 2017-10-01T06:00 20 10 10
10000 2017-10-01 2017-10-01T09:00:05 北京 20 0 2017-10-01T07:00 15 2 2

-- !sql --
10000 2017-10-01 2017-10-01T08:00:05 北京 20 0 2017-10-01T06:00 20 10 10
10000 2017-10-01 2017-10-01T09:00:05 北京 20 0 2017-10-01T07:00 15 2 2

-- !sql --
10000 2017-10-01 2017-10-01T08:00:05 北京 20 0 2017-10-01T06:00 20 10 10
10000 2017-10-01 2017-10-01T09:00:05 北京 20 0 2017-10-01T07:00 15 2 2

Original file line number Diff line number Diff line change
Expand Up @@ -248,4 +248,96 @@ suite("test_partial_update_native_insert_stmt", "p0") {
sql "sync;"
}
}

// test that session variable `enable_unique_key_partial_update` will only affect unique tables
for (def use_nerieds : [true, false]) {
logger.info("current params: use_nerieds: ${use_nerieds}")
if (use_nerieds) {
sql "set enable_nereids_planner=true;"
sql "set enable_nereids_dml=true;"
sql "set enable_fallback_to_original_planner=false;"
sql "sync;"
} else {
sql "set enable_nereids_planner=false;"
sql "set enable_nereids_dml=false;"
sql "sync;"
}

sql "set enable_unique_key_partial_update=true;"
sql "sync;"

def tableName8 = "test_partial_update_native_insert_stmt_agg_${use_nerieds}"
sql """ DROP TABLE IF EXISTS ${tableName8}; """
sql """ CREATE TABLE IF NOT EXISTS ${tableName8} (
`user_id` LARGEINT NOT NULL,
`date` DATE NOT NULL,
`timestamp` DATETIME NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00",
`cost` BIGINT SUM DEFAULT "0",
`max_dwell_time` INT MAX DEFAULT "0",
`min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)AGGREGATE KEY(`user_id`, `date`, `timestamp` ,`city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");"""

sql """insert into ${tableName8} values
(10000,"2017-10-01","2017-10-01 08:00:05","北京",20,0,"2017-10-01 06:00:00",20,10,10),
(10000,"2017-10-01","2017-10-01 09:00:05","北京",20,0,"2017-10-01 07:00:00",15,2,2); """
qt_sql "select * from ${tableName8} order by user_id;"


def tableName9 = "test_partial_update_native_insert_stmt_dup_${use_nerieds}"
sql """ DROP TABLE IF EXISTS ${tableName9}; """
sql """ CREATE TABLE IF NOT EXISTS ${tableName9} (
`user_id` LARGEINT NOT NULL,
`date` DATE NOT NULL,
`timestamp` DATETIME NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME DEFAULT "1970-01-01 00:00:00",
`cost` BIGINT DEFAULT "0",
`max_dwell_time` INT DEFAULT "0",
`min_dwell_time` INT DEFAULT "99999" COMMENT "用户最小停留时间"
)DUPLICATE KEY(`user_id`, `date`, `timestamp` ,`city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1");"""

sql """insert into ${tableName9} values
(10000,"2017-10-01","2017-10-01 08:00:05","北京",20,0,"2017-10-01 06:00:00",20,10,10),
(10000,"2017-10-01","2017-10-01 09:00:05","北京",20,0,"2017-10-01 07:00:00",15,2,2); """
qt_sql "select * from ${tableName9} order by user_id;"


def tableName10 = "test_partial_update_native_insert_stmt_mor_${use_nerieds}"
sql """ DROP TABLE IF EXISTS ${tableName10}; """
sql """ CREATE TABLE IF NOT EXISTS ${tableName10} (
`user_id` LARGEINT NOT NULL,
`date` DATE NOT NULL,
`timestamp` DATETIME NOT NULL,
`city` VARCHAR(20),
`age` SMALLINT,
`sex` TINYINT,
`last_visit_date` DATETIME DEFAULT "1970-01-01 00:00:00",
`cost` BIGINT DEFAULT "0",
`max_dwell_time` INT DEFAULT "0",
`min_dwell_time` INT DEFAULT "99999" COMMENT "用户最小停留时间"
)UNIQUE KEY(`user_id`, `date`, `timestamp` ,`city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1", "enable_unique_key_merge_on_write" = "false");"""

test {
sql """insert into ${tableName10} values
(10000,"2017-10-01","2017-10-01 08:00:05","北京",20,0,"2017-10-01 06:00:00",20,10,10),
(10000,"2017-10-01","2017-10-01 09:00:05","北京",20,0,"2017-10-01 07:00:00",15,2,2); """
exception "Partial update is only allowed on unique table with merge-on-write enabled"
}

sql """ DROP TABLE IF EXISTS ${tableName8}; """
sql """ DROP TABLE IF EXISTS ${tableName9}; """
sql """ DROP TABLE IF EXISTS ${tableName10}; """
}
}