Skip to content

Commit

Permalink
Reviewer comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yashykt committed Nov 12, 2024
1 parent aa8d7d4 commit 7ee0882
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 67 deletions.
126 changes: 63 additions & 63 deletions src/core/ext/transport/chttp2/server/chttp2_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1198,8 +1198,6 @@ void NewChttp2ServerListener::ActiveConnection::HandshakingState::
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE(
"Did not receive HTTP/2 settings before handshake timeout");
CHECK(absl::holds_alternative<RefCountedPtr<grpc_chttp2_transport>>(
connection_->state_));
absl::get<RefCountedPtr<grpc_chttp2_transport>>(connection_->state_)
->PerformOp(op);
}
Expand All @@ -1223,64 +1221,57 @@ void NewChttp2ServerListener::ActiveConnection::HandshakingState::
OnHandshakeDoneLocked(absl::StatusOr<HandshakerArgs*> result) {
OrphanablePtr<HandshakingState> handshaking_state_ref;
RefCountedPtr<HandshakeManager> handshake_mgr;
bool release_connection = false;
if (!result.ok() || connection_->shutdown_) {
release_connection = true;
} else {
// If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external
// code, so we can just clean up here without creating a transport.
if ((*result)->endpoint != nullptr) {
RefCountedPtr<Transport> transport =
grpc_create_chttp2_transport((*result)->args,
std::move((*result)->endpoint), false)
->Ref();
grpc_error_handle channel_init_err =
connection_->listener_state_->server()->SetupTransport(
transport.get(), accepting_pollset_, (*result)->args,
grpc_chttp2_transport_get_socket_node(transport.get()));
if (channel_init_err.ok()) {
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
connection_->state_ =
DownCast<grpc_chttp2_transport*>(transport.get())->Ref();
Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&on_receive_settings_, OnReceiveSettings, this,
grpc_schedule_on_exec_ctx);
grpc_closure* on_close = &connection_->on_close_;
// Refs helds by OnClose()
connection_->Ref().release();
grpc_chttp2_transport_start_reading(
transport.get(), (*result)->read_buffer.c_slice_buffer(),
&on_receive_settings_, nullptr, on_close);
timer_handle_ = connection_->listener_state_->event_engine()->RunAfter(
deadline_ - Timestamp::Now(), [self = Ref()]() mutable {
// HandshakingState deletion might require an active ExecCtx.
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto* self_ptr = self.get();
self_ptr->connection_->work_serializer_.Run(
[self = std::move(self)]() mutable {
self->OnTimeoutLocked();
},
DEBUG_LOCATION);
});
} else {
// Failed to create channel from transport. Clean up.
LOG(ERROR) << "Failed to create channel: "
<< StatusToString(channel_init_err);
transport->Orphan();
release_connection = true;
}
// If the handshaking succeeded but there is no endpoint, then the
// handshaker may have handed off the connection to some external
// code, so we can just clean up here without creating a transport.
if (!connection_->shutdown_ && result.ok() &&
(*result)->endpoint != nullptr) {
RefCountedPtr<Transport> transport =
grpc_create_chttp2_transport((*result)->args,
std::move((*result)->endpoint), false)
->Ref();
grpc_error_handle channel_init_err =
connection_->listener_state_->server()->SetupTransport(
transport.get(), accepting_pollset_, (*result)->args,
grpc_chttp2_transport_get_socket_node(transport.get()));
if (channel_init_err.ok()) {
// Use notify_on_receive_settings callback to enforce the
// handshake deadline.
connection_->state_ =
DownCast<grpc_chttp2_transport*>(transport.get())->Ref();
Ref().release(); // Held by OnReceiveSettings().
GRPC_CLOSURE_INIT(&on_receive_settings_, OnReceiveSettings, this,
grpc_schedule_on_exec_ctx);
grpc_closure* on_close = &connection_->on_close_;
// Refs helds by OnClose()
connection_->Ref().release();
grpc_chttp2_transport_start_reading(
transport.get(), (*result)->read_buffer.c_slice_buffer(),
&on_receive_settings_, nullptr, on_close);
timer_handle_ = connection_->listener_state_->event_engine()->RunAfter(
deadline_ - Timestamp::Now(), [self = Ref()]() mutable {
// HandshakingState deletion might require an active ExecCtx.
ApplicationCallbackExecCtx callback_exec_ctx;
ExecCtx exec_ctx;
auto* self_ptr = self.get();
self_ptr->connection_->work_serializer_.Run(
[self = std::move(self)]() { self->OnTimeoutLocked(); },
DEBUG_LOCATION);
});
} else {
release_connection = true;
// Failed to create channel from transport. Clean up.
LOG(ERROR) << "Failed to create channel: "
<< StatusToString(channel_init_err);
transport->Orphan();
}
}
// Since the handshake manager is done, the connection no longer needs to
// shutdown the handshake when the listener needs to stop serving.
handshake_mgr_.reset();
connection_->listener_state_->OnHandshakeDone(connection_.get());
if (release_connection) {
// Clean up if we don't have a transport
if (!absl::holds_alternative<RefCountedPtr<grpc_chttp2_transport>>(
connection_->state_)) {
connection_->listener_state_->connection_quota()->ReleaseConnections(1);
connection_->listener_state_->RemoveLogicalConnection(connection_.get());
}
Expand Down Expand Up @@ -1344,8 +1335,6 @@ void NewChttp2ServerListener::ActiveConnection::Start(const ChannelArgs& args) {
// owning NewChttp2ServerListener and all associated
// ActiveConnections have been orphaned.
if (self->shutdown_) return;
CHECK(absl::holds_alternative<OrphanablePtr<HandshakingState>>(
self->state_));
absl::get<OrphanablePtr<HandshakingState>>(self->state_)
->StartLocked(args);
},
Expand Down Expand Up @@ -1386,14 +1375,25 @@ void NewChttp2ServerListener::ActiveConnection::SendGoAwayImplLocked() {

void NewChttp2ServerListener::ActiveConnection::
DisconnectImmediatelyImplLocked() {
RefCountedPtr<grpc_chttp2_transport>* transport =
absl::get_if<RefCountedPtr<grpc_chttp2_transport>>(&state_);
if (transport != nullptr && (*transport) != nullptr) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE(
"Drain grace time expired. Closing connection immediately.");
(*transport)->PerformOp(op);
}
shutdown_ = true;
Match(
state_,
[](const OrphanablePtr<HandshakingState>& handshaking_state) {
// Shutdown the handshaker if it's still in progress.
if (handshaking_state != nullptr) {
handshaking_state->ShutdownLocked(
absl::UnavailableError("Connection to be disconnected"));
}
},
[](const RefCountedPtr<grpc_chttp2_transport>& transport) {
// Disconnect immediately if the transport exists
if (transport != nullptr) {
grpc_transport_op* op = grpc_make_transport_op(nullptr);
op->disconnect_with_error = GRPC_ERROR_CREATE(
"Drain grace time expired. Closing connection immediately.");
transport->PerformOp(op);
}
});
}

//
Expand Down
15 changes: 14 additions & 1 deletion src/core/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ absl::optional<ChannelArgs> Server::ListenerState::AddLogicalConnection(
}
connection_manager = connection_manager_;
}
// The following section is intentionally outside the critical section. The
// operation to update channel args for a connection is heavy and complicated.
// For example, if using the xDS config fetcher, an involved matching process
// is performed to determine the filter chain to apply for this connection,
// prepare the filters, config selector and credentials. Subsequently, the
// credentials are used to create a security connector as well. Doing this
// outside the critical region allows us to get a larger degree of parallelism
// for the handling of incoming connections.
ChannelArgs new_args = args;
if (server_->config_fetcher() != nullptr) {
if (connection_manager == nullptr) {
Expand All @@ -223,6 +231,10 @@ absl::optional<ChannelArgs> Server::ListenerState::AddLogicalConnection(
new_args = (*args_result).SetObject(security_connector);
}
MutexLock lock(&mu_);
// Since we let go of the lock earlier, we need to protect ourselves against
// time-of-check-to-time-of-use cases. The server may have stopped serving
// or the connection manager may have changed before we add the connection
// to the list of tracked connections.
if (!is_serving_ || connection_manager != connection_manager_) {
// Not serving
return absl::nullopt;
Expand All @@ -245,8 +257,9 @@ void Server::ListenerState::OnHandshakeDone(
auto connection_handle = connections_.extract(connection);
if (!connection_handle.empty()) {
connection_to_remove = std::move(connection_handle.value());
return;
}
// We do not need to check connections_to_be_drained_list_ since that only
// gets set if there is a config fetcher.
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/core/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,14 @@ class Server : public ServerInterface,
// done. As such, implementations of `LogicalConnection` should cancel the
// handshake on `Orphan` if still in progress, but not close down the
// transport.
// Implementations are responsible for informing ListenerState about the
// following stages of a connection -
// 1) Invoke AddLogicalConnection() on accepting a new connection. Do not
// invoke if the connection is going to be closed immediately.
// 2) Invoke OnHandshakeDone() (irrespective of error) once handshake is
// done.
// 3) Invoke RemoveLogicalConnection() when the connection is closed.
// Do not invoke if the connection was never added.
// TODO(yashykt): In the case where there is no config fetcher, we remove
// the connection from our map and instead use `ChannelData` to keep track
// of the connections. This is much cheaper (8 bytes per connection) as
Expand Down
6 changes: 3 additions & 3 deletions test/cpp/end2end/xds/xds_end2end_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3506,9 +3506,9 @@ int main(int argc, char** argv) {
// updates from all the subchannels's FDs.
grpc_core::ConfigVars::Overrides overrides;
overrides.client_channel_backup_poll_interval_ms = 1;
overrides.trace =
"call,channel,client_channel,client_channel_call,client_channel_lb_call,"
"handshaker";
// overrides.trace =
// "call,channel,client_channel,client_channel_call,client_channel_lb_call,"
// "handshaker";
grpc_core::ConfigVars::SetOverrides(overrides);
#if TARGET_OS_IPHONE
// Workaround Apple CFStream bug
Expand Down

0 comments on commit 7ee0882

Please sign in to comment.