diff --git a/pkg/placement/placement.go b/pkg/placement/placement.go index 9f2de79c138..a22d9a6ec47 100644 --- a/pkg/placement/placement.go +++ b/pkg/placement/placement.go @@ -252,7 +252,16 @@ func (p *Service) ReportDaprStatus(stream placementv1pb.Placement_ReportDaprStat // We need to use a background context here so dissemination isn't tied to the context of this stream placementTable := p.raftNode.FSM().PlacementState(p.minAPILevel < NoVirtualNodesInPlacementTablesAPILevel) - err = p.performTablesUpdate(context.Background(), []placementGRPCStream{stream}, placementTable) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + select { + case <-ctx.Done(): + case <-p.closedCh: + cancel() + } + }() + err = p.performTablesUpdate(ctx, []placementGRPCStream{stream}, placementTable) + cancel() if err != nil { return err } diff --git a/tests/integration/framework/framework.go b/tests/integration/framework/framework.go index 19730a25278..8e9a5625bd3 100644 --- a/tests/integration/framework/framework.go +++ b/tests/integration/framework/framework.go @@ -27,11 +27,7 @@ type options struct { // Option is a function that configures the Framework's options. type Option func(*options) -type Framework struct { - procs []process.Interface -} - -func Run(t *testing.T, ctx context.Context, opts ...Option) *Framework { +func Run(t *testing.T, ctx context.Context, opts ...Option) { t.Helper() o := options{} @@ -41,22 +37,9 @@ func Run(t *testing.T, ctx context.Context, opts ...Option) *Framework { t.Logf("starting %d processes", len(o.procs)) - for _, proc := range o.procs { + for i, proc := range o.procs { + i := i proc.Run(t, ctx) - } - - return &Framework{ - procs: o.procs, - } -} - -func (f *Framework) Cleanup(t *testing.T) { - t.Helper() - - t.Logf("stopping %d processes", len(f.procs)) - - // Cleanup processes in reverse order in a stack fashion (same as t.Cleanup). - for i := len(f.procs) - 1; i >= 0; i-- { - f.procs[i].Cleanup(t) + t.Cleanup(func() { o.procs[i].Cleanup(t) }) } } diff --git a/tests/integration/framework/process/exec/exec.go b/tests/integration/framework/process/exec/exec.go index a0c0f85f7da..95a062670c8 100644 --- a/tests/integration/framework/process/exec/exec.go +++ b/tests/integration/framework/process/exec/exec.go @@ -104,6 +104,7 @@ func (e *exec) Run(t *testing.T, ctx context.Context) { for k, v := range e.envs { e.cmd.Env = append(e.cmd.Env, k+"="+v) } + e.cmd.Env = append(e.cmd.Env, "DAPR_HOST_IP=127.0.0.1") require.NoError(t, e.cmd.Start()) } diff --git a/tests/integration/integration.go b/tests/integration/integration.go index 73f3e8f57bd..6754f80ecc7 100644 --- a/tests/integration/integration.go +++ b/tests/integration/integration.go @@ -65,13 +65,7 @@ func RunIntegrationTests(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second) t.Cleanup(cancel) - f := framework.Run(t, ctx, options...) - - t.Cleanup(func() { - t.Log("cleaning up framework") - f.Cleanup(t) - t.Log("done") - }) + framework.Run(t, ctx, options...) t.Run("run", func(t *testing.T) { t.Log("running test case")