diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 6715715099..f1441b6bf8 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -24,10 +24,7 @@ use crate::{ frame::{Close, Datagram, FrameStruct}, packet::{Header, LongType, Packet, PartialDecode, SpaceId}, range_set::ArrayRangeSet, - shared::{ - ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent, - EndpointEventInner, - }, + shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint}, token::ResetToken, transport_parameters::TransportParameters, ConnectionHandle, Dir, Endpoint, EndpointConfig, Frame, Side, StreamId, Transmit, @@ -84,10 +81,10 @@ use timer::{Timer, TimerTable}; /// Protocol state and logic for a single QUIC connection /// -/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application -/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and -/// expects timeouts through various methods. A number of simple getter methods are exposed -/// to allow callers to inspect some of the connection state. +/// Objects of this type receive [`ConnectionEvent`]s and emit application [`Event`]s to make +/// progress. To handle timeouts, a `Connection` returns timer updates and expects timeouts through +/// various methods. A number of simple getter methods are exposed to allow callers to inspect some +/// of the connection state. /// /// `Connection` has roughly 4 types of methods: /// @@ -108,8 +105,7 @@ use timer::{Timer, TimerTable}; /// /// 1. [`poll_transmit`](Self::poll_transmit) /// 2. [`poll_timeout`](Self::poll_timeout) -/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events) -/// 4. [`poll`](Self::poll) +/// 3. [`poll`](Self::poll) /// /// Currently the only actual dependency is from (2) to (1), however additional /// dependencies may be added in future, so the above order is recommended. @@ -156,7 +152,6 @@ pub struct Connection { /// Total number of outgoing packets that have been deemed lost lost_packets: u64, events: VecDeque, - endpoint_events: VecDeque, /// Whether the spin bit is in use for this connection spin_enabled: bool, /// Outgoing spin bit state @@ -294,7 +289,6 @@ impl Connection { retry_src_cid: None, lost_packets: 0, events: VecDeque::new(), - endpoint_events: VecDeque::new(), spin_enabled: config.allow_spin && rng.gen_ratio(7, 8), spin: false, spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)], @@ -376,12 +370,6 @@ impl Connection { None } - /// Return endpoint-facing events - #[must_use] - pub fn poll_endpoint_events(&mut self) -> Option { - self.endpoint_events.pop_front().map(EndpointEvent) - } - /// Provide control over streams #[must_use] pub fn streams(&mut self) -> Streams<'_> { @@ -888,8 +876,8 @@ impl Connection { /// Process `ConnectionEvent`s generated by the associated `Endpoint` /// /// Will execute protocol logic upon receipt of a connection event, in turn preparing signals - /// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be - /// extracted through the relevant methods. + /// (including application `Event`s and outgoing datagrams) that should be extracted through the + /// relevant methods. pub fn handle_event(&mut self, event: ConnectionEvent, endpoint: &Endpoint) { use self::ConnectionEventInner::*; match event.0 { @@ -956,9 +944,8 @@ impl Connection { /// Process timer expirations /// - /// Executes protocol logic, potentially preparing signals (including application `Event`s, - /// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant - /// methods. + /// Executes protocol logic, potentially preparing signals (including application `Event`s, and + /// outgoing datagrams) that should be extracted through the relevant methods. /// /// It is most efficient to call this immediately after the system clock reaches the latest /// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply @@ -973,10 +960,11 @@ impl Connection { match timer { Timer::Close => { self.state = State::Drained; - self.endpoint_events.push_back(EndpointEventInner::Drained); + endpoint.handle_drained(self.handle); } Timer::Idle => { self.kill(ConnectionError::TimedOut); + endpoint.handle_drained(self.handle); } Timer::KeepAlive => { trace!("sending keep-alive"); @@ -2123,7 +2111,7 @@ impl Connection { } } if !was_drained && self.state.is_drained() { - self.endpoint_events.push_back(EndpointEventInner::Drained); + endpoint.handle_drained(self.handle); // Close timer may have been started previously, e.g. if we sent a close and got a // stateless reset in response self.timers.stop(Timer::Close); @@ -3276,7 +3264,6 @@ impl Connection { self.close_common(); self.error = Some(reason); self.state = State::Drained; - self.endpoint_events.push_back(EndpointEventInner::Drained); } } diff --git a/quinn-proto/src/endpoint.rs b/quinn-proto/src/endpoint.rs index 62afcc6985..6e862a8f84 100644 --- a/quinn-proto/src/endpoint.rs +++ b/quinn-proto/src/endpoint.rs @@ -23,10 +23,7 @@ use crate::{ crypto::{self, Keys, UnsupportedVersion}, frame, packet::{Header, Packet, PacketDecodeError, PacketNumber, PartialDecode}, - shared::{ - ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent, - EndpointEventInner, IssuedCid, - }, + shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, IssuedCid}, transport_parameters::TransportParameters, ResetToken, RetryToken, Side, Transmit, TransportConfig, TransportError, INITIAL_MTU, MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE, @@ -75,23 +72,9 @@ impl Endpoint { *self.server_config.write().unwrap() = server_config; } - /// Process `EndpointEvent`s emitted from related `Connection`s - /// - /// In turn, processing this event may return a `ConnectionEvent` for the same - /// `Connection`. Must never be called concurrently with the same `ch`. - pub fn handle_event( - &self, - ch: ConnectionHandle, - event: EndpointEvent, - ) -> Option { - use EndpointEventInner::*; - match event.0 { - Drained => { - let conn = self.connections.lock().unwrap().remove(ch.0); - self.index.write().unwrap().remove(&conn); - } - } - None + pub(crate) fn handle_drained(&self, ch: ConnectionHandle) { + let conn = self.connections.lock().unwrap().remove(ch.0); + self.index.write().unwrap().remove(&conn); } pub(crate) fn set_reset_token( @@ -573,7 +556,7 @@ impl Endpoint { } Err(e) => { debug!("handshake failed: {}", e); - self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained)); + self.handle_drained(ch); if let ConnectionError::TransportError(e) = e { Some(DatagramEvent::Response( self.initial_close(version, addresses, crypto, &src_cid, e), diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index 4dc8b77083..c1563ddac9 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -65,7 +65,7 @@ mod endpoint; pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint}; mod shared; -pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent}; +pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint}; mod transport_error; pub use crate::transport_error::{Code as TransportErrorCode, Error as TransportError}; diff --git a/quinn-proto/src/shared.rs b/quinn-proto/src/shared.rs index ee7063c46c..cee399d3fa 100644 --- a/quinn-proto/src/shared.rs +++ b/quinn-proto/src/shared.rs @@ -20,33 +20,6 @@ pub(crate) enum ConnectionEventInner { }, } -/// Events sent from a Connection to an Endpoint -#[derive(Debug)] -pub struct EndpointEvent(pub(crate) EndpointEventInner); - -impl EndpointEvent { - /// Construct an event that indicating that a `Connection` will no longer emit events - /// - /// Useful for notifying an `Endpoint` that a `Connection` has been destroyed outside of the - /// usual state machine flow, e.g. when being dropped by the user. - pub fn drained() -> Self { - Self(EndpointEventInner::Drained) - } - - /// Determine whether this is the last event a `Connection` will emit - /// - /// Useful for determining when connection-related event loop state can be freed. - pub fn is_drained(&self) -> bool { - self.0 == EndpointEventInner::Drained - } -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub(crate) enum EndpointEventInner { - /// The connection has been drained - Drained, -} - /// Protocol-level identifier for a connection. /// /// Mainly useful for identifying this connection's packets on the wire with tools like Wireshark. diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs index 77f73b1d64..c4fc1787f0 100644 --- a/quinn-proto/src/tests/util.rs +++ b/quinn-proto/src/tests/util.rs @@ -329,41 +329,22 @@ impl TestEndpoint { } } - loop { - let mut endpoint_events: Vec<(ConnectionHandle, EndpointEvent)> = vec![]; - for (ch, conn) in self.connections.iter_mut() { - if self.timeout.map_or(false, |x| x <= now) { - self.timeout = None; - conn.handle_timeout(now, &self.endpoint); - } - - for (_, mut events) in self.conn_events.drain() { - for event in events.drain(..) { - conn.handle_event(event, &self.endpoint); - } - } - - while let Some(event) = conn.poll_endpoint_events() { - endpoint_events.push((*ch, event)); - } - - while let Some(x) = conn.poll_transmit(now, MAX_DATAGRAMS) { - self.outbound.extend(split_transmit(x)); - } - self.timeout = conn.poll_timeout(); + for conn in self.connections.values_mut() { + if self.timeout.map_or(false, |x| x <= now) { + self.timeout = None; + conn.handle_timeout(now, &self.endpoint); } - if endpoint_events.is_empty() { - break; + for (_, mut events) in self.conn_events.drain() { + for event in events.drain(..) { + conn.handle_event(event, &self.endpoint); + } } - for (ch, event) in endpoint_events { - if let Some(event) = self.handle_event(ch, event) { - if let Some(conn) = self.connections.get_mut(&ch) { - conn.handle_event(event, &self.endpoint); - } - } + while let Some(x) = conn.poll_transmit(now, MAX_DATAGRAMS) { + self.outbound.extend(split_transmit(x)); } + self.timeout = conn.poll_timeout(); } } diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index f8a04ad182..2750624bb3 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -236,12 +236,14 @@ impl Future for ConnectionDriver { // If a timer expires, there might be more to transmit. When we transmit something, we // might need to reset a timer. Hence, we must loop until neither happens. keep_going |= conn.drive_timer(cx, &self.0.shared); - // Connection might request and receive new CIDs, which prompts a transmit. Future work: - // this should probably happen synchronously inside the connection. - keep_going |= conn.forward_endpoint_events(&self.0.shared); conn.forward_app_events(&self.0.shared); - if !conn.inner.is_drained() { + if conn.inner.is_drained() { + // Notify endpoint driver to clean up its own resources + let _ = conn + .endpoint_events + .send((conn.handle, EndpointEvent::Drained)); + } else { if keep_going { // If the connection hasn't processed all tasks, schedule it again cx.waker().wake_by_ref(); @@ -893,23 +895,6 @@ impl State { false } - fn forward_endpoint_events(&mut self, shared: &Shared) -> bool { - let mut keep_going = false; - while let Some(event) = self.inner.poll_endpoint_events() { - if event.is_drained() { - // Notify endpoint driver to clean up its own resources - let _ = self - .endpoint_events - .send((self.handle, EndpointEvent::Drained)); - } - if let Some(event) = shared.endpoint.handle_event(self.handle, event) { - self.inner.handle_event(event, &shared.endpoint); - keep_going = true; - } - } - keep_going - } - /// If this returns `Err`, the endpoint is dead, so the driver should exit immediately. fn process_conn_events( &mut self,