Skip to content

Commit

Permalink
fixes #2308 resolve a memory leak issue in the rate-limit handler (#2309
Browse files Browse the repository at this point in the history
)
  • Loading branch information
stevehu authored Aug 8, 2024
1 parent 75555f7 commit 0d3ac74
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class LimitHandler implements MiddlewareHandler {

public LimitHandler() throws Exception{
config = LimitConfig.load();
logger.info("RateLimit started with key type:" + config.getKey().name());
logger.info("RateLimit started with key type {}", config.getKey().name());
rateLimiter = new RateLimiter(config);
}

Expand All @@ -61,7 +61,7 @@ public LimitHandler() throws Exception{
@Deprecated
public LimitHandler(LimitConfig cfg) throws Exception{
config = cfg;
logger.info("RateLimit started with key type:" + config.getKey().name());
logger.info("RateLimit started with key type {}", config.getKey().name());
rateLimiter = new RateLimiter(cfg);
}

Expand Down
26 changes: 15 additions & 11 deletions rate-limit/src/main/java/com/networknt/limit/RateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
/**
* Rate limit logic for light-4j framework. The config will define in the limit.yml config file.
*
* By default Rate limit will handle on the server(service) level. But framework support client and address level limitation
* By default, Rate limit will handle on the server(service) level. But framework support client and address level limitation
*
* @author Gavin Chen
*/
public class RateLimiter {
private static final String LIMIT_KEY_NOT_FOUND = "ERR10073";
protected LimitConfig config;

private Map<String, Map<Long, AtomicLong>> serverTimeMap = new ConcurrentHashMap<>();
private final Map<String, Map<Long, AtomicLong>> serverTimeMap = new ConcurrentHashMap<>();

private Map<String, Map<TimeUnit, Map<Long, AtomicLong>>> directTimeMap = new ConcurrentHashMap<>();
private final Map<String, Map<TimeUnit, Map<Long, AtomicLong>>> directTimeMap = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(RateLimiter.class);
static final String UNKNOWN_PREFIX = "/unknown/prefix"; // this is the bucket for all other request path that is not defined in the config
static final String ADDRESS_TYPE = "address";
static final String CLIENT_TYPE = "client";
static final String USER_TYPE = "user";
Expand Down Expand Up @@ -224,13 +225,7 @@ protected RateLimitResponse isAllowDirect(String directKey, String path, String
*/
public RateLimitResponse isAllowByServer(String path) {
long currentTimeWindow = Instant.now().getEpochSecond();
Map<Long, AtomicLong> timeMap = lookupServerTimeMap(path);
if(timeMap == null) {
timeMap = new ConcurrentHashMap<>();
synchronized(this) {
serverTimeMap.put(path, timeMap);
}
}
Map<Long, AtomicLong> timeMap = lookupServerTimeMap(path); // defined and unknown one if not defined.
LimitQuota limitQuota = config.getServer() != null ? lookupLimitQuota(path) : null;
if(limitQuota == null) {
limitQuota = this.config.getRateLimit().get(0);
Expand Down Expand Up @@ -264,7 +259,16 @@ private Map<Long, AtomicLong> lookupServerTimeMap(String path) {
}
}
if(prefix == null) {
return null;
// the request path is not in the defined path prefix. Use the default path prefix UNKNOWN_PREFIX.
if(!serverTimeMap.containsKey(UNKNOWN_PREFIX)) {
Map<Long, AtomicLong> timeMap = new ConcurrentHashMap<>();
synchronized(this) {
serverTimeMap.put(UNKNOWN_PREFIX, timeMap);
}
return timeMap;
} else {
return serverTimeMap.get(UNKNOWN_PREFIX);
}
} else {
return serverTimeMap.get(prefix);
}
Expand Down
205 changes: 0 additions & 205 deletions rate-limit/src/main/java/com/networknt/limit/RequestLimit.java

This file was deleted.

42 changes: 42 additions & 0 deletions rate-limit/src/test/java/com/networknt/limit/RateLimiterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -135,4 +136,45 @@ public RateLimitResponse callByAddressAsync() throws Exception {
return rateLimiterAddress.isAllowDirect(address, "/v1/address", RateLimiter.ADDRESS_TYPE);
}

@Test
public void testByServerMemoryLeak() throws Exception {
List<RateLimitResponse> responseList = new ArrayList<>();
Callable<RateLimitResponse> task =this::callByServerAsyncRandom;
List<Callable<RateLimitResponse>> tasks = Collections.nCopies(12, task);

//change the thread number here to test multi-threads
ExecutorService executorService = Executors.newFixedThreadPool(1);
List<Future<RateLimitResponse>> futures = executorService.invokeAll(tasks);
for (Future<RateLimitResponse> future : futures) {
responseList.add(future.get());
}

// Assert.assertEquals(responseList.size(), 12);
List<RateLimitResponse> rejects = responseList.stream().filter(r->!r.isAllow()).collect(Collectors.toList());
Assert.assertEquals(rejects.size(), 2);
executorService.shutdown();
}

// Method to generate a random string of a given length
public static String generateRandomString(int length) {
// Characters to choose from
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
Random random = new Random();
StringBuilder sb = new StringBuilder(length);

// Loop to append random characters to the StringBuilder
for (int i = 0; i < length; i++) {
int randomIndex = random.nextInt(characters.length());
sb.append(characters.charAt(randomIndex));
}

return sb.toString();
}

public RateLimitResponse callByServerAsyncRandom() throws Exception {
LimitQuota limitQuota = limitConfig.getServer().get("/v1/" + generateRandomString(10));
return rateLimiter.isAllowByServer( "/v1/" + generateRandomString(10));
}


}

0 comments on commit 0d3ac74

Please sign in to comment.