Skip to content

Commit

Permalink
router: index private channels on channelId
Browse files Browse the repository at this point in the history
  • Loading branch information
pm47 committed May 13, 2022
1 parent e09360e commit b016368
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,22 @@ object RouteCalculation {
case PredefinedChannelRoute(targetNodeId, shortChannelIds) =>
val (end, hops) = shortChannelIds.foldLeft((localNodeId, Seq.empty[ChannelHop])) {
case ((currentNode, previousHops), shortChannelId) =>
val channelDesc_opt = d.channels.get(shortChannelId).flatMap(c => currentNode match {
case c.ann.nodeId1 => Some(ChannelDesc(shortChannelId, c.ann.nodeId1, c.ann.nodeId2))
case c.ann.nodeId2 => Some(ChannelDesc(shortChannelId, c.ann.nodeId2, c.ann.nodeId1))
case _ => None
}).orElse(d.privateChannels.get(shortChannelId).flatMap(c => currentNode match {
case c.nodeId1 => Some(ChannelDesc(shortChannelId, c.nodeId1, c.nodeId2))
case c.nodeId2 => Some(ChannelDesc(shortChannelId, c.nodeId2, c.nodeId1))
case _ => None
})).orElse(assistedChannels.get(shortChannelId).flatMap(c => currentNode match {
case c.nodeId => Some(ChannelDesc(shortChannelId, c.nodeId, c.nextNodeId))
case _ => None
}))
val channelDesc_opt = d.resolve(shortChannelId) match {
case Some(c: PublicChannel) => currentNode match {
case c.nodeId1 => Some(ChannelDesc(shortChannelId, c.nodeId1, c.nodeId2))
case c.nodeId2 => Some(ChannelDesc(shortChannelId, c.nodeId2, c.nodeId1))
case _ => None
}
case Some(c: PrivateChannel) => currentNode match {
case c.nodeId1 => Some(ChannelDesc(c.shortChannelId, c.nodeId1, c.nodeId2))
case c.nodeId2 => Some(ChannelDesc(c.shortChannelId, c.nodeId2, c.nodeId1))
case _ => None
}
case None => assistedChannels.get(shortChannelId).flatMap(c => currentNode match {
case c.nodeId => Some(ChannelDesc(shortChannelId, c.nodeId, c.nextNodeId))
case _ => None
})
}
channelDesc_opt.flatMap(c => g.getEdge(c)) match {
case Some(edge) => (edge.desc.b, previousHops :+ ChannelHop(edge.desc.shortChannelId, edge.desc.a, edge.desc.b, edge.params))
case None => (currentNode, previousHops)
Expand Down
34 changes: 28 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import akka.event.Logging.MDC
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.ShortChannelId.outputIndex
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
Expand Down Expand Up @@ -105,7 +106,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm

log.info(s"initialization completed, ready to process messages")
Try(initialized.map(_.success(Done)))
startWith(NORMAL, Data(initNodes, initChannels, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty))
startWith(NORMAL, Data(initNodes, initChannels, Stash(Map.empty, Map.empty), rebroadcast = Rebroadcast(channels = Map.empty, updates = Map.empty, nodes = Map.empty), awaiting = Map.empty, privateChannels = Map.empty, resolveScid = Map.empty, excludedChannels = Set.empty, graph, sync = Map.empty))
}

when(NORMAL) {
Expand Down Expand Up @@ -192,9 +193,9 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm

case Event(PrintChannelUpdates, d) =>
println("public:")
d.channels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") }
d.channels.foreach { case (scid, pc) => println(s"$scid -> ${pc.channelId} updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") }
println("private:")
d.privateChannels.foreach { case (scid, pc) => println(s"$scid updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") }
d.privateChannels.foreach { case (scid, pc) => println(s"$scid -> ${pc.channelId} updates=${(pc.update_1_opt.toSeq ++ pc.update_2_opt.toSeq).size}") }
println("---------------------------------------------------")
stay()

Expand Down Expand Up @@ -331,6 +332,8 @@ object Router {
case class ChannelMeta(balance1: MilliSatoshi, balance2: MilliSatoshi)
sealed trait KnownChannel {
val capacity: Satoshi
val nodeId1: PublicKey
val nodeId2: PublicKey
def getNodeIdSameSideAs(u: ChannelUpdate): PublicKey
def getChannelUpdateSameSideAs(u: ChannelUpdate): Option[ChannelUpdate]
def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi]
Expand All @@ -342,6 +345,10 @@ object Router {
update_1_opt.foreach(u => assert(u.channelFlags.isNode1))
update_2_opt.foreach(u => assert(!u.channelFlags.isNode1))

val nodeId1: PublicKey = ann.nodeId1
val nodeId2: PublicKey = ann.nodeId2
def shortChannelId: ShortChannelId = ann.shortChannelId
def channelId: ByteVector32 = toLongId(fundingTxid.reverse, outputIndex(ann.shortChannelId))
def getNodeIdSameSideAs(u: ChannelUpdate): PublicKey = if (u.channelFlags.isNode1) ann.nodeId1 else ann.nodeId2
def getChannelUpdateSameSideAs(u: ChannelUpdate): Option[ChannelUpdate] = if (u.channelFlags.isNode1) update_1_opt else update_2_opt
def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi] = if (u.channelFlags.isNode1) meta_opt.map(_.balance1) else meta_opt.map(_.balance2)
Expand All @@ -356,7 +363,7 @@ object Router {
case Right(rcu) => updateChannelUpdateSameSideAs(rcu.channelUpdate)
}
}
case class PrivateChannel(localNodeId: PublicKey, remoteNodeId: PublicKey, update_1_opt: Option[ChannelUpdate], update_2_opt: Option[ChannelUpdate], meta: ChannelMeta) extends KnownChannel {
case class PrivateChannel(shortChannelId: ShortChannelId, channelId: ByteVector32, localNodeId: PublicKey, remoteNodeId: PublicKey, update_1_opt: Option[ChannelUpdate], update_2_opt: Option[ChannelUpdate], meta: ChannelMeta) extends KnownChannel {
val (nodeId1, nodeId2) = if (Announcements.isNode1(localNodeId, remoteNodeId)) (localNodeId, remoteNodeId) else (remoteNodeId, localNodeId)
val capacity: Satoshi = (meta.balance1 + meta.balance2).truncateToSatoshi

Expand Down Expand Up @@ -617,11 +624,26 @@ object Router {
stash: Stash,
rebroadcast: Rebroadcast,
awaiting: Map[ChannelAnnouncement, Seq[GossipOrigin]], // note: this is a seq because we want to preserve order: first actor is the one who we need to send a tcp-ack when validation is done
privateChannels: Map[ShortChannelId, PrivateChannel],
privateChannels: Map[ByteVector32, PrivateChannel], // indexed by channel id
resolveScid: Map[ShortChannelId, ByteVector32], // scid to channel_id
excludedChannels: Set[ChannelDesc], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graph: DirectedGraph,
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
)
) {

def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid
channels.get(scid) match {
case Some(publicChannel) => Some(publicChannel)
case None =>
// maybe it's an alias or a real scid
resolveScid.get(scid).flatMap(privateChannels.get) match {
case Some(privateChannel) => Some(privateChannel)
case None => None
}
}
}
}

// @formatter:off
sealed trait State
Expand Down
Loading

0 comments on commit b016368

Please sign in to comment.