Skip to content

Commit

Permalink
Implement simple Journaling system
Browse files Browse the repository at this point in the history
This enables fwupdates to resume even after a reboot
  • Loading branch information
ConnorRigby committed Feb 10, 2021
1 parent db64d19 commit c3d2b35
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 15 deletions.
12 changes: 10 additions & 2 deletions lib/nerves_hub_link_common/downloader/retry_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ defmodule NervesHubLinkCommon.Downloader.RetryConfig do
# worst case average download speed in bits/second
# This is used to calculate a "sensible" timeout that is shorter than `max_timeout`.
# LTE Cat M1 modems sometimes top out at 32 kbps (30 kbps for some slack)
worst_case_download_speed: 30_000
worst_case_download_speed: 30_000,
content_length: 0
]

@typedoc """
Expand Down Expand Up @@ -59,11 +60,18 @@ defmodule NervesHubLinkCommon.Downloader.RetryConfig do
"""
@type worst_case_download_speed :: non_neg_integer()

@typedoc """
Gets translated into a range request header.
Useful for restarting downloads from a specific position
"""
@type content_length() :: non_neg_integer()

@type t :: %__MODULE__{
max_disconnects: max_disconnects(),
idle_timeout: idle_timeout(),
max_timeout: max_timeout(),
time_between_retries: time_between_retries(),
worst_case_download_speed: worst_case_download_speed()
worst_case_download_speed: worst_case_download_speed(),
content_length: content_length()
}
end
11 changes: 10 additions & 1 deletion lib/nerves_hub_link_common/fwup_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule NervesHubLinkCommon.FwupConfig do
defstruct fwup_public_keys: [],
fwup_devpath: "/dev/mmcblk0",
handle_fwup_message: nil,
update_available: nil
update_available: nil,
journal_location: "/tmp/"

@typedoc """
`handle_fwup_message` will be called with this data
Expand All @@ -33,6 +34,7 @@ defmodule NervesHubLinkCommon.FwupConfig do
@type t :: %__MODULE__{
fwup_public_keys: [String.t()],
fwup_devpath: Path.t(),
journal_location: Path.t(),
handle_fwup_message: handle_fwup_message_fun,
update_available: update_available_fun
}
Expand All @@ -44,6 +46,7 @@ defmodule NervesHubLinkCommon.FwupConfig do
|> validate_fwup_devpath!()
|> validate_handle_fwup_message!()
|> validate_update_available!()
|> validate_journal_location!()
end

defp validate_fwup_public_keys!(%__MODULE__{fwup_public_keys: list} = args) when is_list(list),
Expand Down Expand Up @@ -71,4 +74,10 @@ defmodule NervesHubLinkCommon.FwupConfig do

defp validate_update_available!(%__MODULE__{}),
do: raise(ArgumentError, message: "update_available function signature incorrect")

defp validate_journal_location!(%__MODULE__{journal_location: loc} = args) when is_binary(loc),
do: args

defp validate_journal_location!(%__MODULE__{}),
do: raise(ArgumentError, message: "invalid arg: journal_location")
end
101 changes: 101 additions & 0 deletions lib/nerves_hub_link_common/journal.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
defmodule NervesHubLinkCommon.Journal do
@moduledoc """
Simple journaling structure backed by a file on the filesystem
Stores data in chunks in the following format:
<<length::32, hash::binary-size(32)-unit(8), data::binary-size(length)-unit(8)>>
as chunks are streamed with `save_chunk/2` the data is updated both on disk and
in the structure. This can be used to rehydrate stateful events after a reboot, such as
a firmware update for example.
When opening an existing journal (done automatically if the journal exists),
the structure will validate all the chunks on disk, stopping on either
* the first chunk to fail a hash check
* the end of the file
In either case, the journal is valid to use at this point
"""

defstruct [:fd, :content_length, :chunks]

@type t :: %__MODULE__{
fd: :file.fd(),
content_length: non_neg_integer(),
chunks: [binary()]
}

@doc "Open or create a journal for this meta"
@spec open(Path.t()) :: {:ok, t()} | {:error, File.posix()}
def open(filename) do
with {:ok, fd} <- :file.open(filename, [:write, :read, :binary]),
{:ok, 0} <- :file.position(fd, 0),
{:ok, journal} <- validate_and_seek(%__MODULE__{fd: fd, content_length: 0, chunks: []}) do
{:ok, journal}
end
end

@spec reload(Path.t()) :: {:ok, t()} | {:error, File.posix()}
def reload(filename) do
if File.exists?(filename) do
open(filename)
else
{:error, :enoent}
end
end

@spec validate_and_seek(t()) :: {:ok, t()} | {:error, File.posix()}
def validate_and_seek(%__MODULE__{fd: fd, content_length: content_length} = journal) do
with {:ok, <<length::32>>} <- :file.read(fd, 4),
{:ok, hash} <- :file.read(fd, 32),
{:ok, data} <- :file.read(fd, length),
{:hash, ^length, ^hash} <- {:hash, length, :crypto.hash(:sha256, data)} do
validate_and_seek(%__MODULE__{
journal
| content_length: content_length + length,
chunks: journal.chunks ++ [data]
})
else
# made it thru all chunks in the file
:eof ->
{:ok, journal}

# hash check failed. rewind and break
{:hash, length, _} ->
rewind(journal, length + 32 + 4)

{:error, posix} ->
{:error, posix}
end
end

@spec rewind(t(), pos_integer()) :: {:ok, t()} | {:error, File.posix()}
def rewind(journal, length) do
with {:ok, _} <- :file.position(journal.fd, -length) do
{:ok, journal}
end
end

@spec close(t()) :: :ok
def close(%__MODULE__{fd: fd} = _journal) do
:ok = :file.close(fd)
end

@spec save_chunk(t(), iodata()) :: {:ok, t()} | {:error, File.posix()}
def save_chunk(%__MODULE__{fd: fd} = journal, data) when is_binary(data) do
hash = :crypto.hash(:sha256, data)
length = byte_size(data)
journal_entry = IO.iodata_to_binary([<<length::32>>, hash, data])

with :ok <- :file.write(fd, journal_entry) do
{:ok,
%__MODULE__{
journal
| chunks: journal.chunks ++ [data],
content_length: journal.content_length + length
}}
end
end
end
3 changes: 1 addition & 2 deletions lib/nerves_hub_link_common/update_available.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ defmodule NervesHubLinkCommon.UpdateAvailable do
author: String.t() | nil,
description: String.t() | nil,
fwup_version: Version.build() | nil,
id: Ecto.UUID.t(),
misc: String.t() | nil,
platform: String.t(),
product: String.t(),
uuid: Ecto.UUID.t(),
uuid: String.t(),
vcs_identifier: String.t() | nil,
version: Version.build()
}
Expand Down
60 changes: 51 additions & 9 deletions lib/nerves_hub_link_common/update_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ defmodule NervesHubLinkCommon.UpdateManager do
Downloader,
Downloader.RetryConfig,
FwupConfig,
UpdateAvailable
UpdateAvailable,
Journal
}

defmodule State do
Expand All @@ -37,15 +38,17 @@ defmodule NervesHubLinkCommon.UpdateManager do
download: nil | GenServer.server(),
fwup: nil | GenServer.server(),
fwup_config: FwupConfig.t(),
retry_config: RetryConfig.t()
retry_config: RetryConfig.t(),
journal: nil | Journal.t()
}

@type download_started :: %__MODULE__{
status: {:updating, integer()} | {:fwup_error, String.t()},
update_reschedule_timer: nil,
download: GenServer.server(),
fwup: GenServer.server(),
fwup_config: FwupConfig.t()
fwup_config: FwupConfig.t(),
journal: Journal.t()
}

@type download_rescheduled :: %__MODULE__{
Expand All @@ -61,7 +64,8 @@ defmodule NervesHubLinkCommon.UpdateManager do
fwup: nil,
download: nil,
fwup_config: nil,
retry_config: nil
retry_config: nil,
journal: nil
end

@doc """
Expand Down Expand Up @@ -108,6 +112,13 @@ defmodule NervesHubLinkCommon.UpdateManager do
{:ok, %State{fwup_config: fwup_config, retry_config: retry_config}}
end

@impl GenServer
def terminate(_, %State{journal: journal}) do
if journal do
Journal.close(journal)
end
end

@impl GenServer
def handle_call({:apply_update, %UpdateAvailable{} = update}, _from, %State{} = state) do
state = maybe_update_firmware(update, state)
Expand Down Expand Up @@ -158,8 +169,16 @@ defmodule NervesHubLinkCommon.UpdateManager do

# Data from the downloader is sent to fwup
def handle_info({:download, {:data, data}}, state) do
_ = Fwup.Stream.send_chunk(state.fwup, data)
{:noreply, state}
with {:ok, journal} <- Journal.save_chunk(state.journal, data),
:ok <- Fwup.Stream.send_chunk(state.fwup, data) do
{:noreply, %{state | journal: journal}}
else
{:error, posix} ->
Logger.error("[NervesHubLink] Error journaling download data: #{posix}")
# forced to stop here because the journal and fwup stream need to remain in sync
# for the journaling backup system to work
{:stop, state}
end
end

@spec maybe_update_firmware(UpdateAvailable.t(), State.t()) ::
Expand Down Expand Up @@ -208,15 +227,28 @@ defmodule NervesHubLinkCommon.UpdateManager do
end

@spec start_fwup_stream(UpdateAvailable.t(), State.t()) :: State.download_started()
defp start_fwup_stream(%UpdateAvailable{} = update, state) do
defp start_fwup_stream(
%UpdateAvailable{firmware_meta: %UpdateAvailable.FirmwareMetadata{uuid: uuid}} = update,
state
) do
pid = self()
fun = &send(pid, {:download, &1})

with {:ok, fwup} <- Fwup.stream(pid, fwup_args(state.fwup_config)),
{:ok, journal} <- Journal.open(Path.join(state.fwup_config.journal_location, uuid)),
:ok <- sync_fwup_with_journal(fwup, journal.chunks),
{:ok, download} <-
Downloader.start_download(update.firmware_url, fun, state.retry_config) do
Downloader.start_download(update.firmware_url, fun, %{
state.retry_config
| content_length: journal.content_length
}) do
Logger.info("[NervesHubLink] Downloading firmware: #{update.firmware_url}")
%State{state | status: {:updating, 0}, download: download, fwup: fwup}
%State{state | status: {:updating, 0}, download: download, fwup: fwup, journal: journal}
else
# not sure how to handle error case right now...
error ->
Logger.error("Failed to start fwup! #{inspect(error)}")
state
end
end

Expand All @@ -228,4 +260,14 @@ defmodule NervesHubLinkCommon.UpdateManager do
args ++ ["--public-key", public_key]
end)
end

# Takes all the data form the journal and catches FWUP so that streamed messages
# will continue to be in sync
@spec sync_fwup_with_journal(GenServer.server(), [binary()]) :: :ok | no_return()
defp sync_fwup_with_journal(fwup, [chunk | rest]) do
:ok = Fwup.send_chunk(fwup, chunk)
sync_fwup_with_journal(fwup, rest)
end

defp sync_fwup_with_journal(_fwup, []), do: :ok
end
32 changes: 32 additions & 0 deletions test/nerves_hub_link_common/journal_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule NervesHubLinkCommon.JournalTest do
use ExUnit.Case

alias NervesHubLinkCommon.Journal

setup do
{:ok, [path: "/tmp/#{System.unique_integer([:positive])}.journal"]}
end

test "journals data to the filesystem", %{path: path} do
{:ok, journal} = Journal.open(path)
{:ok, journal1} = Journal.save_chunk(journal, "hello")
assert journal1.content_length == byte_size("hello")
assert "hello" in journal1.chunks

{:ok, journal2} = Journal.save_chunk(journal1, "world")
:ok = Journal.close(journal2)

{:ok, journal} = Journal.open(path)
assert journal.chunks == ["hello", "world"]
end

test "stops when journal chunk hashes don't match", %{path: path} do
hash = :crypto.hash(:sha256, "hello")
:ok = File.write!(path, [<<5::32>>, hash, "hello"])
:ok = File.write!(path, [<<5::32>>, <<0::32>>, "world"], [:append])
{:ok, journal} = Journal.open(path)
assert journal.content_length == 5
assert "hello" in journal.chunks
refute "world" in journal.chunks
end
end
10 changes: 9 additions & 1 deletion test/nerves_hub_link_common/update_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@ defmodule NervesHubLinkCommon.UpdateManagerTest do
setup do
port = 5000
devpath = "/tmp/fwup_output"
update_payload = %{"firmware_url" => "http://localhost:#{port}/test.fw"}

update_payload = %{
"update_available" => true,
"firmware_url" => "http://localhost:#{port}/test.fw",
"firmware_meta" => %{
# UUID technically the only field required here.
"uuid" => "db5cd77f-1491-444a-af36-020de700b2de"
}
}

{:ok, plug} =
start_supervised(
Expand Down

0 comments on commit c3d2b35

Please sign in to comment.