Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix runner replacement on recovery #667

Merged
merged 1 commit into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions internal/runner/nomad_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,28 +240,48 @@
return newJob, isUsed, nil
}

// updateUsedRunners handles the cleanup process of updating the used runner storage.
// This includes the clean deletion of the local references to the (replaced/deleted) runners.
// Only if removeDeleted is set, the runners that are only in newUsedRunners (and not in the main m.usedRunners) will be removed.
// updateUsedRunners handles the updating of the used runner storage.
// This includes the update of the local reference with new/changed values and the deletion of the additional runner reference.
// Only if removeDeleted is set, the runners that are only in m.usedRunners (and not in the newUsedRunners) will be removed.
func (m *NomadRunnerManager) updateUsedRunners(newUsedRunners storage.Storage[Runner], removeDeleted bool) {
for _, r := range m.usedRunners.List() {
var reason DestroyReason
if _, ok := newUsedRunners.Get(r.ID()); ok {
reason = ErrDestroyedAndReplaced
if newRunner, ok := newUsedRunners.Get(r.ID()); ok {
updatePortMapping(r, newRunner)
} else if removeDeleted {
reason = ErrLocalDestruction
log.WithError(reason).WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered")
}
if reason != nil {
// Remove old reference
log.WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered")

Check warning on line 252 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L252

Added line #L252 was not covered by tests
m.usedRunners.Delete(r.ID())
if err := r.Destroy(reason); err != nil {
if err := r.Destroy(ErrLocalDestruction); err != nil {

Check warning on line 254 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L254

Added line #L254 was not covered by tests
log.WithError(err).WithField(dto.KeyRunnerID, r.ID()).Warn("failed to destroy runner locally")
}
}
}

for _, r := range newUsedRunners.List() {
m.usedRunners.Add(r.ID(), r)
// Add runners not already in m.usedRunners
if _, ok := m.usedRunners.Get(r.ID()); !ok {
m.usedRunners.Add(r.ID(), r)
}
}
}

// updatePortMapping sets the port mapping of target to the port mapping of updated.
// It then removes the updated reference to the runner.
func updatePortMapping(target Runner, updated Runner) {
defer func() {
// Remove updated reference. We keep using the old reference to not cancel running executions.
if err := updated.Destroy(ErrLocalDestruction); err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, target.ID()).Warn("failed to destroy runner locally")

Check warning on line 274 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L274

Added line #L274 was not covered by tests
}
}()

nomadRunner, ok := target.(*NomadJob)
if !ok {
log.WithField(dto.KeyRunnerID, target.ID()).Error("Unexpected handling of non-Nomad runner")
return

Check warning on line 281 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L280-L281

Added lines #L280 - L281 were not covered by tests
}
if err := nomadRunner.UpdateMappedPorts(updated.MappedPorts()); err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, target.ID()).Error("Failed updating the port mapping")

Check warning on line 284 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L284

Added line #L284 was not covered by tests
}
}

Expand Down
82 changes: 82 additions & 0 deletions internal/runner/nomad_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runner

import (
"context"
"io"
"strconv"
"testing"
"time"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/openHPI/poseidon/internal/config"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/nullio"
"github.com/openHPI/poseidon/pkg/storage"
"github.com/openHPI/poseidon/pkg/util"
"github.com/openHPI/poseidon/tests"
Expand Down Expand Up @@ -550,6 +552,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
runnerManager := NewNomadRunnerManager(s.TestCtx, apiMock)
environmentMock := createBasicEnvironmentMock(tests.DefaultEnvironmentIDAsInteger)
environmentMock.On("ApplyPrewarmingPoolSize").Return(nil)
environmentMock.On("DeleteRunner", mock.Anything).Return(nil, true).Maybe()
runnerManager.StoreEnvironment(environmentMock)

s.Run("Stores unused runner", func() {
Expand Down Expand Up @@ -608,6 +611,85 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
<-time.After(time.Duration(timeout*2) * time.Second)
s.Require().Zero(runnerManager.usedRunners.Length())
})

s.Run("Don't stop running executions", func() {
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil).Once()
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
job.Name = &jobID
configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
s.Require().NotNil(configTaskGroup)
configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue
configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey] = strconv.Itoa(1)
call.Return([]*nomadApi.Job{job}, nil)

executionCtx, cancelExecution := context.WithCancel(s.TestCtx)
apiMock.On("ExecuteCommand", mock.Anything, tests.DefaultRunnerID,
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Run(func(_ mock.Arguments) {
<-executionCtx.Done()
}).Return(0, nil)
r := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error {
return nil
})
runnerManager.usedRunners.Add(r.ID(), r)
r.StoreExecution(tests.DefaultExecutionID, &dto.ExecutionRequest{})
exitInfo, _, err := r.ExecuteInteractively(s.TestCtx, tests.DefaultExecutionID,
&nullio.ReadWriter{Reader: nullio.Reader{Ctx: s.TestCtx}}, io.Discard, io.Discard)
s.Require().NoError(err)

runnerManager.Load(s.TestCtx)
select {
case <-exitInfo:
s.FailNow("Execution stopped on recovery")
case <-time.After(tests.ShortTimeout):
}

cancelExecution()
err = r.Destroy(ErrLocalDestruction)
s.Require().NoError(err)
})

s.Run("Update the Port Mapping", func() {
updatedPortMapping := nomadApi.PortMapping{
Label: "ssh",
Value: 10022,
To: 22,
HostIP: "127.0.0.1",
}
updatedMappedPort := &dto.MappedPort{
ExposedPort: 22,
HostAddress: "127.0.0.1:10022",
}

tests.RemoveMethodFromMock(&apiMock.Mock, "LoadRunnerPortMappings")
apiMock.On("LoadRunnerPortMappings", mock.Anything).
Return([]nomadApi.PortMapping{updatedPortMapping}, nil).Once()

apiMock.On("MarkRunnerAsUsed", mock.Anything, mock.Anything).Return(nil).Once()
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
job.Name = &jobID
configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
s.Require().NotNil(configTaskGroup)
configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue
configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey] = strconv.Itoa(1)
call.Return([]*nomadApi.Job{job}, nil)
r := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error {
return nil
})
runnerManager.usedRunners.Add(r.ID(), r)

s.Empty(r.MappedPorts())
runnerManager.Load(s.TestCtx)
s.Require().Len(r.MappedPorts(), 1)
s.Equal(updatedMappedPort, r.MappedPorts()[0])

err := r.Destroy(ErrLocalDestruction)
s.Require().NoError(err)
})
}

func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() {
Expand Down
29 changes: 29 additions & 0 deletions internal/runner/nomad_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"io"
"math"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -41,6 +42,7 @@

var (
ErrUnknownExecution = errors.New("unknown execution")
ErrInvalidPortMapping = errors.New("invalid port mapping")
ErrFileCopyFailed = errors.New("file copy failed")
ErrFileNotFound = errors.New("file not found or insufficient permissions")
ErrLocalDestruction DestroyReason = nomad.ErrLocalDestruction
Expand Down Expand Up @@ -116,6 +118,33 @@
return ports
}

// UpdateMappedPorts changes the local information about the runner's port mapping.
func (r *NomadJob) UpdateMappedPorts(ports []*dto.MappedPort) error {
mapping := make([]nomadApi.PortMapping, 0, len(ports))
for _, portMapping := range ports {
hostAddress := strings.Split(portMapping.HostAddress, ":")
const PartsInHostAddress = 2
if len(hostAddress) != PartsInHostAddress {
return ErrInvalidPortMapping

Check warning on line 128 in internal/runner/nomad_runner.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_runner.go#L128

Added line #L128 was not covered by tests
}
port, err := strconv.Atoi(hostAddress[1])
if err != nil {
return fmt.Errorf("failed parsing the port: %w", err)

Check warning on line 132 in internal/runner/nomad_runner.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_runner.go#L132

Added line #L132 was not covered by tests
}
if portMapping.ExposedPort > math.MaxInt32 {
return util.ErrMaxNumberExceeded

Check warning on line 135 in internal/runner/nomad_runner.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_runner.go#L135

Added line #L135 was not covered by tests
}

mapping = append(mapping, nomadApi.PortMapping{
Value: port,
To: int(portMapping.ExposedPort), //nolint:gosec // We check for an integer overflow right above.
HostIP: hostAddress[0],
})
}
r.portMappings = mapping
return nil
}

func (r *NomadJob) StoreExecution(id string, request *dto.ExecutionRequest) {
r.executions.Add(id, request)
}
Expand Down
14 changes: 14 additions & 0 deletions internal/runner/nomad_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ func (s *MainTestSuite) TestMappedPortsAreStoredCorrectly() {
s.Require().NoError(runner.Destroy(nil))
}

func (s *MainTestSuite) TestUpdateMappedPortsSetsPortsCorrectly() {
apiMock := &nomad.ExecutorAPIMock{}
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)

runner := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error { return nil })
s.Empty(runner.MappedPorts())

err := runner.UpdateMappedPorts(tests.DefaultMappedPorts)
s.Require().NoError(err)
s.Equal(tests.DefaultMappedPorts, runner.MappedPorts())

s.Require().NoError(runner.Destroy(nil))
}

func (s *MainTestSuite) TestMarshalRunner() {
apiMock := &nomad.ExecutorAPIMock{}
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
Expand Down
Loading