From d11322cd8c90f677785d15ff5c51f493f4decf55 Mon Sep 17 00:00:00 2001 From: mrkaspa Date: Thu, 12 May 2016 15:53:09 -0500 Subject: [PATCH] Building the graph in progress --- lib/exstreme/gnode/behaviour.ex | 40 ++++++++++++++++++++++++ lib/exstreme/gnode/broadcast.ex | 7 +++++ lib/exstreme/gnode/common.ex | 8 +++++ lib/exstreme/gnode/funnel.ex | 7 +++++ lib/exstreme/graph.ex | 2 +- lib/exstreme/graph_builder.ex | 55 +++++++++++++++++++++++++++++++++ 6 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 lib/exstreme/gnode/behaviour.ex create mode 100644 lib/exstreme/gnode/broadcast.ex create mode 100644 lib/exstreme/gnode/common.ex create mode 100644 lib/exstreme/gnode/funnel.ex create mode 100644 lib/exstreme/graph_builder.ex diff --git a/lib/exstreme/gnode/behaviour.ex b/lib/exstreme/gnode/behaviour.ex new file mode 100644 index 0000000..9313e68 --- /dev/null +++ b/lib/exstreme/gnode/behaviour.ex @@ -0,0 +1,40 @@ +defmodule Exstreme.Gnode.Behaviour do + @moduledoc """ + """ + + defmodule Data do + @moduledoc """ + """ + alias __MODULE__ + + @type t :: %Data{next: [pid], pid: pid, func: (term) -> no_return, opts: [key: term]} + defstruct [next: [], pid: nil, func: nil, opts: []] + end + + def __using__ do + quote do + use GenServer + alias Exstreme.Gnode.Behaviour.Data + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, :ok, opts) + end + + def init(params = [func: func]) do + opts = Keyword.drop(params, [:func, :type]) + {:ok, %Data{func: func, pid: self, opts: opts}} + end + + def handle_cast({:connect, to_pid}, data) do + new_data = update_in(data.next, fn(next) -> [pid | next] end) + {:noreply, new_data} + end + + def send_next(next, msg) do + Enum.each(next, fn(pid) -> + GenServer.cast(pid, msg) + do) + end + end + end +end diff --git a/lib/exstreme/gnode/broadcast.ex b/lib/exstreme/gnode/broadcast.ex new file mode 100644 index 0000000..6bb557e --- /dev/null +++ b/lib/exstreme/gnode/broadcast.ex @@ -0,0 +1,7 @@ +defmodule Exstreme.GNode.Broadcast do + use Exstreme.GNode.Behaviour + + def handle_cast({:next, {_, msg}}, stats) do + send_next(stats.next, msg) + end +end diff --git a/lib/exstreme/gnode/common.ex b/lib/exstreme/gnode/common.ex new file mode 100644 index 0000000..6d91b67 --- /dev/null +++ b/lib/exstreme/gnode/common.ex @@ -0,0 +1,8 @@ +defmodule Exstreme.GNode.Common do + use Exstreme.GNode.Behaviour + + def handle_cast({:next, {_, msg} }, stats) do + {:ok, result} = stats.func.(msg) + send_next(stats.next, result) + end +end diff --git a/lib/exstreme/gnode/funnel.ex b/lib/exstreme/gnode/funnel.ex new file mode 100644 index 0000000..38ddfc1 --- /dev/null +++ b/lib/exstreme/gnode/funnel.ex @@ -0,0 +1,7 @@ +defmodule Exstreme.GNode.Funnel do + use Exstreme.GNode.Behaviour + + def handle_cast({:next, {from_data, msg}}, stats) do + end + +end diff --git a/lib/exstreme/graph.ex b/lib/exstreme/graph.ex index 48cec9c..29437fb 100644 --- a/lib/exstreme/graph.ex +++ b/lib/exstreme/graph.ex @@ -3,7 +3,7 @@ defmodule Exstreme.Graph do """ alias __MODULE__ - @type t :: %Graph{params: [key: term], nodes: %{key: atom}, connections: %{key: atom}} + @type t :: %Graph{params: [key: term], nodes: %{key: [key: term]}, connections: %{key: atom}} defstruct params: [], nodes: %{}, connections: %{} @doc """ diff --git a/lib/exstreme/graph_builder.ex b/lib/exstreme/graph_builder.ex new file mode 100644 index 0000000..b982e11 --- /dev/null +++ b/lib/exstreme/graph_builder.ex @@ -0,0 +1,55 @@ +defmodule Exstreme.GraphBuilder do + @moduledoc """ + """ + alias Exstreme.GNode.Broadcast + alias Exstreme.GNode.Funnel + alias Exstreme.GNode.Common + + @doc """ + """ + @spec build(Graph.t) :: Graph.t + def build(graph) do + new_graph = update_in(graph.nodes, &start_nodes/1) + connect_nodes(new_graph) + new_graph + end + + #private + + @spec start_nodes(%{key: [key: term]}) :: %{key: [key: term]} + defp start_nodes(nodes) do + nodes + |> Enum.map(&start_node/1) + |> Map.new + end + + @spec start_node({atom, [key: any]}) :: {atom, [key: any]} + defp start_node({node, params = [type: type]}) do + {:ok, pid} = + case type do + :broadcast -> Broadcast.start_link(params) + :funnel -> Funnel.start_link(params) + _ -> Common.start_link(params) + end + {node, Keyword.put(params, :pid, pid)} + end + + @spec connect_nodes(Graph.t) :: no_return + defp connect_nodes(%Graph{nodes: nodes, connections: connections}) do + Enum.each(connections, &connect_pair/1) + end + + @spec connect_pair({atom, [atom]}) :: no_return + defp connect_pair({from, to}) when is_list(to) do + Enum.map(1..Enum.count(to), fn(_) -> from end) + |> Enum.zip(to) + |> Enum.each(&connect_pair/1) + end + + @spec connect_pair({atom, atom}) :: no_return + defp connect_pair({from, to}) when is_atom(to) do + [pid: pid_from] = nodes[from] + [pid: pid_to] = nodes[to] + GenServer.cast(pid_from, {:connect, pid_to}) + end +end