Improved wallet sync performance #1033

Open
wants to merge 5 commits into
base: main

holtzman
Collaborator

@holtzman holtzman commented Jan 8, 2025

Motivation

Since PR #1027, syncing a wallet-db that tracks a large number of txos has been noticeably slow. The cause is that, as each txo is synced from the ledger, its txo_id is looked up in the txos table by the public_key column, which is not indexed.

In this PR

  • When scanning transaction_logs to decide whether a txo appearing in the ledger should move a corresponding transaction_log entry from pending to succeeded, calculate the txo_id (a digest of the txo data that appeared in the ledger) instead of looking it up in the txos table by public_key. This significantly relieved the performance issue, lowering sync times by several orders of magnitude.
  • To further increase sync throughput for imported (and re-syncing) accounts, the above part of the sync now runs only when the account has pending transactions in transaction_log and at least one of them was submitted before the chunk of the ledger being synced was "mined."
  • During debugging, it was also noted that a good deal of time was spent in the part of the sync loop that checks unspent txos against incoming key_images to see whether they should be marked as spent. Further exploration indicated that an index on (account_id, spent_block_index) would greatly speed up the SQL query that returns all unspent key_images for an account, so this PR adds that index via a migration.
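As a rough illustration of the first bullet, the sketch below derives an ID from the txo contents instead of looking it up. `LedgerTxo`, `derive_txo_id`, and the use of `DefaultHasher` are assumptions made for this example only; the real txo_id is a digest computed by the project's own code, and `DefaultHasher` is not cryptographic.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a txo as it appears in the ledger.
#[derive(Hash, Clone)]
struct LedgerTxo {
    public_key: [u8; 32],
    payload: Vec<u8>,
}

/// Derive an ID from the txo contents alone.
/// `DefaultHasher` is a placeholder for the real digest (assumption).
fn derive_txo_id(txo: &LedgerTxo) -> String {
    let mut hasher = DefaultHasher::new();
    txo.hash(&mut hasher);
    format!("{:016x}", hasher.finish())
}

fn main() {
    let txo = LedgerTxo {
        public_key: [7u8; 32],
        payload: vec![1, 2, 3],
    };
    // ID recorded when the transaction was built.
    let id_at_build_time = derive_txo_id(&txo);
    // The same txo seen later in the ledger: recompute the ID,
    // no lookup by public_key is needed.
    let id_at_sync_time = derive_txo_id(&txo.clone());
    assert_eq!(id_at_build_time, id_at_sync_time);
    println!("derived id: {id_at_sync_time}");
}
```

Because the ID is a pure function of the txo data, the sync loop can match ledger txos against transaction_log entries without touching the unindexed column at all.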

Contributor

@nick-mobilecoin nick-mobilecoin left a comment

looks good, just want to be sure about the logic of only updating logs that have a submitted index after the current range.

Comment on lines +801 to +815
let lowest_pending_submitted_block_index: Vec<Option<i64>> = transaction_logs::table
.filter(transaction_logs::account_id.eq(&account_id.0))
.filter(transaction_logs::submitted_block_index.is_not_null())
.filter(transaction_logs::failed.eq(false)) // non-failed transactions
.filter(transaction_logs::finalized_block_index.is_null())
.select(diesel::dsl::min(transaction_logs::submitted_block_index))
.load(conn)?;

if lowest_pending_submitted_block_index.len() == 0
|| lowest_pending_submitted_block_index[0].is_none()
{
Ok(None)
} else {
Ok(Some(lowest_pending_submitted_block_index[0].unwrap() as u64))
}
Contributor

⛏️ consider using get_result() as it returns one item

Suggested change
let lowest_pending_submitted_block_index: Option<i64> = transaction_logs::table
.filter(transaction_logs::account_id.eq(&account_id.0))
.filter(transaction_logs::submitted_block_index.is_not_null())
.filter(transaction_logs::failed.eq(false)) // non-failed transactions
.filter(transaction_logs::finalized_block_index.is_null())
.select(diesel::dsl::min(transaction_logs::submitted_block_index))
.get_result(conn)?;
Ok(lowest_pending_submitted_block_index.map(|x| x as u64))
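To check that the suggested form preserves the original behavior, the two shapes can be compared in isolation. This is a minimal sketch with no database involved; `from_rows`, `from_single`, and `from_single_checked` are hypothetical names used only here. The last function is an optional variant, not part of the suggestion, that rejects negative values rather than letting `as u64` wrap them.

```rust
/// Original shape: the query result arrives as a Vec with zero or one rows.
fn from_rows(rows: &[Option<i64>]) -> Option<u64> {
    if rows.is_empty() || rows[0].is_none() {
        None
    } else {
        Some(rows[0].unwrap() as u64)
    }
}

/// Suggested shape: a single Option<i64> mapped straight through.
fn from_single(row: Option<i64>) -> Option<u64> {
    row.map(|x| x as u64)
}

/// Variant that returns None for negative values instead of wrapping.
fn from_single_checked(row: Option<i64>) -> Option<u64> {
    row.and_then(|x| u64::try_from(x).ok())
}

fn main() {
    // Both shapes agree for every one-row result.
    for row in [None, Some(0), Some(15)] {
        assert_eq!(from_rows(&[row]), from_single(row));
    }
    // An empty result set maps to None in the original shape.
    assert_eq!(from_rows(&[]), None);
    // `as u64` wraps a negative i64; the checked variant does not.
    assert_eq!(from_single(Some(-1)), Some(u64::MAX));
    assert_eq!(from_single_checked(Some(-1)), None);
}
```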

Comment on lines +810 to +814
|| lowest_pending_submitted_block_index[0].is_none()
{
Ok(None)
} else {
Ok(Some(lowest_pending_submitted_block_index[0].unwrap() as u64))
Contributor

There's usually a more idiomatic way to approach is_none() or is_some() followed by an unwrap().
Consider using something like:

Comment on lines +235 to +247
let lowest_pending_block_index =
TransactionLog::lowest_pending_block_index(&account_id, conn)?;
if lowest_pending_block_index.is_some()
&& lowest_pending_block_index.unwrap() <= end_block_index
{
tx_outs.iter().try_for_each(|(block_index, tx_out)| {
TransactionLog::update_pending_associated_with_txo_to_succeeded(
&TxoID::from(tx_out).to_string(),
*block_index,
conn,
)
})?;
};
Contributor

not fond of the extra nesting in the suggestion but the is_some() followed by unwrap() is a bit of a code smell

Suggested change
if let Some(index) = TransactionLog::lowest_pending_block_index(&account_id, conn)? {
if index <= end_block_index {
tx_outs.iter().try_for_each(|(block_index, tx_out)| {
TransactionLog::update_pending_associated_with_txo_to_succeeded(
&TxoID::from(tx_out).to_string(),
*block_index,
conn,
)
})?;
}
}
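If the extra nesting is a concern, the same condition can be written flat. The sketch below compares three equivalent forms on a bare `Option<u64>`; the function names are hypothetical and only the predicate is modeled, not the database update. `Option::is_some_and` requires Rust 1.70 or newer.

```rust
/// Nested `if let`, as in the suggestion above.
fn nested(lowest_pending: Option<u64>, end_block_index: u64) -> bool {
    if let Some(index) = lowest_pending {
        if index <= end_block_index {
            return true;
        }
    }
    false
}

/// Flat: pattern with a guard.
fn with_matches(lowest_pending: Option<u64>, end_block_index: u64) -> bool {
    matches!(lowest_pending, Some(index) if index <= end_block_index)
}

/// Flat: `Option::is_some_and` (stable since Rust 1.70).
fn with_is_some_and(lowest_pending: Option<u64>, end_block_index: u64) -> bool {
    lowest_pending.is_some_and(|index| index <= end_block_index)
}

fn main() {
    let end_block_index = 10;
    for lowest_pending in [None, Some(9), Some(10), Some(11)] {
        let expected = nested(lowest_pending, end_block_index);
        assert_eq!(with_matches(lowest_pending, end_block_index), expected);
        assert_eq!(with_is_some_and(lowest_pending, end_block_index), expected);
    }
}
```

In the sync code itself, the guard form would read as a single `match` arm (`Some(index) if index <= end_block_index => { ... }`) with an empty fallback arm, which keeps the `try_for_each` body at one level of indentation.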

Comment on lines +2170 to +2171
let known_recipients: Vec<PublicAddress> = Vec::new();
let mut ledger_db = get_test_ledger(5, &known_recipients, 12, &mut rng);
Contributor

⛏️

Suggested change
let mut ledger_db = get_test_ledger(5, &[], 12, &mut rng);

Comment on lines +2173 to +2189
let first_account_key = random_account_with_seed_values(
&wallet_db,
&mut ledger_db,
&[70 * MOB, 75 * MOB],
&mut rng,
&logger,
);
let first_account_id = AccountID::from(&first_account_key);

let second_account_key = random_account_with_seed_values(
&wallet_db,
&mut ledger_db,
&[80 * MOB],
&mut rng,
&logger,
);
let second_account_id = AccountID::from(&second_account_key);
Contributor

⛏️ this will require renaming uses of first_account_key to account_keys[0] and second_account_key to account_keys[1], etc

Suggested change
let account_keys = [vec![70 * MOB, 75 * MOB], vec![80 * MOB]]
.iter()
.map(|seed_values| {
random_account_with_seed_values(
&wallet_db,
&mut ledger_db,
seed_values,
&mut rng,
&logger,
)
})
.collect::<Vec<_>>();
let account_ids = account_keys.iter().map(AccountID::from).collect::<Vec<_>>();

Comment on lines +2191 to +2193
// Build a first transaction from first_account
let mut pooled_conn = wallet_db.get_pooled_conn().unwrap();
let conn = pooled_conn.deref_mut();
Contributor

Consider this comment and the lines immediately following it.
The comment talks about building a transaction, but the next two lines deal with creating a database connection.
Then there is a blank line, which often signals a change in thought or logic.

A reader will mentally group these 3 lines together as one idea, somewhat independent of the rest of the code.

Suggested change
let mut pooled_conn = wallet_db.get_pooled_conn().unwrap();
let conn = pooled_conn.deref_mut();

Comment on lines +2195 to +2337
&mut rng,
);

assert_eq!(ledger_db.num_blocks().unwrap(), 16);

// Build a second transaction from first_account
let (recipient, mut builder) =
builder_for_random_recipient(&first_account_key, &ledger_db, &mut rng);
builder
.add_recipient(recipient.clone(), 45 * MOB, Mob::ID)
.unwrap();
builder.set_tombstone(0).unwrap();
builder.select_txos(conn, None).unwrap();
let unsigned_tx_proposal = builder
.build(
TransactionMemo::RTH {
subaddress_index: None,
},
conn,
)
.unwrap();
let tx_proposal = unsigned_tx_proposal.clone().sign(&account).await.unwrap();

// Log submitted transaction from tx_proposal
let first_account_second_tx_log = TransactionLog::log_submitted(
&tx_proposal,
ledger_db.num_blocks().unwrap(),
"".to_string(),
&first_account_id.to_string(),
conn,
)
.unwrap();

let key_images: Vec<KeyImage> = tx_proposal
.input_txos
.iter()
.map(|txo| txo.key_image)
.collect();

// Note: This block doesn't contain the fee output.
add_block_with_tx_outs(
&mut ledger_db,
&[
tx_proposal.change_txos[0].tx_out.clone(),
tx_proposal.payload_txos[0].tx_out.clone(),
],
&key_images,
&mut rng,
);

assert_eq!(ledger_db.num_blocks().unwrap(), 17);

// build a transaction from second_account
let account = Account::get(&second_account_id, conn).unwrap();

let (recipient, mut builder) =
builder_for_random_recipient(&second_account_key, &ledger_db, &mut rng);
builder
.add_recipient(recipient.clone(), 55 * MOB, Mob::ID)
.unwrap();
builder.set_tombstone(0).unwrap();
builder.select_txos(conn, None).unwrap();
let unsigned_tx_proposal = builder
.build(
TransactionMemo::RTH {
subaddress_index: None,
},
conn,
)
.unwrap();
let tx_proposal = unsigned_tx_proposal.clone().sign(&account).await.unwrap();

// Log submitted transaction from tx_proposal
let second_account_tx_log = TransactionLog::log_submitted(
&tx_proposal,
ledger_db.num_blocks().unwrap(),
"".to_string(),
&second_account_id.to_string(),
conn,
)
.unwrap();

let key_images: Vec<KeyImage> = tx_proposal
.input_txos
.iter()
.map(|txo| txo.key_image)
.collect();

// Note: This block doesn't contain the fee output.
add_block_with_tx_outs(
&mut ledger_db,
&[
tx_proposal.change_txos[0].tx_out.clone(),
tx_proposal.payload_txos[0].tx_out.clone(),
],
&key_images,
&mut rng,
);

assert_eq!(ledger_db.num_blocks().unwrap(), 18);
Contributor

I think this can be DRYed up some

Suggested change
let account = Account::get(&first_account_id, conn).unwrap();
let (recipient, mut builder) =
builder_for_random_recipient(&first_account_key, &ledger_db, &mut rng);
builder
.add_recipient(recipient.clone(), 50 * MOB, Mob::ID)
.unwrap();
builder.set_tombstone(0).unwrap();
builder.select_txos(conn, None).unwrap();
let unsigned_tx_proposal = builder
.build(
TransactionMemo::RTH {
subaddress_index: None,
},
conn,
)
.unwrap();
let tx_proposal = unsigned_tx_proposal.clone().sign(&account).await.unwrap();
// Log submitted transaction from tx_proposal
let first_account_first_tx_log = TransactionLog::log_submitted(
&tx_proposal,
ledger_db.num_blocks().unwrap(),
"".to_string(),
&first_account_id.to_string(),
conn,
)
.unwrap();
let key_images: Vec<KeyImage> = tx_proposal
.input_txos
.iter()
.map(|txo| txo.key_image)
.collect();
// Note: This block doesn't contain the fee output.
add_block_with_tx_outs(
&mut ledger_db,
&[
tx_proposal.change_txos[0].tx_out.clone(),
tx_proposal.payload_txos[0].tx_out.clone(),
],
&key_images,
&mut rng,
);
assert_eq!(ledger_db.num_blocks().unwrap(), 16);
// Build a second transaction from first_account
let (recipient, mut builder) =
builder_for_random_recipient(&first_account_key, &ledger_db, &mut rng);
builder
.add_recipient(recipient.clone(), 45 * MOB, Mob::ID)
.unwrap();
builder.set_tombstone(0).unwrap();
builder.select_txos(conn, None).unwrap();
let unsigned_tx_proposal = builder
.build(
TransactionMemo::RTH {
subaddress_index: None,
},
conn,
)
.unwrap();
let tx_proposal = unsigned_tx_proposal.clone().sign(&account).await.unwrap();
// Log submitted transaction from tx_proposal
let first_account_second_tx_log = TransactionLog::log_submitted(
&tx_proposal,
ledger_db.num_blocks().unwrap(),
"".to_string(),
&first_account_id.to_string(),
conn,
)
.unwrap();
let key_images: Vec<KeyImage> = tx_proposal
.input_txos
.iter()
.map(|txo| txo.key_image)
.collect();
// Note: This block doesn't contain the fee output.
add_block_with_tx_outs(
&mut ledger_db,
&[
tx_proposal.change_txos[0].tx_out.clone(),
tx_proposal.payload_txos[0].tx_out.clone(),
],
&key_images,
&mut rng,
);
assert_eq!(ledger_db.num_blocks().unwrap(), 17);
// build a transaction from second_account
let account = Account::get(&second_account_id, conn).unwrap();
let (recipient, mut builder) =
builder_for_random_recipient(&second_account_key, &ledger_db, &mut rng);
builder
.add_recipient(recipient.clone(), 55 * MOB, Mob::ID)
.unwrap();
builder.set_tombstone(0).unwrap();
builder.select_txos(conn, None).unwrap();
let unsigned_tx_proposal = builder
.build(
TransactionMemo::RTH {
subaddress_index: None,
},
conn,
)
.unwrap();
let tx_proposal = unsigned_tx_proposal.clone().sign(&account).await.unwrap();
// Log submitted transaction from tx_proposal
let second_account_tx_log = TransactionLog::log_submitted(
&tx_proposal,
ledger_db.num_blocks().unwrap(),
"".to_string(),
&second_account_id.to_string(),
conn,
)
.unwrap();
let key_images: Vec<KeyImage> = tx_proposal
.input_txos
.iter()
.map(|txo| txo.key_image)
.collect();
// Note: This block doesn't contain the fee output.
add_block_with_tx_outs(
&mut ledger_db,
&[
tx_proposal.change_txos[0].tx_out.clone(),
tx_proposal.payload_txos[0].tx_out.clone(),
],
&key_images,
&mut rng,
);
assert_eq!(ledger_db.num_blocks().unwrap(), 18);
let recipient_account_key = AccountKey::random(&mut rng);
let recipient = recipient_account_key.subaddress(rng.next_u64());
let mut logs = vec![];
for (account_key, value) in [(&account_keys[0], 50), (&account_keys[0], 45), (&account_keys[1], 55)] {
let (log, tx_proposal) = create_test_minted_and_change_txos(account_key.clone(), recipient.clone(), value * MOB, wallet_db.clone(), ledger_db.clone()).await;
let key_images: Vec<KeyImage> = tx_proposal
.input_txos
.iter()
.map(|txo| txo.key_image)
.collect();
// Note: This block doesn't contain the fee output.
add_block_with_tx_outs(
&mut ledger_db,
&[
tx_proposal.change_txos[0].tx_out.clone(),
tx_proposal.payload_txos[0].tx_out.clone(),
],
&key_images,
&mut rng,
);
logs.push(log);
}
assert_eq!(ledger_db.num_blocks().unwrap(), 18);

Comment on lines +2339 to +2383
// make sure the tests were set up correctly
assert_ne!(
first_account_first_tx_log.submitted_block_index,
first_account_second_tx_log.submitted_block_index
);
assert_ne!(
first_account_first_tx_log.submitted_block_index,
second_account_tx_log.submitted_block_index
);
assert_ne!(first_account_id.to_string(), second_account_id.to_string());

// check that the results are as expected prior to syncing
assert_eq!(
TransactionLog::lowest_pending_block_index(&first_account_id, conn).unwrap(),
Some(first_account_first_tx_log.submitted_block_index.unwrap() as u64)
);
assert_eq!(
TransactionLog::lowest_pending_block_index(&second_account_id, conn).unwrap(),
Some(second_account_tx_log.submitted_block_index.unwrap() as u64)
);

// sync the accounts
let _sync = manually_sync_account(&ledger_db, &wallet_db, &first_account_id, &logger);
let _sync = manually_sync_account(&ledger_db, &wallet_db, &second_account_id, &logger);

// check that the results are as expected after syncing
assert_eq!(
TransactionLog::lowest_pending_block_index(&first_account_id, conn).unwrap(),
None
);
assert_eq!(
TransactionLog::lowest_pending_block_index(&second_account_id, conn).unwrap(),
None
);
let updated_tx_log =
TransactionLog::get(&TransactionId::from(&first_account_first_tx_log), conn).unwrap();
assert_eq!(updated_tx_log.status(), TxStatus::Succeeded);

let updated_tx_log =
TransactionLog::get(&TransactionId::from(&first_account_second_tx_log), conn).unwrap();
assert_eq!(updated_tx_log.status(), TxStatus::Succeeded);

let updated_tx_log =
TransactionLog::get(&TransactionId::from(&second_account_tx_log), conn).unwrap();
assert_eq!(updated_tx_log.status(), TxStatus::Succeeded);
Contributor

Only if the change to arrays suggested above is made:

Suggested change
// make sure the tests were set up correctly
assert_ne!(
logs[0].submitted_block_index,
logs[1].submitted_block_index
);
assert_ne!(
logs[0].submitted_block_index,
logs[2].submitted_block_index
);
assert_ne!(account_ids[0].to_string(), account_ids[1].to_string());
// check that the results are as expected prior to syncing
assert_eq!(
TransactionLog::lowest_pending_block_index(&account_ids[0], conn).unwrap(),
Some(logs[0].submitted_block_index.unwrap() as u64)
);
assert_eq!(
TransactionLog::lowest_pending_block_index(&account_ids[1], conn).unwrap(),
Some(logs[2].submitted_block_index.unwrap() as u64)
);
for account_id in &account_ids {
manually_sync_account(&ledger_db, &wallet_db, &account_id, &logger);
assert_eq!(
TransactionLog::lowest_pending_block_index(&account_id, conn).unwrap(),
None
);
}
for log in logs {
let updated_log = TransactionLog::get(&TransactionId::from(&log), conn).unwrap();
assert_eq!(updated_log.status(), TxStatus::Succeeded);
}

Comment on lines +235 to +238
let lowest_pending_block_index =
TransactionLog::lowest_pending_block_index(&account_id, conn)?;
if lowest_pending_block_index.is_some()
&& lowest_pending_block_index.unwrap() <= end_block_index
Contributor

My understanding is that lowest_pending_block_index is an optimization intended to speed up re-syncing the full blockchain for a client. I'm not sure how much we've regressed performance-wise compared to pre-2.10.5 and only adding the TxoID::from() logic.

I want to be sure this logic is sound and isn't prone to re-submission issues:

  1. Client calls submit_transaction when full service sees the block
  2. Full service sends the request to a consensus node; the node says "I'm at block 10, and the tx looks good so far"
  3. Full service sets the submitted block based on 10
  4. Client calls submit_transaction again
  5. Full service sends the request to a consensus node; the node says "I'm at block 11, and the txo looks good so far"
  6. Full service sets the submitted block based on 11

My understanding is the consensus node would return an error in an instance where the initial submit transaction landed and the client returned a newer block index
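
To make the concern concrete, here is a toy model of the gate, not full-service code. `PendingLog` and `sync_chunk` are hypothetical, and the second case assumes that a re-submission overwrites the stored submitted index even though the first submission already landed. If the consensus node rejects the second submission, as described above, that case should not arise.

```rust
/// Hypothetical, heavily simplified stand-in for one pending transaction log.
struct PendingLog {
    submitted_block_index: u64,
    succeeded: bool,
}

/// Sync the blocks `start..=end`. `landed_in` is the block that holds the txo.
fn sync_chunk(log: &mut PendingLog, start: u64, end: u64, landed_in: u64) {
    // Gate as in the PR: skip the scan unless a pending log was submitted
    // at or before the end of this chunk.
    if log.submitted_block_index > end {
        return;
    }
    if (start..=end).contains(&landed_in) {
        log.succeeded = true;
    }
}

fn main() {
    // Single submission at block 10, txo lands in block 10.
    let mut single = PendingLog { submitted_block_index: 10, succeeded: false };
    sync_chunk(&mut single, 10, 10, 10);
    assert!(single.succeeded);

    // ASSUMPTION: a second submission overwrites the index with 11 although
    // the first submission already landed in block 10.
    let mut resubmitted = PendingLog { submitted_block_index: 11, succeeded: false };
    sync_chunk(&mut resubmitted, 10, 10, 10); // gate skips this chunk
    sync_chunk(&mut resubmitted, 11, 11, 10); // gate passes, txo is not here
    assert!(!resubmitted.succeeded);
}
```

Under those assumptions the log would stay pending, which is the case worth ruling out before relying on the gate.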
