Skip to content

Commit

Permalink
Initialize event server only once
Browse files Browse the repository at this point in the history
And only stop it at the end.
Fix #2281

Signed-off-by: David Gageot <david@gageot.net>
  • Loading branch information
dgageot committed Jun 19, 2019
1 parent 180613e commit c12b56d
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 27 deletions.
3 changes: 0 additions & 3 deletions pkg/skaffold/runner/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type RunContext struct {
Opts *config.SkaffoldOptions
Cfg *latest.Pipeline

Trigger chan bool

DefaultRepo string
KubeContext string
WorkingDir string
Expand Down Expand Up @@ -85,6 +83,5 @@ func GetRunContext(opts *config.SkaffoldOptions, cfg *latest.Pipeline) (*RunCont
KubeContext: kubeContext,
Namespaces: namespaces,
InsecureRegistries: insecureRegistries,
Trigger: make(chan bool),
}, nil
}
3 changes: 1 addition & 2 deletions pkg/skaffold/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,11 @@ func NewForConfig(opts *config.SkaffoldOptions, cfg *latest.SkaffoldConfig) (*Sk
return nil, errors.Wrap(err, "creating watch trigger")
}

shutdown, err := server.Initialize(runCtx)
shutdown, err := server.Initialize(opts)
if err != nil {
return nil, errors.Wrap(err, "initializing skaffold server")
}
event.InitializeState(runCtx)

event.LogSkaffoldMetadata(version.Get())

return &SkaffoldRunner{
Expand Down
31 changes: 19 additions & 12 deletions pkg/skaffold/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"net/http"
"sync"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/server/proto"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
Expand All @@ -33,13 +33,22 @@ import (
"google.golang.org/grpc"
)

var once sync.Once
var (
once sync.Once
callback func() error
err error
)

// Trigger TODO(dgageot): this was a global variable carried by the runCtx.
// I changed it to a plain global variable to fix an issue.
// This needs to be better addressed.
var Trigger = make(chan bool)

type server struct {
trigger chan bool
}

func newGRPCServer(port int, trigger chan bool) (func() error, error) {
func newGRPCServer(port int) (func() error, error) {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", util.Loopback, port))
if err != nil {
return func() error { return nil }, errors.Wrap(err, "creating listener")
Expand All @@ -48,7 +57,7 @@ func newGRPCServer(port int, trigger chan bool) (func() error, error) {

s := grpc.NewServer()
proto.RegisterSkaffoldServiceServer(s, &server{
trigger: trigger,
trigger: Trigger,
})

go func() {
Expand Down Expand Up @@ -84,32 +93,30 @@ func newHTTPServer(port, proxyPort int) (func() error, error) {
// Initialize creates the gRPC and HTTP servers for serving the state and event log.
// It returns a shutdown callback for tearing down the grpc server,
// which the runner is responsible for calling.
func Initialize(runctx *runcontext.RunContext) (func() error, error) {
var callback func() error
var err error
func Initialize(opts *config.SkaffoldOptions) (func() error, error) {
once.Do(func() {
callback, err = initialize(runctx)
callback, err = initialize(opts)
})
return callback, err
}

func initialize(runctx *runcontext.RunContext) (func() error, error) {
originalRPCPort := runctx.Opts.RPCPort
func initialize(opts *config.SkaffoldOptions) (func() error, error) {
originalRPCPort := opts.RPCPort
if originalRPCPort == -1 {
return func() error { return nil }, nil
}
rpcPort := util.GetAvailablePort(originalRPCPort, &sync.Map{})
if rpcPort != originalRPCPort && originalRPCPort != constants.DefaultRPCPort {
logrus.Warnf("provided port %d already in use: using %d instead", originalRPCPort, rpcPort)
}
grpcCallback, err := newGRPCServer(rpcPort, runctx.Trigger)
grpcCallback, err := newGRPCServer(rpcPort)
if err != nil {
return grpcCallback, errors.Wrap(err, "starting gRPC server")
}
m := &sync.Map{}
m.Store(rpcPort, true)

originalHTTPPort := runctx.Opts.RPCHTTPPort
originalHTTPPort := opts.RPCHTTPPort
httpPort := util.GetAvailablePort(originalHTTPPort, m)
if httpPort != originalHTTPPort && originalHTTPPort != constants.DefaultRPCHTTPPort {
logrus.Warnf("provided port %d already in use: using %d instead", originalHTTPPort, httpPort)
Expand Down
12 changes: 4 additions & 8 deletions pkg/skaffold/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/server/proto"
"google.golang.org/grpc"
)
Expand All @@ -34,13 +33,10 @@ var (

func TestServerStartup(t *testing.T) {
// start up servers
runCtx := &runcontext.RunContext{
Opts: &config.SkaffoldOptions{
RPCPort: rpcAddr,
RPCHTTPPort: httpAddr,
},
}
initialize(runCtx)
initialize(&config.SkaffoldOptions{
RPCPort: rpcAddr,
RPCHTTPPort: httpAddr,
})

// create gRPC client and ensure we can connect
conn, err := grpc.Dial(fmt.Sprintf(":%d", rpcAddr), grpc.WithInsecure())
Expand Down
3 changes: 2 additions & 1 deletion pkg/skaffold/watch/triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/color"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/server"
"github.com/rjeczalik/notify"
"github.com/sirupsen/logrus"
)
Expand All @@ -54,7 +55,7 @@ func NewTrigger(runctx *runcontext.RunContext) (Trigger, error) {
return &manualTrigger{}, nil
case "api":
return &apiTrigger{
Trigger: runctx.Trigger,
Trigger: server.Trigger,
}, nil
default:
return nil, fmt.Errorf("unsupported trigger: %s", runctx.Opts.Trigger)
Expand Down
5 changes: 4 additions & 1 deletion pkg/skaffold/watch/triggers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/config"
runcontext "github.com/GoogleContainerTools/skaffold/pkg/skaffold/runner/context"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/server"
"github.com/GoogleContainerTools/skaffold/testutil"
)

Expand Down Expand Up @@ -55,7 +56,9 @@ func TestNewTrigger(t *testing.T) {
{
description: "api trigger",
opts: &config.SkaffoldOptions{Trigger: "api"},
expected: &apiTrigger{},
expected: &apiTrigger{
Trigger: server.Trigger,
},
},
{
description: "unknown trigger",
Expand Down

0 comments on commit c12b56d

Please sign in to comment.