From 6ee1fa97f7f46a1e799d6163287b4bf8da0a39a1 Mon Sep 17 00:00:00 2001 From: hailang Date: Sat, 23 Apr 2022 11:46:33 +0800 Subject: [PATCH] [ISSUE #3303] [type:refactor] add MemorySafeTaskQueue for ShenyuThreadPoolExecutor --- .../concurrent/EagerExecutorService.java | 48 ++++++++++ .../concurrent/MemoryLimitedTaskQueue.java | 54 +++-------- .../MemorySafeLinkedBlockingQueue.java | 89 +++++++++++++++++++ .../concurrent/MemorySafeTaskQueue.java | 58 ++++++++++++ .../concurrent/ShenyuThreadPoolExecutor.java | 8 +- .../shenyu/common/concurrent/TaskQueue.java | 88 ++++++++++++++++++ 6 files changed, 297 insertions(+), 48 deletions(-) create mode 100644 shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/EagerExecutorService.java create mode 100644 shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeLinkedBlockingQueue.java create mode 100644 shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeTaskQueue.java create mode 100644 shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/TaskQueue.java diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/EagerExecutorService.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/EagerExecutorService.java new file mode 100644 index 000000000000..94c470652caf --- /dev/null +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/EagerExecutorService.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.common.concurrent; + +import java.util.concurrent.ExecutorService; + +/** + * EagerExecutorService. + */ +public interface EagerExecutorService extends ExecutorService { + + /** + * Returns the current number of threads in the pool. + * + * @return the number of threads + */ + int getPoolSize(); + + /** + * Returns the approximate number of threads that are actively + * executing tasks. + * + * @return the number of threads + */ + int getActiveCount(); + + /** + * Returns the maximum allowed number of threads. + * + * @return the maximum allowed number of threads + */ + int getMaximumPoolSize(); +} diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitedTaskQueue.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitedTaskQueue.java index c6a1ec5c5e2e..71d5c2b13179 100644 --- a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitedTaskQueue.java +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemoryLimitedTaskQueue.java @@ -18,8 +18,6 @@ package org.apache.shenyu.common.concurrent; import java.lang.instrument.Instrumentation; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; /** * MemoryLimitedTaskQueue in the {@link org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor}. @@ -28,11 +26,11 @@ * That can make the executor create new worker * when the task num is bigger than corePoolSize but less than maximumPoolSize. */ -public class MemoryLimitedTaskQueue extends MemoryLimitedLinkedBlockingQueue { +public class MemoryLimitedTaskQueue extends MemoryLimitedLinkedBlockingQueue implements TaskQueue { private static final long serialVersionUID = -2635853580887179627L; - private ShenyuThreadPoolExecutor executor; + private EagerExecutorService executor; public MemoryLimitedTaskQueue(final Instrumentation inst) { super(inst); @@ -42,50 +40,18 @@ public MemoryLimitedTaskQueue(final long memoryLimit, final Instrumentation inst super(memoryLimit, inst); } - /** - * set the executor. - * - * @param executor executor - */ - public void setExecutor(final ShenyuThreadPoolExecutor executor) { - this.executor = executor; + @Override + public EagerExecutorService getExecutor() { + return executor; } @Override - public boolean offer(final Runnable runnable) { - if (executor == null) { - throw new RejectedExecutionException("The task queue does not have executor!"); - } - - int currentPoolThreadSize = executor.getPoolSize(); - // have free worker. put task into queue to let the worker deal with task. - if (executor.getActiveCount() < currentPoolThreadSize) { - return super.offer(runnable); - } - - // return false to let executor create new worker. - if (currentPoolThreadSize < executor.getMaximumPoolSize()) { - return false; - } - - // currentPoolThreadSize >= max - return super.offer(runnable); + public void setExecutor(final EagerExecutorService executor) { + this.executor = executor; } - /** - * retry offer task. - * - * @param o task - * @param timeout timeout - * @param unit timeout unit - * @return offer success or not - * @throws java.util.concurrent.RejectedExecutionException if executor is terminated. - * @throws java.lang.InterruptedException if the current thread is interrupted. - */ - public boolean retryOffer(final Runnable o, final long timeout, final TimeUnit unit) throws InterruptedException { - if (executor.isShutdown()) { - throw new RejectedExecutionException("Executor is shutdown!"); - } - return super.offer(o, timeout, unit); + @Override + public boolean doOffer(Runnable runnable) { + return super.offer(runnable); } } diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeLinkedBlockingQueue.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeLinkedBlockingQueue.java new file mode 100644 index 000000000000..a63a38857d79 --- /dev/null +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeLinkedBlockingQueue.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.common.concurrent; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}, + * does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than + * {@link org.apache.shenyu.common.concurrent.MemoryLimitedLinkedBlockingQueue}. + */ +public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue { + + private static final long serialVersionUID = 8032578371749960142L; + + private int maxFreeMemory; + + public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) { + super(Integer.MAX_VALUE); + this.maxFreeMemory = maxFreeMemory; + } + + public MemorySafeLinkedBlockingQueue(final Collection c, + int maxFreeMemory) { + super(c); + this.maxFreeMemory = maxFreeMemory; + } + + /** + * set the max free memory. + * + * @param maxFreeMemory the max free memory + */ + public void setMaxFreeMemory(final int maxFreeMemory) { + this.maxFreeMemory = maxFreeMemory; + } + + /** + * get the max free memory. + * + * @return the max free memory limit + */ + public int getMaxFreeMemory() { + return maxFreeMemory; + } + + /** + * determine if there is any remaining free memory. + * + * @return true if has free memory + */ + public boolean hasRemainedMemory() { + return MemoryLimitCalculator.maxAvailable() < maxFreeMemory; + } + + @Override + public void put(final E e) throws InterruptedException { + if (hasRemainedMemory()) { + super.put(e); + } + } + + @Override + public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { + return hasRemainedMemory() && super.offer(e, timeout, unit); + } + + @Override + public boolean offer(final E e) { + return hasRemainedMemory() && super.offer(e); + } +} diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeTaskQueue.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeTaskQueue.java new file mode 100644 index 000000000000..95b0e4538019 --- /dev/null +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeTaskQueue.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.common.concurrent; + +import java.util.Collection; + +/** + * MemorySafeTaskQueue in the {@link org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor}. + * It offer a task if the executor's submittedTaskCount less than currentPoolThreadSize + * or the currentPoolThreadSize more than executor's maximumPoolSize. + * That can make the executor create new worker + * when the task num is bigger than corePoolSize but less than maximumPoolSize. + */ +public class MemorySafeTaskQueue extends MemorySafeLinkedBlockingQueue implements TaskQueue { + + private static final long serialVersionUID = -1998413481091670338L; + + private EagerExecutorService executor; + + public MemorySafeTaskQueue(int maxFreeMemory) { + super(maxFreeMemory); + } + + public MemorySafeTaskQueue(Collection c, int maxFreeMemory) { + super(c, maxFreeMemory); + } + + @Override + public EagerExecutorService getExecutor() { + return this.executor; + } + + @Override + public void setExecutor(final EagerExecutorService executor) { + this.executor = executor; + } + + @Override + public boolean doOffer(Runnable runnable) { + return super.offer(runnable); + } + +} diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/ShenyuThreadPoolExecutor.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/ShenyuThreadPoolExecutor.java index 1a9f93b1f561..431a38043a6b 100644 --- a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/ShenyuThreadPoolExecutor.java +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/ShenyuThreadPoolExecutor.java @@ -26,16 +26,17 @@ /** * ShenyuThreadPoolExecutor. */ -public class ShenyuThreadPoolExecutor extends ThreadPoolExecutor { +public class ShenyuThreadPoolExecutor extends ThreadPoolExecutor implements EagerExecutorService { public ShenyuThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, - final MemoryLimitedTaskQueue workQueue, + final TaskQueue workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + workQueue.setExecutor(this); } @Override @@ -48,8 +49,7 @@ public void execute(final Runnable command) { super.execute(command); } catch (RejectedExecutionException e) { // retry to offer the task into queue. - @SuppressWarnings("all") - final MemoryLimitedTaskQueue queue = (MemoryLimitedTaskQueue) super.getQueue(); + final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { throw new RejectedExecutionException("Queue capacity is full.", e); diff --git a/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/TaskQueue.java b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/TaskQueue.java new file mode 100644 index 000000000000..bf16564687df --- /dev/null +++ b/shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/TaskQueue.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shenyu.common.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * TaskQueue. + */ +public interface TaskQueue extends BlockingQueue { + + /** + * get executor. + * + * @return the executor + */ + EagerExecutorService getExecutor(); + + /** + * set the executor. + * + * @param executor executor + */ + void setExecutor(final EagerExecutorService executor); + + @Override + default boolean offer(final E e) { + if (getExecutor() == null) { + throw new RejectedExecutionException("The task queue does not have executor!"); + } + + int currentPoolThreadSize = getExecutor().getPoolSize(); + // have free worker. put task into queue to let the worker deal with task. + if (getExecutor().getActiveCount() < currentPoolThreadSize) { + return doOffer(e); + } + + // return false to let executor create new worker. + if (currentPoolThreadSize < getExecutor().getMaximumPoolSize()) { + return false; + } + + // currentPoolThreadSize >= max + return doOffer(e); + } + + /** + * offer element to the queue. + * + * @param e the element to add + * @return {@code true} if the element was added to this queue, else {@code false} + */ + boolean doOffer(E e); + + /** + * retry offer task. + * + * @param o task + * @param timeout timeout + * @param unit timeout unit + * @return offer success or not + * @throws java.util.concurrent.RejectedExecutionException if executor is terminated. + * @throws java.lang.InterruptedException if the current thread is interrupted. + */ + default boolean retryOffer(final E o, final long timeout, final TimeUnit unit) throws InterruptedException { + if (getExecutor().isShutdown()) { + throw new RejectedExecutionException("Executor is shutdown!"); + } + return offer(o, timeout, unit); + } +}