-
Notifications
You must be signed in to change notification settings - Fork 130
[PAN-2595] Consolidate local enode representation #1376
Changes from 10 commits
45fc6b7
4f86e9a
19cd5f0
c97239a
27e1d2d
e95d068
eab4662
d4c8f2e
98e7be1
128521b
7335b49
c3c25bd
ff867bb
b286f7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,13 +21,14 @@ | |
|
||
import java.util.Collection; | ||
import java.util.Optional; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* A permissioning provider that only provides an answer when we have no peers outside of our | ||
* bootnodes | ||
*/ | ||
public class InsufficientPeersPermissioningProvider implements ContextualNodePermissioningProvider { | ||
private final EnodeURL selfEnode; | ||
private final Supplier<Optional<EnodeURL>> selfEnode; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, we passed in a static |
||
private final P2PNetwork p2pNetwork; | ||
private final Collection<EnodeURL> bootnodeEnodes; | ||
private long nonBootnodePeerConnections; | ||
|
@@ -37,12 +38,13 @@ public class InsufficientPeersPermissioningProvider implements ContextualNodePer | |
* Creates the provider observing the provided p2p network | ||
* | ||
* @param p2pNetwork the p2p network to observe | ||
* @param selfEnode the advertised enode address of this node | ||
* @param selfEnode A supplier that provides a representation of the locally running node, if | ||
* available | ||
* @param bootnodeEnodes the bootnodes that this node is configured to connection to | ||
*/ | ||
public InsufficientPeersPermissioningProvider( | ||
final P2PNetwork p2pNetwork, | ||
final EnodeURL selfEnode, | ||
final Supplier<Optional<EnodeURL>> selfEnode, | ||
final Collection<EnodeURL> bootnodeEnodes) { | ||
this.selfEnode = selfEnode; | ||
this.p2pNetwork = p2pNetwork; | ||
|
@@ -64,17 +66,23 @@ private long countP2PNetworkNonBootnodeConnections() { | |
@Override | ||
public Optional<Boolean> isPermitted( | ||
final EnodeURL sourceEnode, final EnodeURL destinationEnode) { | ||
Optional<EnodeURL> maybeSelfEnode = selfEnode.get(); | ||
if (nonBootnodePeerConnections > 0) { | ||
return Optional.empty(); | ||
} else if (checkEnode(sourceEnode) && checkEnode(destinationEnode)) { | ||
} else if (!maybeSelfEnode.isPresent()) { | ||
// The local node is not yet ready, so we can't validate enodes yet | ||
return Optional.empty(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We'll now only return an answer when the network is ready. |
||
} else if (checkEnode(maybeSelfEnode.get(), sourceEnode) | ||
&& checkEnode(maybeSelfEnode.get(), destinationEnode)) { | ||
return Optional.of(true); | ||
} else { | ||
return Optional.empty(); | ||
} | ||
} | ||
|
||
private boolean checkEnode(final EnodeURL enode) { | ||
return (enode.sameEndpoint(selfEnode) || bootnodeEnodes.stream().anyMatch(enode::sameEndpoint)); | ||
private boolean checkEnode(final EnodeURL localEnode, final EnodeURL enode) { | ||
return (enode.sameEndpoint(localEnode) | ||
|| bootnodeEnodes.stream().anyMatch(enode::sameEndpoint)); | ||
} | ||
|
||
private void handleConnect(final PeerConnection peerConnection) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,21 +77,23 @@ public interface P2PNetwork extends Closeable { | |
void subscribeDisconnect(DisconnectCallback consumer); | ||
|
||
/** | ||
* Adds a {@link Peer} to a list indicating efforts should be made to always stay connected to it | ||
* Adds a {@link Peer} to a list indicating efforts should be made to always stay connected | ||
* regardless of maxPeer limits. Non-permitted peers may be added to this list, but will not | ||
* actually be connected to as long as they are prohibited. | ||
* | ||
* @param peer The peer that should be connected to | ||
* @return boolean representing whether or not the peer has been added to the list or was already | ||
* on it | ||
* @return boolean representing whether or not the peer has been added to the list, false is | ||
* returned if the peer was already on the list | ||
*/ | ||
boolean addMaintainConnectionPeer(final Peer peer); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Previously, peer permission would be checked as peers were added to this list. The changes in this PR delay the point at which we're able to check permission to the point where our local node is up and running. As a result, if we were to gate entrance into the list based on permissions, there would be a period where no nodes could be added to this list while the node is starting up. So, this API has been updated such that a peer can be added to the "maintain connection" list whether or not it is permitted on the network. However, no peers will actually be connected to without first checking permissions. |
||
|
||
/** | ||
* Removes a {@link Peer} from a list indicating any existing efforts to connect to a given peer | ||
* should be removed, and if connected, the peer should be disconnected | ||
* Disconnect and remove the given {@link Peer} from the maintained peer list. Peer is | ||
* disconnected even if it is not in the maintained peer list. See {@link | ||
* #addMaintainConnectionPeer(Peer)} for details on the maintained peer list. | ||
* | ||
* @param peer The peer to which connections are not longer required | ||
* @return boolean representing whether or not the peer has been disconnected, or if it was not | ||
* currently connected. | ||
* @return boolean representing whether the peer was removed from the maintained peer list | ||
*/ | ||
boolean removeMaintainedConnectionPeer(final Peer peer); | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,8 +19,6 @@ | |
import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair; | ||
import tech.pegasys.pantheon.ethereum.chain.BlockAddedEvent; | ||
import tech.pegasys.pantheon.ethereum.chain.Blockchain; | ||
import tech.pegasys.pantheon.ethereum.p2p.ConnectingToLocalNodeException; | ||
import tech.pegasys.pantheon.ethereum.p2p.PeerNotPermittedException; | ||
import tech.pegasys.pantheon.ethereum.p2p.api.DisconnectCallback; | ||
import tech.pegasys.pantheon.ethereum.p2p.api.Message; | ||
import tech.pegasys.pantheon.ethereum.p2p.api.P2PNetwork; | ||
|
@@ -71,6 +69,7 @@ | |
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Consumer; | ||
import java.util.function.Supplier; | ||
import java.util.stream.Collectors; | ||
|
@@ -182,12 +181,14 @@ public class DefaultP2PNetwork implements P2PNetwork { | |
|
||
private final String advertisedHost; | ||
|
||
private volatile EnodeURL ourEnodeURL; | ||
private volatile Optional<EnodeURL> localEnode = Optional.empty(); | ||
|
||
private final Optional<NodePermissioningController> nodePermissioningController; | ||
private final Optional<Blockchain> blockchain; | ||
private OptionalLong blockAddedObserverId = OptionalLong.empty(); | ||
|
||
private final AtomicBoolean started = new AtomicBoolean(false); | ||
|
||
/** | ||
* Creates a peer networking service for production purposes. | ||
* | ||
|
@@ -346,7 +347,7 @@ protected void initChannel(final SocketChannel ch) { | |
return; | ||
} | ||
|
||
if (!isPeerConnectionAllowed(connection)) { | ||
if (!isPeerAllowed(connection)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Permission checks have been centralized to one method. |
||
connection.disconnect(DisconnectReason.UNKNOWN); | ||
return; | ||
} | ||
|
@@ -362,21 +363,13 @@ protected void initChannel(final SocketChannel ch) { | |
|
||
@Override | ||
public boolean addMaintainConnectionPeer(final Peer peer) { | ||
if (!isPeerAllowed(peer)) { | ||
throw new PeerNotPermittedException(); | ||
} | ||
|
||
if (peer.getId().equals(ourPeerInfo.getNodeId())) { | ||
throw new ConnectingToLocalNodeException(); | ||
} | ||
|
||
final boolean added = peerMaintainConnectionList.add(peer); | ||
if (added) { | ||
if (isPeerAllowed(peer) && !isConnectingOrConnected(peer)) { | ||
// Connect immediately if appropriate | ||
connect(peer); | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
|
||
return added; | ||
} | ||
|
||
@Override | ||
|
@@ -394,12 +387,11 @@ public boolean removeMaintainedConnectionPeer(final Peer peer) { | |
return removed; | ||
} | ||
|
||
public void checkMaintainedConnectionPeers() { | ||
for (final Peer peer : peerMaintainConnectionList) { | ||
if (!(isConnecting(peer) || isConnected(peer))) { | ||
connect(peer); | ||
} | ||
} | ||
void checkMaintainedConnectionPeers() { | ||
peerMaintainConnectionList.stream() | ||
.filter(p -> !isConnectingOrConnected(p)) | ||
.filter(this::isPeerAllowed) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check permissions before connecting :\ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we drop them from the maintained list if they aren't permitted? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. The peer might later be permitted. |
||
.forEach(this::connect); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -529,33 +521,36 @@ public void subscribeDisconnect(final DisconnectCallback callback) { | |
|
||
@Override | ||
public void start() { | ||
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join(); | ||
peerBondedObserverId = | ||
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent())); | ||
peerDroppedObserverId = | ||
OptionalLong.of(peerDiscoveryAgent.observePeerDroppedEvents(handlePeerDroppedEvents())); | ||
|
||
if (nodePermissioningController.isPresent()) { | ||
if (blockchain.isPresent()) { | ||
synchronized (this) { | ||
if (!blockAddedObserverId.isPresent()) { | ||
blockAddedObserverId = | ||
OptionalLong.of(blockchain.get().observeBlockAdded(this::handleBlockAddedEvent)); | ||
if (started.compareAndSet(false, true)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Start can be called at most once There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Should we switch this to use an early exit pattern rather than wrapping the whole method in an if? |
||
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join(); | ||
peerBondedObserverId = | ||
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent())); | ||
peerDroppedObserverId = | ||
OptionalLong.of(peerDiscoveryAgent.observePeerDroppedEvents(handlePeerDroppedEvents())); | ||
|
||
if (nodePermissioningController.isPresent()) { | ||
if (blockchain.isPresent()) { | ||
synchronized (this) { | ||
if (!blockAddedObserverId.isPresent()) { | ||
blockAddedObserverId = | ||
OptionalLong.of(blockchain.get().observeBlockAdded(this::handleBlockAddedEvent)); | ||
} | ||
} | ||
} else { | ||
throw new IllegalStateException( | ||
"Network permissioning needs to listen to BlockAddedEvents. Blockchain can't be null."); | ||
} | ||
} else { | ||
throw new IllegalStateException( | ||
"Network permissioning needs to listen to BlockAddedEvents. Blockchain can't be null."); | ||
} | ||
} | ||
|
||
this.ourEnodeURL = buildSelfEnodeURL(); | ||
LOG.info("Enode URL {}", ourEnodeURL.toString()); | ||
setLocalEnode(); | ||
|
||
peerConnectionScheduler.scheduleWithFixedDelay( | ||
this::checkMaintainedConnectionPeers, 60, 60, TimeUnit.SECONDS); | ||
peerConnectionScheduler.scheduleWithFixedDelay( | ||
this::attemptPeerConnections, 30, 30, TimeUnit.SECONDS); | ||
peerConnectionScheduler.scheduleWithFixedDelay( | ||
this::checkMaintainedConnectionPeers, 2, 60, TimeUnit.SECONDS); | ||
peerConnectionScheduler.scheduleWithFixedDelay( | ||
this::attemptPeerConnections, 30, 30, TimeUnit.SECONDS); | ||
} else { | ||
LOG.warn("Attempted to start an already started P2PNetwork"); | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -584,7 +579,7 @@ private synchronized void handleBlockAddedEvent( | |
.getPeerConnections() | ||
.forEach( | ||
peerConnection -> { | ||
if (!isPeerConnectionAllowed(peerConnection)) { | ||
if (!isPeerAllowed(peerConnection)) { | ||
peerConnection.disconnect(DisconnectReason.REQUESTED); | ||
} | ||
}); | ||
|
@@ -595,38 +590,36 @@ private synchronized void checkCurrentConnections() { | |
.getPeerConnections() | ||
.forEach( | ||
peerConnection -> { | ||
if (!isPeerConnectionAllowed(peerConnection)) { | ||
if (!isPeerAllowed(peerConnection)) { | ||
peerConnection.disconnect(DisconnectReason.REQUESTED); | ||
} | ||
}); | ||
} | ||
|
||
private boolean isPeerConnectionAllowed(final PeerConnection peerConnection) { | ||
if (peerBlacklist.contains(peerConnection)) { | ||
return false; | ||
} | ||
|
||
LOG.trace( | ||
"Checking if connection with peer {} is permitted", | ||
peerConnection.getPeerInfo().getNodeId()); | ||
|
||
return nodePermissioningController | ||
.map( | ||
c -> { | ||
final EnodeURL localPeerEnodeURL = getLocalEnode().orElse(buildSelfEnodeURL()); | ||
final EnodeURL remotePeerEnodeURL = peerConnection.getRemoteEnode(); | ||
return c.isPermitted(localPeerEnodeURL, remotePeerEnodeURL); | ||
}) | ||
.orElse(true); | ||
private boolean isPeerAllowed(final PeerConnection conn) { | ||
return isPeerAllowed(conn.getRemoteEnode()); | ||
} | ||
|
||
private boolean isPeerAllowed(final Peer peer) { | ||
if (peerBlacklist.contains(peer)) { | ||
return isPeerAllowed(peer.getEnodeURL()); | ||
} | ||
|
||
private boolean isPeerAllowed(final EnodeURL enode) { | ||
if (peerBlacklist.contains(enode.getNodeId())) { | ||
return false; | ||
} | ||
if (enode.getNodeId().equals(ourPeerInfo.getNodeId())) { | ||
// Peer matches our node id | ||
return false; | ||
} | ||
|
||
Optional<EnodeURL> maybeEnode = getLocalEnode(); | ||
if (!maybeEnode.isPresent()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Be restrictive around permissions until we have a valid representation of our local enode. |
||
// If local enode isn't yet available we can't evaluate permissions | ||
return false; | ||
} | ||
return nodePermissioningController | ||
.map(c -> c.isPermitted(ourEnodeURL, peer.getEnodeURL())) | ||
.map(c -> c.isPermitted(maybeEnode.get(), enode)) | ||
.orElse(true); | ||
} | ||
|
||
|
@@ -640,6 +633,10 @@ boolean isConnected(final Peer peer) { | |
return connections.isAlreadyConnected(peer.getId()); | ||
} | ||
|
||
private boolean isConnectingOrConnected(final Peer peer) { | ||
return isConnected(peer) || isConnecting(peer); | ||
} | ||
|
||
@Override | ||
public void stop() { | ||
sendClientQuittingToPeers(); | ||
|
@@ -691,10 +688,14 @@ public boolean isDiscoveryEnabled() { | |
|
||
@Override | ||
public Optional<EnodeURL> getLocalEnode() { | ||
return Optional.ofNullable(ourEnodeURL); | ||
return localEnode; | ||
} | ||
|
||
private EnodeURL buildSelfEnodeURL() { | ||
private void setLocalEnode() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this probably should be |
||
if (localEnode.isPresent()) { | ||
return; | ||
} | ||
|
||
final BytesValue nodeId = ourPeerInfo.getNodeId(); | ||
final int listeningPort = ourPeerInfo.getPort(); | ||
final OptionalInt discoveryPort = | ||
|
@@ -704,12 +705,16 @@ private EnodeURL buildSelfEnodeURL() { | |
.filter(port -> port.getAsInt() != listeningPort) | ||
.orElse(OptionalInt.empty()); | ||
|
||
return EnodeURL.builder() | ||
.nodeId(nodeId) | ||
.ipAddress(advertisedHost) | ||
.listeningPort(listeningPort) | ||
.discoveryPort(discoveryPort) | ||
.build(); | ||
final EnodeURL localEnode = | ||
EnodeURL.builder() | ||
.nodeId(nodeId) | ||
.ipAddress(advertisedHost) | ||
.listeningPort(listeningPort) | ||
.discoveryPort(discoveryPort) | ||
.build(); | ||
|
||
LOG.info("Enode URL {}", localEnode.toString()); | ||
this.localEnode = Optional.of(localEnode); | ||
} | ||
|
||
private void onConnectionEstablished(final PeerConnection connection) { | ||
|
@@ -719,14 +724,14 @@ private void onConnectionEstablished(final PeerConnection connection) { | |
|
||
public static class Builder { | ||
|
||
protected PeerDiscoveryAgent peerDiscoveryAgent; | ||
protected KeyPair keyPair; | ||
protected NetworkingConfiguration config = NetworkingConfiguration.create(); | ||
protected List<Capability> supportedCapabilities; | ||
protected PeerBlacklist peerBlacklist; | ||
protected MetricsSystem metricsSystem; | ||
protected Optional<NodePermissioningController> nodePermissioningController = Optional.empty(); | ||
protected Blockchain blockchain = null; | ||
private PeerDiscoveryAgent peerDiscoveryAgent; | ||
private KeyPair keyPair; | ||
private NetworkingConfiguration config = NetworkingConfiguration.create(); | ||
private List<Capability> supportedCapabilities; | ||
private PeerBlacklist peerBlacklist; | ||
private MetricsSystem metricsSystem; | ||
private Optional<NodePermissioningController> nodePermissioningController = Optional.empty(); | ||
private Blockchain blockchain = null; | ||
private Vertx vertx; | ||
private Optional<NodeLocalConfigPermissioningController> | ||
nodeLocalConfigPermissioningController = Optional.empty(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Each individual network is already started here