Skip to content

Commit

Permalink
Merge pull request #2175 from influxdb/data-broker-1934
Browse files Browse the repository at this point in the history
Separate broker and data nodes
  • Loading branch information
pauldix committed Apr 7, 2015
2 parents 48b3986 + fd11797 commit a72707b
Show file tree
Hide file tree
Showing 16 changed files with 966 additions and 615 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features
- [#870](https://github.com/influxdb/influxdb/pull/870): Add support for OpenTSDB telnet input protocol. Thanks @tcolgate
- [#2175](https://github.com/influxdb/influxdb/pull/2175): Separate broker and data nodes

## v0.9.0-rc20 [2015-04-04]

Expand Down
234 changes: 133 additions & 101 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,15 @@ const (
// DefaultAPIReadTimeout represents the duration before an API request times out.
DefaultAPIReadTimeout = 5 * time.Second

// DefaultBrokerPort represents the default port the broker runs on.
DefaultBrokerPort = 8086

// DefaultDataPort represents the default port the data server runs on.
DefaultDataPort = 8086
// DefaultClusterPort represents the default port the cluster runs ons.
DefaultClusterPort = 8086

// DefaultSnapshotBindAddress is the default bind address to serve snapshots from.
DefaultSnapshotBindAddress = "127.0.0.1"

// DefaultSnapshotPort is the default port to serve snapshots from.
DefaultSnapshotPort = 8087

// DefaultJoinURLs represents the default URLs for joining a cluster.
DefaultJoinURLs = ""

// DefaultRetentionCreatePeriod represents how often the server will check to see if new
// shard groups need to be created in advance for writing
DefaultRetentionCreatePeriod = 45 * time.Minute
Expand All @@ -57,25 +51,80 @@ const (

// DefaultOpenTSDBDatabaseName is the default OpenTSDB database if none is specified
DefaultOpenTSDBDatabaseName = "opentsdb"

// DefaultRetentionAutoCreate is the default for auto-creating retention policies
DefaultRetentionAutoCreate = true

// DefaultRetentionCheckEnabled is the default for checking for retention policy enforcement
DefaultRetentionCheckEnabled = true

// DefaultRetentionCheckPeriod is the period of time between retention policy checks are run
DefaultRetentionCheckPeriod = 10 * time.Minute

// DefaultRecomputePreviousN is ???
DefaultContinuousQueryRecomputePreviousN = 2

// DefaultContinuousQueryRecomputeNoOlderThan is ???
DefaultContinuousQueryRecomputeNoOlderThan = 10 * time.Minute

// DefaultContinuousQueryComputeRunsPerInterval is ???
DefaultContinuousQueryComputeRunsPerInterval = 10

// DefaultContinousQueryComputeNoMoreThan is ???
DefaultContinousQueryComputeNoMoreThan = 2 * time.Minute

// DefaultStatisticsEnabled is the default setting for whether internal statistics are collected
DefaultStatisticsEnabled = false

// DefaultStatisticsDatabase is the default database internal statistics are written
DefaultStatisticsDatabase = "_internal"

// DefaultStatisticsRetentionPolicy is he default internal statistics rentention policy name
DefaultStatisticsRetentionPolicy = "default"

// DefaultStatisticsWriteInterval is the interval of time between internal stats are written
DefaultStatisticsWriteInterval = 1 * time.Minute
)

var DefaultSnapshotURL = url.URL{
Scheme: "http",
Host: net.JoinHostPort(DefaultSnapshotBindAddress, strconv.Itoa(DefaultSnapshotPort)),
}

// Broker represents the configuration for a broker node
type Broker struct {
Dir string `toml:"dir"`
Enabled bool `toml:"enabled"`
Timeout Duration `toml:"election-timeout"`
}

// Snapshot represents the configuration for a snapshot service. Snapshot configuration
// is only valid for data nodes.
type Snapshot struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
}

// Data represents the configuration for a data node
type Data struct {
Dir string `toml:"dir"`
Enabled bool `toml:"enabled"`
RetentionAutoCreate bool `toml:"retention-auto-create"`
RetentionCheckEnabled bool `toml:"retention-check-enabled"`
RetentionCheckPeriod Duration `toml:"retention-check-period"`
RetentionCreatePeriod Duration `toml:"retention-create-period"`
}

// Config represents the configuration format for the influxd binary.
type Config struct {
Hostname string `toml:"hostname"`
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
ReportingDisabled bool `toml:"reporting-disabled"`
Version string `toml:"-"`
InfluxDBVersion string `toml:"-"`

Initialization struct {
JoinURLs string `toml:"join-urls"`
} `toml:"initialization"`

Authentication struct {
Enabled bool `toml:"enabled"`
} `toml:"authentication"`
Expand All @@ -86,6 +135,7 @@ type Config struct {
} `toml:"admin"`

HTTPAPI struct {
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
SSLPort int `toml:"ssl-port"`
SSLCertPath string `toml:"ssl-cert"`
Expand All @@ -102,29 +152,11 @@ type Config struct {
Port int `toml:"port"`
} `toml:"udp"`

Broker struct {
Port int `toml:"port"`
Dir string `toml:"dir"`
Timeout Duration `toml:"election-timeout"`
} `toml:"broker"`

Data struct {
Dir string `toml:"dir"`
Port int `toml:"port"`
RetentionAutoCreate bool `toml:"retention-auto-create"`
RetentionCheckEnabled bool `toml:"retention-check-enabled"`
RetentionCheckPeriod Duration `toml:"retention-check-period"`
RetentionCreatePeriod Duration `toml:"retention-create-period"`
} `toml:"data"`

Snapshot struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
}
Cluster struct {
Dir string `toml:"dir"`
} `toml:"cluster"`
Broker Broker `toml:"broker"`

Data Data `toml:"data"`

Snapshot Snapshot `toml:"snapshot"`

Logging struct {
WriteTracing bool `toml:"write-tracing"`
Expand Down Expand Up @@ -165,41 +197,31 @@ type Config struct {
ComputeNoMoreThan Duration `toml:"compute-no-more-than"`

// If this flag is set to true, both the brokers and data nodes should ignore any CQ processing.
Disable bool `toml:"disable"`
Disabled bool `toml:"disabled"`
} `toml:"continuous_queries"`
}

// NewConfig returns an instance of Config with reasonable defaults.
func NewConfig() (*Config, error) {
u, err := user.Current()
if err != nil {
return nil, fmt.Errorf("failed to determine current user for storage")
}

func NewConfig() *Config {
c := &Config{}
c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker")
c.Broker.Port = DefaultBrokerPort
c.Broker.Timeout = Duration(1 * time.Second)
c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")
c.Data.Port = DefaultDataPort
c.Data.RetentionAutoCreate = true
c.Data.RetentionCheckEnabled = true
c.Data.RetentionCheckPeriod = Duration(10 * time.Minute)
c.Port = DefaultClusterPort

c.HTTPAPI.Port = DefaultClusterPort

c.Data.RetentionAutoCreate = DefaultRetentionAutoCreate
c.Data.RetentionCheckEnabled = DefaultRetentionCheckEnabled
c.Data.RetentionCheckPeriod = Duration(DefaultRetentionCheckPeriod)
c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod)
c.Snapshot.Enabled = true

c.Snapshot.BindAddress = DefaultSnapshotBindAddress
c.Snapshot.Port = DefaultSnapshotPort
c.Admin.Enabled = true
c.Admin.Port = 8083
c.ContinuousQuery.RecomputePreviousN = 2
c.ContinuousQuery.RecomputeNoOlderThan = Duration(10 * time.Minute)
c.ContinuousQuery.ComputeRunsPerInterval = 10
c.ContinuousQuery.ComputeNoMoreThan = Duration(2 * time.Minute)
c.ContinuousQuery.Disable = false
c.ReportingDisabled = false

c.Monitoring.Enabled = false
c.Monitoring.WriteInterval = Duration(1 * time.Minute)
c.Monitoring.WriteInterval = Duration(DefaultStatisticsWriteInterval)
c.ContinuousQuery.RecomputePreviousN = DefaultContinuousQueryRecomputePreviousN
c.ContinuousQuery.RecomputeNoOlderThan = Duration(DefaultContinuousQueryRecomputeNoOlderThan)
c.ContinuousQuery.ComputeRunsPerInterval = DefaultContinuousQueryComputeRunsPerInterval
c.ContinuousQuery.ComputeNoMoreThan = Duration(DefaultContinousQueryComputeNoMoreThan)

// Detect hostname (or set to localhost).
if c.Hostname, _ = os.Hostname(); c.Hostname == "" {
Expand All @@ -213,42 +235,64 @@ func NewConfig() (*Config, error) {
// Port: tomlConfiguration.InputPlugins.UDPInput.Port,
// })

return c, nil
return c
}

// DataAddr returns the TCP binding address for the data server.
func (c *Config) DataAddr() string {
return net.JoinHostPort(c.BindAddress, strconv.Itoa(c.Data.Port))
}
// NewTestConfig returns an instance of Config with reasonable defaults suitable
// for testing a local server w/ broker and data nodes active
func NewTestConfig() (*Config, error) {
c := NewConfig()

// DataAddrUDP returns the UDP address for the series listener.
func (c *Config) DataAddrUDP() string {
return net.JoinHostPort(c.UDP.BindAddress, strconv.Itoa(c.UDP.Port))
// By default, store broker and data files in current users home directory
u, err := user.Current()
if err != nil {
return nil, fmt.Errorf("failed to determine current user for storage")
}

c.Broker.Enabled = true
c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker")

c.Data.Enabled = true
c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")

c.Admin.Enabled = true
c.Admin.Port = 8083

c.Monitoring.Enabled = false
c.Snapshot.Enabled = true

return c, nil
}

// DataURL returns the URL required to contact the data server.
func (c *Config) DataURL() url.URL {
return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Data.Port)),
// APIAddr returns the TCP binding address for the API server.
func (c *Config) APIAddr() string {
ba := c.BindAddress
if c.HTTPAPI.BindAddress != "" {
ba = c.HTTPAPI.BindAddress
}
return net.JoinHostPort(ba, strconv.Itoa(c.HTTPAPI.Port))
}

// APIAddrUDP returns the UDP address for the series listener.
func (c *Config) APIAddrUDP() string {
return net.JoinHostPort(c.UDP.BindAddress, strconv.Itoa(c.UDP.Port))
}

// SnapshotAddr returns the TCP binding address for the snapshot handler.
func (c *Config) SnapshotAddr() string {
return net.JoinHostPort(c.Snapshot.BindAddress, strconv.Itoa(c.Snapshot.Port))
}

// BrokerAddr returns the binding address the Broker server
func (c *Config) BrokerAddr() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.Broker.Port)
// ClusterAddr returns the binding address for the cluster
func (c *Config) ClusterAddr() string {
return net.JoinHostPort(c.BindAddress, strconv.Itoa(c.Port))
}

// BrokerURL returns the URL required to contact the Broker server.
func (c *Config) BrokerURL() url.URL {
// ClusterURL returns the URL required to contact the server cluster endpoints.
func (c *Config) ClusterURL() url.URL {
return url.URL{
Scheme: "http",
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Broker.Port)),
Host: net.JoinHostPort(c.Hostname, strconv.Itoa(c.Port)),
}
}

Expand All @@ -270,14 +314,6 @@ func (c *Config) DataDir() string {
return p
}

func (c *Config) JoinURLs() string {
if c.Initialization.JoinURLs == "" {
return DefaultJoinURLs
} else {
return c.Initialization.JoinURLs
}
}

// ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups.
// If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod
func (c *Config) ShardGroupPreCreateCheckPeriod() time.Duration {
Expand Down Expand Up @@ -356,10 +392,8 @@ func (d Duration) MarshalText() (text []byte, err error) {

// ParseConfigFile parses a configuration file at a given path.
func ParseConfigFile(path string) (*Config, error) {
c, err := NewConfig()
if err != nil {
return nil, err
}
c := NewConfig()

if _, err := toml.DecodeFile(path, &c); err != nil {
return nil, err
}
Expand All @@ -368,19 +402,17 @@ func ParseConfigFile(path string) (*Config, error) {

// ParseConfig parses a configuration string into a config object.
func ParseConfig(s string) (*Config, error) {
c, err := NewConfig()
if err != nil {
return nil, err
}
c := NewConfig()

if _, err := toml.Decode(s, &c); err != nil {
return nil, err
}
return c, nil
}

type Collectd struct {
Addr string `toml:"address"`
Port uint16 `toml:"port"`
BindAddress string `toml:"bind-address"`
Port uint16 `toml:"port"`

Database string `toml:"database"`
Enabled bool `toml:"enabled"`
Expand All @@ -389,7 +421,7 @@ type Collectd struct {

// ConnnectionString returns the connection string for this collectd config in the form host:port.
func (c *Collectd) ConnectionString(defaultBindAddr string) string {
addr := c.Addr
addr := c.BindAddress
// If no address specified, use default.
if addr == "" {
addr = defaultBindAddr
Expand All @@ -405,8 +437,8 @@ func (c *Collectd) ConnectionString(defaultBindAddr string) string {
}

type Graphite struct {
Addr string `toml:"address"`
Port uint16 `toml:"port"`
BindAddress string `toml:"bind-address"`
Port uint16 `toml:"port"`

Database string `toml:"database"`
Enabled bool `toml:"enabled"`
Expand All @@ -418,7 +450,7 @@ type Graphite struct {
// ConnnectionString returns the connection string for this Graphite config in the form host:port.
func (g *Graphite) ConnectionString(defaultBindAddr string) string {

addr := g.Addr
addr := g.BindAddress
// If no address specified, use default.
if addr == "" {
addr = defaultBindAddr
Expand Down
Loading

0 comments on commit a72707b

Please sign in to comment.