Skip to content

Commit

Permalink
tests/robustness: unlock Delete/LeaseRevoke ops
Browse files Browse the repository at this point in the history
We should return token to that bucket if `nonUniqueWriteLimiter.Take()`
return true. After unlock Delete/LeaseRevoke ops, the model should be
updated for replay function. There are two updates for `toWatchEvents`.

1. When leaveRevokes op has deleted few keys, we should generate
   `delete-operation` events based on alphabetical order of deleted
   keys.
2. When putWithLease op hits non-exist lease, we should ignore that
   update event.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
  • Loading branch information
fuweid committed Jun 6, 2024
1 parent a0aee63 commit a326ce1
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 35 deletions.
2 changes: 2 additions & 0 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package robustness

import (
"context"
"math/rand"
"testing"
"time"

Expand All @@ -36,6 +37,7 @@ import (
var testRunner = framework.E2eTestRunner

func TestMain(m *testing.M) {
rand.Seed(time.Now().UnixNano())
testRunner.TestMain(m)
}

Expand Down
90 changes: 60 additions & 30 deletions tests/robustness/model/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package model

import (
"fmt"
"sort"
"strings"
)

Expand All @@ -29,7 +30,7 @@ func NewReplay(persistedRequests []EtcdRequest) *EtcdReplay {
if state.Revision != newState.Revision {
revisionToEtcdState = append(revisionToEtcdState, newState)
}
for _, e := range toWatchEvents(&state, request, response) {
for _, e := range toWatchEvents(&state, &newState, request, response) {
events = append(events, e)
}
state = newState
Expand Down Expand Up @@ -62,45 +63,74 @@ func (r *EtcdReplay) EventsForWatch(watch WatchRequest) (events []PersistedEvent
return events
}

func toWatchEvents(prevState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) {
if request.Type != Txn || response.Error != "" {
func toWatchEvents(prevState *EtcdState, newState *EtcdState, request EtcdRequest, response MaybeEtcdResponse) (events []PersistedEvent) {
if response.Error != "" {
return events
}
var ops []EtcdOperation
if response.Txn.Failure {
ops = request.Txn.OperationsOnFailure
} else {
ops = request.Txn.OperationsOnSuccess
}
for _, op := range ops {
switch op.Type {
case RangeOperation:
case DeleteOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Delete.Key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Delete.Key]; ok {

switch request.Type {
case Txn:
var ops []EtcdOperation
if response.Txn.Failure {
ops = request.Txn.OperationsOnFailure
} else {
ops = request.Txn.OperationsOnSuccess
}
for _, op := range ops {
switch op.Type {
case RangeOperation:
case DeleteOperation:
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Delete.Key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Delete.Key]; ok {
events = append(events, e)
}
case PutOperation:
_, leaseExists := prevState.Leases[op.Put.LeaseID]
if op.Put.LeaseID != 0 && !leaseExists {
break
}

e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Put.Key,
Value: op.Put.Value,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Put.Key]; !ok {
e.IsCreate = true
}
events = append(events, e)
default:
panic(fmt.Sprintf("unsupported operation type: %v", op))
}
}
case LeaseRevoke:
deletedKeys := []string{}
for key := range prevState.KeyValues {
if _, ok := newState.KeyValues[key]; ok {
continue
}
case PutOperation:
deletedKeys = append(deletedKeys, key)
}
sort.Strings(deletedKeys)

for _, key := range deletedKeys {
e := PersistedEvent{
Event: Event{
Type: op.Type,
Key: op.Put.Key,
Value: op.Put.Value,
Type: DeleteOperation,
Key: key,
},
Revision: response.Revision,
}
if _, ok := prevState.KeyValues[op.Put.Key]; !ok {
e.IsCreate = true
}
events = append(events, e)
default:
panic(fmt.Sprintf("unsupported operation type: %v", op))
}
}
return events
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/report/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) {
case raftReq.LeaseRevoke != nil:
return &model.EtcdRequest{
Type: model.LeaseRevoke,
LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseGrant.ID},
LeaseRevoke: &model.LeaseRevokeRequest{LeaseID: raftReq.LeaseRevoke.ID},
}, nil
case raftReq.LeaseGrant != nil:
return &model.EtcdRequest{
Expand Down
6 changes: 4 additions & 2 deletions tests/robustness/traffic/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,20 @@ func (t etcdTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter
return
default:
}
shouldReturn := false

// Avoid multiple failed writes in a row
if lastOperationSucceeded {
choices := t.requests
if !nonUniqueWriteLimiter.Take() {
if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
choices = filterOutNonUniqueEtcdWrites(choices)
}
requestType = pickRandom(choices)
} else {
requestType = Get
}
rev, err := client.Request(ctx, requestType, lastRev)
if requestType == Delete || requestType == LeaseRevoke {
if shouldReturn {
nonUniqueWriteLimiter.Return()
}
lastOperationSucceeded = err == nil
Expand Down
8 changes: 6 additions & 2 deletions tests/robustness/traffic/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,26 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
} else {
shouldReturn := false

choices := t.writeChoices
if !nonUniqueWriteLimiter.Take() {
if shouldReturn = nonUniqueWriteLimiter.Take(); !shouldReturn {
choices = filterOutNonUniqueKubernetesWrites(t.writeChoices)
}
op := pickRandom(choices)
switch op {
case KubernetesDelete:
_, err = kc.OptimisticDelete(writeCtx, key, rev)
nonUniqueWriteLimiter.Return()
case KubernetesUpdate:
_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
case KubernetesCreate:
err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
default:
panic(fmt.Sprintf("invalid choice: %q", op))
}
if shouldReturn {
nonUniqueWriteLimiter.Return()
}
}
}
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste
}
}
case model.LeaseGrant:
case model.LeaseRevoke:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand All @@ -216,6 +217,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati
}
case model.Range:
case model.LeaseGrant:
case model.LeaseRevoke:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
Expand Down

0 comments on commit a326ce1

Please sign in to comment.