Skip to content

Commit

Permalink
Rewrite ApiQueue as an explicit implementation
Browse files Browse the repository at this point in the history
This replaces an implementation based on a mix of std::bind and
std::packaged_task with a concrete implementation.

Advantages
 - Easier to add extra metadata to commands in the future
 - Less magic

Disadvantages
 - More lines of code
 - Less obvious to those familiar with std::bind and friends in the standard
   library.

Signed-off-by: Phil Wise <[email protected]>
  • Loading branch information
cajun-rat committed Feb 23, 2022
1 parent 91daac9 commit 6daaa09
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 21 deletions.
14 changes: 12 additions & 2 deletions src/libaktualizr/utilities/apiqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ void CommandQueue::run() {
std::lock_guard<std::mutex> g(thread_m_);
if (!thread_.joinable()) {
thread_ = std::thread([this] {
Context ctx{.flow_control = &token_};
std::unique_lock<std::mutex> lock(m_);
for (;;) {
cv_.wait(lock, [this] { return (!queue_.empty() && !paused_) || shutdown_; });
Expand All @@ -66,7 +67,7 @@ void CommandQueue::run() {
auto task = std::move(queue_.front());
queue_.pop();
lock.unlock();
task();
task->PerformTask(&ctx);
lock.lock();
}
});
Expand Down Expand Up @@ -101,7 +102,7 @@ void CommandQueue::abort(bool restart_thread) {
{
// Flush the queue and reset to initial state
std::lock_guard<std::mutex> g(m_);
std::queue<std::packaged_task<void()>>().swap(queue_);
std::queue<ICommand::Ptr>().swap(queue_);
token_.reset();
shutdown_ = false;
}
Expand All @@ -110,4 +111,13 @@ void CommandQueue::abort(bool restart_thread) {
run();
}
}

void CommandQueue::enqueue(ICommand::Ptr&& task) {
{
std::lock_guard<std::mutex> lock(m_);
queue_.push(std::move(task));
}
cv_.notify_all();
}

} // namespace api
112 changes: 93 additions & 19 deletions src/libaktualizr/utilities/apiqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,88 @@ class FlowControlToken {
mutable std::condition_variable cv_;
};

struct Context {
api::FlowControlToken* flow_control;
};

class ICommand {
public:
using Ptr = std::shared_ptr<ICommand>;
ICommand() = default;
virtual ~ICommand() = default;
// Non-movable-non-copyable
ICommand(const ICommand&) = delete;
ICommand(ICommand&&) = delete;
ICommand& operator=(const ICommand&) = delete;
ICommand& operator=(ICommand&&) = delete;

virtual void PerformTask(Context* ctx) = 0;
};

template <class T>
class CommandBase : public ICommand {
public:
void PerformTask(Context* ctx) override {
try {
result_.set_value(TaskImplementation(ctx));
} catch (...) {
result_.set_exception(std::current_exception());
}
}

std::future<T> GetFuture() { return result_.get_future(); }

protected:
virtual T TaskImplementation(Context*) = 0;

private:
std::promise<T> result_;
};

template <>
class CommandBase<void> : public ICommand {
public:
void PerformTask(Context* ctx) override {
try {
TaskImplementation(ctx);
result_.set_value();
} catch (...) {
result_.set_exception(std::current_exception());
}
}

std::future<void> GetFuture() { return result_.get_future(); }

protected:
virtual void TaskImplementation(Context*) = 0;

private:
std::promise<void> result_;
};

template <class T>
class Command : public CommandBase<T> {
public:
explicit Command(std::function<T()>&& func) : f_{move(func)} {}
T TaskImplementation(Context* ctx) override {
(void)ctx;
return f_();
}

private:
std::function<T()> f_;
};

template <class T>
class CommandFlowControl : public CommandBase<T> {
public:
explicit CommandFlowControl(std::function<T(const api::FlowControlToken*)>&& func) : f_{move(func)} {}
T TaskImplementation(Context* ctx) override { return f_(ctx->flow_control); }

private:
std::function<T(const api::FlowControlToken*)> f_;
};

class CommandQueue {
public:
CommandQueue() = default;
Expand All @@ -68,37 +150,29 @@ class CommandQueue {
void abort(bool restart_thread = true);

template <class R>
std::future<R> enqueue(const std::function<R()>& f) {
std::packaged_task<R()> task(f);
auto r = task.get_future();
{
std::lock_guard<std::mutex> lock(m_);
queue_.push(std::packaged_task<void()>(std::move(task)));
}
cv_.notify_all();
return r;
std::future<R> enqueue(std::function<R()>&& function) {
auto task = std::make_shared<Command<R>>(std::move(function));
enqueue(task);
return task->GetFuture();
}

template <class R>
std::future<R> enqueue(const std::function<R(const api::FlowControlToken*)>& f) {
std::packaged_task<R()> task(std::bind(f, &token_));
auto r = task.get_future();
{
std::lock_guard<std::mutex> lock(m_);
queue_.push(std::packaged_task<void()>(std::move(task)));
}
cv_.notify_all();
return r;
std::future<R> enqueue(std::function<R(const api::FlowControlToken*)>&& function) {
auto task = std::make_shared<CommandFlowControl<R>>(std::move(function));
enqueue(task);
return task->GetFuture();
}

void enqueue(ICommand::Ptr&& task);

private:
std::atomic_bool shutdown_{false};
std::atomic_bool paused_{false};

std::thread thread_;
std::mutex thread_m_;

std::queue<std::packaged_task<void()>> queue_;
std::queue<ICommand::Ptr> queue_;
std::mutex m_;
std::condition_variable cv_;
FlowControlToken token_;
Expand Down

0 comments on commit 6daaa09

Please sign in to comment.