Skip to content

Commit

Permalink
Rntbd added centralized monitoring for RntbdChannelEndpoint (#15292)
Browse files Browse the repository at this point in the history
* added centralized monitoring for rntbd channel endpoint

* code review comment addressed, moved to Provider

* cleanup

* every 60 seconds

* fixed comment

* extend auto closable to ensure is free-ed on gc

* cleanup removed a method which was not needed

* fixed warning

* fixed javadoc
  • Loading branch information
moderakh authored Sep 17, 2020
1 parent 8dfecf4 commit 9ff1b62
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,6 @@ public final class RntbdClientChannelPool implements ChannelPool {
true,
Thread.NORM_PRIORITY));

// this is only for debugging monitoring of the health of RNTBD
// TODO: once we are certain no task gets stuck in the rntbd queue remove this
private static final EventExecutor monitoringRntbdChannelPool = new DefaultEventExecutor(new RntbdThreadFactory(
"monitoring-rntbd-channel-pool",
true,
Thread.MIN_PRIORITY));

private static final HashedWheelTimer acquisitionAndIdleEndpointDetectionTimer =
new HashedWheelTimer(new RntbdThreadFactory(
"channel-acquisition-timer",
Expand All @@ -114,6 +107,7 @@ public final class RntbdClientChannelPool implements ChannelPool {
private final Runnable acquisitionTimeoutTask;
private final PooledByteBufAllocatorMetric allocatorMetric;
private final Bootstrap bootstrap;
private final RntbdServiceEndpoint endpoint;
private final EventExecutor executor;
private final ChannelHealthChecker healthChecker;
// private final ScheduledFuture<?> idleStateDetectionScheduledFuture;
Expand All @@ -137,11 +131,7 @@ public final class RntbdClientChannelPool implements ChannelPool {
Comparator.comparingLong((task) -> task.originalPromise.getExpiryTimeInNanos()));

private final ScheduledFuture<?> pendingAcquisitionExpirationFuture;

// this is only for debugging monitoring of the health of RNTBD
// TODO: once we are certain no task gets stuck in the rntbd queue remove this
private final ScheduledFuture<?> monitoringFuture;


/**
* Initializes a newly created {@link RntbdClientChannelPool} instance.
*
Expand All @@ -163,6 +153,7 @@ private RntbdClientChannelPool(
checkNotNull(config, "expected non-null config");
checkNotNull(healthChecker, "expected non-null healthChecker");

this.endpoint = endpoint;
this.poolHandler = new RntbdClientChannelHandler(config, healthChecker);
this.executor = bootstrap.config().group().next();
this.healthChecker = healthChecker;
Expand Down Expand Up @@ -231,7 +222,6 @@ public void onTimeout(AcquireListener task) {
// this.runTasksInPendingAcquisitionQueue();
//
// }, requestTimerResolutionInNanos, requestTimerResolutionInNanos, TimeUnit.NANOSECONDS);
monitoringFuture = startMonitoring();
}

// region Accessors
Expand Down Expand Up @@ -274,6 +264,38 @@ public int channelsAvailableMetrics() {
return this.availableChannels.size();
}

/**
* Gets the number of connections which are getting established.
*
* @return the number of connections which are getting established.
*/
public int attemptingToConnectMetrics() {
return this.connecting.get() ? 1 : 0;
}

/**
* Gets the current tasks in the executor pool
*
* NOTE: this only provides approximation for metrics
*
* @return the current tasks in the executor pool
*/
public int executorTaskQueueMetrics() {
try {
SingleThreadEventExecutor singleThreadEventExecutor = Utils.as(this.executor,
SingleThreadEventExecutor.class);

if (singleThreadEventExecutor != null) {
return singleThreadEventExecutor.pendingTasks();
}
} catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug("task-queue unexpected monitoring failure", e);
}
}
return -1;
}

/**
* {@code true} if this {@link RntbdClientChannelPool pool} is closed or {@code false} otherwise.
*
Expand Down Expand Up @@ -410,9 +432,7 @@ public void close() {
this.executor.submit(this::doClose).awaitUninterruptibly(); // block until complete
}
}

this.pendingAcquisitionExpirationFuture.cancel(false);
this.monitoringFuture.cancel(false);
}

/**
Expand Down Expand Up @@ -1533,37 +1553,4 @@ public synchronized Throwable fillInStackTrace() {

// endregion

// TODO: remove when we are confident of RNTBD OOM bug
private ScheduledFuture<?> startMonitoring() {
return monitoringRntbdChannelPool.scheduleAtFixedRate(() -> {
int i = getTaskCount();
if (isInterestingEndpoint()) {
logger.debug("{} total number of tasks on the executor [{}], remote address: [{}], connecting [{}], acquiredChannel [{}], availableChannel [{}], pending acquisition [{}]",
this.hashCode(), i, this.remoteAddress(), connecting.get(), acquiredChannels.size(), availableChannels.size(), pendingAcquisitions.size());
}
}, 0, 60, TimeUnit.SECONDS);
}

// TODO: remove when we are confident of RNTBD OOM bug
private boolean isInterestingEndpoint() {
return true;
}

// TODO: remove when we are confident of RNTBD OOM bug
public int getTaskCount() {

try {
SingleThreadEventExecutor singleThreadEventExecutor = Utils.as(this.executor,
SingleThreadEventExecutor.class);

if (singleThreadEventExecutor != null) {
return singleThreadEventExecutor.pendingTasks();
}
} catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug("task-queue unexpected monitoring failure", e);
}
}
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.net.SocketAddress;
import java.net.URI;
import java.time.Instant;
import java.util.stream.Stream;

import static com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient.Options;
Expand All @@ -34,10 +35,25 @@ public interface RntbdEndpoint extends AutoCloseable {

int concurrentRequests();

/**
* @return returns approximate number of connections in the connecting mode.
*/
int gettingEstablishedConnectionsMetrics();

Instant getCreatedTime();

long lastRequestNanoTime();

int channelsMetrics();

int executorTaskQueueMetrics();

long id();

boolean isClosed();

int maxChannels();

SocketAddress remoteAddress();

int requestQueueLength();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,22 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultEventExecutor;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -59,6 +65,8 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
private final AtomicInteger concurrentRequests;
private final long id;
private final AtomicLong lastRequestNanoTime;

private final Instant createdTime;
private final RntbdMetrics metrics;
private final Provider provider;
private final SocketAddress remoteAddress;
Expand Down Expand Up @@ -87,6 +95,7 @@ private RntbdServiceEndpoint(
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(physicalAddress.getHost(), physicalAddress.getPort());

this.createdTime = Instant.now();
this.channelPool = new RntbdClientChannelPool(this, bootstrap, config);
this.remoteAddress = bootstrap.config().remoteAddress();
this.concurrentRequests = new AtomicInteger();
Expand Down Expand Up @@ -132,6 +141,11 @@ public int concurrentRequests() {
return this.concurrentRequests.get();
}

@Override
public int gettingEstablishedConnectionsMetrics() {
return this.channelPool.attemptingToConnectMetrics();
}

@Override
public long id() {
return this.id;
Expand All @@ -142,10 +156,29 @@ public boolean isClosed() {
return this.closed.get();
}

@Override
public int maxChannels() {
return this.channelPool.channels(true);
}

public long lastRequestNanoTime() {
return this.lastRequestNanoTime.get();
}

@Override
public int channelsMetrics() {
return this.channelPool.channels(true);
}

@Override
public int executorTaskQueueMetrics() {
return this.channelPool.executorTaskQueueMetrics();
}

public Instant getCreatedTime() {
return this.createdTime;
}

@Override
public SocketAddress remoteAddress() {
return this.remoteAddress;
Expand Down Expand Up @@ -222,7 +255,7 @@ public String toString() {
// endregion

// region Privates

private void ensureSuccessWhenReleasedToPool(Channel channel, Future<Void> released) {
if (released.isSuccess()) {
logger.debug("\n [{}]\n {}\n release succeeded", this, channel);
Expand Down Expand Up @@ -338,6 +371,7 @@ public static final class Provider implements RntbdEndpoint.Provider {
private final ConcurrentHashMap<String, RntbdEndpoint> endpoints;
private final NioEventLoopGroup eventLoopGroup;
private final AtomicInteger evictions;
private final RntbdEndpointMonitoringProvider monitoring;
private final RntbdRequestTimer requestTimer;
private final RntbdTransportClient transportClient;

Expand Down Expand Up @@ -370,12 +404,15 @@ public Provider(
this.endpoints = new ConcurrentHashMap<>();
this.evictions = new AtomicInteger();
this.closed = new AtomicBoolean();
this.monitoring = new RntbdEndpointMonitoringProvider(this);
this.monitoring.init();
}

@Override
public void close() {

if (this.closed.compareAndSet(false, true)) {
this.monitoring.close();

for (final RntbdEndpoint endpoint : this.endpoints.values()) {
endpoint.close();
Expand Down Expand Up @@ -437,5 +474,87 @@ private void evict(final RntbdEndpoint endpoint) {
}
}

public static class RntbdEndpointMonitoringProvider implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(RntbdEndpointMonitoringProvider.class);
// this is only for debugging monitoring of the health of RNTBD
// TODO: once we are certain no task gets stuck in the rntbd queue remove this
private static final EventExecutor monitoringRntbdChannelPool = new DefaultEventExecutor(new RntbdThreadFactory(
"monitoring-rntbd-endpoints",
true,
Thread.MIN_PRIORITY));
private static final Duration MONITORING_PERIOD = Duration.ofSeconds(60);
private final Provider provider;
private final static int MAX_TASK_LIMIT = 5_000;

private ScheduledFuture<?> future;

RntbdEndpointMonitoringProvider(Provider provider) {
this.provider = provider;
}

synchronized void init() {
logger.info("Starting RntbdClientChannelPoolMonitoringProvider ...");
this.future = RntbdEndpointMonitoringProvider.monitoringRntbdChannelPool.scheduleAtFixedRate(() -> {
logAllPools();
}, 0, MONITORING_PERIOD.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public synchronized void close() {
logger.info("Shutting down RntbdClientChannelPoolMonitoringProvider ...");
this.future.cancel(false);
this.future = null;
}

synchronized void logAllPools() {
try {
logger.debug("Total number of RntbdClientChannelPool [{}].", provider.endpoints.size());
for (RntbdEndpoint endpoint : provider.endpoints.values()) {
logEndpoint(endpoint);
}
} catch (Exception e) {
logger.error("monitoring unexpected failure", e);
}
}

private void logEndpoint(RntbdEndpoint endpoint) {
if (this.logger.isWarnEnabled() &&
(endpoint.executorTaskQueueMetrics() > MAX_TASK_LIMIT ||
endpoint.requestQueueLength() > MAX_TASK_LIMIT ||
endpoint.gettingEstablishedConnectionsMetrics() > 0 ||
endpoint.channelsMetrics() > endpoint.maxChannels())) {
logger.warn("RntbdEndpoint Identifier {}, Stat {}", getPoolId(endpoint), getPoolStat(endpoint));
} else if (this.logger.isDebugEnabled()) {
logger.debug("RntbdEndpoint Identifier {}, Stat {}", getPoolId(endpoint), getPoolStat(endpoint));
}
}

private String getPoolStat(RntbdEndpoint endpoint) {
return "[ "
+ "poolTaskExecutorSize " + endpoint.executorTaskQueueMetrics()
+ ", lastRequestNanoTime " + Instant.now().minusNanos(
System.nanoTime() - endpoint.lastRequestNanoTime())
+ ", connecting " + endpoint.gettingEstablishedConnectionsMetrics()
+ ", acquiredChannel " + endpoint.channelsAcquiredMetric()
+ ", availableChannel " + endpoint.channelsAvailableMetric()
+ ", pendingAcquisitionSize " + endpoint.requestQueueLength()
+ ", closed " + endpoint.isClosed()
+ " ]";
}

private String getPoolId(RntbdEndpoint endpoint) {
if (endpoint == null) {
return "null";
}

return "[RntbdEndpoint" +
", id " + endpoint.id() +
", remoteAddress " + endpoint.remoteAddress() +
", creationTime " + endpoint.getCreatedTime() +
", hashCode " + endpoint.hashCode() +
"]";
}
}

// endregion
}
Loading

0 comments on commit 9ff1b62

Please sign in to comment.