Skip to content

Commit

Permalink
Merge pull request #55 from cajun-rat/api-queue-refactor
Browse files Browse the repository at this point in the history
RFC: ApiQueue refactor
  • Loading branch information
cajun-rat authored Feb 23, 2022
2 parents 4249e9f + 6daaa09 commit 6f75976
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 32 deletions.
23 changes: 12 additions & 11 deletions src/libaktualizr/primary/aktualizr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
#include "utilities/timer.h"

using std::make_shared;
using std::move;
using std::shared_ptr;

Aktualizr::Aktualizr(const Config &config)
: Aktualizr(config, INvStorage::newStorage(config.storage), std::make_shared<HttpClient>()) {}

Aktualizr::Aktualizr(Config config, std::shared_ptr<INvStorage> storage_in,
const std::shared_ptr<HttpInterface> &http_in)
: config_{std::move(config)}, sig_{new event::Channel()}, api_queue_(new api::CommandQueue()) {
: config_{move(config)}, sig_{new event::Channel()}, api_queue_{new api::CommandQueue()} {
if (sodium_init() == -1) { // Note that sodium_init doesn't require a matching 'sodium_deinit'
throw std::runtime_error("Unable to initialize libsodium");
}

storage_ = std::move(storage_in);
storage_ = move(storage_in);
storage_->importData(config_.import);

uptane_client_ = std::make_shared<SotaUptaneClient>(config_, storage_, http_in, sig_);
Expand Down Expand Up @@ -125,7 +126,7 @@ std::vector<SecondaryInfo> Aktualizr::GetSecondaries() const {

std::future<result::CampaignCheck> Aktualizr::CampaignCheck() {
std::function<result::CampaignCheck()> task([this] { return uptane_client_->campaignCheck(); });
return api_queue_->enqueue(task);
return api_queue_->enqueue(move(task));
}

std::future<void> Aktualizr::CampaignControl(const std::string &campaign_id, campaign::Cmd cmd) {
Expand All @@ -144,29 +145,29 @@ std::future<void> Aktualizr::CampaignControl(const std::string &campaign_id, cam
break;
}
});
return api_queue_->enqueue(task);
return api_queue_->enqueue(move(task));
}

void Aktualizr::SetCustomHardwareInfo(Json::Value hwinfo) { uptane_client_->setCustomHardwareInfo(std::move(hwinfo)); }
void Aktualizr::SetCustomHardwareInfo(Json::Value hwinfo) { uptane_client_->setCustomHardwareInfo(move(hwinfo)); }
std::future<void> Aktualizr::SendDeviceData() {
std::function<void()> task([this] { uptane_client_->sendDeviceData(); });
return api_queue_->enqueue(task);
return api_queue_->enqueue(move(task));
}

std::future<result::UpdateCheck> Aktualizr::CheckUpdates() {
std::function<result::UpdateCheck()> task([this] { return uptane_client_->fetchMeta(); });
return api_queue_->enqueue(task);
return api_queue_->enqueue(move(task));
}

std::future<result::Download> Aktualizr::Download(const std::vector<Uptane::Target> &updates) {
std::function<result::Download(const api::FlowControlToken *)> task(
[this, updates](const api::FlowControlToken *token) { return uptane_client_->downloadImages(updates, token); });
return api_queue_->enqueue(task);
return api_queue_->enqueue(move(task));
}

std::future<result::Install> Aktualizr::Install(const std::vector<Uptane::Target> &updates) {
std::function<result::Install()> task([this, updates] { return uptane_client_->uptaneInstall(updates); });
return api_queue_->enqueue(task);
return api_queue_->enqueue(move(task));
}

bool Aktualizr::SetInstallationRawReport(const std::string &custom_raw_report) {
Expand All @@ -175,7 +176,7 @@ bool Aktualizr::SetInstallationRawReport(const std::string &custom_raw_report) {

std::future<bool> Aktualizr::SendManifest(const Json::Value &custom) {
std::function<bool()> task([this, custom]() { return uptane_client_->putManifest(custom); });
return api_queue_->enqueue(task);
return api_queue_->enqueue(move(task));
}

result::Pause Aktualizr::Pause() {
Expand Down Expand Up @@ -219,7 +220,7 @@ Aktualizr::InstallationLog Aktualizr::GetInstallationLog() {
std::vector<Uptane::Target> log;
storage_->loadInstallationLog(serial.ToString(), &log, true);

ilog.emplace_back(Aktualizr::InstallationLogEntry{serial, std::move(log)});
ilog.emplace_back(Aktualizr::InstallationLogEntry{serial, move(log)});
}

return ilog;
Expand Down
1 change: 1 addition & 0 deletions src/libaktualizr/utilities/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ set_property(SOURCE aktualizr_version.cc PROPERTY COMPILE_DEFINITIONS AKTUALIZR_

add_library(utilities OBJECT ${SOURCES})

add_aktualizr_test(NAME api_queue SOURCES api_queue_test.cc)
add_aktualizr_test(NAME dequeue_buffer SOURCES dequeue_buffer_test.cc)
add_aktualizr_test(NAME timer SOURCES timer_test.cc)
add_aktualizr_test(NAME types SOURCES types_test.cc)
Expand Down
54 changes: 54 additions & 0 deletions src/libaktualizr/utilities/api_queue_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#include <gtest/gtest.h>

#include <chrono>
#include <string>
#include "utilities/apiqueue.h"

using std::cout;
using std::future;
using std::future_status;

class CheckLifetime {
public:
CheckLifetime() { cout << "ctor\n"; }
~CheckLifetime() {
valid = 999;
cout << "dtor\n";
}
CheckLifetime(const CheckLifetime& other) {
(void)other;
cout << "copy-ctor\n";
}
CheckLifetime& operator=(const CheckLifetime&) = delete;
CheckLifetime& operator=(CheckLifetime&&) = delete;
CheckLifetime(CheckLifetime&&) = delete;

int valid{100};
};

TEST(ApiQueue, Simple) {
api::CommandQueue dut;
future<int> result;
{
CheckLifetime checkLifetime;
std::function<int()> task([checkLifetime] {
cout << "Running task..." << checkLifetime.valid << "\n";
return checkLifetime.valid;
});
result = dut.enqueue(std::move(task));
cout << "Leaving scope..";
}
EXPECT_EQ(result.wait_for(std::chrono::milliseconds(100)), future_status::timeout);

dut.run();
// Include a timeout to avoid a failing test handing forever
ASSERT_EQ(result.wait_for(std::chrono::seconds(10)), future_status::ready);
EXPECT_EQ(result.get(), 100);
}

#ifndef __NO_MAIN__
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif
14 changes: 12 additions & 2 deletions src/libaktualizr/utilities/apiqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ void CommandQueue::run() {
std::lock_guard<std::mutex> g(thread_m_);
if (!thread_.joinable()) {
thread_ = std::thread([this] {
Context ctx{.flow_control = &token_};
std::unique_lock<std::mutex> lock(m_);
for (;;) {
cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; });
Expand All @@ -66,7 +67,7 @@ void CommandQueue::run() {
auto task = std::move(queue_.front());
queue_.pop();
lock.unlock();
task();
task->PerformTask(&ctx);
lock.lock();
}
});
Expand Down Expand Up @@ -101,7 +102,7 @@ void CommandQueue::abort(bool restart_thread) {
{
// Flush the queue and reset to initial state
std::lock_guard<std::mutex> g(m_);
std::queue<std::packaged_task<void()>>().swap(queue_);
std::queue<ICommand::Ptr>().swap(queue_);
token_.reset();
shutdown_ = false;
}
Expand All @@ -110,4 +111,13 @@ void CommandQueue::abort(bool restart_thread) {
run();
}
}

void CommandQueue::enqueue(ICommand::Ptr&& task) {
{
std::lock_guard<std::mutex> lock(m_);
queue_.push(std::move(task));
}
cv_.notify_all();
}

} // namespace api
112 changes: 93 additions & 19 deletions src/libaktualizr/utilities/apiqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,88 @@ class FlowControlToken {
mutable std::condition_variable cv_;
};

struct Context {
api::FlowControlToken* flow_control;
};

class ICommand {
public:
using Ptr = std::shared_ptr<ICommand>;
ICommand() = default;
virtual ~ICommand() = default;
// Non-movable-non-copyable
ICommand(const ICommand&) = delete;
ICommand(ICommand&&) = delete;
ICommand& operator=(const ICommand&) = delete;
ICommand& operator=(ICommand&&) = delete;

virtual void PerformTask(Context* ctx) = 0;
};

template <class T>
class CommandBase : public ICommand {
public:
void PerformTask(Context* ctx) override {
try {
result_.set_value(TaskImplementation(ctx));
} catch (...) {
result_.set_exception(std::current_exception());
}
}

std::future<T> GetFuture() { return result_.get_future(); }

protected:
virtual T TaskImplementation(Context*) = 0;

private:
std::promise<T> result_;
};

template <>
class CommandBase<void> : public ICommand {
public:
void PerformTask(Context* ctx) override {
try {
TaskImplementation(ctx);
result_.set_value();
} catch (...) {
result_.set_exception(std::current_exception());
}
}

std::future<void> GetFuture() { return result_.get_future(); }

protected:
virtual void TaskImplementation(Context*) = 0;

private:
std::promise<void> result_;
};

template <class T>
class Command : public CommandBase<T> {
public:
explicit Command(std::function<T()>&& func) : f_{move(func)} {}
T TaskImplementation(Context* ctx) override {
(void)ctx;
return f_();
}

private:
std::function<T()> f_;
};

template <class T>
class CommandFlowControl : public CommandBase<T> {
public:
explicit CommandFlowControl(std::function<T(const api::FlowControlToken*)>&& func) : f_{move(func)} {}
T TaskImplementation(Context* ctx) override { return f_(ctx->flow_control); }

private:
std::function<T(const api::FlowControlToken*)> f_;
};

class CommandQueue {
public:
CommandQueue() = default;
Expand All @@ -68,37 +150,29 @@ class CommandQueue {
void abort(bool restart_thread = true);

template <class R>
std::future<R> enqueue(const std::function<R()>& f) {
std::packaged_task<R()> task(f);
auto r = task.get_future();
{
std::lock_guard<std::mutex> lock(m_);
queue_.push(std::packaged_task<void()>(std::move(task)));
}
cv_.notify_all();
return r;
std::future<R> enqueue(std::function<R()>&& function) {
auto task = std::make_shared<Command<R>>(std::move(function));
enqueue(task);
return task->GetFuture();
}

template <class R>
std::future<R> enqueue(const std::function<R(const api::FlowControlToken*)>& f) {
std::packaged_task<R()> task(std::bind(f, &token_));
auto r = task.get_future();
{
std::lock_guard<std::mutex> lock(m_);
queue_.push(std::packaged_task<void()>(std::move(task)));
}
cv_.notify_all();
return r;
std::future<R> enqueue(std::function<R(const api::FlowControlToken*)>&& function) {
auto task = std::make_shared<CommandFlowControl<R>>(std::move(function));
enqueue(task);
return task->GetFuture();
}

void enqueue(ICommand::Ptr&& task);

private:
std::atomic_bool shutdown_{false};
std::atomic_bool paused_{false};

std::thread thread_;
std::mutex thread_m_;

std::queue<std::packaged_task<void()>> queue_;
std::queue<ICommand::Ptr> queue_;
std::mutex m_;
std::condition_variable cv_;
FlowControlToken token_;
Expand Down

0 comments on commit 6f75976

Please sign in to comment.