-
Notifications
You must be signed in to change notification settings - Fork 750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GOBBLIN-2175] proceed with commit in case of exception from Temporal workload execution #4078
Conversation
30d84e9
to
72baf67
Compare
...rc/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
Outdated
Show resolved
Hide resolved
dcb5817
to
9805bc5
Compare
9805bc5
to
d3a3f17
Compare
log.error("Exception occurred in performing workload,proceeding with commit step", e); | ||
// We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure | ||
sendFailureEventToGaaS(workSpec); | ||
return proceedWithCommitStepAndReturnCommitStats(workSpec, searchAttributes, workunitsProcessed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was about to suggest you omit this line and "fall through", but the timing is actually crucial. what you're doing here results in sending a failure GTE before doing the commit. that would lead GaaS in turn to emit the job observability event and complete the flow execution before commit even runs. if that case, the datasetsMetrics definitely won't be filled in the job observability event.
much better would be to wait to send the JobFailed GTE only after the JobSummary GTE is sent
hence:
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);
throw e; // [see comment for L90 about why we throw rather than sending a GTE directly]
...rc/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
Outdated
Show resolved
Hide resolved
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4078 +/- ##
============================================
+ Coverage 51.52% 55.43% +3.90%
+ Complexity 7563 1595 -5968
============================================
Files 1387 307 -1080
Lines 52132 10598 -41534
Branches 5724 1065 -4659
============================================
- Hits 26863 5875 -20988
+ Misses 22979 4222 -18757
+ Partials 2290 501 -1789 ☔ View full report in Codecov by Sentry. |
88a9986
to
90d2af9
Compare
...temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
Outdated
Show resolved
Hide resolved
...oral/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
Outdated
Show resolved
Hide resolved
...temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java
Outdated
Show resolved
Hide resolved
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
Outdated
Show resolved
Hide resolved
...temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
Outdated
Show resolved
Hide resolved
...temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
Outdated
Show resolved
Hide resolved
...oral/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
Outdated
Show resolved
Hide resolved
} catch (Exception e) { | ||
log.error("ProcessWorkUnits failure - will attempt partial commit before announcing error", e); | ||
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed); | ||
throw e; //We want to proceed with partial commit and throw exception so that the parent workflow ExecuteGobblinWorkflowImpl can throw the failure event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need space after //
and also could streamline comment:
throw e; // re-throw after any partial commit, to fail the parent workflow
but anyway, that throw
on L84 won't run will it? the partial commit (CommitStepWorkflow
) will throw an exception that unwinds the stack beforehand. am I correct that at best this throw
is a "should never happen" fallback?
ITO which we'd prefer to surface (possibly both) - what specific info is in the orig exception relative to the one currently being throw? would it make sense to combine messages from the two or simply throw one or the other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed, caught exception from commit step and threw exception with both the messages combined
43d6b54
to
47ec424
Compare
47ec424
to
81c03bf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice work - there are certainly several tricky nuances here you did a great job of getting under control!
} | ||
if (commitGobblinStats.getOptFailure().isPresent()) { | ||
throw ApplicationFailure.newNonRetryableFailureWithCause( | ||
String.format("Failed to commit dataset state for some dataset(s)"), commitGobblinStats.getOptFailure().get().getClass().toString(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NBD, but no need for String.format
, when a string literal would do
log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); | ||
|
||
try { | ||
performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs space before //
workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), | ||
Optional.empty())); | ||
} catch (Exception e) { | ||
log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may even have suggested this text, but reading again, it's ambiguous (e.g. was the failure while attempting partial commit?)
this would be clearer:
"ProcessWorkUnits failure - will attempt partial commit..."
"Processing failure: %s. Commit workflow failure: %s", | ||
e.getMessage(), | ||
commitException.getMessage() | ||
); | ||
log.error(combinedMessage); | ||
throw ApplicationFailure.newNonRetryableFailureWithCause( | ||
String.format("Processing failure: %s. Partial commit failure: %s", combinedMessage, commitException), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- maybe add intro context plus a newline to separate the msgs. e.g.
"ProcessWorkUnits failure (as expected) led to failure during partial commit attempt -\n ProcessWorkUnits failure: %s\n CommitStep failure: %s"
-
also, can't you reuse
combinedMessage
on L96? or is more ofcommitException
than just the msg getting used the second time? -
e
will not lose its stack trace, so no need to wrap it asnew Exception(e)
, unless you want someone to know you rethrew it from this particular place. that said, I'd avoid wrapping, since that just adds more layers to peel back while debugging
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Partial commit support was introduced in this PR [GOBBLIN-2165] Support partial commit semantic from CopyDataPublisher #4066 , This feature is not working as expected in temporal execution.
When a ProcessWorkUnit activity encounters an Exception and exhausts the configured number of retries, it causes the parent NestingExecWorkUnits workflow to fail. This failure propagates upwards, resulting in a ChildWorkflowExecutionFailed for the next enclosing parent, ProcessWorkUnitsImpl. Consequently, the CommitStepWorkflow, which is crucial for file-level commit, does not execute. The file rename into the final destination occurs during the commit step, not within the task itself. This issue disrupts the workflow, as the commit step is essential for completing the file-level commit process.
To address this, we modify ProcessWorkUnitsImpl to catch the child workflow failure and still perform the CommitStepWorkflow before exiting.
Tests
Commits