Skip to content

Commit

Permalink
Set retention_running before starting retention in IndexWorker (#24076)
Browse files Browse the repository at this point in the history
Allow retention worker to process the backfilled index while we are caching up on retention.

GitOrigin-RevId: 25a5e0eafc64e8391d0504abc8df7d7afbad41ff
  • Loading branch information
Preslav Le authored and Convex, Inc. committed Apr 3, 2024
1 parent 0e01a7c commit 3e3302b
Showing 1 changed file with 64 additions and 35 deletions.
99 changes: 64 additions & 35 deletions crates/database/src/index_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,28 +343,41 @@ impl<RT: Runtime> IndexWorker<RT> {
index_documents.values(),
self.persistence_version,
)?;
let index_name = self.begin_backfill(index_id).await?;
let index_selector = IndexSelector::Index {
name: index_name,
id: index_id,
};
self.index_writer
.perform_backfill(
self.database.now_ts_for_reads(),
&index_registry,
index_selector,
)
.await?;

// If retention is already started, we have already done with the initial
// step of the backfill.
let (index_name, retention_started) = self.begin_backfill(index_id).await?;
if !retention_started {
log::info!("Starting backfill of index {}", index_name);
let index_selector = IndexSelector::Index {
name: index_name,
id: index_id,
};
self.index_writer
.perform_backfill(
self.database.now_ts_for_reads(),
&index_registry,
index_selector,
)
.await?;
}

// Run retention.
let (backfill_begin_ts, index_name, indexed_fields) =
self.begin_retention(index_id).await?;
log::info!("Started running retention for index {}", index_name);
self.index_writer
.run_retention(index_id, backfill_begin_ts, index_name, indexed_fields)
.await?;

self.finish_backfill(index_id).await?;
Ok(())
}

async fn begin_backfill(&mut self, index_id: IndexId) -> anyhow::Result<TabletIndexName> {
async fn begin_backfill(
&mut self,
index_id: IndexId,
) -> anyhow::Result<(TabletIndexName, bool)> {
let mut tx = self.database.begin(Identity::system()).await?;
let index_table_id = tx.bootstrap_tables().index_id;

Expand All @@ -382,21 +395,23 @@ impl<RT: Runtime> IndexWorker<RT> {
// the state to still be `Backfilling` here. If this assertion fails, we
// somehow raced with another `IndexWorker`(!) or don't actually have the
// database lease (!).
match &index_metadata.config {
IndexConfig::Database { on_disk_state, .. } => anyhow::ensure!(
matches!(*on_disk_state, DatabaseIndexState::Backfilling(_)),
"IndexWorker started backfilling index {index_metadata:?} not in Backfilling state",
),
let retention_started = match &index_metadata.config {
IndexConfig::Database { on_disk_state, .. } => {
let DatabaseIndexState::Backfilling(state) = on_disk_state else {
anyhow::bail!(
"IndexWorker started backfilling index {index_metadata:?} not in \
Backfilling state"
);
};
state.retention_started
},
_ => anyhow::bail!(
"IndexWorker attempted to backfill an index {index_metadata:?} which wasn't a \
database index."
),
};

let ts = tx.begin_timestamp();
drop(tx);
log::info!("Starting backfill of index {} @ {ts}", index_metadata.name);
Ok(index_metadata.name.clone())
Ok((index_metadata.name.clone(), retention_started))
}

async fn begin_retention(
Expand All @@ -410,34 +425,48 @@ impl<RT: Runtime> IndexWorker<RT> {
.get_with_ts(ResolvedDocumentId::new(index_table_id, index_id))
.await?
.ok_or_else(|| anyhow::anyhow!("Index {index_id:?} no longer exists"))?;
let index_metadata = TabletIndexMetadata::from_document(index_doc)?;
let mut index_metadata = TabletIndexMetadata::from_document(index_doc)?;

// Assuming that the IndexWorker is the only writer of index state, we expect
// the state to still be `Backfilling` here. If this assertion fails, we
// somehow raced with another `IndexWorker`(!) or don't actually have the
// database lease (!).
let indexed_fields = match &index_metadata.config {
let (index_ts, indexed_fields) = match &mut index_metadata.config {
IndexConfig::Database {
on_disk_state,
developer_config,
} => {
anyhow::ensure!(
matches!(*on_disk_state, DatabaseIndexState::Backfilling(_)),
"IndexWorker started backfilling index {index_metadata:?} not in Backfilling \
state",
);
developer_config.fields.clone()
let DatabaseIndexState::Backfilling(state) = on_disk_state else {
anyhow::bail!(
"IndexWorker started backfilling index {index_metadata:?} not in \
Backfilling state"
)
};

state.retention_started = true;

// TODO(presley): Remove the fallback to commit_ts.
let WriteTimestamp::Committed(committed_ts) = index_ts else {
anyhow::bail!("index {index_id} is pending write");
};
let index_created = state.index_created_lower_bound.unwrap_or(committed_ts);
(index_created, developer_config.fields.clone())
},
_ => anyhow::bail!(
"IndexWorker attempted to backfill an index {index_metadata:?} which wasn't a \
database index."
),
};
drop(tx);
let WriteTimestamp::Committed(index_ts) = index_ts else {
anyhow::bail!("index {index_id} is pending write");
};
Ok((index_ts, index_metadata.name.clone(), indexed_fields))

let name = index_metadata.name.clone();
SystemMetadataModel::new(&mut tx)
.replace(index_metadata.id(), index_metadata.into_value().try_into()?)
.await?;
self.database
.commit_with_write_source(tx, "index_worker_start_retention")
.await?;

Ok((index_ts, name, indexed_fields))
}

async fn finish_backfill(&mut self, index_id: IndexId) -> anyhow::Result<()> {
Expand Down

0 comments on commit 3e3302b

Please sign in to comment.