Skip to content

Commit

Permalink
Merge pull request #1916 from blockstack/fix/p2p-event-id-leak
Browse files Browse the repository at this point in the history
Fix event ID resource leak for connecting sockets that time out
  • Loading branch information
jcnelson authored Sep 27, 2020
2 parents 473aef1 + 4ece6a8 commit 2a1cf70
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 27 deletions.
107 changes: 80 additions & 27 deletions src/net/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,19 +1072,16 @@ impl PeerNetwork {

/// Deregister a socket/event pair
pub fn deregister_peer(&mut self, event_id: usize) -> () {
test_debug!("{:?}: Disconnect event {}", &self.local_peer, event_id);
if self.peers.contains_key(&event_id) {
self.peers.remove(&event_id);
}
debug!("{:?}: Disconnect event {}", &self.local_peer, event_id);

let mut to_remove : Vec<NeighborKey> = vec![];
let mut nk_remove : Vec<NeighborKey> = vec![];
for (neighbor_key, ev_id) in self.events.iter() {
if *ev_id == event_id {
to_remove.push(neighbor_key.clone());
nk_remove.push(neighbor_key.clone());
}
}
for nk in to_remove {
// remove events
for nk in nk_remove.into_iter() {
// remove event state
self.events.remove(&nk);

// remove inventory state
Expand All @@ -1097,30 +1094,22 @@ impl PeerNetwork {
}
}

let mut to_remove : Vec<usize> = vec![];

match self.network {
None => {},
Some(ref mut network) => {
match self.sockets.get_mut(&event_id) {
None => {},
Some(ref sock) => {
let _ = network.deregister(event_id, sock);
to_remove.push(event_id); // force it to close anyway
}
// deregister socket if connected and registered already
if let Some(socket) = self.sockets.remove(&event_id) {
let _ = network.deregister(event_id, &socket);
}
// deregister socket if still connecting
if let Some((socket, ..)) = self.connecting.remove(&event_id) {
let _ = network.deregister(event_id, &socket);
}
}
}

// always clear connecting socket state
self.connecting.remove(&event_id);

for event_id in to_remove {
// remove socket
self.sockets.remove(&event_id);
self.connecting.remove(&event_id);
self.relay_handles.remove(&event_id);
}

self.relay_handles.remove(&event_id);
self.peers.remove(&event_id);
}

/// Deregister by neighbor key
Expand Down Expand Up @@ -1422,7 +1411,7 @@ impl PeerNetwork {
}

/// Remove unresponsive peers
fn disconnect_unresponsive(&mut self) -> () {
fn disconnect_unresponsive(&mut self) -> usize {
let now = get_epoch_time_secs();
let mut to_remove = vec![];
for (event_id, (socket, _, ts)) in self.connecting.iter() {
Expand Down Expand Up @@ -1450,9 +1439,11 @@ impl PeerNetwork {
}
}

let ret = to_remove.len();
for event_id in to_remove.into_iter() {
self.deregister_peer(event_id);
}
ret
}

/// Prune inbound and outbound connections if we can
Expand Down Expand Up @@ -2653,6 +2644,68 @@ mod test {
p2p
}

#[test]
fn test_event_id_no_connecting_leaks() {
with_timeout(100, || {
let neighbor = make_test_neighbor(2300);
let mut p2p = make_test_p2p_network(&vec![]);

use std::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:2300").unwrap();

// start fake neighbor endpoint, which will accept once and wait 35 seconds
let endpoint_thread = thread::spawn(move || {
let (sock, addr) = listener.accept().unwrap();
test_debug!("Accepted {:?}", &addr);
thread::sleep(time::Duration::from_millis(35_000));
});

p2p.bind(&"127.0.0.1:2400".parse().unwrap(), &"127.0.0.1:2401".parse().unwrap()).unwrap();
p2p.connect_peer(&neighbor.addr).unwrap();

// start dispatcher
let p2p_thread = thread::spawn(move || {
let mut total_disconnected = 0;
for i in 0..40 {
test_debug!("dispatch batch {}", i);

p2p.dispatch_requests();
let mut poll_states = match p2p.network {
None => {
panic!("network not connected");
},
Some(ref mut network) => {
network.poll(100).unwrap()
}
};

let mut p2p_poll_state = poll_states.remove(&p2p.p2p_network_handle).unwrap();

p2p.process_new_sockets(&mut p2p_poll_state).unwrap();
p2p.process_connecting_sockets(&mut p2p_poll_state);
total_disconnected += p2p.disconnect_unresponsive();

let ne = p2p.network.as_ref().unwrap().num_events();
test_debug!("{} events", ne);

thread::sleep(time::Duration::from_millis(1000));
}

assert_eq!(total_disconnected, 1);

// no leaks -- only server events remain
assert_eq!(p2p.network.as_ref().unwrap().num_events(), 2);
});

p2p_thread.join().unwrap();
test_debug!("dispatcher thread joined");

endpoint_thread.join().unwrap();
test_debug!("fake endpoint thread joined");
})
}


// tests relay_signed_message()
#[test]
#[ignore]
Expand Down
4 changes: 4 additions & 0 deletions src/net/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ impl NetworkState {
})
}

pub fn num_events(&self) -> usize {
self.event_map.len()
}

fn bind_address(addr: &SocketAddr) -> Result<mio_net::TcpListener, net_error> {
if !cfg!(test) {
mio_net::TcpListener::bind(addr)
Expand Down

0 comments on commit 2a1cf70

Please sign in to comment.