diff --git a/internal/cmd/local/k8s/client.go b/internal/cmd/local/k8s/client.go index 0358703..728f173 100644 --- a/internal/cmd/local/k8s/client.go +++ b/internal/cmd/local/k8s/client.go @@ -33,50 +33,40 @@ type Client interface { // This is a blocking call, it should only return once the deployment has completed. DeploymentRestart(ctx context.Context, namespace, name string) error - // IngressCreate creates an ingress in the given namespace + EventsWatch(ctx context.Context, namespace string) (watch.Interface, error) + IngressCreate(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error - // IngressExists returns true if the ingress exists in the namespace, false otherwise. IngressExists(ctx context.Context, namespace string, ingress string) bool - // IngressUpdate updates an existing ingress in the given namespace IngressUpdate(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error - // NamespaceCreate creates a namespace + LogsGet(ctx context.Context, namespace string, name string) (string, error) + NamespaceCreate(ctx context.Context, namespace string) error - // NamespaceExists returns true if the namespace exists, false otherwise NamespaceExists(ctx context.Context, namespace string) bool - // NamespaceDelete deletes the existing namespace NamespaceDelete(ctx context.Context, namespace string) error - // PersistentVolumeCreate creates a persistent volume PersistentVolumeCreate(ctx context.Context, namespace, name string) error - // PersistentVolumeExists returns true if the persistent volume exists, false otherwise PersistentVolumeExists(ctx context.Context, namespace, name string) bool - // PersistentVolumeDelete deletes the existing persistent volume PersistentVolumeDelete(ctx context.Context, namespace, name string) error - // PersistentVolumeClaimCreate creates a persistent volume claim PersistentVolumeClaimCreate(ctx context.Context, namespace, name, volumeName string) error - // PersistentVolumeClaimExists 
returns true if the persistent volume claim exists, false otherwise PersistentVolumeClaimExists(ctx context.Context, namespace, name, volumeName string) bool - // PersistentVolumeClaimDelete deletes the existing persistent volume claim PersistentVolumeClaimDelete(ctx context.Context, namespace, name, volumeName string) error - // SecretCreateOrUpdate will update or create the secret name with the payload of data in the specified namespace + PodList(ctx context.Context, namespace string) (*corev1.PodList, error) + SecretCreateOrUpdate(ctx context.Context, secret corev1.Secret) error - // SecretGet returns the secrets for the namespace and name + // SecretDeleteCollection deletes multiple secrets. + // Note this takes a `type` and not a `name`. All secrets matching this type will be removed. + SecretDeleteCollection(ctx context.Context, namespace, _type string) error SecretGet(ctx context.Context, namespace, name string) (*corev1.Secret, error) - // ServiceGet returns the service for the given namespace and name ServiceGet(ctx context.Context, namespace, name string) (*corev1.Service, error) + StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) + // ServerVersionGet returns the kubernetes version. 
ServerVersionGet() (string, error) - - EventsWatch(ctx context.Context, namespace string) (watch.Interface, error) - - LogsGet(ctx context.Context, namespace string, name string) (string, error) - StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) - PodList(ctx context.Context, namespace string) (*corev1.PodList, error) } var _ Client = (*DefaultK8sClient)(nil) @@ -289,6 +279,13 @@ func (d *DefaultK8sClient) SecretCreateOrUpdate(ctx context.Context, secret core return fmt.Errorf("unexpected error while handling the secret %s: %w", name, err) } +func (d *DefaultK8sClient) SecretDeleteCollection(ctx context.Context, namespace, _type string) error { + listOptions := metav1.ListOptions{ + FieldSelector: "type=" + _type, + } + return d.ClientSet.CoreV1().Secrets(namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOptions) +} + func (d *DefaultK8sClient) SecretGet(ctx context.Context, namespace, name string) (*corev1.Secret, error) { secret, err := d.ClientSet.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { diff --git a/internal/cmd/local/k8s/k8stest/k8stest.go b/internal/cmd/local/k8s/k8stest/k8stest.go index 8b4e0e8..416ea5f 100644 --- a/internal/cmd/local/k8s/k8stest/k8stest.go +++ b/internal/cmd/local/k8s/k8stest/k8stest.go @@ -30,6 +30,7 @@ type MockClient struct { FnPersistentVolumeClaimExists func(ctx context.Context, namespace, name, volumeName string) bool FnPersistentVolumeClaimDelete func(ctx context.Context, namespace, name, volumeName string) error FnSecretCreateOrUpdate func(ctx context.Context, secret corev1.Secret) error + FnSecretDeleteCollection func(ctx context.Context, namespace, _type string) error FnSecretGet func(ctx context.Context, namespace, name string) (*corev1.Secret, error) FnServerVersionGet func() (string, error) FnServiceGet func(ctx context.Context, namespace, name string) (*corev1.Service, error) @@ -146,6 +147,14 @@ func (m *MockClient) 
SecretGet(ctx context.Context, namespace, name string) (*co return nil, nil } +func (m *MockClient) SecretDeleteCollection(ctx context.Context, namespace, _type string) error { + if m.FnSecretDeleteCollection != nil { + return m.FnSecretDeleteCollection(ctx, namespace, _type) + } + + return nil +} + func (m *MockClient) ServiceGet(ctx context.Context, namespace, name string) (*corev1.Service, error) { return m.FnServiceGet(ctx, namespace, name) } @@ -180,7 +189,7 @@ func (m *MockClient) StreamPodLogs(ctx context.Context, namespace string, podNam func (m *MockClient) PodList(ctx context.Context, namespace string) (*corev1.PodList, error) { if m.FnPodList == nil { - return nil, nil + return &corev1.PodList{}, nil } return m.FnPodList(ctx, namespace) } diff --git a/internal/cmd/local/local/install.go b/internal/cmd/local/local/install.go index c3124a4..d3ce08d 100644 --- a/internal/cmd/local/local/install.go +++ b/internal/cmd/local/local/install.go @@ -589,6 +589,13 @@ type chartRequest struct { uninstallFirst bool } +// errHelmStuck is the error returned (only from a msg perspective, not this actual error) from the underlying helm +// client when the most recent install/upgrade attempt was terminated early (e.g. via ctrl+c) and was +// unable to (or not configured to) rollback to a prior version. +// +// The actual error returned by the underlying helm-client isn't exported. 
+var errHelmStuck = errors.New("another operation (install/upgrade/rollback) is in progress") + // handleChart will handle the installation of a chart func (c *Command) handleChart( ctx context.Context, @@ -648,29 +655,57 @@ func (c *Command) handleChart( } } - pterm.Info.Println(fmt.Sprintf( - "Starting Helm Chart installation of '%s' (version: %s)", - req.chartName, helmChart.Metadata.Version, - )) - c.spinner.UpdateText(fmt.Sprintf( - "Installing '%s' (version: %s) Helm Chart (this may take several minutes)", - req.chartName, helmChart.Metadata.Version, - )) - helmRelease, err := c.helm.InstallOrUpgradeChart(ctx, &helmclient.ChartSpec{ - ReleaseName: req.chartRelease, - ChartName: req.chartLoc, - CreateNamespace: true, - Namespace: req.namespace, - Wait: true, - Timeout: 60 * time.Minute, - ValuesYaml: req.valuesYAML, - Version: req.chartVersion, - }, - &helmclient.GenericHelmOptions{}, - ) - if err != nil { - pterm.Error.Printfln("Failed to install %s Helm Chart", req.chartName) - return fmt.Errorf("unable to install helm: %w", err) + // This will be non-nil if the following for-loop is able to successfully install/upgrade the chart + // AND that for-loop doesn't return early with an error. + var helmRelease *release.Release + + // It's possible that an existing helm installation is stuck in a non-final state + // which this code will detect, attempt to clean up, and try again up to three times. + // Only the errHelmStuck error (based on error-message equivalence) will be retried, all other errors + // will be returned. 
+ for attemptCount := 0; attemptCount < 3; attemptCount++ { + pterm.Info.Println(fmt.Sprintf( + "Starting Helm Chart installation of '%s' (version: %s)", + req.chartName, helmChart.Metadata.Version, + )) + c.spinner.UpdateText(fmt.Sprintf( + "Installing '%s' (version: %s) Helm Chart (this may take several minutes)", + req.chartName, helmChart.Metadata.Version, + )) + + helmRelease, err = c.helm.InstallOrUpgradeChart(ctx, &helmclient.ChartSpec{ + ReleaseName: req.chartRelease, + ChartName: req.chartLoc, + CreateNamespace: true, + Namespace: req.namespace, + Wait: true, + Timeout: 60 * time.Minute, + ValuesYaml: req.valuesYAML, + Version: req.chartVersion, + }, + &helmclient.GenericHelmOptions{}, + ) + + if err != nil { + // If the error is the errHelmStuck error, attempt to resolve this by removing the helm release secrets from the namespace of the chart being installed (req.namespace), not a fixed namespace. + // See: https://github.com/helm/helm/issues/8987#issuecomment-1082992461 + if strings.Contains(err.Error(), errHelmStuck.Error()) { + if err := c.k8s.SecretDeleteCollection(ctx, req.namespace, "helm.sh/release.v1"); err != nil { + pterm.Debug.Println(fmt.Sprintf("unable to delete secrets helm.sh/release.v1: %s", err)) + } + continue + } + pterm.Error.Printfln("Failed to install %s Helm Chart", req.chartName) + return fmt.Errorf("unable to install helm: %w", err) + } + break + } + + // If helmRelease is nil, that means we were unable to successfully install/upgrade the chart. + // This is an error situation. 
As only one specific error message should cause this (all other errors + // should have returned out of the for-loop), we can treat this as if the underlying helm-client returned the errHelmStuck error on every attempt. + if helmRelease == nil { + return localerr.ErrHelmStuck } c.tel.Attr(fmt.Sprintf("helm_%s_release_version", req.name), strconv.Itoa(helmRelease.Version)) diff --git a/internal/cmd/local/local/install_test.go b/internal/cmd/local/local/install_test.go index 2998b6d..0c3155c 100644 --- a/internal/cmd/local/local/install_test.go +++ b/internal/cmd/local/local/install_test.go @@ -11,6 +11,7 @@ import ( "github.com/airbytehq/abctl/internal/cmd/local/helm" "github.com/airbytehq/abctl/internal/cmd/local/k8s" "github.com/airbytehq/abctl/internal/cmd/local/k8s/k8stest" + "github.com/airbytehq/abctl/internal/cmd/local/localerr" "github.com/airbytehq/abctl/internal/common" "github.com/airbytehq/abctl/internal/telemetry" "github.com/google/go-cmp/cmp" @@ -25,7 +26,7 @@ import ( const portTest = 9999 const testAirbyteChartLoc = "https://airbytehq.github.io/helm-charts/airbyte-1.2.3.tgz" -func TestCommand_Install(t *testing.T) { +func TestCommand_Install_HappyPath(t *testing.T) { valuesYaml := mustReadFile(t, "testdata/test-edition.values.yaml") expChartRepoCnt := 0 expChartRepo := []struct { @@ -172,6 +173,288 @@ func TestCommand_Install(t *testing.T) { } } +func TestCommand_Install_BadHelmState(t *testing.T) { + valuesYaml := mustReadFile(t, "testdata/test-edition.values.yaml") + + expChartCnt := 0 + expNginxValues, _ := helm.BuildNginxValues(9999) + expChart := []struct { + chart helmclient.ChartSpec + release release.Release + }{ + { + chart: helmclient.ChartSpec{ + ReleaseName: common.AirbyteChartRelease, + ChartName: testAirbyteChartLoc, + Namespace: common.AirbyteNamespace, + CreateNamespace: true, + Wait: true, + Timeout: 60 * time.Minute, + ValuesYaml: valuesYaml, + }, + release: release.Release{ + Chart: &chart.Chart{Metadata: &chart.Metadata{Version: "1.2.3.4"}}, + Name: common.AirbyteChartRelease, + 
Namespace: common.AirbyteNamespace, + Version: 0, + }, + }, + { + chart: helmclient.ChartSpec{ + ReleaseName: common.NginxChartRelease, + ChartName: common.NginxChartName, + Namespace: common.NginxNamespace, + CreateNamespace: true, + Wait: true, + Timeout: 60 * time.Minute, + ValuesYaml: expNginxValues, + }, + release: release.Release{ + Chart: &chart.Chart{Metadata: &chart.Metadata{Version: "4.3.2.1"}}, + Name: common.NginxChartRelease, + Namespace: common.NginxNamespace, + Version: 0, + }, + }, + } + + installCalled := 0 + helm := mockHelmClient{ + addOrUpdateChartRepo: func(entry repo.Entry) error { + return nil + }, + + getChart: func(name string, _ *action.ChartPathOptions) (*chart.Chart, string, error) { + switch { + case name == testAirbyteChartLoc: + return &chart.Chart{Metadata: &chart.Metadata{Version: "test.airbyte.version"}}, "", nil + case name == common.NginxChartName: + return &chart.Chart{Metadata: &chart.Metadata{Version: "test.nginx.version"}}, "", nil + default: + t.Error("unsupported chart name", name) + return nil, "", errors.New("unexpected chart name") + } + }, + + getRelease: func(name string) (*release.Release, error) { + switch { + case name == common.AirbyteChartRelease: + t.Error("should not have been called", name) + return nil, errors.New("should not have been called") + case name == common.NginxChartRelease: + return nil, errors.New("not found") + default: + t.Error("unsupported chart name", name) + return nil, errors.New("unexpected chart name") + } + }, + + installOrUpgradeChart: func(ctx context.Context, spec *helmclient.ChartSpec, opts *helmclient.GenericHelmOptions) (*release.Release, error) { + if installCalled > 0 { + defer func() { expChartCnt++ }() + + return &expChart[expChartCnt].release, nil + } + + installCalled++ + return nil, errHelmStuck + }, + + uninstallReleaseByName: func(s string) error { + return nil + }, + } + + k8sClient := k8stest.MockClient{ + FnIngressExists: func(ctx context.Context, namespace string, 
ingress string) bool { + return false + }, + } + + tel := telemetry.MockClient{} + + httpClient := mockHTTP{do: func(req *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: 200}, nil + }} + + c, err := New( + k8s.TestProvider, + WithPortHTTP(portTest), + WithHelmClient(&helm), + WithK8sClient(&k8sClient), + WithTelemetryClient(&tel), + WithHTTPClient(&httpClient), + WithBrowserLauncher(func(url string) error { + return nil + }), + ) + if err != nil { + t.Fatal(err) + } + + installOpts := &InstallOpts{ + HelmValuesYaml: valuesYaml, + AirbyteChartLoc: testAirbyteChartLoc, + } + if err := c.Install(context.Background(), installOpts); err != nil { + t.Fatal(err) + } +} + +// verify functionality if the bad (stuck) helm state persists between attempts +func TestCommand_Install_BadHelmStatePersists(t *testing.T) { + valuesYaml := mustReadFile(t, "testdata/test-edition.values.yaml") + expChartRepoCnt := 0 + expChartRepo := []struct { + name string + url string + }{ + {name: common.AirbyteRepoName, url: common.AirbyteRepoURL}, + {name: common.NginxRepoName, url: common.NginxRepoURL}, + } + + expChartCnt := 0 + expNginxValues, _ := helm.BuildNginxValues(9999) + expChart := []struct { + chart helmclient.ChartSpec + release release.Release + }{ + { + chart: helmclient.ChartSpec{ + ReleaseName: common.AirbyteChartRelease, + ChartName: testAirbyteChartLoc, + Namespace: common.AirbyteNamespace, + CreateNamespace: true, + Wait: true, + Timeout: 60 * time.Minute, + ValuesYaml: valuesYaml, + }, + release: release.Release{ + Chart: &chart.Chart{Metadata: &chart.Metadata{Version: "1.2.3.4"}}, + Name: common.AirbyteChartRelease, + Namespace: common.AirbyteNamespace, + Version: 0, + }, + }, + { + chart: helmclient.ChartSpec{ + ReleaseName: common.NginxChartRelease, + ChartName: common.NginxChartName, + Namespace: common.NginxNamespace, + CreateNamespace: true, + Wait: true, + Timeout: 60 * time.Minute, + ValuesYaml: expNginxValues, + }, + release: 
release.Release{ + Chart: &chart.Chart{Metadata: &chart.Metadata{Version: "4.3.2.1"}}, + Name: common.NginxChartRelease, + Namespace: common.NginxNamespace, + Version: 0, + }, + }, + } + + installCalled := 0 + helm := mockHelmClient{ + addOrUpdateChartRepo: func(entry repo.Entry) error { + if d := cmp.Diff(expChartRepo[expChartRepoCnt].name, entry.Name); d != "" { + t.Error("chart name mismatch", d) + } + if d := cmp.Diff(expChartRepo[expChartRepoCnt].url, entry.URL); d != "" { + t.Error("chart url mismatch", d) + } + + expChartRepoCnt++ + + return nil + }, + + getChart: func(name string, _ *action.ChartPathOptions) (*chart.Chart, string, error) { + switch { + case name == testAirbyteChartLoc: + return &chart.Chart{Metadata: &chart.Metadata{Version: "test.airbyte.version"}}, "", nil + case name == common.NginxChartName: + return &chart.Chart{Metadata: &chart.Metadata{Version: "test.nginx.version"}}, "", nil + default: + t.Error("unsupported chart name", name) + return nil, "", errors.New("unexpected chart name") + } + }, + + getRelease: func(name string) (*release.Release, error) { + switch { + case name == common.AirbyteChartRelease: + t.Error("should not have been called", name) + return nil, errors.New("should not have been called") + case name == common.NginxChartRelease: + return nil, errors.New("not found") + default: + t.Error("unsupported chart name", name) + return nil, errors.New("unexpected chart name") + } + }, + + installOrUpgradeChart: func(ctx context.Context, spec *helmclient.ChartSpec, opts *helmclient.GenericHelmOptions) (*release.Release, error) { + if d := cmp.Diff(&expChart[expChartCnt].chart, spec); d != "" { + t.Error("chart mismatch", d) + } + + installCalled++ + return nil, errHelmStuck + }, + + uninstallReleaseByName: func(s string) error { + if d := cmp.Diff(expChart[expChartCnt].release.Name, s); d != "" { + t.Error("release mismatch", d) + } + + return nil + }, + } + + k8sClient := k8stest.MockClient{ + FnIngressExists: func(ctx 
context.Context, namespace string, ingress string) bool { + return false + }, + } + + tel := telemetry.MockClient{} + + httpClient := mockHTTP{do: func(req *http.Request) (*http.Response, error) { + return &http.Response{StatusCode: 200}, nil + }} + + c, err := New( + k8s.TestProvider, + WithPortHTTP(portTest), + WithHelmClient(&helm), + WithK8sClient(&k8sClient), + WithTelemetryClient(&tel), + WithHTTPClient(&httpClient), + WithBrowserLauncher(func(url string) error { + return nil + }), + ) + if err != nil { + t.Fatal(err) + } + + installOpts := &InstallOpts{ + HelmValuesYaml: valuesYaml, + AirbyteChartLoc: testAirbyteChartLoc, + } + if err := c.Install(context.Background(), installOpts); err == nil { + t.Fatal("expected error") + } else if !errors.Is(err, localerr.ErrHelmStuck) { + t.Fatalf("unexpected error: %v", err) + } + + if d := cmp.Diff(3, installCalled); d != "" { + t.Error("install attempts", d) + } +} + func TestCommand_InstallError(t *testing.T) { testErr := errors.New("test error") valuesYaml := mustReadFile(t, "testdata/test-edition.values.yaml") @@ -234,6 +517,7 @@ func TestCommand_InstallError(t *testing.T) { } func mustReadFile(t *testing.T, name string) string { + t.Helper() b, err := os.ReadFile(name) if err != nil { t.Fatal(err) diff --git a/internal/cmd/local/localerr/localerr.go b/internal/cmd/local/localerr/localerr.go index f126c9b..e78515f 100644 --- a/internal/cmd/local/localerr/localerr.go +++ b/internal/cmd/local/localerr/localerr.go @@ -41,12 +41,24 @@ Ensure that Docker is running and is accessible. You may need to upgrade to a n For additional help please visit https://docs.docker.com/get-docker/`, } + // ErrHelmStuck is returned when, while running a helm install or upgrade command, a previous install or upgrade + // attempt is still marked as in progress and this tool cannot work around it. 
+ ErrHelmStuck = &LocalError{ + msg: "another helm operation (install/upgrade/rollback) is in progress", + help: `An error occurred while attempting to run a helm install or upgrade. +If this error persists, you may need to run the "abctl local uninstall" command before attempting to run the +"abctl local install" command again. +Your data will persist between the uninstall and install commands. +`, + } + // ErrKubernetes is returned anytime an error occurs when attempting to communicate with the kubernetes cluster. ErrKubernetes = &LocalError{ msg: "error communicating with kubernetes", help: `An error occurred while communicating with the Kubernetes cluster. - If this error persists, you may need to run the uninstall command before attempting to run - the install command again.`, +If this error persists, you may need to run the "abctl local uninstall" command before attempting to run the +"abctl local install" command again. +Your data will persist between the uninstall and install commands.`, } // ErrIngress is returned in the event that ingress configuration failed. @@ -82,7 +94,7 @@ By default, abctl will allow access from any hostname or IP, so you might not ne } ErrBootloaderFailed = &LocalError{ - msg: "bootloader failed", + msg: "bootloader failed", help: "The bootloader failed to its initialization checks or migrations. Try running again with --verbose to see the full bootloader logs.", } )