diff --git a/src/net/p2p.rs b/src/net/p2p.rs index a723a85428..f1656eaca3 100644 --- a/src/net/p2p.rs +++ b/src/net/p2p.rs @@ -1072,19 +1072,16 @@ impl PeerNetwork { /// Deregister a socket/event pair pub fn deregister_peer(&mut self, event_id: usize) -> () { - test_debug!("{:?}: Disconnect event {}", &self.local_peer, event_id); - if self.peers.contains_key(&event_id) { - self.peers.remove(&event_id); - } + debug!("{:?}: Disconnect event {}", &self.local_peer, event_id); - let mut to_remove : Vec = vec![]; + let mut nk_remove : Vec = vec![]; for (neighbor_key, ev_id) in self.events.iter() { if *ev_id == event_id { - to_remove.push(neighbor_key.clone()); + nk_remove.push(neighbor_key.clone()); } } - for nk in to_remove { - // remove events + for nk in nk_remove.into_iter() { + // remove event state self.events.remove(&nk); // remove inventory state @@ -1097,30 +1094,22 @@ impl PeerNetwork { } } - let mut to_remove : Vec = vec![]; - match self.network { None => {}, Some(ref mut network) => { - match self.sockets.get_mut(&event_id) { - None => {}, - Some(ref sock) => { - let _ = network.deregister(event_id, sock); - to_remove.push(event_id); // force it to close anyway - } + // deregister socket if connected and registered already + if let Some(socket) = self.sockets.remove(&event_id) { + let _ = network.deregister(event_id, &socket); + } + // deregister socket if still connecting + if let Some((socket, ..)) = self.connecting.remove(&event_id) { + let _ = network.deregister(event_id, &socket); } } } - - // always clear connecting socket state - self.connecting.remove(&event_id); - - for event_id in to_remove { - // remove socket - self.sockets.remove(&event_id); - self.connecting.remove(&event_id); - self.relay_handles.remove(&event_id); - } + + self.relay_handles.remove(&event_id); + self.peers.remove(&event_id); } /// Deregister by neighbor key @@ -1422,7 +1411,7 @@ impl PeerNetwork { } /// Remove unresponsive peers - fn disconnect_unresponsive(&mut self) -> () { + fn disconnect_unresponsive(&mut self) -> usize { let now = get_epoch_time_secs(); let mut to_remove = vec![]; for (event_id, (socket, _, ts)) in self.connecting.iter() { @@ -1450,9 +1439,11 @@ impl PeerNetwork { } } + let ret = to_remove.len(); for event_id in to_remove.into_iter() { self.deregister_peer(event_id); } + ret } /// Prune inbound and outbound connections if we can @@ -2653,6 +2644,68 @@ mod test { p2p } + #[test] + fn test_event_id_no_connecting_leaks() { + with_timeout(100, || { + let neighbor = make_test_neighbor(2300); + let mut p2p = make_test_p2p_network(&vec![]); + + use std::net::TcpListener; + let listener = TcpListener::bind("127.0.0.1:2300").unwrap(); + + // start fake neighbor endpoint, which will accept once and wait 35 seconds + let endpoint_thread = thread::spawn(move || { + let (sock, addr) = listener.accept().unwrap(); + test_debug!("Accepted {:?}", &addr); + thread::sleep(time::Duration::from_millis(35_000)); + }); + + p2p.bind(&"127.0.0.1:2400".parse().unwrap(), &"127.0.0.1:2401".parse().unwrap()).unwrap(); + p2p.connect_peer(&neighbor.addr).unwrap(); + + // start dispatcher + let p2p_thread = thread::spawn(move || { + let mut total_disconnected = 0; + for i in 0..40 { + test_debug!("dispatch batch {}", i); + + p2p.dispatch_requests(); + let mut poll_states = match p2p.network { + None => { + panic!("network not connected"); + }, + Some(ref mut network) => { + network.poll(100).unwrap() + } + }; + + let mut p2p_poll_state = poll_states.remove(&p2p.p2p_network_handle).unwrap(); + + p2p.process_new_sockets(&mut p2p_poll_state).unwrap(); + p2p.process_connecting_sockets(&mut p2p_poll_state); + total_disconnected += p2p.disconnect_unresponsive(); + + let ne = p2p.network.as_ref().unwrap().num_events(); + test_debug!("{} events", ne); + + thread::sleep(time::Duration::from_millis(1000)); + } + + assert_eq!(total_disconnected, 1); + + // no leaks -- only server events remain + assert_eq!(p2p.network.as_ref().unwrap().num_events(), 2); + }); + + p2p_thread.join().unwrap(); + test_debug!("dispatcher thread joined"); + + endpoint_thread.join().unwrap(); + test_debug!("fake endpoint thread joined"); + }) + } + + // tests relay_signed_message() #[test] #[ignore] diff --git a/src/net/poll.rs b/src/net/poll.rs index afbbc37c7a..cee581fb86 100644 --- a/src/net/poll.rs +++ b/src/net/poll.rs @@ -106,6 +106,10 @@ impl NetworkState { }) } + pub fn num_events(&self) -> usize { + self.event_map.len() + } + fn bind_address(addr: &SocketAddr) -> Result { if !cfg!(test) { mio_net::TcpListener::bind(addr)