diff --git a/Cargo.lock b/Cargo.lock index dfb58ff8..afbb6eb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -937,6 +937,12 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80115a2dfde04491e181c2440a39e4be26e52d9ca4e92bed213f65b94e0b8db1" +[[package]] +name = "difference" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" + [[package]] name = "digest" version = "0.8.1" @@ -961,6 +967,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "downcast" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" + [[package]] name = "either" version = "1.6.1" @@ -1066,6 +1078,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1082,6 +1103,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2" + [[package]] name = "fs_extra" version = "1.2.0" @@ -1822,6 +1849,7 @@ dependencies = [ "memmap", "milli", "mime", + "mockall", "once_cell", "oxidized-json-checker", "parking_lot", @@ -2023,6 +2051,33 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "mockall" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d614ad23f9bb59119b8b5670a85c7ba92c5e9adf4385c81ea00c51c8be33d5" +dependencies 
= [ + "cfg-if 1.0.0", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5dd4234635bca06fc96c7368d038061e0aae1b00a764dc817e900dc974e3deea" +dependencies = [ + "cfg-if 1.0.0", + "proc-macro2 1.0.24", + "quote 1.0.9", + "syn 1.0.64", +] + [[package]] name = "net2" version = "0.2.37" @@ -2046,6 +2101,12 @@ dependencies = [ "libc", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "ntapi" version = "0.3.6" @@ -2329,6 +2390,35 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "predicates" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eeb433456c1a57cc93554dea3ce40b4c19c4057e41c55d4a0f3d84ea71c325aa" +dependencies = [ + "difference", + "float-cmp", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451" + +[[package]] +name = "predicates-tree" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15f553275e5721409451eb85e15fd9a860a6e5ab4496eb215987502b5f5391f2" +dependencies = [ + "predicates-core", + "treeline", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3438,6 +3528,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "treeline" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41" + [[package]] name = "trust-dns-proto" version = "0.19.7" diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml index 4b8ecd13..93776269 100644 --- a/meilisearch-http/Cargo.toml +++ b/meilisearch-http/Cargo.toml @@ -80,10 +80,11 @@ version = "0.18.1" [dev-dependencies] +actix-rt = "2.1.0" +assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" } +mockall = "0.9.1" serde_url_params = "0.2.0" tempdir = "0.3.7" -assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" } -actix-rt = "2.1.0" urlencoding = "1.1.1" [features] diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index c312c291..717d728f 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -1,7 +1,6 @@ pub mod search; mod updates; -use std::fs::create_dir_all; use std::ops::Deref; use std::sync::Arc; @@ -59,10 +58,7 @@ impl Data { pub fn new(options: Opt) -> anyhow::Result { let path = options.db_path.clone(); - create_dir_all(&path)?; - let index_size = options.max_mdb_size.get_bytes() as usize; - let update_store_size = options.max_udb_size.get_bytes() as usize; - let index_controller = IndexController::new(&path, index_size, update_store_size)?; + let index_controller = IndexController::new(&path, &options)?; let mut api_keys = ApiKeys { master: options.clone().master_key, diff --git a/meilisearch-http/src/helpers/compression.rs b/meilisearch-http/src/helpers/compression.rs index bbf14d57..c4747cb2 100644 --- a/meilisearch-http/src/helpers/compression.rs +++ b/meilisearch-http/src/helpers/compression.rs @@ -1,27 +1,26 @@ -use flate2::read::GzDecoder; -use flate2::write::GzEncoder; -use flate2::Compression; use std::fs::{create_dir_all, File}; +use std::io::Write; use std::path::Path; -use tar::{Archive, Builder}; -use crate::error::Error; +use flate2::{read::GzDecoder, write::GzEncoder, 
Compression}; +use tar::{Archive, Builder}; -pub fn to_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> { - let f = File::create(dest)?; - let gz_encoder = GzEncoder::new(f, Compression::default()); +pub fn to_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { + let mut f = File::create(dest)?; + let gz_encoder = GzEncoder::new(&mut f, Compression::default()); let mut tar_encoder = Builder::new(gz_encoder); tar_encoder.append_dir_all(".", src)?; let gz_encoder = tar_encoder.into_inner()?; gz_encoder.finish()?; + f.flush()?; Ok(()) } -pub fn from_tar_gz(src: &Path, dest: &Path) -> Result<(), Error> { - let f = File::open(src)?; +pub fn from_tar_gz(src: impl AsRef, dest: impl AsRef) -> anyhow::Result<()> { + let f = File::open(&src)?; let gz = GzDecoder::new(f); let mut ar = Archive::new(gz); - create_dir_all(dest)?; - ar.unpack(dest)?; + create_dir_all(&dest)?; + ar.unpack(&dest)?; Ok(()) } diff --git a/meilisearch-http/src/index/search.rs b/meilisearch-http/src/index/search.rs index 2002266a..7311687d 100644 --- a/meilisearch-http/src/index/search.rs +++ b/meilisearch-http/src/index/search.rs @@ -20,7 +20,6 @@ const fn default_search_limit() -> usize { #[derive(Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] -#[allow(dead_code)] pub struct SearchQuery { pub q: Option, pub offset: Option, diff --git a/meilisearch-http/src/index_controller/index_actor.rs b/meilisearch-http/src/index_controller/index_actor.rs deleted file mode 100644 index cc6d6752..00000000 --- a/meilisearch-http/src/index_controller/index_actor.rs +++ /dev/null @@ -1,612 +0,0 @@ -use std::collections::HashMap; -use std::fs::{create_dir_all, File}; -use std::future::Future; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use async_stream::stream; -use chrono::{DateTime, Utc}; -use futures::pin_mut; -use futures::stream::StreamExt; -use heed::EnvOpenOptions; -use log::debug; -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use 
tokio::fs::remove_dir_all; -use tokio::sync::{mpsc, oneshot, RwLock}; -use tokio::task::spawn_blocking; -use uuid::Uuid; - -use super::update_handler::UpdateHandler; -use super::{get_arc_ownership_blocking, IndexSettings}; -use crate::index::UpdateResult as UResult; -use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{ - updates::{Failed, Processed, Processing}, - UpdateMeta, -}; -use crate::option::IndexerOpts; - -pub type Result = std::result::Result; -type AsyncMap = Arc>>; -type UpdateResult = std::result::Result, Failed>; - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexMeta { - created_at: DateTime, - updated_at: DateTime, - primary_key: Option, -} - -impl IndexMeta { - fn new(index: &Index) -> Result { - let txn = index.read_txn()?; - Self::new_txn(index, &txn) - } - - fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { - let created_at = index.created_at(&txn)?; - let updated_at = index.updated_at(&txn)?; - let primary_key = index.primary_key(&txn)?.map(String::from); - Ok(Self { - primary_key, - updated_at, - created_at, - }) - } -} - -enum IndexMsg { - CreateIndex { - uuid: Uuid, - primary_key: Option, - ret: oneshot::Sender>, - }, - Update { - meta: Processing, - data: std::fs::File, - ret: oneshot::Sender>, - }, - Search { - uuid: Uuid, - query: SearchQuery, - ret: oneshot::Sender>, - }, - Settings { - uuid: Uuid, - ret: oneshot::Sender>, - }, - Documents { - uuid: Uuid, - attributes_to_retrieve: Option>, - offset: usize, - limit: usize, - ret: oneshot::Sender>>, - }, - Document { - uuid: Uuid, - attributes_to_retrieve: Option>, - doc_id: String, - ret: oneshot::Sender>, - }, - Delete { - uuid: Uuid, - ret: oneshot::Sender>, - }, - GetMeta { - uuid: Uuid, - ret: oneshot::Sender>, - }, - UpdateIndex { - uuid: Uuid, - index_settings: IndexSettings, - ret: oneshot::Sender>, - }, -} - -struct IndexActor { - read_receiver: Option>, - 
write_receiver: Option>, - update_handler: Arc, - store: S, -} - -#[derive(Error, Debug)] -pub enum IndexError { - #[error("error with index: {0}")] - Error(#[from] anyhow::Error), - #[error("index already exists")] - IndexAlreadyExists, - #[error("Index doesn't exists")] - UnexistingIndex, - #[error("Heed error: {0}")] - HeedError(#[from] heed::Error), - #[error("Existing primary key")] - ExistingPrimaryKey, -} - -#[async_trait::async_trait] -trait IndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; - async fn get(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>; -} - -impl IndexActor { - fn new( - read_receiver: mpsc::Receiver, - write_receiver: mpsc::Receiver, - store: S, - ) -> Result { - let options = IndexerOpts::default(); - let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; - let update_handler = Arc::new(update_handler); - let read_receiver = Some(read_receiver); - let write_receiver = Some(write_receiver); - Ok(Self { - read_receiver, - write_receiver, - store, - update_handler, - }) - } - - /// `run` poll the write_receiver and read_receiver concurrently, but while messages send - /// through the read channel are processed concurrently, the messages sent through the write - /// channel are processed one at a time. - async fn run(mut self) { - let mut read_receiver = self - .read_receiver - .take() - .expect("Index Actor must have a inbox at this point."); - - let read_stream = stream! { - loop { - match read_receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - let mut write_receiver = self - .write_receiver - .take() - .expect("Index Actor must have a inbox at this point."); - - let write_stream = stream! 
{ - loop { - match write_receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - pin_mut!(write_stream); - pin_mut!(read_stream); - - let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)); - let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg)); - - let fut1: Box + Unpin + Send> = Box::new(fut1); - let fut2: Box + Unpin + Send> = Box::new(fut2); - - tokio::join!(fut1, fut2); - } - - async fn handle_message(&self, msg: IndexMsg) { - use IndexMsg::*; - match msg { - CreateIndex { - uuid, - primary_key, - ret, - } => { - let _ = ret.send(self.handle_create_index(uuid, primary_key).await); - } - Update { ret, meta, data } => { - let _ = ret.send(self.handle_update(meta, data).await); - } - Search { ret, query, uuid } => { - let _ = ret.send(self.handle_search(uuid, query).await); - } - Settings { ret, uuid } => { - let _ = ret.send(self.handle_settings(uuid).await); - } - Documents { - ret, - uuid, - attributes_to_retrieve, - offset, - limit, - } => { - let _ = ret.send( - self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve) - .await, - ); - } - Document { - uuid, - attributes_to_retrieve, - doc_id, - ret, - } => { - let _ = ret.send( - self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve) - .await, - ); - } - Delete { uuid, ret } => { - let _ = ret.send(self.handle_delete(uuid).await); - } - GetMeta { uuid, ret } => { - let _ = ret.send(self.handle_get_meta(uuid).await); - } - UpdateIndex { - uuid, - index_settings, - ret, - } => { - let _ = ret.send(self.handle_update_index(uuid, index_settings).await); - } - } - } - - async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || index.perform_search(query)).await? 
- } - - async fn handle_create_index( - &self, - uuid: Uuid, - primary_key: Option, - ) -> Result { - let index = self.store.create(uuid, primary_key).await?; - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; - Ok(meta) - } - - async fn handle_update( - &self, - meta: Processing, - data: File, - ) -> Result { - log::info!("Processing update {}", meta.id()); - let uuid = meta.index_uuid(); - let update_handler = self.update_handler.clone(); - let index = match self.store.get(*uuid).await? { - Some(index) => index, - None => self.store.create(*uuid, None).await?, - }; - spawn_blocking(move || update_handler.handle_update(meta, data, index)) - .await - .map_err(|e| IndexError::Error(e.into())) - } - - async fn handle_settings(&self, uuid: Uuid) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || index.settings().map_err(IndexError::Error)) - .await - .map_err(|e| IndexError::Error(e.into()))? - } - - async fn handle_fetch_documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_documents(offset, limit, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? - } - - async fn handle_fetch_document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_document(doc_id, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? 
- } - - async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let index = self.store.delete(uuid).await?; - - if let Some(index) = index { - tokio::task::spawn(async move { - let index = index.0; - let store = get_arc_ownership_blocking(index).await; - spawn_blocking(move || { - store.prepare_for_closing().wait(); - debug!("Index closed"); - }); - }); - } - - Ok(()) - } - - async fn handle_get_meta(&self, uuid: Uuid) -> Result { - match self.store.get(uuid).await? { - Some(index) => { - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; - Ok(meta) - } - None => Err(IndexError::UnexistingIndex), - } - } - - async fn handle_update_index( - &self, - uuid: Uuid, - index_settings: IndexSettings, - ) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - - spawn_blocking(move || match index_settings.primary_key { - Some(ref primary_key) => { - let mut txn = index.write_txn()?; - if index.primary_key(&txn)?.is_some() { - return Err(IndexError::ExistingPrimaryKey); - } - index.put_primary_key(&mut txn, primary_key)?; - let meta = IndexMeta::new_txn(&index, &txn)?; - txn.commit()?; - Ok(meta) - } - None => { - let meta = IndexMeta::new(&index)?; - Ok(meta) - } - }) - .await - .map_err(|e| IndexError::Error(e.into()))? 
- } -} - -#[derive(Clone)] -pub struct IndexActorHandle { - read_sender: mpsc::Sender, - write_sender: mpsc::Sender, -} - -impl IndexActorHandle { - pub fn new(path: impl AsRef, index_size: usize) -> anyhow::Result { - let (read_sender, read_receiver) = mpsc::channel(100); - let (write_sender, write_receiver) = mpsc::channel(100); - - let store = HeedIndexStore::new(path, index_size); - let actor = IndexActor::new(read_receiver, write_receiver, store)?; - tokio::task::spawn(actor.run()); - Ok(Self { - read_sender, - write_sender, - }) - } - - pub async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::CreateIndex { - ret, - uuid, - primary_key, - }; - let _ = self.read_sender.send(msg).await; - receiver.await.expect("IndexActor has been killed") - } - - pub async fn update( - &self, - meta: Processing, - data: std::fs::File, - ) -> anyhow::Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Update { ret, meta, data }; - let _ = self.write_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Search { uuid, query, ret }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn settings(&self, uuid: Uuid) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Settings { uuid, ret }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) 
- } - - pub async fn documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Documents { - uuid, - ret, - offset, - attributes_to_retrieve, - limit, - }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Document { - uuid, - ret, - doc_id, - attributes_to_retrieve, - }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn delete(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Delete { uuid, ret }; - let _ = self.write_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn get_index_meta(&self, uuid: Uuid) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::GetMeta { uuid, ret }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn update_index( - &self, - uuid: Uuid, - index_settings: IndexSettings, - ) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::UpdateIndex { - uuid, - index_settings, - ret, - }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) 
- } -} - -struct HeedIndexStore { - index_store: AsyncMap, - path: PathBuf, - index_size: usize, -} - -impl HeedIndexStore { - fn new(path: impl AsRef, index_size: usize) -> Self { - let path = path.as_ref().join("indexes/"); - let index_store = Arc::new(RwLock::new(HashMap::new())); - Self { - index_store, - path, - index_size, - } - } -} - -#[async_trait::async_trait] -impl IndexStore for HeedIndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result { - let path = self.path.join(format!("index-{}", uuid)); - if path.exists() { - return Err(IndexError::IndexAlreadyExists); - } - - let index_size = self.index_size; - let index = spawn_blocking(move || -> Result { - let index = open_index(&path, index_size)?; - if let Some(primary_key) = primary_key { - let mut txn = index.write_txn()?; - index.put_primary_key(&mut txn, &primary_key)?; - txn.commit()?; - } - Ok(index) - }) - .await - .map_err(|e| IndexError::Error(e.into()))??; - - self.index_store.write().await.insert(uuid, index.clone()); - - Ok(index) - } - - async fn get(&self, uuid: Uuid) -> Result> { - let guard = self.index_store.read().await; - match guard.get(&uuid) { - Some(index) => Ok(Some(index.clone())), - None => { - // drop the guard here so we can perform the write after without deadlocking; - drop(guard); - let path = self.path.join(format!("index-{}", uuid)); - if !path.exists() { - return Ok(None); - } - - let index_size = self.index_size; - let index = spawn_blocking(move || open_index(path, index_size)) - .await - .map_err(|e| IndexError::Error(e.into()))??; - self.index_store.write().await.insert(uuid, index.clone()); - Ok(Some(index)) - } - } - } - - async fn delete(&self, uuid: Uuid) -> Result> { - let db_path = self.path.join(format!("index-{}", uuid)); - remove_dir_all(db_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; - let index = self.index_store.write().await.remove(&uuid); - Ok(index) - } -} - -fn open_index(path: impl AsRef, size: usize) -> Result 
{ - create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, &path).map_err(IndexError::Error)?; - Ok(Index(Arc::new(index))) -} diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs new file mode 100644 index 00000000..a4228227 --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -0,0 +1,331 @@ +use std::fs::File; +use std::future::Future; +use std::path::PathBuf; +use std::sync::Arc; + +use async_stream::stream; +use futures::pin_mut; +use futures::stream::StreamExt; +use heed::CompactionOption; +use log::debug; +use tokio::sync::mpsc; +use tokio::task::spawn_blocking; +use uuid::Uuid; + +use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult}; +use crate::index::{Document, SearchQuery, SearchResult, Settings}; +use crate::index_controller::update_handler::UpdateHandler; +use crate::index_controller::{get_arc_ownership_blocking, updates::Processing, UpdateMeta}; +use crate::option::IndexerOpts; + +pub struct IndexActor { + read_receiver: Option>, + write_receiver: Option>, + update_handler: Arc, + store: S, +} + +impl IndexActor { + pub fn new( + read_receiver: mpsc::Receiver, + write_receiver: mpsc::Receiver, + store: S, + ) -> Result { + let options = IndexerOpts::default(); + let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; + let update_handler = Arc::new(update_handler); + let read_receiver = Some(read_receiver); + let write_receiver = Some(write_receiver); + Ok(Self { + read_receiver, + write_receiver, + store, + update_handler, + }) + } + + /// `run` poll the write_receiver and read_receiver concurrently, but while messages send + /// through the read channel are processed concurrently, the messages sent through the write + /// channel are processed one at a time. 
+ pub async fn run(mut self) { + let mut read_receiver = self + .read_receiver + .take() + .expect("Index Actor must have a inbox at this point."); + + let read_stream = stream! { + loop { + match read_receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + + let mut write_receiver = self + .write_receiver + .take() + .expect("Index Actor must have a inbox at this point."); + + let write_stream = stream! { + loop { + match write_receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + + pin_mut!(write_stream); + pin_mut!(read_stream); + + let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)); + let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg)); + + let fut1: Box + Unpin + Send> = Box::new(fut1); + let fut2: Box + Unpin + Send> = Box::new(fut2); + + tokio::join!(fut1, fut2); + } + + async fn handle_message(&self, msg: IndexMsg) { + use IndexMsg::*; + match msg { + CreateIndex { + uuid, + primary_key, + ret, + } => { + let _ = ret.send(self.handle_create_index(uuid, primary_key).await); + } + Update { ret, meta, data } => { + let _ = ret.send(self.handle_update(meta, data).await); + } + Search { ret, query, uuid } => { + let _ = ret.send(self.handle_search(uuid, query).await); + } + Settings { ret, uuid } => { + let _ = ret.send(self.handle_settings(uuid).await); + } + Documents { + ret, + uuid, + attributes_to_retrieve, + offset, + limit, + } => { + let _ = ret.send( + self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve) + .await, + ); + } + Document { + uuid, + attributes_to_retrieve, + doc_id, + ret, + } => { + let _ = ret.send( + self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve) + .await, + ); + } + Delete { uuid, ret } => { + let _ = ret.send(self.handle_delete(uuid).await); + } + GetMeta { uuid, ret } => { + let _ = ret.send(self.handle_get_meta(uuid).await); + } + UpdateIndex { + uuid, + index_settings, + ret, 
+ } => { + let _ = ret.send(self.handle_update_index(uuid, index_settings).await); + } + Snapshot { uuid, path, ret } => { + let _ = ret.send(self.handle_snapshot(uuid, path).await); + } + } + } + + async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + spawn_blocking(move || index.perform_search(query)).await? + } + + async fn handle_create_index( + &self, + uuid: Uuid, + primary_key: Option, + ) -> Result { + let index = self.store.create(uuid, primary_key).await?; + let meta = spawn_blocking(move || IndexMeta::new(&index)) + .await + .map_err(|e| IndexError::Error(e.into()))??; + Ok(meta) + } + + async fn handle_update( + &self, + meta: Processing, + data: File, + ) -> Result { + debug!("Processing update {}", meta.id()); + let uuid = meta.index_uuid(); + let update_handler = self.update_handler.clone(); + let index = match self.store.get(*uuid).await? { + Some(index) => index, + None => self.store.create(*uuid, None).await?, + }; + spawn_blocking(move || update_handler.handle_update(meta, data, index)) + .await + .map_err(|e| IndexError::Error(e.into())) + } + + async fn handle_settings(&self, uuid: Uuid) -> Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + spawn_blocking(move || index.settings().map_err(IndexError::Error)) + .await + .map_err(|e| IndexError::Error(e.into()))? + } + + async fn handle_fetch_documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result> { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + spawn_blocking(move || { + index + .retrieve_documents(offset, limit, attributes_to_retrieve) + .map_err(IndexError::Error) + }) + .await + .map_err(|e| IndexError::Error(e.into()))? 
+ } + + async fn handle_fetch_document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + spawn_blocking(move || { + index + .retrieve_document(doc_id, attributes_to_retrieve) + .map_err(IndexError::Error) + }) + .await + .map_err(|e| IndexError::Error(e.into()))? + } + + async fn handle_delete(&self, uuid: Uuid) -> Result<()> { + let index = self.store.delete(uuid).await?; + + if let Some(index) = index { + tokio::task::spawn(async move { + let index = index.0; + let store = get_arc_ownership_blocking(index).await; + spawn_blocking(move || { + store.prepare_for_closing().wait(); + debug!("Index closed"); + }); + }); + } + + Ok(()) + } + + async fn handle_get_meta(&self, uuid: Uuid) -> Result { + match self.store.get(uuid).await? { + Some(index) => { + let meta = spawn_blocking(move || IndexMeta::new(&index)) + .await + .map_err(|e| IndexError::Error(e.into()))??; + Ok(meta) + } + None => Err(IndexError::UnexistingIndex), + } + } + + async fn handle_update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + + spawn_blocking(move || match index_settings.primary_key { + Some(ref primary_key) => { + let mut txn = index.write_txn()?; + if index.primary_key(&txn)?.is_some() { + return Err(IndexError::ExistingPrimaryKey); + } + index.put_primary_key(&mut txn, primary_key)?; + let meta = IndexMeta::new_txn(&index, &txn)?; + txn.commit()?; + Ok(meta) + } + None => { + let meta = IndexMeta::new(&index)?; + Ok(meta) + } + }) + .await + .map_err(|e| IndexError::Error(e.into()))? 
+ } + + async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { + use tokio::fs::create_dir_all; + + path.push("indexes"); + create_dir_all(&path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + + if let Some(index) = self.store.get(uuid).await? { + let mut index_path = path.join(format!("index-{}", uuid)); + create_dir_all(&index_path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + index_path.push("data.mdb"); + spawn_blocking(move || -> anyhow::Result<()> { + // Get write txn to wait for ongoing write transaction before snapshot. + let _txn = index.write_txn()?; + index + .env + .copy_to_path(index_path, CompactionOption::Enabled)?; + Ok(()) + }) + .await + .map_err(|e| IndexError::Error(e.into()))? + .map_err(IndexError::Error)?; + } + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs new file mode 100644 index 00000000..dba0f9e6 --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -0,0 +1,139 @@ +use std::path::{Path, PathBuf}; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use super::{ + IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult, +}; +use crate::index::{Document, SearchQuery, SearchResult, Settings}; +use crate::index_controller::IndexSettings; +use crate::index_controller::{updates::Processing, UpdateMeta}; + +#[derive(Clone)] +pub struct IndexActorHandleImpl { + read_sender: mpsc::Sender, + write_sender: mpsc::Sender, +} + +#[async_trait::async_trait] +impl IndexActorHandle for IndexActorHandleImpl { + async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::CreateIndex { + ret, + uuid, + primary_key, + }; + let _ = self.read_sender.send(msg).await; + receiver.await.expect("IndexActor has been killed") + } + + async fn update( + &self, + 
meta: Processing, + data: std::fs::File, + ) -> anyhow::Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Update { ret, meta, data }; + let _ = self.write_sender.send(msg).await; // updates mutate the index: use the write channel, which the actor drains one message at a time + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Search { uuid, query, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn settings(&self, uuid: Uuid) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Settings { uuid, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Documents { + uuid, + ret, + offset, + attributes_to_retrieve, + limit, + }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Document { + uuid, + ret, + doc_id, + attributes_to_retrieve, + }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn delete(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Delete { uuid, ret }; + let _ = self.write_sender.send(msg).await; // deletion is a write: keep it serialized with updates on the write channel + Ok(receiver.await.expect("IndexActor has been killed")?)
+ } + + async fn get_index_meta(&self, uuid: Uuid) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::GetMeta { uuid, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::UpdateIndex { + uuid, + index_settings, + ret, + }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Snapshot { uuid, path, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } +} + +impl IndexActorHandleImpl { + pub fn new(path: impl AsRef, index_size: usize) -> anyhow::Result { + let (read_sender, read_receiver) = mpsc::channel(100); + let (write_sender, write_receiver) = mpsc::channel(100); + + let store = MapIndexStore::new(path, index_size); + let actor = IndexActor::new(read_receiver, write_receiver, store)?; + tokio::task::spawn(actor.run()); + Ok(Self { + read_sender, + write_sender, + }) + } +} diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs new file mode 100644 index 00000000..46d7f621 --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -0,0 +1,61 @@ +use std::path::PathBuf; + +use tokio::sync::oneshot; +use uuid::Uuid; + +use super::{IndexMeta, IndexSettings, Result, UpdateResult}; +use crate::index::{Document, SearchQuery, SearchResult, Settings}; +use crate::index_controller::{updates::Processing, UpdateMeta}; + +pub enum IndexMsg { + CreateIndex { + uuid: Uuid, + primary_key: Option, + ret: oneshot::Sender>, + }, + Update { + meta: Processing, + data: std::fs::File, + ret: 
oneshot::Sender>, + }, + Search { + uuid: Uuid, + query: SearchQuery, + ret: oneshot::Sender>, + }, + Settings { + uuid: Uuid, + ret: oneshot::Sender>, + }, + Documents { + uuid: Uuid, + attributes_to_retrieve: Option>, + offset: usize, + limit: usize, + ret: oneshot::Sender>>, + }, + Document { + uuid: Uuid, + attributes_to_retrieve: Option>, + doc_id: String, + ret: oneshot::Sender>, + }, + Delete { + uuid: Uuid, + ret: oneshot::Sender>, + }, + GetMeta { + uuid: Uuid, + ret: oneshot::Sender>, + }, + UpdateIndex { + uuid: Uuid, + index_settings: IndexSettings, + ret: oneshot::Sender>, + }, + Snapshot { + uuid: Uuid, + path: PathBuf, + ret: oneshot::Sender>, + }, +} diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs new file mode 100644 index 00000000..2dc856b8 --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -0,0 +1,101 @@ +mod actor; +mod handle_impl; +mod message; +mod store; + +use std::path::PathBuf; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use uuid::Uuid; + +use super::IndexSettings; +use crate::index::UpdateResult as UResult; +use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; +use crate::index_controller::{ + updates::{Failed, Processed, Processing}, + UpdateMeta, +}; +use actor::IndexActor; +use message::IndexMsg; +use store::{IndexStore, MapIndexStore}; + +pub use handle_impl::IndexActorHandleImpl; + +#[cfg(test)] +use mockall::automock; + +pub type Result = std::result::Result; +type UpdateResult = std::result::Result, Failed>; + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct IndexMeta { + created_at: DateTime, + updated_at: DateTime, + primary_key: Option, +} + +impl IndexMeta { + fn new(index: &Index) -> Result { + let txn = index.read_txn()?; + Self::new_txn(index, &txn) + } + + fn new_txn(index: &Index, txn: 
&heed::RoTxn) -> Result { + let created_at = index.created_at(&txn)?; + let updated_at = index.updated_at(&txn)?; + let primary_key = index.primary_key(&txn)?.map(String::from); + Ok(Self { + primary_key, + updated_at, + created_at, + }) + } +} + +#[derive(Error, Debug)] +pub enum IndexError { + #[error("error with index: {0}")] + Error(#[from] anyhow::Error), + #[error("index already exists")] + IndexAlreadyExists, + #[error("Index doesn't exists")] + UnexistingIndex, + #[error("Heed error: {0}")] + HeedError(#[from] heed::Error), + #[error("Existing primary key")] + ExistingPrimaryKey, +} + +#[async_trait::async_trait] +#[cfg_attr(test, automock)] +pub trait IndexActorHandle { + async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; + async fn update( + &self, + meta: Processing, + data: std::fs::File, + ) -> anyhow::Result; + async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result; + async fn settings(&self, uuid: Uuid) -> Result; + + async fn documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result>; + async fn document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result; + async fn delete(&self, uuid: Uuid) -> Result<()>; + async fn get_index_meta(&self, uuid: Uuid) -> Result; + async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result; + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; +} diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs new file mode 100644 index 00000000..6250f515 --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -0,0 +1,105 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use heed::EnvOpenOptions; +use tokio::fs; +use tokio::sync::RwLock; +use tokio::task::spawn_blocking; +use uuid::Uuid; + +use super::{IndexError, 
Result}; +use crate::index::Index; + +type AsyncMap = Arc>>; + +#[async_trait::async_trait] +pub trait IndexStore { + async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; + async fn get(&self, uuid: Uuid) -> Result>; + async fn delete(&self, uuid: Uuid) -> Result>; +} + +pub struct MapIndexStore { + index_store: AsyncMap, + path: PathBuf, + index_size: usize, +} + +impl MapIndexStore { + pub fn new(path: impl AsRef, index_size: usize) -> Self { + let path = path.as_ref().join("indexes/"); + let index_store = Arc::new(RwLock::new(HashMap::new())); + Self { + index_store, + path, + index_size, + } + } +} + +#[async_trait::async_trait] +impl IndexStore for MapIndexStore { + async fn create(&self, uuid: Uuid, primary_key: Option) -> Result { + let path = self.path.join(format!("index-{}", uuid)); + if path.exists() { + return Err(IndexError::IndexAlreadyExists); + } + + let index_size = self.index_size; + let index = spawn_blocking(move || -> Result { + let index = open_index(&path, index_size)?; + if let Some(primary_key) = primary_key { + let mut txn = index.write_txn()?; + index.put_primary_key(&mut txn, &primary_key)?; + txn.commit()?; + } + Ok(index) + }) + .await + .map_err(|e| IndexError::Error(e.into()))??; + + self.index_store.write().await.insert(uuid, index.clone()); + + Ok(index) + } + + async fn get(&self, uuid: Uuid) -> Result> { + let guard = self.index_store.read().await; + match guard.get(&uuid) { + Some(index) => Ok(Some(index.clone())), + None => { + // drop the guard here so we can perform the write after without deadlocking; + drop(guard); + let path = self.path.join(format!("index-{}", uuid)); + if !path.exists() { + return Ok(None); + } + + let index_size = self.index_size; + let index = spawn_blocking(move || open_index(path, index_size)) + .await + .map_err(|e| IndexError::Error(e.into()))??; + self.index_store.write().await.insert(uuid, index.clone()); + Ok(Some(index)) + } + } + } + + async fn delete(&self, uuid: Uuid) -> 
Result> { + let db_path = self.path.join(format!("index-{}", uuid)); + fs::remove_dir_all(db_path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + let index = self.index_store.write().await.remove(&uuid); + Ok(index) + } +} + +fn open_index(path: impl AsRef, size: usize) -> Result { + std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let index = milli::Index::new(options, &path).map_err(IndexError::Error)?; + Ok(Index(Arc::new(index))) +} diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 2bd373a5..b26ab882 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -1,7 +1,7 @@ mod index_actor; +mod snapshot; mod update_actor; mod update_handler; -mod update_store; mod updates; mod uuid_resolver; @@ -12,6 +12,7 @@ use std::time::Duration; use actix_web::web::{Bytes, Payload}; use anyhow::bail; use futures::stream::StreamExt; +use log::info; use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -20,6 +21,14 @@ use uuid::Uuid; use crate::index::{Document, SearchQuery, SearchResult}; use crate::index::{Facets, Settings, UpdateResult}; +use crate::option::Opt; + +use index_actor::IndexActorHandle; +use snapshot::load_snapshot; +use update_actor::UpdateActorHandle; +use uuid_resolver::UuidResolverHandle; + +use snapshot::SnapshotService; pub use updates::{Failed, Processed, Processing}; use uuid_resolver::UuidError; @@ -55,24 +64,54 @@ pub struct IndexSettings { } pub struct IndexController { - uuid_resolver: uuid_resolver::UuidResolverHandle, - index_handle: index_actor::IndexActorHandle, - update_handle: update_actor::UpdateActorHandle, + uuid_resolver: uuid_resolver::UuidResolverHandleImpl, + index_handle: index_actor::IndexActorHandleImpl, + update_handle: 
update_actor::UpdateActorHandleImpl, } impl IndexController { - pub fn new( - path: impl AsRef, - index_size: usize, - update_store_size: usize, - ) -> anyhow::Result { - let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; - let index_actor = index_actor::IndexActorHandle::new(&path, index_size)?; - let update_handle = - update_actor::UpdateActorHandle::new(index_actor.clone(), &path, update_store_size)?; + pub fn new(path: impl AsRef, options: &Opt) -> anyhow::Result { + let index_size = options.max_mdb_size.get_bytes() as usize; + let update_store_size = options.max_udb_size.get_bytes() as usize; + + if let Some(ref path) = options.import_snapshot { + info!("Loading from snapshot {:?}", path); + load_snapshot( + &options.db_path, + path, + options.ignore_snapshot_if_db_exists, + options.ignore_missing_snapshot, + )?; + } + + std::fs::create_dir_all(&path)?; + + let uuid_resolver = uuid_resolver::UuidResolverHandleImpl::new(&path)?; + let index_handle = index_actor::IndexActorHandleImpl::new(&path, index_size)?; + let update_handle = update_actor::UpdateActorHandleImpl::new( + index_handle.clone(), + &path, + update_store_size, + )?; + + if options.schedule_snapshot { + let snapshot_service = SnapshotService::new( + uuid_resolver.clone(), + update_handle.clone(), + Duration::from_secs(options.snapshot_interval_sec), + options.snapshot_dir.clone(), + options.db_path + .file_name() + .map(|n| n.to_owned().into_string().expect("invalid path")) + .unwrap_or_else(|| String::from("data.ms")), + ); + + tokio::task::spawn(snapshot_service.run()); + } + Ok(Self { uuid_resolver, - index_handle: index_actor, + index_handle, update_handle, }) } diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs new file mode 100644 index 00000000..8557fe04 --- /dev/null +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -0,0 +1,260 @@ +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use 
anyhow::bail; +use log::{error, info}; +use tokio::fs; +use tokio::task::spawn_blocking; +use tokio::time::sleep; + +use super::update_actor::UpdateActorHandle; +use super::uuid_resolver::UuidResolverHandle; +use crate::helpers::compression; + +pub struct SnapshotService { + uuid_resolver_handle: R, + update_handle: U, + snapshot_period: Duration, + snapshot_path: PathBuf, + db_name: String, +} + +impl SnapshotService +where + U: UpdateActorHandle, + R: UuidResolverHandle, +{ + pub fn new( + uuid_resolver_handle: R, + update_handle: U, + snapshot_period: Duration, + snapshot_path: PathBuf, + db_name: String, + ) -> Self { + Self { + uuid_resolver_handle, + update_handle, + snapshot_period, + snapshot_path, + db_name, + } + } + + pub async fn run(self) { + info!( + "Snapshot scheduled every {}s.", + self.snapshot_period.as_secs() + ); + loop { + if let Err(e) = self.perform_snapshot().await { + error!("{}", e); + } + sleep(self.snapshot_period).await; + } + } + + async fn perform_snapshot(&self) -> anyhow::Result<()> { + info!("Performing snapshot."); + + let snapshot_dir = self.snapshot_path.clone(); + fs::create_dir_all(&snapshot_dir).await?; + let temp_snapshot_dir = + spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??; + let temp_snapshot_path = temp_snapshot_dir.path().to_owned(); + + let uuids = self + .uuid_resolver_handle + .snapshot(temp_snapshot_path.clone()) + .await?; + + if uuids.is_empty() { + return Ok(()); + } + + let tasks = uuids + .iter() + .map(|&uuid| { + self.update_handle + .snapshot(uuid, temp_snapshot_path.clone()) + }) + .collect::>(); + + futures::future::try_join_all(tasks).await?; + + let snapshot_dir = self.snapshot_path.clone(); + let snapshot_path = self + .snapshot_path + .join(format!("{}.snapshot", self.db_name)); + let snapshot_path = spawn_blocking(move || -> anyhow::Result { + let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?; + let temp_snapshot_file_path = 
temp_snapshot_file.path().to_owned(); + compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?; + temp_snapshot_file.persist(&snapshot_path)?; + Ok(snapshot_path) + }) + .await??; + + info!("Created snapshot in {:?}.", snapshot_path); + + Ok(()) + } +} + +pub fn load_snapshot( + db_path: impl AsRef, + snapshot_path: impl AsRef, + ignore_snapshot_if_db_exists: bool, + ignore_missing_snapshot: bool, +) -> anyhow::Result<()> { + if !db_path.as_ref().exists() && snapshot_path.as_ref().exists() { + match compression::from_tar_gz(snapshot_path, &db_path) { + Ok(()) => Ok(()), + Err(e) => { + // clean created db folder + std::fs::remove_dir_all(&db_path)?; + Err(e) + } + } + } else if db_path.as_ref().exists() && !ignore_snapshot_if_db_exists { + bail!( + "database already exists at {:?}, try to delete it or rename it", + db_path + .as_ref() + .canonicalize() + .unwrap_or_else(|_| db_path.as_ref().to_owned()) + ) + } else if !snapshot_path.as_ref().exists() && !ignore_missing_snapshot { + bail!( + "snapshot doesn't exist at {:?}", + snapshot_path + .as_ref() + .canonicalize() + .unwrap_or_else(|_| snapshot_path.as_ref().to_owned()) + ) + } else { + Ok(()) + } +} + +#[cfg(test)] +mod test { + use futures::future::{err, ok}; + use rand::Rng; + use tokio::time::timeout; + use uuid::Uuid; + + use super::*; + use crate::index_controller::update_actor::{MockUpdateActorHandle, UpdateError}; + use crate::index_controller::uuid_resolver::{MockUuidResolverHandle, UuidError}; + + #[actix_rt::test] + async fn test_normal() { + let mut rng = rand::thread_rng(); + let uuids_num = rng.gen_range(5, 10); + let uuids = (0..uuids_num).map(|_| Uuid::new_v4()).collect::>(); + + let mut uuid_resolver = MockUuidResolverHandle::new(); + let uuids_clone = uuids.clone(); + uuid_resolver + .expect_snapshot() + .times(1) + .returning(move |_| Box::pin(ok(uuids_clone.clone()))); + + let mut update_handle = MockUpdateActorHandle::new(); + let uuids_clone = uuids.clone(); + update_handle 
+ .expect_snapshot() + .withf(move |uuid, _path| uuids_clone.contains(uuid)) + .times(uuids_num) + .returning(move |_, _| Box::pin(ok(()))); + + let snapshot_path = tempfile::tempdir_in(".").unwrap(); + let snapshot_service = SnapshotService::new( + uuid_resolver, + update_handle, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); + + snapshot_service.perform_snapshot().await.unwrap(); + } + + #[actix_rt::test] + async fn error_performing_uuid_snapshot() { + let mut uuid_resolver = MockUuidResolverHandle::new(); + uuid_resolver + .expect_snapshot() + .times(1) + // arbitrary error + .returning(|_| Box::pin(err(UuidError::NameAlreadyExist))); + + let update_handle = MockUpdateActorHandle::new(); + + let snapshot_path = tempfile::tempdir_in(".").unwrap(); + let snapshot_service = SnapshotService::new( + uuid_resolver, + update_handle, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); + + assert!(snapshot_service.perform_snapshot().await.is_err()); + // Nothing was written to the file + assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); + } + + #[actix_rt::test] + async fn error_performing_index_snapshot() { + let uuid = Uuid::new_v4(); + let mut uuid_resolver = MockUuidResolverHandle::new(); + uuid_resolver + .expect_snapshot() + .times(1) + .returning(move |_| Box::pin(ok(vec![uuid]))); + + let mut update_handle = MockUpdateActorHandle::new(); + update_handle + .expect_snapshot() + // arbitrary error + .returning(|_, _| Box::pin(err(UpdateError::UnexistingUpdate(0)))); + + let snapshot_path = tempfile::tempdir_in(".").unwrap(); + let snapshot_service = SnapshotService::new( + uuid_resolver, + update_handle, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); + + assert!(snapshot_service.perform_snapshot().await.is_err()); + // Nothing was written to the file + 
assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); + } + + #[actix_rt::test] + async fn test_loop() { + let mut uuid_resolver = MockUuidResolverHandle::new(); + uuid_resolver + .expect_snapshot() + // we expect the function to be called between 2 and 3 times in the given interval. + .times(2..4) + // arbitrary error, to short-circuit the function + .returning(move |_| Box::pin(err(UuidError::NameAlreadyExist))); + + let update_handle = MockUpdateActorHandle::new(); + + let snapshot_path = tempfile::tempdir_in(".").unwrap(); + let snapshot_service = SnapshotService::new( + uuid_resolver, + update_handle, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); + + let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; + } +} diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs deleted file mode 100644 index abf2ab8b..00000000 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ /dev/null @@ -1,399 +0,0 @@ -use std::collections::{hash_map::Entry, HashMap}; -use std::io::SeekFrom; -use std::fs::{create_dir_all, remove_dir_all}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use log::info; -use oxidized_json_checker::JsonChecker; -use super::index_actor::IndexActorHandle; -use thiserror::Error; -use tokio::fs::OpenOptions; -use tokio::io::{AsyncWriteExt, AsyncSeekExt}; -use tokio::sync::{mpsc, oneshot, RwLock}; -use uuid::Uuid; - -use super::get_arc_ownership_blocking; -use crate::index::UpdateResult; -use crate::index_controller::{UpdateMeta, UpdateStatus}; - -pub type Result = std::result::Result; -type UpdateStore = super::update_store::UpdateStore; -type PayloadData = std::result::Result>; - -#[derive(Debug, Error)] -pub enum UpdateError { - #[error("error with update: {0}")] - Error(Box), - #[error("Index {0} doesn't exist.")] - UnexistingIndex(Uuid), - #[error("Update {0} doesn't exist.")] - 
UnexistingUpdate(u64), -} - -enum UpdateMsg { - Update { - uuid: Uuid, - meta: UpdateMeta, - data: mpsc::Receiver>, - ret: oneshot::Sender>, - }, - ListUpdates { - uuid: Uuid, - ret: oneshot::Sender>>, - }, - GetUpdate { - uuid: Uuid, - ret: oneshot::Sender>, - id: u64, - }, - Delete { - uuid: Uuid, - ret: oneshot::Sender>, - }, - Create { - uuid: Uuid, - ret: oneshot::Sender>, - }, -} - -struct UpdateActor { - path: PathBuf, - store: S, - inbox: mpsc::Receiver>, -} - -#[async_trait::async_trait] -trait UpdateStoreStore { - async fn get_or_create(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>>; - async fn get(&self, uuid: Uuid) -> Result>>; -} - -impl UpdateActor -where - D: AsRef<[u8]> + Sized + 'static, - S: UpdateStoreStore, -{ - fn new( - store: S, - inbox: mpsc::Receiver>, - path: impl AsRef, - ) -> anyhow::Result { - let path = path.as_ref().to_owned().join("update_files"); - create_dir_all(&path)?; - assert!(path.exists()); - Ok(Self { store, inbox, path }) - } - - async fn run(mut self) { - use UpdateMsg::*; - - info!("Started update actor."); - - loop { - match self.inbox.recv().await { - Some(Update { - uuid, - meta, - data, - ret, - }) => { - let _ = ret.send(self.handle_update(uuid, meta, data).await); - } - Some(ListUpdates { uuid, ret }) => { - let _ = ret.send(self.handle_list_updates(uuid).await); - } - Some(GetUpdate { uuid, ret, id }) => { - let _ = ret.send(self.handle_get_update(uuid, id).await); - } - Some(Delete { uuid, ret }) => { - let _ = ret.send(self.handle_delete(uuid).await); - } - Some(Create { uuid, ret }) => { - let _ = ret.send(self.handle_create(uuid).await); - } - None => break, - } - } - } - - async fn handle_update( - &self, - uuid: Uuid, - meta: UpdateMeta, - mut payload: mpsc::Receiver>, - ) -> Result { - let update_store = self.store.get_or_create(uuid).await?; - let update_file_id = uuid::Uuid::new_v4(); - let path = self.path.join(format!("update_{}", update_file_id)); - let mut file = 
OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&path) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - while let Some(bytes) = payload.recv().await { - match bytes { - Ok(bytes) => { - file.write_all(bytes.as_ref()) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - } - Err(e) => { - return Err(UpdateError::Error(e)); - } - } - } - - file.flush() - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - file.seek(SeekFrom::Start(0)) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - let mut file = file.into_std().await; - - tokio::task::spawn_blocking(move || { - use std::io::{BufReader, sink, copy, Seek}; - - // If the payload is empty, ignore the check. - if file.metadata().map_err(|e| UpdateError::Error(Box::new(e)))?.len() > 0 { - // Check that the json payload is valid: - let reader = BufReader::new(&mut file); - let mut checker = JsonChecker::new(reader); - - if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() { - // The json file is invalid, we use Serde to get a nice error message: - file.seek(SeekFrom::Start(0)) - .map_err(|e| UpdateError::Error(Box::new(e)))?; - let _: serde_json::Value = serde_json::from_reader(file) - .map_err(|e| UpdateError::Error(Box::new(e)))?; - } - } - - // The payload is valid, we can register it to the update store. - update_store - .register_update(meta, path, uuid) - .map(UpdateStatus::Pending) - .map_err(|e| UpdateError::Error(Box::new(e))) - }) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))? - } - - async fn handle_list_updates(&self, uuid: Uuid) -> Result> { - let update_store = self.store.get(uuid).await?; - tokio::task::spawn_blocking(move || { - let result = update_store - .ok_or(UpdateError::UnexistingIndex(uuid))? - .list() - .map_err(|e| UpdateError::Error(e.into()))?; - Ok(result) - }) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))? 
- } - - async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { - let store = self - .store - .get(uuid) - .await? - .ok_or(UpdateError::UnexistingIndex(uuid))?; - let result = store - .meta(id) - .map_err(|e| UpdateError::Error(Box::new(e)))? - .ok_or(UpdateError::UnexistingUpdate(id))?; - Ok(result) - } - - async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let store = self.store.delete(uuid).await?; - - if let Some(store) = store { - tokio::task::spawn(async move { - let store = get_arc_ownership_blocking(store).await; - tokio::task::spawn_blocking(move || { - store.prepare_for_closing().wait(); - info!("Update store {} was closed.", uuid); - }); - }); - } - - Ok(()) - } - - async fn handle_create(&self, uuid: Uuid) -> Result<()> { - let _ = self.store.get_or_create(uuid).await?; - Ok(()) - } -} - -#[derive(Clone)] -pub struct UpdateActorHandle { - sender: mpsc::Sender>, -} - -impl UpdateActorHandle -where - D: AsRef<[u8]> + Sized + 'static + Sync + Send, -{ - pub fn new( - index_handle: IndexActorHandle, - path: impl AsRef, - update_store_size: usize, - ) -> anyhow::Result { - let path = path.as_ref().to_owned().join("updates"); - let (sender, receiver) = mpsc::channel(100); - let store = MapUpdateStoreStore::new(index_handle, &path, update_store_size); - let actor = UpdateActor::new(store, receiver, path)?; - - tokio::task::spawn(actor.run()); - - Ok(Self { sender }) - } - - pub async fn update( - &self, - meta: UpdateMeta, - data: mpsc::Receiver>, - uuid: Uuid, - ) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Update { - uuid, - data, - meta, - ret, - }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - pub async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::ListUpdates { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - pub 
async fn update_status(&self, uuid: Uuid, id: u64) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::GetUpdate { uuid, id, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - pub async fn delete(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Delete { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - pub async fn create(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Create { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } -} - -struct MapUpdateStoreStore { - db: Arc>>>, - index_handle: IndexActorHandle, - path: PathBuf, - update_store_size: usize, -} - -impl MapUpdateStoreStore { - fn new( - index_handle: IndexActorHandle, - path: impl AsRef, - update_store_size: usize, - ) -> Self { - let db = Arc::new(RwLock::new(HashMap::new())); - let path = path.as_ref().to_owned(); - Self { - db, - index_handle, - path, - update_store_size, - } - } -} - -#[async_trait::async_trait] -impl UpdateStoreStore for MapUpdateStoreStore { - async fn get_or_create(&self, uuid: Uuid) -> Result> { - match self.db.write().await.entry(uuid) { - Entry::Vacant(e) => { - let mut options = heed::EnvOpenOptions::new(); - let update_store_size = self.update_store_size; - options.map_size(update_store_size); - let path = self.path.clone().join(format!("updates-{}", e.key())); - create_dir_all(&path).unwrap(); - let index_handle = self.index_handle.clone(); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; - let store = e.insert(store); - Ok(store.clone()) - } - Entry::Occupied(e) => Ok(e.get().clone()), - } - } - - async fn get(&self, uuid: Uuid) -> Result>> { - let 
guard = self.db.read().await; - match guard.get(&uuid) { - Some(uuid) => Ok(Some(uuid.clone())), - None => { - // The index is not found in the found in the loaded indexes, so we attempt to load - // it from disk. We need to acquire a write lock **before** attempting to open the - // index, because someone could be trying to open it at the same time as us. - drop(guard); - let path = self.path.clone().join(format!("updates-{}", uuid)); - if path.exists() { - let mut guard = self.db.write().await; - match guard.entry(uuid) { - Entry::Vacant(entry) => { - // We can safely load the index - let index_handle = self.index_handle.clone(); - let mut options = heed::EnvOpenOptions::new(); - let update_store_size = self.update_store_size; - options.map_size(update_store_size); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; - let store = entry.insert(store); - Ok(Some(store.clone())) - } - Entry::Occupied(entry) => { - // The index was loaded while we attempted to to iter - Ok(Some(entry.get().clone())) - } - } - } else { - Ok(None) - } - } - } - } - - async fn delete(&self, uuid: Uuid) -> Result>> { - let store = self.db.write().await.remove(&uuid); - let path = self.path.clone().join(format!("updates-{}", uuid)); - if store.is_some() || path.exists() { - remove_dir_all(path).unwrap(); - } - Ok(store) - } -} diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs new file mode 100644 index 00000000..68847274 --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -0,0 +1,226 @@ +use std::io::SeekFrom; +use std::path::{Path, PathBuf}; + +use log::info; +use oxidized_json_checker::JsonChecker; +use tokio::fs; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tokio::sync::mpsc; +use uuid::Uuid; + +use super::{PayloadData, Result, 
UpdateError, UpdateMsg, UpdateStoreStore}; +use crate::index_controller::index_actor::IndexActorHandle; +use crate::index_controller::{get_arc_ownership_blocking, UpdateMeta, UpdateStatus}; + +pub struct UpdateActor { + path: PathBuf, + store: S, + inbox: mpsc::Receiver>, + index_handle: I, +} + +impl UpdateActor +where + D: AsRef<[u8]> + Sized + 'static, + S: UpdateStoreStore, + I: IndexActorHandle + Clone + Send + Sync + 'static, +{ + pub fn new( + store: S, + inbox: mpsc::Receiver>, + path: impl AsRef, + index_handle: I, + ) -> anyhow::Result { + let path = path.as_ref().to_owned(); + std::fs::create_dir_all(path.join("update_files"))?; + assert!(path.exists()); + Ok(Self { + store, + inbox, + path, + index_handle, + }) + } + + pub async fn run(mut self) { + use UpdateMsg::*; + + info!("Started update actor."); + + loop { + match self.inbox.recv().await { + Some(Update { + uuid, + meta, + data, + ret, + }) => { + let _ = ret.send(self.handle_update(uuid, meta, data).await); + } + Some(ListUpdates { uuid, ret }) => { + let _ = ret.send(self.handle_list_updates(uuid).await); + } + Some(GetUpdate { uuid, ret, id }) => { + let _ = ret.send(self.handle_get_update(uuid, id).await); + } + Some(Delete { uuid, ret }) => { + let _ = ret.send(self.handle_delete(uuid).await); + } + Some(Create { uuid, ret }) => { + let _ = ret.send(self.handle_create(uuid).await); + } + Some(Snapshot { uuid, path, ret }) => { + let _ = ret.send(self.handle_snapshot(uuid, path).await); + } + None => break, + } + } + } + + async fn handle_update( + &self, + uuid: Uuid, + meta: UpdateMeta, + mut payload: mpsc::Receiver>, + ) -> Result { + let update_store = self.store.get_or_create(uuid).await?; + let update_file_id = uuid::Uuid::new_v4(); + let path = self + .path + .join(format!("update_files/update_{}", update_file_id)); + let mut file = fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + + while let 
Some(bytes) = payload.recv().await { + match bytes { + Ok(bytes) => { + file.write_all(bytes.as_ref()) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + } + Err(e) => { + return Err(UpdateError::Error(e)); + } + } + } + + file.flush() + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + + file.seek(SeekFrom::Start(0)) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + + let mut file = file.into_std().await; + + tokio::task::spawn_blocking(move || { + use std::io::{copy, sink, BufReader, Seek}; + + // If the payload is empty, ignore the check. + if file + .metadata() + .map_err(|e| UpdateError::Error(Box::new(e)))? + .len() + > 0 + { + // Check that the json payload is valid: + let reader = BufReader::new(&mut file); + let mut checker = JsonChecker::new(reader); + + if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() { + // The json file is invalid, we use Serde to get a nice error message: + file.seek(SeekFrom::Start(0)) + .map_err(|e| UpdateError::Error(Box::new(e)))?; + let _: serde_json::Value = serde_json::from_reader(file) + .map_err(|e| UpdateError::Error(Box::new(e)))?; + } + } + + // The payload is valid, we can register it to the update store. + update_store + .register_update(meta, path, uuid) + .map(UpdateStatus::Pending) + .map_err(|e| UpdateError::Error(Box::new(e))) + }) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))? + } + + async fn handle_list_updates(&self, uuid: Uuid) -> Result> { + let update_store = self.store.get(uuid).await?; + tokio::task::spawn_blocking(move || { + let result = update_store + .ok_or(UpdateError::UnexistingIndex(uuid))? + .list() + .map_err(|e| UpdateError::Error(e.into()))?; + Ok(result) + }) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))? + } + + async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { + let store = self + .store + .get(uuid) + .await? 
+ .ok_or(UpdateError::UnexistingIndex(uuid))?; + let result = store + .meta(id) + .map_err(|e| UpdateError::Error(Box::new(e)))? + .ok_or(UpdateError::UnexistingUpdate(id))?; + Ok(result) + } + + async fn handle_delete(&self, uuid: Uuid) -> Result<()> { + let store = self.store.delete(uuid).await?; + + if let Some(store) = store { + tokio::task::spawn(async move { + let store = get_arc_ownership_blocking(store).await; + tokio::task::spawn_blocking(move || { + store.prepare_for_closing().wait(); + info!("Update store {} was closed.", uuid); + }); + }); + } + + Ok(()) + } + + async fn handle_create(&self, uuid: Uuid) -> Result<()> { + let _ = self.store.get_or_create(uuid).await?; + Ok(()) + } + + async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + let index_handle = self.index_handle.clone(); + if let Some(update_store) = self.store.get(uuid).await? { + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + // acquire write lock to prevent further writes during snapshot + // the update lock must be acquired BEFORE the write lock to prevent dead lock + let _lock = update_store.update_lock.lock(); + let mut txn = update_store.env.write_txn()?; + + // create db snapshot + update_store.snapshot(&mut txn, &path, uuid)?; + + futures::executor::block_on( + async move { index_handle.snapshot(uuid, path).await }, + )?; + Ok(()) + }) + .await + .map_err(|e| UpdateError::Error(e.into()))? 
+ .map_err(|e| UpdateError::Error(e.into()))?; + } + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs new file mode 100644 index 00000000..59f67fbe --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -0,0 +1,96 @@ +use std::path::{Path, PathBuf}; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use super::{ + MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, + UpdateMsg, UpdateStatus, +}; +use crate::index_controller::IndexActorHandle; + +#[derive(Clone)] +pub struct UpdateActorHandleImpl { + sender: mpsc::Sender>, +} + +impl UpdateActorHandleImpl +where + D: AsRef<[u8]> + Sized + 'static + Sync + Send, +{ + pub fn new( + index_handle: I, + path: impl AsRef, + update_store_size: usize, + ) -> anyhow::Result + where + I: IndexActorHandle + Clone + Send + Sync + 'static, + { + let path = path.as_ref().to_owned().join("updates"); + let (sender, receiver) = mpsc::channel(100); + let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size); + let actor = UpdateActor::new(store, receiver, path, index_handle)?; + + tokio::task::spawn(actor.run()); + + Ok(Self { sender }) + } +} +#[async_trait::async_trait] +impl UpdateActorHandle for UpdateActorHandleImpl +where + D: AsRef<[u8]> + Sized + 'static + Sync + Send, +{ + type Data = D; + + async fn update( + &self, + meta: UpdateMeta, + data: mpsc::Receiver>, + uuid: Uuid, + ) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Update { + uuid, + data, + meta, + ret, + }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::ListUpdates { uuid, ret }; + let _ = self.sender.send(msg).await; + 
receiver.await.expect("update actor killed.") + } + + async fn update_status(&self, uuid: Uuid, id: u64) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::GetUpdate { uuid, id, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + + async fn delete(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Delete { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + + async fn create(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Create { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Snapshot { uuid, path, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } +} diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs new file mode 100644 index 00000000..8e6e3c21 --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -0,0 +1,37 @@ +use std::path::PathBuf; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use super::{PayloadData, Result, UpdateMeta, UpdateStatus}; + +pub enum UpdateMsg { + Update { + uuid: Uuid, + meta: UpdateMeta, + data: mpsc::Receiver>, + ret: oneshot::Sender>, + }, + ListUpdates { + uuid: Uuid, + ret: oneshot::Sender>>, + }, + GetUpdate { + uuid: Uuid, + ret: oneshot::Sender>, + id: u64, + }, + Delete { + uuid: Uuid, + ret: oneshot::Sender>, + }, + Create { + uuid: Uuid, + ret: oneshot::Sender>, + }, + Snapshot { + uuid: Uuid, + path: PathBuf, + ret: oneshot::Sender>, + }, +} diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs 
b/meilisearch-http/src/index_controller/update_actor/mod.rs new file mode 100644 index 00000000..f3c3caf0 --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -0,0 +1,55 @@ +mod actor; +mod handle_impl; +mod message; +mod store; +mod update_store; + +use std::path::PathBuf; + +use thiserror::Error; +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::index::UpdateResult; +use crate::index_controller::{UpdateMeta, UpdateStatus}; + +use actor::UpdateActor; +use message::UpdateMsg; +use store::{MapUpdateStoreStore, UpdateStoreStore}; + +pub use handle_impl::UpdateActorHandleImpl; + +pub type Result = std::result::Result; +type UpdateStore = update_store::UpdateStore; +type PayloadData = std::result::Result>; + +#[cfg(test)] +use mockall::automock; + +#[derive(Debug, Error)] +pub enum UpdateError { + #[error("error with update: {0}")] + Error(Box), + #[error("Index {0} doesn't exist.")] + UnexistingIndex(Uuid), + #[error("Update {0} doesn't exist.")] + UnexistingUpdate(u64), +} + +#[async_trait::async_trait] +#[cfg_attr(test, automock(type Data=Vec;))] +pub trait UpdateActorHandle { + type Data: AsRef<[u8]> + Sized + 'static + Sync + Send; + + async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; + async fn update_status(&self, uuid: Uuid, id: u64) -> Result; + async fn delete(&self, uuid: Uuid) -> Result<()>; + async fn create(&self, uuid: Uuid) -> Result<()>; + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; + async fn update( + &self, + meta: UpdateMeta, + data: mpsc::Receiver>, + uuid: Uuid, + ) -> Result; +} diff --git a/meilisearch-http/src/index_controller/update_actor/store.rs b/meilisearch-http/src/index_controller/update_actor/store.rs new file mode 100644 index 00000000..676182a6 --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/store.rs @@ -0,0 +1,111 @@ +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + 
+use tokio::fs; +use tokio::sync::RwLock; +use uuid::Uuid; + +use super::{Result, UpdateError, UpdateStore}; +use crate::index_controller::IndexActorHandle; + +#[async_trait::async_trait] +pub trait UpdateStoreStore { + async fn get_or_create(&self, uuid: Uuid) -> Result>; + async fn delete(&self, uuid: Uuid) -> Result>>; + async fn get(&self, uuid: Uuid) -> Result>>; +} + +pub struct MapUpdateStoreStore { + db: Arc>>>, + index_handle: I, + path: PathBuf, + update_store_size: usize, +} + +impl MapUpdateStoreStore { + pub fn new(index_handle: I, path: impl AsRef, update_store_size: usize) -> Self { + let db = Arc::new(RwLock::new(HashMap::new())); + let path = path.as_ref().to_owned(); + Self { + db, + index_handle, + path, + update_store_size, + } + } +} + +#[async_trait::async_trait] +impl UpdateStoreStore for MapUpdateStoreStore +where + I: IndexActorHandle + Clone + Send + Sync + 'static, +{ + async fn get_or_create(&self, uuid: Uuid) -> Result> { + match self.db.write().await.entry(uuid) { + Entry::Vacant(e) => { + let mut options = heed::EnvOpenOptions::new(); + let update_store_size = self.update_store_size; + options.map_size(update_store_size); + let path = self.path.clone().join(format!("updates-{}", e.key())); + fs::create_dir_all(&path).await.unwrap(); + let index_handle = self.index_handle.clone(); + let store = UpdateStore::open(options, &path, move |meta, file| { + futures::executor::block_on(index_handle.update(meta, file)) + }) + .map_err(|e| UpdateError::Error(e.into()))?; + let store = e.insert(store); + Ok(store.clone()) + } + Entry::Occupied(e) => Ok(e.get().clone()), + } + } + + async fn get(&self, uuid: Uuid) -> Result>> { + let guard = self.db.read().await; + match guard.get(&uuid) { + Some(uuid) => Ok(Some(uuid.clone())), + None => { + // The index is not found in the found in the loaded indexes, so we attempt to load + // it from disk. 
We need to acquire a write lock **before** attempting to open the + // index, because someone could be trying to open it at the same time as us. + drop(guard); + let path = self.path.clone().join(format!("updates-{}", uuid)); + if path.exists() { + let mut guard = self.db.write().await; + match guard.entry(uuid) { + Entry::Vacant(entry) => { + // We can safely load the index + let index_handle = self.index_handle.clone(); + let mut options = heed::EnvOpenOptions::new(); + let update_store_size = self.update_store_size; + options.map_size(update_store_size); + let store = UpdateStore::open(options, &path, move |meta, file| { + futures::executor::block_on(index_handle.update(meta, file)) + }) + .map_err(|e| UpdateError::Error(e.into()))?; + let store = entry.insert(store); + Ok(Some(store.clone())) + } + Entry::Occupied(entry) => { + // The index was loaded while we attempted to to iter + Ok(Some(entry.get().clone())) + } + } + } else { + Ok(None) + } + } + } + } + + async fn delete(&self, uuid: Uuid) -> Result>> { + let store = self.db.write().await.remove(&uuid); + let path = self.path.clone().join(format!("updates-{}", uuid)); + if store.is_some() || path.exists() { + fs::remove_dir_all(path).await.unwrap(); + } + Ok(store) + } +} diff --git a/meilisearch-http/src/index_controller/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs similarity index 83% rename from meilisearch-http/src/index_controller/update_store.rs rename to meilisearch-http/src/index_controller/update_actor/update_store.rs index 6de30ab7..f8dcace7 100644 --- a/meilisearch-http/src/index_controller/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -1,10 +1,10 @@ -use std::fs::remove_file; +use std::fs::{copy, create_dir_all, remove_file}; use std::path::{Path, PathBuf}; use std::sync::Arc; use heed::types::{DecodeIgnore, OwnedType, SerdeJson}; -use heed::{Database, Env, EnvOpenOptions}; -use parking_lot::RwLock; +use 
heed::{CompactionOption, Database, Env, EnvOpenOptions}; +use parking_lot::{Mutex, RwLock}; use serde::{Deserialize, Serialize}; use std::fs::File; use tokio::sync::mpsc; @@ -16,7 +16,7 @@ type BEU64 = heed::zerocopy::U64; #[derive(Clone)] pub struct UpdateStore { - env: Env, + pub env: Env, pending_meta: Database, SerdeJson>>, pending: Database, SerdeJson>, processed_meta: Database, SerdeJson>>, @@ -24,6 +24,9 @@ pub struct UpdateStore { aborted_meta: Database, SerdeJson>>, processing: Arc>>>, notification_sender: mpsc::Sender<()>, + /// A lock on the update loop. This is meant to prevent a snapshot to occur while an update is + /// processing, while not preventing writes all together during an update + pub update_lock: Arc>, } pub trait HandleUpdate { @@ -76,6 +79,8 @@ where // Send a first notification to trigger the process. let _ = notification_sender.send(()); + let update_lock = Arc::new(Mutex::new(())); + let update_store = Arc::new(UpdateStore { env, pending, @@ -85,6 +90,7 @@ where notification_sender, failed_meta, processing, + update_lock, }); // We need a weak reference so we can take ownership on the arc later when we @@ -190,6 +196,7 @@ where where U: HandleUpdate, { + let _lock = self.update_lock.lock(); // Create a read transaction to be able to retrieve the pending update in order. 
let rtxn = self.env.read_txn()?; let first_meta = self.pending_meta.first(&rtxn)?; @@ -371,94 +378,35 @@ where Ok(aborted_updates) } -} -//#[cfg(test)] -//mod tests { -//use super::*; -//use std::thread; -//use std::time::{Duration, Instant}; - -//#[test] -//fn simple() { -//let dir = tempfile::tempdir().unwrap(); -//let mut options = EnvOpenOptions::new(); -//options.map_size(4096 * 100); -//let update_store = UpdateStore::open( -//options, -//dir, -//|meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { -//let new_meta = meta.meta().to_string() + " processed"; -//let processed = meta.process(new_meta); -//Ok(processed) -//}, -//) -//.unwrap(); - -//let meta = String::from("kiki"); -//let update = update_store.register_update(meta, &[]).unwrap(); -//thread::sleep(Duration::from_millis(100)); -//let meta = update_store.meta(update.id()).unwrap().unwrap(); -//if let UpdateStatus::Processed(Processed { success, .. }) = meta { -//assert_eq!(success, "kiki processed"); -//} else { -//panic!() -//} -//} - -//#[test] -//#[ignore] -//fn long_running_update() { -//let dir = tempfile::tempdir().unwrap(); -//let mut options = EnvOpenOptions::new(); -//options.map_size(4096 * 100); -//let update_store = UpdateStore::open( -//options, -//dir, -//|meta: Processing, _content: &_| -> Result<_, Failed<_, ()>> { -//thread::sleep(Duration::from_millis(400)); -//let new_meta = meta.meta().to_string() + "processed"; -//let processed = meta.process(new_meta); -//Ok(processed) -//}, -//) -//.unwrap(); - -//let before_register = Instant::now(); - -//let meta = String::from("kiki"); -//let update_kiki = update_store.register_update(meta, &[]).unwrap(); -//assert!(before_register.elapsed() < Duration::from_millis(200)); - -//let meta = String::from("coco"); -//let update_coco = update_store.register_update(meta, &[]).unwrap(); -//assert!(before_register.elapsed() < Duration::from_millis(200)); - -//let meta = String::from("cucu"); -//let update_cucu = 
update_store.register_update(meta, &[]).unwrap(); -//assert!(before_register.elapsed() < Duration::from_millis(200)); - -//thread::sleep(Duration::from_millis(400 * 3 + 100)); - -//let meta = update_store.meta(update_kiki.id()).unwrap().unwrap(); -//if let UpdateStatus::Processed(Processed { success, .. }) = meta { -//assert_eq!(success, "kiki processed"); -//} else { -//panic!() -//} - -//let meta = update_store.meta(update_coco.id()).unwrap().unwrap(); -//if let UpdateStatus::Processed(Processed { success, .. }) = meta { -//assert_eq!(success, "coco processed"); -//} else { -//panic!() -//} - -//let meta = update_store.meta(update_cucu.id()).unwrap().unwrap(); -//if let UpdateStatus::Processed(Processed { success, .. }) = meta { -//assert_eq!(success, "cucu processed"); -//} else { -//panic!() -//} -//} -//} + pub fn snapshot( + &self, + txn: &mut heed::RwTxn, + path: impl AsRef, + uuid: Uuid, + ) -> anyhow::Result<()> { + let update_path = path.as_ref().join("updates"); + create_dir_all(&update_path)?; + + let mut snapshot_path = update_path.join(format!("update-{}", uuid)); + // acquire write lock to prevent further writes during snapshot + create_dir_all(&snapshot_path)?; + snapshot_path.push("data.mdb"); + + // create db snapshot + self.env + .copy_to_path(&snapshot_path, CompactionOption::Enabled)?; + + let update_files_path = update_path.join("update_files"); + create_dir_all(&update_files_path)?; + + for path in self.pending.iter(&txn)? 
{ + let (_, path) = path?; + let name = path.file_name().unwrap(); + let to = update_files_path.join(name); + copy(path, to)?; + } + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver.rs b/meilisearch-http/src/index_controller/uuid_resolver.rs deleted file mode 100644 index 328080d9..00000000 --- a/meilisearch-http/src/index_controller/uuid_resolver.rs +++ /dev/null @@ -1,310 +0,0 @@ -use std::{fs::create_dir_all, path::Path}; - -use heed::{ - types::{ByteSlice, Str}, - Database, Env, EnvOpenOptions, -}; -use log::{info, warn}; -use thiserror::Error; -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; - -pub type Result = std::result::Result; - -#[derive(Debug)] -enum UuidResolveMsg { - Get { - uid: String, - ret: oneshot::Sender>, - }, - Create { - uid: String, - ret: oneshot::Sender>, - }, - Delete { - uid: String, - ret: oneshot::Sender>, - }, - List { - ret: oneshot::Sender>>, - }, - Insert { - uuid: Uuid, - name: String, - ret: oneshot::Sender>, - } -} - -struct UuidResolverActor { - inbox: mpsc::Receiver, - store: S, -} - -impl UuidResolverActor { - fn new(inbox: mpsc::Receiver, store: S) -> Self { - Self { inbox, store } - } - - async fn run(mut self) { - use UuidResolveMsg::*; - - info!("uuid resolver started"); - - loop { - match self.inbox.recv().await { - Some(Create { uid: name, ret }) => { - let _ = ret.send(self.handle_create(name).await); - } - Some(Get { uid: name, ret }) => { - let _ = ret.send(self.handle_get(name).await); - } - Some(Delete { uid: name, ret }) => { - let _ = ret.send(self.handle_delete(name).await); - } - Some(List { ret }) => { - let _ = ret.send(self.handle_list().await); - } - Some(Insert { ret, uuid, name }) => { - let _ = ret.send(self.handle_insert(name, uuid).await); - } - // all senders have been dropped, need to quit. 
- None => break, - } - } - - warn!("exiting uuid resolver loop"); - } - - async fn handle_create(&self, uid: String) -> Result { - if !is_index_uid_valid(&uid) { - return Err(UuidError::BadlyFormatted(uid)); - } - self.store.create_uuid(uid, true).await - } - - async fn handle_get(&self, uid: String) -> Result { - self.store - .get_uuid(uid.clone()) - .await? - .ok_or(UuidError::UnexistingIndex(uid)) - } - - async fn handle_delete(&self, uid: String) -> Result { - self.store - .delete(uid.clone()) - .await? - .ok_or(UuidError::UnexistingIndex(uid)) - } - - async fn handle_list(&self) -> Result> { - let result = self.store.list().await?; - Ok(result) - } - - async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { - if !is_index_uid_valid(&uid) { - return Err(UuidError::BadlyFormatted(uid)); - } - self.store.insert(uid, uuid).await?; - Ok(()) - } -} - -fn is_index_uid_valid(uid: &str) -> bool { - uid.chars() - .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') -} - -#[derive(Clone)] -pub struct UuidResolverHandle { - sender: mpsc::Sender, -} - -impl UuidResolverHandle { - pub fn new(path: impl AsRef) -> anyhow::Result { - let (sender, reveiver) = mpsc::channel(100); - let store = HeedUuidStore::new(path)?; - let actor = UuidResolverActor::new(reveiver, store); - tokio::spawn(actor.run()); - Ok(Self { sender }) - } - - pub async fn get(&self, name: String) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Get { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn create(&self, name: String) -> anyhow::Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Create { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) 
- } - - pub async fn delete(&self, name: String) -> anyhow::Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Delete { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn list(&self) -> anyhow::Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::List { ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Insert { ret, name, uuid }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } -} - -#[derive(Debug, Error)] -pub enum UuidError { - #[error("Name already exist.")] - NameAlreadyExist, - #[error("Index \"{0}\" doesn't exist.")] - UnexistingIndex(String), - #[error("Error performing task: {0}")] - TokioTask(#[from] tokio::task::JoinError), - #[error("Database error: {0}")] - Heed(#[from] heed::Error), - #[error("Uuid error: {0}")] - Uuid(#[from] uuid::Error), - #[error("Badly formatted index uid: {0}")] - BadlyFormatted(String), -} - -#[async_trait::async_trait] -trait UuidStore { - // Create a new entry for `name`. Return an error if `err` and the entry already exists, return - // the uuid otherwise. 
- async fn create_uuid(&self, uid: String, err: bool) -> Result; - async fn get_uuid(&self, uid: String) -> Result>; - async fn delete(&self, uid: String) -> Result>; - async fn list(&self) -> Result>; - async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; -} - -struct HeedUuidStore { - env: Env, - db: Database, -} - -impl HeedUuidStore { - fn new(path: impl AsRef) -> anyhow::Result { - let path = path.as_ref().join("index_uuids"); - create_dir_all(&path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(1_073_741_824); // 1GB - let env = options.open(path)?; - let db = env.create_database(None)?; - Ok(Self { env, db }) - } -} - -#[async_trait::async_trait] -impl UuidStore for HeedUuidStore { - async fn create_uuid(&self, name: String, err: bool) -> Result { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let mut txn = env.write_txn()?; - match db.get(&txn, &name)? { - Some(uuid) => { - if err { - Err(UuidError::NameAlreadyExist) - } else { - let uuid = Uuid::from_slice(uuid)?; - Ok(uuid) - } - } - None => { - let uuid = Uuid::new_v4(); - db.put(&mut txn, &name, uuid.as_bytes())?; - txn.commit()?; - Ok(uuid) - } - } - }) - .await? - } - - async fn get_uuid(&self, name: String) -> Result> { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let txn = env.read_txn()?; - match db.get(&txn, &name)? { - Some(uuid) => { - let uuid = Uuid::from_slice(uuid)?; - Ok(Some(uuid)) - } - None => Ok(None), - } - }) - .await? - } - - async fn delete(&self, uid: String) -> Result> { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let mut txn = env.write_txn()?; - match db.get(&txn, &uid)? { - Some(uuid) => { - let uuid = Uuid::from_slice(uuid)?; - db.delete(&mut txn, &uid)?; - txn.commit()?; - Ok(Some(uuid)) - } - None => Ok(None), - } - }) - .await? 
- } - - async fn list(&self) -> Result> { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let txn = env.read_txn()?; - let mut entries = Vec::new(); - for entry in db.iter(&txn)? { - let (name, uuid) = entry?; - let uuid = Uuid::from_slice(uuid)?; - entries.push((name.to_owned(), uuid)) - } - Ok(entries) - }) - .await? - } - - async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let mut txn = env.write_txn()?; - db.put(&mut txn, &name, uuid.as_bytes())?; - txn.commit()?; - Ok(()) - }) - .await? - } -} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs new file mode 100644 index 00000000..d5cde13e --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs @@ -0,0 +1,94 @@ +use std::path::PathBuf; + +use log::{info, warn}; +use tokio::sync::mpsc; +use uuid::Uuid; + +use super::{Result, UuidError, UuidResolveMsg, UuidStore}; + +pub struct UuidResolverActor { + inbox: mpsc::Receiver, + store: S, +} + +impl UuidResolverActor { + pub fn new(inbox: mpsc::Receiver, store: S) -> Self { + Self { inbox, store } + } + + pub async fn run(mut self) { + use UuidResolveMsg::*; + + info!("uuid resolver started"); + + loop { + match self.inbox.recv().await { + Some(Create { uid: name, ret }) => { + let _ = ret.send(self.handle_create(name).await); + } + Some(Get { uid: name, ret }) => { + let _ = ret.send(self.handle_get(name).await); + } + Some(Delete { uid: name, ret }) => { + let _ = ret.send(self.handle_delete(name).await); + } + Some(List { ret }) => { + let _ = ret.send(self.handle_list().await); + } + Some(Insert { ret, uuid, name }) => { + let _ = ret.send(self.handle_insert(name, uuid).await); + } + Some(SnapshotRequest { path, ret }) => { + let _ = ret.send(self.handle_snapshot(path).await); + } + // all senders have been 
dropped, need to quit. + None => break, + } + } + + warn!("exiting uuid resolver loop"); + } + + async fn handle_create(&self, uid: String) -> Result { + if !is_index_uid_valid(&uid) { + return Err(UuidError::BadlyFormatted(uid)); + } + self.store.create_uuid(uid, true).await + } + + async fn handle_get(&self, uid: String) -> Result { + self.store + .get_uuid(uid.clone()) + .await? + .ok_or(UuidError::UnexistingIndex(uid)) + } + + async fn handle_delete(&self, uid: String) -> Result { + self.store + .delete(uid.clone()) + .await? + .ok_or(UuidError::UnexistingIndex(uid)) + } + + async fn handle_list(&self) -> Result> { + let result = self.store.list().await?; + Ok(result) + } + + async fn handle_snapshot(&self, path: PathBuf) -> Result> { + self.store.snapshot(path).await + } + + async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { + if !is_index_uid_valid(&uid) { + return Err(UuidError::BadlyFormatted(uid)); + } + self.store.insert(uid, uuid).await?; + Ok(()) + } +} + +fn is_index_uid_valid(uid: &str) -> bool { + uid.chars() + .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs new file mode 100644 index 00000000..f8625b37 --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs @@ -0,0 +1,78 @@ +use std::path::{Path, PathBuf}; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use super::{HeedUuidStore, Result, UuidResolveMsg, UuidResolverActor, UuidResolverHandle}; + +#[derive(Clone)] +pub struct UuidResolverHandleImpl { + sender: mpsc::Sender, +} + +impl UuidResolverHandleImpl { + pub fn new(path: impl AsRef) -> anyhow::Result { + let (sender, reveiver) = mpsc::channel(100); + let store = HeedUuidStore::new(path)?; + let actor = UuidResolverActor::new(reveiver, store); + tokio::spawn(actor.run()); + Ok(Self { sender }) + } +} + 
+#[async_trait::async_trait] +impl UuidResolverHandle for UuidResolverHandleImpl { + async fn get(&self, name: String) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Get { uid: name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn create(&self, name: String) -> anyhow::Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Create { uid: name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn delete(&self, name: String) -> anyhow::Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Delete { uid: name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn list(&self) -> anyhow::Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::List { ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Insert { ret, name, uuid }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn snapshot(&self, path: PathBuf) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::SnapshotRequest { path, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) 
+ } +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/message.rs b/meilisearch-http/src/index_controller/uuid_resolver/message.rs new file mode 100644 index 00000000..975c709b --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/message.rs @@ -0,0 +1,32 @@ +use std::path::PathBuf; + +use tokio::sync::oneshot; +use uuid::Uuid; + +use super::Result; +pub enum UuidResolveMsg { + Get { + uid: String, + ret: oneshot::Sender>, + }, + Create { + uid: String, + ret: oneshot::Sender>, + }, + Delete { + uid: String, + ret: oneshot::Sender>, + }, + List { + ret: oneshot::Sender>>, + }, + Insert { + uuid: Uuid, + name: String, + ret: oneshot::Sender>, + }, + SnapshotRequest { + path: PathBuf, + ret: oneshot::Sender>>, + }, +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs new file mode 100644 index 00000000..43cd9995 --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs @@ -0,0 +1,49 @@ +mod actor; +mod handle_impl; +mod message; +mod store; + +use std::path::PathBuf; + +use thiserror::Error; +use uuid::Uuid; + +use actor::UuidResolverActor; +use message::UuidResolveMsg; +use store::{HeedUuidStore, UuidStore}; + +#[cfg(test)] +use mockall::automock; + +pub use handle_impl::UuidResolverHandleImpl; + +const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB + +pub type Result = std::result::Result; + +#[async_trait::async_trait] +#[cfg_attr(test, automock)] +pub trait UuidResolverHandle { + async fn get(&self, name: String) -> Result; + async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()>; + async fn create(&self, name: String) -> anyhow::Result; + async fn delete(&self, name: String) -> anyhow::Result; + async fn list(&self) -> anyhow::Result>; + async fn snapshot(&self, path: PathBuf) -> Result>; +} + +#[derive(Debug, Error)] +pub enum UuidError { + #[error("Name already exist.")] + NameAlreadyExist, + #[error("Index 
\"{0}\" doesn't exist.")] + UnexistingIndex(String), + #[error("Error performing task: {0}")] + TokioTask(#[from] tokio::task::JoinError), + #[error("Database error: {0}")] + Heed(#[from] heed::Error), + #[error("Uuid error: {0}")] + Uuid(#[from] uuid::Error), + #[error("Badly formatted index uid: {0}")] + BadlyFormatted(String), +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs new file mode 100644 index 00000000..43531491 --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -0,0 +1,154 @@ +use std::fs::create_dir_all; +use std::path::{Path, PathBuf}; + +use heed::{ + types::{ByteSlice, Str}, + CompactionOption, Database, Env, EnvOpenOptions, +}; +use uuid::Uuid; + +use super::{Result, UuidError, UUID_STORE_SIZE}; + +#[async_trait::async_trait] +pub trait UuidStore { + // Create a new entry for `name`. Return an error if `err` and the entry already exists, return + // the uuid otherwise. 
+ async fn create_uuid(&self, uid: String, err: bool) -> Result; + async fn get_uuid(&self, uid: String) -> Result>; + async fn delete(&self, uid: String) -> Result>; + async fn list(&self) -> Result>; + async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; + async fn snapshot(&self, path: PathBuf) -> Result>; +} + +pub struct HeedUuidStore { + env: Env, + db: Database, +} + +impl HeedUuidStore { + pub fn new(path: impl AsRef) -> anyhow::Result { + let path = path.as_ref().join("index_uuids"); + create_dir_all(&path)?; + let mut options = EnvOpenOptions::new(); + options.map_size(UUID_STORE_SIZE); // 1GB + let env = options.open(path)?; + let db = env.create_database(None)?; + Ok(Self { env, db }) + } +} + +#[async_trait::async_trait] +impl UuidStore for HeedUuidStore { + async fn create_uuid(&self, name: String, err: bool) -> Result { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let mut txn = env.write_txn()?; + match db.get(&txn, &name)? { + Some(uuid) => { + if err { + Err(UuidError::NameAlreadyExist) + } else { + let uuid = Uuid::from_slice(uuid)?; + Ok(uuid) + } + } + None => { + let uuid = Uuid::new_v4(); + db.put(&mut txn, &name, uuid.as_bytes())?; + txn.commit()?; + Ok(uuid) + } + } + }) + .await? + } + + async fn get_uuid(&self, name: String) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let txn = env.read_txn()?; + match db.get(&txn, &name)? { + Some(uuid) => { + let uuid = Uuid::from_slice(uuid)?; + Ok(Some(uuid)) + } + None => Ok(None), + } + }) + .await? + } + + async fn delete(&self, uid: String) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let mut txn = env.write_txn()?; + match db.get(&txn, &uid)? { + Some(uuid) => { + let uuid = Uuid::from_slice(uuid)?; + db.delete(&mut txn, &uid)?; + txn.commit()?; + Ok(Some(uuid)) + } + None => Ok(None), + } + }) + .await? 
+ } + + async fn list(&self) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let txn = env.read_txn()?; + let mut entries = Vec::new(); + for entry in db.iter(&txn)? { + let (name, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.push((name.to_owned(), uuid)) + } + Ok(entries) + }) + .await? + } + + async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let mut txn = env.write_txn()?; + db.put(&mut txn, &name, uuid.as_bytes())?; + txn.commit()?; + Ok(()) + }) + .await? + } + + async fn snapshot(&self, mut path: PathBuf) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + // Write transaction to acquire a lock on the database. + let txn = env.write_txn()?; + let mut entries = Vec::new(); + for entry in db.iter(&txn)? { + let (_, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.push(uuid) + } + + // only perform snapshot if there are indexes + if !entries.is_empty() { + path.push("index_uuids"); + create_dir_all(&path).unwrap(); + path.push("data.mdb"); + env.copy_to_path(path, CompactionOption::Enabled)?; + } + Ok(entries) + }) + .await? + } +} diff --git a/meilisearch-http/src/option.rs b/meilisearch-http/src/option.rs index 82eb75fc..1997718c 100644 --- a/meilisearch-http/src/option.rs +++ b/meilisearch-http/src/option.rs @@ -191,8 +191,8 @@ pub struct Opt { pub schedule_snapshot: bool, /// Defines time interval, in seconds, between each snapshot creation. - #[structopt(long, env = "MEILI_SNAPSHOT_INTERVAL_SEC")] - pub snapshot_interval_sec: Option, + #[structopt(long, env = "MEILI_SNAPSHOT_INTERVAL_SEC", default_value = "86400")] // 24h + pub snapshot_interval_sec: u64, /// Folder where dumps are created when the dump route is called. 
#[structopt(long, env = "MEILI_DUMPS_DIR", default_value = "dumps/")] diff --git a/meilisearch-http/src/snapshot.rs b/meilisearch-http/src/snapshot.rs deleted file mode 100644 index 520044f8..00000000 --- a/meilisearch-http/src/snapshot.rs +++ /dev/null @@ -1,96 +0,0 @@ -use crate::Data; -use crate::error::Error; -use crate::helpers::compression; - -use log::error; -use std::fs::create_dir_all; -use std::path::Path; -use std::thread; -use std::time::{Duration}; -use tempfile::TempDir; - -pub fn load_snapshot( - db_path: &str, - snapshot_path: &Path, - ignore_snapshot_if_db_exists: bool, - ignore_missing_snapshot: bool -) -> Result<(), Error> { - let db_path = Path::new(db_path); - - if !db_path.exists() && snapshot_path.exists() { - compression::from_tar_gz(snapshot_path, db_path) - } else if db_path.exists() && !ignore_snapshot_if_db_exists { - Err(Error::Internal(format!("database already exists at {:?}, try to delete it or rename it", db_path.canonicalize().unwrap_or(db_path.into())))) - } else if !snapshot_path.exists() && !ignore_missing_snapshot { - Err(Error::Internal(format!("snapshot doesn't exist at {:?}", snapshot_path.canonicalize().unwrap_or(snapshot_path.into())))) - } else { - Ok(()) - } -} - -pub fn create_snapshot(data: &Data, snapshot_path: &Path) -> Result<(), Error> { - let tmp_dir = TempDir::new()?; - - data.db.copy_and_compact_to_path(tmp_dir.path())?; - - compression::to_tar_gz(tmp_dir.path(), snapshot_path).map_err(|e| Error::Internal(format!("something went wrong during snapshot compression: {}", e))) -} - -pub fn schedule_snapshot(data: Data, snapshot_dir: &Path, time_gap_s: u64) -> Result<(), Error> { - if snapshot_dir.file_name().is_none() { - return Err(Error::Internal("invalid snapshot file path".to_string())); - } - let db_name = Path::new(&data.db_path).file_name().ok_or_else(|| Error::Internal("invalid database name".to_string()))?; - create_dir_all(snapshot_dir)?; - let snapshot_path = snapshot_dir.join(format!("{}.snapshot", 
db_name.to_str().unwrap_or("data.ms"))); - - thread::spawn(move || loop { - if let Err(e) = create_snapshot(&data, &snapshot_path) { - error!("Unsuccessful snapshot creation: {}", e); - } - thread::sleep(Duration::from_secs(time_gap_s)); - }); - - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - use std::io::prelude::*; - use std::fs; - - #[test] - fn test_pack_unpack() { - let tempdir = TempDir::new().unwrap(); - - let test_dir = tempdir.path(); - let src_dir = test_dir.join("src"); - let dest_dir = test_dir.join("complex/destination/path/"); - let archive_path = test_dir.join("archive.snapshot"); - - let file_1_relative = Path::new("file1.txt"); - let subdir_relative = Path::new("subdir/"); - let file_2_relative = Path::new("subdir/file2.txt"); - - create_dir_all(src_dir.join(subdir_relative)).unwrap(); - fs::File::create(src_dir.join(file_1_relative)).unwrap().write_all(b"Hello_file_1").unwrap(); - fs::File::create(src_dir.join(file_2_relative)).unwrap().write_all(b"Hello_file_2").unwrap(); - - - assert!(compression::to_tar_gz(&src_dir, &archive_path).is_ok()); - assert!(archive_path.exists()); - assert!(load_snapshot(&dest_dir.to_str().unwrap(), &archive_path, false, false).is_ok()); - - assert!(dest_dir.exists()); - assert!(dest_dir.join(file_1_relative).exists()); - assert!(dest_dir.join(subdir_relative).exists()); - assert!(dest_dir.join(file_2_relative).exists()); - - let contents = fs::read_to_string(dest_dir.join(file_1_relative)).unwrap(); - assert_eq!(contents, "Hello_file_1"); - - let contents = fs::read_to_string(dest_dir.join(file_2_relative)).unwrap(); - assert_eq!(contents, "Hello_file_2"); - } -} diff --git a/meilisearch-http/tests/common/index.rs b/meilisearch-http/tests/common/index.rs index 8fda99ef..a7a9a537 100644 --- a/meilisearch-http/tests/common/index.rs +++ b/meilisearch-http/tests/common/index.rs @@ -96,7 +96,6 @@ impl Index<'_> { self.service.get(url).await } - #[allow(dead_code)] pub async fn list_updates(&self) -> (Value, 
StatusCode) { let url = format!("/indexes/{}/updates", self.uid); self.service.get(url).await diff --git a/meilisearch-http/tests/common/mod.rs b/meilisearch-http/tests/common/mod.rs index d1874ae8..e734b362 100644 --- a/meilisearch-http/tests/common/mod.rs +++ b/meilisearch-http/tests/common/mod.rs @@ -1,6 +1,6 @@ -mod index; -mod server; -mod service; +pub mod index; +pub mod server; +pub mod service; pub use index::{GetAllDocumentsOptions, GetDocumentOptions}; pub use server::Server; diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs index 43caf1dc..4655b10a 100644 --- a/meilisearch-http/tests/common/server.rs +++ b/meilisearch-http/tests/common/server.rs @@ -1,3 +1,5 @@ +use std::path::Path; + use actix_web::http::StatusCode; use byte_unit::{Byte, ByteUnit}; use serde_json::Value; @@ -12,50 +14,33 @@ use super::service::Service; pub struct Server { pub service: Service, - // hod ownership to the tempdir while we use the server instance. - _dir: tempdir::TempDir, + // hold ownership to the tempdir while we use the server instance. 
+ _dir: Option, } impl Server { pub async fn new() -> Self { let dir = TempDir::new("meilisearch").unwrap(); - let opt = Opt { - db_path: dir.path().join("db"), - dumps_dir: dir.path().join("dump"), - dump_batch_size: 16, - http_addr: "127.0.0.1:7700".to_owned(), - master_key: None, - env: "development".to_owned(), - no_analytics: true, - max_mdb_size: Byte::from_unit(4.0, ByteUnit::GiB).unwrap(), - max_udb_size: Byte::from_unit(4.0, ByteUnit::GiB).unwrap(), - http_payload_size_limit: Byte::from_unit(10.0, ByteUnit::MiB).unwrap(), - ssl_cert_path: None, - ssl_key_path: None, - ssl_auth_path: None, - ssl_ocsp_path: None, - ssl_require_auth: false, - ssl_resumption: false, - ssl_tickets: false, - import_snapshot: None, - ignore_missing_snapshot: false, - ignore_snapshot_if_db_exists: false, - snapshot_dir: ".".into(), - schedule_snapshot: false, - snapshot_interval_sec: None, - import_dump: None, - indexer_options: IndexerOpts::default(), - #[cfg(all(not(debug_assertions), feature = "sentry"))] - sentry_dsn: String::from(""), - #[cfg(all(not(debug_assertions), feature = "sentry"))] - no_sentry: true, - }; + let opt = default_settings(dir.path()); let data = Data::new(opt).unwrap(); let service = Service(data); - Server { service, _dir: dir } + Server { + service, + _dir: Some(dir), + } + } + + pub async fn new_with_options(opt: Opt) -> Self { + let data = Data::new(opt).unwrap(); + let service = Service(data); + + Server { + service, + _dir: None, + } } /// Returns a view to an index. There is no guarantee that the index exists. 
@@ -74,3 +59,37 @@ impl Server { self.service.get("/version").await } } + +pub fn default_settings(dir: impl AsRef) -> Opt { + Opt { + db_path: dir.as_ref().join("db"), + dumps_dir: dir.as_ref().join("dump"), + dump_batch_size: 16, + http_addr: "127.0.0.1:7700".to_owned(), + master_key: None, + env: "development".to_owned(), + no_analytics: true, + max_mdb_size: Byte::from_unit(4.0, ByteUnit::GiB).unwrap(), + max_udb_size: Byte::from_unit(4.0, ByteUnit::GiB).unwrap(), + http_payload_size_limit: Byte::from_unit(10.0, ByteUnit::MiB).unwrap(), + ssl_cert_path: None, + ssl_key_path: None, + ssl_auth_path: None, + ssl_ocsp_path: None, + ssl_require_auth: false, + ssl_resumption: false, + ssl_tickets: false, + import_snapshot: None, + ignore_missing_snapshot: false, + ignore_snapshot_if_db_exists: false, + snapshot_dir: ".".into(), + schedule_snapshot: false, + snapshot_interval_sec: 0, + import_dump: None, + indexer_options: IndexerOpts::default(), + #[cfg(all(not(debug_assertions), feature = "sentry"))] + sentry_dsn: String::from(""), + #[cfg(all(not(debug_assertions), feature = "sentry"))] + no_sentry: true, + } +} diff --git a/meilisearch-http/tests/integration.rs b/meilisearch-http/tests/integration.rs index 8acc75ff..b414072d 100644 --- a/meilisearch-http/tests/integration.rs +++ b/meilisearch-http/tests/integration.rs @@ -3,8 +3,9 @@ mod documents; mod index; mod search; mod settings; -mod updates; +mod snapshot; mod stats; +mod updates; // Tests are isolated by features in different modules to allow better readability, test // targetability, and improved incremental compilation times. 
diff --git a/meilisearch-http/tests/snapshot/mod.rs b/meilisearch-http/tests/snapshot/mod.rs new file mode 100644 index 00000000..36763a1a --- /dev/null +++ b/meilisearch-http/tests/snapshot/mod.rs @@ -0,0 +1,53 @@ +use std::time::Duration; + +use crate::common::server::default_settings; +use crate::common::GetAllDocumentsOptions; +use crate::common::Server; +use tokio::time::sleep; + +use meilisearch_http::Opt; + +#[ignore] +#[actix_rt::test] +async fn perform_snapshot() { + let temp = tempfile::tempdir_in(".").unwrap(); + let snapshot_dir = tempfile::tempdir_in(".").unwrap(); + + let options = Opt { + snapshot_dir: snapshot_dir.path().to_owned(), + snapshot_interval_sec: 1, + schedule_snapshot: true, + ..default_settings(temp.path()) + }; + + let server = Server::new_with_options(options).await; + let index = server.index("test"); + index.load_test_set().await; + + let (response, _) = index + .get_all_documents(GetAllDocumentsOptions::default()) + .await; + + sleep(Duration::from_secs(2)).await; + + let temp = tempfile::tempdir_in(".").unwrap(); + + let snapshot_path = snapshot_dir + .path() + .to_owned() + .join(format!("db.snapshot")); + + let options = Opt { + import_snapshot: Some(snapshot_path), + ..default_settings(temp.path()) + }; + + let server = Server::new_with_options(options).await; + let index = server.index("test"); + + let (response_from_snapshot, _) = index + .get_all_documents(GetAllDocumentsOptions::default()) + .await; + + assert_eq!(response, response_from_snapshot); +} diff --git a/meilisearch-http/tests/stats/mod.rs b/meilisearch-http/tests/stats/mod.rs index 5e3c9db4..db04fb1c 100644 --- a/meilisearch-http/tests/stats/mod.rs +++ b/meilisearch-http/tests/stats/mod.rs @@ -18,4 +18,4 @@ async fn test_healthyness() { let (response, status_code) = server.service.get("/health").await; assert_eq!(status_code, 200); assert_eq!(response["status"], "available"); - } +}