diff --git a/src/core/client_channel/client_channel_filter.cc b/src/core/client_channel/client_channel_filter.cc index adc04bce11aff..3182f1dcfdb88 100644 --- a/src/core/client_channel/client_channel_filter.cc +++ b/src/core/client_channel/client_channel_filter.cc @@ -529,25 +529,24 @@ class ClientChannelFilter::SubchannelWrapper final // WorkSerializer. // Ref held by callback. WeakRef(DEBUG_LOCATION, "subchannel map cleanup").release(); - chand_->work_serializer_->Run( - [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) { - chand_->subchannel_wrappers_.erase(this); - if (chand_->channelz_node_ != nullptr) { - auto* subchannel_node = subchannel_->channelz_node(); - if (subchannel_node != nullptr) { - auto it = - chand_->subchannel_refcount_map_.find(subchannel_.get()); - CHECK(it != chand_->subchannel_refcount_map_.end()); - --it->second; - if (it->second == 0) { - chand_->channelz_node_->RemoveChildSubchannel( - subchannel_node->uuid()); - chand_->subchannel_refcount_map_.erase(it); - } - } + chand_->work_serializer_->Run([this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED( + *chand_->work_serializer_) { + chand_->subchannel_wrappers_.erase(this); + if (chand_->channelz_node_ != nullptr) { + auto* subchannel_node = subchannel_->channelz_node(); + if (subchannel_node != nullptr) { + auto it = chand_->subchannel_refcount_map_.find(subchannel_.get()); + CHECK(it != chand_->subchannel_refcount_map_.end()); + --it->second; + if (it->second == 0) { + chand_->channelz_node_->RemoveChildSubchannel( + subchannel_node->uuid()); + chand_->subchannel_refcount_map_.erase(it); } - WeakUnref(DEBUG_LOCATION, "subchannel map cleanup"); - }); + } + } + WeakUnref(DEBUG_LOCATION, "subchannel map cleanup"); + }); } void WatchConnectivityState( diff --git a/src/core/client_channel/subchannel.cc b/src/core/client_channel/subchannel.cc index a33ce572469d4..9d07abf7b54ce 100644 --- a/src/core/client_channel/subchannel.cc +++ b/src/core/client_channel/subchannel.cc @@ -453,12 +453,11 @@ void Subchannel::ConnectivityStateWatcherList::RemoveWatcherLocked( void Subchannel::ConnectivityStateWatcherList::NotifyLocked( grpc_connectivity_state state, const absl::Status& status) { for (const auto& watcher : watchers_) { - subchannel_->work_serializer_.Run( - [watcher = watcher->Ref(), state, status]() mutable { - auto* watcher_ptr = watcher.get(); - watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, - status); - }); + subchannel_->work_serializer_.Run([watcher = watcher->Ref(), state, + status]() mutable { + auto* watcher_ptr = watcher.get(); + watcher_ptr->OnConnectivityStateChange(std::move(watcher), state, status); + }); } } diff --git a/src/core/resolver/google_c2p/google_c2p_resolver.cc b/src/core/resolver/google_c2p/google_c2p_resolver.cc index 9ff4ed39fbd45..0c86c392c57d8 100644 --- a/src/core/resolver/google_c2p/google_c2p_resolver.cc +++ b/src/core/resolver/google_c2p/google_c2p_resolver.cc @@ -156,11 +156,10 @@ void GoogleCloud2ProdResolver::StartLocked() { [resolver = RefAsSubclass()]( std::string /* attribute */, absl::StatusOr result) mutable { - resolver->work_serializer_->Run( - [resolver, result = std::move(result)]() mutable { - resolver->ZoneQueryDone(result.ok() ? std::move(result).value() - : ""); - }); + resolver->work_serializer_->Run([resolver, + result = std::move(result)]() mutable { + resolver->ZoneQueryDone(result.ok() ? std::move(result).value() : ""); + }); }, Duration::Seconds(10)); ipv6_query_ = MakeOrphanable( diff --git a/test/core/load_balancing/bm_picker.cc b/test/core/load_balancing/bm_picker.cc index a295acce220b5..4c7eb3b8838d9 100644 --- a/test/core/load_balancing/bm_picker.cc +++ b/test/core/load_balancing/bm_picker.cc @@ -62,25 +62,23 @@ class BenchmarkHelper : public std::enable_shared_from_this { { MutexLock lock(&mu_); picker_ = nullptr; - work_serializer_->Run( - [this, num_endpoints]() { - EndpointAddressesList addresses; - for (size_t i = 0; i < num_endpoints; i++) { - grpc_resolved_address addr; - int port = i % 65536; - int ip = i / 65536; - CHECK_LT(ip, 256); - CHECK(grpc_parse_uri( - URI::Parse(absl::StrCat("ipv4:127.0.0.", ip, ":", port)) - .value(), - &addr)); - addresses.emplace_back(addr, ChannelArgs()); - } - CHECK_OK(lb_policy_->UpdateLocked(LoadBalancingPolicy::UpdateArgs{ - std::make_shared( - std::move(addresses)), - config_, "", ChannelArgs()})); - }); + work_serializer_->Run([this, num_endpoints]() { + EndpointAddressesList addresses; + for (size_t i = 0; i < num_endpoints; i++) { + grpc_resolved_address addr; + int port = i % 65536; + int ip = i / 65536; + CHECK_LT(ip, 256); + CHECK(grpc_parse_uri( + URI::Parse(absl::StrCat("ipv4:127.0.0.", ip, ":", port)).value(), + &addr)); + addresses.emplace_back(addr, ChannelArgs()); + } + CHECK_OK(lb_policy_->UpdateLocked(LoadBalancingPolicy::UpdateArgs{ + std::make_shared( + std::move(addresses)), + config_, "", ChannelArgs()})); + }); } } diff --git a/test/core/load_balancing/pick_first_test.cc b/test/core/load_balancing/pick_first_test.cc index 06b8e6d7a991e..a195235d185ab 100644 --- a/test/core/load_balancing/pick_first_test.cc +++ b/test/core/load_balancing/pick_first_test.cc @@ -460,24 +460,23 @@ TEST_F(PickFirstTest, ResolverUpdateBeforeLeavingIdle) { // subchannels (i.e., before it can transition from IDLE to CONNECTING), // we send a new update. absl::Notification notification; - work_serializer_->Run( - [&]() { - // Inject second update into WorkSerializer queue before we - // exit idle, so that the second update gets run before the initial - // subchannel connectivity state notifications from the first update - // are delivered. - work_serializer_->Run([&]() { - // Second update. - absl::Status status = lb_policy()->UpdateLocked( - BuildUpdate(kNewAddresses, MakePickFirstConfig(false))); - EXPECT_TRUE(status.ok()) << status; - // Trigger notification once all connectivity state - // notifications have been delivered. - work_serializer_->Run([&]() { notification.Notify(); }); - }); - // Exit idle. - lb_policy()->ExitIdleLocked(); - }); + work_serializer_->Run([&]() { + // Inject second update into WorkSerializer queue before we + // exit idle, so that the second update gets run before the initial + // subchannel connectivity state notifications from the first update + // are delivered. + work_serializer_->Run([&]() { + // Second update. + absl::Status status = lb_policy()->UpdateLocked( + BuildUpdate(kNewAddresses, MakePickFirstConfig(false))); + EXPECT_TRUE(status.ok()) << status; + // Trigger notification once all connectivity state + // notifications have been delivered. + work_serializer_->Run([&]() { notification.Notify(); }); + }); + // Exit idle. + lb_policy()->ExitIdleLocked(); + }); while (!notification.HasBeenNotified()) { fuzzing_ee_->Tick(); }