Skip to content

Commit

Permalink
Add release history cleaning function (#4813)
Browse files Browse the repository at this point in the history
* add tech-support-qq-4.png

* Update README.md

* Enhance the user experience in the scenario of submitting duplicate keys

* Modify the key-value conflict exception prompt, adjust the code style

* feat(apollo-biz): Add Clearing Function for Release History

* doc(CHANGES.md): Update CHANGES.md

* test(apollo-biz): Added test case testCleanReleaseHistoryTransactionalRollBack

* test(apollo-biz): Try to fix the test case AdminServiceTransactionTest

* refactor(apollo-biz): Modify nativeSQL to JPA query

* doc(apollo-biz): Improve the documentation.

* refactor(apollo-biz): Optimize the logic of releaseHistoryRetentionLimitOverride method

---------

Co-authored-by: Jason Song <nobodyiam@gmail.com>
  • Loading branch information
klboke and nobodyiam authored Mar 27, 2023
1 parent 5b85b42 commit 8344788
Show file tree
Hide file tree
Showing 11 changed files with 499 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Apollo 2.2.0
* [Misc dependency updates](https://github.com/apolloconfig/apollo/pull/4784)
* [Fix the problem that the deletion failure of the system rights management page does not prompt](https://github.com/apolloconfig/apollo/pull/4803)
* [Fix the issue of the system permission management page retrieving non-existent users](https://github.com/apolloconfig/apollo/pull/4802)
* [Add release history cleaning function](https://github.com/apolloconfig/apollo/pull/4813)
* [[Multi-Database Support][pg] Make JdbcUserDetailsManager compat with postgre](https://github.com/apolloconfig/apollo/pull/4790)

------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.springframework.stereotype.Component;

@Component
Expand All @@ -46,12 +47,16 @@ public class BizConfig extends RefreshableConfig {
private static final int DEFAULT_RELEASE_MESSAGE_NOTIFICATION_BATCH = 100;
private static final int DEFAULT_RELEASE_MESSAGE_NOTIFICATION_BATCH_INTERVAL_IN_MILLI = 100;//100ms
private static final int DEFAULT_LONG_POLLING_TIMEOUT = 60; //60s
public static final int DEFAULT_RELEASE_HISTORY_RETENTION_SIZE = -1;

private static final Gson GSON = new Gson();

private static final Type namespaceValueLengthOverrideTypeReference =
new TypeToken<Map<Long, Integer>>() {
}.getType();
private static final Type releaseHistoryRetentionSizeOverrideTypeReference =
new TypeToken<Map<String, Integer>>() {
}.getType();

private final BizDBPropertySource propertySource;

Expand Down Expand Up @@ -154,6 +159,24 @@ public int accessKeyAuthTimeDiffTolerance() {
DEFAULT_ACCESS_KEY_AUTH_TIME_DIFF_TOLERANCE);
}

public int releaseHistoryRetentionSize() {
int count = getIntProperty("apollo.release-history.retention.size", DEFAULT_RELEASE_HISTORY_RETENTION_SIZE);
return checkInt(count, 1, Integer.MAX_VALUE, DEFAULT_RELEASE_HISTORY_RETENTION_SIZE);
}

public Map<String, Integer> releaseHistoryRetentionSizeOverride() {
String overrideString = getValue("apollo.release-history.retention.size.override");
Map<String, Integer> releaseHistoryRetentionSizeOverride = Maps.newHashMap();
if (!Strings.isNullOrEmpty(overrideString)) {
releaseHistoryRetentionSizeOverride =
GSON.fromJson(overrideString, releaseHistoryRetentionSizeOverrideTypeReference);
}
return releaseHistoryRetentionSizeOverride.entrySet()
.stream()
.filter(entry -> entry.getValue() >= 1)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public int releaseMessageCacheScanInterval() {
int interval = getIntProperty("apollo.release-message-cache-scan.interval", DEFAULT_RELEASE_MESSAGE_CACHE_SCAN_INTERVAL);
return checkInt(interval, 1, Integer.MAX_VALUE, DEFAULT_RELEASE_MESSAGE_CACHE_SCAN_INTERVAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.ctrip.framework.apollo.biz.repository;

import com.ctrip.framework.apollo.biz.entity.ReleaseHistory;

import java.util.List;
import java.util.Set;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand All @@ -42,4 +42,8 @@ Page<ReleaseHistory> findByAppIdAndClusterNameAndNamespaceNameOrderByIdDesc(Stri
@Query("update ReleaseHistory set IsDeleted = true, DeletedAt = ROUND(UNIX_TIMESTAMP(NOW(4))*1000), DataChange_LastModifiedBy = ?4 where AppId=?1 and ClusterName=?2 and NamespaceName = ?3 and IsDeleted = false")
int batchDelete(String appId, String clusterName, String namespaceName, String operator);

Page<ReleaseHistory> findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(String appId, String clusterName, String namespaceName, String branchName, Pageable pageable);

List<ReleaseHistory> findFirst100ByAppIdAndClusterNameAndNamespaceNameAndBranchNameAndIdLessThanEqualOrderByIdAsc(String appId, String clusterName, String namespaceName, String branchName, long maxId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,92 @@
*/
package com.ctrip.framework.apollo.biz.service;

import static com.ctrip.framework.apollo.biz.config.BizConfig.DEFAULT_RELEASE_HISTORY_RETENTION_SIZE;

import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.ctrip.framework.apollo.biz.entity.Audit;
import com.ctrip.framework.apollo.biz.entity.ReleaseHistory;
import com.ctrip.framework.apollo.biz.repository.ReleaseHistoryRepository;
import com.ctrip.framework.apollo.biz.repository.ReleaseRepository;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;
import com.google.common.collect.Queues;
import com.google.gson.Gson;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;

import java.util.Date;
import java.util.Map;
import java.util.Set;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

/**
* @author Jason Song(song_s@ctrip.com)
*/
@Service
public class ReleaseHistoryService {
private static final Logger logger = LoggerFactory.getLogger(ReleaseHistoryService.class);
private static final Gson GSON = new Gson();
private static final int CLEAN_QUEUE_MAX_SIZE = 100;
private final BlockingQueue<ReleaseHistory> releaseClearQueue = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
private final ExecutorService cleanExecutorService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("ReleaseHistoryService", true));
private final AtomicBoolean cleanStopped = new AtomicBoolean(false);

private final ReleaseHistoryRepository releaseHistoryRepository;
private final ReleaseRepository releaseRepository;
private final AuditService auditService;
private final BizConfig bizConfig;
private final TransactionTemplate transactionManager;

public ReleaseHistoryService(
final ReleaseHistoryRepository releaseHistoryRepository,
final AuditService auditService) {
final ReleaseRepository releaseRepository,
final AuditService auditService,
final BizConfig bizConfig,
final TransactionTemplate transactionManager) {
this.releaseHistoryRepository = releaseHistoryRepository;
this.releaseRepository = releaseRepository;
this.auditService = auditService;
this.bizConfig = bizConfig;
this.transactionManager = transactionManager;
}

@PostConstruct
private void initialize() {
cleanExecutorService.submit(() -> {
while (!cleanStopped.get() && !Thread.currentThread().isInterrupted()) {
try {
ReleaseHistory releaseHistory = releaseClearQueue.poll(1, TimeUnit.SECONDS);
if (releaseHistory != null) {
this.cleanReleaseHistory(releaseHistory);
} else {
TimeUnit.MINUTES.sleep(1);
}
} catch (Throwable ex) {
logger.error("Clean releaseHistory failed", ex);
Tracer.logError(ex);
}
}
});
}

public Page<ReleaseHistory> findReleaseHistoriesByNamespace(String appId, String clusterName,
String namespaceName, Pageable
Expand Down Expand Up @@ -92,11 +148,86 @@ public ReleaseHistory createReleaseHistory(String appId, String clusterName, Str
auditService.audit(ReleaseHistory.class.getSimpleName(), releaseHistory.getId(),
Audit.OP.INSERT, releaseHistory.getDataChangeCreatedBy());

int releaseHistoryRetentionLimit = this.getReleaseHistoryRetentionLimit(releaseHistory);
if (releaseHistoryRetentionLimit != DEFAULT_RELEASE_HISTORY_RETENTION_SIZE) {
if (!releaseClearQueue.offer(releaseHistory)) {
logger.warn("releaseClearQueue is full, failed to add task to clean queue, " +
"clean queue max size:{}", CLEAN_QUEUE_MAX_SIZE);
}
}
return releaseHistory;
}

@Transactional
public int batchDelete(String appId, String clusterName, String namespaceName, String operator) {
return releaseHistoryRepository.batchDelete(appId, clusterName, namespaceName, operator);
}

private Optional<Long> releaseHistoryRetentionMaxId(ReleaseHistory releaseHistory, int releaseHistoryRetentionSize) {
Page<ReleaseHistory> releaseHistoryPage = releaseHistoryRepository.findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(
releaseHistory.getAppId(),
releaseHistory.getClusterName(),
releaseHistory.getNamespaceName(),
releaseHistory.getBranchName(),
PageRequest.of(releaseHistoryRetentionSize, 1)
);
if (releaseHistoryPage.isEmpty()) {
return Optional.empty();
}
return Optional.of(
releaseHistoryPage
.getContent()
.get(0)
.getId()
);
}

private void cleanReleaseHistory(ReleaseHistory cleanRelease) {
String appId = cleanRelease.getAppId();
String clusterName = cleanRelease.getClusterName();
String namespaceName = cleanRelease.getNamespaceName();
String branchName = cleanRelease.getBranchName();

int retentionLimit = this.getReleaseHistoryRetentionLimit(cleanRelease);
//Second check, if retentionLimit is default value, do not clean
if (retentionLimit == DEFAULT_RELEASE_HISTORY_RETENTION_SIZE) {
return;
}

Optional<Long> maxId = this.releaseHistoryRetentionMaxId(cleanRelease, retentionLimit);
if (!maxId.isPresent()) {
return;
}

boolean hasMore = true;
while (hasMore && !Thread.currentThread().isInterrupted()) {
List<ReleaseHistory> cleanReleaseHistoryList = releaseHistoryRepository.findFirst100ByAppIdAndClusterNameAndNamespaceNameAndBranchNameAndIdLessThanEqualOrderByIdAsc(
appId, clusterName, namespaceName, branchName, maxId.get());
Set<Long> releaseIds = cleanReleaseHistoryList.stream()
.map(ReleaseHistory::getReleaseId)
.collect(Collectors.toSet());

transactionManager.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) {
releaseHistoryRepository.deleteAll(cleanReleaseHistoryList);
releaseRepository.deleteAllById(releaseIds);
}
});
hasMore = cleanReleaseHistoryList.size() == 100;
}
}

private int getReleaseHistoryRetentionLimit(ReleaseHistory releaseHistory) {
String overrideKey = String.format("%s+%s+%s+%s", releaseHistory.getAppId(),
releaseHistory.getClusterName(), releaseHistory.getNamespaceName(), releaseHistory.getBranchName());

Map<String, Integer> overrideMap = bizConfig.releaseHistoryRetentionSizeOverride();
return overrideMap.getOrDefault(overrideKey, bizConfig.releaseHistoryRetentionSize());
}

@PreDestroy
void stopClean() {
cleanStopped.set(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@
)
public abstract class AbstractIntegrationTest {

protected static final String APP_ID = "kl-app";
protected static final String CLUSTER_NAME = "default";
protected static final String NAMESPACE_NAME = "application";
protected static final String BRANCH_NAME = "default";

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ public void testReleaseMessageNotificationBatchWithInvalidNumber() throws Except
assertEquals(defaultBatch, bizConfig.releaseMessageNotificationBatch());
}

@Test
public void testReleaseHistoryRetentionSize() {
int someLimit = 20;
when(environment.getProperty("apollo.release-history.retention.size")).thenReturn(String.valueOf(someLimit));

assertEquals(someLimit, bizConfig.releaseHistoryRetentionSize());
}

@Test
public void testReleaseHistoryRetentionSizeOverride() {
int someOverrideLimit = 10;
String overrideValueString = "{'a+b+c+b':10}";
when(environment.getProperty("apollo.release-history.retention.size.override")).thenReturn(overrideValueString);
int overrideValue = bizConfig.releaseHistoryRetentionSizeOverride().get("a+b+c+b");
assertEquals(someOverrideLimit, overrideValue);

overrideValueString = "{'a+b+c+b':0,'a+b+d+b':2}";
when(environment.getProperty("apollo.release-history.retention.size.override")).thenReturn(overrideValueString);
assertEquals(1, bizConfig.releaseHistoryRetentionSizeOverride().size());
overrideValue = bizConfig.releaseHistoryRetentionSizeOverride().get("a+b+d+b");
assertEquals(2, overrideValue);

overrideValueString = "{}";
when(environment.getProperty("apollo.release-history.retention.size.override")).thenReturn(overrideValueString);
assertEquals(0, bizConfig.releaseHistoryRetentionSizeOverride().size());
}

@Test
public void testReleaseMessageNotificationBatchWithNAN() throws Exception {
String someNAN = "someNAN";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2023 Apollo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.ctrip.framework.apollo.biz.repository;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.ctrip.framework.apollo.biz.AbstractIntegrationTest;
import com.ctrip.framework.apollo.biz.entity.ReleaseHistory;
import java.util.List;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.test.context.jdbc.Sql;
import org.springframework.test.context.jdbc.Sql.ExecutionPhase;

/**
* @author kl (http://kailing.pub)
* @since 2023/3/23
*/
public class ReleaseHistoryRepositoryTest extends AbstractIntegrationTest {

@Autowired
private ReleaseHistoryRepository releaseHistoryRepository;

@Test
@Sql(scripts = "/sql/release-history-test.sql", executionPhase = ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/sql/clean.sql", executionPhase = ExecutionPhase.AFTER_TEST_METHOD)
public void testFindReleaseHistoryRetentionMaxId() {
Page<ReleaseHistory> releaseHistoryPage = releaseHistoryRepository.findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(APP_ID, CLUSTER_NAME, NAMESPACE_NAME, BRANCH_NAME, PageRequest.of(1, 1));
assertEquals(5, releaseHistoryPage.getContent().get(0).getId());

releaseHistoryPage = releaseHistoryRepository.findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(APP_ID, CLUSTER_NAME, NAMESPACE_NAME, BRANCH_NAME, PageRequest.of(2, 1));
assertEquals(4, releaseHistoryPage.getContent().get(0).getId());

releaseHistoryPage = releaseHistoryRepository.findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(APP_ID, CLUSTER_NAME, NAMESPACE_NAME, BRANCH_NAME, PageRequest.of(5, 1));
assertEquals(1, releaseHistoryPage.getContent().get(0).getId());

releaseHistoryRepository.deleteAll();
releaseHistoryPage = releaseHistoryRepository.findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(APP_ID, CLUSTER_NAME, NAMESPACE_NAME, BRANCH_NAME, PageRequest.of(1, 1));
assertTrue(releaseHistoryPage.isEmpty());
}

@Test
@Sql(scripts = "/sql/release-history-test.sql", executionPhase = ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/sql/clean.sql", executionPhase = ExecutionPhase.AFTER_TEST_METHOD)
public void testFindFirst100ByAppIdAndClusterNameAndNamespaceNameAndBranchNameAndIdLessThanEqualOrderByIdAsc() {

int releaseHistoryRetentionSize = 2;
Page<ReleaseHistory> releaseHistoryPage = releaseHistoryRepository.findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(APP_ID, CLUSTER_NAME, NAMESPACE_NAME, BRANCH_NAME, PageRequest.of(releaseHistoryRetentionSize, 1));
long releaseMaxId = releaseHistoryPage.getContent().get(0).getId();
List<ReleaseHistory> releaseHistories = releaseHistoryRepository.findFirst100ByAppIdAndClusterNameAndNamespaceNameAndBranchNameAndIdLessThanEqualOrderByIdAsc(
APP_ID, CLUSTER_NAME, NAMESPACE_NAME, BRANCH_NAME, releaseMaxId);
assertEquals(4, releaseHistories.size());

releaseHistoryRetentionSize = 1;
releaseHistoryPage = releaseHistoryRepository.findByAppIdAndClusterNameAndNamespaceNameAndBranchNameOrderByIdDesc(APP_ID, CLUSTER_NAME, NAMESPACE_NAME, BRANCH_NAME, PageRequest.of(releaseHistoryRetentionSize, 1));
releaseMaxId = releaseHistoryPage.getContent().get(0).getId();
releaseHistories = releaseHistoryRepository.findFirst100ByAppIdAndClusterNameAndNamespaceNameAndBranchNameAndIdLessThanEqualOrderByIdAsc(
APP_ID, CLUSTER_NAME, NAMESPACE_NAME, BRANCH_NAME, releaseMaxId);
assertEquals(5, releaseHistories.size());
}
}
Loading

0 comments on commit 8344788

Please sign in to comment.