Skip to content

Commit

Permalink
[apache#1466] feat(server): introduce the JvmPauseMonitor to detect t…
Browse files Browse the repository at this point in the history
…he gc pause (apache#1470)

### What changes were proposed in this pull request?

Introduce the JvmPauseMonitor to detect the gc pause

### Why are the changes needed?

Fix: apache#1466 

We have some loop check logic in shuffle-server, 
sometimes the GC pause will make some pre-allocated buffer  removed.
If we have this monitor, we will find out this reason rather than guess. 
Although the JVM metrics are also valid.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests
  • Loading branch information
zuston authored Jan 19, 2024
1 parent 31dae44 commit 6f538a7
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.util;

import java.io.Closeable;
import java.io.IOException;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.config.RssConf;

/**
* Class which sets up a simple thread which runs in a loop sleeping for a short interval of time.
* If the sleep takes significantly longer than its target time, it implies that the JVM or host
* machine has paused processing, which may cause other problems. If such a pause is detected, the
* thread logs a message.
*/
public class JvmPauseMonitor implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class);

/** The target sleep time */
private static final long SLEEP_INTERVAL_MS = 500;

/** log WARN if we detect a pause longer than this threshold */
private long warnThresholdMs;

private static final String WARN_THRESHOLD_KEY = "jvm.pause.warn-threshold.ms";
private static final long WARN_THRESHOLD_DEFAULT = 10000;

/** log INFO if we detect a pause longer than this threshold */
private long infoThresholdMs;

private static final String INFO_THRESHOLD_KEY = "jvm.pause.info-threshold.ms";
private static final long INFO_THRESHOLD_DEFAULT = 1000;

private long numGcWarnThresholdExceeded = 0;
private long numGcInfoThresholdExceeded = 0;
private long totalGcExtraSleepTime = 0;

private Thread monitorThread;
private volatile boolean shouldRun = true;

public JvmPauseMonitor(RssConf rssConf) {
this.warnThresholdMs = rssConf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
this.infoThresholdMs = rssConf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
}

public void start() {
monitorThread = new Daemon(new Monitor());
monitorThread.start();
}

public boolean isStarted() {
return monitorThread != null;
}

public long getNumGcWarnThresholdExceeded() {
return numGcWarnThresholdExceeded;
}

public long getNumGcInfoThresholdExceeded() {
return numGcInfoThresholdExceeded;
}

public long getTotalGcExtraSleepTime() {
return totalGcExtraSleepTime;
}

private String formatMessage(
long extraSleepTime,
Map<String, GcTimes> gcTimesAfterSleep,
Map<String, GcTimes> gcTimesBeforeSleep) {

Set<String> gcBeanNames =
Sets.intersection(gcTimesAfterSleep.keySet(), gcTimesBeforeSleep.keySet());
List<String> gcDiffs = Lists.newArrayList();
for (String name : gcBeanNames) {
GcTimes diff = gcTimesAfterSleep.get(name).subtract(gcTimesBeforeSleep.get(name));
if (diff.gcCount != 0) {
gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff.toString());
}
}

String ret =
"Detected pause in JVM or host machine (eg GC): "
+ "pause of approximately "
+ extraSleepTime
+ "ms\n";
if (gcDiffs.isEmpty()) {
ret += "No GCs detected";
} else {
ret += StringUtils.join(gcDiffs, "\n");
}
return ret;
}

private Map<String, GcTimes> getGcTimes() {
Map<String, GcTimes> map = new HashMap<>();
List<GarbageCollectorMXBean> gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean gcBean : gcBeans) {
map.put(gcBean.getName(), new GcTimes(gcBean));
}
return map;
}

@Override
public void close() throws IOException {
shouldRun = false;
if (monitorThread != null) {
monitorThread.interrupt();
try {
monitorThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

private static class GcTimes {
private GcTimes(GarbageCollectorMXBean gcBean) {
gcCount = gcBean.getCollectionCount();
gcTimeMillis = gcBean.getCollectionTime();
}

private GcTimes(long count, long time) {
this.gcCount = count;
this.gcTimeMillis = time;
}

private GcTimes subtract(GcTimes other) {
return new GcTimes(this.gcCount - other.gcCount, this.gcTimeMillis - other.gcTimeMillis);
}

@Override
public String toString() {
return "count=" + gcCount + " time=" + gcTimeMillis + "ms";
}

private long gcCount;
private long gcTimeMillis;
}

private class Monitor implements Runnable {
@Override
public void run() {
StopWatch sw = new StopWatch();
Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
LOG.info("Starting JVM pause monitor");
while (shouldRun) {
sw.reset().start();
try {
Thread.sleep(SLEEP_INTERVAL_MS);
} catch (InterruptedException ie) {
return;
}
long extraSleepTime = sw.now(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();

if (extraSleepTime > warnThresholdMs) {
++numGcWarnThresholdExceeded;
LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
} else if (extraSleepTime > infoThresholdMs) {
++numGcInfoThresholdExceeded;
LOG.info(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
}
totalGcExtraSleepTime += extraSleepTime;
gcTimesBeforeSleep = gcTimesAfterSleep;
}
}
}

/**
* Simple 'main' to facilitate manual testing of the pause monitor.
*
* <p>This main function just leaks memory into a list. Running this class with a 1GB heap will
* very quickly go into "GC hell" and result in log messages about the GC pauses.
*
* @param args args.
* @throws Exception Exception.
*/
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
JvmPauseMonitor monitor = new JvmPauseMonitor(new RssConf());
monitor.start();
List<String> list = Lists.newArrayList();
int i = 0;
while (true) {
list.add(String.valueOf(i++));
}
}
}
10 changes: 10 additions & 0 deletions server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.uniffle.common.security.SecurityContextFactory;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.JvmPauseMonitor;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.common.web.CoalescedCollectorRegistry;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class ShuffleServer {
private Future<?> decommissionFuture;
private boolean nettyServerEnabled;
private StreamServer streamServer;
private JvmPauseMonitor jvmPauseMonitor;

public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception {
this.shuffleServerConf = shuffleServerConf;
Expand Down Expand Up @@ -197,11 +199,19 @@ public void stopServer() throws Exception {
if (shuffleTaskManager != null) {
shuffleTaskManager.stop();
}
if (jvmPauseMonitor != null) {
jvmPauseMonitor.close();
}
running = false;
LOG.info("RPC Server Stopped!");
}

private void initialization() throws Exception {
// setup jvm pause monitor
final JvmPauseMonitor monitor = new JvmPauseMonitor(shuffleServerConf);
monitor.start();
this.jvmPauseMonitor = monitor;

boolean testMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE);
String storageType = shuffleServerConf.get(RSS_STORAGE_TYPE).name();
if (!testMode
Expand Down

0 comments on commit 6f538a7

Please sign in to comment.