diff --git a/node/bft/src/helpers/storage.rs b/node/bft/src/helpers/storage.rs index 2ab097ba62..ddb6bdcd9b 100644 --- a/node/bft/src/helpers/storage.rs +++ b/node/bft/src/helpers/storage.rs @@ -22,11 +22,13 @@ use snarkvm::{ narwhal::{BatchCertificate, BatchHeader, Transmission, TransmissionID}, }, prelude::{Address, Field, Network, Result, anyhow, bail, ensure}, + utilities::{cfg_into_iter, cfg_sorted_by}, }; use indexmap::{IndexMap, IndexSet, map::Entry}; use lru::LruCache; use parking_lot::RwLock; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; use std::{ collections::{HashMap, HashSet}, num::NonZeroUsize, @@ -377,30 +379,25 @@ impl Storage { /// Returns the certificates that have not yet been included in the ledger. /// Note that the order of this set is by round and then insertion. pub(crate) fn get_pending_certificates(&self) -> IndexSet> { - let mut pending_certificates = IndexSet::new(); - // Obtain the read locks. let rounds = self.rounds.read(); let certificates = self.certificates.read(); // Iterate over the rounds. - for (_, certificates_for_round) in rounds.clone().sorted_by(|a, _, b, _| a.cmp(b)) { - // Iterate over the certificates for the round. - for (certificate_id, _, _) in certificates_for_round { - // Skip the certificate if it already exists in the ledger. - if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) { - continue; - } - - // Add the certificate to the pending certificates. - match certificates.get(&certificate_id).cloned() { - Some(certificate) => pending_certificates.insert(certificate), - None => continue, - }; - } - } - - pending_certificates + cfg_sorted_by!(rounds.clone(), |a, _, b, _| a.cmp(b)) + .flat_map(|(_, certificates_for_round)| { + // Iterate over the certificates for the round. + cfg_into_iter!(certificates_for_round).filter_map(|(certificate_id, _, _)| { + // Skip the certificate if it already exists in the ledger. + if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) { + None + } else { + // Add the certificate to the pending certificates. + certificates.get(&certificate_id).cloned() + } + }) + }) + .collect() } /// Checks the given `batch_header` for validity, returning the missing transmissions from storage.