From abfbdf10483cba2749bc68ce26d3018f457cfd9c Mon Sep 17 00:00:00 2001 From: Connor Rigby Date: Wed, 10 Feb 2021 15:39:49 -0800 Subject: [PATCH] Implement simple Journaling system This enables fwupdates to resume even after a reboot --- .../downloader/retry_config.ex | 12 ++- lib/nerves_hub_link_common/fwup_config.ex | 11 +- lib/nerves_hub_link_common/journal.ex | 101 ++++++++++++++++++ lib/nerves_hub_link_common/update_manager.ex | 60 +++++++++-- test/nerves_hub_link_common/journal_test.exs | 32 ++++++ .../update_manager_test.exs | 10 +- 6 files changed, 213 insertions(+), 13 deletions(-) create mode 100644 lib/nerves_hub_link_common/journal.ex create mode 100644 test/nerves_hub_link_common/journal_test.exs diff --git a/lib/nerves_hub_link_common/downloader/retry_config.ex b/lib/nerves_hub_link_common/downloader/retry_config.ex index 118991b..bf6bcfe 100644 --- a/lib/nerves_hub_link_common/downloader/retry_config.ex +++ b/lib/nerves_hub_link_common/downloader/retry_config.ex @@ -24,7 +24,8 @@ defmodule NervesHubLinkCommon.Downloader.RetryConfig do # worst case average download speed in bits/second # This is used to calculate a "sensible" timeout that is shorter than `max_timeout`. # LTE Cat M1 modems sometimes top out at 32 kbps (30 kbps for some slack) - worst_case_download_speed: 30_000 + worst_case_download_speed: 30_000, + content_length: 0 ] @typedoc """ @@ -59,11 +60,18 @@ defmodule NervesHubLinkCommon.Downloader.RetryConfig do """ @type worst_case_download_speed :: non_neg_integer() + @typedoc """ + Gets translated into a range request header. 
+ Useful for restarting downloads from a specific position + """ + @type content_length() :: non_neg_integer() + @type t :: %__MODULE__{ max_disconnects: max_disconnects(), idle_timeout: idle_timeout(), max_timeout: max_timeout(), time_between_retries: time_between_retries(), - worst_case_download_speed: worst_case_download_speed() + worst_case_download_speed: worst_case_download_speed(), + content_length: content_length() } end diff --git a/lib/nerves_hub_link_common/fwup_config.ex b/lib/nerves_hub_link_common/fwup_config.ex index 5ea3af7..3deb2b1 100644 --- a/lib/nerves_hub_link_common/fwup_config.ex +++ b/lib/nerves_hub_link_common/fwup_config.ex @@ -8,7 +8,8 @@ defmodule NervesHubLinkCommon.FwupConfig do defstruct fwup_public_keys: [], fwup_devpath: "/dev/mmcblk0", handle_fwup_message: nil, - update_available: nil + update_available: nil, + journal_location: "/tmp/" @typedoc """ `handle_fwup_message` will be called with this data @@ -33,6 +34,7 @@ defmodule NervesHubLinkCommon.FwupConfig do @type t :: %__MODULE__{ fwup_public_keys: [String.t()], fwup_devpath: Path.t(), + journal_location: Path.t(), handle_fwup_message: handle_fwup_message_fun, update_available: update_available_fun } @@ -44,6 +46,7 @@ defmodule NervesHubLinkCommon.FwupConfig do |> validate_fwup_devpath!() |> validate_handle_fwup_message!() |> validate_update_available!() + |> validate_journal_location!() end defp validate_fwup_public_keys!(%__MODULE__{fwup_public_keys: list} = args) when is_list(list), @@ -71,4 +74,10 @@ defmodule NervesHubLinkCommon.FwupConfig do defp validate_update_available!(%__MODULE__{}), do: raise(ArgumentError, message: "update_available function signature incorrect") + + defp validate_journal_location!(%__MODULE__{journal_location: loc} = args) when is_binary(loc), + do: args + + defp validate_journal_location!(%__MODULE__{}), + do: raise(ArgumentError, message: "invalid arg: journal_location") end diff --git a/lib/nerves_hub_link_common/journal.ex 
b/lib/nerves_hub_link_common/journal.ex new file mode 100644 index 0000000..82a8d03 --- /dev/null +++ b/lib/nerves_hub_link_common/journal.ex @@ -0,0 +1,101 @@ +defmodule NervesHubLinkCommon.Journal do + @moduledoc """ + Simple journaling structure backed by a file on the filesystem + + Stores data in chunks in the following format: + + <<length::32, hash::binary-size(32), data::binary-size(length)>> + + As chunks are streamed with `save_chunk/2`, the data is updated both on disk and + in the structure. This can be used to rehydrate stateful events after a reboot, such as + a firmware update for example. + + When opening an existing journal (done automatically if the journal exists), + the structure will validate all the chunks on disk, stopping on either + + * the first chunk to fail a hash check + * the end of the file + + In either case, the journal is valid to use at this point + """ + + defstruct [:fd, :content_length, :chunks] + + @type t :: %__MODULE__{ + fd: :file.fd(), + content_length: non_neg_integer(), + chunks: [binary()] + } + + @doc "Open or create a journal for this meta" + @spec open(Path.t()) :: {:ok, t()} | {:error, File.posix()} + def open(filename) do + with {:ok, fd} <- :file.open(filename, [:write, :read, :binary]), + {:ok, 0} <- :file.position(fd, 0), + {:ok, journal} <- validate_and_seek(%__MODULE__{fd: fd, content_length: 0, chunks: []}) do + {:ok, journal} + end + end + + @spec reload(Path.t()) :: {:ok, t()} | {:error, File.posix()} + def reload(filename) do + if File.exists?(filename) do + open(filename) + else + {:error, :enoent} + end + end + + @spec validate_and_seek(t()) :: {:ok, t()} | {:error, File.posix()} + def validate_and_seek(%__MODULE__{fd: fd, content_length: content_length} = journal) do + with {:ok, <>} <- :file.read(fd, 4), + {:ok, hash} <- :file.read(fd, 32), + {:ok, data} <- :file.read(fd, length), + {:hash, ^length, ^hash} <- {:hash, length, :crypto.hash(:sha256, data)} do + validate_and_seek(%__MODULE__{ + journal + | content_length: content_length + length, + chunks: 
journal.chunks ++ [data] + }) + else + # made it thru all chunks in the file + :eof -> + {:ok, journal} + + # hash check failed. rewind and break + {:hash, length, _} -> + rewind(journal, length + 32 + 4) + + {:error, posix} -> + {:error, posix} + end + end + + @spec rewind(t(), pos_integer()) :: {:ok, t()} | {:error, File.posix()} + def rewind(journal, length) do + with {:ok, _} <- :file.position(journal.fd, -length) do + {:ok, journal} + end + end + + @spec close(t()) :: :ok + def close(%__MODULE__{fd: fd} = _journal) do + :ok = :file.close(fd) + end + + @spec save_chunk(t(), iodata()) :: {:ok, t()} | {:error, File.posix()} + def save_chunk(%__MODULE__{fd: fd} = journal, data) when is_binary(data) do + hash = :crypto.hash(:sha256, data) + length = byte_size(data) + journal_entry = IO.iodata_to_binary([<>, hash, data]) + + with :ok <- :file.write(fd, journal_entry) do + {:ok, + %__MODULE__{ + journal + | chunks: journal.chunks ++ [data], + content_length: journal.content_length + length + }} + end + end +end diff --git a/lib/nerves_hub_link_common/update_manager.ex b/lib/nerves_hub_link_common/update_manager.ex index 5d2b2cc..c7b7b1b 100644 --- a/lib/nerves_hub_link_common/update_manager.ex +++ b/lib/nerves_hub_link_common/update_manager.ex @@ -15,7 +15,8 @@ defmodule NervesHubLinkCommon.UpdateManager do Downloader, Downloader.RetryConfig, FwupConfig, - UpdateAvailable + UpdateAvailable, + Journal } defmodule State do @@ -37,7 +38,8 @@ defmodule NervesHubLinkCommon.UpdateManager do download: nil | GenServer.server(), fwup: nil | GenServer.server(), fwup_config: FwupConfig.t(), - retry_config: RetryConfig.t() + retry_config: RetryConfig.t(), + journal: nil | Journal.t() } @type download_started :: %__MODULE__{ @@ -45,7 +47,8 @@ defmodule NervesHubLinkCommon.UpdateManager do update_reschedule_timer: nil, download: GenServer.server(), fwup: GenServer.server(), - fwup_config: FwupConfig.t() + fwup_config: FwupConfig.t(), + journal: Journal.t() } @type 
download_rescheduled :: %__MODULE__{ @@ -61,7 +64,8 @@ defmodule NervesHubLinkCommon.UpdateManager do fwup: nil, download: nil, fwup_config: nil, - retry_config: nil + retry_config: nil, + journal: nil end @doc """ @@ -108,6 +112,13 @@ defmodule NervesHubLinkCommon.UpdateManager do {:ok, %State{fwup_config: fwup_config, retry_config: retry_config}} end + @impl GenServer + def terminate(_, %State{journal: journal}) do + if journal do + Journal.close(journal) + end + end + @impl GenServer def handle_call({:apply_update, %UpdateAvailable{} = update}, _from, %State{} = state) do state = maybe_update_firmware(update, state) @@ -158,8 +169,16 @@ defmodule NervesHubLinkCommon.UpdateManager do # Data from the downloader is sent to fwup def handle_info({:download, {:data, data}}, state) do - _ = Fwup.Stream.send_chunk(state.fwup, data) - {:noreply, state} + with {:ok, journal} <- Journal.save_chunk(state.journal, data), + :ok <- Fwup.Stream.send_chunk(state.fwup, data) do + {:noreply, %{state | journal: journal}} + else + {:error, posix} -> + Logger.error("[NervesHubLink] Error journaling download data: #{posix}") + # forced to stop here because the journal and fwup stream need to remain in sync + # for the journaling backup system to work + {:stop, state} + end end @spec maybe_update_firmware(UpdateAvailable.t(), State.t()) :: @@ -208,15 +227,28 @@ defmodule NervesHubLinkCommon.UpdateManager do end @spec start_fwup_stream(UpdateAvailable.t(), State.t()) :: State.download_started() - defp start_fwup_stream(%UpdateAvailable{} = update, state) do + defp start_fwup_stream( + %UpdateAvailable{firmware_meta: %UpdateAvailable.FirmwareMetadata{uuid: uuid}} = update, + state + ) do pid = self() fun = &send(pid, {:download, &1}) with {:ok, fwup} <- Fwup.stream(pid, fwup_args(state.fwup_config)), + {:ok, journal} <- Journal.open(Path.join(state.fwup_config.journal_location, uuid)), + :ok <- sync_fwup_with_journal(fwup, journal.chunks), {:ok, download} <- - 
Downloader.start_download(update.firmware_url, fun, state.retry_config) do + Downloader.start_download(update.firmware_url, fun, %{ + state.retry_config + | content_length: journal.content_length + }) do Logger.info("[NervesHubLink] Downloading firmware: #{update.firmware_url}") - %State{state | status: {:updating, 0}, download: download, fwup: fwup} + %State{state | status: {:updating, 0}, download: download, fwup: fwup, journal: journal} + else + # not sure how to handle error case right now... + error -> + Logger.error("Failed to start fwup! #{inspect(error)}") + state end end @@ -228,4 +260,14 @@ defmodule NervesHubLinkCommon.UpdateManager do args ++ ["--public-key", public_key] end) end + + # Takes all the data from the journal and catches FWUP up so that streamed messages + # will continue to be in sync + @spec sync_fwup_with_journal(GenServer.server(), [binary()]) :: :ok | no_return() + defp sync_fwup_with_journal(fwup, [chunk | rest]) do + :ok = Fwup.send_chunk(fwup, chunk) + sync_fwup_with_journal(fwup, rest) + end + + defp sync_fwup_with_journal(_fwup, []), do: :ok end diff --git a/test/nerves_hub_link_common/journal_test.exs b/test/nerves_hub_link_common/journal_test.exs new file mode 100644 index 0000000..dda165b --- /dev/null +++ b/test/nerves_hub_link_common/journal_test.exs @@ -0,0 +1,32 @@ +defmodule NervesHubLinkCommon.JournalTest do + use ExUnit.Case + + alias NervesHubLinkCommon.Journal + + setup do + {:ok, [path: "/tmp/#{System.unique_integer([:positive])}.journal"]} + end + + test "journals data to the filesystem", %{path: path} do + {:ok, journal} = Journal.open(path) + {:ok, journal1} = Journal.save_chunk(journal, "hello") + assert journal1.content_length == byte_size("hello") + assert "hello" in journal1.chunks + + {:ok, journal2} = Journal.save_chunk(journal1, "world") + :ok = Journal.close(journal2) + + {:ok, journal} = Journal.open(path) + assert journal.chunks == ["hello", "world"] + end + + test "stops when journal chunk hashes don't 
match", %{path: path} do + hash = :crypto.hash(:sha256, "hello") + :ok = File.write!(path, [<<5::32>>, hash, "hello"]) + :ok = File.write!(path, [<<5::32>>, <<0::32>>, "world"], [:append]) + {:ok, journal} = Journal.open(path) + assert journal.content_length == 5 + assert "hello" in journal.chunks + refute "world" in journal.chunks + end +end diff --git a/test/nerves_hub_link_common/update_manager_test.exs b/test/nerves_hub_link_common/update_manager_test.exs index 7a712d5..114d48a 100644 --- a/test/nerves_hub_link_common/update_manager_test.exs +++ b/test/nerves_hub_link_common/update_manager_test.exs @@ -9,7 +9,15 @@ defmodule NervesHubLinkCommon.UpdateManagerTest do setup do port = 5000 devpath = "/tmp/fwup_output" - update_payload = %{"firmware_url" => "http://localhost:#{port}/test.fw"} + + update_payload = %{ + "update_available" => true, + "firmware_url" => "http://localhost:#{port}/test.fw", + "firmware_meta" => %{ + # UUID technically the only field required here. + "uuid" => "db5cd77f-1491-444a-af36-020de700b2de" + } + } {:ok, plug} = start_supervised(