From 04d8ac7a1636a443cb4c81997ef573570df436cc Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Fri, 29 Dec 2023 17:30:44 +0800 Subject: [PATCH] remove sleep, probably busy-waiting (#1456) Signed-off-by: tomsun28 --- README.md | 5 +- README_CN.md | 5 +- ...nCache.java => ConnectionCommonCache.java} | 67 +++++----- .../collect/common/cache/JdbcConnect.java | 4 +- .../collect/common/cache/JmxConnect.java | 4 +- .../collect/common/cache/MongodbConnect.java | 4 +- .../collect/common/cache/RedisConnect.java | 4 +- .../collect/common/cache/SshConnect.java | 4 +- .../collect/common/http/CommonHttpClient.java | 43 +++---- .../collect/database/JdbcCommonCollect.java | 8 +- .../collector/collect/jmx/JmxCollectImpl.java | 8 +- .../mongodb/MongodbSingleCollectImpl.java | 8 +- .../collect/redis/RedisCommonCollectImpl.java | 10 +- .../collector/collect/ssh/SshCollectImpl.java | 12 +- .../collector/dispatch/CommonDispatcher.java | 116 +++++++----------- .../dispatch/MetricsCollectorQueue.java | 2 +- .../collect/common/cache/CommonCacheTest.java | 4 +- 17 files changed, 134 insertions(+), 174 deletions(-) rename collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/{CommonCache.java => ConnectionCommonCache.java} (74%) diff --git a/README.md b/README.md index 79b6bb96beb..cdb2ecdf363 100644 --- a/README.md +++ b/README.md @@ -413,8 +413,6 @@ HertzBeat is a top project under the [Dromara Open Source Community](https://dro [Reddit Community](https://www.reddit.com/r/hertzbeat/) -[User Club](https://support.qq.com/products/379369) - [Follow Us Twitter](https://twitter.com/hertzbeat1024) [Subscribe YouTube](https://www.youtube.com/channel/UCri75zfWX0GHqJFPENEbLow) @@ -441,8 +439,7 @@ HertzBeat is a top project under the [Dromara Open Source Community](https://dro ##### Sponsor -- Thanks [吉实信息(构建全新的微波+光交易网络)](https://www.flarespeed.com) sponsored server node. -- Thanks [蓝易云(全新智慧上云)](https://www.tsyvps.com/aff/BZBEGYLX) sponsored server node. +- Thanks [吉实信息(构建全新的微波+光交易网络)](https://www.flarespeed.com) sponsored server node. ##### Open-Source Project Build From Open-Source diff --git a/README_CN.md b/README_CN.md index 27102e544e6..cadf7be9188 100644 --- a/README_CN.md +++ b/README_CN.md @@ -418,8 +418,6 @@ HertzBeat 赫兹跳动是 [Dromara开源社区](https://dromara.org/) 下顶级 [Reddit Community](https://www.reddit.com/r/hertzbeat/) -[User Club](https://support.qq.com/products/379369) - [Follow Us Twitter](https://twitter.com/hertzbeat1024) [Subscribe YouTube](https://www.youtube.com/channel/UCri75zfWX0GHqJFPENEbLow) @@ -446,8 +444,7 @@ HertzBeat 赫兹跳动是 [Dromara开源社区](https://dromara.org/) 下顶级 ##### 赞助 -- 感谢 [吉实信息(构建全新的微波+光交易网络)](https://www.flarespeed.com) 赞助服务器采集节点 -- 感谢 [蓝易云(全新智慧上云)](https://www.tsyvps.com/aff/BZBEGYLX) 赞助服务器采集节点 +- 感谢 [吉实信息(构建全新的微波+光交易网络)](https://www.flarespeed.com) 赞助服务器采集节点 ##### Open-Source Project Build From Open-Source diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCache.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/ConnectionCommonCache.java similarity index 74% rename from collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCache.java rename to collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/ConnectionCommonCache.java index f59d4945dc5..5f9dd9c7cc1 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCache.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/ConnectionCommonCache.java @@ -18,6 +18,7 @@ package org.dromara.hertzbeat.collector.collect.common.cache; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import lombok.extern.slf4j.Slf4j; @@ -26,20 +27,20 @@ import java.util.concurrent.*; /** - * lru common resource cache + * lru common resource cache for client-server connection * * @author tomsun28 */ @Slf4j -public class CommonCache { +public class ConnectionCommonCache { /** - * default cache time 800s + * default cache time 200s */ - private static final long DEFAULT_CACHE_TIMEOUT = 800 * 1000L; + private static final long DEFAULT_CACHE_TIMEOUT = 200 * 1000L; /** - * default cache num + * default max cache num */ private static final int DEFAULT_MAX_CAPACITY = 10000; @@ -61,9 +62,9 @@ public class CommonCache { /** * the executor who clean cache when timeout */ - private ThreadPoolExecutor cleanTimeoutExecutor; + private ThreadPoolExecutor timeoutCleanerExecutor; - private CommonCache() { + private ConnectionCommonCache() { init(); } @@ -76,23 +77,25 @@ private void init() { if (value instanceof CacheCloseable) { ((CacheCloseable) value).close(); } - log.info("lru cache discard key: {}, value: {}.", key, value); + log.info("connection common cache discard key: {}, value: {}.", key, value); }).build(); timeoutMap = new ConcurrentHashMap<>(DEFAULT_MAX_CAPACITY >> 6); - cleanTimeoutExecutor = new ThreadPoolExecutor(1, 1, - 1, TimeUnit.SECONDS, + // last-first-coverage algorithm, run the first and last thread, discard mid + timeoutCleanerExecutor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), - r -> new Thread(r, "lru-cache-timeout-cleaner"), + r -> new Thread(r, "connection-cache-timeout-cleaner"), new ThreadPoolExecutor.DiscardOldestPolicy()); // init monitor available detector cyc task - ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1, - r -> new Thread(r, "lru-cache-available-detector")); - scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable, - 2, 20, TimeUnit.MINUTES); + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("connection-cache-ava-detector-%d") + .setDaemon(true) + .build(); + ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory); + scheduledExecutor.scheduleWithFixedDelay(this::detectCacheAvailable, 2, 20, TimeUnit.MINUTES); } /** - * detect all cache available, cleanup not ava object + * detect all cache available, cleanup not ava connection */ private void detectCacheAvailable() { try { @@ -110,7 +113,7 @@ private void detectCacheAvailable() { } }); } catch (Exception e) { - log.error("detect cache available error: {}.", e.getMessage(), e); + log.error("connection common cache detect cache available error: {}.", e.getMessage(), e); } } @@ -127,17 +130,18 @@ private void cleanTimeoutCache() { timeoutMap.put(key, new Long[]{currentTime, DEFAULT_CACHE_TIMEOUT}); } else if (cacheTime[0] + cacheTime[1] < currentTime) { // timeout, remove this object cache - log.warn("[cache] clean the timeout cache, key {}", key); + log.warn("[connection common cache] clean the timeout cache, key {}", key); timeoutMap.remove(key); cacheMap.remove(key); if (value instanceof CacheCloseable) { - log.warn("[cache] close the timeout cache, key {}", key); + log.warn("[connection common cache] close the timeout cache, key {}", key); ((CacheCloseable) value).close(); } } }); + Thread.sleep(20 * 1000); } catch (Exception e) { - log.error("[cache] clean timeout cache error: {}.", e.getMessage(), e); + log.error("[connection common cache] clean timeout cache error: {}.", e.getMessage(), e); } } @@ -155,14 +159,7 @@ public void addCache(Object key, Object value, Long timeDiff) { } cacheMap.put(key, value); timeoutMap.put(key, new Long[]{System.currentTimeMillis(), timeDiff}); - cleanTimeoutExecutor.execute(() -> { - try { - cleanTimeoutCache(); - Thread.sleep(10 * 1000); - } catch (InterruptedException e) { - log.error(e.getMessage(), e); - } - }); + timeoutCleanerExecutor.execute(this::cleanTimeoutCache); } /** @@ -185,18 +182,18 @@ public void addCache(Object key, Object value) { public Optional getCache(Object key, boolean refreshCache) { Long[] cacheTime = timeoutMap.get(key); if (cacheTime == null || cacheTime.length != CACHE_TIME_LENGTH) { - log.info("[cache] not hit the cache, key {}.", key); + log.info("[connection common cache] not hit the cache, key {}.", key); return Optional.empty(); } if (cacheTime[0] + cacheTime[1] < System.currentTimeMillis()) { - log.warn("[cache] is timeout, remove it, key {}.", key); + log.warn("[connection common cache] is timeout, remove it, key {}.", key); timeoutMap.remove(key); cacheMap.remove(key); return Optional.empty(); } Object value = cacheMap.get(key); if (value == null) { - log.error("[cache] value is null, remove it, key {}.", key); + log.error("[connection common cache] value is null, remove it, key {}.", key); cacheMap.remove(key); timeoutMap.remove(key); } else if (refreshCache) { @@ -222,16 +219,16 @@ public void removeCache(Object key) { /** * get common cache instance * - * @return cache + * @return connection common cache */ - public static CommonCache getInstance() { + public static ConnectionCommonCache getInstance() { return SingleInstance.INSTANCE; } /** - * static instance + * static single instance */ private static class SingleInstance { - private static final CommonCache INSTANCE = new CommonCache(); + private static final ConnectionCommonCache INSTANCE = new ConnectionCommonCache(); } } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/JdbcConnect.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/JdbcConnect.java index 96a545f7e21..4adea6db5ef 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/JdbcConnect.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/JdbcConnect.java @@ -28,7 +28,7 @@ @Slf4j public class JdbcConnect implements CacheCloseable { - private Connection connection; + private final Connection connection; public JdbcConnect(Connection connection) { this.connection = connection; @@ -41,7 +41,7 @@ public void close() { connection.close(); } } catch (Exception e) { - log.error("close jdbc connect error: {}", e.getMessage()); + log.error("[connection common cache] close jdbc connect error: {}", e.getMessage()); } } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/JmxConnect.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/JmxConnect.java index c240a89d39c..decb4f2ecf8 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/JmxConnect.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/JmxConnect.java @@ -11,7 +11,7 @@ @Slf4j public class JmxConnect implements CacheCloseable { - private JMXConnector connection; + private final JMXConnector connection; public JmxConnect(JMXConnector connection) { this.connection = connection; @@ -25,7 +25,7 @@ public void close() { connection.close(); } } catch (Exception e) { - log.error("close redis connect error: {}", e.getMessage()); + log.error("[connection common cache] close jmx connect error: {}", e.getMessage()); } } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/MongodbConnect.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/MongodbConnect.java index ea1b061c7be..33fc3b2de85 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/MongodbConnect.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/MongodbConnect.java @@ -9,7 +9,7 @@ */ @Slf4j public class MongodbConnect implements CacheCloseable { - private MongoClient mongoClient; + private final MongoClient mongoClient; public MongodbConnect(MongoClient mongoClient) { this.mongoClient = mongoClient; @@ -22,7 +22,7 @@ public void close() { this.mongoClient.close(); } } catch (Exception e) { - log.error(e.getMessage(), e); + log.error("[connection common cache] close mongodb connect error: {}", e.getMessage()); } } public MongoClient getMongoClient() { diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/RedisConnect.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/RedisConnect.java index 373d3b92a58..7a52d9c0f19 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/RedisConnect.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/RedisConnect.java @@ -27,7 +27,7 @@ @Slf4j public class RedisConnect implements CacheCloseable { - private StatefulConnection connection; + private final StatefulConnection connection; public RedisConnect(StatefulConnection connection) { this.connection = connection; @@ -40,7 +40,7 @@ public void close() { connection.closeAsync(); } } catch (Exception e) { - log.error("close redis connect error: {}", e.getMessage()); + log.error("[connection common cache] close redis connect error: {}", e.getMessage()); } } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/SshConnect.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/SshConnect.java index 4e33b5ead05..11f1e720b0a 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/SshConnect.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/cache/SshConnect.java @@ -9,7 +9,7 @@ */ @Slf4j public class SshConnect implements CacheCloseable { - private ClientSession clientSession; + private final ClientSession clientSession; public SshConnect(ClientSession clientSession) { this.clientSession = clientSession; @@ -22,7 +22,7 @@ public void close() { clientSession.close(); } } catch (Exception e) { - log.error("close ssh connect error: {}", e.getMessage()); + log.error("[connection common cache] close ssh connect error: {}", e.getMessage()); } } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/http/CommonHttpClient.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/http/CommonHttpClient.java index 8ffa778f0cd..670fd57b0ae 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/http/CommonHttpClient.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/common/http/CommonHttpClient.java @@ -17,6 +17,7 @@ package org.dromara.hertzbeat.collector.collect.common.http; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.apache.http.client.config.RequestConfig; import org.apache.http.config.Registry; @@ -37,6 +38,9 @@ import java.security.cert.CertificateExpiredException; import java.security.cert.X509Certificate; import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; /** @@ -76,8 +80,7 @@ public class CommonHttpClient { private static final int SOCKET_TIMEOUT = 60000; /** - * validated time for idle connection - * 空闲连接免检的有效时间,被重用的空闲连接若超过此时间,需检查此连接的可用性 + * validated time for idle connection. if when reuse this connection after this time, we will check it available. */ private static final int INACTIVITY_VALIDATED_TIME = 10000; @@ -91,13 +94,12 @@ public class CommonHttpClient { SSLContext sslContext = SSLContexts.createDefault(); X509TrustManager x509TrustManager = new X509TrustManager() { @Override - public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { } + public void checkClientTrusted(X509Certificate[] x509Certificates, String s) { } @Override public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - // check server certificate timeout - // 判断服务器证书有效时间 + // check server ssl certificate expired Date now = new Date(); - if (x509Certificates != null && x509Certificates.length > 0) { + if (x509Certificates != null) { for (X509Certificate certificate : x509Certificates) { Date deadline = certificate.getNotAfter(); if (deadline != null && now.after(deadline)) { @@ -130,26 +132,21 @@ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) thr httpClient = HttpClients.custom() .setConnectionManager(connectionManager) .setDefaultRequestConfig(requestConfig) - // 定期清理不可用过期连接 + // clean up unavailable expired connections .evictExpiredConnections() - // 定期清理可用但空闲的连接 + // clean up available but idle connections .evictIdleConnections(100, TimeUnit.SECONDS) .build(); - Thread connectCleaner = new Thread(() -> { - while (Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(30000); - connectionManager.closeExpiredConnections(); - connectionManager.closeIdleConnections(100, TimeUnit.SECONDS); - } catch (InterruptedException e) { - } - } - }); - connectCleaner.setName("http-connection-pool-cleaner"); - connectCleaner.setDaemon(true); - connectCleaner.start(); - } catch (Exception e) { - } + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("http-connection-pool-cleaner-%d") + .setDaemon(true) + .build(); + ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1, threadFactory); + scheduledExecutor.scheduleWithFixedDelay(() -> { + connectionManager.closeExpiredConnections(); + connectionManager.closeIdleConnections(100, TimeUnit.SECONDS); + }, 40L, 40L, TimeUnit.SECONDS); + } catch (Exception ignored) {} } public static CloseableHttpClient getHttpClient() { diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/database/JdbcCommonCollect.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/database/JdbcCommonCollect.java index e8a5d194663..a047fc8c16c 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/database/JdbcCommonCollect.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/database/JdbcCommonCollect.java @@ -20,7 +20,7 @@ import com.mysql.cj.jdbc.exceptions.CommunicationsException; import org.dromara.hertzbeat.collector.collect.AbstractCollect; import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier; -import org.dromara.hertzbeat.collector.collect.common.cache.CommonCache; +import org.dromara.hertzbeat.collector.collect.common.cache.ConnectionCommonCache; import org.dromara.hertzbeat.collector.collect.common.cache.JdbcConnect; import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; import org.dromara.hertzbeat.collector.util.CollectUtil; @@ -138,7 +138,7 @@ private Statement getConnection(String username, String password, String url,Int CacheIdentifier identifier = CacheIdentifier.builder() .ip(url) .username(username).password(password).build(); - Optional cacheOption = CommonCache.getInstance().getCache(identifier, true); + Optional cacheOption = ConnectionCommonCache.getInstance().getCache(identifier, true); Statement statement = null; if (cacheOption.isPresent()) { JdbcConnect jdbcConnect = (JdbcConnect) cacheOption.get(); @@ -161,7 +161,7 @@ private Statement getConnection(String username, String password, String url,Int log.error(e2.getMessage()); } statement = null; - CommonCache.getInstance().removeCache(identifier); + ConnectionCommonCache.getInstance().removeCache(identifier); } } if (statement != null) { @@ -175,7 +175,7 @@ private Statement getConnection(String username, String password, String url,Int statement.setQueryTimeout(timeoutSecond); statement.setMaxRows(1000); JdbcConnect jdbcConnect = new JdbcConnect(connection); - CommonCache.getInstance().addCache(identifier, jdbcConnect); + ConnectionCommonCache.getInstance().addCache(identifier, jdbcConnect); return statement; } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/jmx/JmxCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/jmx/JmxCollectImpl.java index 7b6b1db3f2c..4c9d4eab835 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/jmx/JmxCollectImpl.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/jmx/JmxCollectImpl.java @@ -2,7 +2,7 @@ import org.dromara.hertzbeat.collector.collect.AbstractCollect; import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier; -import org.dromara.hertzbeat.collector.collect.common.cache.CommonCache; +import org.dromara.hertzbeat.collector.collect.common.cache.ConnectionCommonCache; import org.dromara.hertzbeat.collector.collect.common.cache.JmxConnect; import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; import org.dromara.hertzbeat.common.entity.job.Metrics; @@ -142,7 +142,7 @@ private JMXConnector getConnectSession(JmxProtocol jmxProtocol) throws IOExcepti CacheIdentifier identifier = CacheIdentifier.builder().ip(jmxProtocol.getHost()) .port(jmxProtocol.getPort()).username(jmxProtocol.getUsername()) .password(jmxProtocol.getPassword()).build(); - Optional cacheOption = CommonCache.getInstance().getCache(identifier, true); + Optional cacheOption = ConnectionCommonCache.getInstance().getCache(identifier, true); JMXConnector conn = null; if (cacheOption.isPresent()) { JmxConnect jmxConnect = (JmxConnect) cacheOption.get(); @@ -151,7 +151,7 @@ private JMXConnector getConnectSession(JmxProtocol jmxProtocol) throws IOExcepti conn.getMBeanServerConnection(); } catch (Exception e) { conn = null; - CommonCache.getInstance().removeCache(identifier); + ConnectionCommonCache.getInstance().removeCache(identifier); } } if (conn != null) { @@ -176,7 +176,7 @@ private JMXConnector getConnectSession(JmxProtocol jmxProtocol) throws IOExcepti } JMXServiceURL jmxServiceUrl = new JMXServiceURL(url); conn = JMXConnectorFactory.connect(jmxServiceUrl, environment); - CommonCache.getInstance().addCache(identifier, new JmxConnect(conn)); + ConnectionCommonCache.getInstance().addCache(identifier, new JmxConnect(conn)); return conn; } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java index a33f443981e..0fb262c6ef0 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/mongodb/MongodbSingleCollectImpl.java @@ -25,7 +25,7 @@ import com.mongodb.MongoServerUnavailableException; import com.mongodb.MongoTimeoutException; import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier; -import org.dromara.hertzbeat.collector.collect.common.cache.CommonCache; +import org.dromara.hertzbeat.collector.collect.common.cache.ConnectionCommonCache; import org.dromara.hertzbeat.collector.collect.common.cache.MongodbConnect; import org.dromara.hertzbeat.common.util.CommonUtil; import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; @@ -176,7 +176,7 @@ private MongoClient getClient(Metrics metrics) { CacheIdentifier identifier = CacheIdentifier.builder() .ip(mongodbProtocol.getHost()).port(mongodbProtocol.getPort()) .username(mongodbProtocol.getUsername()).password(mongodbProtocol.getPassword()).build(); - Optional cacheOption = CommonCache.getInstance().getCache(identifier, true); + Optional cacheOption = ConnectionCommonCache.getInstance().getCache(identifier, true); MongoClient mongoClient = null; if (cacheOption.isPresent()) { MongodbConnect mongodbConnect = (MongodbConnect) cacheOption.get(); @@ -192,7 +192,7 @@ private MongoClient getClient(Metrics metrics) { log.error(e2.getMessage()); } mongoClient = null; - CommonCache.getInstance().removeCache(identifier); + ConnectionCommonCache.getInstance().removeCache(identifier); } } if (mongoClient != null) { @@ -206,7 +206,7 @@ private MongoClient getClient(Metrics metrics) { mongodbProtocol.getDatabase(), mongodbProtocol.getAuthenticationDatabase()); mongoClient = MongoClients.create(url); MongodbConnect mongodbConnect = new MongodbConnect(mongoClient); - CommonCache.getInstance().addCache(identifier, mongodbConnect); + ConnectionCommonCache.getInstance().addCache(identifier, mongodbConnect); return mongoClient; } } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/redis/RedisCommonCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/redis/RedisCommonCollectImpl.java index 68e78b27f3e..7878532ff2c 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/redis/RedisCommonCollectImpl.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/redis/RedisCommonCollectImpl.java @@ -2,7 +2,7 @@ import org.dromara.hertzbeat.collector.collect.AbstractCollect; import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier; -import org.dromara.hertzbeat.collector.collect.common.cache.CommonCache; +import org.dromara.hertzbeat.collector.collect.common.cache.ConnectionCommonCache; import org.dromara.hertzbeat.collector.collect.common.cache.RedisConnect; import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; import org.dromara.hertzbeat.collector.util.CollectUtil; @@ -166,7 +166,7 @@ private StatefulRedisConnection getSingleConnection(RedisProtoco // reuse connection failed, new one RedisClient redisClient = buildSingleClient(redisProtocol); connection = redisClient.connect(); - CommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); + ConnectionCommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); } return connection; } @@ -204,7 +204,7 @@ private StatefulRedisClusterConnection getClusterConnection(Redi // reuse connection failed, new one RedisClusterClient redisClusterClient = buildClusterClient(redisProtocol); connection = redisClusterClient.connect(); - CommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); + ConnectionCommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); } return connection; } @@ -217,7 +217,7 @@ private StatefulRedisClusterConnection getClusterConnection(Redi */ private StatefulConnection getStatefulConnection(CacheIdentifier identifier) { StatefulConnection connection = null; - Optional cacheOption = CommonCache.getInstance().getCache(identifier, true); + Optional cacheOption = ConnectionCommonCache.getInstance().getCache(identifier, true); if (cacheOption.isPresent()) { RedisConnect redisConnect = (RedisConnect) cacheOption.get(); connection = redisConnect.getConnection(); @@ -228,7 +228,7 @@ private StatefulConnection getStatefulConnection(CacheIdentifier log.info("The redis connect form cache, close error: {}", e.getMessage()); } connection = null; - CommonCache.getInstance().removeCache(identifier); + ConnectionCommonCache.getInstance().removeCache(identifier); } } return connection; diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/ssh/SshCollectImpl.java b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/ssh/SshCollectImpl.java index 392ae36af2b..59c6b166df3 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/collect/ssh/SshCollectImpl.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/collect/ssh/SshCollectImpl.java @@ -23,7 +23,7 @@ import org.apache.sshd.common.util.security.SecurityUtils; import org.dromara.hertzbeat.collector.collect.AbstractCollect; import org.dromara.hertzbeat.collector.collect.common.cache.CacheIdentifier; -import org.dromara.hertzbeat.collector.collect.common.cache.CommonCache; +import org.dromara.hertzbeat.collector.collect.common.cache.ConnectionCommonCache; import org.dromara.hertzbeat.collector.collect.common.cache.SshConnect; import org.dromara.hertzbeat.collector.collect.common.ssh.CommonSshClient; import org.dromara.hertzbeat.collector.dispatch.DispatchConstants; @@ -284,7 +284,7 @@ private void removeConnectSessionCache(SshProtocol sshProtocol) { .ip(sshProtocol.getHost()).port(sshProtocol.getPort()) .username(sshProtocol.getUsername()).password(sshProtocol.getPassword()) .build(); - CommonCache.getInstance().removeCache(identifier); + ConnectionCommonCache.getInstance().removeCache(identifier); } private ClientSession getConnectSession(SshProtocol sshProtocol, int timeout, boolean reuseConnection) @@ -295,18 +295,18 @@ private ClientSession getConnectSession(SshProtocol sshProtocol, int timeout, bo .build(); ClientSession clientSession = null; if (reuseConnection) { - Optional cacheOption = CommonCache.getInstance().getCache(identifier, true); + Optional cacheOption = ConnectionCommonCache.getInstance().getCache(identifier, true); if (cacheOption.isPresent()) { clientSession = ((SshConnect) cacheOption.get()).getConnection(); try { if (clientSession == null || clientSession.isClosed() || clientSession.isClosing()) { clientSession = null; - CommonCache.getInstance().removeCache(identifier); + ConnectionCommonCache.getInstance().removeCache(identifier); } } catch (Exception e) { log.warn(e.getMessage()); clientSession = null; - CommonCache.getInstance().removeCache(identifier); + ConnectionCommonCache.getInstance().removeCache(identifier); } } if (clientSession != null) { @@ -331,7 +331,7 @@ private ClientSession getConnectSession(SshProtocol sshProtocol, int timeout, bo } if (reuseConnection) { SshConnect sshConnect = new SshConnect(clientSession); - CommonCache.getInstance().addCache(identifier, sshConnect); + ConnectionCommonCache.getInstance().addCache(identifier, sshConnect); } return clientSession; } diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java index 1b182f02eb8..79a863d6177 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/CommonDispatcher.java @@ -17,12 +17,12 @@ package org.dromara.hertzbeat.collector.dispatch; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; import com.google.gson.JsonElement; import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; import org.dromara.hertzbeat.collector.dispatch.entrance.internal.CollectJobService; import org.dromara.hertzbeat.collector.dispatch.timer.Timeout; import org.dromara.hertzbeat.collector.dispatch.timer.TimerDispatch; @@ -34,15 +34,10 @@ import org.dromara.hertzbeat.common.entity.job.Metrics; import org.dromara.hertzbeat.common.entity.message.CollectRep; import org.dromara.hertzbeat.common.queue.CommonDataQueue; -import org.springframework.beans.factory.DisposableBean; import org.springframework.stereotype.Component; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -52,7 +47,7 @@ */ @Component @Slf4j -public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatch, DisposableBean { +public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatch { /** * Collection task timeout value @@ -86,8 +81,6 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc private final List unitConvertList; - private final ThreadPoolExecutor poolExecutor; - private final WorkerPool workerPool; private final String collectorIdentity; @@ -105,20 +98,13 @@ public CommonDispatcher(MetricsCollectorQueue jobRequestQueue, this.workerPool = workerPool; this.collectorIdentity = collectJobService.getCollectorIdentity(); this.metricsTimeoutMonitorMap = new ConcurrentHashMap<>(16); - poolExecutor = new ThreadPoolExecutor(2, 2, 1, - TimeUnit.SECONDS, - new SynchronousQueue<>(), r -> { - Thread thread = new Thread(r); - thread.setDaemon(true); - return thread; - }); this.start(); } public void start() { try { // Pull the collection task from the task queue and put it into the thread pool for execution - poolExecutor.execute(() -> { + workerPool.executeJob(() -> { Thread.currentThread().setName("metrics-task-dispatcher"); while (!Thread.currentThread().isInterrupted()) { MetricsCollect metricsCollect = null; @@ -128,17 +114,10 @@ public void start() { workerPool.executeJob(metricsCollect); } } catch (RejectedExecutionException rejected) { - log.info("[Dispatcher]-the worker pool is full, reject this metrics task, " + - "sleep and put in queue again."); - try { - Thread.sleep(1000); - if (metricsCollect != null) { - metricsCollect.setRunPriority((byte) (metricsCollect.getRunPriority() + 1)); - jobRequestQueue.addJob(metricsCollect); - } - } catch (InterruptedException ignored) { - log.info("[Dispatcher]-metrics-task-dispatcher has been interrupt when sleep to close."); - Thread.currentThread().interrupt(); + log.info("[Dispatcher]-the worker pool is full, reject this metrics task,put in queue again."); + if (metricsCollect != null) { + metricsCollect.setRunPriority((byte) (metricsCollect.getRunPriority() + 1)); + jobRequestQueue.addJob(metricsCollect); } } catch (InterruptedException interruptedException) { log.info("[Dispatcher]-metrics-task-dispatcher has been interrupt to close."); @@ -148,45 +127,45 @@ public void start() { } } }); - // Monitoring metrics collection task execution time - poolExecutor.execute(() -> { - Thread.currentThread().setName("metrics-task-monitor"); - while (!Thread.currentThread().isInterrupted()) { - try { - // Detect whether the collection unit of each metrics has timed out for 4 minutes, - // and if it times out, it will be discarded and an exception will be returned. - long deadline = System.currentTimeMillis() - DURATION_TIME; - for (Map.Entry entry : metricsTimeoutMonitorMap.entrySet()) { - MetricsTime metricsTime = entry.getValue(); - if (metricsTime.getStartTime() < deadline) { - // Metrics collection timeout - WheelTimerTask timerJob = (WheelTimerTask) metricsTime.getTimeout().task(); - CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder() - .setId(timerJob.getJob().getMonitorId()) - .setTenantId(timerJob.getJob().getTenantId()) - .setApp(timerJob.getJob().getApp()) - .setMetrics(metricsTime.getMetrics().getName()) - .setPriority(metricsTime.getMetrics().getPriority()) - .setTime(System.currentTimeMillis()) - .setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build(); - log.error("[Collect Timeout]: \n{}", metricsData); - if (metricsData.getPriority() == 0) { - dispatchCollectData(metricsTime.timeout, metricsTime.getMetrics(), metricsData); - } - metricsTimeoutMonitorMap.remove(entry.getKey()); - } - } - Thread.sleep(20000); - } catch (InterruptedException interruptedException) { - log.info("[Dispatcher]-metrics-task-monitor has been interrupt to close."); - Thread.currentThread().interrupt(); - } catch (Exception e) { - log.error("[Task Monitor]-{}.", e.getMessage(), e); + // monitoring metrics collection task execution timeout + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("metrics-task-timeout-monitor-%d") + .setDaemon(true) + .build(); + ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory); + scheduledExecutor.scheduleWithFixedDelay(this::monitorCollectTaskTimeout, 2, 20, TimeUnit.SECONDS); + } catch (Exception e) { + log.error("Common Dispatcher error: {}.", e.getMessage(), e); + } + } + + private void monitorCollectTaskTimeout() { + try { + // Detect whether the collection unit of each metrics has timed out for 4 minutes, + // and if it times out, it will be discarded and an exception will be returned. + long deadline = System.currentTimeMillis() - DURATION_TIME; + for (Map.Entry entry : metricsTimeoutMonitorMap.entrySet()) { + MetricsTime metricsTime = entry.getValue(); + if (metricsTime.getStartTime() < deadline) { + // Metrics collection timeout + WheelTimerTask timerJob = (WheelTimerTask) metricsTime.getTimeout().task(); + CollectRep.MetricsData metricsData = CollectRep.MetricsData.newBuilder() + .setId(timerJob.getJob().getMonitorId()) + .setTenantId(timerJob.getJob().getTenantId()) + .setApp(timerJob.getJob().getApp()) + .setMetrics(metricsTime.getMetrics().getName()) + .setPriority(metricsTime.getMetrics().getPriority()) + .setTime(System.currentTimeMillis()) + .setCode(CollectRep.Code.TIMEOUT).setMsg("collect timeout").build(); + log.error("[Collect Timeout]: \n{}", metricsData); + if (metricsData.getPriority() == 0) { + dispatchCollectData(metricsTime.timeout, metricsTime.getMetrics(), metricsData); } + metricsTimeoutMonitorMap.remove(entry.getKey()); } - }); + } } catch (Exception e) { - log.error("Common Dispatcher error: {}.", e.getMessage(), e); + log.error("[Task Timeout Monitor]-{}.", e.getMessage(), e); } } @@ -383,13 +362,6 @@ private List> getConfigmapFromPreCollectData(CollectRep.M return mapList; } - @Override - public void destroy() throws Exception { - if (poolExecutor != null) { - poolExecutor.shutdownNow(); - } - } - @Data @AllArgsConstructor private static class MetricsTime { diff --git a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/MetricsCollectorQueue.java b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/MetricsCollectorQueue.java index 063a9dc250d..5982fd9e298 100644 --- a/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/MetricsCollectorQueue.java +++ b/collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/MetricsCollectorQueue.java @@ -33,7 +33,7 @@ public class MetricsCollectorQueue { private final PriorityBlockingQueue jobQueue; public MetricsCollectorQueue() { - jobQueue = new PriorityBlockingQueue<>(2000); + jobQueue = new PriorityBlockingQueue<>(); } public void addJob(MetricsCollect job) { diff --git a/collector/src/test/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCacheTest.java b/collector/src/test/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCacheTest.java index bc4f237a717..fc8c8a57418 100644 --- a/collector/src/test/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCacheTest.java +++ b/collector/src/test/java/org/dromara/hertzbeat/collector/collect/common/cache/CommonCacheTest.java @@ -5,7 +5,7 @@ import org.junit.jupiter.api.Test; /** - * Test case for {@link CommonCache} + * Test case for {@link ConnectionCommonCache} */ class CommonCacheTest { @@ -36,4 +36,4 @@ void removeCache() { @Test void getInstance() { } -} \ No newline at end of file +}