diff --git a/internal/runner/nomad_manager.go b/internal/runner/nomad_manager.go index df50d408..daea0292 100644 --- a/internal/runner/nomad_manager.go +++ b/internal/runner/nomad_manager.go @@ -240,28 +240,48 @@ func (m *NomadRunnerManager) loadSingleJob(ctx context.Context, job *nomadApi.Jo return newJob, isUsed, nil } -// updateUsedRunners handles the cleanup process of updating the used runner storage. -// This includes the clean deletion of the local references to the (replaced/deleted) runners. -// Only if removeDeleted is set, the runners that are only in newUsedRunners (and not in the main m.usedRunners) will be removed. +// updateUsedRunners handles the updating of the used runner storage. +// This includes the update of the local reference with new/changed values and the deletion of the additional runner reference. +// Only if removeDeleted is set, the runners that are only in m.usedRunners (and not in the newUsedRunners) will be removed. func (m *NomadRunnerManager) updateUsedRunners(newUsedRunners storage.Storage[Runner], removeDeleted bool) { for _, r := range m.usedRunners.List() { - var reason DestroyReason - if _, ok := newUsedRunners.Get(r.ID()); ok { - reason = ErrDestroyedAndReplaced + if newRunner, ok := newUsedRunners.Get(r.ID()); ok { + updatePortMapping(r, newRunner) } else if removeDeleted { - reason = ErrLocalDestruction - log.WithError(reason).WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered") - } - if reason != nil { + // Remove old reference + log.WithField(dto.KeyRunnerID, r.ID()).Warn("Local runner cannot be recovered") m.usedRunners.Delete(r.ID()) - if err := r.Destroy(reason); err != nil { + if err := r.Destroy(ErrLocalDestruction); err != nil { log.WithError(err).WithField(dto.KeyRunnerID, r.ID()).Warn("failed to destroy runner locally") } } } for _, r := range newUsedRunners.List() { - m.usedRunners.Add(r.ID(), r) + // Add runners not already in m.usedRunners + if _, ok := m.usedRunners.Get(r.ID()); !ok { + m.usedRunners.Add(r.ID(), r) + } + } +} + +// updatePortMapping sets the port mapping of target to the port mapping of updated. +// It then removes the updated reference to the runner. +func updatePortMapping(target Runner, updated Runner) { + defer func() { + // Remove updated reference. We keep using the old reference to not cancel running executions. + if err := updated.Destroy(ErrLocalDestruction); err != nil { + log.WithError(err).WithField(dto.KeyRunnerID, target.ID()).Warn("failed to destroy runner locally") + } + }() + + nomadRunner, ok := target.(*NomadJob) + if !ok { + log.WithField(dto.KeyRunnerID, target.ID()).Error("Unexpected handling of non-Nomad runner") + return + } + if err := nomadRunner.UpdateMappedPorts(updated.MappedPorts()); err != nil { + log.WithError(err).WithField(dto.KeyRunnerID, target.ID()).Error("Failed updating the port mapping") } } diff --git a/internal/runner/nomad_manager_test.go b/internal/runner/nomad_manager_test.go index 155ed439..1de44f4b 100644 --- a/internal/runner/nomad_manager_test.go +++ b/internal/runner/nomad_manager_test.go @@ -2,6 +2,7 @@ package runner import ( "context" + "io" "strconv" "testing" "time" @@ -10,6 +11,7 @@ import ( "github.com/openHPI/poseidon/internal/config" "github.com/openHPI/poseidon/internal/nomad" "github.com/openHPI/poseidon/pkg/dto" + "github.com/openHPI/poseidon/pkg/nullio" "github.com/openHPI/poseidon/pkg/storage" "github.com/openHPI/poseidon/pkg/util" "github.com/openHPI/poseidon/tests" @@ -550,6 +552,7 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { runnerManager := NewNomadRunnerManager(s.TestCtx, apiMock) environmentMock := createBasicEnvironmentMock(tests.DefaultEnvironmentIDAsInteger) environmentMock.On("ApplyPrewarmingPoolSize").Return(nil) + environmentMock.On("DeleteRunner", mock.Anything).Return(nil, true).Maybe() runnerManager.StoreEnvironment(environmentMock) s.Run("Stores unused runner", func() { @@ -608,6 +611,85 @@ func (s *MainTestSuite) TestNomadRunnerManager_Load() { <-time.After(time.Duration(timeout*2) * time.Second) s.Require().Zero(runnerManager.usedRunners.Length()) }) + + s.Run("Don't stop running executions", func() { + apiMock.On("MarkRunnerAsUsed", mock.AnythingOfType("string"), mock.AnythingOfType("int")).Return(nil).Once() + _, job := helpers.CreateTemplateJob() + jobID := tests.DefaultRunnerID + job.ID = &jobID + job.Name = &jobID + configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName) + s.Require().NotNil(configTaskGroup) + configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue + configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey] = strconv.Itoa(1) + call.Return([]*nomadApi.Job{job}, nil) + + executionCtx, cancelExecution := context.WithCancel(s.TestCtx) + apiMock.On("ExecuteCommand", mock.Anything, tests.DefaultRunnerID, + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Run(func(_ mock.Arguments) { + <-executionCtx.Done() + }).Return(0, nil) + r := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error { + return nil + }) + runnerManager.usedRunners.Add(r.ID(), r) + r.StoreExecution(tests.DefaultExecutionID, &dto.ExecutionRequest{}) + exitInfo, _, err := r.ExecuteInteractively(s.TestCtx, tests.DefaultExecutionID, + &nullio.ReadWriter{Reader: nullio.Reader{Ctx: s.TestCtx}}, io.Discard, io.Discard) + s.Require().NoError(err) + + runnerManager.Load(s.TestCtx) + select { + case <-exitInfo: + s.FailNow("Execution stopped on recovery") + case <-time.After(tests.ShortTimeout): + } + + cancelExecution() + err = r.Destroy(ErrLocalDestruction) + s.Require().NoError(err) + }) + + s.Run("Update the Port Mapping", func() { + updatedPortMapping := nomadApi.PortMapping{ + Label: "ssh", + Value: 10022, + To: 22, + HostIP: "127.0.0.1", + } + updatedMappedPort := &dto.MappedPort{ + ExposedPort: 22, + HostAddress: "127.0.0.1:10022", + } + + tests.RemoveMethodFromMock(&apiMock.Mock, "LoadRunnerPortMappings") + apiMock.On("LoadRunnerPortMappings", mock.Anything). + Return([]nomadApi.PortMapping{updatedPortMapping}, nil).Once() + + apiMock.On("MarkRunnerAsUsed", mock.Anything, mock.Anything).Return(nil).Once() + _, job := helpers.CreateTemplateJob() + jobID := tests.DefaultRunnerID + job.ID = &jobID + job.Name = &jobID + configTaskGroup := nomad.FindTaskGroup(job, nomad.ConfigTaskGroupName) + s.Require().NotNil(configTaskGroup) + configTaskGroup.Meta[nomad.ConfigMetaUsedKey] = nomad.ConfigMetaUsedValue + configTaskGroup.Meta[nomad.ConfigMetaTimeoutKey] = strconv.Itoa(1) + call.Return([]*nomadApi.Job{job}, nil) + r := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error { + return nil + }) + runnerManager.usedRunners.Add(r.ID(), r) + + s.Empty(r.MappedPorts()) + runnerManager.Load(s.TestCtx) + s.Require().Len(r.MappedPorts(), 1) + s.Equal(updatedMappedPort, r.MappedPorts()[0]) + + err := r.Destroy(ErrLocalDestruction) + s.Require().NoError(err) + }) } func (s *MainTestSuite) TestNomadRunnerManager_checkPrewarmingPoolAlert() { diff --git a/internal/runner/nomad_runner.go b/internal/runner/nomad_runner.go index 3946a6ac..3d143a1c 100644 --- a/internal/runner/nomad_runner.go +++ b/internal/runner/nomad_runner.go @@ -11,6 +11,7 @@ import ( "io" "math" "net/http" + "strconv" "strings" "time" @@ -41,6 +42,7 @@ const ( var ( ErrUnknownExecution = errors.New("unknown execution") + ErrInvalidPortMapping = errors.New("invalid port mapping") ErrFileCopyFailed = errors.New("file copy failed") ErrFileNotFound = errors.New("file not found or insufficient permissions") ErrLocalDestruction DestroyReason = nomad.ErrLocalDestruction @@ -116,6 +118,33 @@ func (r *NomadJob) MappedPorts() []*dto.MappedPort { return ports } +// UpdateMappedPorts changes the local information about the runner's port mapping. +func (r *NomadJob) UpdateMappedPorts(ports []*dto.MappedPort) error { + mapping := make([]nomadApi.PortMapping, 0, len(ports)) + for _, portMapping := range ports { + hostAddress := strings.Split(portMapping.HostAddress, ":") + const PartsInHostAddress = 2 + if len(hostAddress) != PartsInHostAddress { + return ErrInvalidPortMapping + } + port, err := strconv.Atoi(hostAddress[1]) + if err != nil { + return fmt.Errorf("failed parsing the port: %w", err) + } + if portMapping.ExposedPort > math.MaxInt32 { + return util.ErrMaxNumberExceeded + } + + mapping = append(mapping, nomadApi.PortMapping{ + Value: port, + To: int(portMapping.ExposedPort), //nolint:gosec // We check for an integer overflow right above. + HostIP: hostAddress[0], + }) + } + r.portMappings = mapping + return nil +} + func (r *NomadJob) StoreExecution(id string, request *dto.ExecutionRequest) { r.executions.Add(id, request) } diff --git a/internal/runner/nomad_runner_test.go b/internal/runner/nomad_runner_test.go index 705e9cfd..c645cbd0 100644 --- a/internal/runner/nomad_runner_test.go +++ b/internal/runner/nomad_runner_test.go @@ -46,6 +46,20 @@ func (s *MainTestSuite) TestMappedPortsAreStoredCorrectly() { s.Require().NoError(runner.Destroy(nil)) } +func (s *MainTestSuite) TestUpdateMappedPortsSetsPortsCorrectly() { + apiMock := &nomad.ExecutorAPIMock{} + apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil) + + runner := NewNomadJob(s.TestCtx, tests.DefaultRunnerID, nil, apiMock, func(_ Runner) error { return nil }) + s.Empty(runner.MappedPorts()) + + err := runner.UpdateMappedPorts(tests.DefaultMappedPorts) + s.Require().NoError(err) + s.Equal(tests.DefaultMappedPorts, runner.MappedPorts()) + + s.Require().NoError(runner.Destroy(nil)) +} + func (s *MainTestSuite) TestMarshalRunner() { apiMock := &nomad.ExecutorAPIMock{} apiMock.On("DeleteJob", mock.AnythingOfType("string")).Return(nil)