From 0f90819536c2ad7d1347e31f0f11354af9bab815 Mon Sep 17 00:00:00 2001 From: Michael Gattozzi Date: Tue, 17 Dec 2024 13:40:47 -0500 Subject: [PATCH] feat: create DB and Tables via REST and CLI This commit does a few things: 1. It brings the database command naming scheme for types inline with the rest of the CLI types 2. It brings the table command naming scheme for types inline with the rest of the CLI types 3. Adds tests to check that the num of dbs is not exceeded and that you cannot create more than one database with a given name. 4. Adds tests to check that you can create a table and put data into it and querying it 5. Adds tests for the CLI for both the database and table commands 6. It creates an endpoint to create databases given a JSON blob 7. It creates an endpoint to create tables given a JSON blob With this users can now create a database or table without first needing to write to the database via the line protocol! Closes #25640 Closes #25641 --- influxdb3/src/commands/manage/database.rs | 22 +- influxdb3/src/commands/manage/table.rs | 119 ++++++++- influxdb3/src/main.rs | 12 +- influxdb3/tests/server/cli.rs | 138 +++++++++++ influxdb3/tests/server/configure.rs | 283 ++++++++++++++++++++++ influxdb3_catalog/src/catalog.rs | 10 + influxdb3_client/src/lib.rs | 85 +++++++ influxdb3_server/src/http.rs | 66 +++++ influxdb3_write/src/lib.rs | 8 + influxdb3_write/src/write_buffer/mod.rs | 124 ++++++++++ 10 files changed, 849 insertions(+), 18 deletions(-) diff --git a/influxdb3/src/commands/manage/database.rs b/influxdb3/src/commands/manage/database.rs index 72f92d97351..ae519d08f96 100644 --- a/influxdb3/src/commands/manage/database.rs +++ b/influxdb3/src/commands/manage/database.rs @@ -5,13 +5,14 @@ use secrecy::ExposeSecret; use crate::commands::common::InfluxDb3Config; #[derive(Debug, clap::Parser)] -pub(crate) struct ManageDatabaseConfig { +pub(crate) struct Config { #[clap(subcommand)] command: Command, } #[derive(Debug, clap::Parser)] enum Command { + Create(DatabaseConfig), Delete(DatabaseConfig), } @@ -21,8 +22,25 @@ pub struct DatabaseConfig { influxdb3_config: InfluxDb3Config, } -pub async fn delete_database(config: ManageDatabaseConfig) -> Result<(), Box> { +pub async fn command(config: Config) -> Result<(), Box> { match config.command { + Command::Create(config) => { + let InfluxDb3Config { + host_url, + database_name, + auth_token, + } = config.influxdb3_config; + + let mut client = influxdb3_client::Client::new(host_url)?; + + if let Some(t) = auth_token { + client = client.with_auth_token(t.expose_secret()); + } + + client.api_v3_configure_db_create(&database_name).await?; + + println!("Database {:?} created successfully", &database_name); + } Command::Delete(config) => { let InfluxDb3Config { host_url, diff --git a/influxdb3/src/commands/manage/table.rs b/influxdb3/src/commands/manage/table.rs index da98496e3ba..ea952cb8c43 100644 --- a/influxdb3/src/commands/manage/table.rs +++ b/influxdb3/src/commands/manage/table.rs @@ -1,31 +1,71 @@ -use std::{error::Error, io}; +use std::{error::Error, fmt::Display, io, str::FromStr}; use secrecy::ExposeSecret; use crate::commands::common::InfluxDb3Config; #[derive(Debug, clap::Parser)] -pub(crate) struct ManageTableConfig { +pub(crate) struct Config { #[clap(subcommand)] command: Command, } #[derive(Debug, clap::Parser)] enum Command { - Delete(TableConfig), + Create(CreateTableConfig), + Delete(DeleteTableConfig), } #[derive(Debug, clap::Parser)] -pub struct TableConfig { - #[clap(short = 't', long = "table")] - table: String, +pub struct DeleteTableConfig { + #[clap(short = 't', long = "table", required = true)] + table_name: String, #[clap(flatten)] influxdb3_config: InfluxDb3Config, } -pub async fn delete_table(config: ManageTableConfig) -> Result<(), Box> { +#[derive(Debug, clap::Parser)] +pub struct CreateTableConfig { + #[clap(short = 't', long = "table", required = true)] + table_name: String, + + #[clap(long = "tags", required = true, num_args=0..)] + tags: Vec, + + #[clap(short = 'f', long = "fields", value_parser = parse_key_val::, required = true, num_args=0..)] + fields: Vec<(String, DataType)>, + + #[clap(flatten)] + influxdb3_config: InfluxDb3Config, +} + +pub async fn command(config: Config) -> Result<(), Box> { match config.command { + Command::Create(CreateTableConfig { + table_name, + tags, + fields, + influxdb3_config: + InfluxDb3Config { + host_url, + database_name, + auth_token, + }, + }) => { + let mut client = influxdb3_client::Client::new(host_url)?; + if let Some(t) = auth_token { + client = client.with_auth_token(t.expose_secret()); + } + client + .api_v3_configure_table_create(&database_name, &table_name, tags, fields) + .await?; + + println!( + "Table {:?}.{:?} created successfully", + &database_name, &table_name + ); + } Command::Delete(config) => { let InfluxDb3Config { host_url, @@ -34,7 +74,7 @@ pub async fn delete_table(config: ManageTableConfig) -> Result<(), Box Result<(), Box Result { + match s { + "int64" => Ok(Self::Int64), + "uint64" => Ok(Self::Uint64), + "float64" => Ok(Self::Float64), + "utf8" => Ok(Self::Utf8), + "bool" => Ok(Self::Bool), + _ => Err(ParseDataTypeError(s.into())), + } + } +} + +impl Display for DataType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Int64 => write!(f, "int64"), + Self::Uint64 => write!(f, "uint64"), + Self::Float64 => write!(f, "float64"), + Self::Utf8 => write!(f, "utf8"), + Self::Bool => write!(f, "bool"), + } + } +} + +impl From for String { + fn from(data: DataType) -> Self { + data.to_string() + } +} + +/// Parse a single key-value pair +fn parse_key_val(s: &str) -> Result<(T, U), Box> +where + T: std::str::FromStr, + T::Err: Error + Send + Sync + 'static, + U: std::str::FromStr, + U::Err: Error + Send + Sync + 'static, +{ + let pos = s + .find(':') + .ok_or_else(|| format!("invalid FIELD:VALUE. No `:` found in `{s}`"))?; + Ok((s[..pos].parse()?, s[pos + 1..].parse()?)) +} diff --git a/influxdb3/src/main.rs b/influxdb3/src/main.rs index d96ec6df036..393d8ba9174 100644 --- a/influxdb3/src/main.rs +++ b/influxdb3/src/main.rs @@ -97,10 +97,10 @@ enum Command { MetaCache(commands::meta_cache::Config), /// Manage database (delete only for the moment) - Database(commands::manage::database::ManageDatabaseConfig), + Database(commands::manage::database::Config), /// Manage table (delete only for the moment) - Table(commands::manage::table::ManageTableConfig), + Table(commands::manage::table::Config), } fn main() -> Result<(), std::io::Error> { @@ -166,14 +166,14 @@ fn main() -> Result<(), std::io::Error> { } } Some(Command::Database(config)) => { - if let Err(e) = commands::manage::database::delete_database(config).await { - eprintln!("Database delete command failed: {e}"); + if let Err(e) = commands::manage::database::command(config).await { + eprintln!("Database command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } Some(Command::Table(config)) => { - if let Err(e) = commands::manage::table::delete_table(config).await { - eprintln!("Table delete command failed: {e}"); + if let Err(e) = commands::manage::table::command(config).await { + eprintln!("Table command failed: {e}"); std::process::exit(ReturnCode::Failure as _) } } diff --git a/influxdb3/tests/server/cli.rs b/influxdb3/tests/server/cli.rs index 3826cf6800c..6bf6540bc2c 100644 --- a/influxdb3/tests/server/cli.rs +++ b/influxdb3/tests/server/cli.rs @@ -6,6 +6,8 @@ use std::{ use assert_cmd::cargo::CommandCargoExt; use observability_deps::tracing::debug; +use pretty_assertions::assert_eq; +use serde_json::{json, Value}; use test_helpers::assert_contains; use crate::TestServer; @@ -77,6 +79,57 @@ pub fn run_with_confirmation_and_err(args: &[&str]) -> String { .into() } +#[test_log::test(tokio::test)] +async fn test_create_database() { + let server = TestServer::spawn().await; + let server_addr = server.client_addr(); + let db_name = "foo"; + let result = run_with_confirmation(&[ + "database", + "create", + "--dbname", + db_name, + "--host", + &server_addr, + ]); + debug!(result = ?result, "create database"); + assert_contains!(&result, "Database \"foo\" created successfully"); +} + +#[test_log::test(tokio::test)] +async fn test_create_database_limit() { + let server = TestServer::spawn().await; + let server_addr = server.client_addr(); + let db_name = "foo"; + for i in 0..5 { + let name = format!("{db_name}{i}"); + let result = run_with_confirmation(&[ + "database", + "create", + "--dbname", + &name, + "--host", + &server_addr, + ]); + debug!(result = ?result, "create database"); + assert_contains!(&result, format!("Database \"{name}\" created successfully")); + } + + let result = run_with_confirmation_and_err(&[ + "database", + "create", + "--dbname", + "foo5", + "--host", + &server_addr, + ]); + debug!(result = ?result, "create database"); + assert_contains!( + &result, + "Adding a new database would exceed limit of 5 databases" + ); +} + #[test_log::test(tokio::test)] async fn test_delete_database() { let server = TestServer::spawn().await; @@ -119,6 +172,91 @@ async fn test_delete_missing_database() { assert_contains!(&result, "404"); } +#[test_log::test(tokio::test)] +async fn test_create_table() { + let server = TestServer::spawn().await; + let server_addr = server.client_addr(); + let db_name = "foo"; + let table_name = "bar"; + let result = run_with_confirmation(&[ + "database", + "create", + "--dbname", + db_name, + "--host", + &server_addr, + ]); + debug!(result = ?result, "create database"); + assert_contains!(&result, "Database \"foo\" created successfully"); + let result = run_with_confirmation(&[ + "table", + "create", + "--dbname", + db_name, + "--table", + table_name, + "--host", + &server_addr, + "--tags", + "one", + "two", + "three", + "--fields", + "four:utf8", + "five:uint64", + "six:float64", + "seven:int64", + "eight:bool", + ]); + debug!(result = ?result, "create table"); + assert_contains!(&result, "Table \"foo\".\"bar\" created successfully"); + // Check that we can query the table and that it has no values + let result = server + .api_v3_query_sql(&[ + ("db", "foo"), + ("q", "SELECT * FROM bar"), + ("format", "json"), + ]) + .await + .json::() + .await + .unwrap(); + assert_eq!(result, json!([])); + server + .write_lp_to_db( + db_name, + format!("{table_name},one=1,two=2,three=3 four=\"4\",five=5u,six=6,seven=7i,eight=true 1000"), + influxdb3_client::Precision::Second, + ) + .await + .expect("write to db"); + // Check that we can get data from the table + let result = server + .api_v3_query_sql(&[ + ("db", "foo"), + ("q", "SELECT * FROM bar"), + ("format", "json"), + ]) + .await + .json::() + .await + .unwrap(); + assert_eq!( + result, + json!([{ + "one": "1", + "two": "2", + "three": "3", + "four": "4", + "five": 5, + "six": 6.0, + "seven": 7, + "eight": true, + "time": "1970-01-01T00:16:40" + }]) + ); +} + #[test_log::test(tokio::test)] async fn test_delete_table() { let server = TestServer::spawn().await; diff --git a/influxdb3/tests/server/configure.rs b/influxdb3/tests/server/configure.rs index 0551b43dca7..2776e045dd4 100644 --- a/influxdb3/tests/server/configure.rs +++ b/influxdb3/tests/server/configure.rs @@ -885,6 +885,289 @@ async fn api_v3_configure_db_delete_missing_query_param() { assert_eq!(StatusCode::BAD_REQUEST, resp.status()); } +#[test_log::test(tokio::test)] +async fn api_v3_configure_db_create() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let url = format!( + "{base}/api/v3/configure/database", + base = server.client_addr() + ); + + let resp = client + .post(&url) + .json(&json!({ "db": "foo" })) + .send() + .await + .expect("delete database call succeed"); + assert_eq!(StatusCode::OK, resp.status()); +} + +#[test_log::test(tokio::test)] +async fn api_v3_configure_db_create_db_with_same_name() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let url = format!( + "{base}/api/v3/configure/database", + base = server.client_addr() + ); + + let resp = client + .post(&url) + .json(&json!({ "db": "foo" })) + .send() + .await + .expect("create database call did not succeed"); + assert_eq!(StatusCode::OK, resp.status()); + + let resp = client + .post(&url) + .json(&json!({ "db": "foo" })) + .send() + .await + .expect("delete database call succeed"); + assert_eq!(StatusCode::BAD_REQUEST, resp.status()); +} + +#[test_log::test(tokio::test)] +async fn api_v3_configure_db_create_db_hit_limit() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let url = format!( + "{base}/api/v3/configure/database", + base = server.client_addr() + ); + for i in 0..5 { + let resp = client + .post(&url) + .json(&json!({ "db": format!("foo{i}") })) + .send() + .await + .expect("create database call did not succeed"); + assert_eq!(StatusCode::OK, resp.status()); + } + + let resp = client + .post(&url) + .json(&json!({ "db": "foo5" })) + .send() + .await + .expect("create database succeeded"); + assert_eq!(StatusCode::UNPROCESSABLE_ENTITY, resp.status()); +} + +#[test_log::test(tokio::test)] +async fn api_v3_configure_db_create_db_reuse_old_name() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let url = format!( + "{base}/api/v3/configure/database", + base = server.client_addr() + ); + let resp = client + .post(&url) + .json(&json!({ "db": "foo" })) + .send() + .await + .expect("create database call did not succeed"); + assert_eq!(StatusCode::OK, resp.status()); + let resp = client + .delete(format!("{url}?db=foo")) + .send() + .await + .expect("delete database call did not succeed"); + assert_eq!(StatusCode::OK, resp.status()); + let resp = client + .post(&url) + .json(&json!({ "db": "foo" })) + .send() + .await + .expect("create database call did not succeed"); + assert_eq!(StatusCode::OK, resp.status()); +} + +#[test_log::test(tokio::test)] +async fn api_v3_configure_table_create_then_write() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let db_url = format!( + "{base}/api/v3/configure/database", + base = server.client_addr() + ); + let table_url = format!("{base}/api/v3/configure/table", base = server.client_addr()); + + let resp = client + .post(&db_url) + .json(&json!({ "db": "foo" })) + .send() + .await + .expect("create database call did not succeed"); + assert_eq!(StatusCode::OK, resp.status()); + + let resp = client + .post(&table_url) + .json(&json!({ + "db": "foo" , + "table": "bar", + "tags": ["tag1", "tag2"], + "fields": [ + { + "name": "field1", + "type": "uint64" + }, + { + "name": "field2", + "type": "int64" + }, + { + "name": "field3", + "type": "float64" + }, + { + "name": "field4", + "type": "utf8" + }, + { + "name": "field5", + "type": "bool" + } + ] + + })) + .send() + .await + .expect("create table call failed"); + assert_eq!(StatusCode::OK, resp.status()); + let result = server + .api_v3_query_sql(&[ + ("db", "foo"), + ("q", "SELECT * FROM bar"), + ("format", "json"), + ]) + .await + .json::() + .await + .unwrap(); + assert_eq!(result, json!([])); + server + .write_lp_to_db( + "foo", + "bar,tag1=1,tag2=2 field1=1u,field2=2i,field3=3,field4=\"4\",field5=true 1000", + influxdb3_client::Precision::Second, + ) + .await + .expect("write to db"); + let result = server + .api_v3_query_sql(&[ + ("db", "foo"), + ("q", "SELECT * FROM bar"), + ("format", "json"), + ]) + .await + .json::() + .await + .unwrap(); + assert_eq!( + result, + json!([{ + "tag1": "1", + "tag2": "2", + "field1": 1, + "field2": 2, + "field3": 3.0, + "field4": "4", + "field5": true, + "time": "1970-01-01T00:16:40" + }]) + ); +} + +#[test_log::test(tokio::test)] +async fn api_v3_configure_table_create_no_tags() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let db_url = format!( + "{base}/api/v3/configure/database", + base = server.client_addr() + ); + let table_url = format!("{base}/api/v3/configure/table", base = server.client_addr()); + + let resp = client + .post(&db_url) + .json(&json!({ "db": "foo" })) + .send() + .await + .expect("create database call did not succeed"); + assert_eq!(StatusCode::OK, resp.status()); + + let resp = client + .post(&table_url) + .json(&json!({ + "db": "foo" , + "table": "bar", + "tags": [], + "fields": [ + { + "name": "field1", + "type": "uint64" + }, + { + "name": "field2", + "type": "int64" + }, + { + "name": "field3", + "type": "float64" + }, + { + "name": "field4", + "type": "utf8" + }, + { + "name": "field5", + "type": "bool" + } + ] + + })) + .send() + .await + .expect("create table call failed"); + assert_eq!(StatusCode::UNPROCESSABLE_ENTITY, resp.status()); +} + +#[test_log::test(tokio::test)] +async fn api_v3_configure_table_create_no_fields() { + let server = TestServer::spawn().await; + let client = reqwest::Client::new(); + let db_url = format!( + "{base}/api/v3/configure/database", + base = server.client_addr() + ); + let table_url = format!("{base}/api/v3/configure/table", base = server.client_addr()); + + let resp = client + .post(&db_url) + .json(&json!({ "db": "foo" })) + .send() + .await + .expect("create database call did not succeed"); + assert_eq!(StatusCode::OK, resp.status()); + + let resp = client + .post(&table_url) + .json(&json!({ + "db": "foo" , + "table": "bar", + "tags": ["one", "two"], + "fields": [] + })) + .send() + .await + .expect("create table call failed"); + assert_eq!(StatusCode::UNPROCESSABLE_ENTITY, resp.status()); +} + #[test_log::test(tokio::test)] async fn api_v3_configure_table_delete() { let db_name = "foo"; diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index 611c0892eb4..6b4ce4bd497 100644 --- a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -586,6 +586,16 @@ impl DatabaseSchema { catalog_batch.database_id, Arc::clone(&catalog_batch.database_name), ); + + // We need to special case when we create a DB via the commandline as + // this will only contain one op to create a database. If we don't + // startup of the database will fail + if catalog_batch.ops.len() == 1 { + if let CatalogOp::CreateDatabase(_) = catalog_batch.ops[0] { + return Ok(db_schema); + } + } + let new_db = DatabaseSchema::new_if_updated_from_batch(&db_schema, catalog_batch)? .expect("database must be new"); Ok(new_db) diff --git a/influxdb3_client/src/lib.rs b/influxdb3_client/src/lib.rs index 3ed17f95ff4..dc7b851a390 100644 --- a/influxdb3_client/src/lib.rs +++ b/influxdb3_client/src/lib.rs @@ -326,6 +326,36 @@ impl Client { } } + /// Make a request to the `POST /api/v3/configure/database` API + pub async fn api_v3_configure_db_create(&self, db: impl Into + Send) -> Result<()> { + let api_path = "/api/v3/configure/database"; + + let url = self.base_url.join(api_path)?; + + #[derive(Serialize)] + struct Req { + db: String, + } + + let mut req = self.http_client.post(url).json(&Req { db: db.into() }); + + if let Some(token) = &self.auth_token { + req = req.bearer_auth(token.expose_secret()); + } + let resp = req + .send() + .await + .map_err(|src| Error::request_send(Method::POST, api_path, src))?; + let status = resp.status(); + match status { + StatusCode::OK => Ok(()), + code => Err(Error::ApiError { + code, + message: resp.text().await.map_err(Error::Text)?, + }), + } + } + /// Make a request to the `DELETE /api/v3/configure/database?db=foo` API pub async fn api_v3_configure_db_delete(&self, db: impl AsRef + Send) -> Result<()> { let api_path = "/api/v3/configure/database"; @@ -381,6 +411,61 @@ impl Client { } } + /// Make a request to the `POST /api/v3/configure/table` API + pub async fn api_v3_configure_table_create( + &self, + db: impl Into + Send, + table: impl Into + Send, + tags: Vec + Send>, + fields: Vec<(impl Into + Send, impl Into + Send)>, + ) -> Result<()> { + let api_path = "/api/v3/configure/table"; + + let url = self.base_url.join(api_path)?; + + #[derive(Serialize)] + struct Req { + db: String, + table: String, + tags: Vec, + fields: Vec, + } + #[derive(Serialize)] + struct Field { + name: String, + r#type: String, + } + + let mut req = self.http_client.post(url).json(&Req { + db: db.into(), + table: table.into(), + tags: tags.into_iter().map(Into::into).collect(), + fields: fields + .into_iter() + .map(|(name, r#type)| Field { + name: name.into(), + r#type: r#type.into(), + }) + .collect(), + }); + + if let Some(token) = &self.auth_token { + req = req.bearer_auth(token.expose_secret()); + } + let resp = req + .send() + .await + .map_err(|src| Error::request_send(Method::POST, api_path, src))?; + let status = resp.status(); + match status { + StatusCode::OK => Ok(()), + code => Err(Error::ApiError { + code, + message: resp.text().await.map_err(Error::Text)?, + }), + } + } + /// Send a `/ping` request to the target `influxdb3` server to check its /// status and gather `version` and `revision` information pub async fn ping(&self) -> Result { diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 883898c6269..d6f9cdeca48 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -246,6 +246,18 @@ impl Error { .status(StatusCode::NOT_FOUND) .body(Body::from(err.to_string())) .unwrap(), + Self::WriteBuffer(err @ WriteBufferError::DatabaseExists(_)) => Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from(err.to_string())) + .unwrap(), + Self::WriteBuffer(err @ WriteBufferError::EmptyTagSet) => Response::builder() + .status(StatusCode::UNPROCESSABLE_ENTITY) + .body(Body::from(err.to_string())) + .unwrap(), + Self::WriteBuffer(err @ WriteBufferError::EmptyFields) => Response::builder() + .status(StatusCode::UNPROCESSABLE_ENTITY) + .body(Body::from(err.to_string())) + .unwrap(), Self::WriteBuffer(WriteBufferError::CatalogUpdateError( err @ (CatalogError::TooManyDbs | CatalogError::TooManyColumns @@ -1001,6 +1013,15 @@ where .body(Body::empty())?) } + async fn create_database(&self, req: Request) -> Result> { + let CreateDatabaseRequest { db } = self.read_body_json(req).await?; + self.write_buffer.create_database(db).await?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty()) + .unwrap()) + } + async fn delete_database(&self, req: Request) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; @@ -1013,6 +1034,30 @@ where .unwrap()) } + async fn create_table(&self, req: Request) -> Result> { + let CreateTableRequest { + db, + table, + tags, + fields, + } = self.read_body_json(req).await?; + self.write_buffer + .create_table( + db, + table, + tags, + fields + .into_iter() + .map(|field| (field.name, field.r#type)) + .collect(), + ) + .await?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty()) + .unwrap()) + } + async fn delete_table(&self, req: Request) -> Result> { let query = req.uri().query().unwrap_or(""); let delete_req = serde_urlencoded::from_str::(query)?; @@ -1383,11 +1428,30 @@ struct ProcessEngineTriggerCreateRequest { trigger_specification: TriggerSpecificationDefinition, } +#[derive(Debug, Deserialize)] +struct CreateDatabaseRequest { + db: String, +} + #[derive(Debug, Deserialize)] struct DeleteDatabaseRequest { db: String, } +#[derive(Debug, Deserialize)] +struct CreateTableRequest { + db: String, + table: String, + tags: Vec, + fields: Vec, +} + +#[derive(Debug, Deserialize)] +struct CreateTableField { + name: String, + r#type: String, +} + #[derive(Debug, Deserialize)] struct DeleteTableRequest { db: String, @@ -1481,7 +1545,9 @@ pub(crate) async fn route_request( (Method::POST, "/api/v3/configure/processing_engine_trigger") => { http_server.configure_processing_engine_trigger(req).await } + (Method::POST, "/api/v3/configure/database") => http_server.create_database(req).await, (Method::DELETE, "/api/v3/configure/database") => http_server.delete_database(req).await, + (Method::POST, "/api/v3/configure/table") => http_server.create_table(req).await, // TODO: make table delete to use path param (DELETE db/foodb/table/bar) (Method::DELETE, "/api/v3/configure/table") => http_server.delete_table(req).await, _ => { diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index 8f624994c74..19a8c8a13be 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -62,7 +62,15 @@ pub trait WriteBuffer: /// Database manager - supports only delete operation #[async_trait::async_trait] pub trait DatabaseManager: Debug + Send + Sync + 'static { + async fn create_database(&self, name: String) -> Result<(), write_buffer::Error>; async fn soft_delete_database(&self, name: String) -> Result<(), write_buffer::Error>; + async fn create_table( + &self, + db: String, + table: String, + tags: Vec, + fields: Vec<(String, String)>, + ) -> Result<(), write_buffer::Error>; async fn soft_delete_table( &self, db_name: String, diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 243a7a8aeca..7cdf8411248 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -31,6 +31,8 @@ use influxdb3_cache::parquet_cache::ParquetCacheOracle; use influxdb3_catalog::catalog; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{ColumnId, DbId, TableId}; +use influxdb3_wal::FieldDataType; +use influxdb3_wal::TableDefinition; use influxdb3_wal::{ object_store::WalObjectStore, DeleteDatabaseDefinition, PluginDefinition, PluginType, TriggerDefinition, TriggerSpecificationDefinition, WalContents, @@ -40,6 +42,7 @@ use influxdb3_wal::{ MetaCacheDefinition, MetaCacheDelete, Wal, WalConfig, WalFileNotifier, WalOp, }; use influxdb3_wal::{CatalogOp::CreateLastCache, DeleteTableDefinition}; +use influxdb3_wal::{DatabaseDefinition, FieldDefinition}; use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges}; use iox_query::QueryChunk; use iox_time::{Time, TimeProvider}; @@ -97,9 +100,18 @@ pub enum Error { #[error("table not found {table_name:?} in db {db_name:?}")] TableNotFound { db_name: String, table_name: String }, + // These two errors are exclusive to the table creation API + #[error("table creation failed due to no tags")] + EmptyTagSet, + #[error("table creation failed due to no fields")] + EmptyFields, + #[error("tried accessing database that does not exist")] DbDoesNotExist, + #[error("tried creating database named '{0}' that already exists")] + DatabaseExists(String), + #[error("tried accessing table that do not exist")] TableDoesNotExist, @@ -644,6 +656,30 @@ impl LastCacheManager for WriteBufferImpl { #[async_trait::async_trait] impl DatabaseManager for WriteBufferImpl { + async fn create_database(&self, name: String) -> crate::Result<(), self::Error> { + if self.catalog.db_name_to_id(&name).is_some() { + return Err(self::Error::DatabaseExists(name.clone())); + } + // Create the Database + let db_schema = self.catalog.db_or_create(&name)?; + let db_id = db_schema.id; + + let catalog_batch = CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![CatalogOp::CreateDatabase(DatabaseDefinition { + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + })], + }; + self.catalog.apply_catalog_batch(&catalog_batch)?; + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully created database"); + Ok(()) + } + async fn soft_delete_database(&self, name: String) -> crate::Result<(), self::Error> { let (db_id, db_schema) = self.catalog @@ -669,6 +705,94 @@ impl DatabaseManager for WriteBufferImpl { debug!(db_id = ?db_id, name = ?&db_schema.name, "successfully deleted database"); Ok(()) } + async fn create_table( + &self, + db: String, + table: String, + tags: Vec, + fields: Vec<(String, String)>, + ) -> Result<(), self::Error> { + if tags.is_empty() { + return Err(self::Error::EmptyTagSet); + } + if fields.is_empty() { + return Err(self::Error::EmptyFields); + } + let (db_id, db_schema) = + self.catalog + .db_id_and_schema(&db) + .ok_or_else(|| self::Error::DatabaseNotFound { + db_name: db.to_owned(), + })?; + + let table_id = TableId::new(); + let table_name = table.into(); + let mut key = Vec::new(); + let field_definitions = { + let mut field_definitions = Vec::new(); + for tag in tags { + let id = ColumnId::new(); + key.push(id); + field_definitions.push(FieldDefinition { + name: tag.into(), + id, + data_type: FieldDataType::Tag, + }); + } + + for (name, ty) in fields { + field_definitions.push(FieldDefinition { + name: name.into(), + id: ColumnId::new(), + data_type: match ty.as_str() { + "uint64" => FieldDataType::UInteger, + "float64" => FieldDataType::Float, + "int64" => FieldDataType::Integer, + "bool" => FieldDataType::Boolean, + "utf8" => FieldDataType::String, + _ => todo!(), + }, + }); + } + + field_definitions.push(FieldDefinition { + name: "time".into(), + id: ColumnId::new(), + data_type: FieldDataType::Timestamp, + }); + + field_definitions + }; + + let catalog_table_def = TableDefinition { + database_id: db_schema.id, + database_name: Arc::clone(&db_schema.name), + table_name: Arc::clone(&table_name), + table_id, + field_definitions, + key, + }; + + let catalog_batch = CatalogBatch { + time_ns: self.time_provider.now().timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![CatalogOp::CreateTable(catalog_table_def)], + }; + self.catalog.apply_catalog_batch(&catalog_batch)?; + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + + debug!( + db_name = ?db_schema.name, + db_id = ?db_id, + table_id = ?table_id, + name = ?table_name, + "successfully created table" + ); + + Ok(()) + } async fn soft_delete_table( &self,