From b8fb723967fc623a67eaa1aef96cbd9e8340101b Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 10 Mar 2015 10:49:54 -0600 Subject: [PATCH 1/6] Enforce retention policy minimum (currently 1 hour) --- httpd/handler_test.go | 2 +- influxdb.go | 7 +++++++ server.go | 24 +++++++++++++++++------- server_test.go | 38 ++++++++++++++++++++++++-------------- 4 files changed, 49 insertions(+), 22 deletions(-) diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 9080d5332d1..8ad63948f40 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -371,7 +371,7 @@ func TestHandler_UpdateRetentionPolicy(t *testing.T) { s := NewHTTPServer(srvr) defer s.Close() - query := map[string]string{"q": "ALTER RETENTION POLICY bar ON foo REPLICATION 42 DURATION 1m DEFAULT"} + query := map[string]string{"q": "ALTER RETENTION POLICY bar ON foo REPLICATION 42 DURATION 2h DEFAULT"} status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "") // Verify updated policy. diff --git a/influxdb.go b/influxdb.go index 07c9b5768bb..949d5762a64 100644 --- a/influxdb.go +++ b/influxdb.go @@ -10,6 +10,10 @@ import ( "github.com/influxdb/influxdb/client" ) +const ( + retentionPolicyMinDuration = time.Hour +) + var ( // ErrServerOpen is returned when opening an already open server. ErrServerOpen = errors.New("server already open") @@ -74,6 +78,9 @@ var ( // ErrRetentionPolicyNameRequired is returned using a blank shard space name. ErrRetentionPolicyNameRequired = errors.New("retention policy name required") + // ErrRetentionPolicyMinDuration is returned when creating replicaiton policy with a duration smaller than RetenionPolicyMinDuration. + ErrRetentionPolicyMinDuration = fmt.Errorf("retention policy duration needs to be at least %s", retentionPolicyMinDuration) + // ErrDefaultRetentionPolicyNotFound is returned when using the default // policy on a database but the default has not been set. ErrDefaultRetentionPolicyNotFound = errors.New("default retention policy not found") diff --git a/server.go b/server.go index 1da49e8447c..5d12e447975 100644 --- a/server.go +++ b/server.go @@ -1269,6 +1269,11 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error) // CreateRetentionPolicy creates a retention policy for a database. func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error { + // Enforce duration of at least 1 hour + if rp.Duration < retentionPolicyMinDuration { + return ErrRetentionPolicyMinDuration + } + const ( day = time.Hour * 24 month = day * 30 @@ -1335,6 +1340,11 @@ type RetentionPolicyUpdate struct { // UpdateRetentionPolicy updates an existing retention policy on a database. func (s *Server) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error { + // Enforce duration of at least 1 hour + if *rpu.Duration < retentionPolicyMinDuration { + return ErrRetentionPolicyMinDuration + } + c := &updateRetentionPolicyCommand{Database: database, Name: name, Policy: rpu} _, err := s.broadcast(updateRetentionPolicyMessageType, c) return err @@ -2558,20 +2568,20 @@ func (s *Server) executeShowUsersStatement(q *influxql.ShowUsersStatement, user return &Result{Series: []*influxql.Row{row}} } -func (s *Server) executeCreateRetentionPolicyStatement(q *influxql.CreateRetentionPolicyStatement, user *User) *Result { - rp := NewRetentionPolicy(q.Name) - rp.Duration = q.Duration - rp.ReplicaN = uint32(q.Replication) +func (s *Server) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement, user *User) *Result { + rp := NewRetentionPolicy(stmt.Name) + rp.Duration = stmt.Duration + rp.ReplicaN = uint32(stmt.Replication) // Create new retention policy. - err := s.CreateRetentionPolicy(q.Database, rp) + err := s.CreateRetentionPolicy(stmt.Database, rp) if err != nil { return &Result{Err: err} } // If requested, set new policy as the default. - if q.Default { - err = s.SetDefaultRetentionPolicy(q.Database, q.Name) + if stmt.Default { + err = s.SetDefaultRetentionPolicy(stmt.Database, stmt.Name) } return &Result{Err: err} diff --git a/server_test.go b/server_test.go index bb34be4ead3..b81763c42d4 100644 --- a/server_test.go +++ b/server_test.go @@ -556,7 +556,7 @@ func TestServer_CreateRetentionPolicy(t *testing.T) { func TestServer_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrDatabaseNotFound { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != influxdb.ErrDatabaseNotFound { t.Fatal(err) } } @@ -566,7 +566,7 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing. s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("foo") - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: ""}); err != influxdb.ErrRetentionPolicyNameRequired { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "", Duration: time.Hour}); err != influxdb.ErrRetentionPolicyNameRequired { t.Fatal(err) } } @@ -576,8 +576,18 @@ func TestServer_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("foo") - s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}) - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrRetentionPolicyExists { + s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}) + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != influxdb.ErrRetentionPolicyExists { + t.Fatal(err) + } +} + +// Ensure the server returns an error when creating a retention policy with a duration less than one hour. +func TestServer_CreateRetentionPolicy_ErrRetentionPolicyMinDuration(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + s.CreateDatabase("foo") + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Minute}); err != influxdb.ErrRetentionPolicyMinDuration { t.Fatal(err) } } @@ -603,7 +613,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) { } // Alter the retention policy. - duration := time.Minute + duration := 2 * time.Hour replicaN := uint32(3) rp2 := &influxdb.RetentionPolicyUpdate{ Duration: &duration, @@ -653,7 +663,7 @@ func TestServer_DeleteRetentionPolicy(t *testing.T) { // Create a database and retention policy. s.CreateDatabase("foo") - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil { t.Fatal(err) } else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil { t.Fatal("retention policy not created") @@ -707,7 +717,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) { defer s.Close() s.CreateDatabase("foo") - rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: 7 * time.Hour * 24} + rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: time.Hour, Duration: time.Hour} if err := s.CreateRetentionPolicy("foo", rp); err != nil { t.Fatal(err) } else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil { @@ -1630,7 +1640,7 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) { defer s.Close() s.CreateDatabase("foo") - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil { t.Fatal(err) } @@ -1653,7 +1663,7 @@ func TestServer_DeleteShardGroup(t *testing.T) { defer s.Close() s.CreateDatabase("foo") - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil { t.Fatal(err) } @@ -1759,13 +1769,13 @@ func TestServer_NormalizeMeasurement(t *testing.T) { // Default database with one policy. s.CreateDatabase("db0") - s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0"}) + s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0", Duration: time.Hour}) s.SetDefaultRetentionPolicy("db0", "rp0") // Another database with two policies. s.CreateDatabase("db1") - s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp1"}) - s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp2"}) + s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp1", Duration: time.Hour}) + s.CreateRetentionPolicy("db1", &influxdb.RetentionPolicy{Name: "rp2", Duration: time.Hour}) s.SetDefaultRetentionPolicy("db1", "rp1") // Another database with no policies. @@ -1804,7 +1814,7 @@ func TestServer_NormalizeQuery(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("db0") - s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0"}) + s.CreateRetentionPolicy("db0", &influxdb.RetentionPolicy{Name: "rp0", Duration: time.Hour}) s.SetDefaultRetentionPolicy("db0", "rp0") // Execute the tests @@ -1828,7 +1838,7 @@ func TestServer_CreateContinuousQuery(t *testing.T) { if err := s.CreateDatabase("foo"); err != nil { t.Fatal(err) } - if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil { + if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil { t.Fatal(err) } s.SetDefaultRetentionPolicy("foo", "bar") From 91487cc8dd86fda4b1f61ba239ece8045c3c50ee Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 10 Mar 2015 10:52:23 -0600 Subject: [PATCH 2/6] update comment --- server.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server.go b/server.go index 5d12e447975..dbbda988a27 100644 --- a/server.go +++ b/server.go @@ -1269,7 +1269,7 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error) // CreateRetentionPolicy creates a retention policy for a database. func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error { - // Enforce duration of at least 1 hour + // Enforce duration of at least retentionPolicyMinDuration if rp.Duration < retentionPolicyMinDuration { return ErrRetentionPolicyMinDuration } @@ -1340,7 +1340,7 @@ type RetentionPolicyUpdate struct { // UpdateRetentionPolicy updates an existing retention policy on a database. func (s *Server) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate) error { - // Enforce duration of at least 1 hour + // Enforce duration of at least retentionPolicyMinDuration if *rpu.Duration < retentionPolicyMinDuration { return ErrRetentionPolicyMinDuration } From a203e3cf92ffa0113c4c50efac883851c702cb2a Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 10 Mar 2015 10:55:08 -0600 Subject: [PATCH 3/6] add test for alter retention policy --- server_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/server_test.go b/server_test.go index b81763c42d4..014fc9aac3d 100644 --- a/server_test.go +++ b/server_test.go @@ -656,6 +656,59 @@ func TestServer_AlterRetentionPolicy(t *testing.T) { } } +// Ensure the server an error is returned if trying to alter a retention policy with a duration too small. +func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + + // Create a database. + if err := s.CreateDatabase("foo"); err != nil { + t.Fatal(err) + } + + // Create a retention policy on the database. + rp := &influxdb.RetentionPolicy{ + Name: "bar", + Duration: time.Hour, + ReplicaN: 2, + } + if err := s.CreateRetentionPolicy("foo", rp); err != nil { + t.Fatal(err) + } + + // Alter the retention policy. + duration := 2 * time.Hour + replicaN := uint32(3) + rp2 := &influxdb.RetentionPolicyUpdate{ + Duration: &duration, + ReplicaN: &replicaN, + } + if err := s.UpdateRetentionPolicy("foo", "bar", rp2); err != nil { + t.Fatal(err) + } + + // Restart the server to make sure the changes persist afterwards. + s.Restart() + + // Verify that the policy exists. + if o, err := s.RetentionPolicy("foo", "bar"); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if o == nil { + t.Fatalf("retention policy not found") + } else if o.Duration != *rp2.Duration { + t.Fatalf("retention policy mismatch:\n\texp Duration = %s\n\tgot Duration = %s\n", rp2.Duration, o.Duration) + } else if o.ReplicaN != *rp2.ReplicaN { + t.Fatalf("retention policy mismatch:\n\texp ReplicaN = %d\n\tgot ReplicaN = %d\n", rp2.ReplicaN, o.ReplicaN) + } + + // Test update duration only. + duration = time.Hour + results := s.ExecuteQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1m`), "foo", nil) + if results.Error() == nil { + t.Fatalf("unexpected error: %s", results.Error()) + } +} + // Ensure the server can delete an existing retention policy. func TestServer_DeleteRetentionPolicy(t *testing.T) { s := OpenServer(NewMessagingClient()) From 5455a3143077428e45b2f7f491e56c383123f33c Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 10 Mar 2015 10:57:17 -0600 Subject: [PATCH 4/6] update changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c76a73fcac6..07c3ce234bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## v0.9.0-rc11 [unreleased] + +### Features +- [#1902](https://github.com/influxdb/influxdb/pull/1902): Enforce retention policies to have a minimum duraton. + ## v0.9.0-rc10 [2015-03-09] ### Bugfixes From db067b6a2d2d1f369d475ec0ad804f395f30b725 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 10 Mar 2015 12:24:26 -0600 Subject: [PATCH 5/6] move constant --- influxdb.go | 4 ---- server.go | 3 +++ 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/influxdb.go b/influxdb.go index 949d5762a64..83dfd1666c9 100644 --- a/influxdb.go +++ b/influxdb.go @@ -10,10 +10,6 @@ import ( "github.com/influxdb/influxdb/client" ) -const ( - retentionPolicyMinDuration = time.Hour -) - var ( // ErrServerOpen is returned when opening an already open server. ErrServerOpen = errors.New("server already open") diff --git a/server.go b/server.go index dbbda988a27..7fbb0e86ce1 100644 --- a/server.go +++ b/server.go @@ -43,6 +43,9 @@ const ( // DefaultShardRetention is the length of time before a shard is dropped. DefaultShardRetention = 7 * (24 * time.Hour) + + // Defines the minimum duration allowed for all retention policies + retentionPolicyMinDuration = time.Hour ) // Server represents a collection of metadata and raw metric data. From 410e357b1b6acc1e6dabf41de73b5722f24e53f5 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 10 Mar 2015 12:38:31 -0600 Subject: [PATCH 6/6] spelling --- influxdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb.go b/influxdb.go index 83dfd1666c9..f3191b61139 100644 --- a/influxdb.go +++ b/influxdb.go @@ -74,7 +74,7 @@ var ( // ErrRetentionPolicyNameRequired is returned using a blank shard space name. ErrRetentionPolicyNameRequired = errors.New("retention policy name required") - // ErrRetentionPolicyMinDuration is returned when creating replicaiton policy with a duration smaller than RetenionPolicyMinDuration. + // ErrRetentionPolicyMinDuration is returned when creating replication policy with a duration smaller than RetenionPolicyMinDuration. ErrRetentionPolicyMinDuration = fmt.Errorf("retention policy duration needs to be at least %s", retentionPolicyMinDuration) // ErrDefaultRetentionPolicyNotFound is returned when using the default