Skip to content

Commit

Permalink
fix(meta): fix notification when dropping tables
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jan 3, 2025
1 parent c28ea2d commit bdee34e
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 51 deletions.
11 changes: 5 additions & 6 deletions src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,21 +194,20 @@ impl NotificationServiceImpl {
Ok((nodes, notification_version))
}

async fn get_tables_and_creating_tables_snapshot(
&self,
) -> MetaResult<(Vec<Table>, NotificationVersion)> {
async fn get_tables_snapshot(&self) -> MetaResult<(Vec<Table>, NotificationVersion)> {
let catalog_guard = self
.metadata_manager
.catalog_controller
.get_inner_read_guard()
.await;
let tables = catalog_guard.list_all_state_tables().await?;
let mut tables = catalog_guard.list_all_state_tables(None).await?;
tables.extend(catalog_guard.dropped_tables.values().cloned());
let notification_version = self.env.notification_manager().current_version().await;
Ok((tables, notification_version))
}

async fn compactor_subscribe(&self) -> MetaResult<MetaSnapshot> {
let (tables, catalog_version) = self.get_tables_and_creating_tables_snapshot().await?;
let (tables, catalog_version) = self.get_tables_snapshot().await?;

Ok(MetaSnapshot {
tables,
Expand Down Expand Up @@ -294,7 +293,7 @@ impl NotificationServiceImpl {
}

async fn hummock_subscribe(&self) -> MetaResult<MetaSnapshot> {
let (tables, catalog_version) = self.get_tables_and_creating_tables_snapshot().await?;
let (tables, catalog_version) = self.get_tables_snapshot().await?;
let hummock_version = self
.hummock_manager
.on_current_version(|version| version.into())
Expand Down
23 changes: 21 additions & 2 deletions src/meta/src/controller/catalog/drop_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use risingwave_pb::catalog::PbTable;

use super::*;
impl CatalogController {
pub async fn drop_relation(
&self,
object_type: ObjectType,
object_id: ObjectId,
drop_mode: DropMode,
) -> MetaResult<(ReleaseContext, NotificationVersion)> {
let inner = self.inner.write().await;
let mut inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let obj: PartialObject = Object::find_by_id(object_id)
.into_partial_model()
Expand Down Expand Up @@ -239,11 +240,21 @@ impl CatalogController {

// notify about them.
self.notify_users_update(user_infos).await;
let dropped_tables = inner
.list_all_state_tables(Some(to_drop_state_table_ids.iter().copied().collect()))
.await?;
inner.dropped_tables.extend(
dropped_tables
.into_iter()
.map(|t| (TableId::try_from(t.id).unwrap(), t)),
);
let relation_group = build_relation_group_for_delete(to_drop_objects);

let version = self
.notify_frontend(NotificationOperation::Delete, relation_group)
.await;
// Hummock observers and compactor observers are notified once the corresponding barrier is completed.
// They only need RelationInfo::Table.

let fragment_mappings = fragment_ids
.into_iter()
Expand Down Expand Up @@ -576,4 +587,12 @@ impl CatalogController {
.await;
Ok(version)
}

pub async fn complete_dropped_tables(
&self,
table_ids: impl Iterator<Item = TableId>,
) -> Vec<PbTable> {
let mut inner = self.inner.write().await;
inner.complete_dropped_tables(table_ids)
}
}
2 changes: 1 addition & 1 deletion src/meta/src/controller/catalog/list_op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl CatalogController {

pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
let inner = self.inner.read().await;
inner.list_all_state_tables().await
inner.list_all_state_tables(None).await
}

pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult<Vec<TableId>> {
Expand Down
35 changes: 30 additions & 5 deletions src/meta/src/controller/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl CatalogController {
inner: RwLock::new(CatalogControllerInner {
db: meta_store.conn,
creating_table_finish_notifier: HashMap::new(),
dropped_tables: HashMap::new(),
}),
};

Expand All @@ -172,6 +173,8 @@ pub struct CatalogControllerInner {
/// On notifying, we can remove the entry from this map.
pub creating_table_finish_notifier:
HashMap<ObjectId, Vec<Sender<MetaResult<NotificationVersion>>>>,
/// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
pub dropped_tables: HashMap<TableId, PbTable>,
}

impl CatalogController {
Expand Down Expand Up @@ -616,11 +619,16 @@ impl CatalogControllerInner {
}

/// `list_all_tables` return all tables and internal tables.
pub async fn list_all_state_tables(&self) -> MetaResult<Vec<PbTable>> {
let table_objs = Table::find()
.find_also_related(Object)
.all(&self.db)
.await?;
/// `table_ids_filter` is used for filtering if it's set.
pub async fn list_all_state_tables(
&self,
table_ids_filter: Option<HashSet<TableId>>,
) -> MetaResult<Vec<PbTable>> {
let mut table_objs = Table::find().find_also_related(Object);
if let Some(table_ids_filter) = table_ids_filter {
table_objs = table_objs.filter(table::Column::TableId.is_in(table_ids_filter));
}
let table_objs = table_objs.all(&self.db).await?;

Ok(table_objs
.into_iter()
Expand Down Expand Up @@ -861,4 +869,21 @@ impl CatalogControllerInner {
.await?;
Ok(table_ids)
}

/// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
/// Returns the removed table copies.
pub(crate) fn complete_dropped_tables(
&mut self,
table_ids: impl Iterator<Item = TableId>,
) -> Vec<PbTable> {
let mut res = vec![];
for table_id in table_ids {
if let Some(t) = self.dropped_tables.remove(&table_id) {
res.push(t);
continue;
}
tracing::warn!("table {table_id} not found");
}
res
}
}
34 changes: 0 additions & 34 deletions src/meta/src/manager/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,45 +197,11 @@ impl NotificationManager {
.await
}

pub async fn notify_hummock_relation_info(
&self,
operation: Operation,
relation_info: RelationInfo,
) -> NotificationVersion {
self.notify_with_version(
SubscribeType::Hummock.into(),
operation,
Info::RelationGroup(RelationGroup {
relations: vec![Relation {
relation_info: relation_info.into(),
}],
}),
)
.await
}

pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
.await
}

pub async fn notify_compactor_relation_info(
&self,
operation: Operation,
relation_info: RelationInfo,
) -> NotificationVersion {
self.notify_with_version(
SubscribeType::Compactor.into(),
operation,
Info::RelationGroup(RelationGroup {
relations: vec![Relation {
relation_info: relation_info.into(),
}],
}),
)
.await
}

pub async fn notify_compute(&self, operation: Operation, info: Info) -> NotificationVersion {
self.notify_with_version(SubscribeType::Compute.into(), operation, info)
.await
Expand Down
38 changes: 35 additions & 3 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_pb::catalog::{CreateType, Subscription, Table};
use risingwave_pb::meta::relation::PbRelationInfo;
use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
use risingwave_pb::meta::{PbRelation, PbRelationGroup};
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::Dispatcher;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -517,7 +520,7 @@ impl GlobalStreamManager {
|| !streaming_job_ids.is_empty()
|| !state_table_ids.is_empty()
{
let _ = self
let res = self
.barrier_scheduler
.run_command(
database_id,
Expand All @@ -528,8 +531,8 @@ impl GlobalStreamManager {
.collect(),
actors: removed_actors,
unregistered_state_table_ids: state_table_ids
.into_iter()
.map(|table_id| TableId::new(table_id as _))
.iter()
.map(|table_id| TableId::new(*table_id as _))
.collect(),
unregistered_fragment_ids: fragment_ids,
},
Expand All @@ -538,9 +541,38 @@ impl GlobalStreamManager {
.inspect_err(|err| {
tracing::error!(error = ?err.as_report(), "failed to run drop command");
});
if res.is_ok() {
self.post_dropping_streaming_jobs(state_table_ids).await;
}
}
}

async fn post_dropping_streaming_jobs(
&self,
state_table_ids: Vec<risingwave_meta_model::TableId>,
) {
let tables = self
.metadata_manager
.catalog_controller
.complete_dropped_tables(state_table_ids.into_iter())
.await;
let relations = tables
.into_iter()
.map(|t| PbRelation {
relation_info: Some(PbRelationInfo::Table(t)),
})
.collect();
let group = PbInfo::RelationGroup(PbRelationGroup { relations });
self.env
.notification_manager()
.notify_hummock(Operation::Delete, group.clone())
.await;
self.env
.notification_manager()
.notify_compactor(Operation::Delete, group)
.await;
}

/// Cancel streaming jobs and return the canceled table ids.
/// 1. Send cancel message to stream jobs (via `cancel_jobs`).
/// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`).
Expand Down

0 comments on commit bdee34e

Please sign in to comment.