diff --git a/go.mod b/go.mod index bf8e26b1574..eb3a7fb2e18 100644 --- a/go.mod +++ b/go.mod @@ -196,3 +196,5 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) + +replace github.com/compose-spec/compose-go/v2 => github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab diff --git a/go.sum b/go.sum index dd004eef300..d4b2f3eef4a 100644 --- a/go.sum +++ b/go.sum @@ -85,8 +85,6 @@ github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8E github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb h1:EDmT6Q9Zs+SbUoc7Ik9EfrFqcylYqgPZ9ANSbTAntnE= github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb/go.mod h1:ZjrT6AXHbDs86ZSdt/osfBi5qfexBrKUdONk989Wnk4= -github.com/compose-spec/compose-go/v2 v2.4.5 h1:p4ih4Jb6VgGPLPxh3fSFVKAjFHtZd+7HVLCSFzcFx9Y= -github.com/compose-spec/compose-go/v2 v2.4.5/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc= github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM= github.com/containerd/cgroups/v3 v3.0.2 h1:f5WFqIVSgo5IZmtTT3qVBo6TzI1ON6sycSBKkymb9L0= github.com/containerd/cgroups/v3 v3.0.2/go.mod h1:JUgITrzdFqp42uI2ryGA+ge0ap/nxzYgkGmIcetmErE= @@ -359,6 +357,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab h1:3Q4/1sAnPv4nMpak/lIzWsQJjX8X5zKZRkDd6mlf2mc= +github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU= diff --git a/pkg/compose/watch.go b/pkg/compose/watch.go index f469820b137..7917ce8aa3b 100644 --- a/pkg/compose/watch.go +++ b/pkg/compose/watch.go @@ -23,12 +23,12 @@ import ( "os" "path" "path/filepath" - "sort" "strconv" "strings" "time" "github.com/compose-spec/compose-go/v2/types" + ccli "github.com/docker/cli/cli/command/container" pathutil "github.com/docker/compose/v2/internal/paths" "github.com/docker/compose/v2/internal/sync" "github.com/docker/compose/v2/pkg/api" @@ -48,7 +48,7 @@ const quietPeriod = 500 * time.Millisecond // fileEvent contains the Compose service and modified host system path. type fileEvent struct { sync.PathMapping - Action types.WatchAction + Trigger types.Trigger } // getSyncImplementation returns an appropriate sync implementation for the @@ -298,7 +298,7 @@ func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMat } return &fileEvent{ - Action: trigger.Action, + Trigger: trigger, PathMapping: sync.PathMapping{ HostPath: hostPath, ContainerPath: containerPath, @@ -338,6 +338,9 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) if trigger.Action == types.WatchActionRebuild && service.Build == nil { return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name) } + if trigger.Action == types.WatchActionSyncExec && len(trigger.Exec.Command) == 0 { + return nil, fmt.Errorf("can't watch with action 'sync+exec' on service %s wihtout a command", service.Name) + } config.Watch[i] = trigger } @@ -352,24 +355,17 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time. out := make(chan []fileEvent) go func() { defer close(out) - seen := make(map[fileEvent]time.Time) + seen := make(map[sync.PathMapping]fileEvent) flushEvents := func() { if len(seen) == 0 { return } events := make([]fileEvent, 0, len(seen)) - for e := range seen { + for _, e := range seen { events = append(events, e) } - // sort batch by oldest -> newest - // (if an event is seen > 1 per batch, it gets the latest timestamp) - sort.SliceStable(events, func(i, j int) bool { - x := events[i] - y := events[j] - return seen[x].Before(seen[y]) - }) out <- events - seen = make(map[fileEvent]time.Time) + seen = make(map[sync.PathMapping]fileEvent) } t := clock.NewTicker(delay) @@ -386,7 +382,7 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time. flushEvents() return } - seen[e] = time.Now() + seen[e.PathMapping] = e t.Reset(delay) } } @@ -485,49 +481,10 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr pathMappings := make([]sync.PathMapping, len(batch)) restartService := false for i := range batch { - if batch[i].Action == types.WatchActionRebuild { - options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName)) - // restrict the build to ONLY this service, not any of its dependencies - options.Build.Services = []string{serviceName} - imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil) - - if err != nil { - options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err)) - return err - } - - if options.Prune { - s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap) - } - - options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName)) - - err = s.create(ctx, project, api.CreateOptions{ - Services: []string{serviceName}, - Inherit: true, - Recreate: api.RecreateForce, - }) - if err != nil { - options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err)) - return err - } - - services := []string{serviceName} - p, err := project.WithSelectedServices(services) - if err != nil { - return err - } - err = s.start(ctx, project.Name, api.StartOptions{ - Project: p, - Services: services, - AttachTo: services, - }, nil) - if err != nil { - options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err)) - } - return nil + if batch[i].Trigger.Action == types.WatchActionRebuild { + return s.rebuild(ctx, project, serviceName, options) } - if batch[i].Action == types.WatchActionSyncRestart { + if batch[i].Trigger.Action == types.WatchActionSyncRestart { restartService = true } pathMappings[i] = batch[i].PathMapping @@ -554,7 +511,75 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr options.LogTo.Log( api.WatchLogger, fmt.Sprintf("service %q restarted", serviceName)) + } + eg, ctx := errgroup.WithContext(ctx) + for _, b := range batch { + if b.Trigger.Action == "sync+exec" { + containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, serviceName) + if err != nil { + return err + } + x := b.Trigger.Exec + for _, c := range containers { + eg.Go(func() error { + exec := ccli.NewExecOptions() + exec.User = x.User + exec.Privileged = x.Privileged + exec.Command = x.Command + exec.Workdir = x.WorkingDir + for _, v := range x.Environment.ToMapping().Values() { + err := exec.Env.Set(v) + if err != nil { + return err + } + } + return ccli.RunExec(ctx, s.dockerCli, c.ID, exec) + }) + } + } + } + return eg.Wait() +} + +func (s *composeService) rebuild(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions) error { + options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName)) + // restrict the build to ONLY this service, not any of its dependencies + options.Build.Services = []string{serviceName} + imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil) + + if err != nil { + options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err)) + return err + } + if options.Prune { + s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap) + } + + options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName)) + + err = s.create(ctx, project, api.CreateOptions{ + Services: []string{serviceName}, + Inherit: true, + Recreate: api.RecreateForce, + }) + if err != nil { + options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err)) + return err + } + + services := []string{serviceName} + p, err := project.WithSelectedServices(services) + if err != nil { + return err + } + err = s.start(ctx, project.Name, api.StartOptions{ + Project: p, + Services: services, + AttachTo: services, + }, nil) + if err != nil { + options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err)) } return nil } diff --git a/pkg/compose/watch_test.go b/pkg/compose/watch_test.go index 31fd6545d6a..cfe4095cfc0 100644 --- a/pkg/compose/watch_test.go +++ b/pkg/compose/watch_test.go @@ -48,7 +48,7 @@ func TestDebounceBatching(t *testing.T) { if i%2 == 0 { action = "b" } - ch <- fileEvent{Action: action} + ch <- fileEvent{Trigger: types.Trigger{Action: action}} } // we sent 100 events + the debouncer clock.BlockUntil(101) @@ -56,8 +56,8 @@ func TestDebounceBatching(t *testing.T) { select { case batch := <-eventBatchCh: require.ElementsMatch(t, batch, []fileEvent{ - {Action: "a"}, - {Action: "b"}, + {Trigger: types.Trigger{Action: "a"}}, + {Trigger: types.Trigger{Action: "b"}}, }) case <-time.After(50 * time.Millisecond): t.Fatal("timed out waiting for events") diff --git a/pkg/e2e/fixtures/watch/exec.yaml b/pkg/e2e/fixtures/watch/exec.yaml new file mode 100644 index 00000000000..ac1947fb18a --- /dev/null +++ b/pkg/e2e/fixtures/watch/exec.yaml @@ -0,0 +1,14 @@ +services: + test: + build: + dockerfile_inline: FROM alpine + command: ping localhost + volumes: + - /data + develop: + watch: + - path: . + target: /data + action: sync+exec + exec: + command: echo "SUCCESS" \ No newline at end of file diff --git a/pkg/e2e/watch_test.go b/pkg/e2e/watch_test.go index d7b56435ef2..9b7b442a162 100644 --- a/pkg/e2e/watch_test.go +++ b/pkg/e2e/watch_test.go @@ -17,6 +17,7 @@ package e2e import ( + "bytes" "crypto/rand" "fmt" "os" @@ -293,3 +294,42 @@ func doTest(t *testing.T, svcName string) { testComplete.Store(true) } + +func TestWatchExec(t *testing.T) { + cli := NewCLI(t) + const projectName = "test_watch_exec" + + t.Cleanup(func() { + cli.RunDockerComposeCmd(t, "-p", projectName, "down") + }) + + tmpdir := t.TempDir() + composeFilePath := filepath.Join(tmpdir, "compose.yaml") + CopyFile(t, filepath.Join("fixtures", "watch", "exec.yaml"), composeFilePath) + cmd := cli.NewDockerComposeCmd(t, "-p", projectName, "-f", composeFilePath, "up", "--watch") + buffer := bytes.NewBuffer(nil) + cmd.Stdout = buffer + watch := icmd.StartCmd(cmd) + + poll.WaitOn(t, func(l poll.LogT) poll.Result { + out := buffer.String() + if strings.Contains(out, "64 bytes from") { + return poll.Success() + } + return poll.Continue("%v", watch.Stdout()) + }) + + t.Logf("Create new file") + + testFile := filepath.Join(tmpdir, "test") + require.NoError(t, os.WriteFile(testFile, []byte("test\n"), 0o600)) + + poll.WaitOn(t, func(l poll.LogT) poll.Result { + out := buffer.String() + if strings.Contains(out, "SUCCESS") { + return poll.Success() + } + return poll.Continue("%v", out) + }) + cli.RunDockerComposeCmdNoCheck(t, "-p", projectName, "kill", "-s", "9") +}