diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java index 26d3a400..43b8e3f8 100644 --- a/autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java @@ -20,8 +20,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -95,17 +93,17 @@ public class Session implements ISession, ITransportHandler { private final int STATE_GOODBYE_SENT = 6; private final int STATE_ABORT_SENT = 7; - private volatile ITransport mTransport; - private volatile ISerializer mSerializer; + private ITransport mTransport; + private ISerializer mSerializer; private Executor mExecutor; private CompletableFuture mJoinFuture; - private final ConcurrentLinkedQueue mOnJoinListeners; - private final ConcurrentLinkedQueue mOnReadyListeners; - private final ConcurrentLinkedQueue mOnLeaveListeners; - private final ConcurrentLinkedQueue mOnConnectListeners; - private final ConcurrentLinkedQueue mOnDisconnectListeners; - private final ConcurrentLinkedQueue mOnUserErrorListeners; + private final ArrayList mOnJoinListeners; + private final ArrayList mOnReadyListeners; + private final ArrayList mOnLeaveListeners; + private final ArrayList mOnConnectListeners; + private final ArrayList mOnDisconnectListeners; + private final ArrayList mOnUserErrorListeners; private final IDGenerator mIDGenerator; private final Map mCallRequests; private final Map mSubscribeRequests; @@ -114,23 +112,23 @@ public class Session implements ISession, ITransportHandler { private final Map> mSubscriptions; private final Map mRegistrations; - private volatile int mState = STATE_DISCONNECTED; - private volatile long mSessionID; + private int mState = STATE_DISCONNECTED; + private long mSessionID; private boolean mGoodbyeSent; private String mRealm; public Session() { - mOnJoinListeners = new ConcurrentLinkedQueue<>(); - mOnReadyListeners = new ConcurrentLinkedQueue<>(); - mOnLeaveListeners = new ConcurrentLinkedQueue<>(); - mOnConnectListeners = new ConcurrentLinkedQueue<>(); - mOnDisconnectListeners = new ConcurrentLinkedQueue<>(); - mOnUserErrorListeners = new ConcurrentLinkedQueue<>(); + mOnJoinListeners = new ArrayList<>(); + mOnReadyListeners = new ArrayList<>(); + mOnLeaveListeners = new ArrayList<>(); + mOnConnectListeners = new ArrayList<>(); + mOnDisconnectListeners = new ArrayList<>(); + mOnUserErrorListeners = new ArrayList<>(); mIDGenerator = new IDGenerator(); - mCallRequests = new ConcurrentHashMap<>(); - mSubscribeRequests = new ConcurrentHashMap<>(); - mPublishRequests = new ConcurrentHashMap<>(); - mRegisterRequest = new ConcurrentHashMap<>(); + mCallRequests = new HashMap<>(); + mSubscribeRequests = new HashMap<>(); + mPublishRequests = new HashMap<>(); + mRegisterRequest = new HashMap<>(); mSubscriptions = new HashMap<>(); mRegistrations = new HashMap<>(); } @@ -253,12 +251,13 @@ private void onPreSessionMessage(IMessage message) throws Exception { private void onMessage(IMessage message) throws Exception { if (message instanceof Result) { Result msg = (Result) message; - CallRequest request = mCallRequests.remove(msg.request); + CallRequest request = getOrDefault(mCallRequests, msg.request, null); if (request == null) { throw new ProtocolError(String.format( "RESULT received for non-pending request ID %s", msg.request)); } + mCallRequests.remove(msg.request); if (request.resultTypeRef != null) { // FIXME: check args length > 1 and == 0, and kwargs != null // we cannot currently POJO automap these cases! @@ -272,24 +271,25 @@ private void onMessage(IMessage message) throws Exception { } } else if (message instanceof Subscribed) { Subscribed msg = (Subscribed) message; - SubscribeRequest request = mSubscribeRequests.remove(msg.request); + SubscribeRequest request = getOrDefault( + mSubscribeRequests, msg.request, null); if (request == null) { throw new ProtocolError(String.format( "SUBSCRIBED received for non-pending request ID %s", msg.request)); } - List list = mSubscriptions.get(msg.subscription); - if (list == null) { - list = new ArrayList<>(); - mSubscriptions.put(msg.subscription, list); + mSubscribeRequests.remove(msg.request); + if (!mSubscriptions.containsKey(msg.subscription)) { + mSubscriptions.put(msg.subscription, new ArrayList<>()); } Subscription subscription = new Subscription(msg.subscription, request.topic, request.resultTypeRef, request.resultTypeClass, request.handler); - list.add(subscription); + mSubscriptions.get(msg.subscription).add(subscription); request.onReply.complete(subscription); } else if (message instanceof Event) { Event msg = (Event) message; - List subscriptions = mSubscriptions.get(msg.subscription); + List subscriptions = getOrDefault( + mSubscriptions, msg.subscription, null); if (subscriptions == null) { throw new ProtocolError(String.format( "EVENT received for non-subscribed subscription ID %s", @@ -345,30 +345,35 @@ private void onMessage(IMessage message) throws Exception { combineFutures(futures); } else if (message instanceof Published) { Published msg = (Published) message; - PublishRequest request = mPublishRequests.remove(msg.request); + PublishRequest request = getOrDefault( + mPublishRequests, msg.request, null); if (request == null) { throw new ProtocolError(String.format( "PUBLISHED received for non-pending request ID %s", msg.request)); } + mPublishRequests.remove(msg.request); Publication publication = new Publication(msg.publication); request.onReply.complete(publication); } else if (message instanceof Registered) { Registered msg = (Registered) message; - RegisterRequest request = mRegisterRequest.remove(msg.request); + RegisterRequest request = getOrDefault( + mRegisterRequest, msg.request, null); + if (request == null) { throw new ProtocolError(String.format( "REGISTERED received for already existing registration ID %s", msg.request)); } - + mRegisterRequest.remove(msg.request); Registration registration = new Registration( msg.registration, request.procedure, request.endpoint); mRegistrations.put(msg.registration, registration); request.onReply.complete(registration); } else if (message instanceof Invocation) { Invocation msg = (Invocation) message; - Registration registration = mRegistrations.get(msg.registration); + Registration registration = getOrDefault( + mRegistrations, msg.registration, null); if (registration == null) { throw new ProtocolError(String.format( @@ -454,21 +459,21 @@ private void onMessage(IMessage message) throws Exception { } else if (message instanceof Error) { Error msg = (Error) message; CompletableFuture onReply = null; - CallRequest request = null; - PublishRequest publishRequest = null; - SubscribeRequest subscribeRequest = null; - RegisterRequest registerRequest = null; - if (msg.requestType == Call.MESSAGE_TYPE && ((request = mCallRequests.remove(msg.request)) != null)) { - onReply = request.onReply; + if (msg.requestType == Call.MESSAGE_TYPE && mCallRequests.containsKey(msg.request)) { + onReply = mCallRequests.get(msg.request).onReply; + mCallRequests.remove(msg.request); } else if (msg.requestType == Publish.MESSAGE_TYPE - && ((publishRequest = mPublishRequests.remove(msg.request)) != null)) { - onReply = publishRequest.onReply; + && mPublishRequests.containsKey(msg.request)) { + onReply = mPublishRequests.get(msg.request).onReply; + mPublishRequests.remove(msg.request); } else if (msg.requestType == Subscribe.MESSAGE_TYPE - && ((subscribeRequest = mSubscribeRequests.remove(msg.request)) != null)) { - onReply = subscribeRequest.onReply; + && mSubscribeRequests.containsKey(msg.request)) { + onReply = mSubscribeRequests.get(msg.request).onReply; + mSubscribeRequests.remove(msg.request); } else if (msg.requestType == Register.MESSAGE_TYPE - && ((registerRequest = mRegisterRequest.remove(msg.request)) != null)) { - onReply = registerRequest.onReply; + && mRegisterRequest.containsKey(msg.request)) { + onReply = mRegisterRequest.get(msg.request).onReply; + mRegisterRequest.remove(msg.request); } if (onReply != null) { onReply.completeExceptionally(new ApplicationError( @@ -1208,12 +1213,14 @@ public void removeOnUserErrorListener(OnUserErrorListener listener) { removeListener(mOnUserErrorListeners, listener); } - private T addListener(ConcurrentLinkedQueue listeners, T listener) { + private T addListener(ArrayList listeners, T listener) { listeners.add(listener); return listener; } - private void removeListener(ConcurrentLinkedQueue listeners, T listener) { - listeners.remove(listener); + private void removeListener(ArrayList listeners, T listener) { + if (listeners.contains(listener)) { + listeners.remove(listener); + } } } diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/utils/IDGenerator.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/utils/IDGenerator.java index b92a77e2..4fe01be3 100644 --- a/autobahn/src/main/java/io/crossbar/autobahn/wamp/utils/IDGenerator.java +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/utils/IDGenerator.java @@ -11,13 +11,14 @@ package io.crossbar.autobahn.wamp.utils; -import java.util.concurrent.atomic.AtomicLong; - public class IDGenerator { - private AtomicLong mNext = new AtomicLong(); + private long mNext; public long next() { - // return numbers in the range [0, 2**53-1] - return mNext.getAndIncrement() & 0x1fffffffffffffL; + mNext += 1; + if (mNext > 9007199254740992L) { + mNext = 1; + } + return mNext; } }