Skip to content

Commit

Permalink
refactor: Use CreateTest event
Browse files Browse the repository at this point in the history
The final push to make everything that goes to the db be an event to the
event_log.

This commit creates the `CreateTest` event, which contains the agenda and the
deployment. There is now also an `test_info` view that will contain the agenda
and deployment as columns (the data is still in json). This means that we no
longer use the `agenda`, `deployment` or `test` tables.

Noticeable renamings:
* in deployment we now call the reactors `reactor` instead of `component`.
  • Loading branch information
symbiont-daniel-gustafsson committed Jan 29, 2021
1 parent b63512d commit d224bf3
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 163 deletions.
20 changes: 20 additions & 0 deletions src/db/migrations/1611834879_create_test_event.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- +migrate Up
CREATE VIEW IF NOT EXISTS test_info AS
SELECT
json_extract(meta, '$.test-id') as test_id,
json_extract(data, '$.agenda') as agenda,
json_extract(data, '$.deployment') as deployment,
at as created_time
FROM event_log
WHERE event like 'CreateTest';

DROP TABLE IF EXISTS deployment;
DROP TABLE IF EXISTS agenda;
DROP TABLE IF EXISTS test;

-- +migrate Down
DROP VIEW IF EXISTS test_info;

CREATE TABLE IF NOT EXISTS test (rowid INTEGER PRIMARY KEY) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS agenda (rowid INTEGER PRIMARY KEY) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS deployment (rowid INTEGER PRIMARY KEY) WITHOUT ROWID;
18 changes: 14 additions & 4 deletions src/debugger/internal/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,20 @@ type HeapDiff struct {
}

func GetInitHeap(testId lib.TestId) []HeapDiff {
query := fmt.Sprintf(`SELECT component,args
FROM deployment
WHERE test_id = %d`, testId.TestId)
return helper(query)
deploys, err := lib.DeploymentInfoForTest(testId)
if err != nil {
panic(err)
}

diffs := make([]HeapDiff, 0, len(deploys))
for _, dep := range deploys {
diffs = append(diffs, HeapDiff{
Reactor: dep.Reactor,
Diff: dep.Args,
})
}

return diffs
}

func GetHeapTrace(testId lib.TestId, runId lib.RunId) []HeapDiff {
Expand Down
22 changes: 3 additions & 19 deletions src/executor/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,17 @@ type ExecutionStepEvent struct {
}

func EmitExecutionStepEvent(db *sql.DB, event ExecutionStepEvent) {
metaBlob, err := json.Marshal(struct {
meta := struct {
Component string `json:"component"`
RunId lib.RunId `json:"run-id"`
TestId lib.TestId `json:"test-id"`
}{
Component: "executor",
RunId: event.Meta.RunId,
TestId: event.Meta.TestId,
})
if err != nil {
panic(err)
}

dataBlob, err := json.Marshal(struct {
data := struct {
Reactor string `json:"reactor"`
LogicalTime int `json:"logical-time"`
SimulatedTime time.Time `json:"simulated-time"`
Expand All @@ -43,20 +40,7 @@ func EmitExecutionStepEvent(db *sql.DB, event ExecutionStepEvent) {
SimulatedTime: event.SimulatedTime,
LogLines: event.LogLines,
HeapDiff: event.HeapDiff,
})
if err != nil {
panic(err)
}

stmt, err := db.Prepare(`INSERT INTO event_log(event, meta, data) VALUES(?,?,?)`)
if err != nil {
panic(err)
}
defer stmt.Close()

_, err = stmt.Exec("ExecutionStep", metaBlob, dataBlob)

if err != nil {
panic(err)
}
lib.EmitEvent(db, "ExecutionStep", meta, data)
}
33 changes: 4 additions & 29 deletions src/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,36 +187,14 @@ func Deploy(srv *http.Server, topology Topology, m lib.Marshaler) {
}

func topologyFromDeployment(testId lib.TestId, constructor func(string) lib.Reactor) (Topology, error) {
query := fmt.Sprintf(`SELECT component,type
FROM deployment
WHERE test_id = %d`, testId.TestId)
deployments, err := lib.DeploymentInfoForTest(testId)

db := lib.OpenDB()
defer db.Close()

rows, err := db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()

topologyRaw := make(map[string]string)
type Column struct {
Component string
Type string
}
for rows.Next() {
column := Column{}
err := rows.Scan(&column.Component, &column.Type)
if err != nil {
return nil, err
}
topologyRaw[column.Component] = column.Type
}

topologyCooked := make(Topology)
for component, typ := range topologyRaw {
topologyCooked[component] = constructor(typ)
for _, deploy := range deployments {
topologyCooked[deploy.Reactor] = constructor(deploy.Type)
}
return topologyCooked, nil
}
Expand Down Expand Up @@ -277,7 +255,7 @@ func (e *Executor) SetTestId(testId lib.TestId) {
e.testId = testId
}

func NewExecutor(testId lib.TestId, marshaler lib.Marshaler, logger *zap.Logger, reactorNames []string, constructor func(name string, logger *zap.Logger) lib.Reactor) *Executor {
func NewExecutor(marshaler lib.Marshaler, logger *zap.Logger, reactorNames []string, constructor func(name string, logger *zap.Logger) lib.Reactor) *Executor {
topology := make(map[string]lib.Reactor)
buffers := make(map[string]*LogWriter)

Expand All @@ -293,7 +271,6 @@ func NewExecutor(testId lib.TestId, marshaler lib.Marshaler, logger *zap.Logger,
topology: topology,
buffers: buffers,
marshaler: marshaler,
testId: testId,
constructor: constructor,
logger: logger,
}
Expand All @@ -320,8 +297,6 @@ func (e *Executor) Deploy(srv *http.Server) {
}

func (e *Executor) Register() {
// Should probably separate the loading of the database to get deployment and register
// if so we could remove the need for `Executor` to know the test-id
components := make([]string, 0, len(e.topology))
for c, _ := range e.topology {
components = append(components, c)
Expand Down
39 changes: 23 additions & 16 deletions src/generator/detsys-generator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,34 @@ TEST="$1"
DETSYS_DB=${DETSYS_DB:-"${HOME}/.detsys.db"}

# Create test.
sqlite3 "${DETSYS_DB}" "INSERT INTO test DEFAULT VALUES"
TEST_ID=$(sqlite3 "${DETSYS_DB}" "SELECT max(id) from test")
TEST_ID=$(sqlite3 "${DETSYS_DB}" "SELECT IFNULL(max(test_id),-1)+1 from test_info")

META=''
DATA=''

if [ "${TEST}" == "register" ]; then
sqlite3 "${DETSYS_DB}" <<EOF
INSERT INTO agenda (test_id, id, kind, event, args, \`from\`, \`to\`, at)
VALUES
(${TEST_ID}, 0, "invoke", "write", '{"value": 1}', "client:0", "frontend", "1970-01-01T00:00:00Z"),
(${TEST_ID}, 1, "invoke", "read", "{}", "client:0", "frontend", "1970-01-01T00:00:10Z");
INSERT INTO deployment VALUES(${TEST_ID}, "frontend", "frontend", '{"inFlight":{},"inFlightSessionToClient":{},"nextSessionId":0}');
INSERT INTO deployment VALUES(${TEST_ID}, "register1", "register", '{"value":[]}');
INSERT INTO deployment VALUES(${TEST_ID}, "register2", "register", '{"value":[]}');
EOF
META=$(cat <<END_META
{"component": "detsys-generator", "test-id": ${TEST_ID}}
END_META
)
DATA=$(cat <<END_DATA
{"agenda":[{"kind": "invoke", "event": "write", "args": {"value": 1}, "from": "client:0", "to": "frontend", "at": "1970-01-01T00:00:00Z"}, {"kind": "invoke", "event": "read", "args": {}, "from": "client:0", "to": "frontend", "at": "1970-01-01T00:00:10Z"}], "deployment": [{"reactor": "frontend", "type": "frontend", "args": {"inFlight":{},"inFlightSessionToClient":{},"nextSessionId":0}}, {"reactor": "register1", "type": "register", "args": {"value":[]}}, {"reactor": "register2", "type": "register", "args": {"value":[]}}]}
END_DATA
)
elif [ "${TEST}" == "broadcast" ]; then
sqlite3 "${DETSYS_DB}" <<EOF
INSERT INTO deployment VALUES(${TEST_ID}, "A", "node", '{"log":"Hello world!","neighbours":{}}');
INSERT INTO deployment VALUES(${TEST_ID}, "B", "node", '{"log":"","neighbours":{}}');
INSERT INTO deployment VALUES(${TEST_ID}, "C", "node", '{"log":"","neighbours":{}}');
EOF
META=$(cat <<END_META
{"component": "detsys-generator", "test-id": ${TEST_ID}}
END_META
)
DATA=$(cat <<END_DATA
{"agenda":[], "deployment": [{"reactor": "A", "type": "node", "args": {"log":"Hello world!","neighbours":{}}} , {"reactor": "B", "type": "node", "args": {"log":"","neighbours":{}}} , {"reactor": "C", "type": "node", "args": {"log":"","neighbours":{}}}]}
END_DATA
)
fi

sqlite3 "${DETSYS_DB}" <<EOF
INSERT INTO event_log(event, meta,data) VALUES("CreateTest", '${META}', '${DATA}')
EOF
echo "${TEST_ID}"

popd > /dev/null
1 change: 1 addition & 0 deletions src/lib/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "lib",
srcs = [
"checker.go",
"event.go",
"generator.go",
"ldfi.go",
"lib.go",
Expand Down
30 changes: 30 additions & 0 deletions src/lib/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package lib

import (
"database/sql"
"encoding/json"
)

func EmitEvent(db *sql.DB, event string, meta interface{}, data interface{}) {
metaBlob, err := json.Marshal(meta)
if err != nil {
panic(err)
}

dataBlob, err := json.Marshal(data)
if err != nil {
panic(err)
}

stmt, err := db.Prepare(`INSERT INTO event_log(event, meta, data) VALUES(?,?,?)`)
if err != nil {
panic(err)
}
defer stmt.Close()

_, err = stmt.Exec(event, metaBlob, dataBlob)

if err != nil {
panic(err)
}
}
Loading

0 comments on commit d224bf3

Please sign in to comment.