Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change all Start() to Start([]namespaces) to consume updated namespaces where resources get deployed. #5487

Merged
merged 4 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/skaffold/kubernetes/debugging/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,24 @@ type ContainerManager struct {
events chan kubernetes.PodEvent
}

func NewContainerManager(podSelector kubernetes.PodSelector, namespaces []string) *ContainerManager {
func NewContainerManager(podSelector kubernetes.PodSelector) *ContainerManager {
// Create the channel here as Stop() may be called before Start() when a build fails, thus
// avoiding the possibility of closing a nil channel. Channels are cheap.
return &ContainerManager{
podWatcher: kubernetes.NewPodWatcher(podSelector, namespaces),
podWatcher: kubernetes.NewPodWatcher(podSelector),
active: map[string]string{},
events: make(chan kubernetes.PodEvent),
}
}

func (d *ContainerManager) Start(ctx context.Context) error {
func (d *ContainerManager) Start(ctx context.Context, namespaces []string) error {
if d == nil {
// debug mode probably not enabled
return nil
}

d.podWatcher.Register(d.events)
stopWatcher, err := d.podWatcher.Start()
stopWatcher, err := d.podWatcher.Start(namespaces)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ func TestContainerManagerZeroValue(t *testing.T) {
var m *ContainerManager

// Should not raise a nil dereference
m.Start(context.Background())
m.Start(context.Background(), nil)
m.Stop()
}
8 changes: 4 additions & 4 deletions pkg/skaffold/kubernetes/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ type Config interface {
}

// NewLogAggregator creates a new LogAggregator for a given output.
func NewLogAggregator(out io.Writer, cli *kubectl.CLI, imageNames []string, podSelector PodSelector, namespaces []string, config Config) *LogAggregator {
func NewLogAggregator(out io.Writer, cli *kubectl.CLI, imageNames []string, podSelector PodSelector, config Config) *LogAggregator {
return &LogAggregator{
output: out,
kubectlcli: cli,
config: config,
podWatcher: NewPodWatcher(podSelector, namespaces),
podWatcher: NewPodWatcher(podSelector),
colorPicker: NewColorPicker(imageNames),
events: make(chan PodEvent),
}
Expand All @@ -76,14 +76,14 @@ func (a *LogAggregator) SetSince(t time.Time) {

// Start starts a logger that listens to pods and tail their logs
// if they are matched by the `podSelector`.
func (a *LogAggregator) Start(ctx context.Context) error {
func (a *LogAggregator) Start(ctx context.Context, namespaces []string) error {
if a == nil {
// Logs are not activated.
return nil
}

a.podWatcher.Register(a.events)
stopWatcher, err := a.podWatcher.Start()
stopWatcher, err := a.podWatcher.Start(namespaces)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/skaffold/kubernetes/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestLogAggregatorZeroValue(t *testing.T) {
var m *LogAggregator

// Should not raise a nil dereference
m.Start(context.Background())
m.Start(context.Background(), []string{})
m.Mute()
m.Unmute()
m.Stop()
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestPrefix(t *testing.T) {
}
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
logger := NewLogAggregator(nil, nil, nil, nil, nil, &mockConfig{log: latest.LogsConfig{
logger := NewLogAggregator(nil, nil, nil, nil, &mockConfig{log: latest.LogsConfig{
Prefix: test.prefix,
}})

Expand Down
10 changes: 5 additions & 5 deletions pkg/skaffold/kubernetes/portforward/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

// Forwarder is an interface that can modify and manage port-forward processes
type Forwarder interface {
Start(ctx context.Context) error
Start(ctx context.Context, namespaces []string) error
Stop()
}

Expand All @@ -42,9 +42,9 @@ func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes
entryManager := NewEntryManager(out, NewKubectlForwarder(out, cli))

var forwarders []Forwarder
forwarders = append(forwarders, NewResourceForwarder(entryManager, namespaces, label, userDefined))
forwarders = append(forwarders, NewResourceForwarder(entryManager, label, userDefined))
if opts.ForwardPods {
forwarders = append(forwarders, NewWatchingPodForwarder(entryManager, podSelector, namespaces))
forwarders = append(forwarders, NewWatchingPodForwarder(entryManager, podSelector))
}

return &ForwarderManager{
Expand All @@ -53,14 +53,14 @@ func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes
}

// Start begins all forwarders managed by the ForwarderManager
func (p *ForwarderManager) Start(ctx context.Context) error {
func (p *ForwarderManager) Start(ctx context.Context, namespaces []string) error {
// Port forwarding is not enabled.
if p == nil {
return nil
}

for _, f := range p.forwarders {
if err := f.Start(ctx); err != nil {
if err := f.Start(ctx, namespaces); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ func TestForwarderManagerZeroValue(t *testing.T) {
var m *ForwarderManager

// Should not raise a nil dereference
m.Start(context.Background())
m.Start(context.Background(), nil)
m.Stop()
}
8 changes: 4 additions & 4 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ type WatchingPodForwarder struct {
}

// NewWatchingPodForwarder returns a struct that tracks and port-forwards pods as they are created and modified
func NewWatchingPodForwarder(entryManager *EntryManager, podSelector kubernetes.PodSelector, namespaces []string) *WatchingPodForwarder {
func NewWatchingPodForwarder(entryManager *EntryManager, podSelector kubernetes.PodSelector) *WatchingPodForwarder {
return &WatchingPodForwarder{
entryManager: entryManager,
podWatcher: newPodWatcher(podSelector, namespaces),
podWatcher: newPodWatcher(podSelector),
events: make(chan kubernetes.PodEvent),
}
}

func (p *WatchingPodForwarder) Start(ctx context.Context) error {
func (p *WatchingPodForwarder) Start(ctx context.Context, namespaces []string) error {
p.podWatcher.Register(p.events)
stopWatcher, err := p.podWatcher.Start()
stopWatcher, err := p.podWatcher.Start(namespaces)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func TestAutomaticPortForwardPod(t *testing.T) {
entryManager := NewEntryManager(ioutil.Discard, nil)
entryManager.entryForwarder = test.forwarder

p := NewWatchingPodForwarder(entryManager, kubernetes.NewImageList(), nil)
p := NewWatchingPodForwarder(entryManager, kubernetes.NewImageList())
for _, pod := range test.pods {
err := p.portForwardPod(context.Background(), pod)
t.CheckError(test.shouldErr, err)
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestStartPodForwarder(t *testing.T) {
testutil.Run(t, test.description, func(t *testutil.T) {
event.InitializeState([]latest.Pipeline{{}}, "", true, true, true)
t.Override(&topLevelOwnerKey, func(context.Context, metav1.Object, string) string { return "owner" })
t.Override(&newPodWatcher, func(kubernetes.PodSelector, []string) kubernetes.PodWatcher {
t.Override(&newPodWatcher, func(kubernetes.PodSelector) kubernetes.PodWatcher {
return &fakePodWatcher{
events: []kubernetes.PodEvent{test.event},
}
Expand All @@ -488,8 +488,8 @@ func TestStartPodForwarder(t *testing.T) {
fakeForwarder := newTestForwarder()
entryManager := NewEntryManager(ioutil.Discard, fakeForwarder)

p := NewWatchingPodForwarder(entryManager, imageList, nil)
p.Start(context.Background())
p := NewWatchingPodForwarder(entryManager, imageList)
p.Start(context.Background(), nil)

// wait for the pod resource to be forwarded
err := wait.PollImmediate(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) {
Expand All @@ -512,7 +512,7 @@ func (f *fakePodWatcher) Register(receiver chan<- kubernetes.PodEvent) {
f.receiver = receiver
}

func (f *fakePodWatcher) Start() (func(), error) {
func (f *fakePodWatcher) Start(namespaces []string) (func(), error) {
go func() {
for _, event := range f.events {
f.receiver <- event
Expand Down
33 changes: 15 additions & 18 deletions pkg/skaffold/kubernetes/portforward/resource_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
// services deployed by skaffold.
type ResourceForwarder struct {
entryManager *EntryManager
namespaces []string
label string
userDefinedResources []*latest.PortForwardResource
}
Expand All @@ -46,37 +45,35 @@ var (
)

// NewResourceForwarder returns a struct that tracks and port-forwards services as they are created and modified
func NewResourceForwarder(entryManager *EntryManager, namespaces []string, label string, userDefinedResources []*latest.PortForwardResource) *ResourceForwarder {
func NewResourceForwarder(entryManager *EntryManager, label string, userDefinedResources []*latest.PortForwardResource) *ResourceForwarder {
return &ResourceForwarder{
entryManager: entryManager,
label: label,
userDefinedResources: userDefinedResources,
}
}

// Start gets a list of services deployed by skaffold as []latest.PortForwardResource and
// forwards them.
func (p *ResourceForwarder) Start(ctx context.Context, namespaces []string) error {
if len(namespaces) == 1 {
for _, pf := range userDefinedResources {
for _, pf := range p.userDefinedResources {
if pf.Namespace == "" {
pf.Namespace = namespaces[0]
}
}
} else {
var validResources []*latest.PortForwardResource
for _, pf := range userDefinedResources {
for _, pf := range p.userDefinedResources {
if pf.Namespace != "" {
validResources = append(validResources, pf)
} else {
logrus.Warnf("Skipping the port forwarding resource %s/%s because namespace is not specified", pf.Type, pf.Name)
}
}
userDefinedResources = validResources
p.userDefinedResources = validResources
}

return &ResourceForwarder{
entryManager: entryManager,
namespaces: namespaces,
label: label,
userDefinedResources: userDefinedResources,
}
}

// Start gets a list of services deployed by skaffold as []latest.PortForwardResource and
// forwards them.
func (p *ResourceForwarder) Start(ctx context.Context) error {
serviceResources, err := retrieveServices(ctx, p.label, p.namespaces)
serviceResources, err := retrieveServices(ctx, p.label, namespaces)
if err != nil {
return fmt.Errorf("retrieving services for automatic port forwarding: %w", err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func TestStart(t *testing.T) {
fakeForwarder := newTestForwarder()
entryManager := NewEntryManager(ioutil.Discard, fakeForwarder)

rf := NewResourceForwarder(entryManager, []string{"test"}, "", nil)
if err := rf.Start(context.Background()); err != nil {
rf := NewResourceForwarder(entryManager, "", nil)
if err := rf.Start(context.Background(), []string{"test"}); err != nil {
t.Fatalf("error starting resource forwarder: %v", err)
}

Expand Down Expand Up @@ -194,7 +194,7 @@ func TestGetCurrentEntryFunc(t *testing.T) {
entryManager.forwardedResources = forwardedResources{
resources: test.forwardedResources,
}
rf := NewResourceForwarder(entryManager, []string{"test"}, "", nil)
rf := NewResourceForwarder(entryManager, "", nil)
actualEntry := rf.getCurrentEntry(test.resource)

expectedEntry := test.expected
Expand Down Expand Up @@ -272,8 +272,8 @@ func TestUserDefinedResources(t *testing.T) {
fakeForwarder := newTestForwarder()
entryManager := NewEntryManager(ioutil.Discard, fakeForwarder)

rf := NewResourceForwarder(entryManager, test.namespaces, "", test.userResources)
if err := rf.Start(context.Background()); err != nil {
rf := NewResourceForwarder(entryManager, "", test.userResources)
if err := rf.Start(context.Background(), test.namespaces); err != nil {
t.Fatalf("error starting resource forwarder: %v", err)
}

Expand Down
10 changes: 4 additions & 6 deletions pkg/skaffold/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ import (

type PodWatcher interface {
Register(receiver chan<- PodEvent)
Start() (func(), error)
Start(ns []string) (func(), error)
}

// podWatcher is a pod watcher for multiple namespaces.
type podWatcher struct {
podSelector PodSelector
namespaces []string
receivers []chan<- PodEvent
}

Expand All @@ -46,18 +45,17 @@ type PodEvent struct {
Pod *v1.Pod
}

func NewPodWatcher(podSelector PodSelector, namespaces []string) PodWatcher {
func NewPodWatcher(podSelector PodSelector) PodWatcher {
return &podWatcher{
podSelector: podSelector,
namespaces: namespaces,
}
}

func (w *podWatcher) Register(receiver chan<- PodEvent) {
w.receivers = append(w.receivers, receiver)
}

func (w *podWatcher) Start() (func(), error) {
func (w *podWatcher) Start(namespaces []string) (func(), error) {
if len(w.receivers) == 0 {
return func() {}, errors.New("no receiver was registered")
}
Expand All @@ -76,7 +74,7 @@ func (w *podWatcher) Start() (func(), error) {

var forever int64 = 3600 * 24 * 365 * 100

for _, ns := range w.namespaces {
for _, ns := range namespaces {
watcher, err := kubeclient.CoreV1().Pods(ns).Watch(context.Background(), metav1.ListOptions{
TimeoutSeconds: &forever,
})
Expand Down
16 changes: 8 additions & 8 deletions pkg/skaffold/kubernetes/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (h *hasName) Select(pod *v1.Pod) bool {

func TestPodWatcher(t *testing.T) {
testutil.Run(t, "need to register first", func(t *testutil.T) {
watcher := NewPodWatcher(&anyPod{}, []string{"ns"})
cleanup, err := watcher.Start()
watcher := NewPodWatcher(&anyPod{})
cleanup, err := watcher.Start([]string{"ns"})
defer cleanup()

t.CheckErrorContains("no receiver was registered", err)
Expand All @@ -70,9 +70,9 @@ func TestPodWatcher(t *testing.T) {
testutil.Run(t, "fail to get client", func(t *testutil.T) {
t.Override(&client.Client, func() (kubernetes.Interface, error) { return nil, errors.New("unable to get client") })

watcher := NewPodWatcher(&anyPod{}, []string{"ns"})
watcher := NewPodWatcher(&anyPod{})
watcher.Register(make(chan PodEvent))
cleanup, err := watcher.Start()
cleanup, err := watcher.Start([]string{"ns"})
defer cleanup()

t.CheckErrorContains("unable to get client", err)
Expand All @@ -86,9 +86,9 @@ func TestPodWatcher(t *testing.T) {
return true, nil, errors.New("unable to watch")
})

watcher := NewPodWatcher(&anyPod{}, []string{"ns"})
watcher := NewPodWatcher(&anyPod{})
watcher.Register(make(chan PodEvent))
cleanup, err := watcher.Start()
cleanup, err := watcher.Start([]string{"ns"})
defer cleanup()

t.CheckErrorContains("unable to watch", err)
Expand All @@ -102,9 +102,9 @@ func TestPodWatcher(t *testing.T) {
validNames: []string{"pod1", "pod2", "pod3"},
}
events := make(chan PodEvent)
watcher := NewPodWatcher(podSelector, []string{"ns1", "ns2"})
watcher := NewPodWatcher(podSelector)
watcher.Register(events)
cleanup, err := watcher.Start()
cleanup, err := watcher.Start([]string{"ns1", "ns2"})
defer cleanup()
t.CheckNoError(err)

Expand Down
5 changes: 2 additions & 3 deletions pkg/skaffold/runner/build_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifa

// Logs should be retrieved up to just before the deploy
logger.SetSince(time.Now())

// First deploy
if err := r.Deploy(ctx, out, artifacts); err != nil {
return err
Expand All @@ -117,12 +116,12 @@ func (r *SkaffoldRunner) DeployAndLog(ctx context.Context, out io.Writer, artifa
forwarderManager := r.createForwarder(out)
defer forwarderManager.Stop()

if err := forwarderManager.Start(ctx); err != nil {
if err := forwarderManager.Start(ctx, r.runCtx.GetNamespaces()); err != nil {
logrus.Warnln("Error starting port forwarding:", err)
}

// Start printing the logs after deploy is finished
if err := logger.Start(ctx); err != nil {
if err := logger.Start(ctx, r.runCtx.Namespaces); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetNamespaces()

(we should un-export these fields… someday)

return fmt.Errorf("starting logger: %w", err)
}

Expand Down
Loading