Skip to content

Commit

Permalink
feat(runtime): Executor now give a LogWriter to all reactors in the…
Browse files Browse the repository at this point in the history
… constructor

This `LogWriter` implements the `io.Writer` interface and also the `Sync()`
interface that are used by the `zap` logging framework (and potentially others).
The executor will make sure that each time a reactor logs, that the log messages
will be correctly associated together with the message the reactor is reacting
to.
  • Loading branch information
symbiont-daniel-gustafsson committed Nov 19, 2021
1 parent f2fa5d2 commit 6d2cbbe
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 28 deletions.
5 changes: 4 additions & 1 deletion src/executor-event-loop/eventloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ func (el *EventLoop) Run() {
me := envelope.Receiver.ToLocal()
el.LogicalTime.Merge(envelope.LogicalTime)
el.AddToLog(LogResumeContinuation, me, envelope.Message)
outgoingMessage := el.Executor.processEnvelope(envelope)
outgoingMessage, _ /*rui*/ := el.Executor.processEnvelope(envelope)
// TODO we should add rui to the log
// el.LogicalTime.Incr()
// el.AddToLog()
el.LogicalTime.Incr()
outgoing := el.toSchedulerEnvelope(envelope.Receiver, outgoingMessage, envelope.CorrelationId)
el.AddToLog(LogSend, me, outgoingMessage)
Expand Down
94 changes: 68 additions & 26 deletions src/executor-event-loop/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,76 @@ package executorEL
import (
"encoding/json"
"fmt"
// "io/ioutil"
// "strconv"
"time"

// "go.uber.org/zap"
// "go.uber.org/zap/zapcore"

"github.com/symbiont-io/detsys-testkit/src/lib"
)

func jsonError(s string) string {
return fmt.Sprintf("{\"error\":\"%s\"}", s)
}

type StepInfo struct {
LogLines []string
type ReactorStepInfo struct {
SimulatedTime time.Time
LogLines []string
StateDiff json.RawMessage
}

type ComponentUpdate = func(component string) StepInfo
type ReactorsUpdateInfo = map[string]ReactorStepInfo
type ReactorConstructor = func(name string, logBuffer *LogWriter) lib.Reactor
type Buffers = map[string]*LogWriter

type Executor struct {
Reactors []string
Buffers Buffers
Topology lib.Topology
BuildReactor func(name string) lib.Reactor
BuildReactor ReactorConstructor
Marshaler lib.Marshaler
//Update ComponentUpdate
}

func buildTopology(constructor func(name string) lib.Reactor, reactors []string) lib.Topology {
func buildTopology(constructor ReactorConstructor, reactors []string) (lib.Topology, Buffers) {
items := make([]lib.Item, 0, len(reactors))
buffers := make(Buffers)
for _, n := range reactors {
items = append(items, lib.Item{n, constructor(n)})
buffer := newLogWriter()
buffers[n] = buffer
items = append(items, lib.Item{n, constructor(n, buffer)})
}
return lib.NewTopology(items...)
return lib.NewTopology(items...), buffers
}

func NewExecutor(reactors []string, buildReactor func(name string) lib.Reactor, m lib.Marshaler) *Executor {
return &Executor{
func NewExecutor(reactors []string, buildReactor ReactorConstructor, m lib.Marshaler) *Executor {
topo, buffers := buildTopology(buildReactor, reactors)
e := &Executor{
Reactors: reactors,
Topology: buildTopology(buildReactor, reactors),
Buffers: buffers,
Topology: topo,
BuildReactor: buildReactor,
Marshaler: m,
//Update: cu,
}
return e
}

func (ex Executor) DumpReactorLoglines(name string) []string {
buffer, ok := ex.Buffers[name]
if !ok {
panic(fmt.Sprintf("Couldn't find buffer for %s", name))
}
logs := buffer.dump()
return logs
}

func (ex Executor) Reset() {
ex.Topology = buildTopology(ex.BuildReactor, ex.Reactors)
ex.Topology, ex.Buffers = buildTopology(ex.BuildReactor, ex.Reactors)

}

func (el Executor) processEnvelope(env Envelope) Message {
func (el Executor) processEnvelope(env Envelope) (Message, ReactorsUpdateInfo) {

msg := env.Message

var returnMessage json.RawMessage
rui := make(map[string]ReactorStepInfo)
switch msg.Kind {
case "message":
fallthrough
Expand All @@ -74,17 +89,33 @@ func (el Executor) processEnvelope(env Envelope) Message {
heapBefore := dumpHeapJson(reactor)
oevs := reactor.Receive(sev.At, sev.From, sev.Event)
heapAfter := dumpHeapJson(reactor)
/* heapDiff := */ jsonDiff(heapBefore, heapAfter)
// si := el.Update(reactorName)
heapDiff := jsonDiff(heapBefore, heapAfter)
logLines := el.DumpReactorLoglines(reactorName)

rui[reactorName] = ReactorStepInfo{
SimulatedTime: sev.At,
LogLines: logLines,
StateDiff: heapDiff,
}

returnMessage = lib.MarshalUnscheduledEvents(reactorName, int(env.CorrelationId), oevs)
case "init":
var inits = make([]lib.Event, 0)

reactors := el.Topology.Reactors()
for _, reactor := range reactors {
inits = append(inits,
lib.OutEventsToEvents(reactor, el.Topology.Reactor(reactor).Init())...)
for _, reactorName := range reactors {
reactor := el.Topology.Reactor(reactorName)
heapBefore := dumpHeapJson(reactor)
inits = append(inits, lib.OutEventsToEvents(reactorName, reactor.Init())...)
heapAfter := dumpHeapJson(reactor)
heapDiff := jsonDiff(heapBefore, heapAfter)
logLines := el.DumpReactorLoglines(reactorName)
rui[reactorName] = ReactorStepInfo{
SimulatedTime: time.Unix(0, 0).UTC(),
LogLines: logLines,
StateDiff: heapDiff,
}

}

bs, err := json.Marshal(struct {
Expand All @@ -106,9 +137,19 @@ func (el Executor) processEnvelope(env Envelope) Message {
panic(err)
}
reactor := el.Topology.Reactor(req.Reactor)
heapBefore := dumpHeapJson(reactor)
oevs := reactor.Timer(req.At)
heapAfter := dumpHeapJson(reactor)
heapDiff := jsonDiff(heapBefore, heapAfter)
logLines := el.DumpReactorLoglines(req.Reactor)
rui[req.Reactor] = ReactorStepInfo{
SimulatedTime: req.At,
LogLines: logLines,
StateDiff: heapDiff,
}
returnMessage = lib.MarshalUnscheduledEvents(req.Reactor, int(env.CorrelationId), oevs)
case "fault":
// unclear how to log this as a ReactorsStepInfo...
type FaultRequest struct {
Reactor string `json:"to"`
Event string `json:"event"`
Expand All @@ -119,7 +160,8 @@ func (el Executor) processEnvelope(env Envelope) Message {
}
switch req.Event {
case "restart":
el.Topology.Insert(req.Reactor, el.BuildReactor(req.Reactor))
buffer := el.Buffers[req.Reactor]
el.Topology.Insert(req.Reactor, el.BuildReactor(req.Reactor, buffer))
default:
fmt.Printf("Unhandled fault type %s\n", req.Event)
panic("Unhandled fault type")
Expand All @@ -137,5 +179,5 @@ func (el Executor) processEnvelope(env Envelope) Message {
fmt.Printf("Unknown message type: %#v\n", msg.Kind)
panic("Unknown message type")
}
return Message{"Events", returnMessage}
return Message{"Events", returnMessage}, rui
}
45 changes: 45 additions & 0 deletions src/executor-event-loop/logwriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package executorEL

type LogWriter [][]byte

func newLogWriter() *LogWriter {
x := LogWriter(make([][]byte, 0, 0))
return &x
}

func (lw *LogWriter) dump() []string {
logs := make([]string, 0, len(*lw))
for _, l := range *lw {
logs = append(logs, string(l))
}
*lw = LogWriter(make([][]byte, 0, 0))
return logs
}

func (_ *LogWriter) Sync() error {
return nil
}

func (lw *LogWriter) Write(p []byte) (n int, err error) {
if len(p) > 0 {
// make a copy of the line so that it doesn't get changed later
line := make([]byte, len(p))
copy(line, p)

// we remove last byte since it is an newline
*lw = append(*lw, line[:len(line)-1])
}
return len(p), nil
}

/*
func (lw LogWriter) AppendToLogger(logger *zap.Logger) *zap.Logger {
return logger.WithOptions(zap.WrapCore(func(c zapcore.Core) zapcore.Core {
config := zap.NewDevelopmentEncoderConfig()
config.TimeKey = ""
config.LineEnding = ""
dbLogger := zapcore.NewCore(zapcore.NewConsoleEncoder(config), lw, c)
return zapcore.NewTee(c, dbLogger)
}))
}
*/
2 changes: 1 addition & 1 deletion src/sut/register/executor/executorcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/symbiont-io/detsys-testkit/src/sut/register"
)

func constructor(name string) lib.Reactor {
func constructor(name string, logbuffer *executorEL.LogWriter) lib.Reactor {
switch name {
case "frontend":
return sut.NewFrontEnd4()
Expand Down

0 comments on commit 6d2cbbe

Please sign in to comment.