Skip to content

Commit

Permalink
Do not send notifications for reset jobs (#11543)
Browse files Browse the repository at this point in the history
  • Loading branch information
malikdiarra committed Mar 7, 2024
1 parent d127baf commit 8c4855c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.commons.server.handlers.helpers.JobCreationAndStatusUpdateHelper;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.AttemptSyncConfig;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
Expand Down Expand Up @@ -76,7 +77,9 @@ public InternalOperationResult jobFailure(final JobFailureRequest input) {
for (Attempt attempt : job.getAttempts()) {
attemptStats.add(jobPersistence.getAttemptStats(jobId, attempt.getAttemptNumber()));
}
jobNotifier.failJob(input.getReason(), job, attemptStats);
if (!job.getConfigType().equals(JobConfig.ConfigType.RESET_CONNECTION)) {
jobNotifier.failJob(input.getReason(), job, attemptStats);
}
jobCreationAndStatusUpdateHelper.emitJobToReleaseStagesMetric(OssMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, job);

final UUID connectionId = UUID.fromString(job.getScope());
Expand Down Expand Up @@ -154,7 +157,9 @@ public InternalOperationResult jobSuccessWithAttemptNumber(final JobSuccessWithA
for (Attempt attempt : job.getAttempts()) {
attemptStats.add(jobPersistence.getAttemptStats(jobId, attempt.getAttemptNumber()));
}
jobNotifier.successJob(job, attemptStats);
if (!job.getConfigType().equals(JobConfig.ConfigType.RESET_CONNECTION)) {
jobNotifier.successJob(job, attemptStats);
}
jobCreationAndStatusUpdateHelper.emitJobToReleaseStagesMetric(OssMetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, job);
jobCreationAndStatusUpdateHelper.trackCompletion(job, JobStatus.SUCCEEDED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.server.handlers;

import static io.airbyte.config.JobConfig.ConfigType.RESET_CONNECTION;
import static io.airbyte.config.JobConfig.ConfigType.SYNC;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -14,6 +15,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -112,6 +114,20 @@ void testJobSuccessWithAttemptNumber() throws IOException {
verify(helper).trackCompletion(any(), eq(JobStatus.SUCCEEDED));
}

@Test
void testResetJobNoNotification() throws IOException {
final var request = new JobSuccessWithAttemptNumberRequest()
.attemptNumber(ATTEMPT_NUMBER)
.jobId(JOB_ID)
.connectionId(UUID.randomUUID())
.standardSyncOutput(standardSyncOutput);
Job job = new Job(JOB_ID, RESET_CONNECTION, "", null, List.of(), io.airbyte.persistence.job.models.JobStatus.SUCCEEDED, 0L, 0, 0);
when(jobPersistence.getJob(JOB_ID)).thenReturn(job);
jobsHandler.jobSuccessWithAttemptNumber(request);

verify(jobNotifier, never()).successJob(any(), any());
}

@Test
void setJobSuccessWrapException() throws IOException {
final IOException exception = new IOException("oops");
Expand Down Expand Up @@ -259,6 +275,7 @@ void setJobFailure() throws IOException {
when(mJob.getScope()).thenReturn(CONNECTION_ID.toString());
when(mJob.getConfig()).thenReturn(mJobConfig);
when(mJob.getLastFailedAttempt()).thenReturn(Optional.of(mAttempt));
when(mJob.getConfigType()).thenReturn(SYNC);

when(jobPersistence.getJob(JOB_ID))
.thenReturn(mJob);
Expand Down Expand Up @@ -299,6 +316,7 @@ void setJobFailureWithNullJobSyncConfig() throws IOException {
Mockito.when(mJob.getScope()).thenReturn(CONNECTION_ID.toString());
Mockito.when(mJob.getConfig()).thenReturn(mJobConfig);
Mockito.when(mJob.getLastFailedAttempt()).thenReturn(Optional.of(mAttempt));
Mockito.when(mJob.getConfigType()).thenReturn(SYNC);

Mockito.when(jobPersistence.getJob(JOB_ID))
.thenReturn(mJob);
Expand Down

0 comments on commit 8c4855c

Please sign in to comment.