From f36667cd41659e92ff7ac63b4affa0ec253de5e2 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 29 Jun 2024 23:26:20 +0000 Subject: [PATCH] updated benchmarking --- bench.py | 5 +++-- src/main.rs | 43 ++++++++++++++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/bench.py b/bench.py index 903a1c0..02f3663 100644 --- a/bench.py +++ b/bench.py @@ -2,8 +2,9 @@ import pyarrow from tqdm import tqdm import polars +import sys -metadata = polars.read_parquet("small.parquet") +metadata = polars.read_parquet("bench.parquet")[:int(sys.argv[1])] # metadatas = [] # filenames = [] @@ -26,4 +27,4 @@ "aws", file_metadata)) -print(len(result)) \ No newline at end of file +print(len(result)) diff --git a/src/main.rs b/src/main.rs index 98ebd11..61c51a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,11 +9,14 @@ use std::time::{Duration, Instant}; use tokio::task::JoinSet; -#[tokio::main] -async fn main() -> Result<(), Box> { - let args: Vec = std::env::args().collect(); - let num_clients = args[1].parse::()?; +async fn read(args: Vec) -> Result> { + + + // let num_clients = args[1].parse::()?; + let number = args[1].parse::()?; + let read_size = args[4].parse::()?; let df = LazyFrame::scan_parquet(args[3].clone(), Default::default())? + .slice(0, number.try_into().unwrap()) .collect()?; let filenames: Vec = df.column("filename")?.str()?.into_iter().map(|x| x.unwrap().to_string()).collect(); let page_offsets: Vec = df.column("page_offset_right")?.i64()?.into_iter().map(|x| x.unwrap() as u64).collect(); @@ -24,7 +27,7 @@ async fn main() -> Result<(), Box> { let config = aws_config::load_from_env().await; let client = aws_sdk_s3::Client::new(&config); - let start = Instant::now(); + let bucket = &args[2]; for ((filename, page_offset), page_size) in filenames.into_iter().zip(page_offsets.into_iter()).zip(page_sizes.into_iter()) { @@ -32,10 +35,11 @@ async fn main() -> Result<(), Box> { let bucket_c = bucket.to_string(); join_set.spawn(async move { + let mut byte_count = 0_usize; for i in 0 .. 1 { let from = page_offset; - let to = from + page_size - 1; - println!("Downloading {filename} from {from} to {to}"); + let to = from + read_size - 1; // page_size - 1; + // println!("Downloading {filename} from {from} to {to}"); let mut object = client_c .get_object() .bucket(bucket_c.clone()) @@ -45,7 +49,7 @@ async fn main() -> Result<(), Box> { .await .unwrap(); - let mut byte_count = 0_usize; + while let Some(bytes) = object.body.try_next().await.unwrap() { let bytes_len = bytes.len(); // file.write_all(&bytes)?; @@ -53,14 +57,31 @@ async fn main() -> Result<(), Box> { byte_count += bytes_len; } } - + byte_count }); } + let mut total_bytes = 0; while let Some(x) = join_set.join_next().await { + total_bytes += x.unwrap(); } + + + Ok(total_bytes) +} + +pub fn main() { + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let args: Vec = std::env::args().collect(); + let start = Instant::now(); + let res = rt.block_on(read(args)).unwrap(); let duration = start.elapsed(); - println!("Time elapsed is: {:?}", duration); + println!("Time elapsed is: {:?} total bytes {:?}", duration.as_millis(), res); + rt.shutdown_background(); - Ok(()) }