Skip to content

Commit

Permalink
Add broker trunction
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Apr 14, 2015
1 parent dc896ce commit f8a0925
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 12 deletions.
15 changes: 10 additions & 5 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (
// shard groups need to be created in advance for writing
DefaultRetentionCreatePeriod = 45 * time.Minute

// DefaultBrokerTruncationInterval is the default period between truncating topics.
DefaultBrokerTruncationInterval = 10 * time.Minute

// DefaultMaxTopicSize is the default maximum size in bytes a topic can consume on disk of a broker.
DefaultBrokerMaxTopicSize = 1024 * 1024 * 1024

Expand Down Expand Up @@ -99,11 +102,12 @@ var DefaultSnapshotURL = url.URL{

// Broker represents the configuration for a broker node
type Broker struct {
Dir string `toml:"dir"`
Enabled bool `toml:"enabled"`
Timeout Duration `toml:"election-timeout"`
MaxTopicSize int64 `toml:"max-topic-size"`
MaxSegmentSize int64 `toml:"max-segment-size"`
Dir string `toml:"dir"`
Enabled bool `toml:"enabled"`
Timeout Duration `toml:"election-timeout"`
TruncationInterval Duration `toml:"truncation-interval"`
MaxTopicSize int64 `toml:"max-topic-size"`
MaxSegmentSize int64 `toml:"max-segment-size"`
}

// Snapshot represents the configuration for a snapshot service. Snapshot configuration
Expand Down Expand Up @@ -241,6 +245,7 @@ func NewConfig() *Config {
c.ContinuousQuery.ComputeRunsPerInterval = DefaultContinuousQueryComputeRunsPerInterval
c.ContinuousQuery.ComputeNoMoreThan = Duration(DefaultContinousQueryComputeNoMoreThan)

c.Broker.TruncationInterval = Duration(DefaultBrokerTruncationInterval)
c.Broker.MaxTopicSize = DefaultBrokerMaxTopicSize
c.Broker.MaxSegmentSize = DefaultBrokerMaxSegmentSize

Expand Down
1 change: 1 addition & 0 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {

// Create broker
b := influxdb.NewBroker()
b.TruncationInterval = time.Duration(cmd.config.Broker.TruncationInterval)
b.MaxTopicSize = cmd.config.Broker.MaxTopicSize
b.MaxSegmentSize = cmd.config.Broker.MaxSegmentSize
cmd.node.Broker = b
Expand Down
110 changes: 103 additions & 7 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
// only occurs when the reader is at the end of all the data.
const DefaultPollInterval = 100 * time.Millisecond

const DefaultTruncationInterval = 10 * time.Minute

// DefaultMaxTopicSize is the largest a topic can get before truncation.
const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 10MB
const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 1GB

// DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.
const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB
Expand All @@ -40,8 +42,13 @@ type Broker struct {
meta *bolt.DB // metadata
topics map[uint64]*Topic // topics by id

MaxTopicSize int64 // Maximum size of a topic in bytes
MaxSegmentSize int64 // Maximum size of a segment in bytes
TruncationInterval time.Duration
MaxTopicSize int64 // Maximum size of a topic in bytes
MaxSegmentSize int64 // Maximum size of a segment in bytes

// Goroutinte shutdown
done chan struct{}
wg sync.WaitGroup

// Log is the distributed raft log that commands are applied to.
Log interface {
Expand All @@ -59,11 +66,13 @@ type Broker struct {
// NewBroker returns a new instance of a Broker with default values.
func NewBroker() *Broker {
b := &Broker{
topics: make(map[uint64]*Topic),
Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags),
topics: make(map[uint64]*Topic),
Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags),
TruncationInterval: DefaultTruncationInterval,
MaxTopicSize: DefaultMaxTopicSize,
MaxSegmentSize: DefaultMaxSegmentSize,
done: make(chan struct{}),
}
b.MaxTopicSize = DefaultMaxTopicSize
b.MaxSegmentSize = DefaultMaxSegmentSize
return b
}

Expand Down Expand Up @@ -176,6 +185,16 @@ func (b *Broker) Open(path string) error {
return fmt.Errorf("open topics: %s", err)
}

// Start topic truncation.
go func() {
b.wg.Add(1)
tick := time.NewTicker(b.TruncationInterval)
for {
<-tick.C
b.TruncateTopics()
}
}()

return nil
}(); err != nil {
_ = b.close()
Expand Down Expand Up @@ -236,6 +255,11 @@ func (b *Broker) close() error {
// Close all topics.
b.closeTopics()

// Shutdown all goroutines.
close(b.done)
b.wg.Wait()
b.done = nil

return nil
}

Expand All @@ -247,6 +271,19 @@ func (b *Broker) closeTopics() {
b.topics = make(map[uint64]*Topic)
}

// truncateTopics forces topics to truncate such that they are equal to
// or less than the requested size, if possible.
func (b *Broker) TruncateTopics() error {
for _, t := range b.topics {
if n, err := t.Truncate(b.MaxTopicSize); err != nil {
b.Logger.Printf("error truncating topic %s: %s", t.Path(), err.Error())
} else if n > 0 {
b.Logger.Printf("topic %s, %d bytes deleted", t.Path(), n)
}
}
return nil
}

// SetMaxIndex sets the highest index applied by the broker.
// This is only used for internal log messages. Topics may have a higher index.
func (b *Broker) SetMaxIndex(index uint64) error {
Expand Down Expand Up @@ -829,6 +866,56 @@ func (t *Topic) WriteMessage(m *Message) error {
return nil
}

// Truncate attempts to delete topic segments such that the total size of the topic on-disk
// is equal to or less-than maxSize. Returns the number of bytes deleted, and error if any.
// This function is not guaranteed to be performant.
func (t *Topic) Truncate(maxSize int64) (int64, error) {
var nBytesDeleted int64
for {
size, err := t.Size()
if err != nil {
return nBytesDeleted, err
}
if size <= maxSize {
return nBytesDeleted, nil
}

segments, err := ReadSegments(t.Path())
if err != nil {
return nBytesDeleted, err
}
if len(segments) < 2 {
// Always leave 1 segment around, for current writes.
return nBytesDeleted, err
}

if err := os.Remove(segments.First().Path); err != nil {
return nBytesDeleted, err
}
}

return 0, nil
}

// Size returns the on-disk size of the topic. This checks the size of each segment and
// is not guaranteed to be performant.
func (t *Topic) Size() (int64, error) {
segments, err := ReadSegments(t.Path())
if err != nil {
return 0, err
}

var totalSize int64
for _, s := range segments {
size, err := s.Size()
if err != nil {
return 0, err
}
totalSize += size
}
return totalSize, nil
}

// Topics represents a list of topics sorted by id.
type Topics []*Topic

Expand Down Expand Up @@ -898,6 +985,15 @@ func (a Segments) Last() *Segment {
return a[len(a)-1]
}

// First returns the first segment in the slice.
// Returns nil if there are no segments.
func (a Segments) First() *Segment {
if len(a) == 0 {
return nil
}
return a[0]
}

func (a Segments) Len() int { return len(a) }
func (a Segments) Less(i, j int) bool { return a[i].Index < a[j].Index }
func (a Segments) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
Expand Down

0 comments on commit f8a0925

Please sign in to comment.