Skip to content

Commit

Permalink
Merge pull request #13 from yohamta/issue-12
Browse files Browse the repository at this point in the history
Data compaction on job finish
  • Loading branch information
yottahmd authored Apr 26, 2022
2 parents 598163d + 72077c0 commit 6bdb5a9
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 26 deletions.
14 changes: 13 additions & 1 deletion cmd/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package main
import (
"fmt"
"testing"
"time"

"github.com/yohamta/jobctl/internal/controller"
"github.com/yohamta/jobctl/internal/database"
"github.com/yohamta/jobctl/internal/scheduler"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -37,14 +39,24 @@ func Test_retryCommand(t *testing.T) {
err = dw.Write(status.Status)
require.NoError(t, err)

time.Sleep(time.Second)

app = makeApp()
runAppTestOutput(app, appTest{
args: []string{"", "retry", fmt.Sprintf("--req=%s",
job.Status.RequestId), testConfig("cmd_retry.yaml")}, errored: false,
output: []string{"parameter is x"},
}, t)

assert.Eventually(t, func() bool {
job, err = controller.FromConfig(testConfig("cmd_retry.yaml"))
if err != nil {
return false
}
return job.Status.Status == scheduler.SchedulerStatus_Success
}, time.Millisecond*3000, time.Millisecond*100)

job, err = controller.FromConfig(testConfig("cmd_retry.yaml"))
require.NoError(t, err)
require.Equal(t, job.Status.Status, scheduler.SchedulerStatus_Success)
require.NotEqual(t, status.Status.RequestId, job.Status.RequestId)
}
27 changes: 19 additions & 8 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Agent struct {
logFilename string
reporter *reporter.Reporter
database *database.Database
dbFile string
dbWriter *database.Writer
socketServer *sock.Server
requestId string
Expand Down Expand Up @@ -203,7 +204,7 @@ func (a *Agent) setupRequestId() error {

func (a *Agent) setupDatabase() (err error) {
a.database = database.New(database.DefaultConfig())
a.dbWriter, _, err = a.database.NewWriter(a.Job.ConfigPath, time.Now())
a.dbWriter, a.dbFile, err = a.database.NewWriter(a.Job.ConfigPath, time.Now())
return
}

Expand Down Expand Up @@ -242,7 +243,12 @@ func (a *Agent) run() error {
if err != nil {
return err
}
defer a.dbWriter.Close()

defer func() {
if err := a.dbWriter.Close(); err != nil {
log.Printf("failed to close db writer. err: %v", err)
}
}()

a.dbWriter.Write(a.Status())

Expand All @@ -253,19 +259,18 @@ func (a *Agent) run() error {
log.Printf("failed to start socket server %v", err)
}
}()

defer func() {
a.socketServer.Shutdown()
}()

select {
case err := <-listen:
if err != nil {
return fmt.Errorf("failed to start the socket server.")
}
if err := <-listen; err != nil {
return fmt.Errorf("failed to start the socket server")
}

done := make(chan *scheduler.Node)
defer close(done)

go func() {
for node := range done {
status := a.Status()
Expand All @@ -287,6 +292,12 @@ func (a *Agent) run() error {
log.Printf("failed to send mail. %s", err)
}

if err := a.dbWriter.Close(); err != nil {
log.Printf("failed to close db writer. err: %v", err)
} else if err := a.database.Compact(a.Job.ConfigPath, a.dbFile); err != nil {
log.Printf("failed to compact data. %s", err)
}

return lastErr
}

Expand Down Expand Up @@ -317,7 +328,7 @@ func (a *Agent) checkIsRunning() error {
return err
}
if status.Status != scheduler.SchedulerStatus_None {
return fmt.Errorf("The job is already running. socket=%s",
return fmt.Errorf("the job is already running. socket=%s",
sock.GetSockAddr(a.Job.ConfigPath))
}
return nil
Expand Down
1 change: 0 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (c *controller) StartJob(bin string, workDir string, params string) (err er
}

func (c *controller) RetryJob(bin string, workDir string, reqId string) (err error) {
log.Printf("retry start: %s, %s, %s, %s", bin, workDir, c.cfg.ConfigPath, reqId)
go func() {
args := []string{"retry"}
args = append(args, fmt.Sprintf("--req=%s", reqId))
Expand Down
51 changes: 42 additions & 9 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/yohamta/jobctl/internal/models"
"github.com/yohamta/jobctl/internal/settings"
"github.com/yohamta/jobctl/internal/utils"
)

type Database struct {
Expand Down Expand Up @@ -71,9 +70,6 @@ func (db *Database) NewWriter(configPath string, t time.Time) (*Writer, string,
}

func (db *Database) NewWriterFor(configPath string, file string) (*Writer, error) {
if !utils.FileExists(file) {
return nil, ErrNoDataFile
}
w := &Writer{
filename: file,
}
Expand Down Expand Up @@ -176,6 +172,39 @@ func (db *Database) RemoveOld(configPath string, retentionDays int) error {
return err
}

// Compact rewrites the data file original as a single-record file.
//
// It parses original with ParseFile, writes the resulting status to a new
// file named "<base>_c.dat" (where <base> is original's file name without its
// extension) in the same directory, and then removes original.
//
// The compacted file is fully written and closed before original is removed,
// so original is kept whenever the compacted copy could not be persisted. In
// that case the incomplete compacted file is removed on a best-effort basis
// and the write (or close) error is returned.
func (db *Database) Compact(configPath, original string) error {
	status, err := ParseFile(original)
	if err != nil {
		return err
	}

	base := strings.TrimSuffix(filepath.Base(original), filepath.Ext(original))
	compacted := filepath.Join(filepath.Dir(original), fmt.Sprintf("%s_c.dat", base))

	w, err := db.NewWriterFor(configPath, compacted)
	if err != nil {
		return err
	}
	if err := w.Open(); err != nil {
		return err
	}

	// Close explicitly (not via defer) so that a failure to persist the data
	// is observed before anything is deleted, and so that the file handle is
	// released before a partial file is removed.
	writeErr := w.Write(status.Status)
	closeErr := w.Close()
	if writeErr != nil || closeErr != nil {
		if err := os.Remove(compacted); err != nil {
			log.Printf("failed to remove %s : %s", compacted, err.Error())
		}
		if writeErr != nil {
			return writeErr
		}
		return closeErr
	}

	return os.Remove(original)
}

func (db *Database) dir(configPath string, prefix string) string {
h := md5.New()
h.Write([]byte(configPath))
Expand Down Expand Up @@ -221,19 +250,19 @@ func (db *Database) latest(configPath string, n int) ([]string, error) {
}

var (
ErrNoDataFile = fmt.Errorf("no data file found.")
ErrRequestIdNotFound = fmt.Errorf("request id not found.")
ErrNoDataFile = fmt.Errorf("no data file found")
ErrRequestIdNotFound = fmt.Errorf("request id not found")
)

var rTimestamp = regexp.MustCompile("2\\d{7}.\\d{2}.\\d{2}.\\d{2}")
var rTimestamp = regexp.MustCompile(`2\d{7}.\d{2}:\d{2}:\d{2}`)

func filterLatest(files []string, n int) ([]string, error) {
if len(files) == 0 {
return []string{}, ErrNoDataFile
}
sort.Slice(files, func(i, j int) bool {
t1 := rTimestamp.FindString(files[i])
t2 := rTimestamp.FindString(files[j])
t1 := timestamp(files[i])
t2 := timestamp(files[j])
return t1 > t2
})
ret := make([]string, 0, n)
Expand All @@ -243,6 +272,10 @@ func filterLatest(files []string, n int) ([]string, error) {
return ret, nil
}

// timestamp returns the leftmost substring of file that matches rTimestamp,
// or the empty string when file contains no match.
func timestamp(file string) string {
	loc := rTimestamp.FindStringIndex(file)
	if loc == nil {
		return ""
	}
	return file[loc[0]:loc[1]]
}

func findLastLine(f *os.File) (ret string, err error) {
// seek to -2 position to the end of the file
offset, err := f.Seek(-2, 2)
Expand Down
73 changes: 70 additions & 3 deletions internal/database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestDatabase(t *testing.T) {
"remove old files": testRemoveOldFiles,
"test read latest status": testReadLatestStatus,
"test read latest n status": testReadStatusN,
"test compaction": testCompactFile,
} {
t.Run(scenario, func(t *testing.T) {
dir, err := ioutil.TempDir("", "test-database")
Expand Down Expand Up @@ -100,17 +101,17 @@ func testWriteAndFindByRequestId(t *testing.T, db *Database) {
}{
{
models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
fmt.Sprintf("request-id-1"),
"request-id-1",
time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local),
},
{
models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
fmt.Sprintf("request-id-2"),
"request-id-2",
time.Date(2022, 1, 2, 0, 0, 0, 0, time.Local),
},
{
models.NewStatus(cfg, nil, scheduler.SchedulerStatus_None, 10000, nil, nil),
fmt.Sprintf("request-id-3"),
"request-id-3",
time.Date(2022, 1, 3, 0, 0, 0, 0, time.Local),
},
} {
Expand Down Expand Up @@ -225,6 +226,60 @@ func testReadStatusN(t *testing.T, db *Database) {
assert.Equal(t, cfg.Name, ret[1].Status.Name)
}

// testCompactFile writes several statuses to one data file, compacts it, and
// checks that the original file is gone and that the latest status read back
// from the "_c.dat" file equals the latest status read before compaction.
func testCompactFile(t *testing.T, db *Database) {
	cfg := &config.Config{
		Name:       "test_compact_file",
		ConfigPath: "test_compact_file.yaml",
	}

	dw, _, err := db.NewWriter(cfg.ConfigPath, time.Now())
	require.NoError(t, err)
	require.NoError(t, dw.Open())

	for _, st := range []*models.Status{
		models.NewStatus(
			cfg, nil, scheduler.SchedulerStatus_Running, 10000, nil, nil),
		models.NewStatus(
			cfg, nil, scheduler.SchedulerStatus_Cancel, 10000, nil, nil),
		models.NewStatus(
			cfg, nil, scheduler.SchedulerStatus_Success, 10000, nil, nil),
	} {
		require.NoError(t, dw.Write(st))
	}

	// A failed close means the data may not be on disk; fail here rather than
	// with a confusing mismatch later.
	require.NoError(t, dw.Close())

	// latest returns the most recent status file known to d for cfg.
	latest := func(d *Database) *models.StatusFile {
		t.Helper()
		hist, err := d.ReadStatusHist(cfg.ConfigPath, 1)
		require.NoError(t, err)
		require.NotEmpty(t, hist)
		require.NotNil(t, hist[0])
		return hist[0]
	}

	before := latest(db)

	db2 := New(db.Config)
	// Check the error first so a Compact failure is reported as such instead
	// of as a "file still exists" assertion.
	require.NoError(t, db2.Compact(cfg.ConfigPath, before.File))
	assert.False(t, utils.FileExists(before.File))

	after := latest(db2)

	assert.Regexp(t, `test_compact_file.*_c.dat`, after.File)
	assert.Equal(t, before.Status, after.Status)
}

func testWriteStatus(t *testing.T, db *Database, cfg *config.Config, status *models.Status, tm time.Time) {
t.Helper()
dw, _, err := db.NewWriter(cfg.ConfigPath, tm)
Expand All @@ -233,3 +288,15 @@ func testWriteStatus(t *testing.T, db *Database, cfg *config.Config, status *mod
defer dw.Close()
require.NoError(t, dw.Write(status))
}

func TestTimestamp(t *testing.T) {
for _, tt := range []struct {
Name string
Want string
}{
{Name: "test_timestamp.20200101.10:00:00.dat", Want: "20200101.10:00:00"},
{Name: "test_timestamp.20200101.12:34:56_c.dat", Want: "20200101.12:34:56"},
} {
assert.Equal(t, tt.Want, timestamp(tt.Name))
}
}
15 changes: 13 additions & 2 deletions internal/database/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ type Writer struct {
writer *bufio.Writer
file *os.File
mu sync.Mutex
closed bool
}

func (w *Writer) Open() error {
if w.closed {
return fmt.Errorf("file was already closed")
}
var err error
w.file, err = utils.OpenOrCreateFile(w.filename)
if err != nil {
Expand All @@ -44,6 +48,13 @@ func (w *Writer) Write(st *models.Status) error {
return w.writer.Flush()
}

func (w *Writer) Close() {
w.file.Close()
// Close flushes buffered data and closes the underlying file.
//
// Calling Close more than once is safe: after the first call it does nothing
// and returns nil. The file is closed even when the flush fails, so the file
// handle is never leaked. If both steps fail, the flush error is returned
// because it is the one that indicates lost data.
func (w *Writer) Close() error {
	if w.closed {
		return nil
	}
	flushErr := w.writer.Flush()
	closeErr := w.file.Close()
	w.closed = true
	if flushErr != nil {
		return flushErr
	}
	return closeErr
}
5 changes: 3 additions & 2 deletions internal/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (rp *Reporter) ReportSummary(status *models.Status, err error) {
buf.Write([]byte("\n"))
buf.Write([]byte("Details ->\n"))
buf.Write([]byte(renderTable(status.Nodes)))
log.Printf(buf.String())
log.Print(buf.String())
}

func (rp *Reporter) ReportMail(cfg *config.Config, status *models.Status) error {
Expand Down Expand Up @@ -93,8 +93,9 @@ func renderSummary(status *models.Status, err error) string {
if err != nil {
errText = err.Error()
}
t.AppendHeader(table.Row{"Name", "Started At", "Finished At", "Status", "Params", "Error"})
t.AppendHeader(table.Row{"RequestID", "Name", "Started At", "Finished At", "Status", "Params", "Error"})
t.AppendRow(table.Row{
status.RequestId,
status.Name,
status.StartedAt,
status.FinishedAt,
Expand Down

0 comments on commit 6bdb5a9

Please sign in to comment.