Skip to content

Commit

Permalink
feat(executor): executor as event loop
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-daniel-gustafsson committed Nov 2, 2021
1 parent cb4608e commit fad2176
Show file tree
Hide file tree
Showing 17 changed files with 694 additions and 17 deletions.
98 changes: 98 additions & 0 deletions src/executor-event-loop/admin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package executorEL

/*
These seems to not be synchronised, so we could get problems correlating input/output
*/

import (
"bufio"
"fmt"
"os"
)

type AdminCommand int

const (
AdminQuit = iota
AdminDumpLog
AdminResetLog
AdminUnknown
)

// panic galore if we don't have it
func parseCommandFromString(command string) AdminCommand {
v, ok := map[string]AdminCommand{
"AdminQuit": AdminQuit,
"AdminDumpLog": AdminDumpLog,
"AdminResetLog": AdminResetLog,
}[command]
if !ok {
return AdminUnknown
} else {
return v
}
}

type AdminInterface struct {
Started bool
Incoming string
Outgoing string
ListenChannel chan AdminCommand
}

func NewAdmin(input string, output string) (*AdminInterface, *error) {
err := createPipe(input)
if err != nil {
return nil, err
}

err = createPipe(output)
if err != nil {
return nil, err
}

com := make(chan AdminCommand)
return &AdminInterface{
Started: false,
Incoming: input,
Outgoing: output,
ListenChannel: com,
}, nil
}

func (a *AdminInterface) findIncoming() {
for {
file, err := openPipe(a.Incoming, os.O_RDONLY)
if err != nil {
panic(err)
}
buf := bufio.NewScanner(file)
for buf.Scan() {
line := buf.Text()
if line == "" {
continue
}
command := parseCommandFromString(line)
if command != AdminUnknown {
a.ListenChannel <- command
} else {
fmt.Printf("Unknown command: %s\n", line)
}
}
}
}

func (a *AdminInterface) Listen() <-chan AdminCommand {
if !a.Started {
go a.findIncoming()
a.Started = true
}
return a.ListenChannel
}

func (a *AdminInterface) Respond(line string) {
if !a.Started {
panic("Can't send on admin interface before you Listen()")
}
writePipe(a.Outgoing, line)
}
71 changes: 71 additions & 0 deletions src/executor-event-loop/communication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package executorEL

import (
"bufio"
"encoding/json"
"fmt"
"os"
)

type CommandTransport struct {
Started bool
Incomming string
ListenChannel chan Envelope
}

func NewCommandTransport(input string) (*CommandTransport, *error) {
err := createPipe(input)
if err != nil {
return nil, err
}

com := make(chan Envelope)
return &CommandTransport{
Started: false,
Incomming: input,
ListenChannel: com,
}, nil
}

func (ct *CommandTransport) Send(env Envelope) {
j, err := json.Marshal(env)
if err != nil {
panic(err)
}

writePipe(env.Receiver.Address, string(j)+"\n")
}

func (ct *CommandTransport) findIncoming() {
for {

file, err := openPipe(ct.Incomming, os.O_RDONLY)
if err != nil {
panic(err)
}

buf := bufio.NewScanner(file)
for buf.Scan() {
line := buf.Text() // is this correct?
if line == "" {
continue
}
var envelope *Envelope
err := json.Unmarshal([]byte(line), &envelope)
if err != nil {
panic(err)
}

ct.ListenChannel <- *envelope
}
}
}

func (ct *CommandTransport) Listen() <-chan Envelope {
if !ct.Started {
go ct.findIncoming()
ct.Started = true
}

return ct.ListenChannel
}
82 changes: 82 additions & 0 deletions src/executor-event-loop/envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package executorEL

import (
"encoding/json"
)

type EnvelopeKind int

const (
Request = iota
Response
)

func (t EnvelopeKind) String() string {
return [...]string{"RequestKind", "ResponseKind"}[t]
}

func (t *EnvelopeKind) FromString(kind string) EnvelopeKind {
return map[string]EnvelopeKind{
"RequestKind": Request,
"ResponseKind": Response,
}[kind]
}

func (t EnvelopeKind) MarshalJSON() ([]byte, error) {
return json.Marshal(t.String())
}

func (t *EnvelopeKind) UnmarshalJSON(b []byte) error {
var s string
err := json.Unmarshal(b, &s)
if err != nil {
return err
}
*t = t.FromString(s)
return nil
}

type LocalRef struct {
Index int `json:"index"`
}

type RemoteRef struct {
Address string `json:"address"`
Index int `json:"index"`
}

func (r RemoteRef) ToLocal() LocalRef {
return LocalRef{Index: r.Index}
}

type CorrelationId int
type LogicalTime int

func (lt *LogicalTime) Incr() {
*lt = *lt + 1
}

func (lt *LogicalTime) Merge(olt LogicalTime) {
lm := LogicalTime(-1)
if int(*lt) < int(olt) {
lm = olt
} else {
lm = *lt
}
*lt = lm + 1
}

// we need to agree how this should work
type Message struct {
Kind string `json:"kind"`
Message json.RawMessage `json:"message"` // is json value
}

type Envelope struct {
Kind EnvelopeKind `json:"envelopeKind"`
Sender RemoteRef `json:"envelopeSender"`
Message Message `json:"envelopeMessage"`
Receiver RemoteRef `json:"envelopeReceiver"`
CorrelationId CorrelationId `json:"envelopeCorrelationId"`
LogicalTime LogicalTime `json:"envelopeLogicalTime"`
}
109 changes: 109 additions & 0 deletions src/executor-event-loop/eventloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package executorEL

import (
"fmt"
"time"
)

type EventLoop struct {
// Name string // hardcoded to "executor" for now
Admin *AdminInterface
CommandTransport *CommandTransport
Log []TimestampedLogEntry
LogicalTime *LogicalTime
Executor *Executor
SchedulerRef RemoteRef
}

func NewEventLoop(ai *AdminInterface, ct *CommandTransport, executor *Executor) *EventLoop {
lt := LogicalTime(0)
return &EventLoop{
Admin: ai,
CommandTransport: ct,
Log: []TimestampedLogEntry{},
LogicalTime: &lt,
Executor: executor,
SchedulerRef: RemoteRef{"/tmp/scheduler", 0},
}
}

func (el *EventLoop) AddToLog(logDirection LogDirection, me LocalRef, env Message) {
tentry := TimestampedLogEntry{
LogEntry: LogEntry{
LocalRef: me,
RemoteRef: el.SchedulerRef,
Message: env,
Direction: logDirection,
},
LogicalTime: *el.LogicalTime,
Time: time.Now(),
}
el.Log = append(el.Log, tentry)
}

func (el *EventLoop) AddToAdminLog(cmd AdminCommand) {
el.AddToLog(LogResumeContinuation, LocalRef{0}, Message{"AdminCommand", []byte(fmt.Sprintf("Got command %d\n", cmd))})
}

func (el *EventLoop) toSchedulerEnvelope(me RemoteRef, msg Message, correlationId CorrelationId) Envelope {
return Envelope{
Kind: Response,
Sender: me,
Message: msg,
Receiver: el.SchedulerRef,
CorrelationId: correlationId,
LogicalTime: *el.LogicalTime,
}
}

func (el *EventLoop) processAdmin(cmd AdminCommand) (bool, []string) {
switch cmd {
case AdminQuit:
fmt.Printf("Shutting down....\n")
return true, []string{}
case AdminDumpLog:
fmt.Printf("dumping log\n")
log := make([]string, len(el.Log))
for _, e := range el.Log {
log = append(log, e.Serialise())
}
return false, log
case AdminResetLog:
fmt.Printf("resetting log\n")
el.Log = make([]TimestampedLogEntry, 0)
return false, []string{}
default:
fmt.Printf("Unknown admin command: %#v\n", cmd)
panic("Unhandled admin command")
}
}

func (el *EventLoop) Run() {
adminCommand := el.Admin.Listen()
commands := el.CommandTransport.Listen()
for {
fmt.Print("Looking for command\n")
select {
case cmd := <-adminCommand:
fmt.Printf("Found admin command\n")
el.LogicalTime.Incr()
el.AddToAdminLog(cmd)
quit, output := el.processAdmin(cmd)
if quit {
return
}
for _, entry := range output {
el.Admin.Respond(entry)
}
case envelope := <-commands:
fmt.Printf("Found message\n")
me := envelope.Receiver.ToLocal()
el.LogicalTime.Merge(envelope.LogicalTime)
el.AddToLog(LogResumeContinuation, me, envelope.Message)
outgoingMessage := el.Executor.processEnvelope(envelope)
outgoing := el.toSchedulerEnvelope(envelope.Receiver, outgoingMessage, envelope.CorrelationId)
el.AddToLog(LogSend, me, outgoingMessage)
el.CommandTransport.Send(outgoing)
}
}
}
Loading

0 comments on commit fad2176

Please sign in to comment.