Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graph: extract cache from CRUD [3] #9550

Merged
merged 2 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release-notes/release-notes-0.19.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ The underlying functionality between those two options remain the same.
- Move the graph cache out of the graph CRUD layer:
- [1](https://github.com/lightningnetwork/lnd/pull/9533)
- [2](https://github.com/lightningnetwork/lnd/pull/9545)
- [3](https://github.com/lightningnetwork/lnd/pull/9550)

* [Golang was updated to
`v1.22.11`](https://github.com/lightningnetwork/lnd/pull/9462).
Expand Down
168 changes: 168 additions & 0 deletions graph/db/graph.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package graphdb

import (
"sync"
"time"

"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
Expand All @@ -27,6 +29,10 @@ type Config struct {
// KVStore. Upcoming commits will move the graph cache out of the KVStore and
// into this layer so that the KVStore is only responsible for CRUD operations.
type ChannelGraph struct {
// cacheMu guards any writes to the graphCache. It should be held
// across the DB write call and the graphCache update to make the
// two updates as atomic as possible.
cacheMu sync.Mutex
graphCache *GraphCache

*KVStore
Expand Down Expand Up @@ -91,3 +97,165 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
graphCache: graphCache,
}, nil
}

// ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller. If the graphCache
// is available, then it will be used to retrieve the node's channels instead
// of the database.
//
// Unknown policies are passed into the callback as nil values.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *ChannelGraph) ForEachNodeDirectedChannel(node route.Vertex,
cb func(channel *DirectedChannel) error) error {

if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
}

return c.KVStore.ForEachNodeDirectedChannel(node, cb)
}

// FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned.
// If the graphCache is available, then it will be used to retrieve the node's
// features instead of the database.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *ChannelGraph) FetchNodeFeatures(node route.Vertex) (
*lnwire.FeatureVector, error) {

if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}

return c.KVStore.FetchNodeFeatures(node)
}

// GraphSession will provide the call-back with access to a NodeTraverser
// instance which can be used to perform queries against the channel graph. If
// the graph cache is not enabled, then the call-back will be provided with
// access to the graph via a consistent read-only transaction.
func (c *ChannelGraph) GraphSession(cb func(graph NodeTraverser) error) error {
if c.graphCache != nil {
return cb(c)
}

return c.KVStore.GraphSession(cb)
}

// ForEachNodeCached iterates through all the stored vertices/nodes in the
// graph, executing the passed callback with each node encountered.
//
// NOTE: The callback contents MUST not be modified.
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error {

if c.graphCache != nil {
return c.graphCache.ForEachNode(cb)
}

return c.KVStore.ForEachNodeCached(cb)
}

// AddLightningNode adds a vertex/node to the graph database. If the node is not
// in the database from before, this will add a new, unconnected one to the
// graph. If it is present from before, this will update that node's
// information. Note that this method is expected to only be called to update an
// already present node from a node announcement, or to insert a node found in a
// channel update.
func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
op ...batch.SchedulerOption) error {

c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.AddLightningNode(node, op...)
if err != nil {
return err
}

if c.graphCache != nil {
c.graphCache.AddNodeFeatures(
node.PubKeyBytes, node.Features,
)
}

return nil
}

// DeleteLightningNode starts a new database transaction to remove a vertex/node
// from the database according to the node's public key.
func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.DeleteLightningNode(nodePub)
if err != nil {
return err
}

if c.graphCache != nil {
c.graphCache.RemoveNode(nodePub)
}

return nil
}

// AddChannelEdge adds a new (undirected, blank) edge to the graph database. An
// undirected edge from the two target nodes are created. The information stored
// denotes the static attributes of the channel, such as the channelID, the keys
// involved in creation of the channel, and the set of features that the channel
// supports. The chanPoint and chanID are used to uniquely identify the edge
// globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {

c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.AddChannelEdge(edge, op...)
if err != nil {
return err
}

if c.graphCache != nil {
c.graphCache.AddChannel(edge, nil, nil)
}

return nil
}

// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
// If the cache is enabled, the edge will be added back to the graph cache if
// we still have a record of this channel in the DB.
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()

err := c.KVStore.MarkEdgeLive(chanID)
if err != nil {
return err
}

if c.graphCache != nil {
// We need to add the channel back into our graph cache,
// otherwise we won't use it for path finding.
infos, err := c.KVStore.FetchChanInfos([]uint64{chanID})
if err != nil {
return err
}

if len(infos) == 0 {
return nil
}

info := infos[0]

c.graphCache.AddChannel(info.Info, info.Policy1, info.Policy2)
}

return nil
}
2 changes: 1 addition & 1 deletion graph/db/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3953,7 +3953,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {

getSingleChannel := func() *DirectedChannel {
var ch *DirectedChannel
err = graph.forEachNodeDirectedChannel(nil, node1.PubKeyBytes,
err = graph.ForEachNodeDirectedChannel(node1.PubKeyBytes,
func(c *DirectedChannel) error {
require.Nil(t, ch)
ch = c
Expand Down
64 changes: 5 additions & 59 deletions graph/db/kv_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,6 @@ func (c *KVStore) ForEachChannel(cb func(*models.ChannelEdgeInfo,
func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
node route.Vertex, cb func(channel *DirectedChannel) error) error {

if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
}

// Fallback that uses the database.
toNodeCallback := func() route.Vertex {
return node
Expand Down Expand Up @@ -539,10 +535,6 @@ func (c *KVStore) forEachNodeDirectedChannel(tx kvdb.RTx,
func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
node route.Vertex) (*lnwire.FeatureVector, error) {

if c.graphCache != nil {
return c.graphCache.GetFeatures(node), nil
}

// Fallback that uses the database.
targetNode, err := c.FetchLightningNodeTx(tx, node)
switch {
Expand All @@ -564,9 +556,7 @@ func (c *KVStore) fetchNodeFeatures(tx kvdb.RTx,
// ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller. If the graphCache
// is available, then it will be used to retrieve the node's channels instead
// of the database.
// is halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
//
Expand All @@ -579,8 +569,6 @@ func (c *KVStore) ForEachNodeDirectedChannel(nodePub route.Vertex,

// FetchNodeFeatures returns the features of the given node. If no features are
// known for the node, an empty feature vector is returned.
// If the graphCache is available, then it will be used to retrieve the node's
// features instead of the database.
//
// NOTE: this is part of the graphdb.NodeTraverser interface.
func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
Expand All @@ -589,18 +577,13 @@ func (c *KVStore) FetchNodeFeatures(nodePub route.Vertex) (
return c.fetchNodeFeatures(nil, nodePub)
}

// ForEachNodeCached is similar to forEachNode, but it utilizes the channel
// graph cache instead. Note that this doesn't return all the information the
// regular forEachNode method does.
// ForEachNodeCached is similar to forEachNode, but it returns DirectedChannel
// data to the call-back.
//
// NOTE: The callback contents MUST not be modified.
func (c *KVStore) ForEachNodeCached(cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error {

if c.graphCache != nil {
return c.graphCache.ForEachNode(cb)
}

// Otherwise call back to a version that uses the database directly.
// We'll iterate over each node, then the set of channels for each
// node, and construct a similar callback functiopn signature as the
Expand Down Expand Up @@ -901,12 +884,6 @@ func (c *KVStore) AddLightningNode(node *models.LightningNode,

r := &batch.Request{
Update: func(tx kvdb.RwTx) error {
if c.graphCache != nil {
c.graphCache.AddNodeFeatures(
node.PubKeyBytes, node.Features,
)
}

return addLightningNode(tx, node)
},
}
Expand Down Expand Up @@ -986,10 +963,6 @@ func (c *KVStore) DeleteLightningNode(nodePub route.Vertex) error {
return ErrGraphNodeNotFound
}

if c.graphCache != nil {
c.graphCache.RemoveNode(nodePub)
}

return c.deleteLightningNode(nodes, nodePub[:])
}, func() {})
}
Expand Down Expand Up @@ -1121,10 +1094,6 @@ func (c *KVStore) addChannelEdge(tx kvdb.RwTx,
return ErrEdgeAlreadyExist
}

if c.graphCache != nil {
c.graphCache.AddChannel(edge, nil, nil)
}

// Before we insert the channel into the database, we'll ensure that
// both nodes already exist in the channel graph. If either node
// doesn't, then we'll insert a "shell" node that just includes its
Expand Down Expand Up @@ -3734,22 +3703,6 @@ func (c *KVStore) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)

// We need to add the channel back into our graph cache, otherwise we
// won't use it for path finding.
if c.graphCache != nil {
edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
if err != nil {
return err
}

for _, edgeInfo := range edgeInfos {
c.graphCache.AddChannel(
edgeInfo.Info, edgeInfo.Policy1,
edgeInfo.Policy2,
)
}
}

return nil
}

Expand Down Expand Up @@ -3883,14 +3836,8 @@ func (c *KVStore) IsClosedScid(scid lnwire.ShortChannelID) (bool, error) {
}

// GraphSession will provide the call-back with access to a NodeTraverser
// instance which can be used to perform queries against the channel graph. If
// the graph cache is not enabled, then the call-back will be provided with
// access to the graph via a consistent read-only transaction.
// instance which can be used to perform queries against the channel graph.
func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
if c.graphCache != nil {
return cb(&nodeTraverserSession{db: c})
}

return c.db.View(func(tx walletdb.ReadTx) error {
return cb(&nodeTraverserSession{
db: c,
Expand All @@ -3900,8 +3847,7 @@ func (c *KVStore) GraphSession(cb func(graph NodeTraverser) error) error {
}

// nodeTraverserSession implements the NodeTraverser interface but with a
// backing read only transaction for a consistent view of the graph in the case
// where the graph Cache has not been enabled.
// backing read only transaction for a consistent view of the graph.
type nodeTraverserSession struct {
tx kvdb.RTx
db *KVStore
Expand Down
Loading