From 74eae6d61dbdb653755616768562519270933d2d Mon Sep 17 00:00:00 2001 From: Sam Date: Tue, 28 May 2024 11:12:23 -0700 Subject: [PATCH] Use the generalized search index writer in text/vector search (#26324) GitOrigin-RevId: fdac18df6edea6b1277a568e45c2d861ff90126d --- .../database/src/index_workers/index_meta.rs | 11 +- crates/database/src/index_workers/writer.rs | 36 +++-- .../src/text_index_worker/flusher2.rs | 132 ++---------------- .../src/vector_index_worker/flusher.rs | 26 +--- 4 files changed, 49 insertions(+), 156 deletions(-) diff --git a/crates/database/src/index_workers/index_meta.rs b/crates/database/src/index_workers/index_meta.rs index 94fa80f8..be938a1c 100644 --- a/crates/database/src/index_workers/index_meta.rs +++ b/crates/database/src/index_workers/index_meta.rs @@ -217,10 +217,17 @@ pub enum SnapshotData { impl SnapshotData { pub fn segments(self) -> Vec { match self { - SnapshotData::Unknown(_) | SnapshotData::SingleSegment(_) => vec![], - SnapshotData::MultiSegment(segments) => segments, + Self::Unknown(_) | Self::SingleSegment(_) => vec![], + Self::MultiSegment(segments) => segments, } } + + pub fn require_multi_segment(self) -> anyhow::Result> { + let Self::MultiSegment(segments) = self else { + anyhow::bail!("Not a multi segment type!"); + }; + Ok(segments) + } } impl SnapshotData { diff --git a/crates/database/src/index_workers/writer.rs b/crates/database/src/index_workers/writer.rs index 11af5a10..80887535 100644 --- a/crates/database/src/index_workers/writer.rs +++ b/crates/database/src/index_workers/writer.rs @@ -47,7 +47,10 @@ use crate::{ SegmentType, SnapshotData, }, - search_flusher::IndexBuild, + search_flusher::{ + IndexBuild, + IndexBuildResult, + }, MultiSegmentBackfillResult, }, metrics::{ @@ -155,28 +158,37 @@ impl SearchIndexMetadataWriter { pub(crate) async fn commit_flush( &self, job: &IndexBuild, - new_ts: Timestamp, - new_and_modified_segments: Vec, - new_segment_id: Option, - index_backfill_result: Option, - ) -> anyhow::Result<()> { + result: IndexBuildResult, + ) -> anyhow::Result<(T::Statistics, Option)> { + let IndexBuildResult { + snapshot_ts, + data, + total_stats, + new_segment_stats, + new_segment_id, + backfill_result, + } = result; + let inner = self.inner(SearchWriterLockWaiter::Flusher).await; + let segments = data.require_multi_segment()?; - if let Some(index_backfill_result) = index_backfill_result { + if let Some(index_backfill_result) = backfill_result { inner .commit_backfill_flush( job, - new_ts, - new_and_modified_segments, + snapshot_ts, + segments, new_segment_id, index_backfill_result, ) - .await + .await? } else { inner - .commit_snapshot_flush(job, new_ts, new_and_modified_segments, new_segment_id) - .await + .commit_snapshot_flush(job, snapshot_ts, segments, new_segment_id) + .await? } + + Ok((total_stats, new_segment_stats)) } async fn inner(&self, waiter: SearchWriterLockWaiter) -> MutexGuard> { diff --git a/crates/database/src/text_index_worker/flusher2.rs b/crates/database/src/text_index_worker/flusher2.rs index ed020c1a..8e6e85e1 100644 --- a/crates/database/src/text_index_worker/flusher2.rs +++ b/crates/database/src/text_index_worker/flusher2.rs @@ -6,41 +6,25 @@ use std::{ #[cfg(any(test, feature = "testing"))] use common::pause::PauseClient; use common::{ - bootstrap_model::index::{ - text_index::{ - FragmentedTextSegment, - TextBackfillCursor, - TextIndexBackfillState, - TextIndexSnapshot, - TextIndexSnapshotData, - TextIndexState, - TextSnapshotVersion, - }, - IndexMetadata, - }, knobs::SEARCH_INDEX_SIZE_SOFT_LIMIT, persistence::PersistenceReader, runtime::Runtime, types::TabletIndexName, }; -use keybroker::Identity; -use search::searcher::SegmentTermMetadataFetcher; +use search::{ + metrics::SearchType, + searcher::SegmentTermMetadataFetcher, +}; use storage::Storage; -use sync_types::Timestamp; use crate::{ index_workers::{ - index_meta::{ - SearchOnDiskState, - SnapshotData, - }, search_flusher::{ IndexBuild, - IndexBuildResult, SearchFlusher, SearchIndexLimits, }, - MultiSegmentBackfillResult, + writer::SearchIndexMetadataWriter, }, metrics::search::{ log_documents_per_index, @@ -52,7 +36,6 @@ use crate::{ TextSearchIndex, }, Database, - SystemMetadataModel, Token, }; @@ -61,9 +44,9 @@ pub(crate) const FLUSH_RUNNING_LABEL: &str = "flush_running"; pub struct TextIndexFlusher2 { flusher: SearchFlusher, - database: Database, storage: Arc, segment_term_metadata_fetcher: Arc, + writer: SearchIndexMetadataWriter, #[allow(unused)] #[cfg(any(test, feature = "testing"))] @@ -135,17 +118,23 @@ impl FlusherBuilder { pub(crate) fn build(self) -> TextIndexFlusher2 { let flusher = SearchFlusher::new( - self.runtime, + self.runtime.clone(), self.database.clone(), self.reader, self.storage.clone(), self.limits, ); + let writer: SearchIndexMetadataWriter = SearchIndexMetadataWriter::new( + self.runtime, + self.database, + self.storage.clone(), + SearchType::Text, + ); TextIndexFlusher2 { flusher, - database: self.database, storage: self.storage, segment_term_metadata_fetcher: self.segment_term_metadata_fetcher, + writer, #[cfg(any(test, feature = "testing"))] should_terminate: self.should_terminate, #[cfg(any(test, feature = "testing"))] @@ -209,105 +198,14 @@ impl TextIndexFlusher2 { .await?; tracing::debug!("Built a text segment for: {result:#?}"); - let IndexBuildResult { - snapshot_ts, - data, - total_stats, - new_segment_stats, - backfill_result, - .. - } = result; + let (total_stats, new_segment_stats) = self.writer.commit_flush(&job, result).await?; - match data { - SnapshotData::Unknown(_) => { - anyhow::bail!("Created an unknown snapshot data type"); - }, - SnapshotData::SingleSegment(_) => { - anyhow::bail!("Created a single segment snapshot?"); - }, - SnapshotData::MultiSegment(segments) => { - self.write_search_metadata(job, snapshot_ts, segments, backfill_result) - .await?; - }, - } let num_indexed_documents = new_segment_stats.unwrap_or_default().num_indexed_documents; log_documents_per_new_segment(num_indexed_documents); log_documents_per_index(total_stats.num_indexed_documents as usize); timer.finish(); Ok(num_indexed_documents) } - - fn get_new_disk_state( - backfill_result: Option, - backfill_ts: Timestamp, - segments: Vec, - on_disk_state: SearchOnDiskState, - ) -> TextIndexState { - if let Some(backfill_result) = backfill_result { - if backfill_result.is_backfill_complete { - TextIndexState::Backfilled(TextIndexSnapshot { - data: TextIndexSnapshotData::MultiSegment(segments), - ts: backfill_ts, - version: TextSnapshotVersion::V2UseStringIds, - }) - } else { - let cursor = if let Some(cursor) = backfill_result.new_cursor { - Some(TextBackfillCursor { - cursor: cursor.internal_id(), - backfill_snapshot_ts: backfill_ts, - }) - } else { - None - }; - TextIndexState::Backfilling(TextIndexBackfillState { segments, cursor }) - } - } else { - let snapshot = TextIndexSnapshot { - data: TextIndexSnapshotData::MultiSegment(segments), - ts: backfill_ts, - version: TextSnapshotVersion::V2UseStringIds, - }; - let is_snapshotted = matches!(on_disk_state, SearchOnDiskState::SnapshottedAt(_)); - if is_snapshotted { - TextIndexState::SnapshottedAt(snapshot) - } else { - TextIndexState::Backfilled(snapshot) - } - } - } - - async fn write_search_metadata( - &self, - job: IndexBuild, - snapshot_ts: Timestamp, - segments: Vec, - backfill_result: Option, - ) -> anyhow::Result<()> { - let mut tx = self.database.begin(Identity::system()).await?; - - let new_on_disk_state = Self::get_new_disk_state( - backfill_result, - snapshot_ts, - segments, - job.index_config.on_disk_state, - ); - - SystemMetadataModel::new_global(&mut tx) - .replace( - job.metadata_id, - IndexMetadata::new_search_index( - job.index_name, - job.index_config.developer_config, - new_on_disk_state, - ) - .try_into()?, - ) - .await?; - self.database - .commit_with_write_source(tx, "search_index_worker_build_index") - .await?; - Ok(()) - } } #[cfg(test)] diff --git a/crates/database/src/vector_index_worker/flusher.rs b/crates/database/src/vector_index_worker/flusher.rs index 602ef91e..56b4e861 100644 --- a/crates/database/src/vector_index_worker/flusher.rs +++ b/crates/database/src/vector_index_worker/flusher.rs @@ -19,10 +19,8 @@ use storage::Storage; use super::vector_meta::BuildVectorIndexArgs; use crate::{ index_workers::{ - index_meta::SnapshotData, search_flusher::{ IndexBuild, - IndexBuildResult, SearchFlusher, SearchIndexLimits, }, @@ -132,29 +130,7 @@ impl VectorIndexFlusher { .await?; tracing::debug!("Built a vector segment for: {result:#?}"); - // 3. Update the vector index metadata. - let IndexBuildResult { - snapshot_ts, - data, - total_stats, - new_segment_stats, - new_segment_id, - backfill_result, - } = result; - - match data { - SnapshotData::Unknown(_) => { - anyhow::bail!("Created an unknown snapshot data type"); - }, - SnapshotData::SingleSegment(_) => { - anyhow::bail!("Created a single segment snapshot?"); - }, - SnapshotData::MultiSegment(segments) => { - self.writer - .commit_flush(&job, snapshot_ts, segments, new_segment_id, backfill_result) - .await?; - }, - } + let (total_stats, new_segment_stats) = self.writer.commit_flush(&job, result).await?; let vectors_in_new_segment = new_segment_stats.unwrap_or_default().num_vectors; metrics::vector::log_documents_per_index(total_stats.non_deleted_vectors);