Skip to content

Commit

Permalink
Fix for #1176
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Jul 3, 2013
1 parent 30bb1a6 commit f66492f
Showing 1 changed file with 6 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@
package org.atmosphere.cache;

import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceImpl;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterCache;
import org.atmosphere.cpr.BroadcasterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,7 +72,7 @@ public Set<String> getIds() {
}

@Override
public String toString(){
public String toString() {
return queue.toString();
}
}
Expand All @@ -99,7 +96,7 @@ public String getId() {
return id;
}

public String toString(){
public String toString() {
return message.toString();
}
}
Expand Down Expand Up @@ -173,6 +170,7 @@ public void addToCache(String broadcasterId, AtmosphereResource r, Object e) {

/**
* For backward compatibility with 1.0.9 and lower, use the method above.
*
* @param broadcasterId
* @param r
* @param e
Expand All @@ -197,59 +195,15 @@ public CacheMessage addCacheCandidate(String broadcasterId, AtmosphereResource r
String clientId = uuid(r);

activeClients.put(clientId, now);
if (isAtmosphereResourceValid(r)) {//todo better to have cacheLost flag
/**
* This line is called for each AtmosphereResource (note that
* broadcaster.getAtmosphereResources() may not return the current AtmosphereResource because
* the resource may be destroyed by DefaultBroadcaster.executeAsyncWrite or JerseyBroadcasterUtil
* concurrently, that is why we need to check duplicates),
*
* Cache the message only once for the clients
* which are not currently connected to the server
*/
Broadcaster broadcaster = getBroadCaster(r.getAtmosphereConfig(), broadcasterId);
List<AtmosphereResource> resources = new ArrayList<AtmosphereResource>(broadcaster.getAtmosphereResources());
Set<String> disconnectedClients = getDisconnectedClients(resources);
for (String disconnectedId : disconnectedClients) {
addMessageIfNotExists(disconnectedId, cacheMessage);
}
} else {
/**
* Cache lost message, caching only for specific client.
* Preventing duplicate inserts because this method can be called
* concurrently from DefaultBroadcaster.executeAsyncWrite or JerseyBroadcasterUtil
* when calling cacheLostMessage
*/
addMessageIfNotExists(clientId, cacheMessage);
}
addMessageIfNotExists(clientId, cacheMessage);
}
}
return cacheMessage;
}

private String uuid(AtmosphereResource r) {
return r.transport() == AtmosphereResource.TRANSPORT.WEBSOCKET
? (String) r.getRequest().getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID) : r.uuid();
}

private boolean isAtmosphereResourceValid(AtmosphereResource r) {
return !r.isResumed()
&& !r.isCancelled()
&& AtmosphereResourceImpl.class.cast(r).isInScope();
}

private Set<String> getDisconnectedClients(List<AtmosphereResource> resources) {
Set<String> ids = new HashSet<String>(activeClients.keySet());
for (AtmosphereResource resource : resources) {
ids.remove(resource.uuid());
}
return ids;
}

private Broadcaster getBroadCaster(AtmosphereConfig config, String broadcasterId) {
BroadcasterFactory factory = config.getBroadcasterFactory();
Broadcaster broadcaster = factory.lookup(broadcasterId, false);
return broadcaster;
? (String) r.getRequest().getAttribute(ApplicationConfig.SUSPENDED_ATMOSPHERE_RESOURCE_UUID) : r.uuid();
}

private void addMessageIfNotExists(String clientId, CacheMessage message) {
Expand Down Expand Up @@ -309,7 +263,7 @@ public List<Object> retrieveFromCache(String broadcasterId, AtmosphereResource r
}

public void clearCache(String broadcasterId, AtmosphereResourceImpl r, CacheMessage message) {
String clientId = uuid(r);
String clientId = uuid(r);
ClientQueue clientQueue;
synchronized (messages) {
clientQueue = messages.get(clientId);
Expand Down

0 comments on commit f66492f

Please sign in to comment.