diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java index 23edeccb404..56c4404eaaa 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.runtime.operators.schema.regular; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; @@ -62,6 +63,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.wrap; @@ -73,16 +75,13 @@ public class SchemaCoordinator extends SchemaRegistry { /** Executor service to execute schema change. */ private final ExecutorService schemaChangeThreadPool; - /** - * Atomic flag indicating if current RequestHandler could accept more schema changes for now. - */ - private transient RequestStatus schemaChangeStatus; - /** Sink writers which have sent flush success events for the request. */ private transient ConcurrentHashMap> flushedSinkWriters; - /** Currently handling request's completable future. */ - private transient CompletableFuture pendingResponseFuture; + /** Currently handling requests' completable future. */ + private transient Map< + Integer, Tuple2>> + pendingRequests; // Static fields public SchemaCoordinator( @@ -108,7 +107,7 @@ public SchemaCoordinator( public void start() throws Exception { super.start(); this.flushedSinkWriters = new ConcurrentHashMap<>(); - this.schemaChangeStatus = RequestStatus.IDLE; + this.pendingRequests = new ConcurrentHashMap<>(); } @Override @@ -185,7 +184,7 @@ protected void handleCustomCoordinationRequest( } @Override - protected void handleFlushSuccessEvent(FlushSuccessEvent event) { + protected void handleFlushSuccessEvent(FlushSuccessEvent event) throws TimeoutException { int sinkSubtask = event.getSinkSubTaskId(); int sourceSubtask = event.getSourceSubTaskId(); LOG.info( @@ -200,16 +199,25 @@ protected void handleFlushSuccessEvent(FlushSuccessEvent event) { "Currently flushed sink writers for source task {} are: {}", sourceSubtask, flushedSinkWriters.get(sourceSubtask)); + + if (flushedSinkWriters.get(sourceSubtask).size() >= currentParallelism) { + LOG.info( + "Source SubTask {} have collected enough flush success event. Will start evolving schema changes...", + sourceSubtask); + flushedSinkWriters.remove(sourceSubtask); + startSchemaChangesEvolve(sourceSubtask); + } } @Override protected void handleUnrecoverableError(String taskDescription, Throwable t) { super.handleUnrecoverableError(taskDescription, t); - // There's a pending future, release it exceptionally before quitting - if (pendingResponseFuture != null) { - pendingResponseFuture.completeExceptionally(t); - } + // For each pending future, release it exceptionally before quitting + pendingRequests.forEach( + (index, tuple) -> { + tuple.f1.completeExceptionally(t); + }); } /** @@ -219,73 +227,14 @@ protected void handleUnrecoverableError(String taskDescription, Throwable t) { */ public void handleSchemaChangeRequest( SchemaChangeRequest request, CompletableFuture responseFuture) { - - // We use subTaskId to identify each schema change request - int subTaskId = request.getSubTaskId(); - - if (schemaChangeStatus == RequestStatus.IDLE) { - if (activeSinkWriters.size() < currentParallelism) { - LOG.info( - "Not all active sink writers have been registered. Current {}, expected {}.", - activeSinkWriters.size(), - currentParallelism); - responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush())); - return; - } - - if (!activeSinkWriters.equals(flushedSinkWriters.get(subTaskId))) { - LOG.info( - "Not all active sink writers have completed flush. Flushed writers: {}, expected: {}.", - flushedSinkWriters.get(subTaskId), - activeSinkWriters); - responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush())); - return; - } - - LOG.info( - "All sink writers have flushed for subTaskId {}. Switching to APPLYING state and starting schema evolution...", - subTaskId); - flushedSinkWriters.remove(subTaskId); - schemaChangeStatus = RequestStatus.APPLYING; - pendingResponseFuture = responseFuture; - startSchemaChangesEvolve(request, responseFuture); - } else { - responseFuture.complete(wrap(SchemaChangeResponse.busy())); - } + pendingRequests.put(request.getSubTaskId(), Tuple2.of(request, responseFuture)); } - private void startSchemaChangesEvolve( - SchemaChangeRequest request, CompletableFuture responseFuture) { - SchemaChangeEvent originalEvent = request.getSchemaChangeEvent(); - TableId originalTableId = originalEvent.tableId(); - Schema currentUpstreamSchema = - schemaManager.getLatestOriginalSchema(originalTableId).orElse(null); - - List deducedSchemaChangeEvents = new ArrayList<>(); - - // For redundant schema change events (possibly coming from duplicate emitted - // CreateTableEvents in snapshot stage), we just skip them. - if (!SchemaUtils.isSchemaChangeEventRedundant(currentUpstreamSchema, originalEvent)) { - schemaManager.applyOriginalSchemaChange(originalEvent); - deducedSchemaChangeEvents.addAll(deduceEvolvedSchemaChanges(originalEvent)); - } else { - LOG.info( - "Schema change event {} is redundant for current schema {}, just skip it.", - originalEvent, - currentUpstreamSchema); - } - - LOG.info( - "All sink subtask have flushed for table {}. Start to apply schema change request: \n\t{}\nthat extracts to:\n\t{}", - request.getTableId().toString(), - request, - deducedSchemaChangeEvents.stream() - .map(SchemaChangeEvent::toString) - .collect(Collectors.joining("\n\t"))); + private void startSchemaChangesEvolve(int sourceSubTaskId) { schemaChangeThreadPool.submit( () -> { try { - applySchemaChange(originalEvent, deducedSchemaChangeEvents); + applySchemaChange(sourceSubTaskId); } catch (Throwable t) { failJob( "Schema change applying task", @@ -379,8 +328,54 @@ private List deduceEvolvedSchemaChanges(SchemaChangeEvent eve } /** Applies the schema change to the external system. */ - private void applySchemaChange( - SchemaChangeEvent originalEvent, List deducedSchemaChangeEvents) { + private void applySchemaChange(int sourceSubTaskId) { + try { + loopUntil( + () -> pendingRequests.containsKey(sourceSubTaskId), + () -> + LOG.info( + "SchemaOperator {} has not submitted schema change request yet. Waiting...", + sourceSubTaskId), + rpcTimeout, + Duration.ofMillis(100)); + } catch (TimeoutException e) { + throw new RuntimeException( + "Timeout waiting for schema change request from SchemaOperator.", e); + } + + Tuple2> requestBody = + pendingRequests.get(sourceSubTaskId); + SchemaChangeRequest request = requestBody.f0; + CompletableFuture responseFuture = requestBody.f1; + + SchemaChangeEvent originalEvent = request.getSchemaChangeEvent(); + + TableId originalTableId = originalEvent.tableId(); + Schema currentUpstreamSchema = + schemaManager.getLatestOriginalSchema(originalTableId).orElse(null); + + List deducedSchemaChangeEvents = new ArrayList<>(); + + // For redundant schema change events (possibly coming from duplicate emitted + // CreateTableEvents in snapshot stage), we just skip them. + if (!SchemaUtils.isSchemaChangeEventRedundant(currentUpstreamSchema, originalEvent)) { + schemaManager.applyOriginalSchemaChange(originalEvent); + deducedSchemaChangeEvents.addAll(deduceEvolvedSchemaChanges(originalEvent)); + } else { + LOG.info( + "Schema change event {} is redundant for current schema {}, just skip it.", + originalEvent, + currentUpstreamSchema); + } + + LOG.info( + "All sink subtask have flushed for table {}. Start to apply schema change request: \n\t{}\nthat extracts to:\n\t{}", + request.getTableId().toString(), + request, + deducedSchemaChangeEvents.stream() + .map(SchemaChangeEvent::toString) + .collect(Collectors.joining("\n\t"))); + if (SchemaChangeBehavior.EXCEPTION.equals(behavior)) { if (deducedSchemaChangeEvents.stream() .anyMatch(evt -> !(evt instanceof CreateTableEvent))) { @@ -415,18 +410,16 @@ private void applySchemaChange( } // And returns all successfully applied schema change events to SchemaOperator. - pendingResponseFuture.complete( + responseFuture.complete( wrap( SchemaChangeResponse.success( appliedSchemaChangeEvents, refreshedEvolvedSchemas))); - pendingResponseFuture = null; - - Preconditions.checkState( - schemaChangeStatus == RequestStatus.APPLYING, - "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " - + schemaChangeStatus); - schemaChangeStatus = RequestStatus.IDLE; - LOG.info("SchemaChangeStatus switched from APPLYING to IDLE."); + + pendingRequests.remove(sourceSubTaskId); + LOG.info( + "Finished handling schema change request from {}. Pending requests: {}", + sourceSubTaskId, + pendingRequests); } private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent schemaChangeEvent) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java index 7d62d4c0737..ce58b911ba4 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java @@ -57,7 +57,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; @@ -243,31 +242,9 @@ private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) { } private SchemaChangeResponse requestSchemaChange( - TableId tableId, SchemaChangeEvent schemaChangeEvent) - throws InterruptedException, TimeoutException { - long deadline = System.currentTimeMillis() + rpcTimeout.toMillis(); - while (true) { - SchemaChangeResponse response = - sendRequestToCoordinator( - new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); - if (System.currentTimeMillis() < deadline) { - if (response.isRegistryBusy()) { - LOG.info( - "{}> Schema Registry is busy now, waiting for next request...", - subTaskId); - Thread.sleep(1000); - } else if (response.isWaitingForFlush()) { - LOG.info( - "{}> Schema change event has not collected enough flush success events from writers, waiting...", - subTaskId); - Thread.sleep(1000); - } else { - return response; - } - } else { - throw new TimeoutException("Timeout when requesting schema change."); - } - } + TableId tableId, SchemaChangeEvent schemaChangeEvent) { + return sendRequestToCoordinator( + new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); } private diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java index 282557552c3..afb75b12b72 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java @@ -51,10 +51,6 @@ public static SchemaChangeResponse success( return new SchemaChangeResponse(ResponseCode.SUCCESS, schemaChangeEvents, evolvedSchemas); } - public static SchemaChangeResponse busy() { - return new SchemaChangeResponse(ResponseCode.BUSY); - } - public static SchemaChangeResponse duplicate() { return new SchemaChangeResponse(ResponseCode.DUPLICATE); } @@ -63,10 +59,6 @@ public static SchemaChangeResponse ignored() { return new SchemaChangeResponse(ResponseCode.IGNORED); } - public static SchemaChangeResponse waitingForFlush() { - return new SchemaChangeResponse(ResponseCode.WAITING_FOR_FLUSH); - } - private SchemaChangeResponse(ResponseCode responseCode) { this(responseCode, Collections.emptyList(), Collections.emptyMap()); } @@ -84,10 +76,6 @@ public boolean isSuccess() { return ResponseCode.SUCCESS.equals(responseCode); } - public boolean isRegistryBusy() { - return ResponseCode.BUSY.equals(responseCode); - } - public boolean isDuplicate() { return ResponseCode.DUPLICATE.equals(responseCode); } @@ -96,10 +84,6 @@ public boolean isIgnored() { return ResponseCode.IGNORED.equals(responseCode); } - public boolean isWaitingForFlush() { - return ResponseCode.WAITING_FOR_FLUSH.equals(responseCode); - } - public List getAppliedSchemaChangeEvents() { return appliedSchemaChangeEvents; } @@ -142,8 +126,6 @@ public String toString() { *

- Accepted: Requested schema change request has been accepted exclusively. Any other * schema change requests will be blocked. * - *

- Busy: Schema registry is currently busy processing another schema change request. - * *

- Duplicate: This schema change request has been submitted before, possibly by another * paralleled subTask. * @@ -152,9 +134,7 @@ public String toString() { */ public enum ResponseCode { SUCCESS, - BUSY, DUPLICATE, - IGNORED, - WAITING_FOR_FLUSH + IGNORED } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java index fbeb325066f..19753675101 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkOperatorWithSchemaEvolveTest.java @@ -87,16 +87,12 @@ private void processSchemaChangeEvent( // Create the flush event to process before the schema change event FlushEvent flushEvent = createFlushEvent(tableId, event); - // Send schema change request to coordinator - schemaOperatorHarness.requestSchemaChangeEvent(tableId, event); - // Send flush event to SinkWriterOperator dataSinkWriterOperator.processElement(new StreamRecord<>(flushEvent)); - // Wait for coordinator to complete the schema change and get the finished schema change - // events + // Send schema change request to coordinator SchemaChangeResponse schemaEvolveResponse = - schemaOperatorHarness.requestSchemaChangeResult(tableId, event); + schemaOperatorHarness.requestSchemaChangeEvent(tableId, event); List finishedSchemaChangeEvents = schemaEvolveResponse.getAppliedSchemaChangeEvents(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java index 11a97064984..ea4de7a14e7 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils; @@ -66,10 +67,8 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT; import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.unwrap; /** @@ -91,6 +90,9 @@ public class RegularEventOperatorTestHarness with(OP operator, int numOutputs) { operator, numOutputs, null, - null, + DEFAULT_RPC_TIMEOUT, SchemaChangeBehavior.EVOLVE, Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()), Collections.emptySet()); @@ -143,7 +145,7 @@ RegularEventOperatorTestHarness withDuration( operator, numOutputs, evolveDuration, - null, + DEFAULT_RPC_TIMEOUT, SchemaChangeBehavior.EVOLVE, Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()), Collections.emptySet()); @@ -159,7 +161,7 @@ RegularEventOperatorTestHarness withDurationAndBehavior( operator, numOutputs, evolveDuration, - null, + DEFAULT_RPC_TIMEOUT, behavior, Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()), Collections.emptySet()); @@ -176,7 +178,7 @@ RegularEventOperatorTestHarness withDurationAndFineGrainedBehavior( operator, numOutputs, evolveDuration, - null, + DEFAULT_RPC_TIMEOUT, behavior, enabledEventTypes, Collections.emptySet()); @@ -195,7 +197,7 @@ RegularEventOperatorTestHarness withDurationAndFineGrainedBehaviorWithErr operator, numOutputs, evolveDuration, - null, + DEFAULT_RPC_TIMEOUT, behavior, enabledEventTypes, errorOnEventTypes); @@ -240,31 +242,6 @@ public SchemaChangeResponse requestSchemaChangeEvent(TableId tableId, SchemaChan .get()); } - public SchemaChangeResponse requestSchemaChangeResult(TableId tableId, SchemaChangeEvent event) - throws ExecutionException, InterruptedException, TimeoutException { - long rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis(); - long deadline = System.currentTimeMillis() + rpcTimeOutInMillis; - while (true) { - LOG.info("request schema change result"); - SchemaChangeResponse response = requestSchemaChangeEvent(tableId, event); - if (System.currentTimeMillis() < deadline) { - if (response.isRegistryBusy()) { - LOG.info("{}> Schema Registry is busy now, waiting for next request...", 0); - Thread.sleep(1000); - } else if (response.isWaitingForFlush()) { - LOG.info( - "{}> Schema change event has not collected enough flush success events from writers, waiting...", - 0); - Thread.sleep(1000); - } else { - return response; - } - } else { - throw new TimeoutException("Timeout when requesting schema change."); - } - } - } - public Schema getLatestOriginalSchema(TableId tableId) throws Exception { return ((GetOriginalSchemaResponse) unwrap(