diff --git a/lib/exstreme/gnode/behaviour.ex b/lib/exstreme/gnode/behaviour.ex index 9313e68..a23d006 100644 --- a/lib/exstreme/gnode/behaviour.ex +++ b/lib/exstreme/gnode/behaviour.ex @@ -1,40 +1,47 @@ -defmodule Exstreme.Gnode.Behaviour do +defmodule Exstreme.GNode.Behaviour do @moduledoc """ """ + defmacro __using__(_) do + quote do + use GenServer + import Exstreme.GNode.Behaviour + end + end + defmodule Data do @moduledoc """ """ alias __MODULE__ - @type t :: %Data{next: [pid], pid: pid, func: (term) -> no_return, opts: [key: term]} - defstruct [next: [], pid: nil, func: nil, opts: []] + @type graph_func :: ((term, Data.t) -> {:ok, term} | :error) + + @type t :: %Data{next: [pid], pid: pid, nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]} + defstruct [next: [], pid: nil, nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []] + + #TODO use counters end - def __using__ do - quote do - use GenServer - alias Exstreme.Gnode.Behaviour.Data - - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, :ok, opts) - end - - def init(params = [func: func]) do - opts = Keyword.drop(params, [:func, :type]) - {:ok, %Data{func: func, pid: self, opts: opts}} - end - - def handle_cast({:connect, to_pid}, data) do - new_data = update_in(data.next, fn(next) -> [pid | next] end) - {:noreply, new_data} - end - - def send_next(next, msg) do - Enum.each(next, fn(pid) -> - GenServer.cast(pid, msg) - do) - end - end + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, :ok, opts) + end + + def init(params = [func: func, nid: nid]) do + opts = Keyword.drop(params, [:func, :type, :nid]) + {:ok, %Data{func: func, pid: self, nid: nid, opts: opts}} + end + + def handle_cast({:connect, to_pid}, data) do + new_data = update_in(data.next, fn(next) -> [to_pid | next] end) + {:noreply, new_data} + end + + def handle_cast({:send_next, next, msg}, data) do + Enum.each(next, &(GenServer.cast(&1, {:next, self, msg}))) + {:noreply, data} + end + + def send_next(pid, next, msg) do + GenServer.cast(pid, {:send_next, next, msg}) end end diff --git a/lib/exstreme/gnode/broadcast.ex b/lib/exstreme/gnode/broadcast.ex index 6bb557e..63776d8 100644 --- a/lib/exstreme/gnode/broadcast.ex +++ b/lib/exstreme/gnode/broadcast.ex @@ -1,7 +1,8 @@ defmodule Exstreme.GNode.Broadcast do use Exstreme.GNode.Behaviour - def handle_cast({:next, {_, msg}}, stats) do - send_next(stats.next, msg) + def handle_cast({:next, _, msg}, data) do + send_next(self, data.next, msg) + {:noreply, data} end end diff --git a/lib/exstreme/gnode/common.ex b/lib/exstreme/gnode/common.ex index 6d91b67..afe2a3f 100644 --- a/lib/exstreme/gnode/common.ex +++ b/lib/exstreme/gnode/common.ex @@ -1,8 +1,9 @@ defmodule Exstreme.GNode.Common do use Exstreme.GNode.Behaviour - def handle_cast({:next, {_, msg} }, stats) do - {:ok, result} = stats.func.(msg) - send_next(stats.next, result) + def handle_cast({:next, _, msg}, data) do + {:ok, result} = data.func.(msg, data) + send_next(self, data.next, result) + {:noreply, data} end end diff --git a/lib/exstreme/gnode/funnel.ex b/lib/exstreme/gnode/funnel.ex index 38ddfc1..c80509d 100644 --- a/lib/exstreme/gnode/funnel.ex +++ b/lib/exstreme/gnode/funnel.ex @@ -1,7 +1,42 @@ defmodule Exstreme.GNode.Funnel do use Exstreme.GNode.Behaviour - def handle_cast({:next, {from_data, msg}}, stats) do + def handle_cast({:next, from_data, msg}, data) do + {res, new_queue} = + data.funnel_queue + |> add_queue(from_data.nid, msg) + |> get_queue(data.opts[:before_nodes]) + send_message(res, data.next) + {:noreply, update_in(data.funnel_queue, new_queue)} end + #private + + defp send_message(res, next) do + if res != nil do + new_msg = + res + |> Map.values + |> List.to_tuple + send_next(self, next, new_msg) + end + end + + defp add_queue(queue, from, msg) do + idx = Enum.find_index(queue, &(!Map.has_key?(&1, from))) + if idx != nil do + List.update_at(queue, idx, &(Map.put(&1, from, msg))) + else + [queue | Map.new |> Map.put(from, msg)] + end + end + + defp get_queue(queue = [head | tail], before_nodes) do + before_nodes_set = MapSet.new(before_nodes) + if MapSet.equal?(MapSet.new(Map.keys(head)), before_nodes_set) do + {head, tail} + else + {nil, queue} + end + end end diff --git a/lib/exstreme/graph.ex b/lib/exstreme/graph.ex index 29437fb..26c8815 100644 --- a/lib/exstreme/graph.ex +++ b/lib/exstreme/graph.ex @@ -50,6 +50,28 @@ defmodule Exstreme.Graph do |> Enum.filter(is_first?) end + @spec get_before_nodes(t, atom) :: [atom] + def get_before_nodes(%Graph{connections: connections}, node) do + compare_func = + fn(current_node, {_from, to}) -> + current_node == to + end + Enum.reduce(connections, [], fn(connection, res) -> + res ++ get_nodes_func(node, connection, res, compare_func) + end) + end + + @spec get_after_nodes(t, atom) :: [atom] + def get_after_nodes(%Graph{nodes: nodes, connections: connections}, node) do + compare_func = + fn(current_node, {from, _to}) -> + current_node == from + end + Enum.reduce(connections, [], fn(connection, res) -> + res ++ get_nodes_func(node, connection, res, compare_func) + end) + end + # private @spec map_to_connections(t) :: [atom] @@ -81,4 +103,20 @@ defmodule Exstreme.Graph do |> List.flatten |> Enum.member?(node) end + + @spec get_nodes_func(atom, {atom, atom}, [atom], ((atom, {atom, atom}) -> boolean)) :: [atom] + defp get_nodes_func(node, pair = {from, to}, res, func) do + case to do + to when is_atom(to) -> + if func.(node, pair) do + [res | node] + else + res + end + to when is_list(to) -> + Enum.map(to, fn(current_to) -> + get_nodes_func(node, {from, current_to}, res, func) + end) + end + end end diff --git a/lib/exstreme/graph_builder.ex b/lib/exstreme/graph_builder.ex index b982e11..654f31f 100644 --- a/lib/exstreme/graph_builder.ex +++ b/lib/exstreme/graph_builder.ex @@ -4,27 +4,47 @@ defmodule Exstreme.GraphBuilder do alias Exstreme.GNode.Broadcast alias Exstreme.GNode.Funnel alias Exstreme.GNode.Common + alias Exstreme.Graph @doc """ """ @spec build(Graph.t) :: Graph.t def build(graph) do - new_graph = update_in(graph.nodes, &start_nodes/1) - connect_nodes(new_graph) - new_graph + graph + |> update_nodes_relations + |> start_nodes + |> connect_nodes end #private - @spec start_nodes(%{key: [key: term]}) :: %{key: [key: term]} - defp start_nodes(nodes) do - nodes - |> Enum.map(&start_node/1) - |> Map.new + @spec update_nodes_relations(Graph.t) :: Graph.t + defp update_nodes_relations(graph) do + update_node_func = + fn({node, params}) -> + new_params = + params + |> Keyword.put(:before_nodes, Graph.get_before_nodes(graph, node)) + |> Keyword.put(:after_nodes, Graph.get_after_nodes(graph, node)) + + {node, new_params} + end + + update_in(graph.nodes, &(&1 |> Enum.map(update_node_func) |> Map.new)) + end + + @spec start_nodes(Graph.t) :: Graph.t + defp start_nodes(graph) do + update_in(graph.nodes, fn(nodes) -> + nodes + |> Enum.map(&start_node/1) + |> Map.new + end) end @spec start_node({atom, [key: any]}) :: {atom, [key: any]} defp start_node({node, params = [type: type]}) do + params = Keyword.put(params, :nid, node) {:ok, pid} = case type do :broadcast -> Broadcast.start_link(params) @@ -34,20 +54,21 @@ defmodule Exstreme.GraphBuilder do {node, Keyword.put(params, :pid, pid)} end - @spec connect_nodes(Graph.t) :: no_return - defp connect_nodes(%Graph{nodes: nodes, connections: connections}) do - Enum.each(connections, &connect_pair/1) + @spec connect_nodes(Graph.t) :: Graph.t + defp connect_nodes(graph = %Graph{nodes: nodes, connections: connections}) do + Enum.each(connections, &(connect_pair(&1, nodes))) + graph end - @spec connect_pair({atom, [atom]}) :: no_return - defp connect_pair({from, to}) when is_list(to) do + @spec connect_pair({atom, [atom]}, %{key: [key: term]}) :: no_return + defp connect_pair({from, to}, nodes) when is_list(to) do Enum.map(1..Enum.count(to), fn(_) -> from end) |> Enum.zip(to) - |> Enum.each(&connect_pair/1) + |> Enum.each(&(connect_pair(&1, nodes))) end - @spec connect_pair({atom, atom}) :: no_return - defp connect_pair({from, to}) when is_atom(to) do + @spec connect_pair({atom, atom}, %{key: [key: term]}) :: no_return + defp connect_pair({from, to}, nodes) when is_atom(to) do [pid: pid_from] = nodes[from] [pid: pid_to] = nodes[to] GenServer.cast(pid_from, {:connect, pid_to}) diff --git a/test/exstreme/graph_builder_test.exs b/test/exstreme/graph_builder_test.exs new file mode 100644 index 0000000..1f791df --- /dev/null +++ b/test/exstreme/graph_builder_test.exs @@ -0,0 +1,9 @@ +defmodule Exstreme.GraphBuilderTest do + use ExUnit.Case + use Exstreme.Common + alias Exstreme.GraphBuilder + doctest Exstreme.GraphBuilder + + test "" do + end +end diff --git a/test/exstreme/graph_test.exs b/test/exstreme/graph_test.exs index 0001ec5..560a95d 100644 --- a/test/exstreme/graph_test.exs +++ b/test/exstreme/graph_test.exs @@ -16,4 +16,8 @@ defmodule Exstreme.GraphTest do test "the start node is n1" do assert Graph.find_start_node(graph_many_nodes) == [:n1] end + + test "" do + assert Graph.get_before_nodes(graph_many_nodes, :f1) == [:n3, :n4] + end end