Skip to content

Commit

Permalink
Make MaxTopicSize and MaxSegmentSize configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Apr 14, 2015
1 parent 1da5bc3 commit 08cfe20
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
17 changes: 14 additions & 3 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ const (
// shard groups need to be created in advance for writing
DefaultRetentionCreatePeriod = 45 * time.Minute

// DefaultMaxTopicSize is the default maximum size in bytes a topic can consume on disk of a broker.
DefaultBrokerMaxTopicSize = 1024 * 1024 * 1024

// DefaultMaxTopicSize is the default maximum size in bytes a segment can consume on disk of a broker.
DefaultBrokerMaxSegmentSize = 10 * 1024 * 1024

// DefaultGraphiteDatabaseName is the default Graphite database if none is specified
DefaultGraphiteDatabaseName = "graphite"

Expand Down Expand Up @@ -93,9 +99,11 @@ var DefaultSnapshotURL = url.URL{

// Broker represents the configuration for a broker node
type Broker struct {
Dir string `toml:"dir"`
Enabled bool `toml:"enabled"`
Timeout Duration `toml:"election-timeout"`
Dir string `toml:"dir"`
Enabled bool `toml:"enabled"`
Timeout Duration `toml:"election-timeout"`
MaxTopicSize int64 `toml:"max-topic-size"`
MaxSegmentSize int64 `toml:"max-segment-size"`
}

// Snapshot represents the configuration for a snapshot service. Snapshot configuration
Expand Down Expand Up @@ -233,6 +241,9 @@ func NewConfig() *Config {
c.ContinuousQuery.ComputeRunsPerInterval = DefaultContinuousQueryComputeRunsPerInterval
c.ContinuousQuery.ComputeNoMoreThan = Duration(DefaultContinousQueryComputeNoMoreThan)

c.Broker.MaxTopicSize = DefaultBrokerMaxTopicSize
c.Broker.MaxSegmentSize = DefaultBrokerMaxSegmentSize

// Detect hostname (or set to localhost).
if c.Hostname, _ = os.Hostname(); c.Hostname == "" {
c.Hostname = "localhost"
Expand Down
2 changes: 2 additions & 0 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {

// Create broker
b := influxdb.NewBroker()
b.MaxTopicSize = cmd.config.Broker.MaxTopicSize
b.MaxSegmentSize = cmd.config.Broker.MaxSegmentSize
cmd.node.Broker = b

// Create raft log.
Expand Down
17 changes: 14 additions & 3 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (
// only occurs when the reader is at the end of all the data.
const DefaultPollInterval = 100 * time.Millisecond

// DefaultMaxTopicSize is the largest a topic can get before truncation.
const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 10MB

// DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.
const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB

// Broker represents distributed messaging system segmented into topics.
// Each topic represents a linear series of events.
type Broker struct {
Expand All @@ -34,6 +40,9 @@ type Broker struct {
meta *bolt.DB // metadata
topics map[uint64]*Topic // topics by id

MaxTopicSize int64 // Maximum size of a topic in bytes
MaxSegmentSize int64 // Maximum size of a segment in bytes

// Log is the distributed raft log that commands are applied to.
Log interface {
URL() url.URL
Expand All @@ -53,6 +62,8 @@ func NewBroker() *Broker {
topics: make(map[uint64]*Topic),
Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags),
}
b.MaxTopicSize = DefaultMaxTopicSize
b.MaxSegmentSize = DefaultMaxSegmentSize
return b
}

Expand Down Expand Up @@ -189,6 +200,7 @@ func (b *Broker) openTopics() error {
return fmt.Errorf("open topic: id=%d, err=%s", t.id, err)
}
b.topics[t.id] = t
b.topics[t.id].MaxSegmentSize = b.MaxSegmentSize
}

// Retrieve the highest index across all topics.
Expand Down Expand Up @@ -386,6 +398,7 @@ func (b *Broker) ReadFrom(r io.Reader) (int64, error) {
// Copy topic files from snapshot to local disk.
for _, st := range sh.Topics {
t := NewTopic(st.ID, b.topicPath(st.ID))
t.MaxSegmentSize = b.MaxSegmentSize

// Create topic directory.
if err := os.MkdirAll(t.Path(), 0777); err != nil {
Expand Down Expand Up @@ -545,6 +558,7 @@ func (b *Broker) Apply(m *Message) error {
t := b.topics[m.TopicID]
if t == nil {
t = NewTopic(m.TopicID, b.topicPath(m.TopicID))
t.MaxSegmentSize = b.MaxSegmentSize
if err := t.Open(); err != nil {
return fmt.Errorf("open topic: %s", err)
}
Expand Down Expand Up @@ -625,9 +639,6 @@ func (fsm *RaftFSM) Apply(e *raft.LogEntry) error {
return nil
}

// DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.
const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB

// topic represents a single named queue of messages.
// Each topic is identified by a unique path.
//
Expand Down

0 comments on commit 08cfe20

Please sign in to comment.