From b3d336dd2ced2a34f9a963297ae3e2c7d7bf1f5c Mon Sep 17 00:00:00 2001 From: Michael Whittaker Date: Mon, 11 Sep 2023 15:01:10 -0700 Subject: [PATCH] Run simulations concurrently. (#595) This PR changes the `Simulator.Run` method to run multiple simulations in parallel. This does speed things up a bit (5x throughput for TestPassingSimulations), but things still aren't as fast as I'd like. I also need a lot of parallelism to speed things up fully which I don't yet fully understand. I'll try to do some profiling and optimizations in future PRs. --- godeps.txt | 1 + internal/sim/api.go | 131 +++++++++++++++++++++++++++++++------------- 2 files changed, 94 insertions(+), 38 deletions(-) diff --git a/godeps.txt b/godeps.txt index fe39b02b8..58f0479e1 100644 --- a/godeps.txt +++ b/godeps.txt @@ -293,6 +293,7 @@ github.com/ServiceWeaver/weaver/internal/sim sort strings sync + sync/atomic testing time github.com/ServiceWeaver/weaver/internal/status diff --git a/internal/sim/api.go b/internal/sim/api.go index 165c9f79d..eecfafced 100644 --- a/internal/sim/api.go +++ b/internal/sim/api.go @@ -27,11 +27,13 @@ import ( "fmt" "math/rand" "reflect" + "sync" + "sync/atomic" "testing" "time" "github.com/ServiceWeaver/weaver/internal/reflection" - "github.com/ServiceWeaver/weaver/runtime" + swruntime "github.com/ServiceWeaver/weaver/runtime" "github.com/ServiceWeaver/weaver/runtime/codegen" "github.com/ServiceWeaver/weaver/runtime/protos" ) @@ -231,7 +233,7 @@ func New(t *testing.T, x Workload, opts Options) *Simulator { app := &protos.AppConfig{} if opts.Config != "" { var err error - app, err = runtime.ParseConfig("", opts.Config, codegen.ComponentConfigValidator) + app, err = swruntime.ParseConfig("", opts.Config, codegen.ComponentConfigValidator) if err != nil { t.Fatalf("sim.New: parse config: %v", err) } @@ -313,54 +315,107 @@ func validateWorkload(t reflect.Type) error { // Run runs simulations for the provided duration. func (s *Simulator) Run(duration time.Duration) Results { + // TODO(mwhittaker): Use a smarter algorithm to sweep over hyperparameters. + // TODO(mwhittaker): Read and run graveyard entries. start := time.Now() deadline := start.Add(duration) ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() - // TODO(mwhittaker): Use a smarter algorithm to sweep over hyperparameters. - // TODO(mwhittaker): Run simulations in multiple goroutines. - // TODO(mwhittaker): Read and run graveyard entries. - seed := time.Now().UnixNano() - count := 0 - for numOps := 1; ; numOps++ { - for _, failureRate := range []float64{0.0, 0.01, 0.05, 0.1} { - for _, yieldRate := range []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0} { - for i := 0; i < 10; i++ { - if time.Now().After(deadline) { - return Results{ - NumSimulations: count, - Duration: time.Since(start), - } + // Spawn n concurrent simulators. The simulators read options from the opts + // channel and write errors and failed results to the errs and + // failedResults channels. Simulation ends when we encounter an error or + // successfully find an invariant violation. + // + // TODO(mwhittaker): Optimize things and pick a smarter value of n. + const n = 1000 + opts := make(chan options, n) + errs := make(chan error, n) + failedResults := make(chan Results, n) + done := sync.WaitGroup{} + numSimulations := int64(0) + + // Spawn n simulating goroutines that read from the opts channel. + done.Add(n) + for i := 0; i < n; i++ { + go func() { + defer done.Done() + for { + select { + case <-ctx.Done(): + return + case o := <-opts: + atomic.AddInt64(&numSimulations, 1) + switch r, err := s.runOne(ctx, o); { + case err != nil && err == ctx.Err(): + // The simulation was cancelled because the deadline + // was met. Stop executing simulations. + return + case err != nil: + // The simulation failed to execute properly. Abort. + errs <- err + return + case r.Err != nil: + // The simulation successfully found an invariant + // violation. Return the result and stop execution. + failedResults <- r + return + default: + // The simulation ran without finding an invariant + // violation. Move on to the next simulation. } + } + } + }() + } - seed++ - count++ - opts := options{ - Seed: seed, - NumOps: numOps, - NumReplicas: 1, - FailureRate: failureRate, - YieldRate: yieldRate, - } - results, err := s.runOne(ctx, opts) - if err != nil && err == ctx.Err() { - return Results{ - NumSimulations: count, - Duration: time.Since(start), + // Spawn a goroutine that writes to the opts channel. + done.Add(1) + go func() { + defer done.Done() + seed := time.Now().UnixNano() + for numOps := 1; ; numOps++ { + for _, failureRate := range []float64{0.0, 0.01, 0.05, 0.1} { + for _, yieldRate := range []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0} { + for i := 0; i < 10; i++ { + seed++ + o := options{ + Seed: seed, + NumOps: numOps, + NumReplicas: 1, + FailureRate: failureRate, + YieldRate: yieldRate, + } + select { + case <-ctx.Done(): + return + case opts <- o: } - } - if err != nil { - s.t.Fatal(err) - } - if results.Err != nil { - results.NumSimulations = count - results.Duration = time.Since(start) - return results } } } } + }() + + select { + case err := <-errs: + // A simulation failed to execute properly. + s.t.Fatal(err) + return Results{} + case r := <-failedResults: + // A simulation successfully found an invariant violation. + cancel() + done.Wait() + r.NumSimulations = int(numSimulations) + r.Duration = time.Since(start) + return r + case <-ctx.Done(): + // We hit our deadline. All simulations passed. + done.Wait() + return Results{ + NumSimulations: int(numSimulations), + Duration: time.Since(start), + } } }