Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Prevent duplicate ibft messages being processed by state machine (#811)
Browse files Browse the repository at this point in the history
  • Loading branch information
jframe authored Feb 8, 2019
1 parent c044749 commit bc20d50
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class IbftConfigOptions {
// protection for on a typical 20 node validator network with multiple rounds
private static final int DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;
private static final int DEFAULT_DUPLICATE_MESSAGE_LIMIT = 100;

private final JsonObject ibftConfigRoot;

Expand All @@ -51,4 +52,8 @@ public int getGossipedHistoryLimit() {
public int getMessageQueueLimit() {
return ibftConfigRoot.getInteger("messagequeuelimit", DEFAULT_MESSAGE_QUEUE_LIMIT);
}

public int getDuplicateMessageLimit() {
return ibftConfigRoot.getInteger("duplicatemessagelimit", DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class IbftConfigOptionsTest {
private static final int EXPECTED_DEFAULT_REQUEST_TIMEOUT = 1;
private static final int EXPECTED_DEFAULT_GOSSIPED_HISTORY_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT = 1000;
private static final int EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT = 100;

@Test
public void shouldGetEpochLengthFromConfig() {
Expand Down Expand Up @@ -118,6 +119,25 @@ public void shouldGetDefaultMessageQueueLimitFromDefaultConfig() {
.isEqualTo(EXPECTED_DEFAULT_MESSAGE_QUEUE_LIMIT);
}

@Test
public void shouldGetDuplicateMessageLimitFromConfig() {
final IbftConfigOptions config = fromConfigOptions(singletonMap("DuplicateMessageLimit", 50));
assertThat(config.getDuplicateMessageLimit()).isEqualTo(50);
}

@Test
public void shouldFallbackToDefaultDuplicateMessageLimit() {
final IbftConfigOptions config = fromConfigOptions(emptyMap());
assertThat(config.getDuplicateMessageLimit())
.isEqualTo(EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}

@Test
public void shouldGetDefaultDuplicateMessageLimitFromDefaultConfig() {
assertThat(IbftConfigOptions.DEFAULT.getDuplicateMessageLimit())
.isEqualTo(EXPECTED_DEFAULT_DUPLICATE_MESSAGE_LIMIT);
}

private IbftConfigOptions fromConfigOptions(final Map<String, Object> ibftConfigOptions) {
return GenesisConfigFile.fromConfig(
new JsonObject(singletonMap("config", singletonMap("ibft", ibftConfigOptions))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -112,11 +111,12 @@ public EventMultiplexer getEventMultiplexer() {
public static final int EPOCH_LENGTH = 10_000;
public static final int BLOCK_TIMER_SEC = 3;
public static final int ROUND_TIMER_SEC = 12;
public static final int EVENT_QUEUE_SIZE = 1000;
public static final int SEEN_MESSAGE_SIZE = 100;
public static final int MESSAGE_QUEUE_LIMIT = 1000;
public static final int GOSSIPED_HISTORY_LIMIT = 100;
public static final int DUPLICATE_MESSAGE_LIMIT = 100;

private Clock clock = Clock.fixed(Instant.MIN, ZoneId.of("UTC"));
private IbftEventQueue ibftEventQueue = new IbftEventQueue(EVENT_QUEUE_SIZE);
private IbftEventQueue ibftEventQueue = new IbftEventQueue(MESSAGE_QUEUE_LIMIT);
private int validatorCount = 4;
private int indexOfFirstLocallyProposedBlock = 0; // Meaning first block is from remote peer.
private boolean useGossip = false;
Expand Down Expand Up @@ -159,9 +159,8 @@ public TestContext build() {

// Use a stubbed version of the multicaster, to prevent creating PeerConnections etc.
final StubValidatorMulticaster multicaster = new StubValidatorMulticaster();

final UniqueMessageMulticaster uniqueMulticaster =
new UniqueMessageMulticaster(multicaster, SEEN_MESSAGE_SIZE);
new UniqueMessageMulticaster(multicaster, GOSSIPED_HISTORY_LIMIT);

final Gossiper gossiper = useGossip ? new IbftGossip(uniqueMulticaster) : mock(Gossiper.class);

Expand Down Expand Up @@ -308,7 +307,7 @@ private static ControllerAndState createControllerAndFinalState(
messageValidatorFactory),
messageValidatorFactory),
gossiper,
new HashMap<>());
DUPLICATE_MESSAGE_LIMIT);

final EventMultiplexer eventMultiplexer = new EventMultiplexer(ibftController);
//////////////////////////// END IBFT PantheonController ////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import static java.util.Collections.newSetFromMap;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.Set;

public class MessageTracker {
private final Set<Hash> seenMessages;

public MessageTracker(final int messageTrackingLimit) {
this.seenMessages = newSetFromMap(new SizeLimitedMap<>(messageTrackingLimit));
}

public void addSeenMessage(final MessageData message) {
final Hash uniqueID = Hash.hash(message.getData());
seenMessages.add(uniqueID);
}

public boolean hasSeenMessage(final MessageData message) {
final Hash uniqueID = Hash.hash(message.getData());
return seenMessages.contains(uniqueID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@
*/
package tech.pegasys.pantheon.consensus.ibft;

import static java.util.Collections.newSetFromMap;

import tech.pegasys.pantheon.consensus.ibft.network.ValidatorMulticaster;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;

public class UniqueMessageMulticaster implements ValidatorMulticaster {
private final ValidatorMulticaster multicaster;
private final Set<Hash> seenMessages;
private final MessageTracker gossipedMessageTracker;

/**
* Constructor that attaches gossip logic to a set of multicaster
Expand All @@ -36,8 +34,14 @@ public class UniqueMessageMulticaster implements ValidatorMulticaster {
public UniqueMessageMulticaster(
final ValidatorMulticaster multicaster, final int gossipHistoryLimit) {
this.multicaster = multicaster;
// Set that starts evicting members when it hits capacity
this.seenMessages = newSetFromMap(new SizeLimitedMap<>(gossipHistoryLimit));
this.gossipedMessageTracker = new MessageTracker(gossipHistoryLimit);
}

@VisibleForTesting
public UniqueMessageMulticaster(
final ValidatorMulticaster multicaster, final MessageTracker gossipedMessageTracker) {
this.multicaster = multicaster;
this.gossipedMessageTracker = gossipedMessageTracker;
}

@Override
Expand All @@ -47,11 +51,10 @@ public void send(final MessageData message) {

@Override
public void send(final MessageData message, final Collection<Address> blackList) {
final Hash uniqueID = Hash.hash(message.getData());
if (seenMessages.contains(uniqueID)) {
if (gossipedMessageTracker.hasSeenMessage(message)) {
return;
}
multicaster.send(message, blackList);
seenMessages.add(uniqueID);
gossipedMessageTracker.addSeenMessage(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import tech.pegasys.pantheon.consensus.ibft.ConsensusRoundIdentifier;
import tech.pegasys.pantheon.consensus.ibft.Gossiper;
import tech.pegasys.pantheon.consensus.ibft.IbftGossip;
import tech.pegasys.pantheon.consensus.ibft.MessageTracker;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.BlockTimerExpiry;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.IbftReceivedMessageEvent;
import tech.pegasys.pantheon.consensus.ibft.ibftevent.NewChainHead;
Expand Down Expand Up @@ -45,21 +45,28 @@
import org.apache.logging.log4j.Logger;

public class IbftController {

private static final Logger LOG = LogManager.getLogger();
private final Blockchain blockchain;
private final IbftFinalState ibftFinalState;
private final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory;
private final Map<Long, List<Message>> futureMessages;
private BlockHeightManager currentHeightManager;
private final Gossiper gossiper;
private final MessageTracker duplicateMessageTracker;

public IbftController(
final Blockchain blockchain,
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final IbftGossip gossiper) {
this(blockchain, ibftFinalState, ibftBlockHeightManagerFactory, gossiper, Maps.newHashMap());
final Gossiper gossiper,
final int duplicateMessageLimit) {
this(
blockchain,
ibftFinalState,
ibftBlockHeightManagerFactory,
gossiper,
Maps.newHashMap(),
new MessageTracker(duplicateMessageLimit));
}

@VisibleForTesting
Expand All @@ -68,20 +75,26 @@ public IbftController(
final IbftFinalState ibftFinalState,
final IbftBlockHeightManagerFactory ibftBlockHeightManagerFactory,
final Gossiper gossiper,
final Map<Long, List<Message>> futureMessages) {
final Map<Long, List<Message>> futureMessages,
final MessageTracker duplicateMessageTracker) {
this.blockchain = blockchain;
this.ibftFinalState = ibftFinalState;
this.ibftBlockHeightManagerFactory = ibftBlockHeightManagerFactory;
this.futureMessages = futureMessages;
this.gossiper = gossiper;
this.duplicateMessageTracker = duplicateMessageTracker;
}

public void start() {
startNewHeightManager(blockchain.getChainHeadHeader());
}

public void handleMessageEvent(final IbftReceivedMessageEvent msg) {
handleMessage(msg.getMessage());
final MessageData data = msg.getMessage().getData();
if (!duplicateMessageTracker.hasSeenMessage(data)) {
duplicateMessageTracker.addSeenMessage(data);
handleMessage(msg.getMessage());
}
}

private void handleMessage(final Message message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.consensus.ibft;

import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import org.junit.Test;

public class MessageTrackerTest {
private final MessageTracker messageTracker = new MessageTracker(5);

@Test
public void duplicateMessagesAreConsideredSeen() {
final MessageData arbitraryMessage_1 =
createAnonymousMessageData(BytesValue.wrap(new byte[4]), 1);

final MessageData arbitraryMessage_2 =
createAnonymousMessageData(BytesValue.wrap(new byte[4]), 1);

assertThat(messageTracker.hasSeenMessage(arbitraryMessage_1)).isFalse();
assertThat(messageTracker.hasSeenMessage(arbitraryMessage_2)).isFalse();

messageTracker.addSeenMessage(arbitraryMessage_1);
assertThat(messageTracker.hasSeenMessage(arbitraryMessage_2)).isTrue();
}

private MessageData createAnonymousMessageData(final BytesValue content, final int code) {
return new MessageData() {

@Override
public int getSize() {
return content.size();
}

@Override
public int getCode() {
return code;
}

@Override
public BytesValue getData() {
return content;
}
};
}
}
Loading

0 comments on commit bc20d50

Please sign in to comment.