From d3f362b76cc532b1d2bcf3ff62805b7c9c811fbc Mon Sep 17 00:00:00 2001 From: Radek Gruchalski Date: Mon, 29 Mar 2021 00:03:36 +0000 Subject: [PATCH 1/2] On the way to process directories via grpc --- configs/firecracker.go | 2 +- pkg/build/dependency.go | 2 +- pkg/build/resources/resource.go | 31 ++-- pkg/build/rootfs.go | 2 +- pkg/build/rootfs_test.go | 301 ++++++++++++++++++++++---------- pkg/build/server/impl.go | 2 - pkg/build/server/server_test.go | 29 +-- pkg/build/server/shatredtest.go | 52 ++++-- pkg/containers/docker.go | 3 + 9 files changed, 278 insertions(+), 146 deletions(-) diff --git a/configs/firecracker.go b/configs/firecracker.go index 1ad8910..3af4ba5 100644 --- a/configs/firecracker.go +++ b/configs/firecracker.go @@ -48,7 +48,7 @@ func NewFcConfigProvider(jailingFcConfig *JailingFirecrackerConfig, machineConfi func (c *defaultFcConfigProvider) ToSDKConfig() firecracker.Config { - var fifo io.WriteCloser // TODO: do it like firectl does it + var fifo io.WriteCloser // CONSIDER: do it like firectl does it return firecracker.Config{ SocketPath: "", // given via Jailer diff --git a/pkg/build/dependency.go b/pkg/build/dependency.go index 909b665..df131b2 100644 --- a/pkg/build/dependency.go +++ b/pkg/build/dependency.go @@ -51,9 +51,9 @@ func (ddb *defaultDependencyBuild) Build(externalCopies []commands.Copy) ([]reso return emptyResponse, fmt.Errorf("error fetching Docker client: %+v", clientErr) } - // TODO: verify that this is actually possible with Docker. // Do not return early, maybe somebody attempts to build a base image // using the multistage build but without extracting actual resources. + // This is perfectly possible with Docker. // The cloned sources reside in .../sources directory, let's write our stage Dockerfile in there randFileName := strings.ToLower(utils.RandStringBytes(32)) diff --git a/pkg/build/resources/resource.go b/pkg/build/resources/resource.go index f7006b9..7bcd8c6 100644 --- a/pkg/build/resources/resource.go +++ b/pkg/build/resources/resource.go @@ -191,16 +191,11 @@ func (dr *defaultResolver) resolveResources(originalSource, resourcePath, target return nil, fmt.Errorf("resource failed: resolved '%s', reason: %v", match, statErr) } if statResult.IsDir() { - resources = append(resources, &defaultResolvedResource{contentsReader: func() (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader([]byte{})), nil - }, - isDir: true, - resolved: newPath, - sourcePath: resourcePath, - targetMode: statResult.Mode().Perm(), - targetPath: targetPath, - targetWorkdir: targetWorkdir, - targetUser: targetUser}) + resources = append(resources, + NewResolvedDirectoryResourceWithPath(statResult.Mode().Perm(), + newPath, resourcePath, targetPath, + targetWorkdir, + targetUser)) } else { resources = append(resources, &defaultResolvedResource{contentsReader: func() (io.ReadCloser, error) { file, err := os.Open(newPath) @@ -238,3 +233,19 @@ func NewResolvedFileResourceWithPath(contentsReader func() (io.ReadCloser, error targetWorkdir: workdir, targetUser: user} } + +// NewResolvedDirectoryResourceWithPath creates a resolved resource from input information containing resource source path. +func NewResolvedDirectoryResourceWithPath(mode fs.FileMode, resolvedPath, sourcePath, targetPath string, workdir commands.Workdir, user commands.User) ResolvedResource { + return &defaultResolvedResource{contentsReader: func() (io.ReadCloser, error) { + // TODO-MULTI-STAGE-VMINIT: here an SCP like protocol is needed to read the contents of the directory in one go + fmt.Println(" =====================> directory ", resolvedPath) + return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + }, + isDir: true, + resolved: resolvedPath, + targetMode: mode, + sourcePath: sourcePath, + targetPath: targetPath, + targetWorkdir: workdir, + targetUser: user} +} diff --git a/pkg/build/rootfs.go b/pkg/build/rootfs.go index da83cea..751ae66 100644 --- a/pkg/build/rootfs.go +++ b/pkg/build/rootfs.go @@ -159,7 +159,7 @@ func (b *defaultBuild) CreateContext(dependencies server.Resources) (*server.Wor ctx.ResourcesResolved[sourcePath] = []resources.ResolvedResource{dependencyResource} ctx.ExecutableCommands = append(ctx.ExecutableCommands, commands.Copy{ - OriginalCommand: "", + OriginalCommand: tcommand.OriginalCommand, OriginalSource: dependencyResource.ResolvedURIOrPath(), Source: sourcePath, Target: dependencyResource.TargetPath(), diff --git a/pkg/build/rootfs_test.go b/pkg/build/rootfs_test.go index 6dc23c0..58fa38c 100644 --- a/pkg/build/rootfs_test.go +++ b/pkg/build/rootfs_test.go @@ -13,18 +13,106 @@ import ( "github.com/combust-labs/firebuild/pkg/build/reader" "github.com/combust-labs/firebuild/pkg/build/resources" "github.com/combust-labs/firebuild/pkg/build/server" + "github.com/combust-labs/firebuild/pkg/build/stage" "github.com/docker/docker/pkg/fileutils" "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/assert" ) -func mustNewArg(t *testing.T, rawValue string) commands.Arg { - arg, err := commands.NewRawArg(rawValue) +func TestContextBuilderMultiStageWithResources(t *testing.T) { + logger := hclog.Default() + logger.SetLevel(hclog.Debug) + + tempDir, err := ioutil.TempDir("", "") + if err != nil { + t.Fatal("expected temp dir, got error", err) + } + defer os.RemoveAll(tempDir) + + dockerfilePath := filepath.Join(tempDir, "Dockerfile") + + expectedResource1Bytes := []byte("resource 1 content") + expectedResource2Bytes := []byte("resource 2 content") + + mustPutTestResource(t, dockerfilePath, []byte(testDockerfileMultiStage)) + mustPutTestResource(t, filepath.Join(tempDir, "resource1"), []byte(expectedResource1Bytes)) + mustPutTestResource(t, filepath.Join(tempDir, "resource2"), []byte(expectedResource2Bytes)) + + readResult, err := reader.ReadFromString(dockerfilePath, tempDir) if err != nil { - t.Fatal(err) + t.Fatal("expected Dockerfile to be read, got error", err) } - return arg + + // this is a multi stage build so let's test the stage selection: + stages, errs := stage.ReadStages(readResult.Commands()) + if len(errs) > 0 { + t.Fatal("expected no errors in stage reader, got", errs) + } + + unnamed := stages.Unnamed() + if len(unnamed) != 1 { + t.Fatal("expected exactly one unnamed stage, got", len(unnamed)) + } + + named := stages.Named() + if len(unnamed) != 1 { + t.Fatal("expected exactly one named stage, got", len(named)) + } + + contextBuilder := NewDefaultBuild() + if err := contextBuilder.AddInstructions(unnamed[0].Commands()...); err != nil { + t.Fatal("expected commands to be added, got error", err) + } + + t.Run("it=fails if dependency resources do not exist", func(tt *testing.T) { + _, err := contextBuilder.WithResolver(resources.NewDefaultResolver()).CreateContext(make(server.Resources)) + if err == nil { + tt.Fatal("expected context creation to fail, but it built", err) + } + }) + + t.Run("it=succeeds when dependency resources exist", func(tt *testing.T) { + + mustPutTestResource(tt, filepath.Join(tempDir, "etc/test/file1"), []byte("etc/test/file1")) + mustPutTestResource(tt, filepath.Join(tempDir, "etc/test/file2"), []byte("etc/test/file2")) + mustPutTestResource(tt, filepath.Join(tempDir, "etc/test/subdir/subdir-file1"), []byte("etc/test/subdir/subdir-file1")) + + // construct resolved resources from the written files: + dependencyResources := server.Resources{ + "builder": []resources.ResolvedResource{resources. + NewResolvedDirectoryResourceWithPath(fs.ModePerm, + filepath.Join(tempDir, "etc/test"), "/etc/test", "/etc/test", + commands.Workdir{Value: "/"}, commands.User{Value: "0:0"}), + }, + } + + buildCtx, err := contextBuilder.WithResolver(resources.NewDefaultResolver()).CreateContext(dependencyResources) + if err != nil { + tt.Fatal("expected build context to be created, got error", err) + } + + testServer, testClient, cancelFunc := server.MustStartTestGRPCServer(tt, logger, buildCtx) + defer cancelFunc() + + opErr := testClient.Commands(tt) + if opErr != nil { + tt.Fatal("GRPC client Commands() opErr", opErr) + } + + mustBeRunCommand(tt, testClient) + mustBeAddCommand(tt, testClient, expectedResource1Bytes) + mustBeCopyCommand(tt, testClient, expectedResource2Bytes) + // TODO-MULTI-STAGE-VMINIT: this does not do yet what it should: + mustBeCopyCommand(tt, testClient, []byte("etc/test/file1"), []byte("etc/test/file2"), []byte("etc/test/subdir/subdir-file1")) + mustBeRunCommand(tt, testClient) + assert.Nil(tt, testClient.NextCommand()) + + testClient.Success() + <-testServer.FinishedNotify() + + }) + } func TestContextBuilderSingleStageWithResources(t *testing.T) { @@ -38,19 +126,16 @@ func TestContextBuilderSingleStageWithResources(t *testing.T) { } defer os.RemoveAll(tempDir) + dockerfilePath := filepath.Join(tempDir, "Dockerfile") + expectedResource1Bytes := []byte("resource 1 content") expectedResource2Bytes := []byte("resource 2 content") - if err := ioutil.WriteFile(filepath.Join(tempDir, "Dockerfile"), []byte(testDockerfile1), fs.ModePerm); err != nil { - t.Fatal("expected Dockerfile to be written, got error", err) - } - if err := ioutil.WriteFile(filepath.Join(tempDir, "resource1"), expectedResource1Bytes, fs.ModePerm); err != nil { - t.Fatal("expected Dockerfile to be written, got error", err) - } - if err := ioutil.WriteFile(filepath.Join(tempDir, "resource2"), expectedResource2Bytes, fs.ModePerm); err != nil { - t.Fatal("expected Dockerfile to be written, got error", err) - } - readResult, err := reader.ReadFromString(filepath.Join(tempDir, "Dockerfile"), tempDir) + mustPutTestResource(t, dockerfilePath, []byte(testDockerfileSingleStage)) + mustPutTestResource(t, filepath.Join(tempDir, "resource1"), []byte(expectedResource1Bytes)) + mustPutTestResource(t, filepath.Join(tempDir, "resource2"), []byte(expectedResource2Bytes)) + + readResult, err := reader.ReadFromString(dockerfilePath, tempDir) if err != nil { t.Fatal("expected Dockerfile to be read, got error", err) } @@ -59,98 +144,28 @@ func TestContextBuilderSingleStageWithResources(t *testing.T) { if err := contextBuilder.AddInstructions(readResult.Commands()...); err != nil { t.Fatal("expected commands to be added, got error", err) } + buildCtx, err := contextBuilder.WithResolver(resources.NewDefaultResolver()).CreateContext(make(server.Resources)) if err != nil { t.Fatal("expected build context to be created, got error", err) } - grpcConfig := &server.GRPCServiceConfig{ - ServerName: "test-grpc-server", - BindHostPort: "127.0.0.1:0", - EmbeddedCAKeySize: 1024, // use this low for tests only! low value speeds up tests - } - - testServer := server.NewTestServer(t, logger.Named("grpc-server"), grpcConfig, buildCtx) - testServer.Start() - - select { - case startErr := <-testServer.FailedNotify(): - t.Fatal("expected the GRPC server to start but it failed", startErr) - case <-testServer.ReadyNotify(): - t.Log("GRPC server started and serving on", grpcConfig.BindHostPort) - defer testServer.Stop() - } - - testClient, clientErr := server.NewTestClient(t, logger.Named("grpc-client"), grpcConfig) - if clientErr != nil { - t.Fatal("expected the GRPC client, got error", clientErr) - } + testServer, testClient, cancelFunc := server.MustStartTestGRPCServer(t, logger, buildCtx) + defer cancelFunc() opErr := testClient.Commands(t) if opErr != nil { t.Fatal("GRPC client Commands() opErr", opErr) } - nextCommand := testClient.NextCommand() - if _, ok := nextCommand.(commands.Run); !ok { - t.Fatal("expected RUN command") - } - - nextCommand = testClient.NextCommand() - if addCommand, ok := nextCommand.(commands.Add); !ok { - t.Fatal("expected ADD command") - } else { - resourceChannel, errorChannel, err := testClient.Resource(addCommand.Source) - if err != nil { - t.Fatal("expected resource channel for ADD command, got error", err) - } - select { - case err := <-errorChannel: - if err != nil { - t.Fatal("error while reading resource", err) - } - case resource := <-resourceChannel: - resourceData, err := mustReadFromReader(resource.Contents()) - if err != nil { - t.Fatal("expected resource to read, got error", err) - } - assert.Equal(t, expectedResource1Bytes, resourceData) - } - } - - nextCommand = testClient.NextCommand() - if copyCommand, ok := nextCommand.(commands.Copy); !ok { - t.Fatal("expected COPY command") - } else { - resourceChannel, errorChannel, err := testClient.Resource(copyCommand.Source) - if err != nil { - t.Fatal("expected resource channel for ADD command, got error", err) - } - select { - case err := <-errorChannel: - if err != nil { - t.Fatal("error while reading resource", err) - } - case resource := <-resourceChannel: - resourceData, err := mustReadFromReader(resource.Contents()) - if err != nil { - t.Fatal("expected resource to read, got error", err) - } - assert.Equal(t, expectedResource2Bytes, resourceData) - } - } - - nextCommand = testClient.NextCommand() - if _, ok := nextCommand.(commands.Run); !ok { - t.Fatal("expected RUN command") - } - + mustBeRunCommand(t, testClient) + mustBeAddCommand(t, testClient, expectedResource1Bytes) + mustBeCopyCommand(t, testClient, expectedResource2Bytes) + mustBeRunCommand(t, testClient) assert.Nil(t, testClient.NextCommand()) testClient.Success() - <-testServer.FinishedNotify() - } func TestDockerignoreMatches(t *testing.T) { @@ -187,15 +202,117 @@ func TestDockerignoreMatches(t *testing.T) { } } +func mustPutTestResource(t *testing.T, path string, contents []byte) { + if err := os.MkdirAll(filepath.Dir(path), fs.ModePerm); err != nil { + t.Fatal("failed creating parent directory for the resource, got error", err) + } + if err := ioutil.WriteFile(path, contents, fs.ModePerm); err != nil { + t.Fatal("expected resource to be written, got error", err) + } +} + func mustReadFromReader(reader io.ReadCloser, _ error) ([]byte, error) { return ioutil.ReadAll(reader) } -const testDockerfile1 = `FROM alpine:3.13 +func mustBeAddCommand(t *testing.T, testClient server.TestClient, expectedContents ...[]byte) { + if addCommand, ok := testClient.NextCommand().(commands.Add); !ok { + t.Fatal("expected ADD command") + } else { + resourceChannel, err := testClient.Resource(addCommand.Source) + if err != nil { + t.Fatal("expected resource channel for ADD command, got error", err) + } + + idx := 0 + out: + for { + select { + case item := <-resourceChannel: + switch titem := item.(type) { + case nil: + break out // break out on nil + case resources.ResolvedResource: + resourceData, err := mustReadFromReader(titem.Contents()) + if err != nil { + t.Fatal("expected resource to read, got error", err) + } + assert.Equal(t, expectedContents[idx], resourceData) + idx = idx + 1 + case error: + t.Fatal("received an error while reading ADD resource", titem) + } + } + } + + } +} + +func mustBeCopyCommand(t *testing.T, testClient server.TestClient, expectedContents ...[]byte) { + if copyCommand, ok := testClient.NextCommand().(commands.Copy); !ok { + t.Fatal("expected COPY command") + } else { + resourceChannel, err := testClient.Resource(copyCommand.Source) + if err != nil { + t.Fatal("expected resource channel for COPY command, got error", err) + } + + idx := 0 + out: + for { + select { + case item := <-resourceChannel: + switch titem := item.(type) { + case nil: + break out // break out on nil + case resources.ResolvedResource: + if titem.IsDir() { + // TODO-MULTI-STAGE-VMINIT: handle a directory here... + resourceData, err := mustReadFromReader(titem.Contents()) + if err != nil { + t.Fatal("expected resource to read, got error", err) + } + fmt.Println(" ================> ", resourceData) + continue // skip directories for tests + } + resourceData, err := mustReadFromReader(titem.Contents()) + if err != nil { + t.Fatal("expected resource to read, got error", err) + } + assert.Equal(t, expectedContents[idx], resourceData) + idx = idx + 1 + case error: + t.Fatal("received an error while reading ADD resource", titem) + } + } + } + + } +} + +func mustBeRunCommand(t *testing.T, testClient server.TestClient) { + if _, ok := testClient.NextCommand().(commands.Run); !ok { + t.Fatal("expected RUN command") + } +} + +const testDockerfileSingleStage = `FROM alpine:3.13 +ARG PARAM1=value +ENV ENVPARAM1=envparam1 +RUN mkdir -p /dir +ADD resource1 /target/resource1 +COPY resource2 /target/resource2 +RUN cp /dir/${ENVPARAM1} \ + && call --arg=${PARAM1}` + +const testDockerfileMultiStage = `FROM alpine:3.13 as builder + +FROM alpine:3.13 ARG PARAM1=value ENV ENVPARAM1=envparam1 RUN mkdir -p /dir ADD resource1 /target/resource1 COPY resource2 /target/resource2 +COPY --from=builder /etc/test /etc/test RUN cp /dir/${ENVPARAM1} \ && call --arg=${PARAM1}` diff --git a/pkg/build/server/impl.go b/pkg/build/server/impl.go index 4239750..b45fbce 100644 --- a/pkg/build/server/impl.go +++ b/pkg/build/server/impl.go @@ -14,8 +14,6 @@ import ( "github.com/hashicorp/go-hclog" ) -// TODO: handle closing of channels when the owning server is closed. - // EventProvider provides the event subsriptions to the server executor. // When client event occurs, a corresponding event will be sent via one of the channels. type EventProvider interface { diff --git a/pkg/build/server/server_test.go b/pkg/build/server/server_test.go index 86bc4db..7af0887 100644 --- a/pkg/build/server/server_test.go +++ b/pkg/build/server/server_test.go @@ -15,7 +15,7 @@ type eventuallyFunc func() error func TestServerNoContentOpsAbort(t *testing.T) { testWithStopType(t, func(client TestClient) { client.Abort(fmt.Errorf("aborted")) - }, func(server TestingServerProvider) eventuallyFunc { + }, func(server TestServer) eventuallyFunc { return func() error { if server.Aborted() == nil { return fmt.Errorf("expected Aborted() to be not nil") @@ -28,7 +28,7 @@ func TestServerNoContentOpsAbort(t *testing.T) { func TestServerNoContentOpsSuccess(t *testing.T) { testWithStopType(t, func(client TestClient) { client.Success() - }, func(server TestingServerProvider) eventuallyFunc { + }, func(server TestServer) eventuallyFunc { return func() error { if !server.Succeeded() { return fmt.Errorf("expected Succeeded() to be true") @@ -38,7 +38,7 @@ func TestServerNoContentOpsSuccess(t *testing.T) { }) } -func testWithStopType(t *testing.T, stopTrigger func(TestClient), eventuallyCond func(TestingServerProvider) eventuallyFunc) { +func testWithStopType(t *testing.T, stopTrigger func(TestClient), eventuallyCond func(TestServer) eventuallyFunc) { logger := hclog.Default() logger.SetLevel(hclog.Debug) @@ -47,27 +47,8 @@ func testWithStopType(t *testing.T, stopTrigger func(TestClient), eventuallyCond ResourcesResolved: make(Resources), } - grpcConfig := &GRPCServiceConfig{ - ServerName: "test-grpc-server", - BindHostPort: "127.0.0.1:0", - EmbeddedCAKeySize: 1024, // use this low for tests only! low value speeds up tests - } - - testServer := NewTestServer(t, logger.Named("grpc-server"), grpcConfig, buildCtx) - testServer.Start() - - select { - case startErr := <-testServer.FailedNotify(): - t.Fatal("expected the GRPC server to start but it failed", startErr) - case <-testServer.ReadyNotify(): - t.Log("GRPC server started and serving on", grpcConfig.BindHostPort) - defer testServer.Stop() - } - - testClient, clientErr := NewTestClient(t, logger.Named("grpc-client"), grpcConfig) - if clientErr != nil { - t.Fatal("expected the GRPC client, got error", clientErr) - } + testServer, testClient, cleanupFunc := MustStartTestGRPCServer(t, logger, buildCtx) + defer cleanupFunc() expectedStderrLines := []string{"stderr line", "stderr line 2"} expectedStdoutLines := []string{"stdout line", "stdout line 2"} diff --git a/pkg/build/server/shatredtest.go b/pkg/build/server/shatredtest.go index b58bbf4..6471357 100644 --- a/pkg/build/server/shatredtest.go +++ b/pkg/build/server/shatredtest.go @@ -13,7 +13,6 @@ import ( "github.com/combust-labs/firebuild/grpc/proto" "github.com/combust-labs/firebuild/pkg/build/commands" - "github.com/combust-labs/firebuild/pkg/build/resources" "github.com/hashicorp/go-hclog" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" @@ -21,9 +20,9 @@ import ( "google.golang.org/grpc/credentials" ) -// TestingServerProvider wraps an instance of a server and provides testing +// TestServer wraps an instance of a server and provides testing // utilities around it. -type TestingServerProvider interface { +type TestServer interface { Start() Stop() FailedNotify() <-chan error @@ -160,13 +159,37 @@ func (p *testGRPCServerProvider) Succeeded() bool { return p.success } +// MustStartTestGRPCServer starts a test server and returns a client, a server and a server cleanup function. +// Fails test on any error. +func MustStartTestGRPCServer(t *testing.T, logger hclog.Logger, buildCtx *WorkContext) (TestServer, TestClient, func()) { + grpcConfig := &GRPCServiceConfig{ + ServerName: "test-grpc-server", + BindHostPort: "127.0.0.1:0", + EmbeddedCAKeySize: 1024, // use this low for tests only! low value speeds up tests + } + testServer := NewTestServer(t, logger.Named("grpc-server"), grpcConfig, buildCtx) + testServer.Start() + select { + case startErr := <-testServer.FailedNotify(): + t.Fatal("expected the GRPC server to start but it failed", startErr) + case <-testServer.ReadyNotify(): + t.Log("GRPC server started and serving on", grpcConfig.BindHostPort) + } + testClient, clientErr := NewTestClient(t, logger.Named("grpc-client"), grpcConfig) + if clientErr != nil { + testServer.Stop() + t.Fatal("expected the GRPC client, got error", clientErr) + } + return testServer, testClient, func() { testServer.Stop() } +} + // -- test client type TestClient interface { Abort(error) error Commands(*testing.T) error NextCommand() commands.VMInitSerializableCommand - Resource(string) (<-chan resources.ResolvedResource, <-chan error, error) + Resource(string) (chan interface{}, error) StdErr([]string) error StdOut([]string) error Success() error @@ -241,20 +264,20 @@ func (c *testClient) NextCommand() commands.VMInitSerializableCommand { return result } -func (c *testClient) Resource(input string) (<-chan resources.ResolvedResource, <-chan error, error) { +func (c *testClient) Resource(input string) (chan interface{}, error) { - chanResources := make(chan resources.ResolvedResource) - chanError := make(chan error) + chanResources := make(chan interface{}) resourceClient, err := c.underlying.Resource(context.Background(), &proto.ResourceRequest{Path: input}) if err != nil { - return nil, nil, err + return nil, err } go func() { var currentResource *testResolvedResource + out: for { response, err := resourceClient.Recv() @@ -265,19 +288,18 @@ func (c *testClient) Resource(input string) (<-chan resources.ResolvedResource, // yes, err check after response check if err != nil { - chanError <- errors.Wrap(err, "failed reading chunk") - return + chanResources <- errors.Wrap(err, "failed reading chunk") + break out } switch tresponse := response.GetPayload().(type) { case *proto.ResourceChunk_Eof: chanResources <- currentResource case *proto.ResourceChunk_Chunk: - // TODO: check the checksum of the chunk... hash := sha256.Sum256(tresponse.Chunk.Chunk) - if string(hash[:]) != string(currentResource.contents) { - chanError <- errors.Wrap(err, "chunk checksum did not match") - return + if string(hash[:]) != string(tresponse.Chunk.Checksum) { + chanResources <- errors.Wrap(err, "chunk checksum did not match") + break out } currentResource.contents = append(currentResource.contents, tresponse.Chunk.Chunk...) case *proto.ResourceChunk_Header: @@ -297,7 +319,7 @@ func (c *testClient) Resource(input string) (<-chan resources.ResolvedResource, }() - return chanResources, chanError, nil + return chanResources, nil } func (c *testClient) StdErr(input []string) error { diff --git a/pkg/containers/docker.go b/pkg/containers/docker.go index 6c82f17..99ac9ce 100644 --- a/pkg/containers/docker.go +++ b/pkg/containers/docker.go @@ -415,6 +415,9 @@ func ImageExportStageDependentResources(ctx context.Context, client *docker.Clie if !layerHeader.FileInfo().IsDir() { // gotta read the file... opLogger.Debug("reading file", "layer", layerHeader.Name, "matched-prefix", opCopy.Source) + // -- + // CONSIDER: saving the files in a stage derived directory: + // -- targetPath := filepath.Join(exportsRoot, layerHeader.Name) // make sure we have the parent directory for the target: if parentDirErr := os.MkdirAll(filepath.Dir(targetPath), fs.ModePerm); err != nil { From f71441fd7f9ccdc367cdd7633c255862a8984fdc Mon Sep 17 00:00:00 2001 From: Radek Gruchalski Date: Wed, 31 Mar 2021 21:54:42 +0000 Subject: [PATCH 2/2] Wrap up sending directories over grpc --- go.mod | 1 + pkg/build/resources/resource.go | 2 - pkg/build/rootfs_test.go | 93 +++++-------- pkg/build/server/grpc_directory_resource.go | 141 ++++++++++++++++++++ pkg/build/server/impl.go | 40 ++++-- 5 files changed, 203 insertions(+), 74 deletions(-) create mode 100644 pkg/build/server/grpc_directory_resource.go diff --git a/go.mod b/go.mod index 24b1020..01aa47e 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/firecracker-microvm/firecracker-go-sdk v0.22.0 github.com/go-git/go-git/v5 v5.2.0 github.com/gofrs/uuid v4.0.0+incompatible + github.com/golang/protobuf v1.5.1 github.com/hashicorp/go-hclog v0.15.0 github.com/hashicorp/go-multierror v1.0.0 github.com/mitchellh/mapstructure v1.3.2 diff --git a/pkg/build/resources/resource.go b/pkg/build/resources/resource.go index 7bcd8c6..3d54adf 100644 --- a/pkg/build/resources/resource.go +++ b/pkg/build/resources/resource.go @@ -237,8 +237,6 @@ func NewResolvedFileResourceWithPath(contentsReader func() (io.ReadCloser, error // NewResolvedDirectoryResourceWithPath creates a resolved resource from input information containing resource source path. func NewResolvedDirectoryResourceWithPath(mode fs.FileMode, resolvedPath, sourcePath, targetPath string, workdir commands.Workdir, user commands.User) ResolvedResource { return &defaultResolvedResource{contentsReader: func() (io.ReadCloser, error) { - // TODO-MULTI-STAGE-VMINIT: here an SCP like protocol is needed to read the contents of the directory in one go - fmt.Println(" =====================> directory ", resolvedPath) return ioutil.NopCloser(bytes.NewReader([]byte{})), nil }, isDir: true, diff --git a/pkg/build/rootfs_test.go b/pkg/build/rootfs_test.go index 58fa38c..3b8411e 100644 --- a/pkg/build/rootfs_test.go +++ b/pkg/build/rootfs_test.go @@ -103,8 +103,15 @@ func TestContextBuilderMultiStageWithResources(t *testing.T) { mustBeRunCommand(tt, testClient) mustBeAddCommand(tt, testClient, expectedResource1Bytes) mustBeCopyCommand(tt, testClient, expectedResource2Bytes) - // TODO-MULTI-STAGE-VMINIT: this does not do yet what it should: - mustBeCopyCommand(tt, testClient, []byte("etc/test/file1"), []byte("etc/test/file2"), []byte("etc/test/subdir/subdir-file1")) + // directories do not have a byte content, they always return empty bytes: + // since we have: + // - /etc/test: dir + // - /etc/test/file1: file + // - /etc/test/file2: file + // - /etc/test/subdir: dir + // - /etc/test/subdir/subdir-file1: file + // we expect the following: + mustBeCopyCommand(tt, testClient, []byte{}, []byte("etc/test/file1"), []byte("etc/test/file2"), []byte{}, []byte("etc/test/subdir/subdir-file1")) mustBeRunCommand(tt, testClient) assert.Nil(tt, testClient.NextCommand()) @@ -219,31 +226,7 @@ func mustBeAddCommand(t *testing.T, testClient server.TestClient, expectedConten if addCommand, ok := testClient.NextCommand().(commands.Add); !ok { t.Fatal("expected ADD command") } else { - resourceChannel, err := testClient.Resource(addCommand.Source) - if err != nil { - t.Fatal("expected resource channel for ADD command, got error", err) - } - - idx := 0 - out: - for { - select { - case item := <-resourceChannel: - switch titem := item.(type) { - case nil: - break out // break out on nil - case resources.ResolvedResource: - resourceData, err := mustReadFromReader(titem.Contents()) - if err != nil { - t.Fatal("expected resource to read, got error", err) - } - assert.Equal(t, expectedContents[idx], resourceData) - idx = idx + 1 - case error: - t.Fatal("received an error while reading ADD resource", titem) - } - } - } + mustReadResources(t, testClient, addCommand.Source, expectedContents...) } } @@ -252,41 +235,35 @@ func mustBeCopyCommand(t *testing.T, testClient server.TestClient, expectedConte if copyCommand, ok := testClient.NextCommand().(commands.Copy); !ok { t.Fatal("expected COPY command") } else { - resourceChannel, err := testClient.Resource(copyCommand.Source) - if err != nil { - t.Fatal("expected resource channel for COPY command, got error", err) - } + mustReadResources(t, testClient, copyCommand.Source, expectedContents...) + } +} - idx := 0 - out: - for { - select { - case item := <-resourceChannel: - switch titem := item.(type) { - case nil: - break out // break out on nil - case resources.ResolvedResource: - if titem.IsDir() { - // TODO-MULTI-STAGE-VMINIT: handle a directory here... - resourceData, err := mustReadFromReader(titem.Contents()) - if err != nil { - t.Fatal("expected resource to read, got error", err) - } - fmt.Println(" ================> ", resourceData) - continue // skip directories for tests - } - resourceData, err := mustReadFromReader(titem.Contents()) - if err != nil { - t.Fatal("expected resource to read, got error", err) - } - assert.Equal(t, expectedContents[idx], resourceData) - idx = idx + 1 - case error: - t.Fatal("received an error while reading ADD resource", titem) +func mustReadResources(t *testing.T, testClient server.TestClient, source string, expectedContents ...[]byte) { + resourceChannel, err := testClient.Resource(source) + if err != nil { + t.Fatal("expected resource channel for COPY command, got error", err) + } + + idx := 0 +out: + for { + select { + case item := <-resourceChannel: + switch titem := item.(type) { + case nil: + break out // break out on nil + case resources.ResolvedResource: + resourceData, err := mustReadFromReader(titem.Contents()) + if err != nil { + t.Fatal("expected resource to read, got error", err) } + assert.Equal(t, expectedContents[idx], resourceData) + idx = idx + 1 + case error: + t.Fatal("received an error while reading ADD resource", titem) } } - } } diff --git a/pkg/build/server/grpc_directory_resource.go b/pkg/build/server/grpc_directory_resource.go new file mode 100644 index 0000000..84fa2d0 --- /dev/null +++ b/pkg/build/server/grpc_directory_resource.go @@ -0,0 +1,141 @@ +package server + +import ( + "bytes" + "crypto/sha256" + "io" + "io/fs" + "io/ioutil" + "os" + "path/filepath" + "strings" + + "github.com/combust-labs/firebuild/grpc/proto" + "github.com/combust-labs/firebuild/pkg/build/commands" + "github.com/combust-labs/firebuild/pkg/build/resources" + "github.com/gofrs/uuid" +) + +type GRPCReadingDirectoryResource interface { + WalkResource() chan *proto.ResourceChunk +} + +// NewResolvedDirectoryResourceWithPath creates a resolved resource from input information containing resource source path. +func NewGRPCDirectoryResource(safeBufferSize int, resource resources.ResolvedResource) GRPCReadingDirectoryResource { + return &grpcDirectoryResource{contentsReader: func() (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewReader([]byte{})), nil + }, + isDir: true, + resolved: resource.ResolvedURIOrPath(), + safeBufferSize: safeBufferSize, + targetMode: resource.TargetMode(), + sourcePath: resource.SourcePath(), + targetPath: resource.TargetPath(), + targetWorkdir: resource.TargetWorkdir(), + targetUser: resource.TargetUser(), + } +} + +type grpcDirectoryResource struct { + contentsReader func() (io.ReadCloser, error) + isDir bool + resolved string + safeBufferSize int + targetMode fs.FileMode + sourcePath string + targetPath string + targetWorkdir commands.Workdir + targetUser commands.User +} + +func (drr *grpcDirectoryResource) WalkResource() chan *proto.ResourceChunk { + chanChunks := make(chan *proto.ResourceChunk) + go func() { + filepath.WalkDir(drr.resolved, func(path string, d fs.DirEntry, err error) error { + + finfo, err := d.Info() + if err != nil { + return err + } + + remainingPath := strings.TrimPrefix(strings.TrimPrefix(path, drr.resolved), "/") + + resourceUUID := uuid.Must(uuid.NewV4()).String() + + if d.IsDir() { + chanChunks <- &proto.ResourceChunk{ + Payload: &proto.ResourceChunk_Header{ + Header: &proto.ResourceChunk_ResourceHeader{ + SourcePath: filepath.Join(drr.sourcePath, remainingPath), + TargetPath: filepath.Join(drr.targetPath, remainingPath), + FileMode: int64(finfo.Mode().Perm()), + IsDir: true, + TargetUser: drr.targetUser.Value, + TargetWorkdir: drr.targetWorkdir.Value, + Id: resourceUUID, + }, + }, + } + chanChunks <- &proto.ResourceChunk{ + Payload: &proto.ResourceChunk_Eof{ + Eof: &proto.ResourceChunk_ResourceEof{ + Id: resourceUUID, + }, + }, + } + return nil + } + + // it's a file: + + chanChunks <- &proto.ResourceChunk{ + Payload: &proto.ResourceChunk_Header{ + Header: &proto.ResourceChunk_ResourceHeader{ + SourcePath: filepath.Join(drr.sourcePath, remainingPath), + TargetPath: filepath.Join(drr.targetPath, remainingPath), + FileMode: int64(finfo.Mode().Perm()), + IsDir: true, + TargetUser: drr.targetUser.Value, + TargetWorkdir: drr.targetWorkdir.Value, + Id: resourceUUID, + }, + }, + } + + buffer := make([]byte, drr.safeBufferSize) + + reader, err := os.Open(path) + defer reader.Close() + + for { + readBytes, err := reader.Read(buffer) + if readBytes == 0 && err == io.EOF { + chanChunks <- &proto.ResourceChunk{ + Payload: &proto.ResourceChunk_Eof{ + Eof: &proto.ResourceChunk_ResourceEof{ + Id: resourceUUID, + }, + }, + } + break + } else { + payload := buffer[0:readBytes] + hash := sha256.Sum256(payload) + chanChunks <- &proto.ResourceChunk{ + Payload: &proto.ResourceChunk_Chunk{ + Chunk: &proto.ResourceChunk_ResourceContents{ + Chunk: payload, + Checksum: hash[:], + Id: resourceUUID, + }, + }, + } + } + } + + return nil + }) + chanChunks <- nil + }() + return chanChunks +} diff --git a/pkg/build/server/impl.go b/pkg/build/server/impl.go index b45fbce..9d8b11f 100644 --- a/pkg/build/server/impl.go +++ b/pkg/build/server/impl.go @@ -82,9 +82,31 @@ func (impl *serverImpl) Commands(ctx context.Context, _ *proto.Empty) (*proto.Co } func (impl *serverImpl) Resource(req *proto.ResourceRequest, stream proto.RootfsServer_ResourceServer) error { - if resources, ok := impl.serverCtx.ResourcesResolved[req.Path]; ok { - // TODO: can this be parallel? - for _, resource := range resources { + if ress, ok := impl.serverCtx.ResourcesResolved[req.Path]; ok { + for _, resource := range ress { + + reader, err := resource.Contents() + if err != nil { + return err + } + + // make it a little bit smaller than the actual max size + safeBufSize := impl.serviceConfig.SafeClientMaxRecvMsgSize() + + impl.logger.Debug("sending data with safe buffer size", "resource", resource.TargetPath(), "safe-buffer-size", safeBufSize) + + if resource.IsDir() { + grpcDirResource := NewGRPCDirectoryResource(safeBufSize, resource) + outputChannel := grpcDirResource.WalkResource() + for { + payload := <-outputChannel + if payload == nil { + break + } + stream.Send(payload) + } + continue + } resourceUUID := uuid.Must(uuid.NewV4()).String() stream.Send(&proto.ResourceChunk{ @@ -101,18 +123,7 @@ func (impl *serverImpl) Resource(req *proto.ResourceRequest, stream proto.Rootfs }, }) - reader, err := resource.Contents() - if err != nil { - return err - } - - // make it a little bit smaller than the actual max size - safeBufSize := impl.serviceConfig.SafeClientMaxRecvMsgSize() - - impl.logger.Debug("sending data with safe buffer size", "resource", resource.TargetPath(), "safe-buffer-size", safeBufSize) - buffer := make([]byte, safeBufSize) - for { readBytes, err := reader.Read(buffer) if readBytes == 0 && err == io.EOF { @@ -139,6 +150,7 @@ func (impl *serverImpl) Resource(req *proto.ResourceRequest, stream proto.Rootfs } } } + } else { return fmt.Errorf("not found: '%s/%s'", req.Stage, req.Path) }