Skip to content

Commit

Permalink
Merge #69
Browse files Browse the repository at this point in the history
69: Bump async dependencies r=eldruin a=oll3

Bumped tokio, mio and futures to recent versions. Took some inspiration from the previous gpio-cdev tokio work and also picked some parts from #60.

Not tested the mio parts but the tokio example (from #60) seems to be working.

Co-authored-by: Olle Sandberg <[email protected]>
Co-authored-by: Paul Osborne <[email protected]>
  • Loading branch information
3 people authored Sep 24, 2021
2 parents 12b60cf + 28b52c0 commit 4713a02
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 139 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
- Migrated to 'tokio' crate.
- Updated `nix` to version 0.22.
- Minimmum supported Rust version updated to 1.46.0.
- Updated `tokio`to version 1.
- Updated `mio` to version 0.7.
- Updated `futures` to version 0.3.

## [0.5.3] - 2018-04-19

Expand Down
14 changes: 11 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@ homepage = "https://github.com/rust-embedded/rust-sysfs-gpio"
documentation = "https://docs.rs/sysfs_gpio/"
description = "Provides access to GPIOs using the Linux sysfs interface."
readme = "README.md"
edition = "2018"

[features]
mio-evented = ["mio"]
async-tokio = ["futures", "tokio", "mio-evented"]

[dependencies]
futures = { version = "0.1", optional = true }
futures = { version = "0.3", optional = true }
nix = "0.22"
mio = { version = "0.6", optional = true }
tokio = { version = "0.1", optional = true }
mio = { version = "0.7", optional = true, features = ["os-ext"]}
tokio = { version = "1", optional = true, features = ["net"] }

[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

[[example]]
name = "tokio"
required-features = ["async-tokio"]
6 changes: 3 additions & 3 deletions examples/blinky.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ fn get_args() -> Option<Arguments> {
Err(_) => return None,
};
Some(Arguments {
pin: pin,
duration_ms: duration_ms,
period_ms: period_ms,
pin,
duration_ms,
period_ms,
})
}

Expand Down
67 changes: 24 additions & 43 deletions examples/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,44 @@
#[cfg(feature = "async-tokio")]
extern crate futures;
#[cfg(feature = "async-tokio")]
extern crate sysfs_gpio;
#[cfg(feature = "async-tokio")]
extern crate tokio;
// Copyright (c) 2020. The sysfs-gpio Authors.

#[cfg(feature = "async-tokio")]
use futures::future::join_all;
use futures::StreamExt;
use std::env;

#[cfg(feature = "async-tokio")]
use futures::{lazy, Future, Stream};

#[cfg(feature = "async-tokio")]
use sysfs_gpio::{Direction, Edge, Pin};

#[cfg(feature = "async-tokio")]
fn stream(pin_nums: Vec<u64>) -> sysfs_gpio::Result<()> {
async fn monitor_pin(pin: Pin) -> Result<(), sysfs_gpio::Error> {
pin.export()?;
pin.set_direction(Direction::In)?;
pin.set_edge(Edge::BothEdges)?;
let mut gpio_events = pin.get_value_stream()?;
while let Some(evt) = gpio_events.next().await {
let val = evt.unwrap();
println!("Pin {} changed value to {}", pin.get_pin_num(), val);
}
Ok(())
}

async fn stream(pin_nums: Vec<u64>) {
// NOTE: this currently runs forever and as such if
// the app is stopped (Ctrl-C), no cleanup will happen
// and the GPIO will be left exported. Not much
// can be done about this as Rust signal handling isn't
// really present at the moment. Revisit later.
let pins: Vec<_> = pin_nums.iter().map(|&p| (p, Pin::new(p))).collect();
let task = lazy(move || {
for &(i, ref pin) in pins.iter() {
pin.export().unwrap();
pin.set_direction(Direction::In).unwrap();
pin.set_edge(Edge::BothEdges).unwrap();
tokio::spawn(
pin.get_value_stream()
.unwrap()
.for_each(move |val| {
println!("Pin {} changed value to {}", i, val);
Ok(())
})
.map_err(|_| ()),
);
}
Ok(())
});
tokio::run(task);

Ok(())
join_all(pin_nums.into_iter().map(|p| {
let pin = Pin::new(p);
tokio::task::spawn(monitor_pin(pin))
}))
.await;
}

#[cfg(feature = "async-tokio")]
fn main() {
#[tokio::main]
async fn main() {
let pins: Vec<u64> = env::args()
.skip(1)
.map(|a| a.parse().expect("Pins must be specified as integers"))
.collect();
if pins.is_empty() {
println!("Usage: ./tokio <pin> [pin ...]");
} else {
stream(pins).unwrap();
stream(pins).await;
}
}

#[cfg(not(feature = "async-tokio"))]
fn main() {
println!("This example requires the `tokio` feature to be enabled.");
}
2 changes: 0 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#[cfg(not(target_os = "wasi"))]
use nix;
use std::convert;
use std::fmt;
use std::io;
Expand Down
139 changes: 51 additions & 88 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,29 @@ extern crate nix;
#[cfg(feature = "async-tokio")]
extern crate tokio;

use std::fs;
use std::fs::File;
use std::io;
use std::io::prelude::*;
#[cfg(any(target_os = "linux", target_os = "android", feature = "async-tokio"))]
use std::io::SeekFrom;
#[cfg(not(target_os = "wasi"))]
use std::os::unix::prelude::*;
use std::path::Path;
use std::{fs, fs::File};

#[cfg(feature = "async-tokio")]
use futures::{Async, Poll, Stream};
use futures::{ready, Stream};
#[cfg(feature = "mio-evented")]
use mio::unix::EventedFd;
use mio::event::Source;
#[cfg(feature = "mio-evented")]
use mio::Evented;
use mio::unix::SourceFd;
#[cfg(any(target_os = "linux", target_os = "android"))]
use nix::sys::epoll::*;
#[cfg(not(target_os = "wasi"))]
use nix::unistd::close;
#[cfg(feature = "async-tokio")]
use tokio::reactor::{Handle, PollEvented};
use std::task::Poll;
#[cfg(feature = "async-tokio")]
use tokio::io::unix::AsyncFd;

pub use error::Error;

Expand Down Expand Up @@ -472,17 +473,6 @@ impl Pin {
AsyncPinPoller::new(self.pin_num)
}

/// Get a Stream of pin interrupts for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
/// `set_edge()` before using this.
///
/// This method is only available when the `async-tokio` crate feature is enabled.
#[cfg(feature = "async-tokio")]
pub fn get_stream_with_handle(&self, handle: &Handle) -> Result<PinStream> {
PinStream::init_with_handle(*self, handle)
}

/// Get a Stream of pin interrupts for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
Expand All @@ -494,22 +484,6 @@ impl Pin {
PinStream::init(*self)
}

/// Get a Stream of pin values for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
/// `set_edge(Edge::BothEdges)` before using this.
///
/// Note that the values produced are the value of the pin as soon as we get to handling the
/// interrupt in userspace. Each time this stream produces a value, a change has occurred, but
/// it could end up producing the same value multiple times if the value has changed back
/// between when the interrupt occurred and when the value was read.
///
/// This method is only available when the `async-tokio` crate feature is enabled.
#[cfg(feature = "async-tokio")]
pub fn get_value_stream_with_handle(&self, handle: &Handle) -> Result<PinValueStream> {
Ok(PinValueStream(PinStream::init_with_handle(*self, handle)?))
}

/// Get a Stream of pin values for this pin
///
/// The PinStream object can be used with the `tokio` crate. You should probably call
Expand All @@ -536,9 +510,9 @@ fn extract_pin_fom_path_test() {
let tok3 = Pin::extract_pin_from_path(&"../../devices/soc0/gpiochip3/gpio/gpio124");
assert_eq!(124, tok3.unwrap());
let err1 = Pin::extract_pin_from_path(&"/sys/CLASS/gpio/gpio");
assert_eq!(true, err1.is_err());
assert!(err1.is_err());
let err2 = Pin::extract_pin_from_path(&"/sys/class/gpio/gpioSDS");
assert_eq!(true, err2.is_err());
assert!(err2.is_err());
}
#[cfg(not(target_os = "wasi"))]
#[derive(Debug)]
Expand Down Expand Up @@ -643,76 +617,70 @@ impl AsyncPinPoller {
}

#[cfg(feature = "mio-evented")]
impl Evented for AsyncPinPoller {
impl Source for AsyncPinPoller {
fn register(
&self,
poll: &mio::Poll,
&mut self,
poll: &mio::Registry,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
interest: mio::Interest,
) -> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).register(poll, token, interest, opts)
SourceFd(&self.as_raw_fd()).register(poll, token, interest)
}

fn reregister(
&self,
poll: &mio::Poll,
&mut self,
poll: &mio::Registry,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt,
interest: mio::Interest,
) -> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).reregister(poll, token, interest, opts)
SourceFd(&self.as_raw_fd()).reregister(poll, token, interest)
}

fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
EventedFd(&self.devfile.as_raw_fd()).deregister(poll)
fn deregister(&mut self, poll: &mio::Registry) -> io::Result<()> {
SourceFd(&self.as_raw_fd()).deregister(poll)
}
}

#[cfg(feature = "async-tokio")]
pub struct PinStream {
evented: PollEvented<AsyncPinPoller>,
skipped_first_event: bool,
#[cfg(any(feature = "async-tokio", feature = "mio-evented"))]
impl AsRawFd for AsyncPinPoller {
fn as_raw_fd(&self) -> RawFd {
self.devfile.as_raw_fd()
}
}

#[cfg(feature = "async-tokio")]
impl PinStream {
pub fn init_with_handle(pin: Pin, handle: &Handle) -> Result<Self> {
Ok(PinStream {
evented: PollEvented::new(pin.get_async_poller()?, handle)?,
skipped_first_event: false,
})
}
pub struct PinStream {
evented: AsyncFd<AsyncPinPoller>,
skipped_first_event: bool,
}

#[cfg(feature = "async-tokio")]
impl PinStream {
pub fn init(pin: Pin) -> Result<Self> {
Ok(PinStream {
evented: PollEvented::new(pin.get_async_poller()?, &Handle::default())?,
evented: AsyncFd::new(pin.get_async_poller()?)?,
skipped_first_event: false,
})
}
}

#[cfg(feature = "async-tokio")]
impl Stream for PinStream {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Ok(match self.evented.poll_read() {
Async::Ready(()) => {
self.evented.need_read()?;
if self.skipped_first_event {
Async::Ready(Some(()))
} else {
self.skipped_first_event = true;
Async::NotReady
}
type Item = Result<()>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
let mut guard = ready!(self.evented.poll_read_ready(cx))?;
guard.clear_ready();
if self.skipped_first_event {
return Poll::Ready(Some(Ok(())));
} else {
self.skipped_first_event = true;
}
Async::NotReady => Async::NotReady,
})
}
}
}

Expand All @@ -729,18 +697,13 @@ impl PinValueStream {

#[cfg(feature = "async-tokio")]
impl Stream for PinValueStream {
type Item = u8;
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.0.poll() {
Ok(Async::Ready(Some(()))) => {
let value = self.get_value()?;
Ok(Async::Ready(Some(value)))
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
type Item = Result<u8>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
ready!(std::pin::Pin::new(&mut self.0).poll_next(cx));
Poll::Ready(Some(Ok(self.get_value()?)))
}
}

0 comments on commit 4713a02

Please sign in to comment.