Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
refactor(http): use macros to generate actor handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Apr 2, 2021
1 parent 2206a44 commit 3e875b0
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 222 deletions.
143 changes: 33 additions & 110 deletions meilisearch-http/src/index_controller/index_actor/handle_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,126 +3,20 @@ use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use super::{
IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult,
};
use crate::index::{Document, SearchQuery, SearchResult, Settings};
use crate::index_controller::IndexSettings;
use crate::index_controller::{updates::Processing, UpdateMeta};

use super::{
IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult,
};

#[derive(Clone)]
pub struct IndexActorHandleImpl {
read_sender: mpsc::Sender<IndexMsg>,
write_sender: mpsc::Sender<IndexMsg>,
}

#[async_trait::async_trait]
impl IndexActorHandle for IndexActorHandleImpl {
async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::CreateIndex {
ret,
uuid,
primary_key,
};
let _ = self.read_sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}

async fn update(
&self,
meta: Processing<UpdateMeta>,
data: std::fs::File,
) -> anyhow::Result<UpdateResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Update { ret, meta, data };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Search { uuid, query, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

async fn settings(&self, uuid: Uuid) -> Result<Settings> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Settings { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

async fn documents(
&self,
uuid: Uuid,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Vec<Document>> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Documents {
uuid,
ret,
offset,
attributes_to_retrieve,
limit,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

async fn document(
&self,
uuid: Uuid,
doc_id: String,
attributes_to_retrieve: Option<Vec<String>>,
) -> Result<Document> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Document {
uuid,
ret,
doc_id,
attributes_to_retrieve,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Delete { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::GetMeta { uuid, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::UpdateIndex {
uuid,
index_settings,
ret,
};
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}

async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::Snapshot { uuid, path, ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
}

impl IndexActorHandleImpl {
pub fn new(path: impl AsRef<Path>, index_size: usize) -> anyhow::Result<Self> {
let (read_sender, read_receiver) = mpsc::channel(100);
Expand All @@ -137,3 +31,32 @@ impl IndexActorHandleImpl {
})
}
}

macro_rules! handler {
($({$fn_name:ident, $message:ident, [$($arg:ident: $arg_type:ty),*], $return:ty}),*) => {
#[async_trait::async_trait]
impl IndexActorHandle for IndexActorHandleImpl {
$(
async fn $fn_name(&self, $($arg: $arg_type, )*) -> $return {
let (ret, receiver) = oneshot::channel();
let msg = IndexMsg::$message { $($arg,)* ret };
let _ = self.read_sender.send(msg).await;
Ok(receiver.await.expect("IndexActor has been killed")?)
}
)*
}
};
}

handler!(
{update, Update, [meta: Processing<UpdateMeta>, data: std::fs::File], anyhow::Result<UpdateResult>},
{create_index, CreateIndex, [uuid: Uuid, primary_key: Option<String>], Result<IndexMeta>},
{search, Search, [uuid: Uuid, query: SearchQuery], Result<SearchResult>},
{settings, Settings, [uuid: Uuid], Result<Settings>},
{documents, Documents, [uuid: Uuid, offset: usize, limit: usize, attributes_to_retrieve: Option<Vec<String>>], Result<Vec<Document>>},
{document, Document, [uuid: Uuid, doc_id: String, attributes_to_retrieve: Option<Vec<String>>], Result<Document>},
{delete, Delete, [uuid: Uuid], Result<()>},
{get_index_meta, GetMeta, [uuid: Uuid], Result<IndexMeta>},
{update_index, UpdateIndex, [uuid: Uuid, index_settings: IndexSettings], Result<IndexMeta>},
{snapshot, Snapshot, [uuid: Uuid, path: PathBuf], Result<()>}
);
86 changes: 29 additions & 57 deletions meilisearch-http/src/index_controller/update_actor/handle_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::path::{Path, PathBuf};
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::index_controller::IndexActorHandle;

use super::{
MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta,
UpdateMsg, UpdateStatus,
};
use crate::index_controller::IndexActorHandle;

#[derive(Clone)]
pub struct UpdateActorHandleImpl<D> {
Expand Down Expand Up @@ -36,61 +37,32 @@ where
Ok(Self { sender })
}
}
#[async_trait::async_trait]
impl<D> UpdateActorHandle for UpdateActorHandleImpl<D>
where
D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
type Data = D;

async fn update(
&self,
meta: UpdateMeta,
data: mpsc::Receiver<PayloadData<Self::Data>>,
uuid: Uuid,
) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Update {
uuid,
data,
meta,
ret,
};
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::ListUpdates { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}

async fn update_status(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::GetUpdate { uuid, id, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}

async fn delete(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Delete { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}

async fn create(&self, uuid: Uuid) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Create { uuid, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}

async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Snapshot { uuid, path, ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
macro_rules! handler {
($({$fn_name:ident, $message:ident, [$($arg:ident: $arg_type:ty),*], $return:ty}),*) => {
#[async_trait::async_trait]
impl<D> UpdateActorHandle for UpdateActorHandleImpl<D>
where
D: AsRef<[u8]> + Sized + 'static + Sync + Send,
{
type Data = D;
$(
async fn $fn_name(&self, $($arg: $arg_type, )*) -> $return {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::$message { $($arg,)* ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("UpdateActor has been killed")?)
}
)*
}
};
}

handler!(
{update, Update, [meta: UpdateMeta, data: mpsc::Receiver<PayloadData<Self::Data>>, uuid: Uuid], Result<UpdateStatus>},
{get_all_updates_status, ListUpdates, [uuid: Uuid], Result<Vec<UpdateStatus>>},
{update_status, GetUpdate, [uuid: Uuid, id: u64], Result<UpdateStatus>},
{delete, Delete, [uuid: Uuid], Result<()>},
{create, Create, [uuid: Uuid], Result<()>},
{snapshot, Snapshot, [uuid: Uuid, path: PathBuf], Result<()>}
);
78 changes: 23 additions & 55 deletions meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,59 +20,27 @@ impl UuidResolverHandleImpl {
}
}

#[async_trait::async_trait]
impl UuidResolverHandle for UuidResolverHandleImpl {
async fn get(&self, name: String) -> Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Get { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}

async fn create(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Create { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}

async fn delete(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Delete { uid: name, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}

async fn list(&self) -> anyhow::Result<Vec<(String, Uuid)>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::List { ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}

async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Insert { ret, name, uuid };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}

async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::SnapshotRequest { path, ret };
let _ = self.sender.send(msg).await;
Ok(receiver
.await
.expect("Uuid resolver actor has been killed")?)
}
macro_rules! handler {
($({$fn_name:ident, $message:ident, [$($arg:ident: $arg_type:ty),*], $return:ty}),*) => {
#[async_trait::async_trait]
impl UuidResolverHandle for UuidResolverHandleImpl {
$(
async fn $fn_name(&self, $($arg: $arg_type, )*) -> $return {
let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::$message { $($arg,)* ret };
let _ = self.sender.send(msg).await;
Ok(receiver.await.expect("UuidResolverActor has been killed")?)
}
)*
}
};
}

handler!(
{get, Get, [uid: String], Result<Uuid>},
{create, Create, [uid: String], anyhow::Result<Uuid>},
{delete, Delete, [uid: String], anyhow::Result<Uuid>},
{list, List, [], anyhow::Result<Vec<(String, Uuid)>>},
{insert, Insert, [name: String, uuid: Uuid], anyhow::Result<()>},
{snapshot, SnapshotRequest, [path: PathBuf], Result<Vec<Uuid>>}
);

0 comments on commit 3e875b0

Please sign in to comment.