Skip to content

Commit

Permalink
[#9666] Fixed realtime to maintain valid value
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed May 22, 2023
1 parent b1572be commit d28f0e0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.navercorp.pinpoint.realtime.dto.ATCSupply;
import reactor.core.publisher.Flux;

import java.util.function.Supplier;
import java.util.function.Function;

/**
* @author youngjin.kim2
Expand All @@ -30,19 +30,32 @@ class ActiveThreadCountFetcherFactory implements FetcherFactory<ClusterKey, ATCS

private final PubSubFluxClient<ATCDemand, ATCSupply> endpoint;
private final long recordMaxAgeNanos;
private final long prepareTimeoutNanos;

ActiveThreadCountFetcherFactory(PubSubFluxClient<ATCDemand, ATCSupply> endpoint, long recordMaxAgeNanos) {
ActiveThreadCountFetcherFactory(
PubSubFluxClient<ATCDemand, ATCSupply> endpoint,
long recordMaxAgeNanos,
long prepareTimeoutNanos
) {
this.endpoint = endpoint;
this.recordMaxAgeNanos = recordMaxAgeNanos;
this.prepareTimeoutNanos = prepareTimeoutNanos;
}

@Override
public Fetcher<ATCSupply> getFetcher(ClusterKey key) {
return new OptimisticFetcher<>(this.makeValueSupplier(key), this.recordMaxAgeNanos);
return new OptimisticFetcher<>(this.makeValueSupplier(key), this.recordMaxAgeNanos, this.prepareTimeoutNanos);
}

private Supplier<Flux<ATCSupply>> makeValueSupplier(ClusterKey key) {
return () -> this.endpoint.request(makeDemand(key));
private Function<Integer, Flux<ATCSupply>> makeValueSupplier(ClusterKey key) {
return i -> {
final Flux<ATCSupply> response = this.endpoint.request(makeDemand(key));
if (i == 0) {
return response;
} else {
return response.filter(el -> !el.getValues().isEmpty());
}
};
}

private ATCDemand makeDemand(ClusterKey key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class ActiveThreadCountWebDaoConfig {
@Value("${pinpoint.web.realtime.atc.supply.expireInMs:3000}")
long supplyExpireInMs;

@Value("${pinpoint.web.realtime.atc.supply.prepareInMs:10000}")
long prepareInMs;

@Bean
PubSubFluxClient<ATCDemand, ATCSupply> atcEndpoint(PubSubClientFactory clientFactory) {
return clientFactory.build(RealtimePubSubServiceDescriptors.ATC);
Expand All @@ -62,7 +65,9 @@ FetcherFactory<ClusterKey, ATCSupply> atcSupplyFetcherFactory(
Cache<ClusterKey, Fetcher<ATCSupply>> fetcherCache
) {
final long recordMaxAgeNanos = TimeUnit.MILLISECONDS.toNanos(supplyExpireInMs);
final ActiveThreadCountFetcherFactory fetcherFactory = new ActiveThreadCountFetcherFactory(endpoint, recordMaxAgeNanos);
final long prepareInNanos = TimeUnit.MILLISECONDS.toNanos(prepareInMs);
final ActiveThreadCountFetcherFactory fetcherFactory
= new ActiveThreadCountFetcherFactory(endpoint, recordMaxAgeNanos, prepareInNanos);
return new CachedFetcherFactory<>(fetcherFactory, fetcherCache);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.Function;

/**
* @author youngjin.kim2
Expand All @@ -34,27 +35,34 @@ public class OptimisticFetcher<T> implements Fetcher<T> {

private static final Logger logger = LogManager.getLogger(OptimisticFetcher.class);

private final Supplier<Flux<T>> valueSupplier;
private final Function<Integer, Flux<T>> valueSupplier;
private final long recordMaxAgeNanos;
private final long prepareTimeoutNanos;

private final AtomicReference<Record<T>> recordRef = new AtomicReference<>();
private final Throttle prepareThrottle = new MinTermThrottle(TimeUnit.SECONDS.toNanos(3));
private final AtomicInteger numPrepared = new AtomicInteger(0);
private final AtomicLong latestPrepareTime = new AtomicLong(0);

public OptimisticFetcher(Supplier<Flux<T>> valueSupplier, long recordMaxAgeNanos) {
public OptimisticFetcher(
Function<Integer, Flux<T>> valueSupplier,
long recordMaxAgeNanos,
long prepareTimeoutNanos
) {
this.valueSupplier = Objects.requireNonNull(valueSupplier, "valueSupplier");
this.recordMaxAgeNanos = recordMaxAgeNanos;
this.prepareTimeoutNanos = prepareTimeoutNanos;
}

@Override
public T fetch() {
final Record<T> latestRecord = this.recordRef.get();
if (latestRecord == null || latestRecord.isOld(System.nanoTime() - this.recordMaxAgeNanos)) {
if (latestRecord == null || latestRecord.olderThan(System.nanoTime() - this.recordMaxAgeNanos)) {
prepareForNext();
return null;
}

if (this.latestPrepareTime.get() < System.nanoTime() - TimeUnit.SECONDS.toNanos(12)) {
if (this.latestPrepareTime.get() < System.nanoTime() - this.prepareTimeoutNanos) {
prepareForNext();
}

Expand All @@ -64,19 +72,21 @@ public T fetch() {
private void prepareForNext() {
if (this.prepareThrottle.hit()) {
logger.debug("Fetcher Started");
this.valueSupplier.get()
final long prepareTime = System.nanoTime();
this.valueSupplier.apply(this.numPrepared.getAndIncrement())
.doOnNext(item -> logger.trace("Fetcher Received: {}", item))
.doOnComplete(() -> logger.debug("Fetcher Completed"))
.subscribe(this::put);
this.latestPrepareTime.set(System.nanoTime());
this.latestPrepareTime.set(prepareTime);
}
}

private void put(T supply) {
if (supply == null) {
return;
}
this.recordRef.set(new Record<>(supply));
final Record<T> nextRecord = new Record<>(supply);
this.recordRef.set(nextRecord);
}

private static final class Record<T> {
Expand All @@ -93,7 +103,7 @@ T getValue() {
return this.value;
}

boolean isOld(long thresholdNanos) {
boolean olderThan(long thresholdNanos) {
return this.createdAt < thresholdNanos;
}

Expand Down

0 comments on commit d28f0e0

Please sign in to comment.