From ebf607b1a9ec23cac75fdf7c889df2d1053869e6 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Thu, 11 Apr 2024 14:57:13 +0700 Subject: [PATCH] Disable PN if no cameras have it enabled --- src/common/reactor.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/common/reactor.rs b/src/common/reactor.rs index 6c90b36f..baf53769 100644 --- a/src/common/reactor.rs +++ b/src/common/reactor.rs @@ -43,10 +43,11 @@ impl NeoReactor { let cancel = CancellationToken::new(); let (config_tx, _) = watch(config); let mut set = JoinSet::new(); + let config_tx = Arc::new(config_tx); let cancel1 = cancel.clone(); let cancel2 = cancel.clone(); - let config_tx = Arc::new(config_tx); + let thread_config_tx = config_tx.clone(); set.spawn(async move { let mut instances: HashMap = Default::default(); @@ -65,14 +66,14 @@ impl NeoReactor { return Result::<(), anyhow::Error>::Ok(()); } NeoReactorCommand::Config(reply) => { - let _ = reply.send(config_tx.subscribe()); + let _ = reply.send(thread_config_tx.subscribe()); } NeoReactorCommand::Get(name, sender) => { let new = match instances.entry(name.clone()) { Entry::Occupied(occ) => Result::Ok(Some(occ.get().subscribe().await?)), Entry::Vacant(vac) => { log::debug!("Inserting new insance"); - let current_config: Config = (*config_tx.borrow()).clone(); + let current_config: Config = (*thread_config_tx.borrow()).clone(); if let Some(config) = current_config.cameras.iter().find(|cam| cam.name == name).cloned() { let cam = NeoCam::new(config, push_noti.clone()).await?; log::debug!("New instance created"); @@ -103,7 +104,7 @@ impl NeoReactor { } // Set the new conf - let _ = config_tx.send_replace(new_conf); + let _ = thread_config_tx.send_replace(new_conf); // Reply that we are done let _ = reply.send(Ok(())); } @@ -117,14 +118,19 @@ impl NeoReactor { // Push notification client let cancel1 = cancel.clone(); + let mut thread_config_rx = config_tx.subscribe(); set.spawn(async move { let r = tokio::select! { _ = cancel1.cancelled() => AnyResult::Ok(()), v = async { - let mut pn = PushNotiThread::new().await?; + let mut pn = PushNotiThread::new().await?; loop { - let r = pn.run(&pn_tx, &mut pn_rx).await; + thread_config_rx.wait_for(|c| c.cameras.iter().any(|cam| cam.push_notifications)).await?; // Wait until PN are enabled + let r = tokio::select!{ + v = pn.run(&pn_tx, &mut pn_rx) => {v}, + _ = thread_config_rx.wait_for(|c| c.cameras.iter().all(|cam| !cam.push_notifications)) => AnyResult::Ok(()), // Wait until PN are enabled => AnyResult::Ok(()), // Quit if PN is turned off + }; if r.is_err() { log::debug!("Issue with push notifier: {r:?}"); sleep(Duration::from_secs(5)).await;