Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use plugin registry instead of pluginClients and stageBasedPluingsMap in the controller, scheduler, and planner #5472

Merged
merged 3 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/eventwatcher"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/notifier"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/statsreporter"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/trigger"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
Expand Down Expand Up @@ -348,7 +349,7 @@
}

// Make grpc clients to connect to plugins.
pluginClis := make([]pluginapi.PluginClient, 0, len(cfg.Plugins))
plugins := make([]plugin.Plugin, 0, len(cfg.Plugins))

Check warning on line 352 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L352

Added line #L352 was not covered by tests
options := []rpcclient.DialOption{
rpcclient.WithBlock(),
rpcclient.WithInsecure(),
Expand All @@ -357,8 +358,19 @@
cli, err := pluginapi.NewClient(ctx, net.JoinHostPort("localhost", strconv.Itoa(plg.Port)), options...)
if err != nil {
input.Logger.Error("failed to create client to connect plugin", zap.String("plugin", plg.Name), zap.Error(err))
return err

Check warning on line 361 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L361

Added line #L361 was not covered by tests
}
pluginClis = append(pluginClis, cli)

plugins = append(plugins, plugin.Plugin{
Name: plg.Name,
Cli: cli,
})

Check warning on line 367 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L364-L367

Added lines #L364 - L367 were not covered by tests
}

pluginRegistry, err := plugin.NewPluginRegistry(ctx, plugins)
if err != nil {
input.Logger.Error("failed to create plugin registry", zap.Error(err))
return err

Check warning on line 373 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L370-L373

Added lines #L370 - L373 were not covered by tests
}

// Start running application live state reporter.
Expand All @@ -383,7 +395,7 @@
c := controller.NewController(
apiClient,
gitClient,
pluginClis,
pluginRegistry,

Check warning on line 398 in pkg/app/pipedv1/cmd/piped/piped.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/cmd/piped/piped.go#L398

Added line #L398 was not covered by tests
deploymentLister,
commandLister,
notifier,
Expand Down
36 changes: 8 additions & 28 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@
"google.golang.org/grpc/status"

"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/git"
"github.com/pipe-cd/pipecd/pkg/model"
pluginapi "github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/deployment"
)

type apiClient interface {
Expand Down Expand Up @@ -96,10 +95,9 @@
notifier notifier
secretDecrypter secretDecrypter

// gRPC clients to communicate with plugins.
pluginClients []pluginapi.PluginClient
// Map from stage name to the plugin client.
stageBasedPluginsMap map[string]pluginapi.PluginClient
// The registry of all plugins.
pluginRegistry plugin.PluginRegistry

// Map from application ID to the planner
// of a pending deployment of that application.
planners map[string]*planner
Expand Down Expand Up @@ -131,7 +129,7 @@
func NewController(
apiClient apiClient,
gitClient gitClient,
pluginClients []pluginapi.PluginClient,
pluginRegistry plugin.PluginRegistry,
deploymentLister deploymentLister,
commandLister commandLister,
notifier notifier,
Expand All @@ -144,7 +142,7 @@
return &controller{
apiClient: apiClient,
gitClient: gitClient,
pluginClients: pluginClients,
pluginRegistry: pluginRegistry,

Check warning on line 145 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L145

Added line #L145 was not covered by tests
deploymentLister: deploymentLister,
commandLister: commandLister,
notifier: notifier,
Expand Down Expand Up @@ -179,23 +177,6 @@
c.workspaceDir = dir
c.logger.Info(fmt.Sprintf("workspace directory was configured to %s", c.workspaceDir))

// Build the list of stages that can be handled by piped's plugins.
stagesBasedPluginsMap := make(map[string]pluginapi.PluginClient)
for _, plugin := range c.pluginClients {
resp, err := plugin.FetchDefinedStages(ctx, &deployment.FetchDefinedStagesRequest{})
if err != nil {
return err
}
for _, stage := range resp.GetStages() {
if _, ok := stagesBasedPluginsMap[stage]; ok {
c.logger.Error("duplicated stage name", zap.String("stage", stage))
return fmt.Errorf("duplicated stage name %s", stage)
}
stagesBasedPluginsMap[stage] = plugin
}
}
c.stageBasedPluginsMap = stagesBasedPluginsMap

ticker := time.NewTicker(c.syncInternal)
defer ticker.Stop()
c.logger.Info("start syncing planners and schedulers")
Expand Down Expand Up @@ -448,8 +429,7 @@
commitHash,
configFilename,
workingDir,
c.pluginClients, // FIXME: Find a way to ensure the plugins only related to deployment.
c.stageBasedPluginsMap,
c.pluginRegistry,

Check warning on line 432 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L432

Added line #L432 was not covered by tests
c.apiClient,
c.gitClient,
c.notifier,
Expand Down Expand Up @@ -590,7 +570,7 @@
workingDir,
c.apiClient,
c.gitClient,
c.stageBasedPluginsMap,
c.pluginRegistry,

Check warning on line 573 in pkg/app/pipedv1/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/controller.go#L573

Added line #L573 was not covered by tests
c.notifier,
c.secretDecrypter,
c.logger,
Expand Down
54 changes: 30 additions & 24 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/metadatastore"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
config "github.com/pipe-cd/pipecd/pkg/configv1"
"github.com/pipe-cd/pipecd/pkg/model"
Expand All @@ -57,13 +58,6 @@
lastSuccessfulConfigFilename string
workingDir string

// The plugin clients are used to call plugin that actually
// performs planning deployment.
plugins []pluginapi.PluginClient
// The map used to know which plugin is incharged for a given stage
// of the current deployment.
stageBasedPluginsMap map[string]pluginapi.PluginClient

// The apiClient is used to report the deployment status.
apiClient apiClient

Expand All @@ -79,6 +73,9 @@
// which encrypted using PipeCD built-in secret management.
secretDecrypter secretDecrypter

// The pluginRegistry is used to determine which plugins to be used
pluginRegistry plugin.PluginRegistry

logger *zap.Logger
tracer trace.Tracer

Expand All @@ -96,8 +93,7 @@
lastSuccessfulCommitHash string,
lastSuccessfulConfigFilename string,
workingDir string,
pluginClients []pluginapi.PluginClient,
stageBasedPluginsMap map[string]pluginapi.PluginClient,
pluginRegistry plugin.PluginRegistry,
apiClient apiClient,
gitClient gitClient,
notifier notifier,
Expand All @@ -119,8 +115,7 @@
lastSuccessfulCommitHash: lastSuccessfulCommitHash,
lastSuccessfulConfigFilename: lastSuccessfulConfigFilename,
workingDir: workingDir,
stageBasedPluginsMap: stageBasedPluginsMap,
plugins: pluginClients,
pluginRegistry: pluginRegistry,

Check warning on line 118 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L118

Added line #L118 was not covered by tests
apiClient: apiClient,
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
Expand Down Expand Up @@ -266,8 +261,21 @@
TargetDeploymentSource: targetDS,
}

cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig())
if err != nil {
p.logger.Error("unable to parse application config", zap.Error(err))
return nil, err
}

Check warning on line 268 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L266-L268

Added lines #L266 - L268 were not covered by tests
spec := cfg.Spec

plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(spec)
if err != nil {
p.logger.Error("unable to get plugin clients by app config", zap.Error(err))
return nil, err
}

Check warning on line 275 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L273-L275

Added lines #L273 - L275 were not covered by tests

// Build deployment target versions.
for _, plg := range p.plugins {
for _, plg := range plugins {
vRes, err := plg.DetermineVersions(ctx, &deployment.DetermineVersionsRequest{Input: input})
if err != nil {
p.logger.Warn("unable to determine versions", zap.Error(err))
Expand All @@ -284,13 +292,6 @@
}
}

cfg, err := config.DecodeYAML[*config.GenericApplicationSpec](targetDS.GetApplicationConfig())
if err != nil {
p.logger.Error("unable to parse application config", zap.Error(err))
return nil, err
}
spec := cfg.Spec

// In case the strategy has been decided by trigger.
// For example: user triggered the deployment via web console.
switch p.deployment.Trigger.SyncStrategy {
Expand Down Expand Up @@ -375,7 +376,7 @@
summary string
)
// Build plan based on plugins determined strategy
for _, plg := range p.plugins {
for _, plg := range plugins {
res, err := plg.DetermineStrategy(ctx, &deployment.DetermineStrategyRequest{Input: input})
if err != nil {
p.logger.Warn("Unable to determine strategy using current plugin", zap.Error(err))
Expand Down Expand Up @@ -421,7 +422,12 @@
rollbackStages = []*model.PipelineStage{}
rollback = *cfg.Planner.AutoRollback
)
for _, plg := range p.plugins {

plugins, err := p.pluginRegistry.GetPluginClientsByAppConfig(cfg)
if err != nil {
return nil, err
Warashi marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 429 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L428-L429

Added lines #L428 - L429 were not covered by tests
for _, plg := range plugins {
res, err := plg.BuildQuickSyncStages(ctx, &deployment.BuildQuickSyncStagesRequest{Rollback: rollback})
if err != nil {
return nil, fmt.Errorf("failed to build quick sync stage deployment (%w)", err)
Expand Down Expand Up @@ -462,9 +468,9 @@
// Build stages config for each plugin.
for i := range stagesCfg {
stageCfg := stagesCfg[i]
plg, ok := p.stageBasedPluginsMap[stageCfg.Name.String()]
if !ok {
return nil, fmt.Errorf("unable to find plugin for stage %q", stageCfg.Name.String())
plg, err := p.pluginRegistry.GetPluginClientByStageName(stageCfg.Name.String())
if err != nil {
return nil, err

Check warning on line 473 in pkg/app/pipedv1/controller/planner.go

View check run for this annotation

Codecov / codecov/patch

pkg/app/pipedv1/controller/planner.go#L473

Added line #L473 was not covered by tests
Warashi marked this conversation as resolved.
Show resolved Hide resolved
}

stagesCfgPerPlugin[plg] = append(stagesCfgPerPlugin[plg], &deployment.BuildPipelineSyncStagesRequest_StageConfig{
Expand Down
Loading
Loading