diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala index fff801c89e..ce22de12f0 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala @@ -149,7 +149,7 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit properties } - def connect(node1: Kit, node2: Kit, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi): ChannelOpenResponse.ChannelOpened = { + def connect(node1: Kit, node2: Kit): Unit = { val sender = TestProbe() val address = node2.nodeParams.publicAddresses.head sender.send(node1.switchboard, Peer.Connect( @@ -158,6 +158,11 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit sender.ref )) sender.expectMsgType[PeerConnection.ConnectionResult.HasConnection](10 seconds) + } + + def connect(node1: Kit, node2: Kit, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi): ChannelOpenResponse.ChannelOpened = { + val sender = TestProbe() + connect(node1, node2) sender.send(node1.switchboard, Peer.OpenChannel( remoteNodeId = node2.nodeParams.nodeId, fundingSatoshis = fundingSatoshis, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/MessageIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/MessageIntegrationSpec.scala index d8207168e4..e4647754e9 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/MessageIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/MessageIntegrationSpec.scala @@ -16,95 +16,175 @@ package fr.acinq.eclair.integration +import akka.actor.ActorRef +import akka.actor.typed.scaladsl.adapter.actorRefAdapter +import akka.pattern.pipe import akka.testkit.TestProbe import akka.util.Timeout -import com.google.common.net.HostAndPort import com.typesafe.config.ConfigFactory -import fr.acinq.bitcoin.Satoshi -import fr.acinq.eclair.channel.ChannelStateChanged -import fr.acinq.eclair.io.NodeURI +import fr.acinq.bitcoin.{ByteVector32, Satoshi, Transaction} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{Watch, WatchFundingConfirmed} +import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient +import fr.acinq.eclair.channel.{CMD_CLOSE, RES_SUCCESS} import fr.acinq.eclair.message.OnionMessages import fr.acinq.eclair.router.Router -import fr.acinq.eclair.wire.protocol.NodeAnnouncement -import fr.acinq.eclair.{EclairImpl, Features} +import fr.acinq.eclair.wire.protocol.{GenericTlv, NodeAnnouncement} +import fr.acinq.eclair.{EclairImpl, Features, MilliSatoshi, SendOnionMessageResponse, UInt64} import scodec.bits.{ByteVector, HexStringSyntax} -import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ class MessageIntegrationSpec extends IntegrationSpec { - val duration: FiniteDuration = FiniteDuration(5, SECONDS) - implicit val timeout: Timeout = duration + implicit val timeout: Timeout = FiniteDuration(30, SECONDS) test("start eclair nodes") { - instantiateEclairNode("A", ConfigFactory.parseMap(Map("eclair.node-alias" -> "A", "eclair.expiry-delta-blocks" -> 130, "eclair.server.port" -> 29730, "eclair.api.port" -> 28080, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig)) - instantiateEclairNode("B", ConfigFactory.parseMap(Map("eclair.node-alias" -> "B", "eclair.expiry-delta-blocks" -> 131, "eclair.server.port" -> 29731, "eclair.api.port" -> 28081, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig)) - instantiateEclairNode("C", ConfigFactory.parseMap(Map("eclair.node-alias" -> "C", "eclair.expiry-delta-blocks" -> 132, "eclair.server.port" -> 29732, "eclair.api.port" -> 28082, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig)) - instantiateEclairNode("D", ConfigFactory.parseMap(Map("eclair.node-alias" -> "D", "eclair.expiry-delta-blocks" -> 133, "eclair.server.port" -> 29733, "eclair.api.port" -> 28083, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig)) - instantiateEclairNode("E", ConfigFactory.parseMap(Map("eclair.node-alias" -> "E", "eclair.expiry-delta-blocks" -> 134, "eclair.server.port" -> 29734, "eclair.api.port" -> 28084, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig)) + instantiateEclairNode("A", ConfigFactory.parseMap(Map("eclair.node-alias" -> "A", "eclair.server.port" -> 30700, "eclair.api.port" -> 30780, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig)) + instantiateEclairNode("B", ConfigFactory.parseMap(Map("eclair.node-alias" -> "B", "eclair.server.port" -> 30701, "eclair.api.port" -> 30781, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig)) + instantiateEclairNode("C", ConfigFactory.parseMap(Map("eclair.node-alias" -> "C", "eclair.server.port" -> 30702, "eclair.api.port" -> 30782, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig)) + instantiateEclairNode("D", ConfigFactory.parseMap(Map("eclair.node-alias" -> "D", "eclair.server.port" -> 30703, "eclair.api.port" -> 30783).asJava).withFallback(commonFeatures).withFallback(commonConfig)) } test("try to reach unknown node") { val alice = new EclairImpl(nodes("A")) - - assert(Await.result(alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"000000")), duration).sent === false) + val probe = TestProbe() + alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"000000")).pipeTo(probe.ref) + val result = probe.expectMsgType[SendOnionMessageResponse] + assert(!result.sent) } test("send to connected node") { val alice = new EclairImpl(nodes("A")) - val eventListener = TestProbe() - - assert(Await.result(alice.connect(Left(NodeURI(nodes("B").nodeParams.nodeId, HostAndPort.fromString("127.0.0.1:29731")))), duration) === "connected") + connect(nodes("A"), nodes("B")) + val probe = TestProbe() + val eventListener = TestProbe() nodes("B").system.eventStream.subscribe(eventListener.ref, classOf[OnionMessages.ReceiveMessage]) - assert(Await.result(alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"111111")), duration).sent === true) + alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"111111")).pipeTo(probe.ref) + assert(probe.expectMsgType[SendOnionMessageResponse].sent) val r = eventListener.expectMsgType[OnionMessages.ReceiveMessage] - assert(r.pathId contains hex"111111") + assert(r.pathId === Some(hex"111111")) } test("send with hop") { val alice = new EclairImpl(nodes("A")) - val bob = new EclairImpl(nodes("B")) - val eventListener = TestProbe() - - assert(Await.result(bob.connect(Left(NodeURI(nodes("C").nodeParams.nodeId, HostAndPort.fromString("127.0.0.1:29732")))), duration) === "connected") + connect(nodes("B"), nodes("C")) + val probe = TestProbe() + val eventListener = TestProbe() nodes("C").system.eventStream.subscribe(eventListener.ref, classOf[OnionMessages.ReceiveMessage]) - assert(Await.result(alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: nodes("C").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"2222")), duration).sent === true) + alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: nodes("C").nodeParams.nodeId :: Nil, hex"710301020375020102", Some(hex"2222")).pipeTo(probe.ref) + assert(probe.expectMsgType[SendOnionMessageResponse].sent) val r = eventListener.expectMsgType[OnionMessages.ReceiveMessage] - assert(r.pathId contains hex"2222") + assert(r.pathId === Some(hex"2222")) + assert(r.finalPayload.records.unknown.toSet === Set(GenericTlv(UInt64(113), hex"010203"), GenericTlv(UInt64(117), hex"0102"))) } - test("create channels") { - val bob = new EclairImpl(nodes("B")) - val sender = TestProbe() - val eventListener = TestProbe() - nodes.values.foreach(_.system.eventStream.subscribe(eventListener.ref, classOf[ChannelStateChanged])) + test("open channels") { + val probe = TestProbe() - Await.result(bob.open(nodes("A").nodeParams.nodeId, Satoshi(100000), None, None, None, None, None), duration) - Await.result(bob.open(nodes("C").nodeParams.nodeId, Satoshi(100000), None, None, None, None, None), duration) - - // confirming the funding tx + // We connect A -> B -> C + connect(nodes("B"), nodes("A"), Satoshi(100_000), MilliSatoshi(0)) + connect(nodes("B"), nodes("C"), Satoshi(100_000), MilliSatoshi(0)) + + // we make sure all channels have set up their WatchConfirmed for the funding tx + awaitCond({ + nodes("B").watcher ! ZmqWatcher.ListWatches(probe.ref) + val watches = probe.expectMsgType[Set[Watch[_]]] + watches.count(_.isInstanceOf[WatchFundingConfirmed]) == 2 + }, max = 20 seconds, interval = 500 millis) + + // We also connect A -> D, B -> D, C -> D + connect(nodes("D"), nodes("A"), Satoshi(100_000), MilliSatoshi(0)) + connect(nodes("D"), nodes("B"), Satoshi(100_000), MilliSatoshi(0)) + connect(nodes("D"), nodes("C"), Satoshi(100_000), MilliSatoshi(0)) + + // we make sure all channels have set up their WatchConfirmed for the funding tx + awaitCond({ + nodes("D").watcher ! ZmqWatcher.ListWatches(probe.ref) + val watches = probe.expectMsgType[Set[Watch[_]]] + watches.count(_.isInstanceOf[WatchFundingConfirmed]) == 3 + }, max = 20 seconds, interval = 500 millis) + + // confirm funding txs generateBlocks(10) - // We wait for A to know about B and C + // We wait for A to know about B, C and D awaitCond({ - sender.send(nodes("A").router, Router.GetNodes) - sender.expectMsgType[Iterable[NodeAnnouncement]].size == 3 + probe.send(nodes("A").router, Router.GetNodes) + probe.expectMsgType[Iterable[NodeAnnouncement]].size == 4 }, max = 60 seconds, interval = 1 second) } - test("skip hop") { + test("automatically connect to node based on node_announcement address") { val alice = new EclairImpl(nodes("A")) + val probe = TestProbe() val eventListener = TestProbe() nodes("C").system.eventStream.subscribe(eventListener.ref, classOf[OnionMessages.ReceiveMessage]) - assert(Await.result(alice.sendOnionMessage(nodes("C").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"33333333")), duration).sent === true) + alice.sendOnionMessage(nodes("C").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"33333333")).pipeTo(probe.ref) + assert(probe.expectMsgType[SendOnionMessageResponse].sent) + + val r = eventListener.expectMsgType[OnionMessages.ReceiveMessage] + assert(r.pathId === Some(hex"33333333")) + + // We disconnect A from C for future tests. + alice.disconnect(nodes("C").nodeParams.nodeId) + } + + test("close channels") { + // We close the channels A -> B -> C but we keep channels with D + // This ensures nodes still have an unrelated channel so we keep them in the network DB. + val probe = TestProbe() + probe.send(nodes("B").register, Symbol("channels")) + val channelsB = probe.expectMsgType[Map[ByteVector32, ActorRef]] + assert(channelsB.size === 3) + probe.send(nodes("D").register, Symbol("channels")) + val channelsD = probe.expectMsgType[Map[ByteVector32, ActorRef]] + assert(channelsD.size === 3) + channelsB.foreach { + case (channelId, channel) => + if (!channelsD.contains(channelId)) { + channel ! CMD_CLOSE(probe.ref, None, None) + probe.expectMsgType[RES_SUCCESS[CMD_CLOSE]] + } + } + + val bitcoinClient = new BitcoinCoreClient(bitcoinrpcclient) + awaitCond({ + bitcoinClient.getMempool().pipeTo(probe.ref) + probe.expectMsgType[Seq[Transaction]].size == 2 + }, max = 20 seconds, interval = 500 millis) + + // confirm closing txs + generateBlocks(10) + + // nodes should disconnect automatically once they don't have any channels left + awaitCond({ + probe.send(nodes("A").switchboard, Symbol("peers")) + val peersA = probe.expectMsgType[Iterable[ActorRef]] + probe.send(nodes("B").switchboard, Symbol("peers")) + val peersB = probe.expectMsgType[Iterable[ActorRef]] + // A and B are now only connected to D + peersA.size == 1 && peersB.size == 1 + }, max = 20 seconds, interval = 500 millis) + } + + test("automatically connect to known nodes") { + val alice = new EclairImpl(nodes("A")) + val probe = TestProbe() + val eventListener = TestProbe() + nodes("C").system.eventStream.subscribe(eventListener.ref, classOf[OnionMessages.ReceiveMessage]) + alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: nodes("C").nodeParams.nodeId :: Nil, hex"7300", None).pipeTo(probe.ref) + assert(probe.expectMsgType[SendOnionMessageResponse].sent) val r = eventListener.expectMsgType[OnionMessages.ReceiveMessage] - assert(r.pathId contains hex"33333333") + assert(r.pathId === None) + assert(r.finalPayload.records.unknown.toSet === Set(GenericTlv(UInt64(115), hex""))) } + } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/MessageRelaySpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/MessageRelaySpec.scala index 8af64a4030..22ab8f90a4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/MessageRelaySpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/MessageRelaySpec.scala @@ -16,78 +16,72 @@ package fr.acinq.eclair.io -import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, ClassicActorSystemOps} -import akka.actor.{ActorContext, ActorRef} -import akka.testkit.TestProbe +import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe} +import akka.actor.typed.ActorRef +import akka.actor.typed.scaladsl.adapter.TypedActorRefOps +import com.typesafe.config.ConfigFactory import fr.acinq.bitcoin.Crypto.PublicKey import fr.acinq.eclair.TestConstants.{Alice, Bob} -import fr.acinq.eclair.io.Peer.ChannelFactory import fr.acinq.eclair.message.OnionMessages import fr.acinq.eclair.message.OnionMessages.{IntermediateNode, Recipient} +import fr.acinq.eclair.randomKey import fr.acinq.eclair.wire.protocol.OnionMessage -import fr.acinq.eclair.{TestKitBaseClass, randomKey} -import org.scalatest.funsuite.AnyFunSuiteLike +import org.scalatest.Outcome +import org.scalatest.funsuite.FixtureAnyFunSuiteLike -case class FakeChannelFactory(channel: TestProbe) extends ChannelFactory { - override def spawn(context: ActorContext, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]): ActorRef = { - channel.ref - } -} - -class MessageRelaySpec extends TestKitBaseClass with AnyFunSuiteLike { +class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike { val aliceId: PublicKey = Alice.nodeParams.nodeId val bobId: PublicKey = Bob.nodeParams.nodeId - test("relay with new connection") { - val switchboard = TestProbe() - val peerConnection = TestProbe() - - val (_, message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(IntermediateNode(aliceId)), Left(Recipient(bobId, None)), Nil) + case class FixtureParam(relay: ActorRef[MessageRelay.Command], switchboard: TestProbe[Peer.Connect], peerConnection: TestProbe[OnionMessage], probe: TestProbe[MessageRelay.Status]) + + override def withFixture(test: OneArgTest): Outcome = { + val switchboard = TestProbe[Peer.Connect]("switchboard") + val peerConnection = TestProbe[OnionMessage]("peerConnection") + val probe = TestProbe[MessageRelay.Status]("probe") + val relay = testKit.spawn(MessageRelay()) + try { + withFixture(test.toNoArgTest(FixtureParam(relay, switchboard, peerConnection, probe))) + } finally { + testKit.stop(relay) + } + } - val probe = TestProbe() + test("relay with new connection") { f => + import f._ - val relay = system.spawnAnonymous(MessageRelay()) - relay ! MessageRelay.RelayMessage(switchboard.ref, bobId, message, probe.ref.toTyped) + val (_, message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(IntermediateNode(aliceId)), Left(Recipient(bobId, None)), Nil) + relay ! MessageRelay.RelayMessage(switchboard.ref.toClassic, bobId, message, probe.ref) - val connectToNextPeer = switchboard.expectMsgType[Peer.Connect] + val connectToNextPeer = switchboard.expectMessageType[Peer.Connect] assert(connectToNextPeer.nodeId === bobId) - connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.Connected(peerConnection.ref) - peerConnection.expectMsgType[OnionMessage] - probe.expectMsg(MessageRelay.Success) + connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.Connected(peerConnection.ref.toClassic) + peerConnection.expectMessage(message) + probe.expectMessage(MessageRelay.Success) } - test("relay with existing connection") { - val switchboard = TestProbe() - val peerConnection = TestProbe() + test("relay with existing connection") { f => + import f._ val (_, message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(IntermediateNode(aliceId)), Left(Recipient(bobId, None)), Nil) + relay ! MessageRelay.RelayMessage(switchboard.ref.toClassic, bobId, message, probe.ref) - val probe = TestProbe() - - val relay = system.spawnAnonymous(MessageRelay()) - relay ! MessageRelay.RelayMessage(switchboard.ref, bobId, message, probe.ref.toTyped) - - val connectToNextPeer = switchboard.expectMsgType[Peer.Connect] + val connectToNextPeer = switchboard.expectMessageType[Peer.Connect] assert(connectToNextPeer.nodeId === bobId) - connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.AlreadyConnected(peerConnection.ref) - peerConnection.expectMsgType[OnionMessage] - probe.expectMsg(MessageRelay.Success) + connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.AlreadyConnected(peerConnection.ref.toClassic) + peerConnection.expectMessage(message) + probe.expectMessage(MessageRelay.Success) } - test("can't open new connection") { - val switchboard = TestProbe() - val peerConnection = TestProbe() + test("can't open new connection") { f => + import f._ val (_, message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(IntermediateNode(aliceId)), Left(Recipient(bobId, None)), Nil) + relay ! MessageRelay.RelayMessage(switchboard.ref.toClassic, bobId, message, probe.ref) - val probe = TestProbe() - - val relay = system.spawnAnonymous(MessageRelay()) - relay ! MessageRelay.RelayMessage(switchboard.ref, bobId, message, probe.ref.toTyped) - - val connectToNextPeer = switchboard.expectMsgType[Peer.Connect] + val connectToNextPeer = switchboard.expectMessageType[Peer.Connect] assert(connectToNextPeer.nodeId === bobId) connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.NoAddressFound - probe.expectMsg(MessageRelay.Failure(PeerConnection.ConnectionResult.NoAddressFound)) + probe.expectMessage(MessageRelay.Failure(PeerConnection.ConnectionResult.NoAddressFound)) } }