Skip to content

Commit

Permalink
Admin interface #37 (#51)
Browse files Browse the repository at this point in the history
Implemented a new admin interface using bootstrap
  • Loading branch information
Oleg Sidorov authored May 16, 2019
1 parent 307675a commit 37cd0cb
Show file tree
Hide file tree
Showing 34 changed files with 1,299 additions and 166 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# [WIP] The Flow Framework

![logo](https://github.com/whiteboxio/flow/blob/master/flow.png)
![logo](https://github.com/whiteboxio/flow/blob/master/assets/flow.png)

[![Build Status](https://travis-ci.com/awesome-flow/flow.svg?branch=master)](https://travis-ci.com/awesome-flow/flow) [![Coverage Status](https://coveralls.io/repos/github/awesome-flow/flow/badge.svg?branch=master)](https://coveralls.io/github/awesome-flow/flow?branch=master)

Expand Down Expand Up @@ -83,8 +83,9 @@ link A to link B. In this case we say that A has an *outcoming connector*, an B
has an *incoming connector*.

Links come with the semantics of connectability: some of them can have outcoming
connectors only: we call them out-links, or *receivers* (this is where the data comes into the pipeline), and some can have
incoming connectors only: in-links, or *sinks* (where the data leaves the pipeline). A receiver is a link that
connectors only: we call them out-links, or *receivers* (this is where the data
comes into the pipeline), and some can have incoming connectors only: in-links,
or *sinks* (where the data leaves the pipeline). A receiver is a link that
receives internal messages: a network listener, pub-sub client etc. They ingest
messages into the pipeline. A sink has the opposite purpose: to send messages
somewhere else. This is where the lifecycle of the message ends. An example
Expand Down
File renamed without changes
88 changes: 50 additions & 38 deletions cmd/flowd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/awesome-flow/flow/pkg/cast"
"github.com/awesome-flow/flow/pkg/types"

"github.com/awesome-flow/flow/pkg/admin"
"github.com/awesome-flow/flow/pkg/cfg"
"github.com/awesome-flow/flow/pkg/global"
"github.com/awesome-flow/flow/pkg/metrics"
"github.com/awesome-flow/flow/pkg/pipeline"
webapp "github.com/awesome-flow/flow/web/app"
log "github.com/sirupsen/logrus"
)

Expand All @@ -21,122 +21,134 @@ const (
ProgramName = "flowd"
)

func errorf(format string, args ...interface{}) {
log.Errorf("⚠️ "+format, args...)
}

func infof(format string, args ...interface{}) {
log.Infof(format, args...)
}

func fatalf(format string, args ...interface{}) {
log.Fatalf("❌ "+format, args...)
}

func main() {

log.Infof("Starting %s", ProgramName)
infof("Starting %s", ProgramName)

log.Infof("Initializing config repo")
infof("Initializing config repo")
repo := cfg.NewRepository()
repo.DefineSchema(cast.ConfigSchema)
global.Store("config", repo)

log.Infof("Registering default provider")
infof("Registering default provider")
if _, err := cfg.NewDefaultProvider(repo, 0); err != nil {
log.Errorf("⚠️ Failed to register default provider: %s", err)
errorf("Failed to register default provider: %s", err)
}

log.Infof("Registering env provider")
infof("Registering env provider")
if _, err := cfg.NewEnvProvider(repo, 10); err != nil {
log.Errorf("⚠️ Failed to register env provider: %s", err)
errorf("Failed to register env provider: %s", err)
}

if _, err := cfg.NewYamlProvider(repo, 20); err != nil {
log.Errorf("⚠️ Failed to register yaml provider: %s", err)
errorf("Failed to register yaml provider: %s", err)
}

log.Infof("Registering cli provider")
infof("Registering cli provider")
if _, err := cfg.NewCliProvider(repo, 30); err != nil {
log.Errorf("⚠️ Failed to register cli provider: %s", err)
errorf("Failed to register cli provider: %s", err)
}

log.Infof("Initializing config providers")
infof("Initializing config providers")
if err := repo.SetUp(); err != nil {
log.Errorf("⚠️ Failed to initialise config repo: %s", err)
errorf("Failed to initialise config repo: %s", err)
}

log.Infof("Starting %s version %d, process ID: %d",
infof("Starting %s version %d, process ID: %d",
ProgramName, MajVersion, os.Getpid())

log.Infof("Initializing the pipeline")
infof("Initializing the pipeline")

syscfgval, ok := repo.Get(types.NewKey("system"))
if !ok {
log.Fatalf("❌ Failed to get system config")
fatalf("Failed to get system config")
}
syscfg := syscfgval.(types.CfgBlockSystem)

if err := metrics.Initialize(&syscfg); err != nil {
log.Errorf("⚠️ Failed to initialize metrics module: %s", err)
errorf("Failed to initialize metrics module: %s", err)
}

compsval, ok := repo.Get(types.NewKey("components"))
if !ok {
log.Fatalf("❌ Failed to get components config")
fatalf("Failed to get components config")
}
compscfg := compsval.(map[string]types.CfgBlockComponent)

pplval, ok := repo.Get(types.NewKey("pipeline"))
if !ok {
log.Fatalf("❌ Failed to get pipeline config")
fatalf("Failed to get pipeline config")
}
pplcfg := pplval.(map[string]types.CfgBlockPipeline)

pipeline, pplErr := pipeline.NewPipeline(compscfg, pplcfg)
if pplErr != nil {
log.Fatalf("❌ Failed to initialize the pipeline: %s", pplErr)
fatalf("Failed to initialize the pipeline: %s", pplErr)
}
global.Store("pipeline", pipeline)
log.Info("✅ Pipeline is successfully initialized")
infof("Pipeline is successfully initialized")

if explanation, err := pipeline.Explain(); err != nil {
log.Errorf("⚠️ Failed to explain the pipeline: %s", err.Error())
errorf("Failed to explain the pipeline: %s", err.Error())
} else {
log.Info("Pipeline GraphViz diagram (plot using https://www.planttext.com):")
infof("Pipeline GraphViz diagram (plot using https://www.planttext.com):")
fmt.Println(explanation)
}

log.Info("Activating the pipeline")
infof("Activating the pipeline")
startErr := pipeline.Start()
if startErr != nil {
log.Fatalf("❌ Failed to start the pipeline: %s", startErr)
fatalf("Failed to start the pipeline: %s", startErr)
}
log.Info("✅️ Pipeline is successfully activated")
infof("Pipeline is successfully activated")

var adminmux *admin.HttpMux
var adminmux *webapp.HttpMux
if syscfg.Admin.Enabled {
var err error
log.Infof("Starting admin interface on %s", syscfg.Admin.BindAddr)
adminmux, err = admin.NewHttpMux(&syscfg)
infof("Starting admin interface on %s", syscfg.Admin.BindAddr)
adminmux, err = webapp.NewHttpMux(&syscfg)
if err != nil {
log.Fatalf("❌ Failed to start admin interface: %s", err)
fatalf("Failed to start admin interface: %s", err)
}
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c
log.Info("Terminating the daemon")
infof("Terminating the daemon")

if adminmux != nil {
log.Info("Stopping admin interface")
if err := adminmux.Stop(); err != nil {
log.Errorf("⚠️ Error while stopping admin interface: %s", err.Error())
errorf("Error while stopping admin interface: %s", err.Error())
}
log.Infof("✅️ Done")
infof("Done")
}

log.Infof("Stopping the pipeline")
infof("Stopping the pipeline")
stopErr := pipeline.Stop()
if stopErr != nil {
log.Fatalf("❌ Failed to stop the pipeline: %s", stopErr)
fatalf("Failed to stop the pipeline: %s", stopErr)
}
log.Infof("✅️ Done")
infof("Done")

log.Infof("Stopping the config repo")
infof("Stopping the config repo")
if repoErr := repo.TearDown(); repoErr != nil {
log.Fatalf("❌ Failed to tear down config repo: %s", repoErr)
fatalf("Failed to tear down config repo: %s", repoErr)
}
log.Infof("✅️ Done")
infof("Done")

os.Exit(0)
}
87 changes: 0 additions & 87 deletions pkg/admin/agent/graphviz.go

This file was deleted.

31 changes: 0 additions & 31 deletions pkg/admin/agent/index.go

This file was deleted.

File renamed without changes.
File renamed without changes.
14 changes: 10 additions & 4 deletions pkg/admin/agent/config.go → web/app/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/awesome-flow/flow/pkg/global"
)

type ConfigPage struct {
Title string
Config string
}

func init() {
RegisterWebAgent(
NewDummyWebAgent(
Expand All @@ -20,15 +25,16 @@ func init() {
return
}
cfgdata := repo.(*cfg.Repository).Explain()
data, err := json.Marshal(cfgdata)
js, err := json.Marshal(cfgdata)
if err != nil {
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte(err.Error()))
return
}
rw.Header().Add("Content-Type", "application/json")
rw.WriteHeader(http.StatusOK)
rw.Write(data)
respondWith(rw, RespHtml, "config", &ConfigPage{
Title: "Flow active config",
Config: string(js),
})
},
),
)
Expand Down
File renamed without changes.
Loading

0 comments on commit 37cd0cb

Please sign in to comment.