Skip to content

Commit

Permalink
Make the Gate::close synchronous returning future
Browse files Browse the repository at this point in the history
It's useful if the pattern you want to do is to initiate a close and
then wait for it completing later. Otherwise you sometimes have to jump
through hoops.
  • Loading branch information
vlovich committed May 11, 2024
1 parent 4568083 commit 79b9f60
Showing 1 changed file with 59 additions and 13 deletions.
72 changes: 59 additions & 13 deletions glommio/src/sync/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use futures_lite::Future;

use crate::{
channels::local_channel::{self, LocalSender},
channels::local_channel::{self, LocalReceiver, LocalSender},
GlommioError, ResourceType, Task, TaskQueueHandle,
};

Expand Down Expand Up @@ -83,11 +83,14 @@ impl Gate {
)
}

/// Close the gate, and wait for all spawned tasks to complete. If the gate is currently closing, this will wait
/// for it to close before returning a success. This is particularly useful if you might have a timeout on the close
/// - the would otherwise be no safe way to retry & wait for remaining tasks to finish.
pub async fn close(&self) -> Result<(), GlommioError<()>> {
self.inner.close().await
/// Close the gate, and return a waiter for all spawned tasks to complete. If the gate is currently closing, the
/// returned future will wait for it to close before returning a success. This is particularly useful if you might
/// have a timeout on the close - the would otherwise be no safe way to retry & wait for remaining tasks to finish.
///
/// NOTE: After this function returns, the gate is marked as closed/closing any subsequent attempts to acquire a
/// pass will fail, even if you drop the future.
pub fn close(&self) -> impl Future<Output = Result<(), GlommioError<()>>> {
self.inner.close()
}

/// Whether the gate is open or not.
Expand All @@ -105,6 +108,9 @@ impl Gate {
}
}

type PreviousWaiter = Option<LocalSender<bool>>;
type CurrentClosure = LocalReceiver<bool>;

#[derive(Debug)]
struct GateInner {
count: Cell<usize>,
Expand Down Expand Up @@ -135,15 +141,30 @@ impl GateInner {
}
}

pub async fn close(&self) -> Result<(), GlommioError<()>> {
async fn wait_for_closure(
waiter: Result<Option<(CurrentClosure, PreviousWaiter)>, GlommioError<()>>,
) -> Result<(), GlommioError<()>> {
if let Some((waiter, previous_closer)) = waiter? {
waiter.recv().await;
if let Some(previous_closer) = previous_closer {
// Previous channel may be dropped so ignore the result.
let _ = previous_closer.try_send(true);
}
}

Ok(())
}

pub fn close(&self) -> impl Future<Output = Result<(), GlommioError<()>>> {
match self.state.replace(State::Closed) {
State::Open => {
if self.count.get() != 0 {
let (sender, receiver) = local_channel::new_bounded(1);
self.state.replace(State::Closing(sender));
receiver.recv().await;
Self::wait_for_closure(Ok(Some((receiver, None))))
} else {
Self::wait_for_closure(Ok(None))
}
Ok(())
}
State::Closing(previous_closer) => {
assert!(
Expand All @@ -158,11 +179,9 @@ impl GateInner {
let (sender, receiver) = local_channel::new_bounded(1);
self.state.replace(State::Closing(sender));

receiver.recv().await;
let _ = previous_closer.try_send(true);
Ok(())
Self::wait_for_closure(Ok(Some((receiver, Some(previous_closer)))))
}
State::Closed => Err(GlommioError::Closed(ResourceType::Gate)),
State::Closed => Self::wait_for_closure(Err(GlommioError::Closed(ResourceType::Gate))),
}
}

Expand Down Expand Up @@ -324,4 +343,31 @@ mod tests {
gate.close().await.unwrap();
})
}

#[test]
fn test_marked_closed_without_waiting() {
LocalExecutor::default().run(async {
let gate = Gate::new();
// Even if task is immediately cancelled, the gate still closes.
drop(gate.close());
assert!(gate.is_closed());

let gate = Gate::new();
let pass = gate.enter().unwrap();
// Even if task is cancelled, the gate is still marked as closing.
drop(gate.close());
assert!(!gate.is_open());
assert!(!gate.is_closed());
// Here we install a waiter after the aborted cancel.
let wait_for_closure = gate.close();
join!(
async move {
drop(pass);
},
async move {
wait_for_closure.await.unwrap();
}
);
})
}
}

0 comments on commit 79b9f60

Please sign in to comment.