Use the generalized search index writer in text/vector search (#26324)
GitOrigin-RevId: fdac18df6edea6b1277a568e45c2d861ff90126d
sjudd authored and Convex, Inc. committed May 28, 2024
1 parent 25bbff3 commit 74eae6d
Showing 4 changed files with 49 additions and 156 deletions.
11 changes: 9 additions & 2 deletions crates/database/src/index_workers/index_meta.rs
@@ -217,10 +217,17 @@ pub enum SnapshotData<T> {
 impl<T> SnapshotData<T> {
     pub fn segments(self) -> Vec<T> {
         match self {
-            SnapshotData::Unknown(_) | SnapshotData::SingleSegment(_) => vec![],
-            SnapshotData::MultiSegment(segments) => segments,
+            Self::Unknown(_) | Self::SingleSegment(_) => vec![],
+            Self::MultiSegment(segments) => segments,
         }
     }
+
+    pub fn require_multi_segment(self) -> anyhow::Result<Vec<T>> {
+        let Self::MultiSegment(segments) = self else {
+            anyhow::bail!("Not a multi segment type!");
+        };
+        Ok(segments)
+    }
 }

 impl<T> SnapshotData<T> {
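The new `require_multi_segment` accessor lets a caller turn a non-multi-segment snapshot into an error in one line instead of matching on every variant, which is how the writer below consumes it. A minimal, self-contained sketch of the pattern (the enum here is a simplified stand-in, not the crate's `SnapshotData` definition):

```rust
use anyhow::bail;

#[allow(dead_code)]
enum SnapshotData<T> {
    Unknown(String),
    SingleSegment(T),
    MultiSegment(Vec<T>),
}

impl<T> SnapshotData<T> {
    // let-else turns a "wrong variant" into an error instead of forcing every
    // caller to write its own three-arm match.
    fn require_multi_segment(self) -> anyhow::Result<Vec<T>> {
        let Self::MultiSegment(segments) = self else {
            bail!("Not a multi segment type!");
        };
        Ok(segments)
    }
}

fn main() -> anyhow::Result<()> {
    let data = SnapshotData::MultiSegment(vec!["segment-a", "segment-b"]);
    let segments = data.require_multi_segment()?;
    assert_eq!(segments.len(), 2);

    // The other variants now surface as a plain anyhow::Error.
    let err = SnapshotData::<&str>::SingleSegment("only-one").require_multi_segment();
    assert!(err.is_err());
    Ok(())
}
```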
36 changes: 24 additions & 12 deletions crates/database/src/index_workers/writer.rs
@@ -47,7 +47,10 @@ use crate::{
             SegmentType,
             SnapshotData,
         },
-        search_flusher::IndexBuild,
+        search_flusher::{
+            IndexBuild,
+            IndexBuildResult,
+        },
         MultiSegmentBackfillResult,
     },
     metrics::{
@@ -155,28 +158,37 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexMetadataWriter<RT, T> {
     pub(crate) async fn commit_flush(
         &self,
         job: &IndexBuild<T>,
-        new_ts: Timestamp,
-        new_and_modified_segments: Vec<T::Segment>,
-        new_segment_id: Option<String>,
-        index_backfill_result: Option<MultiSegmentBackfillResult>,
-    ) -> anyhow::Result<()> {
+        result: IndexBuildResult<T>,
+    ) -> anyhow::Result<(T::Statistics, Option<T::Statistics>)> {
+        let IndexBuildResult {
+            snapshot_ts,
+            data,
+            total_stats,
+            new_segment_stats,
+            new_segment_id,
+            backfill_result,
+        } = result;
+
         let inner = self.inner(SearchWriterLockWaiter::Flusher).await;
+        let segments = data.require_multi_segment()?;

-        if let Some(index_backfill_result) = index_backfill_result {
+        if let Some(index_backfill_result) = backfill_result {
             inner
                 .commit_backfill_flush(
                     job,
-                    new_ts,
-                    new_and_modified_segments,
+                    snapshot_ts,
+                    segments,
                     new_segment_id,
                     index_backfill_result,
                 )
-                .await
+                .await?
         } else {
             inner
-                .commit_snapshot_flush(job, new_ts, new_and_modified_segments, new_segment_id)
-                .await
+                .commit_snapshot_flush(job, snapshot_ts, segments, new_segment_id)
+                .await?
         }
+
+        Ok((total_stats, new_segment_stats))
     }

     async fn inner(&self, waiter: SearchWriterLockWaiter) -> MutexGuard<Inner<RT, T>> {
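With this change `commit_flush` accepts the whole `IndexBuildResult`, destructures it itself, requires multi-segment data, and hands `(total_stats, new_segment_stats)` back to the caller for metrics logging. A simplified, self-contained sketch of that calling convention (the types below are illustrative stand-ins, not the real generic `IndexBuildResult`/`SearchIndexMetadataWriter`, and the real methods are async):

```rust
// Illustrative stand-ins; the real types are generic over the index kind.
#[derive(Clone, Copy, Default, Debug)]
struct SegmentStats {
    num_indexed_documents: u64,
}

struct BuildResult {
    segments: Vec<String>,
    total_stats: SegmentStats,
    new_segment_stats: Option<SegmentStats>,
}

struct MetadataWriter;

impl MetadataWriter {
    // The writer consumes the whole build result, commits the segments, and
    // returns (total_stats, new_segment_stats) so the caller can log metrics.
    fn commit_flush(&self, result: BuildResult) -> anyhow::Result<(SegmentStats, Option<SegmentStats>)> {
        let BuildResult {
            segments,
            total_stats,
            new_segment_stats,
        } = result;
        // ... persist `segments` under the writer's lock here ...
        println!("committing {} segment(s)", segments.len());
        Ok((total_stats, new_segment_stats))
    }
}

fn main() -> anyhow::Result<()> {
    let writer = MetadataWriter;
    let result = BuildResult {
        segments: vec!["segment-0".to_string()],
        total_stats: SegmentStats { num_indexed_documents: 42 },
        new_segment_stats: Some(SegmentStats { num_indexed_documents: 7 }),
    };
    // The flusher no longer matches on the snapshot data itself:
    let (total_stats, new_segment_stats) = writer.commit_flush(result)?;
    println!(
        "total={} new={}",
        total_stats.num_indexed_documents,
        new_segment_stats.unwrap_or_default().num_indexed_documents
    );
    Ok(())
}
```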
132 changes: 15 additions & 117 deletions crates/database/src/text_index_worker/flusher2.rs
@@ -6,41 +6,25 @@ use std::{
 #[cfg(any(test, feature = "testing"))]
 use common::pause::PauseClient;
 use common::{
-    bootstrap_model::index::{
-        text_index::{
-            FragmentedTextSegment,
-            TextBackfillCursor,
-            TextIndexBackfillState,
-            TextIndexSnapshot,
-            TextIndexSnapshotData,
-            TextIndexState,
-            TextSnapshotVersion,
-        },
-        IndexMetadata,
-    },
     knobs::SEARCH_INDEX_SIZE_SOFT_LIMIT,
     persistence::PersistenceReader,
     runtime::Runtime,
     types::TabletIndexName,
 };
-use keybroker::Identity;
-use search::searcher::SegmentTermMetadataFetcher;
+use search::{
+    metrics::SearchType,
+    searcher::SegmentTermMetadataFetcher,
+};
 use storage::Storage;
-use sync_types::Timestamp;

 use crate::{
     index_workers::{
-        index_meta::{
-            SearchOnDiskState,
-            SnapshotData,
-        },
         search_flusher::{
             IndexBuild,
-            IndexBuildResult,
             SearchFlusher,
             SearchIndexLimits,
         },
-        MultiSegmentBackfillResult,
+        writer::SearchIndexMetadataWriter,
     },
     metrics::search::{
         log_documents_per_index,
@@ -52,7 +36,6 @@ use crate::{
         TextSearchIndex,
     },
     Database,
-    SystemMetadataModel,
     Token,
 };

@@ -61,9 +44,9 @@ pub(crate) const FLUSH_RUNNING_LABEL: &str = "flush_running";

 pub struct TextIndexFlusher2<RT: Runtime> {
     flusher: SearchFlusher<RT, TextIndexConfigParser>,
-    database: Database<RT>,
     storage: Arc<dyn Storage>,
     segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>,
+    writer: SearchIndexMetadataWriter<RT, TextSearchIndex>,

     #[allow(unused)]
     #[cfg(any(test, feature = "testing"))]
@@ -135,17 +118,23 @@ impl<RT: Runtime> FlusherBuilder<RT> {

     pub(crate) fn build(self) -> TextIndexFlusher2<RT> {
         let flusher = SearchFlusher::new(
-            self.runtime,
+            self.runtime.clone(),
             self.database.clone(),
             self.reader,
             self.storage.clone(),
             self.limits,
         );
+        let writer: SearchIndexMetadataWriter<RT, TextSearchIndex> = SearchIndexMetadataWriter::new(
+            self.runtime,
+            self.database,
+            self.storage.clone(),
+            SearchType::Text,
+        );
         TextIndexFlusher2 {
             flusher,
-            database: self.database,
             storage: self.storage,
             segment_term_metadata_fetcher: self.segment_term_metadata_fetcher,
+            writer,
             #[cfg(any(test, feature = "testing"))]
             should_terminate: self.should_terminate,
             #[cfg(any(test, feature = "testing"))]
@@ -209,105 +198,14 @@ impl<RT: Runtime> TextIndexFlusher2<RT> {
             .await?;
         tracing::debug!("Built a text segment for: {result:#?}");

-        let IndexBuildResult {
-            snapshot_ts,
-            data,
-            total_stats,
-            new_segment_stats,
-            backfill_result,
-            ..
-        } = result;
+        let (total_stats, new_segment_stats) = self.writer.commit_flush(&job, result).await?;

-        match data {
-            SnapshotData::Unknown(_) => {
-                anyhow::bail!("Created an unknown snapshot data type");
-            },
-            SnapshotData::SingleSegment(_) => {
-                anyhow::bail!("Created a single segment snapshot?");
-            },
-            SnapshotData::MultiSegment(segments) => {
-                self.write_search_metadata(job, snapshot_ts, segments, backfill_result)
-                    .await?;
-            },
-        }
         let num_indexed_documents = new_segment_stats.unwrap_or_default().num_indexed_documents;
         log_documents_per_new_segment(num_indexed_documents);
         log_documents_per_index(total_stats.num_indexed_documents as usize);
         timer.finish();
         Ok(num_indexed_documents)
     }
-
-    fn get_new_disk_state(
-        backfill_result: Option<MultiSegmentBackfillResult>,
-        backfill_ts: Timestamp,
-        segments: Vec<FragmentedTextSegment>,
-        on_disk_state: SearchOnDiskState<TextSearchIndex>,
-    ) -> TextIndexState {
-        if let Some(backfill_result) = backfill_result {
-            if backfill_result.is_backfill_complete {
-                TextIndexState::Backfilled(TextIndexSnapshot {
-                    data: TextIndexSnapshotData::MultiSegment(segments),
-                    ts: backfill_ts,
-                    version: TextSnapshotVersion::V2UseStringIds,
-                })
-            } else {
-                let cursor = if let Some(cursor) = backfill_result.new_cursor {
-                    Some(TextBackfillCursor {
-                        cursor: cursor.internal_id(),
-                        backfill_snapshot_ts: backfill_ts,
-                    })
-                } else {
-                    None
-                };
-                TextIndexState::Backfilling(TextIndexBackfillState { segments, cursor })
-            }
-        } else {
-            let snapshot = TextIndexSnapshot {
-                data: TextIndexSnapshotData::MultiSegment(segments),
-                ts: backfill_ts,
-                version: TextSnapshotVersion::V2UseStringIds,
-            };
-            let is_snapshotted = matches!(on_disk_state, SearchOnDiskState::SnapshottedAt(_));
-            if is_snapshotted {
-                TextIndexState::SnapshottedAt(snapshot)
-            } else {
-                TextIndexState::Backfilled(snapshot)
-            }
-        }
-    }
-
-    async fn write_search_metadata(
-        &self,
-        job: IndexBuild<TextSearchIndex>,
-        snapshot_ts: Timestamp,
-        segments: Vec<FragmentedTextSegment>,
-        backfill_result: Option<MultiSegmentBackfillResult>,
-    ) -> anyhow::Result<()> {
-        let mut tx = self.database.begin(Identity::system()).await?;
-
-        let new_on_disk_state = Self::get_new_disk_state(
-            backfill_result,
-            snapshot_ts,
-            segments,
-            job.index_config.on_disk_state,
-        );
-
-        SystemMetadataModel::new_global(&mut tx)
-            .replace(
-                job.metadata_id,
-                IndexMetadata::new_search_index(
-                    job.index_name,
-                    job.index_config.developer_config,
-                    new_on_disk_state,
-                )
-                .try_into()?,
-            )
-            .await?;
-        self.database
-            .commit_with_write_source(tx, "search_index_worker_build_index")
-            .await?;
-        Ok(())
-    }
 }

 #[cfg(test)]
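The helpers deleted here (`get_new_disk_state` and `write_search_metadata`) updated the text index metadata locally; that responsibility presumably now sits behind the shared writer's `commit_backfill_flush`/`commit_snapshot_flush` path. For reference, the branch structure the removed `get_new_disk_state` encoded can be summarized in a small self-contained sketch (simplified enum, not the real `TextIndexState`; the real code also carries segments, timestamps, a backfill cursor, and a snapshot version):

```rust
// Simplified restatement of the removed get_new_disk_state decision logic.
#[derive(Debug, PartialEq)]
enum IndexState {
    Backfilling,   // backfill still in progress (cursor retained)
    Backfilled,    // backfill done, not yet serving a snapshot
    SnapshottedAt, // serving a snapshot
}

fn new_disk_state(
    backfill_result: Option<bool>, // Some(is_backfill_complete), None => plain flush
    was_snapshotted: bool,
) -> IndexState {
    match backfill_result {
        // A completed backfill always lands in Backfilled.
        Some(true) => IndexState::Backfilled,
        // An incomplete backfill stays in Backfilling.
        Some(false) => IndexState::Backfilling,
        // A plain flush keeps a snapshotted index snapshotted; anything else
        // is (re-)written as Backfilled.
        None if was_snapshotted => IndexState::SnapshottedAt,
        None => IndexState::Backfilled,
    }
}

fn main() {
    assert_eq!(new_disk_state(Some(true), false), IndexState::Backfilled);
    assert_eq!(new_disk_state(Some(false), false), IndexState::Backfilling);
    assert_eq!(new_disk_state(None, true), IndexState::SnapshottedAt);
    assert_eq!(new_disk_state(None, false), IndexState::Backfilled);
}
```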
26 changes: 1 addition & 25 deletions crates/database/src/vector_index_worker/flusher.rs
@@ -19,10 +19,8 @@ use storage::Storage;
 use super::vector_meta::BuildVectorIndexArgs;
 use crate::{
     index_workers::{
-        index_meta::SnapshotData,
         search_flusher::{
             IndexBuild,
-            IndexBuildResult,
             SearchFlusher,
             SearchIndexLimits,
         },
@@ -132,29 +130,7 @@ impl<RT: Runtime> VectorIndexFlusher<RT> {
             .await?;
         tracing::debug!("Built a vector segment for: {result:#?}");

-        // 3. Update the vector index metadata.
-        let IndexBuildResult {
-            snapshot_ts,
-            data,
-            total_stats,
-            new_segment_stats,
-            new_segment_id,
-            backfill_result,
-        } = result;
-
-        match data {
-            SnapshotData::Unknown(_) => {
-                anyhow::bail!("Created an unknown snapshot data type");
-            },
-            SnapshotData::SingleSegment(_) => {
-                anyhow::bail!("Created a single segment snapshot?");
-            },
-            SnapshotData::MultiSegment(segments) => {
-                self.writer
-                    .commit_flush(&job, snapshot_ts, segments, new_segment_id, backfill_result)
-                    .await?;
-            },
-        }
+        let (total_stats, new_segment_stats) = self.writer.commit_flush(&job, result).await?;

         let vectors_in_new_segment = new_segment_stats.unwrap_or_default().num_vectors;
         metrics::vector::log_documents_per_index(total_stats.non_deleted_vectors);
