Skip to content

Commit

Permalink
rusoto: s3sink: Support aborting or completing multipart upload on error
Browse files Browse the repository at this point in the history
A multipart upload should either be completed or aborted on error. In
the current state of things, a multipart upload would neither be
completed nor aborted, putting the onus on an external entity to take
care of finishing incomplete uploads or relying on a sane bucket
life cycle policy configured to abort incomplete multipart uploads.

An incomplete multipart upload still contributes to the storage costs as
long as it exists.

We introduce a property here to allow the user to select either aborting
or completing multipart uploads on error. Aborting the upload causes
whole of data to be discarded and the same upload ID is not usable for
uploading more parts to the same.

Completing an incomplete multipart upload can be useful in situations
like having a streamable MP4 where one might want to complete the upload
and have part of the data which was uploaded be preserved.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/618>
  • Loading branch information
SanchayanMaity committed Dec 7, 2021
1 parent 3ed9e29 commit 099a3f2
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 18 deletions.
163 changes: 145 additions & 18 deletions net/rusoto/src/s3sink/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use futures::future;
use rusoto_core::{region::Region, request::HttpClient};
use rusoto_credential::StaticProvider;
use rusoto_s3::{
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
AbortMultipartUploadRequest, CompleteMultipartUploadRequest, CompletedMultipartUpload,
CompletedPart, CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
};

use once_cell::sync::Lazy;
Expand All @@ -29,6 +29,10 @@ use std::sync::Mutex;
use crate::s3url::*;
use crate::s3utils::{self, WaitError};

use super::OnError;

const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;

struct Started {
client: S3Client,
buffer: Vec<u8>,
Expand Down Expand Up @@ -89,6 +93,7 @@ struct Settings {
access_key: Option<String>,
secret_access_key: Option<String>,
metadata: Option<gst::Structure>,
multipart_upload_on_error: OnError,
}

impl Settings {
Expand Down Expand Up @@ -156,6 +161,7 @@ impl Default for Settings {
access_key: None,
secret_access_key: None,
metadata: None,
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
}
}
}
Expand All @@ -166,6 +172,7 @@ pub struct S3Sink {
settings: Mutex<Settings>,
state: Mutex<State>,
canceller: Mutex<Option<future::AbortHandle>>,
abort_multipart_canceller: Mutex<Option<future::AbortHandle>>,
}

static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
Expand Down Expand Up @@ -196,10 +203,62 @@ impl S3Sink {

let output =
s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
WaitError::FutureError(err) => Some(gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err]
)),
WaitError::FutureError(err) => {
let settings = self.settings.lock().unwrap();
match settings.multipart_upload_on_error {
OnError::Abort => {
gst_log!(
CAT,
obj: element,
"Aborting multipart upload request with id: {}",
state.upload_id
);
match self.abort_multipart_upload_request(state) {
Ok(()) => {
gst_log!(
CAT,
obj: element,
"Aborting multipart upload request succeeded."
);
}
Err(err) => gst_error!(
CAT,
obj: element,
"Aborting multipart upload failed: {}",
err.to_string()
),
}
}
OnError::Complete => {
gst_log!(
CAT,
obj: element,
"Completing multipart upload request with id: {}",
state.upload_id
);
match self.complete_multipart_upload_request(state) {
Ok(()) => {
gst_log!(
CAT,
obj: element,
"Complete multipart upload request succeeded."
);
}
Err(err) => gst_error!(
CAT,
obj: element,
"Completing multipart upload failed: {}",
err.to_string()
),
}
}
OnError::DoNothing => (),
}
Some(gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload part: {}", err]
))
}
WaitError::Cancelled => None,
})?;

Expand Down Expand Up @@ -273,22 +332,53 @@ impl S3Sink {
}
}

fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
if self.flush_current_buffer(element).is_err() {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
["Failed to flush internal buffer."]
));
fn create_abort_multipart_upload_request(
&self,
url: &GstS3Url,
started_state: &Started,
) -> AbortMultipartUploadRequest {
AbortMultipartUploadRequest {
bucket: url.bucket.clone(),
expected_bucket_owner: None,
key: url.object.clone(),
request_payer: None,
upload_id: started_state.upload_id.to_owned(),
}
}

let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
unreachable!("Element should be started");
}
fn abort_multipart_upload_request(
&self,
started_state: &Started,
) -> Result<(), gst::ErrorMessage> {
let s3url = match *self.url.lock().unwrap() {
Some(ref url) => url.clone(),
None => unreachable!("Element should be started"),
};
let abort_req = self.create_abort_multipart_upload_request(&s3url, started_state);
let abort_req_future = started_state.client.abort_multipart_upload(abort_req);

s3utils::wait(&self.abort_multipart_canceller, abort_req_future)
.map(|_| ())
.map_err(|err| match err {
WaitError::FutureError(err) => {
gst::error_msg!(
gst::ResourceError::Write,
["Failed to abort multipart upload: {}.", err.to_string()]
)
}
WaitError::Cancelled => {
gst::error_msg!(
gst::ResourceError::Write,
["Abort multipart upload request interrupted."]
)
}
})
}

fn complete_multipart_upload_request(
&self,
started_state: &mut Started,
) -> Result<(), gst::ErrorMessage> {
let complete_req = self.create_complete_multipart_upload_request(started_state);
let complete_req_future = started_state.client.complete_multipart_upload(complete_req);

Expand All @@ -305,6 +395,25 @@ impl S3Sink {
})
}

fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
if self.flush_current_buffer(element).is_err() {
return Err(gst::error_msg!(
gst::ResourceError::Settings,
["Failed to flush internal buffer."]
));
}

let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
unreachable!("Element should be started");
}
};

self.complete_multipart_upload_request(started_state)
}

fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
Expand Down Expand Up @@ -406,6 +515,11 @@ impl S3Sink {

fn cancel(&self) {
let mut canceller = self.canceller.lock().unwrap();
let mut abort_canceller = self.abort_multipart_canceller.lock().unwrap();

if let Some(c) = abort_canceller.take() {
c.abort()
};

if let Some(c) = canceller.take() {
c.abort()
Expand Down Expand Up @@ -519,6 +633,14 @@ impl ObjectImpl for S3Sink {
gst::Structure::static_type(),
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecEnum::new(
"on-error",
"Whether to upload or complete the multipart upload on error",
"Do nothing, abort or complete a multipart upload request on error",
OnError::static_type(),
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
]
});

Expand Down Expand Up @@ -591,6 +713,10 @@ impl ObjectImpl for S3Sink {
"metadata" => {
settings.metadata = value.get().expect("type checked upstream");
}
"on-error" => {
settings.multipart_upload_on_error =
value.get::<OnError>().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
Expand All @@ -614,6 +740,7 @@ impl ObjectImpl for S3Sink {
"access-key" => settings.access_key.to_value(),
"secret-access-key" => settings.secret_access_key.to_value(),
"metadata" => settings.metadata.to_value(),
"on-error" => settings.multipart_upload_on_error.to_value(),
_ => unimplemented!(),
}
}
Expand Down
15 changes: 15 additions & 0 deletions net/rusoto/src/s3sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ use gst::prelude::*;

mod imp;

#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstS3SinkOnError")]
pub(crate) enum OnError {
#[enum_value(name = "Abort: Abort multipart upload on error.", nick = "abort")]
Abort,
#[enum_value(
name = "Complete: Complete multipart upload on error.",
nick = "complete"
)]
Complete,
#[enum_value(name = "DoNothing: Do nothing on error.", nick = "nothing")]
DoNothing,
}

glib::wrapper! {
pub struct S3Sink(ObjectSubclass<imp::S3Sink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
}
Expand Down

0 comments on commit 099a3f2

Please sign in to comment.