Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Use Wait for container/sandbox in unknown state.
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Liu <lantaol@google.com>
  • Loading branch information
Random-Liu committed Apr 16, 2019
1 parent 3b49d3f commit 23d1ac1
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 264 deletions.
2 changes: 1 addition & 1 deletion pkg/server/container_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
// start the monitor after updating container state, this ensures that
// event monitor receives the TaskExit event and update container state
// after this.
c.eventMonitor.startExitMonitor(id, task.Pid(), exitCh)
c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh)

return &runtime.StartContainerResponse{}, nil
}
Expand Down
57 changes: 25 additions & 32 deletions pkg/server/container_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"golang.org/x/sys/unix"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"

ctrdutil "github.com/containerd/cri/pkg/containerd/util"
"github.com/containerd/cri/pkg/store"
containerstore "github.com/containerd/cri/pkg/store/container"
)
Expand Down Expand Up @@ -74,36 +75,34 @@ func (c *criService) stopContainer(ctx context.Context, container containerstore
return errors.Wrapf(err, "failed to get task for container %q", id)
}
// Don't return for unknown state, some cleanup needs to be done.
if state != runtime.ContainerState_CONTAINER_UNKNOWN {
return nil
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
return cleanupUnknownContainer(ctx, id, container)
}
// Task is an interface, explicitly set it to nil just in case.
task = nil
return nil
}

// Handle unknown state.
if state == runtime.ContainerState_CONTAINER_UNKNOWN {
status, err := getTaskStatus(ctx, task)
// Start an exit handler for containers in unknown state.
waitCtx, waitCancel := context.WithCancel(ctrdutil.NamespacedContext())
defer waitCancel()
exitCh, err := task.Wait(waitCtx)
if err != nil {
return errors.Wrapf(err, "failed to get task status for %q", id)
}
switch status.Status {
case containerd.Running, containerd.Created:
// The task is still running, continue stopping the task.
case containerd.Stopped:
// The task has exited. If the task exited after containerd
// started, the event monitor will receive its exit event; if it
// exited before containerd started, the event monitor will never
// receive its exit event.
// However, we can't tell that because the task state was not
// successfully loaded during containerd start (container is
// in UNKNOWN state).
// So always do cleanup here, just in case that we've missed the
// exit event.
return cleanupUnknownContainer(ctx, id, status, container)
default:
return errors.Wrapf(err, "unsupported task status %q", status.Status)
if !errdefs.IsNotFound(err) {
return errors.Wrapf(err, "failed to wait for task for %q", id)
}
return cleanupUnknownContainer(ctx, id, container)
}

exitCtx, exitCancel := context.WithCancel(context.Background())
stopCh := c.eventMonitor.startExitMonitor(exitCtx, id, task.Pid(), exitCh)
defer func() {
exitCancel()
// This ensures that exit monitor is stopped before
// `Wait` is cancelled, so no exit event is generated
// because of the `Wait` cancellation.
<-stopCh
}()
}

// We only need to kill the task. The event handler will Delete the
Expand Down Expand Up @@ -176,19 +175,13 @@ func (c *criService) waitContainerStop(ctx context.Context, container containers
}

// cleanupUnknownContainer cleanup stopped container in unknown state.
func cleanupUnknownContainer(ctx context.Context, id string, status containerd.Status,
cntr containerstore.Container) error {
func cleanupUnknownContainer(ctx context.Context, id string, cntr containerstore.Container) error {
// Reuse handleContainerExit to do the cleanup.
// NOTE(random-liu): If the task did exit after containerd started, both
// the event monitor and the cleanup function would update the container
// state. The final container state will be whatever being updated first.
// There is no way to completely avoid this race condition, and for best
// effort unknown state container cleanup, this seems acceptable.
return handleContainerExit(ctx, &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: 0,
ExitStatus: status.ExitStatus,
ExitedAt: status.ExitTime,
ExitStatus: unknownExitCode,
ExitedAt: time.Now(),
}, cntr)
}
60 changes: 20 additions & 40 deletions pkg/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/apimachinery/pkg/util/clock"
runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"

"github.com/containerd/cri/pkg/constants"
ctrdutil "github.com/containerd/cri/pkg/containerd/util"
Expand Down Expand Up @@ -104,31 +103,36 @@ func (em *eventMonitor) subscribe(subscriber events.Subscriber) {
// note: filters are any match, if you want any match but not in namespace foo
// then you have to manually filter namespace foo
filters := []string{
`topic=="/tasks/exit"`,
`topic=="/tasks/oom"`,
`topic~="/images/"`,
}
em.ch, em.errCh = subscriber.Subscribe(em.ctx, filters...)
}

// startExitMonitor starts an exit monitor for a given container/sandbox.
func (em *eventMonitor) startExitMonitor(id string, pid uint32, exitCh <-chan containerd.ExitStatus) {
func (em *eventMonitor) startExitMonitor(ctx context.Context, id string, pid uint32, exitCh <-chan containerd.ExitStatus) <-chan struct{} {
stopCh := make(chan struct{})
go func() {
exitRes := <-exitCh
exitStatus, exitedAt, err := exitRes.Result()
if err != nil {
logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
exitStatus = unknownExitCode
exitedAt = time.Now()
}
em.exitCh <- &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: pid,
ExitStatus: exitStatus,
ExitedAt: exitedAt,
defer close(stopCh)
select {
case exitRes := <-exitCh:
exitStatus, exitedAt, err := exitRes.Result()
if err != nil {
logrus.WithError(err).Errorf("Failed to get task exit status for %q", id)
exitStatus = unknownExitCode
exitedAt = time.Now()
}
em.exitCh <- &eventtypes.TaskExit{
ContainerID: id,
ID: id,
Pid: pid,
ExitStatus: exitStatus,
ExitedAt: exitedAt,
}
case <-ctx.Done():
}
}()
return stopCh
}

func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
Expand All @@ -139,8 +143,6 @@ func convertEvent(e *gogotypes.Any) (string, interface{}, error) {
}

switch e := evt.(type) {
case *eventtypes.TaskExit:
id = e.ID
case *eventtypes.TaskOOM:
id = e.ContainerID
case *eventtypes.ImageCreate:
Expand Down Expand Up @@ -191,10 +193,6 @@ func (em *eventMonitor) start() <-chan error {
logrus.WithError(err).Errorf("Failed to convert event %+v", e)
break
}
if em.skipEvent(evt) {
logrus.Debugf("Skip event %+v for %q", evt, id)
break
}
if em.backOff.isInBackOff(id) {
logrus.Infof("Events for %q is in backoff, enqueue event %+v", id, evt)
em.backOff.enBackOff(id, evt)
Expand Down Expand Up @@ -296,24 +294,6 @@ func (em *eventMonitor) handleEvent(any interface{}) error {
return nil
}

// skipEvent returns whether the event should be skipped.
func (em *eventMonitor) skipEvent(any interface{}) bool {
switch e := any.(type) {
case *eventtypes.TaskExit:
// Only handle containerd TaskExit events for containers/sandboxes
// in unknown state. Containers/sandboxes in unknown state don't
// have corresponding exit monitors running, because they were failed
// to be loaded. We have to rely on containerd TaskExit events for them.
if cntr, err := em.c.containerStore.Get(e.ID); err == nil {
return cntr.Status.Get().State() != runtime.ContainerState_CONTAINER_UNKNOWN
}
if sb, err := em.c.sandboxStore.Get(e.ID); err == nil {
return sb.Status.Get().State != sandboxstore.StateUnknown
}
}
return false
}

// handleContainerExit handles TaskExit event for container.
func handleContainerExit(ctx context.Context, e *eventtypes.TaskExit, cntr containerstore.Container) error {
// Attach container IO so that `Delete` could cleanup the stream properly.
Expand Down
140 changes: 0 additions & 140 deletions pkg/server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (
"github.com/containerd/typeurl"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/clock"

containerstore "github.com/containerd/cri/pkg/store/container"
sandboxstore "github.com/containerd/cri/pkg/store/sandbox"
)

// TestBackOff tests the logic of backOff struct.
Expand Down Expand Up @@ -135,140 +132,3 @@ func TestBackOff(t *testing.T) {
assert.Equal(t, actQueue, expQueue)
}
}

func TestSkipEvent(t *testing.T) {
c := newTestCRIService()
c.eventMonitor = newEventMonitor(c)
createdAt := time.Now().UnixNano()
startedAt := time.Now().UnixNano()
finishedAt := time.Now().UnixNano()
containers := []containerForTest{
{
metadata: containerstore.Metadata{
ID: "c1",
},
status: containerstore.Status{CreatedAt: createdAt},
},
{
metadata: containerstore.Metadata{
ID: "c2",
},
status: containerstore.Status{
CreatedAt: createdAt,
StartedAt: startedAt,
},
},
{
metadata: containerstore.Metadata{
ID: "c3",
},
status: containerstore.Status{
CreatedAt: createdAt,
StartedAt: startedAt,
FinishedAt: finishedAt,
},
},
{
metadata: containerstore.Metadata{
ID: "c4",
},
status: containerstore.Status{},
},
}
sandboxes := []sandboxstore.Sandbox{
sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: "s1",
},
sandboxstore.Status{
State: sandboxstore.StateReady,
},
),
sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: "s2",
},
sandboxstore.Status{
State: sandboxstore.StateNotReady,
},
),
sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: "s3",
},
sandboxstore.Status{
State: sandboxstore.StateUnknown,
},
),
}

// Inject test container metadata
for _, cntr := range containers {
container, err := cntr.toContainer()
assert.NoError(t, err)
assert.NoError(t, c.containerStore.Add(container))
}

// Inject test sandbox metadata
for _, sb := range sandboxes {
assert.NoError(t, c.sandboxStore.Add(sb))
}

for desc, test := range map[string]struct {
event interface{}
skip bool
}{
"should skip task exit event for created container": {
event: &eventtypes.TaskExit{
ID: "c1",
},
skip: true,
},
"should skip task exit event for running container": {
event: &eventtypes.TaskExit{
ID: "c2",
},
skip: true,
},
"should skip task exit event for exited container": {
event: &eventtypes.TaskExit{
ID: "c3",
},
skip: true,
},
"should not skip task exit event for unknown container": {
event: &eventtypes.TaskExit{
ID: "c4",
},
skip: false,
},
"should skip task exit event for ready sandbox": {
event: &eventtypes.TaskExit{
ID: "s1",
},
skip: true,
},
"should skip task exit event for notready sandbox": {
event: &eventtypes.TaskExit{
ID: "s2",
},
skip: true,
},
"should not skip task exit event for unknown sandbox": {
event: &eventtypes.TaskExit{
ID: "s3",
},
skip: false,
},
"should not skip task oom event": {
event: &eventtypes.TaskOOM{
ContainerID: "s3",
},
skip: false,
},
} {
t.Logf("TestCase: %s", desc)
skip := c.eventMonitor.skipEvent(test.event)
assert.Equal(t, test.skip, skip)
}
}
28 changes: 0 additions & 28 deletions pkg/server/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/BurntSushi/toml"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime/linux/runctypes"
runcoptions "github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/typeurl"
Expand Down Expand Up @@ -477,31 +474,6 @@ func unknownSandboxStatus() sandboxstore.Status {
}
}

// unknownExitStatus generates containerd.Status for container exited with unknown exit code.
func unknownExitStatus() containerd.Status {
return containerd.Status{
Status: containerd.Stopped,
ExitStatus: unknownExitCode,
ExitTime: time.Now(),
}
}

// getTaskStatus returns status for a given task. It returns unknown exit status if
// the task is nil or not found.
func getTaskStatus(ctx context.Context, task containerd.Task) (containerd.Status, error) {
if task == nil {
return unknownExitStatus(), nil
}
status, err := task.Status(ctx)
if err != nil {
if !errdefs.IsNotFound(err) {
return containerd.Status{}, err
}
return unknownExitStatus(), nil
}
return status, nil
}

// getPassthroughAnnotations filters requested pod annotations by comparing
// against permitted annotations for the given runtime.
func getPassthroughAnnotations(podAnnotations map[string]string,
Expand Down
Loading

0 comments on commit 23d1ac1

Please sign in to comment.