From 6f538a79daabd0a065f1e0f19188ea72963b12aa Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Fri, 19 Jan 2024 10:06:53 +0800 Subject: [PATCH] [#1466] feat(server): introduce the JvmPauseMonitor to detect the gc pause (#1470) ### What changes were proposed in this pull request? Introduce the JvmPauseMonitor to detect the gc pause ### Why are the changes needed? Fix: #1466 We have some loop check logic in shuffle-server, sometimes the GC pause will make some pre-allocated buffer removed. If we have this monitor, we will find out this reason rather than guess. Although the JVM metrics are also valid. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests --- .../uniffle/common/util/JvmPauseMonitor.java | 219 ++++++++++++++++++ .../apache/uniffle/server/ShuffleServer.java | 10 + 2 files changed, 229 insertions(+) create mode 100644 common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java diff --git a/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java b/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java new file mode 100644 index 0000000000..59f3b54c00 --- /dev/null +++ b/common/src/main/java/org/apache/uniffle/common/util/JvmPauseMonitor.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.util; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.management.GarbageCollectorMXBean; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.config.RssConf; + +/** + * Class which sets up a simple thread which runs in a loop sleeping for a short interval of time. + * If the sleep takes significantly longer than its target time, it implies that the JVM or host + * machine has paused processing, which may cause other problems. If such a pause is detected, the + * thread logs a message. + */ +public class JvmPauseMonitor implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(JvmPauseMonitor.class); + + /** The target sleep time */ + private static final long SLEEP_INTERVAL_MS = 500; + + /** log WARN if we detect a pause longer than this threshold */ + private long warnThresholdMs; + + private static final String WARN_THRESHOLD_KEY = "jvm.pause.warn-threshold.ms"; + private static final long WARN_THRESHOLD_DEFAULT = 10000; + + /** log INFO if we detect a pause longer than this threshold */ + private long infoThresholdMs; + + private static final String INFO_THRESHOLD_KEY = "jvm.pause.info-threshold.ms"; + private static final long INFO_THRESHOLD_DEFAULT = 1000; + + private long numGcWarnThresholdExceeded = 0; + private long numGcInfoThresholdExceeded = 0; + private long totalGcExtraSleepTime = 0; + + private Thread monitorThread; + private volatile boolean shouldRun = true; + + public JvmPauseMonitor(RssConf rssConf) { + this.warnThresholdMs = rssConf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT); + this.infoThresholdMs = rssConf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT); + } + + public void start() { + monitorThread = new Daemon(new Monitor()); + monitorThread.start(); + } + + public boolean isStarted() { + return monitorThread != null; + } + + public long getNumGcWarnThresholdExceeded() { + return numGcWarnThresholdExceeded; + } + + public long getNumGcInfoThresholdExceeded() { + return numGcInfoThresholdExceeded; + } + + public long getTotalGcExtraSleepTime() { + return totalGcExtraSleepTime; + } + + private String formatMessage( + long extraSleepTime, + Map gcTimesAfterSleep, + Map gcTimesBeforeSleep) { + + Set gcBeanNames = + Sets.intersection(gcTimesAfterSleep.keySet(), gcTimesBeforeSleep.keySet()); + List gcDiffs = Lists.newArrayList(); + for (String name : gcBeanNames) { + GcTimes diff = gcTimesAfterSleep.get(name).subtract(gcTimesBeforeSleep.get(name)); + if (diff.gcCount != 0) { + gcDiffs.add("GC pool '" + name + "' had collection(s): " + diff.toString()); + } + } + + String ret = + "Detected pause in JVM or host machine (eg GC): " + + "pause of approximately " + + extraSleepTime + + "ms\n"; + if (gcDiffs.isEmpty()) { + ret += "No GCs detected"; + } else { + ret += StringUtils.join(gcDiffs, "\n"); + } + return ret; + } + + private Map getGcTimes() { + Map map = new HashMap<>(); + List gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); + for (GarbageCollectorMXBean gcBean : gcBeans) { + map.put(gcBean.getName(), new GcTimes(gcBean)); + } + return map; + } + + @Override + public void close() throws IOException { + shouldRun = false; + if (monitorThread != null) { + monitorThread.interrupt(); + try { + monitorThread.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static class GcTimes { + private GcTimes(GarbageCollectorMXBean gcBean) { + gcCount = gcBean.getCollectionCount(); + gcTimeMillis = gcBean.getCollectionTime(); + } + + private GcTimes(long count, long time) { + this.gcCount = count; + this.gcTimeMillis = time; + } + + private GcTimes subtract(GcTimes other) { + return new GcTimes(this.gcCount - other.gcCount, this.gcTimeMillis - other.gcTimeMillis); + } + + @Override + public String toString() { + return "count=" + gcCount + " time=" + gcTimeMillis + "ms"; + } + + private long gcCount; + private long gcTimeMillis; + } + + private class Monitor implements Runnable { + @Override + public void run() { + StopWatch sw = new StopWatch(); + Map gcTimesBeforeSleep = getGcTimes(); + LOG.info("Starting JVM pause monitor"); + while (shouldRun) { + sw.reset().start(); + try { + Thread.sleep(SLEEP_INTERVAL_MS); + } catch (InterruptedException ie) { + return; + } + long extraSleepTime = sw.now(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS; + Map gcTimesAfterSleep = getGcTimes(); + + if (extraSleepTime > warnThresholdMs) { + ++numGcWarnThresholdExceeded; + LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } else if (extraSleepTime > infoThresholdMs) { + ++numGcInfoThresholdExceeded; + LOG.info(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep)); + } + totalGcExtraSleepTime += extraSleepTime; + gcTimesBeforeSleep = gcTimesAfterSleep; + } + } + } + + /** + * Simple 'main' to facilitate manual testing of the pause monitor. + * + *

This main function just leaks memory into a list. Running this class with a 1GB heap will + * very quickly go into "GC hell" and result in log messages about the GC pauses. + * + * @param args args. + * @throws Exception Exception. + */ + @SuppressWarnings("resource") + public static void main(String[] args) throws Exception { + JvmPauseMonitor monitor = new JvmPauseMonitor(new RssConf()); + monitor.start(); + List list = Lists.newArrayList(); + int i = 0; + while (true) { + list.add(String.valueOf(i++)); + } + } +} diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index 9b296c98fe..358208097b 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -49,6 +49,7 @@ import org.apache.uniffle.common.security.SecurityContextFactory; import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.common.util.ExitUtils; +import org.apache.uniffle.common.util.JvmPauseMonitor; import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.common.util.ThreadUtils; import org.apache.uniffle.common.web.CoalescedCollectorRegistry; @@ -99,6 +100,7 @@ public class ShuffleServer { private Future decommissionFuture; private boolean nettyServerEnabled; private StreamServer streamServer; + private JvmPauseMonitor jvmPauseMonitor; public ShuffleServer(ShuffleServerConf shuffleServerConf) throws Exception { this.shuffleServerConf = shuffleServerConf; @@ -197,11 +199,19 @@ public void stopServer() throws Exception { if (shuffleTaskManager != null) { shuffleTaskManager.stop(); } + if (jvmPauseMonitor != null) { + jvmPauseMonitor.close(); + } running = false; LOG.info("RPC Server Stopped!"); } private void initialization() throws Exception { + // setup jvm pause monitor + final JvmPauseMonitor monitor = new JvmPauseMonitor(shuffleServerConf); + monitor.start(); + this.jvmPauseMonitor = monitor; + boolean testMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE); String storageType = shuffleServerConf.get(RSS_STORAGE_TYPE).name(); if (!testMode