Skip to content

Commit

Permalink
[Feature] Add init and restoreCommit method in `SinkAggregatedCom…
Browse files Browse the repository at this point in the history
…mitter` (apache#5598)
  • Loading branch information
Hisoka-X authored Oct 10, 2023
1 parent eac76b4 commit c4b18db
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@
*/
public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> extends Serializable {

/** init sink aggregated committer */
default void init() {};

/** Re-commit message to third party data receiver, The method need to achieve idempotency. */
default List<AggregatedCommitInfoT> restoreCommit(
List<AggregatedCommitInfoT> aggregatedCommitInfo) throws IOException {
return commit(aggregatedCommitInfo);
}

/**
* Commit message to third party data receiver, The method need to achieve idempotency.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public void init() throws Exception {
this.commitInfoSerializer = sink.getSink().getCommitInfoSerializer().get();
this.aggregatedCommitInfoSerializer =
sink.getSink().getAggregatedCommitInfoSerializer().get();
aggregatedCommitter.init();
log.debug(
"starting seatunnel sink aggregated committer task, sink name[{}] ",
sink.getName());
Expand Down Expand Up @@ -262,7 +263,12 @@ public void restoreState(List<ActionSubtaskState> actionStateList) throws Except
aggregatedCommitInfoSerializer.deserialize(
bytes)))
.collect(Collectors.toList());
aggregatedCommitter.commit(aggregatedCommitInfos);
List<AggregatedCommitInfoT> commit =
aggregatedCommitter.restoreCommit(aggregatedCommitInfos);
if (CollectionUtils.isNotEmpty(commit)) {
log.error("aggregated committer error: {}", commit.size());
throw new CheckpointException(CheckpointCloseReason.AGGREGATE_COMMIT_ERROR);
}
restoreComplete.complete(null);
log.debug("restoreState for sink agg committer [{}] finished", actionStateList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class FlinkGlobalCommitter<CommT, GlobalCommT>

FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
this.aggregatedCommitter = aggregatedCommitter;
aggregatedCommitter.init();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public SparkDataSourceWriter(
throws IOException {
this.sink = sink;
this.sinkAggregatedCommitter = sink.createAggregatedCommitter().orElse(null);
if (sinkAggregatedCommitter != null) {
sinkAggregatedCommitter.init();
}
}

@Override
Expand Down

0 comments on commit c4b18db

Please sign in to comment.