Skip to content

Commit

Permalink
Require replication factor to equal cluster size
Browse files Browse the repository at this point in the history
This commit adds a requirement that retention policies must
be fully committed. This means that the replication factor
must be set to whatever the node count is in the cluster.

Fixes #2738
  • Loading branch information
benbjohnson committed Jun 11, 2015
1 parent 0b8743f commit 52696ae
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- [#2900](https://github.com/influxdb/influxdb/pull/2900): Use openTSDB input defaults where necessary
- [#2886](https://github.com/influxdb/influxdb/issues/2886): Refactor backup & restore
- [#2804](https://github.com/influxdb/influxdb/pull/2804): BREAKING: change time literals to be single quoted in InfluxQL. Thanks @nvcook42!
- [#2906](https://github.com/influxdb/influxdb/pull/2906): Restrict replication factor to the cluster size

## v0.9.0-rc33 [2015-06-09]

Expand Down
2 changes: 2 additions & 0 deletions meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf
// Validate retention policy.
if rpi.Name == "" {
return ErrRetentionPolicyNameRequired
} else if rpi.ReplicaN != len(data.Nodes) {
return ErrReplicationFactorMismatch
}

// Find database.
Expand Down
29 changes: 20 additions & 9 deletions meta/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestData_DropDatabase(t *testing.T) {

// Ensure a retention policy can be created.
func TestData_CreateRetentionPolicy(t *testing.T) {
var data meta.Data
data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}, {ID: 2}}}
if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -121,16 +121,28 @@ func TestData_CreateRetentionPolicy(t *testing.T) {

// Ensure that creating a policy without a name returns an error.
func TestData_CreateRetentionPolicy_ErrNameRequired(t *testing.T) {
var data meta.Data
data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}}}
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: ""}); err != meta.ErrRetentionPolicyNameRequired {
t.Fatalf("unexpected error: %s", err)
}
}

// Ensure that creating a policy with a replication factor that doesn't match
// the number of nodes in the cluster will return an error. This is a temporary
// restriction until v0.9.1 is released.
func TestData_CreateRetentionPolicy_ErrReplicationFactorMismatch(t *testing.T) {
data := meta.Data{
Nodes: []meta.NodeInfo{{ID: 1}, {ID: 2}, {ID: 3}},
}
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2}); err != meta.ErrReplicationFactorMismatch {
t.Fatalf("unexpected error: %s", err)
}
}

// Ensure that creating a retention policy on a non-existent database returns an error.
func TestData_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) {
var data meta.Data
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != meta.ErrDatabaseNotFound {
data := meta.Data{Nodes: []meta.NodeInfo{{ID: 1}}}
if err := data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != meta.ErrDatabaseNotFound {
t.Fatalf("unexpected error: %s", err)
}
}
Expand Down Expand Up @@ -275,7 +287,7 @@ func TestData_CreateShardGroup(t *testing.T) {
t.Fatal(err)
} else if err = data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", Duration: 1 * time.Hour}); err != nil {
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
}

Expand All @@ -290,8 +302,7 @@ func TestData_CreateShardGroup(t *testing.T) {
StartTime: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC),
EndTime: time.Date(2000, time.January, 1, 1, 0, 0, 0, time.UTC),
Shards: []meta.ShardInfo{
{ID: 1, OwnerIDs: []uint64{1}},
{ID: 2, OwnerIDs: []uint64{2}},
{ID: 1, OwnerIDs: []uint64{1, 2}},
},
}) {
t.Fatalf("unexpected shard group: %#v", sgi)
Expand All @@ -307,7 +318,7 @@ func TestData_ShardGroupExpiredDeleted(t *testing.T) {
t.Fatal(err)
} else if err = data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", Duration: 1 * time.Hour}); err != nil {
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -403,7 +414,7 @@ func TestData_DeleteShardGroup(t *testing.T) {
t.Fatal(err)
} else if err := data.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
} else if err = data.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != nil {
t.Fatal(err)
} else if err := data.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
Expand Down
5 changes: 5 additions & 0 deletions meta/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ var (
// ErrRetentionPolicyDurationTooLow is returned when updating a retention
// policy that has a duration lower than the allowed minimum.
ErrRetentionPolicyDurationTooLow = errors.New("retention policy duration too low")

// ErrReplicationFactorMismatch is returned when the replication factor
// does not match the number of nodes in the cluster. This is a temporary
// restriction until v0.9.1 is released.
ErrReplicationFactorMismatch = errors.New("replication factor must match cluster size; this limitation will be lifted in v0.9.1")
)

var (
Expand Down
25 changes: 13 additions & 12 deletions meta/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,10 @@ func TestStore_CreateRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()

// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
// Create an additional nodes and database.
if _, err := s.CreateNode("hostX"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -275,7 +277,7 @@ func TestStore_DropRetentionPolicy(t *testing.T) {

// Create policies.
for i := 0; i < 3; i++ {
if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: fmt.Sprintf("rp%d", i)}); err != nil {
if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: fmt.Sprintf("rp%d", i), ReplicaN: 1}); err != nil {
t.Fatal(err)
}
}
Expand All @@ -286,13 +288,13 @@ func TestStore_DropRetentionPolicy(t *testing.T) {
}

// Ensure remaining policies are correct.
if rpi, _ := s.RetentionPolicy("db0", "rp0"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp0", ShardGroupDuration: 7 * 24 * time.Hour}) {
if rpi, _ := s.RetentionPolicy("db0", "rp0"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, ShardGroupDuration: 7 * 24 * time.Hour}) {
t.Fatalf("unexpected policy(0): %#v", rpi)
}
if rpi, _ := s.RetentionPolicy("db0", "rp1"); rpi != nil {
t.Fatalf("unexpected policy(1): %#v", rpi)
}
if rpi, _ := s.RetentionPolicy("db0", "rp2"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp2", ShardGroupDuration: 7 * 24 * time.Hour}) {
if rpi, _ := s.RetentionPolicy("db0", "rp2"); !reflect.DeepEqual(rpi, &meta.RetentionPolicyInfo{Name: "rp2", ReplicaN: 1, ShardGroupDuration: 7 * 24 * time.Hour}) {
t.Fatalf("unexpected policy(2): %#v", rpi)
}
}
Expand All @@ -306,7 +308,7 @@ func TestStore_SetDefaultRetentionPolicy(t *testing.T) {
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != nil {
t.Fatal(err)
}

Expand All @@ -330,15 +332,14 @@ func TestStore_UpdateRetentionPolicy(t *testing.T) {
// Create database.
if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0"}); err != nil {
} else if _, err := s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1}); err != nil {
t.Fatal(err)
}

// Update policy.
var rpu meta.RetentionPolicyUpdate
rpu.SetName("rp1")
rpu.SetDuration(10 * time.Hour)
rpu.SetReplicaN(3)
if err := s.UpdateRetentionPolicy("db0", "rp0", &rpu); err != nil {
t.Fatal(err)
}
Expand All @@ -350,7 +351,7 @@ func TestStore_UpdateRetentionPolicy(t *testing.T) {
Name: "rp1",
Duration: 10 * time.Hour,
ShardGroupDuration: 7 * 24 * time.Hour,
ReplicaN: 3,
ReplicaN: 1,
}) {
t.Fatalf("unexpected policy: %#v", rpi)
}
Expand All @@ -367,7 +368,7 @@ func TestStore_CreateShardGroup(t *testing.T) {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
}

Expand All @@ -390,7 +391,7 @@ func TestStore_DeleteShardGroup(t *testing.T) {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
Expand All @@ -413,7 +414,7 @@ func TestStore_PrecreateShardGroup(t *testing.T) {
t.Fatal(err)
} else if _, err := s.CreateDatabase("db0"); err != nil {
t.Fatal(err)
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 1, Duration: 1 * time.Hour}); err != nil {
} else if _, err = s.CreateRetentionPolicy("db0", &meta.RetentionPolicyInfo{Name: "rp0", ReplicaN: 2, Duration: 1 * time.Hour}); err != nil {
t.Fatal(err)
} else if _, err := s.CreateShardGroup("db0", "rp0", time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)); err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 52696ae

Please sign in to comment.