From e13d5b8ccff4da9e3bc994f281aeafe1b6a97117 Mon Sep 17 00:00:00 2001 From: Mihail Stoykov Date: Thu, 28 Oct 2021 15:06:56 +0300 Subject: [PATCH] Add basic event loop with some API to be used by modules As well as cut down setTimeout implementation A recent update to goja introduced Promise. The catch here is that Promise's then will only be called when goja exits executing js code and it has already been resolved. Also resolving and rejecting Promises needs to happen while no other js code is being executed. This more or less necessitates adding an event loop. Additionally because a call to a k6 modules such as `k6/http` might make a promise to signal when an http request is made, but if (no changes were made) the iteration then finishes before the request completes, nothing would've stopped the start of a *new* iteration (which would probably just again ask k6/http to make a new request and return Promise). This might be a desirable behaviour for some cases but arguably will be very confusing so this commit also adds a way to Reserve(name should be changed) a place on the queue (doesn't reserve an exact spot) so that the event loop will not let the iteration finish until it gets unreserved. Additional abstraction to make a "handled" Promise is added so that k6 js-modules can use it the same way goja.NewPromise but with the key the difference that the Promise will be waited to be resolved before the event loop can end. Additionally to that, some additional code was needed so there is an event loop for all special functions calls (setup, teardown, handleSummary, default) and the init context. And finally, a basic setTimeout implementation was added. There is no way to currently cancel the setTimeout and it doesn't take code as the first argument or additional arguments to be given to the callback later on. fixes #882 --- core/local/eventloop_test.go | 125 ++++++++++++++++++++++++++++++++++ js/bundle.go | 18 ++++- js/eventloop.go | 117 +++++++++++++++++++++++++++++++ js/eventloop_test.go | 71 +++++++++++++++++++ js/initcontext.go | 28 +++++++- js/modules/modules.go | 15 ++++ js/modulestest/modulestest.go | 10 +++ js/runner.go | 35 ++++++---- 8 files changed, 401 insertions(+), 18 deletions(-) create mode 100644 core/local/eventloop_test.go create mode 100644 js/eventloop.go create mode 100644 js/eventloop_test.go diff --git a/core/local/eventloop_test.go b/core/local/eventloop_test.go new file mode 100644 index 00000000000..b16a8c13eed --- /dev/null +++ b/core/local/eventloop_test.go @@ -0,0 +1,125 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package local + +import ( + "io/ioutil" + "net/url" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "go.k6.io/k6/js" + "go.k6.io/k6/lib" + "go.k6.io/k6/lib/metrics" + "go.k6.io/k6/lib/testutils" + "go.k6.io/k6/lib/types" + "go.k6.io/k6/loader" +) + +func TestEventLoop(t *testing.T) { + t.Parallel() + script := []byte(` + setTimeout(()=> {console.log("initcontext setTimeout")}, 200) + console.log("initcontext"); + export default function() { + setTimeout(()=> {console.log("default setTimeout")}, 200) + console.log("default"); + }; + export function setup() { + setTimeout(()=> {console.log("setup setTimeout")}, 200) + console.log("setup"); + }; + export function teardown() { + setTimeout(()=> {console.log("teardown setTimeout")}, 200) + console.log("teardown"); + }; + export function handleSummary() { + setTimeout(()=> {console.log("handleSummary setTimeout")}, 200) + console.log("handleSummary"); + }; +`) + + logger := logrus.New() + logger.SetOutput(ioutil.Discard) + logHook := testutils.SimpleLogrusHook{HookedLevels: []logrus.Level{logrus.InfoLevel}} + logger.AddHook(&logHook) + + registry := metrics.NewRegistry() + builtinMetrics := metrics.RegisterBuiltinMetrics(registry) + runner, err := js.New( + logger, + &loader.SourceData{ + URL: &url.URL{Path: "/script.js"}, + Data: script, + }, + nil, + lib.RuntimeOptions{}, + builtinMetrics, + registry, + ) + require.NoError(t, err) + + ctx, cancel, execScheduler, samples := newTestExecutionScheduler(t, runner, logger, + lib.Options{ + TeardownTimeout: types.NullDurationFrom(time.Second), + SetupTimeout: types.NullDurationFrom(time.Second), + }) + defer cancel() + + errCh := make(chan error, 1) + go func() { errCh <- execScheduler.Run(ctx, ctx, samples, builtinMetrics) }() + + select { + case err := <-errCh: + require.NoError(t, err) + _, err = runner.HandleSummary(ctx, &lib.Summary{RootGroup: &lib.Group{}}) + require.NoError(t, err) + entries := logHook.Drain() + msgs := make([]string, len(entries)) + for i, entry := range entries { + msgs[i] = entry.Message + } + require.Equal(t, []string{ + "initcontext", // first initialization + "initcontext setTimeout", + "initcontext", // for vu + "initcontext setTimeout", + "initcontext", // for setup + "initcontext setTimeout", + "setup", // setup + "setup setTimeout", + "default", // one iteration + "default setTimeout", + "initcontext", // for teardown + "initcontext setTimeout", + "teardown", // teardown + "teardown setTimeout", + "initcontext", // for handleSummary + "initcontext setTimeout", + "handleSummary", // handleSummary + "handleSummary setTimeout", + }, msgs) + case <-time.After(10 * time.Second): + t.Fatal("timed out") + } +} diff --git a/js/bundle.go b/js/bundle.go index 39bf259f6ff..d578812b900 100644 --- a/js/bundle.go +++ b/js/bundle.go @@ -28,6 +28,7 @@ import ( "fmt" "net/url" "runtime" + "time" "github.com/dop251/goja" "github.com/dop251/goja/parser" @@ -69,6 +70,7 @@ type BundleInstance struct { env map[string]string exports map[string]goja.Callable + loop *eventLoop } // NewBundle creates a new bundle from a source file and a filesystem. @@ -261,6 +263,7 @@ func (b *Bundle) Instantiate(logger logrus.FieldLogger, vuID uint64) (bi *Bundle Context: ctxPtr, exports: make(map[string]goja.Callable), env: b.RuntimeOptions.Env, + loop: init.loop, } // Grab any exported functions that could be executed. These were @@ -307,6 +310,16 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init * } rt.Set("__ENV", env) rt.Set("__VU", vuID) + _ = rt.Set("setTimeout", func(f func(), t float64) { + // TODO checks and fixes + // TODO maybe really return something to use with `clearTimeout + // TODO support arguments ... maybe + runOnLoop := init.loop.Reserve() + go func() { + time.Sleep(time.Duration(t * float64(time.Millisecond))) + runOnLoop(f) + }() + }) rt.Set("console", common.Bind(rt, newConsole(logger), init.ctxPtr)) if init.compatibilityMode == lib.CompatibilityModeExtended { @@ -324,7 +337,10 @@ func (b *Bundle) instantiate(logger logrus.FieldLogger, rt *goja.Runtime, init * ctx := common.WithInitEnv(context.Background(), initenv) *init.ctxPtr = common.WithRuntime(ctx, rt) unbindInit := common.BindToGlobal(rt, common.Bind(rt, init, init.ctxPtr)) - if _, err := rt.RunProgram(b.Program); err != nil { + var err error + init.loop.RunOnLoop(func() { _, err = rt.RunProgram(b.Program) }) + init.loop.Start(*init.ctxPtr) + if err != nil { var exception *goja.Exception if errors.As(err, &exception) { err = &scriptException{inner: exception} diff --git a/js/eventloop.go b/js/eventloop.go new file mode 100644 index 00000000000..1d323245209 --- /dev/null +++ b/js/eventloop.go @@ -0,0 +1,117 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package js + +import ( + "context" + "sync" +) + +// an event loop +// TODO: DO NOT USE AS IT'S NOT DONE +type eventLoop struct { + queueLock sync.Mutex + queue []func() + wakeupCh chan struct{} // maybe use sync.Cond ? + reservedCount int +} + +func newEventLoop() *eventLoop { + return &eventLoop{ + wakeupCh: make(chan struct{}, 1), + } +} + +// RunOnLoop queues the function to be called from/on the loop +// This needs to be called before calling `Start` +// TODO maybe have only Reserve as this is equal to `e.Reserve()(f)` +func (e *eventLoop) RunOnLoop(f func()) { + e.queueLock.Lock() + e.queue = append(e.queue, f) + e.queueLock.Unlock() + select { + case e.wakeupCh <- struct{}{}: + default: + } +} + +// Reserve "reserves" a spot on the loop, preventing it from returning/finishing. The returning function will queue it's +// argument and wakeup the loop if needed and also unreserve the spot so that the loop can exit. +// this should be used instead of MakeHandledPromise if a promise will not be returned +// TODO better name +func (e *eventLoop) Reserve() func(func()) { + e.queueLock.Lock() + e.reservedCount++ + e.queueLock.Unlock() + + return func(f func()) { + e.queueLock.Lock() + e.queue = append(e.queue, f) + e.reservedCount-- + e.queueLock.Unlock() + select { + case e.wakeupCh <- struct{}{}: + default: + } + } +} + +// Start will run the event loop until it's empty and there are no reserved spots +// or the context is done +//nolint:cyclop +func (e *eventLoop) Start(ctx context.Context) { + done := ctx.Done() + for { + select { // check if done + case <-done: + return + default: + } + + // acquire the queue + e.queueLock.Lock() + queue := e.queue + e.queue = make([]func(), 0, len(queue)) + reserved := e.reservedCount != 0 + e.queueLock.Unlock() + + if len(queue) == 0 { + if !reserved { // we have empty queue and nothing that reserved a spot + return + } + select { // wait until the reserved is done + case <-done: + return + case <-e.wakeupCh: + } + } + + for _, f := range queue { + // run each function in the queue if not done + select { + case <-done: + return + default: + f() + } + } + } +} diff --git a/js/eventloop_test.go b/js/eventloop_test.go new file mode 100644 index 00000000000..26487fc3e6c --- /dev/null +++ b/js/eventloop_test.go @@ -0,0 +1,71 @@ +/* + * + * k6 - a next-generation load testing tool + * Copyright (C) 2021 Load Impact + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package js + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestBasicEventLoop(t *testing.T) { + t.Parallel() + loop := newEventLoop() + var ran int + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + loop.RunOnLoop(func() { ran++ }) + loop.Start(ctx) + require.Equal(t, ran, 1) + loop.RunOnLoop(func() { ran++ }) + loop.RunOnLoop(func() { ran++ }) + loop.Start(ctx) + require.Equal(t, ran, 3) + loop.RunOnLoop(func() { ran++; cancel() }) + loop.RunOnLoop(func() { ran++ }) + loop.Start(ctx) + require.Equal(t, ran, 4) +} + +func TestEventLoopReserve(t *testing.T) { + t.Parallel() + loop := newEventLoop() + var ran int + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + loop.RunOnLoop(func() { + ran++ + r := loop.Reserve() + go func() { + time.Sleep(time.Second) + r(func() { + ran++ + }) + }() + }) + start := time.Now() + loop.Start(ctx) + took := time.Since(start) + require.Equal(t, ran, 2) + require.Greater(t, took, time.Second) +} diff --git a/js/initcontext.go b/js/initcontext.go index 4580ce9df5d..5d9d660fa5d 100644 --- a/js/initcontext.go +++ b/js/initcontext.go @@ -67,6 +67,7 @@ type InitContext struct { // Bound runtime; used to instantiate objects. runtime *goja.Runtime compiler *compiler.Compiler + loop *eventLoop // Pointer to a context that bridged modules are invoked with. ctxPtr *context.Context @@ -100,6 +101,7 @@ func NewInitContext( compatibilityMode: compatMode, logger: logger, modules: getJSModules(), + loop: newEventLoop(), } } @@ -126,6 +128,7 @@ func newBoundInitContext(base *InitContext, ctxPtr *context.Context, rt *goja.Ru compatibilityMode: base.compatibilityMode, logger: base.logger, modules: base.modules, + loop: newEventLoop(), } } @@ -153,6 +156,8 @@ func (i *InitContext) Require(arg string) goja.Value { type moduleInstanceCoreImpl struct { ctxPtr *context.Context + rt *goja.Runtime + loop *eventLoop // we can technically put lib.State here as well as anything else } @@ -169,7 +174,26 @@ func (m *moduleInstanceCoreImpl) GetState() *lib.State { } func (m *moduleInstanceCoreImpl) GetRuntime() *goja.Runtime { - return common.GetRuntime(*m.ctxPtr) // TODO thread it correctly instead + return m.rt +} + +func (m *moduleInstanceCoreImpl) AddToEventLoop(f func()) { + m.loop.RunOnLoop(f) +} + +// MakeHandledPromise will create and promise and return it's resolve, reject methods as well wrapped in such a way that +// it will block the eventloop from exiting before they are called even if the promise isn't resolved by the time the +// current script ends executing +func (m *moduleInstanceCoreImpl) MakeHandledPromise() (*goja.Promise, func(interface{}), func(interface{})) { + reserved := m.loop.Reserve() + p, resolve, reject := m.rt.NewPromise() + return p, func(i interface{}) { + // more stuff + reserved(func() { resolve(i) }) + }, func(i interface{}) { + // more stuff + reserved(func() { reject(i) }) + } } func toESModuleExports(exp modules.Exports) interface{} { @@ -201,7 +225,7 @@ func (i *InitContext) requireModule(name string) (goja.Value, error) { return nil, fmt.Errorf("unknown module: %s", name) } if modV2, ok := mod.(modules.IsModuleV2); ok { - instance := modV2.NewModuleInstance(&moduleInstanceCoreImpl{ctxPtr: i.ctxPtr}) + instance := modV2.NewModuleInstance(&moduleInstanceCoreImpl{ctxPtr: i.ctxPtr, rt: i.runtime, loop: i.loop}) return i.runtime.ToValue(toESModuleExports(instance.GetExports())), nil } if perInstance, ok := mod.(modules.HasModuleInstancePerVU); ok { diff --git a/js/modules/modules.go b/js/modules/modules.go index 5c940b07e17..d784851dc61 100644 --- a/js/modules/modules.go +++ b/js/modules/modules.go @@ -123,6 +123,21 @@ type InstanceCore interface { // sealing field will help probably with pointing users that they just need to embed this in their Instance // implementations + + // MakeHandledPromise needs a better name + // because of the way promises work and the fact that we probably don't want promises from one iteration to live + // till the next, this method lets a piece of module code say that they will be returning a promise that needs to be + // resolved/rejected within this iteration. K6 will not continue with a next iteration until either `resolve` or + // `reject` are called at which point the Promise usual handling of those will trigger. + // Caveats: this likely won't work if the Promise is rejected from within the js code + // This also will likely have problems with context canceling so both of those will need extra care + // TODO maybe export eventloop.Reserve and implement this in the js/common + MakeHandledPromise() (p *goja.Promise, resolve func(interface{}), reject func(interface{})) + + // AddToEventLoop needs a better name + // MUST only be called while absolutely certain that something will not let the iteration end between the start and + // end of the call + AddToEventLoop(func()) } // Exports is representation of ESM exports of a module diff --git a/js/modulestest/modulestest.go b/js/modulestest/modulestest.go index e00cae7c7df..7e36f1fadf1 100644 --- a/js/modulestest/modulestest.go +++ b/js/modulestest/modulestest.go @@ -58,3 +58,13 @@ func (m *InstanceCore) GetState() *lib.State { func (m *InstanceCore) GetRuntime() *goja.Runtime { return m.Runtime } + +// MakeHandledPromise is not really implemented +func (m *InstanceCore) MakeHandledPromise() (p *goja.Promise, resolve func(interface{}), reject func(interface{})) { + return m.Runtime.NewPromise() // TODO fix +} + +// AddToEventLoop is not really implemented +func (m *InstanceCore) AddToEventLoop(f func()) { + // TODO Implement +} diff --git a/js/runner.go b/js/runner.go index ae0a690af69..685f54f39ef 100644 --- a/js/runner.go +++ b/js/runner.go @@ -231,6 +231,7 @@ func (r *Runner) newVU(idLocal, idGlobal uint64, samplesOut chan<- stats.SampleC BPool: bpool.NewBufferPool(100), Samples: samplesOut, scenarioIter: make(map[string]uint64), + loop: bi.loop, } vu.state = &lib.State{ @@ -541,6 +542,7 @@ func (r *Runner) getTimeoutFor(stage string) time.Duration { type VU struct { BundleInstance + loop *eventLoop Runner *Runner Transport *http.Transport Dialer *netext.Dialer @@ -734,24 +736,27 @@ func (u *VU) runFn( u.state.Tags.Set("iter", strconv.FormatInt(u.state.Iteration, 10)) } - defer func() { - if r := recover(); r != nil { - gojaStack := u.Runtime.CaptureCallStack(20, nil) - err = fmt.Errorf("a panic occurred in VU code but was caught: %s", r) - // TODO figure out how to use PanicLevel without panicing .. this might require changing - // the logger we use see - // https://github.com/sirupsen/logrus/issues/1028 - // https://github.com/sirupsen/logrus/issues/993 - b := new(bytes.Buffer) - for _, s := range gojaStack { - s.Write(b) + u.loop.RunOnLoop(func() { + defer func() { + if r := recover(); r != nil { + gojaStack := u.Runtime.CaptureCallStack(20, nil) + err = fmt.Errorf("a panic occurred in VU code but was caught: %s", r) + // TODO figure out how to use PanicLevel without panicing .. this might require changing + // the logger we use see + // https://github.com/sirupsen/logrus/issues/1028 + // https://github.com/sirupsen/logrus/issues/993 + b := new(bytes.Buffer) + for _, s := range gojaStack { + s.Write(b) + } + u.state.Logger.Log(logrus.ErrorLevel, "panic: ", r, "\n", string(debug.Stack()), "\nGoja stack:\n", b.String()) } - u.state.Logger.Log(logrus.ErrorLevel, "panic: ", r, "\n", string(debug.Stack()), "\nGoja stack:\n", b.String()) - } - }() + }() + v, err = fn(goja.Undefined(), args...) // Actually run the JS script + }) startTime := time.Now() - v, err = fn(goja.Undefined(), args...) // Actually run the JS script + u.loop.Start(ctx) endTime := time.Now() var exception *goja.Exception if errors.As(err, &exception) {