diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs
index b766689cc9bf..35d5eb257fc8 100644
--- a/src/meta/service/src/notification_service.rs
+++ b/src/meta/service/src/notification_service.rs
@@ -194,21 +194,20 @@ impl NotificationServiceImpl {
Ok((nodes, notification_version))
}
- async fn get_tables_and_creating_tables_snapshot(
- &self,
- ) -> MetaResult<(Vec
, NotificationVersion)> {
+ async fn get_tables_snapshot(&self) -> MetaResult<(Vec, NotificationVersion)> {
let catalog_guard = self
.metadata_manager
.catalog_controller
.get_inner_read_guard()
.await;
- let tables = catalog_guard.list_all_state_tables().await?;
+ let mut tables = catalog_guard.list_all_state_tables(None).await?;
+ tables.extend(catalog_guard.dropped_tables.values().cloned());
let notification_version = self.env.notification_manager().current_version().await;
Ok((tables, notification_version))
}
async fn compactor_subscribe(&self) -> MetaResult {
- let (tables, catalog_version) = self.get_tables_and_creating_tables_snapshot().await?;
+ let (tables, catalog_version) = self.get_tables_snapshot().await?;
Ok(MetaSnapshot {
tables,
@@ -294,7 +293,7 @@ impl NotificationServiceImpl {
}
async fn hummock_subscribe(&self) -> MetaResult {
- let (tables, catalog_version) = self.get_tables_and_creating_tables_snapshot().await?;
+ let (tables, catalog_version) = self.get_tables_snapshot().await?;
let hummock_version = self
.hummock_manager
.on_current_version(|version| version.into())
diff --git a/src/meta/src/controller/catalog/drop_op.rs b/src/meta/src/controller/catalog/drop_op.rs
index 46928fcb487c..453cab1ac2b1 100644
--- a/src/meta/src/controller/catalog/drop_op.rs
+++ b/src/meta/src/controller/catalog/drop_op.rs
@@ -12,8 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use super::*;
+use risingwave_pb::catalog::PbTable;
+use super::*;
impl CatalogController {
pub async fn drop_relation(
&self,
@@ -21,7 +22,7 @@ impl CatalogController {
object_id: ObjectId,
drop_mode: DropMode,
) -> MetaResult<(ReleaseContext, NotificationVersion)> {
- let inner = self.inner.write().await;
+ let mut inner = self.inner.write().await;
let txn = inner.db.begin().await?;
let obj: PartialObject = Object::find_by_id(object_id)
.into_partial_model()
@@ -239,11 +240,21 @@ impl CatalogController {
// notify about them.
self.notify_users_update(user_infos).await;
+ let dropped_tables = inner
+ .list_all_state_tables(Some(to_drop_state_table_ids.iter().copied().collect()))
+ .await?;
+ inner.dropped_tables.extend(
+ dropped_tables
+ .into_iter()
+ .map(|t| (TableId::try_from(t.id).unwrap(), t)),
+ );
let relation_group = build_relation_group_for_delete(to_drop_objects);
let version = self
.notify_frontend(NotificationOperation::Delete, relation_group)
.await;
+ // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
+ // They only need RelationInfo::Table.
let fragment_mappings = fragment_ids
.into_iter()
@@ -576,4 +587,12 @@ impl CatalogController {
.await;
Ok(version)
}
+
+ pub async fn complete_dropped_tables(
+ &self,
+ table_ids: impl Iterator- ,
+ ) -> Vec {
+ let mut inner = self.inner.write().await;
+ inner.complete_dropped_tables(table_ids)
+ }
}
diff --git a/src/meta/src/controller/catalog/list_op.rs b/src/meta/src/controller/catalog/list_op.rs
index 387cc353c60a..e0f69f3294c7 100644
--- a/src/meta/src/controller/catalog/list_op.rs
+++ b/src/meta/src/controller/catalog/list_op.rs
@@ -104,7 +104,7 @@ impl CatalogController {
pub async fn list_all_state_tables(&self) -> MetaResult> {
let inner = self.inner.read().await;
- inner.list_all_state_tables().await
+ inner.list_all_state_tables(None).await
}
pub async fn list_readonly_table_ids(&self, schema_id: SchemaId) -> MetaResult> {
diff --git a/src/meta/src/controller/catalog/mod.rs b/src/meta/src/controller/catalog/mod.rs
index 9277eda30f0d..4cade80439be 100644
--- a/src/meta/src/controller/catalog/mod.rs
+++ b/src/meta/src/controller/catalog/mod.rs
@@ -146,6 +146,7 @@ impl CatalogController {
inner: RwLock::new(CatalogControllerInner {
db: meta_store.conn,
creating_table_finish_notifier: HashMap::new(),
+ dropped_tables: HashMap::new(),
}),
};
@@ -172,6 +173,8 @@ pub struct CatalogControllerInner {
/// On notifying, we can remove the entry from this map.
pub creating_table_finish_notifier:
HashMap>>>,
+ /// Tables have been dropped from the meta store, but the corresponding barrier remains unfinished.
+ pub dropped_tables: HashMap,
}
impl CatalogController {
@@ -616,11 +619,16 @@ impl CatalogControllerInner {
}
/// `list_all_tables` return all tables and internal tables.
- pub async fn list_all_state_tables(&self) -> MetaResult> {
- let table_objs = Table::find()
- .find_also_related(Object)
- .all(&self.db)
- .await?;
+ /// `table_ids_filter` is used for filtering if it's set.
+ pub async fn list_all_state_tables(
+ &self,
+ table_ids_filter: Option>,
+ ) -> MetaResult> {
+ let mut table_objs = Table::find().find_also_related(Object);
+ if let Some(table_ids_filter) = table_ids_filter {
+ table_objs = table_objs.filter(table::Column::TableId.is_in(table_ids_filter));
+ }
+ let table_objs = table_objs.all(&self.db).await?;
Ok(table_objs
.into_iter()
@@ -861,4 +869,21 @@ impl CatalogControllerInner {
.await?;
Ok(table_ids)
}
+
+ /// Since the tables have been dropped from both meta store and streaming jobs, this method removes those table copies.
+ /// Returns the removed table copies.
+ pub(crate) fn complete_dropped_tables(
+ &mut self,
+ table_ids: impl Iterator
- ,
+ ) -> Vec {
+ let mut res = vec![];
+ for table_id in table_ids {
+ if let Some(t) = self.dropped_tables.remove(&table_id) {
+ res.push(t);
+ continue;
+ }
+ tracing::warn!("table {table_id} not found");
+ }
+ res
+ }
}
diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs
index f3548389c09a..db145ceca325 100644
--- a/src/meta/src/manager/notification.rs
+++ b/src/meta/src/manager/notification.rs
@@ -197,45 +197,11 @@ impl NotificationManager {
.await
}
- pub async fn notify_hummock_relation_info(
- &self,
- operation: Operation,
- relation_info: RelationInfo,
- ) -> NotificationVersion {
- self.notify_with_version(
- SubscribeType::Hummock.into(),
- operation,
- Info::RelationGroup(RelationGroup {
- relations: vec![Relation {
- relation_info: relation_info.into(),
- }],
- }),
- )
- .await
- }
-
pub async fn notify_compactor(&self, operation: Operation, info: Info) -> NotificationVersion {
self.notify_with_version(SubscribeType::Compactor.into(), operation, info)
.await
}
- pub async fn notify_compactor_relation_info(
- &self,
- operation: Operation,
- relation_info: RelationInfo,
- ) -> NotificationVersion {
- self.notify_with_version(
- SubscribeType::Compactor.into(),
- operation,
- Info::RelationGroup(RelationGroup {
- relations: vec![Relation {
- relation_info: relation_info.into(),
- }],
- }),
- )
- .await
- }
-
pub async fn notify_compute(&self, operation: Operation, info: Info) -> NotificationVersion {
self.notify_with_version(SubscribeType::Compute.into(), operation, info)
.await
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index aca86c1745e7..95e76d9f90c0 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -21,6 +21,9 @@ use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_pb::catalog::{CreateType, Subscription, Table};
+use risingwave_pb::meta::relation::PbRelationInfo;
+use risingwave_pb::meta::subscribe_response::{Operation, PbInfo};
+use risingwave_pb::meta::{PbRelation, PbRelationGroup};
use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::Dispatcher;
use thiserror_ext::AsReport;
@@ -517,7 +520,7 @@ impl GlobalStreamManager {
|| !streaming_job_ids.is_empty()
|| !state_table_ids.is_empty()
{
- let _ = self
+ let res = self
.barrier_scheduler
.run_command(
database_id,
@@ -528,8 +531,8 @@ impl GlobalStreamManager {
.collect(),
actors: removed_actors,
unregistered_state_table_ids: state_table_ids
- .into_iter()
- .map(|table_id| TableId::new(table_id as _))
+ .iter()
+ .map(|table_id| TableId::new(*table_id as _))
.collect(),
unregistered_fragment_ids: fragment_ids,
},
@@ -538,9 +541,38 @@ impl GlobalStreamManager {
.inspect_err(|err| {
tracing::error!(error = ?err.as_report(), "failed to run drop command");
});
+ if res.is_ok() {
+ self.post_dropping_streaming_jobs(state_table_ids).await;
+ }
}
}
+ async fn post_dropping_streaming_jobs(
+ &self,
+ state_table_ids: Vec,
+ ) {
+ let tables = self
+ .metadata_manager
+ .catalog_controller
+ .complete_dropped_tables(state_table_ids.into_iter())
+ .await;
+ let relations = tables
+ .into_iter()
+ .map(|t| PbRelation {
+ relation_info: Some(PbRelationInfo::Table(t)),
+ })
+ .collect();
+ let group = PbInfo::RelationGroup(PbRelationGroup { relations });
+ self.env
+ .notification_manager()
+ .notify_hummock(Operation::Delete, group.clone())
+ .await;
+ self.env
+ .notification_manager()
+ .notify_compactor(Operation::Delete, group)
+ .await;
+ }
+
/// Cancel streaming jobs and return the canceled table ids.
/// 1. Send cancel message to stream jobs (via `cancel_jobs`).
/// 2. Send cancel message to recovered stream jobs (via `barrier_scheduler`).