diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index d3af99731de..e1ace8a5c62 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -46,6 +46,9 @@ const ( // shard groups need to be created in advance for writing DefaultRetentionCreatePeriod = 45 * time.Minute + // DefaultBrokerTruncationInterval is the default period between truncating topics. + DefaultBrokerTruncationInterval = 10 * time.Minute + // DefaultMaxTopicSize is the default maximum size in bytes a topic can consume on disk of a broker. DefaultBrokerMaxTopicSize = 1024 * 1024 * 1024 @@ -99,11 +102,12 @@ var DefaultSnapshotURL = url.URL{ // Broker represents the configuration for a broker node type Broker struct { - Dir string `toml:"dir"` - Enabled bool `toml:"enabled"` - Timeout Duration `toml:"election-timeout"` - MaxTopicSize int64 `toml:"max-topic-size"` - MaxSegmentSize int64 `toml:"max-segment-size"` + Dir string `toml:"dir"` + Enabled bool `toml:"enabled"` + Timeout Duration `toml:"election-timeout"` + TruncationInterval Duration `toml:"truncation-interval"` + MaxTopicSize int64 `toml:"max-topic-size"` + MaxSegmentSize int64 `toml:"max-segment-size"` } // Snapshot represents the configuration for a snapshot service. Snapshot configuration @@ -241,6 +245,7 @@ func NewConfig() *Config { c.ContinuousQuery.ComputeRunsPerInterval = DefaultContinuousQueryComputeRunsPerInterval c.ContinuousQuery.ComputeNoMoreThan = Duration(DefaultContinousQueryComputeNoMoreThan) + c.Broker.TruncationInterval = Duration(DefaultBrokerTruncationInterval) c.Broker.MaxTopicSize = DefaultBrokerMaxTopicSize c.Broker.MaxSegmentSize = DefaultBrokerMaxSegmentSize diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 2dbe5d08db5..5d6b1e7b437 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -476,6 +476,7 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) { // Create broker b := influxdb.NewBroker() + b.TruncationInterval = time.Duration(cmd.config.Broker.TruncationInterval) b.MaxTopicSize = cmd.config.Broker.MaxTopicSize b.MaxSegmentSize = cmd.config.Broker.MaxSegmentSize cmd.node.Broker = b diff --git a/messaging/broker.go b/messaging/broker.go index 59dd46caafb..3188230e0d5 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -24,8 +24,10 @@ import ( // only occurs when the reader is at the end of all the data. const DefaultPollInterval = 100 * time.Millisecond +const DefaultTruncationInterval = 10 * time.Minute + // DefaultMaxTopicSize is the largest a topic can get before truncation. -const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 10MB +const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 1GB // DefaultMaxSegmentSize is the largest a segment can get before starting a new segment. const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB @@ -40,8 +42,13 @@ type Broker struct { meta *bolt.DB // metadata topics map[uint64]*Topic // topics by id - MaxTopicSize int64 // Maximum size of a topic in bytes - MaxSegmentSize int64 // Maximum size of a segment in bytes + TruncationInterval time.Duration + MaxTopicSize int64 // Maximum size of a topic in bytes + MaxSegmentSize int64 // Maximum size of a segment in bytes + + // Goroutinte shutdown + done chan struct{} + wg sync.WaitGroup // Log is the distributed raft log that commands are applied to. Log interface { @@ -59,11 +66,13 @@ type Broker struct { // NewBroker returns a new instance of a Broker with default values. func NewBroker() *Broker { b := &Broker{ - topics: make(map[uint64]*Topic), - Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags), + topics: make(map[uint64]*Topic), + Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags), + TruncationInterval: DefaultTruncationInterval, + MaxTopicSize: DefaultMaxTopicSize, + MaxSegmentSize: DefaultMaxSegmentSize, + done: make(chan struct{}), } - b.MaxTopicSize = DefaultMaxTopicSize - b.MaxSegmentSize = DefaultMaxSegmentSize return b } @@ -176,6 +185,16 @@ func (b *Broker) Open(path string) error { return fmt.Errorf("open topics: %s", err) } + // Start topic truncation. + go func() { + b.wg.Add(1) + tick := time.NewTicker(b.TruncationInterval) + for { + <-tick.C + b.TruncateTopics() + } + }() + return nil }(); err != nil { _ = b.close() @@ -236,6 +255,11 @@ func (b *Broker) close() error { // Close all topics. b.closeTopics() + // Shutdown all goroutines. + close(b.done) + b.wg.Wait() + b.done = nil + return nil } @@ -247,6 +271,19 @@ func (b *Broker) closeTopics() { b.topics = make(map[uint64]*Topic) } +// truncateTopics forces topics to truncate such that they are equal to +// or less than the requested size, if possible. +func (b *Broker) TruncateTopics() error { + for _, t := range b.topics { + if n, err := t.Truncate(b.MaxTopicSize); err != nil { + b.Logger.Printf("error truncating topic %s: %s", t.Path(), err.Error()) + } else if n > 0 { + b.Logger.Printf("topic %s, %d bytes deleted", t.Path(), n) + } + } + return nil +} + // SetMaxIndex sets the highest index applied by the broker. // This is only used for internal log messages. Topics may have a higher index. func (b *Broker) SetMaxIndex(index uint64) error { @@ -829,6 +866,56 @@ func (t *Topic) WriteMessage(m *Message) error { return nil } +// Truncate attempts to delete topic segments such that the total size of the topic on-disk +// is equal to or less-than maxSize. Returns the number of bytes deleted, and error if any. +// This function is not guaranteed to be performant. +func (t *Topic) Truncate(maxSize int64) (int64, error) { + var nBytesDeleted int64 + for { + size, err := t.Size() + if err != nil { + return nBytesDeleted, err + } + if size <= maxSize { + return nBytesDeleted, nil + } + + segments, err := ReadSegments(t.Path()) + if err != nil { + return nBytesDeleted, err + } + if len(segments) < 2 { + // Always leave 1 segment around, for current writes. + return nBytesDeleted, err + } + + if err := os.Remove(segments.First().Path); err != nil { + return nBytesDeleted, err + } + } + + return 0, nil +} + +// Size returns the on-disk size of the topic. This checks the size of each segment and +// is not guaranteed to be performant. +func (t *Topic) Size() (int64, error) { + segments, err := ReadSegments(t.Path()) + if err != nil { + return 0, err + } + + var totalSize int64 + for _, s := range segments { + size, err := s.Size() + if err != nil { + return 0, err + } + totalSize += size + } + return totalSize, nil +} + // Topics represents a list of topics sorted by id. type Topics []*Topic @@ -898,6 +985,15 @@ func (a Segments) Last() *Segment { return a[len(a)-1] } +// First returns the first segment in the slice. +// Returns nil if there are no segments. +func (a Segments) First() *Segment { + if len(a) == 0 { + return nil + } + return a[0] +} + func (a Segments) Len() int { return len(a) } func (a Segments) Less(i, j int) bool { return a[i].Index < a[j].Index } func (a Segments) Swap(i, j int) { a[i], a[j] = a[j], a[i] }