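Background
I have been reading a lot about Netty recently and writing a lightweight RPC framework for practice. Along the way I found many highlights in Netty's source code; some implementations can even be described as exacting. The utility classes Netty provides are also excellent and usable out of the box. This article analyzes one of my favorite areas, a Netty utility module for concurrency: Promise.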
Environment versions:
Netty: 4.1.44.Final
JDK: 1.8
Introduction to Promise
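The word "promise" means a commitment or pledge: something one person tells another with a certain expectation attached, and which is generally fulfilled. The Javadoc of io.netty.util.concurrent.Promise consists of a single sentence: a special io.netty.util.concurrent.Future which is writable (Promise is a sub-interface of io.netty.util.concurrent.Future). io.netty.util.concurrent.Future in turn extends java.util.concurrent.Future and represents the result of an asynchronous operation. The Future in the JDK concurrency package is not writable and offers no entry point for listeners (it does not apply the observer pattern); Promise fills both gaps. Looking at the inheritance hierarchy, DefaultPromise is the concrete class that finally implements these interfaces, so the source analysis should focus on DefaultPromise. The features of a module are usually defined by its interfaces, so let's first go through what the two interfaces offer: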
io.netty.util.concurrent.Promise
io.netty.util.concurrent.Future
First, the io.netty.util.concurrent.Future interface:
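    public interface Future<V> extends java.util.concurrent.Future<V> {

        // true only if the operation completed successfully
        boolean isSuccess();

        // true if the operation can still be cancelled via cancel(boolean)
        boolean isCancellable();

        // the failure cause, or null if the operation succeeded or has not completed yet
        Throwable cause();

        // add or remove listeners that are notified when the operation completes
        Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
        Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

        // wait for completion and rethrow the failure cause if the operation failed
        Future<V> sync() throws InterruptedException;
        Future<V> syncUninterruptibly();

        // wait for completion without rethrowing the failure
        Future<V> await() throws InterruptedException;
        Future<V> awaitUninterruptibly();

        // wait with a timeout; true means the operation completed in time
        boolean await(long timeout, TimeUnit unit) throws InterruptedException;
        boolean await(long timeoutMillis) throws InterruptedException;
        boolean awaitUninterruptibly(long timeout, TimeUnit unit);
        boolean awaitUninterruptibly(long timeoutMillis);

        // non-blocking: the result, or null if the operation has not completed yet
        V getNow();

        @Override
        boolean cancel(boolean mayInterruptIfRunning);
    }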
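sync() and await() are similar: both block until the operation completes. The difference is that sync() also checks for failure and, if the operation failed, immediately rethrows the failure cause (as-is, through PlatformDependent.throwException, not wrapped in another exception), whereas await() is unaware of failures.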
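The difference between the two methods can be sketched with the JDK alone. The class below is a hypothetical stand-in, not Netty code: AwaitVsSyncSketch, fail() and demo() are names invented for illustration, and the generic "sneaky throw" helper is only an assumption about how a cause can be rethrown unwrapped in plain Java.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a failed asynchronous result; not Netty code.
final class AwaitVsSyncSketch {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable cause;

    // Completes the operation with a failure.
    void fail(Throwable t) {
        cause = t;
        done.countDown();
    }

    // await(): only waits for completion, never inspects the failure cause.
    AwaitVsSyncSketch await() throws InterruptedException {
        done.await();
        return this;
    }

    // sync(): waits, then rethrows the failure cause as-is (no wrapper exception).
    AwaitVsSyncSketch sync() throws InterruptedException {
        await();
        if (cause != null) {
            AwaitVsSyncSketch.<RuntimeException>sneakyThrow(cause);
        }
        return this;
    }

    // Rethrows any Throwable without wrapping it, even a checked one.
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> void sneakyThrow(Throwable t) throws E {
        throw (E) t;
    }

    // Returns "ok" if nothing was thrown, else the simple class name of the exception.
    static String demo(boolean useSync) {
        AwaitVsSyncSketch f = new AwaitVsSyncSketch();
        f.fail(new IOException("boom"));
        try {
            if (useSync) {
                f.sync();
            } else {
                f.await();
            }
            return "ok";
        } catch (Throwable t) {
            return t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // ok
        System.out.println(demo(true));  // IOException
    }
}
```

With await() the caller has to check the outcome itself (for example through isSuccess() or cause()); with sync() a failure surfaces as an exception at the call site.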
Next, the io.netty.util.concurrent.Promise interface:
    public interface Promise<V> extends Future<V> {

        Promise<V> setSuccess(V result);

        boolean trySuccess(V result);

        Promise<V> setFailure(Throwable cause);

        boolean tryFailure(Throwable cause);

        boolean setUncancellable();

        @Override
        Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

        @Override
        Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

        @Override
        Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

        @Override
        Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

        @Override
        Promise<V> await() throws InterruptedException;

        @Override
        Promise<V> awaitUninterruptibly();

        @Override
        Promise<V> sync() throws InterruptedException;

        @Override
        Promise<V> syncUninterruptibly();
    }
That covers everything the Promise interface offers. Next, let's analyze the implementation of Promise in detail from the source code.
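Promise source code implementation
The implementation class of Promise is io.netty.util.concurrent.DefaultPromise (DefaultPromise itself has many subclasses, some of which extend it for specific scenarios), and DefaultPromise extends io.netty.util.concurrent.AbstractFuture: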
    public abstract class AbstractFuture<V> implements Future<V> {

        @Override
        public V get() throws InterruptedException, ExecutionException {
            // block until the operation completes
            await();
            Throwable cause = cause();
            if (cause == null) {
                return getNow();
            }
            // a cancellation is rethrown as-is, any other failure is wrapped
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            throw new ExecutionException(cause);
        }

        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (await(timeout, unit)) {
                Throwable cause = cause();
                if (cause == null) {
                    return getNow();
                }
                if (cause instanceof CancellationException) {
                    throw (CancellationException) cause;
                }
                throw new ExecutionException(cause);
            }
            // not completed within the timeout
            throw new TimeoutException();
        }
    }
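AbstractFuture only implements get() and get(long timeout, TimeUnit unit). Both implementations closely resemble their counterparts in java.util.concurrent.FutureTask: wait for completion, return the value on success, rethrow a CancellationException on cancellation, and wrap any other failure in an ExecutionException.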
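The branch order of get() can be isolated into a small JDK-only sketch. GetOutcomeSketch, resolve() and describe() are hypothetical names used only for illustration; the sketch assumes waiting has already finished and models just the mapping from (value, cause) to the outcome seen by the caller.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

final class GetOutcomeSketch {

    // Same branch order as AbstractFuture.get() once waiting has finished:
    // no cause -> value; cancellation -> rethrown as-is; anything else -> wrapped.
    static <V> V resolve(V value, Throwable cause) throws ExecutionException {
        if (cause == null) {
            return value;
        }
        if (cause instanceof CancellationException) {
            throw (CancellationException) cause;
        }
        throw new ExecutionException(cause);
    }

    // Turns each of the three outcomes into a short description.
    static String describe(Object value, Throwable cause) {
        try {
            return "value=" + resolve(value, cause);
        } catch (CancellationException e) {
            return "cancelled";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("done", null));                            // value=done
        System.out.println(describe(null, new CancellationException()));       // cancelled
        System.out.println(describe(null, new IllegalStateException("boom"))); // failed: boom
    }
}
```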
The source of DefaultPromise is fairly long, so it is read here in several parts. First, its fields and constructors:
    public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

        private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);
        private static final InternalLogger rejectedExecutionLogger =
                InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
        private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
                SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
                AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
        private static final Object SUCCESS = new Object();
        private static final Object UNCANCELLABLE = new Object();
        private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
                new CancellationException(), DefaultPromise.class, "cancel(...)"));
        private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();

        private volatile Object result;
        private final EventExecutor executor;
        private Object listeners;
        private short waiters;
        private boolean notifyingListeners;

        public DefaultPromise(EventExecutor executor) {
            this.executor = checkNotNull(executor, "executor");
        }

        protected DefaultPromise() {
            executor = null;
        }

        private static final class CauseHolder {
            final Throwable cause;

            CauseHolder(Throwable cause) {
                this.cause = cause;
            }
        }

        private static final class LeanCancellationException extends CancellationException {
            private static final long serialVersionUID = 2794674970981187807L;

            @Override
            public Throwable fillInStackTrace() {
                setStackTrace(CANCELLATION_STACK);
                return this;
            }

            @Override
            public String toString() {
                return CancellationException.class.getName();
            }
        }
    }
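Promise currently supports two kinds of listeners:
GenericFutureListener: a Future listener with generics support.
GenericProgressiveFutureListener: a sub-interface of GenericFutureListener; a Future listener with generics support that can also report progress (some scenarios take several steps to complete, much like a progress bar).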
    public interface GenericFutureListener<F extends Future<?>> extends EventListener {

        void operationComplete(F future) throws Exception;
    }

    public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {

        void operationProgressed(F future, long progress, long total) throws Exception;
    }
To let a Promise hold multiple listeners, Netty adds a package-private class, DefaultFutureListeners, which stores the listener instances in an array:
    final class DefaultFutureListeners {

        private GenericFutureListener<? extends Future<?>>[] listeners;
        private int size;
        private int progressiveSize;

        @SuppressWarnings("unchecked")
        DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first,
                               GenericFutureListener<? extends Future<?>> second) {
            listeners = new GenericFutureListener[2];
            listeners[0] = first;
            listeners[1] = second;
            size = 2;
            if (first instanceof GenericProgressiveFutureListener) {
                progressiveSize++;
            }
            if (second instanceof GenericProgressiveFutureListener) {
                progressiveSize++;
            }
        }

        public void add(GenericFutureListener<? extends Future<?>> l) {
            GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
            final int size = this.size;
            if (size == listeners.length) {
                // array is full: double its capacity
                this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
            }
            listeners[size] = l;
            this.size = size + 1;
            if (l instanceof GenericProgressiveFutureListener) {
                progressiveSize++;
            }
        }

        public void remove(GenericFutureListener<? extends Future<?>> l) {
            final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
            int size = this.size;
            for (int i = 0; i < size; i++) {
                if (listeners[i] == l) {
                    // shift the elements after the removed one to the left
                    int listenersToMove = size - i - 1;
                    if (listenersToMove > 0) {
                        System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                    }
                    listeners[--size] = null;
                    this.size = size;
                    if (l instanceof GenericProgressiveFutureListener) {
                        progressiveSize--;
                    }
                    return;
                }
            }
        }

        public GenericFutureListener<? extends Future<?>>[] listeners() {
            return listeners;
        }

        public int size() {
            return size;
        }

        public int progressiveSize() {
            return progressiveSize;
        }
    }
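Next come the remaining methods of DefaultPromise. I feel there is a certain artistry in the order in which its methods are laid out. First, the methods that report the execution state of the Promise: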
    public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

        @Override
        public boolean setUncancellable() {
            if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
                return true;
            }
            Object result = this.result;
            return !isDone0(result) || !isCancelled0(result);
        }

        @Override
        public boolean isSuccess() {
            Object result = this.result;
            return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
        }

        @Override
        public boolean isCancellable() {
            return result == null;
        }

        @Override
        public Throwable cause() {
            return cause0(result);
        }

        private Throwable cause0(Object result) {
            if (!(result instanceof CauseHolder)) {
                return null;
            }
            if (result == CANCELLATION_CAUSE_HOLDER) {
                CancellationException ce = new LeanCancellationException();
                if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
                    return ce;
                }
                result = this.result;
            }
            return ((CauseHolder) result).cause;
        }

        private static boolean isCancelled0(Object result) {
            return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
        }

        private static boolean isDone0(Object result) {
            return result != null && result != UNCANCELLABLE;
        }

        @Override
        public boolean isCancelled() {
            return isCancelled0(result);
        }

        @Override
        public boolean isDone() {
            return isDone0(result);
        }
    }
Next, the methods for adding and removing listeners (which also contain the logic for notifying listeners):
    public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

        @Override
        public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
            checkNotNull(listener, "listener");
            synchronized (this) {
                addListener0(listener);
            }
            if (isDone()) {
                notifyListeners();
            }
            return this;
        }

        @Override
        public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
            checkNotNull(listeners, "listeners");
            synchronized (this) {
                for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                    if (listener == null) {
                        break;
                    }
                    addListener0(listener);
                }
            }
            if (isDone()) {
                notifyListeners();
            }
            return this;
        }

        @Override
        public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
            checkNotNull(listener, "listener");
            synchronized (this) {
                removeListener0(listener);
            }
            return this;
        }

        @Override
        public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
            checkNotNull(listeners, "listeners");
            synchronized (this) {
                for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
                    if (listener == null) {
                        break;
                    }
                    removeListener0(listener);
                }
            }
            return this;
        }

        private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
            if (listeners == null) {
                listeners = listener;
            } else if (listeners instanceof DefaultFutureListeners) {
                ((DefaultFutureListeners) listeners).add(listener);
            } else {
                listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
            }
        }

        private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
            if (listeners instanceof DefaultFutureListeners) {
                ((DefaultFutureListeners) listeners).remove(listener);
            } else if (listeners == listener) {
                listeners = null;
            }
        }

        private void notifyListeners() {
            EventExecutor executor = executor();
            if (executor.inEventLoop()) {
                final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
                final int stackDepth = threadLocals.futureListenerStackDepth();
                if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                    threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                    try {
                        notifyListenersNow();
                    } finally {
                        threadLocals.setFutureListenerStackDepth(stackDepth);
                    }
                    return;
                }
            }
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    notifyListenersNow();
                }
            });
        }

        private static void safeExecute(EventExecutor executor, Runnable task) {
            try {
                executor.execute(task);
            } catch (Throwable t) {
                rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
            }
        }

        private void notifyListenersNow() {
            Object listeners;
            synchronized (this) {
                if (notifyingListeners || this.listeners == null) {
                    return;
                }
                notifyingListeners = true;
                listeners = this.listeners;
                this.listeners = null;
            }
            for (;;) {
                if (listeners instanceof DefaultFutureListeners) {
                    notifyListeners0((DefaultFutureListeners) listeners);
                } else {
                    notifyListener0(this, (GenericFutureListener<?>) listeners);
                }
                synchronized (this) {
                    if (this.listeners == null) {
                        notifyingListeners = false;
                        return;
                    }
                    listeners = this.listeners;
                    this.listeners = null;
                }
            }
        }

        private void notifyListeners0(DefaultFutureListeners listeners) {
            GenericFutureListener<?>[] a = listeners.listeners();
            int size = listeners.size();
            for (int i = 0; i < size; i++) {
                notifyListener0(this, a[i]);
            }
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        private static void notifyListener0(Future future, GenericFutureListener l) {
            try {
                l.operationComplete(future);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
                }
            }
        }
    }
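The loop in notifyListenersNow() takes the current batch of listeners while holding the lock, invokes them outside the lock, and repeats until no new listener has been registered in the meantime. A minimal JDK-only sketch of that pattern follows; DrainLoopSketch and its members are hypothetical names, and plain Runnable callbacks stand in for GenericFutureListener.

```java
import java.util.ArrayList;
import java.util.List;

final class DrainLoopSketch {
    private List<Runnable> pending = new ArrayList<>(); // guarded by "this"
    private boolean notifying;                           // guarded by "this"

    synchronized void add(Runnable listener) {
        pending.add(listener);
    }

    void notifyNow() {
        List<Runnable> batch;
        synchronized (this) {
            // Someone is already draining (possibly this thread, reentrantly), or there is nothing to do.
            if (notifying || pending.isEmpty()) {
                return;
            }
            notifying = true;
            batch = pending;
            pending = new ArrayList<>();
        }
        for (;;) {
            for (Runnable r : batch) {
                r.run(); // callbacks run without holding the lock
            }
            synchronized (this) {
                if (pending.isEmpty()) {
                    notifying = false;
                    return;
                }
                batch = pending; // listeners registered meanwhile form the next batch
                pending = new ArrayList<>();
            }
        }
    }

    // Registers a listener that itself registers another one while being notified.
    static List<String> demo() {
        DrainLoopSketch s = new DrainLoopSketch();
        List<String> log = new ArrayList<>();
        s.add(() -> {
            log.add("first");
            s.add(() -> log.add("added-during-notify"));
            s.notifyNow(); // returns immediately because a drain is already in progress
            log.add("first-end");
        });
        s.notifyNow();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [first, first-end, added-during-notify]
    }
}
```

The flag keeps a reentrant call from starting a second, nested round of notification, so the listener added during the callback is picked up by the outer loop only after the current batch has finished.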
Then the await() and sync() family of methods:
    public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

        @Override
        public Promise<V> await() throws InterruptedException {
            if (isDone()) {
                return this;
            }
            if (Thread.interrupted()) {
                throw new InterruptedException(toString());
            }
            checkDeadLock();
            synchronized (this) {
                while (!isDone()) {
                    incWaiters();
                    try {
                        wait();
                    } finally {
                        decWaiters();
                    }
                }
            }
            return this;
        }

        @Override
        public Promise<V> awaitUninterruptibly() {
            if (isDone()) {
                return this;
            }
            checkDeadLock();
            boolean interrupted = false;
            synchronized (this) {
                while (!isDone()) {
                    incWaiters();
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    } finally {
                        decWaiters();
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            return this;
        }

        @Override
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return await0(unit.toNanos(timeout), true);
        }

        @Override
        public boolean await(long timeoutMillis) throws InterruptedException {
            return await0(MILLISECONDS.toNanos(timeoutMillis), true);
        }

        @Override
        public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
            try {
                return await0(unit.toNanos(timeout), false);
            } catch (InterruptedException e) {
                throw new InternalError();
            }
        }

        @Override
        public boolean awaitUninterruptibly(long timeoutMillis) {
            try {
                return await0(MILLISECONDS.toNanos(timeoutMillis), false);
            } catch (InterruptedException e) {
                throw new InternalError();
            }
        }

        protected void checkDeadLock() {
            EventExecutor e = executor();
            if (e != null && e.inEventLoop()) {
                throw new BlockingOperationException(toString());
            }
        }

        @Override
        public Promise<V> sync() throws InterruptedException {
            await();
            rethrowIfFailed();
            return this;
        }

        @Override
        public Promise<V> syncUninterruptibly() {
            awaitUninterruptibly();
            rethrowIfFailed();
            return this;
        }

        private void incWaiters() {
            if (waiters == Short.MAX_VALUE) {
                throw new IllegalStateException("too many waiters: " + this);
            }
            ++waiters;
        }

        private void decWaiters() {
            --waiters;
        }

        private void rethrowIfFailed() {
            Throwable cause = cause();
            if (cause == null) {
                return;
            }
            PlatformDependent.throwException(cause);
        }

        private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
            if (isDone()) {
                return true;
            }
            if (timeoutNanos <= 0) {
                return isDone();
            }
            if (interruptable && Thread.interrupted()) {
                throw new InterruptedException(toString());
            }
            checkDeadLock();
            long startTime = System.nanoTime();
            long waitTime = timeoutNanos;
            boolean interrupted = false;
            try {
                for (;;) {
                    synchronized (this) {
                        if (isDone()) {
                            return true;
                        }
                        incWaiters();
                        try {
                            wait(waitTime / 1000000, (int) (waitTime % 1000000));
                        } catch (InterruptedException e) {
                            if (interruptable) {
                                throw e;
                            } else {
                                interrupted = true;
                            }
                        } finally {
                            decWaiters();
                        }
                    }
                    if (isDone()) {
                        return true;
                    } else {
                        waitTime = timeoutNanos - (System.nanoTime() - startTime);
                        if (waitTime <= 0) {
                            return isDone();
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
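The core of await0() is a timed wait on the object monitor in which the remaining time is recomputed after every wake-up, so an early or spurious wake-up does not extend the total timeout. A simplified JDK-only sketch of that loop follows; TimedWaitSketch, complete() and demo() are hypothetical names, and the sketch leaves out the waiter counter, the deadlock check and the uninterruptible variant.

```java
import java.util.concurrent.TimeUnit;

final class TimedWaitSketch {
    private boolean done; // guarded by "this"

    synchronized void complete() {
        done = true;
        notifyAll(); // wake up every thread blocked in await(...)
    }

    // Returns true if completed within the timeout, false otherwise.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        long timeoutNanos = unit.toNanos(timeout);
        long start = System.nanoTime();
        long waitTime = timeoutNanos;
        synchronized (this) {
            while (!done) {
                if (waitTime <= 0) {
                    return false;
                }
                // Object.wait takes milliseconds plus a nanosecond remainder in 0..999999.
                wait(waitTime / 1_000_000, (int) (waitTime % 1_000_000));
                // Recompute what is left of the original budget.
                waitTime = timeoutNanos - (System.nanoTime() - start);
            }
            return true;
        }
    }

    // completeInTime == true: another thread completes the operation while we wait.
    static boolean demo(boolean completeInTime) {
        TimedWaitSketch s = new TimedWaitSketch();
        if (completeInTime) {
            new Thread(s::complete).start();
        }
        try {
            return s.await(completeInTime ? 5_000 : 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(true));  // true
        System.out.println(demo(false)); // false, after roughly 50 ms
    }
}
```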
Finally, the methods that set and get the result:
    public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

        @Override
        public Promise<V> setSuccess(V result) {
            if (setSuccess0(result)) {
                return this;
            }
            throw new IllegalStateException("complete already: " + this);
        }

        @Override
        public boolean trySuccess(V result) {
            return setSuccess0(result);
        }

        @Override
        public Promise<V> setFailure(Throwable cause) {
            if (setFailure0(cause)) {
                return this;
            }
            throw new IllegalStateException("complete already: " + this, cause);
        }

        @Override
        public boolean tryFailure(Throwable cause) {
            return setFailure0(cause);
        }

        @SuppressWarnings("unchecked")
        @Override
        public V getNow() {
            Object result = this.result;
            if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
                return null;
            }
            return (V) result;
        }

        @SuppressWarnings("unchecked")
        @Override
        public V get() throws InterruptedException, ExecutionException {
            Object result = this.result;
            if (!isDone0(result)) {
                await();
                result = this.result;
            }
            if (result == SUCCESS || result == UNCANCELLABLE) {
                return null;
            }
            Throwable cause = cause0(result);
            if (cause == null) {
                return (V) result;
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            throw new ExecutionException(cause);
        }

        @SuppressWarnings("unchecked")
        @Override
        public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            Object result = this.result;
            if (!isDone0(result)) {
                if (!await(timeout, unit)) {
                    throw new TimeoutException();
                }
                result = this.result;
            }
            if (result == SUCCESS || result == UNCANCELLABLE) {
                return null;
            }
            Throwable cause = cause0(result);
            if (cause == null) {
                return (V) result;
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException) cause;
            }
            throw new ExecutionException(cause);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
                if (checkNotifyWaiters()) {
                    notifyListeners();
                }
                return true;
            }
            return false;
        }

        private boolean setSuccess0(V result) {
            return setValue0(result == null ? SUCCESS : result);
        }

        private boolean setFailure0(Throwable cause) {
            return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
        }

        private boolean setValue0(Object objResult) {
            if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
                RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
                if (checkNotifyWaiters()) {
                    notifyListeners();
                }
                return true;
            }
            return false;
        }

        private synchronized boolean checkNotifyWaiters() {
            if (waiters > 0) {
                notifyAll();
            }
            return listeners != null;
        }
    }
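Two details of the code above are easy to miss: the whole state lives in one volatile Object field that is updated with a compare-and-set, and a null value is stored as the SUCCESS sentinel so that "completed with null" can be told apart from "not completed". The following JDK-only sketch models only the success path. ResultCellSketch is a hypothetical class, not Netty code, and it omits failures, cancellation, the UNCANCELLABLE state and listeners.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical, simplified cell; it models only the success path of the result field.
final class ResultCellSketch<V> {
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<ResultCellSketch, Object> RESULT =
            AtomicReferenceFieldUpdater.newUpdater(ResultCellSketch.class, Object.class, "result");

    // Sentinel that stands for "completed successfully with a null value".
    private static final Object SUCCESS = new Object();

    private volatile Object result; // null means "not completed yet"

    // Returns false instead of throwing when the cell is already completed.
    boolean trySuccess(V value) {
        return RESULT.compareAndSet(this, null, value == null ? SUCCESS : value);
    }

    // Throws when the cell is already completed.
    ResultCellSketch<V> setSuccess(V value) {
        if (trySuccess(value)) {
            return this;
        }
        throw new IllegalStateException("complete already");
    }

    boolean isDone() {
        return result != null;
    }

    @SuppressWarnings("unchecked")
    V getNow() {
        Object r = result;
        return r == SUCCESS ? null : (V) r; // the sentinel is translated back to null
    }

    static String demo() {
        ResultCellSketch<String> cell = new ResultCellSketch<>();
        boolean first = cell.trySuccess(null);    // first completion wins
        boolean done = cell.isDone();             // done, although the value is null
        String now = cell.getNow();               // null
        boolean second = cell.trySuccess("late"); // rejected: already completed
        return first + "," + done + "," + now + "," + second;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true,true,null,false
    }
}
```

Calling setSuccess("late") instead of trySuccess("late") in the last step would throw IllegalStateException, which is the only difference between the set and try variants.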
Basic usage of Promise
To use Netty's Promise module you do not need to pull in all of Netty's dependencies; netty-common alone is enough:
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-common</artifactId>
        <version>4.1.44.Final</version>
    </dependency>
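As for choosing an EventExecutor, Netty already ships a GlobalEventExecutor for global event handling, which can be used directly (you can of course implement EventExecutor yourself or use one of its other implementations):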
    EventExecutor executor = GlobalEventExecutor.INSTANCE;
    Promise<String> promise = new DefaultPromise<>(executor);
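Consider this scenario: a resource behind a URL is downloaded to disk asynchronously; once the download finishes, the path of the downloaded file is delivered through an asynchronous notification, and the receiver prints the download result to the console.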
    import io.netty.util.concurrent.DefaultPromise;
    import io.netty.util.concurrent.EventExecutor;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import io.netty.util.concurrent.Promise;
    import lombok.Data; // Lombok generates the getters and setters of DownloadResult

    public class PromiseMain {

        public static void main(String[] args) throws Exception {
            String url = "http://xxx.yyy.zzz";
            EventExecutor executor = GlobalEventExecutor.INSTANCE;
            Promise<DownloadResult> promise = new DefaultPromise<>(executor);
            promise.addListener(new DownloadResultListener());
            Thread thread = new Thread(() -> {
                try {
                    System.out.println("Start downloading resource, url: " + url);
                    long start = System.currentTimeMillis();
                    // simulate the download
                    Thread.sleep(2000);
                    String location = "C:\\xxx\\yyy\\z.md";
                    long cost = System.currentTimeMillis() - start;
                    System.out.println(String.format("Resource downloaded, url: %s, saved to: %s, cost: %d ms",
                            url, location, cost));
                    DownloadResult result = new DownloadResult();
                    result.setUrl(url);
                    result.setFileDiskLocation(location);
                    result.setCost(cost);
                    // publish the result; this triggers the listener
                    promise.setSuccess(result);
                } catch (Exception e) {
                    // complete the promise with the failure instead of swallowing it
                    promise.tryFailure(e);
                }
            }, "Download-Thread");
            thread.start();
            Thread.sleep(Long.MAX_VALUE);
        }

        @Data
        private static class DownloadResult {

            private String url;
            private String fileDiskLocation;
            private long cost;
        }

        private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {

            @Override
            public void operationComplete(Future<DownloadResult> future) throws Exception {
                if (future.isSuccess()) {
                    DownloadResult downloadResult = future.getNow();
                    System.out.println(String.format("Download completed notification, url: %s, file path on disk: %s, cost: %d ms",
                            downloadResult.getUrl(), downloadResult.getFileDiskLocation(), downloadResult.getCost()));
                }
            }
        }
    }

Console output after running it:

    Start downloading resource, url: http://xxx.yyy.zzz
    Resource downloaded, url: http://xxx.yyy.zzz, saved to: C:\xxx\yyy\z.md, cost: 2000 ms
    Download completed notification, url: http://xxx.yyy.zzz, file path on disk: C:\xxx\yyy\z.md, cost: 2000 ms
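Promise fits many scenarios: besides asynchronous notification it can also be used for synchronous calls. Its design is much more flexible than the JUC Future and adds many new features on top of Future. If you need it, you can depend on netty-common alone and use it directly.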
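The Promise listener stack depth problem
Sometimes, because of layers of wrapping or coding mistakes, listener callbacks end up forming a chain across multiple Promises (see Issue-5302, "a promise listener chain"). Such a chain can recurse deeply enough to overflow the stack, so a threshold is needed to limit the maximum recursion depth. Let's call it the stack depth protection threshold. Its default value is 8 and it is read from the system property io.netty.defaultPromise.maxListenerStackDepth. Note that the field is initialized with Math.min(8, ...), so in this version the property can only lower the threshold, not raise it above 8. Here again is the code block shown earlier: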
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }
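The guard boils down to a per-thread depth counter: below the threshold the callback runs inline on the current stack, otherwise it is handed over for later execution. The sketch below reproduces that idea with the JDK only. StackDepthGuardSketch and all of its members are hypothetical, and a plain queue drained by the caller stands in for the EventExecutor, so the exact split between inline and deferred callbacks is specific to this sketch and not a statement about any particular Netty executor.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class StackDepthGuardSketch {
    static final int MAX_DEPTH = 8; // same default as MAX_LISTENER_STACK_DEPTH
    private static final ThreadLocal<Integer> DEPTH = ThreadLocal.withInitial(() -> 0);
    private static final Queue<Runnable> DEFERRED = new ArrayDeque<>(); // stand-in for an executor queue

    static int inline;   // callbacks run directly on the current stack
    static int deferred; // callbacks handed to the queue instead

    static void notifyGuarded(Runnable callback) {
        int depth = DEPTH.get();
        if (depth < MAX_DEPTH) {
            DEPTH.set(depth + 1);
            try {
                inline++;
                callback.run();
            } finally {
                DEPTH.set(depth); // restore the depth when the frame unwinds
            }
            return;
        }
        deferred++;
        DEFERRED.add(callback); // too deep: do not recurse any further
    }

    // Each callback triggers the next one, like a chain of promise listeners.
    static void chain(int remaining) {
        if (remaining > 0) {
            notifyGuarded(() -> chain(remaining - 1));
        }
    }

    static String demo(int length) {
        inline = 0;
        deferred = 0;
        chain(length);
        // Drain deferred callbacks from a fresh stack, where the depth is back to 0.
        Runnable next;
        while ((next = DEFERRED.poll()) != null) {
            next.run();
        }
        return "inline=" + inline + ", deferred=" + deferred;
    }

    public static void main(String[] args) {
        System.out.println(demo(5));  // inline=5, deferred=0
        System.out.println(demo(10)); // inline=9, deferred=1
    }
}
```

In this sketch the deferred callback runs only after the original stack has unwound, so the rest of the chain continues inline from depth 0. Whether a real executor behaves the same way depends on how its execute() method schedules the task.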
To build an example that triggers the listener stack depth protection, all we need is a way to call notifyListeners() recursively on the same event loop thread.
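The most typical way is to call setSuccess() on the next Promise from inside the listener callback of the previous Promise (think of nested dolls):
[Figure: a chain of Promises in which each listener callback completes the next Promise]
Test code: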
    import io.netty.util.concurrent.DefaultPromise;
    import io.netty.util.concurrent.EventExecutor;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    import io.netty.util.concurrent.ImmediateEventExecutor;
    import io.netty.util.concurrent.Promise;

    import java.util.concurrent.atomic.AtomicInteger;

    public class PromiseListenerMain {

        private static final AtomicInteger COUNTER = new AtomicInteger(0);

        public static void main(String[] args) throws Exception {
            EventExecutor executor = ImmediateEventExecutor.INSTANCE;
            Promise<String> root = new DefaultPromise<>(executor);
            Promise<String> p1 = new DefaultPromise<>(executor);
            Promise<String> p2 = new DefaultPromise<>(executor);
            Promise<String> p3 = new DefaultPromise<>(executor);
            Promise<String> p4 = new DefaultPromise<>(executor);
            Promise<String> p5 = new DefaultPromise<>(executor);
            Promise<String> p6 = new DefaultPromise<>(executor);
            Promise<String> p7 = new DefaultPromise<>(executor);
            Promise<String> p8 = new DefaultPromise<>(executor);
            Promise<String> p9 = new DefaultPromise<>(executor);
            Promise<String> p10 = new DefaultPromise<>(executor);
            // each listener completes the next promise in the chain
            p1.addListener(new Listener(p2));
            p2.addListener(new Listener(p3));
            p3.addListener(new Listener(p4));
            p4.addListener(new Listener(p5));
            p5.addListener(new Listener(p6));
            p6.addListener(new Listener(p7));
            p7.addListener(new Listener(p8));
            p8.addListener(new Listener(p9));
            p9.addListener(new Listener(p10));
            root.addListener(new Listener(p1));
            // completing the root starts the chain
            root.setSuccess("success");
            Thread.sleep(Long.MAX_VALUE);
        }

        private static class Listener implements GenericFutureListener<Future<String>> {

            private final String name;
            private final Promise<String> promise;

            public Listener(Promise<String> promise) {
                this.name = "listener-" + COUNTER.getAndIncrement();
                this.promise = promise;
            }

            @Override
            public void operationComplete(Future<String> future) throws Exception {
                System.out.println(String.format("Listener [%s] called back...", name));
                if (null != promise) {
                    promise.setSuccess("success");
                }
            }
        }
    }

Because safeExecute() acts as a fallback, every Promise above still gets its callback. Using IDEA's advanced breakpoint feature to print an extra log line at the two places where notifyListenersNow() is entered gives the following output:

    MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
    Listener [listener-9] called back...
    MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
    Listener [listener-0] called back...
    MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
    Listener [listener-1] called back...
    MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
    Listener [listener-2] called back...
    MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
    Listener [listener-3] called back...
    MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
    Listener [listener-4] called back...
    MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
    Listener [listener-5] called back...
    MAX_LISTENER_STACK_DEPTH(notifyListenersNow) executed---
    Listener [listener-6] called back...
    safeExecute(notifyListenersNow) executed----------
    Listener [listener-7] called back...
    safeExecute(notifyListenersNow) executed----------
    Listener [listener-8] called back...
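One thing puzzles me here: once the listener stack depth reaches 8, the remaining notifications are wrapped in Runnable instances and submitted to the event executor. Doesn't that turn the risk of a stack overflow into a risk of running out of memory? Asynchronous tasks can pile up too, unless task submission is rejected, so the answer depends on the EventExecutor implementation.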
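Summary
That completes the analysis of the source code and usage of Netty's Promise utility. Both its design and its code are well worth learning from, and it works out of the box: you can bring it into everyday code directly and save the effort and risk of reinventing the wheel.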
(End of article. e-a-20200123 c-3-d)