Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make createMPU Call Async #84

Merged
merged 27 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ env:
# Change to specific Rust release to pin
rust_stable: stable
rust_nightly: nightly-2024-07-07
rust_clippy: '1.79'
rust_clippy: '1.81'
# When updating this, also update relevant docs
rust_min: '1.79'
rust_min: '1.81'
waahm7 marked this conversation as resolved.
Show resolved Hide resolved


defaults:
Expand Down Expand Up @@ -55,7 +55,7 @@ jobs:
- docs
- minrust
steps:
- run: exit 0
- run: exit 0

test-hll:
name: Test S3 transfer manager HLL
Expand Down Expand Up @@ -133,7 +133,7 @@ jobs:
run: |
cargo doc --lib --no-deps --all-features --document-private-items
env:
RUSTFLAGS: --cfg docsrs
RUSTFLAGS: --cfg docsrs
RUSTDOCFLAGS: --cfg docsrs

minrust:
Expand Down Expand Up @@ -246,7 +246,7 @@ jobs:
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- uses: Swatinem/rust-cache@v2
- name: check --feature-powerset
- name: check --feature-powerset
run: cargo hack check --all --feature-powerset

# TODO - get cross check working
Expand Down
3 changes: 1 addition & 2 deletions aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,7 @@ async fn do_upload(args: Args) -> Result<(), BoxError> {
.bucket(bucket)
.key(key)
.body(stream)
.send()
.await?;
.initiate()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where'd you get the name initiate()? Is that a standard name used in similar libraries? Just want to be sure we pick something predictable for users

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, Aaron suggested changing the name from send to avoid confusion with the Rust SDK at #73 (comment), and I couldn't think of something better than initiate/send.


let _resp = handle.join().await?;
let elapsed = start.elapsed();
Expand Down
5 changes: 2 additions & 3 deletions aws-s3-transfer-manager/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ impl Client {
/// .bucket("my-bucket")
/// .key("my-key")
/// .body(stream)
/// .send()
/// .await?;
/// .initiate()?;
///
/// // send() may return before the transfer is complete.
/// // initiate() will return before the transfer is complete.
/// // Call the `join()` method on the returned handle to drive the transfer to completion.
/// // The handle can also be used to get progress, pause, or cancel the transfer, etc.
/// let response = handle.join().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,10 @@ impl DownloadFluentBuilder {

impl crate::operation::download::input::DownloadInputBuilder {
/// Initiate a download transfer for a single object with this input using the given client.
pub fn send_with(self, client: &crate::Client) -> Result<DownloadHandle, crate::error::Error> {
pub fn initiate_with(
self,
client: &crate::Client,
) -> Result<DownloadHandle, crate::error::Error> {
let mut fluent_builder = client.download();
fluent_builder.inner = self;
fluent_builder.initiate()
Expand Down
162 changes: 115 additions & 47 deletions aws-s3-transfer-manager/src/operation/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ mod service;

use crate::error;
use crate::io::InputStream;
use aws_smithy_types::byte_stream::ByteStream;
use context::UploadContext;
pub use handle::UploadHandle;
use handle::{MultipartUploadData, UploadType};
/// Request type for uploads to Amazon S3
pub use input::{UploadInput, UploadInputBuilder};
/// Response type for uploads to Amazon S3
Expand All @@ -36,57 +36,58 @@ pub(crate) struct Upload;

impl Upload {
/// Execute a single `Upload` transfer operation
pub(crate) async fn orchestrate(
pub(crate) fn orchestrate(
handle: Arc<crate::client::Handle>,
mut input: crate::operation::upload::UploadInput,
) -> Result<UploadHandle, error::Error> {
let min_mpu_threshold = handle.mpu_threshold_bytes();

let stream = input.take_body();
let ctx = new_context(handle, input);

// MPU has max of 10K parts which requires us to know the upper bound on the content length (today anyway)
// While true for file-based workloads, the upper `size_hint` might not be equal to the actual bytes transferred.
let content_length = stream
.size_hint()
.upper()
.ok_or_else(crate::io::error::Error::upper_bound_size_hint_required)?;
let ctx = new_context(handle.clone(), input);
Ok(UploadHandle::new(
ctx.clone(),
tokio::spawn(try_start_upload(handle.clone(), stream, ctx)),
))
}
}

async fn try_start_upload(
handle: Arc<crate::client::Handle>,
stream: InputStream,
ctx: UploadContext,
) -> Result<UploadType, crate::error::Error> {
let min_mpu_threshold = handle.mpu_threshold_bytes();

// MPU has max of 10K parts which requires us to know the upper bound on the content length (today anyway)
// While true for file-based workloads, the upper `size_hint` might not be equal to the actual bytes transferred.
let content_length = stream
.size_hint()
.upper()
.ok_or_else(crate::io::error::Error::upper_bound_size_hint_required)?;

let upload_type = if content_length < min_mpu_threshold && !stream.is_mpu_only() {
tracing::trace!("upload request content size hint ({content_length}) less than min part size threshold ({min_mpu_threshold}); sending as single PutObject request");
UploadType::PutObject(tokio::spawn(put_object(
ctx.clone(),
stream,
content_length,
)))
} else {
// TODO - to upload a 0 byte object via MPU you have to send [CreateMultipartUpload, UploadPart(part=1, 0 bytes), CompleteMultipartUpload]
// we should add tests for this and hide this edge case from the user (e.g. send an empty part when a custom PartStream returns `None` immediately)
// FIXME - investigate what it would take to allow non mpu uploads for `PartStream` implementations
let handle = if content_length < min_mpu_threshold && !stream.is_mpu_only() {
tracing::trace!("upload request content size hint ({content_length}) less than min part size threshold ({min_mpu_threshold}); sending as single PutObject request");
try_start_put_object(ctx, stream, content_length).await?
} else {
// TODO - to upload a 0 byte object via MPU you have to send [CreateMultipartUpload, UploadPart(part=1, 0 bytes), CompleteMultipartUpload]
// we should add tests for this and hide this edge case from the user (e.g. send an empty part when a custom PartStream returns `None` immediately)
try_start_mpu_upload(ctx, stream, content_length).await?
};

Ok(handle)
}
try_start_mpu_upload(ctx, stream, content_length).await?
};
Ok(upload_type)
}

async fn try_start_put_object(
async fn put_object(
ctx: UploadContext,
stream: InputStream,
content_length: u64,
) -> Result<UploadHandle, crate::error::Error> {
let byte_stream = stream.into_byte_stream().await?;
) -> Result<UploadOutput, error::Error> {
let body = stream.into_byte_stream().await?;
let content_length: i64 = content_length.try_into().map_err(|_| {
error::invalid_input(format!("content_length:{} is invalid.", content_length))
})?;

Ok(UploadHandle::new_put_object(
ctx.clone(),
tokio::spawn(put_object(ctx.clone(), byte_stream, content_length)),
))
}

async fn put_object(
ctx: UploadContext,
body: ByteStream,
content_length: i64,
) -> Result<UploadOutput, error::Error> {
// FIXME - This affects performance in cases with a lot of small files workloads. We need a way to schedule
// more work for a lot of small files.
let _permit = ctx.handle.scheduler.acquire_permit().await?;
Expand Down Expand Up @@ -147,7 +148,7 @@ async fn try_start_mpu_upload(
ctx: UploadContext,
stream: InputStream,
content_length: u64,
) -> Result<UploadHandle, crate::error::Error> {
) -> Result<UploadType, crate::error::Error> {
let part_size = cmp::max(
ctx.handle.upload_part_size_bytes(),
content_length / MAX_PARTS,
Expand All @@ -159,18 +160,22 @@ async fn try_start_mpu_upload(
"multipart upload started with upload id: {:?}",
mpu.upload_id
);

let mut handle = UploadHandle::new_multipart(ctx);
handle.set_response(mpu);
distribute_work(&mut handle, stream, part_size)?;
Ok(handle)
let upload_id = mpu.upload_id.clone().expect("upload_id is present");
let mut mpu_data = MultipartUploadData {
upload_part_tasks: Default::default(),
read_body_tasks: Default::default(),
response: Some(mpu),
upload_id: upload_id.clone(),
};

distribute_work(&mut mpu_data, ctx, stream, part_size)?;
Ok(UploadType::MultipartUpload(mpu_data))
}

fn new_context(handle: Arc<crate::client::Handle>, req: UploadInput) -> UploadContext {
UploadContext {
handle,
request: Arc::new(req),
upload_id: None,
}
}

Expand Down Expand Up @@ -223,6 +228,7 @@ mod test {
use crate::io::InputStream;
use crate::operation::upload::UploadInput;
use crate::types::{ConcurrencySetting, PartSize};
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadOutput;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
use aws_sdk_s3::operation::put_object::PutObjectOutput;
Expand All @@ -231,6 +237,7 @@ mod test {
use bytes::Bytes;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::Barrier;
use test_common::mock_client_with_stubbed_http_client;

#[tokio::test]
Expand Down Expand Up @@ -295,7 +302,7 @@ mod test {
.key("test-key")
.body(stream);

let handle = request.send_with(&tm).await.unwrap();
let handle = request.initiate_with(&tm).unwrap();

let resp = handle.join().await.unwrap();
assert_eq!(expected_upload_id.deref(), resp.upload_id.unwrap().deref());
Expand Down Expand Up @@ -329,9 +336,70 @@ mod test {
.bucket("test-bucket")
.key("test-key")
.body(stream);
let handle = request.send_with(&tm).await.unwrap();
let handle = request.initiate_with(&tm).unwrap();
let resp = handle.join().await.unwrap();
assert_eq!(resp.upload_id(), None);
assert_eq!(expected_e_tag.deref(), resp.e_tag().unwrap());
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_abort_multipart_upload() {
let expected_upload_id = Arc::new("test-upload".to_owned());
let body = Bytes::from_static(b"every adolescent dog goes bonkers early");
let stream = InputStream::from(body);
let bucket = "test-bucket";
let key = "test-key";
let wait_till_create_mpu = Arc::new(Barrier::new(2));

let upload_id = expected_upload_id.clone();
let create_mpu =
mock!(aws_sdk_s3::Client::create_multipart_upload).then_output(move || {
CreateMultipartUploadOutput::builder()
.upload_id(upload_id.as_ref().to_owned())
.build()
});

let upload_part = mock!(aws_sdk_s3::Client::upload_part).then_output({
let wait_till_create_mpu = wait_till_create_mpu.clone();
move || {
wait_till_create_mpu.wait();
UploadPartOutput::builder().build()
}
});

let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload)
.match_requests({
let upload_id: Arc<String> = expected_upload_id.clone();
move |input| {
input.upload_id.as_ref() == Some(&upload_id)
&& input.bucket() == Some(bucket)
&& input.key() == Some(key)
}
})
.then_output(|| AbortMultipartUploadOutput::builder().build());

let client = mock_client_with_stubbed_http_client!(
aws_sdk_s3,
RuleMode::Sequential,
&[create_mpu, upload_part, abort_mpu]
);

let tm_config = crate::Config::builder()
.concurrency(ConcurrencySetting::Explicit(1))
.set_multipart_threshold(PartSize::Target(10))
.set_target_part_size(PartSize::Target(5 * 1024 * 1024))
.client(client)
.build();

let tm = crate::Client::new(tm_config);

let request = UploadInput::builder()
.bucket("test-bucket")
.key("test-key")
.body(stream);
let handle = request.initiate_with(&tm).unwrap();
wait_till_create_mpu.wait();
let abort = handle.abort().await.unwrap();
assert_eq!(abort.upload_id().unwrap(), expected_upload_id.deref());
}
}
10 changes: 5 additions & 5 deletions aws-s3-transfer-manager/src/operation/upload/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ impl UploadFluentBuilder {
bucket = self.inner.bucket.as_deref().unwrap_or_default(),
key = self.inner.key.as_deref().unwrap_or_default(),
))]
// TODO: Make it consistent with download by renaming it to initiate and making it synchronous
pub async fn send(self) -> Result<UploadHandle, crate::error::Error> {

pub fn initiate(self) -> Result<UploadHandle, crate::error::Error> {
let input = self.inner.build()?;
crate::operation::upload::Upload::orchestrate(self.handle, input).await
crate::operation::upload::Upload::orchestrate(self.handle, input)
}

/// <p>The canned ACL to apply to the object. For more information, see <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL">Canned ACL</a> in the <i>Amazon S3 User Guide</i>.</p>
Expand Down Expand Up @@ -911,12 +911,12 @@ impl UploadFluentBuilder {

impl crate::operation::upload::input::UploadInputBuilder {
/// Initiate an upload transfer for a single object with this input using the given client.
pub async fn send_with(
pub fn initiate_with(
self,
client: &crate::Client,
) -> Result<UploadHandle, crate::error::Error> {
let mut fluent_builder = client.upload();
fluent_builder.inner = self;
fluent_builder.send().await
fluent_builder.initiate()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're changing the name-and-asyncness of UploadFluentBuilder::send() then we should change the name-and-asyncness of UploadInputBuilder::send_with()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, yeah, I can make it consistent once we have agreed on the initiate name in the previous feedback.

}
}
12 changes: 0 additions & 12 deletions aws-s3-transfer-manager/src/operation/upload/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use std::sync::Arc;
pub(crate) struct UploadContext {
/// reference to client handle used to do actual work
pub(crate) handle: Arc<crate::client::Handle>,
/// the multipart upload ID
pub(crate) upload_id: Option<String>,
/// the original request (NOTE: the body will have been taken for processing, only the other fields remain)
pub(crate) request: Arc<UploadInput>,
}
Expand All @@ -28,14 +26,4 @@ impl UploadContext {
pub(crate) fn request(&self) -> &UploadInput {
self.request.deref()
}

/// Set the upload ID if the transfer will be done using a multipart upload
pub(crate) fn set_upload_id(&mut self, upload_id: String) {
self.upload_id = Some(upload_id)
}

/// Check if this transfer is using multipart upload
pub(crate) fn is_multipart_upload(&self) -> bool {
self.upload_id.is_some()
}
}
Loading