diff --git a/core/lib/dal/.sqlx/query-18820f4ab0c3d2cc9187c5660f9f50e423eb6134659fe52bcc2b27ad16740c96.json b/core/lib/dal/.sqlx/query-18820f4ab0c3d2cc9187c5660f9f50e423eb6134659fe52bcc2b27ad16740c96.json deleted file mode 100644 index 73887716338f..000000000000 --- a/core/lib/dal/.sqlx/query-18820f4ab0c3d2cc9187c5660f9f50e423eb6134659fe52bcc2b27ad16740c96.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM transactions\n WHERE\n in_mempool = TRUE\n AND initiator_address = ANY ($1)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "ByteaArray" - ] - }, - "nullable": [] - }, - "hash": "18820f4ab0c3d2cc9187c5660f9f50e423eb6134659fe52bcc2b27ad16740c96" -} diff --git a/core/lib/dal/.sqlx/query-1c14d2def60fa5ff91788ddb55e68cee71742b732112038a642e2a07305053c2.json b/core/lib/dal/.sqlx/query-1c14d2def60fa5ff91788ddb55e68cee71742b732112038a642e2a07305053c2.json new file mode 100644 index 000000000000..71dfada3d59a --- /dev/null +++ b/core/lib/dal/.sqlx/query-1c14d2def60fa5ff91788ddb55e68cee71742b732112038a642e2a07305053c2.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE transactions\n SET\n in_mempool = FALSE\n WHERE\n in_mempool = TRUE\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "1c14d2def60fa5ff91788ddb55e68cee71742b732112038a642e2a07305053c2" +} diff --git a/core/lib/dal/.sqlx/query-2b1aa207a058f66265acf2c21b8ed5d8007789c0fc1eab948f6d7339dfb69147.json b/core/lib/dal/.sqlx/query-2b1aa207a058f66265acf2c21b8ed5d8007789c0fc1eab948f6d7339dfb69147.json new file mode 100644 index 000000000000..96b48892516d --- /dev/null +++ b/core/lib/dal/.sqlx/query-2b1aa207a058f66265acf2c21b8ed5d8007789c0fc1eab948f6d7339dfb69147.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM transactions\n WHERE\n miniblock_number IS NULL\n AND received_at < NOW() - $1::INTERVAL\n AND is_priority = FALSE\n AND error IS NULL\n RETURNING\n hash\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "hash", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Interval" + ] + }, + "nullable": [ + false + ] + }, + "hash": "2b1aa207a058f66265acf2c21b8ed5d8007789c0fc1eab948f6d7339dfb69147" +} diff --git a/core/lib/dal/.sqlx/query-2eb617f3e34ac5b21f925053a45da2b4afc314a3b3e78b041b44c8a020a0ee12.json b/core/lib/dal/.sqlx/query-2eb617f3e34ac5b21f925053a45da2b4afc314a3b3e78b041b44c8a020a0ee12.json deleted file mode 100644 index 3baa8596a3ba..000000000000 --- a/core/lib/dal/.sqlx/query-2eb617f3e34ac5b21f925053a45da2b4afc314a3b3e78b041b44c8a020a0ee12.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE transactions\n SET\n in_mempool = FALSE\n FROM\n UNNEST($1::bytea[]) AS s (address)\n WHERE\n transactions.in_mempool = TRUE\n AND transactions.initiator_address = s.address\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "ByteaArray" - ] - }, - "nullable": [] - }, - "hash": "2eb617f3e34ac5b21f925053a45da2b4afc314a3b3e78b041b44c8a020a0ee12" -} diff --git a/core/lib/dal/.sqlx/query-546c729829083b7eba94fea742c162d717ffcf46fdf5d2ce5d32555353b6da6b.json b/core/lib/dal/.sqlx/query-546c729829083b7eba94fea742c162d717ffcf46fdf5d2ce5d32555353b6da6b.json new file mode 100644 index 000000000000..70b94f739098 --- /dev/null +++ b/core/lib/dal/.sqlx/query-546c729829083b7eba94fea742c162d717ffcf46fdf5d2ce5d32555353b6da6b.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE transactions\n 
SET\n in_mempool = FALSE\n FROM\n UNNEST($1::bytea[]) AS s (address)\n WHERE\n transactions.in_mempool = TRUE\n AND transactions.initiator_address = s.address\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [] + }, + "hash": "546c729829083b7eba94fea742c162d717ffcf46fdf5d2ce5d32555353b6da6b" +} diff --git a/core/lib/dal/.sqlx/query-852aa5fe1c3b2dfe875cd4adf0d19a00c170cf7725d95dd6eb8b753fa5facec8.json b/core/lib/dal/.sqlx/query-6621de90a024cc85946f17948e5c171cd0e4d38bd6e9cfec58b2d7f53a3204e1.json similarity index 72% rename from core/lib/dal/.sqlx/query-852aa5fe1c3b2dfe875cd4adf0d19a00c170cf7725d95dd6eb8b753fa5facec8.json rename to core/lib/dal/.sqlx/query-6621de90a024cc85946f17948e5c171cd0e4d38bd6e9cfec58b2d7f53a3204e1.json index 6e582aac6536..8ba437fe2cee 100644 --- a/core/lib/dal/.sqlx/query-852aa5fe1c3b2dfe875cd4adf0d19a00c170cf7725d95dd6eb8b753fa5facec8.json +++ b/core/lib/dal/.sqlx/query-6621de90a024cc85946f17948e5c171cd0e4d38bd6e9cfec58b2d7f53a3204e1.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE transactions\n SET\n in_mempool = TRUE\n FROM\n (\n SELECT\n hash\n FROM\n (\n SELECT\n hash\n FROM\n transactions\n WHERE\n miniblock_number IS NULL\n AND in_mempool = FALSE\n AND error IS NULL\n AND (\n is_priority = TRUE\n OR (\n max_fee_per_gas >= $2\n AND gas_per_pubdata_limit >= $3\n )\n )\n AND tx_format != $4\n ORDER BY\n is_priority DESC,\n priority_op_id,\n received_at\n LIMIT\n $1\n ) AS subquery1\n ORDER BY\n hash\n ) AS subquery2\n WHERE\n transactions.hash = subquery2.hash\n RETURNING\n transactions.*\n ", + "query": "\n UPDATE transactions\n SET\n in_mempool = TRUE\n FROM\n (\n SELECT\n hash\n FROM\n (\n SELECT\n hash\n FROM\n transactions\n WHERE\n miniblock_number IS NULL\n AND in_mempool = FALSE\n AND error IS NULL\n AND (\n is_priority = TRUE\n OR (\n max_fee_per_gas >= $2\n AND gas_per_pubdata_limit >= $3\n )\n )\n AND tx_format != $4\n ORDER BY\n is_priority DESC,\n priority_op_id,\n received_at\n LIMIT\n $1\n ) AS subquery1\n ORDER BY\n hash\n ) AS subquery2\n WHERE\n transactions.hash = subquery2.hash\n RETURNING\n transactions.*\n ", "describe": { "columns": [ { @@ -231,5 +231,5 @@ true ] }, - "hash": "852aa5fe1c3b2dfe875cd4adf0d19a00c170cf7725d95dd6eb8b753fa5facec8" + "hash": "6621de90a024cc85946f17948e5c171cd0e4d38bd6e9cfec58b2d7f53a3204e1" } diff --git a/core/lib/dal/.sqlx/query-3b0af308b0ce95a13a4eed40834279601234a489f73d843f2f314252ed4cb8b0.json b/core/lib/dal/.sqlx/query-a45dd1d1344bf6b3447a2d72949067ee3bff92b04766e98336ca9338a19aef46.json similarity index 54% rename from core/lib/dal/.sqlx/query-3b0af308b0ce95a13a4eed40834279601234a489f73d843f2f314252ed4cb8b0.json rename to core/lib/dal/.sqlx/query-a45dd1d1344bf6b3447a2d72949067ee3bff92b04766e98336ca9338a19aef46.json index 39781954d486..69ce70a7f4a2 100644 --- a/core/lib/dal/.sqlx/query-3b0af308b0ce95a13a4eed40834279601234a489f73d843f2f314252ed4cb8b0.json +++ b/core/lib/dal/.sqlx/query-a45dd1d1344bf6b3447a2d72949067ee3bff92b04766e98336ca9338a19aef46.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n hashed_key,\n value AS \"value!\"\n FROM\n storage\n WHERE\n hashed_key = ANY ($1)\n ", + "query": "\n SELECT\n hashed_key,\n value AS \"value!\"\n FROM\n storage\n WHERE\n hashed_key = ANY ($1)\n ", "describe": { "columns": [ { @@ -24,5 +24,5 @@ false ] }, - "hash": "3b0af308b0ce95a13a4eed40834279601234a489f73d843f2f314252ed4cb8b0" + "hash": "a45dd1d1344bf6b3447a2d72949067ee3bff92b04766e98336ca9338a19aef46" } diff 
--git a/core/lib/dal/.sqlx/query-ba2343a38e37d104786f9276d91f67d2ef1428c61ae84003c9b52b03204d1f0a.json b/core/lib/dal/.sqlx/query-ba2343a38e37d104786f9276d91f67d2ef1428c61ae84003c9b52b03204d1f0a.json new file mode 100644 index 000000000000..ff6082651184 --- /dev/null +++ b/core/lib/dal/.sqlx/query-ba2343a38e37d104786f9276d91f67d2ef1428c61ae84003c9b52b03204d1f0a.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM transactions\n WHERE\n in_mempool = TRUE\n AND initiator_address = ANY ($1)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [] + }, + "hash": "ba2343a38e37d104786f9276d91f67d2ef1428c61ae84003c9b52b03204d1f0a" +} diff --git a/core/lib/dal/.sqlx/query-878c9cdfd69ad8988d049041edd63595237a0c54f67b8c669dfbb4fca32757e4.json b/core/lib/dal/.sqlx/query-de3a5a35b111bd831bebe6d9391bf58b311d81c5fc0a6fbaccf204976f76f378.json similarity index 53% rename from core/lib/dal/.sqlx/query-878c9cdfd69ad8988d049041edd63595237a0c54f67b8c669dfbb4fca32757e4.json rename to core/lib/dal/.sqlx/query-de3a5a35b111bd831bebe6d9391bf58b311d81c5fc0a6fbaccf204976f76f378.json index 9dde4d74ed11..de753a93926f 100644 --- a/core/lib/dal/.sqlx/query-878c9cdfd69ad8988d049041edd63595237a0c54f67b8c669dfbb4fca32757e4.json +++ b/core/lib/dal/.sqlx/query-de3a5a35b111bd831bebe6d9391bf58b311d81c5fc0a6fbaccf204976f76f378.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n l2_address\n FROM\n tokens\n ", + "query": "\n SELECT\n l2_address\n FROM\n tokens\n ", "describe": { "columns": [ { @@ -16,5 +16,5 @@ false ] }, - "hash": "878c9cdfd69ad8988d049041edd63595237a0c54f67b8c669dfbb4fca32757e4" + "hash": "de3a5a35b111bd831bebe6d9391bf58b311d81c5fc0a6fbaccf204976f76f378" } diff --git a/core/lib/dal/.sqlx/query-e63cc86a8d527dae2905b2af6a66bc6419ba51514519652e055c769b096015f6.json b/core/lib/dal/.sqlx/query-e63cc86a8d527dae2905b2af6a66bc6419ba51514519652e055c769b096015f6.json deleted file mode 100644 index 3176fb6ac3e8..000000000000 --- a/core/lib/dal/.sqlx/query-e63cc86a8d527dae2905b2af6a66bc6419ba51514519652e055c769b096015f6.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM transactions\n WHERE\n miniblock_number IS NULL\n AND received_at < NOW() - $1::INTERVAL\n AND is_priority = FALSE\n AND error IS NULL\n RETURNING\n hash\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "hash", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Interval" - ] - }, - "nullable": [ - false - ] - }, - "hash": "e63cc86a8d527dae2905b2af6a66bc6419ba51514519652e055c769b096015f6" -} diff --git a/core/lib/dal/.sqlx/query-e76217231b4d896118e9630de9485b19e1294b3aa6e084d2051bb532408672be.json b/core/lib/dal/.sqlx/query-e76217231b4d896118e9630de9485b19e1294b3aa6e084d2051bb532408672be.json deleted file mode 100644 index 831a67cbee99..000000000000 --- a/core/lib/dal/.sqlx/query-e76217231b4d896118e9630de9485b19e1294b3aa6e084d2051bb532408672be.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE transactions\n SET\n in_mempool = FALSE\n WHERE\n in_mempool = TRUE\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [] - }, - "nullable": [] - }, - "hash": "e76217231b4d896118e9630de9485b19e1294b3aa6e084d2051bb532408672be" -} diff --git a/core/lib/dal/src/storage_dal.rs b/core/lib/dal/src/storage_dal.rs index eaefdaef0322..106a7b2a5d4e 100644 --- a/core/lib/dal/src/storage_dal.rs +++ b/core/lib/dal/src/storage_dal.rs @@ -222,10 +222,9 @@ impl 
StorageDal<'_, '_> { } /// Gets the current storage value at the specified `key`. - pub async fn get_by_key(&mut self, key: &StorageKey) -> Option { + pub async fn get_by_key(&mut self, key: &StorageKey) -> sqlx::Result> { let hashed_key = key.hashed_key(); - - sqlx::query!( + let row = sqlx::query!( r#" SELECT value @@ -240,9 +239,9 @@ impl StorageDal<'_, '_> { .report_latency() .with_arg("key", &hashed_key) .fetch_optional(self.storage.conn()) - .await - .unwrap() - .map(|row| H256::from_slice(&row.value)) + .await?; + + Ok(row.map(|row| H256::from_slice(&row.value))) } /// Removes all factory deps with a miniblock number strictly greater than the specified `block_number`. @@ -284,8 +283,8 @@ mod tests { conn.storage_dal().apply_storage_logs(&updates).await; let first_value = conn.storage_dal().get_by_key(&first_key).await.unwrap(); - assert_eq!(first_value, H256::repeat_byte(1)); + assert_eq!(first_value, Some(H256::repeat_byte(1))); let second_value = conn.storage_dal().get_by_key(&second_key).await.unwrap(); - assert_eq!(second_value, H256::repeat_byte(2)); + assert_eq!(second_value, Some(H256::repeat_byte(2))); } } diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index 21572f3d3c0f..ad756dd08197 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -808,11 +808,11 @@ mod tests { insert_miniblock(conn, 2, logs).await; let value = conn.storage_dal().get_by_key(&key).await.unwrap(); - assert_eq!(value, H256::repeat_byte(0xff)); + assert_eq!(value, Some(H256::repeat_byte(0xff))); let value = conn.storage_dal().get_by_key(&second_key).await.unwrap(); - assert_eq!(value, H256::zero()); + assert_eq!(value, Some(H256::zero())); let value = conn.storage_dal().get_by_key(&new_key).await.unwrap(); - assert_eq!(value, H256::repeat_byte(0xfe)); + assert_eq!(value, Some(H256::repeat_byte(0xfe))); let prev_keys = vec![key.hashed_key(), new_key.hashed_key(), H256::zero()]; let prev_values = conn @@ -829,11 +829,11 @@ mod tests { .await; let value = conn.storage_dal().get_by_key(&key).await.unwrap(); - assert_eq!(value, H256::repeat_byte(3)); + assert_eq!(value, Some(H256::repeat_byte(3))); let value = conn.storage_dal().get_by_key(&second_key).await.unwrap(); - assert_eq!(value, H256::repeat_byte(2)); - let value = conn.storage_dal().get_by_key(&new_key).await; - assert!(value.is_none()); + assert_eq!(value, Some(H256::repeat_byte(2))); + let value = conn.storage_dal().get_by_key(&new_key).await.unwrap(); + assert_eq!(value, None); } #[tokio::test] diff --git a/core/lib/dal/src/tests/mod.rs b/core/lib/dal/src/tests/mod.rs index 5f6d2fddb27b..5b285ff04f8b 100644 --- a/core/lib/dal/src/tests/mod.rs +++ b/core/lib/dal/src/tests/mod.rs @@ -204,11 +204,11 @@ async fn remove_stuck_txs() { .await; // Get all txs - transactions_dal.reset_mempool().await; - let txs = transactions_dal - .sync_mempool(vec![], vec![], 0, 0, 1000) + transactions_dal.reset_mempool().await.unwrap(); + let (txs, _) = transactions_dal + .sync_mempool(&[], &[], 0, 0, 1000) .await - .0; + .unwrap(); assert_eq!(txs.len(), 4); let storage = transactions_dal.storage; @@ -227,23 +227,24 @@ async fn remove_stuck_txs() { .await; // Get all txs - transactions_dal.reset_mempool().await; - let txs = transactions_dal - .sync_mempool(vec![], vec![], 0, 0, 1000) + transactions_dal.reset_mempool().await.unwrap(); + let (txs, _) = transactions_dal + .sync_mempool(&[], &[], 0, 0, 1000) .await - .0; + .unwrap(); assert_eq!(txs.len(), 3); // Remove one stuck tx let 
removed_txs = transactions_dal .remove_stuck_txs(Duration::from_secs(500)) - .await; + .await + .unwrap(); assert_eq!(removed_txs, 1); - transactions_dal.reset_mempool().await; - let txs = transactions_dal - .sync_mempool(vec![], vec![], 0, 0, 1000) + transactions_dal.reset_mempool().await.unwrap(); + let (txs, _) = transactions_dal + .sync_mempool(&[], &[], 0, 0, 1000) .await - .0; + .unwrap(); assert_eq!(txs.len(), 2); // We shouldn't collect executed tx diff --git a/core/lib/dal/src/tokens_dal.rs b/core/lib/dal/src/tokens_dal.rs index 96072bc2ec4c..1910ecbe9437 100644 --- a/core/lib/dal/src/tokens_dal.rs +++ b/core/lib/dal/src/tokens_dal.rs @@ -112,25 +112,22 @@ impl TokensDal<'_, '_> { } } - pub async fn get_all_l2_token_addresses(&mut self) -> Vec
<Address> { - { - let records = sqlx::query!( - r#" - SELECT - l2_address - FROM - tokens - "# - ) - .fetch_all(self.storage.conn()) - .await - .unwrap(); - let addresses: Vec<Address>
= records - .into_iter() - .map(|record| Address::from_slice(&record.l2_address)) - .collect(); - addresses - } + pub async fn get_all_l2_token_addresses(&mut self) -> sqlx::Result<Vec<Address>> { + let rows = sqlx::query!( + r#" + SELECT + l2_address + FROM + tokens + "# + ) + .fetch_all(self.storage.conn()) + .await?; + + Ok(rows + .into_iter() + .map(|row| Address::from_slice(&row.l2_address)) + .collect()) } pub async fn get_unknown_l1_token_addresses(&mut self) -> Vec<Address>
{ diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index 19c19646bbfa..707cc1a007bc 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -856,184 +856,171 @@ impl TransactionsDal<'_, '_> { } } - pub async fn remove_stuck_txs(&mut self, stuck_tx_timeout: Duration) -> usize { - { - let stuck_tx_timeout = pg_interval_from_duration(stuck_tx_timeout); - sqlx::query!( - r#" - DELETE FROM transactions - WHERE - miniblock_number IS NULL - AND received_at < NOW() - $1::INTERVAL - AND is_priority = FALSE - AND error IS NULL - RETURNING - hash - "#, - stuck_tx_timeout - ) - .fetch_all(self.storage.conn()) - .await - .unwrap() - .len() - } + pub async fn remove_stuck_txs(&mut self, stuck_tx_timeout: Duration) -> sqlx::Result { + let stuck_tx_timeout = pg_interval_from_duration(stuck_tx_timeout); + let rows = sqlx::query!( + r#" + DELETE FROM transactions + WHERE + miniblock_number IS NULL + AND received_at < NOW() - $1::INTERVAL + AND is_priority = FALSE + AND error IS NULL + RETURNING + hash + "#, + stuck_tx_timeout + ) + .fetch_all(self.storage.conn()) + .await?; + + Ok(rows.len()) } - /// Fetches new updates for mempool - /// Returns new transactions and current nonces for related accounts - /// Latter is only used to bootstrap mempool for given account + /// Fetches new updates for mempool. Returns new transactions and current nonces for related accounts; + /// the latter are only used to bootstrap mempool for given account. pub async fn sync_mempool( &mut self, - stashed_accounts: Vec
<Address>, - purged_accounts: Vec<Address>
, + stashed_accounts: &[Address], + purged_accounts: &[Address], gas_per_pubdata: u32, fee_per_gas: u64, limit: usize, - ) -> (Vec, HashMap) { - { - let stashed_addresses: Vec<_> = - stashed_accounts.into_iter().map(|a| a.0.to_vec()).collect(); - sqlx::query!( - r#" - UPDATE transactions - SET - in_mempool = FALSE - FROM - UNNEST($1::bytea[]) AS s (address) - WHERE - transactions.in_mempool = TRUE - AND transactions.initiator_address = s.address - "#, - &stashed_addresses, - ) - .execute(self.storage.conn()) - .await - .unwrap(); + ) -> sqlx::Result<(Vec, HashMap)> { + let stashed_addresses: Vec<_> = stashed_accounts.iter().map(Address::as_bytes).collect(); + sqlx::query!( + r#" + UPDATE transactions + SET + in_mempool = FALSE + FROM + UNNEST($1::bytea[]) AS s (address) + WHERE + transactions.in_mempool = TRUE + AND transactions.initiator_address = s.address + "#, + &stashed_addresses as &[&[u8]], + ) + .execute(self.storage.conn()) + .await?; - let purged_addresses: Vec<_> = - purged_accounts.into_iter().map(|a| a.0.to_vec()).collect(); - sqlx::query!( - r#" - DELETE FROM transactions - WHERE - in_mempool = TRUE - AND initiator_address = ANY ($1) - "#, - &purged_addresses[..] - ) - .execute(self.storage.conn()) - .await - .unwrap(); + let purged_addresses: Vec<_> = purged_accounts.iter().map(Address::as_bytes).collect(); + sqlx::query!( + r#" + DELETE FROM transactions + WHERE + in_mempool = TRUE + AND initiator_address = ANY ($1) + "#, + &purged_addresses as &[&[u8]] + ) + .execute(self.storage.conn()) + .await?; - // Note, that transactions are updated in order of their hashes to avoid deadlocks with other UPDATE queries. - let transactions = sqlx::query_as!( - StorageTransaction, - r#" - UPDATE transactions - SET - in_mempool = TRUE - FROM - ( - SELECT - hash - FROM - ( - SELECT - hash - FROM - transactions - WHERE - miniblock_number IS NULL - AND in_mempool = FALSE - AND error IS NULL - AND ( - is_priority = TRUE - OR ( - max_fee_per_gas >= $2 - AND gas_per_pubdata_limit >= $3 - ) + // Note, that transactions are updated in order of their hashes to avoid deadlocks with other UPDATE queries. 
+ let transactions = sqlx::query_as!( + StorageTransaction, + r#" + UPDATE transactions + SET + in_mempool = TRUE + FROM + ( + SELECT + hash + FROM + ( + SELECT + hash + FROM + transactions + WHERE + miniblock_number IS NULL + AND in_mempool = FALSE + AND error IS NULL + AND ( + is_priority = TRUE + OR ( + max_fee_per_gas >= $2 + AND gas_per_pubdata_limit >= $3 ) - AND tx_format != $4 - ORDER BY - is_priority DESC, - priority_op_id, - received_at - LIMIT - $1 - ) AS subquery1 - ORDER BY - hash - ) AS subquery2 - WHERE - transactions.hash = subquery2.hash - RETURNING - transactions.* - "#, - limit as i32, - BigDecimal::from(fee_per_gas), - BigDecimal::from(gas_per_pubdata), - PROTOCOL_UPGRADE_TX_TYPE as i32, - ) - .fetch_all(self.storage.conn()) - .await - .unwrap(); + ) + AND tx_format != $4 + ORDER BY + is_priority DESC, + priority_op_id, + received_at + LIMIT + $1 + ) AS subquery1 + ORDER BY + hash + ) AS subquery2 + WHERE + transactions.hash = subquery2.hash + RETURNING + transactions.* + "#, + limit as i32, + BigDecimal::from(fee_per_gas), + BigDecimal::from(gas_per_pubdata), + PROTOCOL_UPGRADE_TX_TYPE as i32, + ) + .fetch_all(self.storage.conn()) + .await?; - let nonce_keys: HashMap<_, _> = transactions - .iter() - .map(|tx| { - let address = Address::from_slice(&tx.initiator_address); - let nonce_key = get_nonce_key(&address).hashed_key(); - (nonce_key, address) - }) - .collect(); + let nonce_keys: HashMap<_, _> = transactions + .iter() + .map(|tx| { + let address = Address::from_slice(&tx.initiator_address); + let nonce_key = get_nonce_key(&address).hashed_key(); + (nonce_key, address) + }) + .collect(); - let storage_keys: Vec<_> = nonce_keys.keys().map(|key| key.0.to_vec()).collect(); - let nonces: HashMap<_, _> = sqlx::query!( - r#" - SELECT - hashed_key, - value AS "value!" - FROM - storage - WHERE - hashed_key = ANY ($1) - "#, - &storage_keys, - ) - .fetch_all(self.storage.conn()) - .await - .unwrap() + let storage_keys: Vec<_> = nonce_keys.keys().map(H256::as_bytes).collect(); + let nonce_rows = sqlx::query!( + r#" + SELECT + hashed_key, + value AS "value!" 
+ FROM + storage + WHERE + hashed_key = ANY ($1) + "#, + &storage_keys as &[&[u8]], + ) + .fetch_all(self.storage.conn()) + .await?; + + let nonces = nonce_rows .into_iter() .map(|row| { let nonce_key = H256::from_slice(&row.hashed_key); let nonce = Nonce(h256_to_u32(H256::from_slice(&row.value))); - - (*nonce_keys.get(&nonce_key).unwrap(), nonce) + (nonce_keys[&nonce_key], nonce) }) .collect(); - - ( - transactions.into_iter().map(|tx| tx.into()).collect(), - nonces, - ) - } + Ok(( + transactions.into_iter().map(|tx| tx.into()).collect(), + nonces, + )) } - pub async fn reset_mempool(&mut self) { - { - sqlx::query!( - r#" - UPDATE transactions - SET - in_mempool = FALSE - WHERE - in_mempool = TRUE - "# - ) - .execute(self.storage.conn()) - .await - .unwrap(); - } + pub async fn reset_mempool(&mut self) -> sqlx::Result<()> { + sqlx::query!( + r#" + UPDATE transactions + SET + in_mempool = FALSE + WHERE + in_mempool = TRUE + "# + ) + .execute(self.storage.conn()) + .await?; + Ok(()) } pub async fn get_last_processed_l1_block(&mut self) -> Option { diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs index 45743397695b..25b01495eb74 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs @@ -53,14 +53,14 @@ pub(super) fn apply_vm_in_sandbox( &mut VmInstance>, HistoryDisabled>, Transaction, ) -> T, -) -> T { +) -> anyhow::Result { let stage_started_at = Instant::now(); let span = tracing::debug_span!("initialization").entered(); let rt_handle = vm_permit.rt_handle(); let mut connection = rt_handle .block_on(connection_pool.access_storage_tagged("api")) - .unwrap(); + .context("failed acquiring DB connection")?; let connection_acquire_time = stage_started_at.elapsed(); // We don't want to emit too many logs. if connection_acquire_time > Duration::from_millis(10) { @@ -78,7 +78,7 @@ pub(super) fn apply_vm_in_sandbox( protocol_version, } = rt_handle .block_on(block_args.resolve_block_info(&mut connection)) - .expect("Failed resolving block numbers"); + .with_context(|| format!("failed resolving block numbers for {block_args:?}"))?; let resolve_time = resolve_started_at.elapsed(); // We don't want to emit too many logs. if resolve_time > Duration::from_millis(10) { @@ -95,8 +95,12 @@ pub(super) fn apply_vm_in_sandbox( } let mut l2_block_info_to_reset = None; - let current_l2_block_info = - rt_handle.block_on(read_l2_block_info(&mut connection, state_l2_block_number)); + let current_l2_block_info = rt_handle + .block_on(StoredL2BlockInfo::new( + &mut connection, + state_l2_block_number, + )) + .context("failed reading L2 block info")?; let next_l2_block_info = if block_args.is_pending_miniblock() { L2BlockEnv { number: current_l2_block_info.l2_block_number + 1, @@ -120,10 +124,13 @@ pub(super) fn apply_vm_in_sandbox( } else { // We need to reset L2 block info in storage to process transaction in the current block context. // Actual resetting will be done after `storage_view` is created. 
- let prev_l2_block_info = rt_handle.block_on(read_l2_block_info( - &mut connection, - state_l2_block_number - 1, - )); + let prev_l2_block_info = rt_handle + .block_on(StoredL2BlockInfo::new( + &mut connection, + state_l2_block_number - 1, + )) + .context("failed reading previous L2 block info")?; + l2_block_info_to_reset = Some(prev_l2_block_info); L2BlockEnv { number: current_l2_block_info.l2_block_number, @@ -209,7 +216,6 @@ pub(super) fn apply_vm_in_sandbox( default_validation_computational_gas_limit: validation_computational_gas_limit, chain_id, }; - let l1_batch_env = L1BatchEnv { previous_batch_hash: None, number: vm_l1_batch_number, @@ -249,54 +255,56 @@ pub(super) fn apply_vm_in_sandbox( ); drop(vm_permit); // Ensure that the permit lives until this point - result + Ok(result) } #[derive(Debug, Clone, Copy)] struct StoredL2BlockInfo { - pub l2_block_number: u32, - pub l2_block_timestamp: u64, - pub l2_block_hash: H256, - pub txs_rolling_hash: H256, + l2_block_number: u32, + l2_block_timestamp: u64, + l2_block_hash: H256, + txs_rolling_hash: H256, } -async fn read_l2_block_info( - connection: &mut StorageProcessor<'_>, - miniblock_number: MiniblockNumber, -) -> StoredL2BlockInfo { - let l2_block_info_key = StorageKey::new( - AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), - SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, - ); - let l2_block_info = connection - .storage_web3_dal() - .get_historical_value_unchecked(&l2_block_info_key, miniblock_number) - .await - .unwrap(); - let (l2_block_number, l2_block_timestamp) = unpack_block_info(h256_to_u256(l2_block_info)); - - let l2_block_txs_rolling_hash_key = StorageKey::new( - AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), - SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, - ); - let txs_rolling_hash = connection - .storage_web3_dal() - .get_historical_value_unchecked(&l2_block_txs_rolling_hash_key, miniblock_number) - .await - .unwrap(); - - let l2_block_hash = connection - .blocks_web3_dal() - .get_miniblock_hash(miniblock_number) - .await - .unwrap() - .unwrap(); - - StoredL2BlockInfo { - l2_block_number: l2_block_number as u32, - l2_block_timestamp, - l2_block_hash, - txs_rolling_hash, +impl StoredL2BlockInfo { + async fn new( + connection: &mut StorageProcessor<'_>, + miniblock_number: MiniblockNumber, + ) -> anyhow::Result { + let l2_block_info_key = StorageKey::new( + AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), + SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, + ); + let l2_block_info = connection + .storage_web3_dal() + .get_historical_value_unchecked(&l2_block_info_key, miniblock_number) + .await + .context("failed reading L2 block info from VM state")?; + let (l2_block_number, l2_block_timestamp) = unpack_block_info(h256_to_u256(l2_block_info)); + + let l2_block_txs_rolling_hash_key = StorageKey::new( + AccountTreeId::new(SYSTEM_CONTEXT_ADDRESS), + SYSTEM_CONTEXT_CURRENT_TX_ROLLING_HASH_POSITION, + ); + let txs_rolling_hash = connection + .storage_web3_dal() + .get_historical_value_unchecked(&l2_block_txs_rolling_hash_key, miniblock_number) + .await + .context("failed reading transaction rolling hash from VM state")?; + + let l2_block_hash = connection + .blocks_web3_dal() + .get_miniblock_hash(miniblock_number) + .await + .with_context(|| format!("failed getting hash for miniblock #{miniblock_number}"))? 
+ .with_context(|| format!("miniblock #{miniblock_number} disappeared from Postgres"))?; + + Ok(Self { + l2_block_number: l2_block_number as u32, + l2_block_timestamp, + l2_block_hash, + txs_rolling_hash, + }) } } diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/execute.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/execute.rs index 80c5fcd979a6..20b5d7815507 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/execute.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/execute.rs @@ -1,5 +1,6 @@ //! Implementation of "executing" methods, e.g. `eth_call`. +use anyhow::Context as _; use multivm::{ interface::{TxExecutionMode, VmExecutionResultAndLogs, VmInterface}, tracers::StorageInvocations, @@ -75,6 +76,16 @@ impl TxExecutionArgs { } } +#[derive(Debug, Clone)] +pub(crate) struct TransactionExecutionOutput { + /// Output of the VM. + pub vm: VmExecutionResultAndLogs, + /// Execution metrics. + pub metrics: TransactionExecutionMetrics, + /// Were published bytecodes OK? + pub are_published_bytecodes_ok: bool, +} + /// Executor of transactions. #[derive(Debug)] pub(crate) enum TransactionExecutor { @@ -101,7 +112,7 @@ impl TransactionExecutor { tx: Transaction, block_args: BlockArgs, custom_tracers: Vec, - ) -> (VmExecutionResultAndLogs, TransactionExecutionMetrics, bool) { + ) -> anyhow::Result { #[cfg(test)] if let Self::Mock(mock_executor) = self { return mock_executor.execute_tx(&tx); @@ -142,15 +153,15 @@ impl TransactionExecutor { result }) .await - .unwrap(); + .context("transaction execution panicked")??; - let tx_execution_metrics = + let metrics = vm_metrics::collect_tx_execution_metrics(total_factory_deps, &execution_result); - ( - execution_result, - tx_execution_metrics, - published_bytecodes.is_ok(), - ) + Ok(TransactionExecutionOutput { + vm: execution_result, + metrics, + are_published_bytecodes_ok: published_bytecodes.is_ok(), + }) } #[allow(clippy::too_many_arguments)] @@ -163,7 +174,7 @@ impl TransactionExecutor { block_args: BlockArgs, vm_execution_cache_misses_limit: Option, custom_tracers: Vec, - ) -> VmExecutionResultAndLogs { + ) -> anyhow::Result { let enforced_base_fee = tx.common_data.fee.max_fee_per_gas.as_u64(); let execution_args = TxExecutionArgs::for_eth_call(enforced_base_fee, vm_execution_cache_misses_limit); @@ -176,7 +187,7 @@ impl TransactionExecutor { // limiting the amount of gas the call can use. // We can't use `BLOCK_ERGS_LIMIT` here since the VM itself has some overhead. tx.common_data.fee.gas_limit = ETH_CALL_GAS_LIMIT.into(); - let (vm_result, ..) 
= self + let output = self .execute_tx_in_sandbox( vm_permit, shared_args, @@ -187,8 +198,7 @@ impl TransactionExecutor { block_args, custom_tracers, ) - .await; - - vm_result + .await?; + Ok(output.vm) } } diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs index 95c6793d0662..3c3a8e9a36ad 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs @@ -1,8 +1,8 @@ use std::{sync::Arc, time::Duration}; -use anyhow::Context; +use anyhow::Context as _; use tokio::runtime::Handle; -use zksync_dal::{ConnectionPool, SqlxError, StorageProcessor}; +use zksync_dal::{ConnectionPool, StorageProcessor}; use zksync_state::{PostgresStorage, PostgresStorageCaches, ReadStorage, StorageView}; use zksync_system_constants::PUBLISH_BYTECODE_OVERHEAD; use zksync_types::{ @@ -15,6 +15,7 @@ pub(super) use self::{ error::SandboxExecutionError, execute::{TransactionExecutor, TxExecutionArgs}, tracers::ApiTracer, + validate::ValidationError, vm_metrics::{SubmitTxStage, SANDBOX_METRICS}, }; use super::tx_sender::MultiVMBaseSystemContracts; @@ -149,15 +150,15 @@ impl VmConcurrencyLimiter { async fn get_pending_state( connection: &mut StorageProcessor<'_>, -) -> (api::BlockId, MiniblockNumber) { +) -> anyhow::Result<(api::BlockId, MiniblockNumber)> { let block_id = api::BlockId::Number(api::BlockNumber::Pending); let resolved_block_number = connection .blocks_web3_dal() .resolve_block_id(block_id) .await - .unwrap() - .expect("Pending block should be present"); - (block_id, resolved_block_number) + .with_context(|| format!("failed resolving block ID {block_id:?}"))? + .context("pending block should always be present in Postgres")?; + Ok((block_id, resolved_block_number)) } /// Returns the number of the pubdata that the transaction will spend on factory deps. @@ -166,14 +167,17 @@ pub(super) async fn get_pubdata_for_factory_deps( connection_pool: &ConnectionPool, factory_deps: &[Vec], storage_caches: PostgresStorageCaches, -) -> u32 { +) -> anyhow::Result { if factory_deps.is_empty() { - return 0; // Shortcut for the common case allowing to not acquire DB connections etc. + return Ok(0); // Shortcut for the common case allowing to not acquire DB connections etc. } - let mut connection = connection_pool.access_storage_tagged("api").await.unwrap(); - let (_, block_number) = get_pending_state(&mut connection).await; - drop(connection); + let mut storage = connection_pool + .access_storage_tagged("api") + .await + .context("failed acquiring DB connection")?; + let (_, block_number) = get_pending_state(&mut storage).await?; + drop(storage); let rt_handle = Handle::current(); let connection_pool = connection_pool.clone(); @@ -181,7 +185,7 @@ pub(super) async fn get_pubdata_for_factory_deps( tokio::task::spawn_blocking(move || { let connection = rt_handle .block_on(connection_pool.access_storage_tagged("api")) - .unwrap(); + .context("failed acquiring DB connection")?; let storage = PostgresStorage::new(rt_handle, connection, block_number, false) .with_caches(storage_caches); let mut storage_view = StorageView::new(storage); @@ -198,10 +202,10 @@ pub(super) async fn get_pubdata_for_factory_deps( }; length as u32 + PUBLISH_BYTECODE_OVERHEAD }); - effective_lengths.sum() + anyhow::Ok(effective_lengths.sum()) }) .await - .unwrap() + .context("computing pubdata dependencies size panicked")? } /// Arguments for VM execution not specific to a particular transaction. 
@@ -268,7 +272,7 @@ pub(crate) enum BlockArgsError { #[error("Block is missing, but can appear in the future")] Missing, #[error("Database error")] - Database(#[from] SqlxError), + Database(#[from] anyhow::Error), } /// Information about a block provided to VM. @@ -280,13 +284,13 @@ pub(crate) struct BlockArgs { } impl BlockArgs { - pub(crate) async fn pending(connection: &mut StorageProcessor<'_>) -> Self { - let (block_id, resolved_block_number) = get_pending_state(connection).await; - Self { + pub(crate) async fn pending(connection: &mut StorageProcessor<'_>) -> anyhow::Result { + let (block_id, resolved_block_number) = get_pending_state(connection).await?; + Ok(Self { block_id, resolved_block_number, l1_batch_timestamp_s: None, - } + }) } /// Loads block information from DB. @@ -303,25 +307,30 @@ impl BlockArgs { .map_err(BlockArgsError::Pruned)?; if block_id == api::BlockId::Number(api::BlockNumber::Pending) { - return Ok(BlockArgs::pending(connection).await); + return Ok(BlockArgs::pending(connection).await?); } let resolved_block_number = connection .blocks_web3_dal() .resolve_block_id(block_id) - .await?; + .await + .with_context(|| format!("failed resolving block ID {block_id:?}"))?; let Some(resolved_block_number) = resolved_block_number else { return Err(BlockArgsError::Missing); }; - let l1_batch_number = connection + let l1_batch = connection .storage_web3_dal() .resolve_l1_batch_number_of_miniblock(resolved_block_number) - .await?; + .await + .with_context(|| { + format!("failed resolving L1 batch number of miniblock #{resolved_block_number}") + })?; let l1_batch_timestamp_s = connection .blocks_web3_dal() - .get_expected_l1_batch_timestamp(&l1_batch_number) - .await?; + .get_expected_l1_batch_timestamp(&l1_batch) + .await + .with_context(|| format!("failed getting timestamp for {l1_batch:?}"))?; if l1_batch_timestamp_s.is_none() { // Can happen after snapshot recovery if no miniblocks are persisted yet. In this case, // we cannot proceed; the issue will be resolved shortly. 
diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/testonly.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/testonly.rs index c9780c42e044..4eb33d45f2c5 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/testonly.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/testonly.rs @@ -1,43 +1,47 @@ use std::collections::HashMap; -use multivm::{ - interface::{ExecutionResult, VmExecutionResultAndLogs}, - tracers::validator::ValidationError, -}; +use multivm::interface::{ExecutionResult, VmExecutionResultAndLogs}; use zksync_types::{ fee::TransactionExecutionMetrics, l2::L2Tx, ExecuteTransactionCommon, Transaction, H256, }; -use super::TransactionExecutor; - -type MockExecutionOutput = (VmExecutionResultAndLogs, TransactionExecutionMetrics, bool); +use super::{ + execute::{TransactionExecutionOutput, TransactionExecutor}, + validate::ValidationError, +}; #[derive(Debug, Default)] pub(crate) struct MockTransactionExecutor { - call_responses: HashMap, MockExecutionOutput>, - tx_responses: HashMap, + call_responses: HashMap, TransactionExecutionOutput>, + tx_responses: HashMap, } impl MockTransactionExecutor { pub fn insert_call_response(&mut self, calldata: Vec, result: ExecutionResult) { - let result = VmExecutionResultAndLogs { - result, - logs: Default::default(), - statistics: Default::default(), - refunds: Default::default(), + let output = TransactionExecutionOutput { + vm: VmExecutionResultAndLogs { + result, + logs: Default::default(), + statistics: Default::default(), + refunds: Default::default(), + }, + metrics: TransactionExecutionMetrics::default(), + are_published_bytecodes_ok: true, }; - let output = (result, TransactionExecutionMetrics::default(), true); self.call_responses.insert(calldata, output); } pub fn insert_tx_response(&mut self, tx_hash: H256, result: ExecutionResult) { - let result = VmExecutionResultAndLogs { - result, - logs: Default::default(), - statistics: Default::default(), - refunds: Default::default(), + let output = TransactionExecutionOutput { + vm: VmExecutionResultAndLogs { + result, + logs: Default::default(), + statistics: Default::default(), + refunds: Default::default(), + }, + metrics: TransactionExecutionMetrics::default(), + are_published_bytecodes_ok: true, }; - let output = (result, TransactionExecutionMetrics::default(), true); self.tx_responses.insert(tx_hash, output); } @@ -48,22 +52,23 @@ impl MockTransactionExecutor { Ok(()) } - pub fn execute_tx(&self, tx: &Transaction) -> MockExecutionOutput { + pub fn execute_tx(&self, tx: &Transaction) -> anyhow::Result { if let ExecuteTransactionCommon::L2(data) = &tx.common_data { if data.input.is_none() { // `Transaction` was obtained from a `CallRequest` - return self + return Ok(self .call_responses .get(tx.execute.calldata()) .unwrap_or_else(|| panic!("Executing unexpected call: {tx:?}")) - .clone(); + .clone()); } } - self.tx_responses + Ok(self + .tx_responses .get(&tx.hash()) .unwrap_or_else(|| panic!("Executing unexpected transaction: {tx:?}")) - .clone() + .clone()) } } diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs index d81b4b940454..d18bf5dafd01 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/tests.rs @@ -22,7 +22,7 @@ async fn creating_block_args() { .await .unwrap(); - let pending_block_args = BlockArgs::pending(&mut storage).await; + let pending_block_args = 
BlockArgs::pending(&mut storage).await.unwrap(); assert_eq!( pending_block_args.block_id, api::BlockId::Number(api::BlockNumber::Pending) @@ -69,7 +69,7 @@ async fn creating_block_args_after_snapshot_recovery() { let mut storage = pool.access_storage().await.unwrap(); let snapshot_recovery = prepare_empty_recovery_snapshot(&mut storage, 23).await; - let pending_block_args = BlockArgs::pending(&mut storage).await; + let pending_block_args = BlockArgs::pending(&mut storage).await.unwrap(); assert_eq!( pending_block_args.block_id, api::BlockId::Number(api::BlockNumber::Pending) diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs index 419e9804f887..a2439816d817 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs @@ -1,9 +1,10 @@ use std::collections::HashSet; +use anyhow::Context as _; use multivm::{ interface::{ExecutionResult, VmExecutionMode, VmInterface}, tracers::{ - validator::{ValidationError, ValidationTracer, ValidationTracerParams}, + validator::{self, ValidationTracer, ValidationTracerParams}, StorageInvocations, }, vm_latest::HistoryDisabled, @@ -19,6 +20,16 @@ use super::{ BlockArgs, TxExecutionArgs, TxSharedArgs, VmPermit, }; +/// Validation error used by the sandbox. Besides validation errors returned by VM, it also includes an internal error +/// variant (e.g., for DB-related errors). +#[derive(Debug, thiserror::Error)] +pub(crate) enum ValidationError { + #[error("VM validation error: {0}")] + Vm(validator::ValidationError), + #[error("Internal error")] + Internal(#[from] anyhow::Error), +} + impl TransactionExecutor { pub(crate) async fn validate_tx_in_sandbox( &self, @@ -35,9 +46,14 @@ impl TransactionExecutor { } let stage_latency = SANDBOX_METRICS.sandbox[&SandboxStage::ValidateInSandbox].start(); - let mut connection = connection_pool.access_storage_tagged("api").await.unwrap(); + let mut connection = connection_pool + .access_storage_tagged("api") + .await + .context("failed acquiring DB connection")?; let validation_params = - get_validation_params(&mut connection, &tx, computational_gas_limit).await; + get_validation_params(&mut connection, &tx, computational_gas_limit) + .await + .context("failed getting validation params")?; drop(connection); let execution_args = TxExecutionArgs::for_validation(&tx); @@ -72,9 +88,11 @@ impl TransactionExecutor { ); let result = match (result.result, validation_result.get()) { - (_, Some(err)) => Err(ValidationError::ViolatedRule(err.clone())), + (_, Some(err)) => { + Err(validator::ValidationError::ViolatedRule(err.clone())) + } (ExecutionResult::Halt { reason }, _) => { - Err(ValidationError::FailedTx(reason)) + Err(validator::ValidationError::FailedTx(reason)) } (_, None) => Ok(()), }; @@ -88,28 +106,32 @@ impl TransactionExecutor { result }) .await - .unwrap(); + .context("transaction validation panicked")??; stage_latency.observe(); - validation_result + validation_result.map_err(ValidationError::Vm) } } -// Some slots can be marked as "trusted". That is needed for slots which can not be -// trusted to change between validation and execution in general case, but -// sometimes we can safely rely on them to not change often. +/// Some slots can be marked as "trusted". 
That is needed for slots which can not be +/// trusted to change between validation and execution in general case, but +/// sometimes we can safely rely on them to not change often. async fn get_validation_params( connection: &mut StorageProcessor<'_>, tx: &L2Tx, computational_gas_limit: u32, -) -> ValidationTracerParams { +) -> anyhow::Result { let method_latency = EXECUTION_METRICS.get_validation_params.start(); let user_address = tx.common_data.initiator_address; let paymaster_address = tx.common_data.paymaster_params.paymaster; // This method assumes that the number of tokens is relatively low. When it grows // we may need to introduce some kind of caching. - let all_tokens = connection.tokens_dal().get_all_l2_token_addresses().await; + let all_tokens = connection + .tokens_dal() + .get_all_l2_token_addresses() + .await + .context("failed getting addresses of L2 tokens")?; EXECUTION_METRICS.tokens_amount.set(all_tokens.len()); let span = tracing::debug_span!("compute_trusted_slots_for_validation").entered(); @@ -133,12 +155,12 @@ async fn get_validation_params( span.exit(); method_latency.observe(); - ValidationTracerParams { + Ok(ValidationTracerParams { user_address, paymaster_address, trusted_slots, trusted_addresses, trusted_address_slots, computational_gas_limit, - } + }) } diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs index 653e75b5feb3..7d5f1ab483fe 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs @@ -10,7 +10,7 @@ use multivm::{ }; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::StateKeeperConfig}; use zksync_contracts::BaseSystemContracts; -use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool}; +use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, StorageProcessor}; use zksync_state::PostgresStorageCaches; use zksync_system_constants::DEFAULT_L2_TX_GAS_PER_PUBDATA_BYTE; use zksync_types::{ @@ -276,7 +276,14 @@ impl TxSender { self.0.storage_caches.clone() } - // TODO (PLA-725): propagate DB errors instead of panicking + async fn acquire_replica_connection(&self) -> anyhow::Result> { + self.0 + .replica_connection_pool + .access_storage_tagged("api") + .await + .context("failed acquiring connection to replica DB") + } + #[tracing::instrument(skip(self, tx))] pub async fn submit_tx(&self, tx: L2Tx) -> Result { let stage_latency = SANDBOX_METRICS.submit_tx[&SubmitTxStage::Validate].start(); @@ -287,16 +294,11 @@ impl TxSender { let shared_args = self.shared_args().await; let vm_permit = self.0.vm_concurrency_limiter.acquire().await; let vm_permit = vm_permit.ok_or(SubmitTxError::ServerShuttingDown)?; - let mut connection = self - .0 - .replica_connection_pool - .access_storage_tagged("api") - .await - .unwrap(); - let block_args = BlockArgs::pending(&mut connection).await; + let mut connection = self.acquire_replica_connection().await?; + let block_args = BlockArgs::pending(&mut connection).await?; drop(connection); - let (_, tx_metrics, published_bytecodes) = self + let execution_output = self .0 .executor .execute_tx_in_sandbox( @@ -309,12 +311,12 @@ impl TxSender { block_args, vec![], ) - .await; + .await?; tracing::info!( "Submit tx {:?} with execution metrics {:?}", tx.hash(), - tx_metrics + execution_output.metrics ); stage_latency.observe(); @@ -337,13 +339,12 @@ impl TxSender { if let Err(err) = validation_result { return Err(err.into()); } - - if !published_bytecodes { + 
if !execution_output.are_published_bytecodes_ok { return Err(SubmitTxError::FailedToPublishCompressedBytecodes); } let stage_started_at = Instant::now(); - self.ensure_tx_executable(tx.clone().into(), &tx_metrics, true)?; + self.ensure_tx_executable(tx.clone().into(), &execution_output.metrics, true)?; if let Some(proxy) = &self.0.proxy { // We're running an external node: we have to proxy the transaction to the main node. @@ -377,15 +378,19 @@ impl TxSender { .await .unwrap() .transactions_dal() - .insert_transaction_l2(tx, tx_metrics) + .insert_transaction_l2(tx, execution_output.metrics) .await; APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc(); match submission_res_handle { L2TxSubmissionResult::AlreadyExecuted => { - let Nonce(expected_nonce) = - self.get_expected_nonce(initiator_account).await.unwrap(); + let Nonce(expected_nonce) = self + .get_expected_nonce(initiator_account) + .await + .with_context(|| { + format!("failed getting expected nonce for {initiator_account:?}") + })?; Err(SubmitTxError::NonceIsTooLow( expected_nonce, expected_nonce + self.0.sender_config.max_nonce_ahead, @@ -481,7 +486,12 @@ impl TxSender { let Nonce(expected_nonce) = self .get_expected_nonce(tx.initiator_account()) .await - .unwrap(); + .with_context(|| { + format!( + "failed getting expected nonce for {:?}", + tx.initiator_account() + ) + })?; if tx.common_data.nonce.0 < expected_nonce { Err(SubmitTxError::NonceIsTooLow( @@ -504,12 +514,7 @@ impl TxSender { } async fn get_expected_nonce(&self, initiator_account: Address) -> anyhow::Result { - let mut storage = self - .0 - .replica_connection_pool - .access_storage_tagged("api") - .await?; - + let mut storage = self.acquire_replica_connection().await?; let latest_block_number = storage .blocks_dal() .get_sealed_miniblock_number() @@ -538,15 +543,12 @@ impl TxSender { async fn validate_enough_balance(&self, tx: &L2Tx) -> Result<(), SubmitTxError> { let paymaster = tx.common_data.paymaster_params.paymaster; - - // The paymaster is expected to pay for the tx, - // whatever balance the user has, we don't care. + // The paymaster is expected to pay for the tx; whatever balance the user has, we don't care. if paymaster != Address::default() { return Ok(()); } - let balance = self.get_balance(&tx.common_data.initiator_address).await; - + let balance = self.get_balance(&tx.common_data.initiator_address).await?; // Estimate the minimum fee price user will agree to. let gas_price = tx.common_data.fee.max_fee_per_gas; let max_fee = tx.common_data.fee.gas_limit * gas_price; @@ -563,21 +565,16 @@ impl TxSender { } } - async fn get_balance(&self, initiator_address: &H160) -> U256 { + async fn get_balance(&self, initiator_address: &H160) -> anyhow::Result { let eth_balance_key = storage_key_for_eth_balance(initiator_address); - let balance = self - .0 - .replica_connection_pool - .access_storage_tagged("api") - .await - .unwrap() + .acquire_replica_connection() + .await? .storage_dal() .get_by_key(ð_balance_key) - .await + .await? 
.unwrap_or_default(); - - h256_to_u256(balance) + Ok(h256_to_u256(balance)) } /// Given the gas_limit to be used for the body of the transaction, @@ -593,7 +590,7 @@ impl TxSender { block_args: BlockArgs, base_fee: u64, vm_version: VmVersion, - ) -> (VmExecutionResultAndLogs, TransactionExecutionMetrics) { + ) -> anyhow::Result<(VmExecutionResultAndLogs, TransactionExecutionMetrics)> { let gas_limit_with_overhead = tx_gas_limit + derive_overhead( tx_gas_limit, @@ -627,7 +624,7 @@ impl TxSender { let vm_execution_cache_misses_limit = self.0.sender_config.vm_execution_cache_misses_limit; let execution_args = TxExecutionArgs::for_gas_estimate(vm_execution_cache_misses_limit, &tx, base_fee); - let (exec_result, tx_metrics, _) = self + let execution_output = self .0 .executor .execute_tx_in_sandbox( @@ -640,9 +637,8 @@ impl TxSender { block_args, vec![], ) - .await; - - (exec_result, tx_metrics) + .await?; + Ok((execution_output.vm, execution_output.metrics)) } fn shared_args_for_gas_estimate(&self, fee_input: BatchFeeInput) -> TxSharedArgs { @@ -667,19 +663,13 @@ impl TxSender { ) -> Result { let estimation_started_at = Instant::now(); - let mut connection = self - .0 - .replica_connection_pool - .access_storage_tagged("api") - .await - .unwrap(); - let block_args = BlockArgs::pending(&mut connection).await; + let mut connection = self.acquire_replica_connection().await?; + let block_args = BlockArgs::pending(&mut connection).await?; let protocol_version = block_args .resolve_block_info(&mut connection) .await - .unwrap() + .with_context(|| format!("failed resolving block info for {block_args:?}"))? .protocol_version; - drop(connection); let fee_input = { @@ -715,23 +705,25 @@ impl TxSender { } let hashed_key = get_code_key(&tx.initiator_account()); - // if the default account does not have enough funds - // for transferring tx.value, without taking into account the fee, - // there is no sense to estimate the fee + // If the default account does not have enough funds for transferring `tx.value`, without taking into account the fee, + // there is no sense to estimate the fee. let account_code_hash = self - .0 - .replica_connection_pool - .access_storage_tagged("api") - .await - .unwrap() + .acquire_replica_connection() + .await? .storage_dal() .get_by_key(&hashed_key) .await + .with_context(|| { + format!( + "failed getting code hash for account {:?}", + tx.initiator_account() + ) + })? .unwrap_or_default(); if !tx.is_l1() && account_code_hash == H256::zero() - && tx.execute.value > self.get_balance(&tx.initiator_account()).await + && tx.execute.value > self.get_balance(&tx.initiator_account()).await? { tracing::info!( "fee estimation failed on validation step. 
@@ -768,7 +760,7 @@ impl TxSender { tx.execute.factory_deps.as_deref().unwrap_or_default(), self.storage_caches(), ) - .await; + .await?; if pubdata_for_factory_deps > MAX_PUBDATA_PER_BLOCK { return Err(SubmitTxError::Unexecutable( @@ -801,7 +793,7 @@ impl TxSender { // gas limit will make the transaction successful let iteration_started_at = Instant::now(); let try_gas_limit = gas_for_bytecodes_pubdata + mid; - let (result, _execution_metrics) = self + let (result, _) = self .estimate_gas_step( vm_permit.clone(), tx.clone(), @@ -812,7 +804,8 @@ impl TxSender { base_fee, protocol_version.into(), ) - .await; + .await + .context("estimate_gas step failed")?; if result.result.is_failed() { lower_bound = mid + 1; @@ -851,7 +844,8 @@ impl TxSender { base_fee, protocol_version.into(), ) - .await; + .await + .context("final estimate_gas step failed")?; result.into_api_call_result()?; self.ensure_tx_executable(tx.clone(), &tx_metrics, false)?; @@ -919,22 +913,17 @@ impl TxSender { vm_execution_cache_misses_limit, vec![], ) - .await + .await? .into_api_call_result() } - pub async fn gas_price(&self) -> u64 { - let mut connection = self - .0 - .replica_connection_pool - .access_storage_tagged("api") - .await - .unwrap(); - let block_args = BlockArgs::pending(&mut connection).await; + pub async fn gas_price(&self) -> anyhow::Result { + let mut connection = self.acquire_replica_connection().await?; + let block_args = BlockArgs::pending(&mut connection).await?; let protocol_version = block_args .resolve_block_info(&mut connection) .await - .unwrap() + .with_context(|| format!("failed resolving block info for {block_args:?}"))? .protocol_version; drop(connection); @@ -949,7 +938,7 @@ impl TxSender { .await, protocol_version.into(), ); - base_fee + Ok(base_fee) } fn ensure_tx_executable( diff --git a/core/lib/zksync_core/src/api_server/tx_sender/result.rs b/core/lib/zksync_core/src/api_server/tx_sender/result.rs index a8183c5e8ac4..6224ac849a43 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/result.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/result.rs @@ -1,12 +1,10 @@ -use multivm::{ - interface::{ExecutionResult, VmExecutionResultAndLogs}, - tracers::validator::ValidationError, -}; +use multivm::interface::{ExecutionResult, VmExecutionResultAndLogs}; use thiserror::Error; use zksync_types::{l2::error::TxCheckError, U256}; -use crate::api_server::execution_sandbox::SandboxExecutionError; +use crate::api_server::execution_sandbox::{SandboxExecutionError, ValidationError}; +/// Errors that con occur submitting a transaction or estimating gas for its execution. #[derive(Debug, Error)] pub enum SubmitTxError { #[error("nonce too high. allowed nonce range: {0} - {1}, actual: {2}")] @@ -71,6 +69,9 @@ pub enum SubmitTxError { ProxyError(#[from] zksync_web3_decl::jsonrpsee::core::ClientError), #[error("not enough gas to publish compressed bytecodes")] FailedToPublishCompressedBytecodes, + /// Catch-all internal error (e.g., database error) that should not be exposed to the caller. 
+ #[error("internal error")] + Internal(#[from] anyhow::Error), } impl SubmitTxError { @@ -102,6 +103,7 @@ impl SubmitTxError { Self::IntrinsicGas => "intrinsic-gas", Self::ProxyError(_) => "proxy-error", Self::FailedToPublishCompressedBytecodes => "failed-to-publish-compressed-bytecodes", + Self::Internal(_) => "internal", } } @@ -145,7 +147,10 @@ impl From for SubmitTxError { impl From for SubmitTxError { fn from(err: ValidationError) -> Self { - Self::ValidationFailed(err.to_string()) + match err { + ValidationError::Internal(err) => Self::Internal(err), + ValidationError::Vm(err) => Self::ValidationFailed(err.to_string()), + } } } diff --git a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs index c8fbc726e2f0..b810c9fb5a2b 100644 --- a/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/backend_jsonrpsee/mod.rs @@ -9,12 +9,16 @@ use zksync_web3_decl::{ jsonrpsee::types::{error::ErrorCode, ErrorObjectOwned}, }; -use crate::api_server::web3::metrics::API_METRICS; +use crate::api_server::{tx_sender::SubmitTxError, web3::metrics::API_METRICS}; pub mod batch_limiter_middleware; pub mod namespaces; -pub fn into_jsrpc_error(err: Web3Error) -> ErrorObjectOwned { +pub(crate) fn into_jsrpc_error(err: Web3Error) -> ErrorObjectOwned { + let data = match &err { + Web3Error::SubmitTransactionError(_, data) => Some(format!("0x{}", hex::encode(data))), + _ => None, + }; ErrorObjectOwned::owned( match err { Web3Error::InternalError | Web3Error::NotImplemented => ErrorCode::InternalError.code(), @@ -35,17 +39,25 @@ pub fn into_jsrpc_error(err: Web3Error) -> ErrorObjectOwned { Web3Error::TreeApiUnavailable => 6, }, match err { - Web3Error::SubmitTransactionError(ref message, _) => message.clone(), + Web3Error::SubmitTransactionError(message, _) => message, _ => err.to_string(), }, - match err { - Web3Error::SubmitTransactionError(_, data) => Some(format!("0x{}", hex::encode(data))), - _ => None, - }, + data, ) } -pub fn internal_error(method_name: &'static str, error: impl fmt::Display) -> Web3Error { +impl SubmitTxError { + /// Maps this error into [`Web3Error`]. If this is an internal error, error details are logged, but are not returned + /// to the client. + pub(crate) fn into_web3_error(self, method_name: &'static str) -> Web3Error { + match self { + Self::Internal(err) => internal_error(method_name, err), + _ => Web3Error::SubmitTransactionError(self.to_string(), self.data()), + } + } +} + +pub(crate) fn internal_error(method_name: &'static str, error: impl fmt::Display) -> Web3Error { tracing::error!("Internal error in method {method_name}: {error}"); API_METRICS.web3_internal_errors[&method_name].inc(); Web3Error::InternalError diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/debug.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/debug.rs index 4014296f5568..d8a148a46aab 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/debug.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/debug.rs @@ -176,7 +176,8 @@ impl DebugNamespace { self.sender_config().vm_execution_cache_misses_limit, custom_tracers, ) - .await; + .await + .map_err(|err| internal_error(METHOD_NAME, err))?; let (output, revert_reason) = match result.result { ExecutionResult::Success { output, .. 
} => (output, None), diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs index da6df61e1e89..70b445cd8fc4 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/eth.rs @@ -85,8 +85,7 @@ impl EthNamespace { let tx = L2Tx::from_request(request.into(), self.state.api_config.max_tx_size)?; let call_result = self.state.tx_sender.eth_call(block_args, tx).await; - let res_bytes = call_result - .map_err(|err| Web3Error::SubmitTransactionError(err.to_string(), err.data()))?; + let res_bytes = call_result.map_err(|err| err.into_web3_error(METHOD_NAME))?; let block_diff = self .state @@ -134,7 +133,9 @@ impl EthNamespace { // When we're estimating fee, we are trying to deduce values related to fee, so we should // not consider provided ones. - tx.common_data.fee.max_fee_per_gas = self.state.tx_sender.gas_price().await.into(); + let gas_price = self.state.tx_sender.gas_price().await; + let gas_price = gas_price.map_err(|err| internal_error(METHOD_NAME, err))?; + tx.common_data.fee.max_fee_per_gas = gas_price.into(); tx.common_data.fee.max_priority_fee_per_gas = tx.common_data.fee.max_fee_per_gas; // Modify the l1 gas price with the scale factor @@ -147,8 +148,7 @@ impl EthNamespace { .tx_sender .get_txs_fee_in_wei(tx.into(), scale_factor, acceptable_overestimation) .await - .map_err(|err| Web3Error::SubmitTransactionError(err.to_string(), err.data()))?; - + .map_err(|err| err.into_web3_error(METHOD_NAME))?; method_latency.observe(); Ok(fee.gas_limit) } @@ -158,9 +158,10 @@ impl EthNamespace { const METHOD_NAME: &str = "gas_price"; let method_latency = API_METRICS.start_call(METHOD_NAME); - let price = self.state.tx_sender.gas_price().await; + let gas_price = self.state.tx_sender.gas_price().await; + let gas_price = gas_price.map_err(|err| internal_error(METHOD_NAME, err))?; method_latency.observe(); - Ok(price.into()) + Ok(gas_price.into()) } #[tracing::instrument(skip(self))] @@ -643,7 +644,7 @@ impl EthNamespace { let submit_result = submit_result.map(|_| hash).map_err(|err| { tracing::debug!("Send raw transaction error: {err}"); API_METRICS.submit_tx_error[&err.prom_error_code()].inc(); - Web3Error::SubmitTransactionError(err.to_string(), err.data()) + err.into_web3_error(METHOD_NAME) }); method_latency.observe(); diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs index 8bb610bbb9a7..86f879a87373 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs @@ -76,7 +76,7 @@ impl ZksNamespace { tx.common_data.fee.max_priority_fee_per_gas = 0u64.into(); tx.common_data.fee.gas_per_pubdata_limit = U256::from(DEFAULT_L2_TX_GAS_PER_PUBDATA_BYTE); - let fee = self.estimate_fee(tx.into()).await?; + let fee = self.estimate_fee(tx.into(), METHOD_NAME).await?; method_latency.observe(); Ok(fee) } @@ -102,24 +102,25 @@ impl ZksNamespace { .try_into() .map_err(Web3Error::SerializationError)?; - let fee = self.estimate_fee(tx.into()).await?; + let fee = self.estimate_fee(tx.into(), METHOD_NAME).await?; method_latency.observe(); Ok(fee.gas_limit) } - async fn estimate_fee(&self, tx: Transaction) -> Result { + async fn estimate_fee( + &self, + tx: Transaction, + method_name: &'static str, + ) -> Result { let scale_factor = self.state.api_config.estimate_gas_scale_factor; let acceptable_overestimation = 
self.state.api_config.estimate_gas_acceptable_overestimation; - let fee = self - .state + self.state .tx_sender .get_txs_fee_in_wei(tx, scale_factor, acceptable_overestimation) .await - .map_err(|err| Web3Error::SubmitTransactionError(err.to_string(), err.data()))?; - - Ok(fee) + .map_err(|err| err.into_web3_error(method_name)) } #[tracing::instrument(skip(self))] diff --git a/core/lib/zksync_core/src/eth_watch/tests.rs b/core/lib/zksync_core/src/eth_watch/tests.rs index d606a15107ca..76b170ba365e 100644 --- a/core/lib/zksync_core/src/eth_watch/tests.rs +++ b/core/lib/zksync_core/src/eth_watch/tests.rs @@ -524,12 +524,13 @@ async fn test_overlapping_batches() { } async fn get_all_db_txs(storage: &mut StorageProcessor<'_>) -> Vec { - storage.transactions_dal().reset_mempool().await; - storage + storage.transactions_dal().reset_mempool().await.unwrap(); + let (txs, _) = storage .transactions_dal() - .sync_mempool(vec![], vec![], 0, 0, 1000) + .sync_mempool(&[], &[], 0, 0, 1000) .await - .0 + .unwrap(); + txs } fn tx_into_log(tx: L1Tx) -> Log { diff --git a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs index e6eaf6354707..df21c890e547 100644 --- a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs +++ b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs @@ -462,6 +462,7 @@ impl MiniblockSealCommand { CURRENT_VIRTUAL_BLOCK_INFO_POSITION, )) .await + .expect("failed getting virtual block info from VM state") .unwrap_or_default(); let (current_l2_virtual_block_number, _) = unpack_block_info(h256_to_u256(current_l2_virtual_block_info)); diff --git a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs index 070783e51af8..7c57be2450de 100644 --- a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs +++ b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs @@ -1,5 +1,6 @@ use std::{sync::Arc, time::Duration}; +use anyhow::Context as _; use multivm::utils::derive_base_fee_and_gas_per_pubdata; use tokio::sync::watch; use zksync_config::configs::chain::MempoolConfig; @@ -57,15 +58,20 @@ impl MempoolFetcher { stop_receiver: watch::Receiver, ) -> anyhow::Result<()> { { - let mut storage = pool.access_storage_tagged("state_keeper").await.unwrap(); + let mut storage = pool.access_storage_tagged("state_keeper").await?; if remove_stuck_txs { let removed_txs = storage .transactions_dal() .remove_stuck_txs(stuck_tx_timeout) - .await; - tracing::info!("Number of stuck txs was removed: {}", removed_txs); + .await + .context("failed removing stuck transactions")?; + tracing::info!("Number of stuck txs was removed: {removed_txs}"); } - storage.transactions_dal().reset_mempool().await; + storage + .transactions_dal() + .reset_mempool() + .await + .context("failed resetting mempool")?; } loop { @@ -74,14 +80,16 @@ impl MempoolFetcher { break; } let latency = KEEPER_METRICS.mempool_sync.start(); - let mut storage = pool.access_storage_tagged("state_keeper").await.unwrap(); + let mut storage = pool.access_storage_tagged("state_keeper").await?; let mempool_info = self.mempool.get_mempool_info(); - let latest_miniblock = BlockArgs::pending(&mut storage).await; + let latest_miniblock = BlockArgs::pending(&mut storage) + .await + .context("failed obtaining latest miniblock")?; let protocol_version = latest_miniblock .resolve_block_info(&mut storage) .await - .unwrap() + .with_context(|| format!("failed resolving block info for {latest_miniblock:?}"))? 
.protocol_version; let l2_tx_filter = l2_tx_filter( @@ -93,13 +101,16 @@ impl MempoolFetcher { let (transactions, nonces) = storage .transactions_dal() .sync_mempool( - mempool_info.stashed_accounts, - mempool_info.purged_accounts, + &mempool_info.stashed_accounts, + &mempool_info.purged_accounts, l2_tx_filter.gas_per_pubdata, l2_tx_filter.fee_per_gas, self.sync_batch_size, ) - .await; + .await + .context("failed syncing mempool")?; + drop(storage); + let all_transactions_loaded = transactions.len() < self.sync_batch_size; self.mempool.insert(transactions, nonces); latency.observe();
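
The changes above converge on one pattern: fallible infrastructure calls (DAL queries, block-info resolution, sandbox execution steps) now bubble up as `anyhow::Error` with `.context(...)` attached, get wrapped into the new catch-all `SubmitTxError::Internal` variant, and are only reduced to a client-visible error at the RPC boundary via `into_web3_error`, which logs the details server-side and returns a generic internal error. The sketch below is a minimal, self-contained illustration of that routing; the enum and function names are simplified stand-ins for illustration only (the real `SubmitTxError`, `Web3Error`, and sandbox `ValidationError` carry many more variants, and the real `into_web3_error` also attaches revert data and bumps the `web3_internal_errors` metric).

```rust
// Minimal sketch of the error-routing pattern introduced in this diff, using
// simplified stand-ins for `SubmitTxError`, `Web3Error`, and the sandbox
// `ValidationError`. Assumes the `anyhow`, `thiserror`, and `tracing` crates.
use anyhow::Context as _;
use thiserror::Error;

/// Stand-in for the sandbox validation error: either a VM-level rejection that the
/// user should see, or an internal failure (e.g., a lost database connection).
#[derive(Debug, Error)]
enum ValidationError {
    #[error("VM validation error: {0}")]
    Vm(String),
    #[error("internal validation error: {0}")]
    Internal(anyhow::Error),
}

#[derive(Debug, Error)]
enum SubmitTxError {
    #[error("failed to validate transaction: {0}")]
    ValidationFailed(String),
    /// Catch-all for infrastructure failures; details must not leak to the caller.
    #[error("internal error")]
    Internal(#[from] anyhow::Error),
}

impl From<ValidationError> for SubmitTxError {
    fn from(err: ValidationError) -> Self {
        match err {
            // Internal failures keep their full error chain for server-side logging...
            ValidationError::Internal(err) => Self::Internal(err),
            // ...while VM rejections stay user-facing.
            ValidationError::Vm(msg) => Self::ValidationFailed(msg),
        }
    }
}

/// Stand-in for `Web3Error`.
#[derive(Debug)]
enum Web3Error {
    InternalError,
    SubmitTransactionError(String, Vec<u8>),
}

impl SubmitTxError {
    /// Mirrors `into_web3_error`: internal errors are logged together with the RPC
    /// method name and collapsed into a generic `InternalError`; user-facing errors
    /// keep their message (the real code also attaches revert data and metrics).
    fn into_web3_error(self, method_name: &'static str) -> Web3Error {
        match self {
            Self::Internal(err) => {
                tracing::error!("Internal error in method {method_name}: {err:#}");
                Web3Error::InternalError
            }
            other => Web3Error::SubmitTransactionError(other.to_string(), vec![]),
        }
    }
}

/// Stand-in for a DAL call whose underlying query failed.
fn reset_mempool() -> anyhow::Result<()> {
    Err(anyhow::anyhow!("connection reset by peer"))
}

fn main() {
    // A database failure surfaces to the client as a bare "internal error"...
    let db_failure: SubmitTxError = reset_mempool()
        .context("failed resetting mempool")
        .unwrap_err()
        .into();
    println!("{:?}", db_failure.into_web3_error("eth_sendRawTransaction"));

    // ...while a VM rejection keeps its human-readable message.
    let rejected: SubmitTxError = ValidationError::Vm("nonce too low".to_owned()).into();
    println!("{:?}", rejected.into_web3_error("eth_sendRawTransaction"));
}
```

The split mirrors what `internal_error` already does for other Web3 methods: clients receive a stable, non-leaky "internal error", while operators still see the full `anyhow` context chain (e.g. "failed resetting mempool: connection reset by peer") in logs and metrics.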