Skip to content

Commit

Permalink
config loader (#53)
Browse files Browse the repository at this point in the history
* move http utils into own module
* add config loader and default to hyper-1.x http client
  • Loading branch information
aajtodd authored Sep 30, 2024
1 parent bce02d2 commit 6cf3395
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 44 deletions.
20 changes: 10 additions & 10 deletions aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,33 @@ publish = false

[dependencies]
async-channel = "2.3.1"
async-trait = "0.1.81"
aws-sdk-s3 = { version = "1.40.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-http = "0.60.9"
async-trait = "0.1.82"
aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest"] }
aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.0"
aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
bytes = "1"
futures-util = "0.3.30"
# FIXME - upgrade to hyper 1.x
hyper = { version = "0.14.29", features = ["client"] }
path-clean = "1.0.1"
pin-project-lite = "0.2.14"
tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
tower = { version = "0.5.0", features = ["limit", "retry", "util"] }
tokio = { version = "1.40.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
tower = { version = "0.5.1", features = ["limit", "retry", "util"] }
tracing = "0.1"

[dev-dependencies]
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-mocks-experimental = "0.2.1"
aws-smithy-runtime = { version = "1.7.1", features = ["client", "connector-hyper-0-14-x", "test-util", "wire-mock"] }
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.3.0"
http-02x = { package = "http", version = "0.2.9" }
http-body-1x = { package = "http-body", version = "1" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.10.1"
fastrand = "2.1.0"
tempfile = "3.12.0"
fastrand = "2.1.1"
futures-test = "0.3.30"
walkdir = "2"
tower-test = "0.4.0"
Expand Down
34 changes: 12 additions & 22 deletions aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use aws_s3_transfer_manager::metrics::unit::ByteUnit;
use aws_s3_transfer_manager::metrics::Throughput;
use aws_s3_transfer_manager::operation::download::body::Body;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_sdk_s3::config::StalledStreamProtectionConfig;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_types::SdkConfig;
use bytes::Buf;
use clap::{CommandFactory, Parser};
use tokio::fs;
Expand Down Expand Up @@ -148,20 +146,15 @@ async fn do_recursive_download(
}

async fn do_download(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env()
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.load()
.await;
let (bucket, _) = args.source.expect_s3().parts();
warmup(&config, bucket).await?;

let s3_client = aws_sdk_s3::Client::new(&config);

let tm_config = aws_s3_transfer_manager::Config::builder()
let tm_config = aws_s3_transfer_manager::from_env()
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.part_size(PartSize::Target(args.part_size))
.client(s3_client)
.build();
.load()
.await;

warmup(&tm_config, bucket).await?;

let tm = aws_s3_transfer_manager::Client::new(tm_config);

Expand Down Expand Up @@ -201,18 +194,15 @@ async fn do_upload(args: Args) -> Result<(), BoxError> {
unimplemented!("recursive upload not supported yet")
}

let config = aws_config::from_env().load().await;
let (bucket, key) = args.dest.expect_s3().parts();

warmup(&config, bucket).await?;

let s3_client = aws_sdk_s3::Client::new(&config);

let tm_config = aws_s3_transfer_manager::Config::builder()
let tm_config = aws_s3_transfer_manager::from_env()
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.part_size(PartSize::Target(args.part_size))
.client(s3_client)
.build();
.load()
.await;

warmup(&tm_config, bucket).await?;

let tm = aws_s3_transfer_manager::Client::new(tm_config);

Expand Down Expand Up @@ -288,9 +278,9 @@ async fn write_body(body: &mut Body, mut dest: fs::File) -> Result<(), BoxError>
Ok(())
}

async fn warmup(config: &SdkConfig, bucket: &str) -> Result<(), BoxError> {
async fn warmup(config: &aws_s3_transfer_manager::Config, bucket: &str) -> Result<(), BoxError> {
println!("warming up client...");
let s3 = aws_sdk_s3::Client::new(config);
let s3 = config.client();

let mut handles = Vec::new();
for _ in 0..16 {
Expand Down
2 changes: 2 additions & 0 deletions aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::metrics::unit::ByteUnit;
use crate::types::{ConcurrencySetting, PartSize};
use std::cmp;

pub(crate) mod loader;

/// Minimum upload part size in bytes
const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64();

Expand Down
68 changes: 68 additions & 0 deletions aws-s3-transfer-manager/src/config/loader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use crate::config::Builder;
use crate::{
http,
types::{ConcurrencySetting, PartSize},
Config,
};

/// Load transfer manager [`Config`] from the environment.
///
/// Holds a config builder that collects explicit overrides (multipart threshold,
/// part size, concurrency). Calling `load` attaches an S3 client and builds the
/// final [`Config`].
#[derive(Default, Debug)]
pub struct ConfigLoader {
// Accumulates the overrides set through the loader's methods until `load` runs.
builder: Builder,
}

impl ConfigLoader {
    /// Minimum object size that should trigger a multipart upload.
    ///
    /// The minimum part size is 5 MiB; any part size less than that will be rounded up.
    /// Default is [PartSize::Auto]
    pub fn multipart_threshold(self, threshold: PartSize) -> Self {
        Self {
            builder: self.builder.multipart_threshold(threshold),
        }
    }

    /// The target size of each part when using a multipart upload to complete the request.
    ///
    /// When a request's content length is less than [`multipart_threshold`],
    /// this setting is ignored and a single [`PutObject`] request will be made instead.
    ///
    /// NOTE: The actual part size used may be larger than the configured part size if
    /// the current value would result in more than 10,000 parts for an upload request.
    ///
    /// Default is [PartSize::Auto]
    ///
    /// [`multipart_threshold`]: method@Self::multipart_threshold
    /// [`PutObject`]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
    pub fn part_size(self, part_size: PartSize) -> Self {
        Self {
            builder: self.builder.part_size(part_size),
        }
    }

    /// Set the concurrency level this component is allowed to use.
    ///
    /// This sets the maximum number of concurrent in-flight requests.
    /// Default is [ConcurrencySetting::Auto].
    pub fn concurrency(self, concurrency: ConcurrencySetting) -> Self {
        Self {
            builder: self.builder.concurrency(concurrency),
        }
    }

    /// Load the default configuration
    ///
    /// If fields have been overridden during builder construction, the override values will be
    /// used. Otherwise, the default values for each field will be provided.
    ///
    /// The S3 client placed in the returned [`Config`] is created from the shared AWS
    /// configuration loaded from the environment, using this crate's default HTTP client.
    pub async fn load(self) -> Config {
        // Resolve the shared AWS config first, swapping in the crate's default HTTP client.
        let sdk_loader = aws_config::from_env().http_client(http::default_client());
        let shared_config = sdk_loader.load().await;

        // Attach an S3 client built from that config, then finalize the transfer manager config.
        self.builder
            .client(aws_sdk_s3::Client::new(&shared_config))
            .build()
    }
}
15 changes: 15 additions & 0 deletions aws-s3-transfer-manager/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use aws_smithy_experimental::hyper_1_0::{CryptoMode, HyperClientBuilder};
use aws_smithy_runtime_api::client::http::SharedHttpClient;

pub(crate) mod header;

/// The default HTTP client used by a transfer manager when not explicitly configured.
///
/// Built with the hyper 1.x client builder, using the `AwsLc` crypto mode, and
/// finished via `build_https`.
pub(crate) fn default_client() -> SharedHttpClient {
    let client_builder = HyperClientBuilder::new().crypto_mode(CryptoMode::AwsLc);
    client_builder.build_https()
}
File renamed without changes.
64 changes: 57 additions & 7 deletions aws-s3-transfer-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@
/* Automatically managed default lints */
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
/* End of automatically managed default lints */

//! AWS S3 Transfer Manager
//!
//! # Crate Features
//!
//! - `test-util`: Enables utilities for unit tests. DO NOT ENABLE IN PRODUCTION.
#![warn(
missing_debug_implementations,
missing_docs,
Expand All @@ -21,6 +14,54 @@
rust_2018_idioms
)]

//! An Amazon S3 client focused on maximizing throughput and network utilization.
//!
//! AWS S3 Transfer Manager is a high-level abstraction over the base Amazon S3
//! [service API]. Transfer operations such as upload or download are automatically
//! split into concurrent requests to accelerate performance.
//!
//! [service API]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_Operations_Amazon_Simple_Storage_Service.html
//!
//! # Examples
//!
//! Load the default configuration:
//!
//! ```no_run
//! # async fn example() {
//! let config = aws_s3_transfer_manager::from_env().load().await;
//! let client = aws_s3_transfer_manager::Client::new(config);
//! # }
//! ```
//!
//! Download a bucket to a local directory:
//!
//! ```no_run
//! # async fn example() -> Result<(), aws_s3_transfer_manager::error::Error> {
//! let config = aws_s3_transfer_manager::from_env().load().await;
//! let client = aws_s3_transfer_manager::Client::new(config);
//!
//! let handle = client
//! .download_objects()
//! .bucket("my-bucket")
//! .destination("/tmp/my-bucket")
//! .send()
//! .await?;
//!
//! // wait for transfer to complete
//! handle.join().await?;
//!
//! # Ok(())
//! # }
//!
//! ```
//!
//! See the documentation for each client operation for more information:
//!
//! * [`download`](crate::Client::download) - download a single object
//! * [`upload`](crate::Client::upload) - upload a single object
//! * [`download_objects`](crate::Client::download_objects) - download an entire bucket or prefix to a local directory
//! * [`upload_objects`](crate::Client::upload_objects) - upload an entire local directory to a bucket
pub(crate) const DEFAULT_CONCURRENCY: usize = 8;

/// Error types emitted by `aws-s3-transfer-manager`
Expand All @@ -44,8 +85,17 @@ pub mod config;
/// Tower related middleware and components
pub(crate) mod middleware;

/// HTTP related components and utils
pub(crate) mod http;

/// Metrics
pub mod metrics;

pub use self::client::Client;
use self::config::loader::ConfigLoader;
pub use self::config::Config;

/// Create a config loader
///
/// The returned loader starts from its `Default` state. Apply any overrides on it,
/// then call its `load` method to obtain a [`Config`].
// NOTE(review): `ConfigLoader` is imported with a private `use` from a `pub(crate)`
// module, so callers cannot name this return type — confirm whether it should be
// re-exported publicly.
pub fn from_env() -> ConfigLoader {
ConfigLoader::default()
}
1 change: 0 additions & 1 deletion aws-s3-transfer-manager/src/operation/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod discovery;
mod handle;
pub use handle::DownloadHandle;

mod header;
mod object_meta;
mod service;

Expand Down
4 changes: 2 additions & 2 deletions aws-s3-transfer-manager/src/operation/download/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;

use super::header::{self, ByteRange};
use super::object_meta::ObjectMetadata;
use super::DownloadContext;
use super::DownloadInput;
use crate::error;
use crate::http::header::{self, ByteRange};

#[derive(Debug, Clone, PartialEq)]
enum ObjectDiscoveryStrategy {
Expand Down Expand Up @@ -178,11 +178,11 @@ async fn discover_obj_with_get(
mod tests {
use std::sync::Arc;

use crate::http::header::ByteRange;
use crate::metrics::unit::ByteUnit;
use crate::operation::download::discovery::{
discover_obj, discover_obj_with_head, ObjectDiscoveryStrategy,
};
use crate::operation::download::header::ByteRange;
use crate::operation::download::DownloadContext;
use crate::operation::download::DownloadInput;
use crate::types::PartSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::str::FromStr;
use aws_sdk_s3::operation::get_object::GetObjectOutput;
use aws_sdk_s3::operation::head_object::HeadObjectOutput;

use super::header;
use crate::http::header;

// TODO(aws-sdk-rust#1159,design): how many of these fields should we expose?
// TODO(aws-sdk-rust#1159,docs): Document fields
Expand Down
2 changes: 1 addition & 1 deletion aws-s3-transfer-manager/src/operation/download/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
use crate::error;
use crate::http::header;
use crate::middleware::retry;
use crate::operation::download::header;
use crate::operation::download::DownloadContext;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream};
Expand Down

0 comments on commit 6cf3395

Please sign in to comment.