Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fire channel open and closure events in lnd #849

Merged
merged 2 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 179 additions & 98 deletions lnclient/lnd/lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
decodepay "github.com/nbd-wtf/ln-decodepay"
"google.golang.org/grpc/status"

"github.com/getAlby/hub/config"
"github.com/getAlby/hub/events"
"github.com/getAlby/hub/lnclient"
"github.com/getAlby/hub/lnclient/lnd/wrapper"
Expand All @@ -31,9 +32,10 @@ import (
)

type LNDService struct {
client *wrapper.LNDWrapper
nodeInfo *lnclient.NodeInfo
cancel context.CancelFunc
client *wrapper.LNDWrapper
nodeInfo *lnclient.NodeInfo
cancel context.CancelFunc
eventPublisher events.EventPublisher
}

// FIXME: this always returns limit * 2 transactions and offset is not used correctly
Expand Down Expand Up @@ -442,128 +444,207 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln

lndCtx, cancel := context.WithCancel(ctx)

lndService := &LNDService{client: lndClient, nodeInfo: nodeInfo, cancel: cancel}

// Subscribe to payments
go func() {
for {
select {
case <-lndCtx.Done():
return
default:
paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{
NoInflightUpdates: true,
})
lndService := &LNDService{
client: lndClient,
nodeInfo: nodeInfo,
cancel: cancel,
eventPublisher: eventPublisher,
}

go lndService.subscribePayments(lndCtx)
go lndService.subscribeInvoices(lndCtx)
go lndService.subscribeChannelEvents(lndCtx)

logger.Logger.Infof("Connected to LND - alias %s", nodeInfo.Alias)

return lndService, nil
}

func (svc *LNDService) subscribePayments(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
paymentStream, err := svc.client.SubscribePayments(ctx, &routerrpc.TrackPaymentsRequest{
NoInflightUpdates: true,
})
if err != nil {
logger.Logger.WithError(err).Error("Error subscribing to payments")
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
continue
}
}
paymentsLoop:
for {
payment, err := paymentStream.Recv()
if err != nil {
logger.Logger.WithError(err).Error("Error subscribing to payments")
logger.Logger.WithError(err).Error("Failed to receive payment")
select {
case <-lndCtx.Done():
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
continue
case <-time.After(2 * time.Second):
break paymentsLoop
}
}
paymentsLoop:
for {
payment, err := paymentStream.Recv()

switch payment.Status {
case lnrpc.Payment_FAILED:
logger.Logger.WithFields(logrus.Fields{
"payment": payment,
}).Info("Received payment failed notification")

transaction, err := lndPaymentToTransaction(payment)
if err != nil {
logger.Logger.WithError(err).Error("Failed to receive payment")
select {
case <-lndCtx.Done():
return
case <-time.After(2 * time.Second):
break paymentsLoop
}
continue
}
svc.eventPublisher.Publish(&events.Event{
Event: "nwc_lnclient_payment_failed",
Properties: &lnclient.PaymentFailedEventProperties{
Transaction: transaction,
Reason: payment.FailureReason.String(),
},
})
case lnrpc.Payment_SUCCEEDED:
logger.Logger.WithFields(logrus.Fields{
"payment": payment,
}).Info("Received payment sent notification")

switch payment.Status {
case lnrpc.Payment_FAILED:
logger.Logger.WithFields(logrus.Fields{
"payment": payment,
}).Info("Received payment failed notification")

transaction, err := lndPaymentToTransaction(payment)
if err != nil {
continue
}
eventPublisher.Publish(&events.Event{
Event: "nwc_lnclient_payment_failed",
Properties: &lnclient.PaymentFailedEventProperties{
Transaction: transaction,
Reason: payment.FailureReason.String(),
},
})
case lnrpc.Payment_SUCCEEDED:
logger.Logger.WithFields(logrus.Fields{
"payment": payment,
}).Info("Received payment sent notification")

transaction, err := lndPaymentToTransaction(payment)
if err != nil {
continue
}
eventPublisher.Publish(&events.Event{
Event: "nwc_lnclient_payment_sent",
Properties: transaction,
})
default:
transaction, err := lndPaymentToTransaction(payment)
if err != nil {
continue
}
svc.eventPublisher.Publish(&events.Event{
Event: "nwc_lnclient_payment_sent",
Properties: transaction,
})
default:
continue
}
}
}
}()

// Subscribe to invoices
go func() {
for {
select {
case <-lndCtx.Done():
return
default:
invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{})
}
}

func (svc *LNDService) subscribeInvoices(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
invoiceStream, err := svc.client.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{})
if err != nil {
logger.Logger.WithError(err).Error("Error subscribing to invoices")
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
continue
}
}
invoicesLoop:
for {
invoice, err := invoiceStream.Recv()
if err != nil {
logger.Logger.WithError(err).Error("Error subscribing to invoices")
logger.Logger.WithError(err).Error("Failed to receive invoice")
select {
case <-lndCtx.Done():
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
continue
case <-time.After(2 * time.Second):
break invoicesLoop
}
}
invoicesLoop:
for {
invoice, err := invoiceStream.Recv()
if err != nil {
logger.Logger.WithError(err).Error("Failed to receive invoice")
select {
case <-lndCtx.Done():
return
case <-time.After(2 * time.Second):
break invoicesLoop
}
}

if invoice.State != lnrpc.Invoice_SETTLED {
continue
if invoice.State != lnrpc.Invoice_SETTLED {
continue
}

logger.Logger.WithFields(logrus.Fields{
"invoice": invoice,
}).Info("Received new invoice")

svc.eventPublisher.Publish(&events.Event{
Event: "nwc_lnclient_payment_received",
Properties: lndInvoiceToTransaction(invoice),
})
}
}
}
}

func (svc *LNDService) subscribeChannelEvents(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
channelEvents, err := svc.client.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
if err != nil {
logger.Logger.WithError(err).Error("Error subscribing to channel events")
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Second):
continue
}
}
channelEventsLoop:
for {
event, err := channelEvents.Recv()
if err != nil {
logger.Logger.WithError(err).Error("Failed to receive channel event")
select {
case <-ctx.Done():
return
case <-time.After(2 * time.Second):
break channelEventsLoop
}
}

switch update := event.Channel.(type) {
case *lnrpc.ChannelEventUpdate_OpenChannel:
channel := update.OpenChannel
logger.Logger.WithFields(logrus.Fields{
"invoice": invoice,
}).Info("Received new invoice")
"counterparty_node_id": channel.RemotePubkey,
"public": !channel.Private,
"capacity": channel.Capacity,
"is_outbound": channel.Initiator,
}).Info("Channel opened")

svc.eventPublisher.Publish(&events.Event{
Event: "nwc_channel_ready",
Properties: map[string]interface{}{
"counterparty_node_id": channel.RemotePubkey,
"node_type": config.LNDBackendType,
"public": !channel.Private,
"capacity": channel.Capacity,
"is_outbound": channel.Initiator,
},
})
case *lnrpc.ChannelEventUpdate_ClosedChannel:
closureReason := update.ClosedChannel.CloseType.String()
counterpartyNodeId := update.ClosedChannel.RemotePubkey

eventPublisher.Publish(&events.Event{
Event: "nwc_lnclient_payment_received",
Properties: lndInvoiceToTransaction(invoice),
logger.Logger.WithFields(logrus.Fields{
"counterparty_node_id": counterpartyNodeId,
"reason": closureReason,
}).Info("Channel closed")

svc.eventPublisher.Publish(&events.Event{
Event: "nwc_channel_closed",
Properties: map[string]interface{}{
"counterparty_node_id": counterpartyNodeId,
"reason": closureReason,
"node_type": config.LNDBackendType,
},
})
}
}
}
}()

logger.Logger.Infof("Connected to LND - alias %s", nodeInfo.Alias)

return lndService, nil
}
}

func (svc *LNDService) Shutdown() error {
Expand Down
4 changes: 4 additions & 0 deletions lnclient/lnd/wrapper/lnd.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,7 @@ func (wrapper *LNDWrapper) UpdateChannel(ctx context.Context, req *lnrpc.PolicyU
func (wrapper *LNDWrapper) DisconnectPeer(ctx context.Context, req *lnrpc.DisconnectPeerRequest, options ...grpc.CallOption) (*lnrpc.DisconnectPeerResponse, error) {
return wrapper.client.DisconnectPeer(ctx, req, options...)
}

func (wrapper *LNDWrapper) SubscribeChannelEvents(ctx context.Context, in *lnrpc.ChannelEventSubscription, options ...grpc.CallOption) (lnrpc.Lightning_SubscribeChannelEventsClient, error) {
return wrapper.client.SubscribeChannelEvents(ctx, in, options...)
}
Loading