diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 36c423336..c603f9567 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -49,6 +49,9 @@ type RetryConfig struct { } func (a *Agent) Run() error { + if err := a.setupRequestId(); err != nil { + return err + } a.init() if err := a.setupGraph(); err != nil { return err @@ -61,7 +64,6 @@ func (a *Agent) Run() error { } setup := []func() error{ a.checkIsRunning, - a.setupRequestId, a.setupDatabase, a.setupSocketServer, } @@ -140,6 +142,7 @@ func (a *Agent) init() { OnSuccess: a.DAG.HandlerOn.Success, OnFailure: a.DAG.HandlerOn.Failure, OnCancel: a.DAG.HandlerOn.Cancel, + RequestId: a.requestId, }) a.reporter = &reporter.Reporter{ Config: &reporter.Config{ @@ -150,9 +153,10 @@ func (a *Agent) init() { }), }} a.logFilename = filepath.Join( - a.DAG.LogDir, fmt.Sprintf("%s.%s.log", + a.DAG.LogDir, fmt.Sprintf("%s.%s.%s.log", utils.ValidFilename(a.DAG.Name, "_"), - time.Now().Format("20060102.15:04:05"), + a.requestId, + time.Now().Format("20060102.15:04:05.000"), )) } @@ -185,7 +189,7 @@ func (a *Agent) setupRequestId() error { func (a *Agent) setupDatabase() (err error) { a.database = database.New(database.DefaultConfig()) - a.dbWriter, a.dbFile, err = a.database.NewWriter(a.DAG.ConfigPath, time.Now()) + a.dbWriter, a.dbFile, err = a.database.NewWriter(a.DAG.ConfigPath, time.Now(), a.requestId) return } diff --git a/internal/database/database.go b/internal/database/database.go index 07fc7af2a..32730d984 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -59,8 +59,8 @@ func ParseFile(file string) (*models.Status, error) { return m, nil } -func (db *Database) NewWriter(configPath string, t time.Time) (*Writer, string, error) { - f, err := db.newFile(configPath, t) +func (db *Database) NewWriter(configPath string, t time.Time, requestId string) (*Writer, string, error) { + f, err := db.newFile(configPath, t, requestId) if err != nil { return nil, "", err } @@ -175,11 +175,11 @@ func (db *Database) dir(configPath string, prefix string) string { return filepath.Join(db.Dir, fmt.Sprintf("%s-%s", prefix, v)) } -func (db *Database) newFile(configPath string, t time.Time) (string, error) { +func (db *Database) newFile(configPath string, t time.Time, requestId string) (string, error) { if configPath == "" { return "", fmt.Errorf("configPath is empty") } - fileName := fmt.Sprintf("%s.%s.dat", db.pattern(configPath), t.Format("20060102.15:04:05")) + fileName := fmt.Sprintf("%s.%s.%s.dat", db.pattern(configPath), requestId, t.Format("20060102.15:04:05.000")) return fileName, nil } @@ -191,7 +191,7 @@ func (db *Database) pattern(configPath string) string { func (db *Database) latestToday(configPath string, day time.Time) (string, error) { var ret = []string{} - pattern := fmt.Sprintf("%s.%s*.dat", db.pattern(configPath), day.Format("20060102")) + pattern := fmt.Sprintf("%s.*.%s*.dat", db.pattern(configPath), day.Format("20060102")) matches, err := filepath.Glob(pattern) if err == nil || len(matches) > 0 { ret = filterLatest(matches, 1) diff --git a/internal/database/database_test.go b/internal/database/database_test.go index 011db51ca..947ae0539 100644 --- a/internal/database/database_test.go +++ b/internal/database/database_test.go @@ -52,13 +52,14 @@ func testNewDataFile(t *testing.T, db *Database) { ConfigPath: "test_new_data_file.yaml", } timestamp := time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local) - f, err := db.newFile(cfg.ConfigPath, timestamp) + requestId := "request-id-1" + f, err := db.newFile(cfg.ConfigPath, timestamp, requestId) require.NoError(t, err) p := utils.ValidFilename(strings.TrimSuffix( path.Base(cfg.ConfigPath), path.Ext(cfg.ConfigPath)), "_") - assert.Regexp(t, fmt.Sprintf("%s.*/%s.20220101.00:00:00.dat", p, p), f) + assert.Regexp(t, fmt.Sprintf("%s.*/%s.%s.20220101.00:00:00.000.dat", p, p, requestId), f) - _, err = db.newFile("", timestamp) + _, err = db.newFile("", timestamp, requestId) require.Error(t, err) } @@ -71,22 +72,28 @@ func testWriteAndFindFiles(t *testing.T, db *Database) { for _, data := range []struct { Status *models.Status + RequestId string Timestamp time.Time }{ { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-1", time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local), }, { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-2", time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local), }, { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-3", time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local), }, } { - testWriteStatus(t, db, cfg, data.Status, data.Timestamp) + status := data.Status + status.RequestId = data.RequestId + testWriteStatus(t, db, cfg, status, data.Timestamp) } files := db.latest(db.pattern(cfg.ConfigPath)+"*.dat", 2) @@ -142,21 +149,27 @@ func testRemoveOldFiles(t *testing.T, db *Database) { for _, data := range []struct { Status *models.Status + RequestId string Timestamp time.Time }{ { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-1", time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local), }, { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-2", time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local), }, { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-3", time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local), }, } { + status := data.Status + status.RequestId = data.RequestId testWriteStatus(t, db, cfg, data.Status, data.Timestamp) } @@ -176,7 +189,8 @@ func testReadLatestStatus(t *testing.T, db *Database) { cfg := &config.Config{ ConfigPath: "test_config_status_reader.yaml", } - dw, _, err := db.NewWriter(cfg.ConfigPath, time.Now()) + requestId := "request-id-1" + dw, _, err := db.NewWriter(cfg.ConfigPath, time.Now(), requestId) require.NoError(t, err) err = dw.Open() require.NoError(t, err) @@ -206,21 +220,27 @@ func testReadStatusN(t *testing.T, db *Database) { for _, data := range []struct { Status *models.Status + RequestId string Timestamp time.Time }{ { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-1", time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local), }, { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-2", time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local), }, { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), + "request-id-3", time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local), }, } { + status := data.Status + status.RequestId = data.RequestId testWriteStatus(t, db, cfg, data.Status, data.Timestamp) } @@ -238,8 +258,9 @@ func testCompactFile(t *testing.T, db *Database) { Name: "test_compact_file", ConfigPath: "test_compact_file.yaml", } + requestId := "request-id-1" - dw, _, err := db.NewWriter(cfg.ConfigPath, time.Now()) + dw, _, err := db.NewWriter(cfg.ConfigPath, time.Now(), requestId) require.NoError(t, err) require.NoError(t, dw.Open()) @@ -286,7 +307,7 @@ func testErrorReadFile(t *testing.T, db *Database) { _, err := ParseFile("invalid_file.dat") require.Error(t, err) - _, _, err = db.NewWriter("", time.Now()) + _, _, err = db.NewWriter("", time.Now(), "") require.Error(t, err) _, err = db.ReadStatusToday("invalid_file.yaml") @@ -324,7 +345,7 @@ func testErrorParseFile(t *testing.T, db *Database) { func testWriteStatus(t *testing.T, db *Database, cfg *config.Config, status *models.Status, tm time.Time) { t.Helper() - dw, _, err := db.NewWriter(cfg.ConfigPath, tm) + dw, _, err := db.NewWriter(cfg.ConfigPath, tm, status.RequestId) require.NoError(t, err) require.NoError(t, dw.Open()) defer dw.Close() diff --git a/internal/database/writer_test.go b/internal/database/writer_test.go index cd92533c6..ebbbc88d7 100644 --- a/internal/database/writer_test.go +++ b/internal/database/writer_test.go @@ -18,7 +18,7 @@ func testWriteStatusToFile(t *testing.T, db *Database) { Name: "test_write_status", ConfigPath: "test_write_status.yaml", } - dw, file, err := db.NewWriter(cfg.ConfigPath, time.Now()) + dw, file, err := db.NewWriter(cfg.ConfigPath, time.Now(), "request-id-1") require.NoError(t, err) require.NoError(t, dw.Open()) defer func() { @@ -45,7 +45,7 @@ func testWriteStatusToExistingFile(t *testing.T, db *Database) { Name: "test_append_to_existing", ConfigPath: "test_append_to_existing.yaml", } - dw, file, err := db.NewWriter(cfg.ConfigPath, time.Now()) + dw, file, err := db.NewWriter(cfg.ConfigPath, time.Now(), "request-id-1") require.NoError(t, err) require.NoError(t, dw.Open()) diff --git a/internal/scheduler/node.go b/internal/scheduler/node.go index b0daf24e3..cd7f82844 100644 --- a/internal/scheduler/node.go +++ b/internal/scheduler/node.go @@ -124,11 +124,12 @@ func (n *Node) cancel() { } } -func (n *Node) setupLog(logDir string) { +func (n *Node) setupLog(logDir string, requestId string) { n.StartedAt = time.Now() - n.Log = filepath.Join(logDir, fmt.Sprintf("%s.%s.log", + n.Log = filepath.Join(logDir, fmt.Sprintf("%s.%s.%s.log", utils.ValidFilename(n.Name, "_"), - n.StartedAt.Format("20060102.15:04:05"), + requestId, + n.StartedAt.Format("20060102.15:04:05.000"), )) } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index f90996777..239dce433 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -58,6 +58,7 @@ type Config struct { OnSuccess *config.Step OnFailure *config.Step OnCancel *config.Step + RequestId string } func New(config *Config) *Scheduler { @@ -117,7 +118,7 @@ func (sc *Scheduler) Schedule(g *ExecutionGraph, done chan *Node) error { }() if !sc.Dry { - node.setupLog(sc.LogDir) + node.setupLog(sc.LogDir, sc.RequestId) node.openLogFile() defer node.closeLogFile() } @@ -203,7 +204,7 @@ func (sc *Scheduler) runHandlerNode(node *Node) error { node.updateStatus(NodeStatus_Running) if !sc.Dry { - node.setupLog(sc.LogDir) + node.setupLog(sc.LogDir, sc.RequestId) node.openLogFile() defer node.closeLogFile() err := node.Execute()