scheduling_group: improve scheduling group creation exception safety

Improve handling of exceptions during scheduling group and scheduling
group key creation, where, for example, a user-provided key constructor
may throw.

We use a new struct `specific_val` and smart pointers to manage memory
allocation, construction and destruction of scheduling-group-specific
data in an exception-safe manner.
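
A minimal standalone sketch of the ownership scheme, with invented names
(`key_config` and `value_holder` are hypothetical stand-ins; the actual
types appear in the diff below). The raw allocation is owned by a smart
pointer before the user constructor runs, so a throwing constructor
cannot leak, and the user destructor runs only for fully constructed
values:

    #include <cstddef>
    #include <cstdlib>
    #include <memory>
    #include <utility>

    // Hypothetical stand-in for the user-provided hooks carried by
    // scheduling_group_key_config (names invented for this sketch).
    struct key_config {
        std::size_t size;
        void (*constructor)(void*);
        void (*destructor)(void*);
    };

    // Owning wrapper: the unique_ptr frees the raw bytes no matter what,
    // while the wrapper's destructor runs the user destructor only for a
    // value whose construction fully succeeded. If the user constructor
    // throws below, this object never finishes constructing, so only the
    // bytes are freed and the user destructor is correctly skipped.
    struct value_holder {
        std::unique_ptr<void, void (*)(void*)> valp{nullptr, &std::free};
        const key_config* cfg = nullptr;

        value_holder() = default;

        value_holder(std::unique_ptr<void, void (*)(void*)> p, const key_config* c)
                : valp(std::move(p)), cfg(c) {
            if (valp && cfg->constructor) {
                cfg->constructor(valp.get()); // may throw
            }
        }

        ~value_holder() {
            if (valp && cfg && cfg->destructor) {
                cfg->destructor(valp.get());
            }
        }

        value_holder(const value_holder&) = delete;
        value_holder& operator=(const value_holder&) = delete;
        value_holder(value_holder&&) = default;
        value_holder& operator=(value_holder&&) = default;
    };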

We also reorder initialization to make it safer. For example, when
creating a scheduling group, we first allocate and construct all per-key
data off to the side, and only then swap it into the scheduling group's
data structure.
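
Continuing the same sketch with the same invented names (simplified to a
plain function, with no futures or reactor wiring), the allocate-then-swap
ordering looks like this:

    #include <algorithm>
    #include <cstdlib>
    #include <map>
    #include <memory>
    #include <new>
    #include <utility>
    #include <vector>

    // group_state stands in for the per-scheduling-group data; a sketch only.
    struct group_state {
        bool initialized = false;
        std::vector<value_holder> vals;
    };

    void init_group(group_state& group, const std::map<unsigned long, key_config>& cfgs) {
        std::vector<value_holder> vals; // staging area, owned only by this frame
        for (const auto& [key_id, cfg] : cfgs) {
            std::unique_ptr<void, void (*)(void*)> raw(std::malloc(cfg.size), &std::free);
            if (!raw) {
                throw std::bad_alloc();
            }
            vals.resize(std::max<std::size_t>(vals.size(), key_id + 1));
            vals[key_id] = value_holder(std::move(raw), &cfg); // user ctor may throw
        }
        // Nothing above touched `group`; a throw unwinds `vals` and leaves it intact.
        std::swap(group.vals, vals);  // publish; swap cannot throw
        group.initialized = true;     // only now is the group observable as ready
    }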

Fixes #2222
mlitvk committed Jan 16, 2025
1 parent d80acda commit cd957c2
Showing 4 changed files with 171 additions and 69 deletions.
8 changes: 8 additions & 0 deletions include/seastar/core/reactor.hh
@@ -322,6 +322,14 @@ private:
std::atomic<bool> _dying{false};
gate _background_gate;

inline auto& get_sg_data(const scheduling_group& sg) {
return _scheduling_group_specific_data.per_scheduling_group_data[sg._id];
}

inline auto& get_sg_data(unsigned sg_id) {
return _scheduling_group_specific_data.per_scheduling_group_data[sg_id];
}

private:
static std::chrono::nanoseconds calculate_poll_time();
static void block_notifier(int);
65 changes: 57 additions & 8 deletions include/seastar/core/scheduling_specific.hh
@@ -37,17 +37,66 @@ namespace seastar {
namespace internal {

struct scheduling_group_specific_thread_local_data {
using val_ptr = std::unique_ptr<void, void (*)(void*) noexcept>;
using cfg_ptr = lw_shared_ptr<scheduling_group_key_config>;

struct specific_val {
val_ptr valp;
cfg_ptr cfg;

specific_val() : valp(nullptr, &free), cfg(nullptr) {}

specific_val(val_ptr&& valp_, const cfg_ptr& cfg_) : valp(std::move(valp_)), cfg(cfg_) {
if (valp && cfg->constructor) {
cfg->constructor(valp.get());
}
}

~specific_val() {
if (valp && cfg->destructor) {
cfg->destructor(valp.get());
}
}

specific_val(const specific_val& other) = delete;
specific_val& operator=(const specific_val& other) = delete;

specific_val(specific_val&& other) : valp(std::move(other.valp)), cfg(other.cfg) {}

specific_val& operator=(specific_val&& other) {
if (this != &other) {
valp = std::move(other.valp);
cfg = std::move(other.cfg);
}
return *this;
}

void* get() { return valp.get(); }

void rename() {
if (valp && cfg->rename) {
cfg->rename(valp.get());
}
}
};

struct per_scheduling_group {
bool queue_is_initialized = false;
/**
* This array holds pointers to the scheduling group specific
* data. The pointer is not used as is but is cast to a reference
* to the appropriate type that is actually pointed to.
*/
std::vector<void*> specific_vals;
std::vector<specific_val> specific_vals;

void rename() {
for (auto& v : specific_vals) {
v.rename();
}
}
};
std::array<per_scheduling_group, max_scheduling_groups()> per_scheduling_group_data;
std::map<unsigned long, scheduling_group_key_config> scheduling_group_key_configs;
std::map<unsigned long, cfg_ptr> scheduling_group_key_configs;
};

#ifdef SEASTAR_BUILD_SHARED_LIBS
@@ -78,12 +127,12 @@ template<typename T>
T* scheduling_group_get_specific_ptr(scheduling_group sg, scheduling_group_key key) noexcept {
auto& data = internal::get_scheduling_group_specific_thread_local_data();
#ifdef SEASTAR_DEBUG
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index);
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index);
#endif
auto sg_id = internal::scheduling_group_index(sg);
if (__builtin_expect(sg_id < data.per_scheduling_group_data.size() &&
data.per_scheduling_group_data[sg_id].queue_is_initialized, true)) {
return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]);
return reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get());
}
return nullptr;
}
@@ -123,9 +172,9 @@ T& scheduling_group_get_specific(scheduling_group_key key) noexcept {
// return a reference to an element whose queue_is_initialized is
// false.
auto& data = internal::get_scheduling_group_specific_thread_local_data();
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()].type_index);
assert(std::type_index(typeid(T)) == data.scheduling_group_key_configs[key.id()]->type_index);
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()]);
return *reinterpret_cast<T*>(data.per_scheduling_group_data[sg_id].specific_vals[key.id()].get());
}

/**
@@ -155,7 +204,7 @@ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer,
auto wrapped_mapper = [key, mapper] (per_scheduling_group& psg) {
auto id = internal::scheduling_group_key_id(key);
return make_ready_future<typename function_traits<Mapper>::return_type>
(mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id])));
(mapper(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get())));
};

return map_reduce(
@@ -188,7 +237,7 @@ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, schedulin

auto mapper = [key] (per_scheduling_group& psg) {
auto id = internal::scheduling_group_key_id(key);
return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id]));
return make_ready_future<SpecificValType>(*reinterpret_cast<SpecificValType*>(psg.specific_vals[id].get()));
};

return map_reduce(
98 changes: 37 additions & 61 deletions src/core/reactor.cc
@@ -1066,23 +1066,12 @@ reactor::~reactor() {
eraser(_expired_timers);
eraser(_expired_lowres_timers);
eraser(_expired_manual_timers);
auto& sg_data = _scheduling_group_specific_data;
for (auto&& tq : _task_queues) {
if (tq) {
auto& this_sg = sg_data.per_scheduling_group_data[tq->_id];
// The following line will preserve the convention that constructor and destructor functions
// for the per sg values are called in the context of the containing scheduling group.
*internal::current_scheduling_group_ptr() = scheduling_group(tq->_id);
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
void* val = this_sg.specific_vals[key_id];
if (val) {
if (cfg.destructor) {
cfg.destructor(val);
}
free(val);
this_sg.specific_vals[key_id] = nullptr;
}
}
get_sg_data(tq->_id).specific_vals.clear();
}
}
}
@@ -4896,81 +4885,81 @@ deallocate_scheduling_group_id(unsigned id) noexcept {
}

static
void*
allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const scheduling_group_key_config& cfg) {
void* val = aligned_alloc(cfg.alignment, cfg.allocation_size);
if (!val) {
std::abort();
}
if (cfg.constructor) {
cfg.constructor(val);
internal::scheduling_group_specific_thread_local_data::specific_val
allocate_scheduling_group_specific_data(scheduling_group sg, unsigned long key_id, const lw_shared_ptr<scheduling_group_key_config>& cfg) {
using val_ptr = internal::scheduling_group_specific_thread_local_data::val_ptr;
using specific_val = internal::scheduling_group_specific_thread_local_data::specific_val;

val_ptr valp(aligned_alloc(cfg->alignment, cfg->allocation_size), &free);
if (!valp) {
throw std::runtime_error("memory allocation failed");
}
return val;
return specific_val(std::move(valp), cfg);
}

future<>
reactor::rename_scheduling_group_specific_data(scheduling_group sg) {
return with_shared(_scheduling_group_keys_mutex, [this, sg] {
return with_scheduling_group(sg, [this, sg] {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
if (cfg.rename) {
(cfg.rename)(this_sg.specific_vals[key_id]);
}
}
get_sg_data(sg).rename();
});
});
}

future<>
reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, sstring shortname, float shares) {
return with_shared(_scheduling_group_keys_mutex, [this, sg, name = std::move(name), shortname = std::move(shortname), shares] {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
this_sg.queue_is_initialized = true;
_task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1));
_task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shortname, shares);

return with_scheduling_group(sg, [this, sg, &sg_data] () {
using val_vector = decltype(_scheduling_group_specific_data.per_scheduling_group_data[sg._id].specific_vals);

return with_scheduling_group(sg, [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;

val_vector vals;
vals.reserve(sg_data.scheduling_group_key_configs.size());
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg);
vals.resize(std::max<size_t>(vals.size(), key_id+1));
vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, cfg);
}
return vals;
}).then([this, sg] (val_vector vals) {
auto& this_sg = get_sg_data(sg);
std::swap(this_sg.specific_vals, vals);
this_sg.queue_is_initialized = true;
});
});
}

future<>
reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
return with_lock(_scheduling_group_keys_mutex, [this, key, cfg] {
auto& sg_data = _scheduling_group_specific_data;
auto key_id = internal::scheduling_group_key_id(key);
sg_data.scheduling_group_key_configs[key_id] = cfg;
return parallel_for_each(_task_queues, [this, cfg, key_id] (std::unique_ptr<task_queue>& tq) {
auto cfgp = make_lw_shared<scheduling_group_key_config>(std::move(cfg));

return parallel_for_each(_task_queues, [this, key_id, cfgp] (std::unique_ptr<task_queue>& tq) {
if (tq) {
scheduling_group sg = scheduling_group(tq->_id);
if (tq.get() == _at_destroy_tasks) {
// fake the group by assuming it here
auto curr = current_scheduling_group();
auto cleanup = defer([curr] () noexcept { *internal::current_scheduling_group_ptr() = curr; });
*internal::current_scheduling_group_ptr() = sg;
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
auto& this_sg = get_sg_data(sg);
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, sg_data.scheduling_group_key_configs[key_id]);
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, std::move(cfgp));
} else {
return with_scheduling_group(sg, [this, key_id, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
return with_scheduling_group(sg, [this, key_id, sg, cfgp = std::move(cfgp)] () {
auto& this_sg = get_sg_data(sg);
this_sg.specific_vals.resize(std::max<size_t>(this_sg.specific_vals.size(), key_id+1));
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, sg_data.scheduling_group_key_configs[key_id]);
this_sg.specific_vals[key_id] = allocate_scheduling_group_specific_data(sg, key_id, std::move(cfgp));
});
}
}
return make_ready_future();
}).then([this, key_id, cfgp] () {
_scheduling_group_specific_data.scheduling_group_key_configs[key_id] = std::move(cfgp);
});
});
}
@@ -4981,22 +4970,9 @@ reactor::destroy_scheduling_group(scheduling_group sg) noexcept {
on_fatal_internal_error(seastar_logger, format("Invalid scheduling_group {}", sg._id));
}
return with_scheduling_group(sg, [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
for (const auto& [key_id, cfg] : sg_data.scheduling_group_key_configs) {
void* val = this_sg.specific_vals[key_id];
if (val) {
if (cfg.destructor) {
cfg.destructor(val);
}
free(val);
this_sg.specific_vals[key_id] = nullptr;
}
}
get_sg_data(sg).specific_vals.clear();
}).then( [this, sg] () {
auto& sg_data = _scheduling_group_specific_data;
auto& this_sg = sg_data.per_scheduling_group_data[sg._id];
this_sg.queue_is_initialized = false;
get_sg_data(sg).queue_is_initialized = false;
_task_queues[sg._id].reset();
});

69 changes: 69 additions & 0 deletions tests/unit/scheduling_group_test.cc
@@ -375,6 +375,75 @@ SEASTAR_THREAD_TEST_CASE(sg_create_and_key_create_in_parallel) {
}
}

SEASTAR_THREAD_TEST_CASE(sg_key_constructor_exception_when_creating_new_key) {
using ivec = std::vector<int>;
const int num_scheduling_groups = 4;

scheduling_group_key_config key1_conf = make_scheduling_group_key_config<int>();
scheduling_group_key key1 = scheduling_group_key_create(key1_conf).get();

struct thrower {
thrower() {
throw std::runtime_error("constructor failed");
}
~thrower() {
// Shouldn't get here because the constructor shouldn't succeed
BOOST_ASSERT(false);
}
};
scheduling_group_key_config thrower_conf = make_scheduling_group_key_config<thrower>();
BOOST_REQUIRE_THROW(scheduling_group_key_create(thrower_conf).get(), std::runtime_error);

// Check that we can continue after the failure.

std::vector<scheduling_group> sgs;
for (int i = 0; i < num_scheduling_groups; i++) {
sgs.push_back(create_scheduling_group(format("sg{}", i).c_str(), 100).get());
}

const auto destroy_scheduling_groups = defer([&sgs] () noexcept {
for (scheduling_group sg : sgs) {
destroy_scheduling_group(sg).get();
}
});

scheduling_group_key_config key2_conf = make_scheduling_group_key_config<ivec>();
scheduling_group_key key2 = scheduling_group_key_create(key2_conf).get();

smp::invoke_on_all([key1, key2, &sgs] () {
int factor = this_shard_id() + 1;
for (int i=0; i < num_scheduling_groups; i++) {
sgs[i].get_specific<int>(key1) = (i + 1) * factor;
sgs[i].get_specific<ivec>(key2).push_back((i + 1) * factor);
}

for (int i=0; i < num_scheduling_groups; i++) {
BOOST_REQUIRE_EQUAL(sgs[i].get_specific<int>(key1), (i + 1) * factor);
BOOST_REQUIRE_EQUAL(sgs[i].get_specific<ivec>(key2)[0], (i + 1) * factor);
}

}).get();

smp::invoke_on_all([key1, key2] () {
return reduce_scheduling_group_specific<int>(std::plus<int>(), int(0), key1).then([] (int sum) {
int factor = this_shard_id() + 1;
int expected_sum = ((1 + num_scheduling_groups) * num_scheduling_groups) * factor / 2;
BOOST_REQUIRE_EQUAL(expected_sum, sum);
}).then([key2] {
auto ivec_to_int = [] (ivec& v) {
return v.size() ? v[0] : 0;
};

return map_reduce_scheduling_group_specific<ivec>(ivec_to_int, std::plus<int>(), int(0), key2).then([] (int sum) {
int factor = this_shard_id() + 1;
int expected_sum = ((1 + num_scheduling_groups) * num_scheduling_groups) * factor / 2;
BOOST_REQUIRE_EQUAL(expected_sum, sum);
});

});
}).get();
}

SEASTAR_THREAD_TEST_CASE(sg_create_with_destroy_tasks) {
struct nada{};
