diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 8a25d3c744e..14767965083 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -38,21 +38,46 @@ func TestServer_DatabaseCommands(t *testing.T) { command: `CREATE DATABASE db0`, exp: `{"results":[{}]}`, }, + &Query{ + name: "create database with retention duration should succeed", + command: `CREATE DATABASE db0_r WITH DURATION 24h REPLICATION 2 NAME db0_r_policy`, + exp: `{"results":[{}]}`, + }, &Query{ name: "create database should error with bad name", command: `CREATE DATABASE 0xdb0`, exp: `{"error":"error parsing query: found 0, expected identifier at line 1, char 17"}`, }, + &Query{ + name: "create database with retention duration should error with bad retention duration", + command: `CREATE DATABASE db0 WITH DURATION xyz`, + exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 35"}`, + }, + &Query{ + name: "create database with retention replication should error with bad retention replication number", + command: `CREATE DATABASE db0 WITH REPLICATION xyz`, + exp: `{"error":"error parsing query: found xyz, expected number at line 1, char 38"}`, + }, + &Query{ + name: "create database with retention name should error with missing retention name", + command: `CREATE DATABASE db0 WITH NAME`, + exp: `{"error":"error parsing query: found EOF, expected identifier at line 1, char 31"}`, + }, &Query{ name: "show database should succeed", command: `SHOW DATABASES`, - exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"]]}]}]}`, }, &Query{ name: "create database should error if it already exists", command: `CREATE DATABASE db0`, exp: `{"results":[{"error":"database already exists"}]}`, }, + &Query{ + name: "create database should error if it already exists", + command: `CREATE DATABASE db0_r`, + exp: `{"results":[{"error":"database already exists"}]}`, + }, &Query{ name: "create database should not error with existing database with IF NOT EXISTS", command: `CREATE DATABASE IF NOT EXISTS db0`, @@ -63,16 +88,31 @@ func TestServer_DatabaseCommands(t *testing.T) { command: `CREATE DATABASE IF NOT EXISTS db1`, exp: `{"results":[{}]}`, }, + &Query{ + name: "create database with retention duration should not error with existing database with IF NOT EXISTS", + command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION 24h`, + exp: `{"results":[{}]}`, + }, + &Query{ + name: "create database should error IF NOT EXISTS with bad retention duration", + command: `CREATE DATABASE IF NOT EXISTS db1 WITH DURATION xyz`, + exp: `{"error":"error parsing query: found xyz, expected duration at line 1, char 49"}`, + }, &Query{ name: "show database should succeed", command: `SHOW DATABASES`, - exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db1"]]}]}]}`, + exp: `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"],["db1"]]}]}]}`, }, &Query{ name: "drop database db0 should succeed", command: `DROP DATABASE db0`, exp: `{"results":[{}]}`, }, + &Query{ + name: "drop database db0_r should succeed", + command: `DROP DATABASE db0_r`, + exp: `{"results":[{}]}`, + }, &Query{ name: "drop database db1 should succeed", command: `DROP DATABASE db1`, diff --git a/influxql/ast.go b/influxql/ast.go index ef7eeab8415..ba26bd062a9 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -320,6 +320,18 @@ type CreateDatabaseStatement struct { // IfNotExists indicates whether to return without error if the database // already exists. IfNotExists bool + + // RetentionPolicyCreate indicates whether the user explicitly wants to create a retention policy + RetentionPolicyCreate bool + + // RetentionPolicyDuration indicates retention duration for the new database + RetentionPolicyDuration time.Duration + + // RetentionPolicyReplication indicates retention replication for the new database + RetentionPolicyReplication int + + // RetentionPolicyName indicates retention name for the new database + RetentionPolicyName string } // String returns a string representation of the create database statement. @@ -330,6 +342,15 @@ func (s *CreateDatabaseStatement) String() string { _, _ = buf.WriteString("IF NOT EXISTS ") } _, _ = buf.WriteString(QuoteIdent(s.Name)) + if s.RetentionPolicyCreate { + _, _ = buf.WriteString("WITH DURATION ") + _, _ = buf.WriteString(s.RetentionPolicyDuration.String()) + _, _ = buf.WriteString("REPLICATION ") + _, _ = buf.WriteString(strconv.Itoa(s.RetentionPolicyReplication)) + _, _ = buf.WriteString("NAME ") + _, _ = buf.WriteString(QuoteIdent(s.RetentionPolicyName)) + } + return buf.String() } diff --git a/influxql/parser.go b/influxql/parser.go index 1ba15ea0dfc..8c90dee98e7 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -1457,6 +1457,56 @@ func (p *Parser) parseCreateDatabaseStatement() (*CreateDatabaseStatement, error } stmt.Name = lit + // Look for "WITH" + if tok, _, _ := p.scanIgnoreWhitespace(); tok == WITH { + // validate that at least one of DURATION, REPLICATION or NAME is provided + tok, pos, lit := p.scanIgnoreWhitespace() + if tok != DURATION && tok != REPLICATION && tok != NAME { + return nil, newParseError(tokstr(tok, lit), []string{"DURATION", "REPLICATION", "NAME"}, pos) + } + // rewind + p.unscan() + + // mark statement as having a RetentionPolicyInfo defined + stmt.RetentionPolicyCreate = true + + // Look for "DURATION" + var rpDuration time.Duration // default is forever + if err := p.parseTokens([]Token{DURATION}); err != nil { + p.unscan() + } else { + rpDuration, err = p.parseDuration() + if err != nil { + return nil, err + } + } + stmt.RetentionPolicyDuration = rpDuration + + // Look for "REPLICATION" + var rpReplication int = 1 // default is 1 + if err := p.parseTokens([]Token{REPLICATION}); err != nil { + p.unscan() + } else { + rpReplication, err = p.parseInt(1, math.MaxInt32) + if err != nil { + return nil, err + } + } + stmt.RetentionPolicyReplication = rpReplication + + // Look for "NAME" + var rpName string = "default" // default is default + if err := p.parseTokens([]Token{NAME}); err == nil { + rpName, err = p.parseIdent() + if err != nil { + return nil, err + } + } + stmt.RetentionPolicyName = rpName + } else { + p.unscan() + } + return stmt, nil } diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 509209b7ce5..87d6bd134f5 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1176,15 +1176,105 @@ func TestParser_ParseStatement(t *testing.T) { { s: `CREATE DATABASE testdb`, stmt: &influxql.CreateDatabaseStatement{ - Name: "testdb", - IfNotExists: false, + Name: "testdb", + IfNotExists: false, + RetentionPolicyCreate: false, }, }, { s: `CREATE DATABASE IF NOT EXISTS testdb`, stmt: &influxql.CreateDatabaseStatement{ - Name: "testdb", - IfNotExists: true, + Name: "testdb", + IfNotExists: true, + RetentionPolicyCreate: false, + }, + }, + { + s: `CREATE DATABASE testdb WITH DURATION 24h`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: false, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 24 * time.Hour, + RetentionPolicyReplication: 1, + RetentionPolicyName: "default", + }, + }, + { + s: `CREATE DATABASE IF NOT EXISTS testdb WITH DURATION 24h`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: true, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 24 * time.Hour, + RetentionPolicyReplication: 1, + RetentionPolicyName: "default", + }, + }, + { + s: `CREATE DATABASE testdb WITH REPLICATION 2`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: false, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 0, + RetentionPolicyReplication: 2, + RetentionPolicyName: "default", + }, + }, + { + s: `CREATE DATABASE IF NOT EXISTS testdb WITH REPLICATION 2`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: true, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 0, + RetentionPolicyReplication: 2, + RetentionPolicyName: "default", + }, + }, + { + s: `CREATE DATABASE testdb WITH NAME test_name`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: false, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 0, + RetentionPolicyReplication: 1, + RetentionPolicyName: "test_name", + }, + }, + { + s: `CREATE DATABASE IF NOT EXISTS testdb WITH NAME test_name`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: true, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 0, + RetentionPolicyReplication: 1, + RetentionPolicyName: "test_name", + }, + }, + { + s: `CREATE DATABASE testdb WITH DURATION 24h REPLICATION 2 NAME test_name`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: false, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 24 * time.Hour, + RetentionPolicyReplication: 2, + RetentionPolicyName: "test_name", + }, + }, + { + s: `CREATE DATABASE IF NOT EXISTS testdb WITH DURATION 24h REPLICATION 2 NAME test_name`, + stmt: &influxql.CreateDatabaseStatement{ + Name: "testdb", + IfNotExists: true, + RetentionPolicyCreate: true, + RetentionPolicyDuration: 24 * time.Hour, + RetentionPolicyReplication: 2, + RetentionPolicyName: "test_name", }, }, @@ -1612,9 +1702,17 @@ func TestParser_ParseStatement(t *testing.T) { {s: `DROP FOO`, err: `found FOO, expected SERIES, CONTINUOUS, MEASUREMENT, SERVER, SUBSCRIPTION at line 1, char 6`}, {s: `CREATE FOO`, err: `found FOO, expected CONTINUOUS, DATABASE, USER, RETENTION, SUBSCRIPTION at line 1, char 8`}, {s: `CREATE DATABASE`, err: `found EOF, expected identifier at line 1, char 17`}, + {s: `CREATE DATABASE "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, NAME at line 1, char 31`}, + {s: `CREATE DATABASE "testdb" WITH DURATION`, err: `found EOF, expected duration at line 1, char 40`}, + {s: `CREATE DATABASE "testdb" WITH REPLICATION`, err: `found EOF, expected number at line 1, char 43`}, + {s: `CREATE DATABASE "testdb" WITH NAME`, err: `found EOF, expected identifier at line 1, char 36`}, {s: `CREATE DATABASE IF`, err: `found EOF, expected NOT at line 1, char 20`}, {s: `CREATE DATABASE IF NOT`, err: `found EOF, expected EXISTS at line 1, char 24`}, {s: `CREATE DATABASE IF NOT EXISTS`, err: `found EOF, expected identifier at line 1, char 31`}, + {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, NAME at line 1, char 45`}, + {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH DURATION`, err: `found EOF, expected duration at line 1, char 54`}, + {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH REPLICATION`, err: `found EOF, expected number at line 1, char 57`}, + {s: `CREATE DATABASE IF NOT EXISTS "testdb" WITH NAME`, err: `found EOF, expected identifier at line 1, char 50`}, {s: `DROP DATABASE`, err: `found EOF, expected identifier at line 1, char 15`}, {s: `DROP DATABASE IF`, err: `found EOF, expected EXISTS at line 1, char 18`}, {s: `DROP DATABASE IF EXISTS`, err: `found EOF, expected identifier at line 1, char 25`}, diff --git a/influxql/token.go b/influxql/token.go index f4644547639..655969e377c 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -97,6 +97,7 @@ const ( LIMIT MEASUREMENT MEASUREMENTS + NAME NOT OFFSET ON @@ -215,6 +216,7 @@ var tokens = [...]string{ LIMIT: "LIMIT", MEASUREMENT: "MEASUREMENT", MEASUREMENTS: "MEASUREMENTS", + NAME: "NAME", NOT: "NOT", OFFSET: "OFFSET", ON: "ON", diff --git a/meta/statement_executor.go b/meta/statement_executor.go index 73672a56643..4150f5514f9 100644 --- a/meta/statement_executor.go +++ b/meta/statement_executor.go @@ -22,6 +22,7 @@ type StatementExecutor struct { Database(name string) (*DatabaseInfo, error) Databases() ([]DatabaseInfo, error) CreateDatabase(name string) (*DatabaseInfo, error) + CreateDatabaseWithRetentionPolicy(name string, rpi *RetentionPolicyInfo) (*DatabaseInfo, error) DropDatabase(name string) error DefaultRetentionPolicy(database string) (*RetentionPolicyInfo, error) @@ -110,10 +111,19 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql. } func (e *StatementExecutor) executeCreateDatabaseStatement(q *influxql.CreateDatabaseStatement) *influxql.Result { - _, err := e.Store.CreateDatabase(q.Name) + var err error + if q.RetentionPolicyCreate { + rpi := NewRetentionPolicyInfo(q.RetentionPolicyName) + rpi.Duration = q.RetentionPolicyDuration + rpi.ReplicaN = q.RetentionPolicyReplication + _, err = e.Store.CreateDatabaseWithRetentionPolicy(q.Name, rpi) + } else { + _, err = e.Store.CreateDatabase(q.Name) + } if err == ErrDatabaseExists && q.IfNotExists { err = nil } + return &influxql.Result{Err: err} } diff --git a/meta/statement_executor_test.go b/meta/statement_executor_test.go index 0df8e63174e..52a63a64fb2 100644 --- a/meta/statement_executor_test.go +++ b/meta/statement_executor_test.go @@ -1089,33 +1089,34 @@ func NewStatementExecutor() *StatementExecutor { // StatementExecutorStore represents a mock implementation of StatementExecutor.Store. type StatementExecutorStore struct { - NodeFn func(id uint64) (*meta.NodeInfo, error) - NodesFn func() ([]meta.NodeInfo, error) - PeersFn func() ([]string, error) - LeaderFn func() string - DatabaseFn func(name string) (*meta.DatabaseInfo, error) - DatabasesFn func() ([]meta.DatabaseInfo, error) - CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error) - DropDatabaseFn func(name string) error - DeleteNodeFn func(nodeID uint64, force bool) error - DefaultRetentionPolicyFn func(database string) (*meta.RetentionPolicyInfo, error) - CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) - UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error - SetDefaultRetentionPolicyFn func(database, name string) error - DropRetentionPolicyFn func(database, name string) error - UsersFn func() ([]meta.UserInfo, error) - CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error) - UpdateUserFn func(name, password string) error - DropUserFn func(name string) error - SetPrivilegeFn func(username, database string, p influxql.Privilege) error - SetAdminPrivilegeFn func(username string, admin bool) error - UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error) - UserPrivilegeFn func(username, database string) (*influxql.Privilege, error) - ContinuousQueriesFn func() ([]meta.ContinuousQueryInfo, error) - CreateContinuousQueryFn func(database, name, query string) error - DropContinuousQueryFn func(database, name string) error - CreateSubscriptionFn func(database, rp, name, typ string, hosts []string) error - DropSubscriptionFn func(database, rp, name string) error + NodeFn func(id uint64) (*meta.NodeInfo, error) + NodesFn func() ([]meta.NodeInfo, error) + PeersFn func() ([]string, error) + LeaderFn func() string + DatabaseFn func(name string) (*meta.DatabaseInfo, error) + DatabasesFn func() ([]meta.DatabaseInfo, error) + CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error) + CreateDatabaseWithRetentionPolicyFn func(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) + DropDatabaseFn func(name string) error + DeleteNodeFn func(nodeID uint64, force bool) error + DefaultRetentionPolicyFn func(database string) (*meta.RetentionPolicyInfo, error) + CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) + UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error + SetDefaultRetentionPolicyFn func(database, name string) error + DropRetentionPolicyFn func(database, name string) error + UsersFn func() ([]meta.UserInfo, error) + CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error) + UpdateUserFn func(name, password string) error + DropUserFn func(name string) error + SetPrivilegeFn func(username, database string, p influxql.Privilege) error + SetAdminPrivilegeFn func(username string, admin bool) error + UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error) + UserPrivilegeFn func(username, database string) (*influxql.Privilege, error) + ContinuousQueriesFn func() ([]meta.ContinuousQueryInfo, error) + CreateContinuousQueryFn func(database, name, query string) error + DropContinuousQueryFn func(database, name string) error + CreateSubscriptionFn func(database, rp, name, typ string, hosts []string) error + DropSubscriptionFn func(database, rp, name string) error } func (s *StatementExecutorStore) Node(id uint64) (*meta.NodeInfo, error) { @@ -1153,6 +1154,10 @@ func (s *StatementExecutorStore) CreateDatabase(name string) (*meta.DatabaseInfo return s.CreateDatabaseFn(name) } +func (s *StatementExecutorStore) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) { + return s.CreateDatabaseWithRetentionPolicy(name, rpi) +} + func (s *StatementExecutorStore) DropDatabase(name string) error { return s.DropDatabaseFn(name) } diff --git a/meta/store.go b/meta/store.go index f59b7f4742a..ff1fa6aeefc 100644 --- a/meta/store.go +++ b/meta/store.go @@ -1037,6 +1037,29 @@ func (s *Store) CreateDatabase(name string) (*DatabaseInfo, error) { return s.Database(name) } +// CreateDatabaseWithRetentionPolicy creates a new database with an explicit retention policy in the store. +func (s *Store) CreateDatabaseWithRetentionPolicy(name string, rpi *RetentionPolicyInfo) (*DatabaseInfo, error) { + if err := s.exec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, + &internal.CreateDatabaseCommand{ + Name: proto.String(name), + }, + ); err != nil { + return nil, err + } + s.Logger.Printf("database '%s' created", name) + + if _, err := s.CreateRetentionPolicy(name, rpi); err != nil { + return nil, err + } + + // Set it as the default retention policy. + if err := s.SetDefaultRetentionPolicy(name, rpi.Name); err != nil { + return nil, err + } + + return s.Database(name) +} + // CreateDatabaseIfNotExists creates a new database in the store if it doesn't already exist. func (s *Store) CreateDatabaseIfNotExists(name string) (*DatabaseInfo, error) { // Try to find database locally first.