diff --git a/CHANGELOG.md b/CHANGELOG.md index c76a73fcac6..07c3ce234bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## v0.9.0-rc11 [unreleased] + +### Features +- [#1902](https://github.com/influxdb/influxdb/pull/1902): Enforce retention policies to have a minimum duraton. + ## v0.9.0-rc10 [2015-03-09] ### Bugfixes diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 9080d5332d1..8ad63948f40 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -371,7 +371,7 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) { s := NewHTTPServer(srvr) defer s.Close() - query := map[string]string{"q": "ALTER RETENTION POLICY bar ON foo REPLICATION 42 DURATION 1m DEFAULT"} + query := map[string]string{"q": "ALTER RETENTION POLICY bar ON foo REPLICATION 42 DURATION 2h DEFAULT"} status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "") // Verify updated policy. diff --git a/influxdb.go b/influxdb.go index 07c9b5768bb..f3191b61139 100644 --- a/influxdb.go +++ b/influxdb.go @@ -74,6 +74,9 @@ var ( // ErrRetentionPolicyNameRequired is returned using a blank shard space name. ErrRetentionPolicyNameRequired = errors.New("retention policy name required") + // ErrRetentionPolicyMinDuration is returned when creating replication policy with a duration smaller than RetenionPolicyMinDuration. + ErrRetentionPolicyMinDuration = fmt.Errorf("retention policy duration needs to be at least %s", retentionPolicyMinDuration) + // ErrDefaultRetentionPolicyNotFound is returned when using the default // policy on a database but the default has not been set. ErrDefaultRetentionPolicyNotFound = errors.New("default retention policy not found") diff --git a/server.go b/server.go index 1da49e8447c..7fbb0e86ce1 100644 --- a/server.go +++ b/server.go @@ -43,6 +43,9 @@ const ( // DefaultShardRetention is the length of time before a shard is dropped. DefaultShardRetention = 7 * (24 * time.Hour) + + // Defines the minimum duration allowed for all retention policies + retentionPolicyMinDuration = time.Hour ) // Server represents a collection of metadata and raw metric data. @@ -1269,6 +1272,11 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error) // CreateRetentionPolicy creates a retention policy for a database. func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error { + // Enforce duration of at least retentionPolicyMinDuration + if rp.Duration < retentionPolicyMinDuration { + return ErrRetentionPolicyMinDuration + } + const ( day = time.Hour * 24 month = day * 30 @@ -1335,6 +1343,11 @@ type RetentionPolicyUpdate struct { // UpdateRetentionPolicy updates an existing retention policy on a database. func (s *Server) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error { + // Enforce duration of at least retentionPolicyMinDuration + if *rpu.Duration < retentionPolicyMinDuration { + return ErrRetentionPolicyMinDuration + } + c := &updateRetentionPolicyCommand{Database: database, Name: name, Policy: rpu} _, err := s.broadcast(updateRetentionPolicyMessageType, c) return err @@ -2558,20 +2571,20 @@ func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user return &Result{Series: []*influxql.Row{row}} } -func (s *Server) executeCreateRetentionPolicyStatement(q *influxql.CreateRetentionPolicyStatement, user *User) *Result { - rp := NewRetentionPolicy(q.Name) - rp.Duration = q.Duration - rp.ReplicaN = uint32(q.Replication) +func (s *Server) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement, user *User) *Result { + rp := NewRetentionPolicy(stmt.Name) + rp.Duration = stmt.Duration + rp.ReplicaN = uint32(stmt.Replication) // Create new retention policy. - err := s.CreateRetentionPolicy(q.Database, rp) + err := s.CreateRetentionPolicy(stmt.Database, rp) if err != nil { return &Result{Err: err} } // If requested, set new policy as the default. - if q.Default { - err = s.SetDefaultRetentionPolicy(q.Database, q.Name) + if stmt.Default { + err = s.SetDefaultRetentionPolicy(stmt.Database, stmt.Name) } return &Result{Err: err} diff --git a/server_test.go b/server_test.go index bb34be4ead3..014fc9aac3d 100644 --- a/server_test.go +++ b/server_test.go @@ -556,7 +556,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) { func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrDatabaseNotFound { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != influxdb.ErrDatabaseNotFound { t.Fatal(err) } } @@ -566,7 +566,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing. s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("foo") - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: ""}); err != influxdb.ErrRetentionPolicyNameRequired { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "", Duration: time.Hour}); err != influxdb.ErrRetentionPolicyNameRequired { t.Fatal(err) } } @@ -576,8 +576,18 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("foo") - s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}) - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrRetentionPolicyExists { + s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}) + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != influxdb.ErrRetentionPolicyExists { + t.Fatal(err) + } +} + +// Ensure the server returns an error when creating a retention policy with a duration less than one hour. +func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + s.CreateDatabase("foo") + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Minute}); err != influxdb.ErrRetentionPolicyMinDuration { t.Fatal(err) } } @@ -603,7 +613,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) { } // Alter the retention policy. - duration := time.Minute + duration := 2 * time.Hour replicaN := uint32(3) rp2 := &influxdb.RetentionPolicyUpdate{ Duration: &duration, @@ -646,6 +656,59 @@ func TestServer_AlterRetentionPolicy(t *testing.T) { } } +// Ensure the server an error is returned if trying to alter a retention policy with a duration too small. +func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + + // Create a database. + if err := s.CreateDatabase("foo"); err != nil { + t.Fatal(err) + } + + // Create a retention policy on the database. + rp := &influxdb.RetentionPolicy{ + Name: "bar", + Duration: time.Hour, + ReplicaN: 2, + } + if err := s.CreateRetentionPolicy("foo", rp); err != nil { + t.Fatal(err) + } + + // Alter the retention policy. + duration := 2 * time.Hour + replicaN := uint32(3) + rp2 := &influxdb.RetentionPolicyUpdate{ + Duration: &duration, + ReplicaN: &replicaN, + } + if err := s.UpdateRetentionPolicy("foo", "bar", rp2); err != nil { + t.Fatal(err) + } + + // Restart the server to make sure the changes persist afterwards. + s.Restart() + + // Verify that the policy exists. + if o, err := s.RetentionPolicy("foo", "bar"); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if o == nil { + t.Fatalf("retention policy not found") + } else if o.Duration != *rp2.Duration { + t.Fatalf("retention policy mismatch:\n\texp Duration = %s\n\tgot Duration = %s\n", rp2.Duration, o.Duration) + } else if o.ReplicaN != *rp2.ReplicaN { + t.Fatalf("retention policy mismatch:\n\texp ReplicaN = %d\n\tgot ReplicaN = %d\n", rp2.ReplicaN, o.ReplicaN) + } + + // Test update duration only. + duration = time.Hour + results := s.ExecuteQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1m`), "foo", nil) + if results.Error() == nil { + t.Fatalf("unexpected error: %s", results.Error()) + } +} + // Ensure the server can delete an existing retention policy. func TestServer_DeleteRetentionPolicy(t *testing.T) { s := OpenServer(NewMessagingClient()) @@ -653,7 +716,7 @@ func TestServer_DeleteRetentionPolicy(t *testing.T) { // Create a database and retention policy. s.CreateDatabase("foo") - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil { t.Fatal(err) } else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil { t.Fatal("retention policy not created") @@ -707,7 +770,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) { defer s.Close() s.CreateDatabase("foo") - rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: 7 * time.Hour * 24} + rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: time.Hour, Duration: time.Hour} if err := s.CreateRetentionPolicy("foo", rp); err != nil { t.Fatal(err) } else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil { @@ -1630,7 +1693,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) { defer s.Close() s.CreateDatabase("foo") - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil { t.Fatal(err) } @@ -1653,7 +1716,7 @@ func TestServer_DeleteShardGroup(t *testing.T) { defer s.Close() s.CreateDatabase("foo") - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil { t.Fatal(err) } @@ -1759,13 +1822,13 @@ func TestServer_NormalizeMeasurement(t *testing.T) { // Default database with one policy. s.CreateDatabase("db0") - s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0"}) + s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0", Duration: time.Hour}) s.SetDefaultRetentionPolicy("db0", "rp0") // Another database with two policies. s.CreateDatabase("db1") - s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp1"}) - s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp2"}) + s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp1", Duration: time.Hour}) + s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp2", Duration: time.Hour}) s.SetDefaultRetentionPolicy("db1", "rp1") // Another database with no policies. @@ -1804,7 +1867,7 @@ func TestServer_NormalizeQuery(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("db0") - s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0"}) + s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0", Duration: time.Hour}) s.SetDefaultRetentionPolicy("db0", "rp0") // Execute the tests @@ -1828,7 +1891,7 @@ func TestServer_CreateContinuousQuery(t *testing.T) { if err := s.CreateDatabase("foo"); err != nil { t.Fatal(err) } - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil { t.Fatal(err) } s.SetDefaultRetentionPolicy("foo", "bar")