title | date | categories | tags | description | ||||
---|---|---|---|---|---|---|---|---|
17.8. Netty's Asynchronous Model |
2020-03-10 |
|
|
|
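In Netty, all I/O operations are asynchronous.
When a method call returns, the I/O operation may not have finished yet, so at that point it is still unknown whether the operation succeeded or failed.
How, then, do we determine whether the I/O operation succeeded or failed, and obtain its result?
Netty builds on and extends Java's native Future.
Java's Future interface is the top-level abstraction describing a result that becomes available in the future.
On its own it is somewhat inconvenient: there is no way to be told the exact moment the asynchronous task completes. We can only block in get until it finishes, or poll by calling isDone repeatedly to check whether the computation is done.
Netty provides a more convenient Future. For reference, the JDK's Future interface looks like this: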
package java.util.concurrent;
/**
* A {@code Future} represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* {@code get} when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* {@code cancel} method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a {@code Future} for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return {@code null} as a result of the underlying task.
*
* <p>
* <b>Sample Usage</b> (Note that the following classes are all
* made-up.)
* <pre> {@code
* interface ArchiveSearcher { String search(String target); }
* class App {
* ExecutorService executor = ...
* ArchiveSearcher searcher = ...
* void showSearch(final String target)
* throws InterruptedException {
* Future<String> future
* = executor.submit(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* displayOtherThings(); // do other things while searching
* try {
* displayText(future.get()); // use future
* } catch (ExecutionException ex) { cleanup(); return; }
* }
* }}</pre>
*
* The {@link FutureTask} class is an implementation of {@code Future} that
* implements {@code Runnable}, and so may be executed by an {@code Executor}.
* For example, the above construction with {@code submit} could be replaced by:
* <pre> {@code
* FutureTask<String> future =
* new FutureTask<String>(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* executor.execute(future);}</pre>
*
* <p>Memory consistency effects: Actions taken by the asynchronous computation
* <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
* actions following the corresponding {@code Future.get()} in another thread.
*
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface Future<V> {
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();
/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();
/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
* Note: blocks until the computation completes, then returns the result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;
/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
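* Note: waits at most the given time. If the computation completes within that time, the result is returned;
* if the time elapses and the computation has still not completed, a TimeoutException is thrown.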
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
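The two ways of waiting on a JDK Future mentioned above, blocking in get and polling isDone, can be sketched as follows. This is a minimal illustration using only the JDK; the class name `JdkFutureWaitingDemo`, the `SLOW_TASK` constant and the sleep durations are made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class JdkFutureWaitingDemo {

    // Hypothetical slow task standing in for real work.
    static final Callable<String> SLOW_TASK = () -> {
        Thread.sleep(200);
        return "done";
    };

    // Option 1: block the calling thread in get() until the result is ready.
    static String waitByBlocking() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(SLOW_TASK);
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    // Option 2: poll isDone() in a loop, then fetch the result.
    static String waitByPolling() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(SLOW_TASK);
            while (!future.isDone()) {
                Thread.sleep(10); // the caller has to decide how often to check
            }
            return future.get(); // returns immediately because the task is done
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(waitByBlocking());
        System.out.println(waitByPolling());
    }
}
```

In both variants the calling thread is occupied until the task finishes, and that is the inconvenience a callback-based Future removes.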
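Netty's Future directly extends Java's Future. It also represents the result of an asynchronous operation, but offers more methods than the JDK interface.
In Java, isDone returning true covers several cases: (1) normal termination, (2) an exception, (3) cancellation. In all of these cases the method returns true.
Netty adds methods that report the outcome of an operation more precisely.
For example, isSuccess returns true only if the I/O operation completed successfully.
In addition, it provides addListener, an application of the observer pattern.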
/**
* The result of an asynchronous operation.
*/
@SuppressWarnings("ClassNameSameAsAncestorName")
public interface Future<V> extends java.util.concurrent.Future<V> {
/**
* Returns {@code true} if and only if the I/O operation was completed
* successfully.
*/
boolean isSuccess();
/**
* returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
*/
boolean isCancellable();
/**
* Returns the cause of the failed I/O operation if the I/O operation has
* failed.
*
* @return the cause of the failure.
* {@code null} if succeeded or this future is not
* completed yet.
*/
Throwable cause();
/**
* Adds the specified listener to this future. The
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
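* Note: adds the specified listener to this future.
* The listener is notified once the future is done, i.e. once isDone() returns true.
* If the future is already complete, the listener is notified immediately.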
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Adds the specified listeners to this future. The
* specified listeners are notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listeners are notified immediately.
*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Removes the first occurrence of the specified listener from this future.
* The specified listener is no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listener is not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
* Removes the first occurrence for each of the listeners from this future.
* The specified listeners are no longer notified when this
* future is {@linkplain #isDone() done}. If the specified
* listeners are not associated with this future, this method
* does nothing and returns silently.
*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> sync() throws InterruptedException;
/**
* Waits for this future until it is done, and rethrows the cause of the failure if this future
* failed.
*/
Future<V> syncUninterruptibly();
/**
* Waits for this future to be completed.
*
* @throws InterruptedException
* if the current thread was interrupted
*/
Future<V> await() throws InterruptedException;
/**
* Waits for this future to be completed without
* interruption. This method catches an {@link InterruptedException} and
* discards it silently.
*/
Future<V> awaitUninterruptibly();
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*
* @throws InterruptedException
* if the current thread was interrupted
*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Waits for this future to be completed within the
* specified time limit without interruption. This method catches an
* {@link InterruptedException} and discards it silently.
*
* @return {@code true} if and only if the future was completed within
* the specified time limit
*/
boolean awaitUninterruptibly(long timeoutMillis);
/**
* Return the result without blocking. If the future is not done yet this will return {@code null}.
*
* As it is possible that a {@code null} value is used to mark the future as successful you also need to check
* if the future is really done with {@link #isDone()} and not rely on the returned {@code null} value.
*/
V getNow();
/**
* {@inheritDoc}
*
* If the cancellation was successful it will fail the future with a {@link CancellationException}.
*/
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
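To see why isDone alone is not enough, the sketch below uses the JDK's CompletableFuture (not Netty's Future) to produce the three kinds of completion and shows that isDone reports true for all of them. The `outcome` helper and the class name are hypothetical; the helper only approximates the kind of distinction that Netty's isSuccess, cause and isCancelled give you.

```java
import java.util.concurrent.CompletableFuture;

class DoneVersusSuccessDemo {

    // Classifies a future using JDK methods only. Cancellation is checked first because
    // a cancelled CompletableFuture also counts as completed exceptionally.
    static String outcome(CompletableFuture<?> future) {
        if (!future.isDone()) {
            return "pending";
        }
        if (future.isCancelled()) {
            return "cancelled";
        }
        if (future.isCompletedExceptionally()) {
            return "failed";
        }
        return "success";
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("result");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(false);

        // isDone() is true for all three, so it cannot tell them apart.
        System.out.println(ok.isDone() + " " + failed.isDone() + " " + cancelled.isDone());
        System.out.println(outcome(ok) + " " + outcome(failed) + " " + outcome(cancelled));
    }
}
```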
The GenericFutureListener interface
/**
* Listens to the result of a {@link Future}. The result of the asynchronous operation is notified once this listener
* is added by calling {@link Future#addListener(GenericFutureListener)}.
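* Note: this interface listens for the result of a future.
* Once the listener has been added via {@link Future#addListener(GenericFutureListener)}, it is notified of the result of the asynchronous operation.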
*/
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
/**
* Invoked when the operation associated with the {@link Future} has been completed.
* Note: this method is called when the operation associated with the future has completed.
*
* @param future the source {@link Future} which called this callback
*/
void operationComplete(F future) throws Exception;
}
/**
* A tagging interface that all event listener interfaces must extend.
* @since JDK1.1
*/
public interface EventListener {
}
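EventListener declares no members. It is what is known as a tagging interface, also called a marker interface. It is the JDK's java.util.EventListener, which is used mainly by the Swing and AWT event listeners.
We usually use ChannelFutureListener: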
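The notification rule quoted in the addListener Javadoc (listeners are called when the future completes, and a listener added after completion is called immediately) can be modelled in a few lines of plain Java. This is a deliberately simplified, hypothetical sketch, not Netty's implementation: `NotifyingResult` and `ListenerDemo` are invented names, and a `Consumer` stands in for GenericFutureListener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of listener notification.
class NotifyingResult<V> {
    private final List<Consumer<V>> listeners = new ArrayList<>();
    private boolean done;
    private V value;

    // Registers a listener. If the result already exists, the listener runs right away.
    void addListener(Consumer<V> listener) {
        V snapshot;
        synchronized (this) {
            if (!done) {
                listeners.add(listener);
                return;
            }
            snapshot = value;
        }
        listener.accept(snapshot);
    }

    // Publishes the result once and notifies every listener registered so far.
    void complete(V result) {
        List<Consumer<V>> toNotify;
        synchronized (this) {
            if (done) {
                throw new IllegalStateException("already completed");
            }
            done = true;
            value = result;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (Consumer<V> listener : toNotify) {
            listener.accept(result);
        }
    }
}

class ListenerDemo {
    // Returns the order in which things happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        NotifyingResult<String> result = new NotifyingResult<>();
        result.addListener(v -> log.add("early:" + v)); // registered before completion
        log.add("completing");
        result.complete("ok");                          // triggers the early listener
        result.addListener(v -> log.add("late:" + v));  // already done: runs immediately
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Listeners are invoked outside the synchronized block so that a slow listener does not hold the lock while it runs.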
/**
* Listens to the result of a {@link ChannelFuture}. The result of the
* asynchronous {@link Channel} I/O operation is notified once this listener
* is added by calling {@link ChannelFuture#addListener(GenericFutureListener)}.
*
* Note: listens to the result of a ChannelFuture; the result of the asynchronous Channel I/O operation is delivered to this listener.
* <h3>Return the control to the caller quickly</h3>
*
* {@link #operationComplete(Future)} is directly called by an I/O
* thread. Therefore, performing a time consuming task or a blocking operation
* in the handler method can cause an unexpected pause during I/O. If you need
* to perform a blocking operation on I/O completion, try to execute the
* operation in a different thread using a thread pool.
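* Note: operationComplete is called directly by an I/O thread, that is, the thread associated with the EventLoop.
* Performing a time-consuming or blocking operation inside operationComplete therefore stalls that I/O thread.
* If you need to perform a blocking operation when the I/O completes, run it on a different thread, for example via a thread pool.
* In short: do not do slow work inside operationComplete; hand it off to another thread.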
*/
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
// Additionally defines a few fields (constants)
// Closes the channel when the operation completes
/**
* A {@link ChannelFutureListener} that closes the {@link Channel} which is
* associated with the specified {@link ChannelFuture}.
*/
ChannelFutureListener CLOSE = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
// Closes the channel when the operation fails
/**
* A {@link ChannelFutureListener} that closes the {@link Channel} when the
* operation ended up with a failure or cancellation rather than a success.
*/
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().close();
}
}
};
// When the operation fails, calls fireExceptionCaught on the pipeline
/**
* A {@link ChannelFutureListener} that forwards the {@link Throwable} of the {@link ChannelFuture} into the
* {@link ChannelPipeline}. This mimics the old behavior of Netty 3.
*/
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
};
// Just a type alias
}
Fields declared in an interface are in fact constants (implicitly public static final).
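The advice in the ChannelFutureListener Javadoc, to keep the completion callback short and move blocking work to another thread, can be illustrated without Netty. In the sketch below a single-thread executor named "io-thread" plays the role of the EventLoop thread and a Runnable plays the role of operationComplete. All names, the pool size and the 100 ms sleep are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class OffloadFromCallbackDemo {

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns { thread that ran the callback, thread that ran the slow work }.
    static String[] run() {
        ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService workers = Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        AtomicReference<String> callbackThread = new AtomicReference<>();
        AtomicReference<String> slowWorkThread = new AtomicReference<>();
        CountDownLatch finished = new CountDownLatch(1);

        // Stands in for operationComplete(): it runs on the I/O thread, so it only
        // records where it ran and schedules the slow part on the worker pool.
        Runnable completionCallback = () -> {
            callbackThread.set(Thread.currentThread().getName());
            workers.execute(() -> {
                slowWorkThread.set(Thread.currentThread().getName());
                sleepQuietly(100); // stands in for a blocking call
                finished.countDown();
            });
        };

        try {
            ioThread.execute(completionCallback);
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
            return new String[] { callbackThread.get(), slowWorkThread.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("callback ran on " + threads[0] + ", slow work ran on " + threads[1]);
    }
}
```

The callback returns almost immediately, so the thread standing in for the I/O thread is free again while the worker pool handles the blocking part.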
The Future we usually work with is ChannelFuture, a sub-interface of Future:
public interface ChannelFuture extends Future<Void> {
ChannelPromise is a sub-interface of ChannelFuture:
/**
* Special {@link ChannelFuture} which is writable.
* Note: a special ChannelFuture that can be written to.
*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
/**
* Special {@link Future} which is writable.
* Note: a special Future that can be written to.
*/
public interface Promise<V> extends Future<V> {
/**
* Marks this future as a success and notifies all
* listeners.
* Note: marks the future as successful and notifies all listeners.
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
* Note: if the future has already succeeded or failed, an IllegalStateException is thrown.
*/
Promise<V> setSuccess(V result);
/**
* Marks this future as a success and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a success. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
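// Tries to mark this future as a success. Returns true if the future was marked successfully, false otherwise,
// i.e. when the future has already been marked as a success or a failure.
// In other words, a future can be marked only once; every later attempt fails.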
boolean trySuccess(V result);
/**
* Marks this future as a failure and notifies all
* listeners.
* Note: marks the future as failed and notifies all listeners.
*
* If it is success or failed already it will throw an {@link IllegalStateException}.
*/
Promise<V> setFailure(Throwable cause);
/**
* Marks this future as a failure and notifies all
* listeners.
*
* @return {@code true} if and only if successfully marked this future as
* a failure. Otherwise {@code false} because this future is
* already marked as either a success or a failure.
*/
boolean tryFailure(Throwable cause);
/**
* Make this future impossible to cancel.
*
* @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
* without being cancelled. {@code false} if this future has been cancelled already.
*/
boolean setUncancellable();
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
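Is the sync method implemented on top of Future's get under the hood? (In Netty 4's DefaultPromise it is not: sync() calls await() and then rethrows the failure cause, if there is one.)
How does a Future know that its asynchronous operation has completed? This is where Promise comes in: it extends Future and adds methods for writing the result. The result can be written only once!
The state of the Future is written into it by a ChannelPromise.
When the underlying operation has actually finished, the ChannelPromise's setSuccess method is called to mark the operation as successful and pass the result back. The future is then complete, and all of its listeners are notified.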
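The write-once rule and the difference between setSuccess (throws on a second write) and trySuccess (returns false) can be modelled in plain Java. The sketch below is a hypothetical, much reduced stand-in for a promise, not Netty's DefaultPromise: `WriteOncePromise`, `awaitResult` and `WriteOnceDemo` are invented names, and listeners are left out to keep the focus on the single write.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical write-once result holder modelled on the Promise contract quoted above.
class WriteOncePromise<V> {
    private final CountDownLatch completed = new CountDownLatch(1);
    private boolean set;
    private V result;

    // Like trySuccess: returns false instead of throwing when a result already exists.
    synchronized boolean trySuccess(V value) {
        if (set) {
            return false;
        }
        set = true;
        result = value;
        completed.countDown(); // wake up anyone waiting for the result
        return true;
    }

    // Like setSuccess: a second write is treated as a programming error.
    void setSuccess(V value) {
        if (!trySuccess(value)) {
            throw new IllegalStateException("already completed");
        }
    }

    // Read side: waits for the writer, then returns the result.
    V awaitResult() throws InterruptedException {
        completed.await();
        synchronized (this) {
            return result;
        }
    }
}

class WriteOnceDemo {
    static String run() {
        WriteOncePromise<String> promise = new WriteOncePromise<>();
        // The writer thread stands in for the I/O side that completes the operation.
        Thread writer = new Thread(() -> promise.setSuccess("ok"));
        writer.start();
        try {
            String value = promise.awaitResult();
            boolean secondWrite = promise.trySuccess("ignored"); // false: already written
            boolean threw = false;
            try {
                promise.setSuccess("also ignored");
            } catch (IllegalStateException e) {
                threw = true;
            }
            return value + "|" + secondWrite + "|" + threw;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The reader only ever calls awaitResult, while the writer owns setSuccess and trySuccess. This mirrors the split between the read-only Future view and the writable Promise.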
See also 王胖胖's article on Netty's Future and Promise.