Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-create shard groups #1897

Merged
merged 7 commits into from
Mar 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
### Features
- [#1875](https://github.com/influxdb/influxdb/pull/1875): Support trace logging of Raft.
- [#1895](https://github.com/influxdb/influxdb/pull/1895): Auto-create a retention policy when a database is created.
- [#1897](https://github.com/influxdb/influxdb/pull/1897): Pre-create shard groups.

## v0.9.0-rc9 [2015-03-06]

Expand Down
15 changes: 15 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const (

// DefaultJoinURLs represents the default URLs for joining a cluster.
DefaultJoinURLs = ""

// DefaultRetentionCreatePeriod represents how often the server will check to see if new
// shard groups need to be created in advance for writing
DefaultRetentionCreatePeriod = 45 * time.Minute
)

// Config represents the configuration format for the influxd binary.
Expand Down Expand Up @@ -90,6 +94,7 @@ type Config struct {
RetentionAutoCreate bool `toml:"retention-auto-create"`
RetentionCheckEnabled bool `toml:"retention-check-enabled"`
RetentionCheckPeriod Duration `toml:"retention-check-period"`
RetentionCreatePeriod Duration `toml:"retention-create-period"`
} `toml:"data"`

Cluster struct {
Expand Down Expand Up @@ -148,6 +153,7 @@ func NewConfig() *Config {
c.Data.RetentionAutoCreate = true
c.Data.RetentionCheckEnabled = true
c.Data.RetentionCheckPeriod = Duration(10 * time.Minute)
c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod)
c.Admin.Enabled = true
c.Admin.Port = 8083
c.ContinuousQuery.RecomputePreviousN = 2
Expand Down Expand Up @@ -229,6 +235,15 @@ func (c *Config) JoinURLs() string {
}
}

// ShardGroupPreCreateCheckPeriod returns the check interval to pre-create shard groups.
// If it was not defined in the config, it defaults to DefaultShardGroupPreCreatePeriod
func (c *Config) ShardGroupPreCreateCheckPeriod() time.Duration {
if c.Data.RetentionCreatePeriod != 0 {
return time.Duration(c.Data.RetentionCreatePeriod)
}
return DefaultRetentionCreatePeriod
}

// Size represents a TOML parseable file size.
// Users can specify size using "m" for megabytes and "g" for gigabytes.
type Size int
Expand Down
7 changes: 7 additions & 0 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
log.Printf("broker enforcing retention policies with check interval of %s", interval)
}

// Start shard group pre-create
interval := config.ShardGroupPreCreateCheckPeriod()
if err := s.StartShardGroupsPreCreate(interval); err != nil {
log.Fatalf("shard group pre-create failed: %s", err.Error())
}
log.Printf("shard group pre-create with check interval of %s", interval)

// Start the server handler. Attach to broker if listening on the same port.
if s != nil {
sh := httpd.NewHandler(s, config.Authentication.Enabled, version)
Expand Down
13 changes: 8 additions & 5 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ type createShardGroupIfNotExistsCommand struct {
Policy string `json:"policy"`
Timestamp time.Time `json:"timestamp"`
}

type deleteShardGroupCommand struct {
Database string `json:"database"`
Policy string `json:"policy"`
ID uint64 `json:"id"`
}

type createUserCommand struct {
Username string `json:"username"`
Password string `json:"password"`
Expand All @@ -94,11 +96,12 @@ type setPrivilegeCommand struct {
Database string `json:"database"`
}
type createRetentionPolicyCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Duration time.Duration `json:"duration"`
ReplicaN uint32 `json:"replicaN"`
SplitN uint32 `json:"splitN"`
Database string `json:"database"`
Name string `json:"name"`
Duration time.Duration `json:"duration"`
ShardGroupDuration time.Duration `json:"shardGroupDuration"`
ReplicaN uint32 `json:"replicaN"`
SplitN uint32 `json:"splitN"`
}
type updateRetentionPolicyCommand struct {
Database string `json:"database"`
Expand Down
15 changes: 10 additions & 5 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,6 +1016,9 @@ type RetentionPolicy struct {
// Length of time to keep data around. A zero duration means keep the data forever.
Duration time.Duration `json:"duration"`

// Length of time to create shard groups in.
ShardGroupDuration time.Duration `json:"shardGroupDuration"`

// The number of copies to make of each shard.
ReplicaN uint32 `json:"replicaN"`

Expand Down Expand Up @@ -1112,6 +1115,7 @@ func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
var o retentionPolicyJSON
o.Name = rp.Name
o.Duration = rp.Duration
o.ShardGroupDuration = rp.ShardGroupDuration
o.ReplicaN = rp.ReplicaN
for _, g := range rp.shardGroups {
o.ShardGroups = append(o.ShardGroups, g)
Expand All @@ -1131,18 +1135,19 @@ func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error {
rp.Name = o.Name
rp.ReplicaN = o.ReplicaN
rp.Duration = o.Duration
rp.ShardGroupDuration = o.ShardGroupDuration
rp.shardGroups = o.ShardGroups

return nil
}

// retentionPolicyJSON represents an intermediate struct for JSON marshaling.
type retentionPolicyJSON struct {
Name string `json:"name"`
ReplicaN uint32 `json:"replicaN,omitempty"`
SplitN uint32 `json:"splitN,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
ShardGroups []*ShardGroup `json:"shardGroups,omitempty"`
Name string `json:"name"`
ReplicaN uint32 `json:"replicaN,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
ShardGroupDuration time.Duration `json:"shardGroupDuration"`
ShardGroups []*ShardGroup `json:"shardGroups,omitempty"`
}

// TagFilter represents a tag filter when looking up other tags or measurements.
Expand Down
117 changes: 103 additions & 14 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ const (

// Server represents a collection of metadata and raw metric data.
type Server struct {
mu sync.RWMutex
id uint64
path string
done chan struct{} // goroutine close notification
rpDone chan struct{} // retention policies goroutine close notification
mu sync.RWMutex
id uint64
path string
done chan struct{} // goroutine close notification
rpDone chan struct{} // retention policies goroutine close notification
sgpcDone chan struct{} // shard group pre-create goroutine close notification

client MessagingClient // broker client
index uint64 // highest broadcast index seen
Expand Down Expand Up @@ -207,6 +208,10 @@ func (s *Server) Close() error {
close(s.rpDone)
}

if s.sgpcDone != nil {
close(s.sgpcDone)
}

// Remove path.
s.path = ""
s.index = 0
Expand Down Expand Up @@ -336,6 +341,73 @@ func (s *Server) EnforceRetentionPolicies() {
}
}

// StartShardGroupsPreCreate launches shard group pre-create to avoid write bottlenecks.
func (s *Server) StartShardGroupsPreCreate(checkInterval time.Duration) error {
if checkInterval == 0 {
return fmt.Errorf("shard group pre-create check interval must be non-zero")
}
sgpcDone := make(chan struct{}, 0)
s.sgpcDone = sgpcDone
go func() {
for {
select {
case <-sgpcDone:
return
case <-time.After(checkInterval):
s.ShardGroupPreCreate(checkInterval)
}
}
}()
return nil
}

// ShardGroupPreCreate ensures that future shard groups and shards are created and ready for writing
// is removed from the server.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems to have trailing text.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 364 or 365? I can't seem to find it. Also /\s\+$ in Vim on that file shows nothing...

func (s *Server) ShardGroupPreCreate(checkInterval time.Duration) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function needs to RLock the server object.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like he's doing it below in the func

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was after Phillip pointed it out though. He fixed his on a PR between my PR so I didn't see it the first time through. Good catch!

log.Println("shard group pre-create check commencing")

// For safety, we double the check interval to ensure we have enough time to create all shard groups
// before they are needed, but as close to needed as possible.
// This is a complete punt on optimization
cutoff := time.Now().Add(checkInterval * 2).UTC()

type group struct {
Database string
Retention string
ID uint64
Timestamp time.Time
}

groups := make([]group, 0)
// Only keep the lock while walking the shard groups, so the lock is not held while
// any deletions take place across the cluster.
func() {
s.mu.RLock()
defer s.mu.RUnlock()

// Check all shard groups.
// See if they have a "future" shard group ready to write to
// If not, create the next shard group, as well as each shard for the shardGroup
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
// Check to see if it is going to end before our interval
if g.EndTime.Before(cutoff) {
log.Printf("pre-creating shard group for %d, retention policy %s, database %s", g.ID, rp.Name, db.name)
groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID, Timestamp: g.EndTime.Add(1 * time.Nanosecond)})
}
}
}
}
}()

for _, g := range groups {
if err := s.CreateShardGroupIfNotExists(g.Database, g.Retention, g.Timestamp); err != nil {
log.Printf("failed to request pre-creation of shard group %d for time %s: %s", g.ID, g.Timestamp, err.Error())
}
}
}

// Client retrieves the current messaging client.
func (s *Server) Client() MessagingClient {
s.mu.RLock()
Expand Down Expand Up @@ -800,8 +872,8 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err

// If no shards match then create a new one.
g := newShardGroup()
g.StartTime = c.Timestamp.Truncate(rp.Duration).UTC()
g.EndTime = g.StartTime.Add(rp.Duration).UTC()
g.StartTime = c.Timestamp.Truncate(rp.ShardGroupDuration).UTC()
g.EndTime = g.StartTime.Add(rp.ShardGroupDuration).UTC()

// Sort nodes so they're consistently assigned to the shards.
nodes := make([]*DataNode, 0, len(s.dataNodes))
Expand Down Expand Up @@ -1197,11 +1269,27 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error)

// CreateRetentionPolicy creates a retention policy for a database.
func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error {
const (
day = time.Hour * 24
month = day * 30
)

var sgd time.Duration
switch {
case rp.Duration > 6*month || rp.Duration == 0:
sgd = 7 * day
case rp.Duration > 2*day:
sgd = 1 * day
default:
sgd = 1 * time.Hour
}

c := &createRetentionPolicyCommand{
Database: database,
Name: rp.Name,
Duration: rp.Duration,
ReplicaN: rp.ReplicaN,
Database: database,
Name: rp.Name,
Duration: rp.Duration,
ShardGroupDuration: sgd,
ReplicaN: rp.ReplicaN,
}
_, err := s.broadcast(createRetentionPolicyMessageType, c)
return err
Expand All @@ -1223,9 +1311,10 @@ func (s *Server) applyCreateRetentionPolicy(m *messaging.Message) error {

// Add policy to the database.
db.policies[c.Name] = &RetentionPolicy{
Name: c.Name,
Duration: c.Duration,
ReplicaN: c.ReplicaN,
Name: c.Name,
Duration: c.Duration,
ShardGroupDuration: c.ShardGroupDuration,
ReplicaN: c.ReplicaN,
}

// Persist to metastore.
Expand Down
49 changes: 43 additions & 6 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,9 +532,10 @@ func TestServer_CreateRetentionPolicy(t *testing.T) {

// Create a retention policy on the database.
rp := &influxdb.RetentionPolicy{
Name: "bar",
Duration: time.Hour,
ReplicaN: 2,
Name: "bar",
Duration: time.Hour,
ShardGroupDuration: time.Hour,
ReplicaN: 2,
}
if err := s.CreateRetentionPolicy("foo", rp); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -706,7 +707,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
defer s.Close()
s.CreateDatabase("foo")

rp := &influxdb.RetentionPolicy{Name: "bar"}
rp := &influxdb.RetentionPolicy{Name: "bar", ShardGroupDuration: 7 * time.Hour * 24}
if err := s.CreateRetentionPolicy("foo", rp); err != nil {
t.Fatal(err)
} else if rp, _ := s.RetentionPolicy("foo", "bar"); rp == nil {
Expand Down Expand Up @@ -743,6 +744,42 @@ func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.
}
}

// Ensure the server pre-creates shard groups as needed.
func TestServer_PreCreateRetentionPolices(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 60 * time.Minute})

// Create two shard groups for the the new retention policy -- 1 which will age out immediately
// the other in more than an hour.
s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(-2*time.Hour))

// Check the two shard groups exist.
var g []*influxdb.ShardGroup
g, err := s.ShardGroups("foo")
if err != nil {
t.Fatal(err)
} else if len(g) != 1 {
t.Fatalf("expected 1 shard group but found %d", len(g))
}

// Run shard group pre-create.
s.ShardGroupPreCreate(time.Hour)

// Ensure enforcement is in effect across restarts.
s.Restart()

// Second shard group should now be created.
g, err = s.ShardGroups("foo")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment doesn't sound like pre-creation is being tested. To simplify this test, I would remove all references to shard groups being expired. Have this test focus on pre-creation, forget about groups aging out.

if err != nil {
t.Fatal(err)
} else if len(g) != 2 {
t.Fatalf("expected 2 shard group but found %d", len(g))
}
}

// Ensure the server prohibits a zero check interval for retention policy enforcement.
func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) {
s := OpenServer(NewMessagingClient())
Expand All @@ -757,11 +794,11 @@ func TestServer_EnforceRetentionPolices(t *testing.T) {
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 30 * time.Minute})
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 60 * time.Minute})

// Create two shard groups for the the new retention policy -- 1 which will age out immediately
// the other in more than an hour.
s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(-1*time.Hour))
s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(-2*time.Hour))
s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(time.Hour))

// Check the two shard groups exist.
Expand Down