reproduce watch starvation and event loss
Signed-off-by: Chao Chen <chaochn@amazon.com>
chaochn47 committed Mar 11, 2024
1 parent 0c6b6ac commit d4e6456
Showing 2 changed files with 195 additions and 0 deletions.
1 change: 1 addition & 0 deletions tests/e2e/utils.go
@@ -42,6 +42,7 @@ func newClient(t *testing.T, entpoints []string, cfg e2e.ClientConfig) *clientv3
Endpoints: entpoints,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
Logger: zap.NewNop(),
}
if tlscfg != nil {
ccfg.TLS, err = tlscfg.ClientConfig()
194 changes: 194 additions & 0 deletions tests/e2e/watch_delay_test.go
@@ -19,15 +19,21 @@ package e2e

import (
"context"
"errors"
"fmt"
"path"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"

v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
@@ -246,3 +252,191 @@ func continuouslyExecuteGetAll(ctx context.Context, t *testing.T, g *errgroup.Gr
return nil
})
}

// TestWatchOnStreamMultiplex ensures that a slow etcd watcher receives a terminal ErrCompacted error if its
// next batch of events to be sent has already been compacted.
func TestWatchOnStreamMultiplex(t *testing.T) {
e2e.BeforeTest(t)
clus, err := e2e.NewEtcdProcessCluster(context.Background(), t, e2e.WithClusterSize(1))
require.NoError(t, err)
defer clus.Close()
endpoints := clus.EndpointsGRPC()
c := newClient(t, endpoints, e2e.ClientConfig{})
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()

g := errgroup.Group{}
watchKeyPrefix := "/registry/pods/"
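// All watchers share the same options: WithCreatedNotify reports when the watch is registered,
// WithPrefix watches the whole key range, WithPrevKV attaches the previous key-value pair to each event,
// and WithProgressNotify requests periodic progress notifications from the server.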
commonWatchOpts := []clientv3.OpOption{
clientv3.WithCreatedNotify(),
clientv3.WithPrefix(),
clientv3.WithPrevKV(),
clientv3.WithProgressNotify(),
}

watchCacheEventsReceived := atomic.Int64{}
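// The two channels below signal watch registration (watchCacheInitialized) and watcher termination
// (watchCacheWatcherExited), whether due to a compaction error or to detected event loss.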
watchCacheWatcherExited := make(chan struct{})
watchCacheInitialized := make(chan struct{})
g.Go(func() error {
// simulate watch cache
lg := zaptest.NewLogger(t).Named("watch-cache")
defer func() { lg.Debug("watcher exited") }()

lastEventModifiedRevision := int64(1)
for wres := range c.Watch(rootCtx, watchKeyPrefix, commonWatchOpts...) {
if wres.Err() != nil {
close(watchCacheWatcherExited)
if errors.Is(wres.Err(), v3rpc.ErrCompacted) {
lg.Warn("got watch response error",
zap.Int64("last-received-events-kv-mod-revision", lastEventModifiedRevision),
zap.Int64("compact-revision", wres.CompactRevision))
return nil
}
return wres.Err()
}
if wres.Created {
close(watchCacheInitialized)
}
watchCacheEventsReceived.Add(int64(len(wres.Events)))
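// All writes in this test land under the watched prefix, so events must arrive with consecutive mod
// revisions; any gap means the watcher lost events.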
for _, ev := range wres.Events {
if ev.Kv.ModRevision != lastEventModifiedRevision+1 {
close(watchCacheWatcherExited)
return fmt.Errorf("event loss detected; want rev %d but got rev %d", lastEventModifiedRevision+1, ev.Kv.ModRevision)
}
lastEventModifiedRevision = ev.Kv.ModRevision
}
}
return nil
})
<-watchCacheInitialized

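// Open many direct watchers that never read from their result channels. They are created from the same
// client and context, so they are multiplexed onto the same underlying gRPC watch stream as the watch
// cache watcher above, which reproduces the starvation this test targets.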
numOfDirectWatches := 800
for i := 0; i < numOfDirectWatches; i++ {
g.Go(func() error {
c.Watch(rootCtx, watchKeyPrefix, commonWatchOpts...)
return nil
})
}

eventsTriggered := atomic.Int64{}
loadCtx, loadCtxCancel := context.WithTimeout(rootCtx, time.Minute)
defer loadCtxCancel()
generateLoad(loadCtx, c, watchKeyPrefix, &g, &eventsTriggered)
compaction(loadCtx, t, &g, c)

// validate that the watch cache watcher is either compacted or receives all the events.
compareEventsReceivedAndTriggered(loadCtx, t, rootCtxCancel, &g, &watchCacheEventsReceived, &eventsTriggered, watchCacheWatcherExited)
require.NoError(t, g.Wait())
}

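// generateLoad starts numOfUpdater goroutines that repeatedly put and then delete keys under
// watchKeyPrefix, incrementing counter once for every successful mutation.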
func generateLoad(ctx context.Context, c *clientv3.Client, watchKeyPrefix string, group *errgroup.Group, counter *atomic.Int64) {
numOfUpdater := 200
keyValuePayload := "bar"
for i := 0; i < numOfUpdater; i++ {
writeKeyPrefix := path.Join(watchKeyPrefix, fmt.Sprintf("%d", i))
group.Go(func() error {
count := 0
for {
select {
case <-ctx.Done():
return nil
default:
}
count++
key := path.Join(writeKeyPrefix, fmt.Sprintf("%d", count))
if _, err := c.Put(ctx, key, keyValuePayload); err == nil {
counter.Add(1)
}
if _, err := c.Delete(ctx, key); err == nil {
counter.Add(1)
}
time.Sleep(10 * time.Millisecond)
}
})
}
}

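// compaction periodically issues physical compactions so that watchers whose next events fall behind
// the compact revision are cancelled by the server with a compacted error.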
func compaction(ctx context.Context, t *testing.T, group *errgroup.Group, c *clientv3.Client) {
group.Go(func() error {
lg := zaptest.NewLogger(t).Named("compaction")
lastCompactRev := int64(-1)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
lg.Warn("context deadline exceeded, exit compaction routine")
return nil
case <-ticker.C:
}
if lastCompactRev < 0 {
gresp, err := c.Get(ctx, "foo")
if err != nil {
panic(err)
}
lastCompactRev = gresp.Header.Revision
continue
}
cres, err := c.Compact(ctx, lastCompactRev, clientv3.WithCompactPhysical())
if err != nil {
lg.Warn("failed to compact", zap.Error(err))
continue
}
lg.Debug("compacted rev", zap.Int64("compact-revision", lastCompactRev))
lastCompactRev = cres.Header.Revision
}
})
}

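// compareEventsReceivedAndTriggered waits for the load generator to finish and then verifies that the
// simulated watch cache either exited on compaction or eventually received at least as many events as
// were triggered.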
func compareEventsReceivedAndTriggered(
loadCtx context.Context,
t *testing.T,
rootCtxCancel context.CancelFunc,
group *errgroup.Group,
watchCacheEventsReceived *atomic.Int64,
eventsTriggered *atomic.Int64,
watchCacheWatcherExited <-chan struct{},
) {
group.Go(func() error {
defer rootCtxCancel() // cancel all the watchers and load.

lg := zaptest.NewLogger(t).Named("compareEvents")
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
timer := time.NewTimer(2 * time.Minute)
defer timer.Stop()

var once sync.Once
for {
// block until traffic is done.
select {
case <-loadCtx.Done():
once.Do(func() { lg.Info("load generator context is done") })
case <-watchCacheWatcherExited:
// the watch cache watcher channel is expected to be closed with a compacted error,
// so there is no need to wait for the load to finish and verify that the watch cache received all the events.
return nil
}

select {
case <-ticker.C:
case <-timer.C:
triggered := eventsTriggered.Load()
received := watchCacheEventsReceived.Load()
return fmt.Errorf("5 minutes passed since load generation is done, watch cache lost event detected; "+
"watch evetns received %d, received %d", received, triggered)
}
triggered := eventsTriggered.Load()
received := watchCacheEventsReceived.Load()
if received >= triggered {
lg.Info("The number of events watch cache received is high than or equal to events triggered on client side",
zap.Int64("watch-cache-received", received),
zap.Int64("traffic-triggered", triggered))
return nil
}
lg.Warn("watch events received is lagging behind",
zap.Int64("watch-events-received", received),
zap.Int64("events-triggered", triggered))
}
})
}
