From 44e512177dfccdb1e2ecc2a10edca463d4403f4d Mon Sep 17 00:00:00 2001 From: Yiling-J Date: Fri, 10 Jan 2025 09:53:28 +0800 Subject: [PATCH] Refactor temporal scheduler tests (#234) * Merge branch 'fix/payment_cron' into 'main' refactor saas cron workflow See merge request product/starhub/starhub-server!805 * Merge branch 'fix/payment_cron' into 'main' refactor API workflows See merge request product/starhub/starhub-server!806 --------- Co-authored-by: yiling.ji --- api/workflow/schedule_ce_test.go | 39 +++++++++++++++++++ api/workflow/{cron_worker.go => scheduler.go} | 0 .../{cron_worker_ce.go => scheduler_ce.go} | 7 ++-- api/workflow/workflow_ce_test.go | 20 +++------- api/workflow/workflow_test.go | 37 +++--------------- builder/temporal/test_scheduler.go | 33 ++++++++++++++++ 6 files changed, 86 insertions(+), 50 deletions(-) create mode 100644 api/workflow/schedule_ce_test.go rename api/workflow/{cron_worker.go => scheduler.go} (100%) rename api/workflow/{cron_worker_ce.go => scheduler_ce.go} (95%) create mode 100644 builder/temporal/test_scheduler.go diff --git a/api/workflow/schedule_ce_test.go b/api/workflow/schedule_ce_test.go new file mode 100644 index 00000000..ae1d9d4f --- /dev/null +++ b/api/workflow/schedule_ce_test.go @@ -0,0 +1,39 @@ +//go:build !saas + +package workflow_test + +import ( + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "opencsg.com/csghub-server/builder/multisync" + "opencsg.com/csghub-server/builder/store/database" +) + +func TestSchedule_CalcRecomScoreWorkflow(t *testing.T) { + tester, err := newWorkflowTester(t) + require.NoError(t, err) + + tester.mocks.recom.EXPECT().CalculateRecomScore(mock.Anything).Return() + tester.scheduler.Execute("calc-recom-score-schedule", tester.cronEnv) + require.True(t, tester.cronEnv.IsWorkflowCompleted()) + require.NoError(t, tester.cronEnv.GetWorkflowError()) +} + +func TestSchedule_SyncAsClient(t *testing.T) { + tester, err := newWorkflowTester(t) + require.NoError(t, err) + + tester.mocks.stores.SyncClientSettingMock().EXPECT().First(mock.Anything).Return( + &database.SyncClientSetting{Token: "tk"}, nil, + ) + tester.mocks.multisync.EXPECT().SyncAsClient( + mock.Anything, multisync.FromOpenCSG("", "tk"), + ).Return(nil) + + tester.scheduler.Execute("sync-as-client-schedule", tester.cronEnv) + require.True(t, tester.cronEnv.IsWorkflowCompleted()) + require.NoError(t, tester.cronEnv.GetWorkflowError()) + +} diff --git a/api/workflow/cron_worker.go b/api/workflow/scheduler.go similarity index 100% rename from api/workflow/cron_worker.go rename to api/workflow/scheduler.go diff --git a/api/workflow/cron_worker_ce.go b/api/workflow/scheduler_ce.go similarity index 95% rename from api/workflow/cron_worker_ce.go rename to api/workflow/scheduler_ce.go index d6cfce57..ede28952 100644 --- a/api/workflow/cron_worker_ce.go +++ b/api/workflow/scheduler_ce.go @@ -1,4 +1,4 @@ -//go:build !ee && !saas +//go:build !saas package workflow @@ -28,7 +28,7 @@ func RegisterCronJobs(config *config.Config, temporalClient temporal.Client) err ID: "sync-as-client-workflow", TaskQueue: CronJobQueueName, Workflow: SyncAsClientWorkflow, - Args: []interface{}{config}, + Args: []interface{}{}, }, }) if err != nil && err.Error() != AlreadyScheduledMessage { @@ -45,7 +45,7 @@ func RegisterCronJobs(config *config.Config, temporalClient temporal.Client) err ID: "calc-recom-score-workflow", TaskQueue: CronJobQueueName, Workflow: CalcRecomScoreWorkflow, - Args: []interface{}{config}, + Args: []interface{}{}, }, }) if err != nil && err.Error() != AlreadyScheduledMessage { @@ -59,6 +59,7 @@ func RegisterCronWorker(config *config.Config, temporalClient temporal.Client, a wfWorker := temporalClient.NewWorker(CronJobQueueName, worker.Options{}) wfWorker.RegisterActivity(activities) + wfWorker.RegisterWorkflow(SyncAsClientWorkflow) wfWorker.RegisterWorkflow(CalcRecomScoreWorkflow) diff --git a/api/workflow/workflow_ce_test.go b/api/workflow/workflow_ce_test.go index 82502281..f47d8d72 100644 --- a/api/workflow/workflow_ce_test.go +++ b/api/workflow/workflow_ce_test.go @@ -3,18 +3,16 @@ package workflow_test import ( - "context" "testing" "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.temporal.io/sdk/client" "go.temporal.io/sdk/testsuite" mock_git "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/git/gitserver" mock_temporal "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/builder/temporal" mock_component "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/component" mock_callback "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/component/callback" "opencsg.com/csghub-server/api/workflow" + "opencsg.com/csghub-server/builder/temporal" "opencsg.com/csghub-server/common/config" "opencsg.com/csghub-server/common/tests" ) @@ -22,8 +20,9 @@ import ( func newWorkflowTester(t *testing.T) (*workflowTester, error) { suite := testsuite.WorkflowTestSuite{} tester := &workflowTester{ - env: suite.NewTestWorkflowEnvironment(), - cronEnv: suite.NewTestWorkflowEnvironment(), + env: suite.NewTestWorkflowEnvironment(), + cronEnv: suite.NewTestWorkflowEnvironment(), + scheduler: temporal.NewTestScheduler(), } // Mock the dependencies @@ -47,16 +46,7 @@ func newWorkflowTester(t *testing.T) (*workflowTester, error) { mtc.EXPECT().NewWorker(workflow.CronJobQueueName, mock.Anything).Return(tester.cronEnv) mtc.EXPECT().Start().Return(nil) tester.mocks.temporal = mtc - msc := mock_temporal.NewMockScheduleClient(t) - mtc.EXPECT().GetScheduleClient().Return(msc) - msc.EXPECT().Create(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, so client.ScheduleOptions) (client.ScheduleHandle, error) { - require.Equal(t, "sync-as-client-schedule", so.ID) - return nil, nil - }).Once() - msc.EXPECT().Create(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, so client.ScheduleOptions) (client.ScheduleHandle, error) { - require.Equal(t, "calc-recom-score-schedule", so.ID) - return nil, nil - }).Once() + mtc.EXPECT().GetScheduleClient().Return(tester.scheduler) err := workflow.StartWorkflowDI( cfg, mcb, mr, mg, mm, tester.mocks.stores.SyncClientSettingMock(), mtc, diff --git a/api/workflow/workflow_test.go b/api/workflow/workflow_test.go index bb74ad61..3a1327c9 100644 --- a/api/workflow/workflow_test.go +++ b/api/workflow/workflow_test.go @@ -11,16 +11,16 @@ import ( mock_component "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/component" mock_callback "opencsg.com/csghub-server/_mocks/opencsg.com/csghub-server/component/callback" "opencsg.com/csghub-server/api/workflow" - "opencsg.com/csghub-server/builder/multisync" - "opencsg.com/csghub-server/builder/store/database" + "opencsg.com/csghub-server/builder/temporal" "opencsg.com/csghub-server/common/tests" "opencsg.com/csghub-server/common/types" ) type workflowTester struct { - env *testsuite.TestWorkflowEnvironment - cronEnv *testsuite.TestWorkflowEnvironment - mocks struct { + env *testsuite.TestWorkflowEnvironment + cronEnv *testsuite.TestWorkflowEnvironment + scheduler *temporal.TestScheduler + mocks struct { callback *mock_callback.MockGitCallbackComponent recom *mock_component.MockRecomComponent multisync *mock_component.MockMultiSyncComponent @@ -30,33 +30,6 @@ type workflowTester struct { } } -func TestWorkflow_CalcRecomScoreWorkflow(t *testing.T) { - tester, err := newWorkflowTester(t) - require.NoError(t, err) - - tester.mocks.recom.EXPECT().CalculateRecomScore(mock.Anything).Return() - tester.cronEnv.ExecuteWorkflow(workflow.CalcRecomScoreWorkflow) - require.True(t, tester.cronEnv.IsWorkflowCompleted()) - require.NoError(t, tester.cronEnv.GetWorkflowError()) -} - -func TestWorkflow_SyncAsClient(t *testing.T) { - tester, err := newWorkflowTester(t) - require.NoError(t, err) - - tester.mocks.stores.SyncClientSettingMock().EXPECT().First(mock.Anything).Return( - &database.SyncClientSetting{Token: "tk"}, nil, - ) - tester.mocks.multisync.EXPECT().SyncAsClient( - mock.Anything, multisync.FromOpenCSG("", "tk"), - ).Return(nil) - - tester.cronEnv.ExecuteWorkflow(workflow.SyncAsClientWorkflow) - require.True(t, tester.cronEnv.IsWorkflowCompleted()) - require.NoError(t, tester.cronEnv.GetWorkflowError()) - -} - func TestWorkflow_HandlePushWorkflow(t *testing.T) { tester, err := newWorkflowTester(t) require.NoError(t, err) diff --git a/builder/temporal/test_scheduler.go b/builder/temporal/test_scheduler.go new file mode 100644 index 00000000..a00ca158 --- /dev/null +++ b/builder/temporal/test_scheduler.go @@ -0,0 +1,33 @@ +package temporal + +import ( + "context" + "fmt" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/testsuite" +) + +type TestScheduler struct { + workflowOptions map[string]client.ScheduleOptions +} + +func NewTestScheduler() *TestScheduler { + return &TestScheduler{ + workflowOptions: map[string]client.ScheduleOptions{}, + } +} + +func (ts *TestScheduler) Create(ctx context.Context, options client.ScheduleOptions) (client.ScheduleHandle, error) { + ts.workflowOptions[options.ID] = options + return nil, nil +} + +func (ts *TestScheduler) Execute(id string, env *testsuite.TestWorkflowEnvironment) { + ops, ok := ts.workflowOptions[id] + if !ok { + panic(fmt.Sprintf("%s not found", id)) + } + act := ops.Action.(*client.ScheduleWorkflowAction) + env.ExecuteWorkflow(act.Workflow, act.Args...) +}