Skip to content

Commit

Permalink
Fix runner replacement on recovery
Browse files Browse the repository at this point in the history
by now just updating the port mapping.
  • Loading branch information
mpass99 committed Sep 3, 2024
1 parent add3ad7 commit 592d727
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 12 deletions.
44 changes: 32 additions & 12 deletions internal/runner/nomad_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,28 +240,48 @@ func (m *NomadRunnerManager) loadSingleJob(ctx context.Context, job *nomadApi.Jo
return newJob, isUsed, nil
}

// updateUsedRunners handles the cleanup process of updating the used runner storage.
// This includes the clean deletion of the local references to the (replaced/deleted) runners.
// Only if removeDeleted is set, the runners that are only in newUsedRunners (and not in the main m.usedRunners) will be removed.
// updateUsedRunners handles the updating of the used runner storage.
// This includes the update of the local reference with new/changed values and the deletion of the additional runner reference.
// Only if removeDeleted is set, the runners that are only in m.usedRunners (and not in the newUsedRunners) will be removed.
func (m *NomadRunnerManager) updateUsedRunners(newUsedRunners storage.Storage[Runner], removeDeleted bool) {
for _, r := range m.usedRunners.List() {
var reason DestroyReason
if _, ok := newUsedRunners.Get(r.ID()); ok {
reason = ErrDestroyedAndReplaced
if newRunner, ok := newUsedRunners.Get(r.ID()); ok {
updatePortMapping(r, newRunner)
} else if removeDeleted {
reason = ErrLocalDestruction
log.WithError(reason).WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered")
}
if reason != nil {
// Remove old reference
log.WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered")

Check warning on line 252 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L252

Added line #L252 was not covered by tests
m.usedRunners.Delete(r.ID())
if err := r.Destroy(reason); err != nil {
if err := r.Destroy(ErrLocalDestruction); err != nil {

Check warning on line 254 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L254

Added line #L254 was not covered by tests
log.WithError(err).WithField(dto.KeyRunnerID, r.ID()).Warn("failed to destroy runner locally")
}
}
}

for _, r := range newUsedRunners.List() {
m.usedRunners.Add(r.ID(), r)
// Add runners not already in m.usedRunners
if _, ok := m.usedRunners.Get(r.ID()); !ok {
m.usedRunners.Add(r.ID(), r)
}
}
}

// updatePortMapping sets the port mapping of target to the port mapping of updated.
// It then removes the updated reference to the runner.
func updatePortMapping(target Runner, updated Runner) {
defer func() {
// Remove updated reference. We keep using the old reference to not cancel running executions.
if err := updated.Destroy(ErrLocalDestruction); err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, target.ID()).Warn("failed to destroy runner locally")

Check warning on line 274 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L274

Added line #L274 was not covered by tests
}
}()

nomadRunner, ok := target.(*NomadJob)
if !ok {
log.WithField(dto.KeyRunnerID, target.ID()).Error("Unexpected handling of non-Nomad runner")
return

Check warning on line 281 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L280-L281

Added lines #L280 - L281 were not covered by tests
}
if err := nomadRunner.UpdateMappedPorts(updated.MappedPorts()); err != nil {
log.WithError(err).WithField(dto.KeyRunnerID, target.ID()).Error("Failed updating the port mapping")

Check warning on line 284 in internal/runner/nomad_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_manager.go#L284

Added line #L284 was not covered by tests
}
}

Expand Down
82 changes: 82 additions & 0 deletions internal/runner/nomad_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runner

import (
"context"
"io"
"strconv"
"testing"
"time"
Expand All @@ -10,6 +11,7 @@ import (
"github.com/openHPI/poseidon/internal/config"
"github.com/openHPI/poseidon/internal/nomad"
"github.com/openHPI/poseidon/pkg/dto"
"github.com/openHPI/poseidon/pkg/nullio"
"github.com/openHPI/poseidon/pkg/storage"
"github.com/openHPI/poseidon/pkg/util"
"github.com/openHPI/poseidon/tests"
Expand Down Expand Up @@ -550,6 +552,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
runnerManager := NewNomadRunnerManager(s.TestCtx, apiMock)
environmentMock := createBasicEnvironmentMock(tests.DefaultEnvironmentIDAsInteger)
environmentMock.On("ApplyPrewarmingPoolSize").Return(nil)
environmentMock.On("DeleteRunner", mock.Anything).Return(nil, true).Maybe()
runnerManager.StoreEnvironment(environmentMock)

s.Run("Stores unused runner", func() {
Expand Down Expand Up @@ -608,6 +611,85 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() {
<-time.After(time.Duration(timeout*2) * time.Second)
s.Require().Zero(runnerManager.usedRunners.Length())
})

s.Run("Don't stop running executions", func() {
apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil).Once()
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
job.Name = &jobID
configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
s.Require().NotNil(configTaskGroup)
configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue
configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey] = strconv.Itoa(1)
call.Return([]*nomadApi.Job{job}, nil)

executionCtx, cancelExecution := context.WithCancel(s.TestCtx)
apiMock.On("ExecuteCommand", mock.Anything, tests.DefaultRunnerID,
mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Run(func(_ mock.Arguments) {
<-executionCtx.Done()
}).Return(0, nil)
r := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error {
return nil
})
runnerManager.usedRunners.Add(r.ID(), r)
r.StoreExecution(tests.DefaultExecutionID, &dto.ExecutionRequest{})
exitInfo, _, err := r.ExecuteInteractively(s.TestCtx, tests.DefaultExecutionID,
&nullio.ReadWriter{Reader: nullio.Reader{Ctx: s.TestCtx}}, io.Discard, io.Discard)
s.Require().NoError(err)

runnerManager.Load(s.TestCtx)
select {
case <-exitInfo:
s.FailNow("Execution stopped on recovery")
case <-time.After(tests.ShortTimeout):
}

cancelExecution()
err = r.Destroy(ErrLocalDestruction)
s.Require().NoError(err)
})

s.Run("Update the Port Mapping", func() {
updatedPortMapping := nomadApi.PortMapping{
Label: "ssh",
Value: 10022,
To: 22,
HostIP: "127.0.0.1",
}
updatedMappedPort := &dto.MappedPort{
ExposedPort: 22,
HostAddress: "127.0.0.1:10022",
}

tests.RemoveMethodFromMock(&apiMock.Mock, "LoadRunnerPortMappings")
apiMock.On("LoadRunnerPortMappings", mock.Anything).
Return([]nomadApi.PortMapping{updatedPortMapping}, nil).Once()

apiMock.On("MarkRunnerAsUsed", mock.Anything, mock.Anything).Return(nil).Once()
_, job := helpers.CreateTemplateJob()
jobID := tests.DefaultRunnerID
job.ID = &jobID
job.Name = &jobID
configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName)
s.Require().NotNil(configTaskGroup)
configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue
configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey] = strconv.Itoa(1)
call.Return([]*nomadApi.Job{job}, nil)
r := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error {
return nil
})
runnerManager.usedRunners.Add(r.ID(), r)

s.Empty(r.MappedPorts())
runnerManager.Load(s.TestCtx)
s.Require().Len(r.MappedPorts(), 1)
s.Equal(updatedMappedPort, r.MappedPorts()[0])

err := r.Destroy(ErrLocalDestruction)
s.Require().NoError(err)
})
}

func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() {
Expand Down
29 changes: 29 additions & 0 deletions internal/runner/nomad_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"math"
"net/http"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -41,6 +42,7 @@ const (

var (
ErrUnknownExecution = errors.New("unknown execution")
ErrInvalidPortMapping = errors.New("invalid port mapping")
ErrFileCopyFailed = errors.New("file copy failed")
ErrFileNotFound = errors.New("file not found or insufficient permissions")
ErrLocalDestruction DestroyReason = nomad.ErrLocalDestruction
Expand Down Expand Up @@ -116,6 +118,33 @@ func (r *NomadJob) MappedPorts() []*dto.MappedPort {
return ports
}

// UpdateMappedPorts changes the local information about the runner's port mapping.
func (r *NomadJob) UpdateMappedPorts(ports []*dto.MappedPort) error {
mapping := make([]nomadApi.PortMapping, 0, len(ports))
for _, portMapping := range ports {
hostAddress := strings.Split(portMapping.HostAddress, ":")
const PartsInHostAddress = 2
if len(hostAddress) != PartsInHostAddress {
return ErrInvalidPortMapping

Check warning on line 128 in internal/runner/nomad_runner.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_runner.go#L128

Added line #L128 was not covered by tests
}
port, err := strconv.Atoi(hostAddress[1])
if err != nil {
return fmt.Errorf("failed parsing the port: %w", err)

Check warning on line 132 in internal/runner/nomad_runner.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_runner.go#L132

Added line #L132 was not covered by tests
}
if portMapping.ExposedPort > math.MaxInt32 {
return util.ErrMaxNumberExceeded

Check warning on line 135 in internal/runner/nomad_runner.go

View check run for this annotation

Codecov / codecov/patch

internal/runner/nomad_runner.go#L135

Added line #L135 was not covered by tests
}

mapping = append(mapping, nomadApi.PortMapping{
Value: port,
To: int(portMapping.ExposedPort), //nolint:gosec // We check for an integer overflow right above.
HostIP: hostAddress[0],
})
}
r.portMappings = mapping
return nil
}

func (r *NomadJob) StoreExecution(id string, request *dto.ExecutionRequest) {
r.executions.Add(id, request)
}
Expand Down
14 changes: 14 additions & 0 deletions internal/runner/nomad_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ func (s *MainTestSuite) TestMappedPortsAreStoredCorrectly() {
s.Require().NoError(runner.Destroy(nil))
}

func (s *MainTestSuite) TestUpdateMappedPortsSetsPortsCorrectly() {
apiMock := &nomad.ExecutorAPIMock{}
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)

runner := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error { return nil })
s.Empty(runner.MappedPorts())

err := runner.UpdateMappedPorts(tests.DefaultMappedPorts)
s.Require().NoError(err)
s.Equal(tests.DefaultMappedPorts, runner.MappedPorts())

s.Require().NoError(runner.Destroy(nil))
}

func (s *MainTestSuite) TestMarshalRunner() {
apiMock := &nomad.ExecutorAPIMock{}
apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)
Expand Down

0 comments on commit 592d727

Please sign in to comment.