Skip to content

Commit

Permalink
Relay onion messages
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Sep 27, 2021
1 parent e9a3574 commit 0024ae4
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 13 deletions.
5 changes: 5 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ eclair {
option_anchor_outputs = disabled
option_anchors_zero_fee_htlc_tx = disabled
option_shutdown_anysegwit = optional
option_onion_messages = optional
trampoline_payment = disabled
keysend = disabled
}
Expand Down Expand Up @@ -387,6 +388,10 @@ eclair {
"mempool.space"
]
}

onion-messages {
rate-limit-per-second = 10
}
}

akka {
Expand Down
6 changes: 6 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/Features.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ object Features {
val mandatory = 26
}

case object OnionMessages extends Feature {
val rfcName = "option_onion_messages"
val mandatory = 38
}

// TODO: @t-bast: update feature bits once spec-ed (currently reserved here: https://github.com/lightningnetwork/lightning-rfc/issues/605)
// We're not advertising these bits yet in our announcements, clients have to assume support.
// This is why we haven't added them yet to `areSupported`.
Expand Down Expand Up @@ -231,6 +236,7 @@ object Features {
AnchorOutputs,
AnchorOutputsZeroFeeHtlcTx,
ShutdownAnySegwit,
OnionMessages,
KeySend
)

Expand Down
6 changes: 4 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
maxPaymentAttempts: Int,
enableTrampolinePayment: Boolean,
balanceCheckInterval: FiniteDuration,
blockchainWatchdogSources: Seq[String]) {
blockchainWatchdogSources: Seq[String],
onionMessageRateLimitPerSecond: Double) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
Expand Down Expand Up @@ -447,7 +448,8 @@ object NodeParams extends Logging {
maxPaymentAttempts = config.getInt("max-payment-attempts"),
enableTrampolinePayment = config.getBoolean("trampoline-payments-enable"),
balanceCheckInterval = FiniteDuration(config.getDuration("balance-check-interval").getSeconds, TimeUnit.SECONDS),
blockchainWatchdogSources = config.getStringList("blockchain-watchdog.sources").asScala.toSeq
blockchainWatchdogSources = config.getStringList("blockchain-watchdog.sources").asScala.toSeq,
onionMessageRateLimitPerSecond = config.getDouble("onion-messages.rate-limit-per-second")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
import scodec.Attempt
import scodec.bits.ByteVector
import scodec.codecs.provide

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -272,7 +273,7 @@ object Sphinx extends Logging {
* When an invalid onion is received, its hash should be included in the failure message.
*/
def hash(onion: protocol.OnionRoutingPacket): ByteVector32 =
Crypto.sha256(OnionCodecs.onionRoutingPacketCodec(onion.payload.length.toInt).encode(onion).require.toByteVector)
Crypto.sha256(OnionCodecs.onionRoutingPacketCodec(provide(onion.payload.length.toInt)).encode(onion).require.toByteVector)

}

Expand All @@ -291,6 +292,11 @@ object Sphinx extends Logging {
override val PayloadLength = 400
}

/**
* A message onion packet is used when requesting/sending an invoice from/to a remote node when using offers (BOLT12).
*/
case class MessagePacket(PayloadLength: Int) extends OnionRoutingPacket[Onion.MessagePacket]

/**
* A properly decrypted failure from a node in the route.
*
Expand Down
44 changes: 43 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import akka.event.Logging.MDC
import akka.event.{BusLogging, DiagnosticLoggingAdapter}
import akka.util.Timeout
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.RateLimiter
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong, Script}
import fr.acinq.eclair.Features.Wumbo
Expand All @@ -31,15 +32,19 @@ import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.blockchain.{OnChainAddressGenerator, OnChainChannelFunder}
import fr.acinq.eclair.channel.ChannelTypes.UnsupportedChannelType
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.io.Monitoring.Metrics
import fr.acinq.eclair.io.PeerConnection.KillReason
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, RoutingMessage, UnknownMessage, Warning}
import fr.acinq.eclair.wire.protocol.Onion.{MessageFinalPayload, MessageRelayPayload}
import fr.acinq.eclair.wire.protocol.{BadOnion, Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionCodecs, OnionMessage, RoutingMessage, UnknownMessage, Warning}
import scodec.bits.ByteVector
import scodec.{Attempt, DecodeResult}

import java.net.InetSocketAddress
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

/**
* This actor represents a logical peer. There is one [[Peer]] per unique remote node id at all time.
Expand All @@ -55,6 +60,8 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA

import Peer._

private val messageRelayRateLimiter = RateLimiter.create(nodeParams.onionMessageRateLimitPerSecond)

startWith(INSTANTIATING, Nothing)

when(INSTANTIATING) {
Expand Down Expand Up @@ -245,6 +252,16 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA
d.channels.values.toSet[ActorRef].foreach(_ ! INPUT_DISCONNECTED) // we deduplicate with toSet because there might be two entries per channel (tmp id and final id)
gotoConnected(connectionReady, d.channels)

case Event(msg: OnionMessage, d: ConnectedData) =>
if (nodeParams.features.hasFeature(Features.OnionMessages) && messageRelayRateLimiter.tryAcquire()) {
relayOnionMessage(msg)
}
stay()

case Event(Peer.SendOnionMessage(toNodeId, msg), d: ConnectedData) if toNodeId == remoteNodeId =>
d.peerConnection ! msg
stay()

case Event(unknownMsg: UnknownMessage, d: ConnectedData) if nodeParams.pluginMessageTags.contains(unknownMsg.tag) =>
context.system.eventStream.publish(UnknownMessageReceived(self, remoteNodeId, unknownMsg, d.connectionInfo))
stay()
Expand All @@ -255,6 +272,29 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA
}
}

private def relayOnionMessage(msg: OnionMessage) = {
val packetType = Sphinx.MessagePacket(msg.onionRoutingPacket.payload.length.toInt)
val blindedPrivKateKey = Sphinx.RouteBlinding.derivePrivateKey(nodeParams.privateKey, msg.blindingKey)
packetType.peel(blindedPrivKateKey, ByteVector.empty, msg.onionRoutingPacket) match {
case Left(_: BadOnion) => () // We ignore bad messages
case Right(p@Sphinx.DecryptedPacket(payload, nextPacket, _)) => (OnionCodecs.perHopPayloadCodecByPacketType(packetType, p.isLastPacket).decode(payload.bits): @unchecked) match {
case Attempt.Successful(DecodeResult(relayPayload: MessageRelayPayload, _)) =>
Sphinx.RouteBlinding.decryptPayload(nodeParams.privateKey, msg.blindingKey, relayPayload.blindedTlv) match {
case Success((decrypted, nextBlindingKey)) =>
OnionCodecs.messageRelayNextCodec.decode(decrypted.bits) match {
case Attempt.Successful(DecodeResult(relayNext, _)) =>
val toRelay = OnionMessage(relayNext.nextBlinding.getOrElse(nextBlindingKey), nextPacket)
context.parent ! Peer.SendOnionMessage(relayNext.nextNodeId, toRelay)
case Attempt.Failure(_) => () // We ignore bad messages
}
case Failure(_) => () // We ignore bad messages
}
case Attempt.Successful(DecodeResult(finalPayload: MessageFinalPayload, _)) => () // We only relay messages
case Attempt.Failure(_) => () // We ignore bad messages
}
}
}

whenUnhandled {
case Event(_: Peer.OpenChannel, _) =>
sender() ! Status.Failure(new RuntimeException("not connected"))
Expand Down Expand Up @@ -430,6 +470,8 @@ object Peer {
def apply(uri: NodeURI): Connect = new Connect(uri.nodeId, Some(uri.address))
}

case class SendOnionMessage(nodeId: PublicKey, message: OnionMessage) extends PossiblyHarmful

case class Disconnect(nodeId: PublicKey) extends PossiblyHarmful
case class OpenChannel(remoteNodeId: PublicKey, fundingSatoshis: Satoshi, pushMsat: MilliSatoshi, channelType_opt: Option[SupportedChannelType], fundingTxFeeratePerKw_opt: Option[FeeratePerKw], channelFlags: Option[Byte], timeout_opt: Option[Timeout]) extends PossiblyHarmful {
require(pushMsat <= fundingSatoshis, s"pushMsat must be less or equal to fundingSatoshis")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
case None => sender() ! Status.Failure(new RuntimeException("peer not found"))
}

case s@Peer.SendOnionMessage(nodeId, _) =>
val peer = createOrGetPeer(nodeId, offlineChannels = Set.empty)
peer forward s

case o: Peer.OpenChannel =>
getPeer(o.remoteNodeId) match {
case Some(peer) => peer forward o
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package fr.acinq.eclair.wire.protocol

import fr.acinq.eclair.wire.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.wire.protocol.CommonCodecs._
import fr.acinq.eclair.wire.protocol.OnionCodecs.onionRoutingPacketCodec
import fr.acinq.eclair.{Features, KamonExt}
import scodec.bits.{BitVector, ByteVector}
import scodec.codecs._
Expand Down Expand Up @@ -308,6 +309,11 @@ object LightningMessageCodecs {
("timestampRange" | uint32) ::
("tlvStream" | GossipTimestampFilterTlv.gossipTimestampFilterTlvCodec)).as[GossipTimestampFilter]

val onionMessageCodec: Codec[OnionMessage] = (
("blindingKey" | publicKey) ::
("onionPacket" | OnionCodecs.messageOnionPacketCodec) ::
("tlvStream" | OnionMessageTlv.onionMessageTlvCodec)).as[OnionMessage]

// NB: blank lines to minimize merge conflicts

//
Expand Down Expand Up @@ -361,6 +367,7 @@ object LightningMessageCodecs {
.typecase(263, queryChannelRangeCodec)
.typecase(264, replyChannelRangeCodec)
.typecase(265, gossipTimestampFilterCodec)
.typecase(387, onionMessageCodec)
// NB: blank lines to minimize merge conflicts

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ object ReplyChannelRange {

case class GossipTimestampFilter(chainHash: ByteVector32, firstTimestamp: Long, timestampRange: Long, tlvStream: TlvStream[GossipTimestampFilterTlv] = TlvStream.empty) extends RoutingMessage with HasChainHash

case class OnionMessage(blindingKey: PublicKey, onionRoutingPacket: OnionRoutingPacket, tlvStream: TlvStream[OnionMessageTlv] = TlvStream.empty) extends LightningMessage

// NB: blank lines to minimize merge conflicts

//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ object OnionTlv {
/** Pre-image included by the sender of a payment in case of a donation */
case class KeySend(paymentPreimage: ByteVector32) extends OnionTlv

case class ReplyHop(nodeId: PublicKey, encTlv: ByteVector)

case class ReplyPath(firstNodeId: PublicKey, blinding: PublicKey, path: Seq[ReplyHop]) extends OnionTlv

case class EncTlv(bytes: ByteVector) extends OnionTlv

case class Blinding(blindingKey: PublicKey) extends OnionTlv

}

object Onion {
Expand Down Expand Up @@ -218,6 +226,9 @@ object Onion {
/** See [[fr.acinq.eclair.crypto.Sphinx.TrampolinePacket]]. */
sealed trait TrampolinePacket extends PacketType

/** See [[fr.acinq.eclair.crypto.Sphinx.MessagePacket]]. */
sealed trait MessagePacket extends PacketType

/** Per-hop payload from an HTLC's payment onion (after decryption and decoding). */
sealed trait PerHopPayload

Expand Down Expand Up @@ -281,6 +292,17 @@ object Onion {
override val paymentPreimage = records.get[KeySend].map(_.paymentPreimage)
}

case class MessageRelayPayload(records: TlvStream[OnionTlv]) extends MessagePacket with TlvFormat {
val blindedTlv: ByteVector = records.get[EncTlv].get.bytes
}

case class MessageFinalPayload(records: TlvStream[OnionTlv]) extends MessagePacket with TlvFormat

case class MessageRelayNext(records: TlvStream[OnionTlv]) {
val nextNodeId: PublicKey = records.get[OutgoingNodeId].get.nodeId
val nextBlinding: Option[PublicKey] = records.get[Blinding].map(_.blindingKey)
}

def createNodeRelayPayload(amount: MilliSatoshi, expiry: CltvExpiry, nextNodeId: PublicKey): NodeRelayPayload =
NodeRelayPayload(TlvStream(AmountToForward(amount), OutgoingCltv(expiry), OutgoingNodeId(nextNodeId)))

Expand Down Expand Up @@ -310,15 +332,18 @@ object OnionCodecs {
import scodec.codecs._
import scodec.{Attempt, Codec, DecodeResult, Decoder, Err}

def onionRoutingPacketCodec(payloadLength: Int): Codec[OnionRoutingPacket] = (
("version" | uint8) ::
("publicKey" | bytes(33)) ::
("onionPayload" | bytes(payloadLength)) ::
("hmac" | bytes32)).as[OnionRoutingPacket]
def onionRoutingPacketCodec(payloadLength: Codec[Int]): Codec[OnionRoutingPacket] = (
variableSizePrefixedBytes(payloadLength,
("version" | uint8) ~
("publicKey" | bytes(33)),
("onionPayload" | bytes)) ~
("hmac" | bytes32) flattenLeftPairs).as[OnionRoutingPacket]

val paymentOnionPacketCodec: Codec[OnionRoutingPacket] = onionRoutingPacketCodec(provide(Sphinx.PaymentPacket.PayloadLength))

val paymentOnionPacketCodec: Codec[OnionRoutingPacket] = onionRoutingPacketCodec(Sphinx.PaymentPacket.PayloadLength)
val trampolineOnionPacketCodec: Codec[OnionRoutingPacket] = onionRoutingPacketCodec(provide(Sphinx.TrampolinePacket.PayloadLength))

val trampolineOnionPacketCodec: Codec[OnionRoutingPacket] = onionRoutingPacketCodec(Sphinx.TrampolinePacket.PayloadLength)
val messageOnionPacketCodec: Codec[OnionRoutingPacket] = onionRoutingPacketCodec(uint16.xmap(_ - 66, _ + 66))

/**
* The 1.1 BOLT spec changed the onion frame format to use variable-length per-hop payloads.
Expand Down Expand Up @@ -412,9 +437,47 @@ object OnionCodecs {
case FinalTlvPayload(tlvs) => tlvs
})

val replyHopCodec: Codec[ReplyHop] = (("nodeId" | publicKey) :: ("encTlv" | variableSizeBytes(uint16, bytes))).as[ReplyHop]

val replyPathCodec: Codec[ReplyPath] = (("firstNodeId" | publicKey) :: ("blinding" | publicKey) :: ("path" | list(replyHopCodec).xmap[Seq[ReplyHop]](_.toSeq, _.toList))).as[ReplyPath]

val encTlvCodec: Codec[EncTlv] = bytes.as[EncTlv]

private val messageTlvCodec = discriminated[OnionTlv].by(varint)
.typecase(UInt64(8), replyPathCodec)
.typecase(UInt64(10), encTlvCodec)

val messagePerHopPayloadCodec: Codec[TlvStream[OnionTlv]] = TlvCodecs.lengthPrefixedTlvStream[OnionTlv](messageTlvCodec).complete

val messageRelayPayloadCodec: Codec[MessageRelayPayload] = messagePerHopPayloadCodec.narrow({
case tlvs if tlvs.get[EncTlv].isEmpty => Attempt.failure(MissingRequiredTlv(UInt64(10)))
case tlvs => Attempt.successful(MessageRelayPayload(tlvs))
}, {
case MessageRelayPayload(tlvs) => tlvs
})

val messageFinalPayloadCodec: Codec[MessageFinalPayload] = messagePerHopPayloadCodec.narrow({
case tlvs => Attempt.successful(MessageFinalPayload(tlvs))
}, {
case MessageFinalPayload(tlvs) => tlvs
})

def perHopPayloadCodecByPacketType[T <: PacketType](packetType: Sphinx.OnionRoutingPacket[T], isLastPacket: Boolean): Codec[PacketType] = packetType match {
case Sphinx.PaymentPacket => if (isLastPacket) finalPerHopPayloadCodec.upcast[PacketType] else channelRelayPerHopPayloadCodec.upcast[PacketType]
case Sphinx.TrampolinePacket => if (isLastPacket) finalPerHopPayloadCodec.upcast[PacketType] else nodeRelayPerHopPayloadCodec.upcast[PacketType]
case Sphinx.MessagePacket(payloadLength) => if (isLastPacket) messageFinalPayloadCodec.upcast[PacketType] else messageRelayPayloadCodec.upcast[PacketType]
}

private val blindingKey: Codec[Blinding] = variableSizeBytesLong(varintoverflow, "blinding" | publicKey).as[Blinding]

val messageRelayNextCodec: Codec[MessageRelayNext] = TlvCodecs.lengthPrefixedTlvStream[OnionTlv](
discriminated[OnionTlv].by(varint)
.typecase(UInt64(4), outgoingNodeId)
.typecase(UInt64(12), blindingKey)).complete
.narrow({
case tlvs if tlvs.get[OutgoingNodeId].isEmpty => Attempt.failure(MissingRequiredTlv(UInt64(4)))
case tlvs => Attempt.successful(MessageRelayNext(tlvs))
}, {
case MessageRelayNext(tlvs) => tlvs
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2021 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.wire.protocol

import fr.acinq.eclair.wire.protocol.CommonCodecs.varint
import fr.acinq.eclair.wire.protocol.TlvCodecs.tlvStream
import scodec.Codec
import scodec.codecs.discriminated

/**
* Created by thomash on 10/09/2021.
*/

sealed trait OnionMessageTlv extends Tlv

object OnionMessageTlv {
val onionMessageTlvCodec: Codec[TlvStream[OnionMessageTlv]] = tlvStream(discriminated[OnionMessageTlv].by(varint))
}
Loading

0 comments on commit 0024ae4

Please sign in to comment.