From b59366742ab9621e1811448d22698eba33d166b9 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 19 Aug 2022 11:27:46 +0800 Subject: [PATCH 1/3] relay(dm): cancel when relay meet error to close goroutine Signed-off-by: lance6716 --- dm/relay/relay.go | 7 ++++++- dm/tests/duplicate_event/run.sh | 2 +- dm/tests/new_relay/run.sh | 33 +++++++++++++++++++++++++++++++++ dm/worker/relay.go | 1 + 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/dm/relay/relay.go b/dm/relay/relay.go index d8649b61498..8cb25ec4ebf 100644 --- a/dm/relay/relay.go +++ b/dm/relay/relay.go @@ -574,7 +574,12 @@ func (r *Relay) handleEvents( // 1. read events from upstream server readTimer := time.Now() rResult, err := reader2.GetEvent(ctx) - failpoint.Inject("RelayGetEventFailed", func(v failpoint.Value) { + // TODO: inject an error here for test + + failpoint.Inject("RelayGetEventFailed", func() { + err = errors.New("RelayGetEventFailed") + }) + failpoint.Inject("RelayGetEventFailedAt", func(v failpoint.Value) { if intVal, ok := v.(int); ok && intVal == eventIndex { err = errors.New("fail point triggered") _, gtid := r.meta.GTID() diff --git a/dm/tests/duplicate_event/run.sh b/dm/tests/duplicate_event/run.sh index 4274df9bc87..03d4d313126 100644 --- a/dm/tests/duplicate_event/run.sh +++ b/dm/tests/duplicate_event/run.sh @@ -41,7 +41,7 @@ function run_with_prepared_source_config() { # with a 5 rows insert txn: 1 * FormatDesc + 1 * PreviousGTID + 1 * GTID + 1 * BEGIN + 5 * (Table_map + Write_rows) + 1 * XID # here we fail at the third write rows event, sync should retry and auto recover without any duplicate event - export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return(3);github.com/pingcap/tiflow/dm/relay/RelayAllowRetry=return" + export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailedAt=return(3);github.com/pingcap/tiflow/dm/relay/RelayAllowRetry=return" run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index 79e8c513505..e9b7dbb4297 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -90,6 +90,38 @@ function test_restart_relay_status() { echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_restart_relay_status passed" } +function test_relay_leak() { + cleanup_process + cleanup_data $TEST_NAME + export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return()" + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + sed -i "/check-enable: false/d" $WORK_DIR/source1.yaml + sed -i "/checker:/d" $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "RelayGetEventFailed" 1 + + check_log_contain_with_retry 'dispatch auto resume relay' $WORK_DIR/worker1/log/dm-worker.log + + count=`curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null| grep -c doIntervalOps || true` + if [ $count -gt 1 ]; then + echo "relay goroutine leak detected, count expect 1 but got $count" + exit 1 + fi + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_cant_dail_upstream passed" +} + function test_cant_dail_upstream() { cleanup_process cleanup_data $TEST_NAME @@ -356,6 +388,7 @@ function test_relay_operations() { } function run() { + test_relay_leak test_relay_operations test_cant_dail_upstream test_restart_relay_status diff --git a/dm/worker/relay.go b/dm/worker/relay.go index 93b8b906261..551023c0457 100644 --- a/dm/worker/relay.go +++ b/dm/worker/relay.go @@ -138,6 +138,7 @@ func (h *realRelayHolder) run() { h.setResult(nil) // clear previous result r := h.relay.Process(h.ctx) + h.cancel() h.setResult(&r) for _, err := range r.Errors { From 635418ae4a3a16e40255b86b38a97b8f9c4108a1 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 19 Aug 2022 11:29:49 +0800 Subject: [PATCH 2/3] fix lint Signed-off-by: lance6716 --- dm/relay/relay.go | 1 - dm/tests/new_relay/run.sh | 12 ++++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/dm/relay/relay.go b/dm/relay/relay.go index 8cb25ec4ebf..33f6c448504 100644 --- a/dm/relay/relay.go +++ b/dm/relay/relay.go @@ -574,7 +574,6 @@ func (r *Relay) handleEvents( // 1. read events from upstream server readTimer := time.Now() rResult, err := reader2.GetEvent(ctx) - // TODO: inject an error here for test failpoint.Inject("RelayGetEventFailed", func() { err = errors.New("RelayGetEventFailed") diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index e9b7dbb4297..0809aed9f37 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -114,11 +114,11 @@ function test_relay_leak() { check_log_contain_with_retry 'dispatch auto resume relay' $WORK_DIR/worker1/log/dm-worker.log - count=`curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null| grep -c doIntervalOps || true` - if [ $count -gt 1 ]; then - echo "relay goroutine leak detected, count expect 1 but got $count" - exit 1 - fi + count=$(curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null | grep -c doIntervalOps || true) + if [ $count -gt 1 ]; then + echo "relay goroutine leak detected, count expect 1 but got $count" + exit 1 + fi echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_cant_dail_upstream passed" } @@ -388,7 +388,7 @@ function test_relay_operations() { } function run() { - test_relay_leak + test_relay_leak test_relay_operations test_cant_dail_upstream test_restart_relay_status From 0746319e57df7fa111913e973fa2d8f4df140615 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 19 Aug 2022 14:17:15 +0800 Subject: [PATCH 3/3] Update dm/tests/new_relay/run.sh --- dm/tests/new_relay/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index 0809aed9f37..fcd5c93dcfe 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -119,7 +119,7 @@ function test_relay_leak() { echo "relay goroutine leak detected, count expect 1 but got $count" exit 1 fi - echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_cant_dail_upstream passed" + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_leak passed" } function test_cant_dail_upstream() {