diff --git a/cmd/retry_test.go b/cmd/retry_test.go index 587acf0cf..a862dc44f 100644 --- a/cmd/retry_test.go +++ b/cmd/retry_test.go @@ -3,11 +3,13 @@ package main import ( "fmt" "testing" + "time" "github.com/yohamta/jobctl/internal/controller" "github.com/yohamta/jobctl/internal/database" "github.com/yohamta/jobctl/internal/scheduler" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -37,6 +39,8 @@ func Test_retryCommand(t *testing.T) { err = dw.Write(status.Status) require.NoError(t, err) + time.Sleep(time.Second) + app = makeApp() runAppTestOutput(app, appTest{ args: []string{"", "retry", fmt.Sprintf("--req=%s", @@ -44,7 +48,15 @@ func Test_retryCommand(t *testing.T) { output: []string{"parameter is x"}, }, t) + assert.Eventually(t, func() bool { + job, err = controller.FromConfig(testConfig("cmd_retry.yaml")) + if err != nil { + return false + } + return job.Status.Status == scheduler.SchedulerStatus_Success + }, time.Millisecond*3000, time.Millisecond*100) + job, err = controller.FromConfig(testConfig("cmd_retry.yaml")) require.NoError(t, err) - require.Equal(t, job.Status.Status, scheduler.SchedulerStatus_Success) + require.NotEqual(t, status.Status.RequestId, job.Status.RequestId) } diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 23e72bac1..fb20257bc 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -33,6 +33,7 @@ type Agent struct { logFilename string reporter *reporter.Reporter database *database.Database + dbFile string dbWriter *database.Writer socketServer *sock.Server requestId string @@ -203,7 +204,7 @@ func (a *Agent) setupRequestId() error { func (a *Agent) setupDatabase() (err error) { a.database = database.New(database.DefaultConfig()) - a.dbWriter, _, err = a.database.NewWriter(a.Job.ConfigPath, time.Now()) + a.dbWriter, a.dbFile, err = a.database.NewWriter(a.Job.ConfigPath, time.Now()) return } @@ -242,7 +243,12 @@ func (a *Agent) run() error { if err != nil { return err } - defer a.dbWriter.Close() + + defer func() { + if err := a.dbWriter.Close(); err != nil { + log.Printf("failed to close db writer. err: %v", err) + } + }() a.dbWriter.Write(a.Status()) @@ -253,19 +259,18 @@ func (a *Agent) run() error { log.Printf("failed to start socket server %v", err) } }() + defer func() { a.socketServer.Shutdown() }() - select { - case err := <-listen: - if err != nil { - return fmt.Errorf("failed to start the socket server.") - } + if err := <-listen; err != nil { + return fmt.Errorf("failed to start the socket server") } done := make(chan *scheduler.Node) defer close(done) + go func() { for node := range done { status := a.Status() @@ -287,6 +292,12 @@ func (a *Agent) run() error { log.Printf("failed to send mail. %s", err) } + if err := a.dbWriter.Close(); err != nil { + log.Printf("failed to close db writer. err: %v", err) + } else if err := a.database.Compact(a.Job.ConfigPath, a.dbFile); err != nil { + log.Printf("failed to compact data. %s", err) + } + return lastErr } @@ -317,7 +328,7 @@ func (a *Agent) checkIsRunning() error { return err } if status.Status != scheduler.SchedulerStatus_None { - return fmt.Errorf("The job is already running. socket=%s", + return fmt.Errorf("the job is already running. socket=%s", sock.GetSockAddr(a.Job.ConfigPath)) } return nil diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 3b77aa237..25d9aa2be 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -96,7 +96,6 @@ func (c *controller) StartJob(bin string, workDir string, params string) (err er } func (c *controller) RetryJob(bin string, workDir string, reqId string) (err error) { - log.Printf("retry start: %s, %s, %s, %s", bin, workDir, c.cfg.ConfigPath, reqId) go func() { args := []string{"retry"} args = append(args, fmt.Sprintf("--req=%s", reqId)) diff --git a/internal/database/database.go b/internal/database/database.go index ce1616d6f..d0ee72f87 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -16,7 +16,6 @@ import ( "github.com/yohamta/jobctl/internal/models" "github.com/yohamta/jobctl/internal/settings" - "github.com/yohamta/jobctl/internal/utils" ) type Database struct { @@ -71,9 +70,6 @@ func (db *Database) NewWriter(configPath string, t time.Time) (*Writer, string, } func (db *Database) NewWriterFor(configPath string, file string) (*Writer, error) { - if !utils.FileExists(file) { - return nil, ErrNoDataFile - } w := &Writer{ filename: file, } @@ -176,6 +172,39 @@ func (db *Database) RemoveOld(configPath string, retentionDays int) error { return err } +func (db *Database) Compact(configPath, original string) error { + status, err := ParseFile(original) + if err != nil { + return err + } + + new := fmt.Sprintf("%s_c.dat", + strings.TrimSuffix(filepath.Base(original), path.Ext(original))) + f := path.Join(filepath.Dir(original), new) + w, err := db.NewWriterFor(configPath, f) + if err != nil { + return err + } + + if err := w.Open(); err != nil { + return err + } + defer w.Close() + + if err := w.Write(status.Status); err != nil { + if err := os.Remove(f); err != nil { + log.Printf("failed to remove %s : %s", f, err.Error()) + } + return err + } + + if err := os.Remove(original); err != nil { + return err + } + + return nil +} + func (db *Database) dir(configPath string, prefix string) string { h := md5.New() h.Write([]byte(configPath)) @@ -221,19 +250,19 @@ func (db *Database) latest(configPath string, n int) ([]string, error) { } var ( - ErrNoDataFile = fmt.Errorf("no data file found.") - ErrRequestIdNotFound = fmt.Errorf("request id not found.") + ErrNoDataFile = fmt.Errorf("no data file found") + ErrRequestIdNotFound = fmt.Errorf("request id not found") ) -var rTimestamp = regexp.MustCompile("2\\d{7}.\\d{2}.\\d{2}.\\d{2}") +var rTimestamp = regexp.MustCompile(`2\d{7}.\d{2}:\d{2}:\d{2}`) func filterLatest(files []string, n int) ([]string, error) { if len(files) == 0 { return []string{}, ErrNoDataFile } sort.Slice(files, func(i, j int) bool { - t1 := rTimestamp.FindString(files[i]) - t2 := rTimestamp.FindString(files[j]) + t1 := timestamp(files[i]) + t2 := timestamp(files[j]) return t1 > t2 }) ret := make([]string, 0, n) @@ -243,6 +272,10 @@ func filterLatest(files []string, n int) ([]string, error) { return ret, nil } +func timestamp(file string) string { + return rTimestamp.FindString(file) +} + func findLastLine(f *os.File) (ret string, err error) { // seek to -2 position to the end of the file offset, err := f.Seek(-2, 2) diff --git a/internal/database/database_test.go b/internal/database/database_test.go index b05b7fe1d..a66dd7b3f 100644 --- a/internal/database/database_test.go +++ b/internal/database/database_test.go @@ -29,6 +29,7 @@ func TestDatabase(t *testing.T) { "remove old files": testRemoveOldFiles, "test read latest status": testReadLatestStatus, "test read latest n status": testReadStatusN, + "test compaction": testCompactFile, } { t.Run(scenario, func(t *testing.T) { dir, err := ioutil.TempDir("", "test-database") @@ -100,17 +101,17 @@ func testWriteAndFindByRequestId(t *testing.T, db *Database) { }{ { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), - fmt.Sprintf("request-id-1"), + "request-id-1", time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local), }, { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), - fmt.Sprintf("request-id-2"), + "request-id-2", time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local), }, { models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil), - fmt.Sprintf("request-id-3"), + "request-id-3", time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local), }, } { @@ -225,6 +226,60 @@ func testReadStatusN(t *testing.T, db *Database) { assert.Equal(t, cfg.Name, ret[1].Status.Name) } +func testCompactFile(t *testing.T, db *Database) { + cfg := &config.Config{ + Name: "test_compact_file", + ConfigPath: "test_compact_file.yaml", + } + + dw, _, err := db.NewWriter(cfg.ConfigPath, time.Now()) + require.NoError(t, err) + require.NoError(t, dw.Open()) + + for _, data := range []struct { + Status *models.Status + }{ + {models.NewStatus( + cfg, nil, scheduler.SchedulerStatus_Running, 10000, nil, nil)}, + {models.NewStatus( + cfg, nil, scheduler.SchedulerStatus_Cancel, 10000, nil, nil)}, + {models.NewStatus( + cfg, nil, scheduler.SchedulerStatus_Success, 10000, nil, nil)}, + } { + require.NoError(t, dw.Write(data.Status)) + } + + dw.Close() + + var s *models.StatusFile = nil + if h, err := db.ReadStatusHist(cfg.ConfigPath, 1); len(h) > 0 || err != nil { + if err != nil { + t.Error(err) + } else { + s = h[0] + } + } + require.NotNil(t, s) + + db2 := New(db.Config) + err = db2.Compact(cfg.ConfigPath, s.File) + assert.False(t, utils.FileExists(s.File)) + require.NoError(t, err) + + var s2 *models.StatusFile = nil + if h, err := db2.ReadStatusHist(cfg.ConfigPath, 1); len(h) > 0 || err != nil { + if err != nil { + t.Error(err) + } else { + s2 = h[0] + } + } + require.NotNil(t, s2) + + assert.Regexp(t, `test_compact_file.*_c.dat`, s2.File) + assert.Equal(t, s.Status, s2.Status) +} + func testWriteStatus(t *testing.T, db *Database, cfg *config.Config, status *models.Status, tm time.Time) { t.Helper() dw, _, err := db.NewWriter(cfg.ConfigPath, tm) @@ -233,3 +288,15 @@ func testWriteStatus(t *testing.T, db *Database, cfg *config.Config, status *mod defer dw.Close() require.NoError(t, dw.Write(status)) } + +func TestTimestamp(t *testing.T) { + for _, tt := range []struct { + Name string + Want string + }{ + {Name: "test_timestamp.20200101.10:00:00.dat", Want: "20200101.10:00:00"}, + {Name: "test_timestamp.20200101.12:34:56_c.dat", Want: "20200101.12:34:56"}, + } { + assert.Equal(t, tt.Want, timestamp(tt.Name)) + } +} diff --git a/internal/database/writer.go b/internal/database/writer.go index 902a56608..8ed786d8a 100644 --- a/internal/database/writer.go +++ b/internal/database/writer.go @@ -16,9 +16,13 @@ type Writer struct { writer *bufio.Writer file *os.File mu sync.Mutex + closed bool } func (w *Writer) Open() error { + if w.closed { + return fmt.Errorf("file was already closed") + } var err error w.file, err = utils.OpenOrCreateFile(w.filename) if err != nil { @@ -44,6 +48,13 @@ func (w *Writer) Write(st *models.Status) error { return w.writer.Flush() } -func (w *Writer) Close() { - w.file.Close() +func (w *Writer) Close() error { + if !w.closed { + if err := w.writer.Flush(); err != nil { + return err + } + w.file.Close() + w.closed = true + } + return nil } diff --git a/internal/reporter/reporter.go b/internal/reporter/reporter.go index a7b335bce..a135e7891 100644 --- a/internal/reporter/reporter.go +++ b/internal/reporter/reporter.go @@ -46,7 +46,7 @@ func (rp *Reporter) ReportSummary(status *models.Status, err error) { buf.Write([]byte("\n")) buf.Write([]byte("Details ->\n")) buf.Write([]byte(renderTable(status.Nodes))) - log.Printf(buf.String()) + log.Print(buf.String()) } func (rp *Reporter) ReportMail(cfg *config.Config, status *models.Status) error { @@ -93,8 +93,9 @@ func renderSummary(status *models.Status, err error) string { if err != nil { errText = err.Error() } - t.AppendHeader(table.Row{"Name", "Started At", "Finished At", "Status", "Params", "Error"}) + t.AppendHeader(table.Row{"RequestID", "Name", "Started At", "Finished At", "Status", "Params", "Error"}) t.AppendRow(table.Row{ + status.RequestId, status.Name, status.StartedAt, status.FinishedAt,