Skip to content

Commit

Permalink
addressed PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Aditya Pratap Singh committed Dec 4, 2024
1 parent 72baf67 commit dcb5817
Showing 1 changed file with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
Expand Down Expand Up @@ -73,20 +74,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {

NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);

int workunitsProcessed = 0;
Optional<Integer> workunitsProcessed = Optional.empty();
try {
workunitsProcessed = processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
} catch (Exception e) {
log.error("Exception occurred in performing workload,proceeding with commit step", e);
return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
final CommitStats commitStats = proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
// We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure
sendFailureEventToGaaS(workSpec);
return commitStats;
}
return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed);
}

private static void sendFailureEventToGaaS(WUProcessingSpec workSpec) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
}

private CommitStats proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec,
Map<String, Object> searchAttributes, int workunitsProcessed) {
if (workunitsProcessed > 0) {
Map<String, Object> searchAttributes, Optional<Integer> workunitsProcessed) {
/*
!workunitsProcessed.isPresent() condition helps in case of partial commit,
workunitsProcessed will be Optional.Empty() only in cases performWorkload throws an exception
we are only inhibiting commit when workunitsProcessed actually known to be zero
* */
if (!workunitsProcessed.isPresent() || workunitsProcessed.get() > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
CommitStats result = commitWorkflow.commit(workSpec);
if (result.getNumCommittedWorkUnits() == 0) {
Expand Down

0 comments on commit dcb5817

Please sign in to comment.