Skip to content

Commit

Permalink
Fixed channel integration tests and Router use of UnwatchTxConfirmed
Browse files Browse the repository at this point in the history
- Fixup tests to generate enough blocks to deeply confirm a channel has closed once before waiting for the channel to close.
- Fixup so Router does not send `UnwatchTxConfirmed` only for the spending txs that will never confirm. Channel also has a `WatchTxConfirmed` event that may trigger later and should not be removed.
  • Loading branch information
remyers committed Dec 11, 2024
1 parent 0f250b1 commit 03c5629
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm

case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
d.spentChannels.get(spendingTx.txid) match {
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, shortChannelId)
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId)
case None => stay()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ object Validation {
} else d1
}

def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get
log.info("funding tx for channelId={} was spent", shortChannelId)
Expand All @@ -294,7 +294,8 @@ object Validation {
// we will re-add a spliced channel as a new channel later when we receive the announcement
watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId))
val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
// stop watching the spending txs that will never confirm, but continue to watch the tx that spends the parent channel
(spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId))
val spentChannels1 = d.spentChannels -- spendingTxs
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val receivedByF = listReceivedByAddress(finalAddressF)
(receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC
}, max = 30 seconds, interval = 1 second)
// we generate blocks to make txs confirm
generateBlocks(2, Some(minerAddress))
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -238,14 +235,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val receivedByF = listReceivedByAddress(finalAddressF, sender)
(receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC
}, max = 30 seconds, interval = 1 second)
// we generate blocks to make txs confirm
generateBlocks(2, Some(minerAddress))
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -294,14 +288,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val receivedByF = listReceivedByAddress(finalAddressF, sender)
(receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC
}, max = 30 seconds, interval = 1 second)
// we generate blocks to make txs confirm
generateBlocks(2, Some(minerAddress))
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -353,14 +344,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val receivedByF = listReceivedByAddress(finalAddressF, sender)
(receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC
}, max = 30 seconds, interval = 1 second)
// we generate blocks to make tx confirm
generateBlocks(2, Some(minerAddress))
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -599,15 +587,13 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec {
bitcoinClient.getMempool().pipeTo(sender.ref)
sender.expectMsgType[Seq[Transaction]].exists(_.txIn.head.outPoint.txid == fundingOutpoint.txid)
}, max = 20 seconds, interval = 1 second)
generateBlocks(3)
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12)
awaitCond(stateListener.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt, limit = 10).pipeTo(sender.ref)
bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt, limit = 12).pipeTo(sender.ref)
val closingTx = sender.expectMsgType[Transaction]
assert(closingTx.txOut.map(_.publicKeyScript).toSet == Set(finalPubKeyScriptC, finalPubKeyScriptF))

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down

0 comments on commit 03c5629

Please sign in to comment.