Skip to content

Commit

Permalink
refactor(state_keeper): Abstract ConditionalSealer (matter-labs#803)
Browse files Browse the repository at this point in the history
## What ❔

Defines a new trait: `ConditionalSealer` and provides two
implementations: `SequencerSealer` and `NoopSealer`.

## Why ❔

- Prerequisite for ZK Stack configuration system.
- Leaves a single constructor for the state keeper.
- Removes `StateKeeperConfig` use from `TxSender`.
- Potentially makes it possible to create `ConditionalSealer` that
contains some kind of sealing logic but does not rely on
`StateKeeperConfig`.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
- [ ] Spellcheck has been run via `cargo spellcheck
--cfg=./spellcheck/era.cfg --code 1`.
  • Loading branch information
popzxc authored Jan 3, 2024
1 parent e19d654 commit cbe8c7f
Show file tree
Hide file tree
Showing 12 changed files with 151 additions and 104 deletions.
11 changes: 8 additions & 3 deletions core/bin/external_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use zksync_core::{
reorg_detector::ReorgDetector,
setup_sigint_handler,
state_keeper::{
L1BatchExecutorBuilder, MainBatchExecutorBuilder, MiniblockSealer, MiniblockSealerHandle,
ZkSyncStateKeeper,
seal_criteria::NoopSealer, L1BatchExecutorBuilder, MainBatchExecutorBuilder,
MiniblockSealer, MiniblockSealerHandle, ZkSyncStateKeeper,
},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, fetcher::FetcherCursor,
Expand Down Expand Up @@ -92,7 +92,12 @@ async fn build_state_keeper(
)
.await;

ZkSyncStateKeeper::without_sealer(stop_receiver, Box::new(io), batch_executor_base)
ZkSyncStateKeeper::new(
stop_receiver,
Box::new(io),
batch_executor_base,
Box::new(NoopSealer),
)
}

async fn init_tasks(
Expand Down
43 changes: 19 additions & 24 deletions core/lib/zksync_core/src/api_server/tx_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{
},
l1_gas_price::L1GasPriceProvider,
metrics::{TxStage, APP_METRICS},
state_keeper::seal_criteria::{ConditionalSealer, SealData},
state_keeper::seal_criteria::{ConditionalSealer, NoopSealer, SealData},
};

mod proxy;
Expand Down Expand Up @@ -139,9 +139,8 @@ pub struct TxSenderBuilder {
master_connection_pool: Option<ConnectionPool>,
/// Proxy to submit transactions to the network. If not set, `master_connection_pool` must be set.
proxy: Option<TxProxy>,
/// Actual state keeper configuration, required for tx verification.
/// If not set, transactions would not be checked against seal criteria.
state_keeper_config: Option<StateKeeperConfig>,
/// Batch sealer used to check whether transaction can be executed by the sequencer.
sealer: Option<Arc<dyn ConditionalSealer>>,
}

impl TxSenderBuilder {
Expand All @@ -151,10 +150,15 @@ impl TxSenderBuilder {
replica_connection_pool,
master_connection_pool: None,
proxy: None,
state_keeper_config: None,
sealer: None,
}
}

pub fn with_sealer(mut self, sealer: Arc<dyn ConditionalSealer>) -> Self {
self.sealer = Some(sealer);
self
}

pub fn with_tx_proxy(mut self, main_node_url: &str) -> Self {
self.proxy = Some(TxProxy::new(main_node_url));
self
Expand All @@ -165,11 +169,6 @@ impl TxSenderBuilder {
self
}

pub fn with_state_keeper_config(mut self, state_keeper_config: StateKeeperConfig) -> Self {
self.state_keeper_config = Some(state_keeper_config);
self
}

pub async fn build(
self,
l1_gas_price_source: Arc<dyn L1GasPriceProvider>,
Expand All @@ -182,16 +181,19 @@ impl TxSenderBuilder {
"Either master connection pool or proxy must be set"
);

// Use noop sealer if no sealer was explicitly provided.
let sealer = self.sealer.unwrap_or_else(|| Arc::new(NoopSealer));

TxSender(Arc::new(TxSenderInner {
sender_config: self.config,
master_connection_pool: self.master_connection_pool,
replica_connection_pool: self.replica_connection_pool,
l1_gas_price_source,
api_contracts,
proxy: self.proxy,
state_keeper_config: self.state_keeper_config,
vm_concurrency_limiter,
storage_caches,
sealer,
}))
}
}
Expand Down Expand Up @@ -241,14 +243,12 @@ pub struct TxSenderInner {
pub(super) api_contracts: ApiContracts,
/// Optional transaction proxy to be used for transaction submission.
pub(super) proxy: Option<TxProxy>,
/// An up-to-date version of the state keeper config.
/// This field may be omitted on the external node, since the configuration may change unexpectedly.
/// If this field is set to `None`, `TxSender` will assume that any transaction is executable.
state_keeper_config: Option<StateKeeperConfig>,
/// Used to limit the amount of VMs that can be executed simultaneously.
pub(super) vm_concurrency_limiter: Arc<VmConcurrencyLimiter>,
// Caches used in VM execution.
storage_caches: PostgresStorageCaches,
/// Batch sealer used to check whether transaction can be executed by the sequencer.
sealer: Arc<dyn ConditionalSealer>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -850,13 +850,6 @@ impl TxSender {
tx_metrics: &TransactionExecutionMetrics,
log_message: bool,
) -> Result<(), SubmitTxError> {
let Some(sk_config) = &self.0.state_keeper_config else {
// No config provided, so we can't check if transaction satisfies the seal criteria.
// We assume that it's executable, and if it's not, it will be caught by the main server
// (where this check is always performed).
return Ok(());
};

// Hash is not computable for the provided `transaction` during gas estimation (it doesn't have
// its input data set). Since we don't log a hash in this case anyway, we just use a dummy value.
let tx_hash = if log_message {
Expand All @@ -870,8 +863,10 @@ impl TxSender {
// still reject them as it's not.
let protocol_version = ProtocolVersionId::latest();
let seal_data = SealData::for_transaction(transaction, tx_metrics, protocol_version);
if let Some(reason) =
ConditionalSealer::find_unexecutable_reason(sk_config, &seal_data, protocol_version)
if let Some(reason) = self
.0
.sealer
.find_unexecutable_reason(&seal_data, protocol_version)
{
let message = format!(
"Tx is Unexecutable because of {reason}; inputs for decision: {seal_data:?}"
Expand Down
4 changes: 3 additions & 1 deletion core/lib/zksync_core/src/consensus/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use zksync_types::{
use crate::{
genesis::{ensure_genesis_state, GenesisParams},
state_keeper::{
seal_criteria::NoopSealer,
tests::{create_l1_batch_metadata, create_l2_transaction, MockBatchExecutorBuilder},
MiniblockSealer, ZkSyncStateKeeper,
},
Expand Down Expand Up @@ -355,10 +356,11 @@ impl StateKeeperRunner {
s.spawn_bg(miniblock_sealer.run());
s.spawn_bg(run_mock_metadata_calculator(ctx, pool.clone()));
s.spawn_bg(
ZkSyncStateKeeper::without_sealer(
ZkSyncStateKeeper::new(
stop_receiver,
Box::new(io),
Box::new(MockBatchExecutorBuilder),
Box::new(NoopSealer),
)
.run(),
);
Expand Down
7 changes: 5 additions & 2 deletions core/lib/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ use crate::{
MetadataCalculator, MetadataCalculatorConfig, MetadataCalculatorModeConfig,
},
metrics::{InitStage, APP_METRICS},
state_keeper::{create_state_keeper, MempoolFetcher, MempoolGuard, MiniblockSealer},
state_keeper::{
create_state_keeper, MempoolFetcher, MempoolGuard, MiniblockSealer, SequencerSealer,
},
};

pub mod api_server;
Expand Down Expand Up @@ -1032,9 +1034,10 @@ async fn build_tx_sender(
l1_gas_price_provider: Arc<dyn L1GasPriceProvider>,
storage_caches: PostgresStorageCaches,
) -> (TxSender, VmConcurrencyBarrier) {
let sequencer_sealer = SequencerSealer::new(state_keeper_config.clone());
let tx_sender_builder = TxSenderBuilder::new(tx_sender_config.clone(), replica_pool)
.with_main_connection_pool(master_pool)
.with_state_keeper_config(state_keeper_config.clone());
.with_sealer(Arc::new(sequencer_sealer));

let max_concurrency = web3_json_config.vm_concurrency_limit();
let (vm_concurrency_limiter, vm_barrier) = VmConcurrencyLimiter::new(max_concurrency);
Expand Down
39 changes: 11 additions & 28 deletions core/lib/zksync_core/src/state_keeper/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,21 @@ pub struct ZkSyncStateKeeper {
stop_receiver: watch::Receiver<bool>,
io: Box<dyn StateKeeperIO>,
batch_executor_base: Box<dyn L1BatchExecutorBuilder>,
sealer: Option<ConditionalSealer>,
sealer: Box<dyn ConditionalSealer>,
}

impl ZkSyncStateKeeper {
pub fn new(
stop_receiver: watch::Receiver<bool>,
io: Box<dyn StateKeeperIO>,
batch_executor_base: Box<dyn L1BatchExecutorBuilder>,
sealer: ConditionalSealer,
sealer: Box<dyn ConditionalSealer>,
) -> Self {
Self {
stop_receiver,
io,
batch_executor_base,
sealer: Some(sealer),
}
}

pub fn without_sealer(
stop_receiver: watch::Receiver<bool>,
io: Box<dyn StateKeeperIO>,
batch_executor_base: Box<dyn L1BatchExecutorBuilder>,
) -> Self {
Self {
stop_receiver,
io,
batch_executor_base,
sealer: None,
sealer,
}
}

Expand Down Expand Up @@ -651,18 +638,14 @@ impl ZkSyncStateKeeper {
writes_metrics: block_writes_metrics,
};

if let Some(sealer) = &self.sealer {
sealer.should_seal_l1_batch(
self.io.current_l1_batch_number().0,
updates_manager.batch_timestamp() as u128 * 1_000,
updates_manager.pending_executed_transactions_len() + 1,
&block_data,
&tx_data,
updates_manager.protocol_version(),
)
} else {
SealResolution::NoSeal
}
self.sealer.should_seal_l1_batch(
self.io.current_l1_batch_number().0,
updates_manager.batch_timestamp() as u128 * 1_000,
updates_manager.pending_executed_transactions_len() + 1,
&block_data,
&tx_data,
updates_manager.protocol_version(),
)
}
};
(resolution, exec_result)
Expand Down
8 changes: 4 additions & 4 deletions core/lib/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use self::{
keeper::ZkSyncStateKeeper,
};
pub(crate) use self::{
mempool_actor::MempoolFetcher, seal_criteria::ConditionalSealer, types::MempoolGuard,
mempool_actor::MempoolFetcher, seal_criteria::SequencerSealer, types::MempoolGuard,
};
use crate::l1_gas_price::L1GasPriceProvider;

Expand All @@ -26,7 +26,7 @@ pub(crate) mod io;
mod keeper;
mod mempool_actor;
pub(crate) mod metrics;
pub(crate) mod seal_criteria;
pub mod seal_criteria;
#[cfg(test)]
pub(crate) mod tests;
pub(crate) mod types;
Expand Down Expand Up @@ -76,11 +76,11 @@ pub(crate) async fn create_state_keeper(
)
.await;

let sealer = ConditionalSealer::new(state_keeper_config);
let sealer = SequencerSealer::new(state_keeper_config);
ZkSyncStateKeeper::new(
stop_receiver,
Box::new(io),
Box::new(batch_executor_base),
sealer,
Box::new(sealer),
)
}
Loading

0 comments on commit cbe8c7f

Please sign in to comment.