Skip to content

Commit

Permalink
Implement Flag Store (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
yottahmd authored Sep 10, 2023
1 parent 098efa9 commit 94f9765
Show file tree
Hide file tree
Showing 34 changed files with 475 additions and 418 deletions.
26 changes: 20 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,27 @@ There are many existing tools such as Airflow, but many of these require you to
Dagu is a single command line tool that uses the local file system to store data, so no database management system or cloud service is required. DAGs are defined in a declarative YAML format, and existing programs can be used without modification.
## **Roadmap**
### 1. Refactoring and Scalability Improvements:
- Move to a worker architecture to support distributed processing.
- Integrate with popular cloud providers for seamless scaling and deployment of Dagu workers.
- Develop a centralized dashboard for users to manage, monitor, and scale multiple Dagu workers.
- Refactor to worker architecture
- Database support
- Allow saving DAGs in database
- Allow embedding in other applications
- Allow custom executors
- Allow calling DAGs from other DAGs
### 2. Database and Persistence Enhancements:
- Introduce sql database support for improved data persistence and backup.
- Allow option to integrate with cloud object storage for logging.
### 3. Integration and Extensibility:
- Allow Dagu to be easily embedded within other applications.
### 4. Advanced Executor Features:
- Introduce custom executor ability, allowing users to define their own execution logic.
- Allow DAGs to trigger or be triggered by other DAGs, facilitating modular and reusable pipelines.
### 5. Security and Compliance:
- Implement role-based access control for finer-grained permissions.
### 6. Community and Ecosystem Development:
- Build a SaaS application for users to easily deploy and manage multiple Dagu workers in the cloud.
## **Contributors**
<a href="https://github.com/dagu-dev/dagu/graphs/contributors">
Expand Down
1 change: 0 additions & 1 deletion app/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func ConfigProvider() *config.Config {
if err := config.LoadConfig(home); err != nil {
panic(err)
}
// TODO: fixme
cfgInstance = config.Get()
return cfgInstance
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func testStatusEventual(t *testing.T, e engine.Engine, dagFile string, expected
require.NoError(t, err)

require.Eventually(t, func() bool {
status, err := e.GetStatus(d)
status, err := e.GetCurrentStatus(d)
require.NoError(t, err)
return expected == status.Status
}, time.Millisecond*5000, time.Millisecond*50)
Expand All @@ -116,7 +116,7 @@ func testLastStatusEventual(t *testing.T, hs persistence.HistoryStore, dag strin
t.Helper()
require.Eventually(t, func() bool {
// TODO: do not use history store directly.
status := hs.ReadStatusHist(dag, 1)
status := hs.ReadStatusRecent(dag, 1)
if len(status) < 1 {
return false
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func restartCmd() *cobra.Command {
}

func stopDAGIfRunning(e engine.Engine, dag *dag.DAG) {
st, err := e.GetStatus(dag)
st, err := e.GetCurrentStatus(dag)
checkError(err)

// Stop the DAG if it is running.
Expand All @@ -56,7 +56,7 @@ func stopDAGIfRunning(e engine.Engine, dag *dag.DAG) {

func stopRunningDAG(e engine.Engine, dag *dag.DAG) error {
for {
st, err := e.GetStatus(dag)
st, err := e.GetCurrentStatus(dag)
checkError(err)

if st.Status != scheduler.SchedulerStatus_Running {
Expand All @@ -75,7 +75,7 @@ func waitForRestart(restartWait time.Duration) {
}

func getPreviousExecutionParams(e engine.Engine, dag *dag.DAG) string {
st, err := e.GetLastStatus(dag)
st, err := e.GetLatestStatus(dag)
checkError(err)

return st.Params
Expand Down
2 changes: 1 addition & 1 deletion cmd/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestRestartCommand(t *testing.T) {
df := client.NewDataStoreFactory(config.Get())
e = engine.NewFactory(df, nil).Create()

sts := e.GetRecentStatuses(d, 2)
sts := e.GetRecentHistory(d, 2)
require.Len(t, sts, 2)
require.Equal(t, sts[0].Status.Params, sts[1].Status.Params)

Expand Down
2 changes: 1 addition & 1 deletion cmd/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestRetryCommand(t *testing.T) {
testRunCommand(t, startCmd(), cmdTest{args: []string{"start", `--params="foo"`, dagFile}})

// Find the request ID.
s, err := e.ReadStatus(dagFile, false)
s, err := e.GetStatus(dagFile)
require.NoError(t, err)
require.Equal(t, s.Status.Status, scheduler.SchedulerStatus_Success)
require.NotNil(t, s.Status)
Expand Down
2 changes: 1 addition & 1 deletion cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func createStatusCommand() *cobra.Command {
df := client.NewDataStoreFactory(config.Get())
e := engine.NewFactory(df, config.Get()).Create()

status, err := e.GetStatus(loadedDAG)
status, err := e.GetCurrentStatus(loadedDAG)
checkError(err)

res := &model.StatusResponse{Status: status}
Expand Down
2 changes: 1 addition & 1 deletion internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (a *Agent) dryRun() error {
}

func (a *Agent) checkIsRunning() error {
status, err := a.engine.GetStatus(a.DAG)
status, err := a.engine.GetCurrentStatus(a.DAG)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestRunDAG(t *testing.T) {
d := testLoadDAG(t, "run.yaml")
a := agent.New(&agent.Config{DAG: d}, e, df)

status, _ := e.GetLastStatus(d)
status, _ := e.GetLatestStatus(d)
require.Equal(t, scheduler.SchedulerStatus_None, status.Status)

go func() {
Expand All @@ -60,7 +60,7 @@ func TestRunDAG(t *testing.T) {
time.Sleep(100 * time.Millisecond)

require.Eventually(t, func() bool {
status, err := e.GetLastStatus(d)
status, err := e.GetLatestStatus(d)
require.NoError(t, err)
return status.Status == scheduler.SchedulerStatus_Success
}, time.Second*2, time.Millisecond*100)
Expand All @@ -70,7 +70,7 @@ func TestRunDAG(t *testing.T) {
a = agent.New(&agent.Config{DAG: d}, e, df)
err := a.Run(context.Background())
require.NoError(t, err)
statusList := e.GetRecentStatuses(d, 100)
statusList := e.GetRecentHistory(d, 100)
require.Equal(t, 1, len(statusList))
}

Expand Down Expand Up @@ -136,7 +136,7 @@ func TestCancelDAG(t *testing.T) {
time.Sleep(time.Millisecond * 100)
abort(a)
time.Sleep(time.Millisecond * 500)
status, err := e.GetLastStatus(d)
status, err := e.GetLatestStatus(d)
require.NoError(t, err)
require.Equal(t, scheduler.SchedulerStatus_Cancel, status.Status)
}
Expand Down
29 changes: 13 additions & 16 deletions internal/dag/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dag

import (
"bytes"
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -16,30 +15,28 @@ import (
"gopkg.in/yaml.v2"
)

var ErrDAGNotFound = errors.New("DAG was not found")

// Loader is a config loader.
type Loader struct {
BaseConfig string
}

// Load loads config from file.
func (cl *Loader) Load(f, params string) (*DAG, error) {
return cl.loadDAGWithOptions(f, params, false, false, false)
return cl.loadWithOptions(f, params, false, false, false)
}

// LoadwIithoutEval loads config from file without evaluating env variables.
// LoadWithoutEval loads config from file without evaluating env variables.
func (cl *Loader) LoadWithoutEval(f string) (*DAG, error) {
return cl.loadDAGWithOptions(f, "", false, true, true)
return cl.loadWithOptions(f, "", false, true, true)
}

// LoadMetadataOnly loads config from file and returns only the headline data.
func (cl *Loader) LoadMetadataOnly(f string) (*DAG, error) {
return cl.loadDAGWithOptions(f, "", true, true, true)
// LoadMetadata loads config from file and returns only the headline data.
func (cl *Loader) LoadMetadata(f string) (*DAG, error) {
return cl.loadWithOptions(f, "", true, true, true)
}

// loadDAGWithOptions loads the config file with the provided options.
func (cl *Loader) loadDAGWithOptions(f, params string, loadMetadataOnly, skipEnvEval, skipEnvSetup bool) (*DAG, error) {
// loadWithOptions loads the config file with the provided options.
func (cl *Loader) loadWithOptions(f, params string, loadMetadataOnly, skipEnvEval, skipEnvSetup bool) (*DAG, error) {
return cl.loadDAG(f,
&BuildDAGOptions{
parameters: params,
Expand Down Expand Up @@ -162,11 +159,11 @@ func (cl *Loader) loadBaseConfigIfRequired(file string, opts *BuildDAGOptions) (
return &DAG{Name: strings.TrimSuffix(filepath.Base(file), filepath.Ext(file))}, nil
}

type mergeTranformer struct{}
type mergeTransformer struct{}

var _ mergo.Transformers = (*mergeTranformer)(nil)
var _ mergo.Transformers = (*mergeTransformer)(nil)

func (mt *mergeTranformer) Transformer(typ reflect.Type) func(dst, src reflect.Value) error {
func (mt *mergeTransformer) Transformer(typ reflect.Type) func(dst, src reflect.Value) error {
if typ == reflect.TypeOf(MailOn{}) {
return func(dst, src reflect.Value) error {
if dst.CanSet() {
Expand Down Expand Up @@ -199,7 +196,7 @@ func (fl *fileLoader) readFile(file string) (config map[string]interface{}, err
return fl.unmarshalData(data)
}

// unmarshalData unmarshals the data from a byte slice into a map.
// unmarshalData unmarshals the data into a map.
func (fl *fileLoader) unmarshalData(data []byte) (map[string]interface{}, error) {
var cm map[string]interface{}
err := yaml.NewDecoder(bytes.NewReader(data)).Decode(&cm)
Expand All @@ -224,6 +221,6 @@ func (cdl *configDefinitionLoader) decode(cm map[string]interface{}) (*configDef
// merge merges the source DAG into the destination DAG.
func (cdl *configDefinitionLoader) merge(dst, src *DAG) error {
err := mergo.Merge(dst, src, mergo.WithOverride,
mergo.WithTransformers(&mergeTranformer{}))
mergo.WithTransformers(&mergeTransformer{}))
return err
}
2 changes: 1 addition & 1 deletion internal/dag/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestLoaingErrors(t *testing.T) {
func TestLoadingHeadlineOnly(t *testing.T) {
l := &Loader{}

d, err := l.LoadMetadataOnly(path.Join(testdataDir, "default.yaml"))
d, err := l.LoadMetadata(path.Join(testdataDir, "default.yaml"))
require.NoError(t, err)

require.Equal(t, d.Name, "default")
Expand Down
Loading

0 comments on commit 94f9765

Please sign in to comment.