Skip to content

Commit

Permalink
Use plugin registry instead of pluginClients and stageBasedPluingsMap…
Browse files Browse the repository at this point in the history
… in the controller, scheduler, and planner

Signed-off-by: Yoshiki Fujikane <ffjlabo@gmail.com>
  • Loading branch information
ffjlabo committed Jan 8, 2025
1 parent fdd31ae commit 40c6c1e
Show file tree
Hide file tree
Showing 6 changed files with 516 additions and 294 deletions.
18 changes: 15 additions & 3 deletions pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/eventwatcher"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/notifier"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/statsreporter"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/trigger"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
Expand Down Expand Up @@ -348,7 +349,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
}

// Make grpc clients to connect to plugins.
pluginClis := make([]pluginapi.PluginClient, 0, len(cfg.Plugins))
plugins := make([]plugin.Plugin, 0, len(cfg.Plugins))

Check warning on line 352 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L352

Added line #L352 was not covered by tests
options := []rpcclient.DialOption{
rpcclient.WithBlock(),
rpcclient.WithInsecure(),
Expand All @@ -357,8 +358,19 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
cli, err := pluginapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(plg.Port)), options...)
if err != nil {
input.Logger.Error("failed to create client to connect plugin", zap.String("plugin", plg.Name), zap.Error(err))
return err

Check warning on line 361 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L361

Added line #L361 was not covered by tests
}
pluginClis = append(pluginClis, cli)

plugins = append(plugins, plugin.Plugin{
Name: plg.Name,
Cli: cli,
})

Check warning on line 367 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L364-L367

Added lines #L364 - L367 were not covered by tests
}

pluginRegistry, err := plugin.NewPluginRegistry(ctx, plugins)
if err != nil {
input.Logger.Error("failed to create plugin registry", zap.Error(err))
return err

Check warning on line 373 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L370-L373

Added lines #L370 - L373 were not covered by tests
}

// Start running application live state reporter.
Expand All @@ -383,7 +395,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
c := controller.NewController(
apiClient,
gitClient,
pluginClis,
pluginRegistry,

Check warning on line 398 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L398

Added line #L398 was not covered by tests
deploymentLister,
commandLister,
notifier,
Expand Down
36 changes: 8 additions & 28 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ import (
"google.golang.org/grpc/status"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/model"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type apiClient interface {
Expand Down Expand Up @@ -96,10 +95,9 @@ type controller struct {
notifier notifier
secretDecrypter secretDecrypter

// gRPC clients to communicate with plugins.
pluginClients []pluginapi.PluginClient
// Map from stage name to the plugin client.
stageBasedPluginsMap map[string]pluginapi.PluginClient
// The registry of all plugins.
pluginRegistry plugin.PluginRegistry

// Map from application ID to the planner
// of a pending deployment of that application.
planners map[string]*planner
Expand Down Expand Up @@ -131,7 +129,7 @@ type controller struct {
func NewController(
apiClient apiClient,
gitClient gitClient,
pluginClients []pluginapi.PluginClient,
pluginRegistry plugin.PluginRegistry,
deploymentLister deploymentLister,
commandLister commandLister,
notifier notifier,
Expand All @@ -144,7 +142,7 @@ func NewController(
return &controller{
apiClient: apiClient,
gitClient: gitClient,
pluginClients: pluginClients,
pluginRegistry: pluginRegistry,

Check warning on line 145 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L145

Added line #L145 was not covered by tests
deploymentLister: deploymentLister,
commandLister: commandLister,
notifier: notifier,
Expand Down Expand Up @@ -179,23 +177,6 @@ func (c *controller) Run(ctx context.Context) error {
c.workspaceDir = dir
c.logger.Info(fmt.Sprintf("workspace directory was configured to %s", c.workspaceDir))

// Build the list of stages that can be handled by piped's plugins.
stagesBasedPluginsMap := make(map[string]pluginapi.PluginClient)
for _, plugin := range c.pluginClients {
resp, err := plugin.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{})
if err != nil {
return err
}
for _, stage := range resp.GetStages() {
if _, ok := stagesBasedPluginsMap[stage]; ok {
c.logger.Error("duplicated stage name", zap.String("stage", stage))
return fmt.Errorf("duplicated stage name %s", stage)
}
stagesBasedPluginsMap[stage] = plugin
}
}
c.stageBasedPluginsMap = stagesBasedPluginsMap

ticker := time.NewTicker(c.syncInternal)
defer ticker.Stop()
c.logger.Info("start syncing planners and schedulers")
Expand Down Expand Up @@ -448,8 +429,7 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
commitHash,
configFilename,
workingDir,
c.pluginClients, // FIXME: Find a way to ensure the plugins only related to deployment.
c.stageBasedPluginsMap,
c.pluginRegistry,

Check warning on line 432 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L432

Added line #L432 was not covered by tests
c.apiClient,
c.gitClient,
c.notifier,
Expand Down Expand Up @@ -590,7 +570,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
workingDir,
c.apiClient,
c.gitClient,
c.stageBasedPluginsMap,
c.pluginRegistry,

Check warning on line 573 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L573

Added line #L573 was not covered by tests
c.notifier,
c.secretDecrypter,
c.logger,
Expand Down
54 changes: 30 additions & 24 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/model"
Expand All @@ -57,13 +58,6 @@ type planner struct {
lastSuccessfulConfigFilename string
workingDir string

// The plugin clients are used to call plugin that actually
// performs planning deployment.
plugins []pluginapi.PluginClient
// The map used to know which plugin is incharged for a given stage
// of the current deployment.
stageBasedPluginsMap map[string]pluginapi.PluginClient

// The apiClient is used to report the deployment status.
apiClient apiClient

Expand All @@ -79,6 +73,9 @@ type planner struct {
// which encrypted using PipeCD built-in secret management.
secretDecrypter secretDecrypter

// The pluginRegistry is used to determine which plugins to be used
pluginRegistry plugin.PluginRegistry

logger *zap.Logger
tracer trace.Tracer

Expand All @@ -96,8 +93,7 @@ func newPlanner(
lastSuccessfulCommitHash string,
lastSuccessfulConfigFilename string,
workingDir string,
pluginClients []pluginapi.PluginClient,
stageBasedPluginsMap map[string]pluginapi.PluginClient,
pluginRegistry plugin.PluginRegistry,
apiClient apiClient,
gitClient gitClient,
notifier notifier,
Expand All @@ -119,8 +115,7 @@ func newPlanner(
lastSuccessfulCommitHash: lastSuccessfulCommitHash,
lastSuccessfulConfigFilename: lastSuccessfulConfigFilename,
workingDir: workingDir,
stageBasedPluginsMap: stageBasedPluginsMap,
plugins: pluginClients,
pluginRegistry: pluginRegistry,

Check warning on line 118 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L118

Added line #L118 was not covered by tests
apiClient: apiClient,
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
Expand Down Expand Up @@ -266,8 +261,21 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment
TargetDeploymentSource: targetDS,
}

cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig())
if err != nil {
p.logger.Error("unable to parse application config", zap.Error(err))
return nil, err
}

Check warning on line 268 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L266-L268

Added lines #L266 - L268 were not covered by tests
spec := cfg.Spec

plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(spec)
if err != nil {
p.logger.Error("unable to get plugin clients by app config", zap.Error(err))
return nil, err
}

Check warning on line 275 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L273-L275

Added lines #L273 - L275 were not covered by tests

// Build deployment target versions.
for _, plg := range p.plugins {
for _, plg := range plugins {
vRes, err := plg.DetermineVersions(ctx, &deployment.DetermineVersionsRequest{Input: input})
if err != nil {
p.logger.Warn("unable to determine versions", zap.Error(err))
Expand All @@ -284,13 +292,6 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment
}
}

cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig())
if err != nil {
p.logger.Error("unable to parse application config", zap.Error(err))
return nil, err
}
spec := cfg.Spec

// In case the strategy has been decided by trigger.
// For example: user triggered the deployment via web console.
switch p.deployment.Trigger.SyncStrategy {
Expand Down Expand Up @@ -375,7 +376,7 @@ func (p *planner) buildPlan(ctx context.Context, runningDS, targetDS *deployment
summary string
)
// Build plan based on plugins determined strategy
for _, plg := range p.plugins {
for _, plg := range plugins {
res, err := plg.DetermineStrategy(ctx, &deployment.DetermineStrategyRequest{Input: input})
if err != nil {
p.logger.Warn("Unable to determine strategy using current plugin", zap.Error(err))
Expand Down Expand Up @@ -421,7 +422,12 @@ func (p *planner) buildQuickSyncStages(ctx context.Context, cfg *config.GenericA
rollbackStages = []*model.PipelineStage{}
rollback = *cfg.Planner.AutoRollback
)
for _, plg := range p.plugins {

plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(cfg)
if err != nil {
return nil, err
}

Check warning on line 429 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L428-L429

Added lines #L428 - L429 were not covered by tests
for _, plg := range plugins {
res, err := plg.BuildQuickSyncStages(ctx, &deployment.BuildQuickSyncStagesRequest{Rollback: rollback})
if err != nil {
return nil, fmt.Errorf("failed to build quick sync stage deployment (%w)", err)
Expand Down Expand Up @@ -462,9 +468,9 @@ func (p *planner) buildPipelineSyncStages(ctx context.Context, cfg *config.Gener
// Build stages config for each plugin.
for i := range stagesCfg {
stageCfg := stagesCfg[i]
plg, ok := p.stageBasedPluginsMap[stageCfg.Name.String()]
if !ok {
return nil, fmt.Errorf("unable to find plugin for stage %q", stageCfg.Name.String())
plg, err := p.pluginRegistry.GetPluginClientByStageName(stageCfg.Name.String())
if err != nil {
return nil, err

Check warning on line 473 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L473

Added line #L473 was not covered by tests
}

stagesCfgPerPlugin[plg] = append(stagesCfgPerPlugin[plg], &deployment.BuildPipelineSyncStagesRequest_StageConfig{
Expand Down
Loading

0 comments on commit 40c6c1e

Please sign in to comment.