Skip to content

Commit

Permalink
fix: preserve push messages in cache until they are seen by client (#…
Browse files Browse the repository at this point in the history
…15764) (#15820)

Atmospehere caches messages when the client is disconnected, but unfortunately it may happen that a message does not reach the client because of network disconnection during async response write operation. In this case the message is not cached and will be completely lost, causing a UI resynchronization request.
This change preserves messages in broadcaster cache until the client confirms that they have been processed, by sending the last seen server sync identifier on reconnection.
This should prevent the need for a UI resynchronization. It may happen in some cases, e.g. back to online after being offline, that messages already seen will be sent to the client, but Flow will discard them.

Part of #15281
Fixes #15205

Co-authored-by: Marco Collovati <marco@vaadin.com>
Co-authored-by: Teppo Kurki <teppo.kurki@vaadin.com>
  • Loading branch information
3 people committed Feb 3, 2023
1 parent c2d46c1 commit a3c4cea
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.gwt.core.client.JavaScriptObject;
import com.google.gwt.core.client.Scheduler;

import com.vaadin.client.Command;
import com.vaadin.client.Console;
import com.vaadin.client.Registry;
Expand Down Expand Up @@ -451,6 +450,10 @@ protected void onReconnect(JavaScriptObject request,
getConnectionStateHandler().pushReconnectPending(this);
}

private int getLastSeenServerSyncId() {
return registry.getMessageHandler().getLastSeenServerSyncId();
}

/**
* JavaScriptObject class with some helper methods to set and get primitive
* values.
Expand Down Expand Up @@ -665,6 +668,7 @@ protected final native AtmosphereConfiguration createConfig()
trackMessageLength: true,
enableProtocol: true,
handleOnlineOffline: false,
executeCallbackBeforeReconnect: true,
messageDelimiter: String.fromCharCode(@com.vaadin.flow.shared.communication.PushConstants::MESSAGE_DELIMITER)
};
}-*/;
Expand Down Expand Up @@ -699,6 +703,11 @@ private final native JavaScriptObject doConnect(String uri,
config.onClientTimeout = $entry(function(request) {
self.@com.vaadin.client.communication.AtmospherePushConnection::onClientTimeout(*)(request);
});
config.headers = {
'X-Vaadin-LastSeenServerSyncId': function() {
return self.@com.vaadin.client.communication.AtmospherePushConnection::getLastSeenServerSyncId(*)();
}
};
return $wnd.vaadinPush.atmosphere.subscribe(config);
}-*/;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResource.TRANSPORT;
import org.atmosphere.cpr.BroadcastFilterAdapter;
import org.atmosphere.util.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -216,7 +217,9 @@ public void push(boolean async) {
protected void sendMessage(String message) {
assert (isConnected());
// "Broadcast" the changes to the single client only
outgoingMessage = getResource().getBroadcaster().broadcast(message,
outgoingMessage = getResource().getBroadcaster().broadcast(
new PushMessage(ui.getInternals().getServerSyncId() - 1,
message),
getResource());
}

Expand Down Expand Up @@ -412,4 +415,33 @@ public static void enableAtmosphereDebugLogging() {
+ " instead (i.e.: logback, log4j, etc)");
}

static final class PushMessage implements Serializable {
final int serverSyncId;
final String message;

PushMessage(int serverSyncId, String message) {
this.serverSyncId = serverSyncId;
this.message = message;
}

boolean alreadySeen(int lastSeenOnClient) {
return serverSyncId <= lastSeenOnClient;
}
}

/**
* A {@link org.atmosphere.cpr.BroadcastFilter} that unwraps the message to
* be sent to the client from a {@link PushMessage} instance.
*/
static final class PushMessageUnwrapFilter extends BroadcastFilterAdapter
implements Serializable {
@Override
public BroadcastAction filter(String broadcasterId,
AtmosphereResource r, Object originalMessage, Object message) {
if (message instanceof AtmospherePushConnection.PushMessage) {
message = ((PushMessage) message).message;
}
return new BroadcastAction(message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2000-2023 Vaadin Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.vaadin.flow.server.communication;

import java.io.Serializable;

import org.atmosphere.cache.BroadcastMessage;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.BroadcasterCache;
import org.atmosphere.cpr.PerRequestBroadcastFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link PerRequestBroadcastFilter} implementation that handles
* {@link com.vaadin.flow.server.communication.AtmospherePushConnection.PushMessage}s
* to ensure that a message is preserved in the {@link BroadcasterCache} until
* the client has received it.
*
* The filter acts only on {@literal LONG POLLING} transport and expects that
* the client sends the {@literal X-Vaadin-LastSeenServerSyncId} header with the
* identifier of the last message seen, every time the connection is
* established.
*
* Messages already seen are discarded, whereas messages not yet sent to the
* client are added again to the cache to preserve them until client confirms
* reception by sending the last seen message identifier.
*/
public class LongPollingCacheFilter
implements PerRequestBroadcastFilter, Serializable {
public static final String SEEN_SERVER_SYNC_ID = "X-Vaadin-LastSeenServerSyncId";

private static Logger getLogger() {
return LoggerFactory.getLogger(LongPollingCacheFilter.class.getName());
}

@Override
public BroadcastAction filter(String broadcasterId, AtmosphereResource r,
Object originalMessage, Object message) {
if (originalMessage instanceof AtmospherePushConnection.PushMessage
&& r.transport() == AtmosphereResource.TRANSPORT.LONG_POLLING
&& r.getRequest().getHeader(SEEN_SERVER_SYNC_ID) != null) {
AtmospherePushConnection.PushMessage pushMessage = (AtmospherePushConnection.PushMessage) originalMessage;
String uuid = r.uuid();
int lastSeenOnClient = Integer
.parseInt(r.getRequest().getHeader(SEEN_SERVER_SYNC_ID));
if (pushMessage.alreadySeen(lastSeenOnClient)) {
getLogger().trace(
"Discarding message {} for resource {} as client already seen {}. {}",
pushMessage.serverSyncId, uuid, lastSeenOnClient,
pushMessage.message);
// Client has already seen this message, discard it
return new BroadcastAction(BroadcastAction.ACTION.ABORT,
message);
} else {
// In rare cases with long polling, message may be lost during
// write operation and the client may never receive it.
// To prevent this kind of issues we move the message back to
// the cache until we get confirmation that the message has been
// seen
getLogger().trace(
"Put message {} for resource {} back to the cache because it may not have reached the client, as the last seen message is {}. {}",
pushMessage.serverSyncId, uuid, lastSeenOnClient,
pushMessage.message);
BroadcasterCache cache = r.getBroadcaster()
.getBroadcasterConfig().getBroadcasterCache();
cache.addToCache(broadcasterId, uuid,
new BroadcastMessage(originalMessage));
}
}
return new BroadcastAction(message);
}

@Override
public BroadcastAction filter(String broadcasterId, Object originalMessage,
Object message) {
return new BroadcastAction(message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.atmosphere.cpr.AtmosphereInterceptor;
import org.atmosphere.cpr.AtmosphereRequestImpl;
import org.atmosphere.cpr.AtmosphereResponseImpl;
import org.atmosphere.cpr.BroadcasterConfig;
import org.atmosphere.interceptor.HeartbeatInterceptor;
import org.atmosphere.util.VoidAnnotationProcessor;
import org.slf4j.Logger;
Expand Down Expand Up @@ -116,6 +117,11 @@ public PushRequestHandler(VaadinServletService service)
// Map the (possibly pre-initialized) handler to the actual push
// handler
((PushAtmosphereHandler) handler).setPushHandler(pushHandler);
BroadcasterConfig broadcasterConfig = handlerWrapper.broadcaster
.getBroadcasterConfig();
broadcasterConfig.addFilter(new LongPollingCacheFilter());
broadcasterConfig.addFilter(
new AtmospherePushConnection.PushMessageUnwrapFilter());
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright 2000-2023 Vaadin Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.vaadin.flow.server.communication;

import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.atmosphere.cpr.AtmosphereRequest;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.BroadcastFilter.BroadcastAction;
import org.atmosphere.cpr.BroadcastFilter.BroadcastAction.ACTION;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.cpr.BroadcasterCache;
import org.atmosphere.cpr.BroadcasterConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class LongPollingCacheFilterTest {

static final String RESOURCE_UUID = "resourceUUID";
LongPollingCacheFilter filter = new LongPollingCacheFilter();

AtmospherePushConnection.PushMessage originalMessage = new AtmospherePushConnection.PushMessage(
5, "PUSH ME");
Object nonPushMessage = new Object();
Object message = new Object();
private AtmosphereResource resource;
private BroadcasterCache cache;

@Test
public void filter_notPushMessage_continueWithCurrentMessage() {
setTransport(AtmosphereResource.TRANSPORT.LONG_POLLING);
setSeenServerSyncIdHeader(5);
BroadcastAction action = filter.filter("broadcasterId", resource,
nonPushMessage, message);
Assert.assertEquals(ACTION.CONTINUE, action.action());
Assert.assertSame("Message should not be altered by filter", message,
action.message());
verifyMessageIsNotCached();
}

@Test
public void filter_notLongPollingTransport_continueWithCurrentMessage() {
setSeenServerSyncIdHeader(5);
Stream.of(AtmosphereResource.TRANSPORT.values())
.filter(t -> t != AtmosphereResource.TRANSPORT.LONG_POLLING)
.forEach(transport -> {
setTransport(transport);
BroadcastAction action = filter.filter("broadcasterId",
resource, originalMessage, message);
Assert.assertEquals(ACTION.CONTINUE, action.action());
Assert.assertSame(
"Message should not be altered by filter when transport is "
+ transport,
message, action.message());
});
verifyMessageIsNotCached();
}

@Test
public void filter_missingLastSeenServerSyncId_continueWithCurrentMessage() {
setTransport(AtmosphereResource.TRANSPORT.LONG_POLLING);
BroadcastAction action = filter.filter("broadcasterId", resource,
originalMessage, message);
Assert.assertEquals(ACTION.CONTINUE, action.action());
Assert.assertSame(
"Message should not be altered by filter if server sync id header is missing",
message, action.message());
verifyMessageIsNotCached();
}

@Test
public void filter_messageAlreadySeen_abort() {
setTransport(AtmosphereResource.TRANSPORT.LONG_POLLING);
setSeenServerSyncIdHeader(5, 6);

// seen server sync id == push message server sync id
BroadcastAction action = filter.filter("broadcasterId", resource,
originalMessage, message);
Assert.assertEquals("Expecting message seen on client to be skipped",
ACTION.ABORT, action.action());
Assert.assertSame(
"Message should not be altered by filter when aborting",
message, action.message());

// seen server sync id > push message server sync id
action = filter.filter("broadcasterId", resource, originalMessage,
message);
Assert.assertEquals("Expecting message seen on client to be skipped",
ACTION.ABORT, action.action());
Assert.assertSame(
"Message should not be altered by filter when aborting",
message, action.message());
verifyMessageIsNotCached();
}

@Test
public void filter_messageNotYetSeen_addToCacheAndContinue() {
setTransport(AtmosphereResource.TRANSPORT.LONG_POLLING);
setSeenServerSyncIdHeader(2);
String broadcasterId = "broadcasterId";
BroadcastAction action = filter.filter(broadcasterId, resource,
originalMessage, message);
Assert.assertEquals("Expecting message not seen on client to be sent",
ACTION.CONTINUE, action.action());
Assert.assertSame(
"Message should not be altered by filter when continuing",
message, action.message());
Mockito.verify(cache).addToCache(ArgumentMatchers.eq(broadcasterId),
ArgumentMatchers.eq(RESOURCE_UUID),
ArgumentMatchers.argThat(m -> m.message() == originalMessage));
}

@Before
public void setUp() {
resource = Mockito.mock(AtmosphereResource.class);
AtmosphereRequest request = Mockito.mock(AtmosphereRequest.class);
Broadcaster broadcaster = Mockito.mock(Broadcaster.class);
BroadcasterConfig broadcasterConfig = Mockito
.mock(BroadcasterConfig.class);
cache = Mockito.mock(BroadcasterCache.class);
Mockito.when(broadcaster.getBroadcasterConfig())
.thenReturn(broadcasterConfig);
Mockito.when(broadcasterConfig.getBroadcasterCache()).thenReturn(cache);

Mockito.when(resource.getBroadcaster()).thenReturn(broadcaster);
Mockito.when(resource.getRequest()).thenReturn(request);
Mockito.when(resource.uuid()).thenReturn(RESOURCE_UUID);
}

private void setTransport(AtmosphereResource.TRANSPORT transport) {
Mockito.when(resource.transport()).thenReturn(transport);
}

private void setSeenServerSyncIdHeader(int id, int... ids) {
Mockito.when(resource.getRequest()
.getHeader(LongPollingCacheFilter.SEEN_SERVER_SYNC_ID))
.thenReturn(Integer.toString(id), IntStream.of(ids)
.mapToObj(Integer::toString).toArray(String[]::new));

}

private void verifyMessageIsNotCached() {
Mockito.verifyNoInteractions(cache);
}
}

0 comments on commit a3c4cea

Please sign in to comment.