From b127e054d80c4cc1ccb74c9eff301b41ec48b205 Mon Sep 17 00:00:00 2001 From: Denis Gukov Date: Sat, 12 Feb 2022 17:15:15 +0500 Subject: [PATCH] feat(be): add max parallel tasks to project settings and ability to suppress success alerts for tasks --- db/Migration.go | 1 + db/Project.go | 11 ++-- db/Task.go | 18 +++--- db/Template.go | 2 + db/sql/migrations/v2.8.51.sql | 2 + db/sql/project.go | 3 +- db/sql/template.go | 11 ++-- services/tasks/alert.go | 7 ++- services/tasks/pool.go | 109 ++++++++++++++++++---------------- services/tasks/runner.go | 43 ++------------ util/config.go | 3 +- 11 files changed, 100 insertions(+), 110 deletions(-) create mode 100644 db/sql/migrations/v2.8.51.sql diff --git a/db/Migration.go b/db/Migration.go index c3d5f7629..690d0ecff 100644 --- a/db/Migration.go +++ b/db/Migration.go @@ -56,6 +56,7 @@ func GetMigrations() []Migration { {Version: "2.8.39"}, {Version: "2.8.40"}, {Version: "2.8.42"}, + {Version: "2.8.51"}, } } diff --git a/db/Project.go b/db/Project.go index f8238debc..859f10e75 100644 --- a/db/Project.go +++ b/db/Project.go @@ -6,9 +6,10 @@ import ( // Project is the top level structure in Semaphore type Project struct { - ID int `db:"id" json:"id"` - Name string `db:"name" json:"name" binding:"required"` - Created time.Time `db:"created" json:"created"` - Alert bool `db:"alert" json:"alert"` - AlertChat *string `db:"alert_chat" json:"alert_chat"` + ID int `db:"id" json:"id"` + Name string `db:"name" json:"name" binding:"required"` + Created time.Time `db:"created" json:"created"` + Alert bool `db:"alert" json:"alert"` + AlertChat *string `db:"alert_chat" json:"alert_chat"` + MaxParallelTasks int `db:"max_parallel_tasks" json:"max_parallel_tasks"` } diff --git a/db/Task.go b/db/Task.go index 4d899c331..45dc215a2 100644 --- a/db/Task.go +++ b/db/Task.go @@ -4,13 +4,15 @@ import ( "time" ) +type TaskStatus string + const ( - TaskRunningStatus = "running" - TaskWaitingStatus = "waiting" - TaskStoppingStatus = "stopping" - TaskStoppedStatus = "stopped" - TaskSuccessStatus = "success" - TaskFailStatus = "error" + TaskRunningStatus TaskStatus = "running" + TaskWaitingStatus TaskStatus = "waiting" + TaskStoppingStatus TaskStatus = "stopping" + TaskStoppedStatus TaskStatus = "stopped" + TaskSuccessStatus TaskStatus = "success" + TaskFailStatus TaskStatus = "error" ) //Task is a model of a task which will be executed by the runner @@ -19,8 +21,8 @@ type Task struct { TemplateID int `db:"template_id" json:"template_id" binding:"required"` ProjectID int `db:"project_id" json:"project_id"` - Status string `db:"status" json:"status"` - Debug bool `db:"debug" json:"debug"` + Status TaskStatus `db:"status" json:"status"` + Debug bool `db:"debug" json:"debug"` DryRun bool `db:"dry_run" json:"dry_run"` diff --git a/db/Template.go b/db/Template.go index ca095d80a..059ca24db 100644 --- a/db/Template.go +++ b/db/Template.go @@ -71,6 +71,8 @@ type Template struct { // Do not use it in your code. Use SurveyVars instead. SurveyVarsJSON *string `db:"survey_vars" json:"-"` SurveyVars []SurveyVar `db:"-" json:"survey_vars"` + + SuppressSuccessAlerts bool `db:"suppress_success_alerts" json:"suppress_success_alerts"` } func (tpl *Template) Validate() error { diff --git a/db/sql/migrations/v2.8.51.sql b/db/sql/migrations/v2.8.51.sql new file mode 100644 index 000000000..e81ed3707 --- /dev/null +++ b/db/sql/migrations/v2.8.51.sql @@ -0,0 +1,2 @@ +alter table `project` add column `max_parallel_tasks` int not null default 0; +alter table `project__template` add column `suppress_success_alerts` bool not null default false; diff --git a/db/sql/project.go b/db/sql/project.go index 5108b9f7d..63e36e1e0 100644 --- a/db/sql/project.go +++ b/db/sql/project.go @@ -85,10 +85,11 @@ func (d *SqlDb) DeleteProject(projectID int) error { func (d *SqlDb) UpdateProject(project db.Project) error { _, err := d.exec( - "update project set name=?, alert=?, alert_chat=? where id=?", + "update project set name=?, alert=?, alert_chat=?, max_parallel_tasks=? where id=?", project.Name, project.Alert, project.AlertChat, + project.MaxParallelTasks, project.ID) return err } diff --git a/db/sql/template.go b/db/sql/template.go index 2104203ca..b36c5da9e 100644 --- a/db/sql/template.go +++ b/db/sql/template.go @@ -17,8 +17,8 @@ func (d *SqlDb) CreateTemplate(template db.Template) (newTemplate db.Template, e "id", "insert into project__template (project_id, inventory_id, repository_id, environment_id, "+ "name, playbook, arguments, allow_override_args_in_task, description, vault_key_id, `type`, start_version,"+ - "build_template_id, view_id, autorun, survey_vars)"+ - "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + "build_template_id, view_id, autorun, survey_vars, suppress_success_alerts)"+ + "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", template.ProjectID, template.InventoryID, template.RepositoryID, @@ -34,7 +34,8 @@ func (d *SqlDb) CreateTemplate(template db.Template) (newTemplate db.Template, e template.BuildTemplateID, template.ViewID, template.Autorun, - db.ObjectToJSON(template.SurveyVars)) + db.ObjectToJSON(template.SurveyVars), + template.SuppressSuccessAlerts) if err != nil { return @@ -74,7 +75,8 @@ func (d *SqlDb) UpdateTemplate(template db.Template) error { "build_template_id=?, "+ "view_id=?, "+ "autorun=?, "+ - "survey_vars=? "+ + "survey_vars=?, "+ + "suppress_success_alerts=? "+ "where id=? and project_id=?", template.InventoryID, template.RepositoryID, @@ -91,6 +93,7 @@ func (d *SqlDb) UpdateTemplate(template db.Template) error { template.ViewID, template.Autorun, db.ObjectToJSON(template.SurveyVars), + template.SuppressSuccessAlerts, template.ID, template.ProjectID, ) diff --git a/services/tasks/alert.go b/services/tasks/alert.go index 6a8206a5d..5b61b42e0 100644 --- a/services/tasks/alert.go +++ b/services/tasks/alert.go @@ -2,6 +2,7 @@ package tasks import ( "bytes" + "github.com/ansible-semaphore/semaphore/db" "html/template" "net/http" "strconv" @@ -71,6 +72,10 @@ func (t *TaskRunner) sendTelegramAlert() { return } + if t.template.SuppressSuccessAlerts && t.task.Status == db.TaskSuccessStatus { + return + } + chatID := util.Config.TelegramChat if t.alertChat != nil && *t.alertChat != "" { chatID = *t.alertChat @@ -106,7 +111,7 @@ func (t *TaskRunner) sendTelegramAlert() { Name: t.template.Name, TaskURL: util.Config.WebHost + "/project/" + strconv.Itoa(t.template.ProjectID) + "/templates/" + strconv.Itoa(t.template.ID) + "?t=" + strconv.Itoa(t.task.ID), ChatID: chatID, - TaskResult: strings.ToUpper(t.task.Status), + TaskResult: strings.ToUpper(string(t.task.Status)), TaskVersion: version, TaskDescription: message, Author: author, diff --git a/services/tasks/pool.go b/services/tasks/pool.go index 59fdd283e..c1874651a 100644 --- a/services/tasks/pool.go +++ b/services/tasks/pool.go @@ -17,18 +17,19 @@ type logRecord struct { time time.Time } +type resourceLock struct { + lock bool + holder *TaskRunner +} + type TaskPool struct { // queue contains list of tasks in status TaskWaitingStatus. queue []*TaskRunner - + // register channel used to put tasks to queue. register chan *TaskRunner - activeProj map[int]*TaskRunner - activeNodes map[string]*TaskRunner - - // running is a number of task with status TaskRunningStatus. - running int + activeProj map[int]map[int]*TaskRunner // runningTasks contains tasks with status TaskRunningStatus. runningTasks map[int]*TaskRunner @@ -37,15 +38,10 @@ type TaskPool struct { logger chan logRecord store db.Store -} -type resourceLock struct { - lock bool - holder *TaskRunner + resourceLocker chan *resourceLock } -var resourceLocker = make(chan *resourceLock) - func (p *TaskPool) GetTask(id int) (task *TaskRunner) { for _, t := range p.queue { @@ -72,7 +68,7 @@ func (p *TaskPool) Run() { ticker := time.NewTicker(5 * time.Second) defer func() { - close(resourceLocker) + close(p.resourceLocker) ticker.Stop() }() @@ -86,33 +82,30 @@ func (p *TaskPool) Run() { panic("Trying to lock an already locked resource!") } - p.activeProj[t.task.ProjectID] = t - - for _, node := range t.hosts { - p.activeNodes[node] = t + projTasks, ok := p.activeProj[t.task.ProjectID] + if !ok { + projTasks = make(map[int]*TaskRunner) + p.activeProj[t.task.ProjectID] = projTasks } - - p.running++ + projTasks[t.task.ID] = t p.runningTasks[t.task.ID] = t continue } - if p.activeProj[t.task.ProjectID] == t { - delete(p.activeProj, t.task.ProjectID) - } - - for _, node := range t.hosts { - delete(p.activeNodes, node) + if p.activeProj[t.task.ProjectID] != nil && p.activeProj[t.task.ProjectID][t.task.ID] != nil { + delete(p.activeProj[t.task.ProjectID], t.task.ID) + if len(p.activeProj[t.task.ProjectID]) == 0 { + delete(p.activeProj, t.task.ProjectID) + } } - p.running-- delete(p.runningTasks, t.task.ID) } - }(resourceLocker) + }(p.resourceLocker) for { select { - case record := <-p.logger: + case record := <-p.logger: // new log message which should be put to database _, err := record.task.pool.store.CreateTaskOutput(db.TaskOutput{ TaskID: record.task.task.ID, Output: record.output, @@ -122,14 +115,15 @@ func (p *TaskPool) Run() { if err != nil { log.Error(err) } - case task := <-p.register: + case task := <-p.register: // new task created by API or schedule p.queue = append(p.queue, task) log.Debug(task) msg := "Task " + strconv.Itoa(task.task.ID) + " added to queue" task.Log(msg) log.Info(msg) + task.updateStatus() - case <-ticker.C: + case <-ticker.C: // timer 5 seconds if len(p.queue) == 0 { continue } @@ -148,7 +142,7 @@ func (p *TaskPool) Run() { continue } log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.task.ID)) - resourceLocker <- &resourceLock{lock: true, holder: t} + p.resourceLocker <- &resourceLock{lock: true, holder: t} if !t.prepared { go t.prepareRun() continue @@ -161,36 +155,40 @@ func (p *TaskPool) Run() { } func (p *TaskPool) blocks(t *TaskRunner) bool { - if p.running >= util.Config.MaxParallelTasks { + + if len(p.runningTasks) >= util.Config.MaxParallelTasks { return true } - switch util.Config.ConcurrencyMode { - case "project": - return p.activeProj[t.task.ProjectID] != nil - case "node": - for _, node := range t.hosts { - if p.activeNodes[node] != nil { - return true - } + if p.activeProj[t.task.ProjectID] == nil || len(p.activeProj[t.task.ProjectID]) == 0 { + return false + } + + for _, r := range p.activeProj[t.task.ProjectID] { + if r.template.ID == t.task.TemplateID { + return true } + } + + proj, err := p.store.GetProject(t.task.ProjectID) + if err != nil { + log.Error(err) return false - default: - return p.running > 0 } + + return proj.MaxParallelTasks > 0 && len(p.activeProj[t.task.ProjectID]) >= proj.MaxParallelTasks } func CreateTaskPool(store db.Store) TaskPool { return TaskPool{ - queue: make([]*TaskRunner, 0), // queue of waiting tasks - register: make(chan *TaskRunner), // add TaskRunner to queue - activeProj: make(map[int]*TaskRunner), - activeNodes: make(map[string]*TaskRunner), - running: 0, // number of running tasks - runningTasks: make(map[int]*TaskRunner), // working tasks - logger: make(chan logRecord, 10000), // store log records to database - store: store, + queue: make([]*TaskRunner, 0), // queue of waiting tasks + register: make(chan *TaskRunner), // add TaskRunner to queue + activeProj: make(map[int]map[int]*TaskRunner), + runningTasks: make(map[int]*TaskRunner), // working tasks + logger: make(chan logRecord, 10000), // store log records to database + store: store, + resourceLocker: make(chan *resourceLock), } } @@ -313,11 +311,20 @@ func (p *TaskPool) AddTask(taskObj db.Task, userID *int, projectID int) (newTask return } - p.register <- &TaskRunner{ + taskRunner := TaskRunner{ task: newTask, pool: p, } + err = taskRunner.populateDetails() + if err != nil { + taskRunner.Log("Error: " + err.Error()) + taskRunner.fail() + return + } + + p.register <- &taskRunner + objType := db.EventTask desc := "Task ID " + strconv.Itoa(newTask.ID) + " queued for running" _, err = p.store.CreateEvent(db.Event{ diff --git a/services/tasks/runner.go b/services/tasks/runner.go index f433180be..0c3aa2f1a 100644 --- a/services/tasks/runner.go +++ b/services/tasks/runner.go @@ -30,9 +30,8 @@ type TaskRunner struct { environment db.Environment users []int - hosts []string - alertChat *string alert bool + alertChat *string prepared bool process *os.Process pool *TaskPool @@ -62,7 +61,7 @@ func (t *TaskRunner) getRepoPath() string { return repo.GetFullPath() } -func (t *TaskRunner) setStatus(status string) { +func (t *TaskRunner) setStatus(status db.TaskStatus) { if t.task.Status == db.TaskStoppingStatus { switch status { case db.TaskFailStatus: @@ -132,7 +131,7 @@ func (t *TaskRunner) destroyKeys() { func (t *TaskRunner) createTaskEvent() { objType := db.EventTask - desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Name + ")" + " finished - " + strings.ToUpper(t.task.Status) + desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Name + ")" + " finished - " + strings.ToUpper(string(t.task.Status)) _, err := t.pool.store.CreateEvent(db.Event{ UserID: t.task.UserID, @@ -153,7 +152,7 @@ func (t *TaskRunner) prepareRun() { defer func() { log.Info("Stopped preparing TaskRunner " + strconv.Itoa(t.task.ID)) log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.task.ID)) - resourceLocker <- &resourceLock{lock: false, holder: t} + t.pool.resourceLocker <- &resourceLock{lock: false, holder: t} t.createTaskEvent() }() @@ -167,13 +166,6 @@ func (t *TaskRunner) prepareRun() { return } - err = t.populateDetails() - if err != nil { - t.Log("Error: " + err.Error()) - t.fail() - return - } - objType := db.EventTask desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Name + ")" + " is preparing" _, err = t.pool.store.CreateEvent(db.Event{ @@ -232,12 +224,6 @@ func (t *TaskRunner) prepareRun() { return } - if err := t.listPlaybookHosts(); err != nil { - t.Log("Listing playbook hosts failed: " + err.Error()) - t.fail() - return - } - t.prepared = true } @@ -245,7 +231,7 @@ func (t *TaskRunner) run() { defer func() { log.Info("Stopped running TaskRunner " + strconv.Itoa(t.task.ID)) log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.task.ID)) - resourceLocker <- &resourceLock{lock: false, holder: t} + t.pool.resourceLocker <- &resourceLock{lock: false, holder: t} now := time.Now() t.task.End = &now @@ -533,25 +519,6 @@ func (t *TaskRunner) runGalaxy(args []string) error { }.RunGalaxy(args) } -func (t *TaskRunner) listPlaybookHosts() (err error) { - if util.Config.ConcurrencyMode == "project" { - return - } - - args, err := t.getPlaybookArgs() - if err != nil { - return - } - - t.hosts, err = lib.AnsiblePlaybook{ - Logger: t, - TemplateID: t.template.ID, - Repository: t.repository, - }.GetHosts(args) - - return -} - func (t *TaskRunner) runPlaybook() (err error) { args, err := t.getPlaybookArgs() if err != nil { diff --git a/util/config.go b/util/config.go index c44fb4816..e8926ede9 100644 --- a/util/config.go +++ b/util/config.go @@ -99,8 +99,7 @@ type ConfigType struct { TelegramToken string `json:"telegram_token"` // task concurrency - ConcurrencyMode string `json:"concurrency_mode"` - MaxParallelTasks int `json:"max_parallel_tasks"` + MaxParallelTasks int `json:"max_parallel_tasks"` // configType field ordering with bools at end reduces struct size // (maligned check)