From 4b2a1173b2030abaac83bfc2cb4e6ad60fb8af70 Mon Sep 17 00:00:00 2001 From: tomsun28 Date: Tue, 14 Mar 2023 15:52:13 +0800 Subject: [PATCH] bugfix redis collector create too many instance case oom (#734) * bugfix redis collector create too many instance * bugfix redis collector create too many instance * bugfix redis collector create too many instance --- .../collect/common/cache/RedisConnect.java | 1 - .../mongodb/MongodbSingleCollectImpl.java | 3 +- .../redis/RedisClusterCollectImpl.java | 132 --------- .../collect/redis/RedisCommonCollectImpl.java | 280 ++++++++++++------ .../collect/redis/RedisSingleCollectImpl.java | 92 ------ .../collector/collect/ssh/SshCollectImpl.java | 2 +- .../redis/RedisClusterCollectImplTest.java | 8 +- .../redis/RedisSingleCollectImplTest.java | 7 +- .../common/entity/warehouse/History.java | 1 + .../manager/service/JobSchedulerInit.java | 4 +- manager/src/main/resources/application.yml | 2 +- .../warehouse/config/WarehouseProperties.java | 2 +- .../warehouse/store/DataStorageDispatch.java | 1 - .../store/HistoryJpaDatabaseDataStorage.java | 22 +- 14 files changed, 227 insertions(+), 330 deletions(-) delete mode 100644 collector/src/main/java/com/usthe/collector/collect/redis/RedisClusterCollectImpl.java delete mode 100644 collector/src/main/java/com/usthe/collector/collect/redis/RedisSingleCollectImpl.java diff --git a/collector/src/main/java/com/usthe/collector/collect/common/cache/RedisConnect.java b/collector/src/main/java/com/usthe/collector/collect/common/cache/RedisConnect.java index bdeda13d198..f82a39c3bc8 100644 --- a/collector/src/main/java/com/usthe/collector/collect/common/cache/RedisConnect.java +++ b/collector/src/main/java/com/usthe/collector/collect/common/cache/RedisConnect.java @@ -18,7 +18,6 @@ package com.usthe.collector.collect.common.cache; import io.lettuce.core.api.StatefulConnection; -import io.lettuce.core.api.StatefulRedisConnection; import lombok.extern.slf4j.Slf4j; /** diff --git a/collector/src/main/java/com/usthe/collector/collect/mongodb/MongodbSingleCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/mongodb/MongodbSingleCollectImpl.java index 83c71462d4f..b52cd1cf274 100644 --- a/collector/src/main/java/com/usthe/collector/collect/mongodb/MongodbSingleCollectImpl.java +++ b/collector/src/main/java/com/usthe/collector/collect/mongodb/MongodbSingleCollectImpl.java @@ -23,6 +23,7 @@ import java.util.Optional; import com.mongodb.MongoServerUnavailableException; +import com.mongodb.MongoTimeoutException; import com.usthe.collector.collect.common.cache.CacheIdentifier; import com.usthe.collector.collect.common.cache.CommonCache; import com.usthe.collector.collect.common.cache.MongodbConnect; @@ -116,7 +117,7 @@ public void collect(Builder builder, long appId, String app, Metrics metrics) { } fillBuilder(metrics, valueRowBuilder, document); builder.addValues(valueRowBuilder.build()); - } catch (MongoServerUnavailableException unavailableException) { + } catch (MongoServerUnavailableException | MongoTimeoutException unavailableException) { builder.setCode(CollectRep.Code.UN_CONNECTABLE); String message = CommonUtil.getMessageFromThrowable(unavailableException); builder.setMsg(message); diff --git a/collector/src/main/java/com/usthe/collector/collect/redis/RedisClusterCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/redis/RedisClusterCollectImpl.java deleted file mode 100644 index d6dcafcaf63..00000000000 --- a/collector/src/main/java/com/usthe/collector/collect/redis/RedisClusterCollectImpl.java +++ /dev/null @@ -1,132 +0,0 @@ -package com.usthe.collector.collect.redis; - -import com.usthe.collector.collect.common.cache.CacheIdentifier; -import com.usthe.collector.collect.common.cache.CommonCache; -import com.usthe.collector.collect.common.cache.RedisConnect; -import com.usthe.common.entity.job.Metrics; -import com.usthe.common.entity.job.protocol.RedisProtocol; -import com.usthe.common.entity.message.CollectRep; -import com.usthe.common.util.CommonConstants; -import io.lettuce.core.RedisURI; -import io.lettuce.core.cluster.RedisClusterClient; -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; -import io.lettuce.core.cluster.models.partitions.Partitions; -import io.lettuce.core.cluster.models.partitions.RedisClusterNode; -import io.lettuce.core.resource.ClientResources; -import io.lettuce.core.resource.DefaultClientResources; -import lombok.extern.slf4j.Slf4j; - -import java.net.URI; -import java.util.*; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static com.usthe.common.util.SignConstants.DOUBLE_MARK; - -/** - * @description: Redis 集群指标收集器 - * @author: hdd - * @create: 2023/02/17 - */ -@Slf4j -public class RedisClusterCollectImpl extends RedisCommonCollectImpl { - - private static final String CLUSTER_INFO = "cluster"; - - private static final String UNIQUE_IDENTITY = "identity"; - - private final ClientResources defaultClientResources; - - public RedisClusterCollectImpl() { - defaultClientResources = DefaultClientResources.create(); - } - - - public List> getRedisInfo(Metrics metrics) { - Map> connectionMap = getConnectionList(metrics.getRedis()); - List> list = new ArrayList<>(connectionMap.size()); - connectionMap.forEach((identity, connection) ->{ - String info = connection.sync().info(metrics.getName()); - Map valueMap = parseInfo(info); - valueMap.put(UNIQUE_IDENTITY, identity); - if (Objects.equals(metrics.getName(), CLUSTER_INFO)) { - String clusterNodes = connection.sync().clusterInfo(); - valueMap.putAll(parseInfo(clusterNodes)); - } - if (log.isDebugEnabled()) { - log.debug("[RedisSingleCollectImpl] fetch redis info"); - valueMap.forEach((k, v) -> log.debug("{} : {}", k, v)); - } - list.add(valueMap); - }); - return list; - } - - - - private Map> getConnectionList(RedisProtocol redisProtocol) { - - // first connection - StatefulRedisClusterConnection connection = getConnection(redisProtocol); - Partitions partitions = connection.getPartitions(); - Map> clusterConnectionMap = new HashMap<>(partitions.size()); - for (RedisClusterNode partition : partitions) { - RedisURI uri = partition.getUri(); - StatefulRedisClusterConnection clusterConnection = getConnection(uri, redisProtocol); - clusterConnectionMap.put(doUri(uri.getHost(), uri.getPort()), clusterConnection); - } - return clusterConnectionMap; - } - - - - - - private StatefulRedisClusterConnection getConnection(RedisURI uri, RedisProtocol redisProtocol) { - redisProtocol.setHost(uri.getHost()); - redisProtocol.setPort(String.valueOf(uri.getPort())); - return getConnection(redisProtocol); - } - - - - /** - * obtain StatefulRedisClusterConnection - * - * @param redisProtocol - * @return - */ - private StatefulRedisClusterConnection getConnection(RedisProtocol redisProtocol) { - CacheIdentifier identifier = doIdentifier(redisProtocol); - - StatefulRedisClusterConnection connection = (StatefulRedisClusterConnection) getStatefulConnection(identifier); - if (connection == null) { - // reuse connection failed, new one - RedisClusterClient redisClusterClient = buildClient(redisProtocol); - connection = redisClusterClient.connect(); - CommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); - } - return connection; - } - - - /** - * build single redis client - * - * @param redisProtocol redis protocol config - * @return redis single client - */ - private RedisClusterClient buildClient(RedisProtocol redisProtocol) { - return RedisClusterClient.create(defaultClientResources, redisUri(redisProtocol)); - } - - /** - * build single identity - * @param ip - * @param port - * @return - */ - private String doUri(String ip, Integer port) { - return ip + DOUBLE_MARK + port; - } -} diff --git a/collector/src/main/java/com/usthe/collector/collect/redis/RedisCommonCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/redis/RedisCommonCollectImpl.java index a42139d5abe..0702872eeac 100644 --- a/collector/src/main/java/com/usthe/collector/collect/redis/RedisCommonCollectImpl.java +++ b/collector/src/main/java/com/usthe/collector/collect/redis/RedisCommonCollectImpl.java @@ -11,9 +11,17 @@ import com.usthe.common.entity.message.CollectRep; import com.usthe.common.util.CommonConstants; import com.usthe.common.util.CommonUtil; +import io.lettuce.core.RedisClient; import io.lettuce.core.RedisConnectionException; import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulConnection; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.models.partitions.RedisClusterNode; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.resource.DefaultClientResources; import lombok.extern.slf4j.Slf4j; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -25,9 +33,11 @@ import static com.usthe.common.util.SignConstants.*; /** - * @description: - * @author: hdd - * @create: 2023/02/19 + * Redis single cluster collector + * + * @author Musk.Chen , hdd + * @version 1.0 + * @date 2022/5/17 */ @Slf4j public class RedisCommonCollectImpl extends AbstractCollect { @@ -35,6 +45,16 @@ public class RedisCommonCollectImpl extends AbstractCollect { private static final String CLUSTER = "3"; + private static final String CLUSTER_INFO = "cluster"; + + private static final String UNIQUE_IDENTITY = "identity"; + + private final ClientResources defaultClientResources; + + public RedisCommonCollectImpl() { + defaultClientResources = DefaultClientResources.create(); + } + @Override public void collect(CollectRep.MetricsData.Builder builder, long appId, String app, Metrics metrics) { try { @@ -46,12 +66,10 @@ public void collect(CollectRep.MetricsData.Builder builder, long appId, String a } try { if (Objects.nonNull(metrics.getRedis().getPattern()) && Objects.equals(metrics.getRedis().getPattern(), CLUSTER)) { - RedisClusterCollectImpl redisClusterCollect = new RedisClusterCollectImpl(); - List> redisInfoList = redisClusterCollect.getRedisInfo(metrics); + List> redisInfoList = getClusterRedisInfo(metrics); doMetricsDataList(builder, redisInfoList, metrics); } else { - RedisSingleCollectImpl redisSingleCollect = new RedisSingleCollectImpl(); - Map redisInfo = redisSingleCollect.getRedisInfo(metrics); + Map redisInfo = getSingleRedisInfo(metrics); doMetricsData(builder, redisInfo, metrics); } } catch (RedisConnectionException connectionException) { @@ -65,79 +83,140 @@ public void collect(CollectRep.MetricsData.Builder builder, long appId, String a builder.setCode(CollectRep.Code.FAIL); builder.setMsg(errorMsg); } + } + /** + * get single redis metrics data + * @param metrics metrics config + * @return data + */ + private Map getSingleRedisInfo(Metrics metrics) { + StatefulRedisConnection connection = getSingleConnection(metrics.getRedis()); + String info = connection.sync().info(metrics.getName()); + Map valueMap = parseInfo(info); + if (log.isDebugEnabled()) { + log.debug("[RedisSingleCollectImpl] fetch redis info"); + valueMap.forEach((k, v) -> log.debug("{} : {}", k, v)); + } + return valueMap; } - @Override - public String supportProtocol() { - return DispatchConstants.PROTOCOL_REDIS; + /** + * get cluster redis metrics data + * @param metrics metrics config + * @return data + */ + private List> getClusterRedisInfo(Metrics metrics) { + Map> connectionMap = getConnectionList(metrics.getRedis()); + List> list = new ArrayList<>(connectionMap.size()); + connectionMap.forEach((identity, connection) ->{ + String info = connection.sync().info(metrics.getName()); + Map valueMap = parseInfo(info); + valueMap.put(UNIQUE_IDENTITY, identity); + if (Objects.equals(metrics.getName(), CLUSTER_INFO)) { + String clusterNodes = connection.sync().clusterInfo(); + valueMap.putAll(parseInfo(clusterNodes)); + } + if (log.isDebugEnabled()) { + log.debug("[RedisSingleCollectImpl] fetch redis info"); + valueMap.forEach((k, v) -> log.debug("{} : {}", k, v)); + } + list.add(valueMap); + }); + return list; } /** - * parse redis info - * - * @param info redis info - * @return parsed redis info + * Build monitoring parameters according to redis info + * @param builder builder + * @param valueMapList map list + * @param metrics metrics */ - protected Map parseInfo(String info) { - String[] lines = info.split(LINE_FEED); - Map result = new HashMap<>(calInitMap(lines.length)); - Arrays.stream(lines) - .filter(it -> StringUtils.hasText(it) && !it.startsWith(WELL_NO) && it.contains(DOUBLE_MARK)) - .map(this::removeCr) - .map(r -> r.split(DOUBLE_MARK)) - .forEach(it -> { - if (it.length > 1) { - result.put(it[0], it[1]); - } - }); - return result; + private void doMetricsDataList(CollectRep.MetricsData.Builder builder, List> valueMapList, Metrics metrics) { + valueMapList.forEach(e -> doMetricsData(builder, e, metrics)); } + /** + * Build monitoring parameters according to redis info + * @param builder builder + * @param valueMap map value + * @param metrics metrics + */ + private void doMetricsData(CollectRep.MetricsData.Builder builder, Map valueMap, Metrics metrics) { + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + metrics.getAliasFields().forEach(it -> { + if (valueMap.containsKey(it)) { + String fieldValue = valueMap.get(it); + valueRowBuilder.addColumns(Objects.requireNonNullElse(fieldValue, CommonConstants.NULL_VALUE)); + } else { + valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); + } + }); + builder.addValues(valueRowBuilder.build()); + } /** - * structure - * - * @param redisProtocol - * @return + * get single connection + * @param redisProtocol protocol + * @return connection */ - protected RedisURI redisUri(RedisProtocol redisProtocol) { - RedisURI redisUri = RedisURI.create(redisProtocol.getHost(), Integer.parseInt(redisProtocol.getPort())); - if (StringUtils.hasText(redisProtocol.getUsername())) { - redisUri.setUsername(redisProtocol.getUsername()); + private StatefulRedisConnection getSingleConnection(RedisProtocol redisProtocol) { + CacheIdentifier identifier = doIdentifier(redisProtocol); + StatefulRedisConnection connection = (StatefulRedisConnection) getStatefulConnection(identifier); + if (Objects.isNull(connection)) { + // reuse connection failed, new one + RedisClient redisClient = buildSingleClient(redisProtocol); + connection = redisClient.connect(); + CommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); } - if (StringUtils.hasText(redisProtocol.getPassword())) { - redisUri.setPassword(redisProtocol.getPassword().toCharArray()); - } - Duration timeout = Duration.ofMillis(CollectUtil.getTimeout(redisProtocol.getTimeout())); - redisUri.setTimeout(timeout); - return redisUri; + return connection; } + /** + * get cluster connect list + * @param redisProtocol protocol + * @return connection map + */ + private Map> getConnectionList(RedisProtocol redisProtocol) { + // first connection + StatefulRedisClusterConnection connection = getClusterConnection(redisProtocol); + Partitions partitions = connection.getPartitions(); + Map> clusterConnectionMap = new HashMap<>(partitions.size()); + for (RedisClusterNode partition : partitions) { + RedisURI uri = partition.getUri(); + redisProtocol.setHost(uri.getHost()); + redisProtocol.setPort(String.valueOf(uri.getPort())); + StatefulRedisClusterConnection clusterConnection = getClusterConnection(redisProtocol); + clusterConnectionMap.put(doUri(uri.getHost(), uri.getPort()), clusterConnection); + } + return clusterConnectionMap; + } /** - * build redis cache key + * obtain StatefulRedisClusterConnection * - * @param redisProtocol - * @return + * @param redisProtocol redis protocol + * @return cluster connection */ - protected CacheIdentifier doIdentifier(RedisProtocol redisProtocol) { - return CacheIdentifier.builder() - .ip(redisProtocol.getHost()) - .port(redisProtocol.getPort()) - .username(redisProtocol.getUsername()) - .password(redisProtocol.getPassword()) - .build(); + private StatefulRedisClusterConnection getClusterConnection(RedisProtocol redisProtocol) { + CacheIdentifier identifier = doIdentifier(redisProtocol); + StatefulRedisClusterConnection connection = (StatefulRedisClusterConnection) getStatefulConnection(identifier); + if (connection == null) { + // reuse connection failed, new one + RedisClusterClient redisClusterClient = buildClusterClient(redisProtocol); + connection = redisClusterClient.connect(); + CommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); + } + return connection; } - /** * get redis connection * - * @param identifier - * @return + * @param identifier identifier + * @return connection */ - protected StatefulConnection getStatefulConnection(CacheIdentifier identifier) { + private StatefulConnection getStatefulConnection(CacheIdentifier identifier) { StatefulConnection connection = null; Optional cacheOption = CommonCache.getInstance().getCache(identifier, true); if (cacheOption.isPresent()) { @@ -156,45 +235,71 @@ protected StatefulConnection getStatefulConnection(CacheIdentifi return connection; } - /** - * Build monitoring parameters according to redis info - * @param builder - * @param valueMapList - * @param metrics + * build cluster redis client + * + * @param redisProtocol redis protocol config + * @return redis cluster client */ - private void doMetricsDataList(CollectRep.MetricsData.Builder builder, List> valueMapList, Metrics metrics) { - valueMapList.forEach(e -> doMetricsData(builder, e, metrics)); + private RedisClusterClient buildClusterClient(RedisProtocol redisProtocol) { + return RedisClusterClient.create(defaultClientResources, redisUri(redisProtocol)); } /** - * Build monitoring parameters according to redis info + * build single redis client * - * @param builder - * @param valueMap - * @param metrics + * @param redisProtocol redis protocol config + * @return redis single client */ - private void doMetricsData(CollectRep.MetricsData.Builder builder, Map valueMap, Metrics metrics) { - CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); - metrics.getAliasFields().forEach(it -> { - if (valueMap.containsKey(it)) { - String fieldValue = valueMap.get(it); - if (fieldValue == null) { - valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); - } else { - valueRowBuilder.addColumns(fieldValue); - } - } else { - valueRowBuilder.addColumns(CommonConstants.NULL_VALUE); - } - }); - builder.addValues(valueRowBuilder.build()); + private RedisClient buildSingleClient(RedisProtocol redisProtocol) { + return RedisClient.create(defaultClientResources, redisUri(redisProtocol)); } + private RedisURI redisUri(RedisProtocol redisProtocol) { + RedisURI redisUri = RedisURI.create(redisProtocol.getHost(), Integer.parseInt(redisProtocol.getPort())); + if (StringUtils.hasText(redisProtocol.getUsername())) { + redisUri.setUsername(redisProtocol.getUsername()); + } + if (StringUtils.hasText(redisProtocol.getPassword())) { + redisUri.setPassword(redisProtocol.getPassword().toCharArray()); + } + Duration timeout = Duration.ofMillis(CollectUtil.getTimeout(redisProtocol.getTimeout())); + redisUri.setTimeout(timeout); + return redisUri; + } + + private String removeCr(String value) { + return value.replace(CARRIAGE_RETURN, ""); + } + + private String doUri(String ip, Integer port) { + return ip + DOUBLE_MARK + port; + } + + private CacheIdentifier doIdentifier(RedisProtocol redisProtocol) { + return CacheIdentifier.builder() + .ip(redisProtocol.getHost()) + .port(redisProtocol.getPort()) + .username(redisProtocol.getUsername()) + .password(redisProtocol.getPassword()) + .build(); + } + + private Map parseInfo(String info) { + String[] lines = info.split(LINE_FEED); + Map result = new HashMap<>(calInitMap(lines.length)); + Arrays.stream(lines) + .filter(it -> StringUtils.hasText(it) && !it.startsWith(WELL_NO) && it.contains(DOUBLE_MARK)) + .map(this::removeCr) + .map(r -> r.split(DOUBLE_MARK)) + .forEach(it -> { + if (it.length > 1) { + result.put(it[0], it[1]); + } + }); + return result; + } - /** - * preCheck params - */ private void preCheck(Metrics metrics) { if (metrics == null || metrics.getRedis() == null) { throw new IllegalArgumentException("Redis collect must has redis params"); @@ -204,10 +309,9 @@ private void preCheck(Metrics metrics) { Assert.hasText(redisProtocol.getPort(), "Redis Protocol port is required."); } - - private String removeCr(String value) { - return value.replace(CARRIAGE_RETURN, ""); + @Override + public String supportProtocol() { + return DispatchConstants.PROTOCOL_REDIS; } - } diff --git a/collector/src/main/java/com/usthe/collector/collect/redis/RedisSingleCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/redis/RedisSingleCollectImpl.java deleted file mode 100644 index 8c613da3c92..00000000000 --- a/collector/src/main/java/com/usthe/collector/collect/redis/RedisSingleCollectImpl.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.usthe.collector.collect.redis; - -import com.usthe.collector.collect.common.cache.CacheIdentifier; -import com.usthe.collector.collect.common.cache.CommonCache; -import com.usthe.collector.collect.common.cache.RedisConnect; -import com.usthe.common.entity.job.Metrics; -import com.usthe.common.entity.job.protocol.RedisProtocol; -import com.usthe.common.entity.message.CollectRep; -import com.usthe.common.util.CommonConstants; -import io.lettuce.core.RedisClient; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.resource.ClientResources; -import io.lettuce.core.resource.DefaultClientResources; -import lombok.extern.slf4j.Slf4j; - -import java.util.Map; -import java.util.Objects; - -/** - * Redis 单机指标收集器 - * - * @author Musk.Chen - * @version 1.0 - * Created by Musk.Chen on 2022/5/17 - */ -@Slf4j -public class RedisSingleCollectImpl extends RedisCommonCollectImpl { - - private final ClientResources defaultClientResources; - - public RedisSingleCollectImpl() { - defaultClientResources = DefaultClientResources.create(); - } - - /** - * - * @param metrics - * @return - */ - public Map getRedisInfo(Metrics metrics) { - StatefulRedisConnection connection = getConnection(metrics.getRedis()); - String info = connection.sync().info(metrics.getName()); - Map valueMap = parseInfo(info); - if (log.isDebugEnabled()) { - log.debug("[RedisSingleCollectImpl] fetch redis info"); - valueMap.forEach((k, v) -> log.debug("{} : {}", k, v)); - } - return valueMap; - } - - - private StatefulRedisConnection getConnection(RedisProtocol redisProtocol) { - CacheIdentifier identifier = doIdentifier(redisProtocol); - StatefulRedisConnection connection = (StatefulRedisConnection) getStatefulConnection(identifier); - if (Objects.isNull(connection)) { - // reuse connection failed, new one - RedisClient redisClient = buildClient(redisProtocol); - connection = redisClient.connect(); - CommonCache.getInstance().addCache(identifier, new RedisConnect(connection)); - } - return connection; - } - - /** - * build single redis client - * - * @param redisProtocol redis protocol config - * @return redis single client - */ - private RedisClient buildClient(RedisProtocol redisProtocol) { - return RedisClient.create(defaultClientResources, redisUri(redisProtocol)); - } - - -} diff --git a/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java b/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java index 8989f1e707c..48c5ee572cf 100644 --- a/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java +++ b/collector/src/main/java/com/usthe/collector/collect/ssh/SshCollectImpl.java @@ -96,7 +96,7 @@ public void collect(CollectRep.MetricsData.Builder builder, long appId, String a String result = response.toString(); if (!StringUtils.hasText(result)) { builder.setCode(CollectRep.Code.FAIL); - builder.setMsg("采集数据失败"); + builder.setMsg("collect response data is null"); } switch (sshProtocol.getParseType()) { case PARSE_TYPE_NETCAT: diff --git a/collector/src/test/java/com/usthe/collector/collect/redis/RedisClusterCollectImplTest.java b/collector/src/test/java/com/usthe/collector/collect/redis/RedisClusterCollectImplTest.java index f0c090e8211..21356e8243e 100644 --- a/collector/src/test/java/com/usthe/collector/collect/redis/RedisClusterCollectImplTest.java +++ b/collector/src/test/java/com/usthe/collector/collect/redis/RedisClusterCollectImplTest.java @@ -14,9 +14,9 @@ import java.util.List; /** - * Test case for {@link RedisClusterCollectImpl} - * @author: hdd - * @create: 2023/02/17 + * Test case for {@link RedisCommonCollectImpl} + * @author hdd + * @date 2023/02/17 */ @ExtendWith(MockitoExtension.class) public class RedisClusterCollectImplTest { @@ -25,7 +25,7 @@ public class RedisClusterCollectImplTest { private RedisProtocol redisProtocol; @InjectMocks - private RedisClusterCollectImpl redisClusterCollect; + private RedisCommonCollectImpl redisClusterCollect; @BeforeEach void setUp() { diff --git a/collector/src/test/java/com/usthe/collector/collect/redis/RedisSingleCollectImplTest.java b/collector/src/test/java/com/usthe/collector/collect/redis/RedisSingleCollectImplTest.java index 8087e5257f1..801222729b1 100644 --- a/collector/src/test/java/com/usthe/collector/collect/redis/RedisSingleCollectImplTest.java +++ b/collector/src/test/java/com/usthe/collector/collect/redis/RedisSingleCollectImplTest.java @@ -1,6 +1,5 @@ package com.usthe.collector.collect.redis; -import com.usthe.collector.collect.common.cache.CommonCache; import com.usthe.common.entity.job.Metrics; import com.usthe.common.entity.job.protocol.RedisProtocol; import com.usthe.common.entity.message.CollectRep; @@ -16,7 +15,7 @@ /** - * Test case for {@link RedisSingleCollectImpl} + * Test case for {@link RedisCommonCollectImpl} */ @ExtendWith(MockitoExtension.class) class RedisSingleCollectImplTest { @@ -26,7 +25,7 @@ class RedisSingleCollectImplTest { private RedisProtocol redisProtocol; @InjectMocks - private RedisSingleCollectImpl redisSingleCollect; + private RedisCommonCollectImpl redisSingleCollect; @BeforeEach void setUp() { @@ -51,4 +50,4 @@ void collect() { metrics.setAliasFields(aliasField); redisSingleCollect.collect(builder, 1L, "test", metrics); } -} \ No newline at end of file +} diff --git a/common/src/main/java/com/usthe/common/entity/warehouse/History.java b/common/src/main/java/com/usthe/common/entity/warehouse/History.java index 24b4a0f0540..0ccec1f59f0 100644 --- a/common/src/main/java/com/usthe/common/entity/warehouse/History.java +++ b/common/src/main/java/com/usthe/common/entity/warehouse/History.java @@ -50,6 +50,7 @@ public class History { private Byte metricType; @Schema(title = "字符值") + @Column(length = 1024) private String str; @Schema(title = "数值") diff --git a/manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java b/manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java index 8fb95cb684a..19eb0348f0c 100644 --- a/manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java +++ b/manager/src/main/java/com/usthe/manager/service/JobSchedulerInit.java @@ -77,7 +77,9 @@ public void run(String... args) throws Exception { new Configmap(param.getField(), param.getValue(), param.getType())).collect(Collectors.toList()); appDefine.setConfigmap(configmaps); // 下发采集任务 - collectJobService.addAsyncCollectJob(appDefine); + long jobId = collectJobService.addAsyncCollectJob(appDefine); + monitor.setJobId(jobId); + monitorDao.save(monitor); } catch (Exception e) { log.error("init monitor job: {} error,continue next monitor", monitor, e); } diff --git a/manager/src/main/resources/application.yml b/manager/src/main/resources/application.yml index 47a6610dbda..db9cc13d587 100644 --- a/manager/src/main/resources/application.yml +++ b/manager/src/main/resources/application.yml @@ -101,7 +101,7 @@ warehouse: # 存储历史数据方式, 下方只能enabled启用一种方式 jpa: enabled: true - expire-time: 7D + expire-time: 1h td-engine: enabled: false driver-class-name: com.taosdata.jdbc.rs.RestfulDriver diff --git a/warehouse/src/main/java/com/usthe/warehouse/config/WarehouseProperties.java b/warehouse/src/main/java/com/usthe/warehouse/config/WarehouseProperties.java index 8734f52082a..37582c9f7b1 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/config/WarehouseProperties.java +++ b/warehouse/src/main/java/com/usthe/warehouse/config/WarehouseProperties.java @@ -245,7 +245,7 @@ public static class JpaProperties { /** * save data expire time(ms) */ - private String expireTime = "7D"; + private String expireTime = "1h"; public boolean isEnabled() { return enabled; diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/DataStorageDispatch.java b/warehouse/src/main/java/com/usthe/warehouse/store/DataStorageDispatch.java index 32bb5680110..d6416d66de8 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/store/DataStorageDispatch.java +++ b/warehouse/src/main/java/com/usthe/warehouse/store/DataStorageDispatch.java @@ -71,7 +71,6 @@ protected void startStoragePersistentData() { } }; workerPool.executeJob(runnable); - workerPool.executeJob(runnable); } diff --git a/warehouse/src/main/java/com/usthe/warehouse/store/HistoryJpaDatabaseDataStorage.java b/warehouse/src/main/java/com/usthe/warehouse/store/HistoryJpaDatabaseDataStorage.java index 59652b5b902..bfe7dad2e3c 100644 --- a/warehouse/src/main/java/com/usthe/warehouse/store/HistoryJpaDatabaseDataStorage.java +++ b/warehouse/src/main/java/com/usthe/warehouse/store/HistoryJpaDatabaseDataStorage.java @@ -22,6 +22,7 @@ import java.time.ZonedDateTime; import java.time.temporal.TemporalAmount; import java.util.*; +import java.util.concurrent.TimeUnit; /** * data storage by mysql/h2 - jpa @@ -37,6 +38,8 @@ public class HistoryJpaDatabaseDataStorage extends AbstractHistoryDataStorage { private HistoryDao historyDao; private WarehouseProperties.StoreProperties.JpaProperties jpaProperties; + private static final int STRING_MAX_LENGTH = 1024; + public HistoryJpaDatabaseDataStorage(WarehouseProperties properties, HistoryDao historyDao) { this.jpaProperties = properties.getStore().getJpa(); @@ -44,7 +47,7 @@ public HistoryJpaDatabaseDataStorage(WarehouseProperties properties, this.historyDao = historyDao; } - @Scheduled(cron = "0 0 23 * * ?") + @Scheduled( fixedDelay = 60, timeUnit = TimeUnit.MINUTES) public void expiredDataCleaner() { String expireTimeStr = jpaProperties.getExpireTime(); long expireTime = 0; @@ -91,7 +94,7 @@ void saveData(CollectRep.MetricsData metricsData) { for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { String instance = valueRow.getInstance(); if (!instance.isEmpty()) { - instance = String.format("\"%s\"", instance); + instance = formatStrValue(instance); historyBuilder.instance(instance); } else { historyBuilder.instance(null); @@ -105,7 +108,7 @@ void saveData(CollectRep.MetricsData metricsData) { .dou(Double.parseDouble(valueRow.getColumns(i))); } else if (fieldsList.get(i).getType() == CommonConstants.TYPE_STRING) { historyBuilder.metricType(CommonConstants.TYPE_STRING) - .str(valueRow.getColumns(i)); + .str(formatStrValue(valueRow.getColumns(i))); } } else { if (fieldsList.get(i).getType() == CommonConstants.TYPE_NUMBER) { @@ -185,6 +188,19 @@ public Map> getHistoryMetricData(Long monitorId, String app, return instanceValuesMap; } + private String formatStrValue(String value) { + if (value == null) { + return ""; + } + value = value.replace("'", "\\'"); + value = value.replace("\"", "\\\""); + value = value.replace("*", "-"); + value = String.format("`%s`", value); + if (value.length() > STRING_MAX_LENGTH) { + value = value.substring(0, STRING_MAX_LENGTH - 1); + } + return value; + } @Override public Map> getHistoryIntervalMetricData(Long monitorId, String app, String metrics, String metric, String instance, String history) {