From a3c4cea309c11d330d3f1f88ad4c0b1a3752a905 Mon Sep 17 00:00:00 2001 From: Vaadin Bot Date: Fri, 3 Feb 2023 21:30:04 +0100 Subject: [PATCH] fix: preserve push messages in cache until they are seen by client (#15764) (#15820) Atmospehere caches messages when the client is disconnected, but unfortunately it may happen that a message does not reach the client because of network disconnection during async response write operation. In this case the message is not cached and will be completely lost, causing a UI resynchronization request. This change preserves messages in broadcaster cache until the client confirms that they have been processed, by sending the last seen server sync identifier on reconnection. This should prevent the need for a UI resynchronization. It may happen in some cases, e.g. back to online after being offline, that messages already seen will be sent to the client, but Flow will discard them. Part of #15281 Fixes #15205 Co-authored-by: Marco Collovati Co-authored-by: Teppo Kurki --- .../AtmospherePushConnection.java | 11 +- .../AtmospherePushConnection.java | 34 +++- .../communication/LongPollingCacheFilter.java | 94 ++++++++++ .../communication/PushRequestHandler.java | 6 + .../LongPollingCacheFilterTest.java | 163 ++++++++++++++++++ 5 files changed, 306 insertions(+), 2 deletions(-) create mode 100644 flow-server/src/main/java/com/vaadin/flow/server/communication/LongPollingCacheFilter.java create mode 100644 flow-server/src/test/java/com/vaadin/flow/server/communication/LongPollingCacheFilterTest.java diff --git a/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java b/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java index 089629321c7..ee7de0a55ba 100644 --- a/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java +++ b/flow-client/src/main/java/com/vaadin/client/communication/AtmospherePushConnection.java @@ -18,7 +18,6 @@ import com.google.gwt.core.client.JavaScriptObject; import com.google.gwt.core.client.Scheduler; - import com.vaadin.client.Command; import com.vaadin.client.Console; import com.vaadin.client.Registry; @@ -451,6 +450,10 @@ protected void onReconnect(JavaScriptObject request, getConnectionStateHandler().pushReconnectPending(this); } + private int getLastSeenServerSyncId() { + return registry.getMessageHandler().getLastSeenServerSyncId(); + } + /** * JavaScriptObject class with some helper methods to set and get primitive * values. @@ -665,6 +668,7 @@ protected final native AtmosphereConfiguration createConfig() trackMessageLength: true, enableProtocol: true, handleOnlineOffline: false, + executeCallbackBeforeReconnect: true, messageDelimiter: String.fromCharCode(@com.vaadin.flow.shared.communication.PushConstants::MESSAGE_DELIMITER) }; }-*/; @@ -699,6 +703,11 @@ private final native JavaScriptObject doConnect(String uri, config.onClientTimeout = $entry(function(request) { self.@com.vaadin.client.communication.AtmospherePushConnection::onClientTimeout(*)(request); }); + config.headers = { + 'X-Vaadin-LastSeenServerSyncId': function() { + return self.@com.vaadin.client.communication.AtmospherePushConnection::getLastSeenServerSyncId(*)(); + } + }; return $wnd.vaadinPush.atmosphere.subscribe(config); }-*/; diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java index f405205698f..70ebd095c57 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/AtmospherePushConnection.java @@ -27,6 +27,7 @@ import org.atmosphere.cpr.AtmosphereResource; import org.atmosphere.cpr.AtmosphereResource.TRANSPORT; +import org.atmosphere.cpr.BroadcastFilterAdapter; import org.atmosphere.util.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,7 +217,9 @@ public void push(boolean async) { protected void sendMessage(String message) { assert (isConnected()); // "Broadcast" the changes to the single client only - outgoingMessage = getResource().getBroadcaster().broadcast(message, + outgoingMessage = getResource().getBroadcaster().broadcast( + new PushMessage(ui.getInternals().getServerSyncId() - 1, + message), getResource()); } @@ -412,4 +415,33 @@ public static void enableAtmosphereDebugLogging() { + " instead (i.e.: logback, log4j, etc)"); } + static final class PushMessage implements Serializable { + final int serverSyncId; + final String message; + + PushMessage(int serverSyncId, String message) { + this.serverSyncId = serverSyncId; + this.message = message; + } + + boolean alreadySeen(int lastSeenOnClient) { + return serverSyncId <= lastSeenOnClient; + } + } + + /** + * A {@link org.atmosphere.cpr.BroadcastFilter} that unwraps the message to + * be sent to the client from a {@link PushMessage} instance. + */ + static final class PushMessageUnwrapFilter extends BroadcastFilterAdapter + implements Serializable { + @Override + public BroadcastAction filter(String broadcasterId, + AtmosphereResource r, Object originalMessage, Object message) { + if (message instanceof AtmospherePushConnection.PushMessage) { + message = ((PushMessage) message).message; + } + return new BroadcastAction(message); + } + } } diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/LongPollingCacheFilter.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/LongPollingCacheFilter.java new file mode 100644 index 00000000000..9ed00e55240 --- /dev/null +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/LongPollingCacheFilter.java @@ -0,0 +1,94 @@ +/* + * Copyright 2000-2023 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.vaadin.flow.server.communication; + +import java.io.Serializable; + +import org.atmosphere.cache.BroadcastMessage; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.BroadcasterCache; +import org.atmosphere.cpr.PerRequestBroadcastFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PerRequestBroadcastFilter} implementation that handles + * {@link com.vaadin.flow.server.communication.AtmospherePushConnection.PushMessage}s + * to ensure that a message is preserved in the {@link BroadcasterCache} until + * the client has received it. + * + * The filter acts only on {@literal LONG POLLING} transport and expects that + * the client sends the {@literal X-Vaadin-LastSeenServerSyncId} header with the + * identifier of the last message seen, every time the connection is + * established. + * + * Messages already seen are discarded, whereas messages not yet sent to the + * client are added again to the cache to preserve them until client confirms + * reception by sending the last seen message identifier. + */ +public class LongPollingCacheFilter + implements PerRequestBroadcastFilter, Serializable { + public static final String SEEN_SERVER_SYNC_ID = "X-Vaadin-LastSeenServerSyncId"; + + private static Logger getLogger() { + return LoggerFactory.getLogger(LongPollingCacheFilter.class.getName()); + } + + @Override + public BroadcastAction filter(String broadcasterId, AtmosphereResource r, + Object originalMessage, Object message) { + if (originalMessage instanceof AtmospherePushConnection.PushMessage + && r.transport() == AtmosphereResource.TRANSPORT.LONG_POLLING + && r.getRequest().getHeader(SEEN_SERVER_SYNC_ID) != null) { + AtmospherePushConnection.PushMessage pushMessage = (AtmospherePushConnection.PushMessage) originalMessage; + String uuid = r.uuid(); + int lastSeenOnClient = Integer + .parseInt(r.getRequest().getHeader(SEEN_SERVER_SYNC_ID)); + if (pushMessage.alreadySeen(lastSeenOnClient)) { + getLogger().trace( + "Discarding message {} for resource {} as client already seen {}. {}", + pushMessage.serverSyncId, uuid, lastSeenOnClient, + pushMessage.message); + // Client has already seen this message, discard it + return new BroadcastAction(BroadcastAction.ACTION.ABORT, + message); + } else { + // In rare cases with long polling, message may be lost during + // write operation and the client may never receive it. + // To prevent this kind of issues we move the message back to + // the cache until we get confirmation that the message has been + // seen + getLogger().trace( + "Put message {} for resource {} back to the cache because it may not have reached the client, as the last seen message is {}. {}", + pushMessage.serverSyncId, uuid, lastSeenOnClient, + pushMessage.message); + BroadcasterCache cache = r.getBroadcaster() + .getBroadcasterConfig().getBroadcasterCache(); + cache.addToCache(broadcasterId, uuid, + new BroadcastMessage(originalMessage)); + } + } + return new BroadcastAction(message); + } + + @Override + public BroadcastAction filter(String broadcasterId, Object originalMessage, + Object message) { + return new BroadcastAction(message); + } + +} diff --git a/flow-server/src/main/java/com/vaadin/flow/server/communication/PushRequestHandler.java b/flow-server/src/main/java/com/vaadin/flow/server/communication/PushRequestHandler.java index 7da20dc5f1d..01cfb932f8f 100644 --- a/flow-server/src/main/java/com/vaadin/flow/server/communication/PushRequestHandler.java +++ b/flow-server/src/main/java/com/vaadin/flow/server/communication/PushRequestHandler.java @@ -30,6 +30,7 @@ import org.atmosphere.cpr.AtmosphereInterceptor; import org.atmosphere.cpr.AtmosphereRequestImpl; import org.atmosphere.cpr.AtmosphereResponseImpl; +import org.atmosphere.cpr.BroadcasterConfig; import org.atmosphere.interceptor.HeartbeatInterceptor; import org.atmosphere.util.VoidAnnotationProcessor; import org.slf4j.Logger; @@ -116,6 +117,11 @@ public PushRequestHandler(VaadinServletService service) // Map the (possibly pre-initialized) handler to the actual push // handler ((PushAtmosphereHandler) handler).setPushHandler(pushHandler); + BroadcasterConfig broadcasterConfig = handlerWrapper.broadcaster + .getBroadcasterConfig(); + broadcasterConfig.addFilter(new LongPollingCacheFilter()); + broadcasterConfig.addFilter( + new AtmospherePushConnection.PushMessageUnwrapFilter()); } } diff --git a/flow-server/src/test/java/com/vaadin/flow/server/communication/LongPollingCacheFilterTest.java b/flow-server/src/test/java/com/vaadin/flow/server/communication/LongPollingCacheFilterTest.java new file mode 100644 index 00000000000..d5d69ea28ab --- /dev/null +++ b/flow-server/src/test/java/com/vaadin/flow/server/communication/LongPollingCacheFilterTest.java @@ -0,0 +1,163 @@ +/* + * Copyright 2000-2023 Vaadin Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.vaadin.flow.server.communication; + +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.atmosphere.cpr.AtmosphereRequest; +import org.atmosphere.cpr.AtmosphereResource; +import org.atmosphere.cpr.BroadcastFilter.BroadcastAction; +import org.atmosphere.cpr.BroadcastFilter.BroadcastAction.ACTION; +import org.atmosphere.cpr.Broadcaster; +import org.atmosphere.cpr.BroadcasterCache; +import org.atmosphere.cpr.BroadcasterConfig; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +public class LongPollingCacheFilterTest { + + static final String RESOURCE_UUID = "resourceUUID"; + LongPollingCacheFilter filter = new LongPollingCacheFilter(); + + AtmospherePushConnection.PushMessage originalMessage = new AtmospherePushConnection.PushMessage( + 5, "PUSH ME"); + Object nonPushMessage = new Object(); + Object message = new Object(); + private AtmosphereResource resource; + private BroadcasterCache cache; + + @Test + public void filter_notPushMessage_continueWithCurrentMessage() { + setTransport(AtmosphereResource.TRANSPORT.LONG_POLLING); + setSeenServerSyncIdHeader(5); + BroadcastAction action = filter.filter("broadcasterId", resource, + nonPushMessage, message); + Assert.assertEquals(ACTION.CONTINUE, action.action()); + Assert.assertSame("Message should not be altered by filter", message, + action.message()); + verifyMessageIsNotCached(); + } + + @Test + public void filter_notLongPollingTransport_continueWithCurrentMessage() { + setSeenServerSyncIdHeader(5); + Stream.of(AtmosphereResource.TRANSPORT.values()) + .filter(t -> t != AtmosphereResource.TRANSPORT.LONG_POLLING) + .forEach(transport -> { + setTransport(transport); + BroadcastAction action = filter.filter("broadcasterId", + resource, originalMessage, message); + Assert.assertEquals(ACTION.CONTINUE, action.action()); + Assert.assertSame( + "Message should not be altered by filter when transport is " + + transport, + message, action.message()); + }); + verifyMessageIsNotCached(); + } + + @Test + public void filter_missingLastSeenServerSyncId_continueWithCurrentMessage() { + setTransport(AtmosphereResource.TRANSPORT.LONG_POLLING); + BroadcastAction action = filter.filter("broadcasterId", resource, + originalMessage, message); + Assert.assertEquals(ACTION.CONTINUE, action.action()); + Assert.assertSame( + "Message should not be altered by filter if server sync id header is missing", + message, action.message()); + verifyMessageIsNotCached(); + } + + @Test + public void filter_messageAlreadySeen_abort() { + setTransport(AtmosphereResource.TRANSPORT.LONG_POLLING); + setSeenServerSyncIdHeader(5, 6); + + // seen server sync id == push message server sync id + BroadcastAction action = filter.filter("broadcasterId", resource, + originalMessage, message); + Assert.assertEquals("Expecting message seen on client to be skipped", + ACTION.ABORT, action.action()); + Assert.assertSame( + "Message should not be altered by filter when aborting", + message, action.message()); + + // seen server sync id > push message server sync id + action = filter.filter("broadcasterId", resource, originalMessage, + message); + Assert.assertEquals("Expecting message seen on client to be skipped", + ACTION.ABORT, action.action()); + Assert.assertSame( + "Message should not be altered by filter when aborting", + message, action.message()); + verifyMessageIsNotCached(); + } + + @Test + public void filter_messageNotYetSeen_addToCacheAndContinue() { + setTransport(AtmosphereResource.TRANSPORT.LONG_POLLING); + setSeenServerSyncIdHeader(2); + String broadcasterId = "broadcasterId"; + BroadcastAction action = filter.filter(broadcasterId, resource, + originalMessage, message); + Assert.assertEquals("Expecting message not seen on client to be sent", + ACTION.CONTINUE, action.action()); + Assert.assertSame( + "Message should not be altered by filter when continuing", + message, action.message()); + Mockito.verify(cache).addToCache(ArgumentMatchers.eq(broadcasterId), + ArgumentMatchers.eq(RESOURCE_UUID), + ArgumentMatchers.argThat(m -> m.message() == originalMessage)); + } + + @Before + public void setUp() { + resource = Mockito.mock(AtmosphereResource.class); + AtmosphereRequest request = Mockito.mock(AtmosphereRequest.class); + Broadcaster broadcaster = Mockito.mock(Broadcaster.class); + BroadcasterConfig broadcasterConfig = Mockito + .mock(BroadcasterConfig.class); + cache = Mockito.mock(BroadcasterCache.class); + Mockito.when(broadcaster.getBroadcasterConfig()) + .thenReturn(broadcasterConfig); + Mockito.when(broadcasterConfig.getBroadcasterCache()).thenReturn(cache); + + Mockito.when(resource.getBroadcaster()).thenReturn(broadcaster); + Mockito.when(resource.getRequest()).thenReturn(request); + Mockito.when(resource.uuid()).thenReturn(RESOURCE_UUID); + } + + private void setTransport(AtmosphereResource.TRANSPORT transport) { + Mockito.when(resource.transport()).thenReturn(transport); + } + + private void setSeenServerSyncIdHeader(int id, int... ids) { + Mockito.when(resource.getRequest() + .getHeader(LongPollingCacheFilter.SEEN_SERVER_SYNC_ID)) + .thenReturn(Integer.toString(id), IntStream.of(ids) + .mapToObj(Integer::toString).toArray(String[]::new)); + + } + + private void verifyMessageIsNotCached() { + Mockito.verifyNoInteractions(cache); + } +} \ No newline at end of file