Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relay onion messages #2061

Merged
merged 14 commits into from
Nov 29, 2021
14 changes: 14 additions & 0 deletions docs/release-notes/eclair-vnext.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ Node operators should watch this file very regularly.
An event is also sent to the event stream for every such notification.
This lets plugins notify the node operator via external systems (push notifications, email, etc).

### Initial support for onion messages

Eclair now supports the feature `option_onion_messages`. If this feature is enabled, eclair will relay onion messages.
It can also send onion messages with the `sendonionmessage` API.
Messages sent to Eclair will be ignored.

### API changes

#### Timestamps
Expand Down Expand Up @@ -59,6 +65,14 @@ Examples:
}
}
```

#### Sending onion messages

You can now send onion messages with `sendonionmessage`.
It expects `--route` a list of `nodeId` to send the message through, the last one being the recipient, and `--content` the content of the message as an encoded TLV stream in hexadecimal.
It also accepts `--pathId` as an encoded TLV stream in hexadecimal.
Sending to a blinded route (as a reply to a previous message) is not supported.


This release contains many other API updates:

Expand Down
35 changes: 31 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment}
import fr.acinq.eclair.io.Peer.{GetPeerInfo, PeerInfo}
import fr.acinq.eclair.io.{NodeURI, Peer, PeerConnection}
import fr.acinq.eclair.io.{MessageRelay, NodeURI, Peer, PeerConnection}
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
import fr.acinq.eclair.payment.relay.Relayer.{GetOutgoingChannels, OutgoingChannels, RelayFees, UsableBalance}
Expand All @@ -46,13 +47,13 @@ import fr.acinq.eclair.router.{NetworkStats, Router}
import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
import scodec.bits.ByteVector
import scodec.{Attempt, DecodeResult, codecs}

import java.nio.charset.StandardCharsets
import java.util.UUID
import scala.collection.immutable.SortedMap
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.reflect.ClassTag

case class GetInfoResponse(version: String, nodeId: PublicKey, alias: String, color: String, features: Features, chainHash: ByteVector32, network: String, blockHeight: Int, publicAddresses: Seq[NodeAddress], onionAddress: Option[NodeAddress], instanceId: String)

Expand All @@ -62,6 +63,8 @@ case class SignedMessage(nodeId: PublicKey, message: String, signature: ByteVect

case class VerifiedMessage(valid: Boolean, publicKey: PublicKey)

case class SendOnionMessageResponse(sent: Boolean, failureMessage: Option[String])

object SignedMessage {
def signedBytes(message: ByteVector): ByteVector32 =
Crypto.hash256(ByteVector("Lightning Signed Message:".getBytes(StandardCharsets.UTF_8)) ++ message)
Expand Down Expand Up @@ -150,6 +153,8 @@ trait Eclair {
def signMessage(message: ByteVector): SignedMessage

def verifyMessage(message: ByteVector, recoverableSignature: ByteVector): VerifiedMessage

def sendOnionMessage(route: Seq[PublicKey], userCustomContent: ByteVector, pathId: Option[ByteVector])(implicit timeout: Timeout): Future[SendOnionMessageResponse]
}

class EclairImpl(appKit: Kit) extends Eclair with Logging {
Expand All @@ -160,8 +165,8 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
private val externalIdMaxLength = 66

override def connect(target: Either[NodeURI, PublicKey])(implicit timeout: Timeout): Future[String] = target match {
case Left(uri) => (appKit.switchboard ? Peer.Connect(uri)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
case Left(uri) => (appKit.switchboard ? Peer.Connect(uri, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
case Right(pubKey) => (appKit.switchboard ? Peer.Connect(pubKey, None, ActorRef.noSender)).mapTo[PeerConnection.ConnectionResult].map(_.toString)
}

override def disconnect(nodeId: PublicKey)(implicit timeout: Timeout): Future[String] = {
Expand Down Expand Up @@ -502,4 +507,26 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
}
}
}

override def sendOnionMessage(route: Seq[PublicKey], userCustomContent: ByteVector, pathId: Option[ByteVector])(implicit timeout: Timeout): Future[SendOnionMessageResponse] = {
val sessionKey = randomKey()
val blindingSecret = randomKey()
codecs.list(TlvCodecs.genericTlv).decode(userCustomContent.bits) match {
case Attempt.Successful(DecodeResult(userCustomTlvs, _)) =>
t-bast marked this conversation as resolved.
Show resolved Hide resolved
val (nextNodeId, message) =
OnionMessages.buildMessage(
sessionKey,
blindingSecret,
route.dropRight(1).map(OnionMessages.IntermediateNode(_)),
Left(OnionMessages.Recipient(route.last, pathId)),
Nil,
userCustomTlvs)
(appKit.switchboard ? OnionMessages.SendMessage(nextNodeId, message)).mapTo[MessageRelay.Status].map {
case MessageRelay.Success => SendOnionMessageResponse(sent = true, None)
case MessageRelay.Failure(f) => SendOnionMessageResponse(sent = false, Some(f.toString))
}
case Attempt.Failure(cause) => Future.successful(SendOnionMessageResponse(sent = false, Some(s"the `content` field is invalid, it must contain encoded tlvs: ${cause.message}")))
}

}
}
56 changes: 56 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/MessageRelay.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2021 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.io

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import akka.actor.{ActorRef, typed}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.wire.protocol.OnionMessage

object MessageRelay {
// @formatter:off
sealed trait Command
case class RelayMessage(switchboard: ActorRef, nextNodeId: PublicKey, msg: OnionMessage, replyTo: typed.ActorRef[Status]) extends Command
case class WrappedConnectionResult(result: PeerConnection.ConnectionResult) extends Command

sealed trait Status
case object Success extends Status
case class Failure(failure: PeerConnection.ConnectionResult.Failure) extends Status
// @formatter:on

def apply(): Behavior[Command] = {
Behaviors.receivePartial {
case (context, RelayMessage(switchboard, nextNodeId, msg, replyTo)) =>
switchboard ! Peer.Connect(nextNodeId, None, context.messageAdapter(WrappedConnectionResult).toClassic)
waitForConnection(msg, replyTo)
}
}

def waitForConnection(msg: OnionMessage, replyTo: typed.ActorRef[Status]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case WrappedConnectionResult(r: PeerConnection.ConnectionResult.HasConnection) =>
r.peerConnection ! msg
replyTo ! Success
Behaviors.stopped
case WrappedConnectionResult(f: PeerConnection.ConnectionResult.Failure) =>
replyTo ! Failure(f)
Behaviors.stopped
}
}
}
29 changes: 22 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ import fr.acinq.eclair.blockchain.{OnChainAddressGenerator, OnChainChannelFunder
import fr.acinq.eclair.channel._
import fr.acinq.eclair.io.Monitoring.Metrics
import fr.acinq.eclair.io.PeerConnection.KillReason
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, RoutingMessage, UnknownMessage, Warning}
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionMessage, RoutingMessage, UnknownMessage, Warning}
import scodec.bits.ByteVector

import java.net.InetSocketAddress
Expand All @@ -51,7 +52,7 @@ import scala.concurrent.ExecutionContext
*
* Created by PM on 26/08/2016.
*/
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {
class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory, switchboard: ActorRef) extends FSMDiagnosticActorLogging[Peer.State, Peer.Data] {

import Peer._

Expand Down Expand Up @@ -98,8 +99,8 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA

when(CONNECTED) {
dropStaleMessages {
case Event(_: Peer.Connect, _) =>
sender() ! PeerConnection.ConnectionResult.AlreadyConnected
case Event(c: Peer.Connect, d: ConnectedData) =>
c.replyTo ! PeerConnection.ConnectionResult.AlreadyConnected(d.peerConnection)
stay()

case Event(Peer.OutgoingMessage(msg, peerConnection), d: ConnectedData) if peerConnection == d.peerConnection => // this is an outgoing message, but we need to make sure that this is for the current active connection
Expand Down Expand Up @@ -243,6 +244,20 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
gotoConnected(connectionReady, d.channels)

case Event(msg: OnionMessage, _: ConnectedData) =>
if (nodeParams.features.hasFeature(Features.OnionMessages)) {
OnionMessages.process(nodeParams.privateKey, msg) match {
case OnionMessages.DropMessage(reason) =>
log.debug(s"dropping message from ${remoteNodeId.value.toHex}: ${reason.toString}")
case send: OnionMessages.SendMessage =>
switchboard ! send
case received: OnionMessages.ReceiveMessage =>
log.info(s"received message from ${remoteNodeId.value.toHex}: $received")
context.system.eventStream.publish(received)
}
}
stay()

case Event(unknownMsg: UnknownMessage, d: ConnectedData) if nodeParams.pluginMessageTags.contains(unknownMsg.tag) =>
context.system.eventStream.publish(UnknownMessageReceived(self, remoteNodeId, unknownMsg, d.connectionInfo))
stay()
Expand Down Expand Up @@ -392,7 +407,7 @@ object Peer {
context.actorOf(Channel.props(nodeParams, wallet, remoteNodeId, watcher, relayer, txPublisherFactory, origin_opt))
}

def props(nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: ChannelFactory): Props = Props(new Peer(nodeParams, remoteNodeId, wallet, channelFactory))
def props(nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainAddressGenerator, channelFactory: ChannelFactory, switchboard: ActorRef): Props = Props(new Peer(nodeParams, remoteNodeId, wallet, channelFactory, switchboard))

// @formatter:off

Expand Down Expand Up @@ -420,11 +435,11 @@ object Peer {
case object CONNECTED extends State

case class Init(storedChannels: Set[HasCommitments])
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort]) {
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef) {
def uri: Option[NodeURI] = address_opt.map(NodeURI(nodeId, _))
}
object Connect {
def apply(uri: NodeURI): Connect = new Connect(uri.nodeId, Some(uri.address))
def apply(uri: NodeURI, replyTo: ActorRef): Connect = new Connect(uri.nodeId, Some(uri.address), replyTo)
}

case class Disconnect(nodeId: PublicKey) extends PossiblyHarmful
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
} else {
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initialized).increment()
d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit)
d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected)
d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected(self))

if (d.doSync) {
self ! DoSync(replacePrevious = true)
Expand Down Expand Up @@ -517,13 +517,16 @@ object PeerConnection {
object ConnectionResult {
sealed trait Success extends ConnectionResult
sealed trait Failure extends ConnectionResult
sealed trait HasConnection extends ConnectionResult {
val peerConnection: ActorRef
}

case object NoAddressFound extends ConnectionResult.Failure { override def toString: String = "no address found" }
case class ConnectionFailed(address: InetSocketAddress) extends ConnectionResult.Failure { override def toString: String = s"connection failed to $address" }
case class AuthenticationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
case class InitializationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
case object AlreadyConnected extends ConnectionResult.Failure { override def toString: String = "already connected" }
case object Connected extends ConnectionResult.Success { override def toString: String = "connected" }
case class AlreadyConnected(peerConnection: ActorRef) extends ConnectionResult.Failure with HasConnection { override def toString: String = "already connected" }
case class Connected(peerConnection: ActorRef) extends ConnectionResult.Success with HasConnection { override def toString: String = "connected" }
}

case class DelayedRebroadcast(rebroadcast: Rebroadcast)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package fr.acinq.eclair.io

import java.net.InetSocketAddress
import akka.actor.{ActorRef, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
Expand All @@ -27,8 +26,9 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.db.{NetworkDb, PeersDb}
import fr.acinq.eclair.io.Monitoring.Metrics
import fr.acinq.eclair.{FSMDiagnosticActorLogging, Logs, NodeParams, TimestampMilli, TimestampSecond}
import fr.acinq.eclair.{FSMDiagnosticActorLogging, Logs, NodeParams, TimestampMilli}

import java.net.InetSocketAddress
import scala.concurrent.duration.{FiniteDuration, _}
import scala.util.Random

Expand Down Expand Up @@ -130,15 +130,15 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends

case Event(TickReconnect, _) => stay()

case Event(Peer.Connect(_, hostAndPort_opt), _) =>
case Event(Peer.Connect(_, hostAndPort_opt, replyTo), _) =>
// manual connection requests happen completely independently of the automated reconnection process;
// we initiate a connection but don't modify our state.
// if we are already connecting/connected, the peer will kill any duplicate connections
hostAndPort_opt
.map(hostAndPort2InetSocketAddress)
.orElse(getPeerAddressFromDb(nodeParams.db.peers, nodeParams.db.network, remoteNodeId)) match {
case Some(address) => connect(address, origin = sender())
case None => sender() ! PeerConnection.ConnectionResult.NoAddressFound
case Some(address) => connect(address, origin = replyTo)
case None => replyTo ! PeerConnection.ConnectionResult.NoAddressFound
}
stay()
}
Expand Down
19 changes: 15 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package fr.acinq.eclair.io

import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps}
import akka.actor.{Actor, ActorContext, ActorLogging, ActorRef, OneForOneStrategy, Props, Status, SupervisorStrategy}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.OnChainAddressGenerator
import fr.acinq.eclair.channel.Helpers.Closing
import fr.acinq.eclair.channel._
import fr.acinq.eclair.message.OnionMessages
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.router.Router.RouterConf

Expand Down Expand Up @@ -57,12 +59,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)

def normal(peersWithChannels: Set[PublicKey]): Receive = {

case Peer.Connect(publicKey, _) if publicKey == nodeParams.nodeId =>
case Peer.Connect(publicKey, _, _) if publicKey == nodeParams.nodeId =>
sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself"))

case c: Peer.Connect =>
case Peer.Connect(nodeId, address_opt, replyTo) =>
// we create a peer if it doesn't exist
val peer = createOrGetPeer(c.nodeId, offlineChannels = Set.empty)
val peer = createOrGetPeer(nodeId, offlineChannels = Set.empty)
val c = if (replyTo == ActorRef.noSender){
Peer.Connect(nodeId, address_opt, sender())
}else{
Peer.Connect(nodeId, address_opt, replyTo)
}
peer forward c

case d: Peer.Disconnect =>
Expand Down Expand Up @@ -92,6 +99,10 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
case Symbol("peers") => sender() ! context.children

case GetRouterPeerConf => sender() ! RouterPeerConf(nodeParams.routerConf, nodeParams.peerConnectionConf)

case OnionMessages.SendMessage(nextNodeId, dataToRelay) =>
val relay = context.spawnAnonymous(MessageRelay())
relay ! MessageRelay.RelayMessage(self, nextNodeId, dataToRelay, sender().toTyped)
}

/**
Expand Down Expand Up @@ -131,7 +142,7 @@ object Switchboard {

case class SimplePeerFactory(nodeParams: NodeParams, wallet: OnChainAddressGenerator, channelFactory: Peer.ChannelFactory) extends PeerFactory {
override def spawn(context: ActorContext, remoteNodeId: PublicKey): ActorRef =
context.actorOf(Peer.props(nodeParams, remoteNodeId, wallet, channelFactory), name = peerActorName(remoteNodeId))
context.actorOf(Peer.props(nodeParams, remoteNodeId, wallet, channelFactory, context.self), name = peerActorName(remoteNodeId))
}

def props(nodeParams: NodeParams, peerFactory: PeerFactory) = Props(new Switchboard(nodeParams, peerFactory))
Expand Down
Loading