Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual watching of peer-connection actor #1567

Merged
merged 2 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe
d.peerConnection ! PoisonPill
stay

case Event(Terminated(actor), d: ConnectedData) if actor == d.peerConnection =>
case Event(ConnectionDown(peerConnection), d: ConnectedData) if peerConnection == d.peerConnection =>
Logs.withMdc(diagLog)(Logs.mdc(category_opt = Some(Logs.LogCategory.CONNECTION))) {
log.info("connection lost")
}
Expand All @@ -196,7 +196,6 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe

case Event(connectionReady: PeerConnection.ConnectionReady, d: ConnectedData) =>
log.info(s"got new connection, killing current one and switching")
context unwatch d.peerConnection
d.peerConnection ! PoisonPill
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
gotoConnected(connectionReady, d.channels)
Expand Down Expand Up @@ -254,8 +253,6 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, watcher: ActorRe
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeid: $remoteNodeId != ${connectionReady.remoteNodeId}")
log.debug("got authenticated connection to address {}:{}", connectionReady.address.getHostString, connectionReady.address.getPort)

context watch connectionReady.peerConnection

if (connectionReady.outgoing) {
// we store the node address upon successful outgoing connection, so we can reconnect later
// any previous address is overwritten
Expand Down Expand Up @@ -397,6 +394,12 @@ object Peer {

case class Transition(previousData: Peer.Data, nextData: Peer.Data)

/**
* Sent by the peer-connection to notify the peer that the connection is down.
* We could use watchWith on the peer-connection but it doesn't work with akka cluster when untrusted mode is enabled
*/
case class ConnectionDown(peerConnection: ActorRef)

// @formatter:on

def makeChannelParams(nodeParams: NodeParams, features: Features, defaultFinalScriptPubkey: ByteVector, walletStaticPaymentBasepoint: Option[PublicKey], isFunder: Boolean, fundingAmount: Satoshi): LocalParams = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}

onTermination {
case StopEvent(_, CONNECTED, _: ConnectedData) => Metrics.PeerConnectionsConnected.withoutTags().decrement()
case StopEvent(_, CONNECTED, d: ConnectedData) =>
Metrics.PeerConnectionsConnected.withoutTags().decrement()
d.peer ! Peer.ConnectionDown(self)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,23 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
// reason to eagerly retry
// That's why we set the next reconnection delay to a random value between MAX_RECONNECT_INTERVAL/2 and MAX_RECONNECT_INTERVAL.
val firstNextReconnectionDelay = nodeParams.maxReconnectInterval.minus(Random.nextInt(nodeParams.maxReconnectInterval.toSeconds.toInt / 2).seconds)
log.debug("first connection attempt in {}", initialDelay)
(initialDelay, firstNextReconnectionDelay)
case (_, cd: ConnectingData) if System.currentTimeMillis.milliseconds - d.since < 30.seconds =>
log.info("peer is disconnected (shortly after connection was established)")
// If our latest successful connection attempt was less than 30 seconds ago, we pick up the exponential
// back-off retry delay where we left it. The goal is to address cases where the reconnection is successful,
// but we are disconnected right away.
val initialDelay = cd.nextReconnectionDelay
val firstNextReconnectionDelay = nextReconnectionDelay(initialDelay, nodeParams.maxReconnectInterval)
log.info("peer got disconnected shortly after connection was established, next reconnection in {}", initialDelay)
(initialDelay, firstNextReconnectionDelay)
case _ =>
log.info("peer is disconnected")
// Randomizing the initial delay is important in the case of a reconnection. If both peers have a public
// address, they may attempt to simultaneously connect back to each other, which could result in reconnection loop,
// given that each new connection will cause the previous one to be killed.
val initialDelay = randomizeDelay(nodeParams.initialRandomReconnectDelay)
val firstNextReconnectionDelay = nextReconnectionDelay(initialDelay, nodeParams.maxReconnectInterval)
log.info("peer is disconnected, next reconnection in {}", initialDelay)
(initialDelay, firstNextReconnectionDelay)
}
setReconnectTimer(initialDelay)
Expand Down