Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Bifrost Appender API in Shuffler #2659

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 64 additions & 14 deletions crates/worker/src/partition/shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use restate_storage_api::deduplication_table::DedupInformation;
use restate_storage_api::outbox_table::OutboxMessage;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, WithPartitionKey};
use restate_types::message::MessageIndex;
use restate_wal_protocol::{append_envelope_to_bifrost, Destination, Envelope, Header, Source};
use restate_wal_protocol::{Destination, Envelope, Header, Source};

use crate::partition::shuffle::state_machine::StateMachine;
use crate::partition::types::OutboxMessageExt;
Expand Down Expand Up @@ -223,12 +223,11 @@ where
let state_machine = StateMachine::new(
metadata,
outbox_reader,
move |msg| {
let bifrost = bifrost.clone();
async move {
append_envelope_to_bifrost(&bifrost, Arc::new(msg)).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure whether creating Appenders is so expensive. It looks that we might save a few atomic writes. Did you measure the impact of your changes? What could make a difference is the usage of the BackgroundAppender which also supports batching of appends. But this will require a bigger refactoring of the shuffler to make it work, I believe.

Ok(())
}
bifrost,
move |mut appender, msg| async move {
let _ = appender.append(Arc::new(msg)).await?;

Ok(())
},
&mut hint_rx,
);
Expand Down Expand Up @@ -257,16 +256,22 @@ where

mod state_machine {
use pin_project::pin_project;
use restate_bifrost::{Appender, Bifrost, ErrorRecoveryStrategy};
use restate_core::Metadata;
use restate_storage_api::outbox_table::OutboxMessage;
use restate_types::identifiers::{PartitionKey, WithPartitionKey};
use restate_types::logs::LogId;
use restate_types::message::MessageIndex;
use restate_types::partition_table::FindPartition;
use restate_types::Version;
use restate_wal_protocol::Envelope;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use tokio_util::sync::ReusableBoxFuture;
use tracing::trace;

use restate_storage_api::outbox_table::OutboxMessage;
use restate_types::message::MessageIndex;
use restate_wal_protocol::Envelope;

use crate::partition::shuffle;
use crate::partition::shuffle::{
wrap_outbox_message_in_envelope, NewOutboxMessage, OutboxReaderError, ShuffleMetadata,
Expand All @@ -292,6 +297,8 @@ mod state_machine {
metadata: ShuffleMetadata,
current_sequence_number: MessageIndex,
outbox_reader: Option<OutboxReader>,
bifrost: Bifrost,
appenders: HashMap<LogId, Appender>,
read_future: ReadFuture<OutboxReader>,
send_operation: SendOp,
hint_rx: &'a mut async_channel::Receiver<NewOutboxMessage>,
Expand All @@ -313,12 +320,13 @@ mod state_machine {
impl<'a, OutboxReader, SendOp, SendFuture> StateMachine<'a, OutboxReader, SendOp, SendFuture>
where
SendFuture: Future<Output = Result<(), anyhow::Error>>,
SendOp: Fn(Envelope) -> SendFuture,
SendOp: Fn(Appender, Envelope) -> SendFuture,
OutboxReader: shuffle::OutboxReader + Send + Sync + 'static,
{
pub(super) fn new(
metadata: ShuffleMetadata,
outbox_reader: OutboxReader,
bifrost: Bifrost,
send_operation: SendOp,
hint_rx: &'a mut async_channel::Receiver<NewOutboxMessage>,
) -> Self {
Expand All @@ -332,6 +340,8 @@ mod state_machine {
metadata,
current_sequence_number,
outbox_reader: None,
bifrost,
appenders: Default::default(),
read_future: ReusableBoxFuture::new(reading_future),
send_operation,
hint_rx,
Expand Down Expand Up @@ -363,7 +373,13 @@ mod state_machine {
seq_number,
this.metadata,
);
let send_future = (this.send_operation)(envelope);
let appender = Self::get_appender(
this.bifrost,
this.appenders,
envelope.partition_key(),
)
.await?;
let send_future = (this.send_operation)(appender, envelope);
this.state.set(State::Sending(send_future));
break;
}
Expand Down Expand Up @@ -398,7 +414,13 @@ mod state_machine {

let envelope =
wrap_outbox_message_in_envelope(message, seq_number, this.metadata);
let send_future = (this.send_operation)(envelope);
let appender = Self::get_appender(
this.bifrost,
this.appenders,
envelope.partition_key(),
)
.await?;
let send_future = (this.send_operation)(appender, envelope);

this.state.set(State::Sending(send_future));
} else {
Expand All @@ -424,6 +446,34 @@ mod state_machine {
}
}
}

async fn get_appender(
bifrost: &Bifrost,
appenders: &mut HashMap<LogId, Appender>,
partition_key: PartitionKey,
) -> Result<Appender, anyhow::Error> {
let partition_id = {
// make sure we drop pinned partition table before awaiting
let partition_table = Metadata::current()
.wait_for_partition_table(Version::MIN)
.await?;
partition_table.find_partition_id(partition_key)?
};
let log_id = LogId::from(*partition_id);

let appender_entry = appenders.entry(log_id);
let appender = match appender_entry {
std::collections::hash_map::Entry::Vacant(ve) => {
let value =
bifrost.create_appender(log_id, ErrorRecoveryStrategy::default())?;
Ok::<_, anyhow::Error>(ve.insert(value))
}
std::collections::hash_map::Entry::Occupied(oe) => {
Ok::<_, anyhow::Error>(oe.into_mut())
}
}?;
Ok(appender.clone())
}
}
}

Expand Down
Loading