Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config loader #53

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions aws-s3-transfer-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,33 +10,33 @@ publish = false

[dependencies]
async-channel = "2.3.1"
async-trait = "0.1.81"
aws-sdk-s3 = { version = "1.40.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-http = "0.60.9"
async-trait = "0.1.82"
aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest"] }
aws-smithy-experimental = { version = "0.1.3", features = ["crypto-aws-lc"] }
aws-smithy-runtime-api = "1.7.1"
aws-smithy-types = "1.2.0"
aws-smithy-types = "1.2.6"
aws-types = "1.3.3"
bytes = "1"
futures-util = "0.3.30"
# FIXME - upgrade to hyper 1.x
hyper = { version = "0.14.29", features = ["client"] }
path-clean = "1.0.1"
pin-project-lite = "0.2.14"
tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
tower = { version = "0.5.0", features = ["limit", "retry", "util"] }
tokio = { version = "1.40.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
tower = { version = "0.5.1", features = ["limit", "retry", "util"] }
tracing = "0.1"

[dev-dependencies]
aws-config = { version = "1.5.4", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest", "test-util"] }
aws-smithy-mocks-experimental = "0.2.1"
aws-smithy-runtime = { version = "1.7.1", features = ["client", "connector-hyper-0-14-x", "test-util", "wire-mock"] }
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
console-subscriber = "0.3.0"
http-02x = { package = "http", version = "0.2.9" }
http-body-1x = { package = "http-body", version = "1" }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.10.1"
fastrand = "2.1.0"
tempfile = "3.12.0"
fastrand = "2.1.1"
futures-test = "0.3.30"
walkdir = "2"
tower-test = "0.4.0"
Expand Down
34 changes: 12 additions & 22 deletions aws-s3-transfer-manager/examples/cp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ use aws_s3_transfer_manager::metrics::unit::ByteUnit;
use aws_s3_transfer_manager::metrics::Throughput;
use aws_s3_transfer_manager::operation::download::body::Body;
use aws_s3_transfer_manager::types::{ConcurrencySetting, PartSize};
use aws_sdk_s3::config::StalledStreamProtectionConfig;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_types::SdkConfig;
use bytes::Buf;
use clap::{CommandFactory, Parser};
use tokio::fs;
Expand Down Expand Up @@ -148,20 +146,15 @@ async fn do_recursive_download(
}

async fn do_download(args: Args) -> Result<(), BoxError> {
let config = aws_config::from_env()
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.load()
.await;
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
let (bucket, _) = args.source.expect_s3().parts();
warmup(&config, bucket).await?;

let s3_client = aws_sdk_s3::Client::new(&config);

let tm_config = aws_s3_transfer_manager::Config::builder()
let tm_config = aws_s3_transfer_manager::from_env()
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.part_size(PartSize::Target(args.part_size))
.client(s3_client)
.build();
.load()
.await;

warmup(&tm_config, bucket).await?;

let tm = aws_s3_transfer_manager::Client::new(tm_config);

Expand Down Expand Up @@ -201,18 +194,15 @@ async fn do_upload(args: Args) -> Result<(), BoxError> {
unimplemented!("recursive upload not supported yet")
}

let config = aws_config::from_env().load().await;
let (bucket, key) = args.dest.expect_s3().parts();

warmup(&config, bucket).await?;

let s3_client = aws_sdk_s3::Client::new(&config);

let tm_config = aws_s3_transfer_manager::Config::builder()
let tm_config = aws_s3_transfer_manager::from_env()
.concurrency(ConcurrencySetting::Explicit(args.concurrency))
.part_size(PartSize::Target(args.part_size))
.client(s3_client)
.build();
.load()
.await;

warmup(&tm_config, bucket).await?;

let tm = aws_s3_transfer_manager::Client::new(tm_config);

Expand Down Expand Up @@ -288,9 +278,9 @@ async fn write_body(body: &mut Body, mut dest: fs::File) -> Result<(), BoxError>
Ok(())
}

async fn warmup(config: &SdkConfig, bucket: &str) -> Result<(), BoxError> {
async fn warmup(config: &aws_s3_transfer_manager::Config, bucket: &str) -> Result<(), BoxError> {
println!("warming up client...");
let s3 = aws_sdk_s3::Client::new(config);
let s3 = config.client();

let mut handles = Vec::new();
for _ in 0..16 {
Expand Down
2 changes: 2 additions & 0 deletions aws-s3-transfer-manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use crate::metrics::unit::ByteUnit;
use crate::types::{ConcurrencySetting, PartSize};
use std::cmp;

pub(crate) mod loader;

/// Minimum upload part size in bytes
const MIN_MULTIPART_PART_SIZE_BYTES: u64 = 5 * ByteUnit::Mebibyte.as_bytes_u64();

Expand Down
68 changes: 68 additions & 0 deletions aws-s3-transfer-manager/src/config/loader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use crate::config::Builder;
use crate::{
http,
types::{ConcurrencySetting, PartSize},
Config,
};

/// Load transfer manager [`Config`] from the environment.
///
/// Values set through the loader's setter methods are stored in an internal
/// [`Builder`]; [`load`](Self::load) then creates the S3 client and builds the
/// final [`Config`].
#[derive(Default, Debug)]
pub struct ConfigLoader {
    // Accumulates the overrides (multipart threshold, part size, concurrency)
    // until `load` attaches the S3 client and calls `build()`.
    builder: Builder,
}

impl ConfigLoader {
    /// Set the minimum object size that should trigger a multipart upload.
    ///
    /// The minimum part size is 5 MiB; any part size less than that will be rounded up.
    /// Default is [PartSize::Auto]
    pub fn multipart_threshold(self, threshold: PartSize) -> Self {
        Self {
            builder: self.builder.multipart_threshold(threshold),
        }
    }

    /// Set the target size of each part when a multipart upload is used to complete
    /// the request.
    ///
    /// When a request's content length is less than [`multipart_threshold`],
    /// this setting is ignored and a single [`PutObject`] request will be made instead.
    ///
    /// NOTE: The actual part size used may be larger than the configured part size if
    /// the current value would result in more than 10,000 parts for an upload request.
    ///
    /// Default is [PartSize::Auto]
    ///
    /// [`multipart_threshold`]: method@Self::multipart_threshold
    /// [`PutObject`]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
    pub fn part_size(self, part_size: PartSize) -> Self {
        Self {
            builder: self.builder.part_size(part_size),
        }
    }

    /// Set the concurrency level this component is allowed to use.
    ///
    /// This is the maximum number of concurrent in-flight requests.
    /// Default is [ConcurrencySetting::Auto].
    pub fn concurrency(self, concurrency: ConcurrencySetting) -> Self {
        Self {
            builder: self.builder.concurrency(concurrency),
        }
    }

    /// Load the default configuration
    ///
    /// The shared AWS configuration is resolved with `aws_config::from_env()` using
    /// this crate's default HTTP client, and an S3 client created from it is attached
    /// to the resulting [`Config`].
    ///
    /// Fields that were overridden on this loader keep their override values; every
    /// other field is given its default value.
    pub async fn load(self) -> Config {
        let env_loader = aws_config::from_env();
        let sdk_config = env_loader
            .http_client(http::default_client())
            .load()
            .await;

        self.builder
            .client(aws_sdk_s3::Client::new(&sdk_config))
            .build()
    }
}
15 changes: 15 additions & 0 deletions aws-s3-transfer-manager/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
use aws_smithy_experimental::hyper_1_0::{CryptoMode, HyperClientBuilder};
use aws_smithy_runtime_api::client::http::SharedHttpClient;

pub(crate) mod header;

/// Build the HTTP client a transfer manager uses when none is explicitly configured.
///
/// The client is produced by [`HyperClientBuilder`] as an HTTPS client with the
/// crypto mode set to [`CryptoMode::AwsLc`].
pub(crate) fn default_client() -> SharedHttpClient {
    let builder = HyperClientBuilder::new().crypto_mode(CryptoMode::AwsLc);
    builder.build_https()
}
64 changes: 57 additions & 7 deletions aws-s3-transfer-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@
/* Automatically managed default lints */
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
/* End of automatically managed default lints */

//! AWS S3 Transfer Manager
//!
//! # Crate Features
//!
//! - `test-util`: Enables utilities for unit tests. DO NOT ENABLE IN PRODUCTION.

#![warn(
missing_debug_implementations,
missing_docs,
Expand All @@ -21,6 +14,54 @@
rust_2018_idioms
)]

//! An Amazon S3 client focused on maximizing throughput and network utilization.
//!
//! AWS S3 Transfer Manager is a high-level abstraction over the base Amazon S3
//! [service API]. Transfer operations such as upload or download are automatically
//! split into concurrent requests to accelerate performance.
//!
//! [service API]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_Operations_Amazon_Simple_Storage_Service.html
//!
//! # Examples
//!
//! Load the default configuration:
//!
//! ```no_run
//! # async fn example() {
//! let config = aws_s3_transfer_manager::from_env().load().await;
//! let client = aws_s3_transfer_manager::Client::new(config);
//! # }
//! ```
//!
//! Download a bucket to a local directory:
//!
//! ```no_run
//! # async fn example() -> Result<(), aws_s3_transfer_manager::error::Error> {
//! let config = aws_s3_transfer_manager::from_env().load().await;
//! let client = aws_s3_transfer_manager::Client::new(config);
//!
//! let handle = client
//! .download_objects()
//! .bucket("my-bucket")
//! .destination("/tmp/my-bucket")
//! .send()
//! .await?;
//!
//! // wait for transfer to complete
//! handle.join().await?;
//!
//! # Ok(())
//! # }
//!
//! ```
//!
//! See the documentation for each client operation for more information:
//!
//! * [`download`](crate::Client::download) - download a single object
//! * [`upload`](crate::Client::upload) - upload a single object
//! * [`download_objects`](crate::Client::download_objects) - download an entire bucket or prefix to a local directory
//! * [`upload_objects`](crate::Client::upload_objects) - upload an entire local directory to a bucket

pub(crate) const DEFAULT_CONCURRENCY: usize = 8;

/// Error types emitted by `aws-s3-transfer-manager`
Expand All @@ -44,8 +85,17 @@ pub mod config;
/// Tower related middleware and components
pub(crate) mod middleware;

/// HTTP related components and utils
pub(crate) mod http;

/// Metrics
pub mod metrics;

pub use self::client::Client;
use self::config::loader::ConfigLoader;
pub use self::config::Config;

/// Create a config loader
///
/// Returns a [`ConfigLoader`] in its default state. Use its setter methods to
/// override individual settings, then call `load().await` to obtain the
/// transfer manager [`Config`] that can be passed to [`Client::new`].
pub fn from_env() -> ConfigLoader {
    ConfigLoader::default()
}
1 change: 0 additions & 1 deletion aws-s3-transfer-manager/src/operation/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod discovery;
mod handle;
pub use handle::DownloadHandle;

mod header;
mod object_meta;
mod service;

Expand Down
4 changes: 2 additions & 2 deletions aws-s3-transfer-manager/src/operation/download/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectInputBuilder;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::ByteStream;

use super::header::{self, ByteRange};
use super::object_meta::ObjectMetadata;
use super::DownloadContext;
use super::DownloadInput;
use crate::error;
use crate::http::header::{self, ByteRange};

#[derive(Debug, Clone, PartialEq)]
enum ObjectDiscoveryStrategy {
Expand Down Expand Up @@ -178,11 +178,11 @@ async fn discover_obj_with_get(
mod tests {
use std::sync::Arc;

use crate::http::header::ByteRange;
use crate::metrics::unit::ByteUnit;
use crate::operation::download::discovery::{
discover_obj, discover_obj_with_head, ObjectDiscoveryStrategy,
};
use crate::operation::download::header::ByteRange;
use crate::operation::download::DownloadContext;
use crate::operation::download::DownloadInput;
use crate::types::PartSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::str::FromStr;
use aws_sdk_s3::operation::get_object::GetObjectOutput;
use aws_sdk_s3::operation::head_object::HeadObjectOutput;

use super::header;
use crate::http::header;

// TODO(aws-sdk-rust#1159,design): how many of these fields should we expose?
// TODO(aws-sdk-rust#1159,docs): Document fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
use crate::error;
use crate::http::header;
use crate::middleware::retry;
use crate::operation::download::header;
use crate::operation::download::DownloadContext;
use aws_smithy_types::body::SdkBody;
use aws_smithy_types::byte_stream::{AggregatedBytes, ByteStream};
Expand Down