core: add restore mon quorum in go
Signed-off-by: subhamkrai <srai@redhat.com>
subhamkrai committed Apr 27, 2023
1 parent a368ac0 commit 76feb3e
Showing 15 changed files with 524 additions and 86 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/collect-logs/action.yaml
@@ -0,0 +1,21 @@
# This isn't to be used for the go integration tests because their logs are placed in a different location and require a few extra steps.
name: Log Collector
description: Log collector for canary test
inputs:
name:
description: Name to use for the workflow
required: true

runs:
using: "composite"
steps:
- name: collect common logs
shell: bash --noprofile --norc -eo pipefail -x {0}
run: |
tests/collect-logs.sh
- name: Upload canary test result
uses: actions/upload-artifact@v2
with:
name: ${{ inputs.name }}
path: test
25 changes: 24 additions & 1 deletion .github/workflows/go-test.yaml
@@ -8,7 +8,7 @@ defaults:
shell: bash --noprofile --norc -eo pipefail -x {0}

jobs:
with-krew:
go-test:
runs-on: ubuntu-20.04
steps:
- name: checkout
@@ -36,6 +36,17 @@ jobs:
run: |
set -e
kubectl rook-ceph ceph status
# test mon restore: restore quorum to mon a, delete mons b and c, then add mons d and e
export ROOK_PLUGIN_SKIP_PROMPTS=true
kubectl rook-ceph mons restore-quorum a
kubectl -n rook-ceph wait pod -l app=rook-ceph-mon-b --for=delete --timeout=90s
kubectl -n rook-ceph wait pod -l app=rook-ceph-mon-c --for=delete --timeout=90s
tests/github-action-helper.sh wait_for_three_mons rook-ceph
kubectl -n rook-ceph wait deployment rook-ceph-mon-d --for condition=Available=True --timeout=90s
kubectl -n rook-ceph wait deployment rook-ceph-mon-e --for condition=Available=True --timeout=90s
kubectl rook-ceph mons
kubectl rook-ceph rbd ls replicapool
@@ -55,3 +66,15 @@ jobs:
kubectl rook-ceph rook status all
kubectl rook-ceph rook status cephobjectstores
kubectl rook-ceph rook purge-osd 0 --force
- name: collect common logs
if: always()
uses: ./.github/workflows/collect-logs
with:
name: go-test

- name: consider debugging
if: failure()
uses: mxschmitt/action-tmate@v3
with:
use-tmate: ${{ secrets.USE_TMATE }}
2 changes: 1 addition & 1 deletion cmd/commands/ceph.go
@@ -32,6 +32,6 @@ var CephCmd = &cobra.Command{
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
context := GetContext()
fmt.Println(exec.RunCommandInOperatorPod(context, cmd.Use, args, OperatorNamespace, CephClusterNamespace))
fmt.Println(exec.RunCommandInOperatorPod(context, cmd.Use, args, OperatorNamespace, CephClusterNamespace, true))
},
}
19 changes: 18 additions & 1 deletion cmd/commands/mons.go
@@ -29,10 +29,27 @@ var MonCmd = &cobra.Command{
Use: "mons",
Short: "Output mon endpoints",
DisableFlagParsing: true,
Run: func(cmd *cobra.Command, args []string) {
Args: cobra.MaximumNArgs(1),
Run: func(_ *cobra.Command, args []string) {
if len(args) == 0 {
context := GetContext()
fmt.Println(mons.GetMonEndpoint(context, CephClusterNamespace))
}
},
}

// RestoreQuorum represents the mons restore-quorum command
var RestoreQuorum = &cobra.Command{
Use: "restore-quorum",
Short: "When quorum is lost, restore quorum to the remaining healthy mon",
DisableFlagParsing: true,
Args: cobra.ExactArgs(1),
Run: func(_ *cobra.Command, args []string) {
context := GetContext()
mons.RestoreQuorum(context, OperatorNamespace, CephClusterNamespace, args[0])
},
}

func init() {
MonCmd.AddCommand(RestoreQuorum)
}
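
Note: the body of mons.RestoreQuorum lives in pkg/mons, which is not part of the loaded diff. Inferred from the call site above and from the shell implementation of run_restore_quorum in kubectl-rook-ceph.sh further down in this commit, the Go entry point presumably follows the outline below; the signature matches the call, but the step comments are an inference, not the committed code.

// Sketch only: inferred shape of the new pkg/mons entry point.
func RestoreQuorum(context *k8sutil.Context, operatorNamespace, clusterNamespace, goodMonName string) {
	// 1. scale down the rook-ceph operator so it does not interfere with the repair
	// 2. parse the rook-ceph-mon-endpoints configmap to find the mons other than goodMonName
	// 3. run the good mon in debug mode and rewrite its monmap (see the sketch after kubectl-rook-ceph.sh below)
	// 4. restart the good mon, wait for it to rejoin quorum, then purge the bad mons
	// 5. scale the operator back up so mon failover can restore a full quorum
}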
2 changes: 1 addition & 1 deletion cmd/commands/rbd.go
@@ -32,6 +32,6 @@ var RbdCmd = &cobra.Command{
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
context := GetContext()
fmt.Println(exec.RunCommandInOperatorPod(context, cmd.Use, args, OperatorNamespace, CephClusterNamespace))
fmt.Println(exec.RunCommandInOperatorPod(context, cmd.Use, args, OperatorNamespace, CephClusterNamespace, true))
},
}
2 changes: 1 addition & 1 deletion cmd/commands/rook.go
@@ -37,7 +37,7 @@ var versionCmd = &cobra.Command{
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
context := GetContext()
fmt.Println(exec.RunCommandInOperatorPod(context, "rook", []string{cmd.Use}, OperatorNamespace, CephClusterNamespace))
fmt.Println(exec.RunCommandInOperatorPod(context, "rook", []string{cmd.Use}, OperatorNamespace, CephClusterNamespace, true))
},
}

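All three command wrappers above (ceph.go, rbd.go, rook.go) now pass a trailing boolean to exec.RunCommandInOperatorPod. The exec package is not part of the loaded diff, so the parameter's name and meaning are assumptions; below is a minimal sketch under the assumption that it makes a failed command fatal to the plugin.

package exec

import (
	"fmt"
	"os"

	"github.com/rook/kubectl-rook-ceph/pkg/k8sutil"
)

// Sketch only: exitOnError is an assumed name for the new boolean, and
// execIntoOperatorPod is a hypothetical stand-in for the real remote exec.
func RunCommandInOperatorPod(context *k8sutil.Context, cmd string, args []string, operatorNamespace, clusterNamespace string, exitOnError bool) string {
	output, err := execIntoOperatorPod(context, cmd, args, operatorNamespace, clusterNamespace)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		if exitOnError {
			os.Exit(1)
		}
	}
	return output
}

func execIntoOperatorPod(context *k8sutil.Context, cmd string, args []string, operatorNamespace, clusterNamespace string) (string, error) {
	return "", nil // placeholder for the Kubernetes exec call
}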
31 changes: 14 additions & 17 deletions kubectl-rook-ceph.sh
@@ -230,7 +230,7 @@ function path_cm_rook_ceph_operator_config() {
# 'kubectl rook-ceph mons' commands
####################################################################################################

function run_mons_command () {
function run_mons_command() {
if [ "$#" -ge 1 ] && [ "$1" = "restore-quorum" ]; then
shift # remove the subcommand from the front of the arg list
run_restore_quorum "$@"
@@ -253,16 +253,16 @@ function wait_for_deployment_to_be_running() {
function run_restore_quorum() {
parse_flags parse_image_flag "$@" # parse flags before the good mon name
[[ -z "${REMAINING_ARGS[0]:-""}" ]] && fail_error "Missing healthy mon name"
good_mon="${REMAINING_ARGS[0]}" # get the good mon being used to restore quorum
shift # remove the healthy mon from the front of the arg list
REMAINING_ARGS=("${REMAINING_ARGS[@]:1}") # remove mon name from remaining args
end_of_command_parsing "$@" # end of command tree
good_mon="${REMAINING_ARGS[0]}" # get the good mon being used to restore quorum
shift # remove the healthy mon from the front of the arg list
REMAINING_ARGS=("${REMAINING_ARGS[@]:1}") # remove mon name from remaining args
end_of_command_parsing "$@" # end of command tree

# Parse the endpoints configmap for the mon endpoints
bad_mons=()
mon_endpoints=$(KUBECTL_NS_CLUSTER get cm rook-ceph-mon-endpoints -o jsonpath='{.data.data}')
# split the endpoints into an array, separated by the comma
for single_mon in ${mon_endpoints//,/ } ; do
for single_mon in ${mon_endpoints//,/ }; do
mon_name=$(echo "${single_mon/=/ }" | awk '{print $1}')
mon_endpoint=$(echo "${single_mon/=/ }" | awk '{print $2}')
echo "mon=$mon_name, endpoint=$mon_endpoint"
@@ -335,12 +335,11 @@ function run_restore_quorum() {
--public-bind-addr=$ROOK_POD_IP \
--extract-monmap=$monmap_path

info_msg "Printing monmap"; \
info_msg "Printing monmap"
KUBECTL_NS_CLUSTER exec deploy/rook-ceph-mon-$good_mon-debug -c mon -- monmaptool --print $monmap_path

# remove all the mons except the good one
for bad_mon in "${bad_mons[@]}"
do
for bad_mon in "${bad_mons[@]}"; do
info_msg "Removing mon $bad_mon"
KUBECTL_NS_CLUSTER exec deploy/rook-ceph-mon-$good_mon-debug -c mon -- monmaptool $monmap_path --rm $bad_mon
done
@@ -381,8 +380,7 @@ function run_restore_quorum() {
info_msg "Purging the bad mons: ${bad_mons[*]}"
# ignore errors purging old mons if their resources don't exist
set +e
for bad_mon in "${bad_mons[@]}"
do
for bad_mon in "${bad_mons[@]}"; do
info_msg "purging old mon: $bad_mon"
KUBECTL_NS_CLUSTER delete deploy rook-ceph-mon-$bad_mon
KUBECTL_NS_CLUSTER delete svc rook-ceph-mon-$bad_mon
@@ -433,8 +431,7 @@ function wait_for_mon_status_response() {
sleep_time=5

exit_status=1
while [[ $exit_status != 0 ]]
do
while [[ $exit_status != 0 ]]; do
# Don't fail the script if the ceph command fails
set +e
KUBECTL_NS_CLUSTER exec deploy/rook-ceph-tools -- ceph status --connect-timeout=3
@@ -642,8 +639,8 @@ function run_start_debug() {
# 3) debug start deploymentName
parse_flags parse_image_flag "$@" # parse flags before the deployment name
[[ -z "${REMAINING_ARGS[0]:-""}" ]] && fail_error "Missing mon or osd deployment name"
deployment_name="${REMAINING_ARGS[0]}" # get deployment name
REMAINING_ARGS=("${REMAINING_ARGS[@]:1}") # remove deploy name from remaining args
deployment_name="${REMAINING_ARGS[0]}" # get deployment name
REMAINING_ARGS=("${REMAINING_ARGS[@]:1}") # remove deploy name from remaining args
set +u
parse_flags parse_image_flag "${REMAINING_ARGS[@]}" # parse flags after the deployment name
set -u
@@ -694,8 +691,8 @@ function run_start_debug() {
spec:
$deployment_spec
EOF
info_msg "ensure the debug deployment $deployment_name is scaled up"
KUBECTL_NS_CLUSTER scale deployments "$deployment_name-debug" --replicas=1
info_msg "ensure the debug deployment $deployment_name is scaled up"
KUBECTL_NS_CLUSTER scale deployments "$deployment_name-debug" --replicas=1
}

function run_stop_debug() {
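The core of the quorum repair is the monmap surgery shown in run_restore_quorum above: extract the monmap from the good mon, print it, remove every bad mon, and inject the result back. A compact Go rendering of that sequence follows; the repairMonmap name and the execInDebugMon callback are hypothetical, the monmaptool calls come from the script, and the final inject step, elided from this view, is assumed.

package mons

import "fmt"

// execInDebugMon stands in for "kubectl exec deploy/rook-ceph-mon-<good>-debug -c mon --".
type execInDebugMon func(cmd ...string) error

// repairMonmap mirrors the script: print the extracted monmap, drop each
// unhealthy mon, then inject the edited monmap back into the good mon's store.
func repairMonmap(exec execInDebugMon, monmapPath string, badMons []string) error {
	if err := exec("monmaptool", "--print", monmapPath); err != nil {
		return fmt.Errorf("failed to print monmap: %w", err)
	}
	for _, badMon := range badMons {
		fmt.Printf("removing mon %s from the monmap\n", badMon)
		if err := exec("monmaptool", monmapPath, "--rm", badMon); err != nil {
			return fmt.Errorf("failed to remove mon %s: %w", badMon, err)
		}
	}
	// counterpart of the --extract-monmap call in the script; the real
	// invocation takes additional ceph-mon flags (abbreviated here)
	return exec("ceph-mon", "--inject-monmap="+monmapPath)
}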
47 changes: 17 additions & 30 deletions pkg/debug/start_debug.go
@@ -26,13 +26,11 @@ import (
"github.com/rook/kubectl-rook-ceph/pkg/k8sutil"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func StartDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alternateImageValue string) {

err := startDebug(context, clusterNamespace, deploymentName, alternateImageValue)
if err != nil {
fmt.Println(err)
@@ -41,11 +39,14 @@ func startDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alte
}

func startDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alternateImageValue string) error {
deployment, err := verifyDeploymentExists(context, clusterNamespace, deploymentName)
originalDeployment, err := GetDeployment(context, clusterNamespace, deploymentName)
if err != nil {
return fmt.Errorf("Missing mon or osd deployment name %s. %v\n", deploymentName, err)
}

// We need to dereference the deployment as it is required for the debug deployment
deployment := *originalDeployment

if alternateImageValue != "" {
log.Printf("setting debug image to %s\n", alternateImageValue)
deployment.Spec.Template.Spec.Containers[0].Image = alternateImageValue
@@ -62,23 +63,21 @@ func startDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alte
deployment.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "infinity"}
deployment.Spec.Template.Spec.Containers[0].Args = []string{}

if err := updateDeployment(context, clusterNamespace, deployment); err != nil {
return fmt.Errorf("Failed to update deployment %s. %v\n", deployment.Name, err)
}

deploymentPodName, err := waitForPodToRun(context, clusterNamespace, deployment.Spec)
labelSelector := fmt.Sprintf("ceph_daemon_type=%s,ceph_daemon_id=%s", deployment.Spec.Template.Labels["ceph_daemon_type"], deployment.Spec.Template.Labels["ceph_daemon_id"])
deploymentPodName, err := k8sutil.WaitForPodToRun(context, clusterNamespace, labelSelector)
if err != nil {
fmt.Println(err)
return err
}

if err := setDeploymentScale(context, clusterNamespace, deployment.Name, 0); err != nil {
if err := SetDeploymentScale(context, clusterNamespace, deployment.Name, 0); err != nil {
return err
}
fmt.Printf("deployment %s scaled down\n", deployment.Name)

fmt.Printf("waiting for the deployment pod %s to be deleted\n", deploymentPodName)
fmt.Printf("waiting for the deployment pod %s to be deleted\n", deploymentPodName.Name)

err = waitForPodDeletion(context, clusterNamespace, deploymentPodName)
err = waitForPodDeletion(context, clusterNamespace, deploymentName)
if err != nil {
fmt.Println(err)
return err
@@ -99,13 +98,14 @@ func startDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alte
}
fmt.Printf("ensure the debug deployment %s is scaled up\n", deploymentName)

if err := setDeploymentScale(context, clusterNamespace, debugDeployment.Name, 1); err != nil {
if err := SetDeploymentScale(context, clusterNamespace, debugDeployment.Name, 1); err != nil {
return err
}

return nil
}

func setDeploymentScale(context *k8sutil.Context, clusterNamespace, deploymentName string, scaleCount int) error {
func SetDeploymentScale(context *k8sutil.Context, clusterNamespace, deploymentName string, scaleCount int) error {
scale := &autoscalingv1.Scale{
ObjectMeta: v1.ObjectMeta{
Name: deploymentName,
@@ -122,11 +122,14 @@ func setDeploymentScale(context *k8sutil.Context, clusterNamespace, deploymentNa
return nil
}

func verifyDeploymentExists(context *k8sutil.Context, clusterNamespace, deploymentName string) (*appsv1.Deployment, error) {
func GetDeployment(context *k8sutil.Context, clusterNamespace, deploymentName string) (*appsv1.Deployment, error) {
fmt.Printf("fetching the deployment %s to be running\n", deploymentName)
deployment, err := context.Clientset.AppsV1().Deployments(clusterNamespace).Get(ctx.TODO(), deploymentName, v1.GetOptions{})
if err != nil {
fmt.Printf("deployment %s doesn't exist. %v", deploymentName, err)
return nil, err
}
fmt.Printf("deployment %s exists\n", deploymentName)
return deployment, nil
}

@@ -138,22 +141,6 @@ func updateDeployment(context *k8sutil.Context, clusterNamespace string, deploym
return nil
}

func waitForPodToRun(context *k8sutil.Context, clusterNamespace string, deploymentSpec appsv1.DeploymentSpec) (string, error) {
labelSelector := fmt.Sprintf("ceph_daemon_type=%s,ceph_daemon_id=%s", deploymentSpec.Template.Labels["ceph_daemon_type"], deploymentSpec.Template.Labels["ceph_daemon_id"])
for i := 0; i < 60; i++ {
pod, _ := context.Clientset.CoreV1().Pods(clusterNamespace).List(ctx.TODO(), v1.ListOptions{LabelSelector: labelSelector})
if pod.Items[0].Status.Phase == corev1.PodRunning && pod.Items[0].DeletionTimestamp.IsZero() {
return pod.Items[0].Name, nil
}

fmt.Println("waiting for pod to be running")
time.Sleep(time.Second * 5)
}

return "", fmt.Errorf("No pod with labels matching %s:%s", deploymentSpec.Template.Labels, deploymentSpec.Template.Labels)

}

func waitForPodDeletion(context *k8sutil.Context, clusterNamespace, podName string) error {
for i := 0; i < 60; i++ {
_, err := context.Clientset.CoreV1().Pods(clusterNamespace).Get(ctx.TODO(), podName, v1.GetOptions{})
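The local waitForPodToRun above was replaced by k8sutil.WaitForPodToRun, whose definition is not in the loaded diff. Judging from the deleted code and the new call site, which passes a label selector and reads .Name off the result, the shared helper plausibly looks like the sketch below; note it also needs the empty-list guard the deleted version lacked before indexing pod.Items[0].

// Sketch of the shared helper that replaces the deleted waitForPodToRun; the
// real pkg/k8sutil code is not shown here, so this shape is an inference.
package k8sutil

import (
	ctx "context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func WaitForPodToRun(context *Context, clusterNamespace, labelSelector string) (corev1.Pod, error) {
	for i := 0; i < 60; i++ {
		pods, err := context.Clientset.CoreV1().Pods(clusterNamespace).List(ctx.TODO(), v1.ListOptions{LabelSelector: labelSelector})
		// unlike the deleted version, guard against an empty list before indexing
		if err == nil && len(pods.Items) > 0 && pods.Items[0].Status.Phase == corev1.PodRunning && pods.Items[0].DeletionTimestamp.IsZero() {
			return pods.Items[0], nil
		}
		fmt.Println("waiting for pod to be running")
		time.Sleep(5 * time.Second)
	}
	return corev1.Pod{}, fmt.Errorf("no pod found with label selector %q", labelSelector)
}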
5 changes: 2 additions & 3 deletions pkg/debug/stop_debug.go
@@ -38,12 +38,11 @@ func StopDebug(context *k8sutil.Context, clusterNamespace, deploymentName string
}

func stopDebug(context *k8sutil.Context, clusterNamespace, deploymentName string) error {

if !strings.HasSuffix(deploymentName, "-debug") {
deploymentName = deploymentName + "-debug"
}

debugDeployment, err := verifyDeploymentExists(context, clusterNamespace, deploymentName)
debugDeployment, err := GetDeployment(context, clusterNamespace, deploymentName)
if err != nil {
return fmt.Errorf("Missing mon or osd debug deployment name %s. %v\n", deploymentName, err)
}
@@ -55,7 +54,7 @@ func stopDebug(context *k8sutil.Context, clusterNamespace, deploymentName string
}

original_deployment_name := strings.ReplaceAll(deploymentName, "-debug", "")
if err := setDeploymentScale(context, clusterNamespace, original_deployment_name, 1); err != nil {
if err := SetDeploymentScale(context, clusterNamespace, original_deployment_name, 1); err != nil {
return err
}
return nil
(diffs for the remaining 6 changed files did not load)
