Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support graceful shutdown #1051

Merged
merged 6 commits into from
May 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions server/events/command_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
gitlab "github.com/xanzy/go-gitlab"
)

const (
ShutdownComment = "Atlantis server is shutting down, please try again later."
)

//go:generate pegomock generate -m --use-experimental-model-gen --package mocks -o mocks/mock_command_runner.go CommandRunner

// CommandRunner is the first step after a command request has been parsed.
Expand Down Expand Up @@ -98,10 +102,19 @@ type DefaultCommandRunner struct {
PendingPlanFinder PendingPlanFinder
WorkingDir WorkingDir
DB *db.BoltDB
Drainer *Drainer
}

// RunAutoplanCommand runs plan when a pull request is opened or updated.
func (c *DefaultCommandRunner) RunAutoplanCommand(baseRepo models.Repo, headRepo models.Repo, pull models.PullRequest, user models.User) {
if opStarted := c.Drainer.StartOp(); !opStarted {
if commentErr := c.VCSClient.CreateComment(baseRepo, pull.Num, ShutdownComment); commentErr != nil {
c.Logger.Log(logging.Error, "unable to comment that Atlantis is shutting down: %s", commentErr)
}
return
}
defer c.Drainer.OpDone()

log := c.buildLogger(baseRepo.FullName, pull.Num)
defer c.logPanics(baseRepo, pull.Num, log)
ctx := &CommandContext{
Expand Down Expand Up @@ -164,6 +177,14 @@ func (c *DefaultCommandRunner) RunAutoplanCommand(baseRepo models.Repo, headRepo
// the event is further validated before making an additional (potentially
// wasteful) call to get the necessary data.
func (c *DefaultCommandRunner) RunCommentCommand(baseRepo models.Repo, maybeHeadRepo *models.Repo, maybePull *models.PullRequest, user models.User, pullNum int, cmd *CommentCommand) {
if opStarted := c.Drainer.StartOp(); !opStarted {
if commentErr := c.VCSClient.CreateComment(baseRepo, pullNum, ShutdownComment); commentErr != nil {
c.Logger.Log(logging.Error, "unable to comment that Atlantis is shutting down: %s", commentErr)
}
return
}
defer c.Drainer.OpDone()

log := c.buildLogger(baseRepo.FullName, pullNum)
defer c.logPanics(baseRepo, pullNum, log)

Expand Down
43 changes: 41 additions & 2 deletions server/events/command_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
. "github.com/runatlantis/atlantis/testing"
)

var commitStatusUpdater *mocks.MockCommitStatusUpdater
var projectCommandBuilder *mocks.MockProjectCommandBuilder
var projectCommandRunner *mocks.MockProjectCommandRunner
var eventParsing *mocks.MockEventParsing
Expand All @@ -43,9 +44,11 @@ var ch events.DefaultCommandRunner
var pullLogger *logging.SimpleLogger
var workingDir events.WorkingDir
var pendingPlanFinder *mocks.MockPendingPlanFinder
var drainer *events.Drainer

func setup(t *testing.T) *vcsmocks.MockClient {
RegisterMockTestingT(t)
commitStatusUpdater = mocks.NewMockCommitStatusUpdater()
projectCommandBuilder = mocks.NewMockProjectCommandBuilder()
eventParsing = mocks.NewMockEventParsing()
vcsClient := vcsmocks.NewMockClient()
Expand All @@ -57,12 +60,13 @@ func setup(t *testing.T) *vcsmocks.MockClient {
projectCommandRunner = mocks.NewMockProjectCommandRunner()
workingDir = mocks.NewMockWorkingDir()
pendingPlanFinder = mocks.NewMockPendingPlanFinder()
drainer = &events.Drainer{}
When(logger.GetLevel()).ThenReturn(logging.Info)
When(logger.NewLogger("runatlantis/atlantis#1", true, logging.Info)).
ThenReturn(pullLogger)
ch = events.DefaultCommandRunner{
VCSClient: vcsClient,
CommitStatusUpdater: &events.DefaultCommitStatusUpdater{vcsClient, "atlantis"},
CommitStatusUpdater: commitStatusUpdater,
EventParser: eventParsing,
MarkdownRenderer: &events.MarkdownRenderer{},
GithubPullGetter: githubGetter,
Expand All @@ -76,14 +80,15 @@ func setup(t *testing.T) *vcsmocks.MockClient {
PendingPlanFinder: pendingPlanFinder,
WorkingDir: workingDir,
DisableApplyAll: false,
Drainer: drainer,
}
return vcsClient
}

func TestRunCommentCommand_LogPanics(t *testing.T) {
t.Log("if there is a panic it is commented back on the pull request")
vcsClient := setup(t)
When(githubGetter.GetPullRequest(fixtures.GithubRepo, fixtures.Pull.Num)).ThenPanic("OMG PANIC!!!")
When(githubGetter.GetPullRequest(fixtures.GithubRepo, fixtures.Pull.Num)).ThenPanic("panic test - if you're seeing this in a test failure this isn't the failing test")
ch.RunCommentCommand(fixtures.GithubRepo, &fixtures.GithubRepo, nil, fixtures.User, 1, &events.CommentCommand{Name: models.PlanCommand})
_, _, comment := vcsClient.VerifyWasCalledOnce().CreateComment(matchers.AnyModelsRepo(), AnyInt(), AnyString()).GetCapturedArguments()
Assert(t, strings.Contains(comment, "Error: goroutine panic"), fmt.Sprintf("comment should be about a goroutine panic but was %q", comment))
Expand Down Expand Up @@ -234,3 +239,37 @@ func TestRunAutoplanCommand_DeletePlans(t *testing.T) {
ch.RunAutoplanCommand(fixtures.GithubRepo, fixtures.GithubRepo, fixtures.Pull, fixtures.User)
pendingPlanFinder.VerifyWasCalledOnce().DeletePlans(tmp)
}

func TestRunCommentCommand_DrainOngoing(t *testing.T) {
t.Log("if drain is ongoing then a message should be displayed")
vcsClient := setup(t)
drainer.ShutdownBlocking()
ch.RunCommentCommand(fixtures.GithubRepo, &fixtures.GithubRepo, nil, fixtures.User, fixtures.Pull.Num, nil)
vcsClient.VerifyWasCalledOnce().CreateComment(fixtures.GithubRepo, fixtures.Pull.Num, "Atlantis server is shutting down, please try again later.")
}

func TestRunCommentCommand_DrainNotOngoing(t *testing.T) {
t.Log("if drain is not ongoing then remove ongoing operation must be called even if panic occured")
setup(t)
When(githubGetter.GetPullRequest(fixtures.GithubRepo, fixtures.Pull.Num)).ThenPanic("panic test - if you're seeing this in a test failure this isn't the failing test")
ch.RunCommentCommand(fixtures.GithubRepo, &fixtures.GithubRepo, nil, fixtures.User, fixtures.Pull.Num, nil)
githubGetter.VerifyWasCalledOnce().GetPullRequest(fixtures.GithubRepo, fixtures.Pull.Num)
Equals(t, 0, drainer.GetStatus().InProgressOps)
}

func TestRunAutoplanCommand_DrainOngoing(t *testing.T) {
t.Log("if drain is ongoing then a message should be displayed")
vcsClient := setup(t)
drainer.ShutdownBlocking()
ch.RunAutoplanCommand(fixtures.GithubRepo, fixtures.GithubRepo, fixtures.Pull, fixtures.User)
vcsClient.VerifyWasCalledOnce().CreateComment(fixtures.GithubRepo, fixtures.Pull.Num, "Atlantis server is shutting down, please try again later.")
}

func TestRunAutoplanCommand_DrainNotOngoing(t *testing.T) {
t.Log("if drain is not ongoing then remove ongoing operation must be called even if panic occured")
setup(t)
When(projectCommandBuilder.BuildAutoplanCommands(matchers.AnyPtrToEventsCommandContext())).ThenPanic("panic test - if you're seeing this in a test failure this isn't the failing test")
ch.RunAutoplanCommand(fixtures.GithubRepo, fixtures.GithubRepo, fixtures.Pull, fixtures.User)
projectCommandBuilder.VerifyWasCalledOnce().BuildAutoplanCommands(matchers.AnyPtrToEventsCommandContext())
Equals(t, 0, drainer.GetStatus().InProgressOps)
}
63 changes: 63 additions & 0 deletions server/events/drainer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package events

import (
"sync"
)

// Drainer is used to gracefully shut down atlantis by waiting for in-progress
// operations to complete.
type Drainer struct {
status DrainStatus
mutex sync.Mutex
wg sync.WaitGroup
}

type DrainStatus struct {
// ShuttingDown is whether we are in the progress of shutting down.
ShuttingDown bool
// InProgressOps is the number of operations currently in progress.
InProgressOps int
}

// StartOp tries to start a new operation. It returns false is Atlantis is
// shutting down.
func (d *Drainer) StartOp() bool {
d.mutex.Lock()
defer d.mutex.Unlock()

if d.status.ShuttingDown {
return false
}
d.status.InProgressOps++
d.wg.Add(1)
return true
}

// OpDone marks an operation as complete.
func (d *Drainer) OpDone() {
d.mutex.Lock()
defer d.mutex.Unlock()

d.status.InProgressOps--
d.wg.Done()
if d.status.InProgressOps < 0 {
// This would be a bug.
d.status.InProgressOps = 0
}
}

// ShutdownBlocking sets "shutting down" to true and blocks until there are no
// in progress operations.
func (d *Drainer) ShutdownBlocking() {
// Set the shutdown status.
d.mutex.Lock()
d.status.ShuttingDown = true
d.mutex.Unlock()

// Block until there are no in-progress ops.
d.wg.Wait()
}

func (d *Drainer) GetStatus() DrainStatus {
return d.status
}
69 changes: 69 additions & 0 deletions server/events/drainer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package events_test

import (
"context"
"testing"
"time"

"github.com/runatlantis/atlantis/server/events"
. "github.com/runatlantis/atlantis/testing"
)

// Test starting and completing ops.
func TestDrainer(t *testing.T) {
d := events.Drainer{}

// Starts at 0.
Equals(t, 0, d.GetStatus().InProgressOps)

// Add 1.
d.StartOp()
Equals(t, 1, d.GetStatus().InProgressOps)

// Remove 1.
d.OpDone()
Equals(t, 0, d.GetStatus().InProgressOps)

// Add 2.
d.StartOp()
d.StartOp()
Equals(t, 2, d.GetStatus().InProgressOps)

// Remove 1.
d.OpDone()
Equals(t, 1, d.GetStatus().InProgressOps)
}

func TestDrainer_Shutdown(t *testing.T) {
d := events.Drainer{}
d.StartOp()

shutdown := make(chan bool)
go func() {
d.ShutdownBlocking()
close(shutdown)
}()

// Sleep to ensure that ShutdownBlocking has been called.
time.Sleep(300 * time.Millisecond)

// Starting another op should fail.
Equals(t, false, d.StartOp())

// Status should be shutting down.
Equals(t, events.DrainStatus{
ShuttingDown: true,
InProgressOps: 1,
}, d.GetStatus())

// Stop the final operation and wait for shutdown to exit.
d.OpDone()
timer, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
select {
case <-shutdown:
case <-timer.Done():
Assert(t, false, "Timer reached without shutdown")

}
}
Loading