Skip to content

Commit

Permalink
Use defaults from meta package for CREATE DATABASE
Browse files Browse the repository at this point in the history
Instead of having the parser set the defaults, the command will set the
defaults so that the constants for that are actually used. This way we
can also identify which things the user provided and which ones we are
filling with default values.

This allows the meta client to be able to make smarter decisions when
determining if the user requested a conflict or if the requested
capabilities match with what is currently available. If you just say
`CREATE DATABASE WITH NAME myrp`, the user doesn't really care what the
duration of the retention policy is and just wants to use the default.
Now, we can use that information to determine if an existing retention
policy would conflict with what the user requested rather than returning
an error if a default value ever gets changed since the meta client
command can communicate intent more easily.
  • Loading branch information
jsternberg committed Aug 9, 2016
1 parent 3c12403 commit 530b00b
Show file tree
Hide file tree
Showing 19 changed files with 552 additions and 328 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#7025](https://github.com/influxdata/influxdb/issues/7025): Move the CQ interval by the group by offset.
- [#7125](https://github.com/influxdata/influxdb/pull/7125): Ensure gzip writer is closed in influx_inspect export
- [#7127](https://github.com/influxdata/influxdb/pull/7127): Concurrent series limit
- [#7119](https://github.com/influxdata/influxdb/pull/7119): Fix CREATE DATABASE when dealing with default values.

## v0.13.0 [2016-05-12]

Expand Down
2 changes: 1 addition & 1 deletion cmd/influxd/run/backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestServer_BackupAndRestore(t *testing.T) {
s := OpenServer(config)
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy(db, newRetentionPolicyInfo(rp, 1, 0)); err != nil {
if err := s.CreateDatabaseAndRetentionPolicy(db, newRetentionPolicySpec(rp, 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaClient.SetDefaultRetentionPolicy(db, rp); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions cmd/influxd/run/server_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func OpenServerWithVersion(c *run.Config, version string) *Server {
// OpenDefaultServer opens a test server with a default database & retention policy.
func OpenDefaultServer(c *run.Config) *Server {
s := OpenServer(c)
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0)); err != nil {
panic(err)
}
if err := s.MetaClient.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *Server) URL() string {
}

// CreateDatabaseAndRetentionPolicy will create the database and retention policy.
func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicyInfo) error {
func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicySpec) error {
if _, err := s.MetaClient.CreateDatabase(db); err != nil {
return err
} else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp); err != nil {
Expand Down Expand Up @@ -250,8 +250,8 @@ func NewConfig() *run.Config {
return c
}

func newRetentionPolicyInfo(name string, rf int, duration time.Duration) *meta.RetentionPolicyInfo {
return &meta.RetentionPolicyInfo{Name: name, ReplicaN: rf, Duration: duration}
func newRetentionPolicySpec(name string, rf int, duration time.Duration) *meta.RetentionPolicySpec {
return &meta.RetentionPolicySpec{Name: name, ReplicaN: &rf, Duration: &duration}
}

func maxFloat64() string {
Expand Down Expand Up @@ -456,7 +456,7 @@ func writeTestData(s *Server, t *Test) error {
w.rp = t.retentionPolicy()
}

if err := s.CreateDatabaseAndRetentionPolicy(w.db, newRetentionPolicyInfo(w.rp, 1, 0)); err != nil {
if err := s.CreateDatabaseAndRetentionPolicy(w.db, newRetentionPolicySpec(w.rp, 1, 0)); err != nil {
return err
}
if err := s.MetaClient.SetDefaultRetentionPolicy(w.db, w.rp); err != nil {
Expand Down
88 changes: 44 additions & 44 deletions cmd/influxd/run/server_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions coordinator/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
type MetaClient interface {
CreateContinuousQuery(database, name, query string) error
CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
CreateSubscription(database, rp, name, mode string, destinations []string) error
CreateUser(name, password string, admin bool) (*meta.UserInfo, error)
Database(name string) *meta.DatabaseInfo
Expand Down
13 changes: 7 additions & 6 deletions coordinator/meta_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
type MetaClient struct {
CreateContinuousQueryFn func(database, name, query string) error
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicyFn func(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error)
CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)
DatabaseFn func(name string) *meta.DatabaseInfo
Expand Down Expand Up @@ -48,13 +48,14 @@ func (c *MetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error) {
return c.CreateDatabaseFn(name)
}

func (c *MetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) {
return c.CreateDatabaseWithRetentionPolicyFn(name, rpi)
func (c *MetaClient) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
return c.CreateDatabaseWithRetentionPolicyFn(name, spec)
}

func (c *MetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
return c.CreateRetentionPolicyFn(database, rpi)
func (c *MetaClient) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
return c.CreateRetentionPolicyFn(database, spec)
}

func (c *MetaClient) DropShard(id uint64) error {
return c.DropShardFn(id)
}
Expand Down
27 changes: 16 additions & 11 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,28 +252,33 @@ func (e *StatementExecutor) executeCreateDatabaseStatement(stmt *influxql.Create
return err
}

rpi := meta.NewRetentionPolicyInfo(stmt.RetentionPolicyName)
rpi.Duration = stmt.RetentionPolicyDuration
rpi.ReplicaN = stmt.RetentionPolicyReplication
rpi.ShardGroupDuration = stmt.RetentionPolicyShardGroupDuration
_, err := e.MetaClient.CreateDatabaseWithRetentionPolicy(stmt.Name, rpi)
spec := meta.RetentionPolicySpec{
Name: stmt.RetentionPolicyName,
Duration: stmt.RetentionPolicyDuration,
ReplicaN: stmt.RetentionPolicyReplication,
ShardGroupDuration: stmt.RetentionPolicyShardGroupDuration,
}
_, err := e.MetaClient.CreateDatabaseWithRetentionPolicy(stmt.Name, &spec)
return err
}

func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql.CreateRetentionPolicyStatement) error {
rpi := meta.NewRetentionPolicyInfo(stmt.Name)
rpi.Duration = stmt.Duration
rpi.ReplicaN = stmt.Replication
rpi.ShardGroupDuration = stmt.ShardGroupDuration
spec := meta.RetentionPolicySpec{
Name: stmt.Name,
Duration: &stmt.Duration,
ReplicaN: &stmt.Replication,
ShardGroupDuration: stmt.ShardGroupDuration,
}

// Create new retention policy.
if _, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, rpi); err != nil {
rp, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec)
if err != nil {
return err
}

// If requested, set new policy as the default.
if stmt.Default {
if err := e.MetaClient.SetDefaultRetentionPolicy(stmt.Database, stmt.Name); err != nil {
if err := e.MetaClient.SetDefaultRetentionPolicy(stmt.Database, rp.Name); err != nil {
return err
}
}
Expand Down
23 changes: 15 additions & 8 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,10 @@ type CreateDatabaseStatement struct {
RetentionPolicyCreate bool

// RetentionPolicyDuration indicates retention duration for the new database
RetentionPolicyDuration time.Duration
RetentionPolicyDuration *time.Duration

// RetentionPolicyReplication indicates retention replication for the new database
RetentionPolicyReplication int
RetentionPolicyReplication *int

// RetentionPolicyName indicates retention name for the new database
RetentionPolicyName string
Expand All @@ -481,16 +481,23 @@ func (s *CreateDatabaseStatement) String() string {
_, _ = buf.WriteString("CREATE DATABASE ")
_, _ = buf.WriteString(QuoteIdent(s.Name))
if s.RetentionPolicyCreate {
_, _ = buf.WriteString(" WITH DURATION ")
_, _ = buf.WriteString(s.RetentionPolicyDuration.String())
_, _ = buf.WriteString(" REPLICATION ")
_, _ = buf.WriteString(strconv.Itoa(s.RetentionPolicyReplication))
_, _ = buf.WriteString(" WITH")
if s.RetentionPolicyDuration != nil {
_, _ = buf.WriteString(" DURATION ")
_, _ = buf.WriteString(s.RetentionPolicyDuration.String())
}
if s.RetentionPolicyReplication != nil {
_, _ = buf.WriteString(" REPLICATION ")
_, _ = buf.WriteString(strconv.Itoa(*s.RetentionPolicyReplication))
}
if s.RetentionPolicyShardGroupDuration > 0 {
_, _ = buf.WriteString(" SHARD DURATION ")
_, _ = buf.WriteString(s.RetentionPolicyShardGroupDuration.String())
}
_, _ = buf.WriteString(" NAME ")
_, _ = buf.WriteString(QuoteIdent(s.RetentionPolicyName))
if s.RetentionPolicyName != "" {
_, _ = buf.WriteString(" NAME ")
_, _ = buf.WriteString(QuoteIdent(s.RetentionPolicyName))
}
}

return buf.String()
Expand Down
14 changes: 5 additions & 9 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,31 +1541,28 @@ func (p *Parser) parseCreateDatabaseStatement() (*CreateDatabaseStatement, error
stmt.RetentionPolicyCreate = true

// Look for "DURATION"
var rpDuration time.Duration // default is forever
if err := p.parseTokens([]Token{DURATION}); err != nil {
p.unscan()
} else {
rpDuration, err = p.parseDuration()
rpDuration, err := p.parseDuration()
if err != nil {
return nil, err
}
stmt.RetentionPolicyDuration = &rpDuration
}
stmt.RetentionPolicyDuration = rpDuration

// Look for "REPLICATION"
var rpReplication = 1 // default is 1
if err := p.parseTokens([]Token{REPLICATION}); err != nil {
p.unscan()
} else {
rpReplication, err = p.parseInt(1, math.MaxInt32)
rpReplication, err := p.parseInt(1, math.MaxInt32)
if err != nil {
return nil, err
}
stmt.RetentionPolicyReplication = &rpReplication
}
stmt.RetentionPolicyReplication = rpReplication

// Look for "SHARD"
var rpShardGroupDuration time.Duration
if err := p.parseTokens([]Token{SHARD}); err != nil {
p.unscan()
} else {
Expand All @@ -1574,11 +1571,10 @@ func (p *Parser) parseCreateDatabaseStatement() (*CreateDatabaseStatement, error
if tok != DURATION {
return nil, newParseError(tokstr(tok, lit), []string{"DURATION"}, pos)
}
rpShardGroupDuration, err = p.parseDuration()
stmt.RetentionPolicyShardGroupDuration, err = p.parseDuration()
if err != nil {
return nil, err
}
stmt.RetentionPolicyShardGroupDuration = rpShardGroupDuration
}

// Look for "NAME"
Expand Down
32 changes: 17 additions & 15 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,18 +1635,15 @@ func TestParser_ParseStatement(t *testing.T) {
s: `CREATE DATABASE testdb WITH DURATION 24h`,
stmt: &influxql.CreateDatabaseStatement{
Name: "testdb",
RetentionPolicyCreate: true,
RetentionPolicyDuration: 24 * time.Hour,
RetentionPolicyReplication: 1,
RetentionPolicyCreate: true,
RetentionPolicyDuration: duration(24 * time.Hour),
},
},
{
s: `CREATE DATABASE testdb WITH SHARD DURATION 30m`,
stmt: &influxql.CreateDatabaseStatement{
Name: "testdb",
RetentionPolicyCreate: true,
RetentionPolicyDuration: 0,
RetentionPolicyReplication: 1,
RetentionPolicyShardGroupDuration: 30 * time.Minute,
},
},
Expand All @@ -1655,27 +1652,24 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: &influxql.CreateDatabaseStatement{
Name: "testdb",
RetentionPolicyCreate: true,
RetentionPolicyDuration: 0,
RetentionPolicyReplication: 2,
RetentionPolicyReplication: intptr(2),
},
},
{
s: `CREATE DATABASE testdb WITH NAME test_name`,
stmt: &influxql.CreateDatabaseStatement{
Name: "testdb",
RetentionPolicyCreate: true,
RetentionPolicyDuration: 0,
RetentionPolicyReplication: 1,
RetentionPolicyName: "test_name",
RetentionPolicyCreate: true,
RetentionPolicyName: "test_name",
},
},
{
s: `CREATE DATABASE testdb WITH DURATION 24h REPLICATION 2 NAME test_name`,
stmt: &influxql.CreateDatabaseStatement{
Name: "testdb",
RetentionPolicyCreate: true,
RetentionPolicyDuration: 24 * time.Hour,
RetentionPolicyReplication: 2,
RetentionPolicyDuration: duration(24 * time.Hour),
RetentionPolicyReplication: intptr(2),
RetentionPolicyName: "test_name",
},
},
Expand All @@ -1684,8 +1678,8 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: &influxql.CreateDatabaseStatement{
Name: "testdb",
RetentionPolicyCreate: true,
RetentionPolicyDuration: 24 * time.Hour,
RetentionPolicyReplication: 2,
RetentionPolicyDuration: duration(24 * time.Hour),
RetentionPolicyReplication: intptr(2),
RetentionPolicyName: "test_name",
RetentionPolicyShardGroupDuration: 10 * time.Minute,
},
Expand Down Expand Up @@ -2812,3 +2806,11 @@ func mustParseDuration(s string) time.Duration {
}
return d
}

func duration(v time.Duration) *time.Duration {
return &v
}

func intptr(v int) *int {
return &v
}
15 changes: 10 additions & 5 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
const (
MonitorRetentionPolicy = "monitor"
MonitorRetentionPolicyDuration = 7 * 24 * time.Hour
MonitorRetentionPolicyReplicaN = 1
)

// Monitor represents an instance of the monitor system.
Expand Down Expand Up @@ -51,7 +52,7 @@ type Monitor struct {
storeInterval time.Duration

MetaClient interface {
CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
Database(name string) *meta.DatabaseInfo
}

Expand Down Expand Up @@ -352,11 +353,15 @@ func (m *Monitor) createInternalStorage() {
}

if di := m.MetaClient.Database(m.storeDatabase); di == nil {
rpi := meta.NewRetentionPolicyInfo(MonitorRetentionPolicy)
rpi.Duration = MonitorRetentionPolicyDuration
rpi.ReplicaN = 1
duration := MonitorRetentionPolicyDuration
replicaN := MonitorRetentionPolicyReplicaN
spec := meta.RetentionPolicySpec{
Name: MonitorRetentionPolicy,
Duration: &duration,
ReplicaN: &replicaN,
}

if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, rpi); err != nil {
if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, &spec); err != nil {
m.Logger.Printf("failed to create database '%s', failed to create storage: %s",
m.storeDatabase, err.Error())
return
Expand Down
12 changes: 6 additions & 6 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type Service struct {
}
MetaClient interface {
CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
Database(name string) *meta.DatabaseInfo
RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
}
Expand Down Expand Up @@ -139,14 +139,14 @@ func (s *Service) Open() error {

if db := s.MetaClient.Database(s.database); db != nil {
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
rpi := meta.NewRetentionPolicyInfo(s.retentionPolicy)
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, rpi); err != nil {
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil {
s.logger.Printf("Failed to ensure target retention policy %s exists: %s", s.database, err.Error())
}
}
} else {
rpi := meta.NewRetentionPolicyInfo(s.retentionPolicy)
if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, rpi); err != nil {
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
if _, err := s.MetaClient.CreateDatabaseWithRetentionPolicy(s.database, &spec); err != nil {
s.logger.Printf("Failed to ensure target database %s exists: %s", s.database, err.Error())
return err
}
Expand Down
4 changes: 2 additions & 2 deletions services/graphite/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@ func (d *DatabaseCreator) CreateDatabase(name string) (*meta.DatabaseInfo, error
return nil, nil
}

func (d *DatabaseCreator) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
func (d *DatabaseCreator) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
return nil, nil
}

func (d *DatabaseCreator) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) {
func (d *DatabaseCreator) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) {
d.Created = true
return nil, nil
}
Expand Down
Loading

0 comments on commit 530b00b

Please sign in to comment.