Skip to content
This repository has been archived by the owner on Sep 5, 2019. It is now read-only.

Commit

Permalink
Enforce timeouts outside of resyncs (#563)
Browse files Browse the repository at this point in the history
* Use podName from build Status to create new pod

* Error on empty pod name. Remove pod name if validation fails

* Added timeout handler goroutines and build status locks

* Timeout value in build status message

* Resync period set to 10 hours. Fix build tests

* Sync lock on channel and status read/write. Test update
  • Loading branch information
tzununbekov authored and knative-prow-robot committed Feb 19, 2019
1 parent c9ec524 commit fa84c19
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 50 deletions.
12 changes: 8 additions & 4 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
const (
threadsPerController = 2
logLevelKey = "controller"
resyncPeriod = 10 * time.Hour
)

var (
Expand Down Expand Up @@ -90,19 +91,22 @@ func main() {
logger.Fatalf("Error building Caching clientset: %v", err)
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
buildInformerFactory := informers.NewSharedInformerFactory(buildClient, time.Second*30)
cachingInformerFactory := cachinginformers.NewSharedInformerFactory(cachingClient, time.Second*30)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
buildInformerFactory := informers.NewSharedInformerFactory(buildClient, resyncPeriod)
cachingInformerFactory := cachinginformers.NewSharedInformerFactory(cachingClient, resyncPeriod)

buildInformer := buildInformerFactory.Build().V1alpha1().Builds()
buildTemplateInformer := buildInformerFactory.Build().V1alpha1().BuildTemplates()
clusterBuildTemplateInformer := buildInformerFactory.Build().V1alpha1().ClusterBuildTemplates()
imageInformer := cachingInformerFactory.Caching().V1alpha1().Images()
podInformer := kubeInformerFactory.Core().V1().Pods()

timeoutHandler := build.NewTimeoutHandler(logger, kubeClient, buildClient, stopCh)
timeoutHandler.CheckTimeouts()
// Build all of our controllers, with the clients constructed above.
controllers := []*controller.Impl{
build.NewController(logger, kubeClient, podInformer, buildClient, buildInformer, buildTemplateInformer, clusterBuildTemplateInformer),
build.NewController(logger, kubeClient, podInformer, buildClient, buildInformer,
buildTemplateInformer, clusterBuildTemplateInformer, timeoutHandler),
clusterbuildtemplate.NewController(logger, kubeClient, buildClient,
cachingClient, clusterBuildTemplateInformer, imageInformer),
buildtemplate.NewController(logger, kubeClient, buildClient,
Expand Down
44 changes: 39 additions & 5 deletions pkg/reconciler/build/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package build
import (
"context"
"fmt"
"sync"
"time"

v1alpha1 "github.com/knative/build/pkg/apis/build/v1alpha1"
Expand Down Expand Up @@ -55,6 +56,7 @@ type Reconciler struct {
kubeclientset kubernetes.Interface
// buildclientset is a clientset for our own API group
buildclientset clientset.Interface
timeoutHandler *TimeoutSet

buildsLister listers.BuildLister
buildTemplatesLister listers.BuildTemplateLister
Expand All @@ -71,6 +73,7 @@ type Reconciler struct {

// Check that we implement the controller.Reconciler interface.
var _ controller.Reconciler = (*Reconciler)(nil)
var statusMap = sync.Map{}

func init() {
// Add build-controller types to the default Kubernetes Scheme so Events can be
Expand All @@ -87,6 +90,7 @@ func NewController(
buildInformer informers.BuildInformer,
buildTemplateInformer informers.BuildTemplateInformer,
clusterBuildTemplateInformer informers.ClusterBuildTemplateInformer,
timeoutHandler *TimeoutSet,
) *controller.Impl {

// Enrich the logs with controller name
Expand All @@ -100,6 +104,7 @@ func NewController(
clusterBuildTemplatesLister: clusterBuildTemplateInformer.Lister(),
podsLister: podInformer.Lister(),
Logger: logger,
timeoutHandler: timeoutHandler,
}
impl := controller.NewImpl(r, logger, "Builds",
reconciler.MustNewStatsReporter("Builds", r.Logger))
Expand Down Expand Up @@ -216,6 +221,8 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}
return err
}
// Start goroutine that waits for either build timeout or build finish
go c.timeoutHandler.wait(build)
} else {
// If the build is ongoing, update its status based on its pod, and
// check if it's timed out.
Expand All @@ -227,22 +234,32 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
}

// Update the build's status based on the pod's status.
statusLock(build)
build.Status = resources.BuildStatusFromPod(p, build.Spec)

// Check if build has timed out; if it is, this will set its status
// accordingly.
if err := c.checkTimeout(build); err != nil {
return err
statusUnlock(build)
if isDone(&build.Status) {
// release goroutine that waits for build timeout
c.timeoutHandler.release(build)
// and remove key from status map
defer statusMap.Delete(key)
}

return c.updateStatus(build)
}

func (c *Reconciler) updateStatus(u *v1alpha1.Build) error {
statusLock(u)
defer statusUnlock(u)
newb, err := c.buildclientset.BuildV1alpha1().Builds(u.Namespace).Get(u.Name, metav1.GetOptions{})
if err != nil {
return err
}

cond := newb.Status.GetCondition(v1alpha1.BuildSucceeded)
if cond != nil && cond.Status == corev1.ConditionFalse {
return fmt.Errorf("can't update status of failed build %q", newb.Name)
}

newb.Status = u.Status

_, err = c.buildclientset.BuildV1alpha1().Builds(u.Namespace).UpdateStatus(newb)
Expand Down Expand Up @@ -353,3 +370,20 @@ func (c *Reconciler) checkTimeout(build *v1alpha1.Build) error {
}
return nil
}

// statusLock acquires the per-build mutex guarding build status
// reads/writes, creating the mutex on first use. Builds are keyed by
// "namespace/name" in the package-level statusMap so that concurrent
// reconciles and timeout goroutines operating on the same build
// serialize their status updates.
func statusLock(build *v1alpha1.Build) {
	// Plain concatenation instead of fmt.Sprintf: identical result for
	// two strings, without the fmt boxing/reflection overhead.
	key := build.Namespace + "/" + build.Name
	m, _ := statusMap.LoadOrStore(key, &sync.Mutex{})
	m.(*sync.Mutex).Lock()
}

// statusUnlock releases the per-build status mutex previously taken by
// statusLock. If no mutex is registered for this build — e.g. its entry
// has already been removed from statusMap — the call is a no-op.
func statusUnlock(build *v1alpha1.Build) {
	m, ok := statusMap.Load(build.Namespace + "/" + build.Name)
	if !ok {
		return
	}
	m.(*sync.Mutex).Unlock()
}
78 changes: 37 additions & 41 deletions pkg/reconciler/build/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ func (f *fixture) createServiceAccounts(serviceAccounts ...*corev1.ServiceAccoun
}
}

func (f *fixture) newReconciler() (controller.Reconciler, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
func (f *fixture) newReconciler(stopCh <-chan struct{}) (controller.Reconciler, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) {
k8sI := kubeinformers.NewSharedInformerFactory(f.kubeclient, noResyncPeriod)
logger := zap.NewExample().Sugar()
i := informers.NewSharedInformerFactory(f.client, noResyncPeriod)
buildInformer := i.Build().V1alpha1().Builds()
buildTemplateInformer := i.Build().V1alpha1().BuildTemplates()
clusterBuildTemplateInformer := i.Build().V1alpha1().ClusterBuildTemplates()
podInformer := k8sI.Core().V1().Pods()
c := NewController(logger, f.kubeclient, podInformer, f.client, buildInformer, buildTemplateInformer, clusterBuildTemplateInformer)
timeoutHandler := NewTimeoutHandler(logger, f.kubeclient, f.client, stopCh)
c := NewController(logger, f.kubeclient, podInformer, f.client, buildInformer, buildTemplateInformer, clusterBuildTemplateInformer, timeoutHandler)
return c.Reconciler, i, k8sI
}

Expand Down Expand Up @@ -134,10 +135,11 @@ func TestBuildNotFoundFlow(t *testing.T) {
}
f.client.PrependReactor("*", "*", reactor)

r, i, k8sI := f.newReconciler()
f.updateIndex(i, b)
stopCh := make(chan struct{})
defer close(stopCh)

r, i, k8sI := f.newReconciler(stopCh)
f.updateIndex(i, b)
i.Start(stopCh)
k8sI.Start(stopCh)

Expand All @@ -153,7 +155,10 @@ func TestBuildWithBadKey(t *testing.T) {
}
f.createServiceAccount()

r, _, _ := f.newReconciler()
stopCh := make(chan struct{})
defer close(stopCh)

r, _, _ := f.newReconciler(stopCh)
if err := r.Reconcile(context.Background(), "bad/worse/worst"); err != nil {
t.Errorf("Unexpected error while syncing build: %s", err.Error())
}
Expand All @@ -169,10 +174,11 @@ func TestBuildNotFoundError(t *testing.T) {
}
f.createServiceAccount()

r, i, k8sI := f.newReconciler()
// Don't update build informers with test build object
stopCh := make(chan struct{})
defer close(stopCh)

r, i, k8sI := f.newReconciler(stopCh)
// Don't update build informers with test build object
i.Start(stopCh)
k8sI.Start(stopCh)

Expand All @@ -195,10 +201,11 @@ func TestBuildWithMissingServiceAccount(t *testing.T) {
kubeclient: k8sfake.NewSimpleClientset(),
}

r, i, k8sI := f.newReconciler()
f.updateIndex(i, b)
stopCh := make(chan struct{})
defer close(stopCh)

r, i, k8sI := f.newReconciler(stopCh)
f.updateIndex(i, b)
i.Start(stopCh)
k8sI.Start(stopCh)

Expand Down Expand Up @@ -242,10 +249,11 @@ func TestBuildWithMissingSecret(t *testing.T) {
Secrets: []corev1.ObjectReference{{Name: "missing-secret"}},
})

r, i, k8sI := f.newReconciler()
f.updateIndex(i, b)
stopCh := make(chan struct{})
defer close(stopCh)

r, i, k8sI := f.newReconciler(stopCh)
f.updateIndex(i, b)
i.Start(stopCh)
k8sI.Start(stopCh)

Expand Down Expand Up @@ -289,10 +297,11 @@ func TestBuildWithNonExistentTemplates(t *testing.T) {
}
f.createServiceAccount()

r, i, k8sI := f.newReconciler()
f.updateIndex(i, b)
stopCh := make(chan struct{})
defer close(stopCh)

r, i, k8sI := f.newReconciler(stopCh)
f.updateIndex(i, b)
i.Start(stopCh)
k8sI.Start(stopCh)

Expand Down Expand Up @@ -346,10 +355,11 @@ func TestBuildWithTemplate(t *testing.T) {
}
f.createServiceAccount()

r, i, k8sI := f.newReconciler()
f.updateIndex(i, b)
stopCh := make(chan struct{})
defer close(stopCh)

r, i, k8sI := f.newReconciler(stopCh)
f.updateIndex(i, b)
i.Start(stopCh)
k8sI.Start(stopCh)

Expand Down Expand Up @@ -439,10 +449,11 @@ func TestBasicFlows(t *testing.T) {
}
f.createServiceAccount()

r, i, k8sI := f.newReconciler()
f.updateIndex(i, b)
stopCh := make(chan struct{})
defer close(stopCh)

r, i, k8sI := f.newReconciler(stopCh)
f.updateIndex(i, b)
i.Start(stopCh)
k8sI.Start(stopCh)

Expand Down Expand Up @@ -500,7 +511,7 @@ func TestBasicFlows(t *testing.T) {

func TestTimeoutFlow(t *testing.T) {
b := newBuild("timeout")
b.Spec.Timeout = &metav1.Duration{Duration: 1 * time.Second}
b.Spec.Timeout = &metav1.Duration{Duration: 500 * time.Millisecond}

f := &fixture{
t: t,
Expand All @@ -510,10 +521,10 @@ func TestTimeoutFlow(t *testing.T) {
}
f.createServiceAccount()

r, i, k8sI := f.newReconciler()
f.updateIndex(i, b)
stopCh := make(chan struct{})
defer close(stopCh)
r, i, k8sI := f.newReconciler(stopCh)
f.updateIndex(i, b)
i.Start(stopCh)
k8sI.Start(stopCh)

Expand All @@ -528,24 +539,8 @@ func TestTimeoutFlow(t *testing.T) {
t.Errorf("error fetching build: %v", err)
}

// Update the pod to indicate it was created 10m ago, which is
// longer than the build's timeout.
podName := b.Status.Cluster.PodName
p, err := f.kubeclient.CoreV1().Pods(metav1.NamespaceDefault).Get(podName, metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting pod %q: %v", podName, err)
}
p.CreationTimestamp.Time = metav1.Now().Time.Add(-10 * time.Minute)
if _, err := f.kubeclient.CoreV1().Pods(metav1.NamespaceDefault).Update(p); err != nil {
t.Fatalf("error updating pod %q: %v", podName, err)
}

// Reconcile to pick up pod changes.
f.updatePodIndex(k8sI, p)
f.updateIndex(i, b)
if err := r.Reconcile(ctx, getKey(b, t)); err != nil {
t.Errorf("error syncing build: %v", err)
}
// Right now there is no better way to test the timeout than to wait for it
time.Sleep(600 * time.Millisecond)

// Check that the build has the expected timeout status.
b, err = buildClient.Get(b.Name, metav1.GetOptions{})
Expand All @@ -556,7 +551,7 @@ func TestTimeoutFlow(t *testing.T) {
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionFalse,
Reason: "BuildTimeout",
Message: fmt.Sprintf("Build %q failed to finish within \"1s\"", b.Name),
Message: fmt.Sprintf("Build %q failed to finish within \"500ms\"", b.Name),
}, ignoreVolatileTime); d != "" {
t.Errorf("Unexpected build status %s", d)
}
Expand All @@ -573,10 +568,11 @@ func TestCancelledFlow(t *testing.T) {
}
f.createServiceAccount()

r, i, k8sI := f.newReconciler()
f.updateIndex(i, b)
stopCh := make(chan struct{})
defer close(stopCh)

r, i, k8sI := f.newReconciler(stopCh)
f.updateIndex(i, b)
i.Start(stopCh)
k8sI.Start(stopCh)

Expand Down
Loading

0 comments on commit fa84c19

Please sign in to comment.