Shipa 2578 #236
Conversation
- Add conditional to statefulset and deployment yamls
- Move spec.template.spec into a template
- Add needed fields to appSpec
- Add volumeClaimTemplates to values if found
- Add watch statefulset functionality
- Add test
I lowered the coverage limit since the code that caused the drop was just some additional fields on a struct in v1beta1.
Does that warrant a ticket for the Ketch CLI to incorporate StatefulSet apps? Looks good. Tested it out.
Let's try another approach (maybe in another branch?) so we can compare them.
Creating duplicates of complex, poorly tested functions is the sort of technical debt we should try to avoid.
return nil
}

func (r *AppReconciler) watchStatefulSetFunc(ctx context.Context, cleanup cleanupFunc, app *ketchv1.App, namespace string, set *appsv1.StatefulSet, processName string, recorder record.EventRecorder, watcher watch.Interface, cli kubernetes.Interface, timeout <-chan time.Time, stopFunc func()) error {
With this approach we are going to duplicate several functions; these functions are quite complex and not well tested precisely because of that complexity.
Would you mind trying another approach, like the one below?
Asking because of two things:
- we only need a few fields from either a Deployment or a StatefulSet, so it is easy to introduce a struct containing those fields
- we can additionally introduce an interface for workloadClient and make the watchFunc function much more testable (see the sketch after the code below)
type condition struct {
    Type   string
    Reason string
}

type workload struct {
    Name            string
    Replicas        int
    UpdatedReplicas int
    ReadyReplicas   int
    Conditions      []condition
}

type workloadClient struct {
    workloadType      string
    workloadName      string
    workloadNamespace string
    k8sClient         kubernetes.Interface
}

func (cli workloadClient) Get(ctx context.Context) (*workload, error) {
    switch cli.workloadType {
    case "Deployment":
        o, err := cli.k8sClient.AppsV1().Deployments(cli.workloadNamespace).Get(ctx, cli.workloadName, metav1.GetOptions{})
        if err != nil {
            return nil, err
        }
        w := workload{
            Name:            o.Name,
            UpdatedReplicas: int(o.Status.UpdatedReplicas),
            ReadyReplicas:   int(o.Status.ReadyReplicas),
        }
        if o.Spec.Replicas != nil {
            w.Replicas = int(*o.Spec.Replicas)
        }
        for _, c := range o.Status.Conditions {
            w.Conditions = append(w.Conditions, condition{Type: string(c.Type), Reason: c.Reason})
        }
        return &w, nil
    case "StatefulSet":
        // .. mostly the same code
    }
    return nil, fmt.Errorf("unknown workload type")
}

func (r *AppReconciler) watchFunc(ctx context.Context, cleanup cleanupFunc, app *ketchv1.App, processName string, recorder record.EventRecorder, watcher watch.Interface, cli *workloadClient, timeout <-chan time.Time) error {
    defer cleanup()

    var err error
    watchCh := watcher.ResultChan()
    defer watcher.Stop()

    set, err := cli.Get(ctx)
    if err != nil {
        return err
    }

    specReplicas := set.Replicas
    oldUpdatedReplicas := -1
    oldReadyUnits := -1
    oldPendingTermination := -1
    now := time.Now()
    var healthcheckTimeout <-chan time.Time

    for {
        for i := range set.Conditions {
            c := set.Conditions[i]
            if c.Type == DeploymentProgressing && c.Reason == deadlineExeceededProgressCond {
                deadlineExceededEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment %q exceeded its progress deadline", set.Name), processName)
                recorder.AnnotatedEventf(app, deadlineExceededEvent.Annotations, v1.EventTypeWarning, deadlineExceededEvent.Reason, deadlineExceededEvent.Description)
                return errors.Errorf("deployment %q exceeded its progress deadline", set.Name)
            }
        }
        if oldUpdatedReplicas != set.UpdatedReplicas {
            unitsCreatedEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units created", set.UpdatedReplicas, specReplicas), processName)
            recorder.AnnotatedEventf(app, unitsCreatedEvent.Annotations, v1.EventTypeNormal, unitsCreatedEvent.Reason, unitsCreatedEvent.Description)
        }
        if healthcheckTimeout == nil && set.UpdatedReplicas == specReplicas {
            err := checkPodStatus(r.Group, r.Client, app.Name, app.Spec.Deployments[len(app.Spec.Deployments)-1].Version)
            if err == nil {
                healthcheckTimeout = time.After(maxWaitTimeDuration)
                healthcheckEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("waiting healthcheck on %d created units", specReplicas), processName)
                recorder.AnnotatedEventf(app, healthcheckEvent.Annotations, v1.EventTypeNormal, healthcheckEvent.Reason, healthcheckEvent.Description)
            }
        }
        readyUnits := set.ReadyReplicas
        if oldReadyUnits != readyUnits && readyUnits >= 0 {
            unitsReadyEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d of %d new units ready", readyUnits, specReplicas), processName)
            recorder.AnnotatedEventf(app, unitsReadyEvent.Annotations, v1.EventTypeNormal, unitsReadyEvent.Reason, unitsReadyEvent.Description)
        }
        pendingTermination := set.Replicas - set.UpdatedReplicas
        if oldPendingTermination != pendingTermination && pendingTermination > 0 {
            pendingTerminationEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileUpdate, fmt.Sprintf("%d old units pending termination", pendingTermination), processName)
            recorder.AnnotatedEventf(app, pendingTerminationEvent.Annotations, v1.EventTypeNormal, pendingTerminationEvent.Reason, pendingTerminationEvent.Description)
        }
        oldUpdatedReplicas = set.UpdatedReplicas
        oldReadyUnits = readyUnits
        oldPendingTermination = pendingTermination
        if readyUnits == specReplicas &&
            set.Replicas == specReplicas {
            break
        }
        select {
        case <-time.After(100 * time.Millisecond):
        case msg, isOpen := <-watchCh:
            if !isOpen {
                break
            }
            if isDeploymentEvent(msg, set.Name) {
                appDeploymentEvent := appDeploymentEventFromWatchEvent(msg, app, processName)
                recorder.AnnotatedEventf(app, appDeploymentEvent.Annotations, v1.EventTypeNormal, ketchv1.AppReconcileUpdate, appDeploymentEvent.Description)
            }
        case <-healthcheckTimeout:
            err = createDeployTimeoutError(ctx, cli.k8sClient, app, time.Since(now), cli.workloadNamespace, app.GroupVersionKind().Group, "healthcheck")
            healthcheckTimeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error waiting for healthcheck: %s", err.Error()), processName)
            recorder.AnnotatedEventf(app, healthcheckTimeoutEvent.Annotations, v1.EventTypeWarning, healthcheckTimeoutEvent.Reason, healthcheckTimeoutEvent.Description)
            return err
        case <-timeout:
            err = createDeployTimeoutError(ctx, cli.k8sClient, app, time.Since(now), cli.workloadNamespace, app.GroupVersionKind().Group, "full rollout")
            timeoutEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("deployment timeout: %s", err.Error()), processName)
            recorder.AnnotatedEventf(app, timeoutEvent.Annotations, v1.EventTypeWarning, timeoutEvent.Reason, timeoutEvent.Description)
            return err
        case <-ctx.Done():
            return ctx.Err()
        }
        set, err = cli.Get(ctx)
        if err != nil {
            deploymentErrorEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileError, fmt.Sprintf("error getting deployments: %s", err.Error()), processName)
            recorder.AnnotatedEventf(app, deploymentErrorEvent.Annotations, v1.EventTypeWarning, deploymentErrorEvent.Reason, deploymentErrorEvent.Description)
            return err
        }
    }

    outcome := ketchv1.AppReconcileOutcome{AppName: app.Name, DeploymentCount: int(set.ReadyReplicas)}
    outcomeEvent := newAppDeploymentEvent(app, ketchv1.AppReconcileComplete, outcome.String(), processName)
    recorder.AnnotatedEventf(app, outcomeEvent.Annotations, v1.EventTypeNormal, outcomeEvent.Reason, outcomeEvent.Description)
    return nil
}
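As a minimal sketch of the second point (testability), the concrete *workloadClient parameter of watchFunc could be hidden behind a small interface and replaced with a fake in unit tests. The names workloadGetter and fakeWorkloadClient below are illustrative assumptions, not part of this PR; note that watchFunc as written also reaches into cli.k8sClient and cli.workloadNamespace for the timeout errors, so those dependencies would need similar treatment.

// workloadGetter abstracts workloadClient.Get so watchFunc can be unit tested
// without a live Kubernetes client (hypothetical name, relies on the workload
// struct defined in the sketch above).
type workloadGetter interface {
    Get(ctx context.Context) (*workload, error)
}

// fakeWorkloadClient returns a scripted sequence of workload snapshots,
// letting a test drive watchFunc from "units created" to "units ready"
// deterministically.
type fakeWorkloadClient struct {
    snapshots []*workload
    i         int
}

func (f *fakeWorkloadClient) Get(ctx context.Context) (*workload, error) {
    w := f.snapshots[f.i]
    // keep returning the last snapshot once the sequence is exhausted
    if f.i < len(f.snapshots)-1 {
        f.i++
    }
    return w, nil
}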
Taking a look into this now
I have implemented this approach and confirmed it works; the corresponding changes can be found in this PR.
Looks great!
Let's discuss which PR we want to merge.
@stinkyfingers @koncar
Closed in favor of #238
Description
Apps can now be deployed as a StatefulSet if the type in appSpec is specified as "StatefulSet" and volumeClaimTemplates are provided. The default type for apps is deployment.
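For illustration only, the selection rule described above amounts to something like the sketch below; appWorkloadSpec and workloadKind are hypothetical names, not the actual appSpec types added in this PR.

import corev1 "k8s.io/api/core/v1"

// appWorkloadSpec is a hypothetical stand-in for the relevant appSpec fields.
type appWorkloadSpec struct {
    Type                 string                         // "StatefulSet", or empty for the default deployment type
    VolumeClaimTemplates []corev1.PersistentVolumeClaim // claims rendered into the StatefulSet's volumeClaimTemplates
}

// workloadKind picks which Kubernetes workload the chart should render,
// falling back to a Deployment when the StatefulSet conditions are not met.
func workloadKind(spec appWorkloadSpec) string {
    if spec.Type == "StatefulSet" && len(spec.VolumeClaimTemplates) > 0 {
        return "StatefulSet"
    }
    return "Deployment"
}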
There is some duplication in app_controller.go with the functions watchStatefulSetEvents and watchStatefulSetFunc. There are some minor differences between a deployment and a statefulset that I could not get to work in one single function. However, both deployment and statefulset use AppDeploymentEvent. I'm treating an app deployment (in terms of ketch) as type independent (when it comes to statefulset versus deployment in k8s). It is possible to create unique events based on which type of deployment if need be.
Fixes # (issue)
Type of change
Testing
Documentation
Final Checklist: