Skip to content

Commit

Permalink
control-service: async deployment deletion
Browse files Browse the repository at this point in the history
Why:
As part of VEP-2272, we need to introduce a process for synchronizing data jobs from the database to Kubernetes.
Currently, the process lacks the ability to delete deployments asynchronously.

What:
We have introduced an asynchronous method for deployment deletion. It is based on the logic that
if the desired deployment is null and the actual deployment is not null, the process performs deployment deletion.

Testing done:
Integration tests.

Signed-off-by: Miroslav Ivanov miroslavi@vmware.com
  • Loading branch information
mivanov1988 committed Oct 10, 2023
1 parent 3ecbcdd commit 1f95184
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ public void testDataJobDeploymentCrud() throws Exception {
Assertions.assertNotEquals(lastDeployedDateInitial, lastDeployedDateShouldBeChanged);
Assertions.assertNotEquals(
deploymentVersionShaShouldNotBeChanged, deploymentVersionShaShouldBeChanged);

// Deletes deployment
desiredJobDeploymentRepository.deleteById(testJobName);
dataJobsSynchronizer.synchronizeDataJob(
dataJob, null, actualDataJobDeployment, true);
Assertions.assertFalse(deploymentService.readDeployment(testJobName).isPresent());
Assertions.assertFalse(actualJobDeploymentRepository.findById(testJobName).isPresent());
}

private ActualDataJobDeployment verifyDeploymentStatus(boolean enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import com.vmware.taurus.controlplane.model.data.DataJobMode;
import com.vmware.taurus.exception.ExternalSystemError;
import com.vmware.taurus.service.JobsService;
import com.vmware.taurus.service.deploy.DataJobDeploymentPropertiesConfig;
import com.vmware.taurus.service.deploy.DeploymentService;
import com.vmware.taurus.service.deploy.DeploymentServiceV2;
import com.vmware.taurus.service.diag.OperationContext;
import com.vmware.taurus.service.model.JobDeploymentStatus;
import io.swagger.v3.oas.annotations.tags.Tag;
Expand Down Expand Up @@ -51,14 +53,27 @@ public class DataJobsDeploymentController implements DataJobsDeploymentApi {

@Autowired private OperationContext operationContext;

@Autowired private DeploymentServiceV2 deploymentServiceV2;

@Autowired private DataJobDeploymentPropertiesConfig dataJobDeploymentPropertiesConfig;

@Override
public ResponseEntity<Void> deploymentDelete(
String teamName, String jobName, String deploymentId) {
if (jobsService.jobWithTeamExists(jobName, teamName)) {
// TODO: deploymentId not implemented
if (jobName != null) {
deploymentService.deleteDeployment(jobName);
return ResponseEntity.accepted().build();
if (dataJobDeploymentPropertiesConfig
.getWriteTos()
.contains(DataJobDeploymentPropertiesConfig.WriteTo.K8S)) {
deploymentService.deleteDeployment(jobName);
return ResponseEntity.accepted().build();
} else if (dataJobDeploymentPropertiesConfig
.getWriteTos()
.contains(DataJobDeploymentPropertiesConfig.WriteTo.DB)) {
deploymentServiceV2.deleteDesiredDeployment(jobName);
return ResponseEntity.accepted().build();
}
}
return ResponseEntity.notFound().build();
}
Expand All @@ -76,8 +91,12 @@ public ResponseEntity<Void> deploymentPatch(
if (job.isPresent()) {
var jobDeployment =
ToModelApiConverter.toJobDeployment(teamName, jobName, dataJobDeployment);
deploymentService.patchDeployment(job.get(), jobDeployment);
return ResponseEntity.accepted().build();
if (dataJobDeploymentPropertiesConfig
.getWriteTos()
.contains(DataJobDeploymentPropertiesConfig.WriteTo.K8S)) {
deploymentService.patchDeployment(job.get(), jobDeployment);
return ResponseEntity.accepted().build();
}
}
}
return ResponseEntity.notFound().build();
Expand All @@ -89,8 +108,12 @@ public ResponseEntity<List<DataJobDeploymentStatus>> deploymentList(
if (jobsService.jobWithTeamExists(jobName, teamName)) {
// TODO: deploymentId and mode not implemented
List<DataJobDeploymentStatus> deployments = Collections.emptyList();
Optional<JobDeploymentStatus> jobDeploymentStatus =
deploymentService.readDeployment(jobName.toLowerCase());
Optional<JobDeploymentStatus> jobDeploymentStatus = Optional.empty();
if (dataJobDeploymentPropertiesConfig
.getReadDataSource()
.equals(DataJobDeploymentPropertiesConfig.ReadFrom.K8S)) {
jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase());
}
if (jobDeploymentStatus.isPresent()) {
deployments =
Arrays.asList(ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
Expand All @@ -105,8 +128,12 @@ public ResponseEntity<DataJobDeploymentStatus> deploymentRead(
String teamName, String jobName, String deploymentId) {
if (jobsService.jobWithTeamExists(jobName, teamName)) {
// TODO: deploymentId are not implemented.
Optional<JobDeploymentStatus> jobDeploymentStatus =
deploymentService.readDeployment(jobName.toLowerCase());
Optional<JobDeploymentStatus> jobDeploymentStatus = Optional.empty();
if (dataJobDeploymentPropertiesConfig
.getReadDataSource()
.equals(DataJobDeploymentPropertiesConfig.ReadFrom.K8S)) {
jobDeploymentStatus = deploymentService.readDeployment(jobName.toLowerCase());
}
if (jobDeploymentStatus.isPresent()) {
return ResponseEntity.ok(
ToApiModelConverter.toDataJobDeploymentStatus(jobDeploymentStatus.get()));
Expand All @@ -128,13 +155,17 @@ public ResponseEntity<Void> deploymentUpdate(
if (job.isPresent()) {
var jobDeployment =
ToModelApiConverter.toJobDeployment(teamName, jobName.toLowerCase(), dataJobDeployment);
// TODO: Consider using a Task-oriented API approach
deploymentService.updateDeployment(
job.get(),
jobDeployment,
sendNotification,
operationContext.getUser(),
operationContext.getOpId());
if (dataJobDeploymentPropertiesConfig
.getWriteTos()
.contains(DataJobDeploymentPropertiesConfig.WriteTo.K8S)) {
// TODO: Consider using a Task-oriented API approach
deploymentService.updateDeployment(
job.get(),
jobDeployment,
sendNotification,
operationContext.getUser(),
operationContext.getOpId());
}

return ResponseEntity.accepted().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ void synchronizeDataJob(
desiredDataJobDeployment,
actualDataJobDeployment,
isDeploymentPresentInKubernetes);
} else if (actualDataJobDeployment != null) {
deploymentService.deleteActualDeployment(dataJob.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import com.vmware.taurus.service.notification.NotificationContent;
import com.vmware.taurus.service.repository.ActualJobDeploymentRepository;
import com.vmware.taurus.service.repository.DesiredJobDeploymentRepository;
import com.vmware.taurus.service.repository.JobsRepository;
import io.kubernetes.client.openapi.ApiException;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -43,6 +45,7 @@ public class DeploymentServiceV2 {
private final DesiredJobDeploymentRepository desiredJobDeploymentRepository;
private final ActualJobDeploymentRepository actualJobDeploymentRepository;
private final DataJobsKubernetesService dataJobsKubernetesService;
private final JobsRepository jobsRepository;

/**
* Updates or creates a Kubernetes CronJob based on the provided configuration.
Expand Down Expand Up @@ -128,6 +131,25 @@ public void updateDeployment(
}
}

public void deleteDesiredDeployment(String dataJobName) {
desiredJobDeploymentRepository.deleteById(dataJobName);
}

public void deleteActualDeployment(String dataJobName) {
if (this.deploymentExistsOrInProgress(dataJobName)) {
jobImageBuilder.cancelBuildingJob(dataJobName);
jobImageDeployer.unScheduleJob(dataJobName);
jobsRepository.updateDataJobEnabledByName(dataJobName, false);
}

actualJobDeploymentRepository.deleteById(dataJobName);
deploymentProgress.deleted(dataJobName);
}

public Optional<ActualDataJobDeployment> readDeployment(String dataJobName) {
return actualJobDeploymentRepository.findById(dataJobName);
}

public void updateDeploymentEnabledStatus(String dataJobName, Boolean enabledStatus) {
desiredJobDeploymentRepository.updateDesiredDataJobDeploymentEnabledByDataJobName(
dataJobName, enabledStatus);
Expand Down Expand Up @@ -172,4 +194,9 @@ private void handleException(
NotificationContent.getPlatformErrorBody(),
sendNotification);
}

private boolean deploymentExistsOrInProgress(String dataJobName) {
return jobImageBuilder.isBuildingJobInProgress(dataJobName)
|| readDeployment(dataJobName).isPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.vmware.taurus.service.model.*;
import com.vmware.taurus.service.notification.NotificationContent;
import io.kubernetes.client.openapi.ApiException;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -103,6 +104,18 @@ public ActualDataJobDeployment scheduleJob(
}
}

public void unScheduleJob(@NonNull String dataJobName) {
String cronJobName = getCronJobName(dataJobName);
try {
if (dataJobsKubernetesService.listCronJobs().contains(cronJobName)) {
dataJobsKubernetesService.deleteCronJob(cronJobName);
}
} catch (ApiException e) {
// TODO: technically if code is 4xx, k8s errors are more of Bug exception ...
throw new KubernetesException("Failed to un-schedule job", e);
}
}

private ActualDataJobDeployment catchApiException(
DataJob dataJob, Boolean sendNotification, ApiException apiException) {
if (apiException.getCode() == HttpStatus.UNPROCESSABLE_ENTITY.value()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package com.vmware.taurus.service.deploy;

import com.vmware.taurus.ServiceApp;
import com.vmware.taurus.service.model.ActualDataJobDeployment;
import com.vmware.taurus.service.model.DataJob;
import com.vmware.taurus.service.model.DesiredDataJobDeployment;
import io.kubernetes.client.openapi.ApiException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -107,6 +110,70 @@ void synchronizeDataJobs_synchronizationEnabledTrueAndWriteToDbFalse_shouldSkipS
.findAllActualDeploymentNamesFromKubernetes();
}

@Test
void synchronizeDataJob_desiredDeploymentNullAndActualDeploymentNull_shouldSkipSynchronization() {
DataJob dataJob = new DataJob();
dataJob.setName("test-job-name");
boolean isDeploymentPresentInKubernetes = false;
DesiredDataJobDeployment desiredDataJobDeployment = null;
ActualDataJobDeployment actualDataJobDeployment = null;

dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);

Mockito.verify(deploymentService, Mockito.times(0))
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
Mockito.verify(deploymentService, Mockito.times(0))
.deleteActualDeployment(dataJob.getName());
}

@Test
void synchronizeDataJob_desiredDeploymentNullAndActualDeploymentNotNull_shouldDeleteJobDeployment() {
DataJob dataJob = new DataJob();
dataJob.setName("test-job-name");
boolean isDeploymentPresentInKubernetes = true;
DesiredDataJobDeployment desiredDataJobDeployment = null;
ActualDataJobDeployment actualDataJobDeployment = new ActualDataJobDeployment();

dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);

Mockito.verify(deploymentService, Mockito.times(0))
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
Mockito.verify(deploymentService, Mockito.times(1))
.deleteActualDeployment(dataJob.getName());
}

@Test
void synchronizeDataJob_desiredDeploymentNotNullAndActualDeploymentNotNull_shouldUpdateJobDeployment() {
DataJob dataJob = new DataJob();
dataJob.setName("test-job-name");
boolean isDeploymentPresentInKubernetes = true;
DesiredDataJobDeployment desiredDataJobDeployment = new DesiredDataJobDeployment();
ActualDataJobDeployment actualDataJobDeployment = new ActualDataJobDeployment();

dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);

Mockito.verify(deploymentService, Mockito.times(1))
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
Mockito.verify(deploymentService, Mockito.times(0))
.deleteActualDeployment(dataJob.getName());
}

@Test
void synchronizeDataJob_desiredDeploymentNotNullAndActualDeploymentNull_shouldUpdateJobDeployment() {
DataJob dataJob = new DataJob();
dataJob.setName("test-job-name");
boolean isDeploymentPresentInKubernetes = true;
DesiredDataJobDeployment desiredDataJobDeployment = new DesiredDataJobDeployment();
ActualDataJobDeployment actualDataJobDeployment = null;

dataJobsSynchronizer.synchronizeDataJob(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);

Mockito.verify(deploymentService, Mockito.times(1))
.updateDeployment(dataJob, desiredDataJobDeployment, actualDataJobDeployment, isDeploymentPresentInKubernetes);
Mockito.verify(deploymentService, Mockito.times(0))
.deleteActualDeployment(dataJob.getName());
}

void enableSynchronizationProcess() {
initSynchronizationProcessConfig(true, true);
}
Expand Down

0 comments on commit 1f95184

Please sign in to comment.