Skip to content

Commit

Permalink
[Bugfix] Abort transaction if flush failed (#188)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <lpengfei2016@gmail.com>
  • Loading branch information
banmoy committed Mar 6, 2023
1 parent a273ed7 commit 61e115e
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,16 @@ public void finish() {

@Override
public void close() {
sinkManager.flush();
StreamLoadSnapshot snapshot = sinkManager.snapshot();
sinkManager.abort(snapshot);
sinkManager.close();
try {
sinkManager.flush();
} catch (Exception e) {
log.error("Failed to flush when closing", e);
throw e;
} finally {
StreamLoadSnapshot snapshot = sinkManager.snapshot();
sinkManager.abort(snapshot);
sinkManager.close();
}
}

@Override
Expand All @@ -198,6 +204,7 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw
snapshotStates.clear();
snapshotStates.add(StarrocksSnapshotState.of(snapshotMap));
} else {
sinkManager.abort(snapshot);
throw new RuntimeException("Snapshot state failed by prepare");
}

Expand Down

0 comments on commit 61e115e

Please sign in to comment.