diff --git a/lib/nerves_hub_link_common/downloader.ex b/lib/nerves_hub_link_common/downloader.ex index 8fce5d0..e269a7e 100644 --- a/lib/nerves_hub_link_common/downloader.ex +++ b/lib/nerves_hub_link_common/downloader.ex @@ -18,7 +18,11 @@ defmodule NervesHubLinkCommon.Downloader do require Logger use GenServer - alias NervesHubLinkCommon.{Downloader, Downloader.RetryConfig, Downloader.TimeoutCalculation} + alias NervesHubLinkCommon.{ + Downloader, + Downloader.RetryConfig, + Downloader.TimeoutCalculation + } defstruct uri: nil, conn: nil, @@ -91,11 +95,8 @@ defmodule NervesHubLinkCommon.Downloader do iex> flush() :complete """ - @spec start_download(String.t() | URI.t(), event_handler_fun()) :: GenServer.on_start() - def start_download(url, fun) when is_function(fun, 1) do - GenServer.start_link(__MODULE__, [URI.parse(url), fun, %RetryConfig{}]) - end - + @spec start_download(String.t() | URI.t(), event_handler_fun(), RetryConfig.t()) :: + GenServer.on_start() def start_download(url, fun, %RetryConfig{} = retry_args) when is_function(fun, 1) do GenServer.start_link(__MODULE__, [URI.parse(url), fun, retry_args]) end diff --git a/lib/nerves_hub_link_common/downloader/retry_config.ex b/lib/nerves_hub_link_common/downloader/retry_config.ex index 118991b..bf6bcfe 100644 --- a/lib/nerves_hub_link_common/downloader/retry_config.ex +++ b/lib/nerves_hub_link_common/downloader/retry_config.ex @@ -24,7 +24,8 @@ defmodule NervesHubLinkCommon.Downloader.RetryConfig do # worst case average download speed in bits/second # This is used to calculate a "sensible" timeout that is shorter than `max_timeout`. # LTE Cat M1 modems sometimes top out at 32 kbps (30 kbps for some slack) - worst_case_download_speed: 30_000 + worst_case_download_speed: 30_000, + content_length: 0 ] @typedoc """ @@ -59,11 +60,18 @@ defmodule NervesHubLinkCommon.Downloader.RetryConfig do """ @type worst_case_download_speed :: non_neg_integer() + @typedoc """ + Gets translated into a range request header. + Useful for restarting downloads from a specific position + """ + @type content_length() :: non_neg_integer() + @type t :: %__MODULE__{ max_disconnects: max_disconnects(), idle_timeout: idle_timeout(), max_timeout: max_timeout(), time_between_retries: time_between_retries(), - worst_case_download_speed: worst_case_download_speed() + worst_case_download_speed: worst_case_download_speed(), + content_length: content_length() } end diff --git a/lib/nerves_hub_link_common/fwup_config.ex b/lib/nerves_hub_link_common/fwup_config.ex index 5e56972..434fc0a 100644 --- a/lib/nerves_hub_link_common/fwup_config.ex +++ b/lib/nerves_hub_link_common/fwup_config.ex @@ -3,10 +3,13 @@ defmodule NervesHubLinkCommon.FwupConfig do Config structure responsible for handling callbacks from FWUP, applying a fwupdate, and storing fwup task configuration """ + alias NervesHubLinkCommon.UpdatePayload + defstruct fwup_public_keys: [], fwup_devpath: "/dev/mmcblk0", handle_fwup_message: nil, - update_available: nil + update_available: nil, + journal_location: "/tmp/" @typedoc """ `handle_fwup_message` will be called with this data @@ -25,11 +28,13 @@ defmodule NervesHubLinkCommon.FwupConfig do @typedoc """ Called when an update has been dispatched via `NervesHubLinkCommon.UpdateManager.apply_update/2` """ - @type update_available_fun() :: (map() -> :ignore | {:reschedule, timeout()} | :apply) + @type update_available_fun() :: + (UpdatePayload.t() -> :ignore | {:reschedule, timeout()} | :apply) @type t :: %__MODULE__{ fwup_public_keys: [String.t()], fwup_devpath: Path.t(), + journal_location: Path.t(), handle_fwup_message: handle_fwup_message_fun, update_available: update_available_fun } @@ -41,6 +46,7 @@ defmodule NervesHubLinkCommon.FwupConfig do |> validate_fwup_devpath!() |> validate_handle_fwup_message!() |> validate_update_available!() + |> validate_journal_location!() end defp validate_fwup_public_keys!(%__MODULE__{fwup_public_keys: list} = args) when is_list(list), @@ -68,4 +74,10 @@ defmodule NervesHubLinkCommon.FwupConfig do defp validate_update_available!(%__MODULE__{}), do: raise(ArgumentError, message: "update_available function signature incorrect") + + defp validate_journal_location!(%__MODULE__{journal_location: loc} = args) when is_binary(loc), + do: args + + defp validate_journal_location!(%__MODULE__{}), + do: raise(ArgumentError, message: "invalid arg: journal_location") end diff --git a/lib/nerves_hub_link_common/journal.ex b/lib/nerves_hub_link_common/journal.ex new file mode 100644 index 0000000..82a8d03 --- /dev/null +++ b/lib/nerves_hub_link_common/journal.ex @@ -0,0 +1,101 @@ +defmodule NervesHubLinkCommon.Journal do + @moduledoc """ + Simple journaling structure backed by a file on the filesystem + + Stores data in chunks in the following format: + + <> + + as chunks are streamed with `save_chunk/2` the data is updated both on disk and + in the structure. This can be used to rehydrate stateful events after a reboot, such as + a firmware update for example. + + When opening an existing journal (done automatically if the journal exists), + the structure will validate all the chunks on disk, stopping on either + + * the first chunk to fail a hash check + * the end of the file + + In either case, the journal is valid to use at this point + """ + + defstruct [:fd, :content_length, :chunks] + + @type t :: %__MODULE__{ + fd: :file.fd(), + content_length: non_neg_integer(), + chunks: [binary()] + } + + @doc "Open or create a journal for this meta" + @spec open(Path.t()) :: {:ok, t()} | {:error, File.posix()} + def open(filename) do + with {:ok, fd} <- :file.open(filename, [:write, :read, :binary]), + {:ok, 0} <- :file.position(fd, 0), + {:ok, journal} <- validate_and_seek(%__MODULE__{fd: fd, content_length: 0, chunks: []}) do + {:ok, journal} + end + end + + @spec reload(Path.t()) :: {:ok, t()} | {:error, File.posix()} + def reload(filename) do + if File.exists?(filename) do + open(filename) + else + {:error, :enoent} + end + end + + @spec validate_and_seek(t()) :: {:ok, t()} | {:error, File.posix()} + def validate_and_seek(%__MODULE__{fd: fd, content_length: content_length} = journal) do + with {:ok, <>} <- :file.read(fd, 4), + {:ok, hash} <- :file.read(fd, 32), + {:ok, data} <- :file.read(fd, length), + {:hash, ^length, ^hash} <- {:hash, length, :crypto.hash(:sha256, data)} do + validate_and_seek(%__MODULE__{ + journal + | content_length: content_length + length, + chunks: journal.chunks ++ [data] + }) + else + # made it thru all chunks in the file + :eof -> + {:ok, journal} + + # hash check failed. rewind and break + {:hash, length, _} -> + rewind(journal, length + 32 + 4) + + {:error, posix} -> + {:error, posix} + end + end + + @spec rewind(t(), pos_integer()) :: {:ok, t()} | {:error, File.posix()} + def rewind(journal, length) do + with {:ok, _} <- :file.position(journal.fd, -length) do + {:ok, journal} + end + end + + @spec close(t()) :: :ok + def close(%__MODULE__{fd: fd} = _journal) do + :ok = :file.close(fd) + end + + @spec save_chunk(t(), iodata()) :: {:ok, t()} | {:error, File.posix()} + def save_chunk(%__MODULE__{fd: fd} = journal, data) when is_binary(data) do + hash = :crypto.hash(:sha256, data) + length = byte_size(data) + journal_entry = IO.iodata_to_binary([<>, hash, data]) + + with :ok <- :file.write(fd, journal_entry) do + {:ok, + %__MODULE__{ + journal + | chunks: journal.chunks ++ [data], + content_length: journal.content_length + length + }} + end + end +end diff --git a/lib/nerves_hub_link_common/update_manager.ex b/lib/nerves_hub_link_common/update_manager.ex index 0f3f9d6..87c2e24 100644 --- a/lib/nerves_hub_link_common/update_manager.ex +++ b/lib/nerves_hub_link_common/update_manager.ex @@ -10,7 +10,14 @@ defmodule NervesHubLinkCommon.UpdateManager do require Logger use GenServer - alias NervesHubLinkCommon.{FwupConfig, Downloader} + + alias NervesHubLinkCommon.{ + Downloader, + Downloader.RetryConfig, + FwupConfig, + UpdatePayload, + Journal + } defmodule State do @moduledoc """ @@ -30,7 +37,9 @@ defmodule NervesHubLinkCommon.UpdateManager do update_reschedule_timer: nil | :timer.tref(), download: nil | GenServer.server(), fwup: nil | GenServer.server(), - fwup_config: FwupConfig.t() + fwup_config: FwupConfig.t(), + retry_config: RetryConfig.t(), + journal: nil | Journal.t() } @type download_started :: %__MODULE__{ @@ -38,7 +47,8 @@ defmodule NervesHubLinkCommon.UpdateManager do update_reschedule_timer: nil, download: GenServer.server(), fwup: GenServer.server(), - fwup_config: FwupConfig.t() + fwup_config: FwupConfig.t(), + journal: Journal.t() } @type download_rescheduled :: %__MODULE__{ @@ -53,7 +63,9 @@ defmodule NervesHubLinkCommon.UpdateManager do update_reschedule_timer: nil, fwup: nil, download: nil, - fwup_config: nil + fwup_config: nil, + retry_config: nil, + journal: nil end @doc """ @@ -61,8 +73,9 @@ defmodule NervesHubLinkCommon.UpdateManager do NervesHub. the map must contain a `"firmware_url"` key. """ @spec apply_update(GenServer.server(), map()) :: State.status() - def apply_update(manager \\ __MODULE__, %{"firmware_url" => _} = update) do - GenServer.call(manager, {:apply_update, update}) + def apply_update(manager \\ __MODULE__, %{"firmware_url" => _} = params) do + update_available = UpdatePayload.parse(params) + GenServer.call(manager, {:apply_update, update_available}) end @doc """ @@ -74,26 +87,40 @@ defmodule NervesHubLinkCommon.UpdateManager do end @doc false - def child_spec(%FwupConfig{} = args) do + def child_spec(%FwupConfig{} = fwup_config) do + %{ + start: {__MODULE__, :start_link, [fwup_config, %RetryConfig{}, [name: __MODULE__]]}, + id: __MODULE__ + } + end + + def child_spec(%FwupConfig{} = fwup_config, %RetryConfig{} = retry_config) do %{ - start: {__MODULE__, :start_link, [args, [name: __MODULE__]]}, + start: {__MODULE__, :start_link, [fwup_config, retry_config, [name: __MODULE__]]}, id: __MODULE__ } end @doc false - def start_link(%FwupConfig{} = args, opts \\ []) do - GenServer.start_link(__MODULE__, args, opts) + def start_link(%FwupConfig{} = fwup_config, %RetryConfig{} = retry_config, opts \\ []) do + GenServer.start_link(__MODULE__, {fwup_config, retry_config}, opts) end @impl GenServer - def init(%FwupConfig{} = fwup_config) do + def init({%FwupConfig{} = fwup_config, %RetryConfig{} = retry_config}) do fwup_config = FwupConfig.validate!(fwup_config) - {:ok, %State{fwup_config: fwup_config}} + {:ok, %State{fwup_config: fwup_config, retry_config: retry_config}} + end + + @impl GenServer + def terminate(_, %State{journal: journal}) do + if journal do + Journal.close(journal) + end end @impl GenServer - def handle_call({:apply_update, update}, _from, %State{} = state) do + def handle_call({:apply_update, %UpdatePayload{} = update}, _from, %State{} = state) do state = maybe_update_firmware(update, state) {:reply, state.status, state} end @@ -142,12 +169,25 @@ defmodule NervesHubLinkCommon.UpdateManager do # Data from the downloader is sent to fwup def handle_info({:download, {:data, data}}, state) do - _ = Fwup.Stream.send_chunk(state.fwup, data) - {:noreply, state} + with {:ok, journal} <- Journal.save_chunk(state.journal, data), + :ok <- Fwup.Stream.send_chunk(state.fwup, data) do + {:noreply, %{state | journal: journal}} + else + {:error, posix} -> + Logger.error("[NervesHubLink] Error journaling download data: #{posix}") + # forced to stop here because the journal and fwup stream need to remain in sync + # for the journaling backup system to work + {:stop, state} + end end - @spec maybe_update_firmware(map(), State.t()) :: + @spec maybe_update_firmware(UpdatePayload.t(), State.t()) :: State.download_started() | State.download_rescheduled() | State.t() + defp maybe_update_firmware(%UpdatePayload{update_available: false}, %State{} = state) do + # if the `update_available` key is false, bail early. There is no update + state + end + defp maybe_update_firmware(_data, %State{status: {:updating, _percent}} = state) do # Received an update message from NervesHub, but we're already in progress. # It could be because the deployment/device was edited making a duplicate @@ -158,7 +198,7 @@ defmodule NervesHubLinkCommon.UpdateManager do state end - defp maybe_update_firmware(%{"firmware_url" => _url} = data, %State{} = state) do + defp maybe_update_firmware(%UpdatePayload{} = data, %State{} = state) do # Cancel an existing timer if it exists. # This prevents rescheduled updates` # from compounding. @@ -191,14 +231,30 @@ defmodule NervesHubLinkCommon.UpdateManager do %{state | update_reschedule_timer: nil} end - @spec start_fwup_stream(map(), State.t()) :: State.download_started() - defp start_fwup_stream(%{"firmware_url" => url}, state) do + @spec start_fwup_stream(UpdatePayload.t(), State.t()) :: State.download_started() + defp start_fwup_stream( + %UpdatePayload{firmware_meta: %UpdatePayload.FirmwareMetadata{uuid: uuid}} = update, + state + ) do pid = self() fun = &send(pid, {:download, &1}) - {:ok, download} = Downloader.start_download(url, fun) - {:ok, fwup} = Fwup.stream(pid, fwup_args(state.fwup_config)) - Logger.info("[NervesHubLink] Downloading firmware: #{url}") - %State{state | status: {:updating, 0}, download: download, fwup: fwup} + + with {:ok, fwup} <- Fwup.stream(pid, fwup_args(state.fwup_config)), + {:ok, journal} <- Journal.open(Path.join(state.fwup_config.journal_location, uuid)), + :ok <- sync_fwup_with_journal(fwup, journal.chunks), + {:ok, download} <- + Downloader.start_download(update.firmware_url, fun, %{ + state.retry_config + | content_length: journal.content_length + }) do + Logger.info("[NervesHubLink] Downloading firmware: #{update.firmware_url}") + %State{state | status: {:updating, 0}, download: download, fwup: fwup, journal: journal} + else + # not sure how to handle error case right now... + error -> + Logger.error("Failed to start fwup! #{inspect(error)}") + state + end end @spec fwup_args(FwupConfig.t()) :: [String.t()] @@ -209,4 +265,14 @@ defmodule NervesHubLinkCommon.UpdateManager do args ++ ["--public-key", public_key] end) end + + # Takes all the data form the journal and catches FWUP so that streamed messages + # will continue to be in sync + @spec sync_fwup_with_journal(GenServer.server(), [binary()]) :: :ok | no_return() + defp sync_fwup_with_journal(fwup, [chunk | rest]) do + :ok = Fwup.send_chunk(fwup, chunk) + sync_fwup_with_journal(fwup, rest) + end + + defp sync_fwup_with_journal(_fwup, []), do: :ok end diff --git a/lib/nerves_hub_link_common/update_payload.ex b/lib/nerves_hub_link_common/update_payload.ex new file mode 100644 index 0000000..39dc09b --- /dev/null +++ b/lib/nerves_hub_link_common/update_payload.ex @@ -0,0 +1,84 @@ +defmodule NervesHubLinkCommon.UpdatePayload do + defmodule FirmwareMetadata do + @moduledoc "Metadata about an update" + defstruct [ + :architecture, + :author, + :description, + :fwup_version, + :id, + :misc, + :platform, + :product, + :uuid, + :vcs_identifier, + :version + ] + + @type t :: %__MODULE__{ + architecture: String.t(), + author: String.t() | nil, + description: String.t() | nil, + fwup_version: Version.build() | nil, + misc: String.t() | nil, + platform: String.t(), + product: String.t(), + uuid: String.t(), + vcs_identifier: String.t() | nil, + version: Version.build() + } + + @spec parse(map()) :: t() + def parse(params) do + %__MODULE__{ + architecture: params["architecture"], + author: params["author"], + description: params["description"], + fwup_version: params["fwup_version"], + id: params["id"], + misc: params["misc"], + platform: params["platform"], + product: params["product"], + uuid: params["uuid"], + vcs_identifier: params["vcs_identifier"], + version: params["version"] + } + end + end + + defstruct update_available: false, + firmware_url: nil, + firmware_meta: nil + + @typedoc """ + Payload that gets dispatched down to devices upon an update + + `firmware_url` and `firmware_meta` are only available + when `update_available` is true. + """ + @type t() :: %__MODULE__{ + update_available: boolean(), + firmware_url: String.t() | nil, + firmware_meta: FirmwareMetadata.t() | nil + } + + @doc "Validates the payload from NervesHub" + @spec parse(map()) :: t() + def parse(params) do + firmware_meta = + if params["update_available"] do + FirmwareMetadata.parse(params["firmware_meta"]) + end + + firmware_url = + if params["firmware_url"] do + URI.parse(params["firmware_url"]) + end + + %__MODULE__{ + update_available: params["update_available"], + firmware_url: firmware_url, + firmware_meta: firmware_meta + } + end +end diff --git a/test/nerves_hub_link_common/downloader_test.exs b/test/nerves_hub_link_common/downloader_test.exs index 65233d6..68b6135 100644 --- a/test/nerves_hub_link_common/downloader_test.exs +++ b/test/nerves_hub_link_common/downloader_test.exs @@ -139,7 +139,7 @@ defmodule NervesHubLinkCommon.DownloaderTest do test "follows redirects", %{url: url} do test_pid = self() handler_fun = &send(test_pid, &1) - {:ok, _download} = Downloader.start_download(url, handler_fun) + {:ok, _download} = Downloader.start_download(url, handler_fun, %RetryConfig{}) refute_receive {:error, _} assert_receive {:data, "redirected"} end diff --git a/test/nerves_hub_link_common/journal_test.exs b/test/nerves_hub_link_common/journal_test.exs new file mode 100644 index 0000000..dda165b --- /dev/null +++ b/test/nerves_hub_link_common/journal_test.exs @@ -0,0 +1,32 @@ +defmodule NervesHubLinkCommon.JournalTest do + use ExUnit.Case + + alias NervesHubLinkCommon.Journal + + setup do + {:ok, [path: "/tmp/#{System.unique_integer([:positive])}.journal"]} + end + + test "journals data to the filesystem", %{path: path} do + {:ok, journal} = Journal.open(path) + {:ok, journal1} = Journal.save_chunk(journal, "hello") + assert journal1.content_length == byte_size("hello") + assert "hello" in journal1.chunks + + {:ok, journal2} = Journal.save_chunk(journal1, "world") + :ok = Journal.close(journal2) + + {:ok, journal} = Journal.open(path) + assert journal.chunks == ["hello", "world"] + end + + test "stops when journal chunk hashes don't match", %{path: path} do + hash = :crypto.hash(:sha256, "hello") + :ok = File.write!(path, [<<5::32>>, hash, "hello"]) + :ok = File.write!(path, [<<5::32>>, <<0::32>>, "world"], [:append]) + {:ok, journal} = Journal.open(path) + assert journal.content_length == 5 + assert "hello" in journal.chunks + refute "world" in journal.chunks + end +end diff --git a/test/nerves_hub_link_common/update_manager_test.exs b/test/nerves_hub_link_common/update_manager_test.exs index 1813536..114d48a 100644 --- a/test/nerves_hub_link_common/update_manager_test.exs +++ b/test/nerves_hub_link_common/update_manager_test.exs @@ -3,11 +3,21 @@ defmodule NervesHubLinkCommon.UpdateManagerTest do alias NervesHubLinkCommon.{FwupConfig, UpdateManager} alias NervesHubLinkCommon.Support.FWUPStreamPlug + @retry_config %NervesHubLinkCommon.Downloader.RetryConfig{} + describe "fwup stream" do setup do port = 5000 devpath = "/tmp/fwup_output" - update_payload = %{"firmware_url" => "http://localhost:#{port}/test.fw"} + + update_payload = %{ + "update_available" => true, + "firmware_url" => "http://localhost:#{port}/test.fw", + "firmware_meta" => %{ + # UUID technically the only field required here. + "uuid" => "db5cd77f-1491-444a-af36-020de700b2de" + } + } {:ok, plug} = start_supervised( @@ -30,7 +40,7 @@ defmodule NervesHubLinkCommon.UpdateManagerTest do update_available: update_available_fun } - {:ok, manager} = UpdateManager.start_link(fwup_config) + {:ok, manager} = UpdateManager.start_link(fwup_config, @retry_config) assert UpdateManager.apply_update(manager, update_payload) == {:updating, 0} assert_receive {:fwup, {:progress, 0}} @@ -60,7 +70,7 @@ defmodule NervesHubLinkCommon.UpdateManagerTest do update_available: update_available_fun } - {:ok, manager} = UpdateManager.start_link(fwup_config) + {:ok, manager} = UpdateManager.start_link(fwup_config, @retry_config) assert UpdateManager.apply_update(manager, update_payload) == :update_rescheduled assert_received :rescheduled refute_received {:fwup, _}