Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
marsupialtail committed Jul 22, 2024
2 parents f98a68b + 91c0f27 commit 406e062
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 12 deletions.
3 changes: 2 additions & 1 deletion bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
import pyarrow
from tqdm import tqdm
import polars
import sys

metadata = polars.read_parquet("bench.parquet")[:10]
metadata = polars.read_parquet("bench.parquet")[:int(sys.argv[1])]

# metadatas = []
# filenames = []
Expand Down
43 changes: 32 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ use std::time::{Duration, Instant};
use tokio::task::JoinSet;


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args: Vec<String> = std::env::args().collect();
let num_clients = args[1].parse::<u64>()?;
async fn read(args: Vec<String>) -> Result<usize, Box<dyn std::error::Error>> {


// let num_clients = args[1].parse::<u64>()?;
let number = args[1].parse::<u64>()?;
let read_size = args[4].parse::<u64>()?;
let df = LazyFrame::scan_parquet(args[3].clone(), Default::default())?
.slice(0, number.try_into().unwrap())
.collect()?;
let filenames: Vec<String> = df.column("filename")?.str()?.into_iter().map(|x| x.unwrap().to_string()).collect();
let page_offsets: Vec<u64> = df.column("page_offset_right")?.i64()?.into_iter().map(|x| x.unwrap() as u64).collect();
Expand All @@ -24,18 +27,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = aws_config::load_from_env().await;
let client = aws_sdk_s3::Client::new(&config);

let start = Instant::now();

let bucket = &args[2];
for ((filename, page_offset), page_size) in filenames.into_iter().zip(page_offsets.into_iter()).zip(page_sizes.into_iter()) {

let client_c = client.clone();
let bucket_c = bucket.to_string();
join_set.spawn(async move {

let mut byte_count = 0_usize;
for i in 0 .. 1 {
let from = page_offset;
let to = from + page_size - 1;
println!("Downloading {filename} from {from} to {to}");
let to = from + read_size - 1; // page_size - 1;
// println!("Downloading {filename} from {from} to {to}");
let mut object = client_c
.get_object()
.bucket(bucket_c.clone())
Expand All @@ -45,22 +49,39 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await
.unwrap();

let mut byte_count = 0_usize;

while let Some(bytes) = object.body.try_next().await.unwrap() {
let bytes_len = bytes.len();
// file.write_all(&bytes)?;
// trace!("Intermediate write of {bytes_len}");
byte_count += bytes_len;
}
}

byte_count
});
}

let mut total_bytes = 0;
while let Some(x) = join_set.join_next().await {
total_bytes += x.unwrap();
}


Ok(total_bytes)
}

/// Program entry point: runs `read` once on a manually built Tokio runtime and
/// reports how long it took.
///
/// The raw command-line arguments are collected and handed to `read` unchanged;
/// this function does not parse them itself. The value printed as "total bytes"
/// is whatever `read` returned on success.
///
/// # Panics
/// Panics if the runtime cannot be built or if `read` returns an error, because
/// both results are unwrapped.
pub fn main() {

// Build the runtime explicitly (multi-threaded, `enable_all()`) instead of
// relying on an attribute macro, so this function itself stays synchronous.
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let args: Vec<String> = std::env::args().collect();
// The timer starts before `read` is driven and stops right after it returns,
// so the measurement covers everything `read` does, not only the downloads.
let start = Instant::now();
let res = rt.block_on(read(args)).unwrap();
let duration = start.elapsed();
// NOTE(review): the two prints below look like the removed/added pair of one
// diff hunk (the second also reports milliseconds and the byte total) —
// confirm which of them is actually present in the file.
println!("Time elapsed is: {:?}", duration);
println!("Time elapsed is: {:?} total bytes {:?}", duration.as_millis(), res);
// Per Tokio's documentation this shuts the runtime down without waiting for
// any still-running spawned work.
rt.shutdown_background();

// NOTE(review): `main` declares no return type, so a trailing `Ok(())` would not
// type-check; this is presumably a leftover line from the previous
// `Result`-returning `main` shown in the diff — verify against the real file.
Ok(())
}

0 comments on commit 406e062

Please sign in to comment.