From d224bf3be63bf582dd06b6a88b6abf085062d6d9 Mon Sep 17 00:00:00 2001 From: Daniel Gustafsson Date: Thu, 28 Jan 2021 17:16:15 +0100 Subject: [PATCH] refactor: Use CreateTest event The final push to make everything that goes to the db be an event to the event_log. This commit creates the `CreateTest` event, which contains the agenda and the deployment. There is now also an `test_info` view that will contain the agenda and deployment as columns (the data is still in json). This means that we no longer use the `agenda`, `deployment` or `test` tables. Noticeable renamings: * in deployment we now call the reactors `reactor` instead of `component`. --- .../1611834879_create_test_event.sql | 20 ++++ src/debugger/internal/debugger.go | 18 ++- src/executor/event.go | 22 +--- src/executor/executor.go | 33 +----- src/generator/detsys-generator.sh | 39 ++++--- src/lib/BUILD.bazel | 1 + src/lib/event.go | 30 +++++ src/lib/generator.go | 108 +++++++----------- src/lib/scheduler.go | 33 ++---- src/lib/util.go | 47 ++++++++ src/scheduler/src/scheduler/db.clj | 12 +- 11 files changed, 200 insertions(+), 163 deletions(-) create mode 100644 src/db/migrations/1611834879_create_test_event.sql create mode 100644 src/lib/event.go diff --git a/src/db/migrations/1611834879_create_test_event.sql b/src/db/migrations/1611834879_create_test_event.sql new file mode 100644 index 00000000..88e7a2b6 --- /dev/null +++ b/src/db/migrations/1611834879_create_test_event.sql @@ -0,0 +1,20 @@ +-- +migrate Up +CREATE VIEW IF NOT EXISTS test_info AS + SELECT + json_extract(meta, '$.test-id') as test_id, + json_extract(data, '$.agenda') as agenda, + json_extract(data, '$.deployment') as deployment, + at as created_time + FROM event_log + WHERE event like 'CreateTest'; + +DROP TABLE IF EXISTS deployment; +DROP TABLE IF EXISTS agenda; +DROP TABLE IF EXISTS test; + +-- +migrate Down +DROP VIEW IF EXISTS test_info; + +CREATE TABLE IF NOT EXISTS test (rowid INTEGER PRIMARY KEY) WITHOUT ROWID; +CREATE TABLE IF NOT EXISTS agenda (rowid INTEGER PRIMARY KEY) WITHOUT ROWID; +CREATE TABLE IF NOT EXISTS deployment (rowid INTEGER PRIMARY KEY) WITHOUT ROWID; diff --git a/src/debugger/internal/debugger.go b/src/debugger/internal/debugger.go index ed3d301d..615c4bab 100644 --- a/src/debugger/internal/debugger.go +++ b/src/debugger/internal/debugger.go @@ -22,10 +22,20 @@ type HeapDiff struct { } func GetInitHeap(testId lib.TestId) []HeapDiff { - query := fmt.Sprintf(`SELECT component,args - FROM deployment - WHERE test_id = %d`, testId.TestId) - return helper(query) + deploys, err := lib.DeploymentInfoForTest(testId) + if err != nil { + panic(err) + } + + diffs := make([]HeapDiff, 0, len(deploys)) + for _, dep := range deploys { + diffs = append(diffs, HeapDiff{ + Reactor: dep.Reactor, + Diff: dep.Args, + }) + } + + return diffs } func GetHeapTrace(testId lib.TestId, runId lib.RunId) []HeapDiff { diff --git a/src/executor/event.go b/src/executor/event.go index 0b73f390..7f24869d 100644 --- a/src/executor/event.go +++ b/src/executor/event.go @@ -18,7 +18,7 @@ type ExecutionStepEvent struct { } func EmitExecutionStepEvent(db *sql.DB, event ExecutionStepEvent) { - metaBlob, err := json.Marshal(struct { + meta := struct { Component string `json:"component"` RunId lib.RunId `json:"run-id"` TestId lib.TestId `json:"test-id"` @@ -26,12 +26,9 @@ func EmitExecutionStepEvent(db *sql.DB, event ExecutionStepEvent) { Component: "executor", RunId: event.Meta.RunId, TestId: event.Meta.TestId, - }) - if err != nil { - panic(err) } - dataBlob, err := json.Marshal(struct { + data := struct { Reactor string `json:"reactor"` LogicalTime int `json:"logical-time"` SimulatedTime time.Time `json:"simulated-time"` @@ -43,20 +40,7 @@ func EmitExecutionStepEvent(db *sql.DB, event ExecutionStepEvent) { SimulatedTime: event.SimulatedTime, LogLines: event.LogLines, HeapDiff: event.HeapDiff, - }) - if err != nil { - panic(err) } - stmt, err := db.Prepare(`INSERT INTO event_log(event, meta, data) VALUES(?,?,?)`) - if err != nil { - panic(err) - } - defer stmt.Close() - - _, err = stmt.Exec("ExecutionStep", metaBlob, dataBlob) - - if err != nil { - panic(err) - } + lib.EmitEvent(db, "ExecutionStep", meta, data) } diff --git a/src/executor/executor.go b/src/executor/executor.go index b5672a81..d41e6c22 100644 --- a/src/executor/executor.go +++ b/src/executor/executor.go @@ -187,36 +187,14 @@ func Deploy(srv *http.Server, topology Topology, m lib.Marshaler) { } func topologyFromDeployment(testId lib.TestId, constructor func(string) lib.Reactor) (Topology, error) { - query := fmt.Sprintf(`SELECT component,type - FROM deployment - WHERE test_id = %d`, testId.TestId) + deployments, err := lib.DeploymentInfoForTest(testId) - db := lib.OpenDB() - defer db.Close() - - rows, err := db.Query(query) if err != nil { return nil, err } - defer rows.Close() - - topologyRaw := make(map[string]string) - type Column struct { - Component string - Type string - } - for rows.Next() { - column := Column{} - err := rows.Scan(&column.Component, &column.Type) - if err != nil { - return nil, err - } - topologyRaw[column.Component] = column.Type - } - topologyCooked := make(Topology) - for component, typ := range topologyRaw { - topologyCooked[component] = constructor(typ) + for _, deploy := range deployments { + topologyCooked[deploy.Reactor] = constructor(deploy.Type) } return topologyCooked, nil } @@ -277,7 +255,7 @@ func (e *Executor) SetTestId(testId lib.TestId) { e.testId = testId } -func NewExecutor(testId lib.TestId, marshaler lib.Marshaler, logger *zap.Logger, reactorNames []string, constructor func(name string, logger *zap.Logger) lib.Reactor) *Executor { +func NewExecutor(marshaler lib.Marshaler, logger *zap.Logger, reactorNames []string, constructor func(name string, logger *zap.Logger) lib.Reactor) *Executor { topology := make(map[string]lib.Reactor) buffers := make(map[string]*LogWriter) @@ -293,7 +271,6 @@ func NewExecutor(testId lib.TestId, marshaler lib.Marshaler, logger *zap.Logger, topology: topology, buffers: buffers, marshaler: marshaler, - testId: testId, constructor: constructor, logger: logger, } @@ -320,8 +297,6 @@ func (e *Executor) Deploy(srv *http.Server) { } func (e *Executor) Register() { - // Should probably separate the loading of the database to get deployment and register - // if so we could remove the need for `Executor` to know the test-id components := make([]string, 0, len(e.topology)) for c, _ := range e.topology { components = append(components, c) diff --git a/src/generator/detsys-generator.sh b/src/generator/detsys-generator.sh index b28aafc7..b09669f4 100755 --- a/src/generator/detsys-generator.sh +++ b/src/generator/detsys-generator.sh @@ -8,27 +8,34 @@ TEST="$1" DETSYS_DB=${DETSYS_DB:-"${HOME}/.detsys.db"} # Create test. -sqlite3 "${DETSYS_DB}" "INSERT INTO test DEFAULT VALUES" -TEST_ID=$(sqlite3 "${DETSYS_DB}" "SELECT max(id) from test") +TEST_ID=$(sqlite3 "${DETSYS_DB}" "SELECT IFNULL(max(test_id),-1)+1 from test_info") + +META='' +DATA='' if [ "${TEST}" == "register" ]; then - sqlite3 "${DETSYS_DB}" < /dev/null diff --git a/src/lib/BUILD.bazel b/src/lib/BUILD.bazel index 3fe011fd..4c3c1b72 100644 --- a/src/lib/BUILD.bazel +++ b/src/lib/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "lib", srcs = [ "checker.go", + "event.go", "generator.go", "ldfi.go", "lib.go", diff --git a/src/lib/event.go b/src/lib/event.go new file mode 100644 index 00000000..6005ed59 --- /dev/null +++ b/src/lib/event.go @@ -0,0 +1,30 @@ +package lib + +import ( + "database/sql" + "encoding/json" +) + +func EmitEvent(db *sql.DB, event string, meta interface{}, data interface{}) { + metaBlob, err := json.Marshal(meta) + if err != nil { + panic(err) + } + + dataBlob, err := json.Marshal(data) + if err != nil { + panic(err) + } + + stmt, err := db.Prepare(`INSERT INTO event_log(event, meta, data) VALUES(?,?,?)`) + if err != nil { + panic(err) + } + defer stmt.Close() + + _, err = stmt.Exec(event, metaBlob, dataBlob) + + if err != nil { + panic(err) + } +} diff --git a/src/lib/generator.go b/src/lib/generator.go index 9db8af2e..0e0cc1d9 100644 --- a/src/lib/generator.go +++ b/src/lib/generator.go @@ -31,73 +31,45 @@ func GenerateTest(test string) TestId { type Topology = map[string]Reactor type Agenda = []ScheduledEvent -func generateNewEmptyTest() TestId { - var testId TestId +func GenerateTestFromTopologyAndAgenda(topology Topology, agenda Agenda) TestId { + db := OpenDB() + defer db.Close() + var testId TestId { - db := OpenDB() - defer db.Close() - stmt, err := db.Prepare(`INSERT INTO test DEFAULT VALUES`) + stmt, err := db.Prepare(`SELECT IFNULL(max(test_id),-1)+1 FROM test_info`) defer stmt.Close() - _, err = stmt.Exec() if err != nil { panic(err) } - } - - { - db := OpenDB() - defer db.Close() - stmt, err := db.Prepare(`SELECT max(id) FROM test`) - defer stmt.Close() if err = stmt.QueryRow().Scan(&testId.TestId); err != nil { panic(err) } } - return testId -} - -func setDeploymentHeap(testId TestId, topology Topology) { - db := OpenDB() - defer db.Close() - - stmt, err := db.Prepare(`INSERT INTO deployment(test_id, component, type, args) - VALUES(?,?,?,?)`) - if err != nil { - panic(err) - } - defer stmt.Close() - - for k, r := range topology { - j, err := json.Marshal(r) - if err != nil { - panic(err) - } - typ := strings.ToLower(strings.Split(reflect.TypeOf(r).String(), ".")[1]) - _, err = stmt.Exec(testId.TestId, k, typ, j) - if err != nil { - panic(err) - } + meta := struct { + Component string `json:"component"` + TestId int `json:"test-id"` + }{ + Component: "generator", + TestId: testId.TestId, } -} -func setAgenda(testId TestId, agenda Agenda) { - type Entry struct { - Kind string - Event string - Args []byte - From string - To string - At time.Time + type AgendaItem struct { + Kind string `json:"kind"` + Event string `json:"event"` + Args json.RawMessage `json:"args"` + From string `json:"from"` + To string `json:"to"` + At time.Time `json:"at"` } - entries := make([]Entry, len(agenda)) - for id, entry := range agenda { + entries := make([]AgendaItem, 0, len(agenda)) + for _, entry := range agenda { var kind, event string - var args []byte + var args json.RawMessage switch ev := entry.Event.(type) { case ClientRequest: @@ -119,37 +91,41 @@ func setAgenda(testId TestId, agenda Agenda) { default: panic(fmt.Sprintf("Unknown message type %#v\n", ev)) } - entries[id] = Entry{ + entries = append(entries, AgendaItem{ Kind: kind, Event: event, Args: args, From: entry.From, To: entry.To, At: entry.At, - } + }) } - db := OpenDB() - defer db.Close() + deployment := make([]DeploymentInfo, 0, len(topology)) - stmt, err := db.Prepare("INSERT INTO agenda(test_id, id, kind, event, args, `from`, `to`, at) VALUES(?,?,?,?,?,?,?,?)") - if err != nil { - panic(err) - } - defer stmt.Close() - - for id, entry := range entries { - _, err = stmt.Exec(testId.TestId, id, entry.Kind, entry.Event, entry.Args, entry.From, entry.To, entry.At.Format(time.RFC3339Nano)) + for reactor, r := range topology { + args, err := json.Marshal(r) if err != nil { panic(err) } + + typ := strings.ToLower(strings.Split(reflect.TypeOf(r).String(), ".")[1]) + deployment = append(deployment, DeploymentInfo{ + Reactor: reactor, + Type: typ, + Args: args, + }) } -} -func GenerateTestFromTopologyAndAgenda(topology Topology, agenda Agenda) TestId { - testId := generateNewEmptyTest() - setDeploymentHeap(testId, topology) - setAgenda(testId, agenda) + data := struct { + Agenda []AgendaItem `json:"agenda"` + Deployment []DeploymentInfo `json:"deployment"` + }{ + Agenda: entries, + Deployment: deployment, + } + + EmitEvent(db, "CreateTest", meta, data) return testId } diff --git a/src/lib/scheduler.go b/src/lib/scheduler.go index 31f8ac2f..43452653 100644 --- a/src/lib/scheduler.go +++ b/src/lib/scheduler.go @@ -123,43 +123,30 @@ func Step() json.RawMessage { return result } -func componentsFromDeployment(testId TestId) ([]string, error) { - query := fmt.Sprintf(`SELECT component - FROM deployment - WHERE test_id = %d`, testId.TestId) +func reactorsFromDeployment(testId TestId) ([]string, error) { + deploys, err := DeploymentInfoForTest(testId) - db := OpenDB() - defer db.Close() - - rows, err := db.Query(query) if err != nil { return nil, err } - defer rows.Close() - var components []string - type Column struct { - Component string - } - for rows.Next() { - column := Column{} - err := rows.Scan(&column.Component) - if err != nil { - return nil, err - } - components = append(components, column.Component) + reactors := make([]string, 0, len(deploys)) + + for _, dep := range deploys { + reactors = append(reactors, dep.Reactor) } - return components, nil + + return reactors, nil } func Register(testId TestId) { // TODO(stevan): Make executorUrl part of topology/deployment. const executorUrl string = "http://localhost:3001/api/v1/" - components, err := componentsFromDeployment(testId) + reactors, err := reactorsFromDeployment(testId) if err != nil { panic(err) } - RegisterExecutor(executorUrl, components) + RegisterExecutor(executorUrl, reactors) } diff --git a/src/lib/util.go b/src/lib/util.go index 89c67dd4..e14f10d8 100644 --- a/src/lib/util.go +++ b/src/lib/util.go @@ -102,6 +102,53 @@ func OpenDB() *sql.DB { return db } +type DeploymentInfo struct { + Reactor string `json:"reactor"` + Type string `json:"type"` + Args json.RawMessage `json:"args"` +} + +func DeploymentInfoForTest(testId TestId) ([]DeploymentInfo, error) { + db := OpenDB() + defer db.Close() + + rows, err := db.Query(`SELECT deployment + FROM test_info + WHERE test_id = ?`, testId.TestId) + if err != nil { + return nil, err + } + defer rows.Close() + + var deployments = make([]DeploymentInfo, 0) + found_one := false + for rows.Next() { + // we should only find one test with a given test-id + if found_one { + return nil, errors.New(fmt.Sprintf("We found multiple tests with id: %d", testId.TestId)) + } + found_one = true + + var jsonBlob []byte + err := rows.Scan(&jsonBlob) + if err != nil { + return nil, err + } + var columns []DeploymentInfo + err = json.Unmarshal(jsonBlob, &columns) + + if err != nil { + return nil, err + } + + for _, column := range columns { + deployments = append(deployments, column) + } + } + + return deployments, nil +} + type TimeFromString time.Time func (tf *TimeFromString) Scan(src interface{}) error { diff --git a/src/scheduler/src/scheduler/db.clj b/src/scheduler/src/scheduler/db.clj index 09ffe264..e4c0ad2a 100644 --- a/src/scheduler/src/scheduler/db.clj +++ b/src/scheduler/src/scheduler/db.clj @@ -28,12 +28,12 @@ [test-id] (->> (jdbc/execute! ds - ["SELECT * FROM agenda WHERE test_id = ? ORDER BY id ASC" test-id] - {:return-keys true :builder-fn rs/as-unqualified-lower-maps}) - (mapv #(-> % - (dissoc :id :test_id) - (update :at time/instant) - (update :args json/read))))) + ["SELECT agenda FROM test_info WHERE test_id = ?" test-id] + {:builder-fn rs/as-unqualified-lower-maps}) + first ;; we should only have one test for the test_id.. + :agenda + json/read + (mapv #(update % :at time/instant)))) (defn next-run-id! [test-id]