diff --git a/src/io/dmabuf/stream.rs b/src/io/dmabuf/stream.rs index d865e11..ba981e9 100644 --- a/src/io/dmabuf/stream.rs +++ b/src/io/dmabuf/stream.rs @@ -132,6 +132,22 @@ impl<'a> Stream where Self: CaptureStream<'a> { let meta = self.buf_meta[index]; Ok((buf, meta)) } + + /// Waits for a buffer to be ready + pub fn wait(&self) -> Result<(), io::Error>{ + if self.handle.poll(libc::POLLIN, -1)? == 0 { + // This condition can only happen if there was a timeout. + // A timeout is only possible if the `timeout` value is non-zero, meaning we should + // propagate it to the caller. + return Err(io::Error::new(io::ErrorKind::TimedOut, "Blocking poll")); + } + Ok(()) + } + + /// Waits for a buffer to be ready + pub fn is_ready(&self) -> Result{ + Ok(self.handle.poll(libc::POLLIN, 0)? > 0) + } } impl Drop for Stream { @@ -236,7 +252,7 @@ impl<'a> CaptureStream<'a> for Stream { Ok(self.arena_index) } - fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata)> { + fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata, bool)> { if !self.active { // Enqueue all buffers once on stream start for index in 0..self.arena.bufs.len() { @@ -254,6 +270,6 @@ impl<'a> CaptureStream<'a> for Stream { // will always be valid. let bytes = &mut self.arena.bufs[self.arena_index].as_ref().unwrap(); let meta = &self.buf_meta[self.arena_index]; - Ok((bytes, meta)) + Ok((bytes, meta, self.is_ready()?)) } } diff --git a/src/io/mmap/stream.rs b/src/io/mmap/stream.rs index cd666bd..25df5dd 100644 --- a/src/io/mmap/stream.rs +++ b/src/io/mmap/stream.rs @@ -187,7 +187,7 @@ impl<'a, 'b> CaptureStream<'b> for Stream<'a> { Ok(self.arena_index) } - fn next(&'b mut self) -> io::Result<(&Self::Item, &Metadata)> { + fn next(&'b mut self) -> io::Result<(&Self::Item, &Metadata, bool)> { if !self.active { // Enqueue all buffers once on stream start for index in 0..self.arena.bufs.len() { @@ -205,7 +205,7 @@ impl<'a, 'b> CaptureStream<'b> for Stream<'a> { // will always be valid. let bytes = &self.arena.bufs[self.arena_index]; let meta = &self.buf_meta[self.arena_index]; - Ok((bytes, meta)) + Ok((bytes, meta, false)) } } diff --git a/src/io/traits.rs b/src/io/traits.rs index 8234016..b98cd61 100644 --- a/src/io/traits.rs +++ b/src/io/traits.rs @@ -22,7 +22,8 @@ pub trait CaptureStream<'a>: Stream { /// Fetch a new frame by first queueing and then dequeueing. /// First time initialization is performed if necessary. - fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata)>; + /// The last item in the tuple is True if another frame is pending. + fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata, bool)>; } pub trait OutputStream<'a>: Stream { diff --git a/src/io/userptr/stream.rs b/src/io/userptr/stream.rs index 5261806..b833825 100644 --- a/src/io/userptr/stream.rs +++ b/src/io/userptr/stream.rs @@ -191,7 +191,7 @@ impl<'a> CaptureStream<'a> for Stream { Ok(self.arena_index) } - fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata)> { + fn next(&'a mut self) -> io::Result<(&Self::Item, &Metadata, bool)> { if !self.active { // Enqueue all buffers once on stream start for index in 0..self.arena.bufs.len() { @@ -209,6 +209,6 @@ impl<'a> CaptureStream<'a> for Stream { // will always be valid. let bytes = &mut self.arena.bufs[self.arena_index]; let meta = &self.buf_meta[self.arena_index]; - Ok((bytes, meta)) + Ok((bytes, meta, false)) } }