From 40c6c1e0433ad2fdeb72ae1a81d9f7235d4fb5b1 Mon Sep 17 00:00:00 2001 From: Yoshiki Fujikane Date: Tue, 7 Jan 2025 13:20:36 +0900 Subject: [PATCH] Use plugin registry instead of pluginClients and stageBasedPluingsMap in the controller, scheduler, and planner Signed-off-by: Yoshiki Fujikane --- pkg/app/pipedv1/cmd/piped/piped.go | 18 +- pkg/app/pipedv1/controller/controller.go | 36 +- pkg/app/pipedv1/controller/planner.go | 54 +- pkg/app/pipedv1/controller/planner_test.go | 614 ++++++++++++------- pkg/app/pipedv1/controller/scheduler.go | 14 +- pkg/app/pipedv1/controller/scheduler_test.go | 74 ++- 6 files changed, 516 insertions(+), 294 deletions(-) diff --git a/pkg/app/pipedv1/cmd/piped/piped.go b/pkg/app/pipedv1/cmd/piped/piped.go index 3270095681..ce2a5fcc10 100644 --- a/pkg/app/pipedv1/cmd/piped/piped.go +++ b/pkg/app/pipedv1/cmd/piped/piped.go @@ -61,6 +61,7 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/eventwatcher" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/notifier" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/statsreporter" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/trigger" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" @@ -348,7 +349,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { } // Make grpc clients to connect to plugins. - pluginClis := make([]pluginapi.PluginClient, 0, len(cfg.Plugins)) + plugins := make([]plugin.Plugin, 0, len(cfg.Plugins)) options := []rpcclient.DialOption{ rpcclient.WithBlock(), rpcclient.WithInsecure(), @@ -357,8 +358,19 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { cli, err := pluginapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(plg.Port)), options...) if err != nil { input.Logger.Error("failed to create client to connect plugin", zap.String("plugin", plg.Name), zap.Error(err)) + return err } - pluginClis = append(pluginClis, cli) + + plugins = append(plugins, plugin.Plugin{ + Name: plg.Name, + Cli: cli, + }) + } + + pluginRegistry, err := plugin.NewPluginRegistry(ctx, plugins) + if err != nil { + input.Logger.Error("failed to create plugin registry", zap.Error(err)) + return err } // Start running application live state reporter. @@ -383,7 +395,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) { c := controller.NewController( apiClient, gitClient, - pluginClis, + pluginRegistry, deploymentLister, commandLister, notifier, diff --git a/pkg/app/pipedv1/controller/controller.go b/pkg/app/pipedv1/controller/controller.go index 51f1ce7634..a16a7017b7 100644 --- a/pkg/app/pipedv1/controller/controller.go +++ b/pkg/app/pipedv1/controller/controller.go @@ -34,11 +34,10 @@ import ( "google.golang.org/grpc/status" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" "github.com/pipe-cd/pipecd/pkg/git" "github.com/pipe-cd/pipecd/pkg/model" - pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" - "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" ) type apiClient interface { @@ -96,10 +95,9 @@ type controller struct { notifier notifier secretDecrypter secretDecrypter - // gRPC clients to communicate with plugins. - pluginClients []pluginapi.PluginClient - // Map from stage name to the plugin client. - stageBasedPluginsMap map[string]pluginapi.PluginClient + // The registry of all plugins. + pluginRegistry plugin.PluginRegistry + // Map from application ID to the planner // of a pending deployment of that application. planners map[string]*planner @@ -131,7 +129,7 @@ type controller struct { func NewController( apiClient apiClient, gitClient gitClient, - pluginClients []pluginapi.PluginClient, + pluginRegistry plugin.PluginRegistry, deploymentLister deploymentLister, commandLister commandLister, notifier notifier, @@ -144,7 +142,7 @@ func NewController( return &controller{ apiClient: apiClient, gitClient: gitClient, - pluginClients: pluginClients, + pluginRegistry: pluginRegistry, deploymentLister: deploymentLister, commandLister: commandLister, notifier: notifier, @@ -179,23 +177,6 @@ func (c *controller) Run(ctx context.Context) error { c.workspaceDir = dir c.logger.Info(fmt.Sprintf("workspace directory was configured to %s", c.workspaceDir)) - // Build the list of stages that can be handled by piped's plugins. - stagesBasedPluginsMap := make(map[string]pluginapi.PluginClient) - for _, plugin := range c.pluginClients { - resp, err := plugin.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{}) - if err != nil { - return err - } - for _, stage := range resp.GetStages() { - if _, ok := stagesBasedPluginsMap[stage]; ok { - c.logger.Error("duplicated stage name", zap.String("stage", stage)) - return fmt.Errorf("duplicated stage name %s", stage) - } - stagesBasedPluginsMap[stage] = plugin - } - } - c.stageBasedPluginsMap = stagesBasedPluginsMap - ticker := time.NewTicker(c.syncInternal) defer ticker.Stop() c.logger.Info("start syncing planners and schedulers") @@ -448,8 +429,7 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) ( commitHash, configFilename, workingDir, - c.pluginClients, // FIXME: Find a way to ensure the plugins only related to deployment. - c.stageBasedPluginsMap, + c.pluginRegistry, c.apiClient, c.gitClient, c.notifier, @@ -590,7 +570,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment) workingDir, c.apiClient, c.gitClient, - c.stageBasedPluginsMap, + c.pluginRegistry, c.notifier, c.secretDecrypter, c.logger, diff --git a/pkg/app/pipedv1/controller/planner.go b/pkg/app/pipedv1/controller/planner.go index ce5140f88f..ad26ea871d 100644 --- a/pkg/app/pipedv1/controller/planner.go +++ b/pkg/app/pipedv1/controller/planner.go @@ -31,6 +31,7 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/model" @@ -57,13 +58,6 @@ type planner struct { lastSuccessfulConfigFilename string workingDir string - // The plugin clients are used to call plugin that actually - // performs planning deployment. - plugins []pluginapi.PluginClient - // The map used to know which plugin is incharged for a given stage - // of the current deployment. - stageBasedPluginsMap map[string]pluginapi.PluginClient - // The apiClient is used to report the deployment status. apiClient apiClient @@ -79,6 +73,9 @@ type planner struct { // which encrypted using PipeCD built-in secret management. secretDecrypter secretDecrypter + // The pluginRegistry is used to determine which plugins to be used + pluginRegistry plugin.PluginRegistry + logger *zap.Logger tracer trace.Tracer @@ -96,8 +93,7 @@ func newPlanner( lastSuccessfulCommitHash string, lastSuccessfulConfigFilename string, workingDir string, - pluginClients []pluginapi.PluginClient, - stageBasedPluginsMap map[string]pluginapi.PluginClient, + pluginRegistry plugin.PluginRegistry, apiClient apiClient, gitClient gitClient, notifier notifier, @@ -119,8 +115,7 @@ func newPlanner( lastSuccessfulCommitHash: lastSuccessfulCommitHash, lastSuccessfulConfigFilename: lastSuccessfulConfigFilename, workingDir: workingDir, - stageBasedPluginsMap: stageBasedPluginsMap, - plugins: pluginClients, + pluginRegistry: pluginRegistry, apiClient: apiClient, gitClient: gitClient, metadataStore: metadatastore.NewMetadataStore(apiClient, d), @@ -266,8 +261,21 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment TargetDeploymentSource: targetDS, } + cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig()) + if err != nil { + p.logger.Error("unable to parse application config", zap.Error(err)) + return nil, err + } + spec := cfg.Spec + + plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(spec) + if err != nil { + p.logger.Error("unable to get plugin clients by app config", zap.Error(err)) + return nil, err + } + // Build deployment target versions. - for _, plg := range p.plugins { + for _, plg := range plugins { vRes, err := plg.DetermineVersions(ctx, &deployment.DetermineVersionsRequest{Input: input}) if err != nil { p.logger.Warn("unable to determine versions", zap.Error(err)) @@ -284,13 +292,6 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment } } - cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig()) - if err != nil { - p.logger.Error("unable to parse application config", zap.Error(err)) - return nil, err - } - spec := cfg.Spec - // In case the strategy has been decided by trigger. // For example: user triggered the deployment via web console. switch p.deployment.Trigger.SyncStrategy { @@ -375,7 +376,7 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment summary string ) // Build plan based on plugins determined strategy - for _, plg := range p.plugins { + for _, plg := range plugins { res, err := plg.DetermineStrategy(ctx, &deployment.DetermineStrategyRequest{Input: input}) if err != nil { p.logger.Warn("Unable to determine strategy using current plugin", zap.Error(err)) @@ -421,7 +422,12 @@ func (p *planner) buildQuickSyncStages(ctx context.Context, cfg *config.GenericA rollbackStages = []*model.PipelineStage{} rollback = *cfg.Planner.AutoRollback ) - for _, plg := range p.plugins { + + plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(cfg) + if err != nil { + return nil, err + } + for _, plg := range plugins { res, err := plg.BuildQuickSyncStages(ctx, &deployment.BuildQuickSyncStagesRequest{Rollback: rollback}) if err != nil { return nil, fmt.Errorf("failed to build quick sync stage deployment (%w)", err) @@ -462,9 +468,9 @@ func (p *planner) buildPipelineSyncStages(ctx context.Context, cfg *config.Gener // Build stages config for each plugin. for i := range stagesCfg { stageCfg := stagesCfg[i] - plg, ok := p.stageBasedPluginsMap[stageCfg.Name.String()] - if !ok { - return nil, fmt.Errorf("unable to find plugin for stage %q", stageCfg.Name.String()) + plg, err := p.pluginRegistry.GetPluginClientByStageName(stageCfg.Name.String()) + if err != nil { + return nil, err } stagesCfgPerPlugin[plg] = append(stagesCfgPerPlugin[plg], &deployment.BuildPipelineSyncStagesRequest_StageConfig{ diff --git a/pkg/app/pipedv1/controller/planner_test.go b/pkg/app/pipedv1/controller/planner_test.go index 9fb2e97eb6..86720d9273 100644 --- a/pkg/app/pipedv1/controller/planner_test.go +++ b/pkg/app/pipedv1/controller/planner_test.go @@ -25,6 +25,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/model" pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" @@ -37,6 +38,7 @@ type fakePlugin struct { quickStages []*model.PipelineStage pipelineStages []*model.PipelineStage rollbackStages []*model.PipelineStage + stageStatusMap map[string]model.StageStatus } func (p *fakePlugin) Close() error { return nil } @@ -97,7 +99,18 @@ func (p *fakePlugin) FetchDefinedStages(ctx context.Context, req *deployment.Fet Stages: stages, }, nil } +func (p *fakePlugin) ExecuteStage(ctx context.Context, req *deployment.ExecuteStageRequest, opts ...grpc.CallOption) (*deployment.ExecuteStageResponse, error) { + status, ok := p.stageStatusMap[req.Input.Stage.Id] + if !ok { + return &deployment.ExecuteStageResponse{ + Status: model.StageStatus_STAGE_NOT_STARTED_YET, + }, nil + } + return &deployment.ExecuteStageResponse{ + Status: status, + }, nil +} func pointerBool(b bool) *bool { return &b } @@ -107,139 +120,190 @@ func TestBuildQuickSyncStages(t *testing.T) { testcases := []struct { name string - plugins []pluginapi.PluginClient + pluginRegistry plugin.PluginRegistry cfg *config.GenericApplicationSpec wantErr bool expectedStages []*model.PipelineStage }{ { name: "only one plugin", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Rollback: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), }, + Plugins: []string{"plugin-1"}, }, wantErr: false, expectedStages: []*model.PipelineStage{ { - Id: "plugin-1-stage-1", + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", }, { Id: "plugin-1-rollback", + Name: "plugin-1-rollback", Rollback: true, }, }, }, { name: "multi plugins", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Rollback: true, - }, - }, - }, - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-2-stage-1", + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-2-rollback", - Rollback: true, + { + Name: "plugin-2", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-2-stage-1", + Name: "plugin-2-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-2-rollback", + Name: "plugin-2-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), }, + Plugins: []string{"plugin-1", "plugin-2"}, }, wantErr: false, expectedStages: []*model.PipelineStage{ { - Id: "plugin-1-stage-1", + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", }, { - Id: "plugin-2-stage-1", + Id: "plugin-2-stage-1", + Name: "plugin-2-stage-1", }, { Id: "plugin-1-rollback", + Name: "plugin-1-rollback", Rollback: true, }, { Id: "plugin-2-rollback", + Name: "plugin-2-rollback", Rollback: true, }, }, }, { name: "multi plugins - no rollback", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Rollback: true, - }, - }, - }, - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-2-stage-1", + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-2-rollback", - Rollback: true, + { + Name: "plugin-2", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-2-stage-1", + Name: "plugin-2-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-2-rollback", + Name: "plugin-2-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(false), }, + Plugins: []string{"plugin-1", "plugin-2"}, }, wantErr: false, expectedStages: []*model.PipelineStage{ { - Id: "plugin-1-stage-1", + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", }, { - Id: "plugin-2-stage-1", + Id: "plugin-2-stage-1", + Name: "plugin-2-stage-1", }, }, }, @@ -248,7 +312,7 @@ func TestBuildQuickSyncStages(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { planner := &planner{ - plugins: tc.plugins, + pluginRegistry: tc.pluginRegistry, } stages, err := planner.buildQuickSyncStages(context.TODO(), tc.cfg) require.Equal(t, tc.wantErr, err != nil) @@ -262,37 +326,45 @@ func TestBuildPipelineSyncStages(t *testing.T) { testcases := []struct { name string - plugins []pluginapi.PluginClient + pluginRegistry plugin.PluginRegistry cfg *config.GenericApplicationSpec wantErr bool expectedStages []*model.PipelineStage }{ { name: "only one plugin", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Index: 0, - Name: "plugin-1-stage-1", - }, - { - Id: "plugin-1-stage-2", - Index: 1, - Name: "plugin-1-stage-2", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Index: 0, - Name: "plugin-1-rollback", - Rollback: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + }, + { + Id: "plugin-1-stage-2", + Index: 1, + Name: "plugin-1-stage-2", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Index: 0, + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -333,44 +405,60 @@ func TestBuildPipelineSyncStages(t *testing.T) { }, { name: "multi plugins single rollback", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Name: "plugin-1-stage-1", - }, - { - Id: "plugin-1-stage-2", - Name: "plugin-1-stage-2", - }, - { - Id: "plugin-1-stage-3", - Name: "plugin-1-stage-3", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Index: 0, - Name: "plugin-1-rollback", - Rollback: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + }, + { + Id: "plugin-1-stage-2", + Index: 1, + Name: "plugin-1-stage-2", + }, + { + Id: "plugin-1-stage-3", + Index: 2, + Name: "plugin-1-stage-3", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Index: 0, + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - }, - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-2-stage-1", - Name: "plugin-2-stage-1", - }, - { - Id: "plugin-2-stage-2", - Name: "plugin-2-stage-2", + { + Name: "plugin-2", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-2-stage-1", + Index: 0, + Name: "plugin-2-stage-1", + }, + { + Id: "plugin-2-stage-2", + Index: 1, + Name: "plugin-2-stage-2", + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -441,48 +529,63 @@ func TestBuildPipelineSyncStages(t *testing.T) { }, { name: "multi plugins multi rollback", - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Name: "plugin-1-stage-1", - }, - { - Id: "plugin-1-stage-2", - Name: "plugin-1-stage-2", - }, - { - Id: "plugin-1-stage-3", - Name: "plugin-1-stage-3", - }, - }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-1-rollback", - Index: 0, - Name: "plugin-1-rollback", - Rollback: true, - }, - }, - }, - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-2-stage-1", - Name: "plugin-2-stage-1", + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + }, + { + Id: "plugin-1-stage-2", + Index: 1, + Name: "plugin-1-stage-2", + }, + { + Id: "plugin-1-stage-3", + Index: 2, + Name: "plugin-1-stage-3", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-1-rollback", + Index: 0, + Name: "plugin-1-rollback", + Rollback: true, + }, + }, }, }, - rollbackStages: []*model.PipelineStage{ - { - Id: "plugin-2-rollback", - Index: 2, - Name: "plugin-2-rollback", - Rollback: true, + { + Name: "plugin-2", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-2-stage-1", + Index: 0, + Name: "plugin-2-stage-1", + }, + }, + rollbackStages: []*model.PipelineStage{ + { + Id: "plugin-2-rollback", + Index: 2, + Name: "plugin-2-rollback", + Rollback: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -551,16 +654,8 @@ func TestBuildPipelineSyncStages(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - stageBasedPluginMap := make(map[string]pluginapi.PluginClient) - for _, p := range tc.plugins { - stages, _ := p.FetchDefinedStages(context.TODO(), &deployment.FetchDefinedStagesRequest{}) - for _, s := range stages.Stages { - stageBasedPluginMap[s] = p - } - } planner := &planner{ - plugins: tc.plugins, - stageBasedPluginsMap: stageBasedPluginMap, + pluginRegistry: tc.pluginRegistry, } stages, err := planner.buildPipelineSyncStages(context.TODO(), tc.cfg) require.Equal(t, tc.wantErr, err != nil) @@ -576,6 +671,7 @@ func TestPlanner_BuildPlan(t *testing.T) { name string isFirstDeploy bool plugins []pluginapi.PluginClient + pluginRegistry plugin.PluginRegistry cfg *config.GenericApplicationSpec deployment *model.Deployment wantErr bool @@ -584,20 +680,30 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "quick sync strategy triggered by web console", isFirstDeploy: false, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), }, + Plugins: []string{"plugin-1"}, }, deployment: &model.Deployment{ Trigger: &model.DeploymentTrigger{ @@ -612,6 +718,7 @@ func TestPlanner_BuildPlan(t *testing.T) { Stages: []*model.PipelineStage{ { Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", Visible: true, }, }, @@ -626,17 +733,26 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "pipeline sync strategy triggered by web console", isFirstDeploy: false, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Name: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -679,20 +795,30 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "quick sync due to no pipeline configured", isFirstDeploy: false, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), }, + Plugins: []string{"plugin-1"}, }, deployment: &model.Deployment{ Trigger: &model.DeploymentTrigger{}, @@ -704,6 +830,7 @@ func TestPlanner_BuildPlan(t *testing.T) { Stages: []*model.PipelineStage{ { Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", Visible: true, }, }, @@ -718,17 +845,26 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "pipeline sync due to alwaysUsePipeline", isFirstDeploy: false, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - pipelineStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Name: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AlwaysUsePipeline: true, @@ -769,16 +905,25 @@ func TestPlanner_BuildPlan(t *testing.T) { { name: "quick sync due to first deployment", isFirstDeploy: true, - plugins: []pluginapi.PluginClient{ - &fakePlugin{ - quickStages: []*model.PipelineStage{ - { - Id: "plugin-1-stage-1", - Visible: true, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", + Visible: true, + }, + }, }, }, - }, - }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -802,6 +947,7 @@ func TestPlanner_BuildPlan(t *testing.T) { Stages: []*model.PipelineStage{ { Id: "plugin-1-stage-1", + Name: "plugin-1-stage-1", Visible: true, }, }, @@ -837,6 +983,36 @@ func TestPlanner_BuildPlan(t *testing.T) { }, }, }, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "plugin-1", + Cli: &fakePlugin{ + syncStrategy: &deployment.DetermineStrategyResponse{ + SyncStrategy: model.SyncStrategy_PIPELINE, + Summary: "determined by plugin", + }, + pipelineStages: []*model.PipelineStage{ + { + Id: "plugin-1-stage-1", + Index: 0, + Name: "plugin-1-stage-1", + Visible: true, + }, + }, + quickStages: []*model.PipelineStage{ + { + Id: "plugin-1-quick-stage-1", + Visible: true, + }, + }, + }, + }, + }) + require.NoError(t, err) + + return pr + }(), cfg: &config.GenericApplicationSpec{ Planner: config.DeploymentPlanner{ AutoRollback: pointerBool(true), @@ -877,16 +1053,8 @@ func TestPlanner_BuildPlan(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - stageBasedPluginMap := make(map[string]pluginapi.PluginClient) - for _, p := range tc.plugins { - stages, _ := p.FetchDefinedStages(context.TODO(), &deployment.FetchDefinedStagesRequest{}) - for _, s := range stages.Stages { - stageBasedPluginMap[s] = p - } - } planner := &planner{ - plugins: tc.plugins, - stageBasedPluginsMap: stageBasedPluginMap, + pluginRegistry: tc.pluginRegistry, deployment: tc.deployment, lastSuccessfulCommitHash: "", lastSuccessfulConfigFilename: "", diff --git a/pkg/app/pipedv1/controller/scheduler.go b/pkg/app/pipedv1/controller/scheduler.go index 5b3bf4e636..efd9a9fcb6 100644 --- a/pkg/app/pipedv1/controller/scheduler.go +++ b/pkg/app/pipedv1/controller/scheduler.go @@ -31,10 +31,10 @@ import ( "github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/model" - pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1" "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment" ) @@ -43,7 +43,7 @@ type scheduler struct { deployment *model.Deployment workingDir string - stageBasedPluginsMap map[string]pluginapi.PluginClient + pluginRegistry plugin.PluginRegistry apiClient apiClient gitClient gitClient @@ -79,7 +79,7 @@ func newScheduler( workingDir string, apiClient apiClient, gitClient gitClient, - stageBasedPluginsMap map[string]pluginapi.PluginClient, + pluginRegistry plugin.PluginRegistry, notifier notifier, secretsDecrypter secretDecrypter, logger *zap.Logger, @@ -96,9 +96,9 @@ func newScheduler( s := &scheduler{ deployment: d, workingDir: workingDir, - stageBasedPluginsMap: stageBasedPluginsMap, apiClient: apiClient, gitClient: gitClient, + pluginRegistry: pluginRegistry, metadataStore: metadatastore.NewMetadataStore(apiClient, d), notifier: notifier, secretDecrypter: secretsDecrypter, @@ -513,9 +513,9 @@ func (s *scheduler) executeStage(sig StopSignal, ps *model.PipelineStage) (final } // Find the executor plugin for this stage. - plugin, ok := s.stageBasedPluginsMap[ps.Name] - if !ok { - s.logger.Error("failed to find the plugin for the stage", zap.String("stage-name", ps.Name)) + plugin, err := s.pluginRegistry.GetPluginClientByStageName(ps.Name) + if err != nil { + s.logger.Error("failed to find the plugin for the stage", zap.String("stage-name", ps.Name), zap.Error(err)) s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires) return model.StageStatus_STAGE_FAILURE } diff --git a/pkg/app/pipedv1/controller/scheduler_test.go b/pkg/app/pipedv1/controller/scheduler_test.go index b73bd92f7e..5d8fed4acf 100644 --- a/pkg/app/pipedv1/controller/scheduler_test.go +++ b/pkg/app/pipedv1/controller/scheduler_test.go @@ -21,10 +21,12 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" "google.golang.org/grpc" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" config "github.com/pipe-cd/pipecd/pkg/configv1" "github.com/pipe-cd/pipecd/pkg/model" @@ -215,9 +217,27 @@ func TestExecuteStage(t *testing.T) { apiClient: &fakeAPIClient{}, targetDSP: &fakeDeploySourceProvider{}, runningDSP: &fakeDeploySourceProvider{}, - stageBasedPluginsMap: map[string]pluginapi.PluginClient{ - "stage-name": &fakeExecutorPluginClient{}, - }, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "stage-name", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "stage-id", + Name: "stage-name", + }, + }, + stageStatusMap: map[string]model.StageStatus{ + "stage-id": model.StageStatus_STAGE_SUCCESS, + }, + }, + }, + }) + require.NoError(t, err) + + return pr + }(), genericApplicationConfig: &config.GenericApplicationSpec{ Pipeline: &config.DeploymentPipeline{ Stages: []config.PipelineStage{ @@ -257,9 +277,27 @@ func TestExecuteStage_SignalTerminated(t *testing.T) { apiClient: &fakeAPIClient{}, targetDSP: &fakeDeploySourceProvider{}, runningDSP: &fakeDeploySourceProvider{}, - stageBasedPluginsMap: map[string]pluginapi.PluginClient{ - "stage-name": &fakeExecutorPluginClient{}, - }, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "stage-name", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "stage-id", + Name: "stage-name", + }, + }, + stageStatusMap: map[string]model.StageStatus{ + "stage-id": model.StageStatus_STAGE_SUCCESS, + }, + }, + }, + }) + require.NoError(t, err) + + return pr + }(), genericApplicationConfig: &config.GenericApplicationSpec{ Pipeline: &config.DeploymentPipeline{ Stages: []config.PipelineStage{ @@ -295,9 +333,27 @@ func TestExecuteStage_SignalCancelled(t *testing.T) { apiClient: &fakeAPIClient{}, targetDSP: &fakeDeploySourceProvider{}, runningDSP: &fakeDeploySourceProvider{}, - stageBasedPluginsMap: map[string]pluginapi.PluginClient{ - "stage-name": &fakeExecutorPluginClient{}, - }, + pluginRegistry: func() plugin.PluginRegistry { + pr, err := plugin.NewPluginRegistry(context.TODO(), []plugin.Plugin{ + { + Name: "stage-name", + Cli: &fakePlugin{ + pipelineStages: []*model.PipelineStage{ + { + Id: "stage-id", + Name: "stage-name", + }, + }, + stageStatusMap: map[string]model.StageStatus{ + "stage-id": model.StageStatus_STAGE_SUCCESS, + }, + }, + }, + }) + require.NoError(t, err) + + return pr + }(), genericApplicationConfig: &config.GenericApplicationSpec{ Pipeline: &config.DeploymentPipeline{ Stages: []config.PipelineStage{