Skip to content

Commit

Permalink
NEOS-1485: reduce workflow checkaccountstatus for paid accounts (#2752)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei authored Sep 27, 2024
1 parent 5be1926 commit 391e0ee
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 316 deletions.
507 changes: 259 additions & 248 deletions backend/gen/go/protos/mgmt/v1alpha1/user_account.pb.go

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions backend/protos/mgmt/v1alpha1/user_account.proto
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ message IsAccountStatusValidResponse {
bool is_valid = 1;
// If the account is not valid, a reason for why may be provided.
optional string reason = 2;
// Whether or not the process should decide to continue polling for validitiy updates
bool should_poll = 3;
}

message GetAccountBillingCheckoutSessionRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

require.True(s.T(), resp.Msg.GetIsValid())
require.Empty(s.T(), resp.Msg.GetReason())
require.True(s.T(), resp.Msg.GetShouldPoll())
}

func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_NeosyncCloud_Personal_Overprovisioned() {
Expand All @@ -537,6 +538,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

require.False(s.T(), resp.Msg.GetIsValid())
require.NotEmpty(s.T(), resp.Msg.GetReason())
require.False(s.T(), resp.Msg.GetShouldPoll())
}

func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_NeosyncCloud_Personal_RequestedRecords() {
Expand Down Expand Up @@ -564,6 +566,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

require.False(s.T(), resp.Msg.GetIsValid())
require.NotEmpty(s.T(), resp.Msg.GetReason())
require.False(s.T(), resp.Msg.GetShouldPoll())
})
t.Run("under the limit", func(t *testing.T) {
resp, err := userclient.IsAccountStatusValid(s.ctx, connect.NewRequest(&mgmtv1alpha1.IsAccountStatusValidRequest{
Expand All @@ -574,6 +577,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

require.True(s.T(), resp.Msg.GetIsValid())
require.Empty(s.T(), resp.Msg.GetReason())
require.True(s.T(), resp.Msg.GetShouldPoll())
})
}

Expand All @@ -595,6 +599,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

assert.True(s.T(), resp.Msg.GetIsValid())
assert.Empty(s.T(), resp.Msg.GetReason())
require.False(s.T(), resp.Msg.GetShouldPoll())
})
t.Run("inactive", func(t *testing.T) {
custId := "cust_id2"
Expand All @@ -610,6 +615,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

assert.False(s.T(), resp.Msg.GetIsValid())
assert.NotEmpty(s.T(), resp.Msg.GetReason())
require.False(s.T(), resp.Msg.GetShouldPoll())
})
}

Expand Down
26 changes: 13 additions & 13 deletions backend/services/mgmt/v1alpha1/user-account-service/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (s *Service) IsAccountStatusValid(
if err != nil {
return nil, err
}

if !s.cfg.IsNeosyncCloud {
return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{IsValid: true}), nil
}
Expand Down Expand Up @@ -176,22 +177,21 @@ func (s *Service) IsAccountStatusValid(
}), nil
}

if req.Msg.RequestedRecordCount == nil {
return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{IsValid: true}), nil
}

requested := req.Msg.GetRequestedRecordCount()
totalUsed := currentUsed + requested
if totalUsed > allowed {
reason := fmt.Sprintf("Adding requested record count (%d) would exceed the allowed limit (%d). Current used: %d.", requested, allowed, currentUsed)
return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{
IsValid: false,
Reason: &reason,
}), nil
if req.Msg.GetRequestedRecordCount() > 0 {
requested := req.Msg.GetRequestedRecordCount()
totalUsed := currentUsed + requested
if totalUsed > allowed {
reason := fmt.Sprintf("Adding requested record count (%d) would exceed the allowed limit (%d). Current used: %d.", requested, allowed, currentUsed)
return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{
IsValid: false,
Reason: &reason,
}), nil
}
}

return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{
IsValid: true,
IsValid: true,
ShouldPoll: true, // Is active free account that is currently under their usage limit
}), nil
}

Expand Down
12 changes: 12 additions & 0 deletions docs/protos/data/proto_docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -14685,6 +14685,18 @@
"isoneof": true,
"oneofdecl": "_reason",
"defaultValue": ""
},
{
"name": "should_poll",
"description": "Whether or not the process should decide to continue polling for validitiy updates",
"label": "",
"type": "bool",
"longType": "bool",
"fullType": "bool",
"ismap": false,
"isoneof": false,
"oneofdecl": "",
"defaultValue": ""
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1872,6 +1872,13 @@ export class IsAccountStatusValidResponse extends Message<IsAccountStatusValidRe
*/
reason?: string;

/**
* Whether or not the process should decide to continue polling for validitiy updates
*
* @generated from field: bool should_poll = 3;
*/
shouldPoll = false;

constructor(data?: PartialMessage<IsAccountStatusValidResponse>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -1882,6 +1889,7 @@ export class IsAccountStatusValidResponse extends Message<IsAccountStatusValidRe
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "is_valid", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
{ no: 2, name: "reason", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true },
{ no: 3, name: "should_poll", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): IsAccountStatusValidResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type CheckAccountStatusRequest struct {
}

type CheckAccountStatusResponse struct {
IsValid bool
Reason *string
IsValid bool
Reason *string
ShouldPoll bool
}

func (a *Activity) CheckAccountStatus(
Expand Down Expand Up @@ -73,7 +74,7 @@ func (a *Activity) CheckAccountStatus(
"reason", withReasonOrDefault(resp.Msg.GetReason()),
)

return &CheckAccountStatusResponse{IsValid: resp.Msg.GetIsValid(), Reason: resp.Msg.Reason}, nil
return &CheckAccountStatusResponse{IsValid: resp.Msg.GetIsValid(), Reason: resp.Msg.Reason, ShouldPoll: resp.Msg.GetShouldPoll()}, nil
}

const defaultReason = "no reason provided"
Expand Down
104 changes: 53 additions & 51 deletions worker/pkg/workflows/datasync/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse,
if actOptResp.RequestedRecordCount != nil && *actOptResp.RequestedRecordCount > 0 {
logger.Info("requested record count of %d", *actOptResp.RequestedRecordCount)
}
var result *accountstatus_activity.CheckAccountStatusResponse
var initialCheckAccountStatusResponse *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId, RequestedRecordCount: actOptResp.RequestedRecordCount}).
Get(ctx, &result)
Get(ctx, &initialCheckAccountStatusResponse)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
cancelHandler()
return nil, fmt.Errorf("unable to continue workflow due to error when checking account status: %w", err)
}
if !result.IsValid {
if !initialCheckAccountStatusResponse.IsValid {
logger.Warn("account is no longer is valid state")
cancelHandler()
reason := "no reason provided"
if result.Reason != nil {
reason = *result.Reason
if initialCheckAccountStatusResponse.Reason != nil {
reason = *initialCheckAccountStatusResponse.Reason
}
return nil, fmt.Errorf("halting job run due to account in invalid state. Reason: %q: %w", reason, invalidAccountStatusError)
}
Expand Down Expand Up @@ -139,57 +139,59 @@ func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse,

// spawn account status checker in loop
stopChan := workflow.NewNamedChannel(ctx, "account-status")
accountStatusTimerDuration := getAccountStatusTimerDuration()
workflow.GoNamed(
ctx,
"account-status-check",
func(ctx workflow.Context) {
shouldStop := false
for {
selector := workflow.NewNamedSelector(ctx, "account-status-select")
timer := workflow.NewTimer(ctx, accountStatusTimerDuration)
selector.AddFuture(timer, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
logger.Error("time receive failed", "error", err)
if initialCheckAccountStatusResponse.ShouldPoll {
accountStatusTimerDuration := getAccountStatusTimerDuration()
workflow.GoNamed(
ctx,
"account-status-check",
func(ctx workflow.Context) {
shouldStop := false
for {
selector := workflow.NewNamedSelector(ctx, "account-status-select")
timer := workflow.NewTimer(ctx, accountStatusTimerDuration)
selector.AddFuture(timer, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
logger.Error("time receive failed", "error", err)
return
}

var result *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId}).
Get(ctx, &result)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
if !result.IsValid {
logger.Warn("account is no longer is valid state")
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
})

selector.Select(ctx)

if shouldStop {
logger.Warn("exiting account status check")
return
}

var result *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId}).
Get(ctx, &result)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
if !result.IsValid {
logger.Warn("account is no longer is valid state")
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
if ctx.Err() != nil {
logger.Warn("workflow canceled due to error or stop signal", "error", ctx.Err())
return
}
})

selector.Select(ctx)

if shouldStop {
logger.Warn("exiting account status check")
return
}
if ctx.Err() != nil {
logger.Warn("workflow canceled due to error or stop signal", "error", ctx.Err())
return
}
}
})
})
}

workselector := workflow.NewSelector(ctx)
var activityErr error
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/workflows/datasync/workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func Test_Workflow_Halts_Activities_On_InvalidAccountStatus(t *testing.T) {

var accStatsActivity *accountstatus_activity.Activity
env.OnActivity(accStatsActivity.CheckAccountStatus, mock.Anything, mock.Anything).
Return(&accountstatus_activity.CheckAccountStatusResponse{IsValid: true}, nil).Once()
Return(&accountstatus_activity.CheckAccountStatusResponse{IsValid: true, ShouldPoll: true}, nil).Once()
env.OnActivity(accStatsActivity.CheckAccountStatus, mock.Anything, mock.Anything).
Return(&accountstatus_activity.CheckAccountStatusResponse{IsValid: false}, nil).Once()

Expand Down

0 comments on commit 391e0ee

Please sign in to comment.