diff --git a/src/meta/src/barrier/context/recovery.rs b/src/meta/src/barrier/context/recovery.rs
index b21b84b26eb4b..51f49f4898423 100644
--- a/src/meta/src/barrier/context/recovery.rs
+++ b/src/meta/src/barrier/context/recovery.rs
@@ -134,6 +134,38 @@ impl GlobalBarrierWorkerContextImpl {
             .await
             .context("clean dirty streaming jobs")?;
 
+        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
+        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
+
+        let mut active_streaming_nodes =
+            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
+                .await?;
+
+        // Resolve actor info for recovery. If there's no actor to recover, most of the
+        // following steps will be no-op, while the compute nodes will still be reset.
+        // FIXME: Transactions should be used.
+        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
+        let mut info = if !self.env.opts.disable_automatic_parallelism_control {
+            info!("trigger offline scaling");
+            self.scale_actors(&active_streaming_nodes)
+                .await
+                .inspect_err(|err| {
+                    warn!(error = %err.as_report(), "scale actors failed");
+                })?;
+
+            self.resolve_graph_info(None).await.inspect_err(|err| {
+                warn!(error = %err.as_report(), "resolve actor info failed");
+            })?
+        } else {
+            info!("trigger actor migration");
+            // Migrate actors in expired CN to newly joined one.
+            self.migrate_actors(&mut active_streaming_nodes)
+                .await
+                .inspect_err(|err| {
+                    warn!(error = %err.as_report(), "migrate actors failed");
+                })?
+        };
+
         // Mview progress needs to be recovered.
         tracing::info!("recovering mview progress");
         let background_jobs = {
@@ -168,13 +200,6 @@ impl GlobalBarrierWorkerContextImpl {
         };
         tracing::info!("recovered mview progress");
 
-        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
-        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
-
-        let mut active_streaming_nodes =
-            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
-                .await?;
-
         let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
         info!(
             "background streaming jobs: {:?} total {}",
@@ -182,33 +207,6 @@ impl GlobalBarrierWorkerContextImpl {
             background_streaming_jobs.len()
         );
 
-        // Resolve actor info for recovery. If there's no actor to recover, most of the
-        // following steps will be no-op, while the compute nodes will still be reset.
-        // FIXME: Transactions should be used.
-        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
-        let mut info = if !self.env.opts.disable_automatic_parallelism_control
-            && background_streaming_jobs.is_empty()
-        {
-            info!("trigger offline scaling");
-            self.scale_actors(&active_streaming_nodes)
-                .await
-                .inspect_err(|err| {
-                    warn!(error = %err.as_report(), "scale actors failed");
-                })?;
-
-            self.resolve_graph_info(None).await.inspect_err(|err| {
-                warn!(error = %err.as_report(), "resolve actor info failed");
-            })?
-        } else {
-            info!("trigger actor migration");
-            // Migrate actors in expired CN to newly joined one.
-            self.migrate_actors(&mut active_streaming_nodes)
-                .await
-                .inspect_err(|err| {
-                    warn!(error = %err.as_report(), "migrate actors failed");
-                })?
-        };
-
         if self.scheduled_barriers.pre_apply_drop_cancel(None) {
             info = self.resolve_graph_info(None).await.inspect_err(|err| {
                 warn!(error = %err.as_report(), "resolve actor info failed");
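The recovery path above now takes the drop/cancel quick path and chooses between offline scaling and actor migration before mview progress is recovered, and the `&& background_streaming_jobs.is_empty()` guard is gone, so offline scaling also runs while background jobs are still being created. Below is a minimal, free-standing sketch of just that gating change; the function names are illustrative and not part of the meta-node code.

// Hypothetical sketch of the recovery gating change; names are illustrative only.
fn use_offline_scaling_old(parallelism_control_disabled: bool, background_jobs: &[u32]) -> bool {
    // Old behavior: offline scaling only when no background job exists.
    !parallelism_control_disabled && background_jobs.is_empty()
}

fn use_offline_scaling_new(parallelism_control_disabled: bool) -> bool {
    // New behavior: the decision depends only on the config flag.
    !parallelism_control_disabled
}

fn main() {
    let background_jobs = vec![42_u32];
    // Previously a pending background job forced the actor-migration branch.
    assert!(!use_offline_scaling_old(false, &background_jobs));
    // Now offline scaling is chosen regardless of background jobs.
    assert!(use_offline_scaling_new(false));
}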
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 197ffa5188647..b28cca28adbad 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -401,16 +401,6 @@ impl DdlController {
             deferred = true;
         }
 
-        if !deferred
-            && !self
-                .metadata_manager
-                .list_background_creating_jobs()
-                .await?
-                .is_empty()
-        {
-            bail!("The system is creating jobs in the background, please try again later")
-        }
-
         self.stream_manager
             .alter_table_parallelism(table_id, parallelism.into(), deferred)
             .await
diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs
index 37868ad58b0d6..0fec6d954687e 100644
--- a/src/meta/src/stream/scale.rs
+++ b/src/meta/src/stream/scale.rs
@@ -2267,6 +2267,24 @@ impl ScaleController {
 
         Ok(())
     }
+
+    pub async fn resolve_related_no_shuffle_jobs(
+        &self,
+        jobs: &[TableId],
+    ) -> MetaResult<HashSet<TableId>> {
+        let RescheduleWorkingSet { related_jobs, .. } = self
+            .metadata_manager
+            .catalog_controller
+            .resolve_working_set_for_reschedule_tables(
+                jobs.iter().map(|id| id.table_id as _).collect(),
+            )
+            .await?;
+
+        Ok(related_jobs
+            .keys()
+            .map(|id| TableId::new(*id as _))
+            .collect())
+    }
 }
 
 /// At present, for table level scaling, we use the strategy `TableResizePolicy`.
@@ -2359,23 +2377,31 @@ impl GlobalStreamManager {
     /// - `Ok(false)` if no jobs can be scaled;
     /// - `Ok(true)` if some jobs are scaled, and it is possible that there are more jobs can be scaled.
     async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
+        tracing::info!("trigger parallelism control");
+
+        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
+
         let background_streaming_jobs = self
             .metadata_manager
             .list_background_creating_jobs()
             .await?;
 
-        if !background_streaming_jobs.is_empty() {
-            tracing::debug!(
-                "skipping parallelism control due to background jobs {:?}",
-                background_streaming_jobs
-            );
-            // skip if there are background creating jobs
-            return Ok(true);
-        }
+        let skipped_jobs = if !background_streaming_jobs.is_empty() {
+            let jobs = self
+                .scale_controller
+                .resolve_related_no_shuffle_jobs(&background_streaming_jobs)
+                .await?;
 
-        tracing::info!("trigger parallelism control");
+            tracing::info!(
+                "skipping parallelism control of background jobs {:?} and associated jobs {:?}",
+                background_streaming_jobs,
+                jobs
+            );
 
-        let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
+            jobs
+        } else {
+            HashSet::new()
+        };
 
         let table_parallelisms: HashMap<_, _> = {
             let streaming_parallelisms = self
@@ -2386,6 +2412,7 @@ impl GlobalStreamManager {
 
             streaming_parallelisms
                 .into_iter()
+                .filter(|(table_id, _)| !skipped_jobs.contains(&TableId::new(*table_id as _)))
                 .map(|(table_id, parallelism)| {
                     let table_parallelism = match parallelism {
                         StreamingParallelism::Adaptive => TableParallelism::Adaptive,
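The new `resolve_related_no_shuffle_jobs` helper expands a set of background-creating jobs to every job in the same no-shuffle working set, and `trigger_parallelism_control` now skips exactly that set for the current round instead of skipping the entire round whenever any background job exists. Here is a self-contained sketch of the intended filtering, assuming the related-jobs mapping is already materialized; the identifiers are illustrative, not the actual `ScaleController` API.

// Hypothetical sketch of skip-set expansion and filtering; not the real ScaleController API.
use std::collections::{HashMap, HashSet};

// Expand background jobs to everything connected through no-shuffle edges (including themselves).
fn expand_no_shuffle(jobs: &[u32], related: &HashMap<u32, Vec<u32>>) -> HashSet<u32> {
    jobs.iter()
        .flat_map(|job| {
            related
                .get(job)
                .cloned()
                .unwrap_or_default()
                .into_iter()
                .chain(std::iter::once(*job))
        })
        .collect()
}

fn main() {
    // Job 1 is still being created in the background; job 2 shares a no-shuffle edge with it.
    let related = HashMap::from([(1_u32, vec![2_u32])]);
    let skipped = expand_no_shuffle(&[1], &related);

    let parallelisms = HashMap::from([(1_u32, "adaptive"), (2, "fixed(4)"), (3, "adaptive")]);
    // Only jobs outside the skip set take part in this round of parallelism control.
    let controlled: HashMap<_, _> = parallelisms
        .into_iter()
        .filter(|(job, _)| !skipped.contains(job))
        .collect();

    assert_eq!(controlled.len(), 1);
    assert!(controlled.contains_key(&3));
}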
diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs
index aca86c1745e73..47334b03127a7 100644
--- a/src/meta/src/stream/stream_manager.rs
+++ b/src/meta/src/stream/stream_manager.rs
@@ -617,6 +617,24 @@ impl GlobalStreamManager {
     ) -> MetaResult<()> {
         let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
 
+        let background_jobs = self
+            .metadata_manager
+            .list_background_creating_jobs()
+            .await?;
+
+        if !background_jobs.is_empty() {
+            let related_jobs = self
+                .scale_controller
+                .resolve_related_no_shuffle_jobs(&background_jobs)
+                .await?;
+
+            for job in background_jobs {
+                if related_jobs.contains(&job) {
+                    bail!("Cannot alter the job {} because the related job {} is currently being created", table_id, job.table_id);
+                }
+            }
+        }
+
         let database_id = DatabaseId::new(
             self.metadata_manager
                 .catalog_controller