Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Chain #451

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bin/dolos/bootstrap/mithril.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ pub fn run(config: &crate::Config, args: &Args, feedback: &Feedback) -> miette::

import_hardano_into_wal(config, &immutable_path, feedback)?;

crate::doctor::run_rebuild_ledger(config, feedback).context("rebuilding ledger")?;
crate::doctor::run_rebuild(config, feedback).context("rebuilding ledger and chain")?;

if !args.retain_snapshot {
info!("deleting downloaded snapshot");
Expand Down
19 changes: 16 additions & 3 deletions src/bin/dolos/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dolos::{ledger::pparams::Genesis, state, wal};
use dolos::{chain, ledger::pparams::Genesis, state, wal};
use miette::{Context as _, IntoDiagnostic};
use std::{path::PathBuf, time::Duration};
use tokio::task::JoinHandle;
Expand All @@ -10,7 +10,7 @@ use dolos::prelude::*;

use crate::{GenesisConfig, LoggingConfig};

pub type Stores = (wal::redb::WalStore, state::LedgerStore);
pub type Stores = (wal::redb::WalStore, state::LedgerStore, chain::ChainStore);

pub fn open_wal(config: &crate::Config) -> Result<wal::redb::WalStore, Error> {
let root = &config.storage.path;
Expand All @@ -36,6 +36,15 @@ pub fn define_ledger_path(config: &crate::Config) -> Result<PathBuf, Error> {
Ok(ledger)
}

pub fn define_chain_path(config: &crate::Config) -> Result<PathBuf, Error> {
let root = &config.storage.path;
std::fs::create_dir_all(root).map_err(Error::storage)?;

let ledger = root.join("chain");

Ok(ledger)
}

pub fn open_data_stores(config: &crate::Config) -> Result<Stores, Error> {
let root = &config.storage.path;

Expand All @@ -52,7 +61,11 @@ pub fn open_data_stores(config: &crate::Config) -> Result<Stores, Error> {
.map_err(Error::storage)?
.into();

Ok((wal, ledger))
let chain = chain::redb::ChainStore::open(root.join("chain"), config.storage.chain_cache)
.map_err(Error::storage)?
.into();

Ok((wal, ledger, chain))
}

pub fn setup_tracing(config: &LoggingConfig) -> miette::Result<()> {
Expand Down
4 changes: 3 additions & 1 deletion src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub struct Args {}
pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, ledger) = crate::common::open_data_stores(&config)?;
let (wal, ledger, chain) = crate::common::open_data_stores(&config)?;
let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?);
let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone());
let exit = crate::common::hook_exit_token();
Expand All @@ -21,6 +21,7 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
&config.storage,
wal.clone(),
ledger.clone(),
chain.clone(),
genesis.clone(),
mempool.clone(),
&config.retries,
Expand All @@ -40,6 +41,7 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
genesis.clone(),
wal.clone(),
ledger.clone(),
chain.clone(),
mempool.clone(),
exit.clone(),
));
Expand Down
2 changes: 1 addition & 1 deletion src/bin/dolos/data/copy_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Args {
pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (source, _) = crate::common::open_data_stores(config).context("opening data stores")?;
let source = crate::common::open_wal(config).context("opening data stores")?;

let mut target = dolos::wal::redb::WalStore::open(&args.output, None, None)
.into_diagnostic()
Expand Down
2 changes: 1 addition & 1 deletion src/bin/dolos/data/dump_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Formatter {
pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, _) = crate::common::open_data_stores(config).context("opening data stores")?;
let wal = crate::common::open_wal(config).context("opening data stores")?;

let mut formatter = Formatter::new_table();

Expand Down
29 changes: 28 additions & 1 deletion src/bin/dolos/data/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ fn prepare_ledger(
Ok(())
}

fn prepare_chain(
chain: dolos::chain::ChainStore,
pb: &crate::feedback::ProgressBar,
) -> miette::Result<()> {
let mut chain = match chain {
dolos::chain::ChainStore::Redb(x) => x,
_ => miette::bail!("Only redb is supported for export"),
};

let db = chain.db_mut().unwrap();
pb.set_message("compacting ledger");
db.compact().into_diagnostic()?;

pb.set_message("checking ledger integrity");
db.check_integrity().into_diagnostic()?;

Ok(())
}

pub fn run(
config: &crate::Config,
args: &Args,
Expand All @@ -58,7 +77,7 @@ pub fn run(
let encoder = GzEncoder::new(export_file, Compression::default());
let mut archive = Builder::new(encoder);

let (wal, ledger) = crate::common::open_data_stores(config)?;
let (wal, ledger, chain) = crate::common::open_data_stores(config)?;

prepare_wal(wal, &pb)?;

Expand All @@ -76,6 +95,14 @@ pub fn run(
.append_path_with_name(&path, "ledger")
.into_diagnostic()?;

prepare_chain(chain, &pb)?;

let path = config.storage.path.join("chain");

archive
.append_path_with_name(&path, "chain")
.into_diagnostic()?;

pb.set_message("creating archive");
archive.finish().into_diagnostic()?;

Expand Down
2 changes: 1 addition & 1 deletion src/bin/dolos/data/prune_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct Args {
pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (mut wal, _) = crate::common::open_data_stores(config).context("opening data stores")?;
let mut wal = crate::common::open_wal(config).context("opening data stores")?;

let max_slots = match args.max_slots {
Some(x) => x,
Expand Down
2 changes: 1 addition & 1 deletion src/bin/dolos/data/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Args {}
pub fn run(config: &crate::Config, _args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, ledger) = crate::common::open_data_stores(config)?;
let (wal, ledger, _) = crate::common::open_data_stores(config)?;

if let Some((seq, point)) = wal.crawl_from(None).unwrap().next() {
println!("found WAL start");
Expand Down
12 changes: 6 additions & 6 deletions src/bin/dolos/doctor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use clap::{Parser, Subcommand};

use crate::feedback::Feedback;

mod rebuild_ledger;
mod rebuild;
mod wal_integrity;

#[derive(Debug, Subcommand)]
pub enum Command {
/// rebuilds the whole ledger from chain data
RebuildLedger(rebuild_ledger::Args),
/// rebuilds ledger and chain from WAL
Rebuild(rebuild::Args),
/// checks the integrity of the WAL records
WalIntegrity(wal_integrity::Args),
}
Expand All @@ -21,13 +21,13 @@ pub struct Args {

pub fn run(config: &super::Config, args: &Args, feedback: &Feedback) -> miette::Result<()> {
match &args.command {
Command::RebuildLedger(x) => rebuild_ledger::run(config, x, feedback)?,
Command::Rebuild(x) => rebuild::run(config, x, feedback)?,
Command::WalIntegrity(x) => wal_integrity::run(config, x)?,
}

Ok(())
}

pub fn run_rebuild_ledger(config: &super::Config, feedback: &Feedback) -> miette::Result<()> {
rebuild_ledger::run(config, &rebuild_ledger::Args, feedback)
pub fn run_rebuild(config: &super::Config, feedback: &Feedback) -> miette::Result<()> {
rebuild::run(config, &rebuild::Args { chunk: 500 }, feedback)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use dolos::{
ledger,
wal::{self, RawBlock, ReadUtils, WalReader as _},
};
use dolos::wal::{self, RawBlock, ReadUtils, WalReader as _};
use itertools::Itertools;
use miette::{Context, IntoDiagnostic};
use pallas::ledger::traverse::MultiEraBlock;
Expand All @@ -10,9 +7,12 @@ use tracing::debug;
use crate::feedback::Feedback;

#[derive(Debug, clap::Args)]
pub struct Args;
pub struct Args {
#[arg(short, long, default_value_t = 500)]
pub chunk: usize,
}

pub fn run(config: &crate::Config, _args: &Args, feedback: &Feedback) -> miette::Result<()> {
pub fn run(config: &crate::Config, args: &Args, feedback: &Feedback) -> miette::Result<()> {
//crate::common::setup_tracing(&config.logging)?;

let progress = feedback.slot_progress_bar();
Expand Down Expand Up @@ -43,6 +43,11 @@ pub fn run(config: &crate::Config, _args: &Args, feedback: &Feedback) -> miette:
.context("applying origin utxos")?;
}

let chain_path = crate::common::define_chain_path(config).context("finding chain path")?;
let chain = dolos::chain::redb::ChainStore::open(chain_path, None)
.into_diagnostic()
.context("opening chain store.")?;

let (_, tip) = wal
.find_tip()
.into_diagnostic()
Expand All @@ -54,24 +59,15 @@ pub fn run(config: &crate::Config, _args: &Args, feedback: &Feedback) -> miette:
wal::ChainPoint::Specific(slot, _) => progress.set_length(slot),
}

let wal_seq = light
.cursor()
.into_diagnostic()
.context("finding ledger cursor")?
.map(|ledger::ChainPoint(s, h)| wal.assert_point(&wal::ChainPoint::Specific(s, h)))
.transpose()
.into_diagnostic()
.context("locating wal sequence")?;

let remaining = wal
.crawl_from(wal_seq)
.crawl_from(None)
.into_diagnostic()
.context("crawling wal")?
.filter_forward()
.into_blocks()
.flatten();

for chunk in remaining.chunks(100).into_iter() {
for chunk in remaining.chunks(args.chunk).into_iter() {
let bodies = chunk.map(|RawBlock { body, .. }| body).collect_vec();

let blocks: Vec<_> = bodies
Expand All @@ -81,8 +77,17 @@ pub fn run(config: &crate::Config, _args: &Args, feedback: &Feedback) -> miette:
.into_diagnostic()
.context("decoding blocks")?;

dolos::state::apply_block_batch(
&blocks,
let deltas = dolos::state::calculate_block_batch_deltas(&blocks, &light)
.into_diagnostic()
.context("calculating batch deltas.")?;

chain
.apply(&deltas)
.into_diagnostic()
.context("applying deltas to chain")?;

dolos::state::apply_delta_batch(
deltas,
&light,
&genesis,
config.storage.max_ledger_history,
Expand Down
2 changes: 1 addition & 1 deletion src/bin/dolos/doctor/wal_integrity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub fn run(config: &crate::Config, _args: &Args) -> miette::Result<()> {

let feedback = Feedback::default();

let (wal, _) = crate::common::open_data_stores(config).context("opening data stores")?;
let wal = crate::common::open_wal(config).context("opening data stores")?;

let (_, tip) = wal
.find_tip()
Expand Down
2 changes: 1 addition & 1 deletion src/bin/dolos/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct Args {
pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (_, ledger) = crate::common::open_data_stores(config)?;
let (_, ledger, _) = crate::common::open_data_stores(config)?;

let cbor = std::fs::read_to_string(&args.file)
.into_diagnostic()
Expand Down
4 changes: 2 additions & 2 deletions src/bin/dolos/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ pub struct Args {}
pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, ledger) = crate::common::open_data_stores(&config)?;
let (wal, ledger, chain) = crate::common::open_data_stores(&config)?;
let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?);
let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone());
let exit = crate::common::hook_exit_token();

dolos::serve::serve(config.serve, genesis, wal, ledger, mempool, exit)
dolos::serve::serve(config.serve, genesis, wal, ledger, chain, mempool, exit)
.await
.context("serving clients")?;

Expand Down
3 changes: 2 additions & 1 deletion src/bin/dolos/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct Args {
pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> {
crate::common::setup_tracing(&config.logging)?;

let (wal, ledger) = crate::common::open_data_stores(config)?;
let (wal, ledger, chain) = crate::common::open_data_stores(config)?;
let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?);
let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone());

Expand All @@ -22,6 +22,7 @@ pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> {
&config.storage,
wal,
ledger,
chain,
genesis,
mempool,
&config.retries,
Expand Down
57 changes: 57 additions & 0 deletions src/chain/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use thiserror::Error;

use crate::ledger::BrokenInvariant;

#[derive(Debug, Error)]
pub enum ChainError {
#[error("broken invariant")]
BrokenInvariant(#[source] BrokenInvariant),

#[error("storage error")]
StorageError(#[source] ::redb::Error),

#[error("address decoding error")]
AddressDecoding(pallas::ledger::addresses::Error),

#[error("query not supported")]
QueryNotSupported,

#[error("invalid store version")]
InvalidStoreVersion,

#[error("decoding error")]
DecodingError(#[source] pallas::codec::minicbor::decode::Error),

#[error("block decoding error")]
BlockDecodingError(#[source] pallas::ledger::traverse::Error),
}

impl From<::redb::TableError> for ChainError {
fn from(value: ::redb::TableError) -> Self {
Self::StorageError(value.into())
}
}

impl From<::redb::CommitError> for ChainError {
fn from(value: ::redb::CommitError) -> Self {
Self::StorageError(value.into())
}
}

impl From<::redb::StorageError> for ChainError {
fn from(value: ::redb::StorageError) -> Self {
Self::StorageError(value.into())
}
}

impl From<::redb::TransactionError> for ChainError {
fn from(value: ::redb::TransactionError) -> Self {
Self::StorageError(value.into())
}
}

impl From<pallas::ledger::addresses::Error> for ChainError {
fn from(value: pallas::ledger::addresses::Error) -> Self {
Self::AddressDecoding(value)
}
}
Loading