From 7e089a22d736a095974ed6cf4492541ed0c35275 Mon Sep 17 00:00:00 2001
From: Craig Tiller
Date: Fri, 26 Jul 2024 09:56:02 -0700
Subject: [PATCH] [party] Make it faster (#37132)

Notes:

* Adds a single participant `AddParticipant` variant for this common case (per #37056 which I'm abandoning)
* Folds the `PartySyncUsingAtomics` class back into `Party`, removes the `PartySyncUsingMutex` class
* Leverages this integration to find places where we're doing repeated CAS operations and folds them into single operations (for example an unlock/unref pair can be folded into a single CAS)
* Also lowers some `CHECK` statements into `DCHECK` - which I think is appropriate given the performance sensitivity of this code
* Adds code to deal with overflowing the number of participants added to a party -- for now we do a busy add by queuing to event engine and retrying -- this has the advantage of not adding cost to the normal path, but has the slightly worrying disadvantage of effectively being a busy poll. My expectation is that this will be ok in general (the condition clears very quickly), but if not we'll modify this to be a linked list of pending actions and take a hit on the fast path.
* Simplifies `PartyIsOver` (per #37113 which I'm abandoning)
* Keeps a per-object wakeup cache (`wakeup_mask_`) that is protected by the lock bit -- this allows waking up a participant during polling without resorting to an extra atomic operation - significantly speeding that wakeup path (17ns --> 6ns)

Before:
```
----------------------------------------------------------------------
Benchmark                           Time             CPU   Iterations
----------------------------------------------------------------------
BM_PartyCreate                    142 ns          142 ns      44952269
BM_PartyCreate                   73.8 ns         73.8 ns      44952269
BM_PartyCreate                   72.6 ns         72.6 ns      44952269
BM_PartyCreate                   72.5 ns         72.5 ns      44952269
BM_PartyCreate                   72.4 ns         72.4 ns      44952269
BM_PartyCreate                   72.5 ns         72.5 ns      44952269
BM_PartyCreate                   72.5 ns         72.5 ns      44952269
BM_PartyCreate                   72.6 ns         72.6 ns      44952269
BM_PartyCreate                   72.2 ns         72.2 ns      44952269
BM_PartyCreate                   72.5 ns         72.5 ns      44952269
BM_PartyCreate_mean              79.5 ns         79.5 ns            10
BM_PartyCreate_median            72.5 ns         72.5 ns            10
BM_PartyCreate_stddev            21.8 ns         21.8 ns            10
BM_PartyCreate_cv               27.46 %         27.46 %             10
BM_AddParticipant                35.3 ns         35.3 ns     197041251
BM_AddParticipant                35.3 ns         35.3 ns     197041251
BM_AddParticipant                35.1 ns         35.1 ns     197041251
BM_AddParticipant                35.4 ns         35.4 ns     197041251
BM_AddParticipant                35.3 ns         35.3 ns     197041251
BM_AddParticipant                35.2 ns         35.2 ns     197041251
BM_AddParticipant                35.9 ns         35.9 ns     197041251
BM_AddParticipant                36.0 ns         36.0 ns     197041251
BM_AddParticipant                35.8 ns         35.8 ns     197041251
BM_AddParticipant                36.0 ns         36.0 ns     197041251
BM_AddParticipant_mean           35.5 ns         35.5 ns            10
BM_AddParticipant_median         35.4 ns         35.4 ns            10
BM_AddParticipant_stddev        0.352 ns        0.352 ns            10
BM_AddParticipant_cv             0.99 %          0.99 %             10
BM_WakeupParticipant             17.1 ns         17.1 ns     406116840
BM_WakeupParticipant             16.9 ns         16.9 ns     406116840
BM_WakeupParticipant             16.8 ns         16.8 ns     406116840
BM_WakeupParticipant             16.8 ns         16.8 ns     406116840
BM_WakeupParticipant             16.9 ns         16.9 ns     406116840
BM_WakeupParticipant             16.9 ns         16.9 ns     406116840
BM_WakeupParticipant             17.0 ns         17.0 ns     406116840
BM_WakeupParticipant             17.0 ns         17.0 ns     406116840
BM_WakeupParticipant             16.9 ns         16.9 ns     406116840
BM_WakeupParticipant             17.0 ns         17.0 ns     406116840
BM_WakeupParticipant_mean        16.9 ns         16.9 ns            10
BM_WakeupParticipant_median      16.9 ns         16.9 ns            10
BM_WakeupParticipant_stddev     0.087 ns        0.087 ns            10
BM_WakeupParticipant_cv          0.51 %          0.51 %             10
```

After:
```
---------------------------------------------------------------------- Benchmark Time CPU Iterations ---------------------------------------------------------------------- BM_PartyCreate 115 ns 115 ns 29602192 BM_PartyCreate 56.5 ns 56.5 ns 29602192 BM_PartyCreate 55.3 ns 55.3 ns 29602192 BM_PartyCreate 55.9 ns 55.9 ns 29602192 BM_PartyCreate 55.1 ns 55.1 ns 29602192 BM_PartyCreate 55.2 ns 55.2 ns 29602192 BM_PartyCreate 55.2 ns 55.2 ns 29602192 BM_PartyCreate 56.2 ns 56.2 ns 29602192 BM_PartyCreate 54.7 ns 54.7 ns 29602192 BM_PartyCreate 55.8 ns 55.8 ns 29602192 BM_PartyCreate_mean 61.5 ns 61.5 ns 10 BM_PartyCreate_median 55.5 ns 55.5 ns 10 BM_PartyCreate_stddev 18.9 ns 18.9 ns 10 BM_PartyCreate_cv 30.68 % 30.68 % 10 BM_AddParticipant 26.9 ns 26.9 ns 155407231 BM_AddParticipant 26.5 ns 26.5 ns 155407231 BM_AddParticipant 24.8 ns 24.8 ns 155407231 BM_AddParticipant 24.9 ns 24.9 ns 155407231 BM_AddParticipant 24.8 ns 24.8 ns 155407231 BM_AddParticipant 25.3 ns 25.3 ns 155407231 BM_AddParticipant 25.8 ns 25.8 ns 155407231 BM_AddParticipant 25.3 ns 25.3 ns 155407231 BM_AddParticipant 30.8 ns 30.8 ns 155407231 BM_AddParticipant 27.7 ns 27.7 ns 155407231 BM_AddParticipant_mean 26.3 ns 26.3 ns 10 BM_AddParticipant_median 25.6 ns 25.6 ns 10 BM_AddParticipant_stddev 1.87 ns 1.87 ns 10 BM_AddParticipant_cv 7.11 % 7.10 % 10 BM_WakeupParticipant 6.75 ns 6.75 ns 623459241 BM_WakeupParticipant 6.77 ns 6.77 ns 623459241 BM_WakeupParticipant 6.74 ns 6.74 ns 623459241 BM_WakeupParticipant 6.73 ns 6.73 ns 623459241 BM_WakeupParticipant 6.74 ns 6.74 ns 623459241 BM_WakeupParticipant 6.70 ns 6.70 ns 623459241 BM_WakeupParticipant 6.70 ns 6.69 ns 623459241 BM_WakeupParticipant 6.79 ns 6.79 ns 623459241 BM_WakeupParticipant 6.76 ns 6.76 ns 623459241 BM_WakeupParticipant 6.78 ns 6.78 ns 623459241 BM_WakeupParticipant_mean 6.75 ns 6.75 ns 10 BM_WakeupParticipant_median 6.75 ns 6.75 ns 10 BM_WakeupParticipant_stddev 0.031 ns 0.031 ns 10 BM_WakeupParticipant_cv 0.46 % 0.46 % 10 ``` Closes #37132 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/37132 from ctiller:nineteen-ninety-nine 336c87bdd645133a174861adb902ca763e07f57e PiperOrigin-RevId: 656437265 --- src/core/lib/promise/party.cc | 346 +++++++++++++++++--------- src/core/lib/promise/party.h | 413 ++++++++------------------------ test/core/promise/party_test.cc | 184 +------------- 3 files changed, 340 insertions(+), 603 deletions(-) diff --git a/src/core/lib/promise/party.cc b/src/core/lib/promise/party.cc index f1a9f70f772b4..cdf6dd10296bc 100644 --- a/src/core/lib/promise/party.cc +++ b/src/core/lib/promise/party.cc @@ -15,6 +15,7 @@ #include "src/core/lib/promise/party.h" #include +#include #include "absl/base/thread_annotations.h" #include "absl/log/check.h" @@ -39,7 +40,7 @@ namespace grpc_core { /////////////////////////////////////////////////////////////////////////////// // PartySyncUsingAtomics -GRPC_MUST_USE_RESULT bool PartySyncUsingAtomics::RefIfNonZero() { +GRPC_MUST_USE_RESULT bool Party::RefIfNonZero() { auto count = state_.load(std::memory_order_relaxed); do { // If zero, we are done (without an increment). 
If not, we must do a CAS @@ -55,33 +56,6 @@ GRPC_MUST_USE_RESULT bool PartySyncUsingAtomics::RefIfNonZero() { return true; } -bool PartySyncUsingAtomics::UnreffedLast() { - uint64_t prev_state = - state_.fetch_or(kDestroying | kLocked, std::memory_order_acq_rel); - LogStateChange("UnreffedLast", prev_state, - prev_state | kDestroying | kLocked); - return (prev_state & kLocked) == 0; -} - -bool PartySyncUsingAtomics::ScheduleWakeup(WakeupMask mask) { - // Or in the wakeup bit for the participant, AND the locked bit. - uint64_t prev_state = state_.fetch_or((mask & kWakeupMask) | kLocked, - std::memory_order_acq_rel); - LogStateChange("ScheduleWakeup", prev_state, - prev_state | (mask & kWakeupMask) | kLocked); - // If the lock was not held now we hold it, so we need to run. - return ((prev_state & kLocked) == 0); -} - -/////////////////////////////////////////////////////////////////////////////// -// PartySyncUsingMutex - -bool PartySyncUsingMutex::ScheduleWakeup(WakeupMask mask) { - MutexLock lock(&mu_); - wakeups_ |= mask; - return !std::exchange(locked_, true); -} - /////////////////////////////////////////////////////////////////////////////// // Party::Handle @@ -175,7 +149,7 @@ Party::Participant::~Participant() { Party::~Party() {} void Party::CancelRemainingParticipants() { - if (!sync_.has_participants()) return; + if ((state_.load(std::memory_order_relaxed) & kAllocatedMask) == 0) return; ScopedActivity activity(this); promise_detail::Context arena_ctx(arena_.get()); for (size_t i = 0; i < party_detail::kMaxParticipants; i++) { @@ -206,37 +180,42 @@ Waker Party::MakeNonOwningWaker() { void Party::ForceImmediateRepoll(WakeupMask mask) { DCHECK(is_current()); - sync_.ForceImmediateRepoll(mask); + wakeup_mask_ |= mask; } -void Party::RunLocked(Party* party) { +void Party::RunLockedAndUnref(Party* party, uint64_t prev_state) { GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked"); #ifdef GRPC_MAXIMIZE_THREADYNESS Thread thd( "RunParty", - [party]() { + [party, prev_state]() { ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; - if (party->RunParty()) party->PartyIsOver(); + party->RunPartyAndUnref(prev_state); }, nullptr, Thread::Options().set_joinable(false)); thd.Start(); #else struct RunState; static thread_local RunState* g_run_state = nullptr; + struct PartyWakeup { + PartyWakeup() : party{nullptr} {} + PartyWakeup(Party* party, uint64_t prev_state) + : party{party}, prev_state{prev_state} {} + Party* party; + uint64_t prev_state; + }; struct RunState { - explicit RunState(Party* party) : running(party), next(nullptr) {} - Party* running; - Party* next; - void Run() { + explicit RunState(PartyWakeup first) : first{first}, next{} {} + PartyWakeup first; + PartyWakeup next; + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Run() { g_run_state = this; do { GRPC_LATENT_SEE_INNER_SCOPE("run_one_party"); - if (running->RunParty()) { - running->PartyIsOver(); - } - running = std::exchange(next, nullptr); - } while (running != nullptr); + first.party->RunPartyAndUnref(first.prev_state); + first = std::exchange(next, PartyWakeup{}); + } while (first.party != nullptr); DCHECK(g_run_state == this); g_run_state = nullptr; } @@ -245,119 +224,258 @@ void Party::RunLocked(Party* party) { // but instead add it to the end of the list of parties to run. // This enables a fairly straightforward batching of work from a // call to a transport (or back again). - if (g_run_state != nullptr) { - if (g_run_state->running == party || g_run_state->next == party) { - // Already running or already queued. 
+ if (GPR_UNLIKELY(g_run_state != nullptr)) { + if (g_run_state->first.party == party) { + g_run_state->first.prev_state = prev_state; + party->Unref(); return; } - if (g_run_state->next != nullptr) { + if (g_run_state->next.party == party) { + g_run_state->next.prev_state = prev_state; + party->Unref(); + return; + } + if (g_run_state->next.party != nullptr) { // If there's already a different party queued, we're better off asking // event engine to run it so we can spread load. // We swap the oldest party to run on the event engine so that we don't // accidentally end up with a tail latency problem whereby one party // gets held for a really long time. - std::swap(g_run_state->next, party); + auto wakeup = + std::exchange(g_run_state->next, PartyWakeup{party, prev_state}); party->arena_->GetContext() - ->Run([party]() { + ->Run([wakeup]() { GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked offload"); ApplicationCallbackExecCtx app_exec_ctx; ExecCtx exec_ctx; - RunState{party}.Run(); + RunState{wakeup}.Run(); }); return; } - g_run_state->next = party; + g_run_state->next = PartyWakeup{party, prev_state}; return; } - RunState{party}.Run(); + RunState{{party, prev_state}}.Run(); #endif } -bool Party::RunParty() { +void Party::RunPartyAndUnref(uint64_t prev_state) { ScopedActivity activity(this); promise_detail::Context arena_ctx(arena_.get()); - return sync_.RunParty([this](int i) { return RunOneParticipant(i); }); -} - -bool Party::RunOneParticipant(int i) { - GRPC_LATENT_SEE_INNER_SCOPE("Party::RunOneParticipant"); - // If the participant is null, skip. - // This allows participants to complete whilst wakers still exist - // somewhere. - auto* participant = participants_[i].load(std::memory_order_acquire); - if (participant == nullptr) { - if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) { - LOG(INFO) << DebugTag() << "[party] wakeup " << i << " already complete"; + DCHECK_EQ(prev_state & kLocked, 0u) + << "Party should be unlocked prior to first wakeup"; + DCHECK_GE(prev_state & kRefMask, kOneRef); + // Now update prev_state to be what we want the CAS to see below. + DCHECK_EQ(prev_state & ~(kRefMask | kAllocatedMask), 0u) + << "Party should have contained no wakeups on lock"; + prev_state |= kLocked; + for (;;) { + uint64_t keep_allocated_mask = kAllocatedMask; + // For each wakeup bit... + while (wakeup_mask_ != 0) { + auto wakeup_mask = std::exchange(wakeup_mask_, 0); + while (wakeup_mask != 0) { + const uint64_t t = LowestOneBit(wakeup_mask); + const int i = CountTrailingZeros(t); + wakeup_mask ^= t; + // If the participant is null, skip. + // This allows participants to complete whilst wakers still exist + // somewhere. + auto* participant = participants_[i].load(std::memory_order_acquire); + if (GPR_UNLIKELY(participant == nullptr)) { + GRPC_TRACE_LOG(promise_primitives, INFO) + << "Party " << this << " Run:Wakeup " << i + << " already complete"; + continue; + } + GRPC_TRACE_LOG(promise_primitives, INFO) + << "Party " << this << " Run:Wakeup " << i; + // Poll the participant. + currently_polling_ = i; + if (participant->PollParticipantPromise()) { + participants_[i].store(nullptr, std::memory_order_relaxed); + const uint64_t allocated_bit = (1u << i << kAllocatedShift); + keep_allocated_mask &= ~allocated_bit; + } + } } - return false; - } - absl::string_view name; - if (GRPC_TRACE_FLAG_ENABLED(promise_primitives)) { - name = participant->name(); - LOG(INFO) << DebugTag() << "[" << name << "] begin job " << i; - } - // Poll the participant. 
- currently_polling_ = i; - bool done = participant->PollParticipantPromise(); - currently_polling_ = kNotPolling; - if (done) { - if (!name.empty()) { - GRPC_TRACE_LOG(promise_primitives, INFO) - << DebugTag() << "[" << name << "] end poll and finish job " << i; + currently_polling_ = kNotPolling; + // Try to CAS the state we expected to have (with no wakeups or adds) + // back to unlocked (by masking in only the ref mask - sans locked bit). + // If this succeeds then no wakeups were added, no adds were added, and we + // have successfully unlocked. + // Otherwise, we need to loop again. + // Note that if an owning waker is created or the weak cas spuriously + // fails we will also loop again, but in that case see no wakeups or adds + // and so will get back here fairly quickly. + // TODO(ctiller): consider mitigations for the accidental wakeup on owning + // waker creation case -- I currently expect this will be more expensive + // than this quick loop. + if (state_.compare_exchange_weak( + prev_state, + (prev_state & (kRefMask | keep_allocated_mask)) - kOneRef, + std::memory_order_acq_rel, std::memory_order_acquire)) { + LogStateChange("Run:End", prev_state, + prev_state & (kRefMask | kAllocatedMask) - kOneRef); + if ((prev_state & kRefMask) == kOneRef) { + // We're done with the party. + PartyIsOver(); + } + return; } - participants_[i].store(nullptr, std::memory_order_relaxed); - } else if (!name.empty()) { - GRPC_TRACE_LOG(promise_primitives, INFO) - << DebugTag() << "[" << name << "] end poll"; + // CAS out (but retrieve) any allocations and wakeups that occurred during + // the run. + while (!state_.compare_exchange_weak( + prev_state, prev_state & (kRefMask | kLocked | keep_allocated_mask))) { + // Nothing to do here. + } + LogStateChange("Run:Continue", prev_state, + prev_state & (kRefMask | kLocked | keep_allocated_mask)); + DCHECK(prev_state & kLocked) + << "Party should be locked; prev_state=" << prev_state; + DCHECK_GE(prev_state & kRefMask, kOneRef); + // From the previous state, extract which participants we're to wakeup. + wakeup_mask_ |= prev_state & kWakeupMask; + // Now update prev_state to be what we want the CAS to see once wakeups + // complete next iteration. + prev_state &= kRefMask | kLocked | keep_allocated_mask; } - return done; } void Party::AddParticipants(Participant** participants, size_t count) { - bool run_party = sync_.AddParticipantsAndRef(count, [this, participants, - count](size_t* slots) { + uint64_t state = state_.load(std::memory_order_acquire); + uint64_t allocated; + + size_t slots[party_detail::kMaxParticipants]; + + // Find slots for each new participant, ordering them from lowest available + // slot upwards to ensure the same poll ordering as presentation ordering to + // this function. 
+ WakeupMask wakeup_mask; + uint64_t new_state; + do { + wakeup_mask = 0; + allocated = (state & kAllocatedMask) >> kAllocatedShift; for (size_t i = 0; i < count; i++) { - if (GRPC_TRACE_FLAG_ENABLED(party_state)) { - LOG(INFO) << "Party " << &sync_ << " AddParticipant: " - << participants[i]->name() << " @ " << slots[i] - << " [participant=" << participants[i] << "]"; + auto new_mask = LowestOneBit(~allocated); + if (GPR_UNLIKELY((new_mask & kWakeupMask) == 0)) { + DelayAddParticipants(participants, count); + return; } - participants_[slots[i]].store(participants[i], std::memory_order_release); + wakeup_mask |= new_mask; + allocated |= new_mask; + slots[i] = CountTrailingZeros(new_mask); } - }); - if (run_party) RunLocked(this); - Unref(); + // Try to allocate this slot and take a ref (atomically). + // Ref needs to be taken because once we store the participant it could be + // spuriously woken up and unref the party. + new_state = (state | (allocated << kAllocatedShift)) + kOneRef; + } while (!state_.compare_exchange_weak( + state, new_state, std::memory_order_acq_rel, std::memory_order_acquire)); + LogStateChange("AddParticipantsAndRef", state, new_state); + + for (size_t i = 0; i < count; i++) { + GRPC_TRACE_LOG(party_state, INFO) + << "Party " << this << " AddParticipant: " << slots[i] + << " " << participants[i]; + participants_[slots[i]].store(participants[i], std::memory_order_release); + } + + // Now we need to wake up the party. + WakeupFromState(new_state, wakeup_mask); } -void Party::Wakeup(WakeupMask wakeup_mask) { - if (sync_.ScheduleWakeup(wakeup_mask)) RunLocked(this); - Unref(); +void Party::AddParticipant(Participant* participant) { + uint64_t state = state_.load(std::memory_order_acquire); + uint64_t allocated; + size_t slot; + + // Find slots for each new participant, ordering them from lowest available + // slot upwards to ensure the same poll ordering as presentation ordering to + // this function. + uint64_t wakeup_mask; + uint64_t new_state; + do { + allocated = (state & kAllocatedMask) >> kAllocatedShift; + wakeup_mask = LowestOneBit(~allocated); + if (GPR_UNLIKELY((wakeup_mask & kWakeupMask) == 0)) { + DelayAddParticipants(&participant, 1); + return; + } + DCHECK_NE(wakeup_mask & kWakeupMask, 0u) + << "No available slots for new participant; allocated=" << allocated + << " state=" << state << " wakeup_mask=" << wakeup_mask; + allocated |= wakeup_mask; + slot = CountTrailingZeros(wakeup_mask); + // Try to allocate this slot and take a ref (atomically). + // Ref needs to be taken because once we store the participant it could be + // spuriously woken up and unref the party. + new_state = (state | (allocated << kAllocatedShift)) + kOneRef; + } while (!state_.compare_exchange_weak( + state, new_state, std::memory_order_acq_rel, std::memory_order_acquire)); + LogStateChange("AddParticipantsAndRef", state, new_state); + GRPC_TRACE_LOG(party_state, INFO) + << "Party " << this << " AddParticipant: " << slot + << " [participant=" << participant << "]"; + participants_[slot].store(participant, std::memory_order_release); + // Now we need to wake up the party. + WakeupFromState(new_state, wakeup_mask); +} + +void Party::DelayAddParticipants(Participant** participants, size_t count) { + // We need to delay the addition of participants. 
+ IncrementRefCount(); + VLOG_EVERY_N_SEC(2, 10) << "Delaying addition of " << count + << " participants to party " << this + << " because it is full."; + std::vector delayed_participants{participants, + participants + count}; + arena_->GetContext()->Run( + [this, delayed_participants = std::move(delayed_participants)]() mutable { + ApplicationCallbackExecCtx app_exec_ctx; + ExecCtx exec_ctx; + AddParticipants(delayed_participants.data(), + delayed_participants.size()); + Unref(); + }); } void Party::WakeupAsync(WakeupMask wakeup_mask) { - if (sync_.ScheduleWakeup(wakeup_mask)) { - arena_->GetContext()->Run( - [this]() { - ApplicationCallbackExecCtx app_exec_ctx; - ExecCtx exec_ctx; - RunLocked(this); - Unref(); - }); - } else { - Unref(); + // Or in the wakeup bit for the participant, AND the locked bit. + uint64_t prev_state = state_.load(std::memory_order_relaxed); + LogStateChange("ScheduleWakeup", prev_state, + prev_state | (wakeup_mask & kWakeupMask) | kLocked); + while (true) { + if ((prev_state & kLocked) == 0) { + if (state_.compare_exchange_weak(prev_state, prev_state | kLocked, + std::memory_order_acq_rel, + std::memory_order_acquire)) { + LogStateChange("WakeupAsync", prev_state, prev_state | kLocked); + wakeup_mask_ |= wakeup_mask; + arena_->GetContext()->Run( + [this, prev_state]() { + ApplicationCallbackExecCtx app_exec_ctx; + ExecCtx exec_ctx; + RunLockedAndUnref(this, prev_state); + }); + return; + } + } else { + if (state_.compare_exchange_weak( + prev_state, (prev_state | wakeup_mask) - kOneRef, + std::memory_order_acq_rel, std::memory_order_acquire)) { + LogStateChange("WakeupAsync", prev_state, prev_state | wakeup_mask); + return; + } + } } } void Party::Drop(WakeupMask) { Unref(); } void Party::PartyIsOver() { - auto arena = arena_; - { - ScopedActivity activity(this); - promise_detail::Context arena_ctx(arena_.get()); - CancelRemainingParticipants(); - arena->DestroyManagedNewObjects(); - } + CancelRemainingParticipants(); + auto arena = std::move(arena_); this->~Party(); } diff --git a/src/core/lib/promise/party.h b/src/core/lib/promise/party.h index 36544c601cd33..b42c392d51f12 100644 --- a/src/core/lib/promise/party.h +++ b/src/core/lib/promise/party.h @@ -44,25 +44,6 @@ #include "src/core/lib/resource_quota/arena.h" #include "src/core/util/useful.h" -// Two implementations of party synchronization are provided: one using a single -// atomic, the other using a mutex and a set of state variables. -// Originally the atomic implementation was implemented, but we found some race -// conditions on Arm that were not reported by our default TSAN implementation. -// The mutex implementation was added to see if it would fix the problem, and -// it did. Later we found the race condition, so there's no known reason to use -// the mutex version - however we keep it around as a just in case measure. -// There's a thought of fuzzing the two implementations against each other as -// a correctness check of both, but that's not implemented yet. 
- -#define GRPC_PARTY_SYNC_USING_ATOMICS -// #define GRPC_PARTY_SYNC_USING_MUTEX - -#if defined(GRPC_PARTY_SYNC_USING_ATOMICS) + \ - defined(GRPC_PARTY_SYNC_USING_MUTEX) != \ - 1 -#error Must define a party sync mechanism -#endif - namespace grpc_core { namespace party_detail { @@ -73,264 +54,6 @@ static constexpr size_t kMaxParticipants = 16; } // namespace party_detail -class PartySyncUsingAtomics { - public: - explicit PartySyncUsingAtomics(size_t initial_refs) - : state_(kOneRef * initial_refs) {} - - void IncrementRefCount() { - const uint64_t prev_state = - state_.fetch_add(kOneRef, std::memory_order_relaxed); - LogStateChange("IncrementRefCount", prev_state, prev_state + kOneRef); - } - GRPC_MUST_USE_RESULT bool RefIfNonZero(); - // Returns true if the ref count is now zero and the caller should call - // PartyIsOver - GRPC_MUST_USE_RESULT bool Unref() { - const uint64_t prev_state = - state_.fetch_sub(kOneRef, std::memory_order_acq_rel); - LogStateChange("Unref", prev_state, prev_state - kOneRef); - if ((prev_state & kRefMask) == kOneRef) { - return UnreffedLast(); - } - return false; - } - void ForceImmediateRepoll(WakeupMask mask) { - // Or in the bit for the currently polling participant. - // Will be grabbed next round to force a repoll of this promise. - const uint64_t prev_state = - state_.fetch_or(mask, std::memory_order_relaxed); - LogStateChange("ForceImmediateRepoll", prev_state, prev_state | mask); - } - - // Run the update loop: poll_one_participant is called with an integral index - // for the participant that should be polled. It should return true if the - // participant completed and should be removed from the allocated set. - template - GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) { - // Grab the current state, and clear the wakeup bits & add flag. - uint64_t prev_state = state_.fetch_and(kRefMask | kLocked | kAllocatedMask, - std::memory_order_acquire); - LogStateChange("Run", prev_state, - prev_state & (kRefMask | kLocked | kAllocatedMask)); - CHECK(prev_state & kLocked); - if (prev_state & kDestroying) return true; - // From the previous state, extract which participants we're to wakeup. - uint64_t wakeups = prev_state & kWakeupMask; - // Now update prev_state to be what we want the CAS to see below. - prev_state &= kRefMask | kLocked | kAllocatedMask; - for (;;) { - uint64_t keep_allocated_mask = kAllocatedMask; - // For each wakeup bit... - while (wakeups != 0) { - uint64_t t = LowestOneBit(wakeups); - const int i = CountTrailingZeros(t); - wakeups ^= t; - // If the bit is not set, skip. - if (poll_one_participant(i)) { - const uint64_t allocated_bit = (1u << i << kAllocatedShift); - keep_allocated_mask &= ~allocated_bit; - } - } - // Try to CAS the state we expected to have (with no wakeups or adds) - // back to unlocked (by masking in only the ref mask - sans locked bit). - // If this succeeds then no wakeups were added, no adds were added, and we - // have successfully unlocked. - // Otherwise, we need to loop again. - // Note that if an owning waker is created or the weak cas spuriously - // fails we will also loop again, but in that case see no wakeups or adds - // and so will get back here fairly quickly. - // TODO(ctiller): consider mitigations for the accidental wakeup on owning - // waker creation case -- I currently expect this will be more expensive - // than this quick loop. 
- if (state_.compare_exchange_weak( - prev_state, (prev_state & (kRefMask | keep_allocated_mask)), - std::memory_order_acq_rel, std::memory_order_acquire)) { - LogStateChange("Run:End", prev_state, - prev_state & (kRefMask | kAllocatedMask)); - return false; - } - while (!state_.compare_exchange_weak( - prev_state, - prev_state & (kRefMask | kLocked | keep_allocated_mask))) { - // Nothing to do here. - } - LogStateChange("Run:Continue", prev_state, - prev_state & (kRefMask | kLocked | keep_allocated_mask)); - CHECK(prev_state & kLocked); - if (prev_state & kDestroying) return true; - // From the previous state, extract which participants we're to wakeup. - wakeups = prev_state & kWakeupMask; - // Now update prev_state to be what we want the CAS to see once wakeups - // complete next iteration. - prev_state &= kRefMask | kLocked | keep_allocated_mask; - } - return false; - } - - // Add new participants to the party. Returns true if the caller should run - // the party. store is called with an array of indices of the new - // participants. Adds a ref that should be dropped by the caller after - // RunParty has been called (if that was required). - template - GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) { - uint64_t state = state_.load(std::memory_order_acquire); - uint64_t allocated; - - size_t slots[party_detail::kMaxParticipants]; - - // Find slots for each new participant, ordering them from lowest available - // slot upwards to ensure the same poll ordering as presentation ordering to - // this function. - WakeupMask wakeup_mask; - do { - wakeup_mask = 0; - allocated = (state & kAllocatedMask) >> kAllocatedShift; - for (size_t i = 0; i < count; i++) { - auto new_mask = LowestOneBit(~allocated); - wakeup_mask |= new_mask; - allocated |= new_mask; - slots[i] = CountTrailingZeros(new_mask); - } - // Try to allocate this slot and take a ref (atomically). - // Ref needs to be taken because once we store the participant it could be - // spuriously woken up and unref the party. - } while (!state_.compare_exchange_weak( - state, (state | (allocated << kAllocatedShift)) + kOneRef, - std::memory_order_acq_rel, std::memory_order_acquire)); - LogStateChange("AddParticipantsAndRef", state, - (state | (allocated << kAllocatedShift)) + kOneRef); - - store(slots); - - // Now we need to wake up the party. - state = state_.fetch_or(wakeup_mask | kLocked, std::memory_order_release); - LogStateChange("AddParticipantsAndRef:Wakeup", state, - state | wakeup_mask | kLocked); - - // If the party was already locked, we're done. - return ((state & kLocked) == 0); - } - - // Schedule a wakeup for the given participant. - // Returns true if the caller should run the party. 
- GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask); - - bool has_participants() const { - return (state_.load(std::memory_order_relaxed) & kAllocatedMask) != 0; - } - - private: - bool UnreffedLast(); - - void LogStateChange(const char* op, uint64_t prev_state, uint64_t new_state, - DebugLocation loc = {}) { - if (GRPC_TRACE_FLAG_ENABLED(party_state)) { - LOG(INFO).AtLocation(loc.file(), loc.line()) - << absl::StrFormat("Party %p %30s: %016" PRIx64 " -> %016" PRIx64, - this, op, prev_state, new_state); - } - } - - // State bits: - // The atomic state_ field is composed of the following: - // - 24 bits for ref counts - // 1 is owned by the party prior to Orphan() - // All others are owned by owning wakers - // - 1 bit to indicate whether the party is locked - // The first thread to set this owns the party until it is unlocked - // That thread will run the main loop until no further work needs to - // be done. - // - 1 bit to indicate whether there are participants waiting to be - // added - // - 16 bits, one per participant, indicating which participants have - // been - // woken up and should be polled next time the main loop runs. - - // clang-format off - // Bits used to store 16 bits of wakeups - static constexpr uint64_t kWakeupMask = 0x0000'0000'0000'ffff; - // Bits used to store 16 bits of allocated participant slots. - static constexpr uint64_t kAllocatedMask = 0x0000'0000'ffff'0000; - // Bit indicating destruction has begun (refs went to zero) - static constexpr uint64_t kDestroying = 0x0000'0001'0000'0000; - // Bit indicating locked or not - static constexpr uint64_t kLocked = 0x0000'0008'0000'0000; - // Bits used to store 24 bits of ref counts - static constexpr uint64_t kRefMask = 0xffff'ff00'0000'0000; - // clang-format on - - // Shift to get from a participant mask to an allocated mask. 
- static constexpr size_t kAllocatedShift = 16; - // How far to shift to get the refcount - static constexpr size_t kRefShift = 40; - // One ref count - static constexpr uint64_t kOneRef = 1ull << kRefShift; - - std::atomic state_; -}; - -class PartySyncUsingMutex { - public: - explicit PartySyncUsingMutex(size_t initial_refs) : refs_(initial_refs) {} - - void IncrementRefCount() { refs_.Ref(); } - GRPC_MUST_USE_RESULT bool RefIfNonZero() { return refs_.RefIfNonZero(); } - GRPC_MUST_USE_RESULT bool Unref() { return refs_.Unref(); } - void ForceImmediateRepoll(WakeupMask mask) { - MutexLock lock(&mu_); - wakeups_ |= mask; - } - template - GRPC_MUST_USE_RESULT bool RunParty(F poll_one_participant) { - WakeupMask freed = 0; - while (true) { - ReleasableMutexLock lock(&mu_); - CHECK(locked_); - allocated_ &= ~std::exchange(freed, 0); - auto wakeup = std::exchange(wakeups_, 0); - if (wakeup == 0) { - locked_ = false; - return false; - } - lock.Release(); - for (size_t i = 0; wakeup != 0; i++, wakeup >>= 1) { - if ((wakeup & 1) == 0) continue; - if (poll_one_participant(i)) freed |= 1 << i; - } - } - } - - template - GRPC_MUST_USE_RESULT bool AddParticipantsAndRef(size_t count, F store) { - IncrementRefCount(); - MutexLock lock(&mu_); - size_t slots[party_detail::kMaxParticipants]; - WakeupMask wakeup_mask = 0; - size_t n = 0; - for (size_t bit = 0; n < count && bit < party_detail::kMaxParticipants; - bit++) { - if (allocated_ & (1 << bit)) continue; - slots[n++] = bit; - wakeup_mask |= 1 << bit; - allocated_ |= 1 << bit; - } - CHECK(n == count); - store(slots); - wakeups_ |= wakeup_mask; - return !std::exchange(locked_, true); - } - - GRPC_MUST_USE_RESULT bool ScheduleWakeup(WakeupMask mask); - - private: - RefCount refs_; - Mutex mu_; - WakeupMask allocated_ ABSL_GUARDED_BY(mu_) = 0; - WakeupMask wakeups_ ABSL_GUARDED_BY(mu_) = 0; - bool locked_ ABSL_GUARDED_BY(mu_) = false; -}; - // A Party is an Activity with multiple participant promises. class Party : public Activity, private Wakeable { private: @@ -340,7 +63,6 @@ class Party : public Activity, private Wakeable { // One participant in the party. class Participant { public: - explicit Participant(absl::string_view name) : name_(name) {} // Poll the participant. Return true if complete. // Participant should take care of its own deallocation in this case. virtual bool PollParticipantPromise() = 0; @@ -351,14 +73,11 @@ class Party : public Activity, private Wakeable { // Return a Handle instance for this participant. 
Wakeable* MakeNonOwningWakeable(Party* party); - absl::string_view name() const { return name_; } - protected: ~Participant(); private: Handle* handle_ = nullptr; - absl::string_view name_; }; public: @@ -400,10 +119,17 @@ class Party : public Activity, private Wakeable { Waker MakeNonOwningWaker() final; std::string ActivityDebugTag(WakeupMask wakeup_mask) const final; - void IncrementRefCount() { sync_.IncrementRefCount(); } - void Unref() { - if (sync_.Unref()) PartyIsOver(); + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void IncrementRefCount() { + const uint64_t prev_state = + state_.fetch_add(kOneRef, std::memory_order_relaxed); + LogStateChange("IncrementRefCount", prev_state, prev_state + kOneRef); + } + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Unref() { + uint64_t prev_state = state_.fetch_sub(kOneRef, std::memory_order_acq_rel); + LogStateChange("Unref", prev_state, prev_state - kOneRef); + if ((prev_state & kRefMask) == kOneRef) PartyIsOver(); } + RefCountedPtr Ref() { IncrementRefCount(); return RefCountedPtr(this); @@ -432,17 +158,15 @@ class Party : public Activity, private Wakeable { friend class Arena; // Derived types should be constructed upon `arena`. - explicit Party(RefCountedPtr arena) - : sync_(1), arena_(std::move(arena)) {} + explicit Party(RefCountedPtr arena) : arena_(std::move(arena)) {} ~Party() override; // Main run loop. Must be locked. // Polls participants and drains the add queue until there is no work left to // be done. - // Returns true if the party is over. - GRPC_MUST_USE_RESULT bool RunParty(); + void RunPartyAndUnref(uint64_t prev_state); - bool RefIfNonZero() { return sync_.RefIfNonZero(); } + bool RefIfNonZero(); private: // Concrete implementation of a participant for some promise & oncomplete @@ -453,9 +177,9 @@ class Party : public Activity, private Wakeable { using Promise = typename Factory::Promise; public: - ParticipantImpl(absl::string_view name, SuppliedFactory promise_factory, + ParticipantImpl(absl::string_view, SuppliedFactory promise_factory, OnComplete on_complete) - : Participant(name), on_complete_(std::move(on_complete)) { + : on_complete_(std::move(on_complete)) { Construct(&factory_, std::move(promise_factory)); } ~ParticipantImpl() { @@ -503,9 +227,7 @@ class Party : public Activity, private Wakeable { using Result = typename Promise::Result; public: - PromiseParticipantImpl(absl::string_view name, - SuppliedFactory promise_factory) - : Participant(name) { + PromiseParticipantImpl(absl::string_view, SuppliedFactory promise_factory) { Construct(&factory_, std::move(promise_factory)); } @@ -576,38 +298,113 @@ class Party : public Activity, private Wakeable { std::atomic state_{State::kFactory}; }; + // State bits: + // The atomic state_ field is composed of the following: + // - 24 bits for ref counts + // 1 is owned by the party prior to Orphan() + // All others are owned by owning wakers + // - 1 bit to indicate whether the party is locked + // The first thread to set this owns the party until it is unlocked + // That thread will run the main loop until no further work needs to + // be done. + // - 1 bit to indicate whether there are participants waiting to be + // added + // - 16 bits, one per participant, indicating which participants have + // been + // woken up and should be polled next time the main loop runs. + + // clang-format off + // Bits used to store 16 bits of wakeups + static constexpr uint64_t kWakeupMask = 0x0000'0000'0000'ffff; + // Bits used to store 16 bits of allocated participant slots. 
+ static constexpr uint64_t kAllocatedMask = 0x0000'0000'ffff'0000; + // Bit indicating locked or not + static constexpr uint64_t kLocked = 0x0000'0008'0000'0000; + // Bits used to store 24 bits of ref counts + static constexpr uint64_t kRefMask = 0xffff'ff00'0000'0000; + // clang-format on + + // Shift to get from a participant mask to an allocated mask. + static constexpr size_t kAllocatedShift = 16; + // How far to shift to get the refcount + static constexpr size_t kRefShift = 40; + // One ref count + static constexpr uint64_t kOneRef = 1ull << kRefShift; + // Destroy any remaining participants. // Needs to have normal context setup before calling. void CancelRemainingParticipants(); // Run the locked part of the party until it is unlocked. - static void RunLocked(Party* party); + static void RunLockedAndUnref(Party* party, uint64_t prev_state); // Called in response to Unref() hitting zero - ultimately calls PartyOver, // but needs to set some stuff up. // Here so it gets compiled out of line. void PartyIsOver(); // Wakeable implementation - void Wakeup(WakeupMask wakeup_mask) final; + void Wakeup(WakeupMask wakeup_mask) final { + if (Activity::current() == this) { + wakeup_mask_ |= wakeup_mask; + Unref(); + return; + } + WakeupFromState(state_.load(std::memory_order_relaxed), wakeup_mask); + } + + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void WakeupFromState( + uint64_t cur_state, WakeupMask wakeup_mask) { + DCHECK_NE(wakeup_mask & kWakeupMask, 0u) + << "Wakeup mask must be non-zero: " << wakeup_mask; + while (true) { + if (cur_state & kLocked) { + // If the party is locked, we need to set the wakeup bits, and then + // we'll immediately unref. Since something is running this should never + // bring the refcount to zero. + DCHECK_GT(cur_state & kRefMask, kOneRef); + auto new_state = (cur_state | wakeup_mask) - kOneRef; + if (state_.compare_exchange_weak(cur_state, new_state, + std::memory_order_release)) { + LogStateChange("Wakeup", cur_state, cur_state | wakeup_mask); + return; + } + } else { + // If the party is not locked, we need to lock it and run. + DCHECK_EQ(cur_state & kWakeupMask, 0u); + if (state_.compare_exchange_weak(cur_state, cur_state | kLocked, + std::memory_order_acq_rel)) { + LogStateChange("WakeupAndRun", cur_state, cur_state | kLocked); + wakeup_mask_ |= wakeup_mask; + RunLockedAndUnref(this, cur_state); + return; + } + } + } + } + void WakeupAsync(WakeupMask wakeup_mask) final; void Drop(WakeupMask wakeup_mask) final; // Add a participant (backs Spawn, after type erasure to ParticipantFactory). void AddParticipants(Participant** participant, size_t count); - bool RunOneParticipant(int i); + void AddParticipant(Participant* participant); + void DelayAddParticipants(Participant** participant, size_t count); + + GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void LogStateChange( + const char* op, uint64_t prev_state, uint64_t new_state, + DebugLocation loc = {}) { + GRPC_TRACE_LOG(party_state, INFO).AtLocation(loc.file(), loc.line()) + << DebugTag() << " " << op << " " + << absl::StrFormat("%016" PRIx64 " -> %016" PRIx64, prev_state, + new_state); + } // Sentinal value for currently_polling_ when no participant is being polled. 
static constexpr uint8_t kNotPolling = 255; -#ifdef GRPC_PARTY_SYNC_USING_ATOMICS - PartySyncUsingAtomics sync_; -#elif defined(GRPC_PARTY_SYNC_USING_MUTEX) - PartySyncUsingMutex sync_; -#else -#error No synchronization method defined -#endif - + std::atomic state_{kOneRef}; uint8_t currently_polling_ = kNotPolling; + WakeupMask wakeup_mask_ = 0; // All current participants, using a tagged format. // If the lower bit is unset, then this is a Participant*. // If the lower bit is set, then this is a ParticipantFactory*. @@ -633,8 +430,8 @@ void Party::BulkSpawner::Spawn(absl::string_view name, Factory promise_factory, template void Party::Spawn(absl::string_view name, Factory promise_factory, OnComplete on_complete) { - BulkSpawner(this).Spawn(name, std::move(promise_factory), - std::move(on_complete)); + AddParticipant(new ParticipantImpl( + name, std::move(promise_factory), std::move(on_complete))); } template @@ -642,7 +439,7 @@ auto Party::SpawnWaitable(absl::string_view name, Factory promise_factory) { auto participant = MakeRefCounted>( name, std::move(promise_factory)); Participant* p = participant->Ref().release(); - AddParticipants(&p, 1); + AddParticipant(p); return [participant = std::move(participant)]() mutable { return participant->PollCompletion(); }; diff --git a/test/core/promise/party_test.cc b/test/core/promise/party_test.cc index 110377d498283..5695abffe0947 100644 --- a/test/core/promise/party_test.cc +++ b/test/core/promise/party_test.cc @@ -48,187 +48,6 @@ namespace grpc_core { -/////////////////////////////////////////////////////////////////////////////// -// PartySyncTest - -template -class PartySyncTest : public ::testing::Test {}; - -// PartySyncUsingMutex isn't working on Mac, but we don't use it for anything -// right now so that's fine. 
-#ifdef GPR_APPLE -using PartySyncTypes = ::testing::Types; -#else -using PartySyncTypes = - ::testing::Types; -#endif -TYPED_TEST_SUITE(PartySyncTest, PartySyncTypes); - -TYPED_TEST(PartySyncTest, NoOp) { TypeParam sync(1); } - -TYPED_TEST(PartySyncTest, RefAndUnref) { - Notification half_way; - TypeParam sync(1); - std::thread thread1([&] { - for (int i = 0; i < 1000000; i++) { - sync.IncrementRefCount(); - } - half_way.Notify(); - for (int i = 0; i < 1000000; i++) { - sync.IncrementRefCount(); - } - for (int i = 0; i < 2000000; i++) { - EXPECT_FALSE(sync.Unref()); - } - }); - half_way.WaitForNotification(); - for (int i = 0; i < 2000000; i++) { - sync.IncrementRefCount(); - } - for (int i = 0; i < 2000000; i++) { - EXPECT_FALSE(sync.Unref()); - } - thread1.join(); - EXPECT_TRUE(sync.Unref()); -} - -TYPED_TEST(PartySyncTest, AddAndRemoveParticipant) { - TypeParam sync(1); - std::vector threads; - std::atomic*> participants[party_detail::kMaxParticipants] = - {}; - threads.reserve(8); - for (int i = 0; i < 8; i++) { - threads.emplace_back([&] { - for (int i = 0; i < 100000; i++) { - auto done = std::make_unique>(false); - int slot = -1; - bool run = sync.AddParticipantsAndRef(1, [&](size_t* idxs) { - slot = idxs[0]; - participants[slot].store(done.get(), std::memory_order_release); - }); - EXPECT_NE(slot, -1); - if (run) { - bool run_any = false; - bool run_me = false; - EXPECT_FALSE(sync.RunParty([&](int slot) { - run_any = true; - std::atomic* participant = - participants[slot].exchange(nullptr, std::memory_order_acquire); - if (participant == done.get()) run_me = true; - if (participant == nullptr) { - LOG(ERROR) << "Participant was null (spurious wakeup observed)"; - return false; - } - participant->store(true, std::memory_order_release); - return true; - })); - EXPECT_TRUE(run_any); - EXPECT_TRUE(run_me); - } - EXPECT_FALSE(sync.Unref()); - while (!done->load(std::memory_order_acquire)) { - } - } - }); - } - for (auto& thread : threads) { - thread.join(); - } - EXPECT_TRUE(sync.Unref()); -} - -TYPED_TEST(PartySyncTest, AddAndRemoveTwoParticipants) { - TypeParam sync(1); - std::vector threads; - std::atomic*> participants[party_detail::kMaxParticipants] = - {}; - threads.reserve(8); - for (int i = 0; i < 4; i++) { - threads.emplace_back([&] { - for (int i = 0; i < 100000; i++) { - auto done = std::make_unique>(2); - int slots[2] = {-1, -1}; - bool run = sync.AddParticipantsAndRef(2, [&](size_t* idxs) { - for (int i = 0; i < 2; i++) { - slots[i] = idxs[i]; - participants[slots[i]].store(done.get(), std::memory_order_release); - } - }); - EXPECT_NE(slots[0], -1); - EXPECT_NE(slots[1], -1); - EXPECT_GT(slots[1], slots[0]); - if (run) { - bool run_any = false; - int run_me = 0; - EXPECT_FALSE(sync.RunParty([&](int slot) { - run_any = true; - std::atomic* participant = - participants[slot].exchange(nullptr, std::memory_order_acquire); - if (participant == done.get()) run_me++; - if (participant == nullptr) { - LOG(ERROR) << "Participant was null (spurious wakeup observed)"; - return false; - } - participant->fetch_sub(1, std::memory_order_release); - return true; - })); - EXPECT_TRUE(run_any); - EXPECT_EQ(run_me, 2); - } - EXPECT_FALSE(sync.Unref()); - while (done->load(std::memory_order_acquire) != 0) { - } - } - }); - } - for (auto& thread : threads) { - thread.join(); - } - EXPECT_TRUE(sync.Unref()); -} - -TYPED_TEST(PartySyncTest, UnrefWhileRunning) { - std::vector trials; - std::atomic delete_paths_taken[3] = {{0}, {0}, {0}}; - trials.reserve(100); - for (int i = 0; i < 100; i++) { - 
trials.emplace_back([&delete_paths_taken] { - TypeParam sync(1); - int delete_path = -1; - EXPECT_TRUE(sync.AddParticipantsAndRef( - 1, [](size_t* slots) { EXPECT_EQ(slots[0], 0); })); - std::thread run_party([&] { - if (sync.RunParty([&sync, n = 0](int slot) mutable { - EXPECT_EQ(slot, 0); - ++n; - if (n < 10) { - sync.ForceImmediateRepoll(1); - return false; - } - return true; - })) { - delete_path = 0; - } - }); - std::thread unref([&] { - if (sync.Unref()) delete_path = 1; - }); - if (sync.Unref()) delete_path = 2; - run_party.join(); - unref.join(); - EXPECT_GE(delete_path, 0); - delete_paths_taken[delete_path].fetch_add(1, std::memory_order_relaxed); - }); - } - for (auto& trial : trials) { - trial.join(); - } - fprintf(stderr, "DELETE_PATHS: RunParty:%d AsyncUnref:%d SyncUnref:%d\n", - delete_paths_taken[0].load(), delete_paths_taken[1].load(), - delete_paths_taken[2].load()); -} - /////////////////////////////////////////////////////////////////////////////// // PartyTest @@ -704,6 +523,7 @@ TEST_F(PartyTest, NestedWakeup) { auto party2 = MakeParty(); auto party3 = MakeParty(); int whats_going_on = 0; + Notification done1; Notification started2; Notification done2; Notification started3; @@ -716,6 +536,7 @@ TEST_F(PartyTest, NestedWakeup) { party2->Spawn( "p2", [&]() { + done1.WaitForNotification(); started2.Notify(); started3.WaitForNotification(); EXPECT_EQ(whats_going_on, 3); @@ -749,6 +570,7 @@ TEST_F(PartyTest, NestedWakeup) { [&](Empty) { EXPECT_EQ(whats_going_on, 2); whats_going_on = 3; + done1.Notify(); }); notify_done.WaitForNotification(); }
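
As a rough illustration of the single-word synchronization scheme above (outside the patch itself): the sketch below models the 64-bit `state_` layout and the two wakeup paths taken by `Party::WakeupFromState`. The bit masks mirror the constants added to `party.h`; the free function, its boolean return convention, and `main` are illustrative simplifications, not the real API.

```
// Minimal standalone sketch (assumptions noted above): models the packed
// state word and the lock-bit-protected wakeup path. C++14 or later.
#include <atomic>
#include <cstdint>
#include <cstdio>

namespace {
// Layout: 16 wakeup bits, 16 allocated bits, one lock bit, 24 ref-count bits.
constexpr uint64_t kWakeupMask    = 0x0000'0000'0000'ffff;
constexpr uint64_t kAllocatedMask = 0x0000'0000'ffff'0000;  // shown for layout
constexpr uint64_t kLocked        = 0x0000'0008'0000'0000;
constexpr uint64_t kRefMask       = 0xffff'ff00'0000'0000;  // shown for layout
constexpr uint64_t kOneRef        = uint64_t{1} << 40;

std::atomic<uint64_t> state{kOneRef};  // one ref owned by the party itself

// A waker holds one ref. If the party is already locked, a single CAS both
// publishes the wakeup bits and drops that ref (the unlock/unref fold the
// commit message describes). If it is unlocked, the waker takes the lock and
// the caller runs the party using the ref it already holds.
bool WakeupFromState(uint64_t cur_state, uint64_t wakeup_mask) {
  while (true) {
    if (cur_state & kLocked) {
      const uint64_t next = (cur_state | (wakeup_mask & kWakeupMask)) - kOneRef;
      if (state.compare_exchange_weak(cur_state, next,
                                      std::memory_order_release)) {
        return false;  // someone else is polling; nothing more to do
      }
    } else {
      if (state.compare_exchange_weak(cur_state, cur_state | kLocked,
                                      std::memory_order_acq_rel)) {
        return true;  // caller should poll the woken participants, then unref
      }
    }
  }
}
}  // namespace

int main() {
  const uint64_t after_ref = state.fetch_add(kOneRef) + kOneRef;  // waker ref
  const bool should_run = WakeupFromState(after_ref, uint64_t{1} << 3);
  std::printf("should_run=%d state=%016llx\n", should_run,
              static_cast<unsigned long long>(state.load()));
}
```

Folding the ref drop into the same CAS that records the wakeup is what removes the extra atomic operation on the wakeup path and accounts for the 17ns to 6ns improvement in `BM_WakeupParticipant` shown above.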
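
A second standalone sketch (also not part of the patch) shows the slot-allocation bit trick `AddParticipant` relies on, and the full-party case that triggers `DelayAddParticipants`, i.e. the event-engine re-queue and retry called out in the notes. `LowestOneBit` and `CountTrailingZeros` here stand in for the helpers the patch uses (presumably from `useful.h`, which `party.h` includes); `PickSlot` is a hypothetical wrapper.

```
// Sketch assuming a GCC/Clang-style __builtin_ctzll; PickSlot is illustrative.
#include <cstdint>
#include <cstdio>

namespace {
constexpr uint64_t kWakeupMask = 0xffffu;  // only 16 participant slots exist

uint64_t LowestOneBit(uint64_t x) { return x & ~(x - 1); }
int CountTrailingZeros(uint64_t x) { return __builtin_ctzll(x); }

// Returns the lowest free slot, or -1 when all 16 slots are allocated --
// the case where the patch queues the add back onto the event engine and
// retries rather than slowing down the fast path.
int PickSlot(uint64_t allocated) {
  const uint64_t new_mask = LowestOneBit(~allocated);
  if ((new_mask & kWakeupMask) == 0) return -1;  // party is full
  return CountTrailingZeros(new_mask);
}
}  // namespace

int main() {
  std::printf("%d\n", PickSlot(0x0000));  // 0: first slot free
  std::printf("%d\n", PickSlot(0x0017));  // 3: lowest unset bit of 0b10111
  std::printf("%d\n", PickSlot(0xffff));  // -1: full, delay and retry
}
```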