Skip to content

Commit

Permalink
control-service: improve jobs synchronizer error handling
Browse files Browse the repository at this point in the history
Why
As part of VEP-2272, we need to introduce a process for synchronizing data jobs from the database to Kubernetes. In the event of a data job deployment failure (due to user or platform errors), the current implementation attempts to deploy the data job during every synchronization cycle.

What
We have added a data job deployment status field to the desired_data_job_deployment table. This status is used to determine whether the deployment failed in the previous cycle and should be skipped in the current one.

Testing done
Unit tests

Signed-off-by: Miroslav Ivanov miroslavi@vmware.com
  • Loading branch information
mivanov1988 committed Oct 3, 2023
1 parent 7ae16f5 commit ab3c4b2
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public static JobDeployment toJobDeployment(
}

public static DesiredDataJobDeployment toDesiredDataJobDeployment(
JobDeployment jobDeploymentStatus) {
JobDeployment jobDeployment) {
DesiredDataJobDeployment deployment = new DesiredDataJobDeployment();
deployment.setDataJobName(jobDeploymentStatus.getDataJobName());
deployment.setEnabled(jobDeploymentStatus.getEnabled());
deployment.setDataJobName(jobDeployment.getDataJobName());
deployment.setEnabled(jobDeployment.getEnabled());

DataJobResources dataJobResources = jobDeploymentStatus.getResources();
DataJobResources dataJobResources = jobDeployment.getResources();

if (dataJobResources != null) {
DataJobDeploymentResources deploymentResources =
Expand All @@ -57,8 +57,8 @@ public static DesiredDataJobDeployment toDesiredDataJobDeployment(
deployment.setResources(deploymentResources);
}

deployment.setGitCommitSha(jobDeploymentStatus.getGitCommitSha());
deployment.setPythonVersion(jobDeploymentStatus.getPythonVersion());
deployment.setGitCommitSha(jobDeployment.getGitCommitSha());
deployment.setPythonVersion(jobDeployment.getPythonVersion());

return deployment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.vmware.taurus.service.deploy;

import com.vmware.taurus.exception.*;
import com.vmware.taurus.service.diag.methodintercept.Measurable;
import com.vmware.taurus.service.kubernetes.DataJobsKubernetesService;
import com.vmware.taurus.service.model.*;
import com.vmware.taurus.service.notification.NotificationContent;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class DeploymentServiceV2 {
* Kubernetes.
* @param sendNotification if it is true the method will send a notification to the end user.
*/
@Measurable(includeArg = 0, argName = "data_job")
public void updateDeployment(
DataJob dataJob,
DesiredDataJobDeployment desiredJobDeployment,
Expand All @@ -71,6 +73,15 @@ public void updateDeployment(
return;
}

if (DeploymentStatus.USER_ERROR.equals(desiredJobDeployment.getStatus()) ||
DeploymentStatus.PLATFORM_ERROR.equals(desiredJobDeployment.getStatus())) {
log.debug(
"Skipping the data job [job_name={}] deployment due to the previously failed deployment [status={}]",
dataJob.getName(),
desiredJobDeployment.getStatus());
return;
}

try {
log.info("Starting deployment of job {}", desiredJobDeployment.getDataJobName());
deploymentProgress.started(dataJob.getJobConfig(), desiredJobDeployment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ public class JobImageDeployerV2 {
* @param isJobDeploymentPresentInKubernetes if it is true the data job deployment is present in
* Kubernetes.
* @param jobImageName the data job docker image name.
* @return true if it is successful and false if not.
*
* @return {@link ActualDataJobDeployment} an actual data job deployment if the data job is successfully deployed
* and null in case of an error.
*/
public ActualDataJobDeployment scheduleJob(
DataJob dataJob,
Expand Down Expand Up @@ -172,6 +174,9 @@ private Map<String, String> getSystemDefaults() {
* @param isJobDeploymentPresentInKubernetes if it is true the data job deployment is present in
* Kubernetes.
* @param jobImageName the data job docker image name.
*
* @return {@link ActualDataJobDeployment} an actual data job deployment if the data job is successfully deployed
* and null in case of an error.
*/
private ActualDataJobDeployment updateCronJob(
DataJob dataJob,
Expand Down Expand Up @@ -199,8 +204,8 @@ private ActualDataJobDeployment updateCronJob(

ActualDataJobDeployment actualJobDeployment = null;

if (isJobDeploymentPresentInKubernetes && actualDataJobDeployment != null) {
if (!desiredDeploymentVersionSha.equals(actualDataJobDeployment.getDeploymentVersionSha())) {
if (isJobDeploymentPresentInKubernetes) {
if (actualDataJobDeployment == null || !desiredDeploymentVersionSha.equals(actualDataJobDeployment.getDeploymentVersionSha())) {
dataJobsKubernetesService.updateCronJob(desiredCronJob);
actualJobDeployment =
DeploymentModelConverter.toActualJobDeployment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@
@EqualsAndHashCode(callSuper = false)
@ToString
@Entity
public class DesiredDataJobDeployment extends BaseDataJobDeployment {}
public class DesiredDataJobDeployment extends BaseDataJobDeployment {

private DeploymentStatus status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.vmware.taurus.service.model.ActualDataJobDeployment;
import com.vmware.taurus.service.model.DeploymentStatus;
import com.vmware.taurus.service.repository.ActualJobDeploymentRepository;
import com.vmware.taurus.service.repository.DesiredJobDeploymentRepository;
import com.vmware.taurus.service.repository.JobsRepository;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
Expand Down Expand Up @@ -37,16 +38,20 @@ public class DeploymentMonitor {

private final ActualJobDeploymentRepository actualJobDeploymentRepository;

private final DesiredJobDeploymentRepository desiredJobDeploymentRepository;

private final Map<String, Integer> currentStatuses = new ConcurrentHashMap<>();

@Autowired
public DeploymentMonitor(
MeterRegistry meterRegistry,
JobsRepository jobsRepository,
ActualJobDeploymentRepository actualJobDeploymentRepository) {
ActualJobDeploymentRepository actualJobDeploymentRepository,
DesiredJobDeploymentRepository desiredJobDeploymentRepository) {
this.meterRegistry = meterRegistry;
this.jobsRepository = jobsRepository;
this.actualJobDeploymentRepository = actualJobDeploymentRepository;
this.desiredJobDeploymentRepository = desiredJobDeploymentRepository;
}

/**
Expand Down Expand Up @@ -95,14 +100,13 @@ private boolean saveDataJobStatus(
final String dataJobName,
final DeploymentStatus deploymentStatus,
ActualDataJobDeployment actualDataJobDeployment) {
if (jobsRepository.updateDataJobLatestJobDeploymentStatusByName(dataJobName, deploymentStatus)
> 0) {
switch (deploymentStatus) {
case SUCCESS:
if (actualDataJobDeployment != null) {
actualJobDeploymentRepository.save(actualDataJobDeployment);
}
if (jobsRepository.updateDataJobLatestJobDeploymentStatusByName(dataJobName, deploymentStatus) > 0) {

if (DeploymentStatus.SUCCESS.equals(deploymentStatus) && actualDataJobDeployment != null) {
actualJobDeploymentRepository.save(actualDataJobDeployment);
}

desiredJobDeploymentRepository.updateDesiredDataJobDeploymentStatusByDataJobName(dataJobName, deploymentStatus);
return true;
}
log.debug("Data job: {} was deleted or hasn't been created", dataJobName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.vmware.taurus.service.repository;

import com.vmware.taurus.service.model.DeploymentStatus;
import com.vmware.taurus.service.model.DesiredDataJobDeployment;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
Expand Down Expand Up @@ -36,4 +37,12 @@ public interface DesiredJobDeploymentRepository
+ " :dataJobName")
int updateDesiredDataJobDeploymentEnabledByDataJobName(
@Param(value = "dataJobName") String dataJobName, @Param(value = "enabled") Boolean enabled);

@Transactional
@Modifying(clearAutomatically = true)
@Query(
"update DesiredDataJobDeployment d set d.status = :status where d.dataJobName ="
+ " :dataJobName")
int updateDesiredDataJobDeploymentStatusByDataJobName(
@Param(value = "dataJobName") String dataJobName, @Param(value = "status") DeploymentStatus status);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
alter table if exists desired_data_job_deployment
add column if not exists status varchar;
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2021-2023 VMware, Inc.
* SPDX-License-Identifier: Apache-2.0
*/

package com.vmware.taurus.service.deploy;

import com.vmware.taurus.ControlplaneApplication;
import com.vmware.taurus.service.model.*;
import io.kubernetes.client.openapi.ApiException;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;

import java.io.IOException;

@SpringBootTest(classes = ControlplaneApplication.class)
public class DeploymentServiceV2TestIT {

@Autowired
private DeploymentServiceV2 deploymentService;

@MockBean
private DeploymentProgress deploymentProgress;

@MockBean
private JobImageBuilder jobImageBuilder;

@Test
public void updateDeployment_withDesiredDeploymentStatusUserError_shouldSkipDeployment() throws IOException, InterruptedException, ApiException {
updateDeployment(DeploymentStatus.USER_ERROR, 0);
}

@Test
public void updateDeployment_withDesiredDeploymentStatusPlatformError_shouldSkipDeployment() throws IOException, InterruptedException, ApiException {
updateDeployment(DeploymentStatus.PLATFORM_ERROR, 0);
}

@Test
public void updateDeployment_withDesiredDeploymentStatusSuccess_shouldStartDeployment() throws IOException, InterruptedException, ApiException {
updateDeployment(DeploymentStatus.SUCCESS, 1);
}

@Test
public void updateDeployment_withDesiredDeploymentStatusNone_shouldStartDeployment() throws IOException, InterruptedException, ApiException {
updateDeployment(DeploymentStatus.NONE, 1);
}

private void updateDeployment(DeploymentStatus deploymentStatus, int deploymentProgressStartedInvocations) throws IOException, InterruptedException, ApiException {
DesiredDataJobDeployment desiredDataJobDeployment = new DesiredDataJobDeployment();
desiredDataJobDeployment.setStatus(deploymentStatus);
DataJob dataJob = new DataJob();
dataJob.setJobConfig(new JobConfig());
Mockito.when(jobImageBuilder.buildImage(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(false);

deploymentService.updateDeployment(dataJob, desiredDataJobDeployment, new ActualDataJobDeployment(), true, true);

Mockito.verify(deploymentProgress, Mockito.times(deploymentProgressStartedInvocations)).started(dataJob.getJobConfig(), desiredDataJobDeployment);
}
}

0 comments on commit ab3c4b2

Please sign in to comment.