Skip to content

Commit

Permalink
chore: remove unused activity (#15002)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau committed Jan 14, 2025
1 parent 4e02be2 commit 153436d
Show file tree
Hide file tree
Showing 9 changed files with 11 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity;
import io.airbyte.workers.temporal.sync.AsyncReplicationActivity;
import io.airbyte.workers.temporal.sync.InvokeOperationsActivity;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivity;
import io.airbyte.workers.temporal.sync.ReportRunTimeActivity;
import io.airbyte.workers.temporal.sync.WorkloadStatusCheckActivity;
import io.airbyte.workers.temporal.workflows.ConnectorCommandActivity;
Expand Down Expand Up @@ -87,14 +86,13 @@ public PayloadChecker payloadChecker(final MetricClient metricClient) {
@Singleton
@Named("syncActivities")
public List<Object> syncActivities(final ConfigFetchActivity configFetchActivity,
final RefreshSchemaActivity refreshSchemaActivity,
final ReportRunTimeActivity reportRunTimeActivity,
final InvokeOperationsActivity invokeOperationsActivity,
final AsyncReplicationActivity asyncReplicationActivity,
final WorkloadStatusCheckActivity workloadStatusCheckActivity,
final DiscoverCatalogHelperActivity discoverCatalogHelperActivity) {
return List.of(configFetchActivity, refreshSchemaActivity, reportRunTimeActivity,
invokeOperationsActivity, asyncReplicationActivity, workloadStatusCheckActivity, discoverCatalogHelperActivity);
return List.of(configFetchActivity, reportRunTimeActivity, invokeOperationsActivity, asyncReplicationActivity,
workloadStatusCheckActivity, discoverCatalogHelperActivity);
}

@Singleton
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ public class SyncWorkflowImpl implements SyncWorkflow {

private static final HashFunction HASH_FUNCTION = Hashing.md5();

@TemporalActivityStub(activityOptionsBeanName = "refreshSchemaActivityOptions")
private RefreshSchemaActivity refreshSchemaActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private ConfigFetchActivity configFetchActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
Expand Down Expand Up @@ -119,10 +117,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,

final Optional<UUID> sourceId = getSourceId(syncInput);
final RefreshSchemaActivityOutput refreshSchemaOutput;
final int removeShouldRereshSchemaVersion = Workflow.getVersion("REMOVE_SHOULD_REFRESH_SCHEMA", DEFAULT_VERSION, 1);
final boolean shouldRefreshSchema = sourceId.isPresent() && removeShouldRereshSchemaVersion == DEFAULT_VERSION
? refreshSchemaActivity.shouldRefreshSchema(sourceId.get())
: true;

try {
final JsonNode sourceConfig = configFetchActivity.getSourceConfig(sourceId.get());
refreshSchemaOutput = runDiscoverAsChildWorkflow(jobRunConfig, sourceLauncherConfig, syncInput, sourceConfig);
Expand Down Expand Up @@ -182,12 +177,10 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
: syncInput.getConnectionContext().getSourceDefinitionId(),
startTime,
discoverSchemaEndTime,
replicationEndTime,
shouldRefreshSchema));
replicationEndTime));
}

// TODO: remove shouldRefreshSchema on the activity is removed
if (shouldRefreshSchema && syncOutput.getStandardSyncSummary() != null && syncOutput.getStandardSyncSummary().getTotalStats() != null) {
if (syncOutput.getStandardSyncSummary() != null && syncOutput.getStandardSyncSummary().getTotalStats() != null) {
syncOutput.getStandardSyncSummary().getTotalStats().setDiscoverSchemaEndTime(discoverSchemaEndTime);
syncOutput.getStandardSyncSummary().getTotalStats().setDiscoverSchemaStartTime(startTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ data class ReportRunTimeActivityInput(
val startTime: Long,
val refreshSchemaEndTime: Long,
val replicationEndTime: Long,
val shouldRefreshSchema: Boolean,
) {
class Builder
@JvmOverloads
Expand All @@ -20,7 +19,6 @@ data class ReportRunTimeActivityInput(
private val startTime: Long? = null,
private val refreshSchemaEndTime: Long? = null,
private val replicationEndTime: Long? = null,
private val shouldRefreshSchema: Boolean? = null,
) {
fun build(): ReportRunTimeActivityInput {
return ReportRunTimeActivityInput(
Expand All @@ -29,7 +27,6 @@ data class ReportRunTimeActivityInput(
startTime!!,
refreshSchemaEndTime!!,
replicationEndTime!!,
shouldRefreshSchema!!,
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@ class ReportRunTimeActivityImpl(private val metricClient: MetricClient) : Report
val connectionTag = MetricAttribute(MetricTags.CONNECTION_ID, input.connectionId.toString())
val sourceDefinitionTag = MetricAttribute(MetricTags.SOURCE_DEFINITION_ID, input.sourceDefinitionId.toString())

if (input.shouldRefreshSchema) {
metricClient.count(
OssMetricsRegistry.DISCOVER_CATALOG_RUN_TIME,
runTimeRefresh,
connectionTag,
sourceDefinitionTag,
)
}
metricClient.count(
OssMetricsRegistry.DISCOVER_CATALOG_RUN_TIME,
runTimeRefresh,
connectionTag,
sourceDefinitionTag,
)
metricClient.count(OssMetricsRegistry.REPLICATION_RUN_TIME, runTimeReplication, connectionTag, sourceDefinitionTag)
metricClient.count(OssMetricsRegistry.SYNC_TOTAL_TIME, totalWorkflowRunTime, connectionTag, sourceDefinitionTag)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import io.airbyte.config.secrets.persistence.SecretPersistence;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivityImpl;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivity;
import io.airbyte.workers.temporal.sync.RefreshSchemaActivityImpl;
import io.airbyte.workers.temporal.sync.WebhookOperationActivity;
import io.airbyte.workers.temporal.sync.WebhookOperationActivityImpl;
import io.micronaut.context.annotation.Bean;
Expand Down Expand Up @@ -57,9 +55,6 @@ class DataPlaneActivityInitializationMicronautTest {
@Inject
ConfigFetchActivity configFetchActivity;

@Inject
RefreshSchemaActivity refreshSchemaActivity;

@Inject
WebhookOperationActivity webhookOperationActivity;

Expand All @@ -68,11 +63,6 @@ void testConfigFetchActivity() {
assertEquals(ConfigFetchActivityImpl.class, configFetchActivity.getClass());
}

@Test
void testRefreshSchemaActivity() {
assertEquals(RefreshSchemaActivityImpl.class, refreshSchemaActivity.getClass());
}

@Test
void testWebhookOperationActivity() {
assertEquals(WebhookOperationActivityImpl.class, webhookOperationActivity.getClass());
Expand Down

This file was deleted.

Loading

0 comments on commit 153436d

Please sign in to comment.