diff --git a/TODO.md b/TODO.md index 3ca7116..8f585d8 100644 --- a/TODO.md +++ b/TODO.md @@ -5,10 +5,9 @@ x- Use nid to connect X- Get nid for a node X- A node must have a function X- The function in Broadcast and Funnel nodes can be optional -- Add Supervisors functionality -- Use counters for stats +X- Add Supervisors functionality +X- Use counters for stats x- Improve test Future - Improve API -- Update to GenStage diff --git a/lib/exstreme/gnode/behaviour.ex b/lib/exstreme/gnode/behaviour.ex index 01c4a20..f86c99f 100644 --- a/lib/exstreme/gnode/behaviour.ex +++ b/lib/exstreme/gnode/behaviour.ex @@ -20,8 +20,6 @@ defmodule Exstreme.GNode.Behaviour do @type t :: %Data{next: [atom], nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]} defstruct [next: [], nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []] - - #TODO use counters end def start_link(params \\ []) do @@ -36,18 +34,23 @@ defmodule Exstreme.GNode.Behaviour do {:ok, %Data{func: func, nid: nid, opts: opts}} end - def handle_cast({:connect, to_nid}, data) do + def handle_cast({:next, from_data, msg}, data) do + send self, {:on_next, from_data, msg} + {:noreply, %{data | received_counter: data.received_counter + 1}} + end + + def handle_call({:connect, to_nid}, _from, data) do new_data = update_in(data.next, fn(next) -> [to_nid | next] end) - {:noreply, new_data} + {:reply, :ok, new_data} end def handle_info({:send_next, next, msg}, data) do Enum.each(next, &(GenServer.cast(&1, {:next, data, msg}))) - {:noreply, data} + {:noreply, %{data | sent_counter: data.sent_counter + Enum.count(next)}} end - def send_next(pid, next, msg) do - send pid, {:send_next, next, msg} + def send_next(next, msg) do + send self, {:send_next, next, msg} end end end diff --git a/lib/exstreme/gnode/broadcast.ex b/lib/exstreme/gnode/broadcast.ex index 2d0f91e..267ee13 100644 --- a/lib/exstreme/gnode/broadcast.ex +++ b/lib/exstreme/gnode/broadcast.ex @@ -2,9 +2,9 @@ defmodule Exstreme.GNode.Broadcast do use Exstreme.GNode.Behaviour # Broadcasts the message to the next nodes - def handle_cast({:next, _, msg}, data) do + def handle_info({:on_next, _, msg}, data) do {:ok, result} = data.func.(msg, data) - send_next(self, data.next, result) + send_next(data.next, result) {:noreply, data} end end diff --git a/lib/exstreme/gnode/common.ex b/lib/exstreme/gnode/common.ex index 964b696..098f80f 100644 --- a/lib/exstreme/gnode/common.ex +++ b/lib/exstreme/gnode/common.ex @@ -2,9 +2,9 @@ defmodule Exstreme.GNode.Common do use Exstreme.GNode.Behaviour # Sends the result to the next one - def handle_cast({:next, _, msg}, data) do + def handle_info({:on_next, _, msg}, data) do {:ok, result} = data.func.(msg, data) - send_next(self, data.next, result) + send_next(data.next, result) {:noreply, data} end end diff --git a/lib/exstreme/gnode/funnel.ex b/lib/exstreme/gnode/funnel.ex index 45d240d..6382e44 100644 --- a/lib/exstreme/gnode/funnel.ex +++ b/lib/exstreme/gnode/funnel.ex @@ -3,7 +3,7 @@ defmodule Exstreme.GNode.Funnel do # Receives a message and saves it in a messages map queue, # when all the messages are done sends a the map - def handle_cast({:next, from_data, msg}, data) do + def handle_info({:on_next, from_data, msg}, data) do {result, new_queue} = data.funnel_queue |> add_queue(from_data.nid, msg) @@ -20,7 +20,7 @@ defmodule Exstreme.GNode.Funnel do if result != nil do new_msg = Map.values(result) {:ok, result} = data.func.(new_msg, data) - send_next(self, data.next, result) + send_next(data.next, result) end end diff --git a/lib/exstreme/gnode/graph_supervisor.ex b/lib/exstreme/gnode/graph_supervisor.ex new file mode 100644 index 0000000..5a123f0 --- /dev/null +++ b/lib/exstreme/gnode/graph_supervisor.ex @@ -0,0 +1,23 @@ +defmodule Exstreme.GraphSupervisor do + use Supervisor + + def start_link(graph) do + pid = + graph.name + |> String.to_atom + |> Process.whereis + if pid != nil && Process.alive?(pid) do + {:error, "The supervisor alredy exists"} + else + Supervisor.start_link(__MODULE__, graph, name: String.to_atom(graph.name)) + end + end + + def init(graph) do + children = + Enum.map(graph.nodes, fn({gnode, params}) -> + worker(Keyword.get(params, :module), [params], id: gnode) + end) + supervise(children, strategy: :one_for_one) + end +end diff --git a/lib/exstreme/gnode/supervisor.ex b/lib/exstreme/gnode/supervisor.ex deleted file mode 100644 index 41b931f..0000000 --- a/lib/exstreme/gnode/supervisor.ex +++ /dev/null @@ -1,3 +0,0 @@ -defmodule Exstreme.Supervisor do - -end diff --git a/lib/exstreme/graph_builder.ex b/lib/exstreme/graph_builder.ex index 70be31b..e206bce 100644 --- a/lib/exstreme/graph_builder.ex +++ b/lib/exstreme/graph_builder.ex @@ -7,6 +7,7 @@ defmodule Exstreme.GraphBuilder do alias Exstreme.GNode.Common alias Exstreme.Graph alias Exstreme.GraphValidator + alias Exstreme.GraphSupervisor @doc """ Builds the Supervision tree for the graph @@ -16,7 +17,7 @@ defmodule Exstreme.GraphBuilder do with :ok <- GraphValidator.validate(graph) do graph |> update_nodes_relations - |> start_nodes + |> start_graph |> connect_nodes end end @@ -36,33 +37,37 @@ defmodule Exstreme.GraphBuilder do {gnode, new_params} end - update_in(graph.nodes, &(&1 |> Enum.map(update_node_func) |> Map.new)) + update_in(graph.nodes, &(Map.new(Enum.map(&1, update_node_func)))) end - # Starts all the nodes - @spec start_nodes(Graph.t) :: Graph.t - defp start_nodes(graph) do - update_in(graph.nodes, fn(nodes) -> + @spec start_graph(Graph.t) :: Graph.t + defp start_graph(graph) do + graph = update_in(graph.nodes, fn(nodes) -> nodes - |> Enum.map(&start_node/1) + |> Enum.map(&setup_node/1) |> Map.new end) + case GraphSupervisor.start_link(graph) do + {:error, msg} -> + raise ArgumentError, message: msg + _ -> :ok + end + graph end - # Starts a gnode - @spec start_node({atom, [term: any]}) :: {atom, [key: any]} - defp start_node({gnode, params}) do - type = Keyword.get(params, :type) + # setup node's params + @spec setup_node({atom, [term: any]}) :: {atom, [key: any]} + defp setup_node({gnode, params}) do + module = case Keyword.get(params, :type) do + :broadcast -> Broadcast + :funnel -> Funnel + _ -> Common + end params = params |> Keyword.put(:nid, gnode) + |> Keyword.put(:module, module) |> Keyword.put_new(:func, fn(data, _) -> {:ok, data} end) - {:ok, _} = - case type do - :broadcast -> Broadcast.start_link(params) - :funnel -> Funnel.start_link(params) - _ -> Common.start_link(params) - end {gnode, params} end @@ -87,6 +92,6 @@ defmodule Exstreme.GraphBuilder do defp connect_pair({from, to}, nodes) when is_atom(to) do nid_from = Keyword.get(nodes[from], :nid) nid_to = Keyword.get(nodes[to], :nid) - GenServer.cast(nid_from, {:connect, nid_to}) + :ok = GenServer.call(nid_from, {:connect, nid_to}) end end diff --git a/test/exstreme/graph_builder_test.exs b/test/exstreme/graph_builder_test.exs index 982bdf4..76b9289 100644 --- a/test/exstreme/graph_builder_test.exs +++ b/test/exstreme/graph_builder_test.exs @@ -5,31 +5,43 @@ defmodule Exstreme.GraphBuilderTest do alias Exstreme.Graph doctest Exstreme.GraphBuilder - test "creates a graph and check built params" do - graph_built = GraphBuilder.build(create_graph) + setup_all do + {:ok, + graph_built: GraphBuilder.build(create_graph), + graph_many_built: GraphBuilder.build(graph_many_nodes("demo_1"))} + end + + test "crashes when tries to create a graph with the same name" do + assert_raise ArgumentError, fn -> + GraphBuilder.build(create_graph("demo_1")) + end + end + + test "creates a graph and check built params", %{graph_built: graph_built} do assert graph_built != create_graph Enum.each(graph_built.nodes, fn({nid, params}) -> + assert nid != nil assert Keyword.has_key?(params, :after_nodes) assert Keyword.has_key?(params, :before_nodes) assert Keyword.get(params, :nid) != nil - assert nid |> Process.whereis |> Process.alive? == true + pid = Process.whereis(nid) + assert pid != nil + assert Process.alive?(pid) == true end) end - test "sends a message to the graph with common nodes" do - graph_built = GraphBuilder.build(create_graph) + test "sends a message to the graph with common nodes", %{graph_built: graph_built} do [start_node] = Graph.find_start_node(graph_built) [last_node] = Graph.find_last_node(graph_built) - GenServer.cast(last_node, {:connect, self}) + :ok = GenServer.call(last_node, {:connect, self}) GenServer.cast(start_node, {:next, self, {:sum, 0}}) assert_receive {_, {:next, _, {:sum, 2}}} end - test "sends a message to the graph with many nodes" do - graph_built = GraphBuilder.build(graph_many_nodes) - [start_node] = Graph.find_start_node(graph_built) - [last_node] = Graph.find_last_node(graph_built) - GenServer.cast(last_node, {:connect, self}) + test "sends a message to the graph with many nodes", %{graph_many_built: graph_many_built} do + [start_node] = Graph.find_start_node(graph_many_built) + [last_node] = Graph.find_last_node(graph_many_built) + :ok = GenServer.call(last_node, {:connect, self}) GenServer.cast(start_node, {:next, self, {:sum, 0}}) assert_receive {_, {:next, _, {:sum, 7}}} end diff --git a/test/support/common.ex b/test/support/common.ex index e263648..a025ab8 100644 --- a/test/support/common.ex +++ b/test/support/common.ex @@ -6,8 +6,8 @@ defmodule Exstreme.Common do quote do def graph_name, do: "demo" - def graph_many_nodes do - graph = GraphCreator.create_graph(graph_name, []) + def graph_many_nodes(name \\ nil) do + graph = GraphCreator.create_graph(name || graph_name, []) {graph, n1} = GraphCreator.create_node(graph, params) {graph, n2} = GraphCreator.create_node(graph, params) {graph, b1} = GraphCreator.create_broadcast(graph, params_broadcast) @@ -58,8 +58,8 @@ defmodule Exstreme.Common do GraphCreator.add_connection(graph, n1, n2) end - defp create_graph do - graph = GraphCreator.create_graph(graph_name, []) + defp create_graph(name \\ nil) do + graph = GraphCreator.create_graph(name || graph_name, []) {graph, n1} = GraphCreator.create_node(graph, params) {graph, n2} = GraphCreator.create_node(graph, params) GraphCreator.add_connection(graph, n1, n2)