Persistent retries #8

Draft · wants to merge 5 commits into base: main
Changes from all commits
13 changes: 7 additions & 6 deletions lib/nerves_hub_link_common/downloader.ex
@@ -18,7 +18,11 @@ defmodule NervesHubLinkCommon.Downloader do
   require Logger
   use GenServer

-  alias NervesHubLinkCommon.{Downloader, Downloader.RetryConfig, Downloader.TimeoutCalculation}
+  alias NervesHubLinkCommon.{
+    Downloader,
+    Downloader.RetryConfig,
+    Downloader.TimeoutCalculation
+  }

   defstruct uri: nil,
             conn: nil,
@@ -91,11 +95,8 @@ defmodule NervesHubLinkCommon.Downloader do
       iex> flush()
       :complete
   """
-  @spec start_download(String.t() | URI.t(), event_handler_fun()) :: GenServer.on_start()
-  def start_download(url, fun) when is_function(fun, 1) do
-    GenServer.start_link(__MODULE__, [URI.parse(url), fun, %RetryConfig{}])
-  end
-
+  @spec start_download(String.t() | URI.t(), event_handler_fun(), RetryConfig.t()) ::
+          GenServer.on_start()
   def start_download(url, fun, %RetryConfig{} = retry_args) when is_function(fun, 1) do
     GenServer.start_link(__MODULE__, [URI.parse(url), fun, retry_args])
   end
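The hunk above drops the two-arity `start_download/2` and makes the `RetryConfig` argument explicit. A minimal sketch of a call against the new signature, assuming a handler that simply forwards downloader events to the calling process (the handler, URL, and field values are illustrative, not from this diff):

# Sketch only: drive a download with an explicit RetryConfig.
# The events received follow the doctest above (ending in :complete).
alias NervesHubLinkCommon.Downloader
alias NervesHubLinkCommon.Downloader.RetryConfig

caller = self()
handler = fn event -> send(caller, event) end

{:ok, _pid} =
  Downloader.start_download(
    "https://example.com/firmware.fw",
    handler,
    %RetryConfig{max_disconnects: 20, worst_case_download_speed: 30_000}
  )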
12 changes: 10 additions & 2 deletions lib/nerves_hub_link_common/downloader/retry_config.ex
@@ -24,7 +24,8 @@ defmodule NervesHubLinkCommon.Downloader.RetryConfig do
     # worst case average download speed in bits/second
     # This is used to calculate a "sensible" timeout that is shorter than `max_timeout`.
     # LTE Cat M1 modems sometimes top out at 32 kbps (30 kbps for some slack)
-    worst_case_download_speed: 30_000
+    worst_case_download_speed: 30_000,
+    content_length: 0
   ]

   @typedoc """
@@ -59,11 +60,18 @@ defmodule NervesHubLinkCommon.Downloader.RetryConfig do
"""
@type worst_case_download_speed :: non_neg_integer()

@typedoc """
Gets translated into a range request header.
Useful for restarting downloads from a specific position
"""
@type content_length() :: non_neg_integer()

@type t :: %__MODULE__{
max_disconnects: max_disconnects(),
idle_timeout: idle_timeout(),
max_timeout: max_timeout(),
time_between_retries: time_between_retries(),
worst_case_download_speed: worst_case_download_speed()
worst_case_download_speed: worst_case_download_speed(),
content_length: content_length()
}
end
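The new typedoc says `content_length` "gets translated into a range request header", but that translation is not shown in this diff. A hedged sketch of what it could look like; the module and function names here are assumptions, not part of this PR:

# Hypothetical helper, not part of this diff: turn a resume offset into
# an HTTP Range header requesting bytes from that offset onward.
defmodule RangeHeaderSketch do
  alias NervesHubLinkCommon.Downloader.RetryConfig

  @spec resume_headers(RetryConfig.t()) :: [{String.t(), String.t()}]
  def resume_headers(%RetryConfig{content_length: 0}), do: []

  def resume_headers(%RetryConfig{content_length: offset}) do
    # "bytes=N-" asks the server for everything from byte N to the end
    [{"range", "bytes=#{offset}-"}]
  end
end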
16 changes: 14 additions & 2 deletions lib/nerves_hub_link_common/fwup_config.ex
@@ -3,10 +3,13 @@ defmodule NervesHubLinkCommon.FwupConfig do
   Config structure responsible for handling callbacks from FWUP,
   applying a fwupdate, and storing fwup task configuration
   """
+  alias NervesHubLinkCommon.UpdatePayload

   defstruct fwup_public_keys: [],
             fwup_devpath: "/dev/mmcblk0",
             handle_fwup_message: nil,
-            update_available: nil
+            update_available: nil,
+            journal_location: "/tmp/"
Contributor:

Suggested change:
-            journal_location: "/tmp/"
+            journal_location: "/data/nerves_hub_link.partial"

I'm just making up the journal location, so other names work for me too. What I'd like to suggest is:

  1. Putting it on a persistent filesystem so that it can survive a reboot
  2. Having only one file, to make it easy to figure out where to find it and easy to clean up

I haven't gotten to it yet, but I'm guessing that the use of one file might have other ramifications. I do like the simplicity and limiting the number of files it can create.

Collaborator (author):

Yeah, I only defaulted it to /tmp for the test suite, since /data can't be written to on our host machines. I agree on keeping a single file, but I also wanted this to be a directory, because there needs to be some way of identifying a partial download as a particular firmware after a reboot, and I was going to use the filename for that.
I also considered putting a header in the partial file, but am still thinking about that.


   @typedoc """
   `handle_fwup_message` will be called with this data
@@ -25,11 +28,13 @@ defmodule NervesHubLinkCommon.FwupConfig do
   @typedoc """
   Called when an update has been dispatched via `NervesHubLinkCommon.UpdateManager.apply_update/2`
   """
-  @type update_available_fun() :: (map() -> :ignore | {:reschedule, timeout()} | :apply)
+  @type update_available_fun() ::
+          (UpdatePayload.t() -> :ignore | {:reschedule, timeout()} | :apply)

   @type t :: %__MODULE__{
           fwup_public_keys: [String.t()],
           fwup_devpath: Path.t(),
+          journal_location: Path.t(),
           handle_fwup_message: handle_fwup_message_fun,
           update_available: update_available_fun
         }
@@ -41,6 +46,7 @@ defmodule NervesHubLinkCommon.FwupConfig do
     |> validate_fwup_devpath!()
     |> validate_handle_fwup_message!()
     |> validate_update_available!()
+    |> validate_journal_location!()
   end

   defp validate_fwup_public_keys!(%__MODULE__{fwup_public_keys: list} = args) when is_list(list),
@@ -68,4 +74,10 @@ defmodule NervesHubLinkCommon.FwupConfig do

   defp validate_update_available!(%__MODULE__{}),
     do: raise(ArgumentError, message: "update_available function signature incorrect")
+
+  defp validate_journal_location!(%__MODULE__{journal_location: loc} = args) when is_binary(loc),
+    do: args
+
+  defp validate_journal_location!(%__MODULE__{}),
+    do: raise(ArgumentError, message: "invalid arg: journal_location")
 end
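With the new field validated as a binary, a config now carries a journal path alongside the existing callbacks. A hedged construction sketch, assuming `validate!/1` is the public entry point for the pipeline above (its def line is folded out of this diff) and that both callbacks take one argument; the callback bodies are placeholder stubs:

# Illustrative only: the callbacks here do nothing useful.
alias NervesHubLinkCommon.FwupConfig

config = %FwupConfig{
  fwup_public_keys: [],
  fwup_devpath: "/dev/mmcblk0",
  journal_location: "/data/nerves_hub_link/",
  handle_fwup_message: fn _fwup_msg -> :ok end,
  update_available: fn _update_payload -> :apply end
}

%FwupConfig{} = FwupConfig.validate!(config)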
101 changes: 101 additions & 0 deletions lib/nerves_hub_link_common/journal.ex
@@ -0,0 +1,101 @@
defmodule NervesHubLinkCommon.Journal do
  @moduledoc """
  Simple journaling structure backed by a file on the filesystem.

  Stores data in chunks in the following format:

      <<length::32, hash::binary-size(32)-unit(8), data::binary-size(length)-unit(8)>>

  As chunks are streamed in with `save_chunk/2`, the data is updated both on disk
  and in the structure. This can be used to rehydrate stateful events after a
  reboot, such as a firmware update.

  When opening an existing journal (done automatically if the journal exists),
  the structure will validate all the chunks on disk, stopping on either

    * the first chunk to fail a hash check, or
    * the end of the file.

  In either case, the journal is valid to use at that point.
  """

  defstruct [:fd, :content_length, :chunks]

  @type t :: %__MODULE__{
          fd: :file.fd(),
          content_length: non_neg_integer(),
          chunks: [binary()]
Contributor:

I'm a little worried about firmware images that exceed available memory. I haven't read the code completely, but keeping a chunk list makes me think they're being cached.

How about a setup where the firmware download process starts by pulling chunks from the journal and feeding them to fwup? When it runs out of journal chunks, it makes an HTTP request for the rest.

Collaborator (author):

Yeah, I think something along those lines will be required as well. I haven't thought of exactly how yet, but I like your idea.

        }

  @doc "Open or create a journal file at the given path"
  @spec open(Path.t()) :: {:ok, t()} | {:error, File.posix()}
  def open(filename) do
    with {:ok, fd} <- :file.open(filename, [:write, :read, :binary]),
         {:ok, 0} <- :file.position(fd, 0),
         {:ok, journal} <- validate_and_seek(%__MODULE__{fd: fd, content_length: 0, chunks: []}) do
      {:ok, journal}
    end
  end

  @spec reload(Path.t()) :: {:ok, t()} | {:error, File.posix()}
  def reload(filename) do
    if File.exists?(filename) do
      open(filename)
    else
      {:error, :enoent}
    end
  end

  @spec validate_and_seek(t()) :: {:ok, t()} | {:error, File.posix()}
  def validate_and_seek(%__MODULE__{fd: fd, content_length: content_length} = journal) do
    with {:ok, <<length::32>>} <- :file.read(fd, 4),
         {:ok, hash} <- :file.read(fd, 32),
         {:ok, data} <- :file.read(fd, length),
         {:hash, ^length, ^hash} <- {:hash, length, :crypto.hash(:sha256, data)} do
      validate_and_seek(%__MODULE__{
        journal
        | content_length: content_length + length,
          chunks: journal.chunks ++ [data]
      })
    else
      # made it through all chunks in the file
      :eof ->
        {:ok, journal}

      # hash check failed: rewind to before this chunk so it gets overwritten
      {:hash, length, _} ->
        rewind(journal, length + 32 + 4)

      {:error, posix} ->
        {:error, posix}
    end
  end

  @spec rewind(t(), pos_integer()) :: {:ok, t()} | {:error, File.posix()}
  def rewind(journal, length) do
    # Seek backwards relative to the current position. A bare negative integer
    # would be interpreted as an absolute {:bof, offset} and fail with :einval.
    with {:ok, _} <- :file.position(journal.fd, {:cur, -length}) do
      {:ok, journal}
    end
  end

  @spec close(t()) :: :ok
  def close(%__MODULE__{fd: fd} = _journal) do
    :ok = :file.close(fd)
  end

  @spec save_chunk(t(), iodata()) :: {:ok, t()} | {:error, File.posix()}
  def save_chunk(%__MODULE__{fd: fd} = journal, data) when is_binary(data) do
    hash = :crypto.hash(:sha256, data)
    length = byte_size(data)
    journal_entry = IO.iodata_to_binary([<<length::32>>, hash, data])
Contributor:

:file.write/2 takes iodata, so you can pass the list:

Suggested change:
-    journal_entry = IO.iodata_to_binary([<<length::32>>, hash, data])
+    journal_entry = [<<length::32>>, hash, data]

Collaborator (author):

Whoops, you're right. Good catch!


    with :ok <- :file.write(fd, journal_entry) do
      {:ok,
       %__MODULE__{
         journal
         | chunks: journal.chunks ++ [data],
           content_length: journal.content_length + length
       }}
    end
  end
end
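A short usage sketch of the journal as drafted: save a couple of chunks, close, then reload as would happen after a reboot. Only functions defined in this file are used; the path and chunk contents are made up.

# Illustrative only: journal two small chunks, then rehydrate them.
alias NervesHubLinkCommon.Journal

path = "/tmp/firmware.journal"

{:ok, journal} = Journal.open(path)
{:ok, journal} = Journal.save_chunk(journal, <<1, 2, 3>>)
{:ok, journal} = Journal.save_chunk(journal, <<4, 5, 6>>)
:ok = Journal.close(journal)

# After a "reboot": reload/1 re-validates each chunk against its SHA-256
# hash, so content_length reflects the verified bytes on disk and could
# seed RetryConfig.content_length for a ranged re-download.
{:ok, journal} = Journal.reload(path)
6 = journal.content_length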