Skip to content

Commit

Permalink
fix invoices desync
Browse files Browse the repository at this point in the history
  • Loading branch information
coddmeistr committed Dec 7, 2024
1 parent d85320b commit fc3b1f8
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions pkg/billing/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
"slices"
"strings"
"time"
)
Expand Down Expand Up @@ -145,6 +146,7 @@ func (s *BillingServiceServer) ProcessInvoiceStatusAction(log *zap.Logger, ctx c
abort()
return fmt.Errorf("failed to get invoice: %w", err)
}
old := proto.Clone(inv.Invoice).(*bpb.Invoice)

// No Nack error means that situation is not reversible, so we basically commit transaction with scary log if we got this error
// Otherwise we're reverting entire operation by aborting transaction
Expand All @@ -158,7 +160,6 @@ func (s *BillingServiceServer) ProcessInvoiceStatusAction(log *zap.Logger, ctx c
}
return fmt.Errorf("failed to execute postpaid actions: %w", err)
}
inv.Processed = time.Now().Unix()
}

if event.GetKey() == billing.InvoiceReturned {
Expand All @@ -172,14 +173,24 @@ func (s *BillingServiceServer) ProcessInvoiceStatusAction(log *zap.Logger, ctx c
}
}

if _, err = s.invoices.Update(ctx, inv); err != nil {
log.Error("Failed to update invoice after actions were applied", zap.Error(err))
// Patch body which should contain only transactions and processed date
patch := map[string]interface{}{}
if event.GetKey() == billing.InvoicePaid {
patch["processed"] = time.Now().Unix()
}
if !slices.Equal(old.GetTransactions(), inv.GetTransactions()) {
patch["transactions"] = inv.Transactions
}

if err = s.invoices.Patch(ctx, inv.GetUuid(), patch); err != nil {
log.Error("Failed to patch invoice after actions were applied", zap.Error(err))
if inv.GetType() == bpb.ActionType_INSTANCE_RENEWAL || inv.GetType() == bpb.ActionType_INSTANCE_START {
_ = commit()
return ps.NoNackErr(fmt.Errorf("failed to update invoice: %w", err))
log.Error("Actions weren't aborted. Invoice is broken")
return ps.NoNackErr(fmt.Errorf("failed to patch invoice: %w", err))
}
abort()
return fmt.Errorf("failed to update invoice: %w", err)
return fmt.Errorf("failed to patch invoice: %w", err)
}

_ = commit()
Expand Down Expand Up @@ -211,6 +222,7 @@ func (s *BillingServiceServer) ConsumeInvoiceStatusActions(log *zap.Logger, ctx
}
continue
}
log := log.With(zap.String("invoice", event.Uuid))
log.Debug("Pubsub event received", zap.String("key", event.Key), zap.String("type", event.Type), zap.String("routingKey", msg.RoutingKey))
if err = s.ProcessInvoiceStatusAction(log, ctx, &event); err != nil {
ps.HandleAckNack(log, msg, err)
Expand Down

0 comments on commit fc3b1f8

Please sign in to comment.