Skip to content

Commit

Permalink
Add timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ryardley committed Dec 22, 2024
1 parent c374a77 commit 31ad596
Showing 1 changed file with 50 additions and 36 deletions.
86 changes: 50 additions & 36 deletions packages/ciphernode/net/src/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use libp2p::{
Multiaddr,
};
use std::net::ToSocketAddrs;
use tokio::select;
use tokio::sync::{broadcast, mpsc};
use tokio::time::{sleep, timeout, Duration};
use tracing::error;
use tracing::info;

Expand Down Expand Up @@ -84,46 +86,58 @@ async fn wait_for_connection(
dial_connection: ConnectionId,
) -> Result<(), RetryError> {
loop {
match event_rx.recv().await.map_err(to_retry)? {
NetworkPeerEvent::ConnectionEstablished { connection_id } => {
if connection_id == dial_connection {
info!("Connection Established");
return Ok(());
}
}
NetworkPeerEvent::DialError { error } => {
info!("DialError!");
return match error.as_ref() {
// If we are dialing ourself then we should just fail
DialError::NoAddresses { .. } => {
info!("DialError received. Returning RetryError::Failure");
Err(RetryError::Failure(error.clone().into()))
// Create a timeout future that can be reset
select! {
result = event_rx.recv() => {
match result.map_err(to_retry)? {
NetworkPeerEvent::ConnectionEstablished { connection_id } => {
if connection_id == dial_connection {
info!("Connection Established");
return Ok(());
}
}
// Try again otherwise
_ => Err(RetryError::Retry(error.clone().into())),
};
}
NetworkPeerEvent::OutgoingConnectionError {
connection_id,
error,
} => {
info!("OutgoingConnectionError!");
if connection_id == dial_connection {
info!(
"Connection {} failed because of error {}. Retrying...",
connection_id, error
);
return match error.as_ref() {
// If we are dialing ourself then we should just fail
DialError::NoAddresses { .. } => {
Err(RetryError::Failure(error.clone().into()))
NetworkPeerEvent::DialError { error } => {
info!("DialError!");
return match error.as_ref() {
// If we are dialing ourself then we should just fail
DialError::NoAddresses { .. } => {
info!("DialError received. Returning RetryError::Failure");
Err(RetryError::Failure(error.clone().into()))
}
// Try again otherwise
_ => Err(RetryError::Retry(error.clone().into())),
};
}
NetworkPeerEvent::OutgoingConnectionError {
connection_id,
error,
} => {
info!("OutgoingConnectionError!");
if connection_id == dial_connection {
info!(
"Connection {} failed because of error {}. Retrying...",
connection_id, error
);
return match error.as_ref() {
// If we are dialing ourself then we should just fail
DialError::NoAddresses { .. } => {
Err(RetryError::Failure(error.clone().into()))
}
// Try again otherwise
_ => Err(RetryError::Retry(error.clone().into())),
};
}
// Try again otherwise
_ => Err(RetryError::Retry(error.clone().into())),
};
}
_ => (),
}
}
_ => (),
_ = sleep(Duration::from_secs(60)) => {
info!("Connection attempt timed out after 60 seconds of no events");
return Err(RetryError::Retry(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Connection attempt timed out",
).into()));
}
}
}
}
Expand Down

0 comments on commit 31ad596

Please sign in to comment.