Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure input services can be safely opened and closed #7463

Merged
merged 2 commits into from
Oct 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- [#7470](https://github.com/influxdata/influxdb/pull/7470): Reduce map allocations when computing the TagSet of a measurement.
- [#6894](https://github.com/influxdata/influxdb/issues/6894): Support `INFLUX_USERNAME` and `INFLUX_PASSWORD` for setting username/password in the CLI.
- [#6896](https://github.com/influxdata/influxdb/issues/6896): Correctly read in input from a non-interactive stream for the CLI.
- [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent.

### Bugfixes

Expand Down
162 changes: 162 additions & 0 deletions internal/meta_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package internal

import (
"time"

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)

// MetaClientMock is a mockable implementation of meta.MetaClient.
type MetaClientMock struct {
CloseFn func() error
CreateContinuousQueryFn func(database, name, query string) error
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
CreateShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)

DatabaseFn func(name string) *meta.DatabaseInfo
DatabasesFn func() ([]meta.DatabaseInfo, error)

DataFn func() meta.Data
DeleteShardGroupFn func(database string, policy string, id uint64) error
DropContinuousQueryFn func(database, name string) error
DropDatabaseFn func(name string) error
DropRetentionPolicyFn func(database, name string) error
DropSubscriptionFn func(database, rp, name string) error
DropShardFn func(id uint64) error
DropUserFn func(name string) error

OpenFn func() error

RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)

SetAdminPrivilegeFn func(username string, admin bool) error
SetDataFn func(*meta.Data) error
SetDefaultRetentionPolicyFn func(database, name string) error
SetPrivilegeFn func(username, database string, p influxql.Privilege) error
ShardsByTimeRangeFn func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error)
ShardOwnerFn func(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo)
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error
UpdateUserFn func(name, password string) error
UserPrivilegeFn func(username, database string) (*influxql.Privilege, error)
UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error)
UsersFn func() []meta.UserInfo
}

func (c *MetaClientMock) Close() error {
return c.CloseFn()
}

func (c *MetaClientMock) CreateContinuousQuery(database, name, query string) error {
return c.CreateContinuousQueryFn(database, name, query)
}

func (c *MetaClientMock) CreateDatabase(name string) (*meta.DatabaseInfo, error) {
return c.CreateDatabaseFn(name)
}

func (c *MetaClientMock) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
return c.CreateDatabaseWithRetentionPolicyFn(name, spec)
}

func (c *MetaClientMock) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
return c.CreateRetentionPolicyFn(database, spec)
}

func (c *MetaClientMock) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
return c.CreateShardGroupFn(database, policy, timestamp)
}

func (c *MetaClientMock) CreateSubscription(database, rp, name, mode string, destinations []string) error {
return c.CreateSubscriptionFn(database, rp, name, mode, destinations)
}

func (c *MetaClientMock) CreateUser(name, password string, admin bool) (*meta.UserInfo, error) {
return c.CreateUserFn(name, password, admin)
}

func (c *MetaClientMock) Database(name string) *meta.DatabaseInfo {
return c.DatabaseFn(name)
}

func (c *MetaClientMock) Databases() ([]meta.DatabaseInfo, error) {
return c.DatabasesFn()
}

func (c *MetaClientMock) DeleteShardGroup(database string, policy string, id uint64) error {
return c.DeleteShardGroup(database, policy, id)
}

func (c *MetaClientMock) DropContinuousQuery(database, name string) error {
return c.DropContinuousQueryFn(database, name)
}

func (c *MetaClientMock) DropDatabase(name string) error {
return c.DropDatabaseFn(name)
}

func (c *MetaClientMock) DropRetentionPolicy(database, name string) error {
return c.DropRetentionPolicyFn(database, name)
}

func (c *MetaClientMock) DropShard(id uint64) error {
return c.DropShardFn(id)
}

func (c *MetaClientMock) DropSubscription(database, rp, name string) error {
return c.DropSubscriptionFn(database, rp, name)
}

func (c *MetaClientMock) DropUser(name string) error {
return c.DropUserFn(name)
}

func (c *MetaClientMock) RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) {
return c.RetentionPolicyFn(database, name)
}

func (c *MetaClientMock) SetAdminPrivilege(username string, admin bool) error {
return c.SetAdminPrivilegeFn(username, admin)
}

func (c *MetaClientMock) SetDefaultRetentionPolicy(database, name string) error {
return c.SetDefaultRetentionPolicyFn(database, name)
}

func (c *MetaClientMock) SetPrivilege(username, database string, p influxql.Privilege) error {
return c.SetPrivilegeFn(username, database, p)
}

func (c *MetaClientMock) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
return c.ShardsByTimeRangeFn(sources, tmin, tmax)
}

func (c *MetaClientMock) ShardOwner(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo) {
return c.ShardOwnerFn(shardID)
}

func (c *MetaClientMock) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate) error {
return c.UpdateRetentionPolicyFn(database, name, rpu)
}

func (c *MetaClientMock) UpdateUser(name, password string) error {
return c.UpdateUserFn(name, password)
}

func (c *MetaClientMock) UserPrivilege(username, database string) (*influxql.Privilege, error) {
return c.UserPrivilegeFn(username, database)
}

func (c *MetaClientMock) UserPrivileges(username string) (map[string]influxql.Privilege, error) {
return c.UserPrivilegesFn(username)
}

func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() }

func (c *MetaClientMock) Open() error { return c.OpenFn() }
func (c *MetaClientMock) Data() meta.Data { return c.DataFn() }
func (c *MetaClientMock) SetData(d *meta.Data) error { return c.SetDataFn(d) }
55 changes: 40 additions & 15 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,14 @@ type Service struct {
Logger *log.Logger

wg sync.WaitGroup
err chan error
stop chan struct{}
conn *net.UDPConn
batcher *tsdb.PointBatcher
typesdb gollectd.Types
addr net.Addr

mu sync.Mutex
done chan struct{}

// expvar-based stats.
stats *Statistics
defaultTags models.StatisticTags
Expand All @@ -68,7 +69,6 @@ func NewService(c Config) *Service {
Config: c.WithDefaults(),

Logger: log.New(os.Stderr, "[collectd] ", log.LstdFlags),
err: make(chan error),
stats: &Statistics{},
defaultTags: models.StatisticTags{"bind": c.BindAddress},
}
Expand All @@ -78,6 +78,14 @@ func NewService(c Config) *Service {

// Open starts the service.
func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()

if !s.closed() {
return nil // Already open.
}
s.done = make(chan struct{})

s.Logger.Printf("Starting collectd service")

if s.Config.BindAddress == "" {
Expand Down Expand Up @@ -142,7 +150,6 @@ func (s *Service) Open() error {
s.typesdb = typesdb
}
}

// Resolve our address.
addr, err := net.ResolveUDPAddr("udp", s.Config.BindAddress)
if err != nil {
Expand Down Expand Up @@ -171,8 +178,7 @@ func (s *Service) Open() error {
s.batcher = tsdb.NewPointBatcher(s.Config.BatchSize, s.Config.BatchPending, time.Duration(s.Config.BatchDuration))
s.batcher.Start()

// Create channel and wait group for signalling goroutines to stop.
s.stop = make(chan struct{})
// Create waitgroup for signalling goroutines to stop.
s.wg.Add(2)

// Start goroutines that process collectd packets.
Expand All @@ -184,10 +190,15 @@ func (s *Service) Open() error {

// Close stops the service.
func (s *Service) Close() error {
// Close the connection, and wait for the goroutine to exit.
if s.stop != nil {
close(s.stop)
s.mu.Lock()
defer s.mu.Unlock()

if s.closed() {
return nil // Already closed.
}
close(s.done)

// Close the connection, and wait for the goroutine to exit.
if s.conn != nil {
s.conn.Close()
}
Expand All @@ -197,13 +208,30 @@ func (s *Service) Close() error {
s.wg.Wait()

// Release all remaining resources.
s.stop = nil
s.conn = nil
s.batcher = nil
s.Logger.Println("collectd UDP closed")
s.done = nil
return nil
}

// Closed returns true if the service is currently closed.
func (s *Service) Closed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed()
}

func (s *Service) closed() bool {
select {
case <-s.done:
// Service is closing.
return true
default:
}
return s.done == nil
}

// SetLogOutput sets the writer to which all logs are written. It must not be
// called after Open is called.
func (s *Service) SetLogOutput(w io.Writer) {
Expand Down Expand Up @@ -246,9 +274,6 @@ func (s *Service) SetTypes(types string) (err error) {
return
}

// Err returns a channel for fatal errors that occur on go routines.
func (s *Service) Err() chan error { return s.err }

// Addr returns the listener's address. Returns nil if listener is closed.
func (s *Service) Addr() net.Addr {
return s.conn.LocalAddr()
Expand All @@ -269,7 +294,7 @@ func (s *Service) serve() {

for {
select {
case <-s.stop:
case <-s.done:
// We closed the connection, time to go.
return
default:
Expand Down Expand Up @@ -310,7 +335,7 @@ func (s *Service) writePoints() {

for {
select {
case <-s.stop:
case <-s.done:
return
case batch := <-s.batcher.Out():
if err := s.PointsWriter.WritePoints(s.Config.Database, s.Config.RetentionPolicy, models.ConsistencyLevelAny, batch); err == nil {
Expand Down
Loading