Skip to content

Commit

Permalink
Run simulations concurrently. (#595)
Browse files Browse the repository at this point in the history
This PR changes the `Simulator.Run` method to run multiple simulations
in parallel. This does speed things up a bit (5x throughput for
TestPassingSimulations), but things still aren't as fast as I'd like. I
also need a lot of parallelism to speed things up fully which I don't
yet fully understand. I'll try to do some profiling and optimizations in
future PRs.
  • Loading branch information
mwhittaker authored Sep 11, 2023
1 parent e9b3b83 commit b3d336d
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 38 deletions.
1 change: 1 addition & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ github.com/ServiceWeaver/weaver/internal/sim
sort
strings
sync
sync/atomic
testing
time
github.com/ServiceWeaver/weaver/internal/status
Expand Down
131 changes: 93 additions & 38 deletions internal/sim/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"fmt"
"math/rand"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/ServiceWeaver/weaver/internal/reflection"
"github.com/ServiceWeaver/weaver/runtime"
swruntime "github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/codegen"
"github.com/ServiceWeaver/weaver/runtime/protos"
)
Expand Down Expand Up @@ -231,7 +233,7 @@ func New(t *testing.T, x Workload, opts Options) *Simulator {
app := &protos.AppConfig{}
if opts.Config != "" {
var err error
app, err = runtime.ParseConfig("", opts.Config, codegen.ComponentConfigValidator)
app, err = swruntime.ParseConfig("", opts.Config, codegen.ComponentConfigValidator)
if err != nil {
t.Fatalf("sim.New: parse config: %v", err)
}
Expand Down Expand Up @@ -313,54 +315,107 @@ func validateWorkload(t reflect.Type) error {

// Run runs simulations for the provided duration.
func (s *Simulator) Run(duration time.Duration) Results {
// TODO(mwhittaker): Use a smarter algorithm to sweep over hyperparameters.
// TODO(mwhittaker): Read and run graveyard entries.
start := time.Now()
deadline := start.Add(duration)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

// TODO(mwhittaker): Use a smarter algorithm to sweep over hyperparameters.
// TODO(mwhittaker): Run simulations in multiple goroutines.
// TODO(mwhittaker): Read and run graveyard entries.
seed := time.Now().UnixNano()
count := 0
for numOps := 1; ; numOps++ {
for _, failureRate := range []float64{0.0, 0.01, 0.05, 0.1} {
for _, yieldRate := range []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0} {
for i := 0; i < 10; i++ {
if time.Now().After(deadline) {
return Results{
NumSimulations: count,
Duration: time.Since(start),
}
// Spawn n concurrent simulators. The simulators read options from the opts
// channel and write errors and failed results to the errs and
// failedResults channels. Simulation ends when we encounter an error or
// successfully find an invariant violation.
//
// TODO(mwhittaker): Optimize things and pick a smarter value of n.
const n = 1000
opts := make(chan options, n)
errs := make(chan error, n)
failedResults := make(chan Results, n)
done := sync.WaitGroup{}
numSimulations := int64(0)

// Spawn n simulating goroutines that read from the opts channel.
done.Add(n)
for i := 0; i < n; i++ {
go func() {
defer done.Done()
for {
select {
case <-ctx.Done():
return
case o := <-opts:
atomic.AddInt64(&numSimulations, 1)
switch r, err := s.runOne(ctx, o); {
case err != nil && err == ctx.Err():
// The simulation was cancelled because the deadline
// was met. Stop executing simulations.
return
case err != nil:
// The simulation failed to execute properly. Abort.
errs <- err
return
case r.Err != nil:
// The simulation successfully found an invariant
// violation. Return the result and stop execution.
failedResults <- r
return
default:
// The simulation ran without finding an invariant
// violation. Move on to the next simulation.
}
}
}
}()
}

seed++
count++
opts := options{
Seed: seed,
NumOps: numOps,
NumReplicas: 1,
FailureRate: failureRate,
YieldRate: yieldRate,
}
results, err := s.runOne(ctx, opts)
if err != nil && err == ctx.Err() {
return Results{
NumSimulations: count,
Duration: time.Since(start),
// Spawn a goroutine that writes to the opts channel.
done.Add(1)
go func() {
defer done.Done()
seed := time.Now().UnixNano()
for numOps := 1; ; numOps++ {
for _, failureRate := range []float64{0.0, 0.01, 0.05, 0.1} {
for _, yieldRate := range []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0} {
for i := 0; i < 10; i++ {
seed++
o := options{
Seed: seed,
NumOps: numOps,
NumReplicas: 1,
FailureRate: failureRate,
YieldRate: yieldRate,
}
select {
case <-ctx.Done():
return
case opts <- o:
}
}
if err != nil {
s.t.Fatal(err)
}
if results.Err != nil {
results.NumSimulations = count
results.Duration = time.Since(start)
return results
}
}
}
}
}()

select {
case err := <-errs:
// A simulation failed to execute properly.
s.t.Fatal(err)
return Results{}
case r := <-failedResults:
// A simulation successfully found an invariant violation.
cancel()
done.Wait()
r.NumSimulations = int(numSimulations)
r.Duration = time.Since(start)
return r
case <-ctx.Done():
// We hit our deadline. All simulations passed.
done.Wait()
return Results{
NumSimulations: int(numSimulations),
Duration: time.Since(start),
}
}
}

Expand Down

0 comments on commit b3d336d

Please sign in to comment.