
Commit

removing duplicate functions
anupsv committed Jan 28, 2025
1 parent c219091 commit d8e2e2b
Showing 1 changed file with 0 additions and 155 deletions.
155 changes: 0 additions & 155 deletions prover/src/kzg.rs
@@ -1,9 +1,7 @@
use ark_bn254::{Fr, G1Affine, G1Projective};
use ark_ec::{CurveGroup, VariableBaseMSM};
use ark_poly::{EvaluationDomain, GeneralEvaluationDomain};
use ark_serialize::Read;
use ark_std::{ops::Div, Zero};
use crossbeam_channel::{bounded, Sender};
use num_traits::ToPrimitive;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rust_kzg_bn254_primitives::{
@@ -12,10 +10,6 @@ use rust_kzg_bn254_primitives::{
helpers,
polynomial::{PolynomialCoeffForm, PolynomialEvalForm},
};
use std::{
fs::File,
io::{self, BufReader},
};

use crate::srs::SRS;

@@ -86,155 +80,6 @@ impl KZG {
self.expanded_roots_of_unity.get(i)
}

/// Reads the file in chunks of the specified length.
/// TODO: "chunks" is misleading here, since we read one field element at a
/// time.
fn read_file_chunks(
file_path: &str,
sender: Sender<(Vec<u8>, usize, bool)>,
point_size: usize,
num_points: u32,
is_native: bool,
) -> io::Result<()> {
let file = File::open(file_path)?;
let mut reader = BufReader::new(file);
let mut position = 0;
let mut buffer = vec![0u8; point_size];

let mut i = 0;
// We are making one syscall per field element, which is super inefficient.
// FIXME: Read the entire file (or large segments) into memory and then split
// it into field elements. The entire G1 file might be ~8 GiB, so it might not
// fit in RAM, but we only ever need to read the subset of the file we use.
// E.g., for fault-proof usage, we only need to read 32 MiB if our blob size
// is that large.
while let Ok(bytes_read) = reader.read(&mut buffer) {
if bytes_read == 0 {
break;
}
sender
.send((buffer[..bytes_read].to_vec(), position, is_native))
.unwrap();
position += bytes_read;
buffer.resize(point_size, 0); // Ensure the buffer is always the correct size
i += 1;
if num_points == i {
break;
}
}
Ok(())
}
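
// Illustrative sketch (not part of the original file): the FIXME above
// suggests reading a large segment in one pass and splitting it in memory,
// instead of issuing one read per field element. The name and signature
// below are assumptions; it relies on std::fs::File and std::io::{self, Read}.
fn read_points_batched(
    file_path: &str,
    point_size: usize,
    num_points: usize,
) -> io::Result<Vec<Vec<u8>>> {
    let mut file = File::open(file_path)?;
    // Read only the prefix we need, not the whole (possibly ~8 GiB) file.
    let mut buf = vec![0u8; point_size * num_points];
    file.read_exact(&mut buf)?;
    // Split into one byte vector per serialized point.
    Ok(buf.chunks(point_size).map(|c| c.to_vec()).collect())
}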

/// Reads G1 points in parallel by creating one reader thread, which reads
/// bytes from the file and fans them out to worker threads (one per CPU)
/// that parse the bytes into G1Affine points. The worker threads then fan
/// the parsed points back in to the main thread, which sorts them by their
/// original position in the file to maintain order. Not used anywhere, but
/// kept as a reference.
///
/// # Arguments
/// * `file_path` - The path to the file containing the G1 points
/// * `srs_points_to_load` - The number of points to load from the file
/// * `is_native` - Whether the points are in native arkworks format or not
///
/// # Returns
/// * `Ok(Vec<G1Affine>)` - The G1 points read from the file
/// * `Err(KzgError)` - An error occurred while reading the file
pub fn parallel_read_g1_points_native(
file_path: String,
srs_points_to_load: u32,
is_native: bool,
) -> Result<Vec<G1Affine>, KzgError> {
// Channel contains (bytes, position, is_native) tuples. The position is used to
// reorder the points after processing them.
let (sender, receiver) = bounded::<(Vec<u8>, usize, bool)>(1000);

// Spawning the reader thread
let reader_thread = std::thread::spawn(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Self::read_file_chunks(&file_path, sender, 32, srs_points_to_load, is_native)
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })
},
);

let num_workers = num_cpus::get();

let workers: Vec<_> = (0..num_workers)
.map(|_| {
let receiver = receiver.clone();
std::thread::spawn(move || helpers::process_chunks::<G1Affine>(receiver))
})
.collect();

// Wait for the reader thread to finish
match reader_thread.join() {
Ok(result) => match result {
Ok(_) => {},
Err(e) => return Err(KzgError::GenericError(e.to_string())),
},
Err(_) => return Err(KzgError::GenericError("Thread panicked".to_string())),
}

// Collect and sort results
let mut all_points = Vec::new();
for worker in workers {
let points = worker.join().expect("Worker thread panicked");
all_points.extend(points);
}

// Sort by original position to maintain order
all_points.sort_by_key(|&(_, position)| position);

Ok(all_points.iter().map(|(point, _)| *point).collect())
}

/// Reads G1 points in parallel.
pub fn parallel_read_g1_points(
file_path: String,
srs_points_to_load: u32,
is_native: bool,
) -> Result<Vec<G1Affine>, KzgError> {
let (sender, receiver) = bounded::<(Vec<u8>, usize, bool)>(1000);

// Spawning the reader thread
let reader_handle = std::thread::spawn(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Self::read_file_chunks(&file_path, sender, 32, srs_points_to_load, is_native)
.map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })
},
);

let num_workers = num_cpus::get();

let worker_handles: Vec<_> = (0..num_workers)
.map(|_| {
let receiver = receiver.clone();
std::thread::spawn(move || helpers::process_chunks::<G1Affine>(receiver))
})
.collect();

// Wait for the reader thread to finish
match reader_handle.join() {
Ok(result) => match result {
Ok(_) => {},
Err(e) => return Err(KzgError::GenericError(e.to_string())),
},
Err(_) => return Err(KzgError::GenericError("Thread panicked".to_string())),
}

// Collect and sort results
let mut all_points = Vec::new();
for handle in worker_handles {
let points = handle.join().expect("Worker thread panicked");
all_points.extend(points);
}

// Sort by original position to maintain order
all_points.sort_by_key(|&(_, position)| position);

Ok(all_points.iter().map(|(point, _)| *point).collect())
}
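
// Illustrative call site (not part of the original file): before this
// removal, a caller would have loaded SRS points roughly as follows. The
// file path and point count are placeholder values.
//
// let points = KZG::parallel_read_g1_points(
//     "resources/g1.point".to_string(),
//     3000,  // number of SRS points to load
//     false, // is_native: whether the file is in native arkworks format
// ).expect("failed to read G1 points");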

/// Commits the polynomial with the SRS values loaded into [KZG].
pub fn commit_eval_form(
&self,
