diff --git a/lib/exstreme/gnode/behaviour.ex b/lib/exstreme/gnode/behaviour.ex index a23d006..8f974af 100644 --- a/lib/exstreme/gnode/behaviour.ex +++ b/lib/exstreme/gnode/behaviour.ex @@ -5,43 +5,44 @@ defmodule Exstreme.GNode.Behaviour do defmacro __using__(_) do quote do use GenServer - import Exstreme.GNode.Behaviour - end - end - defmodule Data do - @moduledoc """ - """ - alias __MODULE__ + defmodule Data do + @moduledoc """ + """ + alias __MODULE__ - @type graph_func :: ((term, Data.t) -> {:ok, term} | :error) + @type graph_func :: ((term, Data.t) -> {:ok, term} | :error) - @type t :: %Data{next: [pid], pid: pid, nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]} - defstruct [next: [], pid: nil, nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []] + @type t :: %Data{next: [pid], pid: pid, nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]} + defstruct [next: [], pid: nil, nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []] - #TODO use counters - end + #TODO use counters + end - def start_link(opts \\ []) do - GenServer.start_link(__MODULE__, :ok, opts) - end + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts) + end - def init(params = [func: func, nid: nid]) do - opts = Keyword.drop(params, [:func, :type, :nid]) - {:ok, %Data{func: func, pid: self, nid: nid, opts: opts}} - end + def init(params) do + func = Keyword.get(params, :func) + nid = Keyword.get(params, :nid) + opts = Keyword.drop(params, [:func, :type, :nid]) + {:ok, %Data{func: func, pid: self, nid: nid, opts: opts}} + end - def handle_cast({:connect, to_pid}, data) do - new_data = update_in(data.next, fn(next) -> [to_pid | next] end) - {:noreply, new_data} - end + def handle_cast({:connect, to_pid}, data) do + new_data = update_in(data.next, fn(next) -> [to_pid | next] end) + {:noreply, new_data} + end - def handle_cast({:send_next, next, msg}, data) do - Enum.each(next, &(GenServer.cast(&1, {:next, self, msg}))) - {:noreply, data} - end + def handle_cast({:send_next, next, msg}, data) do + Enum.each(next, &(GenServer.cast(&1, {:next, data, msg}))) + {:noreply, data} + end - def send_next(pid, next, msg) do - GenServer.cast(pid, {:send_next, next, msg}) + def send_next(pid, next, msg) do + GenServer.cast(pid, {:send_next, next, msg}) + end + end end end diff --git a/lib/exstreme/gnode/funnel.ex b/lib/exstreme/gnode/funnel.ex index c80509d..2d9cb81 100644 --- a/lib/exstreme/gnode/funnel.ex +++ b/lib/exstreme/gnode/funnel.ex @@ -2,23 +2,21 @@ defmodule Exstreme.GNode.Funnel do use Exstreme.GNode.Behaviour def handle_cast({:next, from_data, msg}, data) do - {res, new_queue} = + {result, new_queue} = data.funnel_queue |> add_queue(from_data.nid, msg) |> get_queue(data.opts[:before_nodes]) - send_message(res, data.next) - {:noreply, update_in(data.funnel_queue, new_queue)} + send_message(result, data) + {:noreply, update_in(data.funnel_queue, fn(_) -> new_queue end)} end #private - defp send_message(res, next) do - if res != nil do - new_msg = - res - |> Map.values - |> List.to_tuple - send_next(self, next, new_msg) + defp send_message(result, data) do + if result != nil do + new_msg = Map.values(result) + {:ok, result} = data.func.(new_msg, data) + send_next(self, data.next, result) end end @@ -27,7 +25,7 @@ defmodule Exstreme.GNode.Funnel do if idx != nil do List.update_at(queue, idx, &(Map.put(&1, from, msg))) else - [queue | Map.new |> Map.put(from, msg)] + [Map.new |> Map.put(from, msg) | queue] end end diff --git a/lib/exstreme/graph.ex b/lib/exstreme/graph.ex index 26c8815..4252175 100644 --- a/lib/exstreme/graph.ex +++ b/lib/exstreme/graph.ex @@ -50,26 +50,40 @@ defmodule Exstreme.Graph do |> Enum.filter(is_first?) end + @doc """ + """ + @spec find_last_node(t) :: [atom] + def find_last_node(%Graph{nodes: nodes, connections: connections}) do + is_last? = + fn(node) -> + not(at_first?(connections, node)) and at_last?(connections, node) + end + + nodes + |> Map.keys + |> Enum.filter(is_last?) + end + @spec get_before_nodes(t, atom) :: [atom] def get_before_nodes(%Graph{connections: connections}, node) do compare_func = - fn(current_node, {_from, to}) -> - current_node == to + fn(current_node, {from, to}) -> + {current_node == to, from} end Enum.reduce(connections, [], fn(connection, res) -> - res ++ get_nodes_func(node, connection, res, compare_func) - end) + List.flatten(get_nodes_func(node, connection, res, compare_func), res) + end) |> Enum.uniq end @spec get_after_nodes(t, atom) :: [atom] def get_after_nodes(%Graph{nodes: nodes, connections: connections}, node) do compare_func = - fn(current_node, {from, _to}) -> - current_node == from + fn(current_node, {from, to}) -> + {current_node == from, to} end Enum.reduce(connections, [], fn(connection, res) -> res ++ get_nodes_func(node, connection, res, compare_func) - end) + end) |> Enum.uniq end # private @@ -108,14 +122,15 @@ defmodule Exstreme.Graph do defp get_nodes_func(node, pair = {from, to}, res, func) do case to do to when is_atom(to) -> - if func.(node, pair) do - [res | node] + {ok, add_node} = func.(node, pair) + if ok do + [add_node | res] else res end to when is_list(to) -> - Enum.map(to, fn(current_to) -> - get_nodes_func(node, {from, current_to}, res, func) + Enum.reduce(to, res, fn(current_to, new_res) -> + List.flatten(get_nodes_func(node, {from, current_to}, new_res, func), new_res) end) end end diff --git a/lib/exstreme/graph_builder.ex b/lib/exstreme/graph_builder.ex index 654f31f..7c0026c 100644 --- a/lib/exstreme/graph_builder.ex +++ b/lib/exstreme/graph_builder.ex @@ -42,8 +42,9 @@ defmodule Exstreme.GraphBuilder do end) end - @spec start_node({atom, [key: any]}) :: {atom, [key: any]} - defp start_node({node, params = [type: type]}) do + @spec start_node({atom, [term: any]}) :: {atom, [key: any]} + defp start_node({node, params}) do + type = Keyword.get(params, :type) params = Keyword.put(params, :nid, node) {:ok, pid} = case type do @@ -69,8 +70,8 @@ defmodule Exstreme.GraphBuilder do @spec connect_pair({atom, atom}, %{key: [key: term]}) :: no_return defp connect_pair({from, to}, nodes) when is_atom(to) do - [pid: pid_from] = nodes[from] - [pid: pid_to] = nodes[to] + pid_from = Keyword.get(nodes[from], :pid) + pid_to = Keyword.get(nodes[to], :pid) GenServer.cast(pid_from, {:connect, pid_to}) end end diff --git a/test/exstreme/graph_builder_test.exs b/test/exstreme/graph_builder_test.exs index 1f791df..fc61daa 100644 --- a/test/exstreme/graph_builder_test.exs +++ b/test/exstreme/graph_builder_test.exs @@ -2,8 +2,39 @@ defmodule Exstreme.GraphBuilderTest do use ExUnit.Case use Exstreme.Common alias Exstreme.GraphBuilder + alias Exstreme.Graph doctest Exstreme.GraphBuilder - test "" do + test "creates a graph" do + graph_built = GraphBuilder.build(create_graph) + assert graph_built != create_graph + Enum.each(graph_built.nodes, fn({_, params}) -> + assert Keyword.has_key?(params, :pid) + assert Keyword.has_key?(params, :after_nodes) + assert Keyword.has_key?(params, :before_nodes) + assert Keyword.get(params, :pid) != nil + end) + end + + test "sends a message to the graph with common nodes" do + graph_built = GraphBuilder.build(create_graph) + [start_node] = Graph.find_start_node(graph_built) + [last_node] = Graph.find_last_node(graph_built) + start_node_pid = Keyword.get(graph_built.nodes[start_node], :pid) + last_node_pid = Keyword.get(graph_built.nodes[last_node], :pid) + GenServer.cast(last_node_pid, {:connect, self}) + GenServer.cast(start_node_pid, {:next, self, {:sum, 0}}) + assert_receive {_, {:next, last_node_pid, {:sum, 2}}} + end + + test "sends a message to the graph with many nodes" do + graph_built = GraphBuilder.build(graph_many_nodes) + [start_node] = Graph.find_start_node(graph_built) + [last_node] = Graph.find_last_node(graph_built) + start_node_pid = Keyword.get(graph_built.nodes[start_node], :pid) + last_node_pid = Keyword.get(graph_built.nodes[last_node], :pid) + GenServer.cast(last_node_pid, {:connect, self}) + GenServer.cast(start_node_pid, {:next, self, {:sum, 0}}) + assert_receive {_, {:next, last_node_pid, {:sum, 7}}} end end diff --git a/test/exstreme/graph_creator_test.exs b/test/exstreme/graph_creator_test.exs index c126957..96321b7 100644 --- a/test/exstreme/graph_creator_test.exs +++ b/test/exstreme/graph_creator_test.exs @@ -7,7 +7,7 @@ defmodule Exstreme.GraphCreatorTest do test "creates a valid graph struct" do compare_graph = %Graph{ - nodes: %{n1: [], n2: []}, + nodes: %{n1: params, n2: params}, connections: %{n1: :n2} } assert create_graph == compare_graph @@ -36,7 +36,7 @@ defmodule Exstreme.GraphCreatorTest do test "can create n3 and add a relation between n2 and n3" do compare_graph = %Graph{ - nodes: %{n1: [], n2: [], n3: []}, + nodes: %{n1: params, n2: params, n3: params}, connections: %{n1: :n2, n2: :n3} } @@ -48,11 +48,11 @@ defmodule Exstreme.GraphCreatorTest do test "can add a broadcast an many nodes to the broadcast" do compare_graph = %Graph{ - nodes: %{n1: [], n2: [], b1: [], n3: [], n4: []}, + nodes: %{n1: params, n2: params, b1: params_broadcast, n3: params, n4: params}, connections: %{n1: :n2, n2: :b1, b1: [:n4, :n3]} } - {graph, b1} = GraphCreator.create_broadcast(create_graph, params) + {graph, b1} = GraphCreator.create_broadcast(create_graph, params_broadcast) {graph, n3} = GraphCreator.create_node(graph, params) {graph, n4} = GraphCreator.create_node(graph, params) @@ -67,14 +67,14 @@ defmodule Exstreme.GraphCreatorTest do test "can add a funnel" do compare_graph = %Graph{ - nodes: %{n1: [], n2: [], b1: [], n3: [], n4: [], f1: [], n5: []}, + nodes: %{n1: params, n2: params, b1: params_broadcast, n3: params, n4: params, f1: params_funnel, n5: params}, connections: %{n1: :n2, n2: :b1, b1: [:n4, :n3], n3: :f1, n4: :f1, f1: :n5} } - {graph, b1} = GraphCreator.create_broadcast(create_graph, params) + {graph, b1} = GraphCreator.create_broadcast(create_graph, params_broadcast) {graph, n3} = GraphCreator.create_node(graph, params) {graph, n4} = GraphCreator.create_node(graph, params) - {graph, f1} = GraphCreator.create_funnel(graph, params) + {graph, f1} = GraphCreator.create_funnel(graph, params_funnel) {graph, n5} = GraphCreator.create_node(graph, params) new_graph = diff --git a/test/exstreme/graph_test.exs b/test/exstreme/graph_test.exs index 560a95d..87164b2 100644 --- a/test/exstreme/graph_test.exs +++ b/test/exstreme/graph_test.exs @@ -17,7 +17,17 @@ defmodule Exstreme.GraphTest do assert Graph.find_start_node(graph_many_nodes) == [:n1] end - test "" do - assert Graph.get_before_nodes(graph_many_nodes, :f1) == [:n3, :n4] + test "the last node is n5" do + assert Graph.find_last_node(graph_many_nodes) == [:n5] + end + + test "the nodes before f1 are n4 and n3" do + res = Graph.get_before_nodes(graph_many_nodes, :f1) + assert res == [:n4, :n3] + end + + test "the nodes after f1 are n4 and n3" do + res = Graph.get_after_nodes(graph_many_nodes, :f1) + assert res == [:n5] end end diff --git a/test/support/common.ex b/test/support/common.ex index 49d6488..d73c7e2 100644 --- a/test/support/common.ex +++ b/test/support/common.ex @@ -5,13 +5,13 @@ defmodule Exstreme.Common do using do quote do def graph_many_nodes do - graph = GraphCreator.create_graph(params) + graph = GraphCreator.create_graph([]) {graph, n1} = GraphCreator.create_node(graph, params) {graph, n2} = GraphCreator.create_node(graph, params) - {graph, b1} = GraphCreator.create_broadcast(graph, params) + {graph, b1} = GraphCreator.create_broadcast(graph, params_broadcast) {graph, n3} = GraphCreator.create_node(graph, params) {graph, n4} = GraphCreator.create_node(graph, params) - {graph, f1} = GraphCreator.create_funnel(graph, params) + {graph, f1} = GraphCreator.create_funnel(graph, params_funnel) {graph, n5} = GraphCreator.create_node(graph, params) graph @@ -27,29 +27,29 @@ defmodule Exstreme.Common do # invalid graphs def graph_one_node_no_connections do - graph = GraphCreator.create_graph(params) + graph = GraphCreator.create_graph([]) {graph, _n1} = GraphCreator.create_node(graph, params) graph end - def graph_no_connections, do: GraphCreator.create_graph(params) + def graph_no_connections, do: GraphCreator.create_graph([]) def graph_start_with_broadcast do - graph = GraphCreator.create_graph(params) - {graph, b1} = GraphCreator.create_broadcast(graph, params) + graph = GraphCreator.create_graph([]) + {graph, b1} = GraphCreator.create_broadcast(graph, params_broadcast) {graph, n1} = GraphCreator.create_node(graph, params) GraphCreator.add_connection(graph, b1, n1) end def graph_start_with_funnnel do - graph = GraphCreator.create_graph(params) - {graph, f1} = GraphCreator.create_funnel(graph, params) + graph = GraphCreator.create_graph([]) + {graph, f1} = GraphCreator.create_funnel(graph, params_funnel) {graph, n1} = GraphCreator.create_node(graph, params) GraphCreator.add_connection(graph, f1, n1) end def graph_unconnected_nodes do - graph = GraphCreator.create_graph(params) + graph = GraphCreator.create_graph([]) {graph, n1} = GraphCreator.create_node(graph, params) {graph, n2} = GraphCreator.create_node(graph, params) {graph, _n3} = GraphCreator.create_node(graph, params) @@ -57,13 +57,17 @@ defmodule Exstreme.Common do end defp create_graph do - graph = GraphCreator.create_graph(params) + graph = GraphCreator.create_graph([]) {graph, n1} = GraphCreator.create_node(graph, params) {graph, n2} = GraphCreator.create_node(graph, params) GraphCreator.add_connection(graph, n1, n2) end - def params, do: [] + def params, do: [type: :common, func: fn({:sum, acc}, _) -> {:ok, {:sum, acc + 1}} end] + + def params_funnel, do: [type: :funnel, func: fn(values, _) -> {:ok, {:sum, Enum.reduce(values, 0, fn({:sum, num}, acc) -> num + acc end)}} end] + + def params_broadcast, do: [type: :broadcast] end end end