diff --git a/chapter_14/chapter-14.ipynb b/chapter_14/chapter-14.ipynb new file mode 100644 index 0000000..469a64a --- /dev/null +++ b/chapter_14/chapter-14.ipynb @@ -0,0 +1,700 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "60c8d72c-b011-4360-b368-d540b8d7361a", + "metadata": {}, + "source": [ + "# Asynchronous Programming" + ] + }, + { + "cell_type": "markdown", + "id": "fd437cfa-f913-4465-9d30-eaef552d861f", + "metadata": {}, + "source": [ + "## Utilizing the tokio Library" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "e93816e8-c065-4dad-84dc-861b6746fd52", + "metadata": {}, + "outputs": [], + "source": [ + ":dep tokio = { version = \"1.35.0\", features = [\"full\"] }" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "e0b2454d-f645-42c2-aeee-8488b63eba99", + "metadata": {}, + "outputs": [], + "source": [ + ":dep reqwest = { version = \"0.11.22\" }" + ] + }, + { + "cell_type": "markdown", + "id": "a5865423-115b-4959-b352-e0a4027e3a51", + "metadata": {}, + "source": [ + "## Harnessing the Power of Async for Responsiveness" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "bc8a66bb-490a-4cf2-8145-516249bd3091", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Processing: Sample Data\n", + "Fetch Result: Ok(\"Data fetched successfully\")\n" + ] + }, + { + "data": { + "text/plain": [ + "()" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "use tokio::time::Duration;\n", + "\n", + "\n", + "async fn fetch_data() -> Result {\n", + " // Simulate an asynchronous HTTP request\n", + " tokio::time::sleep(Duration::from_secs(2)).await;\n", + " Ok::(\"Data fetched successfully\".to_owned())\n", + "}\n", + "\n", + "async fn process_data(data: String) {\n", + " // Asynchronously process the fetched data\n", + " println!(\"Processing: {}\", data);\n", + "}\n", + "\n", + "async fn main_async_workflow() {\n", + " // Spawn multiple asynchronous tasks to perform operations concurrently\n", + " let fetch_task = tokio::spawn(fetch_data());\n", + " let process_task = tokio::spawn(process_data(\"Sample Data\".to_owned()));\n", + "\n", + " // Await the completion of the tasks\n", + " let fetch_result = fetch_task.await.expect(\"Failed to fetch data\");\n", + " let _ = process_task.await; // <10>\n", + "\n", + " // Continue with the results\n", + " println!(\"Fetch Result: {:?}\", fetch_result); // <11>\n", + "}\n", + "\n", + "#[tokio::main]\n", + "async fn main() {\n", + " // Run the main asynchronous workflow\n", + " main_async_workflow().await;\n", + "}\n", + "\n", + "main()" + ] + }, + { + "cell_type": "markdown", + "id": "543b00c3-974c-40fe-b6b3-5dff44d68822", + "metadata": {}, + "source": [ + "### Error Handling in Asynchronous Code" + ] + }, + { + "cell_type": "markdown", + "id": "44008c8b-ae2f-43b9-bf78-ffcbba3d3a5c", + "metadata": {}, + "source": [ + "### Example 1: Database Interaction using FFI" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "e92dbe02-7fed-4484-bd48-b467904d60fe", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Async operation completed successfully\n" + ] + }, + { + "data": { + "text/plain": [ + "()" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "use std::error::Error;\n", + "\n", + "async fn async_with_error_handling() -> Result<(), Box> {\n", + " let handle = tokio::task::spawn(async {\n", + " // Asynchronous logic with potential errors\n", + " Ok::<(), Box>(())\n", + " });\n", + "\n", + " // Await task completion and handle errors\n", + " let _ = handle.await.map_err(|error| {\n", + " Box::new(error) as Box\n", + " })?;\n", + " Ok(())\n", + "}\n", + "\n", + "#[tokio::main]\n", + "async fn main() {\n", + " match async_with_error_handling().await {\n", + " Ok(_) => println!(\"Async operation completed successfully\"),\n", + " Err(error) => eprintln!(\"Error during async operation: {}\", error),\n", + " }\n", + "}\n", + "\n", + "main()" + ] + }, + { + "cell_type": "markdown", + "id": "058b2fa1-9339-4c9e-9a14-aff66eda9681", + "metadata": {}, + "source": [ + "### Concurrent Task Lifetimes and Resource Management" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "0ddc966b-bede-4cf2-a1ec-3c383edee531", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Task acquiring lock on shared data.\n", + "Task modifying shared data: 1\n", + "Task acquiring lock on shared data.\n", + "Task modifying shared data: 2\n", + "Task 1 and Task 2 spawned.\n", + "Task 1 completed successfully.\n", + "Task 2 completed successfully.\n", + "Continuing with the results.\n", + "Main function completed.\n" + ] + }, + { + "data": { + "text/plain": [ + "()" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "use tokio::sync::Mutex;\n", + "use std::sync::Arc;\n", + "\n", + "async fn async_with_shared_data() {\n", + " let shared_data = Arc::new(Mutex::new(0));\n", + "\n", + " let task1 = tokio::spawn(async_with_shared_data_task(Arc::clone(&shared_data)));\n", + " let task2 = tokio::spawn(async_with_shared_data_task(Arc::clone(&shared_data)));\n", + "\n", + " println!(\"Task 1 and Task 2 spawned.\");\n", + "\n", + " task1.await.expect(\"Task 1 failed\");\n", + "\n", + " println!(\"Task 1 completed successfully.\");\n", + "\n", + " task2.await.expect(\"Task 2 failed\");\n", + "\n", + " println!(\"Task 2 completed successfully.\");\n", + "\n", + " println!(\"Continuing with the results.\");\n", + "}\n", + "\n", + "async fn async_with_shared_data_task(shared_data: Arc>) {\n", + " let mut data = shared_data.lock().await;\n", + "\n", + " println!(\"Task acquiring lock on shared data.\");\n", + "\n", + " *data += 1;\n", + " println!(\"Task modifying shared data: {}\", data);\n", + "}\n", + "\n", + "#[tokio::main]\n", + "async fn main() {\n", + " async_with_shared_data().await;\n", + " println!(\"Main function completed.\");\n", + "}\n", + "\n", + "main()" + ] + }, + { + "cell_type": "markdown", + "id": "e624f7f6-3a85-4d28-8e6e-86dd37596fda", + "metadata": {}, + "source": [ + "## Advanced Patterns in Asynchronous Programming" + ] + }, + { + "cell_type": "markdown", + "id": "0eccdcd4-e972-491e-891d-dd16ff2adb13", + "metadata": {}, + "source": [ + "### Asynchronous Streams" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "5e82ec11-5b16-4e18-b8e6-adfb9d1c435f", + "metadata": {}, + "outputs": [], + "source": [ + ":dep tokio-stream = { version = \"0.1.14\" }" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "9f1764bb-47f8-43f8-bdab-e59c4aad48ef", + "metadata": {}, + "outputs": [], + "source": [ + ":dep rand = { version = \"0.8.5\" }" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "d78a4248-71de-4981-b036-14ce2deea582", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Random Number: 1375459144\n", + "Random Number: 2810972174\n", + "Random Number: 3420131127\n", + "Random Number: 3810768654\n", + "Random Number: 2841032359\n" + ] + }, + { + "data": { + "text/plain": [ + "()" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "use std::pin::Pin;\n", + "use std::task::{Context, Poll};\n", + "use tokio::time::Duration;\n", + "use tokio_stream::{Stream, StreamExt};\n", + "\n", + "\n", + "struct MyInterval {\n", + " interval: tokio::time::Interval,\n", + "}\n", + "\n", + "impl MyInterval {\n", + " fn new(duration: Duration) -> Self {\n", + " Self {\n", + " interval: tokio::time::interval(duration),\n", + " }\n", + " }\n", + "}\n", + "\n", + "impl Stream for MyInterval {\n", + " type Item = tokio::time::Instant;\n", + "\n", + " fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {\n", + " match Pin::new(&mut self.interval).poll_tick(cx) {\n", + " Poll::Ready(instant) => Poll::Ready(Some(instant)),\n", + " Poll::Pending => Poll::Pending,\n", + " }\n", + " }\n", + "}\n", + "\n", + "#[tokio::main]\n", + "async fn main() {\n", + " // <11>\n", + " let mut random_number_stream =\n", + " MyInterval::new(Duration::from_secs(1)).map(|_| rand::random::());\n", + "\n", + " for _ in 0..5 {\n", + " // <13>\n", + " if let Some(random_number) = random_number_stream.next().await {\n", + " println!(\"Random Number: {}\", random_number);\n", + " }\n", + " }\n", + "}\n", + "\n", + "main()" + ] + }, + { + "cell_type": "markdown", + "id": "f898d182-7321-45c1-bbf1-373c275599af", + "metadata": {}, + "source": [ + "### Fan-Out and Fan-In with Async Streams" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "67db8999-2c9b-4483-9520-e0a60cc5e872", + "metadata": {}, + "outputs": [], + "source": [ + ":dep tokio-stream = { version = \"0.1.14\" }" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "f3f07546-ed96-4b38-ac69-3f5e7046c57d", + "metadata": {}, + "outputs": [ + { + "ename": "Error", + "evalue": "failed to resolve: use of undeclared crate or module `stream`", + "output_type": "error", + "traceback": [ + "\u001b[31m[E0433] Error:\u001b[0m failed to resolve: use of undeclared crate or module `stream`", + " \u001b[38;5;246m╭\u001b[0m\u001b[38;5;246m─\u001b[0m\u001b[38;5;246m[\u001b[0mcommand_22:1:1\u001b[38;5;246m]\u001b[0m", + " \u001b[38;5;246m│\u001b[0m", + " \u001b[38;5;246m10 │\u001b[0m \u001b[38;5;249m \u001b[0m\u001b[38;5;249m \u001b[0m\u001b[38;5;249m \u001b[0m\u001b[38;5;249m \u001b[0m\u001b[38;5;54ms\u001b[0m\u001b[38;5;54mt\u001b[0m\u001b[38;5;54mr\u001b[0m\u001b[38;5;54me\u001b[0m\u001b[38;5;54ma\u001b[0m\u001b[38;5;54mm\u001b[0m\u001b[38;5;249m:\u001b[0m\u001b[38;5;249m:\u001b[0m\u001b[38;5;249mi\u001b[0m\u001b[38;5;249mt\u001b[0m\u001b[38;5;249me\u001b[0m\u001b[38;5;249mr\u001b[0m\u001b[38;5;249m(\u001b[0m\u001b[38;5;249m0\u001b[0m\u001b[38;5;249m.\u001b[0m\u001b[38;5;249m.\u001b[0m\u001b[38;5;249m5\u001b[0m\u001b[38;5;249m)\u001b[0m\u001b[38;5;249m.\u001b[0m\u001b[38;5;249mt\u001b[0m\u001b[38;5;249mh\u001b[0m\u001b[38;5;249me\u001b[0m\u001b[38;5;249mn\u001b[0m\u001b[38;5;249m(\u001b[0m\u001b[38;5;249ma\u001b[0m\u001b[38;5;249ms\u001b[0m\u001b[38;5;249my\u001b[0m\u001b[38;5;249mn\u001b[0m\u001b[38;5;249mc\u001b[0m\u001b[38;5;249m_\u001b[0m\u001b[38;5;249mt\u001b[0m\u001b[38;5;249ma\u001b[0m\u001b[38;5;249ms\u001b[0m\u001b[38;5;249mk\u001b[0m\u001b[38;5;249m)\u001b[0m", + " \u001b[38;5;240m │\u001b[0m \u001b[38;5;54m─\u001b[0m\u001b[38;5;54m─\u001b[0m\u001b[38;5;54m─\u001b[0m\u001b[38;5;54m┬\u001b[0m\u001b[38;5;54m─\u001b[0m\u001b[38;5;54m─\u001b[0m ", + " \u001b[38;5;240m │\u001b[0m \u001b[38;5;54m╰\u001b[0m\u001b[38;5;54m─\u001b[0m\u001b[38;5;54m─\u001b[0m\u001b[38;5;54m─\u001b[0m\u001b[38;5;54m─\u001b[0m use of undeclared crate or module `stream`", + " \u001b[38;5;240m │\u001b[0m \u001b[38;5;100m│\u001b[0m ", + " \u001b[38;5;240m │\u001b[0m \u001b[38;5;100m╰\u001b[0m\u001b[38;5;100m─\u001b[0m\u001b[38;5;100m─\u001b[0m\u001b[38;5;100m─\u001b[0m\u001b[38;5;100m─\u001b[0m help: a trait with a similar name exists: `Stream`", + "\u001b[38;5;246m────╯\u001b[0m" + ] + } + ], + "source": [ + "use tokio_stream::{self as stream, Stream, StreamExt};\n", + "use tokio::time::{sleep, Duration};\n", + "\n", + "async fn async_task(id: usize) -> usize {\n", + " sleep(Duration::from_secs(id as u64)).await;\n", + " id * 2\n", + "}\n", + "\n", + "fn fan_out_tasks() -> impl Stream {\n", + " stream::iter(0..5).then(async_task)\n", + "}\n", + "\n", + "\n", + "async fn fan_in_results() {\n", + " let result_stream = fan_out_tasks();\n", + "\n", + " tokio::pin!(result_stream);\n", + "\n", + " while let Some(result) = result_stream.next().await {\n", + " println!(\"Received result: {:?}\", result);\n", + " }\n", + "}\n", + "\n", + "#[tokio::main]\n", + "async fn main() {\n", + " fan_in_results().await;\n", + "}\n", + "\n", + "main()\n", + "\n", + "// TODO: file an issue regarding `as` keyword?" + ] + }, + { + "cell_type": "markdown", + "id": "f41364be-6f5f-4073-a183-9733b2419f33", + "metadata": {}, + "source": [ + "### Cancelation and Timeout Handling" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "623fcfcd-78c1-4bb1-9162-78d6bbffbe54", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Error: deadline has elapsed\n" + ] + }, + { + "data": { + "text/plain": [ + "()" + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "use tokio::time::{sleep, timeout, Duration};\n", + "\n", + "async fn async_task() {\n", + " sleep(Duration::from_secs(5)).await;\n", + " println!(\"Task completed\");\n", + "}\n", + "\n", + "async fn cancelable_task() {\n", + " timeout(Duration::from_secs(2), async_task())\n", + " .await\n", + " .unwrap_or_else(|err| {\n", + " eprintln!(\"Error: {}\", err);\n", + " });\n", + "}\n", + "\n", + "#[tokio::main]\n", + "async fn main() {\n", + " cancelable_task().await;\n", + "}\n", + "\n", + "main()" + ] + }, + { + "cell_type": "markdown", + "id": "96a74b64-e2e7-4a5c-b0a1-1d7e892082d7", + "metadata": {}, + "source": [ + "### Dynamic Task Management" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "53f1e704-0b03-40b0-94a7-bd8410121be2", + "metadata": {}, + "outputs": [], + "source": [ + ":dep futures = { version = \"0.3.29\" }" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "b03dddbf-09b7-4174-b7ce-84641c93501f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Task result: 0\n", + "Task result: 3\n", + "Task result: 6\n", + "Task result: 9\n", + "Task result: 12\n" + ] + }, + { + "data": { + "text/plain": [ + "()" + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "use futures::stream::FuturesUnordered;\n", + "use futures::StreamExt;\n", + "use tokio::task::spawn;\n", + "use tokio::time::{sleep, Duration};\n", + "\n", + "async fn dynamic_task_manager() {\n", + " let task_manager = FuturesUnordered::new();\n", + "\n", + " for id in 0..5 {\n", + " let task = spawn(async move {\n", + " sleep(Duration::from_secs(id as u64)).await;\n", + " id * 3\n", + " });\n", + "\n", + " task_manager.push(task);\n", + " }\n", + "\n", + " tokio::pin!(task_manager);\n", + "\n", + " while let Some(result) = task_manager.next().await {\n", + " println!(\"Task result: {:?}\", result.unwrap());\n", + " }\n", + "}\n", + "\n", + "#[tokio::main]\n", + "async fn main() {\n", + " dynamic_task_manager().await;\n", + "}\n", + "\n", + "main()" + ] + }, + { + "cell_type": "markdown", + "id": "4c139593-34fb-4d02-80d0-fd99977f3fae", + "metadata": {}, + "source": [ + "### Integrating Async Code with Sync Code" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "id": "17b5a5b8-6f3d-4510-9328-92b5829a42d1", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Asynchronous code: Start\n", + "Asynchronous code: End\n", + "Synchronous code: Start\n", + "Synchronous iteration: 1\n", + "Synchronous iteration: 2\n", + "Synchronous iteration: 3\n", + "Synchronous code: End\n", + "Continuing with asynchronous logic\n", + "Main function completed\n" + ] + }, + { + "data": { + "text/plain": [ + "()" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "use tokio::task;\n", + "use tokio::time::{sleep, Duration};\n", + "\n", + "#[derive(Clone)]\n", + "struct AsyncResource {\n", + " // Resource fields\n", + "}\n", + "\n", + "impl AsyncResource {\n", + " async fn cleanup(&self) {\n", + " // Asynchronous cleanup logic\n", + " println!(\"Cleaning up resources asynchronously...\");\n", + " // Simulate cleanup work\n", + " sleep(Duration::from_secs(1)).await;\n", + " println!(\"Cleanup completed\");\n", + " }\n", + "}\n", + "\n", + "async fn async_and_sync_integration() {\n", + " let handle = task::spawn(async {\n", + "\n", + " println!(\"Asynchronous code: Start\");\n", + "\n", + " sleep(Duration::from_secs(2)).await;\n", + "\n", + " println!(\"Asynchronous code: End\");\n", + "\n", + " tokio::task::block_in_place(|| {\n", + "\n", + " println!(\"Synchronous code: Start\");\n", + "\n", + " for i in 1..=3 {\n", + " println!(\"Synchronous iteration: {}\", i);\n", + " }\n", + "\n", + " println!(\"Synchronous code: End\");\n", + " });\n", + "\n", + " println!(\"Continuing with asynchronous logic\");\n", + " });\n", + "\n", + " handle.await.expect(\"Task failed\");\n", + "}\n", + "\n", + "#[tokio::main]\n", + "async fn main() {\n", + " async_and_sync_integration().await;\n", + "\n", + " println!(\"Main function completed\");\n", + "}\n", + "\n", + "main()" + ] + }, + { + "cell_type": "markdown", + "id": "6ae516b6-36ad-41dc-bad9-cc6dea7526a8", + "metadata": {}, + "source": [ + "---\n", + "---" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Rust", + "language": "rust", + "name": "rust" + }, + "language_info": { + "codemirror_mode": "rust", + "file_extension": ".rs", + "mimetype": "text/rust", + "name": "Rust", + "pygment_lexer": "rust", + "version": "" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}