Skip to content

Commit

Permalink
Add a stats exporter for latent see.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 663505735
  • Loading branch information
Vignesh2208 authored and copybara-github committed Aug 16, 2024
1 parent 605a15e commit 3063716
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 29 deletions.
2 changes: 2 additions & 0 deletions src/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ grpc_cc_library(
"util/latent_see.h",
],
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/log",
"absl/strings",
"absl/types:optional",
Expand Down
19 changes: 14 additions & 5 deletions src/core/util/latent_see.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,25 @@ thread_local uint64_t Log::thread_id_ = Log::Get().next_thread_id_.fetch_add(1);
thread_local Bin* Log::bin_ = nullptr;
thread_local void* Log::bin_owner_ = nullptr;
std::atomic<uint64_t> Flow::next_flow_id_{1};
std::atomic<Bin*> Log::free_bins_;
std::atomic<uintptr_t> Log::free_bins_{0};

std::string Log::GenerateJson() {
std::vector<RecordedEvent> events;
RingBuffer<RecordedEvent, Log::kMaxEventsPerCpu>* other;
for (auto& fragment : fragments_) {
MutexLock lock(&fragment.mu);
for (auto it = fragment.events.begin(); it != fragment.events.end(); ++it) {
{
MutexLock lock(&fragment.mu);
other = fragment.active;
if (fragment.active == &fragment.primary) {
fragment.active = &fragment.secondary;
} else {
fragment.active = &fragment.primary;
}
}
for (auto it = other->begin(); it != other->end(); ++it) {
events.push_back(*it);
}
fragment.events.Clear();
other->Clear();
}
absl::optional<std::chrono::steady_clock::time_point> start_time;
for (auto& event : events) {
Expand Down Expand Up @@ -113,7 +122,7 @@ void Log::FlushBin(Bin* bin) {
{
MutexLock lock(&fragment.mu);
for (auto event : bin->events) {
fragment.events.Append(RecordedEvent{thread_id, batch_id, event});
fragment.active->Append(RecordedEvent{thread_id, batch_id, event});
}
}
bin->events.clear();
Expand Down
89 changes: 65 additions & 24 deletions src/core/util/latent_see.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include <grpc/support/port_platform.h>

#ifdef GRPC_ENABLE_LATENT_SEE
#include <sys/syscall.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <cstdint>
Expand All @@ -27,12 +30,17 @@
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/strings/string_view.h"

#include "src/core/lib/gprpp/per_cpu.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/util/ring_buffer.h"

#define TAGGED_POINTER_SIZE_BITS 48

namespace grpc_core {
namespace latent_see {

Expand All @@ -59,47 +67,59 @@ struct Bin {
}

std::vector<Event> events;
Bin* next_free;
uintptr_t next_free = 0;
};

class Log {
public:
static constexpr int kMaxEventsPerCpu = 50000;
static Bin* MaybeStartBin(void* owner) {
if (bin_ != nullptr) return bin_;
Bin* bin = free_bins_.load(std::memory_order_acquire);
// Mask covering the low TAGGED_POINTER_SIZE_BITS bits of a bin descriptor.
// ToBin() applies it to recover the Bin pointer; IncrementTag() adds to the
// bits above it.
static constexpr uintptr_t kTagMask = (1ULL << TAGGED_POINTER_SIZE_BITS) - 1;

// Returns `input` with the tag stored above the low TAGGED_POINTER_SIZE_BITS
// bits advanced by one; the low (pointer) bits are left untouched.
//
// The shifted constant is spelled 1ULL, matching kTagMask: with 1UL the shift
// by 48 is undefined wherever `unsigned long` is only 32 bits wide.
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static uintptr_t IncrementTag(
    uintptr_t input) {
  return input + (1ULL << TAGGED_POINTER_SIZE_BITS);
}

// Converts a bin descriptor into a Bin pointer by masking it with kTagMask,
// which discards every bit above the low TAGGED_POINTER_SIZE_BITS bits.
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Bin* ToBin(uintptr_t ptr) {
  const uintptr_t address_bits = ptr & kTagMask;
  return reinterpret_cast<Bin*>(address_bits);
}

static uintptr_t StartBin(void* owner) {
uintptr_t bin_descriptor = free_bins_.load(std::memory_order_acquire);
Bin* bin;
do {
if (bin == nullptr) {
if (bin_descriptor == 0) {
bin = new Bin();
break;
}
} while (!free_bins_.compare_exchange_weak(bin, bin->next_free,
std::memory_order_acq_rel));
bin = ToBin(bin_descriptor);
} while (!free_bins_.compare_exchange_strong(bin_descriptor, bin->next_free,
std::memory_order_acq_rel));
bin_ = bin;
bin_owner_ = owner;
return bin;
return reinterpret_cast<uintptr_t>(bin);
}

static void EndBin(void* owner) {
if (bin_owner_ != owner) return;
FlushBin(bin_);
bin_->next_free = free_bins_.load(std::memory_order_acquire);
while (!free_bins_.compare_exchange_weak(bin_->next_free, bin_,
std::memory_order_acq_rel)) {
// Flushes the bin named by `bin_descriptor` and publishes it as the head of
// free_bins_, then clears this thread's bin_ / bin_owner_.
// Does nothing when `owner` is not the thread-local bin_owner_ or when
// `bin_descriptor` is 0.
static void EndBin(uintptr_t bin_descriptor, void* owner) {
if (bin_owner_ != owner || bin_descriptor == 0) return;
FlushBin(ToBin(bin_descriptor));
// Install the descriptor (with its tag incremented) as the new head,
// retrying until the exchange succeeds; on failure `next_free` is refreshed
// with the head currently stored.
// NOTE(review): the previous head held in `next_free` is used only as the
// expected value of the exchange; this function never stores it into the
// bin's own next_free field - confirm whether the old head is meant to stay
// linked behind this bin.
uintptr_t next_free = free_bins_.load(std::memory_order_acquire);
while (!free_bins_.compare_exchange_strong(
next_free, IncrementTag(bin_descriptor), std::memory_order_acq_rel)) {
}
bin_ = nullptr;
bin_owner_ = nullptr;
}

static Bin* CurrentThreadBin() { return bin_; }

private:
Log() = default;

static void FlushBin(Bin* bin);

GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Log& Get() {
static Log* log = []() {
atexit([] {
if (log->stats_flusher_ != nullptr) {
log->stats_flusher_(log->GenerateJson());
return;
}
LOG(INFO) << "Writing latent_see.json in " << get_current_dir_name();
FILE* f = fopen("latent_see.json", "w");
if (f == nullptr) return;
Expand All @@ -113,6 +133,16 @@ class Log {

std::string GenerateJson();

// Installs `stats_exporter` as the stats flusher, replacing the callable that
// was stored before. When a flusher is set, the atexit handler registered in
// Get() passes it the output of GenerateJson() instead of writing
// latent_see.json.
void OverrideStatsFlusher(
    absl::AnyInvocable<void(absl::string_view)> stats_exporter) {
  this->stats_flusher_ = std::move(stats_exporter);
}

private:
Log() = default;

static void FlushBin(Bin* bin);

struct RecordedEvent {
uint64_t thread_id;
uint64_t batch_id;
Expand All @@ -123,10 +153,15 @@ class Log {
static thread_local uint64_t thread_id_;
static thread_local Bin* bin_;
static thread_local void* bin_owner_;
static std::atomic<Bin*> free_bins_;
static std::atomic<uintptr_t> free_bins_;
absl::AnyInvocable<void(absl::string_view)> stats_flusher_ = nullptr;
struct Fragment {
Fragment() : active(&primary){};
Mutex mu;
RingBuffer<RecordedEvent, Log::kMaxEventsPerCpu> events ABSL_GUARDED_BY(mu);
RingBuffer<RecordedEvent, Log::kMaxEventsPerCpu>* active
ABSL_GUARDED_BY(mu);
RingBuffer<RecordedEvent, Log::kMaxEventsPerCpu> primary;
RingBuffer<RecordedEvent, Log::kMaxEventsPerCpu> secondary;
};
PerCpu<Fragment> fragments_{PerCpuOptions()};
};
Expand All @@ -136,20 +171,26 @@ class Scope {
public:
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Scope(const Metadata* metadata)
: metadata_(metadata) {
bin_ = Log::CurrentThreadBin();
if (kParent && bin_ == nullptr) {
bin_descriptor_ = Log::StartBin(this);
bin_ = Log::ToBin(bin_descriptor_);
}
CHECK_NE(bin_, nullptr);
bin_->Append(metadata_, EventType::kBegin, 0);
}
GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~Scope() {
bin_->Append(metadata_, EventType::kEnd, 0);
if (kParent) Log::EndBin(this);
if (kParent) Log::EndBin(bin_descriptor_, this);
}

Scope(const Scope&) = delete;
Scope& operator=(const Scope&) = delete;

private:
const Metadata* const metadata_;
Bin* const bin_ =
kParent ? Log::MaybeStartBin(this) : Log::CurrentThreadBin();
uintptr_t bin_descriptor_ = 0;
Bin* bin_ = nullptr;
};

using ParentScope = Scope<true>;
Expand Down

0 comments on commit 3063716

Please sign in to comment.