Skip to content

Commit

Permalink
feat(be): add max parallel tasks to project settings and ability to s…
Browse files Browse the repository at this point in the history
…uppress success alerts for tasks
  • Loading branch information
fiftin committed Feb 14, 2022
1 parent 7774378 commit b127e05
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 110 deletions.
1 change: 1 addition & 0 deletions db/Migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func GetMigrations() []Migration {
{Version: "2.8.39"},
{Version: "2.8.40"},
{Version: "2.8.42"},
{Version: "2.8.51"},
}
}

Expand Down
11 changes: 6 additions & 5 deletions db/Project.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (

// Project is the top level structure in Semaphore
type Project struct {
ID int `db:"id" json:"id"`
Name string `db:"name" json:"name" binding:"required"`
Created time.Time `db:"created" json:"created"`
Alert bool `db:"alert" json:"alert"`
AlertChat *string `db:"alert_chat" json:"alert_chat"`
ID int `db:"id" json:"id"`
Name string `db:"name" json:"name" binding:"required"`
Created time.Time `db:"created" json:"created"`
Alert bool `db:"alert" json:"alert"`
AlertChat *string `db:"alert_chat" json:"alert_chat"`
MaxParallelTasks int `db:"max_parallel_tasks" json:"max_parallel_tasks"`
}
18 changes: 10 additions & 8 deletions db/Task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"time"
)

type TaskStatus string

const (
TaskRunningStatus = "running"
TaskWaitingStatus = "waiting"
TaskStoppingStatus = "stopping"
TaskStoppedStatus = "stopped"
TaskSuccessStatus = "success"
TaskFailStatus = "error"
TaskRunningStatus TaskStatus = "running"
TaskWaitingStatus TaskStatus = "waiting"
TaskStoppingStatus TaskStatus = "stopping"
TaskStoppedStatus TaskStatus = "stopped"
TaskSuccessStatus TaskStatus = "success"
TaskFailStatus TaskStatus = "error"
)

//Task is a model of a task which will be executed by the runner
Expand All @@ -19,8 +21,8 @@ type Task struct {
TemplateID int `db:"template_id" json:"template_id" binding:"required"`
ProjectID int `db:"project_id" json:"project_id"`

Status string `db:"status" json:"status"`
Debug bool `db:"debug" json:"debug"`
Status TaskStatus `db:"status" json:"status"`
Debug bool `db:"debug" json:"debug"`

DryRun bool `db:"dry_run" json:"dry_run"`

Expand Down
2 changes: 2 additions & 0 deletions db/Template.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type Template struct {
// Do not use it in your code. Use SurveyVars instead.
SurveyVarsJSON *string `db:"survey_vars" json:"-"`
SurveyVars []SurveyVar `db:"-" json:"survey_vars"`

SuppressSuccessAlerts bool `db:"suppress_success_alerts" json:"suppress_success_alerts"`
}

func (tpl *Template) Validate() error {
Expand Down
2 changes: 2 additions & 0 deletions db/sql/migrations/v2.8.51.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
alter table `project` add column `max_parallel_tasks` int not null default 0;
alter table `project__template` add column `suppress_success_alerts` bool not null default false;
3 changes: 2 additions & 1 deletion db/sql/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func (d *SqlDb) DeleteProject(projectID int) error {

func (d *SqlDb) UpdateProject(project db.Project) error {
_, err := d.exec(
"update project set name=?, alert=?, alert_chat=? where id=?",
"update project set name=?, alert=?, alert_chat=?, max_parallel_tasks=? where id=?",
project.Name,
project.Alert,
project.AlertChat,
project.MaxParallelTasks,
project.ID)
return err
}
11 changes: 7 additions & 4 deletions db/sql/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func (d *SqlDb) CreateTemplate(template db.Template) (newTemplate db.Template, e
"id",
"insert into project__template (project_id, inventory_id, repository_id, environment_id, "+
"name, playbook, arguments, allow_override_args_in_task, description, vault_key_id, `type`, start_version,"+
"build_template_id, view_id, autorun, survey_vars)"+
"values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
"build_template_id, view_id, autorun, survey_vars, suppress_success_alerts)"+
"values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
template.ProjectID,
template.InventoryID,
template.RepositoryID,
Expand All @@ -34,7 +34,8 @@ func (d *SqlDb) CreateTemplate(template db.Template) (newTemplate db.Template, e
template.BuildTemplateID,
template.ViewID,
template.Autorun,
db.ObjectToJSON(template.SurveyVars))
db.ObjectToJSON(template.SurveyVars),
template.SuppressSuccessAlerts)

if err != nil {
return
Expand Down Expand Up @@ -74,7 +75,8 @@ func (d *SqlDb) UpdateTemplate(template db.Template) error {
"build_template_id=?, "+
"view_id=?, "+
"autorun=?, "+
"survey_vars=? "+
"survey_vars=?, "+
"suppress_success_alerts=? "+
"where id=? and project_id=?",
template.InventoryID,
template.RepositoryID,
Expand All @@ -91,6 +93,7 @@ func (d *SqlDb) UpdateTemplate(template db.Template) error {
template.ViewID,
template.Autorun,
db.ObjectToJSON(template.SurveyVars),
template.SuppressSuccessAlerts,
template.ID,
template.ProjectID,
)
Expand Down
7 changes: 6 additions & 1 deletion services/tasks/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tasks

import (
"bytes"
"github.com/ansible-semaphore/semaphore/db"
"html/template"
"net/http"
"strconv"
Expand Down Expand Up @@ -71,6 +72,10 @@ func (t *TaskRunner) sendTelegramAlert() {
return
}

if t.template.SuppressSuccessAlerts && t.task.Status == db.TaskSuccessStatus {
return
}

chatID := util.Config.TelegramChat
if t.alertChat != nil && *t.alertChat != "" {
chatID = *t.alertChat
Expand Down Expand Up @@ -106,7 +111,7 @@ func (t *TaskRunner) sendTelegramAlert() {
Name: t.template.Name,
TaskURL: util.Config.WebHost + "/project/" + strconv.Itoa(t.template.ProjectID) + "/templates/" + strconv.Itoa(t.template.ID) + "?t=" + strconv.Itoa(t.task.ID),
ChatID: chatID,
TaskResult: strings.ToUpper(t.task.Status),
TaskResult: strings.ToUpper(string(t.task.Status)),
TaskVersion: version,
TaskDescription: message,
Author: author,
Expand Down
109 changes: 58 additions & 51 deletions services/tasks/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ type logRecord struct {
time time.Time
}

type resourceLock struct {
lock bool
holder *TaskRunner
}

type TaskPool struct {
// queue contains list of tasks in status TaskWaitingStatus.
queue []*TaskRunner

// register channel used to put tasks to queue.
register chan *TaskRunner

activeProj map[int]*TaskRunner
activeNodes map[string]*TaskRunner

// running is a number of task with status TaskRunningStatus.
running int
activeProj map[int]map[int]*TaskRunner

// runningTasks contains tasks with status TaskRunningStatus.
runningTasks map[int]*TaskRunner
Expand All @@ -37,15 +38,10 @@ type TaskPool struct {
logger chan logRecord

store db.Store
}

type resourceLock struct {
lock bool
holder *TaskRunner
resourceLocker chan *resourceLock
}

var resourceLocker = make(chan *resourceLock)

func (p *TaskPool) GetTask(id int) (task *TaskRunner) {

for _, t := range p.queue {
Expand All @@ -72,7 +68,7 @@ func (p *TaskPool) Run() {
ticker := time.NewTicker(5 * time.Second)

defer func() {
close(resourceLocker)
close(p.resourceLocker)
ticker.Stop()
}()

Expand All @@ -86,33 +82,30 @@ func (p *TaskPool) Run() {
panic("Trying to lock an already locked resource!")
}

p.activeProj[t.task.ProjectID] = t

for _, node := range t.hosts {
p.activeNodes[node] = t
projTasks, ok := p.activeProj[t.task.ProjectID]
if !ok {
projTasks = make(map[int]*TaskRunner)
p.activeProj[t.task.ProjectID] = projTasks
}

p.running++
projTasks[t.task.ID] = t
p.runningTasks[t.task.ID] = t
continue
}

if p.activeProj[t.task.ProjectID] == t {
delete(p.activeProj, t.task.ProjectID)
}

for _, node := range t.hosts {
delete(p.activeNodes, node)
if p.activeProj[t.task.ProjectID] != nil && p.activeProj[t.task.ProjectID][t.task.ID] != nil {
delete(p.activeProj[t.task.ProjectID], t.task.ID)
if len(p.activeProj[t.task.ProjectID]) == 0 {
delete(p.activeProj, t.task.ProjectID)
}
}

p.running--
delete(p.runningTasks, t.task.ID)
}
}(resourceLocker)
}(p.resourceLocker)

for {
select {
case record := <-p.logger:
case record := <-p.logger: // new log message which should be put to database
_, err := record.task.pool.store.CreateTaskOutput(db.TaskOutput{
TaskID: record.task.task.ID,
Output: record.output,
Expand All @@ -122,14 +115,15 @@ func (p *TaskPool) Run() {
if err != nil {
log.Error(err)
}
case task := <-p.register:
case task := <-p.register: // new task created by API or schedule
p.queue = append(p.queue, task)
log.Debug(task)
msg := "Task " + strconv.Itoa(task.task.ID) + " added to queue"
task.Log(msg)
log.Info(msg)
task.updateStatus()

case <-ticker.C:
case <-ticker.C: // timer 5 seconds
if len(p.queue) == 0 {
continue
}
Expand All @@ -148,7 +142,7 @@ func (p *TaskPool) Run() {
continue
}
log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.task.ID))
resourceLocker <- &resourceLock{lock: true, holder: t}
p.resourceLocker <- &resourceLock{lock: true, holder: t}
if !t.prepared {
go t.prepareRun()
continue
Expand All @@ -161,36 +155,40 @@ func (p *TaskPool) Run() {
}

func (p *TaskPool) blocks(t *TaskRunner) bool {
if p.running >= util.Config.MaxParallelTasks {

if len(p.runningTasks) >= util.Config.MaxParallelTasks {
return true
}

switch util.Config.ConcurrencyMode {
case "project":
return p.activeProj[t.task.ProjectID] != nil
case "node":
for _, node := range t.hosts {
if p.activeNodes[node] != nil {
return true
}
if p.activeProj[t.task.ProjectID] == nil || len(p.activeProj[t.task.ProjectID]) == 0 {
return false
}

for _, r := range p.activeProj[t.task.ProjectID] {
if r.template.ID == t.task.TemplateID {
return true
}
}

proj, err := p.store.GetProject(t.task.ProjectID)

if err != nil {
log.Error(err)
return false
default:
return p.running > 0
}

return proj.MaxParallelTasks > 0 && len(p.activeProj[t.task.ProjectID]) >= proj.MaxParallelTasks
}

func CreateTaskPool(store db.Store) TaskPool {
return TaskPool{
queue: make([]*TaskRunner, 0), // queue of waiting tasks
register: make(chan *TaskRunner), // add TaskRunner to queue
activeProj: make(map[int]*TaskRunner),
activeNodes: make(map[string]*TaskRunner),
running: 0, // number of running tasks
runningTasks: make(map[int]*TaskRunner), // working tasks
logger: make(chan logRecord, 10000), // store log records to database
store: store,
queue: make([]*TaskRunner, 0), // queue of waiting tasks
register: make(chan *TaskRunner), // add TaskRunner to queue
activeProj: make(map[int]map[int]*TaskRunner),
runningTasks: make(map[int]*TaskRunner), // working tasks
logger: make(chan logRecord, 10000), // store log records to database
store: store,
resourceLocker: make(chan *resourceLock),
}
}

Expand Down Expand Up @@ -313,11 +311,20 @@ func (p *TaskPool) AddTask(taskObj db.Task, userID *int, projectID int) (newTask
return
}

p.register <- &TaskRunner{
taskRunner := TaskRunner{
task: newTask,
pool: p,
}

err = taskRunner.populateDetails()
if err != nil {
taskRunner.Log("Error: " + err.Error())
taskRunner.fail()
return
}

p.register <- &taskRunner

objType := db.EventTask
desc := "Task ID " + strconv.Itoa(newTask.ID) + " queued for running"
_, err = p.store.CreateEvent(db.Event{
Expand Down
Loading

0 comments on commit b127e05

Please sign in to comment.