diff --git a/lnclient/lnd/lnd.go b/lnclient/lnd/lnd.go index a1b5c2899..25ed86074 100644 --- a/lnclient/lnd/lnd.go +++ b/lnclient/lnd/lnd.go @@ -18,6 +18,7 @@ import ( decodepay "github.com/nbd-wtf/ln-decodepay" "google.golang.org/grpc/status" + "github.com/getAlby/hub/config" "github.com/getAlby/hub/events" "github.com/getAlby/hub/lnclient" "github.com/getAlby/hub/lnclient/lnd/wrapper" @@ -31,9 +32,10 @@ import ( ) type LNDService struct { - client *wrapper.LNDWrapper - nodeInfo *lnclient.NodeInfo - cancel context.CancelFunc + client *wrapper.LNDWrapper + nodeInfo *lnclient.NodeInfo + cancel context.CancelFunc + eventPublisher events.EventPublisher } // FIXME: this always returns limit * 2 transactions and offset is not used correctly @@ -442,128 +444,207 @@ func NewLNDService(ctx context.Context, eventPublisher events.EventPublisher, ln lndCtx, cancel := context.WithCancel(ctx) - lndService := &LNDService{client: lndClient, nodeInfo: nodeInfo, cancel: cancel} - - // Subscribe to payments - go func() { - for { - select { - case <-lndCtx.Done(): - return - default: - paymentStream, err := lndClient.SubscribePayments(lndCtx, &routerrpc.TrackPaymentsRequest{ - NoInflightUpdates: true, - }) + lndService := &LNDService{ + client: lndClient, + nodeInfo: nodeInfo, + cancel: cancel, + eventPublisher: eventPublisher, + } + + go lndService.subscribePayments(lndCtx) + go lndService.subscribeInvoices(lndCtx) + go lndService.subscribeChannelEvents(lndCtx) + + logger.Logger.Infof("Connected to LND - alias %s", nodeInfo.Alias) + + return lndService, nil +} + +func (svc *LNDService) subscribePayments(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + paymentStream, err := svc.client.SubscribePayments(ctx, &routerrpc.TrackPaymentsRequest{ + NoInflightUpdates: true, + }) + if err != nil { + logger.Logger.WithError(err).Error("Error subscribing to payments") + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Second): + continue + } + } + paymentsLoop: + for { + payment, err := paymentStream.Recv() if err != nil { - logger.Logger.WithError(err).Error("Error subscribing to payments") + logger.Logger.WithError(err).Error("Failed to receive payment") select { - case <-lndCtx.Done(): + case <-ctx.Done(): return - case <-time.After(10 * time.Second): - continue + case <-time.After(2 * time.Second): + break paymentsLoop } } - paymentsLoop: - for { - payment, err := paymentStream.Recv() + + switch payment.Status { + case lnrpc.Payment_FAILED: + logger.Logger.WithFields(logrus.Fields{ + "payment": payment, + }).Info("Received payment failed notification") + + transaction, err := lndPaymentToTransaction(payment) if err != nil { - logger.Logger.WithError(err).Error("Failed to receive payment") - select { - case <-lndCtx.Done(): - return - case <-time.After(2 * time.Second): - break paymentsLoop - } + continue } + svc.eventPublisher.Publish(&events.Event{ + Event: "nwc_lnclient_payment_failed", + Properties: &lnclient.PaymentFailedEventProperties{ + Transaction: transaction, + Reason: payment.FailureReason.String(), + }, + }) + case lnrpc.Payment_SUCCEEDED: + logger.Logger.WithFields(logrus.Fields{ + "payment": payment, + }).Info("Received payment sent notification") - switch payment.Status { - case lnrpc.Payment_FAILED: - logger.Logger.WithFields(logrus.Fields{ - "payment": payment, - }).Info("Received payment failed notification") - - transaction, err := lndPaymentToTransaction(payment) - if err != nil { - continue - } - eventPublisher.Publish(&events.Event{ - Event: "nwc_lnclient_payment_failed", - Properties: &lnclient.PaymentFailedEventProperties{ - Transaction: transaction, - Reason: payment.FailureReason.String(), - }, - }) - case lnrpc.Payment_SUCCEEDED: - logger.Logger.WithFields(logrus.Fields{ - "payment": payment, - }).Info("Received payment sent notification") - - transaction, err := lndPaymentToTransaction(payment) - if err != nil { - continue - } - eventPublisher.Publish(&events.Event{ - Event: "nwc_lnclient_payment_sent", - Properties: transaction, - }) - default: + transaction, err := lndPaymentToTransaction(payment) + if err != nil { continue } + svc.eventPublisher.Publish(&events.Event{ + Event: "nwc_lnclient_payment_sent", + Properties: transaction, + }) + default: + continue } } } - }() - - // Subscribe to invoices - go func() { - for { - select { - case <-lndCtx.Done(): - return - default: - invoiceStream, err := lndClient.SubscribeInvoices(lndCtx, &lnrpc.InvoiceSubscription{}) + } +} + +func (svc *LNDService) subscribeInvoices(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + invoiceStream, err := svc.client.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{}) + if err != nil { + logger.Logger.WithError(err).Error("Error subscribing to invoices") + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Second): + continue + } + } + invoicesLoop: + for { + invoice, err := invoiceStream.Recv() if err != nil { - logger.Logger.WithError(err).Error("Error subscribing to invoices") + logger.Logger.WithError(err).Error("Failed to receive invoice") select { - case <-lndCtx.Done(): + case <-ctx.Done(): return - case <-time.After(10 * time.Second): - continue + case <-time.After(2 * time.Second): + break invoicesLoop } } - invoicesLoop: - for { - invoice, err := invoiceStream.Recv() - if err != nil { - logger.Logger.WithError(err).Error("Failed to receive invoice") - select { - case <-lndCtx.Done(): - return - case <-time.After(2 * time.Second): - break invoicesLoop - } - } - if invoice.State != lnrpc.Invoice_SETTLED { - continue + if invoice.State != lnrpc.Invoice_SETTLED { + continue + } + + logger.Logger.WithFields(logrus.Fields{ + "invoice": invoice, + }).Info("Received new invoice") + + svc.eventPublisher.Publish(&events.Event{ + Event: "nwc_lnclient_payment_received", + Properties: lndInvoiceToTransaction(invoice), + }) + } + } + } +} + +func (svc *LNDService) subscribeChannelEvents(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + channelEvents, err := svc.client.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{}) + if err != nil { + logger.Logger.WithError(err).Error("Error subscribing to channel events") + select { + case <-ctx.Done(): + return + case <-time.After(10 * time.Second): + continue + } + } + channelEventsLoop: + for { + event, err := channelEvents.Recv() + if err != nil { + logger.Logger.WithError(err).Error("Failed to receive channel event") + select { + case <-ctx.Done(): + return + case <-time.After(2 * time.Second): + break channelEventsLoop } + } + switch update := event.Channel.(type) { + case *lnrpc.ChannelEventUpdate_OpenChannel: + channel := update.OpenChannel logger.Logger.WithFields(logrus.Fields{ - "invoice": invoice, - }).Info("Received new invoice") + "counterparty_node_id": channel.RemotePubkey, + "public": !channel.Private, + "capacity": channel.Capacity, + "is_outbound": channel.Initiator, + }).Info("Channel opened") + + svc.eventPublisher.Publish(&events.Event{ + Event: "nwc_channel_ready", + Properties: map[string]interface{}{ + "counterparty_node_id": channel.RemotePubkey, + "node_type": config.LNDBackendType, + "public": !channel.Private, + "capacity": channel.Capacity, + "is_outbound": channel.Initiator, + }, + }) + case *lnrpc.ChannelEventUpdate_ClosedChannel: + closureReason := update.ClosedChannel.CloseType.String() + counterpartyNodeId := update.ClosedChannel.RemotePubkey - eventPublisher.Publish(&events.Event{ - Event: "nwc_lnclient_payment_received", - Properties: lndInvoiceToTransaction(invoice), + logger.Logger.WithFields(logrus.Fields{ + "counterparty_node_id": counterpartyNodeId, + "reason": closureReason, + }).Info("Channel closed") + + svc.eventPublisher.Publish(&events.Event{ + Event: "nwc_channel_closed", + Properties: map[string]interface{}{ + "counterparty_node_id": counterpartyNodeId, + "reason": closureReason, + "node_type": config.LNDBackendType, + }, }) } } } - }() - - logger.Logger.Infof("Connected to LND - alias %s", nodeInfo.Alias) - - return lndService, nil + } } func (svc *LNDService) Shutdown() error { diff --git a/lnclient/lnd/wrapper/lnd.go b/lnclient/lnd/wrapper/lnd.go index 7b3e1bb6a..ac36c4eaf 100644 --- a/lnclient/lnd/wrapper/lnd.go +++ b/lnclient/lnd/wrapper/lnd.go @@ -218,3 +218,7 @@ func (wrapper *LNDWrapper) UpdateChannel(ctx context.Context, req *lnrpc.PolicyU func (wrapper *LNDWrapper) DisconnectPeer(ctx context.Context, req *lnrpc.DisconnectPeerRequest, options ...grpc.CallOption) (*lnrpc.DisconnectPeerResponse, error) { return wrapper.client.DisconnectPeer(ctx, req, options...) } + +func (wrapper *LNDWrapper) SubscribeChannelEvents(ctx context.Context, in *lnrpc.ChannelEventSubscription, options ...grpc.CallOption) (lnrpc.Lightning_SubscribeChannelEventsClient, error) { + return wrapper.client.SubscribeChannelEvents(ctx, in, options...) +}