diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/Subscription.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/Subscription.java index 7c87fd9be4..2013f279e0 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/Subscription.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/Subscription.java @@ -20,13 +20,18 @@ public class Subscription { - private final Long id; + private final Long subscriptionId; + private final String connectionId; private final SubscriptionType subscriptionType; private final Boolean includeTransaction; public Subscription( - final Long id, final SubscriptionType subscriptionType, final Boolean includeTransaction) { - this.id = id; + final Long subscriptionId, + final String connectionId, + final SubscriptionType subscriptionType, + final Boolean includeTransaction) { + this.subscriptionId = subscriptionId; + this.connectionId = connectionId; this.subscriptionType = subscriptionType; this.includeTransaction = includeTransaction; } @@ -35,8 +40,12 @@ public SubscriptionType getSubscriptionType() { return subscriptionType; } - public Long getId() { - return id; + public Long getSubscriptionId() { + return subscriptionId; + } + + public String getConnectionId() { + return connectionId; } public Boolean getIncludeTransaction() { @@ -46,8 +55,10 @@ public Boolean getIncludeTransaction() { @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("id", id) + .add("subscriptionId", subscriptionId) + .add("connectionId", connectionId) .add("subscriptionType", subscriptionType) + .add("includeTransaction", includeTransaction) .toString(); } @@ -64,11 +75,12 @@ public boolean equals(final Object o) { return false; } final Subscription that = (Subscription) o; - return Objects.equals(id, that.id) && subscriptionType == that.subscriptionType; + return Objects.equals(subscriptionId, that.subscriptionId) + && subscriptionType == that.subscriptionType; } @Override public int hashCode() { - return Objects.hash(id, subscriptionType); + return Objects.hash(subscriptionId, subscriptionType); } } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionBuilder.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionBuilder.java index 273810b50a..94efef1fee 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionBuilder.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionBuilder.java @@ -23,27 +23,31 @@ public class SubscriptionBuilder { - public Subscription build(final long id, final SubscribeRequest request) { + public Subscription build( + final long subscriptionId, final String connectionId, final SubscribeRequest request) { final SubscriptionType subscriptionType = request.getSubscriptionType(); switch (subscriptionType) { case NEW_BLOCK_HEADERS: { - return new NewBlockHeadersSubscription(id, request.getIncludeTransaction()); + return new NewBlockHeadersSubscription( + subscriptionId, connectionId, request.getIncludeTransaction()); } case LOGS: { return new LogsSubscription( - id, + subscriptionId, + connectionId, Optional.ofNullable(request.getFilterParameter()) .orElseThrow(IllegalArgumentException::new)); } case SYNCING: { - return new SyncingSubscription(id, subscriptionType); + return new SyncingSubscription(subscriptionId, connectionId, subscriptionType); } case NEW_PENDING_TRANSACTIONS: default: - return new Subscription(id, subscriptionType, request.getIncludeTransaction()); + return new Subscription( + subscriptionId, connectionId, subscriptionType, request.getIncludeTransaction()); } } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java index 1e0625fef2..a449c0ad7a 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java @@ -18,17 +18,13 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.UnsubscribeRequest; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.response.SubscriptionResponse; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.stream.Collectors; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.vertx.core.AbstractVerticle; import io.vertx.core.eventbus.Message; import io.vertx.core.json.Json; @@ -47,12 +43,9 @@ public class SubscriptionManager extends AbstractVerticle { "SubscriptionManager::removeSubscriptions"; private final AtomicLong subscriptionCounter = new AtomicLong(0); - private final Map subscriptions = new HashMap<>(); - private final Map> connectionSubscriptionsMap = new HashMap<>(); + private final Map subscriptions = new ConcurrentHashMap<>(); private final SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder(); - public SubscriptionManager() {} - @Override public void start() { vertx.eventBus().consumer(EVENTBUS_REMOVE_SUBSCRIPTIONS_ADDRESS, this::removeSubscriptions); @@ -62,23 +55,11 @@ public Long subscribe(final SubscribeRequest request) { LOG.debug("Subscribe request {}", request); final long subscriptionId = subscriptionCounter.incrementAndGet(); - final Subscription subscription = subscriptionBuilder.build(subscriptionId, request); - addSubscription(subscription, request.getConnectionId()); - - return subscription.getId(); - } - - private void addSubscription(final Subscription subscription, final String connectionId) { - subscriptions.put(subscription.getId(), subscription); - mapSubscriptionToConnection(connectionId, subscription.getId()); - } + final Subscription subscription = + subscriptionBuilder.build(subscriptionId, request.getConnectionId(), request); + subscriptions.put(subscription.getSubscriptionId(), subscription); - private void mapSubscriptionToConnection(final String connectionId, final Long subscriptionId) { - if (connectionSubscriptionsMap.containsKey(connectionId)) { - connectionSubscriptionsMap.get(connectionId).add(subscriptionId); - } else { - connectionSubscriptionsMap.put(connectionId, Lists.newArrayList(subscriptionId)); - } + return subscription.getSubscriptionId(); } public boolean unsubscribe(final UnsubscribeRequest request) { @@ -87,66 +68,39 @@ public boolean unsubscribe(final UnsubscribeRequest request) { LOG.debug("Unsubscribe request subscriptionId = {}", subscriptionId); - if (!subscriptions.containsKey(subscriptionId) - || !connectionOwnsSubscription(subscriptionId, connectionId)) { + final Subscription subscription = subscriptions.get(subscriptionId); + if (subscription == null || !subscription.getConnectionId().equals(connectionId)) { throw new SubscriptionNotFoundException(subscriptionId); } - destroySubscription(subscriptionId, connectionId); + destroySubscription(subscriptionId); return true; } - private boolean connectionOwnsSubscription(final Long subscriptionId, final String connectionId) { - return connectionSubscriptionsMap.get(connectionId) != null - && connectionSubscriptionsMap.get(connectionId).contains(subscriptionId); - } - - private void destroySubscription(final long subscriptionId, final String connectionId) { + private void destroySubscription(final long subscriptionId) { subscriptions.remove(subscriptionId); - - if (connectionSubscriptionsMap.containsKey(connectionId)) { - removeSubscriptionToConnectionMapping(connectionId, subscriptionId); - } - } - - private void removeSubscriptionToConnectionMapping( - final String connectionId, final Long subscriptionId) { - if (connectionSubscriptionsMap.get(connectionId).size() > 1) { - connectionSubscriptionsMap.get(connectionId).remove(subscriptionId); - } else { - connectionSubscriptionsMap.remove(connectionId); - } } - @VisibleForTesting - void removeSubscriptions(final Message message) { + private void removeSubscriptions(final Message message) { final String connectionId = message.body(); if (connectionId == null || "".equals(connectionId)) { - LOG.warn("Received invalid connectionId ({}). No subscriptions removed."); + LOG.warn("Received invalid connectionId ({}). No subscriptions removed.", connectionId); } - LOG.debug("Removing subscription for connectionId = {}", connectionId); - - final List subscriptionIds = - Lists.newArrayList( - connectionSubscriptionsMap.getOrDefault(connectionId, Lists.newArrayList())); - subscriptionIds.forEach(subscriptionId -> destroySubscription(subscriptionId, connectionId)); - } + LOG.debug("Removing subscription for connectionId {}", connectionId); - @VisibleForTesting - Map subscriptions() { - return Maps.newHashMap(subscriptions); + subscriptions.values().stream() + .filter(subscription -> subscription.getConnectionId().equals(connectionId)) + .forEach(subscription -> destroySubscription(subscription.getSubscriptionId())); } - @VisibleForTesting - public Map> getConnectionSubscriptionsMap() { - return Maps.newHashMap(connectionSubscriptionsMap); + public Subscription getSubscriptionById(final Long subscriptionId) { + return subscriptions.get(subscriptionId); } public List subscriptionsOfType(final SubscriptionType type, final Class clazz) { - return subscriptions.entrySet().stream() - .map(Entry::getValue) + return subscriptions.values().stream() .filter(subscription -> subscription.isType(type)) .map(subscriptionBuilder.mapToSubscriptionClass(clazz)) .collect(Collectors.toList()); @@ -155,11 +109,10 @@ public List subscriptionsOfType(final SubscriptionType type, final Class< public void sendMessage(final Long subscriptionId, final JsonRpcResult msg) { final SubscriptionResponse response = new SubscriptionResponse(subscriptionId, msg); - connectionSubscriptionsMap.entrySet().stream() - .filter(e -> e.getValue().contains(subscriptionId)) - .map(Entry::getKey) - .findFirst() - .ifPresent(connectionId -> vertx.eventBus().send(connectionId, Json.encode(response))); + final Subscription subscription = subscriptions.get(subscriptionId); + if (subscription != null) { + vertx.eventBus().send(subscription.getConnectionId(), Json.encode(response)); + } } public void notifySubscribersOnWorkerThread( diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscription.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscription.java index be8e22e8b6..cdffdbb7f5 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscription.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscription.java @@ -19,8 +19,9 @@ public class NewBlockHeadersSubscription extends Subscription { private final boolean includeTransactions; - public NewBlockHeadersSubscription(final Long subscriptionId, final boolean includeTransactions) { - super(subscriptionId, SubscriptionType.NEW_BLOCK_HEADERS, Boolean.FALSE); + public NewBlockHeadersSubscription( + final Long subscriptionId, final String connectionId, final boolean includeTransactions) { + super(subscriptionId, connectionId, SubscriptionType.NEW_BLOCK_HEADERS, Boolean.FALSE); this.includeTransactions = includeTransactions; } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionService.java index 90b2cc905b..0542a876bc 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionService.java @@ -48,7 +48,7 @@ public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchai ? blockWithCompleteTransaction(newBlockHash) : blockWithTransactionHash(newBlockHash); - subscriptionManager.sendMessage(subscription.getId(), newBlock); + subscriptionManager.sendMessage(subscription.getSubscriptionId(), newBlock); } }); } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscription.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscription.java index 04bb5b7985..23d5ba55bf 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscription.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscription.java @@ -21,8 +21,9 @@ public class LogsSubscription extends Subscription { private final FilterParameter filterParameter; - public LogsSubscription(final Long subscriptionId, final FilterParameter filterParameter) { - super(subscriptionId, SubscriptionType.LOGS, Boolean.FALSE); + public LogsSubscription( + final Long subscriptionId, final String connectionId, final FilterParameter filterParameter) { + super(subscriptionId, connectionId, SubscriptionType.LOGS, Boolean.FALSE); this.filterParameter = filterParameter; } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java index 72abc0a7f3..eb6d6e608e 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java @@ -87,7 +87,8 @@ private void sendLogToSubscription( final int logIndex, final LogsSubscription subscription) { final LogWithMetadata logWithMetaData = logWithMetadata(logIndex, receiptWithMetadata, removed); - subscriptionManager.sendMessage(subscription.getId(), new LogResult(logWithMetaData)); + subscriptionManager.sendMessage( + subscription.getSubscriptionId(), new LogResult(logWithMetaData)); } // @formatter:off diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java index 9978fcd63e..95da7e2e3f 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionService.java @@ -41,7 +41,7 @@ private void notifySubscribers(final Hash pendingTransaction) { final PendingTransactionResult msg = new PendingTransactionResult(pendingTransaction); for (final Subscription subscription : subscriptions) { - subscriptionManager.sendMessage(subscription.getId(), msg); + subscriptionManager.sendMessage(subscription.getSubscriptionId(), msg); } } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java index 5842f70eec..64b95e83a4 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionService.java @@ -42,9 +42,9 @@ private void notifySubscribers(final Transaction pendingTransaction) { new PendingTransactionDetailResult(pendingTransaction); for (final Subscription subscription : subscriptions) { if (Boolean.TRUE.equals(subscription.getIncludeTransaction())) { - subscriptionManager.sendMessage(subscription.getId(), detailResult); + subscriptionManager.sendMessage(subscription.getSubscriptionId(), detailResult); } else { - subscriptionManager.sendMessage(subscription.getId(), hashResult); + subscriptionManager.sendMessage(subscription.getSubscriptionId(), hashResult); } } } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscription.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscription.java index 010e55131b..e8aabf1cd1 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscription.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscription.java @@ -18,8 +18,11 @@ public class SyncingSubscription extends Subscription { private boolean firstMessageHasBeenSent = false; - public SyncingSubscription(final Long id, final SubscriptionType subscriptionType) { - super(id, subscriptionType, Boolean.FALSE); + public SyncingSubscription( + final Long subscriptionId, + final String connectionId, + final SubscriptionType subscriptionType) { + super(subscriptionId, connectionId, subscriptionType, Boolean.FALSE); } public void setFirstMessageHasBeenSent(final boolean firstMessageHasBeenSent) { diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java index fe86d61612..64382f243e 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java @@ -36,10 +36,14 @@ private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) { syncingSubscriptions -> { if (syncStatus.inSync()) { syncingSubscriptions.forEach( - s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult())); + s -> + subscriptionManager.sendMessage( + s.getSubscriptionId(), new NotSynchronisingResult())); } else { syncingSubscriptions.forEach( - s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus))); + s -> + subscriptionManager.sendMessage( + s.getSubscriptionId(), new SyncingResult(syncStatus))); } }); } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/methods/EthSubscribeIntegrationTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/methods/EthSubscribeIntegrationTest.java index 99a892f1db..e35e9af5cb 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/methods/EthSubscribeIntegrationTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/methods/EthSubscribeIntegrationTest.java @@ -16,11 +16,14 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.internal.JsonRpcRequest; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.WebSocketRequestHandler; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; +import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing.SyncingSubscription; import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.stream.Collectors; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -62,10 +65,9 @@ public void shouldAddConnectionToMap(final TestContext context) { .consumer(CONNECTION_ID_1) .handler( msg -> { - final Map> connectionSubscriptionsMap = - subscriptionManager.getConnectionSubscriptionsMap(); - assertThat(connectionSubscriptionsMap.size()).isEqualTo(1); - assertThat(connectionSubscriptionsMap.containsKey(CONNECTION_ID_1)).isTrue(); + final List syncingSubscriptions = getSubscriptions(); + assertThat(syncingSubscriptions).hasSize(1); + assertThat(syncingSubscriptions.get(0).getConnectionId()).isEqualTo(CONNECTION_ID_1); async.complete(); }) .completionHandler( @@ -88,12 +90,9 @@ public void shouldAddMultipleConnectionsToMap(final TestContext context) { .consumer(CONNECTION_ID_1) .handler( msg -> { - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat( - subscriptionManager - .getConnectionSubscriptionsMap() - .containsKey(CONNECTION_ID_1)) - .isTrue(); + final List subscriptions = getSubscriptions(); + assertThat(subscriptions).hasSize(1); + assertThat(subscriptions.get(0).getConnectionId()).isEqualTo(CONNECTION_ID_1); async.countDown(); vertx @@ -101,18 +100,14 @@ public void shouldAddMultipleConnectionsToMap(final TestContext context) { .consumer(CONNECTION_ID_2) .handler( msg2 -> { - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()) - .isEqualTo(2); - assertThat( - subscriptionManager - .getConnectionSubscriptionsMap() - .containsKey(CONNECTION_ID_1)) - .isTrue(); - assertThat( - subscriptionManager - .getConnectionSubscriptionsMap() - .containsKey(CONNECTION_ID_2)) - .isTrue(); + final List updatedSubscriptions = getSubscriptions(); + assertThat(updatedSubscriptions).hasSize(2); + final List connectionIds = + updatedSubscriptions.stream() + .map(Subscription::getConnectionId) + .collect(Collectors.toList()); + assertThat(connectionIds) + .containsExactlyInAnyOrder(CONNECTION_ID_1, CONNECTION_ID_2); async.countDown(); }) .completionHandler( @@ -128,6 +123,11 @@ public void shouldAddMultipleConnectionsToMap(final TestContext context) { async.awaitSuccess(ASYNC_TIMEOUT); } + private List getSubscriptions() { + return subscriptionManager.subscriptionsOfType( + SubscriptionType.SYNCING, SyncingSubscription.class); + } + private WebSocketRpcRequest createEthSubscribeRequest(final String connectionId) { return Json.decodeValue( "{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"syncing\"], \"connectionId\": \"" diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/methods/EthUnsubscribeIntegrationTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/methods/EthUnsubscribeIntegrationTest.java index 3f70cdd008..feae0f381b 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/methods/EthUnsubscribeIntegrationTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/methods/EthUnsubscribeIntegrationTest.java @@ -54,14 +54,11 @@ public void before() { public void shouldRemoveConnectionWithSingleSubscriptionFromMap(final TestContext context) { final Async async = context.async(); - // Check the connectionMap is empty - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(0); - // Add the subscription we'd like to remove final SubscribeRequest subscribeRequest = new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID); final Long subscriptionId = subscriptionManager.subscribe(subscribeRequest); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId)).isNotNull(); final JsonRpcRequest unsubscribeRequest = createEthUnsubscribeRequest(subscriptionId, CONNECTION_ID); @@ -71,7 +68,7 @@ public void shouldRemoveConnectionWithSingleSubscriptionFromMap(final TestContex .consumer(CONNECTION_ID) .handler( msg -> { - assertThat(subscriptionManager.getConnectionSubscriptionsMap().isEmpty()).isTrue(); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId)).isNull(); async.complete(); }) .completionHandler( @@ -86,20 +83,14 @@ public void shouldRemoveConnectionWithSingleSubscriptionFromMap(final TestContex public void shouldRemoveSubscriptionAndKeepConnection(final TestContext context) { final Async async = context.async(); - // Check the connectionMap is empty - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(0); - // Add the subscriptions we'd like to remove final SubscribeRequest subscribeRequest = new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID); final Long subscriptionId1 = subscriptionManager.subscribe(subscribeRequest); final Long subscriptionId2 = subscriptionManager.subscribe(subscribeRequest); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().containsKey(CONNECTION_ID)) - .isTrue(); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(2); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId1)).isNotNull(); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId2)).isNotNull(); final JsonRpcRequest unsubscribeRequest = createEthUnsubscribeRequest(subscriptionId2, CONNECTION_ID); @@ -109,18 +100,8 @@ public void shouldRemoveSubscriptionAndKeepConnection(final TestContext context) .consumer(CONNECTION_ID) .handler( msg -> { - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat( - subscriptionManager - .getConnectionSubscriptionsMap() - .containsKey(CONNECTION_ID)) - .isTrue(); - assertThat( - subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(1); - assertThat( - subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).get(0)) - .isEqualTo(subscriptionId1); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId1)).isNotNull(); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId2)).isNull(); async.complete(); }) .completionHandler( diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionBuilderTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionBuilderTest.java index 3a5baa3731..294808f3e1 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionBuilderTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionBuilderTest.java @@ -29,6 +29,7 @@ public class SubscriptionBuilderTest { + private static final String CONNECTION_ID = "connectionId"; private SubscriptionBuilder subscriptionBuilder; @Before @@ -40,10 +41,12 @@ public void before() { public void shouldBuildLogsSubscriptionWhenSubscribeRequestTypeIsLogs() { final FilterParameter filterParameter = filterParameter(); final SubscribeRequest subscribeRequest = - new SubscribeRequest(SubscriptionType.LOGS, filterParameter, null, "connectionId"); - final LogsSubscription expectedSubscription = new LogsSubscription(1L, filterParameter); + new SubscribeRequest(SubscriptionType.LOGS, filterParameter, null, CONNECTION_ID); + final LogsSubscription expectedSubscription = + new LogsSubscription(1L, CONNECTION_ID, filterParameter); - final Subscription builtSubscription = subscriptionBuilder.build(1L, subscribeRequest); + final Subscription builtSubscription = + subscriptionBuilder.build(1L, CONNECTION_ID, subscribeRequest); assertThat(builtSubscription).isEqualToComparingFieldByField(expectedSubscription); } @@ -51,11 +54,12 @@ public void shouldBuildLogsSubscriptionWhenSubscribeRequestTypeIsLogs() { @Test public void shouldBuildNewBlockHeadsSubscriptionWhenSubscribeRequestTypeIsNewBlockHeads() { final SubscribeRequest subscribeRequest = - new SubscribeRequest(SubscriptionType.NEW_BLOCK_HEADERS, null, true, "connectionId"); + new SubscribeRequest(SubscriptionType.NEW_BLOCK_HEADERS, null, true, CONNECTION_ID); final NewBlockHeadersSubscription expectedSubscription = - new NewBlockHeadersSubscription(1L, true); + new NewBlockHeadersSubscription(1L, CONNECTION_ID, true); - final Subscription builtSubscription = subscriptionBuilder.build(1L, subscribeRequest); + final Subscription builtSubscription = + subscriptionBuilder.build(1L, CONNECTION_ID, subscribeRequest); assertThat(builtSubscription).isEqualToComparingFieldByField(expectedSubscription); } @@ -63,11 +67,12 @@ public void shouldBuildNewBlockHeadsSubscriptionWhenSubscribeRequestTypeIsNewBlo @Test public void shouldBuildSubscriptionWhenSubscribeRequestTypeIsNewPendingTransactions() { final SubscribeRequest subscribeRequest = - new SubscribeRequest(SubscriptionType.NEW_PENDING_TRANSACTIONS, null, null, "connectionId"); + new SubscribeRequest(SubscriptionType.NEW_PENDING_TRANSACTIONS, null, null, CONNECTION_ID); final Subscription expectedSubscription = - new Subscription(1L, SubscriptionType.NEW_PENDING_TRANSACTIONS, null); + new Subscription(1L, CONNECTION_ID, SubscriptionType.NEW_PENDING_TRANSACTIONS, null); - final Subscription builtSubscription = subscriptionBuilder.build(1L, subscribeRequest); + final Subscription builtSubscription = + subscriptionBuilder.build(1L, CONNECTION_ID, subscribeRequest); assertThat(builtSubscription).isEqualToComparingFieldByField(expectedSubscription); } @@ -75,11 +80,12 @@ public void shouldBuildSubscriptionWhenSubscribeRequestTypeIsNewPendingTransacti @Test public void shouldBuildSubscriptionWhenSubscribeRequestTypeIsSyncing() { final SubscribeRequest subscribeRequest = - new SubscribeRequest(SubscriptionType.SYNCING, null, null, "connectionId"); + new SubscribeRequest(SubscriptionType.SYNCING, null, null, CONNECTION_ID); final SyncingSubscription expectedSubscription = - new SyncingSubscription(1L, SubscriptionType.SYNCING); + new SyncingSubscription(1L, CONNECTION_ID, SubscriptionType.SYNCING); - final Subscription builtSubscription = subscriptionBuilder.build(1L, subscribeRequest); + final Subscription builtSubscription = + subscriptionBuilder.build(1L, CONNECTION_ID, subscribeRequest); assertThat(builtSubscription).isEqualToComparingFieldByField(expectedSubscription); } @@ -88,7 +94,7 @@ public void shouldBuildSubscriptionWhenSubscribeRequestTypeIsSyncing() { public void shouldReturnLogsSubscriptionWhenMappingLogsSubscription() { final Function function = subscriptionBuilder.mapToSubscriptionClass(LogsSubscription.class); - final Subscription subscription = new LogsSubscription(1L, filterParameter()); + final Subscription subscription = new LogsSubscription(1L, CONNECTION_ID, filterParameter()); assertThat(function.apply(subscription)).isInstanceOf(LogsSubscription.class); } @@ -97,7 +103,7 @@ public void shouldReturnLogsSubscriptionWhenMappingLogsSubscription() { public void shouldReturnNewBlockHeadsSubscriptionWhenMappingNewBlockHeadsSubscription() { final Function function = subscriptionBuilder.mapToSubscriptionClass(NewBlockHeadersSubscription.class); - final Subscription subscription = new NewBlockHeadersSubscription(1L, true); + final Subscription subscription = new NewBlockHeadersSubscription(1L, CONNECTION_ID, true); assertThat(function.apply(subscription)).isInstanceOf(NewBlockHeadersSubscription.class); } @@ -107,7 +113,8 @@ public void shouldReturnSubscriptionWhenMappingNewPendingTransactionsSubscriptio final Function function = subscriptionBuilder.mapToSubscriptionClass(Subscription.class); final Subscription logsSubscription = - new Subscription(1L, SubscriptionType.NEW_PENDING_TRANSACTIONS, Boolean.FALSE); + new Subscription( + 1L, CONNECTION_ID, SubscriptionType.NEW_PENDING_TRANSACTIONS, Boolean.FALSE); assertThat(function.apply(logsSubscription)).isInstanceOf(Subscription.class); } @@ -116,7 +123,8 @@ public void shouldReturnSubscriptionWhenMappingNewPendingTransactionsSubscriptio public void shouldReturnSubscriptionWhenMappingSyncingSubscription() { final Function function = subscriptionBuilder.mapToSubscriptionClass(SyncingSubscription.class); - final Subscription subscription = new SyncingSubscription(1L, SubscriptionType.SYNCING); + final Subscription subscription = + new SyncingSubscription(1L, CONNECTION_ID, SubscriptionType.SYNCING); assertThat(function.apply(subscription)).isInstanceOf(SyncingSubscription.class); } @@ -126,7 +134,8 @@ public void shouldReturnSubscriptionWhenMappingSyncingSubscription() { public void shouldThrownIllegalArgumentExceptionWhenMappingWrongSubscriptionType() { final Function function = subscriptionBuilder.mapToSubscriptionClass(LogsSubscription.class); - final NewBlockHeadersSubscription subscription = new NewBlockHeadersSubscription(1L, true); + final NewBlockHeadersSubscription subscription = + new NewBlockHeadersSubscription(1L, CONNECTION_ID, true); final Throwable thrown = catchThrowable(() -> function.apply(subscription)); assertThat(thrown) diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManagerTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManagerTest.java index 5460db620c..3bb21bfd18 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManagerTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManagerTest.java @@ -52,9 +52,10 @@ public void subscribeShouldCreateSubscription() { final Long subscriptionId = subscriptionManager.subscribe(subscribeRequest); final SyncingSubscription expectedSubscription = - new SyncingSubscription(subscriptionId, subscribeRequest.getSubscriptionType()); + new SyncingSubscription( + subscriptionId, CONNECTION_ID, subscribeRequest.getSubscriptionType()); final Subscription createdSubscription = - subscriptionManager.subscriptions().get(subscriptionId); + subscriptionManager.getSubscriptionById(subscriptionId); assertThat(subscriptionId).isEqualTo(1L); assertThat(createdSubscription).isEqualTo(expectedSubscription); @@ -65,14 +66,14 @@ public void unsubscribeExistingSubscriptionShouldDestroySubscription() { final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID); final Long subscriptionId = subscriptionManager.subscribe(subscribeRequest); - assertThat(subscriptionManager.subscriptions().get(subscriptionId)).isNotNull(); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId)).isNotNull(); final UnsubscribeRequest unsubscribeRequest = new UnsubscribeRequest(subscriptionId, CONNECTION_ID); final boolean unsubscribed = subscriptionManager.unsubscribe(unsubscribeRequest); assertThat(unsubscribed).isTrue(); - assertThat(subscriptionManager.subscriptions().get(subscriptionId)).isNull(); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId)).isNull(); } @Test @@ -90,32 +91,23 @@ public void unsubscribeAbsentSubscriptionShouldThrowSubscriptionNotFoundExceptio public void shouldAddSubscriptionToNewConnection() { final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID); - subscriptionManager.subscribe(subscribeRequest); + final Long subscriptionId = subscriptionManager.subscribe(subscribeRequest); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().containsKey(CONNECTION_ID)) - .isTrue(); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(1); + final Subscription subscription = subscriptionManager.getSubscriptionById(subscriptionId); + assertThat(subscription.getConnectionId()).isEqualTo(CONNECTION_ID); } @Test public void shouldAddSubscriptionToExistingConnection() { final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID); - subscriptionManager.subscribe(subscribeRequest); - - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().containsKey(CONNECTION_ID)) - .isTrue(); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(1); - - subscriptionManager.subscribe(subscribeRequest); + final Long subscriptionId1 = subscriptionManager.subscribe(subscribeRequest); + final Long subscriptionId2 = subscriptionManager.subscribe(subscribeRequest); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(2); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId1).getConnectionId()) + .isEqualTo(CONNECTION_ID); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId2).getConnectionId()) + .isEqualTo(CONNECTION_ID); } @Test @@ -123,28 +115,20 @@ public void shouldRemoveSubscriptionFromExistingConnection() { final SubscribeRequest subscribeRequest = subscribeRequest(CONNECTION_ID); final Long subscriptionId1 = subscriptionManager.subscribe(subscribeRequest); - - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().containsKey(CONNECTION_ID)) - .isTrue(); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(1); - final Long subscriptionId2 = subscriptionManager.subscribe(subscribeRequest); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(2); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId1).getConnectionId()) + .isEqualTo(CONNECTION_ID); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId2).getConnectionId()) + .isEqualTo(CONNECTION_ID); final UnsubscribeRequest unsubscribeRequest = new UnsubscribeRequest(subscriptionId1, CONNECTION_ID); subscriptionManager.unsubscribe(unsubscribeRequest); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).get(0)) - .isEqualTo(subscriptionId2); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId1)).isNull(); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId2).getConnectionId()) + .isEqualTo(CONNECTION_ID); } @Test @@ -153,17 +137,14 @@ public void shouldRemoveConnectionWithSingleSubscriptions() { final Long subscriptionId1 = subscriptionManager.subscribe(subscribeRequest); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().size()).isEqualTo(1); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().containsKey(CONNECTION_ID)) - .isTrue(); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().get(CONNECTION_ID).size()) - .isEqualTo(1); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId1).getConnectionId()) + .isEqualTo(CONNECTION_ID); final UnsubscribeRequest unsubscribeRequest = new UnsubscribeRequest(subscriptionId1, CONNECTION_ID); subscriptionManager.unsubscribe(unsubscribeRequest); - assertThat(subscriptionManager.getConnectionSubscriptionsMap().isEmpty()).isTrue(); + assertThat(subscriptionManager.getSubscriptionById(subscriptionId1)).isNull(); } @Test diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java index fb2f0271e3..5bce835c3a 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java @@ -101,7 +101,7 @@ public void shouldSendMessageWhenBlockAdded() { final Long actualSubscriptionId = subscriptionIdCaptor.getValue(); final Object actualBlock = responseCaptor.getValue(); - assertThat(actualSubscriptionId).isEqualTo(subscription.getId()); + assertThat(actualSubscriptionId).isEqualTo(subscription.getSubscriptionId()); assertThat(actualBlock).isEqualToComparingFieldByFieldRecursively(expectedNewBlock); verify(subscriptionManager, times(1)).sendMessage(any(), any()); @@ -135,7 +135,7 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() { final Long actualSubscriptionId = subscriptionIdCaptor.getValue(); final Object actualBlock = responseCaptor.getValue(); - assertThat(actualSubscriptionId).isEqualTo(subscription.getId()); + assertThat(actualSubscriptionId).isEqualTo(subscription.getSubscriptionId()); assertThat(actualBlock).isInstanceOf(BlockResult.class); final BlockResult actualBlockResult = (BlockResult) actualBlock; assertThat(actualBlockResult.getTransactions()).hasSize(txHashList.size()); @@ -176,7 +176,7 @@ public void shouldReturnCompleteTxWhenParameterTrue() { final Long actualSubscriptionId = subscriptionIdCaptor.getValue(); final Object actualBlock = responseCaptor.getValue(); - assertThat(actualSubscriptionId).isEqualTo(subscription.getId()); + assertThat(actualSubscriptionId).isEqualTo(subscription.getSubscriptionId()); assertThat(actualBlock).isInstanceOf(BlockResult.class); final BlockResult actualBlockResult = (BlockResult) actualBlock; assertThat(actualBlockResult.getTransactions()).hasSize(txHashList.size()); @@ -214,8 +214,6 @@ private List transactionsWithHashOnly() { } private NewBlockHeadersSubscription createSubscription(final boolean includeTransactions) { - final NewBlockHeadersSubscription headerSub = - new NewBlockHeadersSubscription(1L, includeTransactions); - return headerSub; + return new NewBlockHeadersSubscription(1L, "conn", includeTransactions); } } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java index 5e791e2eae..a264926d04 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java @@ -80,7 +80,8 @@ public void shouldSendLogMessageWhenBlockAddedEventHasAddedTransactionsMatchingS logsSubscriptionService.onBlockAdded(createBlockAddedEvent(transaction, null), blockchain); - verify(subscriptionManager).sendMessage(eq(subscription.getId()), refEq(expectedLogResult)); + verify(subscriptionManager) + .sendMessage(eq(subscription.getSubscriptionId()), refEq(expectedLogResult)); } @Test @@ -93,7 +94,8 @@ public void shouldSendLogMessageWhenBlockAddedEventHasRemovedTransactionsMatchin logsSubscriptionService.onBlockAdded(createBlockAddedEvent(null, transaction), blockchain); - verify(subscriptionManager).sendMessage(eq(subscription.getId()), refEq(expectedLogResult)); + verify(subscriptionManager) + .sendMessage(eq(subscription.getSubscriptionId()), refEq(expectedLogResult)); } @Test @@ -109,7 +111,8 @@ public void shouldSendMessageForAllLogsMatchingSubscription() { final int totalOfLogs = addedTransactions.size() + removedTransactions.size(); - verify(subscriptionManager, times(totalOfLogs)).sendMessage(eq(subscription.getId()), any()); + verify(subscriptionManager, times(totalOfLogs)) + .sendMessage(eq(subscription.getSubscriptionId()), any()); } @Test @@ -162,7 +165,7 @@ private Log createLog(final Address address) { private LogsSubscription createSubscription(final Address address) { final FilterParameter filterParameter = new FilterParameter(null, null, Lists.newArrayList(address.toString()), null, null); - final LogsSubscription logsSubscription = new LogsSubscription(1L, filterParameter); + final LogsSubscription logsSubscription = new LogsSubscription(1L, "conn", filterParameter); when(subscriptionManager.subscriptionsOfType(any(), any())) .thenReturn(Lists.newArrayList(logsSubscription)); return logsSubscription; @@ -170,10 +173,10 @@ private LogsSubscription createSubscription(final Address address) { private List createSubscriptions(final Address address) { final List subscriptions = new ArrayList<>(); - for (int i = 0; i < 3; i++) { + for (long i = 0; i < 3; i++) { final FilterParameter filterParameter = new FilterParameter(null, null, Lists.newArrayList(address.toString()), null, null); - subscriptions.add(new LogsSubscription((long) i, filterParameter)); + subscriptions.add(new LogsSubscription(i, "conn", filterParameter)); } when(subscriptionManager.subscriptionsOfType(any(), any())) .thenReturn(Lists.newArrayList(subscriptions)); diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java index a509c1f4ec..4a81d26f07 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionDroppedSubscriptionServiceTest.java @@ -106,7 +106,10 @@ private void setUpSubscriptions(final long... subscriptionsIds) { .mapToObj( id -> new Subscription( - id, SubscriptionType.DROPPED_PENDING_TRANSACTIONS, Boolean.FALSE)) + id, + "conn", + SubscriptionType.DROPPED_PENDING_TRANSACTIONS, + Boolean.FALSE)) .collect(Collectors.toList())); } } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionServiceTest.java index 689071c937..f23c6e3ba8 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/pending/PendingTransactionSubscriptionServiceTest.java @@ -136,7 +136,10 @@ private void setUpSubscriptions( .mapToObj( id -> new Subscription( - id, SubscriptionType.NEW_PENDING_TRANSACTIONS, includeTransactions)) + id, + "conn", + SubscriptionType.NEW_PENDING_TRANSACTIONS, + includeTransactions)) .collect(Collectors.toList())); } } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java index 4ea8d23881..830ec0d2ca 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java @@ -54,7 +54,8 @@ public void before() { @Test public void shouldSendSyncStatusWhenReceiveSyncStatus() { - final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); + final SyncingSubscription subscription = + new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING); final List subscriptions = Collections.singletonList(subscription); final SyncStatus syncStatus = new SyncStatus(0L, 1L, 3L); final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus); @@ -70,12 +71,14 @@ public void shouldSendSyncStatusWhenReceiveSyncStatus() { syncStatusListener.onSyncStatus(syncStatus); - verify(subscriptionManager).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult)); + verify(subscriptionManager) + .sendMessage(eq(subscription.getSubscriptionId()), eq(expectedSyncingResult)); } @Test public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() { - final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); + final SyncingSubscription subscription = + new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING); final List subscriptions = Collections.singletonList(subscription); final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L); @@ -91,6 +94,6 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() { syncStatusListener.onSyncStatus(syncStatus); verify(subscriptionManager) - .sendMessage(eq(subscription.getId()), any(NotSynchronisingResult.class)); + .sendMessage(eq(subscription.getSubscriptionId()), any(NotSynchronisingResult.class)); } }