Skip to content

Commit

Permalink
Fix for #993
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Apr 2, 2013
1 parent 79835c3 commit 2e3b323
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 25 deletions.
47 changes: 34 additions & 13 deletions modules/cpr/src/main/java/org/atmosphere/cpr/ApplicationConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
package org.atmosphere.cpr;

import org.atmosphere.client.MessageLengthInterceptor;
import org.atmosphere.client.TrackMessageSizeInterceptor;
import org.atmosphere.interceptor.AtmosphereResourceLifecycleInterceptor;
import org.atmosphere.util.EndpointMapper;
import org.atmosphere.websocket.WebSocketProcessor;
import org.atmosphere.websocket.WebSocketProtocol;

Expand Down Expand Up @@ -274,14 +275,6 @@ public interface ApplicationConfig {
* Value: "org.atmosphere.cpr.allowQueryStreamAsPostOrGet"
*/
String ALLOW_QUERYSTRING_AS_REQUEST = ApplicationConfig.class.getPackage().getName() + ".allowQueryStreamAsPostOrGet";
/**
* Configure the padding used when streaming is used. Value can be atmosphere or whitespace. Default is ATMOSPHERE
* {@link org.atmosphere.cpr.AtmosphereResourceImpl#createStreamingPadding(String)} ()}
* <p></p>
* Default is returned by AtmosphereResourceImpl.createStreamingPadding
* Value: "org.atmosphere.cpr.padding"
*/
String STREAMING_PADDING_MODE = ApplicationConfig.class.getPackage().getName() + ".padding";
/**
* Configure {@link Broadcaster} to share the same {@link java.util.concurrent.ExecutorService} instead among them.
* <p></p>
Expand Down Expand Up @@ -444,7 +437,7 @@ public interface ApplicationConfig {
/**
* Regex pattern for excluding file from being serviced by {@link AtmosphereFilter}
* <p></p>
* Default is {@link AtmosphereFilter.EXCLUDE_FILES}
* Default is {@link AtmosphereFilter#EXCLUDE_FILES}
* Value: "org.atmosphere.cpr.AtmosphereFilter.excludes"
*/
String ATMOSPHERE_EXCLUDED_FILE = AtmosphereFilter.class.getName() + ".excludes";
Expand All @@ -453,9 +446,9 @@ public interface ApplicationConfig {
* received in one chunk. Default is '|'
* <p></p>
* Default is "|"
* Value: "org.atmosphere.client.MessageLengthInterceptor.delimiter"
* Value: "org.atmosphere.client.TrackMessageSizeInterceptor.delimiter"
*/
String MESSAGE_DELIMITER = MessageLengthInterceptor.class.getName() + ".delimiter";
String MESSAGE_DELIMITER = TrackMessageSizeInterceptor.class.getName() + ".delimiter";
/**
* The method used that trigger automatic management of {@link AtmosphereResource} when the {@link AtmosphereResourceLifecycleInterceptor}
* is used
Expand Down Expand Up @@ -524,12 +517,33 @@ public interface ApplicationConfig {
*/
String BACKWARD_COMPATIBLE_WEBSOCKET_BEHAVIOR = "org.atmosphere.websocket.backwardCompatible.atmosphereResource";
/**
* A list, seperated by comma, of package name to scan when looking for Atmosphere's component annotated with Atmosphere's annotation.
* A list, separated by comma, of package name to scan when looking for Atmosphere's component annotated with Atmosphere's annotation.
* <p>
* Default ""
* Value: "org.atmosphere.cpr.packages"
*/
String ANNOTATION_PACKAGE = "org.atmosphere.cpr.packages";
/**
* The annotation processor
* <p>
* Default "org.atmosphere.cpr.DefaultAnnotationProcessor"
* Value: "org.atmosphere.cpr.AnnotationProcessor"
*/
String ANNOTATION_PROCESSOR = AnnotationProcessor.class.getName();
/**
* Define an implementation of the {@link org.atmosphere.util.EndpointMapper}
* <p>
* Default "org.atmosphere.cpr.DefaultEndpointMapper"
* Value: "org.atmosphere.cpr.EndpointMapper"
*/
String ENDPOINT_MAPPER = EndpointMapper.class.getName();
/**
* The list of content-type to exclude when delimiting message.
* <p>
* Default ""
* Value: "org.atmosphere.client.TrackMessageSizeInterceptor.excludedContentType"
*/
String EXCLUDED_CONTENT_TYPES = TrackMessageSizeInterceptor.class.getName() + ".excludedContentType";
/**
* Allow defining the Broadcaster's Suspend Policy {@link Broadcaster#setSuspendPolicy(long, org.atmosphere.cpr.Broadcaster.POLICY)}
* <p>
Expand All @@ -543,5 +557,12 @@ public interface ApplicationConfig {
* Value: "org.atmosphere.cpr.Broadcaster.POLICY.maximumSuspended"
*/
String BROADCASTER_POLICY_TIMEOUT = Broadcaster.POLICY.class.getName() + ".maximumSuspended";
/**
* Change the default regex used when mapping AtmosphereHandler. Default is {@link AtmosphereFramework#MAPPING_REGEX}
* <p>
* Default "[a-zA-Z0-9-&.*_=@;\?]+"
* Value: "org.atmosphere.client.ApplicationConfig.mappingRegex"
*/
String HANDLER_MAPPING_REGEX = ApplicationConfig.class.getPackage().getName() + ".mappingRegex";
}

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -320,6 +321,14 @@ private void logDuplicateFilter(BroadcastFilter e) {
}
}

/**
* Return the current list of installed {@link BroadcastFilter}
* @return the current list of installed {@link BroadcastFilter}
*/
public Collection<BroadcastFilter> filters(){
return filters;
}

public void destroy() {
destroy(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,32 @@
*/
package org.atmosphere.interceptor;

import org.atmosphere.client.TrackMessageSizeFilter;
import org.atmosphere.cpr.Action;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereInterceptor;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.atmosphere.cpr.AtmosphereResourceEventListenerAdapter;
import org.atmosphere.cpr.BroadcastFilter;
import org.atmosphere.cpr.HeaderConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/**
* An Intewrceptor that send back to a websocket and http client the value of {@link HeaderConfig#X_ATMOSPHERE_TRACKING_ID}
* An Interceptor that send back to a websocket and http client the value of {@link HeaderConfig#X_ATMOSPHERE_TRACKING_ID}
* and {@link HeaderConfig#X_CACHE_DATE}
*
* @author Jeanfrancois Arcand
*/
public class JavaScriptProtocol implements AtmosphereInterceptor {
private final static Logger logger = LoggerFactory.getLogger(JavaScriptProtocol.class);
private String wsDelimiter = "|";
private final TrackMessageSizeFilter f = new TrackMessageSizeFilter();

@Override
public void configure(AtmosphereConfig config) {
Expand All @@ -53,26 +57,50 @@ public Action inspect(final AtmosphereResource r) {
String handshakeUUID = r.getRequest().getHeader(HeaderConfig.X_ATMO_PROTOCOL);
if (uuid != null && uuid.equals("0") && handshakeUUID != null) {
r.getRequest().header(HeaderConfig.X_ATMO_PROTOCOL, null);

// Since 1.0.10
r.addEventListener(new AtmosphereResourceEventListenerAdapter() {
@Override
public void onSuspend(AtmosphereResourceEvent event) {
r.getResponse().write(r.uuid() + wsDelimiter + System.currentTimeMillis());

if (r.transport() == AtmosphereResource.TRANSPORT.LONG_POLLING ||
r.transport() == AtmosphereResource.TRANSPORT.JSONP) {
r.resume();
} else {
final StringBuffer message = new StringBuffer(r.uuid()).append(wsDelimiter).append(System.currentTimeMillis());

// https://github.com/Atmosphere/atmosphere/issues/993
boolean track = false;
if (r.getBroadcaster().getBroadcasterConfig().hasFilters()) {
for (BroadcastFilter bf : r.getBroadcaster().getBroadcasterConfig().filters()) {
if (TrackMessageSizeFilter.class.isAssignableFrom(bf.getClass())) {
track = true;
break;
}
}
}

final AtomicReference<String> protocolMessage = new AtomicReference<String>(message.toString());
if (track) {
protocolMessage.set((String) f.filter(r, protocolMessage.get(), protocolMessage.get()).message());
}

if (r.transport() == AtmosphereResource.TRANSPORT.STREAMING) {
r.addEventListener(new AtmosphereResourceEventListenerAdapter() {
@Override
public void onSuspend(AtmosphereResourceEvent event) {
r.getResponse().write(protocolMessage.get());
try {
r.getResponse().flushBuffer();
} catch (IOException e) {
logger.trace("", e);
}
}
}
});
});
} else {
r.getResponse().write(protocolMessage.get());
}

// We don't need to reconnect here
if (r.transport() == AtmosphereResource.TRANSPORT.WEBSOCKET
|| r.transport() == AtmosphereResource.TRANSPORT.STREAMING
|| r.transport() == AtmosphereResource.TRANSPORT.SSE) {
return Action.CONTINUE;
} else {
return Action.CANCELLED;
}
}
return Action.CONTINUE;
}
Expand Down

0 comments on commit 2e3b323

Please sign in to comment.