Skip to content

Commit

Permalink
fix(go-client): scan will not recover when encounter `error_invalid_s…
Browse files Browse the repository at this point in the history
…tate` (#1106)
  • Loading branch information
foreverneverer committed Aug 15, 2022
1 parent 744460a commit 6e6d8eb
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 10 deletions.
78 changes: 69 additions & 9 deletions go-client/pegasus/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -219,6 +220,10 @@ func TestPegasusTableConnector_ScanInclusive(t *testing.T) {
clearDatabase(t, tb)
}

// GetScannerRpcErrorForTest is a test stub with the shape of a ReplicaSession
// "GetScanner" method (receiver passed as the first argument). The test
// installs it via gomonkey.ApplyMethod so that the patched call fails.
//
// All arguments are ignored. It always returns a nil response together with
// base.ERR_INVALID_STATE.
func GetScannerRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.GetScannerRequest) (*rrdb.ScanResponse, error) {
	var noResponse *rrdb.ScanResponse // stays nil: no response accompanies the error
	return noResponse, base.ERR_INVALID_STATE
}

// ScanRpcErrorForTest is a test stub with the shape of a ReplicaSession
// "Scan" method (receiver passed as the first argument). The test installs it
// via gomonkey.ApplyMethod so that the patched call fails.
//
// All arguments are ignored. It always returns a nil response together with
// base.ERR_INVALID_STATE.
func ScanRpcErrorForTest(_ *session.ReplicaSession, ctx context.Context, gpid *base.Gpid, partitionHash uint64, request *rrdb.ScanRequest) (*rrdb.ScanResponse, error) {
	var noResponse *rrdb.ScanResponse // stays nil: no response accompanies the error
	return noResponse, base.ERR_INVALID_STATE
}
Expand Down Expand Up @@ -270,28 +275,83 @@ func TestPegasusTableConnector_ScanFailRecover(t *testing.T) {
}
assert.Equal(t, 1, successCount)

// test rpc error
mockRpcFailedErrorTable, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
scanner, _ = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
rpcFailedMocked := false
// test getScanner rpc error
scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
assert.Nil(t, err)
rpcGetScannerFailedMocked := false
recallGetScanner := true
var getScannerFailedMock *gomonkey.Patches
successCount = 0
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
complete, _, _, _, error := scanner.Next(ctx)
// mock rpc error, follow request will be recovered automatically
if !rpcFailedMocked {
mock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest)
rpcFailedMocked = true
if recallGetScanner && rpcGetScannerFailedMocked { // GetScannerFailedMocked = true, recall GetScanner to trigger the error when execute scanner.Next
scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
assert.Nil(t, err)
}
complete, _, _, _, errNext := scanner.Next(ctx)
if !rpcGetScannerFailedMocked { // mock replicaSession.GetScanner rpc error, the next loop request will be failed
getScannerFailedMock = gomonkey.ApplyMethod(reflect.TypeOf(session), "GetScanner", GetScannerRpcErrorForTest)
rpcGetScannerFailedMocked = true
}
cancel()
if complete {
break
}

if errNext == nil {
successCount++
continue
}
// The error is ERR_INVALID_STATE and a re-configuration was triggered automatically, so the GetScanner mock can be reset.
if strings.Contains(errNext.Error(), "ERR_INVALID_STATE") &&
strings.Contains(errNext.Error(), "updateConfig=true") {
getScannerFailedMock.Reset()
recallGetScanner = false
} else if strings.Contains(errNext.Error(), "recover after next loop") {
continue
} else {
mock.Reset()
break
}
}
// GetScanner was called once more inside the loop, so successCount is expected to be 100 + 1.
assert.Equal(t, 101, successCount)

// test scan rpc error
getScannerFailedMock.Reset()
rpcScanFailedMocked := false
var scanFailedMock *gomonkey.Patches
successCount = 0
scanner, err = mockRpcFailedErrorTable.GetScanner(context.Background(), []byte("h1"), []byte(""), []byte(""), opts)
assert.Nil(t, err)
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*500)
complete, _, _, _, errNext := scanner.Next(ctx)
if !rpcScanFailedMocked { // mock scan rpc error, the next loop request will be failed but recovered automatically
scanFailedMock = gomonkey.ApplyMethod(reflect.TypeOf(session), "Scan", ScanRpcErrorForTest)
rpcScanFailedMocked = true
}
cancel()
if complete {
break
}
if error == nil {

if errNext == nil {
successCount++
continue
}

// The error is ERR_INVALID_STATE and a re-configuration was triggered automatically, so the Scan mock can be reset.
if strings.Contains(errNext.Error(), "ERR_INVALID_STATE") &&
strings.Contains(errNext.Error(), "updateConfig=true") {
scanFailedMock.Reset()
} else if strings.Contains(errNext.Error(), "recover after next loop") {
continue
} else {
break
}
}
assert.Equal(t, 100, successCount)
Expand Down
8 changes: 7 additions & 1 deletion go-client/pegasus/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,13 @@ func (p *pegasusScanner) startScanPartition(ctx context.Context) (completed bool

part := p.table.getPartitionByGpid(p.curGpid)
response, err := part.GetScanner(ctx, p.curGpid, p.curHash, request)

if err != nil {
p.batchStatus = batchRpcError
if updateConfig, _, errHandler := p.table.handleReplicaError(err, part); errHandler != nil {
err = fmt.Errorf("scan failed, error = %s, try resolve it(updateConfig=%v), result = %s", err, updateConfig, errHandler)
}
return
}
err = p.onRecvScanResponse(response, err)
if err == nil {
return p.doNext(ctx)
Expand Down

0 comments on commit 6e6d8eb

Please sign in to comment.