Skip to content

Commit

Permalink
[ISSUE apache#3303] [type:refactor] add MemorySafeTaskQueue for Sheny…
Browse files Browse the repository at this point in the history
…uThreadPoolExecutor
  • Loading branch information
loongs-zhang committed Apr 23, 2022
1 parent 993f86a commit 6ee1fa9
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shenyu.common.concurrent;

import java.util.concurrent.ExecutorService;

/**
* EagerExecutorService.
*/
public interface EagerExecutorService extends ExecutorService {

/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
int getPoolSize();

/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
int getActiveCount();

/**
* Returns the maximum allowed number of threads.
*
* @return the maximum allowed number of threads
*/
int getMaximumPoolSize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package org.apache.shenyu.common.concurrent;

import java.lang.instrument.Instrumentation;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
* MemoryLimitedTaskQueue in the {@link org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor}.
Expand All @@ -28,11 +26,11 @@
* That can make the executor create new worker
* when the task num is bigger than corePoolSize but less than maximumPoolSize.
*/
public class MemoryLimitedTaskQueue<R extends Runnable> extends MemoryLimitedLinkedBlockingQueue<Runnable> {
public class MemoryLimitedTaskQueue<R extends Runnable> extends MemoryLimitedLinkedBlockingQueue<Runnable> implements TaskQueue<Runnable> {

private static final long serialVersionUID = -2635853580887179627L;

private ShenyuThreadPoolExecutor executor;
private EagerExecutorService executor;

public MemoryLimitedTaskQueue(final Instrumentation inst) {
super(inst);
Expand All @@ -42,50 +40,18 @@ public MemoryLimitedTaskQueue(final long memoryLimit, final Instrumentation inst
super(memoryLimit, inst);
}

/**
* set the executor.
*
* @param executor executor
*/
public void setExecutor(final ShenyuThreadPoolExecutor executor) {
this.executor = executor;
@Override
public EagerExecutorService getExecutor() {
return executor;
}

@Override
public boolean offer(final Runnable runnable) {
if (executor == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}

int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (executor.getActiveCount() < currentPoolThreadSize) {
return super.offer(runnable);
}

// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}

// currentPoolThreadSize >= max
return super.offer(runnable);
public void setExecutor(final EagerExecutorService executor) {
this.executor = executor;
}

/**
* retry offer task.
*
* @param o task
* @param timeout timeout
* @param unit timeout unit
* @return offer success or not
* @throws java.util.concurrent.RejectedExecutionException if executor is terminated.
* @throws java.lang.InterruptedException if the current thread is interrupted.
*/
public boolean retryOffer(final Runnable o, final long timeout, final TimeUnit unit) throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
@Override
public boolean doOffer(Runnable runnable) {
return super.offer(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shenyu.common.concurrent;

import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
* does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than
* {@link org.apache.shenyu.common.concurrent.MemoryLimitedLinkedBlockingQueue}.
*/
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

private static final long serialVersionUID = 8032578371749960142L;

private int maxFreeMemory;

public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
super(Integer.MAX_VALUE);
this.maxFreeMemory = maxFreeMemory;
}

public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
int maxFreeMemory) {
super(c);
this.maxFreeMemory = maxFreeMemory;
}

/**
* set the max free memory.
*
* @param maxFreeMemory the max free memory
*/
public void setMaxFreeMemory(final int maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}

/**
* get the max free memory.
*
* @return the max free memory limit
*/
public int getMaxFreeMemory() {
return maxFreeMemory;
}

/**
* determine if there is any remaining free memory.
*
* @return true if has free memory
*/
public boolean hasRemainedMemory() {
return MemoryLimitCalculator.maxAvailable() < maxFreeMemory;
}

@Override
public void put(final E e) throws InterruptedException {
if (hasRemainedMemory()) {
super.put(e);
}
}

@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
return hasRemainedMemory() && super.offer(e, timeout, unit);
}

@Override
public boolean offer(final E e) {
return hasRemainedMemory() && super.offer(e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shenyu.common.concurrent;

import java.util.Collection;

/**
* MemorySafeTaskQueue in the {@link org.apache.shenyu.common.concurrent.ShenyuThreadPoolExecutor}.
* It offer a task if the executor's submittedTaskCount less than currentPoolThreadSize
* or the currentPoolThreadSize more than executor's maximumPoolSize.
* That can make the executor create new worker
* when the task num is bigger than corePoolSize but less than maximumPoolSize.
*/
public class MemorySafeTaskQueue<R extends Runnable> extends MemorySafeLinkedBlockingQueue<Runnable> implements TaskQueue<Runnable> {

private static final long serialVersionUID = -1998413481091670338L;

private EagerExecutorService executor;

public MemorySafeTaskQueue(int maxFreeMemory) {
super(maxFreeMemory);
}

public MemorySafeTaskQueue(Collection<? extends Runnable> c, int maxFreeMemory) {
super(c, maxFreeMemory);
}

@Override
public EagerExecutorService getExecutor() {
return this.executor;
}

@Override
public void setExecutor(final EagerExecutorService executor) {
this.executor = executor;
}

@Override
public boolean doOffer(Runnable runnable) {
return super.offer(runnable);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@
/**
* ShenyuThreadPoolExecutor.
*/
public class ShenyuThreadPoolExecutor extends ThreadPoolExecutor {
public class ShenyuThreadPoolExecutor extends ThreadPoolExecutor implements EagerExecutorService {

public ShenyuThreadPoolExecutor(final int corePoolSize,
final int maximumPoolSize,
final long keepAliveTime,
final TimeUnit unit,
final MemoryLimitedTaskQueue<Runnable> workQueue,
final TaskQueue<Runnable> workQueue,
final ThreadFactory threadFactory,
final RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
workQueue.setExecutor(this);
}

@Override
Expand All @@ -48,8 +49,7 @@ public void execute(final Runnable command) {
super.execute(command);
} catch (RejectedExecutionException e) {
// retry to offer the task into queue.
@SuppressWarnings("all")
final MemoryLimitedTaskQueue queue = (MemoryLimitedTaskQueue) super.getQueue();
final TaskQueue<Runnable> queue = (TaskQueue<Runnable>) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
throw new RejectedExecutionException("Queue capacity is full.", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shenyu.common.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
* TaskQueue.
*/
public interface TaskQueue<E> extends BlockingQueue<E> {

/**
* get executor.
*
* @return the executor
*/
EagerExecutorService getExecutor();

/**
* set the executor.
*
* @param executor executor
*/
void setExecutor(final EagerExecutorService executor);

@Override
default boolean offer(final E e) {
if (getExecutor() == null) {
throw new RejectedExecutionException("The task queue does not have executor!");
}

int currentPoolThreadSize = getExecutor().getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (getExecutor().getActiveCount() < currentPoolThreadSize) {
return doOffer(e);
}

// return false to let executor create new worker.
if (currentPoolThreadSize < getExecutor().getMaximumPoolSize()) {
return false;
}

// currentPoolThreadSize >= max
return doOffer(e);
}

/**
* offer element to the queue.
*
* @param e the element to add
* @return {@code true} if the element was added to this queue, else {@code false}
*/
boolean doOffer(E e);

/**
* retry offer task.
*
* @param o task
* @param timeout timeout
* @param unit timeout unit
* @return offer success or not
* @throws java.util.concurrent.RejectedExecutionException if executor is terminated.
* @throws java.lang.InterruptedException if the current thread is interrupted.
*/
default boolean retryOffer(final E o, final long timeout, final TimeUnit unit) throws InterruptedException {
if (getExecutor().isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
return offer(o, timeout, unit);
}
}

0 comments on commit 6ee1fa9

Please sign in to comment.