Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PAN-2595] Consolidate local enode representation #1376

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ public TestNode create(
return node;
}

public void startNetworks() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each individual network is already started here

for (final TestNode node : nodes) {
node.network.start();
}
}

public void connectAndAssertAll()
throws InterruptedException, ExecutionException, TimeoutException {
for (int i = 0; i < nodes.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public void tearDown() {

/** Helper to do common setup tasks. */
private void initTest(final TestNodeList txNodes) throws Exception {
txNodes.startNetworks();
txNodes.connectAndAssertAll();
txNodes.logPeerConnections();
txNodes.assertPeerCounts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@

import java.util.Collection;
import java.util.Optional;
import java.util.function.Supplier;

/**
* A permissioning provider that only provides an answer when we have no peers outside of our
* bootnodes
*/
public class InsufficientPeersPermissioningProvider implements ContextualNodePermissioningProvider {
private final EnodeURL selfEnode;
private final Supplier<Optional<EnodeURL>> selfEnode;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, we passed in a static EnodeURL constructed in RunnerBuilder. This enode wasn't 100% accurate as the network can be configured to choose a port on startup, and the representation constructed in the builder would not accurately reflect the real ports being used. We're now passing a supplier that returns the canonical local enode representation when available.

private final P2PNetwork p2pNetwork;
private final Collection<EnodeURL> bootnodeEnodes;
private long nonBootnodePeerConnections;
Expand All @@ -37,12 +38,13 @@ public class InsufficientPeersPermissioningProvider implements ContextualNodePer
* Creates the provider observing the provided p2p network
*
* @param p2pNetwork the p2p network to observe
* @param selfEnode the advertised enode address of this node
* @param selfEnode A supplier that provides a representation of the locally running node, if
* available
* @param bootnodeEnodes the bootnodes that this node is configured to connection to
*/
public InsufficientPeersPermissioningProvider(
final P2PNetwork p2pNetwork,
final EnodeURL selfEnode,
final Supplier<Optional<EnodeURL>> selfEnode,
final Collection<EnodeURL> bootnodeEnodes) {
this.selfEnode = selfEnode;
this.p2pNetwork = p2pNetwork;
Expand All @@ -64,17 +66,23 @@ private long countP2PNetworkNonBootnodeConnections() {
@Override
public Optional<Boolean> isPermitted(
final EnodeURL sourceEnode, final EnodeURL destinationEnode) {
Optional<EnodeURL> maybeSelfEnode = selfEnode.get();
if (nonBootnodePeerConnections > 0) {
return Optional.empty();
} else if (checkEnode(sourceEnode) && checkEnode(destinationEnode)) {
} else if (!maybeSelfEnode.isPresent()) {
// The local node is not yet ready, so we can't validate enodes yet
return Optional.empty();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll now only return an answer when the network is ready.

} else if (checkEnode(maybeSelfEnode.get(), sourceEnode)
&& checkEnode(maybeSelfEnode.get(), destinationEnode)) {
return Optional.of(true);
} else {
return Optional.empty();
}
}

private boolean checkEnode(final EnodeURL enode) {
return (enode.sameEndpoint(selfEnode) || bootnodeEnodes.stream().anyMatch(enode::sameEndpoint));
private boolean checkEnode(final EnodeURL localEnode, final EnodeURL enode) {
return (enode.sameEndpoint(localEnode)
|| bootnodeEnodes.stream().anyMatch(enode::sameEndpoint));
}

private void handleConnect(final PeerConnection peerConnection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,23 @@ public interface P2PNetwork extends Closeable {
void subscribeDisconnect(DisconnectCallback consumer);

/**
* Adds a {@link Peer} to a list indicating efforts should be made to always stay connected to it
* Adds a {@link Peer} to a list indicating efforts should be made to always stay connected
* regardless of maxPeer limits. Non-permitted peers may be added to this list, but will not
* actually be connected to as long as they are prohibited.
*
* @param peer The peer that should be connected to
* @return boolean representing whether or not the peer has been added to the list or was already
* on it
* @return boolean representing whether or not the peer has been added to the list, false is
* returned if the peer was already on the list
*/
boolean addMaintainConnectionPeer(final Peer peer);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, peer permission would be checked as peers were added to this list. The changes in this PR delay the point at which we're able to check permission to the point where our local node is up and running. As a result, if we were to gate entrance into the list based on permissions, there would be a period where no nodes could be added to this list while the node is starting up. So, this API has been updated such that a peer can be added to the "maintain connection" list whether or not it is permitted on the network. However, no peers will actually be connected to without first checking permissions.


/**
* Removes a {@link Peer} from a list indicating any existing efforts to connect to a given peer
* should be removed, and if connected, the peer should be disconnected
* Disconnect and remove the given {@link Peer} from the maintained peer list. Peer is
* disconnected even if it is not in the maintained peer list. See {@link
* #addMaintainConnectionPeer(Peer)} for details on the maintained peer list.
*
* @param peer The peer to which connections are not longer required
* @return boolean representing whether or not the peer has been disconnected, or if it was not
* currently connected.
* @return boolean representing whether the peer was removed from the maintained peer list
*/
boolean removeMaintainedConnectionPeer(final Peer peer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.p2p.ConnectingToLocalNodeException;
import tech.pegasys.pantheon.ethereum.p2p.PeerNotPermittedException;
import tech.pegasys.pantheon.ethereum.p2p.api.DisconnectCallback;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork;
Expand Down Expand Up @@ -71,6 +69,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -182,12 +181,14 @@ public class DefaultP2PNetwork implements P2PNetwork {

private final String advertisedHost;

private volatile EnodeURL ourEnodeURL;
private volatile Optional<EnodeURL> localEnode = Optional.empty();

private final Optional<NodePermissioningController> nodePermissioningController;
private final Optional<Blockchain> blockchain;
private OptionalLong blockAddedObserverId = OptionalLong.empty();

private final AtomicBoolean started = new AtomicBoolean(false);

/**
* Creates a peer networking service for production purposes.
*
Expand Down Expand Up @@ -346,7 +347,7 @@ protected void initChannel(final SocketChannel ch) {
return;
}

if (!isPeerConnectionAllowed(connection)) {
if (!isPeerAllowed(connection)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Permission checks have been centralized to one method.

connection.disconnect(DisconnectReason.UNKNOWN);
return;
}
Expand All @@ -362,21 +363,13 @@ protected void initChannel(final SocketChannel ch) {

@Override
public boolean addMaintainConnectionPeer(final Peer peer) {
if (!isPeerAllowed(peer)) {
throw new PeerNotPermittedException();
}

if (peer.getId().equals(ourPeerInfo.getNodeId())) {
throw new ConnectingToLocalNodeException();
}

final boolean added = peerMaintainConnectionList.add(peer);
if (added) {
if (isPeerAllowed(peer) && !isConnectingOrConnected(peer)) {
// Connect immediately if appropriate
connect(peer);
return true;
} else {
return false;
}

return added;
}

@Override
Expand All @@ -394,12 +387,11 @@ public boolean removeMaintainedConnectionPeer(final Peer peer) {
return removed;
}

public void checkMaintainedConnectionPeers() {
for (final Peer peer : peerMaintainConnectionList) {
if (!(isConnecting(peer) || isConnected(peer))) {
connect(peer);
}
}
void checkMaintainedConnectionPeers() {
peerMaintainConnectionList.stream()
.filter(p -> !isConnectingOrConnected(p))
.filter(this::isPeerAllowed)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check permissions before connecting :\

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we drop them from the maintained list if they aren't permitted?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. The peer might later be permitted.

.forEach(this::connect);
}

@VisibleForTesting
Expand Down Expand Up @@ -529,33 +521,36 @@ public void subscribeDisconnect(final DisconnectCallback callback) {

@Override
public void start() {
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join();
peerBondedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent()));
peerDroppedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerDroppedEvents(handlePeerDroppedEvents()));

if (nodePermissioningController.isPresent()) {
if (blockchain.isPresent()) {
synchronized (this) {
if (!blockAddedObserverId.isPresent()) {
blockAddedObserverId =
OptionalLong.of(blockchain.get().observeBlockAdded(this::handleBlockAddedEvent));
if (started.compareAndSet(false, true)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Start can be called at most once

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we switch this to use an early exit pattern rather than wrapping the whole method in an if?

peerDiscoveryAgent.start(ourPeerInfo.getPort()).join();
peerBondedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent()));
peerDroppedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerDroppedEvents(handlePeerDroppedEvents()));

if (nodePermissioningController.isPresent()) {
if (blockchain.isPresent()) {
synchronized (this) {
if (!blockAddedObserverId.isPresent()) {
blockAddedObserverId =
OptionalLong.of(blockchain.get().observeBlockAdded(this::handleBlockAddedEvent));
}
}
} else {
throw new IllegalStateException(
"Network permissioning needs to listen to BlockAddedEvents. Blockchain can't be null.");
}
} else {
throw new IllegalStateException(
"Network permissioning needs to listen to BlockAddedEvents. Blockchain can't be null.");
}
}

this.ourEnodeURL = buildSelfEnodeURL();
LOG.info("Enode URL {}", ourEnodeURL.toString());
setLocalEnode();

peerConnectionScheduler.scheduleWithFixedDelay(
this::checkMaintainedConnectionPeers, 60, 60, TimeUnit.SECONDS);
peerConnectionScheduler.scheduleWithFixedDelay(
this::attemptPeerConnections, 30, 30, TimeUnit.SECONDS);
peerConnectionScheduler.scheduleWithFixedDelay(
this::checkMaintainedConnectionPeers, 2, 60, TimeUnit.SECONDS);
peerConnectionScheduler.scheduleWithFixedDelay(
this::attemptPeerConnections, 30, 30, TimeUnit.SECONDS);
} else {
LOG.warn("Attempted to start an already started P2PNetwork");
}
}

@VisibleForTesting
Expand Down Expand Up @@ -584,7 +579,7 @@ private synchronized void handleBlockAddedEvent(
.getPeerConnections()
.forEach(
peerConnection -> {
if (!isPeerConnectionAllowed(peerConnection)) {
if (!isPeerAllowed(peerConnection)) {
peerConnection.disconnect(DisconnectReason.REQUESTED);
}
});
Expand All @@ -595,38 +590,36 @@ private synchronized void checkCurrentConnections() {
.getPeerConnections()
.forEach(
peerConnection -> {
if (!isPeerConnectionAllowed(peerConnection)) {
if (!isPeerAllowed(peerConnection)) {
peerConnection.disconnect(DisconnectReason.REQUESTED);
}
});
}

private boolean isPeerConnectionAllowed(final PeerConnection peerConnection) {
if (peerBlacklist.contains(peerConnection)) {
return false;
}

LOG.trace(
"Checking if connection with peer {} is permitted",
peerConnection.getPeerInfo().getNodeId());

return nodePermissioningController
.map(
c -> {
final EnodeURL localPeerEnodeURL = getLocalEnode().orElse(buildSelfEnodeURL());
final EnodeURL remotePeerEnodeURL = peerConnection.getRemoteEnode();
return c.isPermitted(localPeerEnodeURL, remotePeerEnodeURL);
})
.orElse(true);
private boolean isPeerAllowed(final PeerConnection conn) {
return isPeerAllowed(conn.getRemoteEnode());
}

private boolean isPeerAllowed(final Peer peer) {
if (peerBlacklist.contains(peer)) {
return isPeerAllowed(peer.getEnodeURL());
}

private boolean isPeerAllowed(final EnodeURL enode) {
if (peerBlacklist.contains(enode.getNodeId())) {
return false;
}
if (enode.getNodeId().equals(ourPeerInfo.getNodeId())) {
// Peer matches our node id
return false;
}

Optional<EnodeURL> maybeEnode = getLocalEnode();
if (!maybeEnode.isPresent()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be restrictive around permissions until we have a valid representation of our local enode.

// If local enode isn't yet available we can't evaluate permissions
return false;
}
return nodePermissioningController
.map(c -> c.isPermitted(ourEnodeURL, peer.getEnodeURL()))
.map(c -> c.isPermitted(maybeEnode.get(), enode))
.orElse(true);
}

Expand All @@ -640,6 +633,10 @@ boolean isConnected(final Peer peer) {
return connections.isAlreadyConnected(peer.getId());
}

private boolean isConnectingOrConnected(final Peer peer) {
return isConnected(peer) || isConnecting(peer);
}

@Override
public void stop() {
sendClientQuittingToPeers();
Expand Down Expand Up @@ -691,10 +688,14 @@ public boolean isDiscoveryEnabled() {

@Override
public Optional<EnodeURL> getLocalEnode() {
return Optional.ofNullable(ourEnodeURL);
return localEnode;
}

private EnodeURL buildSelfEnodeURL() {
private void setLocalEnode() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this probably should be createLocalEnode since it doesn't follow the typical setter pattern.

if (localEnode.isPresent()) {
return;
}

final BytesValue nodeId = ourPeerInfo.getNodeId();
final int listeningPort = ourPeerInfo.getPort();
final OptionalInt discoveryPort =
Expand All @@ -704,12 +705,16 @@ private EnodeURL buildSelfEnodeURL() {
.filter(port -> port.getAsInt() != listeningPort)
.orElse(OptionalInt.empty());

return EnodeURL.builder()
.nodeId(nodeId)
.ipAddress(advertisedHost)
.listeningPort(listeningPort)
.discoveryPort(discoveryPort)
.build();
final EnodeURL localEnode =
EnodeURL.builder()
.nodeId(nodeId)
.ipAddress(advertisedHost)
.listeningPort(listeningPort)
.discoveryPort(discoveryPort)
.build();

LOG.info("Enode URL {}", localEnode.toString());
this.localEnode = Optional.of(localEnode);
}

private void onConnectionEstablished(final PeerConnection connection) {
Expand All @@ -719,14 +724,14 @@ private void onConnectionEstablished(final PeerConnection connection) {

public static class Builder {

protected PeerDiscoveryAgent peerDiscoveryAgent;
protected KeyPair keyPair;
protected NetworkingConfiguration config = NetworkingConfiguration.create();
protected List<Capability> supportedCapabilities;
protected PeerBlacklist peerBlacklist;
protected MetricsSystem metricsSystem;
protected Optional<NodePermissioningController> nodePermissioningController = Optional.empty();
protected Blockchain blockchain = null;
private PeerDiscoveryAgent peerDiscoveryAgent;
private KeyPair keyPair;
private NetworkingConfiguration config = NetworkingConfiguration.create();
private List<Capability> supportedCapabilities;
private PeerBlacklist peerBlacklist;
private MetricsSystem metricsSystem;
private Optional<NodePermissioningController> nodePermissioningController = Optional.empty();
private Blockchain blockchain = null;
private Vertx vertx;
private Optional<NodeLocalConfigPermissioningController>
nodeLocalConfigPermissioningController = Optional.empty();
Expand Down
Loading