Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce finer-grained cancellation of operations #107

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions include/libaktualizr/secondaryinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,20 @@ class SecondaryInterface {
virtual int32_t getRootVersion(bool director) const = 0;
virtual data::InstallationResult putRoot(const std::string& root, bool director) = 0;

virtual data::InstallationResult sendFirmware(const Uptane::Target& target) = 0;
virtual data::InstallationResult install(const Uptane::Target& target) = 0;
/**
* Send firmware to a device. This operation should be both idempotent and
* not commit to installing the new version. Where practical, the
* implementation should pre-flight the installation and report errors now,
* while the entire installation can be cleanly aborted.
* Failures reported later (during SecondaryInterface::install()) can leave
* a multi-ecu update partially applied.
*/
virtual data::InstallationResult sendFirmware(const Uptane::Target& target,
const api::FlowControlToken* flow_control) = 0;
/**
* Commit to installing an update.
*/
virtual data::InstallationResult install(const Uptane::Target& target, const api::FlowControlToken* flow_control) = 0;

protected:
SecondaryInterface(const SecondaryInterface&) = default;
Expand Down
2 changes: 2 additions & 0 deletions include/libaktualizr/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ struct ResultCode {
kNeedCompletion = 21,
// Customer specific
kCustomError = 22,
// The operation was explicitly cancelled, either by an offline update or explicit user choice
kOperationCancelled = 23,
// Unknown
kUnknown = -1,
};
Expand Down
2 changes: 1 addition & 1 deletion src/aktualizr_get/get.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ std::string aktualizrGet(Config &config, const std::string &url, const std::vect
auto client = std_::make_unique<HttpClient>(&headers);
KeyManager keys(storage, config.keymanagerConfig());
keys.copyCertsToCurl(*client);
auto resp = client->get(url, HttpInterface::kNoLimit);
auto resp = client->get(url, HttpInterface::kNoLimit, nullptr);
if (resp.http_status_code != 200) {
throw std::runtime_error("Unable to get " + url + ": HTTP_" + std::to_string(resp.http_status_code) + "\n" +
resp.body);
Expand Down
4 changes: 2 additions & 2 deletions src/aktualizr_secondary/aktualizr_secondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ data::InstallationResult AktualizrSecondary::verifyMetadata(const Uptane::Second
// 4. NOT SUPPORTED: Download and check the Snapshot metadata file from the Director repository.
// 5. Download and check the Targets metadata file from the Director repository.
try {
director_repo_.updateMeta(*storage_, metadata);
director_repo_.updateMeta(*storage_, metadata, nullptr);
} catch (const std::exception& e) {
LOG_ERROR << "Failed to update Director metadata: " << e.what();
return data::InstallationResult(data::ResultCode::Numeric::kVerificationFailed,
Expand All @@ -96,7 +96,7 @@ data::InstallationResult AktualizrSecondary::verifyMetadata(const Uptane::Second
// 8. Download and check the Snapshot metadata file from the Image repository.
// 9. Download and check the top-level Targets metadata file from the Image repository.
try {
image_repo_.updateMeta(*storage_, metadata);
image_repo_.updateMeta(*storage_, metadata, nullptr);
} catch (const std::exception& e) {
LOG_ERROR << "Failed to update Image repo metadata: " << e.what();
return data::InstallationResult(data::ResultCode::Numeric::kVerificationFailed,
Expand Down
14 changes: 8 additions & 6 deletions src/aktualizr_secondary/secondary_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,15 +583,16 @@ class SecondaryRpcCommon : public ::testing::Test {
verifyMetadata(secondary_.metadata());
}

result = ip_secondary_->sendFirmware(target);
// Cancelling not supported at the moment
result = ip_secondary_->sendFirmware(target, nullptr);
if (handler_version == HandlerVersion::kV2Failure) {
EXPECT_EQ(result.result_code, data::ResultCode::Numeric::kDownloadFailed);
EXPECT_EQ(result.description, secondary_.upload_data_failure);
} else {
EXPECT_TRUE(result.isSuccess());
}

result = ip_secondary_->install(target);
result = ip_secondary_->install(target, nullptr);
if (handler_version == HandlerVersion::kV2Failure) {
EXPECT_EQ(result.result_code, data::ResultCode::Numeric::kInstallFailed);
EXPECT_EQ(result.description, secondary_.installation_failure);
Expand All @@ -617,15 +618,16 @@ class SecondaryRpcCommon : public ::testing::Test {
verifyMetadata(secondary_.metadata());
}

result = ip_secondary_->sendFirmware(target);
// Cancelling not supported at the moment
result = ip_secondary_->sendFirmware(target, nullptr);
if (handler_version == HandlerVersion::kV2Failure) {
EXPECT_EQ(result.result_code, data::ResultCode::Numeric::kDownloadFailed);
EXPECT_EQ(result.description, secondary_.ostree_failure);
} else {
EXPECT_TRUE(result.isSuccess());
}

result = ip_secondary_->install(target);
result = ip_secondary_->install(target, nullptr);
if (handler_version == HandlerVersion::kV2Failure) {
EXPECT_EQ(result.result_code, data::ResultCode::Numeric::kInstallFailed);
EXPECT_EQ(result.description, secondary_.installation_failure);
Expand Down Expand Up @@ -810,8 +812,8 @@ TEST(SecondaryTcpServer, TestIpSecondaryIfSecondaryIsNotRunning) {
EXPECT_FALSE(ip_secondary->putRoot("director-root-v2", true).isSuccess());
EXPECT_FALSE(ip_secondary->putRoot("image-root-v2", false).isSuccess());
EXPECT_FALSE(ip_secondary->putMetadata(target).isSuccess());
EXPECT_FALSE(ip_secondary->sendFirmware(target).isSuccess());
EXPECT_FALSE(ip_secondary->install(target).isSuccess());
EXPECT_FALSE(ip_secondary->sendFirmware(target, nullptr).isSuccess());
EXPECT_FALSE(ip_secondary->install(target, nullptr).isSuccess());
}

/* This class returns a positive result for every message. The test cases verify
Expand Down
1 change: 1 addition & 0 deletions src/libaktualizr-posix/asn1/messages/ipuptane_message.asn1
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ IpUptane DEFINITIONS ::= BEGIN
generalError(19),
needCompletion(21),
customError(22),
operationCancelled(23),
unknown(-1),
...
}
Expand Down
35 changes: 24 additions & 11 deletions src/libaktualizr-posix/ipuptanesecondary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "libaktualizr/secondary_provider.h"
#include "logging/logging.h"
#include "uptane/tuf.h"
#include "utilities/flow_control.h"
#include "utilities/utils.h"

namespace Uptane {
Expand Down Expand Up @@ -358,18 +359,23 @@ bool IpUptaneSecondary::ping() const {
return resp->present() == AKIpUptaneMes_PR_getInfoResp;
}

data::InstallationResult IpUptaneSecondary::sendFirmware(const Uptane::Target& target) {
data::InstallationResult send_result;
data::InstallationResult IpUptaneSecondary::sendFirmware(const Uptane::Target& target,
const api::FlowControlToken* flow_control) {
if (flow_control != nullptr && flow_control->hasAborted()) {
// TODO: Add an abort message to the IPUptane protocol and allow aborts
// during firmware transfer
return data::InstallationResult(data::ResultCode::Numeric::kOperationCancelled, "");
}

if (protocol_version == 2) {
send_result = sendFirmware_v2(target);
} else if (protocol_version == 1) {
send_result = sendFirmware_v1(target);
} else {
LOG_ERROR << "Unexpected protocol version: " << protocol_version;
send_result = data::InstallationResult(data::ResultCode::Numeric::kInternalError,
"Unexpected protocol version: " + std::to_string(protocol_version));
return sendFirmware_v2(target);
}
if (protocol_version == 1) {
return sendFirmware_v1(target);
}
return send_result;
LOG_ERROR << "Unexpected protocol version: " << protocol_version;
return data::InstallationResult(data::ResultCode::Numeric::kInternalError,
"Unexpected protocol version: " + std::to_string(protocol_version));
}

data::InstallationResult IpUptaneSecondary::sendFirmware_v1(const Uptane::Target& target) {
Expand Down Expand Up @@ -416,7 +422,14 @@ data::InstallationResult IpUptaneSecondary::sendFirmware_v2(const Uptane::Target
}
}

data::InstallationResult IpUptaneSecondary::install(const Uptane::Target& target) {
data::InstallationResult IpUptaneSecondary::install(const Uptane::Target& target,
const api::FlowControlToken* flow_control) {
if (flow_control != nullptr && flow_control->hasAborted()) {
// TODO: Add an abort message to the IPUptane protocol and allow aborts
// during installation
return data::InstallationResult(data::ResultCode::Numeric::kOperationCancelled, "");
}

data::InstallationResult install_result;
if (protocol_version == 2) {
install_result = install_v2(target);
Expand Down
5 changes: 3 additions & 2 deletions src/libaktualizr-posix/ipuptanesecondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ class IpUptaneSecondary : public SecondaryInterface {
data::InstallationResult putRoot(const std::string& root, bool director) override;
Manifest getManifest() const override;
bool ping() const override;
data::InstallationResult sendFirmware(const Uptane::Target& target) override;
data::InstallationResult install(const Uptane::Target& target) override;
data::InstallationResult sendFirmware(const Uptane::Target& target,
const api::FlowControlToken* flow_control) override;
data::InstallationResult install(const Uptane::Target& target, const api::FlowControlToken* flow_control) override;

private:
const std::pair<std::string, uint16_t>& getAddr() const { return addr_; }
Expand Down
33 changes: 32 additions & 1 deletion src/libaktualizr/http/httpclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ static size_t writeString(void* contents, size_t size, size_t nmemb, void* userp
return size * nmemb;
}

static int ProgressHandler(void* clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
(void)dltotal;
(void)dlnow;
(void)ultotal;
(void)ulnow;
if (clientp == nullptr) {
LOG_ERROR << "ProgressHandler given null user pointer";
return 0; // Let the download continue
}

auto* token = static_cast<api::FlowControlToken*>(clientp);

if (!token->IsValid()) {
LOG_ERROR << "ProgressHandler given a thing that isn't a FlowControlToken";
return 0;
}

if (token->hasAborted()) {
// Abort download
return 1;
}
return 0;
}

HttpClient::HttpClient(const std::vector<std::string>* extra_headers) {
curl = curl_easy_init();
if (curl == nullptr) {
Expand Down Expand Up @@ -124,7 +148,7 @@ void HttpClient::setCerts(const std::string& ca, CryptoSource ca_source, const s
pkcs11_key = (pkey_source == CryptoSource::kPkcs11);
}

HttpResponse HttpClient::get(const std::string& url, int64_t maxsize) {
HttpResponse HttpClient::get(const std::string& url, int64_t maxsize, const api::FlowControlToken* flow_control) {
CURL* curl_get = Utils::curlDupHandleWrapper(curl, pkcs11_key);

curlEasySetoptWrapper(curl_get, CURLOPT_HTTPHEADER, headers);
Expand All @@ -138,6 +162,13 @@ HttpResponse HttpClient::get(const std::string& url, int64_t maxsize) {
curlEasySetoptWrapper(curl_get, CURLOPT_POSTFIELDS, "");
curlEasySetoptWrapper(curl_get, CURLOPT_URL, url.c_str());
curlEasySetoptWrapper(curl_get, CURLOPT_HTTPGET, 1L);
if (flow_control != nullptr) {
// Handle cancellation
curlEasySetoptWrapper(curl_get, CURLOPT_NOPROGRESS, 0);
curlEasySetoptWrapper(curl_get, CURLOPT_XFERINFOFUNCTION, ProgressHandler);
curlEasySetoptWrapper(curl_get, CURLOPT_XFERINFODATA, flow_control);
}

LOG_DEBUG << "GET " << url;
HttpResponse response = perform(curl_get, RETRY_TIMES, maxsize);
curl_easy_cleanup(curl_get);
Expand Down
2 changes: 1 addition & 1 deletion src/libaktualizr/http/httpclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class HttpClient : public HttpInterface {
HttpClient(HttpClient &&) = default;
HttpClient &operator=(const HttpClient &) = delete;
HttpClient &operator=(HttpClient &&) = default;
HttpResponse get(const std::string &url, int64_t maxsize) override;
HttpResponse get(const std::string &url, int64_t maxsize, const api::FlowControlToken *flow_control) override;
HttpResponse post(const std::string &url, const std::string &content_type, const std::string &data) override;
HttpResponse post(const std::string &url, const Json::Value &data) override;
HttpResponse put(const std::string &url, const std::string &content_type, const std::string &data) override;
Expand Down
43 changes: 34 additions & 9 deletions src/libaktualizr/http/httpclient_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "json/json.h"

#include <chrono>
#include "http/httpclient.h"
#include "libaktualizr/types.h"
#include "test_utils.h"
Expand All @@ -19,30 +20,30 @@ TEST(CopyConstructorTest, copied) {
HttpClient http;
HttpClient http_copy(http);
std::string path = "/path/1/2/3";
Json::Value resp = http_copy.get(server + path, HttpInterface::kNoLimit).getJson();
Json::Value resp = http_copy.get(server + path, HttpInterface::kNoLimit, nullptr).getJson();
EXPECT_EQ(resp["path"].asString(), path);
}

TEST(GetTest, get_performed) {
HttpClient http;
std::string path = "/path/1/2/3";
Json::Value response = http.get(server + path, HttpInterface::kNoLimit).getJson();
Json::Value response = http.get(server + path, HttpInterface::kNoLimit, nullptr).getJson();
EXPECT_EQ(response["path"].asString(), path);
}

TEST(GetTestWithHeaders, get_performed) {
std::vector<std::string> headers = {"Authorization: Bearer token"};
HttpClient http(&headers);
std::string path = "/auth_call";
Json::Value response = http.get(server + path, HttpInterface::kNoLimit).getJson();
Json::Value response = http.get(server + path, HttpInterface::kNoLimit, nullptr).getJson();
EXPECT_EQ(response["status"].asString(), "good");
}

/* Reject http GET responses that exceed size limit. */
TEST(GetTest, download_size_limit) {
HttpClient http;
std::string path = "/large_file";
HttpResponse resp = http.get(server + path, 1024);
HttpResponse resp = http.get(server + path, 1024, nullptr);
std::cout << "RESP SIZE " << resp.body.length() << std::endl;
EXPECT_EQ(resp.curl_code, CURLE_FILESIZE_EXCEEDED);
}
Expand All @@ -53,10 +54,34 @@ TEST(GetTest, download_speed_limit) {
std::string path = "/slow_file";

http.overrideSpeedLimitParams(3, 5000);
HttpResponse resp = http.get(server + path, HttpInterface::kNoLimit);
HttpResponse resp = http.get(server + path, HttpInterface::kNoLimit, nullptr);
EXPECT_EQ(resp.curl_code, CURLE_OPERATION_TIMEDOUT);
}

TEST(GetTest, cancellation) {
HttpClient http;
std::string path = "/slow_file";
api::FlowControlToken token;
std::atomic<bool> did_abort;
auto end = std::chrono::steady_clock::now() + std::chrono::seconds(2);
std::thread t1([&token, end, &did_abort] {
std::this_thread::sleep_until(end);
token.setAbort();
did_abort = true;
});
HttpResponse resp = http.get(server + path, HttpInterface::kNoLimit, &token);
auto actual_end = std::chrono::steady_clock::now();
EXPECT_TRUE(did_abort);
auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(actual_end - end).count();

LOG_INFO << "Took:" << diff << "ms to abort";
// Curl takes ~2 seconds to call the progress meter and abort
EXPECT_LE(0, diff);
EXPECT_LE(diff, 3000);
EXPECT_EQ(resp.curl_code, CURLE_ABORTED_BY_CALLBACK);
t1.join();
}

TEST(PostTest, post_performed) {
HttpClient http;
std::string path = "/path/1/2/3";
Expand Down Expand Up @@ -85,7 +110,7 @@ TEST(HttpClient, user_agent) {
// test the default, when setUserAgent hasn't been called yet
HttpClient http;

const auto resp = http.get(server + "/user_agent", HttpInterface::kNoLimit);
const auto resp = http.get(server + "/user_agent", HttpInterface::kNoLimit, nullptr);
const auto app = resp.body.substr(0, resp.body.find('/'));
EXPECT_EQ(app, "Aktualizr");
}
Expand All @@ -95,7 +120,7 @@ TEST(HttpClient, user_agent) {
{
HttpClient http;

auto resp = http.get(server + "/user_agent", HttpInterface::kNoLimit);
auto resp = http.get(server + "/user_agent", HttpInterface::kNoLimit, nullptr);
EXPECT_EQ(resp.body, "blah");
}
}
Expand All @@ -107,11 +132,11 @@ TEST(Headers, update_header) {
ASSERT_FALSE(http.updateHeader("NOSUCHHEADER", "foo"));

std::string path = "/auth_call";
std::string body = http.get(server + path, HttpInterface::kNoLimit).body;
std::string body = http.get(server + path, HttpInterface::kNoLimit, nullptr).body;
EXPECT_EQ(body, "{}");

ASSERT_TRUE(http.updateHeader("Authorization", "Bearer token"));
Json::Value response = http.get(server + path, HttpInterface::kNoLimit).getJson();
Json::Value response = http.get(server + path, HttpInterface::kNoLimit, nullptr).getJson();
EXPECT_EQ(response["status"].asString(), "good");
}

Expand Down
4 changes: 3 additions & 1 deletion src/libaktualizr/http/httpinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "libaktualizr/types.h"
#include "logging/logging.h"
#include "utilities/flow_control.h"
#include "utilities/utils.h"

using CurlHandler = std::shared_ptr<CURL>;
Expand Down Expand Up @@ -39,7 +40,8 @@ class HttpInterface {
public:
HttpInterface() = default;
virtual ~HttpInterface() = default;
virtual HttpResponse get(const std::string &url, int64_t maxsize) = 0;
virtual HttpResponse get(const std::string &url, int64_t maxsize, const api::FlowControlToken *flow_control) = 0;
HttpResponse get(const std::string &url, int64_t maxsize) { return get(url, maxsize, nullptr); }
virtual HttpResponse post(const std::string &url, const std::string &content_type, const std::string &data) = 0;
virtual HttpResponse post(const std::string &url, const Json::Value &data) = 0;
virtual HttpResponse put(const std::string &url, const std::string &content_type, const std::string &data) = 0;
Expand Down
5 changes: 2 additions & 3 deletions src/libaktualizr/primary/aktualizr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Aktualizr::Aktualizr(Config config, std::shared_ptr<INvStorage> storage_in,
storage_ = move(storage_in);
storage_->importData(config_.import);

uptane_client_ = std::make_shared<SotaUptaneClient>(config_, storage_, http_in, sig_);
uptane_client_ = std::make_shared<SotaUptaneClient>(config_, storage_, http_in, sig_, api_queue_->FlowControlToken());
}

Aktualizr::~Aktualizr() { api_queue_.reset(nullptr); }
Expand Down Expand Up @@ -161,8 +161,7 @@ std::future<result::UpdateCheck> Aktualizr::CheckUpdates() {
}

std::future<result::Download> Aktualizr::Download(const std::vector<Uptane::Target> &updates) {
std::function<result::Download(const api::FlowControlToken *)> task(
[this, updates](const api::FlowControlToken *token) { return uptane_client_->downloadImages(updates, token); });
std::function<result::Download()> task([this, updates]() { return uptane_client_->downloadImages(updates); });
return api_queue_->enqueue(move(task));
}

Expand Down
Loading
Loading