From 813c1a68b30dfb34de9dae2ad8b44440fa5ddef7 Mon Sep 17 00:00:00 2001 From: Phil Wise Date: Mon, 24 Jan 2022 20:06:58 +0000 Subject: [PATCH 1/3] Add test for ApiQueue Signed-off-by: Phil Wise --- src/libaktualizr/utilities/CMakeLists.txt | 1 + src/libaktualizr/utilities/api_queue_test.cc | 54 ++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 src/libaktualizr/utilities/api_queue_test.cc diff --git a/src/libaktualizr/utilities/CMakeLists.txt b/src/libaktualizr/utilities/CMakeLists.txt index bd352a0d7..8bdec8b30 100644 --- a/src/libaktualizr/utilities/CMakeLists.txt +++ b/src/libaktualizr/utilities/CMakeLists.txt @@ -22,6 +22,7 @@ set_property(SOURCE aktualizr_version.cc PROPERTY COMPILE_DEFINITIONS AKTUALIZR_ add_library(utilities OBJECT ${SOURCES}) +add_aktualizr_test(NAME api_queue SOURCES api_queue_test.cc) add_aktualizr_test(NAME dequeue_buffer SOURCES dequeue_buffer_test.cc) add_aktualizr_test(NAME timer SOURCES timer_test.cc) add_aktualizr_test(NAME types SOURCES types_test.cc) diff --git a/src/libaktualizr/utilities/api_queue_test.cc b/src/libaktualizr/utilities/api_queue_test.cc new file mode 100644 index 000000000..c44e46c79 --- /dev/null +++ b/src/libaktualizr/utilities/api_queue_test.cc @@ -0,0 +1,54 @@ +#include + +#include +#include +#include "utilities/apiqueue.h" + +using std::cout; +using std::future; +using std::future_status; + +class CheckLifetime { + public: + CheckLifetime() { cout << "ctor\n"; } + ~CheckLifetime() { + valid = 999; + cout << "dtor\n"; + } + CheckLifetime(const CheckLifetime& other) { + (void)other; + cout << "copy-ctor\n"; + } + CheckLifetime& operator=(const CheckLifetime&) = delete; + CheckLifetime& operator=(CheckLifetime&&) = delete; + CheckLifetime(CheckLifetime&&) = delete; + + int valid{100}; +}; + +TEST(ApiQueue, Simple) { + api::CommandQueue dut; + future result; + { + CheckLifetime checkLifetime; + std::function task([checkLifetime] { + cout << "Running task..." << checkLifetime.valid << "\n"; + return checkLifetime.valid; + }); + result = dut.enqueue(std::move(task)); + cout << "Leaving scope.."; + } + EXPECT_EQ(result.wait_for(std::chrono::milliseconds(100)), future_status::timeout); + + dut.run(); + // Include a timeout to avoid a failing test handing forever + ASSERT_EQ(result.wait_for(std::chrono::seconds(10)), future_status::ready); + EXPECT_EQ(result.get(), 100); +} + +#ifndef __NO_MAIN__ +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} +#endif From 91daac983d88ba903a008e14beb6139dab7f6c48 Mon Sep 17 00:00:00 2001 From: Phil Wise Date: Mon, 24 Jan 2022 19:54:00 +0000 Subject: [PATCH 2/3] Tidy up: std::move tasks in Aktualizr Signed-off-by: Phil Wise --- src/libaktualizr/primary/aktualizr.cc | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/libaktualizr/primary/aktualizr.cc b/src/libaktualizr/primary/aktualizr.cc index f89b7f609..358a9c87d 100644 --- a/src/libaktualizr/primary/aktualizr.cc +++ b/src/libaktualizr/primary/aktualizr.cc @@ -9,6 +9,7 @@ #include "utilities/timer.h" using std::make_shared; +using std::move; using std::shared_ptr; Aktualizr::Aktualizr(const Config &config) @@ -16,12 +17,12 @@ Aktualizr::Aktualizr(const Config &config) Aktualizr::Aktualizr(Config config, std::shared_ptr storage_in, const std::shared_ptr &http_in) - : config_{std::move(config)}, sig_{new event::Channel()}, api_queue_(new api::CommandQueue()) { + : config_{move(config)}, sig_{new event::Channel()}, api_queue_{new api::CommandQueue()} { if (sodium_init() == -1) { // Note that sodium_init doesn't require a matching 'sodium_deinit' throw std::runtime_error("Unable to initialize libsodium"); } - storage_ = std::move(storage_in); + storage_ = move(storage_in); storage_->importData(config_.import); uptane_client_ = std::make_shared(config_, storage_, http_in, sig_); @@ -125,7 +126,7 @@ std::vector Aktualizr::GetSecondaries() const { std::future Aktualizr::CampaignCheck() { std::function task([this] { return uptane_client_->campaignCheck(); }); - return api_queue_->enqueue(task); + return api_queue_->enqueue(move(task)); } std::future Aktualizr::CampaignControl(const std::string &campaign_id, campaign::Cmd cmd) { @@ -144,29 +145,29 @@ std::future Aktualizr::CampaignControl(const std::string &campaign_id, cam break; } }); - return api_queue_->enqueue(task); + return api_queue_->enqueue(move(task)); } -void Aktualizr::SetCustomHardwareInfo(Json::Value hwinfo) { uptane_client_->setCustomHardwareInfo(std::move(hwinfo)); } +void Aktualizr::SetCustomHardwareInfo(Json::Value hwinfo) { uptane_client_->setCustomHardwareInfo(move(hwinfo)); } std::future Aktualizr::SendDeviceData() { std::function task([this] { uptane_client_->sendDeviceData(); }); - return api_queue_->enqueue(task); + return api_queue_->enqueue(move(task)); } std::future Aktualizr::CheckUpdates() { std::function task([this] { return uptane_client_->fetchMeta(); }); - return api_queue_->enqueue(task); + return api_queue_->enqueue(move(task)); } std::future Aktualizr::Download(const std::vector &updates) { std::function task( [this, updates](const api::FlowControlToken *token) { return uptane_client_->downloadImages(updates, token); }); - return api_queue_->enqueue(task); + return api_queue_->enqueue(move(task)); } std::future Aktualizr::Install(const std::vector &updates) { std::function task([this, updates] { return uptane_client_->uptaneInstall(updates); }); - return api_queue_->enqueue(task); + return api_queue_->enqueue(move(task)); } bool Aktualizr::SetInstallationRawReport(const std::string &custom_raw_report) { @@ -175,7 +176,7 @@ bool Aktualizr::SetInstallationRawReport(const std::string &custom_raw_report) { std::future Aktualizr::SendManifest(const Json::Value &custom) { std::function task([this, custom]() { return uptane_client_->putManifest(custom); }); - return api_queue_->enqueue(task); + return api_queue_->enqueue(move(task)); } result::Pause Aktualizr::Pause() { @@ -219,7 +220,7 @@ Aktualizr::InstallationLog Aktualizr::GetInstallationLog() { std::vector log; storage_->loadInstallationLog(serial.ToString(), &log, true); - ilog.emplace_back(Aktualizr::InstallationLogEntry{serial, std::move(log)}); + ilog.emplace_back(Aktualizr::InstallationLogEntry{serial, move(log)}); } return ilog; From 6daaa09390e4157ae7c72902cdf829b543645db8 Mon Sep 17 00:00:00 2001 From: Phil Wise Date: Mon, 24 Jan 2022 20:13:30 +0000 Subject: [PATCH 3/3] Rewrite ApiQueue as an explicit implementation This replaces an implementation based on a mix of std::bind and std::packaged_task with a concrete implementation. Advantages - Easier to add extra metadata to commands in the future - Less magic Disadvantages - More lines of code - Less obvious to those familiar with std::bind and friends in the standard library. Signed-off-by: Phil Wise --- src/libaktualizr/utilities/apiqueue.cc | 14 +++- src/libaktualizr/utilities/apiqueue.h | 112 ++++++++++++++++++++----- 2 files changed, 105 insertions(+), 21 deletions(-) diff --git a/src/libaktualizr/utilities/apiqueue.cc b/src/libaktualizr/utilities/apiqueue.cc index 0b1b56cdd..606044682 100644 --- a/src/libaktualizr/utilities/apiqueue.cc +++ b/src/libaktualizr/utilities/apiqueue.cc @@ -57,6 +57,7 @@ void CommandQueue::run() { std::lock_guard g(thread_m_); if (!thread_.joinable()) { thread_ = std::thread([this] { + Context ctx{.flow_control = &token_}; std::unique_lock lock(m_); for (;;) { cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; }); @@ -66,7 +67,7 @@ void CommandQueue::run() { auto task = std::move(queue_.front()); queue_.pop(); lock.unlock(); - task(); + task->PerformTask(&ctx); lock.lock(); } }); @@ -101,7 +102,7 @@ void CommandQueue::abort(bool restart_thread) { { // Flush the queue and reset to initial state std::lock_guard g(m_); - std::queue>().swap(queue_); + std::queue().swap(queue_); token_.reset(); shutdown_ = false; } @@ -110,4 +111,13 @@ void CommandQueue::abort(bool restart_thread) { run(); } } + +void CommandQueue::enqueue(ICommand::Ptr&& task) { + { + std::lock_guard lock(m_); + queue_.push(std::move(task)); + } + cv_.notify_all(); +} + } // namespace api diff --git a/src/libaktualizr/utilities/apiqueue.h b/src/libaktualizr/utilities/apiqueue.h index 4694cbbeb..cf4d79e11 100644 --- a/src/libaktualizr/utilities/apiqueue.h +++ b/src/libaktualizr/utilities/apiqueue.h @@ -54,6 +54,88 @@ class FlowControlToken { mutable std::condition_variable cv_; }; +struct Context { + api::FlowControlToken* flow_control; +}; + +class ICommand { + public: + using Ptr = std::shared_ptr; + ICommand() = default; + virtual ~ICommand() = default; + // Non-movable-non-copyable + ICommand(const ICommand&) = delete; + ICommand(ICommand&&) = delete; + ICommand& operator=(const ICommand&) = delete; + ICommand& operator=(ICommand&&) = delete; + + virtual void PerformTask(Context* ctx) = 0; +}; + +template +class CommandBase : public ICommand { + public: + void PerformTask(Context* ctx) override { + try { + result_.set_value(TaskImplementation(ctx)); + } catch (...) { + result_.set_exception(std::current_exception()); + } + } + + std::future GetFuture() { return result_.get_future(); } + + protected: + virtual T TaskImplementation(Context*) = 0; + + private: + std::promise result_; +}; + +template <> +class CommandBase : public ICommand { + public: + void PerformTask(Context* ctx) override { + try { + TaskImplementation(ctx); + result_.set_value(); + } catch (...) { + result_.set_exception(std::current_exception()); + } + } + + std::future GetFuture() { return result_.get_future(); } + + protected: + virtual void TaskImplementation(Context*) = 0; + + private: + std::promise result_; +}; + +template +class Command : public CommandBase { + public: + explicit Command(std::function&& func) : f_{move(func)} {} + T TaskImplementation(Context* ctx) override { + (void)ctx; + return f_(); + } + + private: + std::function f_; +}; + +template +class CommandFlowControl : public CommandBase { + public: + explicit CommandFlowControl(std::function&& func) : f_{move(func)} {} + T TaskImplementation(Context* ctx) override { return f_(ctx->flow_control); } + + private: + std::function f_; +}; + class CommandQueue { public: CommandQueue() = default; @@ -68,29 +150,21 @@ class CommandQueue { void abort(bool restart_thread = true); template - std::future enqueue(const std::function& f) { - std::packaged_task task(f); - auto r = task.get_future(); - { - std::lock_guard lock(m_); - queue_.push(std::packaged_task(std::move(task))); - } - cv_.notify_all(); - return r; + std::future enqueue(std::function&& function) { + auto task = std::make_shared>(std::move(function)); + enqueue(task); + return task->GetFuture(); } template - std::future enqueue(const std::function& f) { - std::packaged_task task(std::bind(f, &token_)); - auto r = task.get_future(); - { - std::lock_guard lock(m_); - queue_.push(std::packaged_task(std::move(task))); - } - cv_.notify_all(); - return r; + std::future enqueue(std::function&& function) { + auto task = std::make_shared>(std::move(function)); + enqueue(task); + return task->GetFuture(); } + void enqueue(ICommand::Ptr&& task); + private: std::atomic_bool shutdown_{false}; std::atomic_bool paused_{false}; @@ -98,7 +172,7 @@ class CommandQueue { std::thread thread_; std::mutex thread_m_; - std::queue> queue_; + std::queue queue_; std::mutex m_; std::condition_variable cv_; FlowControlToken token_;