Skip to content

Commit

Permalink
[ISSUE apache#3303] [type:refactor] add MemorySafeTaskQueue for Sheny…
Browse files Browse the repository at this point in the history
…uThreadPoolExecutor
  • Loading branch information
loongs-zhang committed Apr 25, 2022
1 parent 2eef279 commit fb43590
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 4 deletions.
2 changes: 2 additions & 0 deletions shenyu-bootstrap/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ shenyu:
# keepAliveTime: 60000
# # 1GB
# maxWorkQueueMemory: 1073741824
# # 256MB
# maxFreeMemory: 268435456

logging:
level:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shenyu.common.concurrent;

import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
* does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than
* {@link org.apache.shenyu.common.concurrent.MemoryLimitedLinkedBlockingQueue}.
*/
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

private static final long serialVersionUID = 8032578371749960142L;

private int maxFreeMemory;

public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
super(Integer.MAX_VALUE);
this.maxFreeMemory = maxFreeMemory;
}

public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
final int maxFreeMemory) {
super(c);
this.maxFreeMemory = maxFreeMemory;
}

/**
* set the max free memory.
*
* @param maxFreeMemory the max free memory
*/
public void setMaxFreeMemory(final int maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}

/**
* get the max free memory.
*
* @return the max free memory limit
*/
public int getMaxFreeMemory() {
return maxFreeMemory;
}

/**
* determine if there is any remaining free memory.
*
* @return true if has free memory
*/
public boolean hasRemainedMemory() {
return MemoryLimitCalculator.maxAvailable() < maxFreeMemory;
}

@Override
public void put(final E e) throws InterruptedException {
if (hasRemainedMemory()) {
super.put(e);
}
}

@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
return hasRemainedMemory() && super.offer(e, timeout, unit);
}

@Override
public boolean offer(final E e) {
return hasRemainedMemory() && super.offer(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shenyu.common.concurrent;

import java.util.Collection;

/**
* MemorySafeTaskQueue in the {@link org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor}.
* It offer a task if the executor's submittedTaskCount less than currentPoolThreadSize
* or the currentPoolThreadSize more than executor's maximumPoolSize.
* That can make the executor create new worker
* when the task num is bigger than corePoolSize but less than maximumPoolSize.
*/
public class MemorySafeTaskQueue<R extends Runnable> extends MemorySafeLinkedBlockingQueue<Runnable> implements TaskQueue<Runnable> {

private static final long serialVersionUID = -1998413481091670338L;

private EagerExecutorService executor;

public MemorySafeTaskQueue(final int maxFreeMemory) {
super(maxFreeMemory);
}

public MemorySafeTaskQueue(final Collection<? extends Runnable> c, final int maxFreeMemory) {
super(c, maxFreeMemory);
}

@Override
public EagerExecutorService getExecutor() {
return this.executor;
}

@Override
public void setExecutor(final EagerExecutorService executor) {
this.executor = executor;
}

@Override
public boolean doOffer(final Runnable runnable) {
return super.offer(runnable);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,14 @@ public static class SharedPool {
* controls memory directly by calculating the memory size used by the blocking queue.
*/
private Long maxWorkQueueMemory = MemoryLimitCalculator.defaultLimit();

/**
* The memory used by the blocking queue is always in the safe range, and there
* is always an attempt to make the JVM's free memory higher than this value.
*
* @see org.apache.shenyu.common.concurrent.MemorySafeLinkedBlockingQueue#getMaxFreeMemory()
*/
private Integer maxFreeMemory;

/**
* Whether to enable shared thread pool.
Expand Down Expand Up @@ -1438,6 +1446,24 @@ public Long getMaxWorkQueueMemory() {
public void setMaxWorkQueueMemory(final Long maxWorkQueueMemory) {
this.maxWorkQueueMemory = maxWorkQueueMemory;
}

/**
* Get shared thread pool max work queue free memory.
*
* @return the shared thread pool max work queue free memory
*/
public Integer getMaxFreeMemory() {
return maxFreeMemory;
}

/**
* Set max work queue free memory.
*
* @param maxFreeMemory the max work queue free memory
*/
public void setMaxFreeMemory(final Integer maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,11 @@ public interface Constants {
*/
String CACHED = "cached";

/**
* The maximum free memory reserved by the blocking queue for the JVM.
*/
int THE_256_MB = 256 * 1024 * 1024;

/**
* String q.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.collect.Maps;
import org.apache.shenyu.common.cache.MemorySafeLRUMap;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.utils.PathMatchUtils;

Expand All @@ -33,8 +34,6 @@
*/
public final class MetaDataCache {

private static final int THE_256_MB = 256 * 1024 * 1024;

private static final MetaData NULL = new MetaData();

private static final MetaDataCache INSTANCE = new MetaDataCache();
Expand All @@ -44,7 +43,7 @@ public final class MetaDataCache {
*/
private static final ConcurrentMap<String, MetaData> META_DATA_MAP = Maps.newConcurrentMap();

private static final MemorySafeLRUMap<String, MetaData> CACHE = new MemorySafeLRUMap<>(THE_256_MB, 1 << 16);
private static final MemorySafeLRUMap<String, MetaData> CACHE = new MemorySafeLRUMap<>(Constants.THE_256_MB, 1 << 16);

/**
* pathPattern -> path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

import net.bytebuddy.agent.ByteBuddyAgent;
import org.apache.shenyu.common.concurrent.MemoryLimitedTaskQueue;
import org.apache.shenyu.common.concurrent.MemorySafeTaskQueue;
import org.apache.shenyu.common.concurrent.ShenyuThreadFactory;
import org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor;
import org.apache.shenyu.common.concurrent.TaskQueue;
import org.apache.shenyu.common.config.ShenyuConfig;
import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.exception.ShenyuException;
import org.apache.shenyu.plugin.api.utils.SpringBeanUtils;
import org.springframework.beans.factory.ObjectProvider;
Expand Down Expand Up @@ -63,6 +65,24 @@ public TaskQueue<Runnable> memoryLimitedTaskQueue(final ShenyuConfig shenyuConfi
return new MemoryLimitedTaskQueue<>(maxWorkQueueMemory, instrumentation);
}

/**
* MemorySafeTaskQueue.
*
* @param shenyuConfig the shenyu config
* @return instance of {@link MemorySafeTaskQueue}
*/
@Bean
@ConditionalOnMissingBean(TaskQueue.class)
@ConditionalOnProperty("shenyu.sharedPool.maxFreeMemory")
public TaskQueue<Runnable> memorySafeTaskQueue(final ShenyuConfig shenyuConfig) {
final ShenyuConfig.SharedPool sharedPool = shenyuConfig.getSharedPool();
final Integer maxFreeMemory = sharedPool.getMaxFreeMemory();
if (maxFreeMemory <= 0) {
throw new ShenyuException("${shenyu.sharedPool.maxFreeMemory} must bigger than 0 !");
}
return new MemorySafeTaskQueue<>(maxFreeMemory);
}

/**
* crate shenyu shared thread pool executor.
*
Expand All @@ -79,7 +99,7 @@ public ShenyuThreadPoolExecutor shenyuThreadPoolExecutor(final ShenyuConfig shen
final Integer maximumPoolSize = sharedPool.getMaximumPoolSize();
final Long keepAliveTime = sharedPool.getKeepAliveTime();
return new ShenyuThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
TimeUnit.MILLISECONDS, provider.getIfAvailable(),
TimeUnit.MILLISECONDS, provider.getIfAvailable(()-> new MemorySafeTaskQueue<>(Constants.THE_256_MB)),
ShenyuThreadFactory.create(sharedPool.getPrefix(), true),
new ThreadPoolExecutor.AbortPolicy());
}
Expand Down

0 comments on commit fb43590

Please sign in to comment.