Skip to content

Commit

Permalink
Fixes intelsdi-x#1058: Adds code to ensure Snap restarts in response …
Browse files Browse the repository at this point in the history
…to a SIGHUP
  • Loading branch information
Tom McSweeney committed Jul 13, 2016
1 parent 93eb909 commit b95a3e6
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 51 deletions.
39 changes: 32 additions & 7 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ type pluginControl struct {

pluginTrust int
keyringFiles []string
// used to cleanly shutdown the GRPC server
grpcServer *grpc.Server
closingChan chan bool
wg sync.WaitGroup
}

type runsPlugins interface {
Expand Down Expand Up @@ -336,23 +340,38 @@ func (p *pluginControl) Start() error {
}

opts := []grpc.ServerOption{}
grpcServer := grpc.NewServer(opts...)
rpc.RegisterMetricManagerServer(grpcServer, &ControlGRPCServer{p})
p.closingChan = make(chan bool, 1)
p.grpcServer = grpc.NewServer(opts...)
rpc.RegisterMetricManagerServer(p.grpcServer, &ControlGRPCServer{p})
p.wg.Add(1)
go func() {
err := grpcServer.Serve(lis)
defer p.wg.Done()
err := p.grpcServer.Serve(lis)
if err != nil {
controlLogger.Fatal(err)
select {
case <-p.closingChan:
// If we called Stop() then there will be a value in p.closingChan, so
// we'll get here and we can exit without showing the error.
default:
controlLogger.Fatal(err)
}
}
}()

return nil
}

func (p *pluginControl) Stop() {
// set the Started flag to false (since we're stopping the server)
p.Started = false
controlLogger.WithFields(log.Fields{
"_block": "stop",
}).Info("control stopped")

// and add a boolean to the p.closingChan (used for error handling in the
// goroutine that is listening for connections)
p.closingChan <- true

// stop GRPC server
p.grpcServer.Stop()
p.wg.Wait()

// stop runner
err := p.pluginRunner.Stop()
Expand All @@ -368,6 +387,12 @@ func (p *pluginControl) Stop() {

// unload plugins
p.pluginManager.teardown()

// log that we've stopped the control module
controlLogger.WithFields(log.Fields{
"_block": "stop",
}).Info("control stopped")

}

// Load is the public method to load a plugin into
Expand Down
4 changes: 2 additions & 2 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,10 +1123,10 @@ func TestCollectDynamicMetrics(t *testing.T) {
pool.SelectAndKill("1", "unsubscription event")
So(pool.Count(), ShouldEqual, 0)
So(pool.SubscriptionCount(), ShouldEqual, 0)
c.Stop()
time.Sleep(100 * time.Millisecond)
})
})
c.Stop()
time.Sleep(100 * time.Millisecond)
})
}

Expand Down
11 changes: 11 additions & 0 deletions docs/SNAPD_CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ The same configuration settings above can also be provided in a JSON formatted c
}
```

## Restarting snapd to pick up configuration changes
If changes are made to the configuration file, `snapd` must be restarted to pick up those changes. Fortunately, this is a simple matter of sending a `SIGHUP` signal to the `snapd` process. For example, the following command will restart the `snapd` process on the local system:

```bash
$ kill -HUP `pidof snapd`
```

Note that in this example, we are using the `pidof` command to retrieve the process ID of the `snapd` process. If the `pidof` command is not available on your system you might have to use a `ps aux` command and pipe the output of that command to a `grep snapd` command in order to obtain the process ID of the `snapd` process. Once the `snapd` process receives that signal it will restart and pick up any changes that have been made to the configuration file that was originally used to Âstart the `snapd` process.

Do keep in mind that this signal will trigger a **restart** of the `snapd` process. This means that any running tasks will be shut down and any loaded plugins will be unloaded. In reality, this means that when the `snapd` process restarts any plugins not in the `auto_discover_path` will need to be loaded manually once the `snapd` process restarts (and any tasks not in that same `auto_discover_path` will need to be restarted). However, any plugins in the `auto_discover_path` will be automatically reloaded and any tasks in that same `auto_discover_path` will be automatically restarted when the when the `snapd` process restarts in response to a `SIGHUP` signal.

## More information
* [SNAPD.md](SNAPD.md)
* [REST_API.md](REST_API.md)
Expand Down
64 changes: 54 additions & 10 deletions mgmt/rest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package rest

import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -169,14 +170,17 @@ type Server struct {
mc managesConfig
n *negroni.Negroni
r *httprouter.Router
tls *tls
snapTLS *snapTLS
auth bool
authpwd string
addrString string
addr net.Addr
wg sync.WaitGroup
killChan chan struct{}
err chan error
// the following instance variables are used to cleanly shutdown the server
serverListener net.Listener
closingChan chan bool
}

// New creates a REST API server with a given config
Expand All @@ -192,7 +196,7 @@ func New(cfg *Config) (*Server, error) {
}
if https {
var err error
s.tls, err = newtls(cpath, kpath)
s.snapTLS, err = newtls(cpath, kpath)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -324,6 +328,7 @@ func (s *Server) Name() string {
}

func (s *Server) Start() error {
s.closingChan = make(chan bool, 1)
s.addRoutes()
s.run(s.addrString)
restLogger.WithFields(log.Fields{
Expand All @@ -333,8 +338,19 @@ func (s *Server) Start() error {
}

func (s *Server) Stop() {
// add a boolean to the s.closingChan (used for error handling in the
// goroutine that is listening for connections)
s.closingChan <- true
// then close the server
close(s.killChan)
// close the server listener
s.serverListener.Close()
// wait for the server goroutines to complete (serve and watch)
s.wg.Wait()
// finally log the result
restLogger.WithFields(log.Fields{
"_block": "stop",
}).Info("REST stopped")
}

func (s *Server) Err() <-chan error {
Expand All @@ -347,31 +363,59 @@ func (s *Server) Port() int {

func (s *Server) run(addrString string) {
restLogger.Info("Starting REST API on ", addrString)
if s.tls != nil {
go s.serveTLS(addrString)
if s.snapTLS != nil {
cer, err := tls.LoadX509KeyPair(s.snapTLS.cert, s.snapTLS.key)
if err != nil {
s.err <- err
return
}
config := &tls.Config{Certificates: []tls.Certificate{cer}}
ln, err := tls.Listen("tcp", addrString, config)
if err != nil {
s.err <- err
}
s.serverListener = ln
s.wg.Add(1)
go s.serveTLS(ln)
} else {
ln, err := net.Listen("tcp", addrString)
if err != nil {
s.err <- err
}
s.serverListener = ln
s.addr = ln.Addr()
s.wg.Add(1)
go s.serve(ln)
}
}

func (s *Server) serveTLS(addrString string) {
err := http.ListenAndServeTLS(addrString, s.tls.cert, s.tls.key, s.n)
func (s *Server) serveTLS(ln net.Listener) {
defer s.wg.Done()
err := http.Serve(ln, s.n)
if err != nil {
restLogger.Error(err)
s.err <- err
select {
case <-s.closingChan:
// If we called Stop() then there will be a value in s.closingChan, so
// we'll get here and we can exit without showing the error.
default:
restLogger.Error(err)
s.err <- err
}
}
}

func (s *Server) serve(ln net.Listener) {
defer s.wg.Done()
err := http.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}, s.n)
if err != nil {
restLogger.Error(err)
s.err <- err
select {
case <-s.closingChan:
// If we called Stop() then there will be a value in s.closingChan, so
// we'll get here and we can exit without showing the error.
default:
restLogger.Error(err)
s.err <- err
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions mgmt/rest/tls.go → mgmt/rest/snapTLS.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (
"time"
)

type tls struct {
type snapTLS struct {
cert, key string
}

func newtls(certPath, keyPath string) (*tls, error) {
t := &tls{}
func newtls(certPath, keyPath string) (*snapTLS, error) {
t := &snapTLS{}
if certPath != "" && keyPath != "" {
cert, err := os.Open(certPath)
if err != nil {
Expand Down Expand Up @@ -78,7 +78,7 @@ func newtls(certPath, keyPath string) (*tls, error) {
return t, nil
}

func generateCert(t *tls) error {
func generateCert(t *snapTLS) error {
// good for 1 year
notBefore := time.Now()
notAfter := notBefore.Add(time.Hour * 24 * 365)
Expand Down
68 changes: 60 additions & 8 deletions scheduler/distributed_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"testing"
"time"

"github.com/intelsdi-x/gomit"
"github.com/intelsdi-x/snap/control"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/scheduler_event"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/grpc/controlproxy"
"github.com/intelsdi-x/snap/pkg/schedule"
Expand Down Expand Up @@ -89,6 +91,12 @@ func TestDistributedWorkflow(t *testing.T) {
t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true)
So(len(errs.Errors()), ShouldEqual, 0)
So(t, ShouldNotBeNil)
// stop the scheduler and control (since in nested Convey statements, the
// statements in the outer Convey execute for each of the inner Conveys
// independently; see https://github.com/smartystreets/goconvey/wiki/Execution-order
// for details on execution order in Convey)
sch.Stop()
c2.Stop()
})

Convey("Test task with invalid remote port", func() {
Expand All @@ -97,6 +105,12 @@ func TestDistributedWorkflow(t *testing.T) {
t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true)
So(len(errs.Errors()), ShouldEqual, 1)
So(t, ShouldBeNil)
// stop the scheduler and control (since in nested Convey statements, the
// statements in the outer Convey execute for each of the inner Conveys
// independently; see https://github.com/smartystreets/goconvey/wiki/Execution-order
// for details on execution order in Convey)
sch.Stop()
c2.Stop()
})

Convey("Test task without remote plugin", func() {
Expand All @@ -106,30 +120,68 @@ func TestDistributedWorkflow(t *testing.T) {
t, errs := sch.CreateTask(schedule.NewSimpleSchedule(time.Second), wf, true)
So(len(errs.Errors()), ShouldEqual, 1)
So(t, ShouldBeNil)
// stop the scheduler and control (since in nested Convey statements, the
// statements in the outer Convey execute for each of the inner Conveys
// independently; see https://github.com/smartystreets/goconvey/wiki/Execution-order
// for details on execution order in Convey)
sch.Stop()
c2.Stop()
})

Convey("Test task failing when control is stopped while task is running", func() {
wf := dsWFMap(port1)
controlproxy.MAX_CONNECTION_TIMEOUT = 10 * time.Second
// set timeout so that connection attempt through the controlproxy will fail after 1 second
controlproxy.MAX_CONNECTION_TIMEOUT = time.Second
// define an interval that the simple scheduler will run on every 100ms
interval := time.Millisecond * 100
// create our task; should be disabled after 3 failures
t, errs := sch.CreateTask(schedule.NewSimpleSchedule(interval), wf, true)
// ensure task was created successfully
So(len(errs.Errors()), ShouldEqual, 0)
So(t, ShouldNotBeNil)
// create a channel to listen on for a response and setup an event handler
// that will respond on that channel once the 'TaskDisabledEvent' arrives
respChan := make(chan struct{})
sch.RegisterEventHandler("test", &failHandler{respChan})
// then stop the controller
c2.Stop()
// Give task time to fail
time.Sleep(time.Second)
tasks := sch.GetTasks()
var task core.Task
for _, v := range tasks {
task = v
// and wait for the response (with a 30 second timeout; just in case)
var ok bool
select {
case <-time.After(30 * time.Second):
// if get here, the select timed out waiting for a response; we don't
// expect to hit this timeout since it should only take 3 seconds for
// the workflow to fail to connect to the gRPC server three times, but
// it might if the task did not fail as expected
So("Timeout triggered waiting for disabled event", ShouldBeBlank)
case <-respChan:
// if get here, we got a response on the respChan
ok = true
}
So(task.State(), ShouldEqual, core.TaskDisabled)
So(ok, ShouldEqual, true)
// stop the scheduler (since in nested Convey statements, the
// statements in the outer Convey execute for each of the inner Conveys
// independently; see https://github.com/smartystreets/goconvey/wiki/Execution-order
// for details on execution order in Convey)
sch.Stop()
})

})

}

type failHandler struct {
respChan chan struct{}
}

func (f *failHandler) HandleGomitEvent(ev gomit.Event) {
switch ev.Body.(type) {
case *scheduler_event.TaskDisabledEvent:
close(f.respChan)
default:
}
}

func TestDistributedSubscriptions(t *testing.T) {

Convey("Load control/scheduler with a mock remote scheduler", t, func() {
Expand Down
Loading

0 comments on commit b95a3e6

Please sign in to comment.