diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index b75c46fc..852708f8 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -44,6 +44,7 @@ import (
 const (
 	threadsPerController = 2
 	logLevelKey = "controller"
+	resyncPeriod = 10 * time.Hour
 )
 
 var (
@@ -90,9 +91,9 @@ func main() {
 		logger.Fatalf("Error building Caching clientset: %v", err)
 	}
 
-	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
-	buildInformerFactory := informers.NewSharedInformerFactory(buildClient, time.Second*30)
-	cachingInformerFactory := cachinginformers.NewSharedInformerFactory(cachingClient, time.Second*30)
+	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
+	buildInformerFactory := informers.NewSharedInformerFactory(buildClient, resyncPeriod)
+	cachingInformerFactory := cachinginformers.NewSharedInformerFactory(cachingClient, resyncPeriod)
 
 	buildInformer := buildInformerFactory.Build().V1alpha1().Builds()
 	buildTemplateInformer := buildInformerFactory.Build().V1alpha1().BuildTemplates()
@@ -100,9 +101,12 @@ func main() {
 	imageInformer := cachingInformerFactory.Caching().V1alpha1().Images()
 	podInformer := kubeInformerFactory.Core().V1().Pods()
 
+	timeoutHandler := build.NewTimeoutHandler(logger, kubeClient, buildClient, stopCh)
+	timeoutHandler.CheckTimeouts()
 	// Build all of our controllers, with the clients constructed above.
 	controllers := []*controller.Impl{
-		build.NewController(logger, kubeClient, podInformer, buildClient, buildInformer, buildTemplateInformer, clusterBuildTemplateInformer),
+		build.NewController(logger, kubeClient, podInformer, buildClient, buildInformer,
+			buildTemplateInformer, clusterBuildTemplateInformer, timeoutHandler),
 		clusterbuildtemplate.NewController(logger, kubeClient, buildClient, cachingClient,
 			clusterBuildTemplateInformer, imageInformer),
 		buildtemplate.NewController(logger, kubeClient, buildClient,
diff --git a/pkg/reconciler/build/build.go b/pkg/reconciler/build/build.go
index 4d98132d..8379ecce 100644
--- a/pkg/reconciler/build/build.go
+++ b/pkg/reconciler/build/build.go
@@ -19,6 +19,7 @@ package build
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	v1alpha1 "github.com/knative/build/pkg/apis/build/v1alpha1"
@@ -55,6 +56,7 @@ type Reconciler struct {
 	kubeclientset kubernetes.Interface
 	// buildclientset is a clientset for our own API group
 	buildclientset clientset.Interface
+	timeoutHandler *TimeoutSet
 
 	buildsLister listers.BuildLister
 	buildTemplatesLister listers.BuildTemplateLister
@@ -71,6 +73,7 @@ type Reconciler struct {
 
 // Check that we implement the controller.Reconciler interface.
 var _ controller.Reconciler = (*Reconciler)(nil)
+var statusMap = sync.Map{}
 
 func init() {
 	// Add build-controller types to the default Kubernetes Scheme so Events can be
@@ -87,6 +90,7 @@ func NewController(
 	buildInformer informers.BuildInformer,
 	buildTemplateInformer informers.BuildTemplateInformer,
 	clusterBuildTemplateInformer informers.ClusterBuildTemplateInformer,
+	timeoutHandler *TimeoutSet,
 ) *controller.Impl {
 
 	// Enrich the logs with controller name
@@ -100,6 +104,7 @@ func NewController(
 		clusterBuildTemplatesLister: clusterBuildTemplateInformer.Lister(),
 		podsLister: podInformer.Lister(),
 		Logger: logger,
+		timeoutHandler: timeoutHandler,
 	}
 	impl := controller.NewImpl(r, logger, "Builds",
 		reconciler.MustNewStatsReporter("Builds", r.Logger))
@@ -216,6 +221,8 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
 			}
 			return err
 		}
+		// Start a goroutine that waits for either the build timeout or the build to finish.
+		go c.timeoutHandler.wait(build)
 	} else {
 		// If the build is ongoing, update its status based on its pod, and
 		// check if it's timed out.
@@ -227,22 +234,32 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
 		}
 
 		// Update the build's status based on the pod's status.
+		statusLock(build)
 		build.Status = resources.BuildStatusFromPod(p, build.Spec)
-
-		// Check if build has timed out; if it is, this will set its status
-		// accordingly.
-		if err := c.checkTimeout(build); err != nil {
-			return err
+		statusUnlock(build)
+		if isDone(&build.Status) {
+			// Release the goroutine that waits for the build timeout
+			c.timeoutHandler.release(build)
+			// and remove the build's key from the status map.
+			defer statusMap.Delete(key)
 		}
 	}
 	return c.updateStatus(build)
 }
 
 func (c *Reconciler) updateStatus(u *v1alpha1.Build) error {
+	statusLock(u)
+	defer statusUnlock(u)
 	newb, err := c.buildclientset.BuildV1alpha1().Builds(u.Namespace).Get(u.Name, metav1.GetOptions{})
 	if err != nil {
 		return err
 	}
+
+	cond := newb.Status.GetCondition(v1alpha1.BuildSucceeded)
+	if cond != nil && cond.Status == corev1.ConditionFalse {
+		return fmt.Errorf("can't update status of failed build %q", newb.Name)
+	}
+
 	newb.Status = u.Status
 
 	_, err = c.buildclientset.BuildV1alpha1().Builds(u.Namespace).UpdateStatus(newb)
@@ -353,3 +370,22 @@ func (c *Reconciler) checkTimeout(build *v1alpha1.Build) error {
 	}
 	return nil
 }
+
+// statusLock acquires the per-build mutex stored in statusMap, creating it on first use.
+func statusLock(build *v1alpha1.Build) {
+	key := fmt.Sprintf("%s/%s", build.Namespace, build.Name)
+	m, _ := statusMap.LoadOrStore(key, &sync.Mutex{})
+	mut := m.(*sync.Mutex)
+	mut.Lock()
+}
+
+// statusUnlock releases the per-build mutex, if one exists.
+func statusUnlock(build *v1alpha1.Build) {
+	key := fmt.Sprintf("%s/%s", build.Namespace, build.Name)
+	m, ok := statusMap.Load(key)
+	if !ok {
+		return
+	}
+	mut := m.(*sync.Mutex)
+	mut.Unlock()
+}
diff --git a/pkg/reconciler/build/build_test.go b/pkg/reconciler/build/build_test.go
index 474fa5f0..eed7606e 100644
--- a/pkg/reconciler/build/build_test.go
+++ b/pkg/reconciler/build/build_test.go
@@ -86,7 +86,7 @@ func (f *fixture) createServiceAccounts(serviceAccounts ...*corev1.ServiceAccoun
 	}
 }
 
-func (f *fixture) newReconciler() (controller.Reconciler, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
+func (f *fixture) newReconciler(stopCh <-chan struct{}) (controller.Reconciler, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
 	k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriod)
 	logger := zap.NewExample().Sugar()
 	i := informers.NewSharedInformerFactory(f.client, noResyncPeriod)
@@ -94,7 +94,8 @@ func (f *fixture) newReconciler() (controller.Reconciler, informers.SharedInform
 	buildTemplateInformer := i.Build().V1alpha1().BuildTemplates()
 	clusterBuildTemplateInformer := i.Build().V1alpha1().ClusterBuildTemplates()
 	podInformer := k8sI.Core().V1().Pods()
-	c := NewController(logger, f.kubeclient, podInformer, f.client, buildInformer, buildTemplateInformer, clusterBuildTemplateInformer)
+	timeoutHandler := NewTimeoutHandler(logger, f.kubeclient, f.client, stopCh)
+	c := NewController(logger, f.kubeclient, podInformer, f.client, buildInformer, buildTemplateInformer, clusterBuildTemplateInformer, timeoutHandler)
 	return c.Reconciler, i, k8sI
 }
 
@@ -134,10 +135,11 @@ func TestBuildNotFoundFlow(t *testing.T) {
 	}
 	f.client.PrependReactor("*", "*", reactor)
 
-	r, i, k8sI := f.newReconciler()
-	f.updateIndex(i, b)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	r, i, k8sI := f.newReconciler(stopCh)
+	f.updateIndex(i, b)
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
@@ -153,7 +155,10 @@ func TestBuildWithBadKey(t *testing.T) {
 	}
 	f.createServiceAccount()
 
-	r, _, _ := f.newReconciler()
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+
+	r, _, _ := f.newReconciler(stopCh)
 	if err := r.Reconcile(context.Background(), "bad/worse/worst"); err != nil {
 		t.Errorf("Unexpected error while syncing build: %s", err.Error())
 	}
@@ -169,10 +174,11 @@ func TestBuildNotFoundError(t *testing.T) {
 	}
 	f.createServiceAccount()
 
-	r, i, k8sI := f.newReconciler()
-	// Don't update build informers with test build object
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	r, i, k8sI := f.newReconciler(stopCh)
+	// Don't update build informers with test build object
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
@@ -195,10 +201,11 @@ func TestBuildWithMissingServiceAccount(t *testing.T) {
 		kubeclient: k8sfake.NewSimpleClientset(),
 	}
 
-	r, i, k8sI := f.newReconciler()
-	f.updateIndex(i, b)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	r, i, k8sI := f.newReconciler(stopCh)
+	f.updateIndex(i, b)
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
@@ -242,10 +249,11 @@ func TestBuildWithMissingSecret(t *testing.T) {
 		Secrets: []corev1.ObjectReference{{Name: "missing-secret"}},
 	})
 
-	r, i, k8sI := f.newReconciler()
-	f.updateIndex(i, b)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	r, i, k8sI := f.newReconciler(stopCh)
+	f.updateIndex(i, b)
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
@@ -289,10 +297,11 @@ func TestBuildWithNonExistentTemplates(t *testing.T) {
 	}
 	f.createServiceAccount()
 
-	r, i, k8sI := f.newReconciler()
-	f.updateIndex(i, b)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	r, i, k8sI := f.newReconciler(stopCh)
+	f.updateIndex(i, b)
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
@@ -346,10 +355,11 @@ func TestBuildWithTemplate(t *testing.T) {
 	}
 	f.createServiceAccount()
 
-	r, i, k8sI := f.newReconciler()
-	f.updateIndex(i, b)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	r, i, k8sI := f.newReconciler(stopCh)
+	f.updateIndex(i, b)
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
@@ -439,10 +449,11 @@ func TestBasicFlows(t *testing.T) {
 	}
 	f.createServiceAccount()
 
-	r, i, k8sI := f.newReconciler()
-	f.updateIndex(i, b)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	r, i, k8sI := f.newReconciler(stopCh)
+	f.updateIndex(i, b)
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
@@ -500,7 +511,7 @@ func TestBasicFlows(t *testing.T) {
 
 func TestTimeoutFlow(t *testing.T) {
 	b := newBuild("timeout")
-	b.Spec.Timeout = &metav1.Duration{Duration: 1 * time.Second}
+	b.Spec.Timeout = &metav1.Duration{Duration: 500 * time.Millisecond}
 
 	f := &fixture{
 		t: t,
@@ -510,10 +521,10 @@ func TestTimeoutFlow(t *testing.T) {
 	}
 	f.createServiceAccount()
 
-	r, i, k8sI := f.newReconciler()
-	f.updateIndex(i, b)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+	r, i, k8sI := f.newReconciler(stopCh)
+	f.updateIndex(i, b)
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
@@ -528,24 +539,8 @@ func TestTimeoutFlow(t *testing.T) {
 		t.Errorf("error fetching build: %v", err)
 	}
 
-	// Update the pod to indicate it was created 10m ago, which is
-	// longer than the build's timeout.
-	podName := b.Status.Cluster.PodName
-	p, err := f.kubeclient.CoreV1().Pods(metav1.NamespaceDefault).Get(podName, metav1.GetOptions{})
-	if err != nil {
-		t.Fatalf("error getting pod %q: %v", podName, err)
-	}
-	p.CreationTimestamp.Time = metav1.Now().Time.Add(-10 * time.Minute)
-	if _, err := f.kubeclient.CoreV1().Pods(metav1.NamespaceDefault).Update(p); err != nil {
-		t.Fatalf("error updating pod %q: %v", podName, err)
-	}
-
-	// Reconcile to pick up pod changes.
-	f.updatePodIndex(k8sI, p)
-	f.updateIndex(i, b)
-	if err := r.Reconcile(ctx, getKey(b, t)); err != nil {
-		t.Errorf("error syncing build: %v", err)
-	}
+	// Right now there is no better way to test the timeout than to wait for it to fire.
+	time.Sleep(600 * time.Millisecond)
 
 	// Check that the build has the expected timeout status.
 	b, err = buildClient.Get(b.Name, metav1.GetOptions{})
@@ -556,7 +551,7 @@ func TestTimeoutFlow(t *testing.T) {
 		Type: duckv1alpha1.ConditionSucceeded,
 		Status: corev1.ConditionFalse,
 		Reason: "BuildTimeout",
-		Message: fmt.Sprintf("Build %q failed to finish within \"1s\"", b.Name),
+		Message: fmt.Sprintf("Build %q failed to finish within \"500ms\"", b.Name),
 	}, ignoreVolatileTime); d != "" {
 		t.Errorf("Unexpected build status %s", d)
 	}
@@ -573,10 +568,11 @@ func TestCancelledFlow(t *testing.T) {
 	}
 	f.createServiceAccount()
 
-	r, i, k8sI := f.newReconciler()
-	f.updateIndex(i, b)
 	stopCh := make(chan struct{})
 	defer close(stopCh)
+
+	r, i, k8sI := f.newReconciler(stopCh)
+	f.updateIndex(i, b)
 	i.Start(stopCh)
 	k8sI.Start(stopCh)
 
diff --git a/pkg/reconciler/build/timeout_handler.go b/pkg/reconciler/build/timeout_handler.go
new file mode 100644
index 00000000..a3da5c69
--- /dev/null
+++ b/pkg/reconciler/build/timeout_handler.go
@@ -0,0 +1,142 @@
+package build
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	v1alpha1 "github.com/knative/build/pkg/apis/build/v1alpha1"
+	clientset "github.com/knative/build/pkg/client/clientset/versioned"
+	duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
+	"go.uber.org/zap"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+var (
+	done = make(map[string]chan bool)
+	doneMut = sync.Mutex{}
+)
+
+// TimeoutSet contains the Kubernetes interfaces required to handle build timeouts.
+type TimeoutSet struct {
+	logger *zap.SugaredLogger
+	kubeclientset kubernetes.Interface
+	buildclientset clientset.Interface
+	stopCh <-chan struct{}
+}
+
+// NewTimeoutHandler returns a TimeoutSet initialized with the given clients and stop channel.
+func NewTimeoutHandler(logger *zap.SugaredLogger,
+	kubeclientset kubernetes.Interface,
+	buildclientset clientset.Interface,
+	stopCh <-chan struct{}) *TimeoutSet {
+	return &TimeoutSet{
+		logger: logger,
+		kubeclientset: kubeclientset,
+		buildclientset: buildclientset,
+		stopCh: stopCh,
+	}
+}
+
+// CheckTimeouts walks through all builds and starts a t.wait goroutine for each one that is still running.
+func (t *TimeoutSet) CheckTimeouts() {
+	namespaces, err := t.kubeclientset.CoreV1().Namespaces().List(metav1.ListOptions{})
+	if err != nil {
+		t.logger.Errorf("Can't get namespaces list: %s", err)
+		return
+	}
+	for _, namespace := range namespaces.Items {
+		builds, err := t.buildclientset.BuildV1alpha1().Builds(namespace.GetName()).List(metav1.ListOptions{})
+		if err != nil {
+			t.logger.Errorf("Can't get builds list: %s", err)
+			continue
+		}
+		for _, build := range builds.Items {
+			build := build
+			if isDone(&build.Status) {
+				continue
+			}
+			if isCancelled(build.Spec) {
+				continue
+			}
+			go t.wait(&build)
+		}
+	}
+}
+
+// wait blocks until the build times out, finishes, or the controller stops; on timeout it stops the build.
+func (t *TimeoutSet) wait(build *v1alpha1.Build) {
+	key := fmt.Sprintf("%s/%s", build.Namespace, build.Name)
+	timeout := defaultTimeout
+	if build.Spec.Timeout != nil {
+		timeout = build.Spec.Timeout.Duration
+	}
+	runtime := time.Duration(0)
+	statusLock(build)
+	if build.Status.StartTime != nil && !build.Status.StartTime.Time.IsZero() {
+		runtime = time.Since(build.Status.StartTime.Time)
+	}
+	statusUnlock(build)
+	timeout -= runtime
+
+	finished := make(chan bool)
+	doneMut.Lock()
+	done[key] = finished
+	doneMut.Unlock()
+	defer t.release(build)
+
+	select {
+	case <-t.stopCh:
+	case <-finished:
+	case <-time.After(timeout):
+		if err := t.stopBuild(build); err != nil {
+			t.logger.Errorf("Can't stop build %q after timeout: %s", build.Name, err)
+		}
+	}
+}
+
+// release closes and removes the build's done channel, waking its wait goroutine if one is running.
+func (t *TimeoutSet) release(build *v1alpha1.Build) {
+	doneMut.Lock()
+	defer doneMut.Unlock()
+	key := fmt.Sprintf("%s/%s", build.Namespace, build.Name)
+	if finished, ok := done[key]; ok {
+		delete(done, key)
+		close(finished)
+	}
+}
+
+// stopBuild deletes the build's pod (if any) and marks the build failed with a BuildTimeout condition.
+func (t *TimeoutSet) stopBuild(build *v1alpha1.Build) error {
+	statusLock(build)
+	defer statusUnlock(build)
+	if build.Status.Cluster != nil {
+		if err := t.kubeclientset.CoreV1().Pods(build.Namespace).Delete(build.Status.Cluster.PodName, &metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
+			return err
+		}
+	}
+
+	timeout := defaultTimeout
+	if build.Spec.Timeout != nil {
+		timeout = build.Spec.Timeout.Duration
+	}
+	build.Status.SetCondition(&duckv1alpha1.Condition{
+		Type: v1alpha1.BuildSucceeded,
+		Status: corev1.ConditionFalse,
+		Reason: "BuildTimeout",
+		Message: fmt.Sprintf("Build %q failed to finish within %q", build.Name, timeout.String()),
+	})
+	build.Status.CompletionTime = &metav1.Time{Time: time.Now()}
+
+	newb, err := t.buildclientset.BuildV1alpha1().Builds(build.Namespace).Get(build.Name, metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	newb.Status = build.Status
+	_, err = t.buildclientset.BuildV1alpha1().Builds(build.Namespace).UpdateStatus(newb)
+	return err
+}
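
Note, not part of the patch: a minimal, self-contained sketch of the cancellable-timeout pattern that timeout_handler.go introduces, a per-key timer that races a stop channel and a "done" channel which the reconciler closes when the build finishes. The names here (waiter, wait, release, the sample key) are illustrative only; the real handler uses a package-level map guarded by doneMut and calls stopBuild when the timer fires.

package main

import (
	"fmt"
	"sync"
	"time"
)

// waiter tracks one "done" channel per key so a pending timeout can be released early.
type waiter struct {
	mu   sync.Mutex
	done map[string]chan struct{}
}

// wait blocks until the timeout fires, release(key) is called, or stop is closed.
// It reports whether the timeout actually fired.
func (w *waiter) wait(key string, timeout time.Duration, stop <-chan struct{}) bool {
	finished := make(chan struct{})
	w.mu.Lock()
	w.done[key] = finished
	w.mu.Unlock()
	defer w.release(key) // safe to call twice; release is a no-op once the key is gone

	select {
	case <-stop:
		return false
	case <-finished:
		return false
	case <-time.After(timeout):
		return true // the real handler would stop the build here
	}
}

// release closes and forgets the key's channel, waking a pending wait.
func (w *waiter) release(key string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if finished, ok := w.done[key]; ok {
		delete(w.done, key)
		close(finished)
	}
}

func main() {
	w := &waiter{done: map[string]chan struct{}{}}
	stop := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		w.release("default/my-build") // simulates the reconciler seeing the build finish
	}()
	fmt.Println("timed out:", w.wait("default/my-build", time.Second, stop)) // prints: timed out: false
}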