Remove grpc dependency; not used currently.
szmyd committed Sep 20, 2023
1 parent f1bc99f commit d724b86
Showing 5 changed files with 66 additions and 31 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -28,7 +28,7 @@ class IOMgrConan(ConanFile):
         'shared': False,
         'fPIC': True,
         'coverage': False,
-        'grpc_support': True,
+        'grpc_support': False,
         'sanitize': False,
         'spdk': True,
         'testing': 'epoll_mode',
4 changes: 2 additions & 2 deletions prepare.sh
@@ -10,8 +10,8 @@ echo -n "dpdk."
 conan export 3rd_party/dpdk
 echo -n "fio."
 conan export 3rd_party/fio
-echo -n "gprc_internal."
-conan export 3rd_party/grpc_internal
+#echo -n "gprc_internal."
+#conan export 3rd_party/grpc_internal
 echo -n "spdk."
 conan export 3rd_party/spdk

25 changes: 25 additions & 0 deletions src/include/iomgr/iomgr.hpp
@@ -27,6 +27,7 @@

 #include <semver200.h>
 #include <boost/fiber/all.hpp>
+#include <folly/Executor.h>
 #include <sisl/fds/bitword.hpp>
 #include <sisl/fds/buffer.hpp>
 #include <sisl/fds/id_reserver.hpp>
@@ -44,6 +45,25 @@
 #include <iomgr/io_device.hpp>
 #include <iomgr/fiber_lib.hpp>

+namespace folly {
+
+/**
+ * @class folly::IOManagerExecutor
+ *
+ * A folly::Executor to call `.via()` on when returning a folly::SemiFuture,
+ * erasing the executor type at runtime.
+ *
+ **/
+class IOManagerExecutor : public Executor {
+    iomgr::io_fiber_t _fiber;
+
+public:
+    IOManagerExecutor(iomgr::io_fiber_t fiber) : _fiber(fiber) {}
+    void add(Func fn) override;
+};
+
+} // namespace folly
+
 namespace iomgr {

 struct timer_info;
@@ -99,6 +119,8 @@ class IOManager {
     friend class IOManagerSpdkImpl;
     friend class IOManagerEpollImpl;

+    friend void folly::IOManagerExecutor::add(folly::Func);
+
     static IOManager& instance() {
         static IOManager inst;
         return inst;
@@ -253,6 +275,8 @@ class IOManager {
         }
     }

+    folly::IOManagerExecutor* fiberExecutor(io_fiber_t fiber);
+
     ///////////////////////////// Access related methods /////////////////////////////
     GenericIOInterface* generic_interface() { return m_default_general_iface.get(); }
     GrpcInterface* grpc_interface() { return m_default_grpc_iface.get(); }
@@ -371,6 +395,7 @@ class IOManager {

     std::shared_mutex m_iface_list_mtx;
     std::vector< std::shared_ptr< IOInterface > > m_iface_list;
+    std::map< io_fiber_t, folly::IOManagerExecutor > m_fiber_executors;
     std::vector< std::shared_ptr< DriveInterface > > m_drive_ifaces;

     std::shared_ptr< GenericIOInterface > m_default_general_iface;
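A minimal usage sketch of the new hook (not part of the commit): assuming iomgr is started and `my_fiber` is a valid io_fiber_t, fiberExecutor() returns the folly::Executor bound to that fiber, so any continuation chained after `.via()` is delivered to, and runs on, that fiber. The function name and the integer result below are illustrative placeholders.

    // Sketch only: answer_on_fiber() and its result are placeholders.
    // Assumes <folly/futures/Future.h> and <iomgr/iomgr.hpp> are included.
    folly::SemiFuture< int > answer_on_fiber(iomgr::io_fiber_t my_fiber) {
        return folly::makeSemiFuture()
            .via(iomanager.fiberExecutor(my_fiber)) // hop onto the fiber
            .thenValue([](auto) { return 42; })     // runs on my_fiber's reactor
            .semi();                                // erase the executor again
    }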
20 changes: 20 additions & 0 deletions src/lib/iomgr.cpp
@@ -222,6 +222,7 @@ void IOManager::stop() {
         // m_default_grpc_iface.reset();
         m_drive_ifaces.clear();
         m_iface_list.clear();
+        m_fiber_executors.clear();
     } catch (const std::exception& e) { LOGCRITICAL_AND_FLUSH("Caught exception {} during clear lists", e.what()); }
     assert(get_state() == iomgr_state::stopped);

@@ -314,6 +315,11 @@ void IOManager::foreach_interface(const interface_cb_t& iface_cb) {
     }
 }

+folly::IOManagerExecutor* IOManager::fiberExecutor(io_fiber_t fiber) {
+    std::shared_lock lg(m_iface_list_mtx);
+    return &m_fiber_executors.at(fiber);
+}
+
 void IOManager::_run_io_loop(int iomgr_slot_num, loop_type_t loop_type, uint32_t num_fibers, const std::string& name,
                              const iodev_selector_t& iodev_selector, thread_state_notifier_t&& addln_notifier) {
     loop_type_t ltype = loop_type;
@@ -336,6 +342,11 @@ void IOManager::reactor_started(shared< IOReactor > reactor) {
     m_yet_to_stop_nreactors.increment();
     if (reactor->is_worker()) {
         m_worker_reactors[reactor->m_worker_slot_num] = reactor;
+        for (auto const& fiber : reactor->sync_io_capable_fibers()) {
+            auto lg = std::scoped_lock(m_iface_list_mtx);
+            auto [_, happened] = m_fiber_executors.emplace(fiber, folly::IOManagerExecutor(fiber));
+            RELEASE_ASSERT(happened, "Failed to Bind folly::Executor!");
+        }
         reactor->notify_thread_state(true);

         // All iomgr created reactors are initialized, move iomgr to sys init (next phase of start)
@@ -374,6 +385,15 @@ int IOManager::run_on_forget(io_fiber_t fiber, spdk_msg_signature_t fn, void* co
     return 1;
 }

+} // namespace iomgr
+
+namespace folly {
+void IOManagerExecutor::add(Func fn) {
+    iomanager.send_msg(_fiber, iomgr::iomgr_msg::create(std::move(fn).asStdFunction()));
+}
+} // namespace folly
+
+namespace iomgr {
 int IOManager::send_msg(io_fiber_t fiber, iomgr_msg* msg) {
     int ret{0};
     if (fiber->spdk_thr) {
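Because IOManagerExecutor::add() above just wraps the folly::Func into an iomgr_msg and sends it to the bound fiber, the executor composes with the usual folly combinators. A hedged sketch of the fan-out/join pattern the rewritten tests below exercise (do_work() is a placeholder; the blocking QueuedImmediateExecutor mirrors the test code):

    // Sketch only: do_work() stands in for arbitrary per-fiber work.
    // Assumes <folly/executors/QueuedImmediateExecutor.h> is included.
    void run_on_all_fibers(std::vector< iomgr::io_fiber_t > const& fibers) {
        auto e = folly::QueuedImmediateExecutor();
        auto calls = std::vector< folly::SemiFuture< folly::Unit > >();
        for (auto const& f : fibers) {
            calls.push_back(folly::makeSemiFuture()
                                .via(iomanager.fiberExecutor(f))
                                .thenValue([](auto) { /* do_work() on fiber f */ })
                                .semi());
        }
        // collectAll() completes once every fiber has run its continuation;
        // .get() blocks the calling (non-fiber) thread until then.
        folly::collectAll(calls).via(&e).get();
    }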
46 changes: 18 additions & 28 deletions src/test/test_fiber_shared_mutex.cpp
@@ -47,9 +47,7 @@ class SharedMutexTest : public testing::Test {
     iomgr::FiberManagerLib::shared_mutex m_cb_mtx;
     std::vector< iomgr::io_fiber_t > m_fibers;
     uint64_t m_count_per_fiber{0};
-    std::mutex m_test_done_mtx;
-    std::condition_variable m_test_done_cv;
-    uint32_t m_test_count{0};
+    std::atomic_uint32_t m_test_count{0};

 protected:
     void SetUp() override {
@@ -88,10 +86,7 @@
         }

         LOGINFO("Fiber completed {} of exclusive locks", m_count_per_fiber);
-        {
-            std::unique_lock lg(m_test_done_mtx);
-            if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
-        }
+        --m_test_count;
     }

     void all_reader() {
@@ -100,10 +95,7 @@
         }

         LOGINFO("Fiber completed {} of shared locks", m_count_per_fiber);
-        {
-            std::unique_lock lg(m_test_done_mtx);
-            if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
-        }
+        --m_test_count;
     }

     void random_reader_writer() {
@@ -124,10 +116,7 @@
         }

         LOGINFO("Fiber completed shared_locks={} exclusive_locks={}", read_count, write_count);
-        {
-            std::unique_lock lg(m_test_done_mtx);
-            if (--m_test_count == 0) { m_test_done_cv.notify_one(); }
-        }
+        --m_test_count;
     }

     void write_once() {
@@ -145,26 +134,27 @@
 };

 TEST_F(SharedMutexTest, single_writer_multiple_readers) {
-    iomanager.run_on_forget(m_fibers[0], [this]() { all_writer(); });
+    auto e = folly::QueuedImmediateExecutor();
+    auto calls = std::vector< folly::SemiFuture< folly::Unit > >();
+    calls.push_back(
+        folly::makeSemiFuture().via(iomanager.fiberExecutor(m_fibers[0])).thenValue([this](auto) { all_writer(); }));
     for (auto it = m_fibers.begin() + 1; it < m_fibers.end(); ++it) {
-        iomanager.run_on_forget(*it, [this]() { all_reader(); });
-    }
-
-    {
-        std::unique_lock< std::mutex > lk(m_test_done_mtx);
-        m_test_done_cv.wait(lk, [&]() { return m_test_count == 0; });
+        calls.push_back(
+            folly::makeSemiFuture().via(iomanager.fiberExecutor(*it)).thenValue([this](auto) { all_reader(); }));
     }
+    folly::collectAll(calls).via(&e).get();
+    EXPECT_EQ(0, m_test_count.load());
 }

 TEST_F(SharedMutexTest, random_reader_writers) {
+    auto e = folly::QueuedImmediateExecutor();
+    auto calls = std::vector< folly::SemiFuture< folly::Unit > >();
     for (const auto& f : m_fibers) {
-        iomanager.run_on_forget(f, [this]() { random_reader_writer(); });
-    }
-
-    {
-        std::unique_lock< std::mutex > lk(m_test_done_mtx);
-        m_test_done_cv.wait(lk, [&]() { return m_test_count == 0; });
+        calls.push_back(
+            folly::makeSemiFuture().via(iomanager.fiberExecutor(f)).thenValue([this](auto) { random_reader_writer(); }));
     }
+    folly::collectAll(calls).via(&e).get();
+    EXPECT_EQ(0, m_test_count.load());
 }

 int main(int argc, char* argv[]) {