From d28f0e049ace1f97f43c16243fca51cde32c07ab Mon Sep 17 00:00:00 2001 From: Kim YoungJin Date: Mon, 22 May 2023 18:24:18 +0900 Subject: [PATCH] [#9666] Fixed realtime to maintain valid value --- .../dao/ActiveThreadCountFetcherFactory.java | 23 +++++++++++---- .../dao/ActiveThreadCountWebDaoConfig.java | 7 ++++- .../count/dao/OptimisticFetcher.java | 28 +++++++++++++------ 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountFetcherFactory.java b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountFetcherFactory.java index 35f1ada50f81..fb50cec6de31 100644 --- a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountFetcherFactory.java +++ b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountFetcherFactory.java @@ -21,7 +21,7 @@ import com.navercorp.pinpoint.realtime.dto.ATCSupply; import reactor.core.publisher.Flux; -import java.util.function.Supplier; +import java.util.function.Function; /** * @author youngjin.kim2 @@ -30,19 +30,32 @@ class ActiveThreadCountFetcherFactory implements FetcherFactory endpoint; private final long recordMaxAgeNanos; + private final long prepareTimeoutNanos; - ActiveThreadCountFetcherFactory(PubSubFluxClient endpoint, long recordMaxAgeNanos) { + ActiveThreadCountFetcherFactory( + PubSubFluxClient endpoint, + long recordMaxAgeNanos, + long prepareTimeoutNanos + ) { this.endpoint = endpoint; this.recordMaxAgeNanos = recordMaxAgeNanos; + this.prepareTimeoutNanos = prepareTimeoutNanos; } @Override public Fetcher getFetcher(ClusterKey key) { - return new OptimisticFetcher<>(this.makeValueSupplier(key), this.recordMaxAgeNanos); + return new OptimisticFetcher<>(this.makeValueSupplier(key), this.recordMaxAgeNanos, this.prepareTimeoutNanos); } - private Supplier> makeValueSupplier(ClusterKey key) { - return () -> this.endpoint.request(makeDemand(key)); + private Function> makeValueSupplier(ClusterKey key) { + return i -> { + final Flux response = this.endpoint.request(makeDemand(key)); + if (i == 0) { + return response; + } else { + return response.filter(el -> !el.getValues().isEmpty()); + } + }; } private ATCDemand makeDemand(ClusterKey key) { diff --git a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java index 8dfcd4a9cb48..338f4991c2f0 100644 --- a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java +++ b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java @@ -42,6 +42,9 @@ public class ActiveThreadCountWebDaoConfig { @Value("${pinpoint.web.realtime.atc.supply.expireInMs:3000}") long supplyExpireInMs; + @Value("${pinpoint.web.realtime.atc.supply.prepareInMs:10000}") + long prepareInMs; + @Bean PubSubFluxClient atcEndpoint(PubSubClientFactory clientFactory) { return clientFactory.build(RealtimePubSubServiceDescriptors.ATC); @@ -62,7 +65,9 @@ FetcherFactory atcSupplyFetcherFactory( Cache> fetcherCache ) { final long recordMaxAgeNanos = TimeUnit.MILLISECONDS.toNanos(supplyExpireInMs); - final ActiveThreadCountFetcherFactory fetcherFactory = new ActiveThreadCountFetcherFactory(endpoint, recordMaxAgeNanos); + final long prepareInNanos = TimeUnit.MILLISECONDS.toNanos(prepareInMs); + final ActiveThreadCountFetcherFactory fetcherFactory + = new ActiveThreadCountFetcherFactory(endpoint, recordMaxAgeNanos, prepareInNanos); return new CachedFetcherFactory<>(fetcherFactory, fetcherCache); } diff --git a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/OptimisticFetcher.java b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/OptimisticFetcher.java index c16e626d2e28..8f4a1e4e7ac8 100644 --- a/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/OptimisticFetcher.java +++ b/realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/OptimisticFetcher.java @@ -23,9 +23,10 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; +import java.util.function.Function; /** * @author youngjin.kim2 @@ -34,27 +35,34 @@ public class OptimisticFetcher implements Fetcher { private static final Logger logger = LogManager.getLogger(OptimisticFetcher.class); - private final Supplier> valueSupplier; + private final Function> valueSupplier; private final long recordMaxAgeNanos; + private final long prepareTimeoutNanos; private final AtomicReference> recordRef = new AtomicReference<>(); private final Throttle prepareThrottle = new MinTermThrottle(TimeUnit.SECONDS.toNanos(3)); + private final AtomicInteger numPrepared = new AtomicInteger(0); private final AtomicLong latestPrepareTime = new AtomicLong(0); - public OptimisticFetcher(Supplier> valueSupplier, long recordMaxAgeNanos) { + public OptimisticFetcher( + Function> valueSupplier, + long recordMaxAgeNanos, + long prepareTimeoutNanos + ) { this.valueSupplier = Objects.requireNonNull(valueSupplier, "valueSupplier"); this.recordMaxAgeNanos = recordMaxAgeNanos; + this.prepareTimeoutNanos = prepareTimeoutNanos; } @Override public T fetch() { final Record latestRecord = this.recordRef.get(); - if (latestRecord == null || latestRecord.isOld(System.nanoTime() - this.recordMaxAgeNanos)) { + if (latestRecord == null || latestRecord.olderThan(System.nanoTime() - this.recordMaxAgeNanos)) { prepareForNext(); return null; } - if (this.latestPrepareTime.get() < System.nanoTime() - TimeUnit.SECONDS.toNanos(12)) { + if (this.latestPrepareTime.get() < System.nanoTime() - this.prepareTimeoutNanos) { prepareForNext(); } @@ -64,11 +72,12 @@ public T fetch() { private void prepareForNext() { if (this.prepareThrottle.hit()) { logger.debug("Fetcher Started"); - this.valueSupplier.get() + final long prepareTime = System.nanoTime(); + this.valueSupplier.apply(this.numPrepared.getAndIncrement()) .doOnNext(item -> logger.trace("Fetcher Received: {}", item)) .doOnComplete(() -> logger.debug("Fetcher Completed")) .subscribe(this::put); - this.latestPrepareTime.set(System.nanoTime()); + this.latestPrepareTime.set(prepareTime); } } @@ -76,7 +85,8 @@ private void put(T supply) { if (supply == null) { return; } - this.recordRef.set(new Record<>(supply)); + final Record nextRecord = new Record<>(supply); + this.recordRef.set(nextRecord); } private static final class Record { @@ -93,7 +103,7 @@ T getValue() { return this.value; } - boolean isOld(long thresholdNanos) { + boolean olderThan(long thresholdNanos) { return this.createdAt < thresholdNanos; }