Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay(dm): cancel when relay meet error to close goroutine #6803

Merged
merged 5 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,11 @@ func (r *Relay) handleEvents(
// 1. read events from upstream server
readTimer := time.Now()
rResult, err := reader2.GetEvent(ctx)
failpoint.Inject("RelayGetEventFailed", func(v failpoint.Value) {

failpoint.Inject("RelayGetEventFailed", func() {
err = errors.New("RelayGetEventFailed")
})
failpoint.Inject("RelayGetEventFailedAt", func(v failpoint.Value) {
if intVal, ok := v.(int); ok && intVal == eventIndex {
err = errors.New("fail point triggered")
_, gtid := r.meta.GTID()
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/duplicate_event/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ function run_with_prepared_source_config() {

# with a 5 rows insert txn: 1 * FormatDesc + 1 * PreviousGTID + 1 * GTID + 1 * BEGIN + 5 * (Table_map + Write_rows) + 1 * XID
# here we fail at the third write rows event, sync should retry and auto recover without any duplicate event
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return(3);github.com/pingcap/tiflow/dm/relay/RelayAllowRetry=return"
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailedAt=return(3);github.com/pingcap/tiflow/dm/relay/RelayAllowRetry=return"

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
Expand Down
33 changes: 33 additions & 0 deletions dm/tests/new_relay/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,38 @@ function test_restart_relay_status() {
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_restart_relay_status passed"
}

# test_relay_leak verifies that a relay whose event read keeps failing does not
# leave extra goroutines behind after the worker auto-resumes it.
#
# Flow:
#   1. enable the RelayGetEventFailed failpoint for processes started below
#   2. start one master and one worker, create the source, start relay on worker1
#   3. wait until query-status reports the injected error and the worker log
#      shows an auto-resume being dispatched
#   4. dump goroutines via pprof and count stacks mentioning doIntervalOps;
#      more than one means a previous relay run was not shut down
function test_relay_leak() {
	cleanup_process
	cleanup_data $TEST_NAME
	export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/RelayGetEventFailed=return()"

	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

	# use a private copy of the source config with the checker settings removed
	cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
	sed -i "/check-enable: false/d" $WORK_DIR/source1.yaml
	sed -i "/checker:/d" $WORK_DIR/source1.yaml
	dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"start-relay -s $SOURCE_ID1 worker1"

	# the injected failure must surface in the relay status
	run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"query-status -s $SOURCE_ID1" \
		"RelayGetEventFailed" 1

	# wait for at least one automatic resume so a second relay run has started
	check_log_contain_with_retry 'dispatch auto resume relay' $WORK_DIR/worker1/log/dm-worker.log

	# NOTE(review): 8262 is presumably worker1's status port ($WORKER1_PORT) — confirm.
	# `grep -c` exits non-zero when nothing matches, hence the `|| true`.
	count=$(curl "http://127.0.0.1:8262/debug/pprof/goroutine?debug=2" 2>/dev/null | grep -c doIntervalOps || true)
	if [ "$count" -gt 1 ]; then
		echo "relay goroutine leak detected, count expect 1 but got $count"
		exit 1
	fi

	# do not let the failpoint leak into processes started by later tests
	export GO_FAILPOINTS=''
	echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_leak passed"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_cant_dail_upstream? Is it another test?

lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}

function test_cant_dail_upstream() {
cleanup_process
cleanup_data $TEST_NAME
Expand Down Expand Up @@ -356,6 +388,7 @@ function test_relay_operations() {
}

function run() {
test_relay_leak
test_relay_operations
test_cant_dail_upstream
test_restart_relay_status
Expand Down
1 change: 1 addition & 0 deletions dm/worker/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (h *realRelayHolder) run() {
h.setResult(nil) // clear previous result

r := h.relay.Process(h.ctx)
h.cancel()

h.setResult(&r)
for _, err := range r.Errors {
Expand Down