Skip to content

Commit

Permalink
Notify endpoint of drained connections synchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed May 29, 2023
1 parent d8a3da9 commit 282b5c7
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 127 deletions.
39 changes: 13 additions & 26 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ use crate::{
frame::{Close, Datagram, FrameStruct},
packet::{Header, LongType, Packet, PartialDecode, SpaceId},
range_set::ArrayRangeSet,
shared::{
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent,
EndpointEventInner,
},
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint},
token::ResetToken,
transport_parameters::TransportParameters,
ConnectionHandle, Dir, Endpoint, EndpointConfig, Frame, Side, StreamId, Transmit,
Expand Down Expand Up @@ -84,10 +81,10 @@ use timer::{Timer, TimerTable};

/// Protocol state and logic for a single QUIC connection
///
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
/// expects timeouts through various methods. A number of simple getter methods are exposed
/// to allow callers to inspect some of the connection state.
/// Objects of this type receive [`ConnectionEvent`]s and emit application [`Event`]s to make
/// progress. To handle timeouts, a `Connection` returns timer updates and expects timeouts through
/// various methods. A number of simple getter methods are exposed to allow callers to inspect some
/// of the connection state.
///
/// `Connection` has roughly 4 types of methods:
///
Expand All @@ -108,8 +105,7 @@ use timer::{Timer, TimerTable};
///
/// 1. [`poll_transmit`](Self::poll_transmit)
/// 2. [`poll_timeout`](Self::poll_timeout)
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
/// 4. [`poll`](Self::poll)
/// 3. [`poll`](Self::poll)
///
/// Currently the only actual dependency is from (2) to (1), however additional
/// dependencies may be added in future, so the above order is recommended.
Expand Down Expand Up @@ -156,7 +152,6 @@ pub struct Connection {
/// Total number of outgoing packets that have been deemed lost
lost_packets: u64,
events: VecDeque<Event>,
endpoint_events: VecDeque<EndpointEventInner>,
/// Whether the spin bit is in use for this connection
spin_enabled: bool,
/// Outgoing spin bit state
Expand Down Expand Up @@ -294,7 +289,6 @@ impl Connection {
retry_src_cid: None,
lost_packets: 0,
events: VecDeque::new(),
endpoint_events: VecDeque::new(),
spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
spin: false,
spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
Expand Down Expand Up @@ -376,12 +370,6 @@ impl Connection {
None
}

/// Return endpoint-facing events
#[must_use]
pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
self.endpoint_events.pop_front().map(EndpointEvent)
}

/// Provide control over streams
#[must_use]
pub fn streams(&mut self) -> Streams<'_> {
Expand Down Expand Up @@ -888,8 +876,8 @@ impl Connection {
/// Process `ConnectionEvent`s generated by the associated `Endpoint`
///
/// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
/// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
/// extracted through the relevant methods.
/// (including application `Event`s and outgoing datagrams) that should be extracted through the
/// relevant methods.
pub fn handle_event(&mut self, event: ConnectionEvent, endpoint: &Endpoint) {
use self::ConnectionEventInner::*;
match event.0 {
Expand Down Expand Up @@ -956,9 +944,8 @@ impl Connection {

/// Process timer expirations
///
/// Executes protocol logic, potentially preparing signals (including application `Event`s,
/// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
/// methods.
/// Executes protocol logic, potentially preparing signals (including application `Event`s, and
/// outgoing datagrams) that should be extracted through the relevant methods.
///
/// It is most efficient to call this immediately after the system clock reaches the latest
/// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
Expand All @@ -973,10 +960,11 @@ impl Connection {
match timer {
Timer::Close => {
self.state = State::Drained;
self.endpoint_events.push_back(EndpointEventInner::Drained);
endpoint.handle_drained(self.handle);
}
Timer::Idle => {
self.kill(ConnectionError::TimedOut);
endpoint.handle_drained(self.handle);
}
Timer::KeepAlive => {
trace!("sending keep-alive");
Expand Down Expand Up @@ -2123,7 +2111,7 @@ impl Connection {
}
}
if !was_drained && self.state.is_drained() {
self.endpoint_events.push_back(EndpointEventInner::Drained);
endpoint.handle_drained(self.handle);
// Close timer may have been started previously, e.g. if we sent a close and got a
// stateless reset in response
self.timers.stop(Timer::Close);
Expand Down Expand Up @@ -3276,7 +3264,6 @@ impl Connection {
self.close_common();
self.error = Some(reason);
self.state = State::Drained;
self.endpoint_events.push_back(EndpointEventInner::Drained);
}
}

Expand Down
27 changes: 5 additions & 22 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use crate::{
crypto::{self, Keys, UnsupportedVersion},
frame,
packet::{Header, Packet, PacketDecodeError, PacketNumber, PartialDecode},
shared::{
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent,
EndpointEventInner, IssuedCid,
},
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, IssuedCid},
transport_parameters::TransportParameters,
ResetToken, RetryToken, Side, Transmit, TransportConfig, TransportError, INITIAL_MTU,
MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE,
Expand Down Expand Up @@ -75,23 +72,9 @@ impl Endpoint {
*self.server_config.write().unwrap() = server_config;
}

/// Process `EndpointEvent`s emitted from related `Connection`s
///
/// In turn, processing this event may return a `ConnectionEvent` for the same
/// `Connection`. Must never be called concurrently with the same `ch`.
pub fn handle_event(
&self,
ch: ConnectionHandle,
event: EndpointEvent,
) -> Option<ConnectionEvent> {
use EndpointEventInner::*;
match event.0 {
Drained => {
let conn = self.connections.lock().unwrap().remove(ch.0);
self.index.write().unwrap().remove(&conn);
}
}
None
pub(crate) fn handle_drained(&self, ch: ConnectionHandle) {
let conn = self.connections.lock().unwrap().remove(ch.0);
self.index.write().unwrap().remove(&conn);
}

pub(crate) fn set_reset_token(
Expand Down Expand Up @@ -573,7 +556,7 @@ impl Endpoint {
}
Err(e) => {
debug!("handshake failed: {}", e);
self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
self.handle_drained(ch);
if let ConnectionError::TransportError(e) = e {
Some(DatagramEvent::Response(
self.initial_close(version, addresses, crypto, &src_cid, e),
Expand Down
2 changes: 1 addition & 1 deletion quinn-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod endpoint;
pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint};

mod shared;
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent};
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint};

mod transport_error;
pub use crate::transport_error::{Code as TransportErrorCode, Error as TransportError};
Expand Down
27 changes: 0 additions & 27 deletions quinn-proto/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,6 @@ pub(crate) enum ConnectionEventInner {
},
}

/// Events sent from a Connection to an Endpoint
#[derive(Debug)]
pub struct EndpointEvent(pub(crate) EndpointEventInner);

impl EndpointEvent {
/// Construct an event that indicating that a `Connection` will no longer emit events
///
/// Useful for notifying an `Endpoint` that a `Connection` has been destroyed outside of the
/// usual state machine flow, e.g. when being dropped by the user.
pub fn drained() -> Self {
Self(EndpointEventInner::Drained)
}

/// Determine whether this is the last event a `Connection` will emit
///
/// Useful for determining when connection-related event loop state can be freed.
pub fn is_drained(&self) -> bool {
self.0 == EndpointEventInner::Drained
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum EndpointEventInner {
/// The connection has been drained
Drained,
}

/// Protocol-level identifier for a connection.
///
/// Mainly useful for identifying this connection's packets on the wire with tools like Wireshark.
Expand Down
41 changes: 11 additions & 30 deletions quinn-proto/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,41 +329,22 @@ impl TestEndpoint {
}
}

loop {
let mut endpoint_events: Vec<(ConnectionHandle, EndpointEvent)> = vec![];
for (ch, conn) in self.connections.iter_mut() {
if self.timeout.map_or(false, |x| x <= now) {
self.timeout = None;
conn.handle_timeout(now, &self.endpoint);
}

for (_, mut events) in self.conn_events.drain() {
for event in events.drain(..) {
conn.handle_event(event, &self.endpoint);
}
}

while let Some(event) = conn.poll_endpoint_events() {
endpoint_events.push((*ch, event));
}

while let Some(x) = conn.poll_transmit(now, MAX_DATAGRAMS) {
self.outbound.extend(split_transmit(x));
}
self.timeout = conn.poll_timeout();
for conn in self.connections.values_mut() {
if self.timeout.map_or(false, |x| x <= now) {
self.timeout = None;
conn.handle_timeout(now, &self.endpoint);
}

if endpoint_events.is_empty() {
break;
for (_, mut events) in self.conn_events.drain() {
for event in events.drain(..) {
conn.handle_event(event, &self.endpoint);
}
}

for (ch, event) in endpoint_events {
if let Some(event) = self.handle_event(ch, event) {
if let Some(conn) = self.connections.get_mut(&ch) {
conn.handle_event(event, &self.endpoint);
}
}
while let Some(x) = conn.poll_transmit(now, MAX_DATAGRAMS) {
self.outbound.extend(split_transmit(x));
}
self.timeout = conn.poll_timeout();
}
}

Expand Down
27 changes: 6 additions & 21 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,14 @@ impl Future for ConnectionDriver {
// If a timer expires, there might be more to transmit. When we transmit something, we
// might need to reset a timer. Hence, we must loop until neither happens.
keep_going |= conn.drive_timer(cx, &self.0.shared);
// Connection might request and receive new CIDs, which prompts a transmit. Future work:
// this should probably happen synchronously inside the connection.
keep_going |= conn.forward_endpoint_events(&self.0.shared);
conn.forward_app_events(&self.0.shared);

if !conn.inner.is_drained() {
if conn.inner.is_drained() {
// Notify endpoint driver to clean up its own resources
let _ = conn
.endpoint_events
.send((conn.handle, EndpointEvent::Drained));
} else {
if keep_going {
// If the connection hasn't processed all tasks, schedule it again
cx.waker().wake_by_ref();
Expand Down Expand Up @@ -893,23 +895,6 @@ impl State {
false
}

fn forward_endpoint_events(&mut self, shared: &Shared) -> bool {
let mut keep_going = false;
while let Some(event) = self.inner.poll_endpoint_events() {
if event.is_drained() {
// Notify endpoint driver to clean up its own resources
let _ = self
.endpoint_events
.send((self.handle, EndpointEvent::Drained));
}
if let Some(event) = shared.endpoint.handle_event(self.handle, event) {
self.inner.handle_event(event, &shared.endpoint);
keep_going = true;
}
}
keep_going
}

/// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
fn process_conn_events(
&mut self,
Expand Down

0 comments on commit 282b5c7

Please sign in to comment.