Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
ctiller committed Jan 30, 2025
1 parent ad98bb1 commit 36ecdde
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 66 deletions.
35 changes: 17 additions & 18 deletions src/core/client_channel/client_channel_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,25 +529,24 @@ class ClientChannelFilter::SubchannelWrapper final
// WorkSerializer.
// Ref held by callback.
WeakRef(DEBUG_LOCATION, "subchannel map cleanup").release();
chand_->work_serializer_->Run(
[this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
chand_->subchannel_wrappers_.erase(this);
if (chand_->channelz_node_ != nullptr) {
auto* subchannel_node = subchannel_->channelz_node();
if (subchannel_node != nullptr) {
auto it =
chand_->subchannel_refcount_map_.find(subchannel_.get());
CHECK(it != chand_->subchannel_refcount_map_.end());
--it->second;
if (it->second == 0) {
chand_->channelz_node_->RemoveChildSubchannel(
subchannel_node->uuid());
chand_->subchannel_refcount_map_.erase(it);
}
}
chand_->work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(
*chand_->work_serializer_) {
chand_->subchannel_wrappers_.erase(this);
if (chand_->channelz_node_ != nullptr) {
auto* subchannel_node = subchannel_->channelz_node();
if (subchannel_node != nullptr) {
auto it = chand_->subchannel_refcount_map_.find(subchannel_.get());
CHECK(it != chand_->subchannel_refcount_map_.end());
--it->second;
if (it->second == 0) {
chand_->channelz_node_->RemoveChildSubchannel(
subchannel_node->uuid());
chand_->subchannel_refcount_map_.erase(it);
}
WeakUnref(DEBUG_LOCATION, "subchannel map cleanup");
});
}
}
WeakUnref(DEBUG_LOCATION, "subchannel map cleanup");
});
}

void WatchConnectivityState(
Expand Down
11 changes: 5 additions & 6 deletions src/core/client_channel/subchannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,11 @@ void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked(
void Subchannel::ConnectivityStateWatcherList::NotifyLocked(
grpc_connectivity_state state, const absl::Status& status) {
for (const auto& watcher : watchers_) {
subchannel_->work_serializer_.Run(
[watcher = watcher->Ref(), state, status]() mutable {
auto* watcher_ptr = watcher.get();
watcher_ptr->OnConnectivityStateChange(std::move(watcher), state,
status);
});
subchannel_->work_serializer_.Run([watcher = watcher->Ref(), state,
status]() mutable {
auto* watcher_ptr = watcher.get();
watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, status);
});
}
}

Expand Down
9 changes: 4 additions & 5 deletions src/core/resolver/google_c2p/google_c2p_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,10 @@ void GoogleCloud2ProdResolver::StartLocked() {
[resolver = RefAsSubclass<GoogleCloud2ProdResolver>()](
std::string /* attribute */,
absl::StatusOr<std::string> result) mutable {
resolver->work_serializer_->Run(
[resolver, result = std::move(result)]() mutable {
resolver->ZoneQueryDone(result.ok() ? std::move(result).value()
: "");
});
resolver->work_serializer_->Run([resolver,
result = std::move(result)]() mutable {
resolver->ZoneQueryDone(result.ok() ? std::move(result).value() : "");
});
},
Duration::Seconds(10));
ipv6_query_ = MakeOrphanable<GcpMetadataQuery>(
Expand Down
36 changes: 17 additions & 19 deletions test/core/load_balancing/bm_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,23 @@ class BenchmarkHelper : public std::enable_shared_from_this<BenchmarkHelper> {
{
MutexLock lock(&mu_);
picker_ = nullptr;
work_serializer_->Run(
[this, num_endpoints]() {
EndpointAddressesList addresses;
for (size_t i = 0; i < num_endpoints; i++) {
grpc_resolved_address addr;
int port = i % 65536;
int ip = i / 65536;
CHECK_LT(ip, 256);
CHECK(grpc_parse_uri(
URI::Parse(absl::StrCat("ipv4:127.0.0.", ip, ":", port))
.value(),
&addr));
addresses.emplace_back(addr, ChannelArgs());
}
CHECK_OK(lb_policy_->UpdateLocked(LoadBalancingPolicy::UpdateArgs{
std::make_shared<EndpointAddressesListIterator>(
std::move(addresses)),
config_, "", ChannelArgs()}));
});
work_serializer_->Run([this, num_endpoints]() {
EndpointAddressesList addresses;
for (size_t i = 0; i < num_endpoints; i++) {
grpc_resolved_address addr;
int port = i % 65536;
int ip = i / 65536;
CHECK_LT(ip, 256);
CHECK(grpc_parse_uri(
URI::Parse(absl::StrCat("ipv4:127.0.0.", ip, ":", port)).value(),
&addr));
addresses.emplace_back(addr, ChannelArgs());
}
CHECK_OK(lb_policy_->UpdateLocked(LoadBalancingPolicy::UpdateArgs{
std::make_shared<EndpointAddressesListIterator>(
std::move(addresses)),
config_, "", ChannelArgs()}));
});
}
}

Expand Down
35 changes: 17 additions & 18 deletions test/core/load_balancing/pick_first_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,24 +460,23 @@ TEST_F(PickFirstTest, ResolverUpdateBeforeLeavingIdle) {
// subchannels (i.e., before it can transition from IDLE to CONNECTING),
// we send a new update.
absl::Notification notification;
work_serializer_->Run(
[&]() {
// Inject second update into WorkSerializer queue before we
// exit idle, so that the second update gets run before the initial
// subchannel connectivity state notifications from the first update
// are delivered.
work_serializer_->Run([&]() {
// Second update.
absl::Status status = lb_policy()->UpdateLocked(
BuildUpdate(kNewAddresses, MakePickFirstConfig(false)));
EXPECT_TRUE(status.ok()) << status;
// Trigger notification once all connectivity state
// notifications have been delivered.
work_serializer_->Run([&]() { notification.Notify(); });
});
// Exit idle.
lb_policy()->ExitIdleLocked();
});
work_serializer_->Run([&]() {
// Inject second update into WorkSerializer queue before we
// exit idle, so that the second update gets run before the initial
// subchannel connectivity state notifications from the first update
// are delivered.
work_serializer_->Run([&]() {
// Second update.
absl::Status status = lb_policy()->UpdateLocked(
BuildUpdate(kNewAddresses, MakePickFirstConfig(false)));
EXPECT_TRUE(status.ok()) << status;
// Trigger notification once all connectivity state
// notifications have been delivered.
work_serializer_->Run([&]() { notification.Notify(); });
});
// Exit idle.
lb_policy()->ExitIdleLocked();
});
while (!notification.HasBeenNotified()) {
fuzzing_ee_->Tick();
}
Expand Down

0 comments on commit 36ecdde

Please sign in to comment.