fix(en): Fix pending transactions subscription (matter-labs#1342)
## What ❔

- Fixes the `pendingTransactions` subscription notifier logic on the EN. This
logic implicitly relies on the "received at" timestamps of transactions
being causally ordered (i.e., a transaction persisted into the storage
later should have a later timestamp), but this wasn't the case for ENs –
ENs would copy this timestamp from the main node. This is fixed by
overriding the "received at" timestamp on ENs before saving a
transaction to the storage (see the sketch after this list).
- Correspondingly, the "received at" timestamp is removed from the
consensus payload, since consensus cannot agree on a node-local value.
- Disables reporting transaction inclusion latencies on ENs, where the metric is
meaningless and can lead to spammy logs.
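
To make the failure mode concrete, here is a minimal, self-contained sketch of a notifier that polls storage by a "received at" watermark, which is how the subscription notifier behaves conceptually. `StoredTx` and `poll_new_transactions` are made up for illustration; only the skip-by-watermark behavior and the timestamp-override fix mirror this PR.

```rust
/// Hypothetical stand-in for a stored transaction (not the real `zksync_types::Transaction`).
#[derive(Debug, Clone)]
struct StoredTx {
    hash: u64,
    received_timestamp_ms: u64,
}

/// Returns transactions newer than the last-seen watermark. Correctness relies on
/// timestamps being causally ordered: anything persisted after the previous poll
/// must carry a timestamp greater than `last_seen_ms`.
fn poll_new_transactions(storage: &[StoredTx], last_seen_ms: u64) -> Vec<StoredTx> {
    storage
        .iter()
        .filter(|tx| tx.received_timestamp_ms > last_seen_ms)
        .cloned()
        .collect()
}

fn main() {
    // First poll: one transaction received at t = 1000 ms; the watermark advances to 1000.
    let mut storage = vec![StoredTx { hash: 1, received_timestamp_ms: 1_000 }];
    let watermark = poll_new_transactions(&storage, 0)
        .iter()
        .map(|tx| tx.received_timestamp_ms)
        .max()
        .unwrap_or(0);

    // A transaction persisted later but carrying an earlier timestamp copied from the
    // main node (t = 900 ms) falls behind the watermark and is never reported. This is
    // the bug fixed here: `FetchedTransaction::new` in `sync_layer/fetcher.rs` now
    // overrides `received_timestamp_ms` with the local clock before persistence.
    storage.push(StoredTx { hash: 2, received_timestamp_ms: 900 });
    assert!(poll_new_transactions(&storage, watermark).is_empty());
}
```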

## Why ❔

- Bugs are bad.
- Spammy logs are bad.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
slowli authored Mar 11, 2024
1 parent ea0956f commit a040001
Showing 11 changed files with 138 additions and 80 deletions.
4 changes: 1 addition & 3 deletions core/lib/dal/src/models/consensus/mod.rs
@@ -277,8 +277,7 @@ impl ProtoRepr for proto::Transaction {
false => Some(execute.factory_deps.clone()),
},
},
received_timestamp_ms: *required(&self.received_timestamp_ms)
.context("received_timestamp_ms")?,
received_timestamp_ms: 0, // This timestamp is local to the node
raw_bytes: self.raw_bytes.as_ref().map(|x| x.clone().into()),
})
}
@@ -365,7 +364,6 @@ impl ProtoRepr for proto::Transaction {
Self {
common_data: Some(common_data),
execute: Some(execute),
received_timestamp_ms: Some(this.received_timestamp_ms),
raw_bytes: this.raw_bytes.as_ref().map(|inner| inner.0.clone()),
}
}
3 changes: 1 addition & 2 deletions core/lib/dal/src/models/consensus/tests.rs
@@ -32,16 +32,15 @@
pub struct ComparableTransaction {
common_data: ExecuteTransactionCommon,
execute: Execute,
received_timestamp_ms: u64,
raw_bytes: Option<Bytes>,
// `received_timestamp_ms` is intentionally not included because it's local
}

impl From<Transaction> for ComparableTransaction {
fn from(tx: Transaction) -> Self {
Self {
common_data: tx.common_data,
execute: tx.execute,
received_timestamp_ms: tx.received_timestamp_ms,
raw_bytes: tx.raw_bytes,
}
}
1 change: 0 additions & 1 deletion core/lib/dal/src/models/proto/mod.proto
@@ -24,7 +24,6 @@ message Transaction {
ProtocolUpgradeTxCommonData protocol_upgrade = 3;
}
optional Execute execute = 4; // required
optional uint64 received_timestamp_ms = 5; // required
optional bytes raw_bytes = 6; // optional
}

2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/api_server/web3/tests/mod.rs
@@ -57,7 +57,7 @@ mod snapshots;
mod vm;
mod ws;

const TEST_TIMEOUT: Duration = Duration::from_secs(10);
const TEST_TIMEOUT: Duration = Duration::from_secs(20);
const POLL_INTERVAL: Duration = Duration::from_millis(50);

impl ApiServerHandles {
11 changes: 9 additions & 2 deletions core/lib/zksync_core/src/consensus/storage/mod.rs
@@ -13,7 +13,10 @@ mod testonly;

use crate::{
state_keeper::io::common::IoCursor,
sync_layer::{fetcher::FetchedBlock, sync_action::ActionQueueSender},
sync_layer::{
fetcher::{FetchedBlock, FetchedTransaction},
sync_action::ActionQueueSender,
},
};

/// Context-aware `zksync_dal::StorageProcessor` wrapper.
@@ -324,7 +327,11 @@ impl PersistentBlockStore for BlockStore {
fair_pubdata_price: payload.fair_pubdata_price,
virtual_blocks: payload.virtual_blocks,
operator_address: payload.operator_address,
transactions: payload.transactions,
transactions: payload
.transactions
.into_iter()
.map(FetchedTransaction::new)
.collect(),
};
cursor.advance(block).await.context("cursor.advance()")?;
}
3 changes: 2 additions & 1 deletion core/lib/zksync_core/src/consensus/testonly.rs
@@ -26,6 +26,7 @@ use crate::{
ZkSyncStateKeeper,
},
sync_layer::{
fetcher::FetchedTransaction,
sync_action::{ActionQueue, ActionQueueSender, SyncAction},
ExternalIO, MainNodeClient, SyncState,
},
@@ -269,7 +270,7 @@ impl StateKeeper {
let mut actions = vec![self.open_block()];
for _ in 0..transactions {
let tx = create_l2_transaction(self.fee_per_gas, self.gas_per_pubdata);
actions.push(SyncAction::Tx(Box::new(tx.into())));
actions.push(FetchedTransaction::new(tx.into()).into());
}
actions.push(SyncAction::SealMiniblock);
self.actions_sender.push_actions(actions).await;
116 changes: 71 additions & 45 deletions core/lib/zksync_core/src/state_keeper/io/seal_logic.rs
@@ -3,7 +3,6 @@
use std::time::{Duration, Instant};

use chrono::Utc;
use itertools::Itertools;
use multivm::{
interface::{FinishedL1Batch, L1BatchEnv},
@@ -13,6 +12,7 @@ use zksync_dal::StorageProcessor;
use zksync_types::{
block::{unpack_block_info, L1BatchHeader, MiniblockHeader},
event::{extract_added_tokens, extract_long_l2_to_l1_messages},
helpers::unix_timestamp_ms,
l1::L1Tx,
l2::L2Tx,
l2_to_l1_log::{SystemL2ToL1Log, UserL2ToL1Log},
@@ -27,8 +27,7 @@ use zksync_types::{
MiniblockNumber, ProtocolVersionId, StorageKey, StorageLog, StorageLogQuery, Transaction,
VmEvent, CURRENT_VIRTUAL_BLOCK_INFO_POSITION, H256, SYSTEM_CONTEXT_ADDRESS,
};
// TODO (SMA-1206): use seconds instead of milliseconds.
use zksync_utils::{h256_to_u256, time::millis_since_epoch, u256_to_h256};
use zksync_utils::{h256_to_u256, u256_to_h256};

use crate::{
metrics::{BlockStage, MiniblockStage, APP_METRICS},
@@ -243,7 +242,7 @@ impl UpdatesManager {
&self,
started_at: Instant,
current_l1_batch_number: L1BatchNumber,
block_timestamp: u64,
batch_timestamp: u64,
writes_metrics: &DeduplicatedWritesMetrics,
) {
L1_BATCH_METRICS
@@ -257,7 +256,7 @@
.observe(self.l1_batch.executed_transactions.len());

let l1_batch_latency =
((millis_since_epoch() - block_timestamp as u128 * 1_000) as f64) / 1_000.0;
unix_timestamp_ms().saturating_sub(batch_timestamp * 1_000) as f64 / 1_000.0;
APP_METRICS.block_latency[&BlockStage::Sealed]
.observe(Duration::from_secs_f64(l1_batch_latency));

@@ -272,45 +271,56 @@ impl MiniblockSealCommand {
self.seal_inner(storage, false).await;
}

/// Seals a miniblock with the given number.
///
/// If `is_fictive` flag is set to true, then it is assumed that we should seal a fictive miniblock
/// with no transactions in it. It is needed because there might be some storage logs / events
/// that are created after the last processed tx in the L1 batch: after the last transaction is processed,
/// the bootloader enters the "tip" phase in which it can still generate events (e.g.,
/// one for sending fees to the operator).
///
/// `l2_erc20_bridge_addr` is required to extract the information on newly added tokens.
async fn seal_inner(&self, storage: &mut StorageProcessor<'_>, is_fictive: bool) {
self.assert_valid_miniblock(is_fictive);

let mut transaction = storage.start_transaction().await.unwrap();
if self.pre_insert_txs {
let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::PreInsertTxs, is_fictive);
for tx in &self.miniblock.executed_transactions {
if let Ok(l1_tx) = L1Tx::try_from(tx.transaction.clone()) {
async fn insert_transactions(&self, transaction: &mut StorageProcessor<'_>) {
for tx_result in &self.miniblock.executed_transactions {
let tx = tx_result.transaction.clone();
match &tx.common_data {
ExecuteTransactionCommon::L1(_) => {
// `unwrap` is safe due to the check above
let l1_tx = L1Tx::try_from(tx).unwrap();
let l1_block_number = L1BlockNumber(l1_tx.common_data.eth_block as u32);
transaction
.transactions_dal()
.insert_transaction_l1(l1_tx, l1_block_number)
.await;
} else if let Ok(l2_tx) = L2Tx::try_from(tx.transaction.clone()) {
}
ExecuteTransactionCommon::L2(_) => {
// `unwrap` is safe due to the check above
let l2_tx = L2Tx::try_from(tx).unwrap();
// Using `Default` for execution metrics should be OK here, since this data is not used on the EN.
transaction
.transactions_dal()
.insert_transaction_l2(l2_tx, Default::default())
.await;
} else if let Ok(protocol_system_upgrade_tx) =
ProtocolUpgradeTx::try_from(tx.transaction.clone())
{
}
ExecuteTransactionCommon::ProtocolUpgrade(_) => {
// `unwrap` is safe due to the check above
let protocol_system_upgrade_tx = ProtocolUpgradeTx::try_from(tx).unwrap();
transaction
.transactions_dal()
.insert_system_transaction(protocol_system_upgrade_tx)
.await;
} else {
unreachable!("Transaction {:?} is neither L1 nor L2", tx.transaction);
}
}
}
}

/// Seals a miniblock with the given number.
///
/// If `is_fictive` flag is set to true, then it is assumed that we should seal a fictive miniblock
/// with no transactions in it. It is needed because there might be some storage logs / events
/// that are created after the last processed tx in the L1 batch: after the last transaction is processed,
/// the bootloader enters the "tip" phase in which it can still generate events (e.g.,
/// one for sending fees to the operator).
///
/// `l2_erc20_bridge_addr` is required to extract the information on newly added tokens.
async fn seal_inner(&self, storage: &mut StorageProcessor<'_>, is_fictive: bool) {
self.assert_valid_miniblock(is_fictive);

let mut transaction = storage.start_transaction().await.unwrap();
if self.pre_insert_txs {
let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::PreInsertTxs, is_fictive);
self.insert_transactions(&mut transaction).await;
progress.observe(Some(self.miniblock.executed_transactions.len()));
}

@@ -469,22 +479,7 @@ impl MiniblockSealCommand {
progress.observe(None);

let progress = MINIBLOCK_METRICS.start(MiniblockSealStage::ReportTxMetrics, is_fictive);
self.miniblock.executed_transactions.iter().for_each(|tx| {
let inclusion_delay = Duration::from_millis(
Utc::now().timestamp_millis() as u64 - tx.transaction.received_timestamp_ms,
);
if inclusion_delay > Duration::from_secs(600) {
tracing::info!(
tx_hash = hex::encode(tx.hash),
inclusion_delay_ms = inclusion_delay.as_millis(),
received_timestamp_ms = tx.transaction.received_timestamp_ms,
"Transaction spent >10m in mempool before being included in a miniblock"
)
}
KEEPER_METRICS.transaction_inclusion_delay
[&TxExecutionType::from_is_l1(tx.transaction.is_l1())]
.observe(inclusion_delay)
});
self.report_transaction_metrics();
progress.observe(Some(self.miniblock.executed_transactions.len()));

self.report_miniblock_metrics(started_at, current_l2_virtual_block_number);
@@ -607,6 +602,37 @@ impl MiniblockSealCommand {
})
}

fn report_transaction_metrics(&self) {
const SLOW_INCLUSION_DELAY: Duration = Duration::from_secs(600);

if self.pre_insert_txs {
// This I/O logic is running on the EN. The reported metrics / logs would be meaningless:
//
// - If `received_timestamp_ms` are copied from the main node, they can be far in the past (especially during the initial EN sync).
// We would falsely classify a lot of transactions as slow.
// - If `received_timestamp_ms` are overridden with the current timestamp as when persisting transactions,
// the observed transaction latencies would always be extremely close to zero.
return;
}

for tx in &self.miniblock.executed_transactions {
let inclusion_delay =
unix_timestamp_ms().saturating_sub(tx.transaction.received_timestamp_ms);
let inclusion_delay = Duration::from_millis(inclusion_delay);
if inclusion_delay > SLOW_INCLUSION_DELAY {
tracing::info!(
tx_hash = hex::encode(tx.hash),
inclusion_delay_ms = inclusion_delay.as_millis(),
received_timestamp_ms = tx.transaction.received_timestamp_ms,
"Transaction spent >{SLOW_INCLUSION_DELAY:?} in mempool before being included in a miniblock"
);
}
KEEPER_METRICS.transaction_inclusion_delay
[&TxExecutionType::from_is_l1(tx.transaction.is_l1())]
.observe(inclusion_delay)
}
}

fn report_miniblock_metrics(&self, started_at: Instant, latest_virtual_block_number: u64) {
let miniblock_number = self.miniblock_number;

@@ -616,7 +642,7 @@
MINIBLOCK_METRICS.sealed_time.observe(started_at.elapsed());

let miniblock_latency =
((millis_since_epoch() - self.miniblock.timestamp as u128 * 1_000) as f64) / 1_000.0;
unix_timestamp_ms().saturating_sub(self.miniblock.timestamp * 1_000) as f64 / 1_000.0;
let stage = &MiniblockStage::Sealed;
APP_METRICS.miniblock_latency[stage].observe(Duration::from_secs_f64(miniblock_latency));
APP_METRICS.miniblock_number[stage].set(miniblock_number.0.into());
2 changes: 1 addition & 1 deletion core/lib/zksync_core/src/sync_layer/external_io.rs
@@ -467,7 +467,7 @@ impl StateKeeperIO for ExternalIO {
let SyncAction::Tx(tx) = actions.pop_action().unwrap() else {
unreachable!()
};
return Some(*tx);
return Some(Transaction::from(*tx));
}
_ => {
tokio::time::sleep(POLL_INTERVAL).await;
38 changes: 33 additions & 5 deletions core/lib/zksync_core/src/sync_layer/fetcher.rs
@@ -1,8 +1,8 @@
use anyhow::Context as _;
use zksync_dal::StorageProcessor;
use zksync_types::{
api::en::SyncBlock, block::MiniblockHasher, Address, L1BatchNumber, MiniblockNumber,
ProtocolVersionId, H256,
api::en::SyncBlock, block::MiniblockHasher, helpers::unix_timestamp_ms, Address, L1BatchNumber,
MiniblockNumber, ProtocolVersionId, H256,
};

use super::{
@@ -14,6 +14,31 @@ use crate::{
state_keeper::io::common::IoCursor,
};

/// Same as [`zksync_types::Transaction`], just with additional guarantees that the "received at" timestamp was set locally.
/// We cannot transfer `Transaction`s without these timestamps, because this would break backward compatibility.
#[derive(Debug, Clone)]
pub(crate) struct FetchedTransaction(zksync_types::Transaction);

impl FetchedTransaction {
pub fn new(mut tx: zksync_types::Transaction) -> Self {
// Override the "received at" timestamp for the transaction so that they are causally ordered (i.e., transactions
// with an earlier timestamp are persisted earlier). Without this property, code relying on causal ordering may work incorrectly;
// e.g., `pendingTransactions` subscriptions notifier can skip transactions.
tx.received_timestamp_ms = unix_timestamp_ms();
Self(tx)
}

pub fn hash(&self) -> H256 {
self.0.hash()
}
}

impl From<FetchedTransaction> for zksync_types::Transaction {
fn from(tx: FetchedTransaction) -> Self {
tx.0
}
}

/// Common denominator for blocks fetched by an external node.
#[derive(Debug)]
pub(crate) struct FetchedBlock {
@@ -28,7 +53,7 @@ pub(crate) struct FetchedBlock {
pub fair_pubdata_price: Option<u64>,
pub virtual_blocks: u32,
pub operator_address: Address,
pub transactions: Vec<zksync_types::Transaction>,
pub transactions: Vec<FetchedTransaction>,
}

impl FetchedBlock {
@@ -67,7 +92,10 @@ impl TryFrom<SyncBlock> for FetchedBlock {
fair_pubdata_price: block.fair_pubdata_price,
virtual_blocks: block.virtual_blocks.unwrap_or(0),
operator_address: block.operator_address,
transactions,
transactions: transactions
.into_iter()
.map(FetchedTransaction::new)
.collect(),
})
}
}
@@ -145,7 +173,7 @@ impl IoCursor {

APP_METRICS.processed_txs[&TxStage::added_to_mempool()]
.inc_by(block.transactions.len() as u64);
new_actions.extend(block.transactions.into_iter().map(SyncAction::from));
new_actions.extend(block.transactions.into_iter().map(Into::into));

// Last miniblock of the batch is a "fictive" miniblock and would be replicated locally.
// We don't need to seal it explicitly, so we only put the seal miniblock command if it's not the last miniblock.