Skip to content

Commit

Permalink
feat(rust): rewrote router into a state container
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed Dec 11, 2024
1 parent d1124b2 commit 58907c3
Show file tree
Hide file tree
Showing 47 changed files with 901 additions and 1,820 deletions.
2 changes: 1 addition & 1 deletion examples/rust/get_started/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,6 @@ fn vault_and_identity() -> Result<(), Error> {

// Assert successful run conditions
assert_eq!(Some(0), exitcode);
assert!(stdout.contains("No more workers left. Goodbye!"));
assert!(stdout.contains("No more workers left. Goodbye!"));
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,6 @@ impl ConnectionBuilder {
if last_pass && is_last {
let is_terminal = ctx
.get_metadata(address.clone())
.await
.ok()
.flatten()
.map(|m| m.is_terminal)
.unwrap_or(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl NodeManager {
)));
}

if ctx.is_worker_registered_at(addr.clone()).await? {
if ctx.is_worker_registered_at(&addr) {
ctx.stop_worker(addr.clone()).await?
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,8 @@ impl NodeManagerWorker {
&self,
ctx: &Context,
) -> Result<Response<WorkerList>, Response<Error>> {
let workers = match ctx.list_workers().await {
Err(e) => Err(Response::internal_error_no_request(&e.to_string())),
Ok(workers) => Ok(workers),
}?;

let list = workers
let list = ctx
.list_workers()
.into_iter()
.map(|addr| WorkerStatus::new(addr.address()))
.collect();
Expand Down
5 changes: 1 addition & 4 deletions implementations/rust/ockam/ockam_api/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,7 @@ impl TestNode {
}

pub async fn create(runtime: Arc<Runtime>, listen_addr: Option<&str>) -> Self {
let (mut context, mut executor) = NodeBuilder::new().with_runtime(runtime.clone()).build();
runtime.spawn(async move {
executor.start_router().await.expect("cannot start router");
});
let (mut context, _executor) = NodeBuilder::new().with_runtime(runtime.clone()).build();
let node_manager_handle = start_manager_for_tests(
&mut context,
listen_addr,
Expand Down
6 changes: 3 additions & 3 deletions implementations/rust/ockam/ockam_api/tests/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async fn authority_starts_with_default_configuration(ctx: &mut Context) -> Resul
let configuration = default_configuration().await?;
start_authority_node(ctx, &configuration).await?;

let workers = ctx.list_workers().await?;
let workers = ctx.list_workers();

assert!(!workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
assert!(!workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
Expand All @@ -28,7 +28,7 @@ async fn authority_starts_direct_authenticator(ctx: &mut Context) -> Result<()>
configuration.no_direct_authentication = false;
start_authority_node(ctx, &configuration).await?;

let workers = ctx.list_workers().await?;
let workers = ctx.list_workers();

assert!(workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
assert!(!workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
Expand All @@ -46,7 +46,7 @@ async fn authority_starts_enrollment_token(ctx: &mut Context) -> Result<()> {
configuration.no_token_enrollment = false;
start_authority_node(ctx, &configuration).await?;

let workers = ctx.list_workers().await?;
let workers = ctx.list_workers();

assert!(!workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
assert!(workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
Expand Down
10 changes: 2 additions & 8 deletions implementations/rust/ockam/ockam_api/tests/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,12 @@ async fn start_monitoring__available__should_be_up_fast(ctx: &mut Context) -> Re
ctx.start_worker(Address::from_string("echo"), MockEchoer::new())
.await?;

assert!(
!ctx.is_worker_registered_at(session.collector_address().clone())
.await?
);
assert!(!ctx.is_worker_registered_at(session.collector_address()));

// Start the Session in a separate task
session.start_monitoring().await?;

assert!(
ctx.is_worker_registered_at(session.collector_address().clone())
.await?
);
assert!(ctx.is_worker_registered_at(session.collector_address()));

let mut time_to_restore = 0;

Expand Down
12 changes: 2 additions & 10 deletions implementations/rust/ockam/ockam_app_lib/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,12 @@ impl AppState {
) -> Result<AppState> {
let cli_state = CliState::with_default_dir()?;
let rt = Arc::new(Runtime::new().expect("cannot create a tokio runtime"));
let (context, mut executor) = NodeBuilder::new()
let (context, _executor) = NodeBuilder::new()
.no_logging()
.with_runtime(rt.clone())
.build();
let context = Arc::new(context);

// start the router, it is needed for the node manager creation
rt.spawn(async move {
let result = executor.start_router().await;
if let Err(e) = result {
error!(%e, "Failed to start the router")
}
});

let runtime = context.runtime().clone();
let future = async {
Self::make(
Expand Down Expand Up @@ -327,7 +319,7 @@ impl AppState {

info!("stopped the old node manager");

for w in self.context.list_workers().await.into_diagnostic()? {
for w in self.context.list_workers() {
let _ = self.context.stop_worker(w.address()).await;
}
info!("stopped all the ctx workers");
Expand Down
6 changes: 5 additions & 1 deletion implementations/rust/ockam/ockam_core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ pub mod sync {
/// Wrap `spin::RwLock` as it does not return LockResult<Guard> like `std::sync::Mutex`.
#[derive(Debug)]
pub struct RwLock<T>(spin::RwLock<T>);

/// Wrap `spin::RwLockWriteGuard`
pub type RwLockWriteGuard<'a, T> = spin::RwLockWriteGuard<'a, T>;

impl<T> RwLock<T> {
/// Creates a new spinlock wrapping the supplied data.
pub fn new(value: T) -> Self {
Expand Down Expand Up @@ -290,7 +294,7 @@ pub mod sync {
#[cfg(feature = "std")]
pub mod sync {
pub use std::sync::Arc;
pub use std::sync::{Mutex, RwLock};
pub use std::sync::{Mutex, RwLock, RwLockWriteGuard};
}

/// Provides `std::task` for `no_std` targets.
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_identity/tests/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ async fn should_stop_encryptor__and__decryptor__in__secure_channel(
0
);

let workers = ctx.list_workers().await?;
let workers = ctx.list_workers();
assert!(!workers.contains(channel1.decryptor_messaging_address()));
assert!(!workers.contains(channel1.encryptor_messaging_address()));
assert!(!workers.contains(channel2.decryptor_messaging_address()));
Expand Down
3 changes: 0 additions & 3 deletions implementations/rust/ockam/ockam_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ no_std = ["ockam_core/no_std", "ockam_transport_core/no_std", "heapless"]
# Feature: "alloc" enables support for heap allocation (implied by `feature = "std"`)
alloc = ["ockam_core/alloc", "ockam_executor/alloc", "futures/alloc", "minicbor/alloc"]

# Feature: "dump_internals" when set, will dump the internal state of
# workers at startup via the trace! macro.
dump_internals = []
# TODO should these features be combined?
metrics = []

Expand Down
23 changes: 7 additions & 16 deletions implementations/rust/ockam/ockam_node/src/async_drop.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::tokio::sync::{
mpsc::Sender as DefaultSender,
oneshot::{self, Receiver, Sender},
};
use crate::NodeMessage;
use crate::router::Router;
use crate::tokio::sync::oneshot::{self, Receiver, Sender};
use alloc::sync::Arc;
use ockam_core::Address;

/// A helper to implement Drop mechanisms, but async
Expand All @@ -19,7 +17,7 @@ use ockam_core::Address;
/// additional metadata to generate messages.
pub struct AsyncDrop {
rx: Receiver<Address>,
sender: DefaultSender<NodeMessage>,
router: Arc<Router>,
}

impl AsyncDrop {
Expand All @@ -29,9 +27,9 @@ impl AsyncDrop {
/// Context that creates this hook, while the `address` field must
/// refer to the address of the context that will be deallocated
/// this way.
pub fn new(sender: DefaultSender<NodeMessage>) -> (Self, Sender<Address>) {
pub fn new(router: Arc<Router>) -> (Self, Sender<Address>) {
let (tx, rx) = oneshot::channel();
(Self { rx, sender }, tx)
(Self { rx, router }, tx)
}

/// Wait for the cancellation of the channel and then send a
Expand All @@ -42,16 +40,9 @@ impl AsyncDrop {
pub async fn run(self) {
if let Ok(addr) = self.rx.await {
debug!("Received AsyncDrop request for address: {}", addr);

let (msg, mut reply) = NodeMessage::stop_worker(addr, true);
if let Err(e) = self.sender.send(msg).await {
if let Err(e) = self.router.stop_worker(&addr, true).await {
debug!("Failed sending AsyncDrop request to router: {}", e);
}

// Then check that address was properly shut down
if reply.recv().await.is_none() {
debug!("AsyncDrop router reply was None");
}
}
}
}
Loading

0 comments on commit 58907c3

Please sign in to comment.