diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 13d3de1a..e20f6b71 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,6 +5,9 @@ on: pull_request: workflow_dispatch: +env: + rust_version: "1.82.0" + jobs: pre_job: # continue-on-error: true # Uncomment once integration is finished @@ -33,17 +36,17 @@ jobs: - uses: actions/checkout@v4 name: Checkout onto ${{ runner.os }} - if: runner.os == 'Linux' - name: apt install gstreamer + name: apt install linux deps run: | sudo apt update sudo apt install -y aptitude sudo aptitude install -y libgstrtspserver-1.0-dev libgstreamer1.0-dev libgtk2.0-dev protobuf-compiler libssl-dev - if: runner.os == 'Windows' - name: Install Gstreamer + name: Install Windows deps run: | # Gstreamer - choco install -y --no-progress gstreamer --version=1.20.0 - choco install -y --no-progress gstreamer-devel --version=1.20.0 + choco install -y --no-progress gstreamer --version=1.24.2 + choco install -y --no-progress gstreamer-devel --version=1.24.2 $env:GSTREAMER_1_0_ROOT_MSVC_X86_64=$env:SYSTEMDRIVE + '\gstreamer\1.0\msvc_x86_64\' # Github runners work on both C or D drive and figuring out which was used is difficult if (-not (Test-Path -Path "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" -PathType Container)) { @@ -71,21 +74,17 @@ jobs: $env:OPENSSL_DIR='D:\\Program Files\OpenSSL\' } - # Set up pkgconfig for gstreamer - $env:PKG_CONFIG_PATH += ';' + $env:GSTREAMER_1_0_ROOT_MSVC_X86_64 + '\lib\pkgconfig' - # Set github vars Add-Content -Path $env:GITHUB_ENV -Value "GSTREAMER_1_0_ROOT_MSVC_X86_64=$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" Add-Content -Path $env:GITHUB_PATH -Value "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64\bin" Add-Content -Path $env:GITHUB_PATH -Value "%GSTREAMER_1_0_ROOT_MSVC_X86_64%\bin" Add-Content -Path $env:GITHUB_ENV -Value "OPENSSL_DIR=$env:OPENSSL_DIR" - Add-Content -Path $env:GITHUB_ENV -Value "PKG_CONFIG_PATH=$env:PKG_CONFIG_PATH" # One last check on directories dir "$env:GSTREAMER_1_0_ROOT_MSVC_X86_64" dir "$env:OPENSSL_DIR" - if: runner.os == 'macOS' - name: Install Gstreamer on macOS + name: Install macOS deps run: | curl -L 'https://gstreamer.freedesktop.org/data/pkg/osx/1.20.4/gstreamer-1.0-devel-1.20.4-universal.pkg' -o "$(pwd)/gstreamer-devel.pkg" sudo installer -verbose -pkg "$(pwd)/gstreamer-devel.pkg" -target / @@ -182,6 +181,7 @@ jobs: echo "${HOME}/.cargo/bin" >> "${GITHUB_PATH}" - name: Install ${{ matrix.arch }} Rust toolchain run: | + rustup default "${{ env.rust_version }}" rustup target add ${TARGET} env: TARGET: ${{ matrix.target }} @@ -286,8 +286,9 @@ jobs: echo "TAGS=${tagstr}" >> "${GITHUB_OUTPUT}" env: REPO_NAME: ${{ steps.docker_repo.outputs.DOCKER_NWO }} - - name: Install latest rust + - name: Install rust run: | + rustup default "${{ env.rust_version }}" rustup toolchain install stable - name: Install toml-cli run: | @@ -345,8 +346,9 @@ jobs: steps: - name: Checkout code uses: actions/checkout@v4 - - name: Install latest rust + - name: Install rust run: | + rustup default "${{ env.rust_version }}" rustup toolchain install stable - name: Install toml-cli run: | diff --git a/.github/workflows/style_checks.yml b/.github/workflows/style_checks.yml index 0200d1e7..4d068d52 100644 --- a/.github/workflows/style_checks.yml +++ b/.github/workflows/style_checks.yml @@ -35,9 +35,13 @@ jobs: rustup override set nightly - name: Run clippy manually run: | + echo "All Features" cargo +nightly clippy --workspace --all-targets --all-features || exit 1 + echo "No Features" cargo +nightly clippy --workspace --all-targets --no-default-features || exit 1 + echo "Gstreamer Only" cargo +nightly clippy --workspace --all-targets --no-default-features --features=gstreamer || exit 1 + echo "Pushnoti Only" cargo +nightly clippy --workspace --all-targets --no-default-features --features=pushnoti || exit 1 check_fmt: diff --git a/Cargo.lock b/Cargo.lock index 567f61f1..c0e32a35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -839,25 +839,6 @@ dependencies = [ "webpki-roots", ] -[[package]] -name = "fcm-push-listener" -version = "3.0.0" -dependencies = [ - "base64 0.21.7", - "ece", - "log", - "prost", - "prost-build", - "rand", - "reqwest", - "serde", - "serde_with", - "tokio", - "tokio-rustls 0.23.4", - "uuid", - "webpki-roots", -] - [[package]] name = "fixedbitset" version = "0.4.2" @@ -1895,7 +1876,7 @@ dependencies = [ "crossbeam-channel", "dirs", "env_logger 0.11.3", - "fcm-push-listener 2.0.3", + "fcm-push-listener", "futures", "gstreamer", "gstreamer-app", @@ -2339,7 +2320,7 @@ dependencies = [ "anyhow", "clap", "env_logger 0.10.2", - "fcm-push-listener 2.0.3", + "fcm-push-listener", "lazy_static", "log", "neolink_core", diff --git a/crates/core/src/bc/de.rs b/crates/core/src/bc/de.rs index d8da749c..182bcc7c 100644 --- a/crates/core/src/bc/de.rs +++ b/crates/core/src/bc/de.rs @@ -90,10 +90,8 @@ fn bc_modern_msg<'a>( E::add_context(input, ctx, E::from_error_kind(input, kind)) } - let ext_len = match header.payload_offset { - Some(off) => off, - _ => 0, // If missing payload_offset treat all as payload - }; + // If missing payload_offset treat all as payload + let ext_len = header.payload_offset.unwrap_or_default(); let (buf, ext_buf) = take(ext_len)(buf)?; let payload_len = header.body_len - ext_len; diff --git a/crates/core/src/bc/model.rs b/crates/core/src/bc/model.rs index 2e8dd052..6449fb3c 100644 --- a/crates/core/src/bc/model.rs +++ b/crates/core/src/bc/model.rs @@ -192,8 +192,11 @@ pub struct BcMeta { pub stream_type: u8, /// On modern messages this is the response code /// When sending a command it is set to `0`. The reply from the camera can be + /// /// - `200` for OK + /// /// - `400` for bad request + /// /// A malformed packet will return a `400` code pub response_code: u16, /// A message ID is used to match replies with requests. The camera will parrot back @@ -213,14 +216,6 @@ pub struct BcMeta { pub class: u16, } -/// The components of the Baichuan header that must be filled out after the body is serialized, or -/// is needed for the deserialization of the body (strictly part of the wire format of the message) -#[derive(Debug, PartialEq, Eq)] -pub(super) struct BcSendInfo { - pub body_len: u32, - pub payload_offset: Option, -} - #[derive(Debug)] pub(crate) struct BcContext { pub(crate) credentials: Credentials, diff --git a/crates/core/src/bc_protocol/connection/bcsub.rs b/crates/core/src/bc_protocol/connection/bcsub.rs index 89335352..6f8d21c3 100644 --- a/crates/core/src/bc_protocol/connection/bcsub.rs +++ b/crates/core/src/bc_protocol/connection/bcsub.rs @@ -20,9 +20,9 @@ pub struct BcStream<'a> { rx: &'a mut ReceiverStream>, } -impl<'a> Unpin for BcStream<'a> {} +impl Unpin for BcStream<'_> {} -impl<'a> Stream for BcStream<'a> { +impl Stream for BcStream<'_> { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { diff --git a/crates/core/src/bc_protocol/resolution.rs b/crates/core/src/bc_protocol/resolution.rs index 17e99b2b..1fb308bc 100644 --- a/crates/core/src/bc_protocol/resolution.rs +++ b/crates/core/src/bc_protocol/resolution.rs @@ -214,7 +214,7 @@ impl ToSocketAddrsOrUid for SocketAddrV6 { } } -impl<'a> ToSocketAddrsOrUid for &'a [SocketAddr] { +impl ToSocketAddrsOrUid for &'_ [SocketAddr] { type UidIter = std::vec::IntoIter; fn to_socket_addrs_or_uid(&self) -> Result { diff --git a/crates/core/src/bcudp/codex.rs b/crates/core/src/bcudp/codex.rs index fb54e6ec..d612dc43 100644 --- a/crates/core/src/bcudp/codex.rs +++ b/crates/core/src/bcudp/codex.rs @@ -34,15 +34,6 @@ impl Decoder for BcUdpCodex { type Item = BcUdp; type Error = Error; - /// Since frames can cross EOF boundaries we overload this so it doesn't error if - /// there are bytes left on the stream - // fn decode_eof(&mut self, buf: &mut BytesMut) -> Result> { - // match self.decode(buf)? { - // Some(frame) => Ok(Some(frame)), - // None => Ok(None), - // } - // } - fn decode(&mut self, src: &mut BytesMut) -> Result> { log::trace!("Decoding:"); if src.is_empty() { diff --git a/crates/mailnoti/src/main.rs b/crates/mailnoti/src/main.rs index 4ffc54e2..a223a733 100644 --- a/crates/mailnoti/src/main.rs +++ b/crates/mailnoti/src/main.rs @@ -53,14 +53,6 @@ async fn main() -> Result<()> { Ok(()) } -fn get_local_ip() -> Result { - get_if_addrs::get_if_addrs()? - .iter() - .find(|i| !i.is_loopback() && matches!(i.addr, get_if_addrs::IfAddr::V4(_))) - .map(|iface| Ok(iface.ip())) - .unwrap_or_else(|| Err(anyhow!("No Local Ip Address Found"))) -} - async fn cam_tasks(name: &str, camera: BcCamera, addr: SocketAddr) -> Result<()> { let support = camera.get_support().await?; if support.email.is_some_and(|v| v > 0) { diff --git a/src/common/instance.rs b/src/common/instance.rs index f3ecfc6c..25db1a72 100644 --- a/src/common/instance.rs +++ b/src/common/instance.rs @@ -5,28 +5,25 @@ //! The camera watch is used as an event to be triggered //! whenever the camera is lost/updated use anyhow::{anyhow, Context}; -use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; +use futures::TryFutureExt; use std::sync::{Arc, Weak}; -#[cfg(feature = "pushnoti")] -use tokio::sync::watch::channel as watch; use tokio::{ sync::{ - mpsc::Receiver as MpscReceiver, mpsc::Sender as MpscSender, oneshot::channel as oneshot, - watch::Receiver as WatchReceiver, + mpsc::Sender as MpscSender, oneshot::channel as oneshot, watch::Receiver as WatchReceiver, }, time::{sleep, Duration}, }; use tokio_util::sync::CancellationToken; -#[cfg(feature = "pushnoti")] -use super::PushNoti; -use super::{MdState, NeoCamCommand, NeoCamThreadState, Permit, UseCounter}; +use super::{MdState, NeoCamCommand, NeoCamThreadState, Permit}; use crate::{config::CameraConfig, AnyResult, Result}; -use neolink_core::{ - bc_protocol::{BcCamera, StreamKind}, - bcmedia::model::BcMedia, -}; +use neolink_core::bc_protocol::BcCamera; + #[cfg(feature = "gstreamer")] +mod gst; + +#[cfg(feature = "pushnoti")] +mod pushnoti; /// This instance is the primary interface used throughout the app /// @@ -233,50 +230,6 @@ impl NeoInstance { } } - #[cfg(feature = "pushnoti")] - pub(crate) async fn uid(&self) -> Result { - let (reply_tx, reply_rx) = oneshot(); - self.camera_control - .send(NeoCamCommand::GetUid(reply_tx)) - .await?; - Ok(reply_rx.await?) - } - - #[cfg(feature = "pushnoti")] - pub(crate) async fn push_notifications(&self) -> Result>> { - let uid = self.uid().await?; - let (instance_tx, instance_rx) = oneshot(); - self.camera_control - .send(NeoCamCommand::PushNoti(instance_tx)) - .await?; - let mut source_watch = instance_rx.await?; - - let (fwatch_tx, fwatch_rx) = watch(None); - tokio::task::spawn(async move { - loop { - match source_watch - .wait_for(|i| { - fwatch_tx.borrow().as_ref() != i.as_ref() - && i.as_ref() - .is_some_and(|i| i.message.contains(&format!("\"{uid}\""))) - }) - .await - { - Ok(pn) => { - log::trace!("Forwarding push notification about {}", uid); - let _ = fwatch_tx.send_replace(pn.clone()); - } - Err(e) => { - break Err(e); - } - } - }?; - AnyResult::Ok(()) - }); - - Ok(fwatch_rx) - } - pub(crate) async fn motion(&self) -> Result> { let (instance_tx, instance_rx) = oneshot(); self.camera_control @@ -346,202 +299,6 @@ impl NeoInstance { timeout, } } - - /// Streams a camera source while not paused - pub(crate) async fn stream_while_live( - &self, - stream: StreamKind, - ) -> AnyResult> { - let config = self.config().await?.borrow().clone(); - let name = config.name.clone(); - - let media_rx = if config.pause.on_motion { - let (media_tx, media_rx) = tokio::sync::mpsc::channel(100); - let counter = UseCounter::new().await; - - let mut md = self.motion().await?; - let mut tasks = FuturesUnordered::new(); - // Stream for 5s on a new client always - // This lets us negotiate the camera stream type - let init_permit = counter.create_activated().await?; - tokio::spawn( - async { - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - drop(init_permit); - } - .map(|e| { - log::debug!("Init permit thread stopped {e:?}"); - e - }), - ); - - // Create the permit for controlling the motion - let mut md_permit = { - let md_state = md.borrow_and_update().clone(); - match md_state { - MdState::Start(_) => { - log::info!("{name}::{stream:?}: Starting with Motion"); - counter.create_activated().await? - } - MdState::Stop(_) | MdState::Unknown => { - log::info!("{name}::{stream:?}: Waiting with Motion"); - counter.create_deactivated().await? - } - } - }; - // Now listen to the motion - let thread_name = name.clone(); - tasks.push(tokio::spawn( - async move { - loop { - match md.changed().await { - Ok(_) => { - let md_state: MdState = md.borrow_and_update().clone(); - match md_state { - MdState::Start(_) => { - log::info!("{thread_name}::{stream:?}: Motion Started"); - md_permit.activate().await?; - } - MdState::Stop(_) => { - log::info!("{thread_name}::{stream:?}: Motion Stopped"); - md_permit.deactivate().await?; - } - MdState::Unknown => {} - } - } - Err(e) => { - // Use break here so we can define the full type on the async closure - break AnyResult::Err(e.into()); - } - } - }?; - AnyResult::Ok(()) - } - .map(|e| { - log::debug!("Motion thread stopped {e:?}"); - e - }), - )); - - #[cfg(pushnoti)] - { - // Creates a permit for controlling based on the PN - let pn_permit = counter.create_deactivated().await?; - let mut pn = self.push_notifications().await?; - pn.borrow_and_update(); // Ignore any PNs that have already been sent before this - let thread_name = name.clone(); - tasks.push(tokio::spawn( - async move { - loop { - let noti: Option = pn.borrow_and_update().clone(); - if let Some(noti) = noti { - if noti.message.contains("Motion Alert from") { - log::info!( - "{thread_name}::{stream:?}: Push Notification Recieved" - ); - let mut new_pn_permit = pn_permit.subscribe(); - new_pn_permit.activate().await?; - tokio::spawn(async move { - tokio::time::sleep(tokio::time::Duration::from_secs(30)) - .await; - drop(new_pn_permit); - }); - } - } - if let Err(e) = pn.changed().await { - break Err(e); - } - }?; - AnyResult::Ok(()) - } - .map(|e| { - log::debug!("PN thread stopped {e:?}"); - e - }), - )); - } - - // Send the camera when the pemit is active - let camera_permit = counter.create_deactivated().await?; - let thread_camera = self.clone(); - tokio::spawn( - async move { - loop { - if let Err(e) = camera_permit.aquired_users().await { - break AnyResult::Err(e); - } - log::debug!("Starting stream"); - tokio::select! { - v = camera_permit.dropped_users() => { - log::debug!("Dropped users: {v:?}"); - v - }, - v = async { - log::debug!("Getting stream"); - let mut stream = thread_camera.stream(stream).await?; - log::debug!("Got stream"); - while let Some(media) = stream.recv().await { - media_tx.send(media).await?; - } - AnyResult::Ok(()) - } => { - log::debug!("Stopped stream: {v:?}"); - v - }, - v = tasks.next() => { - log::debug!("Task failed: {v:?}"); - Err(anyhow!("Task ended prematurly: {v:?}")) - } - }?; - log::debug!("Pausing stream"); - }?; - drop(counter); // Make sure counter is owned by this thread - AnyResult::Ok(()) - } - .map(|e| { - log::debug!("Stream thread stopped {e:?}"); - e - }), - ); - - Ok(media_rx) - } else { - self.stream(stream).await - }?; - - Ok(media_rx) - } - - /// Streams a camera source - pub(crate) async fn stream(&self, stream: StreamKind) -> AnyResult> { - let (media_tx, media_rx) = tokio::sync::mpsc::channel(100); - let config = self.config().await?.borrow().clone(); - let strict = config.strict; - let thread_camera = self.clone(); - tokio::task::spawn( - tokio::task::spawn(async move { - thread_camera - .run_task(move |cam| { - let media_tx = media_tx.clone(); - Box::pin(async move { - let mut media_stream = cam.start_video(stream, 0, strict).await?; - log::trace!("Camera started"); - while let Ok(media) = media_stream.get_data().await? { - media_tx.send(media).await?; - } - AnyResult::Ok(()) - }) - }) - .await - }) - .and_then(|res| async move { - log::debug!("Camera finished streaming: {res:?}"); - Ok(()) - }), - ); - - Ok(media_rx) - } } // A task that is run on a camera when the structure is dropped diff --git a/src/common/instance/gst.rs b/src/common/instance/gst.rs new file mode 100644 index 00000000..7dccd3bf --- /dev/null +++ b/src/common/instance/gst.rs @@ -0,0 +1,207 @@ +use super::*; + +use crate::common::UseCounter; +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use neolink_core::{bc_protocol::StreamKind, bcmedia::model::BcMedia}; +use tokio::sync::mpsc::Receiver as MpscReceiver; + +#[cfg(feature = "pushnoti")] +use crate::common::PushNoti; + +impl NeoInstance { + /// Streams a camera source while not paused + pub(crate) async fn stream_while_live( + &self, + stream: StreamKind, + ) -> AnyResult> { + let config = self.config().await?.borrow().clone(); + let name = config.name.clone(); + + let media_rx = if config.pause.on_motion { + let (media_tx, media_rx) = tokio::sync::mpsc::channel(100); + let counter = UseCounter::new().await; + + let mut md = self.motion().await?; + let mut tasks = FuturesUnordered::new(); + // Stream for 5s on a new client always + // This lets us negotiate the camera stream type + let init_permit = counter.create_activated().await?; + tokio::spawn( + async { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + drop(init_permit); + } + .map(|e| { + log::debug!("Init permit thread stopped {e:?}"); + e + }), + ); + + // Create the permit for controlling the motion + let mut md_permit = { + let md_state = md.borrow_and_update().clone(); + match md_state { + MdState::Start(_) => { + log::info!("{name}::{stream:?}: Starting with Motion"); + counter.create_activated().await? + } + MdState::Stop(_) | MdState::Unknown => { + log::info!("{name}::{stream:?}: Waiting with Motion"); + counter.create_deactivated().await? + } + } + }; + // Now listen to the motion + let thread_name = name.clone(); + tasks.push(tokio::spawn( + async move { + loop { + match md.changed().await { + Ok(_) => { + let md_state: MdState = md.borrow_and_update().clone(); + match md_state { + MdState::Start(_) => { + log::info!("{thread_name}::{stream:?}: Motion Started"); + md_permit.activate().await?; + } + MdState::Stop(_) => { + log::info!("{thread_name}::{stream:?}: Motion Stopped"); + md_permit.deactivate().await?; + } + MdState::Unknown => {} + } + } + Err(e) => { + // Use break here so we can define the full type on the async closure + break AnyResult::Err(e.into()); + } + } + }?; + AnyResult::Ok(()) + } + .map(|e| { + log::debug!("Motion thread stopped {e:?}"); + e + }), + )); + + #[cfg(feature = "pushnoti")] + { + // Creates a permit for controlling based on the PN + let pn_permit = counter.create_deactivated().await?; + let mut pn = self.push_notifications().await?; + pn.borrow_and_update(); // Ignore any PNs that have already been sent before this + let thread_name = name.clone(); + tasks.push(tokio::spawn( + async move { + loop { + let noti: Option = pn.borrow_and_update().clone(); + if let Some(noti) = noti { + if noti.message.contains("Motion Alert from") { + log::info!( + "{thread_name}::{stream:?}: Push Notification Recieved" + ); + let mut new_pn_permit = pn_permit.subscribe(); + new_pn_permit.activate().await?; + tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_secs(30)) + .await; + drop(new_pn_permit); + }); + } + } + if let Err(e) = pn.changed().await { + break Err(e); + } + }?; + AnyResult::Ok(()) + } + .map(|e| { + log::debug!("PN thread stopped {e:?}"); + e + }), + )); + } + + // Send the camera when the pemit is active + let camera_permit = counter.create_deactivated().await?; + let thread_camera = self.clone(); + tokio::spawn( + async move { + loop { + if let Err(e) = camera_permit.aquired_users().await { + break AnyResult::Err(e); + } + log::debug!("Starting stream"); + tokio::select! { + v = camera_permit.dropped_users() => { + log::debug!("Dropped users: {v:?}"); + v + }, + v = async { + log::debug!("Getting stream"); + let mut stream = thread_camera.stream(stream).await?; + log::debug!("Got stream"); + while let Some(media) = stream.recv().await { + media_tx.send(media).await?; + } + AnyResult::Ok(()) + } => { + log::debug!("Stopped stream: {v:?}"); + v + }, + v = tasks.next() => { + log::debug!("Task failed: {v:?}"); + Err(anyhow!("Task ended prematurly: {v:?}")) + } + }?; + log::debug!("Pausing stream"); + }?; + drop(counter); // Make sure counter is owned by this thread + AnyResult::Ok(()) + } + .map(|e| { + log::debug!("Stream thread stopped {e:?}"); + e + }), + ); + + Ok(media_rx) + } else { + self.stream(stream).await + }?; + + Ok(media_rx) + } + + /// Streams a camera source + pub(crate) async fn stream(&self, stream: StreamKind) -> AnyResult> { + let (media_tx, media_rx) = tokio::sync::mpsc::channel(100); + let config = self.config().await?.borrow().clone(); + let strict = config.strict; + let thread_camera = self.clone(); + tokio::task::spawn( + tokio::task::spawn(async move { + thread_camera + .run_task(move |cam| { + let media_tx = media_tx.clone(); + Box::pin(async move { + let mut media_stream = cam.start_video(stream, 0, strict).await?; + log::trace!("Camera started"); + while let Ok(media) = media_stream.get_data().await? { + media_tx.send(media).await?; + } + AnyResult::Ok(()) + }) + }) + .await + }) + .and_then(|res| async move { + log::debug!("Camera finished streaming: {res:?}"); + Ok(()) + }), + ); + + Ok(media_rx) + } +} diff --git a/src/common/instance/pushnoti.rs b/src/common/instance/pushnoti.rs new file mode 100644 index 00000000..89722fcd --- /dev/null +++ b/src/common/instance/pushnoti.rs @@ -0,0 +1,48 @@ +use super::*; + +use crate::common::PushNoti; +use tokio::sync::watch::channel as watch; + +impl NeoInstance { + pub(crate) async fn uid(&self) -> Result { + let (reply_tx, reply_rx) = oneshot(); + self.camera_control + .send(NeoCamCommand::GetUid(reply_tx)) + .await?; + Ok(reply_rx.await?) + } + + pub(crate) async fn push_notifications(&self) -> Result>> { + let uid = self.uid().await?; + let (instance_tx, instance_rx) = oneshot(); + self.camera_control + .send(NeoCamCommand::PushNoti(instance_tx)) + .await?; + let mut source_watch = instance_rx.await?; + + let (fwatch_tx, fwatch_rx) = watch(None); + tokio::task::spawn(async move { + loop { + match source_watch + .wait_for(|i| { + fwatch_tx.borrow().as_ref() != i.as_ref() + && i.as_ref() + .is_some_and(|i| i.message.contains(&format!("\"{uid}\""))) + }) + .await + { + Ok(pn) => { + log::trace!("Forwarding push notification about {}", uid); + let _ = fwatch_tx.send_replace(pn.clone()); + } + Err(e) => { + break Err(e); + } + } + }?; + AnyResult::Ok(()) + }); + + Ok(fwatch_rx) + } +} diff --git a/src/common/neocam.rs b/src/common/neocam.rs index 22a22db7..8633eedd 100644 --- a/src/common/neocam.rs +++ b/src/common/neocam.rs @@ -27,7 +27,7 @@ use super::{ use super::{PnRequest, PushNoti}; use crate::{config::CameraConfig, AnyResult, Result}; use neolink_core::bc_protocol::BcCamera; -#[cfg(feature = "gstreamer")] + #[allow(dead_code)] pub(crate) enum NeoCamCommand { HangUp, @@ -81,7 +81,6 @@ impl NeoCam { // other threads let sender_cancel = me.cancel.clone(); let mut commander_rx = ReceiverStream::new(commander_rx); - #[cfg(feature = "gstreamer")] let thread_commander_tx = commander_tx.clone(); let thread_watch_config_rx = watch_config_rx.clone(); #[cfg(feature = "pushnoti")] diff --git a/src/mqtt/mqttc.rs b/src/mqtt/mqttc.rs index c7f8d06f..f82ec709 100644 --- a/src/mqtt/mqttc.rs +++ b/src/mqtt/mqttc.rs @@ -339,7 +339,7 @@ impl<'a> MqttBackend<'a> { } } -impl<'a> Drop for MqttBackend<'a> { +impl Drop for MqttBackend<'_> { fn drop(&mut self) { self.cancel.cancel(); }