Skip to content

Commit

Permalink
Ensure input services can be safely opened and closed
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard committed Oct 13, 2016
1 parent 3116951 commit 6cab812
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 16 deletions.
38 changes: 29 additions & 9 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ type Service struct {

wg sync.WaitGroup
err chan error
stop chan struct{}
conn *net.UDPConn
batcher *tsdb.PointBatcher
typesdb gollectd.Types
addr net.Addr

mu sync.Mutex
done chan struct{}

// expvar-based stats.
stats *Statistics
defaultTags models.StatisticTags
Expand All @@ -78,6 +80,14 @@ func NewService(c Config) *Service {

// Open starts the service.
func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()

if s.done != nil {
return nil // Already open.
}
s.done = make(chan struct{})

s.Logger.Printf("Starting collectd service")

if s.Config.BindAddress == "" {
Expand Down Expand Up @@ -171,8 +181,7 @@ func (s *Service) Open() error {
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
s.batcher.Start()

// Create channel and wait group for signalling goroutines to stop.
s.stop = make(chan struct{})
// Create waitgroup for signalling goroutines to stop.
s.wg.Add(2)

// Start goroutines that process collectd packets.
Expand All @@ -184,10 +193,21 @@ func (s *Service) Open() error {

// Close stops the service.
func (s *Service) Close() error {
// Close the connection, and wait for the goroutine to exit.
if s.stop != nil {
close(s.stop)
s.mu.Lock()
defer s.mu.Unlock()

select {
case <-s.done:
// Service is closing...
return nil
default:
if s.done == nil {
return nil // Already closed.
}
}
close(s.done)

// Close the connection, and wait for the goroutine to exit.
if s.conn != nil {
s.conn.Close()
}
Expand All @@ -197,10 +217,10 @@ func (s *Service) Close() error {
s.wg.Wait()

// Release all remaining resources.
s.stop = nil
s.conn = nil
s.batcher = nil
s.Logger.Println("collectd UDP closed")
s.done = nil
return nil
}

Expand Down Expand Up @@ -269,7 +289,7 @@ func (s *Service) serve() {

for {
select {
case <-s.stop:
case <-s.done:
// We closed the connection, time to go.
return
default:
Expand Down Expand Up @@ -310,7 +330,7 @@ func (s *Service) writePoints() {

for {
select {
case <-s.stop:
case <-s.done:
return
case batch := <-s.batcher.Out():
if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
Expand Down
16 changes: 15 additions & 1 deletion services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ func NewService(c Config) (*Service, error) {
stats: &Statistics{},
defaultTags: models.StatisticTags{"proto": d.Protocol, "bind": d.BindAddress},
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
diagsKey: strings.Join([]string{"graphite", d.Protocol, d.BindAddress}, ":"),
}

Expand All @@ -130,6 +129,11 @@ func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()

if s.done != nil {
return nil // Already open.
}
s.done = make(chan struct{})

s.logger.Printf("Starting graphite service, batch size %d, batch timeout %s", s.batchSize, s.batchTimeout)

// Register diagnostics if a Monitor service is available.
Expand Down Expand Up @@ -187,6 +191,16 @@ func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()

select {
case <-s.done:
// Service is closing...
return nil
default:
if s.done == nil {
return nil // Already closed.
}
}

s.closeAllConnections()

if s.ln != nil {
Expand Down
16 changes: 15 additions & 1 deletion services/opentsdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func NewService(c Config) (*Service, error) {
d := c.WithDefaults()

s := &Service{
done: make(chan struct{}),
tls: d.TLSEnabled,
cert: d.Certificate,
err: make(chan error),
Expand All @@ -106,6 +105,10 @@ func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()

if s.done != nil {
return nil // Already open.
}

s.Logger.Println("Starting OpenTSDB service")

if _, err := s.MetaClient.CreateDatabase(s.Database); err != nil {
Expand Down Expand Up @@ -160,6 +163,16 @@ func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()

select {
case <-s.done:
// Service is closing...
return nil
default:
if s.done == nil {
return nil // Already closed.
}
}

if s.ln != nil {
return s.ln.Close()
}
Expand All @@ -169,6 +182,7 @@ func (s *Service) Close() error {
}
close(s.done)
s.wg.Wait()
s.done = nil
return nil
}

Expand Down
30 changes: 25 additions & 5 deletions services/udp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Service struct {
conn *net.UDPConn
addr *net.UDPAddr
wg sync.WaitGroup

mu sync.Mutex
done chan struct{}

parserChan chan []byte
Expand All @@ -66,7 +68,6 @@ func NewService(c Config) *Service {
d := *c.WithDefaults()
return &Service{
config: d,
done: make(chan struct{}),
parserChan: make(chan []byte, parserChanLen),
batcher: tsdb.NewPointBatcher(d.BatchSize, d.BatchPending, time.Duration(d.BatchTimeout)),
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
Expand All @@ -77,6 +78,13 @@ func NewService(c Config) *Service {

// Open starts the service
func (s *Service) Open() (err error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.done != nil {
return nil // Already open.
}

if s.config.BindAddress == "" {
return errors.New("bind address has to be specified in config")
}
Expand Down Expand Up @@ -220,13 +228,25 @@ func (s *Service) parser() {

// Close closes the underlying listener.
func (s *Service) Close() error {
if s.conn == nil {
return errors.New("Service already closed")
s.mu.Lock()
defer s.mu.Unlock()

select {
case <-s.done:
// Service is closing...
return nil
default:
if s.done == nil {
return nil // Already closed.
}
}
close(s.done)

if s.conn != nil {
s.conn.Close()
}

s.conn.Close()
s.batcher.Flush()
close(s.done)
s.wg.Wait()

// Release all remaining resources.
Expand Down

0 comments on commit 6cab812

Please sign in to comment.