-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pre-create shard groups #1897
Pre-create shard groups #1897
Changes from all commits
67f03ef
b6e7210
7495e6e
c150a8c
686d6b9
35bcd48
0d634c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,11 +47,12 @@ const ( | |
|
||
// Server represents a collection of metadata and raw metric data. | ||
type Server struct { | ||
mu sync.RWMutex | ||
id uint64 | ||
path string | ||
done chan struct{} // goroutine close notification | ||
rpDone chan struct{} // retention policies goroutine close notification | ||
mu sync.RWMutex | ||
id uint64 | ||
path string | ||
done chan struct{} // goroutine close notification | ||
rpDone chan struct{} // retention policies goroutine close notification | ||
sgpcDone chan struct{} // shard group pre-create goroutine close notification | ||
|
||
client MessagingClient // broker client | ||
index uint64 // highest broadcast index seen | ||
|
@@ -207,6 +208,10 @@ func (s *Server) Close() error { | |
close(s.rpDone) | ||
} | ||
|
||
if s.sgpcDone != nil { | ||
close(s.sgpcDone) | ||
} | ||
|
||
// Remove path. | ||
s.path = "" | ||
s.index = 0 | ||
|
@@ -336,6 +341,73 @@ func (s *Server) EnforceRetentionPolicies() { | |
} | ||
} | ||
|
||
// StartShardGroupsPreCreate launches shard group pre-create to avoid write bottlenecks. | ||
// It starts a goroutine that calls ShardGroupPreCreate after every checkInterval wait until | ||
// the channel stored in s.sgpcDone is closed (Close closes it when it is non-nil). | ||
// It returns an error, without starting the goroutine, if checkInterval is not positive. | ||
func (s *Server) StartShardGroupsPreCreate(checkInterval time.Duration) error { | ||
	// A negative interval would make time.After fire immediately and spin the loop below, | ||
	// so it is rejected together with zero. The message is kept unchanged for existing callers. | ||
	if checkInterval <= 0 { | ||
		return fmt.Errorf("shard group pre-create check interval must be non-zero") | ||
	} | ||
	// Keep a local reference so the goroutine watches the channel created by this call. | ||
	sgpcDone := make(chan struct{}) | ||
	s.sgpcDone = sgpcDone | ||
	go func() { | ||
		for { | ||
			select { | ||
			case <-sgpcDone: | ||
				return | ||
			case <-time.After(checkInterval): | ||
				s.ShardGroupPreCreate(checkInterval) | ||
			} | ||
		} | ||
	}() | ||
	return nil | ||
} | ||
|
||
// ShardGroupPreCreate ensures that future shard groups and shards are created and ready for writing. | ||
// For every existing shard group whose EndTime falls before now + 2*checkInterval, it requests | ||
// creation of the shard group covering the instant one nanosecond after that EndTime. | ||
// Failures are logged and not returned to the caller. | ||
func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function needs to RLock the server object. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like he's doing it below in the func There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was after Phillip pointed it out though. He fixed his on a PR between my PR so I didn't see it the first time through. Good catch! |
||
log.Println("shard group pre-create check commencing") | ||
|
||
// For safety, we double the check interval to ensure we have enough time to create all shard groups | ||
// before they are needed, but as close to needed as possible. | ||
// This is a complete punt on optimization. | ||
cutoff := time.Now().Add(checkInterval * 2).UTC() | ||
|
||
// group records what is needed to request (and log about) one successor shard group. | ||
type group struct { | ||
Database string | ||
Retention string | ||
ID uint64 | ||
Timestamp time.Time | ||
} | ||
|
||
groups := make([]group, 0) | ||
// Only keep the read lock while walking the shard groups, so the lock is not held while | ||
// the creation requests below are issued. | ||
func() { | ||
s.mu.RLock() | ||
defer s.mu.RUnlock() | ||
|
||
// Walk every shard group of every retention policy of every database and collect | ||
// those that end before the cutoff; a successor is requested for each one afterwards. | ||
// NOTE(review): no check is made here for an already-existing successor; this appears | ||
// to rely on CreateShardGroupIfNotExists being a no-op in that case -- confirm. | ||
for _, db := range s.databases { | ||
for _, rp := range db.policies { | ||
for _, g := range rp.shardGroups { | ||
// Check to see if it is going to end before our interval | ||
if g.EndTime.Before(cutoff) { | ||
log.Printf("pre-creating shard group for %d, retention policy %s, database %s", g.ID, rp.Name, db.name) | ||
groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID, Timestamp: g.EndTime.Add(1 * time.Nanosecond)}) | ||
} | ||
} | ||
} | ||
} | ||
}() | ||
|
||
// Issue the creation requests outside the lock; a failure is logged and the loop continues. | ||
for _, g := range groups { | ||
if err := s.CreateShardGroupIfNotExists(g.Database, g.Retention, g.Timestamp); err != nil { | ||
log.Printf("failed to request pre-creation of shard group %d for time %s: %s", g.ID, g.Timestamp, err.Error()) | ||
} | ||
} | ||
} | ||
|
||
// Client retrieves the current messaging client. | ||
func (s *Server) Client() MessagingClient { | ||
s.mu.RLock() | ||
|
@@ -800,8 +872,8 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err | |
|
||
// If no shards match then create a new one. | ||
g := newShardGroup() | ||
g.StartTime = c.Timestamp.Truncate(rp.Duration).UTC() | ||
g.EndTime = g.StartTime.Add(rp.Duration).UTC() | ||
g.StartTime = c.Timestamp.Truncate(rp.ShardGroupDuration).UTC() | ||
g.EndTime = g.StartTime.Add(rp.ShardGroupDuration).UTC() | ||
|
||
// Sort nodes so they're consistently assigned to the shards. | ||
nodes := make([]*DataNode, 0, len(s.dataNodes)) | ||
|
@@ -1197,11 +1269,27 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error) | |
|
||
// CreateRetentionPolicy creates a retention policy for a database. | ||
func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error { | ||
const ( | ||
day = time.Hour * 24 | ||
month = day * 30 | ||
) | ||
|
||
var sgd time.Duration | ||
switch { | ||
case rp.Duration > 6*month || rp.Duration == 0: | ||
sgd = 7 * day | ||
case rp.Duration > 2*day: | ||
sgd = 1 * day | ||
default: | ||
sgd = 1 * time.Hour | ||
} | ||
|
||
c := &createRetentionPolicyCommand{ | ||
Database: database, | ||
Name: rp.Name, | ||
Duration: rp.Duration, | ||
ReplicaN: rp.ReplicaN, | ||
Database: database, | ||
Name: rp.Name, | ||
Duration: rp.Duration, | ||
ShardGroupDuration: sgd, | ||
ReplicaN: rp.ReplicaN, | ||
} | ||
_, err := s.broadcast(createRetentionPolicyMessageType, c) | ||
return err | ||
|
@@ -1223,9 +1311,10 @@ func (s *Server) applyCreateRetentionPolicy(m *messaging.Message) error { | |
|
||
// Add policy to the database. | ||
db.policies[c.Name] = &RetentionPolicy{ | ||
Name: c.Name, | ||
Duration: c.Duration, | ||
ReplicaN: c.ReplicaN, | ||
Name: c.Name, | ||
Duration: c.Duration, | ||
ShardGroupDuration: c.ShardGroupDuration, | ||
ReplicaN: c.ReplicaN, | ||
} | ||
|
||
// Persist to metastore. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -532,9 +532,10 @@ func TestServer_CreateRetentionPolicy(t *testing.T) { | |
|
||
// Create a retention policy on the database. | ||
rp := &influxdb.RetentionPolicy{ | ||
Name: "bar", | ||
Duration: time.Hour, | ||
ReplicaN: 2, | ||
Name: "bar", | ||
Duration: time.Hour, | ||
ShardGroupDuration: time.Hour, | ||
ReplicaN: 2, | ||
} | ||
if err := s.CreateRetentionPolicy("foo", rp); err != nil { | ||
t.Fatal(err) | ||
|
@@ -706,7 +707,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) { | |
defer s.Close() | ||
s.CreateDatabase("foo") | ||
|
||
rp := &influxdb.RetentionPolicy{Name: "bar"} | ||
rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: 7 * time.Hour * 24} | ||
if err := s.CreateRetentionPolicy("foo", rp); err != nil { | ||
t.Fatal(err) | ||
} else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil { | ||
|
@@ -743,6 +744,42 @@ func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing. | |
} | ||
} | ||
|
||
// Ensure the server pre-creates shard groups as needed. | ||
func TestServer_PreCreateRetentionPolices(t *testing.T) { | ||
c := NewMessagingClient() | ||
s := OpenServer(c) | ||
defer s.Close() | ||
s.CreateDatabase("foo") | ||
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 60 * time.Minute}) | ||
|
||
// Create a single shard group for the new retention policy, timestamped two hours in the past. | ||
s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(-2*time.Hour)) | ||
|
||
// Check that exactly one shard group exists. | ||
var g []*influxdb.ShardGroup | ||
g, err := s.ShardGroups("foo") | ||
if err != nil { | ||
t.Fatal(err) | ||
} else if len(g) != 1 { | ||
t.Fatalf("expected 1 shard group but found %d", len(g)) | ||
} | ||
|
||
// Run shard group pre-create. | ||
s.ShardGroupPreCreate(time.Hour) | ||
|
||
// Restart the server before re-reading the shard groups. | ||
// NOTE(review): presumably this verifies the pre-created group survives a restart -- confirm. | ||
s.Restart() | ||
|
||
// A second, pre-created shard group should now exist. | ||
g, err = s.ShardGroups("foo") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment doesn't sound like pre-creation is being tested. To simplify this test, I would remove all references to shard groups being expired. Have this test focus on pre-creation, forget about groups aging out. |
||
if err != nil { | ||
t.Fatal(err) | ||
} else if len(g) != 2 { | ||
t.Fatalf("expected 2 shard group but found %d", len(g)) | ||
} | ||
} | ||
|
||
// Ensure the server prohibits a zero check interval for retention policy enforcement. | ||
func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) { | ||
s := OpenServer(NewMessagingClient()) | ||
|
@@ -757,11 +794,11 @@ func TestServer_EnforceRetentionPolices(t *testing.T) { | |
s := OpenServer(c) | ||
defer s.Close() | ||
s.CreateDatabase("foo") | ||
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 30 * time.Minute}) | ||
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 60 * time.Minute}) | ||
|
||
// Create two shard groups for the new retention policy -- one which will age out immediately, | ||
// the other in more than an hour. | ||
s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(-1*time.Hour)) | ||
s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(-2*time.Hour)) | ||
s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(time.Hour)) | ||
|
||
// Check the two shard groups exist. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment seems to have trailing text.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 364 or 365? I can't seem to find it. Also
/\s\+$
in Vim on that file shows nothing...