Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay(dm): cancel when relay meet error to close goroutine (#6803) #6814

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dm/dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (h *realRelayHolder) run() {
h.setResult(nil) // clear previous result

r := h.relay.Process(h.ctx)
h.cancel()

h.setResult(&r)
for _, err := range r.Errors {
Expand Down
6 changes: 5 additions & 1 deletion dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,11 @@ func (r *Relay) handleEvents(
// 1. read events from upstream server
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx)
failpoint.Inject("RelayGetEventFailed", func(v failpoint.Value) {

failpoint.Inject("RelayGetEventFailed", func() {
err = errors.New("RelayGetEventFailed")
})
failpoint.Inject("RelayGetEventFailedAt", func(v failpoint.Value) {
if intVal, ok := v.(int); ok && intVal == eventIndex {
err = errors.New("fail point triggered")
_, gtid := r.meta.GTID()
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/duplicate_event/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ function run_with_prepared_source_config() {

# with a 5 rows insert txn: 1 * FormatDesc + 1 * PreviousGTID + 1 * GTID + 1 * BEGIN + 5 * (Table_map + Write_rows) + 1 * XID
# here we fail at the third write rows event, sync should retry and auto recover without any duplicate event
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return(3);github.com/pingcap/tiflow/dm/relay/RelayAllowRetry=return"
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailedAt=return(3);github.com/pingcap/tiflow/dm/relay/RelayAllowRetry=return"

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
Expand Down
36 changes: 36 additions & 0 deletions dm/tests/new_relay/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,38 @@ function test_restart_relay_status() {
"bound" 2
}

# test_relay_leak checks that a relay which keeps failing (and being auto-resumed)
# does not accumulate goroutines: after a forced relay error and an auto resume,
# at most one goroutine stack mentioning doIntervalOps may exist in the worker.
function test_relay_leak() {
# start from a clean slate; both helpers are defined outside this function
cleanup_process
cleanup_data $TEST_NAME
# enable the RelayGetEventFailed failpoint (no argument) for processes started below
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return()"

# bring up one master and one worker and wait until each answers RPCs
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

# copy the source config and delete the lines matching "check-enable: false" and "checker:"
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/check-enable: false/d" $WORK_DIR/source1.yaml
sed -i "/checker:/d" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

# start relay for the source on worker1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1"

# the injected error text should show up in query-status
# (presumably the trailing arguments are <pattern> <expected match count>; verify against the helper)
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status -s $SOURCE_ID1" \
"RelayGetEventFailed" 1

# wait until the worker log shows that an auto resume of the relay was dispatched
check_log_contain_with_retry 'dispatch auto resume relay' $WORK_DIR/worker1/log/dm-worker.log

# dump all goroutine stacks via pprof and count the lines mentioning doIntervalOps;
# grep -c exits non-zero when the count is 0, so `|| true` keeps that from being treated as a failure
# NOTE(review): port 8262 is hard-coded here; presumably it equals $WORKER1_PORT — confirm
count=$(curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null | grep -c doIntervalOps || true)
# more than one match is treated as a leaked relay goroutine
if [ $count -gt 1 ]; then
echo "relay goroutine leak detected, count expect 1 but got $count"
exit 1
fi
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_leak passed"
}

function test_kill_dump_connection() {
cleanup_data $TEST_NAME
cleanup_process
Expand Down Expand Up @@ -202,6 +234,10 @@ function run() {
test_restart_relay_status
test_cant_dail_downstream
test_cant_dail_upstream
test_relay_leak

cleanup_process
cleanup_data $TEST_NAME

export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/ReportRelayLogSpaceInBackground=return(1)"

Expand Down