Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Jan 4, 2024
1 parent 5851f05 commit 50dbcc3
Show file tree
Hide file tree
Showing 14 changed files with 425 additions and 681 deletions.
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4070,6 +4070,7 @@ grpc_cc_library(
"//src/core:closure",
"//src/core:error",
"//src/core:experiments",
"//src/core:gpr_manual_constructor",
"//src/core:http2_errors",
"//src/core:http2_settings",
"//src/core:init_internally",
Expand Down
6 changes: 6 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6028,9 +6028,15 @@ grpc_cc_library(
hdrs = [
"ext/transport/chttp2/transport/http2_settings.h",
],
external_deps = [
"absl/functional:function_ref",
"absl/strings",
"absl/types:optional",
],
deps = [
"http2_errors",
"useful",
"//:chttp2_frame",
"//:gpr_platform",
],
)
Expand Down
195 changes: 59 additions & 136 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,6 @@ static void read_action_locked(grpc_core::RefCountedPtr<grpc_chttp2_transport>,
static void continue_read_action_locked(
grpc_core::RefCountedPtr<grpc_chttp2_transport> t);

// Set a transport level setting, and push it to our peer
static void queue_setting_update(grpc_chttp2_transport* t,
grpc_chttp2_setting_id id, uint32_t value);

static void close_from_api(grpc_chttp2_transport* t, grpc_chttp2_stream* s,
grpc_error_handle error, bool tarpit);

Expand Down Expand Up @@ -550,93 +546,54 @@ static void read_channel_args(grpc_chttp2_transport* t,
t->max_header_list_size_soft_limit = soft_limit;
}

static const struct {
absl::string_view channel_arg_name;
grpc_chttp2_setting_id setting_id;
int default_value;
int min;
int max;
bool availability[2] /* server, client */;
} settings_map[] = {{GRPC_ARG_MAX_CONCURRENT_STREAMS,
GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
-1,
0,
INT32_MAX,
{true, false}},
{GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER,
GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
-1,
0,
INT32_MAX,
{true, true}},
{GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE,
GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
-1,
0,
INT32_MAX,
{true, true}},
{GRPC_ARG_HTTP2_MAX_FRAME_SIZE,
GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
-1,
16384,
16777215,
{true, true}},
{GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY,
GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA,
1,
0,
1,
{true, true}},
{GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
-1,
5,
INT32_MAX,
{true, true}}};

for (size_t i = 0; i < GPR_ARRAY_SIZE(settings_map); i++) {
const auto& setting = settings_map[i];
if (setting.availability[is_client]) {
const int value = channel_args.GetInt(setting.channel_arg_name)
.value_or(setting.default_value);
if (value >= 0) {
const int clamped_value =
grpc_core::Clamp(value, setting.min, setting.max);
queue_setting_update(t, setting.setting_id, clamped_value);
if (setting.setting_id == GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS) {
t->max_concurrent_streams_policy.SetTarget(clamped_value);
}
} else if (setting.setting_id ==
GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE) {
// Set value to 1.25 * soft limit if this is larger than
// `DEFAULT_MAX_HEADER_LIST_SIZE` and
// `GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE` is not set.
const int soft_limit = channel_args.GetInt(GRPC_ARG_MAX_METADATA_SIZE)
.value_or(setting.default_value);
const int value = (soft_limit >= 0 && soft_limit < (INT_MAX / 1.25))
? static_cast<int>(soft_limit * 1.25)
: soft_limit;
if (value > DEFAULT_MAX_HEADER_LIST_SIZE) {
queue_setting_update(
t, setting.setting_id,
grpc_core::Clamp(value, setting.min, setting.max));
}
}
} else if (channel_args.Contains(setting.channel_arg_name)) {
gpr_log(GPR_DEBUG, "%s is not available on %s",
std::string(setting.channel_arg_name).c_str(),
is_client ? "clients" : "servers");
int value;
if (!is_client) {
value = channel_args.GetInt(GRPC_ARG_MAX_CONCURRENT_STREAMS).value_or(-1);
if (value >= 0) {
t->settings.mutable_local().SetMaxConcurrentStreams(value);
t->max_concurrent_streams_policy.SetTarget(value);
}
} else if (channel_args.Contains(GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
gpr_log(GPR_DEBUG, "%s is not available on clients",
GRPC_ARG_MAX_CONCURRENT_STREAMS);
}
value =
channel_args.GetInt(GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER).value_or(-1);
if (value >= 0) {
t->settings.mutable_local().SetHeaderTableSize(value);
}
value = channel_args.GetInt(GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE).value_or(-1);
if (value >= 0) {
t->settings.mutable_local().SetMaxHeaderListSize(value);
} else {
// Set value to 1.25 * soft limit if this is larger than
// `DEFAULT_MAX_HEADER_LIST_SIZE` and
// `GRPC_ARG_ABSOLUTE_MAX_METADATA_SIZE` is not set.
const int soft_limit =
channel_args.GetInt(GRPC_ARG_MAX_METADATA_SIZE).value_or(-1);
const int value = (soft_limit >= 0 && soft_limit < (INT_MAX / 1.25))
? static_cast<int>(soft_limit * 1.25)
: soft_limit;
if (value > DEFAULT_MAX_HEADER_LIST_SIZE) {
t->settings.mutable_local().SetMaxHeaderListSize(value);
}
}
value = channel_args.GetInt(GRPC_ARG_HTTP2_MAX_FRAME_SIZE).value_or(-1);
if (value >= 0) {
t->settings.mutable_local().SetMaxFrameSize(value);
}
value =
channel_args.GetInt(GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES).value_or(-1);
if (value >= 0) {
t->settings.mutable_local().SetInitialWindowSize(value);
}
value = channel_args.GetInt(GRPC_ARG_HTTP2_ENABLE_TRUE_BINARY).value_or(-1);
if (value >= 0) {
t->settings.mutable_local().SetAllowTrueBinaryMetadata(value != 0);
}

if (t->enable_preferred_rx_crypto_frame_advertisement) {
const grpc_chttp2_setting_parameters* sp =
&grpc_chttp2_settings_parameters
[GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE];
queue_setting_update(
t, GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
grpc_core::Clamp(INT_MAX, static_cast<int>(sp->min_value),
static_cast<int>(sp->max_value)));
t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(INT_MAX);
}

t->ping_on_rst_stream_percent = grpc_core::Clamp(
Expand Down Expand Up @@ -710,33 +667,22 @@ grpc_chttp2_transport::grpc_chttp2_transport(
grpc_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
}
grpc_slice_buffer_init(&qbuf);
// copy in initial settings to all setting sets
size_t i;
int j;
for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
settings[j][i] = grpc_chttp2_settings_parameters[i].default_value;
}
}
grpc_chttp2_goaway_parser_init(&goaway_parser);

// configure http2 the way we like it
if (is_client) {
queue_setting_update(this, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
settings.mutable_local().SetEnablePush(false);
settings.mutable_local().SetMaxConcurrentStreams(0);
}
queue_setting_update(this, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
DEFAULT_MAX_HEADER_LIST_SIZE);
queue_setting_update(this,
GRPC_CHTTP2_SETTINGS_GRPC_ALLOW_TRUE_BINARY_METADATA, 1);
settings.mutable_local().SetMaxHeaderListSize(DEFAULT_MAX_HEADER_LIST_SIZE);
settings.mutable_local().SetAllowTrueBinaryMetadata(true);

read_channel_args(this, channel_args, is_client);

// Initially allow *UP TO* MAX_CONCURRENT_STREAMS incoming before we start
// blanket cancelling them.
num_incoming_streams_before_settings_ack =
settings[GRPC_LOCAL_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS];
settings.local().max_concurrent_streams();

grpc_core::ExecCtx exec_ctx;
combiner->Run(
Expand Down Expand Up @@ -1126,9 +1072,7 @@ static void write_action(grpc_chttp2_transport* t) {
// Choose max_frame_size as the prefered rx crypto frame size indicated by the
// peer.
int max_frame_size =
t->settings
[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE];
t->settings.peer().preferred_receive_crypto_message_size();
// Note: max frame size is 0 if the remote peer does not support adjusting the
// sending frame size.
if (max_frame_size == 0) {
Expand Down Expand Up @@ -1205,23 +1149,6 @@ static void write_action_end_locked(
grpc_chttp2_end_write(t.get(), error);
}

// Dirties an HTTP2 setting to be sent out next time a writing path occurs.
// If the change needs to occur immediately, manually initiate a write.
static void queue_setting_update(grpc_chttp2_transport* t,
grpc_chttp2_setting_id id, uint32_t value) {
const grpc_chttp2_setting_parameters* sp =
&grpc_chttp2_settings_parameters[id];
uint32_t use_value = grpc_core::Clamp(value, sp->min_value, sp->max_value);
if (use_value != value) {
gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
value, use_value);
}
if (use_value != t->settings[GRPC_LOCAL_SETTINGS][id]) {
t->settings[GRPC_LOCAL_SETTINGS][id] = use_value;
t->dirtied_local_settings = true;
}
}

// Cancel out streams that haven't yet started if we have received a GOAWAY
static void cancel_unstarted_streams(grpc_chttp2_transport* t,
grpc_error_handle error, bool tarpit) {
Expand Down Expand Up @@ -1320,9 +1247,7 @@ static void maybe_start_some_streams(grpc_chttp2_transport* t) {
// start streams where we have free grpc_chttp2_stream ids and free
// * concurrency
while (t->next_stream_id <= MAX_CLIENT_STREAM_ID &&
t->stream_map.size() <
t->settings[GRPC_PEER_SETTINGS]
[GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
t->stream_map.size() < t->settings.peer().max_concurrent_streams() &&
grpc_chttp2_list_pop_waiting_for_concurrency(t, &s)) {
// safe since we can't (legally) be parsing this stream yet
GRPC_CHTTP2_IF_TRACING(gpr_log(
Expand Down Expand Up @@ -2707,21 +2632,19 @@ void grpc_chttp2_act_on_flowctl_action(
GRPC_CHTTP2_INITIATE_WRITE_TRANSPORT_FLOW_CONTROL, []() {});
WithUrgency(t, action.send_initial_window_update(),
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
queue_setting_update(t,
GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
action.initial_window_size());
});
WithUrgency(t, action.send_max_frame_size_update(),
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
queue_setting_update(t, GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE,
action.max_frame_size());
t->settings.mutable_local().SetInitialWindowSize(
action.initial_window_size());
});
WithUrgency(
t, action.send_max_frame_size_update(),
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
t->settings.mutable_local().SetMaxFrameSize(action.max_frame_size());
});
if (t->enable_preferred_rx_crypto_frame_advertisement) {
WithUrgency(
t, action.preferred_rx_crypto_frame_size_update(),
GRPC_CHTTP2_INITIATE_WRITE_SEND_SETTINGS, [t, &action]() {
queue_setting_update(
t, GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
t->settings.mutable_local().SetPreferredReceiveCryptoMessageSize(
action.preferred_rx_crypto_frame_size());
});
}
Expand Down
31 changes: 15 additions & 16 deletions src/core/ext/transport/chttp2/transport/flow_control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,14 @@ TransportFlowControl::TargetInitialWindowSizeBasedOnMemoryPressureAndBdp()
}

void TransportFlowControl::UpdateSetting(
grpc_chttp2_setting_id id, int64_t* desired_value,
uint32_t new_desired_value, FlowControlAction* action,
absl::string_view name, int64_t* desired_value, uint32_t new_desired_value,
FlowControlAction* action,
FlowControlAction& (FlowControlAction::*set)(FlowControlAction::Urgency,
uint32_t)) {
new_desired_value =
Clamp(new_desired_value, grpc_chttp2_settings_parameters[id].min_value,
grpc_chttp2_settings_parameters[id].max_value);
if (new_desired_value != *desired_value) {
if (grpc_flowctl_trace.enabled()) {
gpr_log(GPR_INFO, "[flowctl] UPDATE SETTING %s from %" PRId64 " to %d",
grpc_chttp2_settings_parameters[id].name, *desired_value,
new_desired_value);
std::string(name).c_str(), *desired_value, new_desired_value);
}
// Reaching zero can only happen for initial window size, and if it occurs
// we really want to wake up writes and ensure all the queued stream
Expand Down Expand Up @@ -290,13 +286,15 @@ FlowControlAction TransportFlowControl::PeriodicUpdate() {
}
// Though initial window 'could' drop to 0, we keep the floor at
// kMinInitialWindowSize
UpdateSetting(GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE,
&target_initial_window_size_, target, &action,
&FlowControlAction::set_send_initial_window_update);
UpdateSetting(Http2Settings::initial_window_size_name(),
&target_initial_window_size_,
std::min(target, Http2Settings::max_initial_window_size()),
&action, &FlowControlAction::set_send_initial_window_update);
// we target the max of BDP or bandwidth in microseconds.
UpdateSetting(GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE, &target_frame_size_,
target, &action,
&FlowControlAction::set_send_max_frame_size_update);
UpdateSetting(Http2Settings::max_frame_size_name(), &target_frame_size_,
Clamp(target, Http2Settings::min_max_frame_size(),
Http2Settings::max_max_frame_size()),
&action, &FlowControlAction::set_send_max_frame_size_update);

if (IsTcpFrameSizeTuningEnabled()) {
// Advertise PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE to peer. By advertising
Expand All @@ -306,10 +304,11 @@ FlowControlAction TransportFlowControl::PeriodicUpdate() {
// Clamp(target_frame_size_ * 2, 16384, 0x7fffffff). In the future, this
// maybe updated to a different function of the memory pressure.
UpdateSetting(
GRPC_CHTTP2_SETTINGS_GRPC_PREFERRED_RECEIVE_CRYPTO_FRAME_SIZE,
Http2Settings::preferred_receive_crypto_message_size_name(),
&target_preferred_rx_crypto_frame_size_,
Clamp(static_cast<unsigned int>(target_frame_size_ * 2), 16384u,
0x7ffffffu),
Clamp(static_cast<unsigned int>(target_frame_size_ * 2),
Http2Settings::min_preferred_receive_crypto_message_size(),
Http2Settings::max_preferred_receive_crypto_message_size()),
&action,
&FlowControlAction::set_preferred_rx_crypto_frame_size_update);
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/transport/chttp2/transport/flow_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class TransportFlowControl final {

private:
double TargetInitialWindowSizeBasedOnMemoryPressureAndBdp() const;
static void UpdateSetting(grpc_chttp2_setting_id id, int64_t* desired_value,
static void UpdateSetting(absl::string_view name, int64_t* desired_value,
uint32_t new_desired_value,
FlowControlAction* action,
FlowControlAction& (FlowControlAction::*set)(
Expand Down
2 changes: 2 additions & 0 deletions src/core/ext/transport/chttp2/transport/frame.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ struct Http2RstStreamFrame {
// SETTINGS frame
struct Http2SettingsFrame {
struct Setting {
Setting(uint16_t id, uint32_t value) : id(id), value(value) {}

uint16_t id;
uint32_t value;

Expand Down
Loading

0 comments on commit 50dbcc3

Please sign in to comment.