RUST-1713 Bulk Write #1034

Merged: 80 commits, May 14, 2024
Changes from 67 commits
Commits (80)
34b1579
api shell
isabelatkinson Nov 2, 2023
8e387ab
basic command building and outcome testing
isabelatkinson Nov 10, 2023
c4ffe01
add results
isabelatkinson Dec 7, 2023
fd4232d
top level options support and testing
isabelatkinson Dec 12, 2023
28d1bb6
impl and refactor
isabelatkinson Dec 12, 2023
42a2ae4
add server responses file
isabelatkinson Feb 26, 2024
6588a5e
add batching prose tests
isabelatkinson Feb 26, 2024
3a84805
update tests
isabelatkinson Feb 29, 2024
3a9d970
rebase fixes
isabelatkinson Feb 29, 2024
1c8fbc2
resync retryable writes/transactions
isabelatkinson Feb 29, 2024
84203d7
fix transactions test typo
isabelatkinson Feb 29, 2024
a491642
rustdoc workaround
isabelatkinson Feb 29, 2024
47a5975
resync tests
isabelatkinson Mar 4, 2024
29938fa
remove extra file
isabelatkinson Mar 4, 2024
040d807
fix msrv failure
isabelatkinson Mar 4, 2024
d42fe0e
reorg
isabelatkinson Mar 4, 2024
ad7aa44
fix files
isabelatkinson Mar 4, 2024
321b64b
abraham comments
isabelatkinson Mar 6, 2024
31d1c9e
add network errors test
isabelatkinson Apr 2, 2024
199163f
upserted id changes
isabelatkinson Apr 2, 2024
77daa8e
rename write concern error message
isabelatkinson Apr 2, 2024
1193a5c
add cursor iteration test
isabelatkinson Apr 2, 2024
d62beda
sync retryability tests
isabelatkinson Apr 3, 2024
427ec51
write concern error prose test, refactor fail point
isabelatkinson Apr 4, 2024
a1375a8
resync retryable writes
isabelatkinson Apr 4, 2024
6d2b8f2
sync bypassdocumentvalidation test
isabelatkinson Apr 4, 2024
b80ac98
add partial result check
isabelatkinson Apr 4, 2024
d566c99
prose test updates
isabelatkinson Apr 4, 2024
37e0a73
Merge branch 'main' into bulk-write-merge
isabelatkinson Apr 4, 2024
02c54d2
rework cursor test
isabelatkinson Apr 5, 2024
8f0dd5e
add pipeline tests
isabelatkinson Apr 5, 2024
6fd1a7a
validate update and replacement documents
isabelatkinson Apr 5, 2024
927e9c5
simplify check
isabelatkinson Apr 5, 2024
a2ffd7a
allow empty replaces
isabelatkinson Apr 5, 2024
7b11bf0
add failed iteration test
isabelatkinson Apr 5, 2024
6f26067
empty models test
isabelatkinson Apr 9, 2024
04f06bd
write concern
isabelatkinson Apr 9, 2024
4a07b61
more write concern tests
isabelatkinson Apr 9, 2024
37852df
add retryWrites:false test
isabelatkinson Apr 9, 2024
733cfb9
remove OperationResponse
isabelatkinson Apr 9, 2024
de46b4a
use pinned connection for cursor
isabelatkinson Apr 9, 2024
acbc534
reduce _id size
isabelatkinson Apr 9, 2024
20ce39d
fix fle
isabelatkinson Apr 10, 2024
e82caf2
Merge branch 'main' into bulk-write
isabelatkinson Apr 10, 2024
cfd0c3e
strip extra mongoses
isabelatkinson Apr 10, 2024
dbe4352
rework iteration tests
isabelatkinson Apr 11, 2024
a5612cd
more assertions
isabelatkinson Apr 11, 2024
fe6de0a
skip and sync files
isabelatkinson Apr 11, 2024
356248a
remove method
isabelatkinson Apr 11, 2024
ecb8973
fix integer casts
isabelatkinson Apr 11, 2024
36ba52c
skip transaction test on standalone
isabelatkinson Apr 11, 2024
41194ab
don't use multiple mongoses
isabelatkinson Apr 17, 2024
0f2a523
update transactions tests
isabelatkinson Apr 17, 2024
956729e
add file
isabelatkinson Apr 17, 2024
c943b89
transaction test, change error expectations
isabelatkinson Apr 17, 2024
0cb9676
fail point improvements
isabelatkinson Apr 18, 2024
a2c52e8
add file
isabelatkinson Apr 18, 2024
ecd7d7e
Merge branch 'main' into bulk-write
isabelatkinson Apr 18, 2024
cef01ed
small cleanup
isabelatkinson Apr 19, 2024
5ee0bd6
add namespace batching test
isabelatkinson Apr 22, 2024
9ad9e88
retry getMore
isabelatkinson Apr 24, 2024
f99b736
rework batch splitting sizes
isabelatkinson Apr 25, 2024
0e89378
Revert "retry getMore"
isabelatkinson Apr 25, 2024
a815a75
add ns size batching test
isabelatkinson Apr 25, 2024
ad579fe
different ns info batching test
isabelatkinson Apr 26, 2024
72ee8c5
too large test
isabelatkinson Apr 26, 2024
7fcec2d
skip tests
isabelatkinson Apr 26, 2024
e23d49c
rework namespace splitting test
isabelatkinson May 1, 2024
d648462
Merge branch 'main' into bulk-write
isabelatkinson May 2, 2024
c598745
error for fle
isabelatkinson May 3, 2024
98eb33c
sync retryable writes
isabelatkinson May 3, 2024
808a64a
add encryption test
isabelatkinson May 3, 2024
3c5a936
sync transaction tests
isabelatkinson May 3, 2024
fbc6651
minor: bump clippy to 1.78.0
isabelatkinson May 3, 2024
c086020
kevin comments
isabelatkinson May 9, 2024
11dba57
update operation methods
isabelatkinson May 13, 2024
81bde91
RUST-1921 Sign crate on release (#1095)
abr-egn May 13, 2024
d425f73
RUST-1945 Add a `with_type` method to the `Aggregate` action (#1100)
isabelatkinson May 13, 2024
5173765
Merge branch 'main' into bulk-write
isabelatkinson May 14, 2024
8956a55
Merge branch 'main' into bulk-write
isabelatkinson May 14, 2024
1 change: 1 addition & 0 deletions Cargo.toml
@@ -162,6 +162,7 @@ tokio = { version = ">= 0.0.0", features = ["fs", "parking_lot"] }
tracing-subscriber = "0.3.16"
regex = "1.6.0"
serde-hex = "0.1.0"
serde_path_to_error = "0.1.15"

[package.metadata.docs.rs]
rustdoc-args = ["--cfg", "docsrs"]
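The new `serde_path_to_error` dev-dependency is typically used when deserializing spec test files. As a rough, hedged sketch (the `TestFile` type and the use of `serde_json` here are illustrative assumptions, not part of this diff), it wraps a deserializer so that failures report the path to the offending field:

```rust
use serde::Deserialize;

// Illustrative stand-in for one of the driver's spec test file types.
#[derive(Debug, Deserialize)]
struct TestFile {
    description: String,
}

fn parse_test_file(json: &str) -> Result<TestFile, Box<dyn std::error::Error>> {
    let mut de = serde_json::Deserializer::from_str(json);
    // On failure, the returned error includes the path to the field that failed,
    // e.g. "tests[3].operations[0].arguments", rather than just a line/column.
    let parsed = serde_path_to_error::deserialize(&mut de)?;
    Ok(parsed)
}
```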
5 changes: 4 additions & 1 deletion src/action.rs
@@ -1,6 +1,7 @@
//! Action builder types.

mod aggregate;
mod bulk_write;
mod client_options;
mod count;
mod create_collection;
@@ -31,8 +32,10 @@ mod watch;

use std::{future::IntoFuture, marker::PhantomData, ops::Deref};

use crate::bson::Document;

pub use aggregate::Aggregate;
use bson::Document;
pub use bulk_write::BulkWrite;
pub use client_options::ParseConnectionString;
pub use count::{CountDocuments, EstimatedDocumentCount};
pub use create_collection::CreateCollection;
215 changes: 215 additions & 0 deletions src/action/bulk_write.rs
@@ -0,0 +1,215 @@
#![allow(missing_docs)]

use std::collections::HashMap;

use crate::{
bson::{Bson, Document},
error::{ClientBulkWriteError, Error, ErrorKind, Result},
operation::bulk_write::BulkWrite as BulkWriteOperation,
options::{BulkWriteOptions, WriteConcern, WriteModel},
results::BulkWriteResult,
Client,
ClientSession,
};

use super::{action_impl, option_setters};

impl Client {
pub fn bulk_write(&self, models: impl IntoIterator<Item = WriteModel>) -> BulkWrite {
BulkWrite::new(self, models.into_iter().collect())
}
}

#[must_use]
pub struct BulkWrite<'a> {
client: &'a Client,
models: Vec<WriteModel>,
options: Option<BulkWriteOptions>,
session: Option<&'a mut ClientSession>,
}

impl<'a> BulkWrite<'a> {
fn new(client: &'a Client, models: Vec<WriteModel>) -> Self {
Self {
client,
models,
options: None,
session: None,
}
}

fn is_ordered(&self) -> bool {
self.options
.as_ref()
.and_then(|options| options.ordered)
.unwrap_or(true)
}
}

impl<'a> BulkWrite<'a> {
option_setters!(options: BulkWriteOptions;
ordered: bool,
bypass_document_validation: bool,
comment: Bson,
let_vars: Document,
verbose_results: bool,
write_concern: WriteConcern,
);

pub fn session(mut self, session: &'a mut ClientSession) -> BulkWrite<'a> {
self.session = Some(session);
self
}
}

#[action_impl]
impl<'a> Action for BulkWrite<'a> {
type Future = BulkWriteFuture;

async fn execute(mut self) -> Result<BulkWriteResult> {
resolve_write_concern_with_session!(
self.client,
self.options,
self.session.as_deref_mut()
)?;

let mut total_attempted = 0;
let mut execution_status = ExecutionStatus::None;

while total_attempted < self.models.len()
&& execution_status.should_continue(self.is_ordered())
{
let mut operation = BulkWriteOperation::new(
self.client.clone(),
&self.models[total_attempted..],
total_attempted,
self.options.as_ref(),
)
.await;
let result = self
.client
.execute_operation::<BulkWriteOperation>(
&mut operation,
self.session.as_deref_mut(),
)
.await;
total_attempted += operation.n_attempted;

match result {
Ok(result) => {
execution_status = execution_status.with_success(result);
}
Err(error) => {
execution_status = execution_status.with_failure(error);
}
}
}

match execution_status {
ExecutionStatus::Success(bulk_write_result) => Ok(bulk_write_result),
ExecutionStatus::Error(error) => Err(error),
ExecutionStatus::None => Err(ErrorKind::InvalidArgument {
message: "bulk_write must be provided at least one write operation".into(),
}
.into()),
}
}
}

/// Represents the execution status of a bulk write. The status starts at `None`, indicating that no
/// writes have been attempted yet, and transitions to either `Success` or `Error` as batches are
/// executed. The contents of `Error` can be inspected to determine whether a bulk write can
/// continue with further batches or should be terminated.
enum ExecutionStatus {
Success(BulkWriteResult),
Error(Error),
None,
}

impl ExecutionStatus {
fn with_success(mut self, result: BulkWriteResult) -> Self {
match self {
// Merge two successful sets of results together.
Self::Success(ref mut current_result) => {
current_result.merge(result);
self
}
// Merge the results of the new batch into the existing bulk write error.
Self::Error(ref mut current_error) => {
let bulk_write_error = Self::get_current_bulk_write_error(current_error);
bulk_write_error.merge_partial_results(result);
self
}
Self::None => Self::Success(result),
}
}

fn with_failure(self, mut error: Error) -> Self {
match self {
// If the new error is a BulkWriteError, merge the successful results into the error's
// partial result. Otherwise, create a new BulkWriteError with the existing results and
// set its source as the error that just occurred.
Self::Success(current_result) => match *error.kind {
ErrorKind::ClientBulkWrite(ref mut bulk_write_error) => {
bulk_write_error.merge_partial_results(current_result);
Self::Error(error)
}
_ => {
let bulk_write_error: Error =
ErrorKind::ClientBulkWrite(ClientBulkWriteError {
write_errors: HashMap::new(),
write_concern_errors: Vec::new(),
partial_result: Some(current_result),
})
.into();
Self::Error(bulk_write_error.with_source(error))
}
},
// If the new error is a BulkWriteError, merge its contents with the existing error.
// Otherwise, set the new error as the existing error's source.
Self::Error(mut current_error) => match *error.kind {
ErrorKind::ClientBulkWrite(bulk_write_error) => {
let current_bulk_write_error =
Self::get_current_bulk_write_error(&mut current_error);
current_bulk_write_error.merge(bulk_write_error);
Self::Error(current_error)
}
_ => Self::Error(current_error.with_source(error)),
},
Self::None => Self::Error(error),
}
}

/// Gets a BulkWriteError from a given Error. This method should only be called when adding a
/// new result or error to the existing state, as it requires that the given Error's kind is
/// ClientBulkWrite.
fn get_current_bulk_write_error(error: &mut Error) -> &mut ClientBulkWriteError {
match *error.kind {
ErrorKind::ClientBulkWrite(ref mut bulk_write_error) => bulk_write_error,
_ => unreachable!(),
}
}

/// Whether further bulk write batches should be executed based on the current status of
/// execution.
fn should_continue(&self, ordered: bool) -> bool {
match self {
Self::Error(ref error) => {
match *error.kind {
ErrorKind::ClientBulkWrite(ref bulk_write_error) => {
// A top-level error is always fatal.
let top_level_error_occurred = error.source.is_some();
// A write error occurring during an ordered bulk write is fatal.
let terminal_write_error_occurred =
ordered && !bulk_write_error.write_errors.is_empty();

!top_level_error_occurred && !terminal_write_error_occurred
}
// A top-level error is always fatal.
_ => false,
}
}
_ => true,
}
}
}
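For context, here is a minimal usage sketch of the new action, assuming the public paths suggested by this file's imports (`mongodb::options::WriteModel`, `mongodb::results::BulkWriteResult`); construction of the individual `WriteModel` values is not shown in this diff and is left to the caller:

```rust
use mongodb::{
    error::Result,
    options::WriteModel,
    results::BulkWriteResult,
    Client,
    ClientSession,
};

// Run a client-level bulk write, using a couple of the generated option setters.
async fn run_bulk_write(client: &Client, models: Vec<WriteModel>) -> Result<BulkWriteResult> {
    client
        .bulk_write(models)      // Client::bulk_write added above
        .ordered(false)          // setters generated by option_setters!
        .verbose_results(true)
        .await                   // IntoFuture provided by #[action_impl]
}

// The same call bound to an explicit session.
async fn run_in_session(
    client: &Client,
    session: &mut ClientSession,
    models: Vec<WriteModel>,
) -> Result<BulkWriteResult> {
    client.bulk_write(models).session(session).await
}
```

Batches are continued or halted according to `ExecutionStatus::should_continue`: an unordered bulk write keeps going past individual write errors, while an ordered one stops at the first write error or any top-level error.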
98 changes: 76 additions & 22 deletions src/bson_util.rs
@@ -4,7 +4,16 @@ use std::{
};

 use crate::{
-    bson::{Bson, Document, RawArrayBuf, RawBson, RawBsonRef, RawDocumentBuf},
+    bson::{
+        oid::ObjectId,
+        rawdoc,
+        Bson,
+        Document,
+        RawArrayBuf,
+        RawBson,
+        RawBsonRef,
+        RawDocumentBuf,
+    },
     checked::Checked,
     error::{ErrorKind, Result},
     runtime::SyncLittleEndianRead,
@@ -57,38 +66,51 @@ pub(crate) fn to_raw_bson_array(docs: &[Document]) -> Result<RawBson> {
Ok(RawBson::Array(array))
}

#[cfg(test)]
pub(crate) fn sort_document(document: &mut Document) {
let temp = std::mem::take(document);

let mut elements: Vec<_> = temp.into_iter().collect();
elements.sort_by(|e1, e2| e1.0.cmp(&e2.0));

document.extend(elements);
}

pub(crate) fn first_key(document: &Document) -> Option<&str> {
document.keys().next().map(String::as_str)
}

-pub(crate) fn replacement_raw_document_check(replacement: &RawDocumentBuf) -> Result<()> {
-    match replacement.iter().next().transpose()? {
-        Some((key, _)) if !key.starts_with('$') => Ok(()),
-        _ => Err(ErrorKind::InvalidArgument {
-            message: "replace document must have first key not starting with '$'".to_string(),
-        }
-        .into()),
-    }
-}
-
-pub(crate) fn update_document_check(update: &Document) -> Result<()> {
-    match first_key(update) {
-        Some(s) if s.starts_with('$') => Ok(()),
-        _ => Err(ErrorKind::InvalidArgument {
-            message: "update document must have first key starting with '$".to_string(),
-        }
-        .into()),
-    }
-}
+pub(crate) fn update_document_check(update: &Document) -> Result<()> {
+    match first_key(update) {
+        Some(key) => {
+            if !key.starts_with('$') {
+                Err(ErrorKind::InvalidArgument {
+                    message: "update document must only contain update modifiers".to_string(),
+                }
+                .into())
+            } else {
+                Ok(())
+            }
+        }
+        None => Err(ErrorKind::InvalidArgument {
+            message: "update document must not be empty".to_string(),
+        }
+        .into()),
+    }
+}
+
+pub(crate) fn replacement_document_check(replacement: &Document) -> Result<()> {
+    if let Some(key) = first_key(replacement) {
+        if key.starts_with('$') {
+            return Err(ErrorKind::InvalidArgument {
+                message: "replacement document must not contain update modifiers".to_string(),
+            }
+            .into());
+        }
+    }
+    Ok(())
+}
+
+pub(crate) fn replacement_raw_document_check(replacement: &RawDocumentBuf) -> Result<()> {
+    if let Some((key, _)) = replacement.iter().next().transpose()? {
+        if key.starts_with('$') {
+            return Err(ErrorKind::InvalidArgument {
+                message: "replacement document must not contain update modifiers".to_string(),
+            }
+            .into());
+        };
+    }
+    Ok(())
+}
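A short test-style sketch of the semantics these crate-internal checks enforce (assuming the crate's `doc!` re-export is in scope):

```rust
use crate::bson::doc;

fn validation_examples() -> crate::error::Result<()> {
    // An update must consist of update modifiers, so its first key starts with '$'.
    update_document_check(&doc! { "$set": { "x": 1 } })?;
    assert!(update_document_check(&doc! { "x": 1 }).is_err());
    assert!(update_document_check(&doc! {}).is_err()); // empty updates are rejected

    // A replacement is a plain document, so it must not start with an update modifier.
    replacement_document_check(&doc! { "x": 1 })?;
    assert!(replacement_document_check(&doc! { "$set": { "x": 1 } }).is_err());
    replacement_document_check(&doc! {})?; // empty replacements are allowed

    Ok(())
}
```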

/// The size in bytes of the provided document's entry in a BSON array at the given index.
@@ -101,6 +123,14 @@ pub(crate) fn array_entry_size_bytes(index: usize, doc_len: usize) -> Result<usi
(Checked::new(1) + num_decimal_digits(index) + 1 + doc_len).get()
}

pub(crate) fn vec_to_raw_array_buf(docs: Vec<RawDocumentBuf>) -> RawArrayBuf {
let mut array = RawArrayBuf::new();
for doc in docs {
array.push(doc);
}
array
}

/// The number of digits in `n` in base 10.
/// Useful for calculating the size of an array entry in BSON.
fn num_decimal_digits(mut n: usize) -> usize {
@@ -139,6 +169,30 @@ pub(crate) fn extend_raw_document_buf(
Ok(())
}

/// Returns the _id field of this document, prepending the field to the document if one is not
/// already present.
pub(crate) fn get_or_prepend_id_field(doc: &mut RawDocumentBuf) -> Result<Bson> {
match doc.get("_id")? {
Some(id) => Ok(id.try_into()?),
None => {
let id = ObjectId::new();
let mut new_bytes = rawdoc! { "_id": id }.into_bytes();

// Remove the trailing null byte (which will be replaced by the null byte in the given
// document) and append the document's elements
new_bytes.pop();
new_bytes.extend(&doc.as_bytes()[4..]);

let new_length: i32 = Checked::new(new_bytes.len()).try_into()?;
new_bytes[0..4].copy_from_slice(&new_length.to_le_bytes());

*doc = RawDocumentBuf::from_bytes(new_bytes)?;

Ok(id.into())
}
}
}

#[cfg(test)]
mod test {
use crate::bson_util::num_decimal_digits;
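And a similar sketch of what `get_or_prepend_id_field` is expected to do (again crate-internal, test-style): when a document has no `_id`, a freshly generated `ObjectId` is returned and spliced in as the first element by rebuilding the raw BSON bytes and patching the length prefix; an existing `_id` is returned unchanged.

```rust
use crate::bson::{rawdoc, Bson};

fn id_field_examples() -> crate::error::Result<()> {
    // No _id present: one is generated, returned, and prepended as the first key.
    let mut doc = rawdoc! { "x": 1 };
    let id = get_or_prepend_id_field(&mut doc)?;
    assert!(matches!(id, Bson::ObjectId(_)));
    let first_key = doc.iter().next().transpose()?.map(|(key, _)| key);
    assert_eq!(first_key, Some("_id"));

    // An existing _id is returned as-is and the document is untouched.
    let mut doc = rawdoc! { "_id": 5, "x": 1 };
    assert_eq!(get_or_prepend_id_field(&mut doc)?, Bson::Int32(5));
    Ok(())
}
```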