diff --git a/channeldb/channel.go b/channeldb/channel.go index bc86b3b59f..613207f9f1 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -2783,6 +2783,17 @@ func (c *OpenChannel) AppendRemoteCommitChain(diff *CommitDiff) error { // prevents the same fails and settles from being retransmitted // after restarts. The actual fail or settle we need to // propagate to the remote party is now in the commit diff. + // + // NOTE(1/20/23): We don't actually do any internal acknowlegment + // to the outgoing link here anymore! Ever since we moved to + // pipeline (ie: immediately send without waiting for commitment + // tx update) settles/fails, this is often (always?) called with empty + // SettleFailReferences. Instead settles/fails are acked in + // batches periodically by the Switch. I guess the Settle/Fail + // will be resent internally until that batch internal + // acknowledgement occurs. + // https://github.com/lightningnetwork/lnd/pull/3143#discussion_r304190259 + fmt.Println("[AppendRemoteCommitChain]: AckSettleFails...") err = c.Packager.AckSettleFails(tx, diff.SettleFailAcks...) if err != nil { return err @@ -3216,6 +3227,7 @@ func (c *OpenChannel) AckAddHtlcs(addRefs ...AddRef) error { func (c *OpenChannel) AckSettleFails(settleFailRefs ...SettleFailRef) error { c.Lock() defer c.Unlock() + fmt.Println("[OpenChannel.AckSettleFails]") return kvdb.Update(c.Db.backend, func(tx kvdb.RwTx) error { return c.Packager.AckSettleFails(tx, settleFailRefs...) diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go index 09f5671cc9..3aee2d6e93 100644 --- a/channeldb/forwarding_package.go +++ b/channeldb/forwarding_package.go @@ -752,6 +752,15 @@ func (p *ChannelPackager) AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error { if len(addRefs) == 0 { return nil } + fmt.Printf("[ChannelPackager.AckAddHtlcs(%s)]: marking ADDs as acknowledged in forwarding package\n", + p.source, + ) + + if len(addRefs) == 1 { + fmt.Printf("[ChannelPackager.AckAddHtlcs(%s)]: marking ADDs at height=%d, index=%d as acknowledged in forwarding package\n", + p.source, addRefs[0].Height, addRefs[0].Index, + ) + } fwdPkgBkt := tx.ReadWriteBucket(fwdPackagesKey) if fwdPkgBkt == nil { @@ -777,6 +786,11 @@ func (p *ChannelPackager) AckAddHtlcs(tx kvdb.RwTx, addRefs ...AddRef) error { // Load each height bucket once and remove all acked htlcs at that // height. for height, indexes := range heightDiffs { + fmt.Printf("[AckAddHtlcs(%s)]: height diff: %d, %+v!\n", + p.source, + height, + indexes, + ) err := ackAddHtlcsAtHeight(sourceBkt, height, indexes) if err != nil { return err @@ -832,6 +846,10 @@ func ackAddHtlcsAtHeight(sourceBkt kvdb.RwBucket, height uint64, // the settle/fail, or it becomes otherwise safe to forgo retransmitting the // settle/fail after a restart. func (p *ChannelPackager) AckSettleFails(tx kvdb.RwTx, settleFailRefs ...SettleFailRef) error { + fmt.Printf("[Packager.AckSettleFails(%s)]: settle/fail refs: %+v!\n", + p.source, + settleFailRefs, + ) return ackSettleFails(tx, settleFailRefs) } diff --git a/htlcswitch/circuit.go b/htlcswitch/circuit.go index efb2a47790..d546c6a3e6 100644 --- a/htlcswitch/circuit.go +++ b/htlcswitch/circuit.go @@ -2,6 +2,7 @@ package htlcswitch import ( "encoding/binary" + "fmt" "io" "github.com/lightningnetwork/lnd/channeldb" @@ -76,6 +77,12 @@ func newPaymentCircuit(hash *[32]byte, pkt *htlcPacket) *PaymentCircuit { if pkt.sourceRef != nil { addRef = *pkt.sourceRef } + fmt.Printf("[newPaymentCircuit(%s)]: forwarding package (processed update) add reference: %+v!\n", + pkt.incomingChanID, pkt.sourceRef, + ) + fmt.Printf("[newPaymentCircuit(%s)]: htlcPacket: %+v!\n", + pkt.incomingChanID, pkt, + ) return &PaymentCircuit{ AddRef: addRef, @@ -97,6 +104,12 @@ func makePaymentCircuit(hash *[32]byte, pkt *htlcPacket) PaymentCircuit { if pkt.sourceRef != nil { addRef = *pkt.sourceRef } + fmt.Printf("[makePaymentCircuit(%s)]: forwarding package (processed update) add reference: %+v!\n", + pkt.incomingChanID, pkt.sourceRef, + ) + fmt.Printf("[makePaymentCircuit(%s)]: htlcPacket: %+v!\n", + pkt.incomingChanID, pkt, + ) return PaymentCircuit{ AddRef: addRef, diff --git a/htlcswitch/circuit_map.go b/htlcswitch/circuit_map.go index 9b26a1d07e..4efbc46f71 100644 --- a/htlcswitch/circuit_map.go +++ b/htlcswitch/circuit_map.go @@ -795,6 +795,8 @@ func (cm *circuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit { func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( *CircuitFwdActions, error) { + fmt.Println("[switch.CommitCiruits]: committing circuits!") + inKeys := make([]CircuitKey, 0, len(circuits)) for _, circuit := range circuits { inKeys = append(inKeys, circuit.Incoming) @@ -803,6 +805,9 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( log.Tracef("Committing fresh circuits: %v", newLogClosure(func() string { return spew.Sdump(inKeys) })) + fmt.Printf("Committing fresh circuits: %v", newLogClosure(func() string { + return spew.Sdump(inKeys) + })) actions := &CircuitFwdActions{} @@ -825,12 +830,14 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( for _, circuit := range circuits { inKey := circuit.InKey() if foundCircuit, ok := cm.pending[inKey]; ok { + fmt.Printf("[switch.CommitCiruits]: found circuit=%s in our map!\n", inKey) switch { // This circuit has a keystone, it's waiting for a // response from the remote peer on the outgoing link. // Drop it like it's hot, ensure duplicates get caught. case foundCircuit.HasKeystone(): + fmt.Printf("[switch.CommitCiruits]: circuit=%s has a keystone! which means...\n", inKey) drops = append(drops, circuit) // If no keystone is set and the switch has not been @@ -842,6 +849,7 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( // link from failing a duplicate add while it is still // in the server's memory mailboxes. case !foundCircuit.LoadedFromDisk: + fmt.Printf("[switch.CommitCiruits]: circuit=%s has has not been loaded from disk!\n", inKey) drops = append(drops, circuit) // Otherwise, the in-mem packet has been lost due to a @@ -849,6 +857,8 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( // the incoming link. The incoming link should be able // detect and ignore duplicate packets of this type. default: + fmt.Printf("[switch.CommitCiruits]: ADD for circuit=%s is being failed!\n", inKey) + fmt.Printf("[switch.CommitCiruits]: loaded circuit=%s from disk: %t\n", inKey, foundCircuit.LoadedFromDisk) fails = append(fails, circuit) addFails = append(addFails, circuit) } @@ -864,6 +874,7 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( // If all circuits are dropped or failed, we are done. if len(adds) == 0 { + fmt.Println("[switch.CommitCiruits]: no ADDs to commit. all must have been duplicates!") actions.Drops = drops actions.Fails = fails return actions, nil @@ -906,6 +917,7 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( actions.Adds = adds actions.Drops = drops actions.Fails = fails + fmt.Println("[switch.CommitCiruits]: committing circuits succeeded!") return actions, nil } @@ -922,6 +934,7 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( actions.Drops = drops actions.Fails = addFails + fmt.Println("[switch.CommitCiruits]: failed to commit circuits!") return actions, err } @@ -948,9 +961,14 @@ func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error { return nil } + fmt.Println("[switch.OpenCiruits]: opening circuits!") + log.Tracef("Opening finalized circuits: %v", newLogClosure(func() string { return spew.Sdump(keystones) })) + fmt.Printf("[switch.OpenCiruits]: Opening finalized circuits: %v", newLogClosure(func() string { + return spew.Sdump(keystones) + })) // Check that all keystones correspond to committed-but-unopened // circuits. diff --git a/htlcswitch/hodl/flags.go b/htlcswitch/hodl/flags.go index 7fed7d0930..2fc6ee6a88 100644 --- a/htlcswitch/hodl/flags.go +++ b/htlcswitch/hodl/flags.go @@ -34,8 +34,6 @@ const ( // not the exit node. FailIncoming - // TODO(conner): add modes for switch breakpoints - // AddOutgoing drops an outgoing ADD before it is added to the // in-memory commitment state of the link. AddOutgoing @@ -55,6 +53,11 @@ const ( // BogusSettle attempts to settle back any incoming HTLC for which we // are the exit node with a bogus preimage. BogusSettle + + // TODO(conner): add modes for switch breakpoints + + // AddForward ... + AddForward ) // String returns a human-readable identifier for a given Flag. @@ -78,6 +81,8 @@ func (f Flag) String() string { return "Commit" case BogusSettle: return "BogusSettle" + case AddForward: + return "AddForward" default: return "UnknownHodlFlag" } @@ -106,6 +111,8 @@ func (f Flag) Warning() string { msg = "will not commit pending channel updates" case BogusSettle: msg = "will settle HTLC with bogus preimage" + case AddForward: + msg = "will not update switch circuit map for forwarded ADD" default: msg = "incorrect hodl flag usage" } diff --git a/htlcswitch/hop/iterator.go b/htlcswitch/hop/iterator.go index 43af2d092e..e361a634cf 100644 --- a/htlcswitch/hop/iterator.go +++ b/htlcswitch/hop/iterator.go @@ -188,6 +188,7 @@ func MakeBlindingKit(processor BlindingProcessor, ErrDecodeFailed, err) } + fmt.Println("MakeBlindingKit") if err := ValidateBlindedRouteData( routeData, incomingAmount, incomingCltv, ); err != nil { @@ -420,6 +421,8 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte, resps = make([]DecodeHopIteratorResponse, batchSize) ) + fmt.Println("[DecodeHopIterators()]: Decrypting onion packets") + tx := p.router.BeginTxn(id, batchSize) decode := func(seqNum uint16, onionPkt *sphinx.OnionPacket, @@ -474,6 +477,13 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte, // Execute cpu-heavy onion decoding in parallel. var wg sync.WaitGroup for i := range reqs { + fmt.Printf("[DecodeHopIterators()]: Decrypting onion packet for HTLC ADD, "+ + "amt=%s, cltv=%d, r_hash=%v, blinding_point=%x\n", + reqs[i].IncomingAmount.String(), + reqs[i].IncomingCltv, + reqs[i].RHash, + reqs[i].BlindingPoint.SerializeCompressed()[:10], + ) wg.Add(1) go func(seqNum uint16) { defer wg.Done() @@ -544,6 +554,9 @@ func (p *OnionProcessor) DecodeHopIterators(id []byte, // failure code for replays, we reuse one of the // failure codes that has BADONION. resp.FailCode = lnwire.CodeInvalidOnionVersion + fmt.Printf("unable to process onion packet: %v\n", + sphinx.ErrReplayedPacket) + continue } diff --git a/htlcswitch/hop/payload.go b/htlcswitch/hop/payload.go index 45d67aed53..01c62a070a 100644 --- a/htlcswitch/hop/payload.go +++ b/htlcswitch/hop/payload.go @@ -133,6 +133,24 @@ func NewLegacyPayload(f *sphinx.HopData) *Payload { } } +// NOTE(10/26/22): This function is currently only used to get around +// the fact that customRecords is unexported and is required to be set. +// I don't think a function like this would have much utility otherwise. +// Given that we use TLV en/decoding I am a bit unsure how this function +// will behave/get access to the information it would need. +// The data included in a TLV payload is highly variable. That is why it makes +// sense to define a struct and then TLV encode each of the structs types +// that the user sets. +func NewTLVPayload() *Payload { + + // Set the unexported customRecords field so that we can carry + // on with our Link testing. + return &Payload{ + FwdInfo: ForwardingInfo{}, + customRecords: make(record.CustomSet), + } +} + // NewPayloadFromReader builds a new Hop from the passed io.Reader. The reader // should correspond to the bytes encapsulated in a TLV onion payload. The // final hop bool signals that this payload was the final packet parsed by @@ -452,6 +470,12 @@ func (h *Payload) EncryptedData() []byte { return h.encryptedData } +// EncryptedData returns the route blinding encrypted data parsed from the +// onion payload. +func (h *Payload) SetEncryptedData(data []byte) { + h.encryptedData = data +} + // BlindingPoint returns the route blinding point parsed from the onion payload. func (h *Payload) BlindingPoint() *btcec.PublicKey { return h.blindingPoint @@ -512,6 +536,9 @@ func getMinRequiredViolation(set tlv.TypeMap) *tlv.Type { func ValidateBlindedRouteData(blindedData *record.BlindedRouteData, incomingAmount lnwire.MilliSatoshi, incomingTimelock uint32) error { + fmt.Printf("payload.go: ValidateBlindedRouteData() - validating blinded_data=%+v!\n", + blindedData) + // Bolt 04 notes that we should enforce payment constraints _if_ they // are present, so we do not fail if not provided. if blindedData.Constraints != nil { @@ -566,6 +593,8 @@ func ValidateBlindedRouteData(blindedData *record.BlindedRouteData, } } + fmt.Println("payload.go: ValidateBlindedRouteData() - still validating!") + // No need to check anything else if features are not provided (bolt 4 // indicates that omitted features should be treated like an empty // vector). diff --git a/htlcswitch/link.go b/htlcswitch/link.go index bb7d098b60..a997b39719 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -924,12 +924,24 @@ func (l *channelLink) syncChanStates() error { // if necessary. After a restart, this will also delete any previously // completed packages. func (l *channelLink) resolveFwdPkgs() error { + fmt.Printf("[link(%s).resolveFwdPackages - local_key=%x, remote_key=%x]: loading packages from disk and reprocessing!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) + fwdPkgs, err := l.channel.LoadFwdPkgs() if err != nil { return err } - l.log.Debugf("loaded %d fwd pks", len(fwdPkgs)) + l.log.Debugf("loaded %d fwd pkgs", len(fwdPkgs)) + fmt.Printf("[link(%s).resolveFwdPackages - local_key=%x, remote_key=%x]: loaded %d fwd pkgs.\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + len(fwdPkgs), + ) for _, fwdPkg := range fwdPkgs { if err := l.resolveFwdPkg(fwdPkg); err != nil { @@ -954,6 +966,12 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error { if fwdPkg.State == channeldb.FwdStateCompleted { l.log.Debugf("removing completed fwd pkg for height=%d", fwdPkg.Height) + fmt.Printf("[link(%s).resolveFwdPackage - local_key=%x, remote_key=%x]: removing completed fwd pkg for height=%d.\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + fwdPkg.Height, + ) err := l.channel.RemoveFwdPkgs(fwdPkg.Height) if err != nil { @@ -980,6 +998,8 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error { err) return err } + fmt.Printf("[link(%s).resolveFwdPackage - local_key=%x]: rebuilt Settle/Fail descriptors from FwdPkg "+ + "LogUpdate. Reprocessing!\n", l.ShortChanID(), l.channel.LocalFundingKey.SerializeCompressed()[:10]) l.processRemoteSettleFails(fwdPkg, settleFails) } @@ -996,6 +1016,15 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) error { err) return err } + fmt.Printf("[link(%s).resolveFwdPackage - local_key=%x]: rebuilt ADD descriptors from FwdPkg LogUpdate. "+ + "Reprocessing!\n", l.ShortChanID(), l.channel.LocalFundingKey.SerializeCompressed()[:10]) + + // NOTE(11/25/22): We are going to reprocess ADDs after a restart. + // Check that the correct blinding point is available for this + // reprocessing! + // UPDATE(11/27/22): If we persist the blinding point as part of + // the LogUpdate inside our forwarding pacakges, then it will be! + // This is done inside ReceiveRevocation(). l.processRemoteAdds(fwdPkg, adds) // If the link failed during processing the adds, we must @@ -1398,6 +1427,11 @@ func (l *channelLink) htlcManager() { // A message from the switch was just received. This indicates // that the link is an intermediate hop in a multi-hop HTLC // circuit. + // + // NOTE(11/25/22): This does not necessarily mean that the HTLC + // update packet came from another link. The packet could have + // came from the Switch in the scenario where it fails back + // our attempt at a duplicate forward?? case pkt := <-l.downstream: l.handleDownstreamPkt(pkt) @@ -1491,6 +1525,13 @@ func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution, circuitKey := resolution.CircuitKey() + fmt.Printf("[processHtlcResolution(%s) - local_key=%x, remote_key=%x]: circuit_key=%+v!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + circuitKey, + ) + // Determine required action for the resolution based on the type of // resolution we have received. switch res := resolution.(type) { @@ -1500,6 +1541,9 @@ func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution, l.log.Debugf("received settle resolution for %v "+ "with outcome: %v", circuitKey, res.Outcome) + fmt.Printf("received settle resolution for %v "+ + "with outcome: %v\n", circuitKey, res.Outcome) + return l.settleHTLC(res.Preimage, htlc.pd) // For htlc failures, we get the relevant failure message based @@ -1508,6 +1552,9 @@ func (l *channelLink) processHtlcResolution(resolution invoices.HtlcResolution, l.log.Debugf("received cancel resolution for "+ "%v with outcome: %v", circuitKey, res.Outcome) + fmt.Printf("received cancel resolution for "+ + "%v with outcome: %v\n", circuitKey, res.Outcome) + // Get the lnwire failure message based on the resolution // result. failure := getResolutionFailure(res, htlc.pd.Amount) @@ -1565,6 +1612,10 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { if !ok { return errors.New("not an UpdateAddHTLC packet") } + fmt.Println("[link.handleDownstreamUpdateAdd]: processing pkt in outgoing link!") + + fmt.Printf("[link.handleDownstreamAdd(%s)]: received ADD from switch\n", + l.ShortChanID()) // If we are flushing the link in the outgoing direction we can't add // new htlcs to the link and we need to bounce it @@ -1580,8 +1631,15 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { // If hodl.AddOutgoing mode is active, we exit early to simulate // arbitrary delays between the switch adding an ADD to the // mailbox, and the HTLC being added to the commitment state. + fmt.Printf("[link.handleDownstreamAdd(%s) - local_key=%x, remote_key=%x]: hodl.Mask=%v\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + l.cfg.HodlMask) + if l.cfg.HodlMask.Active(hodl.AddOutgoing) { l.log.Warnf(hodl.AddOutgoing.Warning()) + fmt.Println(fmt.Sprintf("%s\n", hodl.AddOutgoing.Warning())) l.mailBox.AckPacket(pkt.inKey()) return nil } @@ -1668,6 +1726,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { _ = l.handleDownstreamUpdateAdd(pkt) case *lnwire.UpdateFulfillHTLC: + fmt.Println("[link.handleDowntreamUpdateSettle]: processing pkt in (persp: add) incoming link!") // If hodl.SettleOutgoing mode is active, we exit early to // simulate arbitrary delays between the switch adding the // SETTLE to the mailbox, and the HTLC being added to the @@ -1678,6 +1737,12 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { return } + fmt.Printf("[link.handleDowntreamUpdateSettle(%s) - local_key=%x, remote_key=%x]: htlcPacket: %+v, source ref: %+v, dest ref: %+v!\n", + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + pkt.incomingChanID, pkt, pkt.sourceRef, pkt.destRef, + ) + // An HTLC we forward to the switch has just settled somewhere // upstream. Therefore we settle the HTLC within the our local // state machine. @@ -1735,6 +1800,7 @@ func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { l.updateCommitTxOrFail() case *lnwire.UpdateFailHTLC: + fmt.Printf("[link.handleDownstreamUpdateFail]: processing pkt in link(%s)!\n", l.ShortChanID()) // If hodl.FailOutgoing mode is active, we exit early to // simulate arbitrary delays between the switch adding a FAIL to // the mailbox, and the HTLC being added to the commitment @@ -1874,6 +1940,7 @@ func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { // When retransmitting responses, the destination references will be // cleaned up if an open circuit is not found in the circuit map. if pkt.destRef != nil { + fmt.Println("[cleanupSpuriousResponse]: acking settle/fail ref:", pkt.destRef) err := l.channel.AckSettleFails(*pkt.destRef) if err != nil { l.log.Errorf("unable to ack SettleFailRef "+ @@ -1933,6 +2000,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { return } + fmt.Println("[link.handleUpstreamUpdateAdd]: processing pkt in incoming link!") // We just received an add request from an upstream peer, so we // add it to our state machine, then add the HTLC to our // "settle" list in the event that we know the preimage. @@ -1947,6 +2015,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { "assigning index: %v", msg.PaymentHash[:], index) case *lnwire.UpdateFulfillHTLC: + fmt.Println("[link.handleUpstreamUpdateSettle]: processing pkt in (persp: add) outgoing link!") pre := msg.PaymentPreimage idx := msg.ID @@ -2000,6 +2069,12 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { go l.forwardBatch(false, settlePacket) case *lnwire.UpdateFailMalformedHTLC: + fmt.Println("[link.handleUpstreamUpdateFail]: processing pkt in (persp: add) outgoing link!") + fmt.Printf("[link.handleUpstreamMsg - local_key=%x]: Received UpdateFailMalformedHTLC!\n", l.channel.LocalFundingKey.SerializeCompressed()[:10]) + // NOTE(11/16/22): Here is an example of converting errors + // received from peers into a more opaque (non-privacy-leaking) + // error. See if this helps us figure out how to do this for + // errors encountered on a blinded route!! // Convert the failure type encoded within the HTLC fail // message to the proper generic lnwire error code. var failure lnwire.FailureMessage @@ -2130,6 +2205,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } // Add fail to the update log. + fmt.Println("[link.handleUpstreamUpdateFail]: processing pkt in (persp: add) outgoing link!") idx := msg.ID err = l.channel.ReceiveFailHTLC(idx, failureReason) if err != nil { @@ -2139,6 +2215,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } case *lnwire.CommitSig: + fmt.Printf("[link.handleUpstreamMsg - local_key=%x]: Received CommitmentSigned!\n", l.channel.LocalFundingKey.SerializeCompressed()[:10]) // Since we may have learned new preimages for the first time, // we'll add them to our preimage cache. By doing this, we // ensure any contested contracts watched by any on-chain @@ -2469,6 +2546,18 @@ func (l *channelLink) ackDownStreamPackets() error { } l.log.Debugf("removing Add packet %s from mailbox", inKey) + fmt.Printf("[link.ackDownstreamPackets(%s) - local_key=%x, remote_key=%x]: removing ADD packet "+ + "%s from our mailbox queue!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + inKey, + ) + + // NOTE(1/21/23): We have sent this HTLC ADD update and a signature + // covering it to our downstream peer. We do not wish to reprocess + // this packet representing this update again so we remove it from + // our packet mailbox queue. l.mailBox.AckPacket(inKey) } @@ -2495,6 +2584,14 @@ func (l *channelLink) ackDownStreamPackets() error { for _, inKey := range l.closedCircuits { l.log.Debugf("removing Fail/Settle packet %s from mailbox", inKey) + fmt.Printf("[link.ackDownstreamPackets(%s) - local_key=%x, remote_key=%x]: removing Fail/Settle packet "+ + "%s from our mailbox queue!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + inKey, + ) + l.mailBox.AckPacket(inKey) } @@ -2555,6 +2652,7 @@ func (l *channelLink) updateCommitTx() error { // circuits that have been opened, but unsuccessfully committed. if l.cfg.HodlMask.Active(hodl.Commit) { l.log.Warnf(hodl.Commit.Warning()) + fmt.Printf(fmt.Sprintf("%s\n", hodl.Commit.Warning())) return nil } @@ -2578,6 +2676,20 @@ func (l *channelLink) updateCommitTx() error { return err } + // NOTE(1/21/23): After we sign to extend the remote party's commitment + // chain, we do some internal acknowledgement of packets in our mailbox. + // QUESTION: Why is now a safe/logical place to do this? Because signing + // a new commitment we include ALL of our local updates. Once these updates + // are included in a commitment, then channel-re-establish will take care that + // they are re-transmitted to remote after a restart. + // Settle/Fails mark an Add fully responded to. We marked them as + // acknowledged in our forwarding package with the call to SignNextCommitment() above. + fmt.Printf("[link.updateCommitTx(%s) - local_key=%x, remote_key=%x]: about to internally acknowledge packets "+ + "(from switch) in our mailbox queue!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) if err := l.ackDownStreamPackets(); err != nil { return err } @@ -3110,6 +3222,8 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg, } l.log.Debugf("settle-fail-filter %v", fwdPkg.SettleFailFilter) + fmt.Printf("[processRemoteSettleFails(%s)]: processing settle/fails!\n", l.ShortChanID()) + fmt.Printf("[processRemoteSettleFails(%s)]: settle-fail-filter %+v\n", l.ShortChanID(), fwdPkg.SettleFailFilter) var switchPackets []*htlcPacket for i, pd := range settleFails { @@ -3146,6 +3260,16 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg, }, } + // fmt.Printf("[processRemoteSettleFails(%s)]: payment descriptor dest reference: %+v, switch link count=%d!\n", + // l.ShortChanID(), pd.DestRef, len(l.cfg.Switch.linkIndex), + // ) + fmt.Printf("[processRemoteSettleFails(%s) - local_key=%x, remote_key=%x]: payment descriptor dest reference: %+v!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + pd.DestRef, + ) + // Add the packet to the batch to be forwarded, and // notify the overflow queue that a spare spot has been // freed up within the commitment state. @@ -3219,6 +3343,18 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, l.log.Tracef("processing %d remote adds for height %d", len(lockedInHtlcs), fwdPkg.Height) + fmt.Printf("[processRemoteAdds(%s) - local_key=%x, remote_key=%x]: processing adds!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) + fmt.Printf("[processRemoteAdds(%s) - local_key=%x, remote_key=%x]: forward-filter: %+v, ack-filter: %+v.\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + fwdPkg.FwdFilter, fwdPkg.AckFilter, + ) + decodeReqs := make( []hop.DecodeHopIteratorRequest, 0, len(lockedInHtlcs), ) @@ -3271,6 +3407,12 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // been committed by one of our commitment txns. ADDs // in this state are waiting for the rest of the fwding // package to get acked before being garbage collected. + fmt.Printf("[processRemoteAdds(%s) - local_key=%x, remote_key=%x]: ADD comes from a previously seen forwarding package, "+ + "and we have already internally acknowledged that we have received its (settle/fail) response!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) continue } @@ -3279,6 +3421,7 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // HTLC itself to decide if: we should forward it, cancel it, // or are able to settle it (and it adheres to our fee related // constraints). + fmt.Printf("[processRemoteAdds(%s)]: We can't say much about this ADD yet.\n", l.ShortChanID()) // Fetch the onion blob that was included within this processed // payment descriptor. @@ -3303,6 +3446,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // Retrieve onion obfuscator from onion blob in order to // produce initial obfuscation of the onion failureCode. + // + // obfuscator, failureCode := chanIterator.ExtractErrorEncrypter( l.cfg.ExtractErrorEncrypter, ) @@ -3350,6 +3495,10 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, fwdInfo := pld.ForwardingInfo() + // NOTE(calvin): Because our hop.Iterator isn't fit for purpose + // on Carla's branch, we do not have a `next_hop` here so Bob + // incorrectly process as an exit hop rather than forwarding + // onward to Carol as expected! switch fwdInfo.NextHop { case hop.Exit: err := l.processExitHop( @@ -3383,9 +3532,22 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // expiring timelocks, but we expect that an // error will be reproduced. if !fwdPkg.FwdFilter.Contains(idx) { + fmt.Printf("[processRemoteAdds(%s) - local_key=%x, remote_key=%x]: ADD comes from a previously seen forwarding package, "+ + "and the ADD has NOT been forwarded to the Switch.\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) break } + fmt.Printf("[processRemoteAdds(%s) - local_key=%x, remote_key=%x]: ADD comes from a previously seen forwarding package, "+ + "and the ADD has ALREADY been forwarded to the Switch once.\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) + // Otherwise, it was already processed, we can // can collect it and continue. addMsg := &lnwire.UpdateAddHTLC{ @@ -3420,6 +3582,18 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, outgoingTimeout: fwdInfo.OutgoingCTLV, customRecords: pld.CustomRecords(), } + + fmt.Printf("[processRemoteAdds(%s) - local_key=%x, remote_key=%x]: forwarding package (processed update) add reference: %+v!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + pd.SourceRef, + ) + + // fmt.Printf("[processRemoteAdds(%s)]: htlcPacket we forward to switch receives this add ref: %+v, switch link count=%d!\n", + // l.ShortChanID(), updatePacket.sourceRef, len(l.cfg.Switch.linkIndex), + // ) + switchPackets = append( switchPackets, updatePacket, ) @@ -3430,6 +3604,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // TODO(roasbeef): ensure don't accept outrageous // timeout for htlc + fmt.Println("[processRemoteAdds]: ADD has NOT been forwarded to the Switch!") + // With all our forwarding constraints met, we'll // create the outgoing HTLC using the parameters as // specified in the forwarding info. @@ -3488,6 +3664,29 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, customRecords: pld.CustomRecords(), } + // NOTE(11/19/22): This is our link signing + // off (in memory) that it has processed this Add. + // The ink on that signature will dry when record + // that we have processed this Add is when the + // entire state of the forwarding package (including + // the filter we set here) is persisted below. + // If the Add is reconsidered later, it will not + // flow through the same code path. + // The index is the index in the ForwardingPackage + // not the channel state machine HTLC update log. + fmt.Printf("[processRemoteAdds(%s) - local_key=%x, remote_key=%x]: forwarding package (new update) add reference: %+v!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + pd.SourceRef, + ) + + fmt.Printf("[processRemoteAdds(%s) - local_key=%x, remote_key=%x]: ADD comes from a NEW forwarding package, "+ + "and has NOT been forwarded to the Switch. Marking in memory that we will try forwarding\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) fwdPkg.FwdFilter.Set(idx) switchPackets = append(switchPackets, updatePacket) @@ -3504,9 +3703,19 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, "unable to set fwd filter: %v", err) return } + fmt.Printf("[processRemoteAdds(%s)]: Wrote our intent to forward this ADD to disk!\n", l.ShortChanID()) } + fmt.Printf("[processRemoteAdds(%s) - local_key=%x]: after processing, forward-filter: %+v, ack-filter: %+v.\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + fwdPkg.FwdFilter, fwdPkg.AckFilter) + if len(switchPackets) == 0 { + fmt.Printf("[processRemoteAdds(%s) - local_key=%x]: nothing to forward, replay: %t.\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + fwdPkg.State != channeldb.FwdStateLockedIn) return } @@ -3515,6 +3724,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, l.log.Debugf("forwarding %d packets to switch: replay=%v", len(switchPackets), replay) + fmt.Printf("[processRemoteAdds(%s) - local_key=%x]: forwarding %d packets to switch: replay=%v.\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + len(switchPackets), replay) + // NOTE: This call is made synchronous so that we ensure all circuits // are committed in the exact order that they are processed in the link. // Failing to do this could cause reorderings/gaps in the range of @@ -3529,6 +3743,12 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor, obfuscator hop.ErrorEncrypter, fwdInfo hop.ForwardingInfo, heightNow uint32, payload invoices.Payload) error { + fmt.Printf("[processExitHop(%s) - local_key=%x, remote_key=%x]: processing exit hop!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) + // If hodl.ExitSettle is requested, we will not validate the final hop's // ADD, nor will we settle the corresponding invoice or respond with the // preimage. @@ -3546,6 +3766,10 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor, "incompatible value: expected <=%v, got %v", pd.RHash, pd.Amount, fwdInfo.AmountToForward) + fmt.Printf("onion payload of incoming htlc(%x) has incorrect "+ + "value: expected %v, got %v\n", pd.RHash, + pd.Amount, fwdInfo.AmountToForward) + failure := NewLinkError( lnwire.NewFinalIncorrectHtlcAmount(pd.Amount), ) @@ -3554,6 +3778,12 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor, return nil } + fmt.Printf("[processExitHop(%s) - local_key=%x, remote_key=%x]: still processing exit hop!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) + // We'll also ensure that our time-lock value has been computed // correctly. if pd.Timeout < fwdInfo.OutgoingCTLV { @@ -3569,6 +3799,12 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor, return nil } + fmt.Printf("[processExitHop(%s) - local_key=%x, remote_key=%x]: still processing exit hop (cltv)!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) + // Notify the invoiceRegistry of the exit hop htlc. If we crash right // after this, this code will be re-executed after restart. We will // receive back a resolution event. @@ -3587,6 +3823,12 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor, return err } + fmt.Printf("[processExitHop(%s) - local_key=%x, remote_key=%x]: still processing exit hop (notify exit hop)!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) + // Create a hodlHtlc struct and decide either resolved now or later. htlc := hodlHtlc{ pd: pd, @@ -3597,6 +3839,12 @@ func (l *channelLink) processExitHop(pd *lnwallet.PaymentDescriptor, // descriptor for future reference. if event == nil { l.hodlMap[circuitKey] = htlc + + fmt.Printf("[processExitHop(%s) - local_key=%x, remote_key=%x]: invoice is being held!\n", + l.ShortChanID(), + l.channel.LocalFundingKey.SerializeCompressed()[:10], + l.channel.RemoteFundingKey.SerializeCompressed()[:10], + ) return nil } diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index 3ce526ed14..8cd19593d2 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -194,8 +194,12 @@ func TestChannelLinkRevThenSig(t *testing.T) { require.NoError(t, err) defer harness.aliceLink.Stop() - alice := newPersistentLinkHarness(t, harness.aliceSwitch, - harness.aliceLink, harness.aliceBatchTicker, + // NOTE(11/24/22): I think we could use this to demonstrate + // that blind hops are correctly reprocessed (reforwarded) after + // a restart. Such a test will serve as reassurance that we put + // the blinding point in the appropriate spots. + alice := newPersistentLinkHarness( + t, harness.aliceSwitch, harness.aliceLink, nil, harness.aliceRestore, ) @@ -261,6 +265,8 @@ func TestChannelLinkRevThenSig(t *testing.T) { } // Restart Alice so she sends and accepts ChannelReestablish. + // TODO(10/22/22): Show that we correctly re process HTLCs in + // a blinded route after restart too! alice.restart(false, true) ctx.aliceLink = alice.link @@ -526,12 +532,12 @@ func TestChannelLinkMultiHopPayment(t *testing.T) { testChannelLinkMultiHopPayment(t, 3) }, ) - t.Run( - "bobOutgoingCltvRejectDelta 0", - func(t *testing.T) { - testChannelLinkMultiHopPayment(t, 0) - }, - ) + // t.Run( + // "bobOutgoingCltvRejectDelta 0", + // func(t *testing.T) { + // testChannelLinkMultiHopPayment(t, 0) + // }, + // ) } func testChannelLinkMultiHopPayment(t *testing.T, @@ -641,6 +647,208 @@ func testChannelLinkMultiHopPayment(t *testing.T, t.Fatalf("channel bandwidth incorrect: expected %v, got %v", expectedCarolBandwidth, n.carolChannelLink.Bandwidth()) } + + time.Sleep(20 * time.Second) +} + +/* + + Blinded Route + + +-----------+ +-----------+ +-----------+ +-----------+ + | Alice | | Bob | | Carol | | Dave | + | +-----------+-> +-----------+-> +-----------+-> | + | Sender | | Intro | | (Blind) | | (Blind) | + +-----------+ +-----------+ +-----------+ +-----------+ + + NOTE(11/20/22): Tests might be best performed against a "4 hop network" + as this would facilitate easy exercise for each of introduction node, + blind intermediate node, and blind recipient node. Absent adding a + newFourHopNetwork() or newNHopNetwork() we do our best here. + +*/ + +var ( + amount = lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin) + _, ephemeralBlindingPoint = btcec.PrivKeyFromBytes([]byte("test private key")) +) + +func TestChannelLinkBlindHopReprocessing2024(t *testing.T) { + t.Parallel() + + channels, restoreChannelsFromDb, err := createClusterChannels( + t, btcutil.SatoshiPerBitcoin*3, btcutil.SatoshiPerBitcoin*5, + ) + require.NoError(t, err, "unable to create channel") + + // Do not configure the links to support blind hop processing. + n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice, + channels.bobToCarol, channels.carolToBob, testStartingHeight) + + if err := n.start(); err != nil { + t.Fatalf("unable to start three hop network: %v", err) + } + t.Cleanup(n.stop) + + debug := false + if debug { + // Log messages that alice receives from bob. + n.aliceServer.intersect(createLogFunc("[alice]<-bob<-carol: ", + n.aliceChannelLink.ChanID())) + + // Log messages that bob receives from alice. + n.bobServer.intersect(createLogFunc("alice->[bob]->carol: ", + n.firstBobChannelLink.ChanID())) + + // Log messages that bob receives from carol. + n.bobServer.intersect(createLogFunc("alice<-[bob]<-carol: ", + n.secondBobChannelLink.ChanID())) + + // Log messages that carol receives from bob. + n.carolServer.intersect(createLogFunc("alice->bob->[carol]", + n.carolChannelLink.ChanID())) + } + + // Set Bob in hodl AddOutgoing mode so that he won't immediately + // forward our blind ADD. We can then, restart Bob's node, removing + // this flag, and verify that he correctly reprocessed the blind ADD + // from disk and forwards it onwards to Carol. + // n.firstBobChannelLink.cfg.HodlMask = hodl.AddOutgoing.Mask() // Does NOT block forward! + // n.secondBobChannelLink.cfg.HodlMask = hodl.AddOutgoing.Mask() // This DOES block forward! AFTER the Switch has processed and committed circuits and forwarded to outgoing link + n.bobServer.htlcSwitch.cfg.HodlMask = hodl.AddForward.Mask() + + // Generate a payment preimage. + preimage, rhash, _ := generatePaymentSecret() + payAddr, _ := generatePaymentAddress() + + // Generate blinded hops, taking care to use the payment + // preimage as the path_id for the final hop. + // NOTE(11/25/22): In order to observe what we want to in this test, + // we may need to generate our own htlc & onion with blind hops. + // Alice will forward an HTLC with blinding point in the TLV extension + // of the UpdateAddHTLC message _as if_ she was the introduction node. + // This will allow us to test Bob's behavior as an intermediate/forwarding + // node inside a blinded route. + amount := lnwire.NewMSatFromSatoshis(btcutil.SatoshiPerBitcoin) + htlcAmt, totalTimelock, hops := generateBlindHops( + amount, testStartingHeight, preimage[:], + n.firstBobChannelLink, n.carolChannelLink, + ) + + blob, err := generateRoute(hops...) + require.NoError(t, err, "unable to serialize onion blob") + + t.Logf("Onion BLOB: %+v", blob) + + // Make a payment attempt. + invoice, htlc, pid, err := generatePaymentWithPreimage( + amount, htlcAmt, totalTimelock, blob, &preimage, rhash, payAddr, + ) + if err != nil { + t.Fatal(err) + } + htlc.BlindingPoint = lnwire.NewBlindingPoint(ephemeralBlindingPoint) + + // Add the invoice to Carol's invoice registry so that she's + // expecting payment. + ctx := context.Background() + err = n.carolServer.registry.AddInvoice(ctx, *invoice, htlc.PaymentHash) + require.NoError(t, err, "unable to add invoice in carol registry") + + // Check that Carol invoice was settled and bandwidth of HTLC + // links were changed. + inv, err := n.carolServer.registry.LookupInvoice(ctx, htlc.PaymentHash) + require.NoError(t, err, "unable to get invoice") + if inv.State != invpkg.ContractOpen { + t.Fatal("carol's invoice isn't active/open") + } + + // Send the HTLC from Alice to Carol, via Bob. + err = n.aliceServer.htlcSwitch.SendHTLC( + n.firstBobChannelLink.ShortChanID(), pid, htlc, + ) + require.NoError(t, err, "unable to send payment to carol") + + // See what happens if we restore channels from disk. + // Prior to restarting we clear Bob's hodl mask so we + // forward the ADD after restarting. + // NOTE(11/25/22): I think this restarts all channels, + // when we could/should get away with only restarting + // Bob's node. + time.Sleep(1 * time.Second) + n.secondBobChannelLink.cfg.HodlMask = hodl.MaskNone + n.bobServer.htlcSwitch.cfg.HodlMask = hodl.MaskNone + t.Log("TEST: Removing HodlMask from Bob's link/switch") + + // Restart Bobs's node. + channels, err = restoreChannelsFromDb() + if err != nil { + t.Fatalf("unable to restore channels from database: %v", err) + } + // time.Sleep(3 * time.Second) + + n = newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice, + channels.bobToCarol, channels.carolToBob, testStartingHeight) + + if debug { + // Log messages that alice receives from bob. + n.aliceServer.intersect(createLogFunc("[alice]<-bob<-carol: ", + n.aliceChannelLink.ChanID())) + + // Log messages that bob receives from alice. + n.bobServer.intersect(createLogFunc("alice->[bob]->carol: ", + n.firstBobChannelLink.ChanID())) + + // Log messages that bob receives from carol. + n.bobServer.intersect(createLogFunc("alice<-[bob]<-carol: ", + n.secondBobChannelLink.ChanID())) + + // Log messages that carol receives from bob. + n.carolServer.intersect(createLogFunc("alice->bob->[carol]", + n.carolChannelLink.ChanID())) + } + + // NOTE: Restarting the whole test network as we do above wipes the + // invoice registry for all nodes. We need to restore Carol's registry + // manually after restart so that she is still expecting the payment. + err = n.carolServer.registry.AddInvoice(ctx, *invoice, htlc.PaymentHash) + require.NoError(t, err, "unable to add invoice in carol registry") + time.Sleep(3 * time.Second) + + if err := n.start(); err != nil { + t.Fatalf("unable to start three hop network: %v", err) + } + t.Cleanup(n.stop) + + // Confirm that Alice receives the proper payment result. + resultChan, err := n.aliceServer.htlcSwitch.GetAttemptResult( + pid, htlc.PaymentHash, newMockDeobfuscator(), + ) + require.NoError(t, err, "unable to get payment result") + + select { + case result, ok := <-resultChan: + if !ok { + t.Fatalf("unexpected shutdown") + } + + // The Switch fails our payment back. + // assertFailureCode(t, result.Error, lnwire.CodeTemporaryChannelFailure) + // assertFailureCode(t, result.Error, lnwire.CodeInvalidOnionBlinding) + require.NoError(t, result.Error) + + case <-time.After(10 * time.Second): + t.Fatalf("payment result did not arrive") + } + + // Check that Carol invoice was settled and bandwidth of HTLC + // links were changed. + inv, err = n.carolServer.registry.LookupInvoice(ctx, htlc.PaymentHash) + require.NoError(t, err, "unable to get invoice") + if inv.State != invpkg.ContractSettled { + t.Fatal("carol's invoice haven't been settled") + } + } // TestChannelLinkCancelFullCommitment tests the ability for links to cancel @@ -3621,6 +3829,8 @@ func TestChannelLinkBandwidthChanReserve(t *testing.T) { // TestChannelRetransmission tests the ability of the channel links to // synchronize theirs states after abrupt disconnect. +// NOTE(11/24/22): Does whether the HTLC contains a blinding point +// impacted by this? func TestChannelRetransmission(t *testing.T) { t.Parallel() @@ -3754,6 +3964,8 @@ func TestChannelRetransmission(t *testing.T) { }, }, } + // NOTE(11/25/22): Here is a payment which encounters a restart somewhere. + // Might this contain concepts useful in trying out our blind hop reprocessing? paymentWithRestart := func(t *testing.T, messages []expectedMessage) { channels, restoreChannelsFromDb, err := createClusterChannels( t, btcutil.SatoshiPerBitcoin*5, btcutil.SatoshiPerBitcoin*5, @@ -4502,6 +4714,8 @@ func newPersistentLinkHarness(t *testing.T, hSwitch *Switch, link ChannelLink, func (h *persistentLinkHarness) restart(restartSwitch, syncStates bool, hodlFlags ...hodl.Flag) { + fmt.Println("[test setup]: restarting link!") + // First, remove the link from the switch. h.hSwitch.RemoveLink(h.link.ChanID()) @@ -4770,6 +4984,64 @@ func generateHtlcAndInvoice(t *testing.T, return htlc, invoice } +// // generateHtlcAndInvoice generates a single hop htlc which pays to a +// // blinded route. The caller can configure either an introduction node, +// // intermediate node, or final node style hop. +// // +// // NOTE: It is the responsibility of the caller to ensure that they +// // provide an onion and route blinding payload which are together +// // valid under the requirements in BOLT-04. +// func generateBlindHtlc(t *testing.T, onionPayload *hop.Payload, +// blindPayload *hop.BlindHopPayload, +// blindingPointTLV bool, id uint64) *lnwire.UpdateAddHTLC { + +// t.Helper() + +// htlcAmt := lnwire.NewMSatFromSatoshis(10000) +// htlcExpiry := testStartingHeight + testInvoiceCltvExpiry + +// var b bytes.Buffer +// hop.PackRouteBlindingPayload(&b, blindPayload) +// hops := []*hop.Payload{ +// // hop.NewTLVPayload(fwdInfo, blindPayload), +// { +// FwdInfo: onionPayload.FwdInfo, +// RouteBlindingEncryptedData: b.Bytes(), +// }, +// } +// blob, err := generateRoute(hops...) +// require.NoError(t, err, "unable to generate route") + +// // Generate a payment preimage & payment address. +// // Interpret the bytes of the path ID as the preimage. +// // This allows the caller to pick the path_id and ensure +// // our HTLC is crafted with matching payment preimage. +// // preimage := (lntypes.Preimage)(blindPayload.PathID) +// // var preimage lntypes.Preimage +// preimageBytes := blindPayload.PathID[:lntypes.PreimageSize] +// preimage, err := lntypes.MakePreimage(preimageBytes) +// require.NoError(t, err, "unable to interpret path_id as payment preimage") + +// payAddr, err := generatePaymentAddress() +// require.NoError(t, err, "unable to create payment address") + +// // Construct UpdateAddHTLC. +// _, htlc, _, err := generatePaymentWithPreimage( +// amount, htlcAmt, uint32(htlcExpiry), blob, &preimage, preimage.Hash(), payAddr, +// ) +// require.NoError(t, err, "unable to create payment") + +// htlc.ID = id + +// // Set any route blinding point provided by the caller. +// // The caller can use this field to build an htlc fit for +// // either an introduction node or an intermediate node in +// // a blind route. +// htlc.BlindingPoint = lnwire.NewBlindingPoint(ephemeralBlindingPoint) + +// return htlc +// } + // TestChannelLinkNoMoreUpdates tests that we won't send a new commitment // when there are no new updates to sign. func TestChannelLinkNoMoreUpdates(t *testing.T) { diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 6770c949f1..47650b8dcd 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -20,6 +20,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2/ecdsa" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/wire" + "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/go-errors/errors" sphinx "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/chainntnfs" @@ -27,6 +28,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/models" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/contractcourt" + "github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/invoices" "github.com/lightningnetwork/lnd/lnpeer" @@ -34,6 +36,7 @@ import ( "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/record" "github.com/lightningnetwork/lnd/ticker" ) @@ -203,6 +206,7 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error) DustThreshold: DefaultDustThreshold, SignAliasUpdate: signAliasUpdate, IsAlias: isAlias, + HodlMask: hodl.MaskNone, } return New(cfg, startingHeight) @@ -275,6 +279,7 @@ func (s *mockServer) Start() error { if err := s.htlcSwitch.Start(); err != nil { return err } + // defer s.htlcSwitch.Stop() s.wg.Add(1) go func() { @@ -330,12 +335,57 @@ func newMockHopIterator(hops ...*hop.Payload) hop.Iterator { return &mockHopIterator{hops: hops} } +// HopPayload returns the set of fields that detail exactly _how_ this hop +// should forward the HTLC to the next hop. For normal (ie: non-blind) +// hops, the information encoded within the returned ForwardingInfo is to +// be used by each hop to authenticate the information given to it by the +// prior hop. For blind hops, callers will find the necessary forwarding +// information one layer deeper, inside the route blinding TLV payload. +// +// Every time this method is called we peel off a layer of the onion +// and our hop iterator contains one less hop! +// +// NOTE(calvin): Carla's branch performs all processing of the route blinding +// payload behind this function. The caller should NOT be expected to decrypt +// parse, validate, etc. the payload. func (r *mockHopIterator) HopPayload() (*hop.Payload, error) { h := r.hops[0] r.hops = r.hops[1:] + return h, nil } +func calculateForwardingAmount(incomingAmount lnwire.MilliSatoshi, baseFee, + proportionalFee uint32) (lnwire.MilliSatoshi, error) { + + // Sanity check to prevent overflow. + if incomingAmount < lnwire.MilliSatoshi(baseFee) { + return 0, fmt.Errorf("incoming amount: %v < base fee: %v", + incomingAmount, baseFee) + } + numerator := (uint64(incomingAmount) - uint64(baseFee)) * 1e6 + denominator := 1e6 + uint64(proportionalFee) + + ceiling := (numerator + denominator - 1) / denominator + + return lnwire.MilliSatoshi(ceiling), nil +} + +// IsFinalHop indicates whether a hop is the final hop in a payment route. +// When the last hop parses its TLV payload via call to HopPayload(), +// it will leave us with an empty hop iterator. +// +// NOTE: As this is a mock which does not use a real sphinx implementation +// to signal the final hop via all-zero onion HMAC, we are relying on this +// method being called AFTER HopPayload(). If this method is called BEFORE +// parsing the TLV payload then it will NOT correctly report that we are +// the final hop! +func (r *mockHopIterator) IsFinalHop() bool { + fmt.Printf("TEST: There are %d hops left!\n", len(r.hops)) + + return len(r.hops) == 0 +} + func (r *mockHopIterator) ExtraOnionBlob() []byte { return nil } @@ -347,6 +397,8 @@ func (r *mockHopIterator) ExtractErrorEncrypter( return extracter(nil) } +// NOTE: This function name implies it encodes a single hop, +// but in actuality it encodes all hops in the route? func (r *mockHopIterator) EncodeNextHop(w io.Writer) error { var hopLength [4]byte binary.BigEndian.PutUint32(hopLength[:], uint32(len(r.hops))) @@ -356,8 +408,7 @@ func (r *mockHopIterator) EncodeNextHop(w io.Writer) error { } for _, hop := range r.hops { - fwdInfo := hop.ForwardingInfo() - if err := encodeFwdInfo(w, &fwdInfo); err != nil { + if err := encodeHopPayload(w, hop); err != nil { return err } } @@ -365,6 +416,25 @@ func (r *mockHopIterator) EncodeNextHop(w io.Writer) error { return nil } +func encodeHopPayload(w io.Writer, hop *hop.Payload) error { + // Encode and write the basic forwarding info fields as before. + fwdInfo := hop.ForwardingInfo() + if err := encodeFwdInfo(w, &fwdInfo); err != nil { + return err + } + + if hop.EncryptedData() != nil { + // Length prefix the route blinding payload. + if err := writeLengthPrefixedSlice(w, hop.EncryptedData()); err != nil { + return fmt.Errorf("failed to write length-prefixed "+ + "route blinding payload: %w", err) + } + } + + // Add a sentinel byte(s) to mark the end of serialization for this hop. + return encodeHopBoundaryMarker(w) +} + func encodeFwdInfo(w io.Writer, f *hop.ForwardingInfo) error { if err := binary.Write(w, binary.BigEndian, f.NextHop); err != nil { return err @@ -381,6 +451,41 @@ func encodeFwdInfo(w io.Writer, f *hop.ForwardingInfo) error { return nil } +// writeLengthPrefixedSlice writes the length of the given byte slice as a +// uint32 followed by the byte slice itself. +func writeLengthPrefixedSlice(w io.Writer, data []byte) error { + var lengthPrefix [4]byte + binary.BigEndian.PutUint32(lengthPrefix[:], uint32(len(data))) + + // Write the length prefix. + if _, err := w.Write(lengthPrefix[:]); err != nil { + return err + } + + // Write the actual data. + _, err := w.Write(data) + return err +} + +// sentinel is used to mark the boundary between serialized hops +// in the onion blob in the absense of TLV. +// +// TODO(11/5/22): add TLV to mockHopIterator? +var sentinel = [4]byte{0xff, 0xff, 0xff, 0xff} + +// encodeHopBoundaryMarker writes our sentinel value which delineates +// the boundary between the hop currently being encoded and any subsequent +// hops yet to be serialized. This allows us to handle variable length +// payloads which is necessary to distinguish between normal and blind +// hops (ie: those with a route blinding payload) during deserialization/decoding. +func encodeHopBoundaryMarker(w io.Writer) error { + if _, err := w.Write(sentinel[:]); err != nil { + return err + } + + return nil +} + var _ hop.Iterator = (*mockHopIterator)(nil) // mockObfuscator mock implementation of the failure obfuscator which only @@ -490,7 +595,8 @@ func newMockIteratorDecoder() *mockIteratorDecoder { } func (p *mockIteratorDecoder) DecodeHopIterator(r io.Reader, rHash []byte, - cltv uint32) (hop.Iterator, lnwire.FailCode) { + amount lnwire.MilliSatoshi, cltv uint32, + blindingPoint *btcec.PublicKey) (hop.Iterator, lnwire.FailCode) { var b [4]byte _, err := r.Read(b[:]) @@ -501,25 +607,22 @@ func (p *mockIteratorDecoder) DecodeHopIterator(r io.Reader, rHash []byte, hops := make([]*hop.Payload, hopLength) for i := uint32(0); i < hopLength; i++ { - var f hop.ForwardingInfo - if err := decodeFwdInfo(r, &f); err != nil { + p := hop.NewTLVPayload() + if err := decodeHopPayload(r, p, amount, cltv, blindingPoint); err != nil { return nil, lnwire.CodeTemporaryChannelFailure } - var nextHopBytes [8]byte - binary.BigEndian.PutUint64(nextHopBytes[:], f.NextHop.ToUint64()) - - hops[i] = hop.NewLegacyPayload(&sphinx.HopData{ - Realm: [1]byte{}, // hop.BitcoinNetwork - NextAddress: nextHopBytes, - ForwardAmount: uint64(f.AmountToForward), - OutgoingCltv: f.OutgoingCTLV, - }) + hops[i] = p } return newMockHopIterator(hops...), lnwire.CodeNone } +// NOTE(10/22/22): DecodeHopIteratorRequest's will have a non-nil ephemeral +// blinding point for blind hops. In a real implementation this will be used by +// the underlying Sphinx library to decrypt the onion. For testing, it can +// probably be ignored as we just pass the public key through to the Sphinx +// implementation, but we are not dealing with encrypted data for Link testing. func (p *mockIteratorDecoder) DecodeHopIterators(id []byte, reqs []hop.DecodeHopIteratorRequest) ( []hop.DecodeHopIteratorResponse, error) { @@ -535,10 +638,26 @@ func (p *mockIteratorDecoder) DecodeHopIterators(id []byte, batchSize := len(reqs) + fmt.Printf("[DecodeHopIterators()]: Decrypting %d onion packets\n", batchSize) + resps := make([]hop.DecodeHopIteratorResponse, 0, batchSize) for _, req := range reqs { + var blindingPoint []byte + if req.BlindingPoint != nil { + blindingPoint = req.BlindingPoint.SerializeCompressed()[:10] + } + fmt.Printf("[DecodeHopIterators()]: Decrypting onion packet for HTLC ADD, "+ + "amt=%s, cltv=%d, r_hash=%x, blinding_point=%x\n", + req.IncomingAmount.String(), + req.IncomingCltv, + req.RHash, + blindingPoint, + ) + iterator, failcode := p.DecodeHopIterator( - req.OnionReader, req.RHash, req.IncomingCltv, + req.OnionReader, req.RHash, + req.IncomingAmount, req.IncomingCltv, + req.BlindingPoint, ) if p.decodeFail { @@ -559,6 +678,76 @@ func (p *mockIteratorDecoder) DecodeHopIterators(id []byte, return resps, nil } +func decodeHopPayload(r io.Reader, p *hop.Payload, + incomingAmt lnwire.MilliSatoshi, incomingCltv uint32, + blindingPoint *secp256k1.PublicKey) error { + + fmt.Println("MOCK: decodeHopPayload!") + if err := decodeFwdInfo(r, &p.FwdInfo); err != nil { + return err + } + + if err := decodeBlindHop(r, p); err != nil { + return err + } + + // Process encrypted data for blinded hops if it's present. + if p.EncryptedData() != nil { + // NOTE(calvin): We'll throw back an error here to simulate + // inability to decrypt the route blinding payload without a + // a blinding point. + if blindingPoint == nil { + fmt.Println("MOCK: decodeHopPayload() - unable to decrypt route blinding" + + "payload without blinding point") + + return fmt.Errorf("unable to decrypt route blinding" + + "payload without blinding point") + } + + fmt.Printf("MOCK: decodeHopPayload() - extracting fwd_info from "+ + "route blinding payload! blinding_point=%x\n", blindingPoint.SerializeCompressed()[:10]) + + reader := bytes.NewReader(p.EncryptedData()) + data, err := record.DecodeBlindedRouteData(reader) + if err != nil { + return fmt.Errorf("failed to decode blinded route data: %w", err) + } + + fmt.Printf("MOCK: decodeHopPayload() - parsed payload=%+v!\n", data) + + var fwdAmt lnwire.MilliSatoshi + var expiry uint32 + if data.RelayInfo != nil { + var err error + fwdAmt, err = calculateForwardingAmount( + incomingAmt, data.RelayInfo.BaseFee, + data.RelayInfo.FeeRate, + ) + if err != nil { + return err + } + + expiry = incomingCltv - uint32(data.RelayInfo.CltvExpiryDelta) + } + + fmt.Printf("MOCK: decodeHopPayload() - computed fwd_amt=%d, outgoing_cltv=%d!\n", fwdAmt, expiry) + + // Populate the forwarding information. + p.FwdInfo = hop.ForwardingInfo{ + AmountToForward: fwdAmt, + OutgoingCTLV: expiry, + // For simplicity's sake we just pass back the same blinding point. + NextBlinding: blindingPoint, + } + + if data.ShortChannelID != nil { + p.FwdInfo.NextHop = *data.ShortChannelID + } + } + + return nil +} + func decodeFwdInfo(r io.Reader, f *hop.ForwardingInfo) error { if err := binary.Read(r, binary.BigEndian, &f.NextHop); err != nil { return err @@ -575,6 +764,99 @@ func decodeFwdInfo(r io.Reader, f *hop.ForwardingInfo) error { return nil } +func decodeBlindHop(r io.Reader, p *hop.Payload) error { + fmt.Println("MOCK: decodeBlindHop!") + + // NOTE(10/26/22): If we read these 4 bytes to determine whether we + // should parse the route blinding payload and this is not a blind hop, + // then we are eating 4 bytes that ought to have been decoded/interpreted + // differently. This leads to mistakenly decoded/parsed payloads. + var b [4]byte + _, err := r.Read(b[:]) + if err != nil { + return err + } + fmt.Printf("MOCK: decodeBlindHop() - parsing payload=%+v!\n", p) + + // Check for hop boundary sentinel. If we are at a hop boundary, + // then we should bail early without reading any more bytes. + // If this is not the hop boundary, then we should interpret the bytes + // just read as the length of the route blinding payload. + if ok := isHopBoundary(b[:]); ok { + return nil + } + + // This hop has a route blinding payload, so we'll decode that now. + payloadLength := binary.BigEndian.Uint32(b[:]) + buf := make([]byte, payloadLength) + n, err := io.ReadFull(r, buf) + if err != nil { + return err + } + + // Only set the route blinding payload if it exists. + // Otherwise, leave the slice nil so we do not incorrectly + // believe the hop to be blind. + if n != 0 { + p.SetEncryptedData(buf) + } + + fmt.Printf("MOCK: decodeBlindHop() - still parsing payload=%+v!\n", p) + + // Similar procedure for blinding point. + _, err = r.Read(b[:]) + if err != nil { + fmt.Println("MOCK: decodeBlindHop() - error on read!") + return err + } + + // If this is not the hop boundary, then we should interpret + // the bytes just read as the length of the next field + // (I see the need for something like TLV). + if ok := isHopBoundary(b[:]); ok { + fmt.Println("MOCK: decodeBlindHop() - encountered hop boundary!") + + // deriveForwardingInfo(p) + fmt.Printf("MOCK: decodeBlindHop() - finished parsing payload=%+v!\n", p) + return nil + } + + fieldLength := binary.BigEndian.Uint32(b[:]) + pubKeyBytes := make([]byte, fieldLength) + n, err = io.ReadFull(r, pubKeyBytes) + if err != nil { + return err + } + + fmt.Printf("MOCK: decodeBlindHop() - still parsing payload=%+v!\n", p) + + // TODO(calvin): We parse the encrypted data. We then need to set + // the proper ForwardingInfo fields. Recall that in Carla's branch the + // link does NOT process route blinding payload at all. Rather, the + // link expects calls to HopPayload() for the mock iterator to fully + // assemble the necessary forwarding information. + // deriveForwardingInfo(p) + fmt.Printf("MOCK: decodeBlindHop() - finished parsing payload=%+v!\n", p) + // p.BlindingPoint() + // p.BlindingPoint, _ = btcec.ParsePubKey(pubKeyBytes) + + // Don't forget to trim off the sentinel, so that any hops + // after this one are parsed correctly. + return trimSentinel(r) + +} + +func isHopBoundary(b []byte) bool { + return bytes.Equal(sentinel[:], b) +} + +func trimSentinel(r io.Reader) error { + var b [4]byte + _, err := r.Read(b[:]) + + return err +} + // messageInterceptor is function that handles the incoming peer messages and // may decide should the peer skip the message or not. type messageInterceptor func(m lnwire.Message) (bool, error) @@ -990,6 +1272,8 @@ func (m *mockChainNotifier) RegisterBlockEpochNtfn(*chainntnfs.BlockEpoch) ( }, nil } +// NOTE(calvin): Whenever we call this we create an new invoice DB. +// I don't think we support restarting the invoice DB! func newMockRegistry(minDelta uint32) *mockInvoiceRegistry { cdb, cleanup, err := newDB() if err != nil { diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 2cf4744b7f..833d9befde 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -18,6 +18,7 @@ import ( "github.com/lightningnetwork/lnd/channeldb/models" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/contractcourt" + "github.com/lightningnetwork/lnd/htlcswitch/hodl" "github.com/lightningnetwork/lnd/htlcswitch/hop" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lntypes" @@ -222,6 +223,12 @@ type Config struct { // IsAlias returns whether or not a given SCID is an alias. IsAlias func(scid lnwire.ShortChannelID) bool + + // hodl.Mask is a bitvector composed of hodl.Flags, specifying breakpoints + // for HTLC forwarding internal to the switch. + // + // NOTE: This should only be used for testing. + HodlMask hodl.Mask } // Switch is the central messaging bus for all incoming/outgoing HTLCs. @@ -668,6 +675,9 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, numSent int ) + fmt.Printf("[switch.ForwardPackets(%s)]: Forwarding %d packets! switch link count=%d\n", + "this", len(packets), len(s.linkIndex)) + // No packets, nothing to do. if len(packets) == 0 { return nil @@ -702,11 +712,29 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{}, for _, packet := range packets { switch htlc := packet.htlc.(type) { case *lnwire.UpdateAddHTLC: + fmt.Printf("[switch.ForwardPackets()]: encountered ADD! switch link count=%d\n", + len(s.linkIndex)) + + fmt.Printf("[switch.ForwardPackets()]: hodl.Mask=%v\n", + s.cfg.HodlMask) + + // If hodl.AddForward mode is active, we exit early to + // simulate arbitrary delays in the switch during forwarding. + // This can be leveraged to test the switch going down + // while trying to forward a batch of HTLCs. + if s.cfg.HodlMask.Active(hodl.AddForward) { + log.Warnf(hodl.AddForward.Warning()) + fmt.Printf("%s\n", hodl.AddForward.Warning()) + continue + } + circuit := newPaymentCircuit(&htlc.PaymentHash, packet) packet.circuit = circuit circuits = append(circuits, circuit) addBatch = append(addBatch, packet) default: + fmt.Printf("[switch.ForwardPackets()]: encountered Settle/Fail! switch link count=%d\n", + len(s.linkIndex)) err := s.routeAsync(packet, fwdChan, linkQuit) if err != nil { return fmt.Errorf("failed to forward packet %w", @@ -817,6 +845,8 @@ func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) { if err != nil { log.Errorf("Unhandled error while reforwarding htlc "+ "settle/fail over htlcswitch: %v", err) + fmt.Printf("Unhandled error while reforwarding htlc "+ + "settle/fail over htlcswitch: %v\n", err) } case <-s.quit: log.Errorf("unable to forward htlc packet " + @@ -1274,12 +1304,20 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return s.failAddPacket(packet, linkErr) } + fmt.Printf("[switch.handlePacketForward()]: forwarding incoming HTLC(%x) via "+ + "outgoing link (id=%v). switch link count=%d\n", + htlc.PaymentHash[:10], packet.outgoingChanID, len(s.linkIndex)) + // Send the packet to the destination channel link which // manages the channel. packet.outgoingChanID = destination.ShortChanID() return destination.handleSwitchPacket(packet) case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC: + fmt.Printf("[switch.handlePacketForward()]: encountered Settle/Fail! "+ + "attempting to close circuit, switch link count=%d\n", + len(s.linkIndex)) + // If the source of this packet has not been set, use the // circuit map to lookup the origin. circuit, err := s.closeCircuit(packet) @@ -1290,6 +1328,8 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { // closeCircuit returns a nil circuit when a settle packet returns an // ErrUnknownCircuit error upon the inner call to CloseCircuit. if circuit == nil { + fmt.Printf("[switch.handlePacketForward()]: bailing on handling early! switch link count=%d\n", + len(s.linkIndex)) return nil } @@ -1373,6 +1413,13 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return nil } + // outgoingLink, _ := s.getLinkByShortID(packet.outgoingChanID) + // incomingLink, _ := s.getLinkByShortID(packet.incomingChanID) + + fmt.Printf("[switch.handlePacketForward()]: attempting to return Settle/Fail via "+ + "incoming link (id=%v). switch link count=%d\n", + packet.incomingChanID, len(s.linkIndex)) + // Check to see that the source link is online before removing // the circuit. return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) @@ -1558,6 +1605,8 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { } } + fmt.Printf("[switch.closeCircuit()]: outgoing link fwd pkg (dest) ref: %+v\n", pkt.destRef) + // Otherwise, this is packet was received from the remote party. Use // circuit map to find the incoming link to receive the settle/fail. circuit, err := s.circuits.CloseCircuit(pkt.outKey()) @@ -1580,6 +1629,11 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { pkt.incomingChanID, pkt.incomingHTLCID, pkt.outgoingChanID, pkt.outgoingHTLCID) + fmt.Printf("[switch.closeCircuit()]: Closed completed %s circuit for %x: "+ + "(%s, %d) <-> (%s, %d)\n", pktType, pkt.circuit.PaymentHash, + pkt.incomingChanID, pkt.incomingHTLCID, + pkt.outgoingChanID, pkt.outgoingHTLCID) + return circuit, nil // Circuit was previously closed, but has not been deleted. We'll just @@ -1593,6 +1647,9 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { if pkt.destRef != nil { // Add this SettleFailRef to the set of pending settle/fail entries // awaiting acknowledgement. + fmt.Printf("[switch.closeCircuit()]: adding settle/fail to batch for " + + "internal forwarding package acknowledgement!\n") + s.pendingSettleFails = append(s.pendingSettleFails, *pkt.destRef) } diff --git a/htlcswitch/test_utils.go b/htlcswitch/test_utils.go index 0c98fa4fe3..b0412c159e 100644 --- a/htlcswitch/test_utils.go +++ b/htlcswitch/test_utils.go @@ -22,7 +22,6 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/go-errors/errors" - sphinx "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb/models" "github.com/lightningnetwork/lnd/contractcourt" @@ -38,6 +37,7 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/record" "github.com/lightningnetwork/lnd/shachain" "github.com/lightningnetwork/lnd/ticker" "github.com/stretchr/testify/require" @@ -487,6 +487,7 @@ func createTestChannel(t *testing.T, alicePrivKey, bobPrivKey []byte, // getChanID retrieves the channel point from an lnnwire message. func getChanID(msg lnwire.Message) (lnwire.ChannelID, error) { var chanID lnwire.ChannelID + switch msg := msg.(type) { case *lnwire.UpdateAddHTLC: chanID = msg.ChanID @@ -494,6 +495,8 @@ func getChanID(msg lnwire.Message) (lnwire.ChannelID, error) { chanID = msg.ChanID case *lnwire.UpdateFailHTLC: chanID = msg.ChanID + case *lnwire.UpdateFailMalformedHTLC: + chanID = msg.ChanID case *lnwire.RevokeAndAck: chanID = msg.ChanID case *lnwire.CommitSig: @@ -555,27 +558,42 @@ func generatePaymentWithPreimage(invoiceAmt, htlcAmt lnwire.MilliSatoshi, return invoice, htlc, paymentID, nil } +func generatePaymentAddress() ([32]byte, error) { + + var payAddr lntypes.Hash + r, err := generateRandomBytes(sha256.Size) + copy(payAddr[:], r) + + return payAddr, err +} + +func generatePaymentSecret() (lntypes.Preimage, [32]byte, error) { + + var preimage lntypes.Preimage + var rHash lntypes.Hash + r, err := generateRandomBytes(lntypes.PreimageSize) + copy(preimage[:], r) + + rHash = sha256.Sum256(r) + + return preimage, rHash, err +} + // generatePayment generates the htlc add request by given path blob and // invoice which should be added by destination peer. func generatePayment(invoiceAmt, htlcAmt lnwire.MilliSatoshi, timelock uint32, blob [lnwire.OnionPacketSize]byte) (*invoices.Invoice, *lnwire.UpdateAddHTLC, uint64, error) { - var preimage lntypes.Preimage - r, err := generateRandomBytes(sha256.Size) + preimage, rhash, err := generatePaymentSecret() if err != nil { return nil, nil, 0, err } - copy(preimage[:], r) - - rhash := sha256.Sum256(preimage[:]) - var payAddr [sha256.Size]byte - r, err = generateRandomBytes(sha256.Size) + payAddr, err := generatePaymentAddress() if err != nil { return nil, nil, 0, err } - copy(payAddr[:], r) return generatePaymentWithPreimage( invoiceAmt, htlcAmt, timelock, blob, &preimage, rhash, payAddr, @@ -668,15 +686,140 @@ func generateHops(payAmt lnwire.MilliSatoshi, startingHeight uint32, amount = runningAmt - fee } + // NOTE(10/22/22): We still only ever use legacy onion payloads. + // We should create a new version of this function or update + // this one to use the now required TLV onion hop payload! + hop := &hop.Payload{ + FwdInfo: hop.ForwardingInfo{ + NextHop: nextHop, + AmountToForward: amount, + OutgoingCTLV: timeLock, + }, + } + hops[i] = hop + } + + return runningAmt, totalTimelock, hops +} + +// computePaymentRelay computes a the minimal set of information +// necessary to forward a payment inside a blinded route. +// This method does NOT take the perspective of a blinded +// route builder looking to maximize privacy. +func computePaymentRelay(link *channelLink) *record.PaymentRelayInfo { + return &record.PaymentRelayInfo{ + // QUESTION(10/25/22): Why do all the route + // blinding spec types not match LND types? + BaseFee: uint32(link.cfg.FwrdingPolicy.BaseFee), + FeeRate: uint32(link.cfg.FwrdingPolicy.FeeRate), + CltvExpiryDelta: uint16(link.cfg.FwrdingPolicy.TimeLockDelta), + } +} + +// generateBlindHops constructs the onion and route blinding payload +// for each hop in a blind route. It also provideds the total amount to +// be sent, and the time lock value needed to route an HTLC with the +// target amount over the specified path. +func generateBlindHops(payAmt lnwire.MilliSatoshi, startingHeight uint32, + pathID []byte, path ...*channelLink) (lnwire.MilliSatoshi, uint32, []*hop.Payload) { + + totalTimelock := startingHeight + runningAmt := payAmt + + hops := make([]*hop.Payload, len(path)) + for i := len(path) - 1; i >= 0; i-- { + + var finalHop bool = i == len(path)-1 + + // If this is the last hop, then the next hop is the special + // "exit node". Otherwise, we look to the "prior" hop. + // NOTE(10/28/22): The conditionals which handle next hop, + // amount and timelock could be combined, but the function + // might be more readable if they are handled separately. + // This is test code so performance is not as critical. + nextHop := hop.Exit + if !finalHop { + nextHop = path[i+1].channel.ShortChanID() + } + + // If this is the last hop, then the time lock will be their + // specified delta policy plus our starting height. + var timeLock uint32 + if finalHop { + totalTimelock += testInvoiceCltvExpiry + timeLock = totalTimelock + } else { + // Otherwise, the outgoing time lock should be the + // incoming timelock minus their specified delta. + delta := path[i+1].cfg.FwrdingPolicy.TimeLockDelta + totalTimelock += delta + timeLock = totalTimelock - delta + } + + // Finally, we'll need to calculate the amount to forward. For + // the last hop, it's just the payment amount. + amount := payAmt + if !finalHop { + prevHop := hops[i+1] + prevAmount := prevHop.ForwardingInfo().AmountToForward + + fee := ExpectedFee(path[i].cfg.FwrdingPolicy, prevAmount) + runningAmt += fee + + // Otherwise, for a node to forward an HTLC, then + // following inequality most hold true: + // * amt_in - fee >= amt_to_forward + amount = runningAmt - fee + } + var nextHopBytes [8]byte binary.BigEndian.PutUint64(nextHopBytes[:], nextHop.ToUint64()) - hops[i] = hop.NewLegacyPayload(&sphinx.HopData{ - Realm: [1]byte{}, // hop.BitcoinNetwork - NextAddress: nextHopBytes, - ForwardAmount: uint64(amount), - OutgoingCltv: timeLock, - }) + // Construct the onion and route blinding payloads for this hop. + payload := &hop.Payload{} + + blindPayload := record.BlindedRouteData{ + // Padding: []byte{0x00, 0x01, 0x00, 0x00}, + ShortChannelID: &nextHop, + Constraints: &record.PaymentConstraints{}, + } + + // Configuration meant for the first blind hop only. + if i == 0 { + // _, ephemeralBlindingPoint := btcec.PrivKeyFromBytes([]byte("test private key")) + // payload.BlindingPoint = ephemeralBlindingPoint + } + + // Configuration meant for all intermediate (non-final) hops. + if !finalHop { + // Each intermediate hop requires fee and timelock + // information in order to relay payment. + blindPayload.RelayInfo = computePaymentRelay(path[i]) + } + + // Configuration meant for the final blind hop only. + if finalHop { + // The final hop in a blinded route must have a + // path ID and top level forwarding information set. + blindPayload.PathID = bytes.Repeat([]byte{1}, 32) + if pathID != nil { + blindPayload.PathID = pathID + } + + payload.FwdInfo = hop.ForwardingInfo{ + NextHop: nextHop, + AmountToForward: amount, + OutgoingCTLV: timeLock, + } + } + + // Serialize the route blinding TLV payload + payloadBytes, err := record.EncodeBlindedRouteData(&blindPayload) + if err != nil { + fmt.Println("unable to encode route blinding payload") + } + payload.SetEncryptedData(payloadBytes) + hops[i] = payload } return runningAmt, totalTimelock, hops @@ -858,6 +1001,8 @@ func (n *threeHopNetwork) stop() { } } +// NOTE(10/22/22): this is a 3 (or less) node network proprietary structure. +// Would need to be generalized to support N node networks. type clusterChannels struct { aliceToBob *lnwallet.LightningChannel bobToAlice *lnwallet.LightningChannel @@ -867,6 +1012,8 @@ type clusterChannels struct { // createClusterChannels creates lightning channels which are needed for // network cluster to be initialized. +// NOTE(10/22/22): this is a 3 (or less) node network proprietary function. +// Would need to be generalized to support N node networks. func createClusterChannels(t *testing.T, aliceToBob, bobToCarol btcutil.Amount) ( *clusterChannels, func() (*clusterChannels, error), error) { @@ -890,6 +1037,7 @@ func createClusterChannels(t *testing.T, aliceToBob, bobToCarol btcutil.Amount) } restoreFromDb := func() (*clusterChannels, error) { + fmt.Println("[test setup utility]: restoring all channels from DB") a2b, err := aliceChannel.restore() if err != nil { @@ -957,6 +1105,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, hopNetwork := newHopNetwork() // Create three peers/servers. + // NOTE(calvin): This overwrites our invoice DB!! aliceServer, err := newMockServer( t, "alice", startingHeight, aliceDb, hopNetwork.defaultDelta, ) @@ -1025,6 +1174,7 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel, } } +// NOTE(10/22/22): this is a 3 (or less) node network proprietary type/function. // serverOption is a function which alters the three servers created for // a three hop network to allow custom settings on each server. type serverOption func(aliceServer, bobServer, carolServer *mockServer) @@ -1168,9 +1318,11 @@ func (h *hopNetwork) createChannelLink(server, peer *mockServer, NotifyInactiveLinkEvent: func(wire.OutPoint) {}, HtlcNotifier: server.htlcSwitch.cfg.HtlcNotifier, GetAliases: getAliases, + // BlindHopProcessor: &mockBlindHopProcessor{}, }, channel, ) + if err := server.htlcSwitch.AddLink(link); err != nil { return nil, fmt.Errorf("unable to add channel link: %w", err) } diff --git a/lnwallet/channel.go b/lnwallet/channel.go index 7b2abd6430..10a0cb6414 100644 --- a/lnwallet/channel.go +++ b/lnwallet/channel.go @@ -422,6 +422,28 @@ func PayDescsFromRemoteLogUpdates(chanID lnwire.ShortChannelID, height uint64, pd.OnionBlob = make([]byte, len(wireMsg.OnionBlob)) copy(pd.OnionBlob[:], wireMsg.OnionBlob[:]) + // NOTE(11/25/22): This provides indication that we are able to + // recover a blinding point from disk after a restart, so ADDs + // should be reprocessed with a blinding point. We have given the + // blind hop processing duties to the incoming link. What happens + // if we are an outgoing link here? Are we restored with our peer's + // blinding point? I think we'll be fine because from the perspective + // of the outgoing link this is a local ADD, so we will not be calling + // processRemoteAdds(). + if wireMsg.BlindingPoint != nil { + fmt.Printf("[PayDescsFromRemoteLogUpdates(%s)]: "+ + "created payment descriptor for ADD from LogUpdate, blinding_point=%x\n", + chanID, + wireMsg.BlindingPoint.Pubkey().SerializeCompressed()[:10], + ) + } else { + fmt.Printf("[PayDescsFromRemoteLogUpdates(%s)]: "+ + "created payment descriptor for ADD from LogUpdate, blinding_point=%v\n", + chanID, + wireMsg.BlindingPoint, + ) + } + case *lnwire.UpdateFulfillHTLC: pd = PaymentDescriptor{ RPreimage: wireMsg.PaymentPreimage, @@ -1303,6 +1325,15 @@ type LightningChannel struct { localUpdateLog *updateLog remoteUpdateLog *updateLog + // LocalFundingKey is the public key under control by the wallet that + // was used for the 2-of-2 funding output which created this channel. + LocalFundingKey *btcec.PublicKey + + // RemoteFundingKey is the public key for the remote channel counter + // party which used for the 2-of-2 funding output which created this + // channel. + RemoteFundingKey *btcec.PublicKey + // log is a channel-specific logging instance. log btclog.Logger @@ -1418,6 +1449,8 @@ func NewLightningChannel(signer input.Signer, localUpdateLog: localUpdateLog, remoteUpdateLog: remoteUpdateLog, Capacity: state.Capacity, + LocalFundingKey: state.LocalChanCfg.MultiSigKey.PubKey, + RemoteFundingKey: state.RemoteChanCfg.MultiSigKey.PubKey, taprootNonceProducer: taprootNonceProducer, log: build.NewPrefixLog(logPrefix, walletLog), opts: opts, @@ -1696,6 +1729,11 @@ func (lc *LightningChannel) localLogUpdateToPayDesc(logUpdate *channeldb.LogUpda EntryType: Fail, FailReason: wireMsg.Reason[:], removeCommitHeightRemote: commitHeight, + // After a restart, we will restore our update logs from + // our on disk state. We need to make sure that any + // payment descriptors which had blinding points + // retrieve and set them here? + BlindingPoint: ogHTLC.BlindingPoint, }, nil // HTLC fails due to malformed onion blocks are treated the exact same @@ -1712,6 +1750,11 @@ func (lc *LightningChannel) localLogUpdateToPayDesc(logUpdate *channeldb.LogUpda FailCode: wireMsg.FailureCode, ShaOnionBlob: wireMsg.ShaOnionBlob, removeCommitHeightRemote: commitHeight, + // After a restart, we will restore our update logs from + // our on disk state. We need to make sure that any + // payment descriptors which had blinding points + // retrieve and set them here? + BlindingPoint: ogHTLC.BlindingPoint, }, nil case *lnwire.UpdateFee: @@ -1843,6 +1886,13 @@ func (lc *LightningChannel) remoteLogUpdateToPayDesc(logUpdate *channeldb.LogUpd func (lc *LightningChannel) restoreCommitState( localCommitState, remoteCommitState *channeldb.ChannelCommitment) error { + fmt.Printf("[channel.restoreCommitState(%s) - local_key=%x, remote_key=%x]: rebuilding in-memory"+ + "commit/update log state using the information we have on disk.\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + ) + // In order to reconstruct the pkScripts on each of the pending HTLC // outputs (if any) we'll need to regenerate the current revocation for // this current un-revoked state as well as retrieve the current @@ -1948,6 +1998,11 @@ func (lc *LightningChannel) restoreCommitState( // Finally, with the commitment states restored, we'll now restore the // state logs based on the current local+remote commit, and any pending // remote commit that exists. + // + // NOTE(11/25/22): The commitment transactions/diff this gets + // called with contains the payment descriptors which will be restored + // to our update logs. The blinding point must already be set at this point. + // Presently it is not set. err = lc.restoreStateLogs( localCommit, remoteCommit, pendingRemoteCommit, pendingRemoteCommitDiff, pendingRemoteKeyChain, @@ -1971,6 +2026,13 @@ func (lc *LightningChannel) restoreStateLogs( unsignedAckedUpdates, remoteUnsignedLocalUpdates []channeldb.LogUpdate) error { + fmt.Printf("[channel.restoreStateLogs(%s) - local_key=%x, remote_key=%x]: "+ + "rebuilding in-memory update logs.\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + ) + // We make a map of incoming HTLCs to the height of the remote // commitment they were first added, and outgoing HTLCs to the height // of the local commit they were first added. This will be used when we @@ -3566,6 +3628,12 @@ func (lc *LightningChannel) createCommitDiff( newCommit *commitment, commitSig lnwire.Sig, htlcSigs []lnwire.Sig) (*channeldb.CommitDiff, error) { + fmt.Printf("[SignNextCommittment --> createCommitDiff(%s) - local_key=%x, remote_key=%x]: signing committment for peer!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + ) + // First, we need to convert the funding outpoint into the ID that's // used on the wire to identify this channel. We'll use this shortly // when recording the exact CommitSig message that we'll be sending @@ -3628,6 +3696,22 @@ func (lc *LightningChannel) createCommitDiff( logUpdates = append(logUpdates, logUpdate) + // fmt.Printf("[SignNextCommittment --> createCommitDiff(%s) - local_key=%x, remote_key=%x, balance=%d]: "+ + fmt.Printf("[SignNextCommittment --> createCommitDiff(%s) - local_key=%x, remote_key=%x]: "+ + // fmt.Printf("[SignNextCommittment --> createCommitDiff(%s) - balance=%v]: "+ + // fmt.Printf("[SignNextCommittment --> createCommitDiff(%s)]: "+ + "created LogUpdate for ADD: %+v, (chan_id=%s, amount=%d, expiry=%d)\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + // lc.AvailableBalance(), + // logUpdate.UpdateMsg, + htlc.BlindingPoint, + htlc.ChanID.String(), + htlc.Amount, + htlc.Expiry, + ) + // Short circuit here since an add should not have any // of the references gathered in the case of settles, // fails or malformed fails. @@ -3645,6 +3729,8 @@ func (lc *LightningChannel) createCommitDiff( ChanID: chanID, ID: pd.ParentIndex, Reason: pd.FailReason, + // TODO: do we need blinding point here? + // NOTE(11/27/22): might depend on approach to error handling. } case MalformedFail: @@ -3653,6 +3739,8 @@ func (lc *LightningChannel) createCommitDiff( ID: pd.ParentIndex, ShaOnionBlob: pd.ShaOnionBlob, FailureCode: pd.FailCode, + // TODO: do we need blinding point here? + // NOTE(11/27/22): might depend on approach to error handling. } case FeeUpdate: @@ -3669,9 +3757,21 @@ func (lc *LightningChannel) createCommitDiff( // packets, if they exist. if pd.SourceRef != nil { ackAddRefs = append(ackAddRefs, *pd.SourceRef) + fmt.Printf("[SignNextCommittment --> createCommitDiff(%s) - local_key=%x, remote_key=%x]: incoming (this) link fwd pkg add ref: %+v!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + pd.SourceRef, + ) } if pd.DestRef != nil { settleFailRefs = append(settleFailRefs, *pd.DestRef) + fmt.Printf("[SignNextCommittment --> createCommitDiff(%s) - local_key=%x, remote_key=%x]: outgoing link's fwd pkg ref: %+v!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + pd.DestRef, + ) } if pd.ClosedCircuitKey != nil { closedCircuitKeys = append(closedCircuitKeys, @@ -3759,6 +3859,30 @@ func (lc *LightningChannel) getUnsignedAckedUpdates() []channeldb.LogUpdate { copy(htlc.OnionBlob[:], pd.OnionBlob) logUpdate.UpdateMsg = htlc + if htlc.BlindingPoint != nil { + fmt.Printf("[getUnsignedAckedUpdates(%s) - local_key=%x, remote_key=%x]: "+ + "created LogUpdate for ADD, blinding_point=%x, (chan_id=%s, amount=%d, expiry=%d)\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + htlc.BlindingPoint.Pubkey().SerializeCompressed()[:10], + htlc.ChanID.String(), + htlc.Amount, + htlc.Expiry, + ) + } else { + fmt.Printf("[getUnsignedAckedUpdates(%s) - local_key=%x, remote_key=%x]: "+ + "created LogUpdate for ADD, blinding_point=%v, (chan_id=%s, amount=%d, expiry=%d)\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + htlc.BlindingPoint, + htlc.ChanID.String(), + htlc.Amount, + htlc.Expiry, + ) + } + case Settle: logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{ ChanID: chanID, @@ -4340,6 +4464,20 @@ func (lc *LightningChannel) SignNextCommitment() (*NewCommitState, error) { if err != nil { return nil, err } + + // IMPORTANT NOTE: Adds, Settles, and Fails are always (internally?) ack'd + // atomically with commitment signing. + // NOTE(11/23/22): + // - Pay careful attention to what we set up + // to be acknowledged in the construction of the CommitDiff. + // - This is where we write the LogUpdates to disk! + fmt.Printf("[SignNextCommittment(%s) - local_key=%x, remote_key=%x]: "+ + "add refs: %+v, settle refs: %+v!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + commitDiff.AddAcks, commitDiff.SettleFailAcks, + ) err = lc.channelState.AppendRemoteCommitChain(commitDiff) if err != nil { return nil, err @@ -5142,6 +5280,12 @@ func (lc *LightningChannel) ReceiveNewCommitment(commitSigs *CommitSigs) error { lc.Lock() defer lc.Unlock() + fmt.Printf("[ReceiveNewCommitment(%s) - local_key=%x, remote_key=%x]: got new commitment!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + ) + // Check for empty commit sig. Because of a previously existing bug, it // is possible that we receive an empty commit sig from nodes running an // older version. This is a relaxation of the spec, but it is still @@ -5613,6 +5757,12 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ( lc.Lock() defer lc.Unlock() + fmt.Printf("[ReceiveRevocation(%s) - local_key=%x, remote_key=%x]: got revocation!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + ) + // Ensure that the new pre-image can be placed in preimage store. store := lc.channelState.RevocationStore revocation, err := chainhash.NewHash(revMsg.Revocation[:]) @@ -5702,6 +5852,8 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ( // locked-in at this new state. By doing this we ensure that we // don't re-forward any already processed HTLC's after a // restart. + // + // NOTE(1/20/23): The channel state machine does not reforward HTLCs? switch { case pd.EntryType == Add && committedAdd && shouldFwdAdd: // Construct a reference specifying the location that @@ -5711,6 +5863,13 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ( Height: remoteChainTail, Index: addIndex, } + fmt.Printf("[ReceiveRevocation(%s) - local_key=%x, remote_key=%x]: "+ + "forwarding package add reference: %+v!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + pd.SourceRef, + ) addIndex++ pd.isForwarded = true @@ -5720,11 +5879,34 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ( // Construct a reference specifying the location that // this forwarded Settle/Fail will be written in the // forwarding package constructed at this remote height. + // + // NOTE(11/23/22): An payment descriptor first receives a + // forwarding package destination reference when the HTLC + // update it describes, namely a (settle/fail) response + // to a previously forwarded (add), arrives at our link + // from a peer and has been commmited to. From the perspective + // of the original (add), the response has arrived and + // been committed to by the outgoing link. It just needs + // to be returned to the incoming link via the switch + // and continued on its journey back towards the sender. + // Will our incoming link's payment descriptor have the + // same destination reference? + // - Check SettleHTLC() or FailHTLC() + // - Yes! Though payment descriptors remain local to the link + // for whose HTLC update log they were created, the channeldb.SettleFailRefs + // are passed through the Switch via the htlcPacket structure pd.DestRef = &channeldb.SettleFailRef{ Source: source, Height: remoteChainTail, Index: settleFailIndex, } + fmt.Printf("[ReceiveRevocation(%s) - local_key=%x, remote_key=%x]: "+ + "forwarding package settle/fail reference: %+v!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + pd.DestRef, + ) settleFailIndex++ pd.isForwarded = true @@ -5754,11 +5936,43 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ( Amount: pd.Amount, Expiry: pd.Timeout, PaymentHash: pd.RHash, + // NOTE(11/27/22): Here we add the (incoming) blinding point + // to the UpdateAddHTLC message so it is written to disk + // when we persist this LogUpdate. This is the LogUpdate that + // gets put in our ForwardingPackage, so it will be preserved + // once the ForwardingPackage is written to disk below. + // The blinding point will be available to our the incoming + // link after we restore our in-memory state after a restart! + BlindingPoint: lnwire.NewBlindingPoint(pd.BlindingPoint), } copy(htlc.OnionBlob[:], pd.OnionBlob) logUpdate.UpdateMsg = htlc addUpdates = append(addUpdates, logUpdate) + if htlc.BlindingPoint != nil { + fmt.Printf("[ReceiveRevocation(%s) - local_key=%x, remote_key=%x]: "+ + "created LogUpdate for ADD, blinding_point=%x, (chan_id=%s, amount=%d, expiry=%d)\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + htlc.BlindingPoint.Pubkey().SerializeCompressed()[:10], + htlc.ChanID.String(), + htlc.Amount, + htlc.Expiry, + ) + } else { + fmt.Printf("[ReceiveRevocation(%s) - local_key=%x, remote_key=%x]: "+ + "created LogUpdate for ADD, blinding_point=%v, (chan_id=%s, amount=%d, expiry=%d)\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + htlc.BlindingPoint, + htlc.ChanID.String(), + htlc.Amount, + htlc.Expiry, + ) + } + case Settle: logUpdate.UpdateMsg = &lnwire.UpdateFulfillHTLC{ ChanID: chanID, @@ -5862,6 +6076,9 @@ func (lc *LightningChannel) ReceiveRevocation(revMsg *lnwire.RevokeAndAck) ( remoteHTLCs := lc.channelState.RemoteCommitment.Htlcs + // NOTE(1/20/23): We have a forwarding package containing all ADD + // updates alongside the collection of non duplicate ADDs which + // should be forwarded. return fwdPkg, addsToForward, settleFailsToForward, remoteHTLCs, nil } @@ -6083,6 +6300,13 @@ func (lc *LightningChannel) MayAddOutgoingHtlc(amt lnwire.MilliSatoshi) error { func (lc *LightningChannel) htlcAddDescriptor(htlc *lnwire.UpdateAddHTLC, openKey *models.CircuitKey) *PaymentDescriptor { + fmt.Printf("[AddHTLC(%s) - local_key=%x, remote_key=%x]: circuit key: %+v!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + openKey, + ) + return &PaymentDescriptor{ EntryType: Add, RHash: PaymentHash(htlc.PaymentHash), @@ -6142,6 +6366,19 @@ func (lc *LightningChannel) ReceiveHTLC(htlc *lnwire.UpdateAddHTLC) (uint64, err "ID %d", htlc.ID, lc.remoteUpdateLog.htlcCounter) } + if htlc.BlindingPoint != nil { + fmt.Printf("[LightningChannel.ReceiveHTLC(%s) - local_key=%x, remote_key=%x]: receiving htlc with "+ + "ephemeral route blinding point=%x, (chan_id=%s, amount=%d, expiry=%d)\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + htlc.BlindingPoint.Pubkey().SerializeCompressed()[:10], + htlc.ChanID.String(), + htlc.Amount, + htlc.Expiry, + ) + } + pd := &PaymentDescriptor{ EntryType: Add, RHash: PaymentHash(htlc.PaymentHash), @@ -6202,6 +6439,14 @@ func (lc *LightningChannel) SettleHTLC(preimage [32]byte, htlcIndex uint64, sourceRef *channeldb.AddRef, destRef *channeldb.SettleFailRef, closeKey *models.CircuitKey) error { + fmt.Printf("[SettleHTLC(%s) - local_key=%x, remote_key=%x]: incoming (this) "+ + "link fwd pkg add ref: %+v, outgoing link fwd pkg ref: %+v!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + sourceRef, destRef, + ) + lc.Lock() defer lc.Unlock() @@ -6250,6 +6495,16 @@ func (lc *LightningChannel) ReceiveHTLCSettle(preimage [32]byte, htlcIndex uint6 lc.Lock() defer lc.Unlock() + fmt.Printf("[ReceiveHTLCSettle(%s) - local_key=%x, remote_key=%x]: "+ + "preimage: %+v, htlc index: %d!\n", + lc.ShortChanID(), + lc.LocalFundingKey.SerializeCompressed()[:10], + lc.RemoteFundingKey.SerializeCompressed()[:10], + preimage, htlcIndex, + ) + + // NOTE(11/23/22): This HTLC (settle) update must be responding + // to a previous (add) we sent to the peer. htlc := lc.localUpdateLog.lookupHtlc(htlcIndex) if htlc == nil { return ErrUnknownHtlcIndex{lc.ShortChanID(), htlcIndex} @@ -6427,6 +6682,18 @@ func (lc *LightningChannel) ReceiveFailHTLC(htlcIndex uint64, reason []byte, LogIndex: lc.remoteUpdateLog.logIndex, EntryType: Fail, FailReason: reason, + // IMPORTANT NOTE(11/17/22): + // Where we need the blinding point in channel state machine's + // update log depends on how errors are to be processed and + // possibly on how HTLC retransmission is handled. + // + // The LN state machine stores the blinding point + // associated with an Add if the ADD was associated + // with a blinded route. If that Add is now being + // failed, we may want a reference to its ephemeral + // blinding point so we can use it to determine which + // error will be forwarded back towards the sender. + // BlindingPoint: htlc.BlindingPoint, } lc.remoteUpdateLog.appendUpdate(pd) diff --git a/lnwallet/channel_test.go b/lnwallet/channel_test.go index 87ad0d26c2..923f172a06 100644 --- a/lnwallet/channel_test.go +++ b/lnwallet/channel_test.go @@ -4291,6 +4291,97 @@ func TestFeeUpdateRejectInsaneFee(t *testing.T) { } } +// TestChannelRestoreBlindHTLC demonstrates that if we receive an HTLC +// with a (route) blinding point, that the blinding point is recovered +// after a restart. +func TestChannelRestoreBlindHTLC(t *testing.T) { + + // Create a test channel which will be used for the duration of this + // unittest. The channel will be funded evenly with Alice having 5 BTC, + // and Bob having 5 BTC. + aliceChannel, bobChannel, err := CreateTestChannels( + t, channeldb.SingleFunderTweaklessBit, + ) + require.NoError(t, err, "unable to create test channels") + + // First we create an HTLC that Alice will send to Bob. + // We add an ephemeral public key to the UpdateAddHTLC message + // to simulate the case that Bob is a forwarding node in the + // blinded portion of a route + htlc, _ := createHTLC(0, lnwire.MilliSatoshi(500000)) + _, ephemeralBlindingPoint := btcec.PrivKeyFromBytes([]byte("test private key")) + htlc.BlindingPoint = lnwire.NewBlindingPoint(ephemeralBlindingPoint) + + // -----add-----> + _, err = aliceChannel.AddHTLC(htlc, nil) + require.NoError(t, err) + _, err = bobChannel.ReceiveHTLC(htlc) + require.NoError(t, err) + + pdOld := bobChannel.remoteUpdateLog.lookupHtlc(0) + blindingPointBeforeRestart := pdOld.BlindingPoint + t.Logf("old blinding_point=%x", blindingPointBeforeRestart.SerializeCompressed()[:10]) + + // With the HTLC's applied to both update logs, we'll have Alice + // initiate a state transition to lock in this HTLC ADD update + // on both commitments. + // NOTE(11/25/22)): Recall that it is ReceiveRevocation() which + // builds the LogUpdate for the forwarding package we perist to disk. + // -----sig-----> + // <----rev------ + // <----sig------ + // -----rev-----> + err = ForceStateTransition(aliceChannel, bobChannel) + require.NoError(t, err) + + // We'll now force a restart for Bob, so we can test the + // persistence related portion of this assertion. + bobChannel, err = restartChannel(bobChannel) + require.NoError(t, err, "unable to restart channel") + + // Verify that Bob has access to the blinding point even + // after restoring state from disk. + // NOTE(11/25/22): As of now, the payment descriptors + // in our update log does not get a blinding point. + // We must have missed a method!!! (channeldb.HTLC) + // Do we even need the incoming blinding point for + // committed HTLCs being restored from disk? Or + // is the forwarding package enough on its own? + pdNew := bobChannel.remoteUpdateLog.lookupHtlc(0) + blindingPointAfterRestart := pdNew.BlindingPoint + // require.Nil(t, pdNew, "expected nil payment descriptor at index 0 after restart") // only if we don't lock in the ADD. + // require.Nil(t, pdNew.BlindingPoint, "unfortunately we expect nil "+ + // "blinding point from payment descriptors in the update log "+ + // "after restart") + require.Equal(t, blindingPointBeforeRestart, blindingPointAfterRestart, "expect blinding_point to match") + // t.Logf("new pay_desc=%+v", pdNew) + t.Logf("new blinding_point=%x", pdNew.BlindingPoint.SerializeCompressed()[:10]) + + // NOTE(11/25/22): We CAN access the bliding point after a restart + // if we grab our payment descriptors from our forwarding package: + // fwdpkg.LogUpdates --> payment descriptors + fwdPkgs, err := bobChannel.LoadFwdPkgs() + require.NoError(t, err, "unable to load forwarding packages") + + addPayDescsFromFwdPkg, err := PayDescsFromRemoteLogUpdates( + bobChannel.ShortChanID(), 0, fwdPkgs[0].Adds, + ) + require.NoError(t, err, "unable to convert channeldb.LogUpdate --> lnwallet.PaymentDescriptor for ADDs") + + blindingPointFromFwdPkg := addPayDescsFromFwdPkg[0].BlindingPoint + t.Logf("from forwarding package, pay_desc=%+v", addPayDescsFromFwdPkg[0]) + // t.Logf("from forwarding package, blinding_point=%x", + // blindingPointFromFwdPkg.SerializeCompressed()[:10]) + require.Nil(t, blindingPointFromFwdPkg, "unfortunately we expect payment "+ + "descriptors restored from LogUpdate in our forwarding packages to "+ + "nil blinding points after restart!") + // require.Equal(t, blindingPointBeforeRestart, blindingPointFromFwdPkg, "expect blinding_point to match") + + // TODO(11/27/22): write a ChannelLink level test which demonstrates ability to recover + // blinding point for the downstream peer from the LogUpdates stored on the CommitDiff! + +} + // TestChannelRetransmissionFeeUpdate tests that the initiator will include any // pending fee updates if it needs to retransmit signatures. func TestChannelRetransmissionFeeUpdate(t *testing.T) { @@ -10863,10 +10954,14 @@ func TestBlindingPointPersistence(t *testing.T) { // Now, Alice will send a new commitment to Bob, which will persist our // pending HTLC to disk. - _, err = aliceChannel.SignNextCommitment() + aliceCommit, err := aliceChannel.SignNextCommitment() require.NoError(t, err, "unable to sign commitment") // Restart alice to force fetching state from disk. + // + // NOTE(calvin): We restart immediately after sending SignNextCommitment + // and demonstrate that *alice* is able to recover the blinding point + // from disk. aliceChannel, err = restartChannel(aliceChannel) require.NoError(t, err, "unable to restart alice") @@ -10874,4 +10969,44 @@ func TestBlindingPointPersistence(t *testing.T) { remoteCommit := aliceChannel.remoteCommitChain.tip() require.Len(t, remoteCommit.outgoingHTLCs, 1) require.Equal(t, blinding, remoteCommit.outgoingHTLCs[0].BlindingPoint) + + // NOTE(calvin): Is Bob able to recover the blinding point from disk? + err = bobChannel.ReceiveNewCommitment(aliceCommit.CommitSigs) + require.NoError(t, err, "bob unable to receive new commitment") + + // We'll now force a restart for Bob, so we can test the + // persistence related portion of this assertion. + bobChannel, err = restartChannel(bobChannel) + require.NoError(t, err, "unable to restart bob's channel") + + // If we exchange channel sync messages from the get-go , then both + // sides should conclude that no further synchronization is needed. + assertNoChanSyncNeeded(t, aliceChannel, bobChannel) + + // Assert that Bob is NOT able to recover the blinding point from disk. + localCommit := bobChannel.localCommitChain.tip() + require.Len(t, localCommit.incomingHTLCs, 0) + require.Nil(t, localCommit.incomingHTLCs) + + // NOTE: We need to simulate Alice re-transmitting the HTLC as we + // restarted early enough in the "commit dance" that we persisted no + // record of having seen the HTLC. + _, err = bobChannel.ReceiveHTLC(htlc) + require.NoError(t, err) + + err = bobChannel.ReceiveNewCommitment(aliceCommit.CommitSigs) + require.NoError(t, err, "bob unable to receive new commitment") + + _, _, _, err = bobChannel.RevokeCurrentCommitment() + require.NoError(t, err, "bob unable to revoke current commitment") + + // We'll now force a restart for Bob, so we can test the + // persistence related portion of this assertion. + bobChannel, err = restartChannel(bobChannel) + require.NoError(t, err, "unable to restart bob's channel") + + // Assert that Bob is able to recover the blinding point from disk. + localCommit = bobChannel.localCommitChain.tip() + require.Len(t, localCommit.incomingHTLCs, 1) + require.Equal(t, blinding, localCommit.incomingHTLCs[0].BlindingPoint) } diff --git a/lnwallet/test_utils.go b/lnwallet/test_utils.go index e33d563c3f..e6b84a54e2 100644 --- a/lnwallet/test_utils.go +++ b/lnwallet/test_utils.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "encoding/binary" "encoding/hex" + "fmt" "io" prand "math/rand" "net" @@ -520,37 +521,45 @@ func ForceStateTransition(chanA, chanB *LightningChannel) error { if err != nil { return err } + fmt.Println("[ForceStateTransition]: alice signed new commitment for bob") err = chanB.ReceiveNewCommitment(aliceNewCommit.CommitSigs) if err != nil { return err } + fmt.Println("[ForceStateTransition]: bob received commitment") bobRevocation, _, _, err := chanB.RevokeCurrentCommitment() if err != nil { return err } + fmt.Println("[ForceStateTransition]: bob revoked his old commitment") bobNewCommit, err := chanB.SignNextCommitment() if err != nil { return err } + fmt.Println("[ForceStateTransition]: bob signed new commitment for alice") _, _, _, _, err = chanA.ReceiveRevocation(bobRevocation) if err != nil { return err } + fmt.Println("[ForceStateTransition]: alice received bob's revocation") err = chanA.ReceiveNewCommitment(bobNewCommit.CommitSigs) if err != nil { return err } + fmt.Println("[ForceStateTransition]: alice received commitment") aliceRevocation, _, _, err := chanA.RevokeCurrentCommitment() if err != nil { return err } + fmt.Println("[ForceStateTransition]: alice revoked her old commitment") _, _, _, _, err = chanB.ReceiveRevocation(aliceRevocation) if err != nil { return err } + fmt.Println("[ForceStateTransition]: bob received alice's revocation") return nil } diff --git a/record/blinded_data.go b/record/blinded_data.go index 3299515ebc..c48903f7ac 100644 --- a/record/blinded_data.go +++ b/record/blinded_data.go @@ -18,6 +18,10 @@ const ( // NextNodeType is a record type for the unblinded next node ID. NextNodeType tlv.Type = 4 + // PathIDType is a record type for an optional field recipients can use + // to verify that a blinded route is used in the proper context. + PathIDType tlv.Type = 6 + // NextBlindingOverrideType is a record type for the next blinding // override. NextBlindingOverrideType tlv.Type = 8 @@ -45,6 +49,10 @@ type BlindedRouteData struct { // NextNodeID is the unblinded node ID of the next hop. NextNodeID *btcec.PublicKey + // PathID is an optional field recipients can use to verify + // that a blinded route was used as they expect. + PathID []byte + // NextBlindingOverride is a blinding point that should be switched // in for the next hop. This is used to combine two blinded paths into // one (which primarily is used in onion messaging, but in theory @@ -81,6 +89,7 @@ func DecodeBlindedRouteData(r io.Reader) (*BlindedRouteData, error) { records := []tlv.Record{ tlv.MakePrimitiveRecord(ShortChannelIDType, &shortID), tlv.MakePrimitiveRecord(NextNodeType, &routeData.NextNodeID), + tlv.MakePrimitiveRecord(PathIDType, &routeData.PathID), tlv.MakePrimitiveRecord( NextBlindingOverrideType, &routeData.NextBlindingOverride, @@ -100,6 +109,12 @@ func DecodeBlindedRouteData(r io.Reader) (*BlindedRouteData, error) { return nil, err } + // If no path ID field was parsed, set the path ID field + // on the resulting payload to nil. + if _, ok := tlvMap[PathIDType]; !ok { + routeData.PathID = nil + } + if _, ok := tlvMap[PaymentRelayType]; !ok { routeData.RelayInfo = nil } @@ -139,6 +154,13 @@ func EncodeBlindedRouteData(data *BlindedRouteData) ([]byte, error) { records = append(records, nodeIDRecord) } + if data.PathID != nil { + pathID := tlv.MakePrimitiveRecord( + PathIDType, &data.PathID, + ) + records = append(records, pathID) + } + if data.NextBlindingOverride != nil { nextOverrideRecord := tlv.MakePrimitiveRecord( NextBlindingOverrideType,