Skip to content

Commit

Permalink
Introduce finer-grained cancellation of operations
Browse files Browse the repository at this point in the history
Push the FlowControlToken (which allows aborts of in-progress operations
in the CommandQueue) further down the call stack.

This is one part of allowing offline and online updates to co-exist:
if an online update is in progress over a very slow GSM modem, then it
should be possible to plug in a USB stick, cancel the in-progress update
and start installing the new one inside a few seconds.
  • Loading branch information
cajun-rat committed Mar 12, 2024
1 parent c907237 commit 91ead67
Show file tree
Hide file tree
Showing 47 changed files with 507 additions and 230 deletions.
16 changes: 14 additions & 2 deletions include/libaktualizr/secondaryinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,20 @@ class SecondaryInterface {
virtual int32_t getRootVersion(bool director) const = 0;
virtual data::InstallationResult putRoot(const std::string& root, bool director) = 0;

virtual data::InstallationResult sendFirmware(const Uptane::Target& target) = 0;
virtual data::InstallationResult install(const Uptane::Target& target) = 0;
/**
* Send firmware to a device. This operation should be both idempotent and
* not commit to installing the new version. Where practical, the
* implementation should pre-flight the installation and report errors now,
* while the entire installation can be cleanly aborted.
* Failures reported later (during SecondaryInterface::install()) can leave
* a multi-ecu update partially applied.
*/
virtual data::InstallationResult sendFirmware(const Uptane::Target& target,
const api::FlowControlToken* flow_control) = 0;
/**
* Commit to installing an update.
*/
virtual data::InstallationResult install(const Uptane::Target& target, const api::FlowControlToken* flow_control) = 0;

protected:
SecondaryInterface(const SecondaryInterface&) = default;
Expand Down
2 changes: 2 additions & 0 deletions include/libaktualizr/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ struct ResultCode {
kNeedCompletion = 21,
// Customer specific
kCustomError = 22,
// The operation was explicitly cancelled, either by an offline update or explicit user choice
kOperationCancelled = 23,
// Unknown
kUnknown = -1,
};
Expand Down
2 changes: 1 addition & 1 deletion src/aktualizr_get/get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ std::string aktualizrGet(Config &config, const std::string &url, const std::vect
auto client = std_::make_unique<HttpClient>(&headers);
KeyManager keys(storage, config.keymanagerConfig());
keys.copyCertsToCurl(*client);
auto resp = client->get(url, HttpInterface::kNoLimit);
auto resp = client->get(url, HttpInterface::kNoLimit, nullptr);
if (resp.http_status_code != 200) {
throw std::runtime_error("Unable to get " + url + ": HTTP_" + std::to_string(resp.http_status_code) + "\n" +
resp.body);
Expand Down
4 changes: 2 additions & 2 deletions src/aktualizr_secondary/aktualizr_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ data::InstallationResult AktualizrSecondary::verifyMetadata(const Uptane::Second
// 4. NOT SUPPORTED: Download and check the Snapshot metadata file from the Director repository.
// 5. Download and check the Targets metadata file from the Director repository.
try {
director_repo_.updateMeta(*storage_, metadata);
director_repo_.updateMeta(*storage_, metadata, nullptr);
} catch (const std::exception& e) {
LOG_ERROR << "Failed to update Director metadata: " << e.what();
return data::InstallationResult(data::ResultCode::Numeric::kVerificationFailed,
Expand All @@ -96,7 +96,7 @@ data::InstallationResult AktualizrSecondary::verifyMetadata(const Uptane::Second
// 8. Download and check the Snapshot metadata file from the Image repository.
// 9. Download and check the top-level Targets metadata file from the Image repository.
try {
image_repo_.updateMeta(*storage_, metadata);
image_repo_.updateMeta(*storage_, metadata, nullptr);
} catch (const std::exception& e) {
LOG_ERROR << "Failed to update Image repo metadata: " << e.what();
return data::InstallationResult(data::ResultCode::Numeric::kVerificationFailed,
Expand Down
14 changes: 8 additions & 6 deletions src/aktualizr_secondary/secondary_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,15 +583,16 @@ class SecondaryRpcCommon : public ::testing::Test {
verifyMetadata(secondary_.metadata());
}

result = ip_secondary_->sendFirmware(target);
// Cancelling not supported at the moment
result = ip_secondary_->sendFirmware(target, nullptr);
if (handler_version == HandlerVersion::kV2Failure) {
EXPECT_EQ(result.result_code, data::ResultCode::Numeric::kDownloadFailed);
EXPECT_EQ(result.description, secondary_.upload_data_failure);
} else {
EXPECT_TRUE(result.isSuccess());
}

result = ip_secondary_->install(target);
result = ip_secondary_->install(target, nullptr);
if (handler_version == HandlerVersion::kV2Failure) {
EXPECT_EQ(result.result_code, data::ResultCode::Numeric::kInstallFailed);
EXPECT_EQ(result.description, secondary_.installation_failure);
Expand All @@ -617,15 +618,16 @@ class SecondaryRpcCommon : public ::testing::Test {
verifyMetadata(secondary_.metadata());
}

result = ip_secondary_->sendFirmware(target);
// Cancelling not supported at the moment
result = ip_secondary_->sendFirmware(target, nullptr);
if (handler_version == HandlerVersion::kV2Failure) {
EXPECT_EQ(result.result_code, data::ResultCode::Numeric::kDownloadFailed);
EXPECT_EQ(result.description, secondary_.ostree_failure);
} else {
EXPECT_TRUE(result.isSuccess());
}

result = ip_secondary_->install(target);
result = ip_secondary_->install(target, nullptr);
if (handler_version == HandlerVersion::kV2Failure) {
EXPECT_EQ(result.result_code, data::ResultCode::Numeric::kInstallFailed);
EXPECT_EQ(result.description, secondary_.installation_failure);
Expand Down Expand Up @@ -810,8 +812,8 @@ TEST(SecondaryTcpServer, TestIpSecondaryIfSecondaryIsNotRunning) {
EXPECT_FALSE(ip_secondary->putRoot("director-root-v2", true).isSuccess());
EXPECT_FALSE(ip_secondary->putRoot("image-root-v2", false).isSuccess());
EXPECT_FALSE(ip_secondary->putMetadata(target).isSuccess());
EXPECT_FALSE(ip_secondary->sendFirmware(target).isSuccess());
EXPECT_FALSE(ip_secondary->install(target).isSuccess());
EXPECT_FALSE(ip_secondary->sendFirmware(target, nullptr).isSuccess());
EXPECT_FALSE(ip_secondary->install(target, nullptr).isSuccess());
}

/* This class returns a positive result for every message. The test cases verify
Expand Down
1 change: 1 addition & 0 deletions src/libaktualizr-posix/asn1/messages/ipuptane_message.asn1
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ IpUptane DEFINITIONS ::= BEGIN
generalError(19),
needCompletion(21),
customError(22),
operationCancelled(23),
unknown(-1),
...
}
Expand Down
35 changes: 24 additions & 11 deletions src/libaktualizr-posix/ipuptanesecondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "libaktualizr/secondary_provider.h"
#include "logging/logging.h"
#include "uptane/tuf.h"
#include "utilities/flow_control.h"
#include "utilities/utils.h"

namespace Uptane {
Expand Down Expand Up @@ -358,18 +359,23 @@ bool IpUptaneSecondary::ping() const {
return resp->present() == AKIpUptaneMes_PR_getInfoResp;
}

data::InstallationResult IpUptaneSecondary::sendFirmware(const Uptane::Target& target) {
data::InstallationResult send_result;
data::InstallationResult IpUptaneSecondary::sendFirmware(const Uptane::Target& target,
const api::FlowControlToken* flow_control) {
if (flow_control != nullptr && flow_control->hasAborted()) {
// TODO: Add an abort message to the IPUptane protocol and allow aborts
// during firmware transfer
return data::InstallationResult(data::ResultCode::Numeric::kOperationCancelled, "");
}

if (protocol_version == 2) {
send_result = sendFirmware_v2(target);
} else if (protocol_version == 1) {
send_result = sendFirmware_v1(target);
} else {
LOG_ERROR << "Unexpected protocol version: " << protocol_version;
send_result = data::InstallationResult(data::ResultCode::Numeric::kInternalError,
"Unexpected protocol version: " + std::to_string(protocol_version));
return sendFirmware_v2(target);
}
if (protocol_version == 1) {
return sendFirmware_v1(target);
}
return send_result;
LOG_ERROR << "Unexpected protocol version: " << protocol_version;
return data::InstallationResult(data::ResultCode::Numeric::kInternalError,
"Unexpected protocol version: " + std::to_string(protocol_version));
}

data::InstallationResult IpUptaneSecondary::sendFirmware_v1(const Uptane::Target& target) {
Expand Down Expand Up @@ -416,7 +422,14 @@ data::InstallationResult IpUptaneSecondary::sendFirmware_v2(const Uptane::Target
}
}

data::InstallationResult IpUptaneSecondary::install(const Uptane::Target& target) {
data::InstallationResult IpUptaneSecondary::install(const Uptane::Target& target,
const api::FlowControlToken* flow_control) {
if (flow_control != nullptr && flow_control->hasAborted()) {
// TODO: Add an abort message to the IPUptane protocol and allow aborts
// during installation
return data::InstallationResult(data::ResultCode::Numeric::kOperationCancelled, "");
}

data::InstallationResult install_result;
if (protocol_version == 2) {
install_result = install_v2(target);
Expand Down
5 changes: 3 additions & 2 deletions src/libaktualizr-posix/ipuptanesecondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ class IpUptaneSecondary : public SecondaryInterface {
data::InstallationResult putRoot(const std::string& root, bool director) override;
Manifest getManifest() const override;
bool ping() const override;
data::InstallationResult sendFirmware(const Uptane::Target& target) override;
data::InstallationResult install(const Uptane::Target& target) override;
data::InstallationResult sendFirmware(const Uptane::Target& target,
const api::FlowControlToken* flow_control) override;
data::InstallationResult install(const Uptane::Target& target, const api::FlowControlToken* flow_control) override;

private:
const std::pair<std::string, uint16_t>& getAddr() const { return addr_; }
Expand Down
33 changes: 32 additions & 1 deletion src/libaktualizr/http/httpclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ static size_t writeString(void* contents, size_t size, size_t nmemb, void* userp
return size * nmemb;
}

static int ProgressHandler(void* clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
(void)dltotal;
(void)dlnow;
(void)ultotal;
(void)ulnow;
if (clientp == nullptr) {
LOG_ERROR << "ProgressHandler given null user pointer";
return 0; // Let the download continue
}

auto* token = static_cast<api::FlowControlToken*>(clientp);

if (!token->IsValid()) {
LOG_ERROR << "ProgressHandler given a thing that isn't a FlowControlToken";
return 0;
}

if (token->hasAborted()) {
// Abort download
return 1;
}
return 0;
}

HttpClient::HttpClient(const std::vector<std::string>* extra_headers) {
curl = curl_easy_init();
if (curl == nullptr) {
Expand Down Expand Up @@ -124,7 +148,7 @@ void HttpClient::setCerts(const std::string& ca, CryptoSource ca_source, const s
pkcs11_key = (pkey_source == CryptoSource::kPkcs11);
}

HttpResponse HttpClient::get(const std::string& url, int64_t maxsize) {
HttpResponse HttpClient::get(const std::string& url, int64_t maxsize, const api::FlowControlToken* flow_control) {
CURL* curl_get = Utils::curlDupHandleWrapper(curl, pkcs11_key);

curlEasySetoptWrapper(curl_get, CURLOPT_HTTPHEADER, headers);
Expand All @@ -138,6 +162,13 @@ HttpResponse HttpClient::get(const std::string& url, int64_t maxsize) {
curlEasySetoptWrapper(curl_get, CURLOPT_POSTFIELDS, "");
curlEasySetoptWrapper(curl_get, CURLOPT_URL, url.c_str());
curlEasySetoptWrapper(curl_get, CURLOPT_HTTPGET, 1L);
if (flow_control != nullptr) {
// Handle cancellation
curlEasySetoptWrapper(curl_get, CURLOPT_NOPROGRESS, 0);
curlEasySetoptWrapper(curl_get, CURLOPT_XFERINFOFUNCTION, ProgressHandler);
curlEasySetoptWrapper(curl_get, CURLOPT_XFERINFODATA, flow_control);
}

LOG_DEBUG << "GET " << url;
HttpResponse response = perform(curl_get, RETRY_TIMES, maxsize);
curl_easy_cleanup(curl_get);
Expand Down
2 changes: 1 addition & 1 deletion src/libaktualizr/http/httpclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HttpClient : public HttpInterface {
HttpClient(HttpClient &&) = default;
HttpClient &operator=(const HttpClient &) = delete;
HttpClient &operator=(HttpClient &&) = default;
HttpResponse get(const std::string &url, int64_t maxsize) override;
HttpResponse get(const std::string &url, int64_t maxsize, const api::FlowControlToken *flow_control) override;
HttpResponse post(const std::string &url, const std::string &content_type, const std::string &data) override;
HttpResponse post(const std::string &url, const Json::Value &data) override;
HttpResponse put(const std::string &url, const std::string &content_type, const std::string &data) override;
Expand Down
43 changes: 34 additions & 9 deletions src/libaktualizr/http/httpclient_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "json/json.h"

#include <chrono>
#include "http/httpclient.h"
#include "libaktualizr/types.h"
#include "test_utils.h"
Expand All @@ -19,30 +20,30 @@ TEST(CopyConstructorTest, copied) {
HttpClient http;
HttpClient http_copy(http);
std::string path = "/path/1/2/3";
Json::Value resp = http_copy.get(server + path, HttpInterface::kNoLimit).getJson();
Json::Value resp = http_copy.get(server + path, HttpInterface::kNoLimit, nullptr).getJson();
EXPECT_EQ(resp["path"].asString(), path);
}

TEST(GetTest, get_performed) {
HttpClient http;
std::string path = "/path/1/2/3";
Json::Value response = http.get(server + path, HttpInterface::kNoLimit).getJson();
Json::Value response = http.get(server + path, HttpInterface::kNoLimit, nullptr).getJson();
EXPECT_EQ(response["path"].asString(), path);
}

TEST(GetTestWithHeaders, get_performed) {
std::vector<std::string> headers = {"Authorization: Bearer token"};
HttpClient http(&headers);
std::string path = "/auth_call";
Json::Value response = http.get(server + path, HttpInterface::kNoLimit).getJson();
Json::Value response = http.get(server + path, HttpInterface::kNoLimit, nullptr).getJson();
EXPECT_EQ(response["status"].asString(), "good");
}

/* Reject http GET responses that exceed size limit. */
TEST(GetTest, download_size_limit) {
HttpClient http;
std::string path = "/large_file";
HttpResponse resp = http.get(server + path, 1024);
HttpResponse resp = http.get(server + path, 1024, nullptr);
std::cout << "RESP SIZE " << resp.body.length() << std::endl;
EXPECT_EQ(resp.curl_code, CURLE_FILESIZE_EXCEEDED);
}
Expand All @@ -53,10 +54,34 @@ TEST(GetTest, download_speed_limit) {
std::string path = "/slow_file";

http.overrideSpeedLimitParams(3, 5000);
HttpResponse resp = http.get(server + path, HttpInterface::kNoLimit);
HttpResponse resp = http.get(server + path, HttpInterface::kNoLimit, nullptr);
EXPECT_EQ(resp.curl_code, CURLE_OPERATION_TIMEDOUT);
}

TEST(GetTest, cancellation) {
HttpClient http;
std::string path = "/slow_file";
api::FlowControlToken token;
std::atomic<bool> did_abort;
auto end = std::chrono::steady_clock::now() + std::chrono::seconds(2);
std::thread t1([&token, end, &did_abort] {
std::this_thread::sleep_until(end);
token.setAbort();
did_abort = true;
});
HttpResponse resp = http.get(server + path, HttpInterface::kNoLimit, &token);
auto actual_end = std::chrono::steady_clock::now();
EXPECT_TRUE(did_abort);
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(actual_end - end).count();

LOG_INFO << "Took:" << diff << "ms to abort";
// Curl takes ~2 seconds to call the progress meter and abort
EXPECT_LE(0, diff);
EXPECT_LE(diff, 3000);
EXPECT_EQ(resp.curl_code, CURLE_ABORTED_BY_CALLBACK);
t1.join();
}

TEST(PostTest, post_performed) {
HttpClient http;
std::string path = "/path/1/2/3";
Expand Down Expand Up @@ -85,7 +110,7 @@ TEST(HttpClient, user_agent) {
// test the default, when setUserAgent hasn't been called yet
HttpClient http;

const auto resp = http.get(server + "/user_agent", HttpInterface::kNoLimit);
const auto resp = http.get(server + "/user_agent", HttpInterface::kNoLimit, nullptr);
const auto app = resp.body.substr(0, resp.body.find('/'));
EXPECT_EQ(app, "Aktualizr");
}
Expand All @@ -95,7 +120,7 @@ TEST(HttpClient, user_agent) {
{
HttpClient http;

auto resp = http.get(server + "/user_agent", HttpInterface::kNoLimit);
auto resp = http.get(server + "/user_agent", HttpInterface::kNoLimit, nullptr);
EXPECT_EQ(resp.body, "blah");
}
}
Expand All @@ -107,11 +132,11 @@ TEST(Headers, update_header) {
ASSERT_FALSE(http.updateHeader("NOSUCHHEADER", "foo"));

std::string path = "/auth_call";
std::string body = http.get(server + path, HttpInterface::kNoLimit).body;
std::string body = http.get(server + path, HttpInterface::kNoLimit, nullptr).body;
EXPECT_EQ(body, "{}");

ASSERT_TRUE(http.updateHeader("Authorization", "Bearer token"));
Json::Value response = http.get(server + path, HttpInterface::kNoLimit).getJson();
Json::Value response = http.get(server + path, HttpInterface::kNoLimit, nullptr).getJson();
EXPECT_EQ(response["status"].asString(), "good");
}

Expand Down
4 changes: 3 additions & 1 deletion src/libaktualizr/http/httpinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "libaktualizr/types.h"
#include "logging/logging.h"
#include "utilities/flow_control.h"
#include "utilities/utils.h"

using CurlHandler = std::shared_ptr<CURL>;
Expand Down Expand Up @@ -39,7 +40,8 @@ class HttpInterface {
public:
HttpInterface() = default;
virtual ~HttpInterface() = default;
virtual HttpResponse get(const std::string &url, int64_t maxsize) = 0;
virtual HttpResponse get(const std::string &url, int64_t maxsize, const api::FlowControlToken *flow_control) = 0;
HttpResponse get(const std::string &url, int64_t maxsize) { return get(url, maxsize, nullptr); }
virtual HttpResponse post(const std::string &url, const std::string &content_type, const std::string &data) = 0;
virtual HttpResponse post(const std::string &url, const Json::Value &data) = 0;
virtual HttpResponse put(const std::string &url, const std::string &content_type, const std::string &data) = 0;
Expand Down
5 changes: 2 additions & 3 deletions src/libaktualizr/primary/aktualizr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Aktualizr::Aktualizr(Config config, std::shared_ptr<INvStorage> storage_in,
storage_ = move(storage_in);
storage_->importData(config_.import);

uptane_client_ = std::make_shared<SotaUptaneClient>(config_, storage_, http_in, sig_);
uptane_client_ = std::make_shared<SotaUptaneClient>(config_, storage_, http_in, sig_, api_queue_->FlowControlToken());
}

Aktualizr::~Aktualizr() { api_queue_.reset(nullptr); }
Expand Down Expand Up @@ -161,8 +161,7 @@ std::future<result::UpdateCheck> Aktualizr::CheckUpdates() {
}

std::future<result::Download> Aktualizr::Download(const std::vector<Uptane::Target> &updates) {
std::function<result::Download(const api::FlowControlToken *)> task(
[this, updates](const api::FlowControlToken *token) { return uptane_client_->downloadImages(updates, token); });
std::function<result::Download()> task([this, updates]() { return uptane_client_->downloadImages(updates); });
return api_queue_->enqueue(move(task));
}

Expand Down
Loading

0 comments on commit 91ead67

Please sign in to comment.