Skip to content

Commit

Permalink
Make the Gate::close synchronous returning future
Browse files Browse the repository at this point in the history
It's useful if the pattern you want to do is to initiate a close and
then wait for it completing later. Otherwise you sometimes have to jump
through hoops.
  • Loading branch information
vlovich committed May 11, 2024
1 parent 4568083 commit 43b5ec3
Showing 1 changed file with 53 additions and 10 deletions.
63 changes: 53 additions & 10 deletions glommio/src/sync/gate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
use futures_lite::Future;

use crate::{
channels::local_channel::{self, LocalSender},
channels::local_channel::{self, LocalReceiver, LocalSender},
GlommioError, ResourceType, Task, TaskQueueHandle,
};

Expand Down Expand Up @@ -86,8 +86,8 @@ impl Gate {
/// Close the gate, and wait for all spawned tasks to complete. If the gate is currently closing, this will wait
/// for it to close before returning a success. This is particularly useful if you might have a timeout on the close
/// - the would otherwise be no safe way to retry & wait for remaining tasks to finish.
pub async fn close(&self) -> Result<(), GlommioError<()>> {
self.inner.close().await
pub fn close(&self) -> impl Future<Output = Result<(), GlommioError<()>>> {
self.inner.close()
}

/// Whether the gate is open or not.
Expand All @@ -105,6 +105,9 @@ impl Gate {
}
}

type PreviousWaiter = Option<LocalSender<bool>>;
type CurrentClosure = LocalReceiver<bool>;

#[derive(Debug)]
struct GateInner {
count: Cell<usize>,
Expand Down Expand Up @@ -135,15 +138,30 @@ impl GateInner {
}
}

pub async fn close(&self) -> Result<(), GlommioError<()>> {
async fn wait_for_closure(
waiter: Result<Option<(CurrentClosure, PreviousWaiter)>, GlommioError<()>>,
) -> Result<(), GlommioError<()>> {
if let Some((waiter, previous_closer)) = waiter? {
waiter.recv().await;
if let Some(previous_closer) = previous_closer {
// Previous channel may be dropped so ignore the result.
let _ = previous_closer.try_send(true);
}
}

Ok(())
}

pub fn close(&self) -> impl Future<Output = Result<(), GlommioError<()>>> {
match self.state.replace(State::Closed) {
State::Open => {
if self.count.get() != 0 {
let (sender, receiver) = local_channel::new_bounded(1);
self.state.replace(State::Closing(sender));
receiver.recv().await;
Self::wait_for_closure(Ok(Some((receiver, None))))
} else {
Self::wait_for_closure(Ok(None))
}
Ok(())
}
State::Closing(previous_closer) => {
assert!(
Expand All @@ -158,11 +176,9 @@ impl GateInner {
let (sender, receiver) = local_channel::new_bounded(1);
self.state.replace(State::Closing(sender));

receiver.recv().await;
let _ = previous_closer.try_send(true);
Ok(())
Self::wait_for_closure(Ok(Some((receiver, Some(previous_closer)))))
}
State::Closed => Err(GlommioError::Closed(ResourceType::Gate)),
State::Closed => Self::wait_for_closure(Err(GlommioError::Closed(ResourceType::Gate))),
}
}

Expand Down Expand Up @@ -324,4 +340,31 @@ mod tests {
gate.close().await.unwrap();
})
}

#[test]
fn test_marked_closed_without_waiting() {
LocalExecutor::default().run(async {
let gate = Gate::new();
// Even if task is immediately cancelled, the gate still closes.
drop(gate.close());
assert!(gate.is_closed());

let gate = Gate::new();
let pass = gate.enter().unwrap();
// Even if task is cancelled, the gate is still marked as closing.
drop(gate.close());
assert!(!gate.is_open());
assert!(!gate.is_closed());
// Here we install a waiter after the aborted cancel.
let wait_for_closure = gate.close();
join!(
async move {
drop(pass);
},
async move {
wait_for_closure.await.unwrap();
}
);
})
}
}

0 comments on commit 43b5ec3

Please sign in to comment.