Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: add savepoint integration tests #7378

Merged
merged 11 commits into from
Oct 18, 2022
29 changes: 29 additions & 0 deletions tests/integration_tests/savepoint/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/tidb_cdc_test/savepoint/sync_diff/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["savepoint.?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""

[data-sources.tidb0]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
161 changes: 161 additions & 0 deletions tests/integration_tests/savepoint/data/prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
drop database if exists `savepoint`;
create database `savepoint`;
use `savepoint`;

create table t(id int key, a int, index idx(a));

-- Test savepoint in optimistic transaction.

-- Check savepoint with same name will overwrite the old
begin optimistic;
savepoint s1;
insert into t values (1, 1);
savepoint s1;
insert into t values (2, 2);
rollback to s1;
commit;
select * from t order by id;

-- Check rollback to savepoint will delete the later savepoint.
delete from t;
begin optimistic;
insert into t values (1, 1);
savepoint s1;
insert into t values (2, 2);
savepoint s2;
insert into t values (3, 3);
savepoint s3;
rollback to s2;
commit;

-- Check rollback to savepoint will rollback insert.
delete from t;
begin optimistic;
insert into t values (1, 1), (2, 2);
savepoint s1;
insert into t values (3, 3);
rollback to s1;
insert into t values (3, 5);
commit;

-- Check rollback to savepoint will rollback insert onduplicate update.
delete from t;
insert into t values (1, 1);
begin optimistic;
insert into t values (2, 2);
savepoint s1;
insert into t values (1, 1), (2, 2), (3, 3) on duplicate key update a=a+1;
rollback to s1;
commit;
insert into t values (3, 3);

-- Check rollback to savepoint will rollback update.
delete from t;
begin optimistic;
insert into t values (1, 1), (2, 2);
savepoint s1;
update t set a=a+1 where id = 1;
rollback to s1;
update t set a=a+1 where id = 2;
commit;
update t set a=a+1 where id in (1, 2);

-- Check rollback to savepoint will rollback update.
delete from t;
insert into t values (1, 1), (2, 2);
begin optimistic;
insert into t values (3, 3);
update t set a=a+1 where id in (1, 3);
savepoint s1;
update t set a=a+1 where id in (2, 3);
rollback to s1;
commit;

-- Check rollback to savepoint will rollback delete.
delete from t;
insert into t values (1, 1), (2, 2);
begin optimistic;
insert into t values (3, 3);
savepoint s1;
delete from t where id in (1, 3);
rollback to s1;
commit;

-- Test savepoint in pessimistic transaction.
delete from t;

begin pessimistic;
savepoint s1;
insert into t values (1, 1);
savepoint s1;
insert into t values (2, 2);
rollback to s1;
commit;

-- Check rollback to savepoint will delete the later savepoint.
delete from t;
begin pessimistic;
insert into t values (1, 1);
savepoint s1;
insert into t values (2, 2);
savepoint s2;
insert into t values (3, 3);
savepoint s3;
rollback to s2;
commit;

-- Check rollback to savepoint will rollback insert.
delete from t;
begin pessimistic;
insert into t values (1, 1), (2, 2);
savepoint s1;
insert into t values (3, 3);
rollback to s1;
insert into t values (3, 5);
commit;

-- Check rollback to savepoint will rollback insert onduplicate update and release lock.
delete from t;
insert into t values (1, 1);
begin pessimistic;
insert into t values (2, 2);
savepoint s1;
insert into t values (1, 1), (2, 2), (3, 3) on duplicate key update a=a+1;
rollback to s1;
commit;
insert into t values (3, 3);

-- Check rollback to savepoint will rollback update.
delete from t;
begin pessimistic;
insert into t values (1, 1), (2, 2);
savepoint s1;
update t set a=a+1 where id = 1;
rollback to s1;
update t set a=a+1 where id = 2;
commit;
update t set a=a+1 where id in (1, 2);

-- Check rollback to savepoint will rollback update.
delete from t;
insert into t values (1, 1), (2, 2);
begin pessimistic;
insert into t values (3, 3);
update t set a=a+1 where id in (1, 3);
savepoint s1;
update t set a=a+1 where id in (2, 3);
rollback to s1;
commit;

-- Check rollback to savepoint will rollback delete.
delete from t;
insert into t values (1, 1), (2, 2);
begin pessimistic;
insert into t values (3, 3);
savepoint s1;
delete from t where id in (1, 3);
rollback to s1;
commit;

create table finish_mark (id int PRIMARY KEY);

43 changes: 43 additions & 0 deletions tests/integration_tests/savepoint/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-savepoint-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
*) SINK_URI="mysql://normal:123456@127.0.0.1:3306/" ;;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
fi
run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-exist table, so we check expected tables are created in downstream first
check_table_exists savepoint.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"