Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: joshvanl <me@joshvanl.dev>
  • Loading branch information
JoshVanL committed Dec 5, 2023
1 parent 64341ae commit 8c02f71
Showing 1 changed file with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -59,6 +60,7 @@ type rebalancing struct {
placementStream placementv1pb.Placement_ReportDaprStatusClient
activeActors []atomic.Bool
doubleActivationCh chan string
wg sync.WaitGroup
}

func (i *rebalancing) Setup(t *testing.T) []framework.Option {
Expand Down Expand Up @@ -97,6 +99,7 @@ func (i *rebalancing) Setup(t *testing.T) []framework.Option {
}

func (i *rebalancing) Run(t *testing.T, ctx context.Context) {
t.Cleanup(i.wg.Wait)
i.place.WaitUntilRunning(t, ctx)

// Wait for daprd to be ready
Expand All @@ -113,7 +116,11 @@ func (i *rebalancing) Run(t *testing.T, ctx context.Context) {
placementClientReady := make(chan error)
placementCtx, placementCancel := context.WithCancel(ctx)
defer placementCancel()
go i.getPlacementClient(placementCtx, placementClientReady)
i.wg.Add(1)
go func() {
defer i.wg.Add(1)
i.getPlacementClient(placementCtx, placementClientReady)
}()
select {
case <-ctx.Done():
require.Fail(t, "Placement client not ready in time")
Expand All @@ -137,7 +144,9 @@ func (i *rebalancing) Run(t *testing.T, ctx context.Context) {
errCh := make(chan error)

// To start, monitor for double activations
i.wg.Add(1)
go func() {
defer i.wg.Add(1)
errs := make([]error, 0)
for doubleAct := range i.doubleActivationCh {
// An empty message is a signal to stop
Expand All @@ -155,7 +164,9 @@ func (i *rebalancing) Run(t *testing.T, ctx context.Context) {

// Schedule reminders to be executed in 0s
for j := 0; j < iterations; j++ {
i.wg.Add(1)
go func(j int) {
defer i.wg.Add(1)
rctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
body := `{"dueTime": "0s"}`
Expand All @@ -182,7 +193,9 @@ func (i *rebalancing) Run(t *testing.T, ctx context.Context) {
}

// In parallel, add another node to the placement which will trigger a rebalancing
i.wg.Add(1)
go func() {
defer i.wg.Add(1)
rErr := i.reportStatusToPlacement(ctx, i.placementStream, []string{"myactortype"})
if rErr != nil {
errCh <- fmt.Errorf("failed to trigger rebalancing: %w", rErr)
Expand All @@ -193,7 +206,9 @@ func (i *rebalancing) Run(t *testing.T, ctx context.Context) {

// Also invoke the same actors using actor invocation
for j := 0; j < iterations; j++ {
i.wg.Add(1)
go func(j int) {
defer i.wg.Add(1)
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
daprdURL := fmt.Sprintf("http://localhost:%d/v1.0/actors/myactortype/myactorid-%d/method/foo", i.daprd[0].HTTPPort(), j)
Expand All @@ -217,7 +232,9 @@ func (i *rebalancing) Run(t *testing.T, ctx context.Context) {
}

// After 2s, stop doubleActivationCh by sending an empty message
i.wg.Add(1)
go func() {
defer i.wg.Add(1)
<-time.After(2 * time.Second)
i.doubleActivationCh <- ""
}()
Expand Down Expand Up @@ -405,7 +422,9 @@ func (i *rebalancing) reportStatusToPlacement(ctx context.Context, stream placem
}

errCh := make(chan error)
i.wg.Add(1)
go func() {
defer i.wg.Add(1)
for {
o, rerr := stream.Recv()
if rerr != nil {
Expand Down

0 comments on commit 8c02f71

Please sign in to comment.