Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
* Use typed actor framework for MessageRelaySpec
* Fix port conflicts in integration tests
* Use pipeTo instead of blocking for integration tests
* Increase timeout for integration spec
* Add tests for user content tlvs
* Add tests for automatic reconnection
  • Loading branch information
t-bast authored and thomash-acinq committed Nov 29, 2021
1 parent 1942b4c commit 42ff02b
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit
properties
}

def connect(node1: Kit, node2: Kit, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi): ChannelOpenResponse.ChannelOpened = {
def connect(node1: Kit, node2: Kit): Unit = {
val sender = TestProbe()
val address = node2.nodeParams.publicAddresses.head
sender.send(node1.switchboard, Peer.Connect(
Expand All @@ -158,6 +158,11 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit
sender.ref
))
sender.expectMsgType[PeerConnection.ConnectionResult.HasConnection](10 seconds)
}

def connect(node1: Kit, node2: Kit, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi): ChannelOpenResponse.ChannelOpened = {
val sender = TestProbe()
connect(node1, node2)
sender.send(node1.switchboard, Peer.OpenChannel(
remoteNodeId = node2.nodeParams.nodeId,
fundingSatoshis = fundingSatoshis,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,95 +16,175 @@

package fr.acinq.eclair.integration

import akka.actor.ActorRef
import akka.actor.typed.scaladsl.adapter.actorRefAdapter
import akka.pattern.pipe
import akka.testkit.TestProbe
import akka.util.Timeout
import com.google.common.net.HostAndPort
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.Satoshi
import fr.acinq.eclair.channel.ChannelStateChanged
import fr.acinq.eclair.io.NodeURI
import fr.acinq.bitcoin.{ByteVector32, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{Watch, WatchFundingConfirmed}
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.{CMD_CLOSE, RES_SUCCESS}
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
import fr.acinq.eclair.{EclairImpl, Features}
import fr.acinq.eclair.wire.protocol.{GenericTlv, NodeAnnouncement}
import fr.acinq.eclair.{EclairImpl, Features, MilliSatoshi, SendOnionMessageResponse, UInt64}
import scodec.bits.{ByteVector, HexStringSyntax}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

class MessageIntegrationSpec extends IntegrationSpec {
val duration: FiniteDuration = FiniteDuration(5, SECONDS)
implicit val timeout: Timeout = duration
implicit val timeout: Timeout = FiniteDuration(30, SECONDS)

test("start eclair nodes") {
instantiateEclairNode("A", ConfigFactory.parseMap(Map("eclair.node-alias" -> "A", "eclair.expiry-delta-blocks" -> 130, "eclair.server.port" -> 29730, "eclair.api.port" -> 28080, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig))
instantiateEclairNode("B", ConfigFactory.parseMap(Map("eclair.node-alias" -> "B", "eclair.expiry-delta-blocks" -> 131, "eclair.server.port" -> 29731, "eclair.api.port" -> 28081, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig))
instantiateEclairNode("C", ConfigFactory.parseMap(Map("eclair.node-alias" -> "C", "eclair.expiry-delta-blocks" -> 132, "eclair.server.port" -> 29732, "eclair.api.port" -> 28082, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig))
instantiateEclairNode("D", ConfigFactory.parseMap(Map("eclair.node-alias" -> "D", "eclair.expiry-delta-blocks" -> 133, "eclair.server.port" -> 29733, "eclair.api.port" -> 28083, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig))
instantiateEclairNode("E", ConfigFactory.parseMap(Map("eclair.node-alias" -> "E", "eclair.expiry-delta-blocks" -> 134, "eclair.server.port" -> 29734, "eclair.api.port" -> 28084, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig))
instantiateEclairNode("A", ConfigFactory.parseMap(Map("eclair.node-alias" -> "A", "eclair.server.port" -> 30700, "eclair.api.port" -> 30780, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig))
instantiateEclairNode("B", ConfigFactory.parseMap(Map("eclair.node-alias" -> "B", "eclair.server.port" -> 30701, "eclair.api.port" -> 30781, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig))
instantiateEclairNode("C", ConfigFactory.parseMap(Map("eclair.node-alias" -> "C", "eclair.server.port" -> 30702, "eclair.api.port" -> 30782, s"eclair.features.${Features.OnionMessages.rfcName}" -> "optional").asJava).withFallback(commonFeatures).withFallback(commonConfig))
instantiateEclairNode("D", ConfigFactory.parseMap(Map("eclair.node-alias" -> "D", "eclair.server.port" -> 30703, "eclair.api.port" -> 30783).asJava).withFallback(commonFeatures).withFallback(commonConfig))
}

test("try to reach unknown node") {
val alice = new EclairImpl(nodes("A"))

assert(Await.result(alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"000000")), duration).sent === false)
val probe = TestProbe()
alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"000000")).pipeTo(probe.ref)
val result = probe.expectMsgType[SendOnionMessageResponse]
assert(!result.sent)
}

test("send to connected node") {
val alice = new EclairImpl(nodes("A"))
val eventListener = TestProbe()

assert(Await.result(alice.connect(Left(NodeURI(nodes("B").nodeParams.nodeId, HostAndPort.fromString("127.0.0.1:29731")))), duration) === "connected")
connect(nodes("A"), nodes("B"))

val probe = TestProbe()
val eventListener = TestProbe()
nodes("B").system.eventStream.subscribe(eventListener.ref, classOf[OnionMessages.ReceiveMessage])
assert(Await.result(alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"111111")), duration).sent === true)
alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"111111")).pipeTo(probe.ref)
assert(probe.expectMsgType[SendOnionMessageResponse].sent)

val r = eventListener.expectMsgType[OnionMessages.ReceiveMessage]
assert(r.pathId contains hex"111111")
assert(r.pathId === Some(hex"111111"))
}

test("send with hop") {
val alice = new EclairImpl(nodes("A"))
val bob = new EclairImpl(nodes("B"))
val eventListener = TestProbe()

assert(Await.result(bob.connect(Left(NodeURI(nodes("C").nodeParams.nodeId, HostAndPort.fromString("127.0.0.1:29732")))), duration) === "connected")
connect(nodes("B"), nodes("C"))

val probe = TestProbe()
val eventListener = TestProbe()
nodes("C").system.eventStream.subscribe(eventListener.ref, classOf[OnionMessages.ReceiveMessage])
assert(Await.result(alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: nodes("C").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"2222")), duration).sent === true)
alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: nodes("C").nodeParams.nodeId :: Nil, hex"710301020375020102", Some(hex"2222")).pipeTo(probe.ref)
assert(probe.expectMsgType[SendOnionMessageResponse].sent)

val r = eventListener.expectMsgType[OnionMessages.ReceiveMessage]
assert(r.pathId contains hex"2222")
assert(r.pathId === Some(hex"2222"))
assert(r.finalPayload.records.unknown.toSet === Set(GenericTlv(UInt64(113), hex"010203"), GenericTlv(UInt64(117), hex"0102")))
}

test("create channels") {
val bob = new EclairImpl(nodes("B"))
val sender = TestProbe()
val eventListener = TestProbe()
nodes.values.foreach(_.system.eventStream.subscribe(eventListener.ref, classOf[ChannelStateChanged]))
test("open channels") {
val probe = TestProbe()

Await.result(bob.open(nodes("A").nodeParams.nodeId, Satoshi(100000), None, None, None, None, None), duration)
Await.result(bob.open(nodes("C").nodeParams.nodeId, Satoshi(100000), None, None, None, None, None), duration)

// confirming the funding tx
// We connect A -> B -> C
connect(nodes("B"), nodes("A"), Satoshi(100_000), MilliSatoshi(0))
connect(nodes("B"), nodes("C"), Satoshi(100_000), MilliSatoshi(0))

// we make sure all channels have set up their WatchConfirmed for the funding tx
awaitCond({
nodes("B").watcher ! ZmqWatcher.ListWatches(probe.ref)
val watches = probe.expectMsgType[Set[Watch[_]]]
watches.count(_.isInstanceOf[WatchFundingConfirmed]) == 2
}, max = 20 seconds, interval = 500 millis)

// We also connect A -> D, B -> D, C -> D
connect(nodes("D"), nodes("A"), Satoshi(100_000), MilliSatoshi(0))
connect(nodes("D"), nodes("B"), Satoshi(100_000), MilliSatoshi(0))
connect(nodes("D"), nodes("C"), Satoshi(100_000), MilliSatoshi(0))

// we make sure all channels have set up their WatchConfirmed for the funding tx
awaitCond({
nodes("D").watcher ! ZmqWatcher.ListWatches(probe.ref)
val watches = probe.expectMsgType[Set[Watch[_]]]
watches.count(_.isInstanceOf[WatchFundingConfirmed]) == 3
}, max = 20 seconds, interval = 500 millis)

// confirm funding txs
generateBlocks(10)

// We wait for A to know about B and C
// We wait for A to know about B, C and D
awaitCond({
sender.send(nodes("A").router, Router.GetNodes)
sender.expectMsgType[Iterable[NodeAnnouncement]].size == 3
probe.send(nodes("A").router, Router.GetNodes)
probe.expectMsgType[Iterable[NodeAnnouncement]].size == 4
}, max = 60 seconds, interval = 1 second)
}

test("skip hop") {
test("automatically connect to node based on node_announcement address") {
val alice = new EclairImpl(nodes("A"))
val probe = TestProbe()
val eventListener = TestProbe()

nodes("C").system.eventStream.subscribe(eventListener.ref, classOf[OnionMessages.ReceiveMessage])
assert(Await.result(alice.sendOnionMessage(nodes("C").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"33333333")), duration).sent === true)
alice.sendOnionMessage(nodes("C").nodeParams.nodeId :: Nil, ByteVector.empty, Some(hex"33333333")).pipeTo(probe.ref)
assert(probe.expectMsgType[SendOnionMessageResponse].sent)

val r = eventListener.expectMsgType[OnionMessages.ReceiveMessage]
assert(r.pathId === Some(hex"33333333"))

// We disconnect A from C for future tests.
alice.disconnect(nodes("C").nodeParams.nodeId)
}

test("close channels") {
// We close the channels A -> B -> C but we keep channels with D
// This ensures nodes still have an unrelated channel so we keep them in the network DB.
val probe = TestProbe()
probe.send(nodes("B").register, Symbol("channels"))
val channelsB = probe.expectMsgType[Map[ByteVector32, ActorRef]]
assert(channelsB.size === 3)
probe.send(nodes("D").register, Symbol("channels"))
val channelsD = probe.expectMsgType[Map[ByteVector32, ActorRef]]
assert(channelsD.size === 3)
channelsB.foreach {
case (channelId, channel) =>
if (!channelsD.contains(channelId)) {
channel ! CMD_CLOSE(probe.ref, None, None)
probe.expectMsgType[RES_SUCCESS[CMD_CLOSE]]
}
}

val bitcoinClient = new BitcoinCoreClient(bitcoinrpcclient)
awaitCond({
bitcoinClient.getMempool().pipeTo(probe.ref)
probe.expectMsgType[Seq[Transaction]].size == 2
}, max = 20 seconds, interval = 500 millis)

// confirm closing txs
generateBlocks(10)

// nodes should disconnect automatically once they don't have any channels left
awaitCond({
probe.send(nodes("A").switchboard, Symbol("peers"))
val peersA = probe.expectMsgType[Iterable[ActorRef]]
probe.send(nodes("B").switchboard, Symbol("peers"))
val peersB = probe.expectMsgType[Iterable[ActorRef]]
// A and B are now only connected to D
peersA.size == 1 && peersB.size == 1
}, max = 20 seconds, interval = 500 millis)
}

test("automatically connect to known nodes") {
val alice = new EclairImpl(nodes("A"))
val probe = TestProbe()
val eventListener = TestProbe()
nodes("C").system.eventStream.subscribe(eventListener.ref, classOf[OnionMessages.ReceiveMessage])
alice.sendOnionMessage(nodes("B").nodeParams.nodeId :: nodes("C").nodeParams.nodeId :: Nil, hex"7300", None).pipeTo(probe.ref)
assert(probe.expectMsgType[SendOnionMessageResponse].sent)

val r = eventListener.expectMsgType[OnionMessages.ReceiveMessage]
assert(r.pathId contains hex"33333333")
assert(r.pathId === None)
assert(r.finalPayload.records.unknown.toSet === Set(GenericTlv(UInt64(115), hex"")))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,78 +16,72 @@

package fr.acinq.eclair.io

import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, ClassicActorSystemOps}
import akka.actor.{ActorContext, ActorRef}
import akka.testkit.TestProbe
import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.io.Peer.ChannelFactory
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.message.OnionMessages.{IntermediateNode, Recipient}
import fr.acinq.eclair.randomKey
import fr.acinq.eclair.wire.protocol.OnionMessage
import fr.acinq.eclair.{TestKitBaseClass, randomKey}
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.Outcome
import org.scalatest.funsuite.FixtureAnyFunSuiteLike

case class FakeChannelFactory(channel: TestProbe) extends ChannelFactory {
override def spawn(context: ActorContext, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]): ActorRef = {
channel.ref
}
}

class MessageRelaySpec extends TestKitBaseClass with AnyFunSuiteLike {
class MessageRelaySpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with FixtureAnyFunSuiteLike {
val aliceId: PublicKey = Alice.nodeParams.nodeId
val bobId: PublicKey = Bob.nodeParams.nodeId

test("relay with new connection") {
val switchboard = TestProbe()
val peerConnection = TestProbe()

val (_, message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(IntermediateNode(aliceId)), Left(Recipient(bobId, None)), Nil)
case class FixtureParam(relay: ActorRef[MessageRelay.Command], switchboard: TestProbe[Peer.Connect], peerConnection: TestProbe[OnionMessage], probe: TestProbe[MessageRelay.Status])

override def withFixture(test: OneArgTest): Outcome = {
val switchboard = TestProbe[Peer.Connect]("switchboard")
val peerConnection = TestProbe[OnionMessage]("peerConnection")
val probe = TestProbe[MessageRelay.Status]("probe")
val relay = testKit.spawn(MessageRelay())
try {
withFixture(test.toNoArgTest(FixtureParam(relay, switchboard, peerConnection, probe)))
} finally {
testKit.stop(relay)
}
}

val probe = TestProbe()
test("relay with new connection") { f =>
import f._

val relay = system.spawnAnonymous(MessageRelay())
relay ! MessageRelay.RelayMessage(switchboard.ref, bobId, message, probe.ref.toTyped)
val (_, message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(IntermediateNode(aliceId)), Left(Recipient(bobId, None)), Nil)
relay ! MessageRelay.RelayMessage(switchboard.ref.toClassic, bobId, message, probe.ref)

val connectToNextPeer = switchboard.expectMsgType[Peer.Connect]
val connectToNextPeer = switchboard.expectMessageType[Peer.Connect]
assert(connectToNextPeer.nodeId === bobId)
connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.Connected(peerConnection.ref)
peerConnection.expectMsgType[OnionMessage]
probe.expectMsg(MessageRelay.Success)
connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.Connected(peerConnection.ref.toClassic)
peerConnection.expectMessage(message)
probe.expectMessage(MessageRelay.Success)
}

test("relay with existing connection") {
val switchboard = TestProbe()
val peerConnection = TestProbe()
test("relay with existing connection") { f =>
import f._

val (_, message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(IntermediateNode(aliceId)), Left(Recipient(bobId, None)), Nil)
relay ! MessageRelay.RelayMessage(switchboard.ref.toClassic, bobId, message, probe.ref)

val probe = TestProbe()

val relay = system.spawnAnonymous(MessageRelay())
relay ! MessageRelay.RelayMessage(switchboard.ref, bobId, message, probe.ref.toTyped)

val connectToNextPeer = switchboard.expectMsgType[Peer.Connect]
val connectToNextPeer = switchboard.expectMessageType[Peer.Connect]
assert(connectToNextPeer.nodeId === bobId)
connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.AlreadyConnected(peerConnection.ref)
peerConnection.expectMsgType[OnionMessage]
probe.expectMsg(MessageRelay.Success)
connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.AlreadyConnected(peerConnection.ref.toClassic)
peerConnection.expectMessage(message)
probe.expectMessage(MessageRelay.Success)
}

test("can't open new connection") {
val switchboard = TestProbe()
val peerConnection = TestProbe()
test("can't open new connection") { f =>
import f._

val (_, message) = OnionMessages.buildMessage(randomKey(), randomKey(), Seq(IntermediateNode(aliceId)), Left(Recipient(bobId, None)), Nil)
relay ! MessageRelay.RelayMessage(switchboard.ref.toClassic, bobId, message, probe.ref)

val probe = TestProbe()

val relay = system.spawnAnonymous(MessageRelay())
relay ! MessageRelay.RelayMessage(switchboard.ref, bobId, message, probe.ref.toTyped)

val connectToNextPeer = switchboard.expectMsgType[Peer.Connect]
val connectToNextPeer = switchboard.expectMessageType[Peer.Connect]
assert(connectToNextPeer.nodeId === bobId)
connectToNextPeer.replyTo ! PeerConnection.ConnectionResult.NoAddressFound
probe.expectMsg(MessageRelay.Failure(PeerConnection.ConnectionResult.NoAddressFound))
probe.expectMessage(MessageRelay.Failure(PeerConnection.ConnectionResult.NoAddressFound))
}
}

0 comments on commit 42ff02b

Please sign in to comment.