Skip to content

Commit

Permalink
Other changes excluding tests
Browse files Browse the repository at this point in the history
This commit adds remaining changes (excluding tests from #842

Changes are received as is and only some name refactorings were made. You should check the linked PR to see individual commits. Tests will be added in subsequent PRs
  • Loading branch information
ikbalkaya committed Jan 16, 2023
1 parent a3d27ce commit 4cf3bb9
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 203 deletions.
25 changes: 25 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,31 @@ public void onConnectionStateChanged(ConnectionStateListener.ConnectionStateChan
}
});

if (options.recover != null) {
ConnectionRecoveryKey recoveryKey = ConnectionRecoveryKey.fromJson(options.recover);
if (recoveryKey == null) {
Log.d(TAG, "Recovery key initialization failed!");
connection.connectionManager.msgSerial = 0; //RTN16f
} else {
connection.connectionManager.msgSerial = recoveryKey.getMsgSerial(); //RTN16f

for (Map.Entry<String, String> serial : recoveryKey.getSerials().entrySet()) {
//RTN16j
Channel channel = channels.get(serial.getKey());
if (channel != null) {
channel.properties.channelSerial = serial.getValue(); //RTN16i
}
}
}

connection.on(ConnectionEvent.connected, new ConnectionStateListener() {
@Override
public void onConnectionStateChanged(ConnectionStateChange state) {
options.recover = null; //RTN16k
}
});
}

if(options.autoConnect) connection.connect();
}

Expand Down
26 changes: 23 additions & 3 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
/* send attach request and pending state */
Log.v(TAG, "attach(); channel = " + name + "; sending ATTACH request");
ProtocolMessage attachMessage = new ProtocolMessage(Action.attach, this.name);
attachMessage.channelSerial = this.properties.channelSerial;

if(this.options != null) {
if(this.options.hasParams()) {
attachMessage.params = CollectionUtils.copy(this.options.params);
Expand All @@ -224,7 +226,7 @@ private void attachImpl(final boolean forceReattach, final CompletionListener li
}
}
if(this.decodeFailureRecoveryInProgress) {
attachMessage.channelSerial = this.lastPayloadProtocolMessageChannelSerial;
Log.v(TAG, "attach(); message decode recovery in progress.");
}
try {
if (listener != null) {
Expand Down Expand Up @@ -360,13 +362,23 @@ private void setAttached(ProtocolMessage message) {
properties.attachSerial = message.channelSerial;
params = message.params;
modes = ChannelMode.toSet(message.flags);

if (message.channelSerial != null) {
properties.channelSerial = message.channelSerial; //RTL4c1
}

if(state == ChannelState.attached) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
/* emit UPDATE event according to RTL12 */
emitUpdate(null, resumed);
} else if (state == ChannelState.detaching || state == ChannelState.detached) {
//RTL5k
Log.v(TAG, "setAttached(): channel is in detaching state so no need to attach it!");
setDetached((message.error != null) ? message.error : REASON_NOT_ATTACHED);
} else {
this.attachResume = true;
setState(ChannelState.attached, message.error, resumed);
presence.reEnter(); //RTP17f
sendQueuedMessages();
presence.setAttached(message.hasFlag(Flag.has_presence));
}
Expand Down Expand Up @@ -816,7 +828,10 @@ private void onMessage(final ProtocolMessage protocolMessage) {
}

lastPayloadMessageId = lastMessage.id;
lastPayloadProtocolMessageChannelSerial = protocolMessage.channelSerial;

if (protocolMessage.channelSerial != null) {
properties.channelSerial = protocolMessage.channelSerial; //RTL15b
}

for (final Message msg : messages) {
this.listeners.onMessage(msg);
Expand Down Expand Up @@ -857,6 +872,11 @@ private void onPresence(ProtocolMessage message, String syncChannelSerial) {
if(msg.timestamp == 0) msg.timestamp = message.timestamp;
if(msg.id == null) msg.id = message.id + ':' + i;
}

if (message.channelSerial != null) {
properties.channelSerial = message.channelSerial; //RTL15b
}

presence.setPresence(messages, true, syncChannelSerial);
}

Expand Down Expand Up @@ -1013,6 +1033,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener
switch(state) {
case failed:
case suspended:
properties.channelSerial = null; //RTP5a1
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000));
default:
connectionManager.send(msg, queueMessages, listener);
Expand Down Expand Up @@ -1377,7 +1398,6 @@ public void once(ChannelState state, ChannelStateListener listener) {
*/
private Set<ChannelMode> modes;
private String lastPayloadMessageId;
private String lastPayloadProtocolMessageChannelSerial;
private boolean decodeFailureRecoveryInProgress;
private final DecodingContext decodingContext;
}
42 changes: 30 additions & 12 deletions lib/src/main/java/io/ably/lib/realtime/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,6 @@ public class Connection extends EventEmitter<ConnectionEvent, ConnectionStateLis
*/
public String id;

/**
* The serial number of the last message to be received on this connection,
* used automatically by the library when recovering or resuming a connection.
* When recovering a connection explicitly, the recoveryKey is used in the recover
* client options as it contains both the key and the last message serial.
* <p>
* Spec: RTN10
*/
public long serial;

/**
* Explicitly calling connect() is unnecessary unless the autoConnect attribute of the {@link io.ably.lib.types.ClientOptions}
* object is false.
Expand Down Expand Up @@ -101,7 +91,6 @@ public void ping(CompletionListener listener) {
*/
public void close() {
key = null;
recoveryKey = null;
connectionManager.close();
}

Expand All @@ -124,7 +113,7 @@ public void onConnectionStateChange(ConnectionStateChange stateChange) {
@Override
protected void apply(ConnectionStateListener listener, ConnectionEvent event, Object... args) {
try {
listener.onConnectionStateChanged((ConnectionStateChange)args[0]);
listener.onConnectionStateChanged((ConnectionStateChange) args[0]);
} catch (Throwable t) {
Log.e(TAG, "Unexpected exception calling ConnectionStateListener", t);
}
Expand All @@ -135,6 +124,35 @@ public void emitUpdate(ErrorInfo errorInfo) {
emit(ConnectionEvent.update, ConnectionStateListener.ConnectionStateChange.createUpdateEvent(errorInfo));
}

/**
* Spec: RTN16g
*
* @return a json string which incorporates the @connectionKey@, the current @msgSerial@,
* and a collection of pairs of channel @name@ and current @channelSerial@ for every currently attached channel.
*/
public String createRecoveryKey() {
if (key == null || connectionManager == null || connectionManager.getConnectionState() == null ||
connectionManager.getConnectionState().state == ConnectionState.closed ||
connectionManager.getConnectionState().state == ConnectionState.closing ||
connectionManager.getConnectionState().state == ConnectionState.failed ||
connectionManager.getConnectionState().state == ConnectionState.suspended
) {
//RTN16h
return null;
}

ConnectionRecoveryKey recoveryKey = new ConnectionRecoveryKey(key, connectionManager.msgSerial);

for (Object channel : ably.channels.values()) {
if (channel instanceof Channel) {
Channel rcb = (Channel) channel;
recoveryKey.addSerial(rcb.name, rcb.properties.channelSerial);
}
}

return recoveryKey.asJson();
}

@Deprecated
public void emit(ConnectionState state, ConnectionStateChange stateChange) {
super.emit(state.getConnectionEvent(), stateChange);
Expand Down
61 changes: 61 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ConnectionRecoveryKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.ably.lib.realtime;

import com.google.gson.JsonSyntaxException;
import java.util.HashMap;
import java.util.Map;

import io.ably.lib.util.Log;
import io.ably.lib.util.Serialisation;

public class ConnectionRecoveryKey {
private static final String TAG = "RecoveryKey";

private final String connectionKey;
private final long msgSerial;
/**
* Key - channel name
* <p>
* Value - channelSerial
*/
private final Map<String, String> serials = new HashMap<>();

public ConnectionRecoveryKey(String connectionKey, long msgSerial) {
this.connectionKey = connectionKey;
this.msgSerial = msgSerial;
}

public String getConnectionKey() {
return connectionKey;
}

public long getMsgSerial() {
return msgSerial;
}

public Map<String, String> getSerials() {
return serials;
}

public void setSerials(Map<String, String> serials) {
this.serials.clear();
this.serials.putAll(serials);
}

public void addSerial(String channelName, String channelSerial) {
this.serials.put(channelName, channelSerial);
}

public String asJson() {
return Serialisation.gson.toJson(this);
}

public static ConnectionRecoveryKey fromJson(String json) {
try {
return Serialisation.gson.fromJson(json, ConnectionRecoveryKey.class);
} catch (JsonSyntaxException e) {
Log.e(TAG, "Cannot create recovery key from json: " + e.getMessage());
return null;
}
}

}
Loading

0 comments on commit 4cf3bb9

Please sign in to comment.