Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol 2.0 #842

Closed
wants to merge 70 commits into from
Closed
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
bd1993d
Remove serial implementation from Connection and ConnectionManager
Sep 21, 2022
131fd6e
Remove connectionSerial implementation from ProtocolMessage and Conne…
Sep 21, 2022
7d21386
Set Ably API version from 1.2 to 2.0
Sep 22, 2022
49bfde9
Remove recovery key direct access and create getter as spec RTN16g
Sep 22, 2022
d347a2c
Implement channelSerial in onMessage and onPresence by spec RTL25
Sep 23, 2022
8ad9623
Implement channelSerial clearing on detached, suspended or failed by …
Sep 23, 2022
63d7f04
Remove lastPayloadProtocolMessageChannelSerial as it's implementation…
Sep 23, 2022
8d9d772
Implement reattaching channels by spec RTN15c6
Sep 26, 2022
46b2567
Fix reattaching channels by spec RTN15c6
Sep 26, 2022
e994379
Set channel state to attaching on reattach
Sep 26, 2022
3f102b9
Implement reattaching channels by spec RTN15c7
Sep 26, 2022
014fc38
Implement recover querystring to websocket by spec RTN16k
Sep 27, 2022
7ae2fe7
Implement client recovery by spec RTN16i, RTN16f and RTN16j
Sep 27, 2022
c6b2c64
Fix client recovery attachment by spec RTN16i
Sep 28, 2022
8a4e8e7
Remove unused imports
Oct 3, 2022
6d27773
Add spec comments for implemented features in channel recovery
Oct 3, 2022
761a572
Add spec comments for implemented features in ConnectionManager
Oct 3, 2022
52eee30
Fix duplicate request state on onConnected
Oct 4, 2022
c2d5fab
Add spec comments for getRecoveryKey
Oct 4, 2022
4773476
Fix recovery implementation by spec RTN16l
Oct 4, 2022
bdcc5e0
Fix recovery implementation by spec RTL15b
Oct 5, 2022
ed019ed
Update memberKey by spec RTP17h
Oct 5, 2022
7297b00
Update specification comment documentation
Oct 5, 2022
0e705a5
Add re-entry of presence members by spec RTP17g and RTP17f
Oct 6, 2022
271794b
Fix test missing deviceSecret by API protocol 2.0
Oct 7, 2022
f3d0dd7
Remove old implementation spec RTN15c3
Oct 10, 2022
1658a2f
Fix recovery option test to implement new spec
Oct 10, 2022
486ff01
Fix teardown after test class is finished on null objects
Oct 11, 2022
f62ebc7
Improve channels reattach test with closing Ably instance
Oct 11, 2022
bfd621d
Add device secret key required by protocol 2.0
Oct 12, 2022
429ff78
Fix deviceSecret missing in DeviceDetails per new API requirement
Oct 17, 2022
2dba619
Fix connection history test on reattach
Oct 17, 2022
fe8ed2c
Fix re-attaching channel with force reattach
Oct 18, 2022
ce44c37
Improve test for channel resuming
Oct 18, 2022
602a223
Fix re-auth history check state
Oct 19, 2022
73ced2d
Remove connectionSerial as it is not used any more
Oct 20, 2022
4221b80
Remove unnecessary connected check and add doc comment in onConnected…
Oct 21, 2022
0287b17
Fix spec RTP17f to not reEnter if old state was already attached
Oct 21, 2022
9376960
Refactor sending pending messages when client is connected per spec R…
Oct 21, 2022
201f388
Refactor sending pending messages when client is connected and remove…
Oct 21, 2022
86625b8
Refactor presence map manipulation by spec RTP17h
Oct 21, 2022
fc543d0
Ignore bulk publish tests as they are not supported in protocol 2.0
Oct 21, 2022
b4eaadd
Remove public modifier from PresenceMap
Oct 21, 2022
e91d770
Improve re-enter by spec RTP17f
Oct 25, 2022
a6eeb00
Improve reattach of the channels on every connected message
Oct 25, 2022
4591971
Improve re-enter on attached message by spec RTP17f
Oct 25, 2022
5fae005
Fix removing sending pending messages by spec RTN19a
Oct 25, 2022
d1e262d
Fix test for attached channel
Oct 25, 2022
4dd11ee
Fix test for presence enter
Oct 26, 2022
52e06b3
Add spec RTL5k to fix channel attaching on detaching or detached state
Oct 27, 2022
90dd275
Rename getRecoveryKey->createRecoveryKey per updated spec
SimonWoolf Oct 31, 2022
a6fd26d
Add spec RTL5k to fix channel attaching on detaching or detached state
Nov 1, 2022
1b5781a
Merge remote-tracking branch 'origin/integration/protocol-2.0' into i…
Nov 1, 2022
0fa91f7
Refactor internal map by spec RTP17h
Nov 1, 2022
52884b5
Rename getRecoveryKey->createRecoveryKey per updated spec
SimonWoolf Oct 31, 2022
28f438a
Rename entry of map and serials iterator
Nov 2, 2022
dc5ed8c
Merge remote-tracking branch 'origin/integration/protocol-2.0' into i…
Nov 4, 2022
18415fc
Add encapsulation for ConnectionRecoveryKey
Nov 4, 2022
ee0dcd7
Remove unused import
Nov 4, 2022
5b99907
Add error logs instead of printing stack trace
Nov 4, 2022
d022c3d
Emit update on re-attach error
Nov 7, 2022
d7ed41a
Add error info to test fail log
Nov 7, 2022
f0e09fb
Remove old implementation of re-enter
Nov 7, 2022
865bca8
Emit update on send pending message error
Nov 7, 2022
e079abb
Update core/src/main/java/io/ably/lib/realtime/ConnectionRecoveryKey.…
qsdigor Nov 8, 2022
bd5a540
Update core/src/main/java/io/ably/lib/transport/ConnectionManager.java
qsdigor Nov 8, 2022
437bb08
Fix wrong method name in ConnectionRecoveryKey
Nov 8, 2022
28531da
Fix spec RTP17g
Nov 11, 2022
7752068
Merge branch 'integration/version-2' into integration/protocol-2.0
Nov 14, 2022
49f8d6a
Add missing javadoc
Nov 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import io.ably.lib.types.ClientOptions;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Param;
import io.ably.lib.types.RegistrationToken;
import io.ably.lib.util.Base64Coder;
import io.ably.lib.util.IntentUtils;
import io.ably.lib.util.JsonUtils;
Expand Down Expand Up @@ -1367,11 +1366,12 @@ public void run() throws Exception {
}

testActivation.registerAndWait();
DeviceDetails otherDevice = DeviceDetails.fromJsonObject(JsonUtils.object()
LocalDevice otherDevice = LocalDevice.fromJsonObject(JsonUtils.object()
.add("id", "other")
.add("platform", "android")
.add("formFactor", "tablet")
.add("metadata", JsonUtils.object())
.add("deviceSecret", "testdevicesecret==") //Required by API protocol 2.0
.add("push", JsonUtils.object()
.add("recipient", JsonUtils.object()
.add("transportType", "fcm")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.ably.lib.types.Callback;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Param;
import io.ably.lib.types.RegistrationToken;
import io.ably.lib.util.IntentUtils;
import io.ably.lib.util.Log;
import io.ably.lib.util.ParamsUtils;
Expand Down
12 changes: 6 additions & 6 deletions android/src/main/java/io/ably/lib/push/LocalDevice.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
import io.ably.lib.types.RegistrationToken;
import io.ably.lib.util.Base64Coder;
import io.ably.lib.util.Log;
import io.ably.lib.util.Serialisation;

public class LocalDevice extends DeviceDetails {
public String deviceSecret;

public String deviceIdentityToken;
private final Storage storage;

Expand All @@ -35,12 +36,11 @@ public LocalDevice(ActivationContext activationContext, Storage storage) {
}

public JsonObject toJsonObject() {
JsonObject o = super.toJsonObject();
if (deviceSecret != null) {
o.addProperty("deviceSecret", deviceSecret);
}
return super.toJsonObject();
}

return o;
public static LocalDevice fromJsonObject(JsonObject o) {
return Serialisation.gson.fromJson(o, LocalDevice.class);
}

private void loadPersisted() {
Expand Down
42 changes: 41 additions & 1 deletion core/src/main/java/io/ably/lib/realtime/AblyRealtimeBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public AblyRealtimeBase(String key, PlatformAgentProvider platformAgentProvider)
* @param platformAgentProvider for providing the platform specific part of the agent header
* @throws AblyException
*/
public AblyRealtimeBase(ClientOptions options, PlatformAgentProvider platformAgentProvider) throws AblyException {
public AblyRealtimeBase(final ClientOptions options, PlatformAgentProvider platformAgentProvider) throws AblyException {
super(options, platformAgentProvider);
final InternalChannels channels = new InternalChannels();
this.channels = (Channels<ChannelType>) channels;
Expand All @@ -72,6 +72,31 @@ public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChan
}
});

if (options.recover != null) {
ConnectionRecoveryKey recoveryKey = ConnectionRecoveryKey.fromJson(options.recover);
if (recoveryKey == null) {
Log.d(TAG, "Recovery key initialization failed!");
connection.connectionManager.msgSerial = 0; //RTN16f
} else {
connection.connectionManager.msgSerial = recoveryKey.msgSerial; //RTN16f

for (Map.Entry<String, String> entry : recoveryKey.serials.entrySet()) {
//RTN16j
RealtimeChannelBase channel = channels.get(entry.getKey());
if (channel != null) {
channel.properties.channelSerial = entry.getValue(); //RTN16i
}
}
}

connection.on(ConnectionEvent.connected, new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateChange state) {
options.recover = null; //RTN16k
}
});
}

if(options.autoConnect) connection.connect();
}

Expand Down Expand Up @@ -183,6 +208,21 @@ public void suspendAll(ErrorInfo error, boolean notifyStateChange) {
}
}

/**
* By spec RTN15c6, RTN15c7
*/
@Override
public void reAttach() {
for (Map.Entry<String, RealtimeChannelBase> entry : map.entrySet()) {
RealtimeChannelBase channel = entry.getValue();
if (channel.state == ChannelState.attaching || channel.state == ChannelState.attached || channel.state == ChannelState.suspended) {
Log.d(TAG, "reAttach(); channel = " + channel.name);
channel.state = ChannelState.attaching;
channel.attach(true, null);
}
}
}

private void clear() {
map.clear();
}
Expand Down
46 changes: 33 additions & 13 deletions core/src/main/java/io/ably/lib/realtime/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,12 @@ public class Connection extends EventEmitter<ConnectionEvent, ConnectionStateLis
*/
public String key;

/**
* RTN16b) Connection#recoveryKey is an attribute composed of the connection key and latest
* serial received on the connection
*/
public String recoveryKey;

/**
* A public identifier for this connection, used to identify
* this member in presence events and message ids.
*/
public String id;

/**
* The serial number of the last message to be received on this connection.
*/
public long serial;

/**
* Causes the library to re-attempt connection, if it was previously explicitly
* closed by the user, or was closed as a result of an unrecoverable error.
Expand All @@ -56,6 +45,7 @@ public void connect() {

/**
* Send a heartbeat message to the Ably service and await a response.
*
* @param listener a listener to be notified of the outcome of this message.
*/
public void ping(CompletionListener listener) {
Expand All @@ -69,7 +59,6 @@ public void ping(CompletionListener listener) {
*/
public void close() {
key = null;
recoveryKey = null;
connectionManager.close();
}

Expand All @@ -92,7 +81,7 @@ public void onConnectionStateChange(ConnectionStateChange stateChange) {
@Override
protected void apply(ConnectionStateListener listener, ConnectionEvent event, Object... args) {
try {
listener.onConnectionStateChanged((ConnectionStateChange)args[0]);
listener.onConnectionStateChanged((ConnectionStateChange) args[0]);
} catch (Throwable t) {
Log.e(TAG, "Unexpected exception calling ConnectionStateListener", t);
}
Expand All @@ -103,6 +92,37 @@ public void emitUpdate(ErrorInfo errorInfo) {
emit(ConnectionEvent.update, ConnectionStateListener.ConnectionStateChange.createUpdateEvent(errorInfo));
}

/**
* Spec: RTN16g
*
* @return a json string which incorporates the @connectionKey@, the current @msgSerial@,
* and a collection of pairs of channel @name@ and current @channelSerial@ for every currently attached channel.
*/
public String getRecoveryKey() {
if (key == null || connectionManager == null || connectionManager.getConnectionState() == null ||
connectionManager.getConnectionState().state == ConnectionState.closed ||
connectionManager.getConnectionState().state == ConnectionState.closing ||
connectionManager.getConnectionState().state == ConnectionState.failed ||
connectionManager.getConnectionState().state == ConnectionState.suspended
) {
//RTN16h
return null;
}

ConnectionRecoveryKey recoveryKey = new ConnectionRecoveryKey();
recoveryKey.connectionKey = key;
recoveryKey.msgSerial = connectionManager.msgSerial;

for (Object channel : ably.channels.values()) {
if (channel instanceof RealtimeChannelBase) {
RealtimeChannelBase rcb = (RealtimeChannelBase) channel;
recoveryKey.serials.put(rcb.name, rcb.properties.channelSerial);
}
}

return recoveryKey.asJson();
}

@Deprecated
public void emit(ConnectionState state, ConnectionStateChange stateChange) {
super.emit(state.getConnectionEvent(), stateChange);
Expand Down
34 changes: 34 additions & 0 deletions core/src/main/java/io/ably/lib/realtime/ConnectionRecoveryKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.ably.lib.realtime;

import com.google.gson.JsonSyntaxException;

import java.util.HashMap;
import java.util.Map;

import io.ably.lib.util.Serialisation;

public class ConnectionRecoveryKey {

public String connectionKey;
public long msgSerial;
/**
* Key - channel name
* <p>
* Value - channelSerial
*/
public Map<String, String> serials = new HashMap<>();

public String asJson() {
return Serialisation.gson.toJson(this);
}

public static ConnectionRecoveryKey fromJson(String json) {
try {
return Serialisation.gson.fromJson(json, ConnectionRecoveryKey.class);
} catch (JsonSyntaxException e) {
e.printStackTrace();
return null;
}
}

}
28 changes: 26 additions & 2 deletions core/src/main/java/io/ably/lib/realtime/Presence.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ public void onError(ErrorInfo reason) {
}
}

/**
* Spec: RTP17f, RTP17g
*/
public void reEnter(ChannelState newState) {
if (newState == ChannelState.attached && channel.state == ChannelState.attached) {
internalPresence.reEnter();
}
}

void setPresence(PresenceMessage[] messages, boolean broadcast, String syncChannelSerial) {
Log.v(TAG, "setPresence(); channel = " + channel.name + "; broadcast = " + broadcast + "; syncChannelSerial = " + syncChannelSerial);
String syncCursor = null;
Expand Down Expand Up @@ -872,7 +881,7 @@ synchronized Collection<PresenceMessage> get(Param[] params) throws AblyExceptio
* false if the message is already superseded
*/
synchronized boolean put(PresenceMessage item) {
String key = item.memberKey();
String key = item.clientId; //RTP17h
/* we've seen this member, so do not remove it at the end of sync */
if(residualMembers != null)
residualMembers.remove(key);
Expand Down Expand Up @@ -967,7 +976,7 @@ synchronized Collection<PresenceMessage> values(boolean wait) throws AblyExcepti
* @return
*/
synchronized boolean remove(PresenceMessage item) {
String key = item.memberKey();
String key = item.clientId; //RTP17h
if (hasNewerItem(key, item))
return false;
PresenceMessage existingItem = members.remove(key);
Expand Down Expand Up @@ -1031,6 +1040,21 @@ synchronized void clear() {
residualMembers.clear();
}

/**
* Spec: RTP17g
*/
synchronized void reEnter() {
for (Map.Entry<String, PresenceMessage> entry: members.entrySet()) {
PresenceMessage member = entry.getValue();
member.action = PresenceMessage.Action.enter;
try {
updatePresence(member, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Could you elaborate on why you replaced
PresenceMessage itemToSend = new PresenceMessage();
itemToSend.clientId = item.clientId;
itemToSend.data = item.data;
itemToSend.action = PresenceMessage.Action.enter;

with

PresenceMessage member = entry.getValue();
member.action = PresenceMessage.Action.enter;

It's not obvious that that change is correct (won't it have a possibly-stale connectionId?)

  1. Why did you remove the completionListener that was previously being passed to updatePresence? Is there some magic that will turn an async failure into an exception if a completionListener isn't passed that I'm missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I misinterpreted RTP17g.
Is this a good implementation 28531da?

} catch (AblyException e) {
e.printStackTrace();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use proper logging for this, instead of e.printStackTrace(). Currently, java-sdk is being used on different platforms, so it won't really work there as expected. e.g. android

Copy link
Collaborator

@sacOO7 sacOO7 Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true for all e.printStackTrace(); statements in the code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can check with @SimonWoolf on this, how to log errors. I think most likely errors are supposed to be passed back to the user who is calling the code so they can be handled gracefully, otherwise logged with proper logging level.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RTP17e: If the publish attempt fails for an automatic presence @ENTER@ (for example, by Ably rejecting it with a @NACK@), an @UPDATE@ event should be emitted on the channel with @resumed@ set to true and @reason@ set to an @ErrorInfo@ object with @code@ @91004@, a @message@ indicating that an automatic re-enter has failed and indicating the @clientId@, and @cause@ set to the reason for the enter failure. The error should also be logged at @warn@ level or higher.

As usual you don't need to reinvent the wheel here, look at what the library already does in equivalent situations. Look a little further up this same file to endSyncAndEmitLeaves():

updatePresence(itemToSend, new CompletionListener() {
    @Override
    public void onSuccess() {
    }

    @Override
    public void onError(ErrorInfo reason) {
            /*
             * (RTP5c3)  If any of the automatic ENTER presence messages published
             * in RTP5c2 fail, then an UPDATE event should be emitted on the channel
             * with resumed set to true and reason set to an ErrorInfo object with error
             * code value 91004 and the error message string containing the message
             * received from Ably (if applicable), the code received from Ably
             * (if applicable) and the explicit or implicit client_id of the PresenceMessage
             */
        String errorString = String.format(Locale.ROOT, "Cannot automatically re-enter %s on channel %s (%s)",
                clientId, channel.name, reason.message);
        Log.e(TAG, errorString);
        channel.emitUpdate(new ErrorInfo(errorString, 91004), true);
    }
});

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Speaking of endSyncAndEmitLeaves, it... is still there, still implements the old auto-re-entry logic (RTP17c)? The conditions on when to auto-re-enter changed, but instead of changing the existing implementation you've just added the new one right next to the old, so now it's doing both?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sacOO7 Logs are fixed here 5b99907

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I will take a look !

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sacOO7 I think this is it d022c3d

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SimonWoolf I have removed old implementation of re-enter as I already implemented the new logic in separate function reEnter() by spec RTP17g.
You can check it here f0e09fb

}
}
}

private boolean syncInProgress;
private Collection<String> residualMembers;
private final HashMap<String, PresenceMessage> members = new HashMap<String, PresenceMessage>();
Expand Down
31 changes: 25 additions & 6 deletions core/src/main/java/io/ably/lib/realtime/RealtimeChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,21 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed)
private void setState(ChannelState newState, ErrorInfo reason, boolean resumed, boolean notifyStateChange) {
Log.v(TAG, "setState(): channel = " + name + "; setting " + newState);
ChannelStateListener.ChannelStateChange stateChange;
synchronized(this) {
synchronized (this) {
stateChange = new ChannelStateListener.ChannelStateChange(newState, this.state, reason, resumed);
this.state = stateChange.current;
this.reason = stateChange.reason;
}

if(notifyStateChange) {
if (newState == ChannelState.detached || newState == ChannelState.suspended || newState == ChannelState.failed)
properties.channelSerial = null; //RTP5a1

if (newState == ChannelState.attached && state == ChannelState.attached) {
//RTP17f
presence.reEnter(newState);
}

if (notifyStateChange) {
/* broadcast state change */
emit(newState, stateChange);
}
Expand Down Expand Up @@ -137,7 +145,7 @@ public void attach(CompletionListener listener) throws AblyException {
this.attach(false, listener);
}

private void attach(boolean forceReattach, CompletionListener listener) {
void attach(boolean forceReattach, CompletionListener listener) {
clearAttachTimers();
attachWithTimeout(forceReattach, listener);
}
Expand Down Expand Up @@ -168,6 +176,7 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
/* send attach request and pending state */
Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request");
ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name);
attachMessage.channelSerial = this.properties.channelSerial;
if(this.options != null) {
if(this.options.hasParams()) {
attachMessage.params = CollectionUtils.copy(this.options.params);
Expand All @@ -177,7 +186,7 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
}
}
if(this.decodeFailureRecoveryInProgress) {
attachMessage.channelSerial = this.lastPayloadProtocolMessageChannelSerial;
Log.v(TAG, "attach(); message decode recovery in progress.");
}
try {
if (listener != null) {
Expand Down Expand Up @@ -301,6 +310,10 @@ private void setAttached(ProtocolMessage message) {
properties.attachSerial = message.channelSerial;
params = message.params;
modes = ChannelMode.toSet(message.flags);
if (message.channelSerial != null) {
properties.channelSerial = message.channelSerial; //RTL4c1
}

if(state == ChannelState.attached) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
/* emit UPDATE event according to RTL12 */
Expand Down Expand Up @@ -727,7 +740,10 @@ private void onMessage(final ProtocolMessage protocolMessage) {
}

lastPayloadMessageId = lastMessage.id;
lastPayloadProtocolMessageChannelSerial = protocolMessage.channelSerial;

if (protocolMessage.channelSerial != null) {
properties.channelSerial = protocolMessage.channelSerial; //RTL15b
}

for (final Message msg : messages) {
this.listeners.onMessage(msg);
Expand Down Expand Up @@ -768,6 +784,9 @@ private void onPresence(ProtocolMessage message, String syncChannelSerial) {
if(msg.timestamp == 0) msg.timestamp = message.timestamp;
if(msg.id == null) msg.id = message.id + ':' + i;
}
if (message.channelSerial != null) {
properties.channelSerial = message.channelSerial; //RTL15b
}
presence.setPresence(messages, true, syncChannelSerial);
}

Expand Down Expand Up @@ -902,6 +921,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener
switch(state) {
case failed:
case suspended:
properties.channelSerial = null; //RTP5a1
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000));
default:
connectionManager.send(msg, queueMessages, listener);
Expand Down Expand Up @@ -1199,7 +1219,6 @@ public void once(ChannelState state, ChannelStateListener listener) {
private Map<String, String> params;
private Set<ChannelMode> modes;
private String lastPayloadMessageId;
private String lastPayloadProtocolMessageChannelSerial;
private boolean decodeFailureRecoveryInProgress;
private final DecodingContext decodingContext;
}
Loading