Skip to content

Commit

Permalink
added supervisors
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkaspa committed Jul 27, 2016
1 parent 8d64ab7 commit 3958349
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 52 deletions.
5 changes: 2 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ x- Use nid to connect
X- Get nid for a node
X- A node must have a function
X- The function in Broadcast and Funnel nodes can be optional
- Add Supervisors functionality
- Use counters for stats
X- Add Supervisors functionality
X- Use counters for stats
x- Improve test

Future
- Improve API
- Update to GenStage
17 changes: 10 additions & 7 deletions lib/exstreme/gnode/behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ defmodule Exstreme.GNode.Behaviour do

@type t :: %Data{next: [atom], nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]}
defstruct [next: [], nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []]

#TODO use counters
end

def start_link(params \\ []) do
Expand All @@ -36,18 +34,23 @@ defmodule Exstreme.GNode.Behaviour do
{:ok, %Data{func: func, nid: nid, opts: opts}}
end

def handle_cast({:connect, to_nid}, data) do
def handle_cast({:next, from_data, msg}, data) do
send self, {:on_next, from_data, msg}
{:noreply, %{data | received_counter: data.received_counter + 1}}
end

def handle_call({:connect, to_nid}, _from, data) do
new_data = update_in(data.next, fn(next) -> [to_nid | next] end)
{:noreply, new_data}
{:reply, :ok, new_data}
end

def handle_info({:send_next, next, msg}, data) do
Enum.each(next, &(GenServer.cast(&1, {:next, data, msg})))
{:noreply, data}
{:noreply, %{data | sent_counter: data.sent_counter + Enum.count(next)}}
end

def send_next(pid, next, msg) do
send pid, {:send_next, next, msg}
def send_next(next, msg) do
send self, {:send_next, next, msg}
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/exstreme/gnode/broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ defmodule Exstreme.GNode.Broadcast do
use Exstreme.GNode.Behaviour

# Broadcasts the message to the next nodes
def handle_cast({:next, _, msg}, data) do
def handle_info({:on_next, _, msg}, data) do
{:ok, result} = data.func.(msg, data)
send_next(self, data.next, result)
send_next(data.next, result)
{:noreply, data}
end
end
4 changes: 2 additions & 2 deletions lib/exstreme/gnode/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ defmodule Exstreme.GNode.Common do
use Exstreme.GNode.Behaviour

# Sends the result to the next one
def handle_cast({:next, _, msg}, data) do
def handle_info({:on_next, _, msg}, data) do
{:ok, result} = data.func.(msg, data)
send_next(self, data.next, result)
send_next(data.next, result)
{:noreply, data}
end
end
4 changes: 2 additions & 2 deletions lib/exstreme/gnode/funnel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Exstreme.GNode.Funnel do

# Receives a message and saves it in a messages map queue,
# when all the messages are done sends a the map
def handle_cast({:next, from_data, msg}, data) do
def handle_info({:on_next, from_data, msg}, data) do
{result, new_queue} =
data.funnel_queue
|> add_queue(from_data.nid, msg)
Expand All @@ -20,7 +20,7 @@ defmodule Exstreme.GNode.Funnel do
if result != nil do
new_msg = Map.values(result)
{:ok, result} = data.func.(new_msg, data)
send_next(self, data.next, result)
send_next(data.next, result)
end
end

Expand Down
23 changes: 23 additions & 0 deletions lib/exstreme/gnode/graph_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Exstreme.GraphSupervisor do
use Supervisor

def start_link(graph) do
pid =
graph.name
|> String.to_atom
|> Process.whereis
if pid != nil && Process.alive?(pid) do
{:error, "The supervisor alredy exists"}
else
Supervisor.start_link(__MODULE__, graph, name: String.to_atom(graph.name))
end
end

def init(graph) do
children =
Enum.map(graph.nodes, fn({gnode, params}) ->
worker(Keyword.get(params, :module), [params], id: gnode)
end)
supervise(children, strategy: :one_for_one)
end
end
3 changes: 0 additions & 3 deletions lib/exstreme/gnode/supervisor.ex

This file was deleted.

41 changes: 23 additions & 18 deletions lib/exstreme/graph_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Exstreme.GraphBuilder do
alias Exstreme.GNode.Common
alias Exstreme.Graph
alias Exstreme.GraphValidator
alias Exstreme.GraphSupervisor

@doc """
Builds the Supervision tree for the graph
Expand All @@ -16,7 +17,7 @@ defmodule Exstreme.GraphBuilder do
with :ok <- GraphValidator.validate(graph) do
graph
|> update_nodes_relations
|> start_nodes
|> start_graph
|> connect_nodes
end
end
Expand All @@ -36,33 +37,37 @@ defmodule Exstreme.GraphBuilder do
{gnode, new_params}
end

update_in(graph.nodes, &(&1 |> Enum.map(update_node_func) |> Map.new))
update_in(graph.nodes, &(Map.new(Enum.map(&1, update_node_func))))
end

# Starts all the nodes
@spec start_nodes(Graph.t) :: Graph.t
defp start_nodes(graph) do
update_in(graph.nodes, fn(nodes) ->
@spec start_graph(Graph.t) :: Graph.t
defp start_graph(graph) do
graph = update_in(graph.nodes, fn(nodes) ->
nodes
|> Enum.map(&start_node/1)
|> Enum.map(&setup_node/1)
|> Map.new
end)
case GraphSupervisor.start_link(graph) do
{:error, msg} ->
raise ArgumentError, message: msg
_ -> :ok
end
graph
end

# Starts a gnode
@spec start_node({atom, [term: any]}) :: {atom, [key: any]}
defp start_node({gnode, params}) do
type = Keyword.get(params, :type)
# setup node's params
@spec setup_node({atom, [term: any]}) :: {atom, [key: any]}
defp setup_node({gnode, params}) do
module = case Keyword.get(params, :type) do
:broadcast -> Broadcast
:funnel -> Funnel
_ -> Common
end
params =
params
|> Keyword.put(:nid, gnode)
|> Keyword.put(:module, module)
|> Keyword.put_new(:func, fn(data, _) -> {:ok, data} end)
{:ok, _} =
case type do
:broadcast -> Broadcast.start_link(params)
:funnel -> Funnel.start_link(params)
_ -> Common.start_link(params)
end
{gnode, params}
end

Expand All @@ -87,6 +92,6 @@ defmodule Exstreme.GraphBuilder do
defp connect_pair({from, to}, nodes) when is_atom(to) do
nid_from = Keyword.get(nodes[from], :nid)
nid_to = Keyword.get(nodes[to], :nid)
GenServer.cast(nid_from, {:connect, nid_to})
:ok = GenServer.call(nid_from, {:connect, nid_to})
end
end
34 changes: 23 additions & 11 deletions test/exstreme/graph_builder_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,43 @@ defmodule Exstreme.GraphBuilderTest do
alias Exstreme.Graph
doctest Exstreme.GraphBuilder

test "creates a graph and check built params" do
graph_built = GraphBuilder.build(create_graph)
setup_all do
{:ok,
graph_built: GraphBuilder.build(create_graph),
graph_many_built: GraphBuilder.build(graph_many_nodes("demo_1"))}
end

test "crashes when tries to create a graph with the same name" do
assert_raise ArgumentError, fn ->
GraphBuilder.build(create_graph("demo_1"))
end
end

test "creates a graph and check built params", %{graph_built: graph_built} do
assert graph_built != create_graph
Enum.each(graph_built.nodes, fn({nid, params}) ->
assert nid != nil
assert Keyword.has_key?(params, :after_nodes)
assert Keyword.has_key?(params, :before_nodes)
assert Keyword.get(params, :nid) != nil
assert nid |> Process.whereis |> Process.alive? == true
pid = Process.whereis(nid)
assert pid != nil
assert Process.alive?(pid) == true
end)
end

test "sends a message to the graph with common nodes" do
graph_built = GraphBuilder.build(create_graph)
test "sends a message to the graph with common nodes", %{graph_built: graph_built} do
[start_node] = Graph.find_start_node(graph_built)
[last_node] = Graph.find_last_node(graph_built)
GenServer.cast(last_node, {:connect, self})
:ok = GenServer.call(last_node, {:connect, self})
GenServer.cast(start_node, {:next, self, {:sum, 0}})
assert_receive {_, {:next, _, {:sum, 2}}}
end

test "sends a message to the graph with many nodes" do
graph_built = GraphBuilder.build(graph_many_nodes)
[start_node] = Graph.find_start_node(graph_built)
[last_node] = Graph.find_last_node(graph_built)
GenServer.cast(last_node, {:connect, self})
test "sends a message to the graph with many nodes", %{graph_many_built: graph_many_built} do
[start_node] = Graph.find_start_node(graph_many_built)
[last_node] = Graph.find_last_node(graph_many_built)
:ok = GenServer.call(last_node, {:connect, self})
GenServer.cast(start_node, {:next, self, {:sum, 0}})
assert_receive {_, {:next, _, {:sum, 7}}}
end
Expand Down
8 changes: 4 additions & 4 deletions test/support/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ defmodule Exstreme.Common do
quote do
def graph_name, do: "demo"

def graph_many_nodes do
graph = GraphCreator.create_graph(graph_name, [])
def graph_many_nodes(name \\ nil) do
graph = GraphCreator.create_graph(name || graph_name, [])
{graph, n1} = GraphCreator.create_node(graph, params)
{graph, n2} = GraphCreator.create_node(graph, params)
{graph, b1} = GraphCreator.create_broadcast(graph, params_broadcast)
Expand Down Expand Up @@ -58,8 +58,8 @@ defmodule Exstreme.Common do
GraphCreator.add_connection(graph, n1, n2)
end

defp create_graph do
graph = GraphCreator.create_graph(graph_name, [])
defp create_graph(name \\ nil) do
graph = GraphCreator.create_graph(name || graph_name, [])
{graph, n1} = GraphCreator.create_node(graph, params)
{graph, n2} = GraphCreator.create_node(graph, params)
GraphCreator.add_connection(graph, n1, n2)
Expand Down

0 comments on commit 3958349

Please sign in to comment.