Skip to content

Commit

Permalink
[pinpoint-apm#10976] Fix acquire leak
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed May 8, 2024
1 parent dd53b87 commit 83b73d6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public List<CompletableFuture<Void>> put(TableName tableName, List<Put> puts) {

private void acquire(int size, TableName tableName) {
if (!this.limiter.acquire(size)) {
this.limiter.release(size);
throw new RequestNotPermittedException("max concurrent requests reached. table:" + tableName, false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;

import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -82,4 +83,23 @@ void put_error() {
});
assertEquals(0, helper.count());
}


@Test
void put_acquire() {
when(putWriter.put(any(TableName.class), any(Put.class)))
.then((Answer<CompletableFuture<Void>>) invocation -> new CompletableFuture<>());

ConcurrencyLimiterHelper helper = new ConcurrencyLimiterHelper(1);
RateLimiterPutWriter writer = new RateLimiterPutWriter(putWriter, helper);

CompletableFuture<Void> future = writer.put(tableName, new Put(new byte[10]));
Assertions.assertThrows(RuntimeException.class, () -> {
writer.put(tableName, new Put(new byte[10]));
});
assertEquals(1, helper.count());
future.complete(null);
assertEquals(0, helper.count());

}
}

0 comments on commit 83b73d6

Please sign in to comment.