Skip to content

Commit

Permalink
More proposals.
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Sep 12, 2023
1 parent ee64797 commit 6e917d8
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/include/iomgr/iomgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ class IOManager {
}
}

folly::IOManagerExecutor* fiberExecutor(io_fiber_t fiber);

///////////////////////////// Access related methods /////////////////////////////
GenericIOInterface* generic_interface() { return m_default_general_iface.get(); }
GrpcInterface* grpc_interface() { return m_default_grpc_iface.get(); }
Expand Down Expand Up @@ -393,6 +395,7 @@ class IOManager {

std::shared_mutex m_iface_list_mtx;
std::vector< std::shared_ptr< IOInterface > > m_iface_list;
std::map< io_fiber_t, folly::IOManagerExecutor > m_fiber_executors;
std::vector< std::shared_ptr< DriveInterface > > m_drive_ifaces;

std::shared_ptr< GenericIOInterface > m_default_general_iface;
Expand Down
11 changes: 11 additions & 0 deletions src/lib/iomgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ void IOManager::stop() {
// m_default_grpc_iface.reset();
m_drive_ifaces.clear();
m_iface_list.clear();
m_fiber_executors.clear();
} catch (const std::exception& e) { LOGCRITICAL_AND_FLUSH("Caught exception {} during clear lists", e.what()); }
assert(get_state() == iomgr_state::stopped);

Expand Down Expand Up @@ -314,6 +315,11 @@ void IOManager::foreach_interface(const interface_cb_t& iface_cb) {
}
}

folly::IOManagerExecutor* IOManager::fiberExecutor(io_fiber_t fiber) {
std::shared_lock lg(m_iface_list_mtx);
return &m_fiber_executors.at(fiber);
}

void IOManager::_run_io_loop(int iomgr_slot_num, loop_type_t loop_type, uint32_t num_fibers, const std::string& name,
const iodev_selector_t& iodev_selector, thread_state_notifier_t&& addln_notifier) {
loop_type_t ltype = loop_type;
Expand All @@ -336,6 +342,11 @@ void IOManager::reactor_started(shared< IOReactor > reactor) {
m_yet_to_stop_nreactors.increment();
if (reactor->is_worker()) {
m_worker_reactors[reactor->m_worker_slot_num] = reactor;
for (auto const& fiber : reactor->sync_io_capable_fibers()) {
auto lg = std::scoped_lock(m_iface_list_mtx);
auto [_, happened] = m_fiber_executors.emplace(fiber, folly::IOManagerExecutor(fiber));
RELEASE_ASSERT(happened, "Failed to Bind folly::Executor!");
}
reactor->notify_thread_state(true);

// All iomgr created reactors are initialized, move iomgr to sys init (next phase of start)
Expand Down
12 changes: 8 additions & 4 deletions src/test/test_fiber_shared_mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,19 @@ class SharedMutexTest : public testing::Test {
};

TEST_F(SharedMutexTest, single_writer_multiple_readers) {
auto e = folly::IOManagerExecutor(m_fibers[0]);
folly::makeSemiFuture().via(&e).thenValue([this](auto) { all_writer(); });
auto e = folly::QueuedImmediateExecutor();
auto calls = std::vector< folly::SemiFuture< folly::Unit > >();
calls.push_back(
folly::makeSemiFuture().via(iomanager.fiberExecutor(m_fibers[0])).thenValue([this](auto) { all_writer(); }));
for (auto it = m_fibers.begin() + 1; it < m_fibers.end(); ++it) {
iomanager.run_on_forget(*it, [this]() { all_reader(); });
calls.push_back(
folly::makeSemiFuture().via(iomanager.fiberExecutor(*it)).thenValue([this](auto) { all_reader(); }));
}
folly::collectAll(calls).via(&e).get();

{
std::unique_lock< std::mutex > lk(m_test_done_mtx);
m_test_done_cv.wait(lk, [&]() { return m_test_count == 0; });
EXPECT_EQ(0, m_test_count);
}
}

Expand Down

0 comments on commit 6e917d8

Please sign in to comment.