Merge pull request #1562 from influxdb/enforce_retention_policies
Enforce retention policies
otoolep committed Feb 11, 2015
2 parents e9ae082 + 3b64f34 commit af41030
Showing 7 changed files with 248 additions and 42 deletions.
41 changes: 6 additions & 35 deletions cmd/influxd/config.go
@@ -91,14 +91,10 @@ type Config struct {
} `toml:"broker"`

Data struct {
Dir string `toml:"dir"`
Port int `toml:"port"`
WriteBufferSize int `toml:"write-buffer-size"`
MaxOpenShards int `toml:"max-open-shards"`
PointBatchSize int `toml:"point-batch-size"`
WriteBatchSize int `toml:"write-batch-size"`
Engines map[string]toml.Primitive `toml:"engines"`
RetentionSweepPeriod Duration `toml:"retention-sweep-period"`
Dir string `toml:"dir"`
Port int `toml:"port"`
RetentionCheckEnabled bool `toml:"retention-check-enabled"`
RetentionCheckPeriod Duration `toml:"retention-check-period"`
} `toml:"data"`

Cluster struct {
@@ -115,13 +111,13 @@ func NewConfig() *Config {
u, _ := user.Current()

c := &Config{}
c.Data.RetentionSweepPeriod = Duration(10 * time.Minute)
c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker")
c.Broker.Port = DefaultBrokerPort
c.Broker.Timeout = Duration(1 * time.Second)
c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")
c.Data.Port = DefaultDataPort
c.Data.WriteBufferSize = 1000
c.Data.RetentionCheckEnabled = true
c.Data.RetentionCheckPeriod = Duration(10 * time.Minute)

// Detect hostname (or set to localhost).
if c.Hostname, _ = os.Hostname(); c.Hostname == "" {
@@ -138,31 +134,6 @@ func NewConfig() *Config {
return c
}

// PointBatchSize returns the data point batch size, if set.
// If not set, the LevelDB point batch size is returned.
// If that is not set then the default point batch size is returned.
func (c *Config) PointBatchSize() int {
if c.Data.PointBatchSize != 0 {
return c.Data.PointBatchSize
}
return DefaultPointBatchSize
}

// WriteBatchSize returns the data write batch size, if set.
// If not set, the LevelDB write batch size is returned.
// If that is not set then the default write batch size is returned.
func (c *Config) WriteBatchSize() int {
if c.Data.WriteBatchSize != 0 {
return c.Data.WriteBatchSize
}
return DefaultWriteBatchSize
}

// MaxOpenShards returns the maximum number of shards to keep open at once.
func (c *Config) MaxOpenShards() int {
return c.Data.MaxOpenShards
}

// DataAddr returns the binding address the data server
func (c *Config) DataAddr() string {
return net.JoinHostPort(c.BindAddress, strconv.Itoa(c.Data.Port))
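The defaults set in NewConfig above can be pinned down with a short test. A minimal sketch, assuming the same main_test package and import alias used by the existing cmd/influxd config tests; the test name is hypothetical:

package main_test

import (
	"testing"
	"time"

	main "github.com/influxdb/influxdb/cmd/influxd"
)

// Checks the new defaults from the diff above: retention checking enabled,
// with a 10-minute check period.
func TestNewConfig_RetentionDefaults(t *testing.T) {
	c := main.NewConfig()

	if !c.Data.RetentionCheckEnabled {
		t.Fatalf("expected retention checking to be enabled by default")
	}
	if c.Data.RetentionCheckPeriod != main.Duration(10*time.Minute) {
		t.Fatalf("unexpected default retention check period: %v", c.Data.RetentionCheckPeriod)
	}
}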
8 changes: 8 additions & 0 deletions cmd/influxd/config_test.go
@@ -122,6 +122,12 @@ func TestParseConfig(t *testing.T) {
if c.Data.Dir != "/tmp/influxdb/development/db" {
t.Fatalf("data dir mismatch: %v", c.Data.Dir)
}
if c.Data.RetentionCheckEnabled != true {
t.Fatalf("Retention check enabled mismatch: %v", c.Data.RetentionCheckEnabled)
}
if c.Data.RetentionCheckPeriod != main.Duration(5*time.Minute) {
t.Fatalf("Retention check period mismatch: %v", c.Data.RetentionCheckPeriod)
}

if c.Cluster.Dir != "/tmp/influxdb/development/cluster" {
t.Fatalf("cluster dir mismatch: %v", c.Cluster.Dir)
@@ -217,6 +223,8 @@ dir = "/tmp/influxdb/development/broker"
[data]
dir = "/tmp/influxdb/development/db"
retention-check-enabled = true
retention-check-period = "5m"
[cluster]
dir = "/tmp/influxdb/development/cluster"
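The test above relies on retention-check-period = "5m" decoding into main.Duration(5*time.Minute). That works because influxd wraps time.Duration in a TOML-aware type; a minimal sketch of such a wrapper, assuming a time.ParseDuration-based TextUnmarshaler (the actual implementation in config.go may differ in detail):

package main

import "time"

// Duration is a TOML-friendly wrapper around time.Duration, decoded from
// quoted strings such as "5m" or "10m".
type Duration time.Duration

// UnmarshalText implements encoding.TextUnmarshaler, which the TOML decoder
// uses to populate Duration fields from duration strings.
func (d *Duration) UnmarshalText(text []byte) error {
	v, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}
	*d = Duration(v)
	return nil
}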
10 changes: 10 additions & 0 deletions cmd/influxd/run.go
@@ -11,6 +11,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/admin"
@@ -56,6 +57,15 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
s := openServer(config.DataDir(), config.DataURL(), b, initializing, configExists, joinURLs, logWriter)
s.SetAuthenticationEnabled(config.Authentication.Enabled)

// Enable retention policy enforcement if requested.
if config.Data.RetentionCheckEnabled {
interval := time.Duration(config.Data.RetentionCheckPeriod)
if err := s.StartRetentionPolicyEnforcement(interval); err != nil {
log.Fatalf("retention policy enforcement failed: %s", err.Error())
}
log.Printf("broker enforcing retention policies with check interval of %s", interval)
}

// Start the server handler. Attach to broker if listening on the same port.
if s != nil {
sh := httpd.NewHandler(s, config.Authentication.Enabled, version)
20 changes: 20 additions & 0 deletions database.go
@@ -795,6 +795,26 @@ func (rp *RetentionPolicy) shardGroupByTimestamp(timestamp time.Time) *ShardGrou
return nil
}

// shardGroupByID returns the group in the policy for the given ID.
// Returns nil if group does not exist.
func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup {
for _, g := range rp.shardGroups {
if g.ID == shardID {
return g
}
}
return nil
}

func (rp *RetentionPolicy) removeShardGroupByID(shardID uint64) {
for i, g := range rp.shardGroups {
if g.ID == shardID {
rp.shardGroups[i] = nil
rp.shardGroups = append(rp.shardGroups[:i], rp.shardGroups[i+1:]...)
}
}
}

// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
var o retentionPolicyJSON
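The two helpers above are meant to be used together, as applyDeleteShardGroup does later in this diff: look the group up by ID, bail out if it is already gone, then splice it out of the slice. A minimal standalone sketch of that lookup-then-remove flow, with cut-down stand-ins for the real types:

package main

import "fmt"

type ShardGroup struct{ ID uint64 }

type RetentionPolicy struct{ shardGroups []*ShardGroup }

func (rp *RetentionPolicy) shardGroupByID(id uint64) *ShardGroup {
	for _, g := range rp.shardGroups {
		if g.ID == id {
			return g
		}
	}
	return nil
}

func (rp *RetentionPolicy) removeShardGroupByID(id uint64) {
	for i, g := range rp.shardGroups {
		if g.ID == id {
			// Nil the slot before re-slicing so the removed group can be collected.
			rp.shardGroups[i] = nil
			rp.shardGroups = append(rp.shardGroups[:i], rp.shardGroups[i+1:]...)
			return
		}
	}
}

func main() {
	rp := &RetentionPolicy{shardGroups: []*ShardGroup{{ID: 1}, {ID: 2}}}
	if g := rp.shardGroupByID(2); g != nil {
		rp.removeShardGroupByID(g.ID)
	}
	fmt.Println(len(rp.shardGroups)) // prints 1
}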
9 changes: 7 additions & 2 deletions etc/config.sample.toml
@@ -79,8 +79,13 @@ port = 8086
# Data node configuration. Data nodes are where the time-series data, in the form of
# shards, is stored.
[data]
dir = "/tmp/influxdb/development/db"
port = 8086
dir = "/tmp/influxdb/development/db"
port = 8086

# Control whether retention policies are enforced and how long the system waits between
# enforcing those policies.
retention-check-enabled = true
retention-check-period = "10m"

[cluster]
# Location for cluster state storage. For storing state persistently across restarts.
120 changes: 116 additions & 4 deletions server.go
@@ -66,6 +66,7 @@ const (

// Shard messages
createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40)
deleteShardGroupMessageType = messaging.MessageType(0x41)

// Series messages
createSeriesIfNotExistsMessageType = messaging.MessageType(0x50)
@@ -80,10 +81,11 @@ const (

// Server represents a collection of metadata and raw metric data.
type Server struct {
mu sync.RWMutex
id uint64
path string
done chan struct{} // goroutine close notification
mu sync.RWMutex
id uint64
path string
done chan struct{} // goroutine close notification
rpDone chan struct{} // retention policies goroutine close notification

client MessagingClient // broker client
index uint64 // highest broadcast index seen
@@ -220,6 +222,10 @@ func (s *Server) Close() error {
return ErrServerClosed
}

if s.rpDone != nil {
close(s.rpDone)
}

// Remove path.
s.path = ""

@@ -288,6 +294,47 @@ func (s *Server) load() error {
})
}

// StartRetentionPolicyEnforcement launches retention policy enforcement.
func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) error {
if checkInterval == 0 {
return fmt.Errorf("retention policy check interval must be non-zero")
}
rpDone := make(chan struct{}, 0)
s.rpDone = rpDone
go func() {
for {
select {
case <-rpDone:
return
case <-time.After(checkInterval):
s.EnforceRetentionPolicies()
}
}
}()
return nil
}

// EnforceRetentionPolicies ensures that data that is aging-out due to retention policies
// is removed from the server.
func (s *Server) EnforceRetentionPolicies() {
log.Println("retention policy enforcement check commencing")

// Check all shard groups.
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
if g.EndTime.Add(rp.Duration).Before(time.Now()) {
log.Printf("shard group %d, retention policy %s, database %s due for deletion",
g.ID, rp.Name, db.name)
if err := s.DeleteShardGroup(db.name, rp.Name, g.ID); err != nil {
log.Printf("failed to request deletion of shard group %d: %s", g.ID, err.Error())
}
}
}
}
}
}

// Client retrieves the current messaging client.
func (s *Server) Client() MessagingClient {
s.mu.RLock()
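A worked example of the expiry check in EnforceRetentionPolicies above: a shard group becomes eligible for deletion only once its end time plus the policy's duration lies in the past. A minimal sketch with hypothetical timestamps:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical shard group holding data up to Feb 1, under a 7-day retention policy.
	groupEnd := time.Date(2015, 2, 1, 0, 0, 0, 0, time.UTC)
	retention := 7 * 24 * time.Hour

	// Mirrors the condition in EnforceRetentionPolicies: expired once
	// groupEnd + retention is before "now".
	now := time.Date(2015, 2, 11, 0, 0, 0, 0, time.UTC)
	expired := groupEnd.Add(retention).Before(now)

	fmt.Println(expired) // true: the group expired on Feb 8, so it is due for deletion
}

Note that StartRetentionPolicyEnforcement schedules the next check with time.After only after an enforcement pass returns, so a slow pass delays the next check rather than queueing ticks; expired data can therefore linger for up to roughly one check interval past its nominal expiry.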
@@ -890,6 +937,69 @@ type createShardGroupIfNotExistsCommand struct {
Timestamp time.Time `json:"timestamp"`
}

// DeleteShardGroup deletes the shard group identified by shardID.
func (s *Server) DeleteShardGroup(database, policy string, shardID uint64) error {
c := &deleteShardGroupCommand{Database: database, Policy: policy, ID: shardID}
_, err := s.broadcast(deleteShardGroupMessageType, c)
return err
}

// applyDeleteShardGroup deletes shard data from disk and updates the metastore.
func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
var c deleteShardGroupCommand
mustUnmarshalJSON(m.Data, &c)

s.mu.Lock()
defer s.mu.Unlock()

// Retrieve database.
db := s.databases[c.Database]
if s.databases[c.Database] == nil {
return ErrDatabaseNotFound
}

// Validate retention policy.
rp := db.policies[c.Policy]
if rp == nil {
return ErrRetentionPolicyNotFound
}

// If shard group no longer exists, then ignore request. This can occur if multiple
// data nodes triggered the deletion.
g := rp.shardGroupByID(c.ID)
if g == nil {
return nil
}

for _, shard := range g.Shards {
// Ignore shards not on this server.
if !shard.HasDataNodeID(s.id) {
continue
}

path := shard.store.Path()
shard.close()
if err := os.Remove(path); err != nil {
// Log, but keep going. This can happen if shards were deleted, but the server exited
// before it acknowledged the delete command.
log.Printf("error deleting shard %s, group ID %d, policy %s: %s", path, g.ID, rp.Name, err.Error())
}
}

// Remove from metastore.
rp.removeShardGroupByID(c.ID)
err = s.meta.mustUpdate(func(tx *metatx) error {
return tx.saveDatabase(db)
})
return
}

type deleteShardGroupCommand struct {
Database string `json:"database"`
Policy string `json:"policy"`
ID uint64 `json:"id"`
}

// User returns a user by username
// Returns nil if the user does not exist.
func (s *Server) User(name string) *User {
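applyDeleteShardGroup above decodes m.Data as JSON, so the payload that DeleteShardGroup broadcasts is simply the JSON form of deleteShardGroupCommand. A minimal sketch of that wire shape, using a standalone copy of the struct and hypothetical values:

package main

import (
	"encoding/json"
	"fmt"
)

type deleteShardGroupCommand struct {
	Database string `json:"database"`
	Policy   string `json:"policy"`
	ID       uint64 `json:"id"`
}

func main() {
	c := deleteShardGroupCommand{Database: "db0", Policy: "default", ID: 7}
	b, err := json.Marshal(c)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"database":"db0","policy":"default","id":7}
}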
@@ -2532,6 +2642,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
err = s.applyDeleteRetentionPolicy(m)
case createShardGroupIfNotExistsMessageType:
err = s.applyCreateShardGroupIfNotExists(m)
case deleteShardGroupMessageType:
err = s.applyDeleteShardGroup(m)
case setDefaultRetentionPolicyMessageType:
err = s.applySetDefaultRetentionPolicy(m)
case createSeriesIfNotExistsMessageType:
