Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Dentosal committed Jan 3, 2025
1 parent c3aee19 commit b3ea605
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 77 deletions.
2 changes: 1 addition & 1 deletion ci_checks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ cargo check -p fuel-core-chain-config --target wasm32-unknown-unknown --no-defau
cargo check -p fuel-core-executor --target wasm32-unknown-unknown --no-default-features --features alloc &&
cargo make check --all-features --locked &&
cargo make check --locked &&
OVERRIDE_CHAIN_CONFIGS=true cargo test --test integration_tests local_node &&
cargo test --test integration_tests local_node &&
cargo nextest run --workspace &&
cargo nextest run --all-features --workspace &&
cargo nextest run -p fuel-core --no-default-features &&
Expand Down
10 changes: 5 additions & 5 deletions crates/services/shared-sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub mod service;
pub struct Client {
endpoints: Endpoints,
topic: [u8; 32],
chain_id: Id,
ss_chain_id: Id,
gas_price: u128,
coin_denom: Denom,
account_prefix: String,
Expand All @@ -61,7 +61,7 @@ impl Client {
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
let account_prefix =
http_api::get_account_prefix(&endpoints.blockchain_rest_api).await?;
let chain_id = http_api::chain_id(&endpoints.blockchain_rest_api)
let ss_chain_id = http_api::chain_id(&endpoints.blockchain_rest_api)
.await?
.parse()
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
Expand All @@ -81,7 +81,7 @@ impl Client {
endpoints,
account_prefix,
coin_denom,
chain_id,
ss_chain_id,
gas_price,
})
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Client {
topic: [u8; 32],
data: Bytes,
) -> anyhow::Result<()> {
// We want to estimate the transaction to know hat amount and fee to use.
// We want to estimate the transaction to know what amount and fee to use.
// We use a dummy amount and fee to estimate the gas, and based on the result
// we calculate the actual amount and fee to use in real transaction.
let dummy_amount = Coin {
Expand Down Expand Up @@ -242,7 +242,7 @@ impl Client {
SignerInfo::single_direct(Some(sender_public_key), account.sequence);
let auth_info = signer_info.auth_info(fee);
let sign_doc =
SignDoc::new(&tx_body, &auth_info, &self.chain_id, account.account_number)
SignDoc::new(&tx_body, &auth_info, &self.ss_chain_id, account.account_number)
.map_err(|err| anyhow!("{err:?}"))?;

let sign_doc_bytes = sign_doc
Expand Down
123 changes: 52 additions & 71 deletions crates/services/shared-sequencer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct Task<S> {
account_metadata: Option<AccountMetadata>,
prev_order: Option<u64>,
blobs: Arc<tokio::sync::Mutex<SSBlobs>>,
interval: tokio::time::Interval,
}

impl<S> NonInitializedTask<S> {
Expand Down Expand Up @@ -126,6 +127,7 @@ where
}

Ok(Task {
interval: tokio::time::interval(self.config.block_posting_frequency),
shared_sequencer_client,
config: self.config,
signer: self.signer,
Expand All @@ -140,48 +142,30 @@ impl<S> Task<S>
where
S: Signer,
{
// This function is not cancel-safe because it calls `sleep` inside.
async fn blobs(&mut self) -> anyhow::Result<Option<SSBlobs>> {
/// Fetch latest account metadata if it's not set
async fn ensure_account_metadata(&mut self) -> anyhow::Result<()> {
if self.account_metadata.is_some() {
return Ok(());
}
let ss = self
.shared_sequencer_client
.as_ref()
.expect("Shared sequencer client is not set; qed");

if self.account_metadata.is_none() {
// If the account is not funded, this code will fail
// because we can't sign the transaction without account metadata.
let account_metadata = ss.get_account_meta(self.signer.as_ref()).await;

match account_metadata {
Ok(account_metadata) => {
self.account_metadata = Some(account_metadata);
}
Err(err) => {
// We don't want to spam the RPC endpoint with a lot of queries,
// so wait for one second before sending the next one.
tokio::time::sleep(Duration::from_secs(1)).await;
return Err(err);
}
}
}
.expect("Shared sequencer client is not set");
self.account_metadata = Some(ss.get_account_meta(self.signer.as_ref()).await?);
Ok(())
}

if self.prev_order.is_none() {
self.prev_order = ss.get_topic().await?.map(|f| f.order);
}

tokio::time::sleep(self.config.block_posting_frequency).await;

let blobs = {
let mut lock = self.blobs.lock().await;
core::mem::take(&mut *lock)
};

if blobs.is_empty() {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(None)
} else {
Ok(Some(blobs))
/// Fetch previous order in the topic if it's not set
async fn ensure_prev_order(&mut self) -> anyhow::Result<()> {
if self.prev_order.is_some() {
return Ok(());
}
let ss = self
.shared_sequencer_client
.as_ref()
.expect("Shared sequencer client is not set");
self.prev_order = ss.get_topic().await?.map(|f| f.order);
Ok(())
}
}

Expand All @@ -196,49 +180,46 @@ where
return TaskNextAction::Stop;
}

if let Err(err) = self.ensure_account_metadata().await {
// We don't want to spam the RPC endpoint with a lot of queries,
// so wait for one second before sending the next one.
tokio::time::sleep(Duration::from_secs(1)).await;
return TaskNextAction::ErrorContinue(err)
}
if let Err(err) = self.ensure_prev_order().await {
return TaskNextAction::ErrorContinue(err)
};

tokio::select! {
biased;
_ = watcher.while_started() => {
TaskNextAction::Stop
},

blobs = self.blobs() => {
let blobs = match blobs {
Ok(blobs) => blobs,
Err(err) => return TaskNextAction::ErrorContinue(err),
_ = self.interval.tick() => {
let blobs = {
let mut lock = self.blobs.lock().await;
core::mem::take(&mut *lock)
};
if blobs.is_empty() {
tokio::time::sleep(Duration::from_secs(1)).await;
return TaskNextAction::Continue;
};

// The `blobs` function is not cancel safe, as it calls sleep inside.
// If someone adds new logic into the `tokio::select`, please
// rework the `blobs` function to be cancel safe and use interval inside.
if let Some(blobs) = blobs {
let mut account = self.account_metadata.take().expect("Account metadata is not set; qed");
let next_order = if let Some(prev_order) = self.prev_order {
prev_order.wrapping_add(1)
} else {
0
};

let ss = self.shared_sequencer_client
.as_ref().expect("Shared sequencer client is not set; qed");
let blobs_bytes = postcard::to_allocvec(&blobs).expect("Failed to serialize SSBlob");
let result = ss.send(self.signer.as_ref(), account, next_order, blobs_bytes).await;
let mut account = self.account_metadata.take().expect("Account metadata is not set");
let next_order = self.prev_order.map(|prev| prev.wrapping_add(1)).unwrap_or(0);
let ss = self.shared_sequencer_client
.as_ref().expect("Shared sequencer client is not set");
let blobs_bytes = postcard::to_allocvec(&blobs).expect("Failed to serialize SSBlob");

match result {
Ok(_) => {
tracing::info!("Posted block to shared sequencer {blobs:?}");
account.sequence = account.sequence.saturating_add(1);
self.prev_order = Some(next_order);
self.account_metadata = Some(account);
TaskNextAction::Continue
}
Err(err) => {
TaskNextAction::ErrorContinue(err)
}
}
} else {
TaskNextAction::Continue
if let Err(err) = ss.send(self.signer.as_ref(), account, next_order, blobs_bytes).await {
TaskNextAction::ErrorContinue(err);
}

tracing::info!("Posted block to shared sequencer {blobs:?}");
account.sequence = account.sequence.saturating_add(1);
self.prev_order = Some(next_order);
self.account_metadata = Some(account);
TaskNextAction::Continue
},
}
}
Expand Down

0 comments on commit b3ea605

Please sign in to comment.