diff --git a/Cargo.toml b/Cargo.toml index 2e8c26b..e42d33d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "coachman" -version = "0.2.0" +version = "0.3.0" edition = "2021" license = "Unlicense" description = "rust asynchronous task manager built on top of tokio framework" diff --git a/examples/task_manager.rs b/examples/task_manager.rs index 1af5b5b..8480dba 100644 --- a/examples/task_manager.rs +++ b/examples/task_manager.rs @@ -29,7 +29,7 @@ async fn main() { task_keys.push(task_key) } - tokio::time::timeout(time::Duration::from_secs(5), task_manager.process()).await; + tokio::time::timeout(time::Duration::from_secs(5), task_manager.process(false)).await; for task_key in task_keys { if task_manager.cancel_task(task_key).is_ok() { diff --git a/src/lib.rs b/src/lib.rs index 7a65313..b6b30d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,5 +60,5 @@ pub mod manager; pub mod task; pub use macros::AwaitResult::{self, Canceled, Completed}; -pub use manager::{TaskBuilder, TaskManager}; +pub use manager::{TaskManager, TaskManagerBuilder}; pub use task::{is_task_canceled, spawn, TaskError, TaskHandle}; diff --git a/src/manager.rs b/src/manager.rs index 673f723..9f89f08 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -8,28 +8,31 @@ use crate::task::{spawn_inner, TaskError, TaskHandle}; /// Task manager builder. Provides methods for task manager initialization and configuration. #[derive(Copy, Clone)] -pub struct TaskBuilder { +pub struct TaskManagerBuilder { max_tasks: usize, capacity: usize, completion_events_buffer_size: usize, } -impl TaskBuilder { +impl TaskManagerBuilder { /// Sets max task count the manager will be handling - pub fn with_max_tasks(&mut self, max_tasks: usize) -> &mut TaskBuilder { + pub fn with_max_tasks(&mut self, max_tasks: usize) -> &mut TaskManagerBuilder { self.max_tasks = max_tasks; return self; } /// Sets slab task storage initial capacity. The right capacity choice prevents extra memory allocation. - pub fn with_capacity(&mut self, capacity: usize) -> &mut TaskBuilder { + pub fn with_capacity(&mut self, capacity: usize) -> &mut TaskManagerBuilder { self.capacity = capacity; return self; } - /// Sets completion events queue size. Too small queue size could prevent tasks from immediate - /// cancellation until manager handles events from another tasks and empties a the queue. - pub fn with_completion_event_buffer_size(&mut self, completion_event_buffer_size: usize) -> &mut TaskBuilder { + /// Sets completion event queue size. Too small queue size could prevent tasks from immediate + /// cancellation until manager handles events from another tasks and empties the queue. + pub fn with_completion_event_buffer_size( + &mut self, + completion_event_buffer_size: usize, + ) -> &mut TaskManagerBuilder { self.completion_events_buffer_size = completion_event_buffer_size; return self; } @@ -50,7 +53,7 @@ pub enum TaskManagerError { } /// Task manager is an asynchronous task supervisor that stores all spawned tasks, controls its states -/// and provides an api from task management. +/// and provides an api for task management. pub struct TaskManager { tasks: slab::Slab>, completion_event_queue_sender: mpsc::Sender, @@ -60,8 +63,8 @@ pub struct TaskManager { impl TaskManager { /// Returns a task manager builder. - pub fn builder() -> TaskBuilder { - TaskBuilder { + pub fn builder() -> TaskManagerBuilder { + TaskManagerBuilder { max_tasks: 1024, capacity: 32, completion_events_buffer_size: 256, @@ -111,7 +114,10 @@ impl TaskManager { /// Runs manager processing loop handling task events. /// Method is cancellation safe and can be used in `tokio::select!` macro. - pub async fn process(&mut self) { + /// If `resume_panic` argument is `true ` and any of the tasks panic + /// method resumes the panic on the current task. It is useful in test environment + /// when you want your application to be panicked if any of the spawned tasks panic. + pub async fn process(&mut self, resume_panic: bool) { loop { let task_key = self .completion_event_queue_receiver @@ -121,14 +127,19 @@ impl TaskManager { match self.tasks.try_remove(task_key) { None => log::debug!("task {} is not longer attached to the manager", task_key), - Some(task_handle) => { - let _ = task_handle.await; - } + Some(task_handle) => match task_handle.await { + Err(TaskError::Panicked(reason)) => { + if resume_panic { + std::panic::resume_unwind(reason); + } + } + _ => {} + }, } } } - /// Detached a task from the manager. The task is not longer supervised by the manager. + /// Detaches a task from the manager. The task is not longer supervised by the manager. pub fn detach(&mut self, task_key: usize) -> Result, TaskManagerError> { match self.tasks.try_remove(task_key) { Some(task_handle) => Ok(task_handle), @@ -152,7 +163,8 @@ impl TaskManager { /// Waits until all the tasks are completed consuming self. /// If `resume_panic` argument is `true ` and any of the tasks panic - /// method resumes the panic on the current task. + /// method resumes the panic on the current task. It is useful in test environment + /// when you want your application to be panicked if any of the spawned tasks panic. pub async fn join(mut self, resume_panic: bool) { for (_, task_handle) in std::mem::take(&mut self.tasks) { match task_handle.await { @@ -281,7 +293,7 @@ mod tests { assert_eq!(task_manager.size(), 2); tokio::task::yield_now().await; - tokio::time::timeout(Duration::from_millis(0), task_manager.process()) + tokio::time::timeout(Duration::from_millis(0), task_manager.process(true)) .await .unwrap_err(); assert_eq!(task_manager.size(), 0); @@ -290,7 +302,7 @@ mod tests { assert_eq!(task_manager.size(), 1); tokio::task::yield_now().await; - tokio::time::timeout(Duration::from_millis(0), task_manager.process()) + tokio::time::timeout(Duration::from_millis(0), task_manager.process(true)) .await .unwrap_err(); assert_eq!(task_manager.size(), 0); diff --git a/src/task.rs b/src/task.rs index d10be4e..4973f88 100644 --- a/src/task.rs +++ b/src/task.rs @@ -147,7 +147,7 @@ impl TaskHandle { /// Cancels the task associated with the handle. Whether the task handles cancellation /// event (using [`crate::try_await`] macro) or not depends on task itself. If the task - /// doesn't implement cancellation handling it continue execution until it finishes or being aborted. + /// doesn't implement cancellation handling it continue to execute until it finishes or being aborted. pub fn cancel(&mut self) { let _ = self.cancel_event_sender.send(true); self.canceled = true; @@ -159,7 +159,7 @@ impl Future for TaskHandle { /// Polls for the task completion. The task could be completed successfully returning [`Ok`] /// or exited with an [`Err`]<[`TaskError`]> error. In the last case the - /// actual reason (cancellation, abortion or panicking) could be fetched from it. + /// actual reason (cancellation, abortion or panicking) could be fetched from the error. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Pin::new(&mut self.handle).poll(cx) { Poll::Pending => Poll::Pending,