Skip to content

Commit

Permalink
Fixes #1932
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Mar 31, 2015
1 parent 22a536a commit f29fde9
Showing 1 changed file with 7 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -71,7 +72,6 @@
public class DefaultBroadcaster implements Broadcaster {
public static final int POLLING_DEFAULT = 100;
public static final String CACHED = DefaultBroadcaster.class.getName() + ".messagesCached";
public static final String ASYNC_TOKEN = DefaultBroadcaster.class.getName() + ".token";

private static final Logger logger = LoggerFactory.getLogger(DefaultBroadcaster.class);
private static final String DESTROYED = "This Broadcaster has been destroyed and cannot be used {} by invoking {}";
Expand Down Expand Up @@ -117,6 +117,7 @@ public class DefaultBroadcaster implements Broadcaster {
private boolean cacheOnIOFlushException = true;
protected boolean sharedListeners = false;
protected boolean candidateForPoolable;
protected final String usingTokenIdForAttribute = UUID.randomUUID().toString();

public DefaultBroadcaster() {
}
Expand Down Expand Up @@ -840,7 +841,7 @@ protected void executeAsyncWrite(final AsyncWriteToken token) {
try {
request.setAttribute(getID(), token.future);
request.setAttribute(MAX_INACTIVE, System.currentTimeMillis());
request.setAttribute(ASYNC_TOKEN, token);
request.setAttribute(usingTokenIdForAttribute, token);

if (willBeResumed && !r.atmosphereResourceEventListener().isEmpty()) {
listeners.addAll(r.atmosphereResourceEventListener());
Expand Down Expand Up @@ -897,7 +898,7 @@ protected void executeAsyncWrite(final AsyncWriteToken token) {

try {
request.removeAttribute(getID());
request.removeAttribute(ASYNC_TOKEN);
request.removeAttribute(usingTokenIdForAttribute);
} catch (NullPointerException ex) {
logger.trace("NPE after the message has been written for {}", r.uuid());
}
Expand Down Expand Up @@ -1109,7 +1110,7 @@ public void onException(Throwable t, final AtmosphereResource ar, boolean notify
}

if (notifyAndCache) {
cacheLostMessage(r, (AsyncWriteToken) r.getRequest(false).getAttribute(ASYNC_TOKEN), notifyAndCache);
cacheLostMessage(r, (AsyncWriteToken) r.getRequest(false).getAttribute(usingTokenIdForAttribute), notifyAndCache);
}

/**
Expand All @@ -1134,20 +1135,6 @@ public void run() {
}
}

/**
* Cache the message because an unexpected exception occurred.
*
* @param r {@link AtmosphereResource}
*/
public void cacheLostMessage(AtmosphereResource r) {
AtmosphereRequest request = AtmosphereResourceImpl.class.cast(r).getRequest(false);
try {
cacheLostMessage(r, (AsyncWriteToken) request.getAttribute(ASYNC_TOKEN));
} finally {
request.removeAttribute(ASYNC_TOKEN);
}
}

/**
* Cache the message because an unexpected exception occurred.
*
Expand All @@ -1156,9 +1143,9 @@ public void cacheLostMessage(AtmosphereResource r) {
public void cacheLostMessage(AtmosphereResource r, boolean force) {
AtmosphereRequest request = AtmosphereResourceImpl.class.cast(r).getRequest(false);
try {
cacheLostMessage(r, (AsyncWriteToken) request.getAttribute(ASYNC_TOKEN), force);
cacheLostMessage(r, (AsyncWriteToken) request.getAttribute(usingTokenIdForAttribute), force);
} finally {
request.removeAttribute(ASYNC_TOKEN);
request.removeAttribute(usingTokenIdForAttribute);
}
}

Expand Down

0 comments on commit f29fde9

Please sign in to comment.