Skip to content

Commit

Permalink
refactor(api): Propagate DB errors in TxSender (matter-labs#902)
Browse files Browse the repository at this point in the history
## What ❔

Propagates DB errors in `TxSender` instead of panicking on them.

## Why ❔

Rust panics are slow and are more difficult to meter compared to errors.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli authored Jan 23, 2024
1 parent 62d3bef commit c260850
Show file tree
Hide file tree
Showing 31 changed files with 588 additions and 528 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

15 changes: 7 additions & 8 deletions core/lib/dal/src/storage_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,9 @@ impl StorageDal<'_, '_> {
}

/// Gets the current storage value at the specified `key`.
pub async fn get_by_key(&mut self, key: &StorageKey) -> Option<H256> {
pub async fn get_by_key(&mut self, key: &StorageKey) -> sqlx::Result<Option<H256>> {
let hashed_key = key.hashed_key();

sqlx::query!(
let row = sqlx::query!(
r#"
SELECT
value
Expand All @@ -240,9 +239,9 @@ impl StorageDal<'_, '_> {
.report_latency()
.with_arg("key", &hashed_key)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.map(|row| H256::from_slice(&row.value))
.await?;

Ok(row.map(|row| H256::from_slice(&row.value)))
}

/// Removes all factory deps with a miniblock number strictly greater than the specified `block_number`.
Expand Down Expand Up @@ -284,8 +283,8 @@ mod tests {
conn.storage_dal().apply_storage_logs(&updates).await;

let first_value = conn.storage_dal().get_by_key(&first_key).await.unwrap();
assert_eq!(first_value, H256::repeat_byte(1));
assert_eq!(first_value, Some(H256::repeat_byte(1)));
let second_value = conn.storage_dal().get_by_key(&second_key).await.unwrap();
assert_eq!(second_value, H256::repeat_byte(2));
assert_eq!(second_value, Some(H256::repeat_byte(2)));
}
}
14 changes: 7 additions & 7 deletions core/lib/dal/src/storage_logs_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,11 +808,11 @@ mod tests {
insert_miniblock(conn, 2, logs).await;

let value = conn.storage_dal().get_by_key(&key).await.unwrap();
assert_eq!(value, H256::repeat_byte(0xff));
assert_eq!(value, Some(H256::repeat_byte(0xff)));
let value = conn.storage_dal().get_by_key(&second_key).await.unwrap();
assert_eq!(value, H256::zero());
assert_eq!(value, Some(H256::zero()));
let value = conn.storage_dal().get_by_key(&new_key).await.unwrap();
assert_eq!(value, H256::repeat_byte(0xfe));
assert_eq!(value, Some(H256::repeat_byte(0xfe)));

let prev_keys = vec![key.hashed_key(), new_key.hashed_key(), H256::zero()];
let prev_values = conn
Expand All @@ -829,11 +829,11 @@ mod tests {
.await;

let value = conn.storage_dal().get_by_key(&key).await.unwrap();
assert_eq!(value, H256::repeat_byte(3));
assert_eq!(value, Some(H256::repeat_byte(3)));
let value = conn.storage_dal().get_by_key(&second_key).await.unwrap();
assert_eq!(value, H256::repeat_byte(2));
let value = conn.storage_dal().get_by_key(&new_key).await;
assert!(value.is_none());
assert_eq!(value, Some(H256::repeat_byte(2)));
let value = conn.storage_dal().get_by_key(&new_key).await.unwrap();
assert_eq!(value, None);
}

#[tokio::test]
Expand Down
27 changes: 14 additions & 13 deletions core/lib/dal/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ async fn remove_stuck_txs() {
.await;

// Get all txs
transactions_dal.reset_mempool().await;
let txs = transactions_dal
.sync_mempool(vec![], vec![], 0, 0, 1000)
transactions_dal.reset_mempool().await.unwrap();
let (txs, _) = transactions_dal
.sync_mempool(&[], &[], 0, 0, 1000)
.await
.0;
.unwrap();
assert_eq!(txs.len(), 4);

let storage = transactions_dal.storage;
Expand All @@ -227,23 +227,24 @@ async fn remove_stuck_txs() {
.await;

// Get all txs
transactions_dal.reset_mempool().await;
let txs = transactions_dal
.sync_mempool(vec![], vec![], 0, 0, 1000)
transactions_dal.reset_mempool().await.unwrap();
let (txs, _) = transactions_dal
.sync_mempool(&[], &[], 0, 0, 1000)
.await
.0;
.unwrap();
assert_eq!(txs.len(), 3);

// Remove one stuck tx
let removed_txs = transactions_dal
.remove_stuck_txs(Duration::from_secs(500))
.await;
.await
.unwrap();
assert_eq!(removed_txs, 1);
transactions_dal.reset_mempool().await;
let txs = transactions_dal
.sync_mempool(vec![], vec![], 0, 0, 1000)
transactions_dal.reset_mempool().await.unwrap();
let (txs, _) = transactions_dal
.sync_mempool(&[], &[], 0, 0, 1000)
.await
.0;
.unwrap();
assert_eq!(txs.len(), 2);

// We shouldn't collect executed tx
Expand Down
35 changes: 16 additions & 19 deletions core/lib/dal/src/tokens_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,22 @@ impl TokensDal<'_, '_> {
}
}

pub async fn get_all_l2_token_addresses(&mut self) -> Vec<Address> {
{
let records = sqlx::query!(
r#"
SELECT
l2_address
FROM
tokens
"#
)
.fetch_all(self.storage.conn())
.await
.unwrap();
let addresses: Vec<Address> = records
.into_iter()
.map(|record| Address::from_slice(&record.l2_address))
.collect();
addresses
}
pub async fn get_all_l2_token_addresses(&mut self) -> sqlx::Result<Vec<Address>> {
let rows = sqlx::query!(
r#"
SELECT
l2_address
FROM
tokens
"#
)
.fetch_all(self.storage.conn())
.await?;

Ok(rows
.into_iter()
.map(|row| Address::from_slice(&row.l2_address))
.collect())
}

pub async fn get_unknown_l1_token_addresses(&mut self) -> Vec<Address> {
Expand Down
Loading

0 comments on commit c260850

Please sign in to comment.