Skip to content

Commit

Permalink
untype
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Nov 29, 2021
1 parent 5711a15 commit 1942b4c
Show file tree
Hide file tree
Showing 16 changed files with 43 additions and 55 deletions.
16 changes: 6 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package fr.acinq.eclair

import akka.actor.typed.scaladsl.AskPattern.{Askable, schedulerFromActorSystem}
import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, ClassicActorSystemOps, ClassicSchedulerOps}
import akka.actor.{ActorRef, typed}
import akka.actor.ActorRef
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.scaladsl.adapter.ClassicSchedulerOps
import akka.pattern._
import akka.util.Timeout
import com.softwaremill.quicklens.ModifyPimp
Expand Down Expand Up @@ -164,13 +164,9 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
// We constrain external identifiers. This allows uuid, long and pubkey to be used.
private val externalIdMaxLength = 66

override def connect(target: Either[NodeURI, PublicKey])(implicit timeout: Timeout): Future[String] = {
implicit val typedScheduler: typed.Scheduler = appKit.system.scheduler.toTyped
appKit.switchboard.toTyped[Peer.Connect]
.ask[PeerConnection.ConnectionResult](ref => target.fold(
uri => Peer.Connect(uri, ref),
nodeId => Peer.Connect(nodeId, None, ref))
).map(_.toString)
override def connect(target: Either[NodeURI, PublicKey])(implicit timeout: Timeout): Future[String] = target match {
case Left(uri) => (appKit.switchboard ? Peer.Connect(uri, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
}

override def disconnect(nodeId: PublicKey)(implicit timeout: Timeout): Future[String] = {
Expand Down
4 changes: 2 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import scala.concurrent.duration._
* Created by PM on 27/10/2015.
*
*/
class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteAddress: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[typed.ActorRef[PeerConnection.ConnectionResult]]) extends Actor with DiagnosticActorLogging {
class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteAddress: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]) extends Actor with DiagnosticActorLogging {

import context.system

Expand Down Expand Up @@ -134,6 +134,6 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],

object Client {

def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[typed.ActorRef[PeerConnection.ConnectionResult]]): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt))
def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt))

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package fr.acinq.eclair.io

import java.net.InetSocketAddress
import akka.actor.{Actor, ActorLogging, ActorRef, DeadLetter, Props, typed}

import akka.actor.{Actor, ActorLogging, ActorRef, DeadLetter, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Put
Expand Down Expand Up @@ -66,5 +67,5 @@ object ClientSpawner {

case class ConnectionRequest(address: InetSocketAddress,
remoteNodeId: PublicKey,
origin: typed.ActorRef[PeerConnection.ConnectionResult]) extends RemoteTypes
origin: ActorRef) extends RemoteTypes
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package fr.acinq.eclair.io

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.{ActorRef, typed}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.wire.protocol.OnionMessage
Expand All @@ -36,7 +37,7 @@ object MessageRelay {
def apply(): Behavior[Command] = {
Behaviors.receivePartial {
case (context, RelayMessage(switchboard, nextNodeId, msg, replyTo)) =>
switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult))
switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic)
waitForConnection(msg, replyTo)
}
}
Expand Down
5 changes: 2 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package fr.acinq.eclair.io

import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps}
import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PossiblyHarmful, Props, Status, SupervisorStrategy, Terminated, typed}
import akka.event.Logging.MDC
import akka.event.{BusLogging, DiagnosticLoggingAdapter}
Expand Down Expand Up @@ -436,11 +435,11 @@ object Peer {
case object CONNECTED extends State

case class Init(storedChannels: Set[HasCommitments])
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: typed.ActorRef[PeerConnection.ConnectionResult]) {
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef) {
def uri: Option[NodeURI] = address_opt.map(NodeURI(nodeId, _))
}
object Connect {
def apply(uri: NodeURI, replyTo: typed.ActorRef[PeerConnection.ConnectionResult]): Connect = new Connect(uri.nodeId, Some(uri.address), replyTo)
def apply(uri: NodeURI, replyTo: ActorRef): Connect = new Connect(uri.nodeId, Some(uri.address), replyTo)
}

case class Disconnect(nodeId: PublicKey) extends PossiblyHarmful
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package fr.acinq.eclair.io

import akka.actor.{ActorRef, FSM, OneForOneStrategy, PoisonPill, Props, SupervisorStrategy, Terminated, typed}
import akka.actor.{ActorRef, FSM, OneForOneStrategy, PoisonPill, Props, SupervisorStrategy, Terminated}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.PublicKey
Expand Down Expand Up @@ -506,7 +506,7 @@ object PeerConnection {
case object INITIALIZING extends State
case object CONNECTED extends State

case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[typed.ActorRef[PeerConnection.ConnectionResult]], transport_opt: Option[ActorRef] = None) {
case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None) {
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
}
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) extends RemoteTypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package fr.acinq.eclair.io

import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.actor.{ActorRef, Props, typed}
import akka.actor.{ActorRef, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Send
Expand Down Expand Up @@ -68,7 +67,7 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
// we query the db every time because it may have been updated in the meantime (e.g. with network announcements)
getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId) match {
case Some(address) =>
connect(address, origin = self.toTyped)
connect(address, origin = self)
goto(CONNECTING) using ConnectingData(address, d.nextReconnectionDelay)
case None =>
// we don't have an address for that peer, nothing to do
Expand Down Expand Up @@ -149,7 +148,7 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
// activate the extension only on demand, so that tests pass
lazy val mediator = DistributedPubSub(context.system).mediator

private def connect(address: InetSocketAddress, origin: typed.ActorRef[PeerConnection.ConnectionResult]): Unit = {
private def connect(address: InetSocketAddress, origin: ActorRef): Unit = {
log.info(s"connecting to $address")
val req = ClientSpawner.ConnectionRequest(address, remoteNodeId, origin)
if (context.system.hasExtension(Cluster) && !address.getHostName.endsWith("onion")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,14 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
case Peer.Connect(publicKey, _, _) if publicKey == nodeParams.nodeId =>
sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself"))

case c: Peer.Connect =>
case Peer.Connect(nodeId, address_opt, replyTo) =>
// we create a peer if it doesn't exist
val peer = createOrGetPeer(c.nodeId, offlineChannels = Set.empty)
val peer = createOrGetPeer(nodeId, offlineChannels = Set.empty)
val c = if (replyTo == ActorRef.noSender){
Peer.Connect(nodeId, address_opt, sender())
}else{
Peer.Connect(nodeId, address_opt, replyTo)
}
peer forward c

case d: Peer.Disconnect =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package fr.acinq.eclair.remote

import akka.actor.typed.scaladsl.adapter.{ClassicActorRefOps, TypedActorRefOps}
import akka.actor.{ActorRef, ExtendedActorSystem, typed}
import akka.actor.{ActorRef, ExtendedActorSystem}
import akka.serialization.Serialization
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.crypto.TransportHandler
Expand Down Expand Up @@ -130,9 +129,6 @@ object EclairInternalsSerializer {
(path: String) => system.provider.resolveActorRef(path),
(actor: ActorRef) => Serialization.serializedActorPath(actor))

def typedActorRefCodec[T](system: typed.ActorSystem[Nothing]): Codec[typed.ActorRef[T]] = variableSizeBytes(uint16, utf8).xmap(
(path: String) => ActorRefResolver.get(system).resolveActorRef(path),
(actor: typed.ActorRef[T]) => ActorRefResolver.get(system).toSerializationFormat(actor))
val inetAddressCodec: Codec[InetAddress] = discriminated[InetAddress].by(uint8)
.typecase(0, ipv4address)
.typecase(1, ipv6address)
Expand All @@ -142,7 +138,7 @@ object EclairInternalsSerializer {
def connectionRequestCodec(system: ExtendedActorSystem): Codec[ClientSpawner.ConnectionRequest] = (
("address" | inetSocketAddressCodec) ::
("remoteNodeId" | publicKey) ::
("origin" | typedActorRefCodec[PeerConnection.ConnectionResult](system))).as[ClientSpawner.ConnectionRequest]
("origin" | actorRefCodec(system))).as[ClientSpawner.ConnectionRequest]

def initializeConnectionCodec(system: ExtendedActorSystem): Codec[PeerConnection.InitializeConnection] = (
("peer" | actorRefCodec(system)) ::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package fr.acinq.eclair.integration

import akka.actor.ActorRef
import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.pattern.pipe
import akka.testkit.TestProbe
import com.google.common.net.HostAndPort
Expand Down Expand Up @@ -532,7 +531,7 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec {
sender.send(fundee.switchboard, Peer.Connect(
nodeId = funder.nodeParams.nodeId,
address_opt = Some(HostAndPort.fromParts(funder.nodeParams.publicAddresses.head.socketAddress.getHostString, funder.nodeParams.publicAddresses.head.socketAddress.getPort)),
sender.ref.toTyped
sender.ref
))
sender.expectMsgType[PeerConnection.ConnectionResult.HasConnection](30 seconds)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package fr.acinq.eclair.integration

import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.testkit.{TestKit, TestProbe}
import com.google.common.net.HostAndPort
import com.typesafe.config.{Config, ConfigFactory}
Expand Down Expand Up @@ -156,7 +155,7 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit
sender.send(node1.switchboard, Peer.Connect(
nodeId = node2.nodeParams.nodeId,
address_opt = Some(HostAndPort.fromParts(address.socketAddress.getHostString, address.socketAddress.getPort)),
sender.ref.toTyped
sender.ref
))
sender.expectMsgType[PeerConnection.ConnectionResult.HasConnection](10 seconds)
sender.send(node1.switchboard, Peer.OpenChannel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@

package fr.acinq.eclair.integration

import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.testkit.TestProbe
import akka.util.Timeout
import com.google.common.net.HostAndPort
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.Satoshi
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{Watch, WatchFundingConfirmed}
import fr.acinq.eclair.channel.{ChannelStateChanged, NORMAL}
import fr.acinq.eclair.channel.ChannelStateChanged
import fr.acinq.eclair.io.NodeURI
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.router.Router
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package fr.acinq.eclair.io

import akka.actor.PoisonPill
import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.testkit.{TestFSMRef, TestProbe}
import fr.acinq.bitcoin.Block
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
Expand Down Expand Up @@ -107,7 +106,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(peerConnection)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref.toTyped), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
probe.expectTerminated(peerConnection, nodeParams.peerConnectionConf.authTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here
origin.expectMsg(PeerConnection.ConnectionResult.AuthenticationFailed("authentication timed out"))
}
Expand All @@ -117,7 +116,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(peerConnection)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref.toTyped), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
probe.expectTerminated(peerConnection, nodeParams.peerConnectionConf.initTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here
Expand All @@ -129,7 +128,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref.toTyped), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
transport.expectMsgType[TransportHandler.Listener]
Expand All @@ -145,7 +144,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref.toTyped), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
transport.expectMsgType[TransportHandler.Listener]
Expand All @@ -161,7 +160,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref.toTyped), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
transport.expectMsgType[TransportHandler.Listener]
Expand All @@ -178,7 +177,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val probe = TestProbe()
val origin = TestProbe()
probe.watch(transport.ref)
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref.toTyped), transport_opt = Some(transport.ref)))
probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref)))
transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId))
probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features, doSync = true))
transport.expectMsgType[TransportHandler.Listener]
Expand Down
7 changes: 3 additions & 4 deletions eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package fr.acinq.eclair.io

import akka.actor.Status.Failure
import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
import akka.actor.{ActorContext, ActorRef, FSM}
import akka.testkit.{TestFSMRef, TestProbe}
import com.google.common.net.HostAndPort
Expand Down Expand Up @@ -106,7 +105,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle

val probe = TestProbe()
probe.send(peer, Peer.Init(Set.empty))
probe.send(peer, Peer.Connect(remoteNodeId, address_opt = None, probe.ref.toTyped))
probe.send(peer, Peer.Connect(remoteNodeId, address_opt = None, probe.ref))
probe.expectMsg(PeerConnection.ConnectionResult.NoAddressFound)
}

Expand All @@ -123,7 +122,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle
val probe = TestProbe()
probe.send(peer, Peer.Init(Set.empty))
// we have auto-reconnect=false so we need to manually tell the peer to reconnect
probe.send(peer, Peer.Connect(remoteNodeId, Some(mockAddress), probe.ref.toTyped))
probe.send(peer, Peer.Connect(remoteNodeId, Some(mockAddress), probe.ref))

// assert our mock server got an incoming connection (the client was spawned with the address from node_announcement)
awaitCond(mockServer.accept() != null, max = 30 seconds, interval = 1 second)
Expand Down Expand Up @@ -158,7 +157,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle
val probe = TestProbe()
connect(remoteNodeId, peer, peerConnection, switchboard, channels = Set(ChannelCodecsSpec.normal))

probe.send(peer, Peer.Connect(remoteNodeId, None, probe.ref.toTyped))
probe.send(peer, Peer.Connect(remoteNodeId, None, probe.ref))
probe.expectMsgType[PeerConnection.ConnectionResult.AlreadyConnected]
}

Expand Down
Loading

0 comments on commit 1942b4c

Please sign in to comment.