From 7239ba36359680e4d95fac86a96b1ebafdc4b611 Mon Sep 17 00:00:00 2001 From: Ryan Br Date: Mon, 2 Dec 2024 16:38:44 -0800 Subject: [PATCH] chore: call throughput metric from correct activity. (#14722) --- .../io/airbyte/metrics/lib/OssMetricsRegistry.java | 2 +- .../temporal/sync/AsyncReplicationActivityImpl.java | 8 ++++++++ .../temporal/sync/ReplicationActivityImpl.java | 11 ++--------- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java index dca74c3b03..818e4bbecd 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java @@ -259,7 +259,7 @@ public enum OssMetricsRegistry implements MetricsRegistry { REPLICATION_THROUGHPUT_BPS(MetricEmittingApps.WORKER, "replication_throughput_bps", - "throughput of replication in bps"), + "throughput of replication in bytes per second"), REPLICATION_BYTES_SYNCED(MetricEmittingApps.WORKER, "replication_bytes_synced", "number of bytes synced during replication"), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/AsyncReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/AsyncReplicationActivityImpl.java index 972ff97cf6..837a86361c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/AsyncReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/AsyncReplicationActivityImpl.java @@ -300,6 +300,14 @@ private void traceReplicationSummary(final ReplicationAttemptSummary replication if (replicationSummary.getStatus() != null) { tags.put(REPLICATION_STATUS_KEY, replicationSummary.getStatus().value()); } + if (replicationSummary.getStartTime() != null && replicationSummary.getEndTime() != null && replicationSummary.getBytesSynced() != null) { + final var elapsedMs = replicationSummary.getEndTime() - replicationSummary.getStartTime(); + if (elapsedMs > 0) { + final var elapsedSeconds = elapsedMs / 1000; + final var throughput = replicationSummary.getBytesSynced() / elapsedSeconds; + metricClient.count(OssMetricsRegistry.REPLICATION_THROUGHPUT_BPS, throughput, metricAttributes); + } + } if (!tags.isEmpty()) { ApmTraceUtils.addTagsToTrace(tags); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index a3eeeb056e..4555ed16bd 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -58,8 +58,9 @@ import org.slf4j.LoggerFactory; /** - * Replication temporal activity impl. + * Replication temporal activity impl. Deprecated — See AsyncReplicationActivityImpl */ +@Deprecated @Singleton @SuppressWarnings("PMD.UseVarargs") public class ReplicationActivityImpl implements ReplicationActivity { @@ -281,14 +282,6 @@ private void traceReplicationSummary(final ReplicationAttemptSummary replication if (replicationSummary.getStatus() != null) { tags.put(REPLICATION_STATUS_KEY, replicationSummary.getStatus().value()); } - if (replicationSummary.getStartTime() != null && replicationSummary.getEndTime() != null && replicationSummary.getBytesSynced() != null) { - final var elapsedMs = replicationSummary.getEndTime() - replicationSummary.getStartTime(); - if (elapsedMs > 0) { - final var elapsedSeconds = elapsedMs / 1000; - final var throughput = replicationSummary.getBytesSynced() / elapsedSeconds; - metricClient.count(OssMetricsRegistry.REPLICATION_THROUGHPUT_BPS, throughput, metricAttributes); - } - } if (!tags.isEmpty()) { ApmTraceUtils.addTagsToTrace(tags); }