Skip to content

Commit

Permalink
Revert "Make Session thread safe. Fixes #329. (#332)" (#335)
Browse files Browse the repository at this point in the history
This reverts commit 151dfc6.
  • Loading branch information
om26er authored Oct 23, 2017
1 parent 151dfc6 commit 014d083
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 54 deletions.
105 changes: 56 additions & 49 deletions autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -95,17 +93,17 @@ public class Session implements ISession, ITransportHandler {
private final int STATE_GOODBYE_SENT = 6;
private final int STATE_ABORT_SENT = 7;

private volatile ITransport mTransport;
private volatile ISerializer mSerializer;
private ITransport mTransport;
private ISerializer mSerializer;
private Executor mExecutor;
private CompletableFuture<SessionDetails> mJoinFuture;

private final ConcurrentLinkedQueue<OnJoinListener> mOnJoinListeners;
private final ConcurrentLinkedQueue<OnReadyListener> mOnReadyListeners;
private final ConcurrentLinkedQueue<OnLeaveListener> mOnLeaveListeners;
private final ConcurrentLinkedQueue<OnConnectListener> mOnConnectListeners;
private final ConcurrentLinkedQueue<OnDisconnectListener> mOnDisconnectListeners;
private final ConcurrentLinkedQueue<OnUserErrorListener> mOnUserErrorListeners;
private final ArrayList<OnJoinListener> mOnJoinListeners;
private final ArrayList<OnReadyListener> mOnReadyListeners;
private final ArrayList<OnLeaveListener> mOnLeaveListeners;
private final ArrayList<OnConnectListener> mOnConnectListeners;
private final ArrayList<OnDisconnectListener> mOnDisconnectListeners;
private final ArrayList<OnUserErrorListener> mOnUserErrorListeners;
private final IDGenerator mIDGenerator;
private final Map<Long, CallRequest> mCallRequests;
private final Map<Long, SubscribeRequest> mSubscribeRequests;
Expand All @@ -114,23 +112,23 @@ public class Session implements ISession, ITransportHandler {
private final Map<Long, List<Subscription>> mSubscriptions;
private final Map<Long, Registration> mRegistrations;

private volatile int mState = STATE_DISCONNECTED;
private volatile long mSessionID;
private int mState = STATE_DISCONNECTED;
private long mSessionID;
private boolean mGoodbyeSent;
private String mRealm;

public Session() {
mOnJoinListeners = new ConcurrentLinkedQueue<>();
mOnReadyListeners = new ConcurrentLinkedQueue<>();
mOnLeaveListeners = new ConcurrentLinkedQueue<>();
mOnConnectListeners = new ConcurrentLinkedQueue<>();
mOnDisconnectListeners = new ConcurrentLinkedQueue<>();
mOnUserErrorListeners = new ConcurrentLinkedQueue<>();
mOnJoinListeners = new ArrayList<>();
mOnReadyListeners = new ArrayList<>();
mOnLeaveListeners = new ArrayList<>();
mOnConnectListeners = new ArrayList<>();
mOnDisconnectListeners = new ArrayList<>();
mOnUserErrorListeners = new ArrayList<>();
mIDGenerator = new IDGenerator();
mCallRequests = new ConcurrentHashMap<>();
mSubscribeRequests = new ConcurrentHashMap<>();
mPublishRequests = new ConcurrentHashMap<>();
mRegisterRequest = new ConcurrentHashMap<>();
mCallRequests = new HashMap<>();
mSubscribeRequests = new HashMap<>();
mPublishRequests = new HashMap<>();
mRegisterRequest = new HashMap<>();
mSubscriptions = new HashMap<>();
mRegistrations = new HashMap<>();
}
Expand Down Expand Up @@ -253,12 +251,13 @@ private void onPreSessionMessage(IMessage message) throws Exception {
private void onMessage(IMessage message) throws Exception {
if (message instanceof Result) {
Result msg = (Result) message;
CallRequest request = mCallRequests.remove(msg.request);
CallRequest request = getOrDefault(mCallRequests, msg.request, null);
if (request == null) {
throw new ProtocolError(String.format(
"RESULT received for non-pending request ID %s", msg.request));
}

mCallRequests.remove(msg.request);
if (request.resultTypeRef != null) {
// FIXME: check args length > 1 and == 0, and kwargs != null
// we cannot currently POJO automap these cases!
Expand All @@ -272,24 +271,25 @@ private void onMessage(IMessage message) throws Exception {
}
} else if (message instanceof Subscribed) {
Subscribed msg = (Subscribed) message;
SubscribeRequest request = mSubscribeRequests.remove(msg.request);
SubscribeRequest request = getOrDefault(
mSubscribeRequests, msg.request, null);
if (request == null) {
throw new ProtocolError(String.format(
"SUBSCRIBED received for non-pending request ID %s", msg.request));
}

List<Subscription> list = mSubscriptions.get(msg.subscription);
if (list == null) {
list = new ArrayList<>();
mSubscriptions.put(msg.subscription, list);
mSubscribeRequests.remove(msg.request);
if (!mSubscriptions.containsKey(msg.subscription)) {
mSubscriptions.put(msg.subscription, new ArrayList<>());
}
Subscription subscription = new Subscription(msg.subscription, request.topic,
request.resultTypeRef, request.resultTypeClass, request.handler);
list.add(subscription);
mSubscriptions.get(msg.subscription).add(subscription);
request.onReply.complete(subscription);
} else if (message instanceof Event) {
Event msg = (Event) message;
List<Subscription> subscriptions = mSubscriptions.get(msg.subscription);
List<Subscription> subscriptions = getOrDefault(
mSubscriptions, msg.subscription, null);
if (subscriptions == null) {
throw new ProtocolError(String.format(
"EVENT received for non-subscribed subscription ID %s",
Expand Down Expand Up @@ -345,30 +345,35 @@ private void onMessage(IMessage message) throws Exception {
combineFutures(futures);
} else if (message instanceof Published) {
Published msg = (Published) message;
PublishRequest request = mPublishRequests.remove(msg.request);
PublishRequest request = getOrDefault(
mPublishRequests, msg.request, null);
if (request == null) {
throw new ProtocolError(String.format(
"PUBLISHED received for non-pending request ID %s", msg.request));
}

mPublishRequests.remove(msg.request);
Publication publication = new Publication(msg.publication);
request.onReply.complete(publication);
} else if (message instanceof Registered) {
Registered msg = (Registered) message;
RegisterRequest request = mRegisterRequest.remove(msg.request);
RegisterRequest request = getOrDefault(
mRegisterRequest, msg.request, null);

if (request == null) {
throw new ProtocolError(String.format(
"REGISTERED received for already existing registration ID %s",
msg.request));
}

mRegisterRequest.remove(msg.request);
Registration registration = new Registration(
msg.registration, request.procedure, request.endpoint);
mRegistrations.put(msg.registration, registration);
request.onReply.complete(registration);
} else if (message instanceof Invocation) {
Invocation msg = (Invocation) message;
Registration registration = mRegistrations.get(msg.registration);
Registration registration = getOrDefault(
mRegistrations, msg.registration, null);

if (registration == null) {
throw new ProtocolError(String.format(
Expand Down Expand Up @@ -454,21 +459,21 @@ private void onMessage(IMessage message) throws Exception {
} else if (message instanceof Error) {
Error msg = (Error) message;
CompletableFuture<?> onReply = null;
CallRequest request = null;
PublishRequest publishRequest = null;
SubscribeRequest subscribeRequest = null;
RegisterRequest registerRequest = null;
if (msg.requestType == Call.MESSAGE_TYPE && ((request = mCallRequests.remove(msg.request)) != null)) {
onReply = request.onReply;
if (msg.requestType == Call.MESSAGE_TYPE && mCallRequests.containsKey(msg.request)) {
onReply = mCallRequests.get(msg.request).onReply;
mCallRequests.remove(msg.request);
} else if (msg.requestType == Publish.MESSAGE_TYPE
&& ((publishRequest = mPublishRequests.remove(msg.request)) != null)) {
onReply = publishRequest.onReply;
&& mPublishRequests.containsKey(msg.request)) {
onReply = mPublishRequests.get(msg.request).onReply;
mPublishRequests.remove(msg.request);
} else if (msg.requestType == Subscribe.MESSAGE_TYPE
&& ((subscribeRequest = mSubscribeRequests.remove(msg.request)) != null)) {
onReply = subscribeRequest.onReply;
&& mSubscribeRequests.containsKey(msg.request)) {
onReply = mSubscribeRequests.get(msg.request).onReply;
mSubscribeRequests.remove(msg.request);
} else if (msg.requestType == Register.MESSAGE_TYPE
&& ((registerRequest = mRegisterRequest.remove(msg.request)) != null)) {
onReply = registerRequest.onReply;
&& mRegisterRequest.containsKey(msg.request)) {
onReply = mRegisterRequest.get(msg.request).onReply;
mRegisterRequest.remove(msg.request);
}
if (onReply != null) {
onReply.completeExceptionally(new ApplicationError(
Expand Down Expand Up @@ -1208,12 +1213,14 @@ public void removeOnUserErrorListener(OnUserErrorListener listener) {
removeListener(mOnUserErrorListeners, listener);
}

private <T> T addListener(ConcurrentLinkedQueue<T> listeners, T listener) {
private <T> T addListener(ArrayList<T> listeners, T listener) {
listeners.add(listener);
return listener;
}

private <T> void removeListener(ConcurrentLinkedQueue<T> listeners, T listener) {
listeners.remove(listener);
private <T> void removeListener(ArrayList<T> listeners, T listener) {
if (listeners.contains(listener)) {
listeners.remove(listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@

package io.crossbar.autobahn.wamp.utils;

import java.util.concurrent.atomic.AtomicLong;

public class IDGenerator {
private AtomicLong mNext = new AtomicLong();
private long mNext;

public long next() {
// return numbers in the range [0, 2**53-1]
return mNext.getAndIncrement() & 0x1fffffffffffffL;
mNext += 1;
if (mNext > 9007199254740992L) {
mNext = 1;
}
return mNext;
}
}

0 comments on commit 014d083

Please sign in to comment.