From 22ebf5249a0fb61ad47ca5a65e8381de11023ef3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?= Date: Wed, 23 Aug 2017 09:31:20 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=90=AF=E5=8A=A8=E4=B8=8E?= =?UTF-8?q?=E5=81=9C=E6=AD=A2=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/mpush/api/service/BaseService.java | 45 +++++++++++++++++-- .../com/mpush/api/service/FutureListener.java | 7 ++- .../com/mpush/test/spi/FileCacheManger.java | 2 + .../main/java/com/mpush/test/spi/FileSrd.java | 22 ++++++++- .../src/main/java/com/mpush/zk/ZKClient.java | 9 ++++ 5 files changed, 79 insertions(+), 6 deletions(-) diff --git a/mpush-api/src/main/java/com/mpush/api/service/BaseService.java b/mpush-api/src/main/java/com/mpush/api/service/BaseService.java index 8337574f..9e8ffcdb 100644 --- a/mpush-api/src/main/java/com/mpush/api/service/BaseService.java +++ b/mpush-api/src/main/java/com/mpush/api/service/BaseService.java @@ -46,13 +46,17 @@ protected void tryStart(Listener l, FunctionEx function) { try { init(); function.apply(listener); - listener.monitor(this); + listener.monitor(this);//主要用于异步,否则应该放置在function.apply(listener)之前 } catch (Throwable e) { listener.onFailure(e); throw new ServiceException(e); } } else { - listener.onFailure(new ServiceException("service already started.")); + if (throwIfStarted()) { + listener.onFailure(new ServiceException("service already started.")); + } else { + listener.onSuccess(); + } } } @@ -61,13 +65,17 @@ protected void tryStop(Listener l, FunctionEx function) { if (started.compareAndSet(true, false)) { try { function.apply(listener); - listener.monitor(this); + listener.monitor(this);//主要用于异步,否则应该放置在function.apply(listener)之前 } catch (Throwable e) { listener.onFailure(e); throw new ServiceException(e); } } else { - listener.onFailure(new ServiceException("service already stopped.")); + if (throwIfStopped()) { + listener.onFailure(new ServiceException("service already stopped.")); + } else { + listener.onSuccess(); + } } } @@ -111,6 +119,35 @@ protected void doStop(Listener listener) throws Throwable { listener.onSuccess(); } + /** + * 控制当服务已经启动后,重复调用start方法,是否抛出服务已经启动异常 + * 默认是true + * + * @return true:抛出异常 + */ + protected boolean throwIfStarted() { + return true; + } + + /** + * 控制当服务已经停止后,重复调用stop方法,是否抛出服务已经停止异常 + * 默认是true + * + * @return true:抛出异常 + */ + protected boolean throwIfStopped() { + return true; + } + + /** + * 服务启动停止,超时时间, 默认是10s + * + * @return 超时时间 + */ + protected int timeoutMillis() { + return 1000 * 10; + } + protected interface FunctionEx { void apply(Listener l) throws Throwable; } diff --git a/mpush-api/src/main/java/com/mpush/api/service/FutureListener.java b/mpush-api/src/main/java/com/mpush/api/service/FutureListener.java index b14b8310..4b07d32c 100644 --- a/mpush-api/src/main/java/com/mpush/api/service/FutureListener.java +++ b/mpush-api/src/main/java/com/mpush/api/service/FutureListener.java @@ -35,11 +35,16 @@ public void onFailure(Throwable cause) { : new ServiceException(cause); } + /** + * 防止服务长时间卡在某个地方,增加超时监控 + * + * @param service 服务 + */ public void monitor(BaseService service) { if (isDone()) return;// 防止Listener被重复执行 runAsync(() -> { try { - this.get(10, TimeUnit.SECONDS); + this.get(service.timeoutMillis(), TimeUnit.MILLISECONDS); } catch (Exception e) { this.onFailure(new ServiceException(String.format("service %s monitor timeout", service.getClass().getSimpleName()))); } diff --git a/mpush-test/src/main/java/com/mpush/test/spi/FileCacheManger.java b/mpush-test/src/main/java/com/mpush/test/spi/FileCacheManger.java index f6d8d791..9563b869 100644 --- a/mpush-test/src/main/java/com/mpush/test/spi/FileCacheManger.java +++ b/mpush-test/src/main/java/com/mpush/test/spi/FileCacheManger.java @@ -22,6 +22,7 @@ import com.mpush.api.Constants; import com.mpush.api.spi.common.CacheManager; import com.mpush.tools.Jsons; +import com.mpush.tools.log.Logs; import java.nio.file.Files; import java.nio.file.Path; @@ -47,6 +48,7 @@ public final class FileCacheManger implements CacheManager { @Override public void init() { + Logs.Console.warn("你正在使用的CacheManager只能用于源码测试,生产环境请使用redis 3.x."); try { Path dir = Paths.get(this.getClass().getResource("/").toURI()); this.cacheFile = Paths.get(dir.toString(), "cache.dat"); diff --git a/mpush-test/src/main/java/com/mpush/test/spi/FileSrd.java b/mpush-test/src/main/java/com/mpush/test/spi/FileSrd.java index 9bc5216e..0ee43903 100644 --- a/mpush-test/src/main/java/com/mpush/test/spi/FileSrd.java +++ b/mpush-test/src/main/java/com/mpush/test/spi/FileSrd.java @@ -21,8 +21,10 @@ import com.google.common.collect.Lists; import com.mpush.api.service.BaseService; +import com.mpush.api.service.Listener; import com.mpush.api.srd.*; import com.mpush.tools.Jsons; +import com.mpush.tools.log.Logs; import java.util.List; @@ -36,8 +38,26 @@ public final class FileSrd extends BaseService implements ServiceRegistry, Servi public static final FileSrd I = new FileSrd(); @Override - public void init() { + public void start(Listener listener) { + if (isRunning()) { + listener.onSuccess(); + } else { + super.start(listener); + } + } + @Override + public void stop(Listener listener) { + if (isRunning()) { + super.stop(listener); + } else { + listener.onSuccess(); + } + } + + @Override + public void init() { + Logs.Console.warn("你正在使用的ServiceRegistry和ServiceDiscovery只能用于源码测试,生产环境请使用zookeeper."); } @Override diff --git a/mpush-zk/src/main/java/com/mpush/zk/ZKClient.java b/mpush-zk/src/main/java/com/mpush/zk/ZKClient.java index 1b95148e..f947ce55 100644 --- a/mpush-zk/src/main/java/com/mpush/zk/ZKClient.java +++ b/mpush-zk/src/main/java/com/mpush/zk/ZKClient.java @@ -63,6 +63,15 @@ public void start(Listener listener) { } } + @Override + public void stop(Listener listener) { + if (isRunning()) { + super.stop(listener); + } else { + listener.onSuccess(); + } + } + @Override protected void doStart(Listener listener) throws Throwable { client.start();