Skip to content

Commit

Permalink
feat: controller log to be structured
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-tatsuno committed Aug 11, 2020
1 parent 5eda8b8 commit 94b566a
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,18 +274,18 @@ func (wfc *WorkflowController) podLabeler(stopCh <-chan struct{}) {
case pod := <-wfc.completedPods:
parts := strings.Split(pod, "/")
if len(parts) != 2 {
log.Warnf("Unexpected item on completed pod channel: %s", pod)
log.WithFields(log.Fields{"pod": pod}).Warn("Unexpected item on completed pod channel")
continue
}
namespace := parts[0]
podName := parts[1]
err := common.AddPodLabel(wfc.kubeclientset, podName, namespace, common.LabelKeyCompleted, "true")
if err != nil {
if !apierr.IsNotFound(err) {
log.Errorf("Failed to label pod %s/%s completed: %+v", namespace, podName, err)
log.WithFields(log.Fields{"namespace": namespace, "pod": podName, "err": err}).Error("Failed to labeled pod completed")
}
} else {
log.Infof("Labeled pod %s/%s completed", namespace, podName)
log.WithFields(log.Fields{"namespace": namespace, "pod": podName}).Info("Labeled pod completed")
}
}
}
Expand All @@ -300,16 +300,16 @@ func (wfc *WorkflowController) podGarbageCollector(stopCh <-chan struct{}) {
case pod := <-wfc.gcPods:
parts := strings.Split(pod, "/")
if len(parts) != 2 {
log.Warnf("Unexpected item on gcPods channel: %s", pod)
log.WithFields(log.Fields{"pod": pod}).Info("Unexpected item on gcPods channel")
continue
}
namespace := parts[0]
podName := parts[1]
err := common.DeletePod(wfc.kubeclientset, podName, namespace)
if err != nil {
log.Errorf("Failed to delete pod %s/%s for gc: %+v", namespace, podName, err)
log.WithFields(log.Fields{"namespace": namespace, "pod": podName, "err": err}).Error("Failed to delete pod for gc")
} else {
log.Infof("Delete pod %s/%s for gc successfully", namespace, podName)
log.WithFields(log.Fields{"namespace": namespace, "pod": podName}).Info("Delete pod for gc successfully")
}
}
}
Expand Down Expand Up @@ -426,7 +426,7 @@ func (wfc *WorkflowController) processNextItem() bool {

obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.Errorf("Failed to get workflow '%s' from informer index: %+v", key, err)
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return true
}
if !exists {
Expand All @@ -438,18 +438,18 @@ func (wfc *WorkflowController) processNextItem() bool {
// workflow manifests that are unable to unmarshal to workflow objects
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("Key '%s' in index is not an unstructured", key)
log.WithFields(log.Fields{"key": key}).Warn("Index is not an unstructured")
return true
}

if key, ok = wfc.throttler.Next(key); !ok {
log.Warnf("Workflow %s processing has been postponed due to max parallelism limit", key)
log.WithFields(log.Fields{"key": key}).Warn("Workflow processing has been postponed due to max parallelism limit")
return true
}

wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("Failed to unmarshal key '%s' to workflow object: %v", key, err)
log.WithFields(log.Fields{"key": key, "error": err}).Warn("Failed to unmarshal key to workflow object")
woc := newWorkflowOperationCtx(wf, wfc)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
Expand All @@ -459,7 +459,7 @@ func (wfc *WorkflowController) processNextItem() bool {

err = wfc.setWorkflowDefaults(wf)
if err != nil {
log.Warnf("Failed to apply default workflow values to '%s': %v", wf.Name, err)
log.WithFields(log.Fields{"key": wf.Name, "error": err}).Warn("Failed to apply default workflow values")
woc := newWorkflowOperationCtx(wf, wfc)
woc.markWorkflowFailed(fmt.Sprintf("invalid spec: %s", err.Error()))
woc.persistUpdates()
Expand Down Expand Up @@ -489,15 +489,15 @@ func (wfc *WorkflowController) processNextItem() bool {
priority, creationTime := getWfPriority(woc.wf)
acquired, wfUpdate, msg, err := wfc.syncManager.TryAcquire(woc.wf, "", priority, creationTime, woc.wf.Spec.Synchronization)
if err != nil {
log.Warnf("Failed to acquire the lock for '%s' : %v", key, err)
log.WithFields(log.Fields{"key": key, "error": err}).Warn("Failed to acquire the lock")
woc.markWorkflowFailed(fmt.Sprintf("Failed to acquire the synchronization lock. %s", err.Error()))
woc.persistUpdates()
wfc.throttler.Remove(key)
return true
}
woc.updated = wfUpdate
if !acquired {
log.Warnf("Workflow %s processing has been postponed due to concurrency limit. %s", key, msg)
log.WithFields(log.Fields{"key": key, "message": msg}).Warn("Workflow processing has been postponed due to concurrency limit")
woc.persistUpdates()
return true
}
Expand All @@ -509,7 +509,7 @@ func (wfc *WorkflowController) processNextItem() bool {
if woc.wf.Status.Fulfilled() {
// Release all acquired lock for completed workflow
if wfc.syncManager.ReleaseAll(woc.wf) {
log.Infof("%s released all acquired locks", wf.Name)
log.WithFields(log.Fields{"key": wf.Name}).Info("Released all acquired locks")
woc.updated = true
woc.persistUpdates()
}
Expand Down Expand Up @@ -560,7 +560,7 @@ func (wfc *WorkflowController) processNextPodItem() bool {

obj, exists, err := wfc.podInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.Errorf("Failed to get pod '%s' from informer index: %+v", key, err)
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get pod from informer index")
return true
}
if !exists {
Expand All @@ -571,17 +571,17 @@ func (wfc *WorkflowController) processNextPodItem() bool {
}
pod, ok := obj.(*apiv1.Pod)
if !ok {
log.Warnf("Key '%s' in index is not a pod", key)
log.WithFields(log.Fields{"key": key}).Warn("Key in index is not a pod")
return true
}
if pod.Labels == nil {
log.Warnf("Pod '%s' did not have labels", key)
log.WithFields(log.Fields{"key": key}).Warn("Pod did not have labels")
return true
}
workflowName, ok := pod.Labels[common.LabelKeyWorkflow]
if !ok {
// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly)
log.Warnf("watch returned pod unrelated to any workflow: %s", pod.ObjectMeta.Name)
log.WithFields(log.Fields{"key": pod.ObjectMeta.Name}).Warn("Watch returned pod unrelated to any workflow")
return true
}
// add this change after 1s - this reduces the number of workflow reconciliations -
Expand Down Expand Up @@ -841,12 +841,12 @@ func (wfc *WorkflowController) getMetricsServerConfig() (metrics.ServerConfig, m
func (wfc *WorkflowController) releaseAllWorkflowLocks(obj interface{}) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("Key '%s' in index is not an unstructured", obj)
log.WithFields(log.Fields{"key": key}).Warn("Key in index is not an unstructured")
return
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("Invalid workflow object: %v", obj)
log.WithFields(log.Fields{"key": key}).Warn("Invalid workflow object")
return
}
if wf.Status.Synchronization != nil {
Expand Down

0 comments on commit 94b566a

Please sign in to comment.