diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index eb0529d9d42..3e1bd854cee 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -22,7 +22,6 @@ import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity; import io.airbyte.workers.temporal.sync.AsyncReplicationActivity; import io.airbyte.workers.temporal.sync.InvokeOperationsActivity; -import io.airbyte.workers.temporal.sync.RefreshSchemaActivity; import io.airbyte.workers.temporal.sync.ReportRunTimeActivity; import io.airbyte.workers.temporal.sync.WorkloadStatusCheckActivity; import io.airbyte.workers.temporal.workflows.ConnectorCommandActivity; @@ -87,14 +86,13 @@ public PayloadChecker payloadChecker(final MetricClient metricClient) { @Singleton @Named("syncActivities") public List syncActivities(final ConfigFetchActivity configFetchActivity, - final RefreshSchemaActivity refreshSchemaActivity, final ReportRunTimeActivity reportRunTimeActivity, final InvokeOperationsActivity invokeOperationsActivity, final AsyncReplicationActivity asyncReplicationActivity, final WorkloadStatusCheckActivity workloadStatusCheckActivity, final DiscoverCatalogHelperActivity discoverCatalogHelperActivity) { - return List.of(configFetchActivity, refreshSchemaActivity, reportRunTimeActivity, - invokeOperationsActivity, asyncReplicationActivity, workloadStatusCheckActivity, discoverCatalogHelperActivity); + return List.of(configFetchActivity, reportRunTimeActivity, invokeOperationsActivity, asyncReplicationActivity, + workloadStatusCheckActivity, discoverCatalogHelperActivity); } @Singleton diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivity.java deleted file mode 100644 index 3e55e449024..00000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivity.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal.sync; - -import io.temporal.activity.ActivityInterface; -import io.temporal.activity.ActivityMethod; -import java.util.UUID; - -/** - * RefreshSchemaActivity. - */ -@ActivityInterface -public interface RefreshSchemaActivity { - - @ActivityMethod - boolean shouldRefreshSchema(UUID sourceCatalogId); - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java deleted file mode 100644 index f95592aae5d..00000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal.sync; - -import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; -import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY; - -import datadog.trace.api.Trace; -import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt; -import io.airbyte.api.client.model.generated.SourceIdRequestBody; -import io.airbyte.featureflag.FeatureFlagClient; -import io.airbyte.featureflag.RefreshSchemaPeriod; -import io.airbyte.featureflag.Workspace; -import io.airbyte.metrics.lib.ApmTraceUtils; -import jakarta.inject.Singleton; -import java.lang.invoke.MethodHandles; -import java.time.OffsetDateTime; -import java.util.Map; -import java.util.UUID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Refresh schema temporal activity impl. - */ -@Singleton -public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { - - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private final AirbyteApiClient airbyteApiClient; - private final FeatureFlagClient featureFlagClient; - - public RefreshSchemaActivityImpl(final AirbyteApiClient airbyteApiClient, - final FeatureFlagClient featureFlagClient) { - this.airbyteApiClient = airbyteApiClient; - this.featureFlagClient = featureFlagClient; - } - - @Override - @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) - public boolean shouldRefreshSchema(final UUID sourceCatalogId) { - ApmTraceUtils.addTagsToTrace(Map.of(SOURCE_ID_KEY, sourceCatalogId)); - return !schemaRefreshRanRecently(sourceCatalogId); - } - - private boolean schemaRefreshRanRecently(final UUID sourceCatalogId) { - try { - final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody(sourceCatalogId); - final ActorCatalogWithUpdatedAt mostRecentFetchEvent = airbyteApiClient.getSourceApi().getMostRecentSourceActorCatalog(sourceIdRequestBody); - if (mostRecentFetchEvent.getUpdatedAt() == null) { - return false; - } - final UUID workspaceId = airbyteApiClient.getSourceApi().getSource(sourceIdRequestBody).getWorkspaceId(); - int refreshPeriod = 24; - if (workspaceId != null) { - refreshPeriod = featureFlagClient.intVariation(RefreshSchemaPeriod.INSTANCE, new Workspace(workspaceId)); - } - return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(refreshPeriod).toEpochSecond(); - } catch (final Exception e) { - ApmTraceUtils.addExceptionToTrace(e); - // catching this exception because we don't want to block replication due to a failed schema refresh - log.info("Encountered an error fetching most recent actor catalog fetch event: ", e); - return true; - } - } - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index a780a6afcc3..c1171e45ffc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -72,8 +72,6 @@ public class SyncWorkflowImpl implements SyncWorkflow { private static final HashFunction HASH_FUNCTION = Hashing.md5(); - @TemporalActivityStub(activityOptionsBeanName = "refreshSchemaActivityOptions") - private RefreshSchemaActivity refreshSchemaActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private ConfigFetchActivity configFetchActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") @@ -119,10 +117,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final Optional sourceId = getSourceId(syncInput); final RefreshSchemaActivityOutput refreshSchemaOutput; - final int removeShouldRereshSchemaVersion = Workflow.getVersion("REMOVE_SHOULD_REFRESH_SCHEMA", DEFAULT_VERSION, 1); - final boolean shouldRefreshSchema = sourceId.isPresent() && removeShouldRereshSchemaVersion == DEFAULT_VERSION - ? refreshSchemaActivity.shouldRefreshSchema(sourceId.get()) - : true; + try { final JsonNode sourceConfig = configFetchActivity.getSourceConfig(sourceId.get()); refreshSchemaOutput = runDiscoverAsChildWorkflow(jobRunConfig, sourceLauncherConfig, syncInput, sourceConfig); @@ -182,12 +177,10 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, : syncInput.getConnectionContext().getSourceDefinitionId(), startTime, discoverSchemaEndTime, - replicationEndTime, - shouldRefreshSchema)); + replicationEndTime)); } - // TODO: remove shouldRefreshSchema on the activity is removed - if (shouldRefreshSchema && syncOutput.getStandardSyncSummary() != null && syncOutput.getStandardSyncSummary().getTotalStats() != null) { + if (syncOutput.getStandardSyncSummary() != null && syncOutput.getStandardSyncSummary().getTotalStats() != null) { syncOutput.getStandardSyncSummary().getTotalStats().setDiscoverSchemaEndTime(discoverSchemaEndTime); syncOutput.getStandardSyncSummary().getTotalStats().setDiscoverSchemaStartTime(startTime); } diff --git a/airbyte-workers/src/main/kotlin/io/airbyte/workers/temporal/activities/ReportRunTimeActivityInput.kt b/airbyte-workers/src/main/kotlin/io/airbyte/workers/temporal/activities/ReportRunTimeActivityInput.kt index e9738f95bb0..bd64e1abcfe 100644 --- a/airbyte-workers/src/main/kotlin/io/airbyte/workers/temporal/activities/ReportRunTimeActivityInput.kt +++ b/airbyte-workers/src/main/kotlin/io/airbyte/workers/temporal/activities/ReportRunTimeActivityInput.kt @@ -10,7 +10,6 @@ data class ReportRunTimeActivityInput( val startTime: Long, val refreshSchemaEndTime: Long, val replicationEndTime: Long, - val shouldRefreshSchema: Boolean, ) { class Builder @JvmOverloads @@ -20,7 +19,6 @@ data class ReportRunTimeActivityInput( private val startTime: Long? = null, private val refreshSchemaEndTime: Long? = null, private val replicationEndTime: Long? = null, - private val shouldRefreshSchema: Boolean? = null, ) { fun build(): ReportRunTimeActivityInput { return ReportRunTimeActivityInput( @@ -29,7 +27,6 @@ data class ReportRunTimeActivityInput( startTime!!, refreshSchemaEndTime!!, replicationEndTime!!, - shouldRefreshSchema!!, ) } } diff --git a/airbyte-workers/src/main/kotlin/io/airbyte/workers/temporal/sync/ReportRunTimeActivityImpl.kt b/airbyte-workers/src/main/kotlin/io/airbyte/workers/temporal/sync/ReportRunTimeActivityImpl.kt index f0f032f3d1b..1bee10e15e4 100644 --- a/airbyte-workers/src/main/kotlin/io/airbyte/workers/temporal/sync/ReportRunTimeActivityImpl.kt +++ b/airbyte-workers/src/main/kotlin/io/airbyte/workers/temporal/sync/ReportRunTimeActivityImpl.kt @@ -20,14 +20,12 @@ class ReportRunTimeActivityImpl(private val metricClient: MetricClient) : Report val connectionTag = MetricAttribute(MetricTags.CONNECTION_ID, input.connectionId.toString()) val sourceDefinitionTag = MetricAttribute(MetricTags.SOURCE_DEFINITION_ID, input.sourceDefinitionId.toString()) - if (input.shouldRefreshSchema) { - metricClient.count( - OssMetricsRegistry.DISCOVER_CATALOG_RUN_TIME, - runTimeRefresh, - connectionTag, - sourceDefinitionTag, - ) - } + metricClient.count( + OssMetricsRegistry.DISCOVER_CATALOG_RUN_TIME, + runTimeRefresh, + connectionTag, + sourceDefinitionTag, + ) metricClient.count(OssMetricsRegistry.REPLICATION_RUN_TIME, runTimeReplication, connectionTag, sourceDefinitionTag) metricClient.count(OssMetricsRegistry.SYNC_TOTAL_TIME, totalWorkflowRunTime, connectionTag, sourceDefinitionTag) } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/config/DataPlaneActivityInitializationMicronautTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/config/DataPlaneActivityInitializationMicronautTest.java index a26977ea91a..eaf0805018f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/config/DataPlaneActivityInitializationMicronautTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/config/DataPlaneActivityInitializationMicronautTest.java @@ -11,8 +11,6 @@ import io.airbyte.config.secrets.persistence.SecretPersistence; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivityImpl; -import io.airbyte.workers.temporal.sync.RefreshSchemaActivity; -import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl; import io.airbyte.workers.temporal.sync.WebhookOperationActivity; import io.airbyte.workers.temporal.sync.WebhookOperationActivityImpl; import io.micronaut.context.annotation.Bean; @@ -57,9 +55,6 @@ class DataPlaneActivityInitializationMicronautTest { @Inject ConfigFetchActivity configFetchActivity; - @Inject - RefreshSchemaActivity refreshSchemaActivity; - @Inject WebhookOperationActivity webhookOperationActivity; @@ -68,11 +63,6 @@ void testConfigFetchActivity() { assertEquals(ConfigFetchActivityImpl.class, configFetchActivity.getClass()); } - @Test - void testRefreshSchemaActivity() { - assertEquals(RefreshSchemaActivityImpl.class, refreshSchemaActivity.getClass()); - } - @Test void testWebhookOperationActivity() { assertEquals(WebhookOperationActivityImpl.class, webhookOperationActivity.getClass()); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java deleted file mode 100644 index fd56b0a087c..00000000000 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RefreshSchemaActivityTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (c) 2020-2025 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal.scheduling.activities; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; - -import io.airbyte.api.client.AirbyteApiClient; -import io.airbyte.api.client.generated.ConnectionApi; -import io.airbyte.api.client.generated.SourceApi; -import io.airbyte.api.client.generated.WorkspaceApi; -import io.airbyte.api.client.model.generated.ActorCatalogWithUpdatedAt; -import io.airbyte.api.client.model.generated.AirbyteCatalog; -import io.airbyte.api.client.model.generated.AirbyteStream; -import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration; -import io.airbyte.api.client.model.generated.CatalogDiff; -import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; -import io.airbyte.api.client.model.generated.JobConfigType; -import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRead; -import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; -import io.airbyte.api.client.model.generated.SourceIdRequestBody; -import io.airbyte.api.client.model.generated.SourceRead; -import io.airbyte.api.client.model.generated.StreamDescriptor; -import io.airbyte.api.client.model.generated.StreamTransform; -import io.airbyte.api.client.model.generated.SynchronousJobRead; -import io.airbyte.api.client.model.generated.WorkloadPriority; -import io.airbyte.api.client.model.generated.WorkspaceRead; -import io.airbyte.commons.json.Jsons; -import io.airbyte.featureflag.RefreshSchemaPeriod; -import io.airbyte.featureflag.TestClient; -import io.airbyte.featureflag.Workspace; -import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl; -import java.io.IOException; -import java.time.OffsetDateTime; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; -import org.mockito.quality.Strictness; - -@ExtendWith(MockitoExtension.class) -@SuppressWarnings("PMD.AvoidDuplicateLiterals") -class RefreshSchemaActivityTest { - - private AirbyteApiClient mAirbyteApiClient; - private SourceApi mSourceApi; - private ConnectionApi mConnectionApi; - private WorkspaceApi mWorkspaceApi; - private TestClient mFeatureFlagClient; - - private RefreshSchemaActivityImpl refreshSchemaActivity; - - private static final UUID SOURCE_ID = UUID.randomUUID(); - private static final UUID WORKSPACE_ID = UUID.randomUUID(); - private static final UUID CONNECTION_ID = UUID.randomUUID(); - private static final UUID CATALOG_ID = UUID.randomUUID(); - private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID(); - private static final AirbyteCatalog CATALOG = - new AirbyteCatalog( - List.of(new AirbyteStreamAndConfiguration(new AirbyteStream("test stream", null, null, null, null, null, null, null), null))); - private static final CatalogDiff CATALOG_DIFF = - new CatalogDiff(List.of(new StreamTransform(StreamTransform.TransformType.UPDATE_STREAM, new StreamDescriptor("test stream", null), null))); - - @BeforeEach - void setUp() throws IOException { - mAirbyteApiClient = mock(AirbyteApiClient.class); - mSourceApi = mock(SourceApi.class, withSettings().strictness(Strictness.LENIENT)); - mConnectionApi = mock(ConnectionApi.class); - mFeatureFlagClient = mock(TestClient.class, withSettings().strictness(Strictness.LENIENT)); - mWorkspaceApi = mock(WorkspaceApi.class, withSettings().strictness(Strictness.LENIENT)); - when(mWorkspaceApi.getWorkspaceByConnectionId(new ConnectionIdRequestBody(CONNECTION_ID))) - .thenReturn(new WorkspaceRead(WORKSPACE_ID, UUID.randomUUID(), "name", "slug", false, UUID.randomUUID(), null, null, null, null, null, null, - null, null, null, null, null, null, null)); - when(mSourceApi.getSource(new SourceIdRequestBody(SOURCE_ID))).thenReturn( - new SourceRead(SOURCE_DEFINITION_ID, SOURCE_ID, WORKSPACE_ID, Jsons.jsonNode(Map.of()), "name", "source-name", 1L, null, null, null, null, - null)); - when(mSourceApi.discoverSchemaForSource( - new SourceDiscoverSchemaRequestBody(SOURCE_ID, CONNECTION_ID, true, true, WorkloadPriority.DEFAULT))) - .thenReturn( - new SourceDiscoverSchemaRead( - new SynchronousJobRead( - UUID.randomUUID(), - JobConfigType.REFRESH, - System.currentTimeMillis(), - System.currentTimeMillis(), - true, - null, - - false, - null, - null, - null), - CATALOG, - CATALOG_ID, - CATALOG_DIFF, - false, - null)); - when(mAirbyteApiClient.getSourceApi()).thenReturn(mSourceApi); - refreshSchemaActivity = - new RefreshSchemaActivityImpl(mAirbyteApiClient, mFeatureFlagClient); - } - - @Test - void testShouldRefreshSchemaNoRecentRefresh() throws IOException { - when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(new ActorCatalogWithUpdatedAt()); - Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); - } - - @Test - void testShouldRefreshSchemaRecentRefreshOver24HoursAgo() throws IOException { - final Long twoDaysAgo = OffsetDateTime.now().minusHours(48L).toEpochSecond(); - final ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt(twoDaysAgo, null); - - when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); - when(mSourceApi.getSource(any())).thenReturn( - new SourceRead(SOURCE_DEFINITION_ID, SOURCE_ID, WORKSPACE_ID, Jsons.jsonNode(Map.of()), "name", "source-name", 1L, null, null, null, null, - null)); - when(mFeatureFlagClient.intVariation(RefreshSchemaPeriod.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(24); - - Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); - } - - @Test - void testShouldRefreshSchemaRecentRefreshLessThan24HoursAgo() throws IOException { - final Long twelveHoursAgo = OffsetDateTime.now().minusHours(12L).toEpochSecond(); - final ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt(twelveHoursAgo, null); - - when(mSourceApi.getSource(any())).thenReturn( - new SourceRead(SOURCE_DEFINITION_ID, SOURCE_ID, WORKSPACE_ID, Jsons.jsonNode(Map.of()), "name", "source-name", 1L, null, null, null, null, - null)); - when(mFeatureFlagClient.intVariation(RefreshSchemaPeriod.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(24); - when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); - - Assertions.assertThat(false).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); - } - - @Test - void testShouldRefreshSchemaRecentRefreshLessThanValueFromFF() throws IOException { - final Long twelveHoursAgo = OffsetDateTime.now().minusHours(12L).toEpochSecond(); - final ActorCatalogWithUpdatedAt actorCatalogWithUpdatedAt = new ActorCatalogWithUpdatedAt(twelveHoursAgo, null); - - when(mSourceApi.getSource(any())).thenReturn( - new SourceRead(SOURCE_DEFINITION_ID, SOURCE_ID, WORKSPACE_ID, Jsons.jsonNode(Map.of()), "name", "source-name", 1L, null, null, null, null, - null)); - when(mFeatureFlagClient.intVariation(RefreshSchemaPeriod.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(10); - when(mSourceApi.getMostRecentSourceActorCatalog(any())).thenReturn(actorCatalogWithUpdatedAt); - - Assertions.assertThat(true).isEqualTo(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)); - } - -} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 819daae71a2..cc69c69c51f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -81,7 +81,6 @@ class SyncWorkflowTest { private DiscoverCatalogHelperActivity discoverCatalogHelperActivity; private WorkloadStatusCheckActivity workloadStatusCheckActivity; private InvokeOperationsActivity invokeOperationsActivity; - private RefreshSchemaActivityImpl refreshSchemaActivity; private ConfigFetchActivityImpl configFetchActivity; private ReportRunTimeActivity reportRunTimeActivity; @@ -146,14 +145,12 @@ void setUp() { discoverCatalogHelperActivity = mock(DiscoverCatalogHelperActivityImpl.class); workloadStatusCheckActivity = mock(WorkloadStatusCheckActivityImpl.class); invokeOperationsActivity = mock(InvokeOperationsActivityImpl.class); - refreshSchemaActivity = mock(RefreshSchemaActivityImpl.class); configFetchActivity = mock(ConfigFetchActivityImpl.class); reportRunTimeActivity = mock(ReportRunTimeActivityImpl.class); when(discoverCatalogHelperActivity.postprocess(any())).thenReturn(PostprocessCatalogOutput.Companion.success(null)); when(configFetchActivity.getSourceId(sync.getConnectionId())).thenReturn(Optional.of(SOURCE_ID)); - when(refreshSchemaActivity.shouldRefreshSchema(SOURCE_ID)).thenReturn(true); when(configFetchActivity.getStatus(sync.getConnectionId())).thenReturn(Optional.of(ConnectionStatus.ACTIVE)); when(configFetchActivity.getSourceConfig(SOURCE_ID)).thenReturn(Jsons.emptyObject()); @@ -236,7 +233,6 @@ private StandardSyncOutput execute(final boolean isReset) { discoverCatalogHelperActivity, workloadStatusCheckActivity, invokeOperationsActivity, - refreshSchemaActivity, configFetchActivity, reportRunTimeActivity); testEnv.start(); @@ -332,7 +328,6 @@ void testSkipReplicationIfConnectionDisabledBySchemaRefresh() throws Exception { @Test void testGetProperFailureIfRefreshFails() throws Exception { - when(refreshSchemaActivity.shouldRefreshSchema(any())).thenReturn(true); doThrow(new RuntimeException()) .when(discoverCatalogHelperActivity).postprocess(any()); final StandardSyncOutput output = execute();