Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ctrd,cli,daemon,pkg: update to adapt with containerd@v1.2.5 #2725

Merged
merged 2 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion cli/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/alibaba/pouch/pkg/jsonstream"
"github.com/alibaba/pouch/pkg/reference"

"github.com/containerd/containerd/progress"
"github.com/containerd/containerd/pkg/progress"
"github.com/spf13/cobra"
"golang.org/x/crypto/ssh/terminal"
)
Expand Down
28 changes: 19 additions & 9 deletions ctrd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/containerd/containerd"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/api/services/introspection/v1"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/typeurl"
Expand Down Expand Up @@ -235,22 +236,31 @@ func (c *Client) Plugins(ctx context.Context, filters []string) ([]Plugin, error
// collectContainerdEvents collects events generated by containerd.
func (c *Client) collectContainerdEvents() {
ctx := context.Background()
topicsToHandle := []string{TaskOOMEventTopic, TaskExitEventTopic}

// set filters for subscribe containerd events,
// now we only care about task and container events.
ef := []string{"topic~=task.*", "topic~=container.*"}
events, err := c.Events(ctx, ef...)
// get client
wrapperCli, err := c.Get(ctx)
if err != nil {
logrus.Errorf("failed to connect containerd event service: %v", err)
logrus.Errorf("failed to get a containerd grpc client: %v", err)
return
}
eventsClient := wrapperCli.client.EventService()

// set filters for subscribe containerd events,
// now we only care about task and container events.
ef := []string{"topic~=task.*", "topic~=container.*"}
topicsToHandle := []string{TaskOOMEventTopic, TaskExitEventTopic}

eventCh, errCh := eventsClient.Subscribe(ctx, ef...)

for {
// TODO(ziren):need reconnect the event service
e, err := events.Recv()
if err != nil {
logrus.Errorf("failed to receive event: %v", err)
var e *events.Envelope
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about combine var define with below:

		var (
			action      string
			containerID string
			attributes  = map[string]string{}
		)

select {
case e = <-eventCh:
case err := <-errCh:
if err != nil {
logrus.Errorf("failed to receive event: %v", err)
}
return
}

Expand Down
32 changes: 16 additions & 16 deletions ctrd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/linux/runctypes"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/runtime/linux/runctypes"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -112,7 +112,7 @@ func (c *Client) execContainer(ctx context.Context, process *Process) error {
},
).Debugf("creating cio (withStdin=%v, withTerminal=%v)", withStdin, withTerminal)

fifoset, err := containerio.NewCioFIFOSet(execID, withStdin, withTerminal)
fifoset, err := containerio.NewFIFOSet(execID, withStdin, withTerminal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func (c *Client) destroyContainer(ctx context.Context, id string, timeout int64)
return nil, fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

ctx = leases.WithLease(ctx, wrapperCli.lease.ID())
ctx = leases.WithLease(ctx, wrapperCli.lease.ID)

if !c.lock.TrylockWithRetry(ctx, id) {
return nil, errtypes.ErrLockfailed
Expand Down Expand Up @@ -618,7 +618,7 @@ func (c *Client) createTask(ctx context.Context, id, checkpointDir string, conta
task, err := container.NewTask(ctx, func(_ string) (cio.IO, error) {
logrus.WithField("container", cntrID).Debugf("creating cio (withStdin=%v, withTerminal=%v)", withStdin, withTerminal)

fifoset, err := containerio.NewCioFIFOSet(execID, withStdin, withTerminal)
fifoset, err := containerio.NewFIFOSet(execID, withStdin, withTerminal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -730,7 +730,7 @@ func (c *Client) waitContainer(ctx context.Context, id string) (types.ContainerW
return types.ContainerWaitOKBody{}, fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

ctx = leases.WithLease(ctx, wrapperCli.lease.ID())
ctx = leases.WithLease(ctx, wrapperCli.lease.ID)

waitExit := func() *Message {
return c.ProbeContainer(ctx, id, -1*time.Second)
Expand Down Expand Up @@ -769,7 +769,7 @@ func (c *Client) CreateCheckpoint(ctx context.Context, id string, checkpointDir

var opts []containerd.CheckpointTaskOpts
if exit {
opts = append(opts, containerd.WithExit)
opts = append(opts, withExitShimV1CheckpointTaskOpts())
}
checkpoint, err := pack.task.Checkpoint(ctx, opts...)
if err != nil {
Expand All @@ -783,7 +783,7 @@ func (c *Client) CreateCheckpoint(ctx context.Context, id string, checkpointDir
}

func applyCheckpointImage(ctx context.Context, client *containerd.Client, checkpoint containerd.Image, checkpointDir string) error {
b, err := content.ReadBlob(ctx, client.ContentStore(), checkpoint.Target().Digest)
b, err := content.ReadBlob(ctx, client.ContentStore(), checkpoint.Target())
if err != nil {
return errors.Wrapf(err, "failed to retrieve checkpoint data")
}
Expand All @@ -803,7 +803,7 @@ func applyCheckpointImage(ctx context.Context, client *containerd.Client, checkp
return errors.Wrapf(err, "invalid checkpoint")
}

rat, err := client.ContentStore().ReaderAt(ctx, cpDesc.Digest)
rat, err := client.ContentStore().ReaderAt(ctx, *cpDesc)
if err != nil {
return errors.Wrapf(err, "failed to get checkpoint reader")
}
Expand All @@ -817,7 +817,7 @@ func applyCheckpointImage(ctx context.Context, client *containerd.Client, checkp
}

func writeContent(ctx context.Context, mediaType, ref string, r io.Reader, client *containerd.Client) (*containerdtypes.Descriptor, error) {
writer, err := client.ContentStore().Writer(ctx, ref, 0, "")
writer, err := client.ContentStore().Writer(ctx, content.WithRef(ref))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -866,10 +866,10 @@ func withCheckpointOpt(checkpoint *containerdtypes.Descriptor) containerd.NewTas
}

// InitStdio allows caller to handle any initialize job.
type InitStdio func(dio *containerio.DirectIO) (cio.IO, error)
type InitStdio func(dio *cio.DirectIO) (cio.IO, error)

func (c *Client) createIO(fifoSet *containerio.CioFIFOSet, cntrID, procID string, closeStdinCh <-chan struct{}, initstdio InitStdio) (cio.IO, error) {
cdio, err := containerio.NewDirectIO(context.Background(), fifoSet)
func (c *Client) createIO(fifoSet *cio.FIFOSet, cntrID, procID string, closeStdinCh <-chan struct{}, initstdio InitStdio) (cio.IO, error) {
cdio, err := cio.NewDirectIO(context.Background(), fifoSet)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -918,12 +918,12 @@ func (c *Client) attachIO(fifoSet *cio.FIFOSet, initstdio InitStdio) (cio.IO, er
return nil, fmt.Errorf("cannot attach to existing fifos")
}

cdio, err := containerio.NewDirectIO(context.Background(), &containerio.CioFIFOSet{
cdio, err := cio.NewDirectIO(context.Background(), &cio.FIFOSet{
Config: cio.Config{
Terminal: fifoSet.Terminal,
Stdin: fifoSet.In,
Stdout: fifoSet.Out,
Stderr: fifoSet.Err,
Stdin: fifoSet.Stdin,
Stdout: fifoSet.Stdout,
Stderr: fifoSet.Stderr,
},
})
if err != nil {
Expand Down
17 changes: 0 additions & 17 deletions ctrd/events.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package ctrd

import (
"context"

eventsapi "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/runtime"
"github.com/pkg/errors"
)

const (
Expand All @@ -23,16 +19,3 @@ const (
// TaskOOMEventTopic for task oom
TaskOOMEventTopic = runtime.TaskOOMEventTopic
)

// Events subscribe containerd events through an event subscribe client.
func (c *Client) Events(ctx context.Context, ef ...string) (eventsapi.Events_SubscribeClient, error) {
wrapperCli, err := c.Get(ctx)
if err != nil {
return nil, errors.Wrap(err, ErrGetCtrdClient.Error())
}

eventsClient := wrapperCli.client.EventService()
return eventsClient.Subscribe(ctx, &eventsapi.SubscribeRequest{
Filters: ef,
})
}
21 changes: 15 additions & 6 deletions ctrd/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (c *Client) saveImage(ctx context.Context, exporter ctrdmetaimages.Exporter
// ImportImage creates a set of images by tarstream.
//
// NOTE: One tar may have several manifests.
func (c *Client) ImportImage(ctx context.Context, importer ctrdmetaimages.Importer, reader io.Reader) ([]containerd.Image, error) {
imgs, err := c.importImage(ctx, importer, reader)
func (c *Client) ImportImage(ctx context.Context, reader io.Reader, opts ...containerd.ImportOpt) ([]containerd.Image, error) {
imgs, err := c.importImage(ctx, reader, opts...)
if err != nil {
return imgs, convertCtrdErr(err)
}
Expand All @@ -154,26 +154,35 @@ func (c *Client) ImportImage(ctx context.Context, importer ctrdmetaimages.Import
// importImage creates a set of images by tarstream.
//
// NOTE: One tar may have several manifests.
func (c *Client) importImage(ctx context.Context, importer ctrdmetaimages.Importer, reader io.Reader) ([]containerd.Image, error) {
func (c *Client) importImage(ctx context.Context, reader io.Reader, opts ...containerd.ImportOpt) ([]containerd.Image, error) {
wrapperCli, err := c.Get(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get a containerd grpc client: %v", err)
}

// NOTE: The import will store the data into boltdb. But the unpack may
// fail. It is not transaction.
imgs, err := wrapperCli.client.Import(ctx, importer, reader)
imgs, err := wrapperCli.client.Import(ctx, reader, opts...)
if err != nil {
return nil, err
}

var (
res = make([]containerd.Image, 0, len(imgs))
snaphotter = CurrentSnapshotterName(ctx)
)

for _, img := range imgs {
err = img.Unpack(ctx, CurrentSnapshotterName(ctx))
image := containerd.NewImage(wrapperCli.client, img)

err = image.Unpack(ctx, snaphotter)
if err != nil {
return nil, err
}

res = append(res, image)
}
return imgs, nil
return res, nil
}

// PushImage pushes image to registry
Expand Down
13 changes: 7 additions & 6 deletions ctrd/image_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (c *Client) Commit(ctx context.Context, config *CommitConfig) (_ digest.Dig
if err != nil {
return "", errors.Wrapf(err, "failed to create lease for commit")
}
defer done()
defer done(ctx)
fuweid marked this conversation as resolved.
Show resolved Hide resolved

var (
sn = client.SnapshotService(CurrentSnapshotterName(ctx))
Expand Down Expand Up @@ -188,7 +188,7 @@ func (c *Client) Commit(ctx context.Context, config *CommitConfig) (_ digest.Dig

// write manifest content
ref := mfstDigest.String()
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(mfstJSON), mfstDesc.Size, mfstDesc.Digest, content.WithLabels(labels)); err != nil {
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(mfstJSON), mfstDesc, content.WithLabels(labels)); err != nil {
return "", errors.Wrapf(err, "error writing manifest blob %s", mfstDigest)
}

Expand All @@ -197,7 +197,7 @@ func (c *Client) Commit(ctx context.Context, config *CommitConfig) (_ digest.Dig
labelOpt := content.WithLabels(map[string]string{
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", CurrentSnapshotterName(ctx)): rootfsID,
})
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(imgJSON), configDesc.Size, configDesc.Digest, labelOpt); err != nil {
if err := content.WriteBlob(ctx, cs, ref, bytes.NewReader(imgJSON), configDesc, labelOpt); err != nil {
return "", errors.Wrap(err, "error writing config blob")
}

Expand All @@ -206,8 +206,9 @@ func (c *Client) Commit(ctx context.Context, config *CommitConfig) (_ digest.Dig
}

// export a new layer from a container
func exportLayer(ctx context.Context, name string, sn snapshots.Snapshotter, cs content.Store, differ diff.Differ) (ocispec.Descriptor, digest.Digest, error) {
rwDesc, err := rootfs.Diff(ctx, name, sn, differ)
func exportLayer(ctx context.Context, name string, sn snapshots.Snapshotter, cs content.Store, comparer diff.Comparer) (ocispec.Descriptor, digest.Digest, error) {
// export new layer
rwDesc, err := rootfs.CreateDiff(ctx, name, sn, comparer)
if err != nil {
return ocispec.Descriptor{}, digest.Digest(""), fmt.Errorf("failed to diff: %s", err)
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func newChildImage(ctx context.Context, config *CommitConfig, diffID digest.Dige
}

// create a new snapshot for exported layer
func newSnapshot(ctx context.Context, name string, pImg ocispec.Image, sn snapshots.Snapshotter, differ diff.Differ, layer ocispec.Descriptor) error {
func newSnapshot(ctx context.Context, name string, pImg ocispec.Image, sn snapshots.Snapshotter, differ diff.Applier, layer ocispec.Descriptor) error {
var (
key = randomid.Generate()
parent = identity.ChainID(pImg.RootFS.DiffIDs).String()
Expand Down
5 changes: 1 addition & 4 deletions ctrd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/alibaba/pouch/pkg/jsonstream"

"github.com/containerd/containerd"
eventsapi "github.com/containerd/containerd/api/services/events/v1"
containerdtypes "github.com/containerd/containerd/api/types"
ctrdmetaimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/mount"
Expand Down Expand Up @@ -66,8 +65,6 @@ type ContainerAPIClient interface {
SetExitHooks(hooks ...func(string, *Message, func() error) error)
// SetExecExitHooks specified the handlers of exec process exit.
SetExecExitHooks(hooks ...func(string, *Message) error)
// Events subscribe containerd events through an event subscribe client.
Events(ctx context.Context, ef ...string) (eventsapi.Events_SubscribeClient, error)
// SetEventsHooks specified the methods to handle the containerd events.
SetEventsHooks(hooks ...func(context.Context, string, string, map[string]string) error)
}
Expand All @@ -85,7 +82,7 @@ type ImageAPIClient interface {
// RemoveImage removes the image by the given reference.
RemoveImage(ctx context.Context, ref string) error
// ImportImage creates a set of images by tarstream.
ImportImage(ctx context.Context, importer ctrdmetaimages.Importer, reader io.Reader) ([]containerd.Image, error)
ImportImage(ctx context.Context, reader io.Reader, opts ...containerd.ImportOpt) ([]containerd.Image, error)
// SaveImage saves image to tarstream
SaveImage(ctx context.Context, exporter ctrdmetaimages.Exporter, ref string) (io.ReadCloser, error)
// Commit commits an image from a container.
Expand Down
2 changes: 1 addition & 1 deletion ctrd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (c *Client) CreateSnapshot(ctx context.Context, id, ref string) error {
if err != nil {
return fmt.Errorf("failed to get a containerd grpc client: %v", err)
}
ctx = leases.WithLease(ctx, wrapperCli.lease.ID())
ctx = leases.WithLease(ctx, wrapperCli.lease.ID)

image, err := wrapperCli.client.ImageService().Get(ctx, ref)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions ctrd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,24 @@ import (
"github.com/alibaba/pouch/apis/types"
"github.com/alibaba/pouch/pkg/errtypes"

"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)

func withExitShimV1CheckpointTaskOpts() containerd.CheckpointTaskOpts {
return func(r *containerd.CheckpointTaskInfo) error {
r.Options = &runctypes.CheckpointOptions{
Exit: true,
}
return nil
}
}

func resolver(authConfig *types.AuthConfig, resolverOpt docker.ResolverOptions) (remotes.Resolver, error) {
var (
// TODO
Expand Down
10 changes: 6 additions & 4 deletions ctrd/wrapper_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"github.com/containerd/containerd"
"github.com/containerd/containerd/leases"
"github.com/pkg/errors"
)

Expand All @@ -18,7 +19,7 @@ type WrapperClient struct {
// Lease is a new feature of containerd, We use it to avoid that the images
// are removed by garbage collection. If no lease is defined, the downloaded images will
// be removed automatically when the container is removed.
lease *containerd.Lease
lease *leases.Lease

mux sync.Mutex
// streamQuota records the numbers of stream client without be using
Expand All @@ -33,18 +34,19 @@ func newWrapperClient(rpcAddr string, defaultns string, maxStreamsClient int) (*
if err != nil {
return nil, errors.Wrap(err, "failed to connect containerd")
}
leaseSrv := cli.LeasesService()

// create a new lease or reuse the existed.
var lease containerd.Lease
var lease leases.Lease

leases, err := cli.ListLeases(context.TODO())
leases, err := leaseSrv.List(context.TODO())
if err != nil {
return nil, err
}
if len(leases) != 0 {
lease = leases[0]
} else {
if lease, err = cli.CreateLease(context.TODO()); err != nil {
if lease, err = leaseSrv.Create(context.TODO()); err != nil {
return nil, err
}
}
Expand Down
Loading