Skip to content

Commit

Permalink
Merge pull request #1956 from slntopp/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
coddmeistr authored Dec 27, 2024
2 parents deb6bf3 + 8986d39 commit e06618e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 12 deletions.
3 changes: 3 additions & 0 deletions pkg/billing/cron_whmcs_invoices_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func (s *BillingServiceServer) WhmcsInvoicesSyncerCronJob(ctx context.Context, l
delCount++
}

log.Info("Finished WHMCS Invoices syncer cron job", zap.Int("deleted", delCount))
return

whmcsIdToInvoice := make(map[int]struct{})
for _, inv := range ncInvoices {
if inv.Meta == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/instances/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ var methodsToSync = []string{
"manual_renew",
"free_renew",
"cancel_renew",
"vpn",
}

func (s *InstancesServer) Invoke(ctx context.Context, _req *connect.Request[pb.InvokeRequest]) (*connect.Response[pb.InvokeResponse], error) {
Expand Down
37 changes: 25 additions & 12 deletions pkg/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,8 +930,6 @@ func (s *ServicesServer) Stream(ctx context.Context, _req *connect.Request[pb.St
log.Debug("Request received", zap.Any("req", req))
requestor := ctx.Value(nocloud.NoCloudAccount).(string)

messages := make(chan interface{}, 10)

if service, err := s.ctrl.Get(ctx, requestor, req.GetUuid()); err != nil || service.GetAccess().GetLevel() < access.Level_READ {
log.Warn("Failed access check", zap.String("uuid", req.GetUuid()))
return errors.New("failed access check")
Expand All @@ -947,23 +945,38 @@ func (s *ServicesServer) Stream(ctx context.Context, _req *connect.Request[pb.St
for i, id := range uuids {
topics[i] = "instance/" + id
}

s.log.Debug("topics", zap.Any("topics", topics))

reconnections := 0
retry:
messages := make(chan interface{}, 10)
s.ps.AddSub(messages, topics...)
defer unsub(s.ps, messages)

for msg := range messages {
state := msg.(*spb.ObjectState)
log.Debug("state", zap.Any("state", state))
err := srv.Send(state)
if err != nil {
log.Warn("Unable to send message", zap.Error(err))
break
for {
select {
case <-ctx.Done():
log.Debug("Context is cancelled. Connection was probably closed by the client")
return nil
case msg, ok := <-messages:
if !ok {
if reconnections > 5 {
log.Error("Too many reconnections. Closing stream")
return fmt.Errorf("internal channel closed")
}
log.Warn("Messages pubsub channel is closed. Trying to resubscribe...")
reconnections++
goto retry
}
state := msg.(*spb.ObjectState)
log.Debug("state", zap.Any("state", state))
err := srv.Send(state)
if err != nil {
log.Warn("Unable to send message", zap.Error(err))
return nil
}
}
}

return nil
}

func unsub[T chan any](ps *pubsub.PubSub, ch chan any) {
Expand Down

0 comments on commit e06618e

Please sign in to comment.