From c50d9ee8bff7adbc6a262691a7efb442a977bf8c Mon Sep 17 00:00:00 2001 From: jfarcand Date: Fri, 21 Nov 2014 11:26:06 -0500 Subject: [PATCH] More fixes for #1785 --- .../org/atmosphere/cpr/AtmosphereConfig.java | 2 +- .../atmosphere/cpr/AtmosphereFramework.java | 21 +- .../cpr/AtmosphereResourceSessionFactory.java | 10 +- ...faultAtmosphereResourceSessionFactory.java | 7 +- .../cpr/DefaultMetaBroadcaster.java | 395 ++++++++++++++++++ .../org/atmosphere/cpr/MetaBroadcaster.java | 383 +---------------- .../java/org/atmosphere/cpr/Universe.java | 14 +- .../inject/InjectableObjectFactory.java | 2 +- .../inject/MetaBroadcasterInjectable.java | 3 +- .../ManagedAtmosphereHandlerTest.java | 4 +- .../AtmosphereResourceStateRecoveryTest.java | 2 +- 11 files changed, 444 insertions(+), 399 deletions(-) create mode 100644 modules/cpr/src/main/java/org/atmosphere/cpr/DefaultMetaBroadcaster.java diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereConfig.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereConfig.java index fb3f1cc3839..39520ad3780 100755 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereConfig.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereConfig.java @@ -301,7 +301,7 @@ public AtmosphereResourceFactory resourcesFactory(){ } /** - * Return the {@link org.atmosphere.cpr.MetaBroadcaster} + * Return the {@link DefaultMetaBroadcaster} * @return the MetaBroadcaster */ public MetaBroadcaster metaBroadcaster(){ diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereFramework.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereFramework.java index a8f1bc5355c..982afcc4c4d 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereFramework.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereFramework.java @@ -873,6 +873,7 @@ public Enumeration getInitParameterNames() { configureAnnotationPackages(); configureBroadcasterFactory(); + configureMetaBroadcaster(); configureScanningPackage(scFacade, ApplicationConfig.ANNOTATION_PACKAGE); configureScanningPackage(scFacade, FrameworkConfig.JERSEY2_SCANNING_PACKAGE); configureScanningPackage(scFacade, FrameworkConfig.JERSEY_SCANNING_PACKAGE); @@ -885,7 +886,6 @@ public Enumeration getInitParameterNames() { // Reconfigure in case an annotation changed the default. configureBroadcasterFactory(); - configureMetaBroadcaster(); patchContainer(); configureBroadcaster(); loadConfiguration(scFacade); @@ -3011,14 +3011,17 @@ private AtmosphereFramework configureAtmosphereResourceFactory() { } public MetaBroadcaster metaBroadcaster() { - if (metaBroadcaster == null) { - metaBroadcaster = new MetaBroadcaster(config); - } return metaBroadcaster; } private AtmosphereFramework configureMetaBroadcaster() { - if (metaBroadcaster == null) { + try { + metaBroadcaster = newClassInstance(MetaBroadcaster.class, DefaultMetaBroadcaster.class); + metaBroadcaster.configure(config); + } catch (InstantiationException e) { + logger.error("", e); + } catch (IllegalAccessException e) { + logger.error("", e); } return this; } @@ -3082,7 +3085,13 @@ private void initDefaultSerializer() { */ public synchronized AtmosphereResourceSessionFactory sessionFactory() { if (sessionFactory == null) { - sessionFactory = new DefaultAtmosphereResourceSessionFactory(); + try { + sessionFactory = newClassInstance(AtmosphereResourceSessionFactory.class, DefaultAtmosphereResourceSessionFactory.class); + } catch (InstantiationException e) { + logger.error("", e); + } catch (IllegalAccessException e) { + logger.error("", e); + } } return sessionFactory; } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceSessionFactory.java b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceSessionFactory.java index ddeb361ceeb..8a7ca75fd1e 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceSessionFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/AtmosphereResourceSessionFactory.java @@ -20,7 +20,7 @@ * * @author uklance (https://github.com/uklance) */ -public abstract class AtmosphereResourceSessionFactory { +public interface AtmosphereResourceSessionFactory { /** * Returns the current session associated with the @@ -36,7 +36,7 @@ public abstract class AtmosphereResourceSessionFactory { * @return the session associated with this request or null if create is * false and the resource has no valid session */ - public abstract AtmosphereResourceSession getSession(AtmosphereResource resource, boolean create); + AtmosphereResourceSession getSession(AtmosphereResource resource, boolean create); /** * Returns the current session associated with the @@ -47,9 +47,7 @@ public abstract class AtmosphereResourceSessionFactory { * {@link AtmosphereResource}, or creates one if it does not yet * exist. */ - public AtmosphereResourceSession getSession(AtmosphereResource resource) { - return getSession(resource, true); - } + AtmosphereResourceSession getSession(AtmosphereResource resource); - public abstract void destroy(); + void destroy(); } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAtmosphereResourceSessionFactory.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAtmosphereResourceSessionFactory.java index 5d70f20a140..f95a70649a8 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAtmosphereResourceSessionFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultAtmosphereResourceSessionFactory.java @@ -21,7 +21,7 @@ /** * @author uklance (https://github.com/uklance) */ -public class DefaultAtmosphereResourceSessionFactory extends AtmosphereResourceSessionFactory { +public class DefaultAtmosphereResourceSessionFactory implements AtmosphereResourceSessionFactory { private final ConcurrentMap sessions = new ConcurrentHashMap(); private final AtmosphereResourceEventListener disconnectListener = new AtmosphereResourceEventListenerAdapter() { @@ -54,6 +54,11 @@ public AtmosphereResourceSession getSession(AtmosphereResource r, boolean create return session; } + @Override + public AtmosphereResourceSession getSession(AtmosphereResource resource) { + return getSession(resource, true); + } + @Override public void destroy() { sessions.clear(); diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultMetaBroadcaster.java b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultMetaBroadcaster.java new file mode 100644 index 00000000000..b004c496fd1 --- /dev/null +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/DefaultMetaBroadcaster.java @@ -0,0 +1,395 @@ +/* + * Copyright 2014 Jean-Francois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.cpr; + +import org.atmosphere.util.ExecutorsFactory; +import org.atmosphere.util.uri.UriTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Broadcast events to all or a subset of available {@link Broadcaster}s based on their {@link org.atmosphere.cpr.Broadcaster#getID()} value. + * This class allows broadcasting events to a set of broadcasters that maps to some String like: + *
+ *        // Broadcast the event to all Broadcaster ID starting with /hello
+ *        broadcast("/hello", event)
+ *        // Broadcast the event to all Broadcaster ID
+ *        broaccast("/*", event);
+ * 
+ * The rule used is similar to path/URI mapping used by technology like Servlet, Jersey, etc. + *

+ * NOTE: Broadcasters' name must start with / in order to get retrieved by this class. + *

+ * This class is NOT thread safe. + *

+ * If you want to use MetaBroadcaster with Jersey or any framework, make sure all {@link org.atmosphere.cpr.Broadcaster#getID()} + * starts with '/'. For example, with Jersey: + *

+ *
+ * @author Jeanfrancois Arcand
+ * @Path(RestConstants.STREAMING + "/workspace{wid:/[0-9A-Z]+}")
+ * public class JerseyPubSub {
+ * @PathParam("wid") private Broadcaster topic;
+ * 
+ */ +public class DefaultMetaBroadcaster implements MetaBroadcaster { + + public static final String MAPPING_REGEX = "[/a-zA-Z0-9-&.*=@_;\\?]+"; + + private final static Logger logger = LoggerFactory.getLogger(DefaultMetaBroadcaster.class); + private final static ConcurrentLinkedQueue broadcasterListeners = new ConcurrentLinkedQueue(); + private final static MetaBroadcasterFuture E = new MetaBroadcasterFuture(Collections.emptyList()); + private MetaBroadcasterCache cache = new NoCache(); + private AtmosphereConfig config; + + public DefaultMetaBroadcaster() { + } + + @Override + public void configure(AtmosphereConfig config) { + this.config = config; + } + + protected MetaBroadcasterFuture broadcast(final String path, Object message, int time, TimeUnit unit, boolean delay, boolean cacheMessage) { + if (config != null) { + Collection c = config.getBroadcasterFactory().lookupAll(); + + final Map m = new HashMap(); + List l = new ArrayList(); + logger.trace("Map {}", path); + UriTemplate t = null; + try { + t = new UriTemplate(path); + for (Broadcaster b : c) { + logger.trace("Trying to map {} to {}", t, b.getID()); + if (t.match(b.getID(), m)) { + l.add(b); + } + m.clear(); + } + } finally { + if (t != null) t.destroy(); + } + + if (l.isEmpty() && cacheMessage) { + if (NoCache.class.isAssignableFrom(cache.getClass())) { + logger.warn("No Broadcaster matches {}. Message {} WILL BE LOST. " + + "Make sure you cache it or make sure the Broadcaster exists before.", path, message); + } else { + cache.cache(path, message); + } + return E; + } + + MetaBroadcasterFuture f = new MetaBroadcasterFuture(l); + CompleteListener cl = new CompleteListener(f); + + for (Broadcaster b : l) { + if (time <= 0) { + f.outerFuture(b.addBroadcasterListener(cl).broadcast(message)); + } else if (!delay) { + f.outerFuture(b.scheduleFixedBroadcast(message, time, unit)); + } else { + f.outerFuture(b.delayBroadcast(message, time, unit)); + } + } + + return f; + } else { + return E; + } + } + + protected MetaBroadcasterFuture map(String path, Object message, int time, TimeUnit unit, boolean delay, boolean cacheMessage) { + + if (path == null || path.isEmpty()) { + throw new NullPointerException(); + } + + if (path.contains("*")) { + path = path.replace("*", MAPPING_REGEX); + } + + if (path.equals("/")) { + path += MAPPING_REGEX; + } + + return broadcast(path, message, time, unit, delay, cacheMessage); + } + + /** + * Broadcast the message to all Broadcasters whose {@link org.atmosphere.cpr.Broadcaster#getID()} matches the broadcasterID value. + * + * @param broadcasterID a String (or path) that can potentially match a {@link org.atmosphere.cpr.Broadcaster#getID()} + * @param message a message to be broadcasted + * @return a Future + */ + @Override + public Future> broadcastTo(String broadcasterID, Object message) { + return map(broadcasterID, message, -1, null, false, true); + } + + /** + * Flush the cached messages. + * @return this + */ + protected DefaultMetaBroadcaster flushCache() { + if (cache != null) cache.flushCache(); + return this; + } + + /** + * Broadcast the message at a fixed rate to all Broadcasters whose {@link org.atmosphere.cpr.Broadcaster#getID()} + * matches the broadcasterID value. This operation will invoke {@link Broadcaster#scheduleFixedBroadcast(Object, long, java.util.concurrent.TimeUnit)}} + * + * @param broadcasterID a String (or path) that can potentially match a {@link org.atmosphere.cpr.Broadcaster#getID()} + * @param message a message to be broadcasted + * @param time a time value + * @param unit a {@link TimeUnit} + * @return a Future + */ + @Override + public Future> scheduleTo(String broadcasterID, Object message, int time, TimeUnit unit) { + return map(broadcasterID, message, time, unit, false, true); + } + + /** + * Delay the message delivery to Broadcasters whose {@link org.atmosphere.cpr.Broadcaster#getID()} + * matches the broadcasterID value. This operation will invoke {@link Broadcaster#delayBroadcast(Object, long, java.util.concurrent.TimeUnit)} (Object, long, java.util.concurrent.TimeUnit)}} + * + * @param broadcasterID a String (or path) that can potentially match a {@link org.atmosphere.cpr.Broadcaster#getID()} + * @param message a message to be broadcasted + * @param time a time value + * @param unit a {@link TimeUnit} + * @return a Future + */ + @Override + public Future> delayTo(String broadcasterID, Object message, int time, TimeUnit unit) { + return map(broadcasterID, message, time, unit, true, true); + } + + private final static class CompleteListener extends BroadcasterListenerAdapter { + + private final MetaBroadcasterFuture f; + + private CompleteListener(MetaBroadcasterFuture f) { + this.f = f; + } + + @Override + public void onPostCreate(Broadcaster b) { + } + + @Override + public void onComplete(Broadcaster b) { + b.removeBroadcasterListener(this); + f.countDown(); + if (f.isDone()) { + for (BroadcasterListener l : broadcasterListeners) { + try { + l.onComplete(b); + } catch (Exception ex) { + logger.warn("", ex); + } + } + } + } + + @Override + public void onPreDestroy(Broadcaster b) { + } + } + + private final static class MetaBroadcasterFuture implements Future> { + + private final CountDownLatch latch; + private final List l; + private boolean isCancelled = false; + private final List> outerFuture = new ArrayList>(); + + private MetaBroadcasterFuture(List l) { + this.latch = new CountDownLatch(l.size()); + this.l = l; + } + + MetaBroadcasterFuture outerFuture(Future f) { + outerFuture.add(f); + return this; + } + + @Override + public boolean cancel(boolean b) { + for (Future f : outerFuture) { + f.cancel(b); + } + + while (latch.getCount() > 0) { + latch.countDown(); + } + isCancelled = true; + return isCancelled; + } + + @Override + public boolean isCancelled() { + return isCancelled; + } + + @Override + public boolean isDone() { + return latch.getCount() == 0; + } + + @Override + public List get() throws InterruptedException, ExecutionException { + latch.await(); + return l; + } + + @Override + public List get(long t, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + latch.await(t, timeUnit); + return l; + } + + public void countDown() { + latch.countDown(); + } + } + + /** + * Add a {@link BroadcasterListener} to all mapped {@link Broadcaster}s. + * + * @param b {@link BroadcasterListener} + * @return this + */ + @Override + public DefaultMetaBroadcaster addBroadcasterListener(BroadcasterListener b) { + broadcasterListeners.add(b); + return this; + } + + /** + * Remove the {@link BroadcasterListener}. + * + * @param b {@link BroadcasterListener} + * @return this + */ + @Override + public DefaultMetaBroadcaster removeBroadcasterListener(BroadcasterListener b) { + broadcasterListeners.remove(b); + return this; + } + + /** + * Set the {@link MetaBroadcasterCache}. Default is {@link NoCache}. + * @param cache + * @return + */ + @Override + public DefaultMetaBroadcaster cache(MetaBroadcasterCache cache) { + this.cache = cache; + return this; + } + + public void destroy(){ + broadcasterListeners.clear(); + flushCache(); + } + + /** + * Cache message if no {@link Broadcaster} maps the {@link #broadcastTo(String, Object)} + */ + public static interface MetaBroadcasterCache { + + /** + * Cache the Broadcaster ID and message + * @param path the value passed to {@link #broadcastTo(String, Object)} + * @param message the value passed to {@link #broadcastTo(String, Object)} + * @return this + */ + public MetaBroadcasterCache cache(String path, Object message); + + /** + * Flush the Cache. + * @return this + */ + public MetaBroadcasterCache flushCache(); + + } + + public final static class NoCache implements MetaBroadcasterCache { + + @Override + public MetaBroadcasterCache cache(String path, Object o) { + return this; + } + + @Override + public MetaBroadcasterCache flushCache() { + return this; + } + } + + /** + * Flush the cache every 30 seconds. + */ + public final static class ThirtySecondsCache implements MetaBroadcasterCache, Runnable { + + private final DefaultMetaBroadcaster defaultMetaBroadcaster; + private final ConcurrentHashMap cache = new ConcurrentHashMap(); + + public ThirtySecondsCache(DefaultMetaBroadcaster metaBroadcaster, AtmosphereConfig config) { + this.defaultMetaBroadcaster = metaBroadcaster; + ExecutorsFactory.getScheduler(config).scheduleAtFixedRate(this, 0, 30, TimeUnit.SECONDS); + } + + @Override + public MetaBroadcasterCache cache(String path, Object o) { + cache.put(path, o); + return this; + } + + @Override + public MetaBroadcasterCache flushCache() { + for (Map.Entry e : cache.entrySet()) { + defaultMetaBroadcaster.map(e.getKey(), e.getValue(), -1, null, false, false); + } + return this; + } + + @Override + public void run() { + flushCache(); + cache.clear(); + } + } + +} diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/MetaBroadcaster.java b/modules/cpr/src/main/java/org/atmosphere/cpr/MetaBroadcaster.java index b6ce56d75e6..3f9a9ad1e33 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/MetaBroadcaster.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/MetaBroadcaster.java @@ -1,5 +1,5 @@ /* - * Copyright 2014 Jean-Francois Arcand + * Copyright 2014 Jeanfrancois Arcand * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of @@ -15,390 +15,27 @@ */ package org.atmosphere.cpr; -import org.atmosphere.util.ExecutorsFactory; -import org.atmosphere.util.uri.UriTemplate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** - * Broadcast events to all or a subset of available {@link Broadcaster}s based on their {@link org.atmosphere.cpr.Broadcaster#getID()} value. - * This class allows broadcasting events to a set of broadcasters that maps to some String like: - *
- *        // Broadcast the event to all Broadcaster ID starting with /hello
- *        broadcast("/hello", event)
- *        // Broadcast the event to all Broadcaster ID
- *        broaccast("/*", event);
- * 
- * The rule used is similar to path/URI mapping used by technology like Servlet, Jersey, etc. - *

- * NOTE: Broadcasters' name must start with / in order to get retrieved by this class. - *

- * This class is NOT thread safe. - *

- * If you want to use MetaBroadcaster with Jersey or any framework, make sure all {@link org.atmosphere.cpr.Broadcaster#getID()} - * starts with '/'. For example, with Jersey: - *

- *
  * @author Jeanfrancois Arcand
- * @Path(RestConstants.STREAMING + "/workspace{wid:/[0-9A-Z]+}")
- * public class JerseyPubSub {
- * @PathParam("wid") private Broadcaster topic;
- * 
*/ -public class MetaBroadcaster { - - public static final String MAPPING_REGEX = "[/a-zA-Z0-9-&.*=@_;\\?]+"; - - private final static Logger logger = LoggerFactory.getLogger(MetaBroadcaster.class); - private static MetaBroadcaster metaBroadcaster; - private final static ConcurrentLinkedQueue broadcasterListeners = new ConcurrentLinkedQueue(); - private final static MetaBroadcasterFuture E = new MetaBroadcasterFuture(Collections.emptyList()); - private MetaBroadcasterCache cache = new NoCache(); - private AtmosphereConfig config; - - public MetaBroadcaster() { - // Ugly - metaBroadcaster = this; - } - - public MetaBroadcaster(AtmosphereConfig config) { - this.config = config; - // Ugly - metaBroadcaster = this; - } - - protected MetaBroadcasterFuture broadcast(final String path, Object message, int time, TimeUnit unit, boolean delay, boolean cacheMessage) { - if (config != null) { - Collection c = config.getBroadcasterFactory().lookupAll(); - - final Map m = new HashMap(); - List l = new ArrayList(); - logger.trace("Map {}", path); - UriTemplate t = null; - try { - t = new UriTemplate(path); - for (Broadcaster b : c) { - logger.trace("Trying to map {} to {}", t, b.getID()); - if (t.match(b.getID(), m)) { - l.add(b); - } - m.clear(); - } - } finally { - if (t != null) t.destroy(); - } - - if (l.isEmpty() && cacheMessage) { - if (NoCache.class.isAssignableFrom(cache.getClass())) { - logger.warn("No Broadcaster matches {}. Message {} WILL BE LOST. " + - "Make sure you cache it or make sure the Broadcaster exists before.", path, message); - } else { - cache.cache(path, message); - } - return E; - } - - MetaBroadcasterFuture f = new MetaBroadcasterFuture(l); - CompleteListener cl = new CompleteListener(f); - - for (Broadcaster b : l) { - if (time <= 0) { - f.outerFuture(b.addBroadcasterListener(cl).broadcast(message)); - } else if (!delay) { - f.outerFuture(b.scheduleFixedBroadcast(message, time, unit)); - } else { - f.outerFuture(b.delayBroadcast(message, time, unit)); - } - } - - return f; - } else { - return E; - } - } - - protected MetaBroadcasterFuture map(String path, Object message, int time, TimeUnit unit, boolean delay, boolean cacheMessage) { - - if (path == null || path.isEmpty()) { - throw new NullPointerException(); - } - - if (path.contains("*")) { - path = path.replace("*", MAPPING_REGEX); - } - - if (path.equals("/")) { - path += MAPPING_REGEX; - } - - return broadcast(path, message, time, unit, delay, cacheMessage); - } - - /** - * Broadcast the message to all Broadcasters whose {@link org.atmosphere.cpr.Broadcaster#getID()} matches the broadcasterID value. - * - * @param broadcasterID a String (or path) that can potentially match a {@link org.atmosphere.cpr.Broadcaster#getID()} - * @param message a message to be broadcasted - * @return a Future - */ - public Future> broadcastTo(String broadcasterID, Object message) { - return map(broadcasterID, message, -1, null, false, true); - } - - /** - * Flush the cached messages. - * @return this - */ - protected MetaBroadcaster flushCache() { - if (cache != null) cache.flushCache(); - return this; - } - - /** - * Broadcast the message at a fixed rate to all Broadcasters whose {@link org.atmosphere.cpr.Broadcaster#getID()} - * matches the broadcasterID value. This operation will invoke {@link Broadcaster#scheduleFixedBroadcast(Object, long, java.util.concurrent.TimeUnit)}} - * - * @param broadcasterID a String (or path) that can potentially match a {@link org.atmosphere.cpr.Broadcaster#getID()} - * @param message a message to be broadcasted - * @param time a time value - * @param unit a {@link TimeUnit} - * @return a Future - */ - public Future> scheduleTo(String broadcasterID, Object message, int time, TimeUnit unit) { - return map(broadcasterID, message, time, unit, false, true); - } - - /** - * Delay the message delivery to Broadcasters whose {@link org.atmosphere.cpr.Broadcaster#getID()} - * matches the broadcasterID value. This operation will invoke {@link Broadcaster#delayBroadcast(Object, long, java.util.concurrent.TimeUnit)} (Object, long, java.util.concurrent.TimeUnit)}} - * - * @param broadcasterID a String (or path) that can potentially match a {@link org.atmosphere.cpr.Broadcaster#getID()} - * @param message a message to be broadcasted - * @param time a time value - * @param unit a {@link TimeUnit} - * @return a Future - */ - public Future> delayTo(String broadcasterID, Object message, int time, TimeUnit unit) { - return map(broadcasterID, message, time, unit, true, true); - } - - /** - * - * @deprecated Use {@link AtmosphereConfig#metaBroadcaster()} - */ - public synchronized final static MetaBroadcaster getDefault() { - if (metaBroadcaster == null) { - metaBroadcaster = new MetaBroadcaster(); - } - return metaBroadcaster; - } - - private final static class CompleteListener extends BroadcasterListenerAdapter { - - private final MetaBroadcasterFuture f; - - private CompleteListener(MetaBroadcasterFuture f) { - this.f = f; - } - - @Override - public void onPostCreate(Broadcaster b) { - } - - @Override - public void onComplete(Broadcaster b) { - b.removeBroadcasterListener(this); - f.countDown(); - if (f.isDone()) { - for (BroadcasterListener l : broadcasterListeners) { - try { - l.onComplete(b); - } catch (Exception ex) { - logger.warn("", ex); - } - } - } - } - - @Override - public void onPreDestroy(Broadcaster b) { - } - } - - private final static class MetaBroadcasterFuture implements Future> { - - private final CountDownLatch latch; - private final List l; - private boolean isCancelled = false; - private final List> outerFuture = new ArrayList>(); - - private MetaBroadcasterFuture(List l) { - this.latch = new CountDownLatch(l.size()); - this.l = l; - } - - MetaBroadcasterFuture outerFuture(Future f) { - outerFuture.add(f); - return this; - } - - @Override - public boolean cancel(boolean b) { - for (Future f : outerFuture) { - f.cancel(b); - } - - while (latch.getCount() > 0) { - latch.countDown(); - } - isCancelled = true; - return isCancelled; - } - - @Override - public boolean isCancelled() { - return isCancelled; - } - - @Override - public boolean isDone() { - return latch.getCount() == 0; - } - - @Override - public List get() throws InterruptedException, ExecutionException { - latch.await(); - return l; - } - - @Override - public List get(long t, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { - latch.await(t, timeUnit); - return l; - } - - public void countDown() { - latch.countDown(); - } - } - - /** - * Add a {@link BroadcasterListener} to all mapped {@link Broadcaster}s. - * - * @param b {@link BroadcasterListener} - * @return this - */ - public MetaBroadcaster addBroadcasterListener(BroadcasterListener b) { - broadcasterListeners.add(b); - return this; - } - - /** - * Remove the {@link BroadcasterListener}. - * - * @param b {@link BroadcasterListener} - * @return this - */ - public MetaBroadcaster removeBroadcasterListener(BroadcasterListener b) { - broadcasterListeners.remove(b); - return this; - } - - /** - * Set the {@link MetaBroadcasterCache}. Default is {@link NoCache}. - * @param cache - * @return - */ - public MetaBroadcaster cache(MetaBroadcasterCache cache) { - this.cache = cache; - return this; - } - - protected void destroy(){ - broadcasterListeners.clear(); - flushCache(); - } - - /** - * Cache message if no {@link Broadcaster} maps the {@link #broadcastTo(String, Object)} - */ - public static interface MetaBroadcasterCache { - - /** - * Cache the Broadcaster ID and message - * @param path the value passed to {@link #broadcastTo(String, Object)} - * @param message the value passed to {@link #broadcastTo(String, Object)} - * @return this - */ - public MetaBroadcasterCache cache(String path, Object message); - - /** - * Flush the Cache. - * @return this - */ - public MetaBroadcasterCache flushCache(); - - } - - public final static class NoCache implements MetaBroadcasterCache { - - @Override - public MetaBroadcasterCache cache(String path, Object o) { - return this; - } - - @Override - public MetaBroadcasterCache flushCache() { - return this; - } - } +public interface MetaBroadcaster { + void configure(AtmosphereConfig config); - /** - * Flush the cache every 30 seconds. - */ - public final static class ThirtySecondsCache implements MetaBroadcasterCache, Runnable { + Future> broadcastTo(String broadcasterID, Object message); - private final MetaBroadcaster metaBroadcaster; - private final ConcurrentHashMap cache = new ConcurrentHashMap(); + Future> scheduleTo(String broadcasterID, Object message, int time, TimeUnit unit); - public ThirtySecondsCache(MetaBroadcaster metaBroadcaster, AtmosphereConfig config) { - this.metaBroadcaster = metaBroadcaster; - ExecutorsFactory.getScheduler(config).scheduleAtFixedRate(this, 0, 30, TimeUnit.SECONDS); - } + Future> delayTo(String broadcasterID, Object message, int time, TimeUnit unit); - @Override - public MetaBroadcasterCache cache(String path, Object o) { - cache.put(path, o); - return this; - } + MetaBroadcaster addBroadcasterListener(BroadcasterListener b); - @Override - public MetaBroadcasterCache flushCache() { - for (Map.Entry e : cache.entrySet()) { - metaBroadcaster.map(e.getKey(), e.getValue(), -1, null, false, false); - } - return this; - } + MetaBroadcaster removeBroadcasterListener(BroadcasterListener b); - @Override - public void run() { - flushCache(); - cache.clear(); - } - } + MetaBroadcaster cache(DefaultMetaBroadcaster.MetaBroadcasterCache cache); + void destroy(); } diff --git a/modules/cpr/src/main/java/org/atmosphere/cpr/Universe.java b/modules/cpr/src/main/java/org/atmosphere/cpr/Universe.java index 46bd888c398..05a863c4f78 100644 --- a/modules/cpr/src/main/java/org/atmosphere/cpr/Universe.java +++ b/modules/cpr/src/main/java/org/atmosphere/cpr/Universe.java @@ -21,14 +21,14 @@ public class Universe { private static AtmosphereFramework framework; private static AtmosphereResourceFactory resourceFactory; private static AtmosphereResourceSessionFactory sessionFactory; - private static MetaBroadcaster metaBroadcaster; + private static DefaultMetaBroadcaster metaBroadcaster; /** - * Set the must be unique {@link org.atmosphere.cpr.MetaBroadcaster} + * Set the must be unique {@link DefaultMetaBroadcaster} * - * @param a {@link org.atmosphere.cpr.MetaBroadcaster} + * @param a {@link DefaultMetaBroadcaster} */ - public static void metaBroadcaster(MetaBroadcaster a) { + public static void metaBroadcaster(DefaultMetaBroadcaster a) { if (metaBroadcaster != null) { logger.warn("More than one Universe configured. Universe class will gives wrong object reference {}", a); } @@ -120,11 +120,11 @@ public static AtmosphereResourceSessionFactory sessionFactory() { } /** - * Return the {@link org.atmosphere.cpr.MetaBroadcaster} + * Return the {@link DefaultMetaBroadcaster} * - * @return the {@link org.atmosphere.cpr.MetaBroadcaster} + * @return the {@link DefaultMetaBroadcaster} */ - public static MetaBroadcaster metaBroadcaster() { + public static DefaultMetaBroadcaster metaBroadcaster() { return metaBroadcaster; } } diff --git a/modules/cpr/src/main/java/org/atmosphere/inject/InjectableObjectFactory.java b/modules/cpr/src/main/java/org/atmosphere/inject/InjectableObjectFactory.java index 8b38cb88e18..2138251fe5b 100644 --- a/modules/cpr/src/main/java/org/atmosphere/inject/InjectableObjectFactory.java +++ b/modules/cpr/src/main/java/org/atmosphere/inject/InjectableObjectFactory.java @@ -30,7 +30,7 @@ /** * Support injection of Atmosphere's Internal object using {@link } * {@link org.atmosphere.cpr.AtmosphereConfig},{@link AtmosphereFramework,{@link AtmosphereFramework,{@link org.atmosphere.cpr.BroadcasterFactory, - * {@link org.atmosphere.cpr.AtmosphereResourceFactory} ,{@link org.atmosphere.cpr.MetaBroadcaster } and + * {@link org.atmosphere.cpr.AtmosphereResourceFactory} ,{@link org.atmosphere.cpr.DefaultMetaBroadcaster } and * {@link org.atmosphere.cpr.AtmosphereResourceSessionFactory } * * @author Jeanfrancois Arcand diff --git a/modules/cpr/src/main/java/org/atmosphere/inject/MetaBroadcasterInjectable.java b/modules/cpr/src/main/java/org/atmosphere/inject/MetaBroadcasterInjectable.java index 87b06c10989..98fd18840a9 100644 --- a/modules/cpr/src/main/java/org/atmosphere/inject/MetaBroadcasterInjectable.java +++ b/modules/cpr/src/main/java/org/atmosphere/inject/MetaBroadcasterInjectable.java @@ -16,6 +16,7 @@ package org.atmosphere.inject; import org.atmosphere.cpr.AtmosphereConfig; +import org.atmosphere.cpr.DefaultMetaBroadcaster; import org.atmosphere.cpr.MetaBroadcaster; import java.lang.reflect.Type; @@ -24,7 +25,7 @@ public class MetaBroadcasterInjectable implements Injectable { @Override public boolean supportedType(Type t) { - return (t instanceof Class) && MetaBroadcaster.class.isAssignableFrom((Class) t); + return (t instanceof Class) && DefaultMetaBroadcaster.class.isAssignableFrom((Class) t); } @Override diff --git a/modules/cpr/src/test/java/org/atmosphere/annotation/ManagedAtmosphereHandlerTest.java b/modules/cpr/src/test/java/org/atmosphere/annotation/ManagedAtmosphereHandlerTest.java index fac0e524499..736faa5039a 100644 --- a/modules/cpr/src/test/java/org/atmosphere/annotation/ManagedAtmosphereHandlerTest.java +++ b/modules/cpr/src/test/java/org/atmosphere/annotation/ManagedAtmosphereHandlerTest.java @@ -37,7 +37,7 @@ import org.atmosphere.cpr.AtmosphereResponse; import org.atmosphere.cpr.BroadcasterFactory; import org.atmosphere.cpr.FrameworkConfig; -import org.atmosphere.cpr.MetaBroadcaster; +import org.atmosphere.cpr.DefaultMetaBroadcaster; import org.atmosphere.interceptor.HeartbeatInterceptor; import org.atmosphere.interceptor.InvokationOrder; import org.atmosphere.util.ExcludeSessionBroadcaster; @@ -517,7 +517,7 @@ public final static class InjectAnnotation { @Inject private BroadcasterFactory bFactory; @Inject - private MetaBroadcaster m; + private DefaultMetaBroadcaster m; @Inject private AtmosphereResourceSessionFactory sessionFactory; diff --git a/modules/cpr/src/test/java/org/atmosphere/cpr/AtmosphereResourceStateRecoveryTest.java b/modules/cpr/src/test/java/org/atmosphere/cpr/AtmosphereResourceStateRecoveryTest.java index 610e1f1552b..717042ca678 100644 --- a/modules/cpr/src/test/java/org/atmosphere/cpr/AtmosphereResourceStateRecoveryTest.java +++ b/modules/cpr/src/test/java/org/atmosphere/cpr/AtmosphereResourceStateRecoveryTest.java @@ -168,7 +168,7 @@ public void longPollingAggregatedTest() throws ServletException, IOException, Ex r.suspend(); - MetaBroadcaster.getDefault().broadcastTo("/1", "Initialize Cache").get(); + config.metaBroadcaster().broadcastTo("/1", "Initialize Cache").get(); r.close(); AtmosphereResourceImpl r2 = (AtmosphereResourceImpl) config.resourcesFactory().create(config, "1234567");