Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NEOS-1485: reduce workflow checkaccountstatus for paid accounts #2752

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
507 changes: 259 additions & 248 deletions backend/gen/go/protos/mgmt/v1alpha1/user_account.pb.go

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions backend/protos/mgmt/v1alpha1/user_account.proto
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ message IsAccountStatusValidResponse {
bool is_valid = 1;
// If the account is not valid, a reason for why may be provided.
optional string reason = 2;
// Whether or not the process should decide to continue polling for validitiy updates
bool should_poll = 3;
}

message GetAccountBillingCheckoutSessionRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

require.True(s.T(), resp.Msg.GetIsValid())
require.Empty(s.T(), resp.Msg.GetReason())
require.True(s.T(), resp.Msg.GetShouldPoll())
}

func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_NeosyncCloud_Personal_Overprovisioned() {
Expand All @@ -537,6 +538,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

require.False(s.T(), resp.Msg.GetIsValid())
require.NotEmpty(s.T(), resp.Msg.GetReason())
require.False(s.T(), resp.Msg.GetShouldPoll())
}

func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_NeosyncCloud_Personal_RequestedRecords() {
Expand Down Expand Up @@ -564,6 +566,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

require.False(s.T(), resp.Msg.GetIsValid())
require.NotEmpty(s.T(), resp.Msg.GetReason())
require.False(s.T(), resp.Msg.GetShouldPoll())
})
t.Run("under the limit", func(t *testing.T) {
resp, err := userclient.IsAccountStatusValid(s.ctx, connect.NewRequest(&mgmtv1alpha1.IsAccountStatusValidRequest{
Expand All @@ -574,6 +577,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

require.True(s.T(), resp.Msg.GetIsValid())
require.Empty(s.T(), resp.Msg.GetReason())
require.True(s.T(), resp.Msg.GetShouldPoll())
})
}

Expand All @@ -595,6 +599,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

assert.True(s.T(), resp.Msg.GetIsValid())
assert.Empty(s.T(), resp.Msg.GetReason())
require.False(s.T(), resp.Msg.GetShouldPoll())
})
t.Run("inactive", func(t *testing.T) {
custId := "cust_id2"
Expand All @@ -610,6 +615,7 @@ func (s *IntegrationTestSuite) Test_UserAccountService_IsAccountStatusValid_Neos

assert.False(s.T(), resp.Msg.GetIsValid())
assert.NotEmpty(s.T(), resp.Msg.GetReason())
require.False(s.T(), resp.Msg.GetShouldPoll())
})
}

Expand Down
26 changes: 13 additions & 13 deletions backend/services/mgmt/v1alpha1/user-account-service/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func (s *Service) IsAccountStatusValid(
if err != nil {
return nil, err
}

if !s.cfg.IsNeosyncCloud {
return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{IsValid: true}), nil
}
Expand Down Expand Up @@ -176,22 +177,21 @@ func (s *Service) IsAccountStatusValid(
}), nil
}

if req.Msg.RequestedRecordCount == nil {
return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{IsValid: true}), nil
}

requested := req.Msg.GetRequestedRecordCount()
totalUsed := currentUsed + requested
if totalUsed > allowed {
reason := fmt.Sprintf("Adding requested record count (%d) would exceed the allowed limit (%d). Current used: %d.", requested, allowed, currentUsed)
return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{
IsValid: false,
Reason: &reason,
}), nil
if req.Msg.GetRequestedRecordCount() > 0 {
requested := req.Msg.GetRequestedRecordCount()
totalUsed := currentUsed + requested
if totalUsed > allowed {
reason := fmt.Sprintf("Adding requested record count (%d) would exceed the allowed limit (%d). Current used: %d.", requested, allowed, currentUsed)
return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{
IsValid: false,
Reason: &reason,
}), nil
}
}

return connect.NewResponse(&mgmtv1alpha1.IsAccountStatusValidResponse{
IsValid: true,
IsValid: true,
ShouldPoll: true, // Is active free account that is currently under their usage limit
}), nil
}

Expand Down
12 changes: 12 additions & 0 deletions docs/protos/data/proto_docs.json
Original file line number Diff line number Diff line change
Expand Up @@ -14685,6 +14685,18 @@
"isoneof": true,
"oneofdecl": "_reason",
"defaultValue": ""
},
{
"name": "should_poll",
"description": "Whether or not the process should decide to continue polling for validitiy updates",
"label": "",
"type": "bool",
"longType": "bool",
"fullType": "bool",
"ismap": false,
"isoneof": false,
"oneofdecl": "",
"defaultValue": ""
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1872,6 +1872,13 @@ export class IsAccountStatusValidResponse extends Message<IsAccountStatusValidRe
*/
reason?: string;

/**
* Whether or not the process should decide to continue polling for validitiy updates
*
* @generated from field: bool should_poll = 3;
*/
shouldPoll = false;

constructor(data?: PartialMessage<IsAccountStatusValidResponse>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -1882,6 +1889,7 @@ export class IsAccountStatusValidResponse extends Message<IsAccountStatusValidRe
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "is_valid", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
{ no: 2, name: "reason", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true },
{ no: 3, name: "should_poll", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): IsAccountStatusValidResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type CheckAccountStatusRequest struct {
}

type CheckAccountStatusResponse struct {
IsValid bool
Reason *string
IsValid bool
Reason *string
ShouldPoll bool
}

func (a *Activity) CheckAccountStatus(
Expand Down Expand Up @@ -73,7 +74,7 @@ func (a *Activity) CheckAccountStatus(
"reason", withReasonOrDefault(resp.Msg.GetReason()),
)

return &CheckAccountStatusResponse{IsValid: resp.Msg.GetIsValid(), Reason: resp.Msg.Reason}, nil
return &CheckAccountStatusResponse{IsValid: resp.Msg.GetIsValid(), Reason: resp.Msg.Reason, ShouldPoll: resp.Msg.GetShouldPoll()}, nil
}

const defaultReason = "no reason provided"
Expand Down
104 changes: 53 additions & 51 deletions worker/pkg/workflows/datasync/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse,
if actOptResp.RequestedRecordCount != nil && *actOptResp.RequestedRecordCount > 0 {
logger.Info("requested record count of %d", *actOptResp.RequestedRecordCount)
}
var result *accountstatus_activity.CheckAccountStatusResponse
var initialCheckAccountStatusResponse *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId, RequestedRecordCount: actOptResp.RequestedRecordCount}).
Get(ctx, &result)
Get(ctx, &initialCheckAccountStatusResponse)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
cancelHandler()
return nil, fmt.Errorf("unable to continue workflow due to error when checking account status: %w", err)
}
if !result.IsValid {
if !initialCheckAccountStatusResponse.IsValid {
logger.Warn("account is no longer is valid state")
cancelHandler()
reason := "no reason provided"
if result.Reason != nil {
reason = *result.Reason
if initialCheckAccountStatusResponse.Reason != nil {
reason = *initialCheckAccountStatusResponse.Reason
}
return nil, fmt.Errorf("halting job run due to account in invalid state. Reason: %q: %w", reason, invalidAccountStatusError)
}
Expand Down Expand Up @@ -139,57 +139,59 @@ func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse,

// spawn account status checker in loop
stopChan := workflow.NewNamedChannel(ctx, "account-status")
accountStatusTimerDuration := getAccountStatusTimerDuration()
workflow.GoNamed(
ctx,
"account-status-check",
func(ctx workflow.Context) {
shouldStop := false
for {
selector := workflow.NewNamedSelector(ctx, "account-status-select")
timer := workflow.NewTimer(ctx, accountStatusTimerDuration)
selector.AddFuture(timer, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
logger.Error("time receive failed", "error", err)
if initialCheckAccountStatusResponse.ShouldPoll {
accountStatusTimerDuration := getAccountStatusTimerDuration()
workflow.GoNamed(
ctx,
"account-status-check",
func(ctx workflow.Context) {
shouldStop := false
for {
selector := workflow.NewNamedSelector(ctx, "account-status-select")
timer := workflow.NewTimer(ctx, accountStatusTimerDuration)
selector.AddFuture(timer, func(f workflow.Future) {
err := f.Get(ctx, nil)
if err != nil {
logger.Error("time receive failed", "error", err)
return
}

var result *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId}).
Get(ctx, &result)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
if !result.IsValid {
logger.Warn("account is no longer is valid state")
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
})

selector.Select(ctx)

if shouldStop {
logger.Warn("exiting account status check")
return
}

var result *accountstatus_activity.CheckAccountStatusResponse
var a *accountstatus_activity.Activity
err = workflow.ExecuteActivity(
withCheckAccountStatusActivityOptions(ctx),
a.CheckAccountStatus,
&accountstatus_activity.CheckAccountStatusRequest{AccountId: actOptResp.AccountId}).
Get(ctx, &result)
if err != nil {
logger.Error("encountered error while checking account status", "error", err)
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
return
}
if !result.IsValid {
logger.Warn("account is no longer is valid state")
stopChan.Send(ctx, true)
shouldStop = true
cancelHandler()
if ctx.Err() != nil {
logger.Warn("workflow canceled due to error or stop signal", "error", ctx.Err())
return
}
})

selector.Select(ctx)

if shouldStop {
logger.Warn("exiting account status check")
return
}
if ctx.Err() != nil {
logger.Warn("workflow canceled due to error or stop signal", "error", ctx.Err())
return
}
}
})
})
}

workselector := workflow.NewSelector(ctx)
var activityErr error
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/workflows/datasync/workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func Test_Workflow_Halts_Activities_On_InvalidAccountStatus(t *testing.T) {

var accStatsActivity *accountstatus_activity.Activity
env.OnActivity(accStatsActivity.CheckAccountStatus, mock.Anything, mock.Anything).
Return(&accountstatus_activity.CheckAccountStatusResponse{IsValid: true}, nil).Once()
Return(&accountstatus_activity.CheckAccountStatusResponse{IsValid: true, ShouldPoll: true}, nil).Once()
env.OnActivity(accStatsActivity.CheckAccountStatus, mock.Anything, mock.Anything).
Return(&accountstatus_activity.CheckAccountStatusResponse{IsValid: false}, nil).Once()

Expand Down