Skip to content

Commit

Permalink
core: fix burst implementation
Browse files Browse the repository at this point in the history
This fixes the burst implementation which was implemented according to
the docs rather than the existing implementations out there (like
ArduPilot, MissionPlanner, and QGroundControl.

The difference is that the size field is used to signal the size of each
burst packet rather than the overall burst. This means a burst transfer
always has to be the whole file and can't be a partial file.

Signed-off-by: Julian Oes <[email protected]>
  • Loading branch information
julianoes committed Nov 10, 2023
1 parent 54f643c commit 9e385b6
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 31 deletions.
124 changes: 112 additions & 12 deletions src/mavsdk/core/mavlink_ftp_client.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "mavlink_ftp_client.h"
#include "system_impl.h"
#include "plugin_base.h"
#include "unused.h"
#include <algorithm>
#include <fstream>
#include <filesystem>
Expand Down Expand Up @@ -180,7 +181,8 @@ void MavlinkFtpClient::process_mavlink_ftp_message(const mavlink_message_t& msg)
[&](DownloadBurstItem& item) {
if (payload->opcode == RSP_ACK) {
if (payload->req_opcode == CMD_OPEN_FILE_RO ||
payload->req_opcode == CMD_BURST_READ_FILE) {
payload->req_opcode == CMD_BURST_READ_FILE ||
payload->req_opcode == CMD_READ_FILE) {
// Whenever we do get an ack,
// reset the retry counter.
work->retries = RETRIES;
Expand Down Expand Up @@ -506,7 +508,7 @@ bool MavlinkFtpClient::download_burst_continue(
LogDebug() << "Burst Download continue, got file size: " << file_size;
}

request_next_burst(work, item);
request_burst(work, item);
return true;

} else if (payload->req_opcode == CMD_BURST_READ_FILE) {
Expand Down Expand Up @@ -535,8 +537,12 @@ bool MavlinkFtpClient::download_burst_continue(
item.transferred[i] = DownloadBurstItem::Transferred::Yes;
}

const size_t bytes_transferred = std::count(
item.transferred.begin(), item.transferred.end(), DownloadBurstItem::Transferred::Yes);
if (_debugging) {
LogDebug() << "Received " << payload->offset << " to "
<< payload->size + payload->offset;
}

const size_t bytes_transferred = burst_bytes_transferred(item);

if (bytes_transferred == item.transferred.size()) {
if (_debugging) {
Expand Down Expand Up @@ -567,8 +573,9 @@ bool MavlinkFtpClient::download_burst_continue(
static_cast<uint32_t>(item.transferred.size())});

if (payload->burst_complete) {
// The burst is supposedly complete but we still need data, so request next burst.
request_next_burst(work, item);
// The burst is supposedly complete but we still need data, so request next without
// burst.
request_next_rest(work, item);

} else {
// There might be more coming, just wait for now.
Expand All @@ -578,13 +585,88 @@ bool MavlinkFtpClient::download_burst_continue(
return true;
}

} else if (payload->req_opcode == CMD_READ_FILE) {
if (_debugging) {
LogWarn() << "Burst download continue missing pieces, write at " << payload->offset
<< " for " << std::to_string(payload->size);
}

item.ofstream.seekp(payload->offset);
if (item.ofstream.fail()) {
LogWarn() << "Seek failed";
item.callback(ClientResult::FileIoError, {});
return false;
}

item.ofstream.write(reinterpret_cast<const char*>(payload->data), payload->size);
if (!item.ofstream) {
item.callback(ClientResult::FileIoError, {});
return false;
}

// Keep track of what was written.
for (size_t i = payload->offset; i < payload->offset + payload->size; ++i) {
item.transferred[i] = DownloadBurstItem::Transferred::Yes;
}

const size_t bytes_transferred = burst_bytes_transferred(item);

if (_debugging) {
LogDebug() << "Written " << bytes_transferred << " of " << item.transferred.size()
<< " bytes";
}

if (bytes_transferred == item.transferred.size()) {
// Final step
work.last_opcode = CMD_TERMINATE_SESSION;

work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = _session;

work.payload.opcode = work.last_opcode;
work.payload.offset = 0;
work.payload.size = 0;

start_timer();
send_mavlink_ftp_message(work.payload);
return true;
} else {
item.callback(
ClientResult::Next,
ProgressData{
static_cast<uint32_t>(bytes_transferred),
static_cast<uint32_t>(item.transferred.size())});

request_next_rest(work, item);
return true;
}

} else {
LogErr() << "Unexpected req_opcode";
return false;
}
}

void MavlinkFtpClient::request_next_burst(Work& work, DownloadBurstItem& item)
void MavlinkFtpClient::request_burst(Work& work, DownloadBurstItem& item)
{
UNUSED(item);

work.last_opcode = CMD_BURST_READ_FILE;
work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = _session;
work.payload.opcode = work.last_opcode;
work.payload.offset = 0;

// Fill up the whole packet.
work.payload.size = max_data_length;

start_timer();
send_mavlink_ftp_message(work.payload);
}

void MavlinkFtpClient::request_next_rest(Work& work, DownloadBurstItem& item)
{
const auto first_missing = std::find(
item.transferred.begin(), item.transferred.end(), DownloadBurstItem::Transferred::No);
Expand All @@ -597,6 +679,7 @@ void MavlinkFtpClient::request_next_burst(Work& work, DownloadBurstItem& item)
std::find(first_missing, item.transferred.end(), DownloadBurstItem::Transferred::Yes);

const size_t offset = std::distance(item.transferred.begin(), first_missing);

const uint32_t size =
static_cast<uint32_t>(std::distance(first_missing, last_missing_plus_one));

Expand All @@ -605,20 +688,30 @@ void MavlinkFtpClient::request_next_burst(Work& work, DownloadBurstItem& item)
return;
}

work.last_opcode = CMD_BURST_READ_FILE;
if (_debugging) {
LogDebug() << "Re-requesting from " << offset << " with size " << size;
}

work.last_opcode = CMD_READ_FILE;
work.payload = {};
work.payload.seq_number = work.last_sent_seq_number++;
work.payload.session = _session;
work.payload.opcode = work.last_opcode;
work.payload.offset = offset;

work.payload.size = 4;
std::memcpy(&work.payload.data, &size, 4);
work.payload.size =
static_cast<uint8_t>(std::min(static_cast<uint32_t>(max_data_length), size));

start_timer();
send_mavlink_ftp_message(work.payload);
}

size_t MavlinkFtpClient::burst_bytes_transferred(DownloadBurstItem& item)
{
return std::count(
item.transferred.begin(), item.transferred.end(), DownloadBurstItem::Transferred::Yes);
}

bool MavlinkFtpClient::upload_start(Work& work, UploadItem& item)
{
std::error_code ec;
Expand Down Expand Up @@ -1117,8 +1210,15 @@ void MavlinkFtpClient::timeout()
LogDebug() << "Retries left: " << work->retries;
}

start_timer();
send_mavlink_ftp_message(work->payload);
{
const size_t bytes_transferred = burst_bytes_transferred(item);
if (bytes_transferred == 0 || bytes_transferred == item.transferred.size()) {
start_timer();
send_mavlink_ftp_message(work->payload);
} else {
request_next_rest(*work, item);
}
}
},
[&](UploadItem& item) {
if (--work->retries == 0) {
Expand Down
4 changes: 3 additions & 1 deletion src/mavsdk/core/mavlink_ftp_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ class MavlinkFtpClient {

bool download_burst_start(Work& work, DownloadBurstItem& item);
bool download_burst_continue(Work& work, DownloadBurstItem& item, PayloadHeader* payload);
void request_next_burst(Work& work, DownloadBurstItem& item);
void request_burst(Work& work, DownloadBurstItem& item);
void request_next_rest(Work& work, DownloadBurstItem& item);
size_t burst_bytes_transferred(DownloadBurstItem& item);

bool upload_start(Work& work, UploadItem& item);
bool upload_continue(Work& work, UploadItem& item);
Expand Down
26 changes: 9 additions & 17 deletions src/mavsdk/core/mavlink_ftp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,10 @@ void MavlinkFtpServer::_work_read(const PayloadHeader& payload)
return;
}

if (_debugging) {
LogWarn() << "Read at " << payload.offset << " for " << int(payload.size);
}

_session_info.ifstream.read(reinterpret_cast<char*>(response.data), payload.size);

if (_session_info.ifstream.fail()) {
Expand All @@ -720,6 +724,7 @@ void MavlinkFtpServer::_work_read(const PayloadHeader& payload)

const uint32_t bytes_read = _session_info.ifstream.gcount();

response.offset = payload.offset;
response.size = bytes_read;
response.opcode = Opcode::RSP_ACK;

Expand Down Expand Up @@ -764,20 +769,7 @@ void MavlinkFtpServer::_work_burst(const PayloadHeader& payload)
}

_session_info.burst_offset = payload.offset;

if (payload.size != 4) {
response.seq_number = payload.seq_number + 1;
response.opcode = Opcode::RSP_NAK;
response.size = 1;
response.data[0] = ServerResult::ERR_INVALID_DATA_SIZE;
LogErr() << "Burst size invalid";
_send_mavlink_ftp_message(response);
return;
}

uint32_t burst_size;
std::memcpy(&burst_size, &payload.data, payload.size);
_session_info.burst_end = _session_info.burst_offset + burst_size;
_session_info.burst_chunk_size = payload.size;

_burst_seq = payload.seq_number + 1;

Expand Down Expand Up @@ -815,8 +807,8 @@ void MavlinkFtpServer::_send_burst_packet()
void MavlinkFtpServer::_make_burst_packet(PayloadHeader& packet)
{
uint32_t bytes_to_read = std::min(
static_cast<uint32_t>(max_data_length),
_session_info.burst_end - _session_info.burst_offset);
static_cast<uint32_t>(_session_info.burst_chunk_size),
_session_info.file_size - _session_info.burst_offset);

if (_debugging) {
LogDebug() << "Burst read of " << bytes_to_read << " bytes";
Expand All @@ -838,7 +830,7 @@ void MavlinkFtpServer::_make_burst_packet(PayloadHeader& packet)
packet.offset = _session_info.burst_offset;
_session_info.burst_offset += bytes_read;

if (_session_info.burst_offset == _session_info.burst_end) {
if (_session_info.burst_offset == _session_info.file_size) {
// Last read, we are done for this burst.
packet.burst_complete = 1;
if (_debugging) {
Expand Down
2 changes: 1 addition & 1 deletion src/mavsdk/core/mavlink_ftp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class MavlinkFtpServer {
struct SessionInfo {
uint32_t file_size{0};
uint32_t burst_offset{0};
uint32_t burst_end{0};
uint8_t burst_chunk_size{0};
std::ifstream ifstream;
std::ofstream ofstream;
} _session_info{};
Expand Down

0 comments on commit 9e385b6

Please sign in to comment.