Skip to content

Commit

Permalink
Merge pull request #10 from dapper91/dev
Browse files Browse the repository at this point in the history
- added flag resume_panic to the process method.
- TaskBuilder renamed to TaskManagerBuilder.
- documentation fixed.
  • Loading branch information
dapper91 authored Nov 21, 2021
2 parents 8a3c788 + 40fc02b commit cb6ee08
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "coachman"
version = "0.2.0"
version = "0.3.0"
edition = "2021"
license = "Unlicense"
description = "rust asynchronous task manager built on top of tokio framework"
Expand Down
2 changes: 1 addition & 1 deletion examples/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() {
task_keys.push(task_key)
}

tokio::time::timeout(time::Duration::from_secs(5), task_manager.process()).await;
tokio::time::timeout(time::Duration::from_secs(5), task_manager.process(false)).await;

for task_key in task_keys {
if task_manager.cancel_task(task_key).is_ok() {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@ pub mod manager;
pub mod task;

pub use macros::AwaitResult::{self, Canceled, Completed};
pub use manager::{TaskBuilder, TaskManager};
pub use manager::{TaskManager, TaskManagerBuilder};
pub use task::{is_task_canceled, spawn, TaskError, TaskHandle};
48 changes: 30 additions & 18 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,31 @@ use crate::task::{spawn_inner, TaskError, TaskHandle};

/// Task manager builder. Provides methods for task manager initialization and configuration.
#[derive(Copy, Clone)]
pub struct TaskBuilder {
pub struct TaskManagerBuilder {
max_tasks: usize,
capacity: usize,
completion_events_buffer_size: usize,
}

impl TaskBuilder {
impl TaskManagerBuilder {
/// Sets max task count the manager will be handling
pub fn with_max_tasks(&mut self, max_tasks: usize) -> &mut TaskBuilder {
pub fn with_max_tasks(&mut self, max_tasks: usize) -> &mut TaskManagerBuilder {
self.max_tasks = max_tasks;
return self;
}

/// Sets slab task storage initial capacity. The right capacity choice prevents extra memory allocation.
pub fn with_capacity(&mut self, capacity: usize) -> &mut TaskBuilder {
pub fn with_capacity(&mut self, capacity: usize) -> &mut TaskManagerBuilder {
self.capacity = capacity;
return self;
}

/// Sets completion events queue size. Too small queue size could prevent tasks from immediate
/// cancellation until manager handles events from another tasks and empties a the queue.
pub fn with_completion_event_buffer_size(&mut self, completion_event_buffer_size: usize) -> &mut TaskBuilder {
/// Sets completion event queue size. Too small queue size could prevent tasks from immediate
/// cancellation until manager handles events from another tasks and empties the queue.
pub fn with_completion_event_buffer_size(
&mut self,
completion_event_buffer_size: usize,
) -> &mut TaskManagerBuilder {
self.completion_events_buffer_size = completion_event_buffer_size;
return self;
}
Expand All @@ -50,7 +53,7 @@ pub enum TaskManagerError {
}

/// Task manager is an asynchronous task supervisor that stores all spawned tasks, controls its states
/// and provides an api from task management.
/// and provides an api for task management.
pub struct TaskManager {
tasks: slab::Slab<TaskHandle<()>>,
completion_event_queue_sender: mpsc::Sender<usize>,
Expand All @@ -60,8 +63,8 @@ pub struct TaskManager {

impl TaskManager {
/// Returns a task manager builder.
pub fn builder() -> TaskBuilder {
TaskBuilder {
pub fn builder() -> TaskManagerBuilder {
TaskManagerBuilder {
max_tasks: 1024,
capacity: 32,
completion_events_buffer_size: 256,
Expand Down Expand Up @@ -111,7 +114,10 @@ impl TaskManager {

/// Runs manager processing loop handling task events.
/// Method is cancellation safe and can be used in `tokio::select!` macro.
pub async fn process(&mut self) {
/// If `resume_panic` argument is `true ` and any of the tasks panic
/// method resumes the panic on the current task. It is useful in test environment
/// when you want your application to be panicked if any of the spawned tasks panic.
pub async fn process(&mut self, resume_panic: bool) {
loop {
let task_key = self
.completion_event_queue_receiver
Expand All @@ -121,14 +127,19 @@ impl TaskManager {

match self.tasks.try_remove(task_key) {
None => log::debug!("task {} is not longer attached to the manager", task_key),
Some(task_handle) => {
let _ = task_handle.await;
}
Some(task_handle) => match task_handle.await {
Err(TaskError::Panicked(reason)) => {
if resume_panic {
std::panic::resume_unwind(reason);
}
}
_ => {}
},
}
}
}

/// Detached a task from the manager. The task is not longer supervised by the manager.
/// Detaches a task from the manager. The task is not longer supervised by the manager.
pub fn detach(&mut self, task_key: usize) -> Result<TaskHandle<()>, TaskManagerError> {
match self.tasks.try_remove(task_key) {
Some(task_handle) => Ok(task_handle),
Expand All @@ -152,7 +163,8 @@ impl TaskManager {

/// Waits until all the tasks are completed consuming self.
/// If `resume_panic` argument is `true ` and any of the tasks panic
/// method resumes the panic on the current task.
/// method resumes the panic on the current task. It is useful in test environment
/// when you want your application to be panicked if any of the spawned tasks panic.
pub async fn join(mut self, resume_panic: bool) {
for (_, task_handle) in std::mem::take(&mut self.tasks) {
match task_handle.await {
Expand Down Expand Up @@ -281,7 +293,7 @@ mod tests {
assert_eq!(task_manager.size(), 2);

tokio::task::yield_now().await;
tokio::time::timeout(Duration::from_millis(0), task_manager.process())
tokio::time::timeout(Duration::from_millis(0), task_manager.process(true))
.await
.unwrap_err();
assert_eq!(task_manager.size(), 0);
Expand All @@ -290,7 +302,7 @@ mod tests {
assert_eq!(task_manager.size(), 1);

tokio::task::yield_now().await;
tokio::time::timeout(Duration::from_millis(0), task_manager.process())
tokio::time::timeout(Duration::from_millis(0), task_manager.process(true))
.await
.unwrap_err();
assert_eq!(task_manager.size(), 0);
Expand Down
4 changes: 2 additions & 2 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl<T> TaskHandle<T> {

/// Cancels the task associated with the handle. Whether the task handles cancellation
/// event (using [`crate::try_await`] macro) or not depends on task itself. If the task
/// doesn't implement cancellation handling it continue execution until it finishes or being aborted.
/// doesn't implement cancellation handling it continue to execute until it finishes or being aborted.
pub fn cancel(&mut self) {
let _ = self.cancel_event_sender.send(true);
self.canceled = true;
Expand All @@ -159,7 +159,7 @@ impl<T> Future for TaskHandle<T> {

/// Polls for the task completion. The task could be completed successfully returning [`Ok`]
/// or exited with an [`Err`]<[`TaskError`]> error. In the last case the
/// actual reason (cancellation, abortion or panicking) could be fetched from it.
/// actual reason (cancellation, abortion or panicking) could be fetched from the error.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.handle).poll(cx) {
Poll::Pending => Poll::Pending,
Expand Down

0 comments on commit cb6ee08

Please sign in to comment.