fix(catalog): consistent ordering of catalog operations #25690

Merged · 3 commits · Dec 20, 2024
Changes from 2 commits
233 changes: 136 additions & 97 deletions influxdb3_catalog/src/catalog.rs
@@ -1,16 +1,17 @@
//! Implementation of the Catalog that sits entirely in memory.

use crate::catalog::Error::{
- ProcessingEngineCallExists, ProcessingEngineTriggerExists, TableNotFound,
+ CatalogUpdatedElsewhere, ProcessingEngineCallExists, ProcessingEngineTriggerExists,
+ TableNotFound,
};
- use bimap::BiHashMap;
+ use bimap::{BiHashMap, Overwritten};
use hashbrown::HashMap;
use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions,
FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
- PluginDefinition, TriggerDefinition,
+ OrderedCatalogBatch, PluginDefinition, TriggerDefinition,
};
use influxdb_line_protocol::FieldValue;
use iox_time::Time;
@@ -184,10 +185,26 @@ impl Catalog {
}
}

- pub fn apply_catalog_batch(&self, catalog_batch: &CatalogBatch) -> Result<()> {
+ pub fn apply_catalog_batch(
+ &self,
+ catalog_batch: CatalogBatch,
+ ) -> Result<Option<OrderedCatalogBatch>> {
self.inner.write().apply_catalog_batch(catalog_batch)
}

+ /// Checks the batch's sequence number against the catalog's current sequence
+ /// and applies the batch only if it has not already been applied.
+ pub fn apply_ordered_catalog_batch(
+ &self,
+ batch: OrderedCatalogBatch,
+ ) -> Result<Option<CatalogBatch>> {
+ if batch.sequence_number() >= self.sequence_number().as_u32() {
+ if let Some(catalog_batch) = self.apply_catalog_batch(batch.batch())? {
+ return Ok(Some(catalog_batch.batch()));
+ }
+ }
+ Ok(None)
+ }

pub fn db_or_create(&self, db_name: &str) -> Result<Arc<DatabaseSchema>> {
let db = match self.db_schema(db_name) {
Some(db) => db,
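// Reviewer sketch, not part of the diff: how a replay path might use the new
// two-step API. apply_catalog_batch stamps each batch with the catalog
// sequence it was applied at; apply_ordered_catalog_batch then skips batches
// whose sequence shows they were already applied. The function below and its
// error handling are illustrative; WalOp::Catalog wrapping an
// OrderedCatalogBatch matches the test added at the bottom of this file.
fn replay_catalog_ops(catalog: &Catalog, mut ops: Vec<WalOp>) -> Result<()> {
    // Sorting restores the order in which the batches were originally applied,
    // assuming WalOp's Ord sorts catalog batches by stamped sequence number.
    ops.sort();
    for op in ops {
        if let WalOp::Catalog(ordered) = op {
            // Returns Ok(None) for stale (already-applied) batches, so
            // replaying the same op twice is harmless.
            catalog.apply_ordered_catalog_batch(ordered)?;
        }
    }
    Ok(())
}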
@@ -259,81 +276,6 @@ impl Catalog {
self.inner.read().clone()
}

- pub fn add_meta_cache(&self, db_id: DbId, table_id: TableId, meta_cache: MetaCacheDefinition) {
- let mut inner = self.inner.write();
- let mut db = inner
- .databases
- .get(&db_id)
- .expect("db should exist")
- .as_ref()
- .clone();
- let mut table = db
- .table_definition_by_id(&table_id)
- .expect("table should exist")
- .as_ref()
- .clone();
- table.add_meta_cache(meta_cache);
- db.insert_table(table_id, Arc::new(table));
- inner.upsert_db(db);
- }

- pub fn remove_meta_cache(&self, db_id: &DbId, table_id: &TableId, name: &str) {
- let mut inner = self.inner.write();
- let mut db = inner
- .databases
- .get(db_id)
- .expect("db should exist")
- .as_ref()
- .clone();
- let mut table = db
- .tables
- .get(table_id)
- .expect("table should exist")
- .as_ref()
- .clone();
- table.remove_meta_cache(name);
- db.insert_table(*table_id, Arc::new(table));
- inner.upsert_db(db);
- }

- pub fn add_last_cache(&self, db_id: DbId, table_id: TableId, last_cache: LastCacheDefinition) {
- let mut inner = self.inner.write();
- let mut db = inner
- .databases
- .get(&db_id)
- .expect("db should exist")
- .as_ref()
- .clone();
- let mut table = db
- .tables
- .get(&table_id)
- .expect("table should exist")
- .as_ref()
- .clone();
- table.add_last_cache(last_cache);
- db.insert_table(table_id, Arc::new(table));
- inner.upsert_db(db);
- }

- pub fn delete_last_cache(&self, db_id: DbId, table_id: TableId, name: &str) {
- let mut inner = self.inner.write();
- let mut db = inner
- .databases
- .get(&db_id)
- .expect("db should exist")
- .as_ref()
- .clone();
- let mut table = db
- .tables
- .get(&table_id)
- .expect("table should exist")
- .as_ref()
- .clone();
- table.remove_last_cache(name);
- db.insert_table(table_id, Arc::new(table));
- inner.upsert_db(db);
- }

pub fn instance_id(&self) -> Arc<str> {
Arc::clone(&self.inner.read().instance_id)
}
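// Reviewer sketch, not part of the diff: with these direct mutators removed,
// last/meta cache changes are expected to flow through a CatalogBatch like any
// other catalog update, so they get sequenced and replayed consistently. The
// CatalogOp variant name below is an assumption (this PR only shows
// LastCacheDefinition being imported), as are the surrounding variables:
//
//     let batch = influxdb3_wal::create::catalog_batch(
//         db_id,
//         db_name,
//         time_ns,
//         [CatalogOp::CreateLastCache(last_cache_definition)],
//     );
//     catalog.apply_catalog_batch(batch)?;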
@@ -478,24 +420,31 @@ impl InnerCatalog {

/// Applies the `CatalogBatch` while validating that all updates are compatible. If updates
/// have already been applied, the sequence number and updated tracker are not updated.
- pub fn apply_catalog_batch(&mut self, catalog_batch: &CatalogBatch) -> Result<()> {
+ pub fn apply_catalog_batch(
+ &mut self,
+ catalog_batch: CatalogBatch,
+ ) -> Result<Option<OrderedCatalogBatch>> {
let table_count = self.table_count();

if let Some(db) = self.databases.get(&catalog_batch.database_id) {
- if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, catalog_batch)? {
+ if let Some(new_db) = DatabaseSchema::new_if_updated_from_batch(db, &catalog_batch)? {
check_overall_table_count(Some(db), &new_db, table_count)?;
self.upsert_db(new_db);
+ } else {
+ return Ok(None);
}
} else {
if self.databases.len() >= Catalog::NUM_DBS_LIMIT {
return Err(Error::TooManyDbs);
}
- let new_db = DatabaseSchema::new_from_batch(catalog_batch)?;
+ let new_db = DatabaseSchema::new_from_batch(&catalog_batch)?;
check_overall_table_count(None, &new_db, table_count)?;
self.upsert_db(new_db);
}

- Ok(())
+ Ok(Some(OrderedCatalogBatch::new(
+ catalog_batch,
+ self.sequence.0,
+ )))
}
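// Sketch, not part of the diff: OrderedCatalogBatch is defined in
// influxdb3_wal; this reconstruction is an assumption based only on how it is
// built and consumed here (a CatalogBatch paired with the sequence number the
// catalog had when the batch was applied).
pub struct OrderedCatalogBatch {
    batch: CatalogBatch,
    sequence_number: u32,
}

impl OrderedCatalogBatch {
    pub fn new(batch: CatalogBatch, sequence_number: u32) -> Self {
        Self { batch, sequence_number }
    }

    /// The catalog sequence this batch was stamped with when first applied.
    pub fn sequence_number(&self) -> u32 {
        self.sequence_number
    }

    /// Hands back the wrapped batch, e.g. to re-apply it during replay.
    pub fn batch(self) -> CatalogBatch {
        self.batch
    }
}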

pub fn db_exists(&self, db_id: DbId) -> bool {
@@ -606,10 +555,21 @@ impl DatabaseSchema {
&mut self,
table_id: TableId,
table_def: Arc<TableDefinition>,
- ) -> Option<Arc<TableDefinition>> {
- self.table_map
- .insert(table_id, Arc::clone(&table_def.table_name));
- self.tables.insert(table_id, table_def)
+ ) -> Result<Option<Arc<TableDefinition>>> {
+ match self
+ .table_map
+ .insert(table_id, Arc::clone(&table_def.table_name))
+ {
+ Overwritten::Left(_, _) | Overwritten::Right(_, _) | Overwritten::Both(_, _) => {
+ // This will happen if another table was inserted with the same name between
+ // checking for existence and insertion. We'd like this to be handled
+ // automatically by the system, but for now it is better to error than to get
+ // into an inconsistent state.
+ return Err(CatalogUpdatedElsewhere);
+ }
+ Overwritten::Neither | Overwritten::Pair(_, _) => {}
+ }
+ Ok(self.tables.insert(table_id, table_def))
}
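// Sketch, not part of the diff: the bimap::Overwritten contract the new
// insert_table depends on, demonstrated against the bimap crate directly.
#[cfg(test)]
mod bimap_overwritten_sketch {
    use bimap::{BiHashMap, Overwritten};

    #[test]
    fn overwritten_variants() {
        let mut map: BiHashMap<u32, String> = BiHashMap::new();
        // Brand-new pair: nothing is overwritten.
        assert!(matches!(map.insert(1, "bar".into()), Overwritten::Neither));
        // Re-inserting the exact same pair is idempotent: Overwritten::Pair.
        assert!(matches!(map.insert(1, "bar".into()), Overwritten::Pair(..)));
        // Rebinding id 1 to a new name evicts (1, "bar"): Overwritten::Left.
        // insert_table maps Left/Right/Both to CatalogUpdatedElsewhere.
        assert!(matches!(map.insert(1, "baz".into()), Overwritten::Left(..)));
    }
}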

pub fn table_schema(&self, table_name: impl Into<Arc<str>>) -> Option<Schema> {
@@ -732,14 +692,14 @@ impl UpdateDatabaseSchema for influxdb3_wal::TableDefinition {
if let Cow::Owned(updated_table) = existing_table.check_and_add_new_fields(self)? {
database_schema
.to_mut()
- .insert_table(self.table_id, Arc::new(updated_table));
+ .insert_table(self.table_id, Arc::new(updated_table))?;
}
}
None => {
let new_table = TableDefinition::new_from_op(self);
database_schema
.to_mut()
- .insert_table(new_table.table_id, Arc::new(new_table));
+ .insert_table(new_table.table_id, Arc::new(new_table))?;
}
}
Ok(database_schema)
@@ -1156,7 +1116,7 @@ impl<T: TableUpdate> UpdateDatabaseSchema for T {
if let Cow::Owned(new_table) = self.update_table(Cow::Borrowed(table.as_ref()))? {
schema
.to_mut()
- .insert_table(new_table.table_id, Arc::new(new_table));
+ .insert_table(new_table.table_id, Arc::new(new_table))?;
}
Ok(schema)
}
@@ -1288,12 +1248,12 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnType {

#[cfg(test)]
mod tests {
- use influxdb3_wal::{create, FieldDataType};
+ use super::*;
+ use influxdb3_wal::CatalogOp::CreateTable;
+ use influxdb3_wal::{create, FieldDataType, WalOp};
use pretty_assertions::assert_eq;
use test_helpers::assert_contains;

- use super::*;

#[test]
fn catalog_serialization() {
let host_id = Arc::from("sample-host-id");
@@ -1707,7 +1667,7 @@ mod tests {
let catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
catalog.insert_database(DatabaseSchema::new(DbId::new(), Arc::from("foo")));
let db_id = catalog.db_name_to_id("foo").unwrap();
- let catalog_batch = create::catalog_batch_op(
+ let catalog_batch = create::catalog_batch(
db_id,
"foo",
0,
…
)],
);
let err = catalog
- .apply_catalog_batch(catalog_batch.as_catalog().unwrap())
+ .apply_catalog_batch(catalog_batch)
.expect_err("should fail to apply AddFields operation for non-existent table");
assert_contains!(err.to_string(), "Table banana not in DB schema for foo");
}
Expand Down Expand Up @@ -1775,7 +1735,9 @@ mod tests {
.unwrap(),
);

- database.insert_table(deleted_table_id, table_defn);
+ database
+ .insert_table(deleted_table_id, table_defn)
+ .expect("should be able to insert");
let new_db = DatabaseSchema::new_if_updated_from_batch(
&database,
&CatalogBatch {
@@ -1798,4 +1760,81 @@ mod tests {
assert!(deleted_table.deleted);
assert!(!deleted_table.series_key.is_empty());
}

+ // Tests that sorting catalog ops by the sequence number returned from
+ // apply_catalog_batch fixes potential ordering issues.
+ #[test]
+ fn test_out_of_order_ops() -> Result<()> {
+ let catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
+ let db_id = DbId::new();
+ let db_name = Arc::from("foo");
+ let table_id = TableId::new();
+ let table_name = Arc::from("bar");
+ let table_definition = influxdb3_wal::TableDefinition {
+ database_id: db_id,
+ database_name: Arc::clone(&db_name),
+ table_name: Arc::clone(&table_name),
+ table_id,
+ field_definitions: vec![
+ FieldDefinition::new(ColumnId::from(0), "tag_1", FieldDataType::Tag),
+ FieldDefinition::new(ColumnId::from(1), "time", FieldDataType::Timestamp),
+ FieldDefinition::new(ColumnId::from(2), "field", FieldDataType::String),
+ ],
+ key: vec![ColumnId::from(0)],
+ };
+ let create_op = CatalogBatch {
+ database_id: db_id,
+ database_name: Arc::clone(&db_name),
+ time_ns: 0,
+ ops: vec![CreateTable(table_definition.clone())],
+ };
+ let add_column_op = CatalogBatch {
+ database_id: db_id,
+ database_name: Arc::clone(&db_name),
+ time_ns: 0,
+ ops: vec![CatalogOp::AddFields(FieldAdditions {
+ database_name: Arc::clone(&db_name),
+ database_id: db_id,
+ table_name,
+ table_id,
+ field_definitions: vec![FieldDefinition::new(
+ ColumnId::from(3),
+ "tag_2",
+ FieldDataType::Tag,
+ )],
+ })],
+ };
+ let create_ordered_op = catalog
+ .apply_catalog_batch(create_op)?
+ .expect("should be able to create");
+ let add_column_op = catalog
+ .apply_catalog_batch(add_column_op)?
+ .expect("should produce operation");
+ let mut ops = vec![
+ WalOp::Catalog(add_column_op),
+ WalOp::Catalog(create_ordered_op),
+ ];
+ ops.sort();
+
+ let replayed_catalog = Catalog::new(Arc::from("host"), Arc::from("instance"));
+ for op in ops {
+ let WalOp::Catalog(catalog_batch) = op else {
+ panic!("ops should all be catalog ops");
+ };
+ replayed_catalog.apply_catalog_batch(catalog_batch.batch())?;
+ }
+ let original_table = catalog
+ .db_schema_by_id(&db_id)
+ .unwrap()
+ .table_definition_by_id(&table_id)
+ .unwrap();
+ let replayed_table = replayed_catalog
+ .db_schema_by_id(&db_id)
+ .unwrap()
+ .table_definition_by_id(&table_id)
+ .unwrap();
+
+ assert_eq!(original_table, replayed_table);
+ Ok(())
+ }
}
8 changes: 4 additions & 4 deletions influxdb3_wal/src/create.rs
@@ -43,18 +43,18 @@ pub fn write_batch_op(write_batch: WriteBatch) -> WalOp {
WalOp::Write(write_batch)
}

- pub fn catalog_batch_op(
+ pub fn catalog_batch(
db_id: DbId,
db_name: impl Into<Arc<str>>,
time_ns: i64,
ops: impl IntoIterator<Item = CatalogOp>,
- ) -> WalOp {
- WalOp::Catalog(CatalogBatch {
+ ) -> CatalogBatch {
+ CatalogBatch {
database_id: db_id,
database_name: db_name.into(),
time_ns,
ops: ops.into_iter().collect(),
- })
+ }
}
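// Reviewer sketch, not part of the diff: the intended call pattern after this
// change. Returning a bare CatalogBatch forces callers to route the batch
// through the catalog, which stamps it with a sequence number, before it can
// become a WAL op; the old catalog_batch_op wrapped it in WalOp::Catalog up
// front, skipping that step. Variables here are illustrative:
//
//     let batch = create::catalog_batch(db_id, "foo", 0, ops);
//     if let Some(ordered) = catalog.apply_catalog_batch(batch)? {
//         wal_ops.push(WalOp::Catalog(ordered));
//     }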

pub fn add_fields_op(