Skip to content

Commit

Permalink
nid implemented, tests improved
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkaspa committed Jul 5, 2016
1 parent 68ea17b commit 6c73ad3
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 108 deletions.
13 changes: 10 additions & 3 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
- Validate before build
x- Validate before build
x- Named nodes with graph name
x- Add documentation
x- Use nid to connect
X- Get nid for a node
- A node must have a function
- The function in Broadcast and Funnel nodes can be optional
- Add Supervisors functionality
- Named nodes with graph prefix
- Improve test
- Use counters for stats
x- Improve test
- Improve API

- Update to GenStage
19 changes: 12 additions & 7 deletions lib/exstreme/gnode/behaviour.ex
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
defmodule Exstreme.GNode.Behaviour do
@moduledoc """
This behaviour defines what means to be a node in the Graph
"""

@doc """
Every node in the Graph must use this macro
"""
defmacro __using__(_) do
quote do
use GenServer

defmodule Data do
@moduledoc """
The data for each node
"""
alias __MODULE__

@type graph_func :: ((term, Data.t) -> {:ok, term} | :error)

@type t :: %Data{next: [pid], pid: pid, nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]}
defstruct [next: [], pid: nil, nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []]
@type t :: %Data{next: [atom], nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]}
defstruct [next: [], nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []]

#TODO use counters
end
Expand All @@ -28,21 +33,21 @@ defmodule Exstreme.GNode.Behaviour do
func = Keyword.get(params, :func)
nid = Keyword.get(params, :nid)
opts = Keyword.drop(params, [:func, :type, :nid])
{:ok, %Data{func: func, pid: self, nid: nid, opts: opts}}
{:ok, %Data{func: func, nid: nid, opts: opts}}
end

def handle_cast({:connect, to_pid}, data) do
new_data = update_in(data.next, fn(next) -> [to_pid | next] end)
def handle_cast({:connect, to_nid}, data) do
new_data = update_in(data.next, fn(next) -> [to_nid | next] end)
{:noreply, new_data}
end

def handle_cast({:send_next, next, msg}, data) do
def handle_info({:send_next, next, msg}, data) do
Enum.each(next, &(GenServer.cast(&1, {:next, data, msg})))
{:noreply, data}
end

def send_next(pid, next, msg) do
GenServer.cast(pid, {:send_next, next, msg})
send pid, {:send_next, next, msg}
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/exstreme/gnode/broadcast.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
defmodule Exstreme.GNode.Broadcast do
use Exstreme.GNode.Behaviour

# Broadcasts the message to the next nodes
def handle_cast({:next, _, msg}, data) do
send_next(self, data.next, msg)
{:ok, result} = data.func.(msg, data)
send_next(self, data.next, result)
{:noreply, data}
end
end
1 change: 1 addition & 0 deletions lib/exstreme/gnode/common.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Exstreme.GNode.Common do
use Exstreme.GNode.Behaviour

# Sends the result to the next one
def handle_cast({:next, _, msg}, data) do
{:ok, result} = data.func.(msg, data)
send_next(self, data.next, result)
Expand Down
8 changes: 7 additions & 1 deletion lib/exstreme/gnode/funnel.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Exstreme.GNode.Funnel do
use Exstreme.GNode.Behaviour

# Receives a message and saves it in a messages map queue,
# when all the messages are done sends a the map
def handle_cast({:next, from_data, msg}, data) do
{result, new_queue} =
data.funnel_queue
Expand All @@ -10,8 +12,10 @@ defmodule Exstreme.GNode.Funnel do
{:noreply, update_in(data.funnel_queue, fn(_) -> new_queue end)}
end


#private

# Sends the result to the next one
defp send_message(result, data) do
if result != nil do
new_msg = Map.values(result)
Expand All @@ -20,15 +24,17 @@ defmodule Exstreme.GNode.Funnel do
end
end

# Adds a message in the map queue
defp add_queue(queue, from, msg) do
idx = Enum.find_index(queue, &(!Map.has_key?(&1, from)))
if idx != nil do
List.update_at(queue, idx, &(Map.put(&1, from, msg)))
else
[Map.new |> Map.put(from, msg) | queue]
queue ++ [Map.new |> Map.put(from, msg)]
end
end

# Gets a message from the map queue
defp get_queue(queue = [head | tail], before_nodes) do
before_nodes_set = MapSet.new(before_nodes)
if MapSet.equal?(MapSet.new(Map.keys(head)), before_nodes_set) do
Expand Down
3 changes: 3 additions & 0 deletions lib/exstreme/gnode/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule Exstreme.Supervisor do

end
32 changes: 30 additions & 2 deletions lib/exstreme/graph.ex
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
defmodule Exstreme.Graph do
@moduledoc """
Provides information for a Graph
"""
alias __MODULE__

@typedoc """
Represents the Graph data
"""
@type t :: %Graph{name: String.t,params: [key: term], nodes: %{key: [key: term]}, connections: %{key: atom}}
defstruct name: '', params: [], nodes: %{}, connections: %{}

@doc """
Counts the Graph nodes
"""
@spec count_nodes(t) :: non_neg_integer
def count_nodes(%Graph{nodes: nodes}) do
Expand All @@ -18,6 +21,7 @@ defmodule Exstreme.Graph do
end

@doc """
Counts the connections
"""
@spec count_connections(t) :: non_neg_integer
def count_connections(%Graph{connections: connections}) do
Expand All @@ -28,6 +32,7 @@ defmodule Exstreme.Graph do
end

@doc """
Counts the connected, unconnected, begin and end nodes
"""
@spec connections_stats(t) :: %{key: integer}
def connections_stats(graph) do
Expand All @@ -39,6 +44,7 @@ defmodule Exstreme.Graph do
end

@doc """
Gets the starting node
"""
@spec find_start_node(t) :: [atom]
def find_start_node(%Graph{nodes: nodes, connections: connections}) do
Expand All @@ -53,6 +59,7 @@ defmodule Exstreme.Graph do
end

@doc """
Gets the last nodes
"""
@spec find_last_node(t) :: [atom]
def find_last_node(%Graph{nodes: nodes, connections: connections}) do
Expand All @@ -66,6 +73,9 @@ defmodule Exstreme.Graph do
|> Enum.filter(is_last?)
end

@doc """
Gets the nodes before the current one
"""
@spec get_before_nodes(t, atom) :: [atom]
def get_before_nodes(%Graph{connections: connections}, node) do
compare_func =
Expand All @@ -77,8 +87,11 @@ defmodule Exstreme.Graph do
end) |> Enum.uniq
end

@doc """
Gets the nodes after the current one
"""
@spec get_after_nodes(t, atom) :: [atom]
def get_after_nodes(%Graph{nodes: nodes, connections: connections}, node) do
def get_after_nodes(%Graph{connections: connections}, node) do
compare_func =
fn(current_node, {from, to}) ->
{current_node == from, to}
Expand All @@ -88,8 +101,21 @@ defmodule Exstreme.Graph do
end) |> Enum.uniq
end

@doc """
Gets the name in the Graph for one node
"""
@spec nid(t, atom) :: atom
def nid(%Graph{name: name}, node) do
[char, rest] =
node
|> Atom.to_string
|> String.codepoints
String.to_atom("#{char}_#{name}_#{rest}")
end

# private

# Map the connections to the kind of connection
@spec map_to_connections(t) :: [atom]
defp map_to_connections(%Graph{nodes: nodes, connections: connections}) do
to_connections =
Expand All @@ -107,11 +133,13 @@ defmodule Exstreme.Graph do
|> Enum.map(to_connections)
end

# Checks if a node is the first position of a connection
@spec at_first?(%{key: atom}, atom) :: boolean
defp at_first?(connections, node) do
defp at_first?(connections, node) do
Map.has_key?(connections, node)
end

# Checks if a node is the last position of a connection
@spec at_last?(%{key: atom}, atom) :: boolean
defp at_last?(connections, node) do
connections
Expand Down
22 changes: 16 additions & 6 deletions lib/exstreme/graph_builder.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule Exstreme.GraphBuilder do
@moduledoc """
Builds the Graph into a Supervision tree of process
"""
alias Exstreme.GNode.Broadcast
alias Exstreme.GNode.Funnel
Expand All @@ -22,6 +23,7 @@ defmodule Exstreme.GraphBuilder do

#private

# Returns a Graph with the before_nodes and after_nodes set
@spec update_nodes_relations(Graph.t) :: Graph.t
defp update_nodes_relations(graph) do
update_node_func =
Expand All @@ -37,6 +39,7 @@ defmodule Exstreme.GraphBuilder do
update_in(graph.nodes, &(&1 |> Enum.map(update_node_func) |> Map.new))
end

# Starts all the nodes
@spec start_nodes(Graph.t) :: Graph.t
defp start_nodes(graph) do
update_in(graph.nodes, fn(nodes) ->
Expand All @@ -46,36 +49,43 @@ defmodule Exstreme.GraphBuilder do
end)
end

# Starts a node
@spec start_node({atom, [term: any]}) :: {atom, [key: any]}
defp start_node({node, params}) do
type = Keyword.get(params, :type)
params = Keyword.put(params, :nid, node)
{:ok, pid} =
params =
params
|> Keyword.put(:nid, node)
|> Keyword.put_new(:func, fn(data, _) -> {:ok, data} end)
{:ok, _} =
case type do
:broadcast -> Broadcast.start_link(params)
:funnel -> Funnel.start_link(params)
_ -> Common.start_link(params)
end
{node, Keyword.put(params, :pid, pid)}
{node, params}
end

# Connects all nodes
@spec connect_nodes(Graph.t) :: Graph.t
defp connect_nodes(graph = %Graph{nodes: nodes, connections: connections}) do
Enum.each(connections, &(connect_pair(&1, nodes)))
graph
end

# Connects two nodes
@spec connect_pair({atom, [atom]}, %{key: [key: term]}) :: no_return
defp connect_pair({from, to}, nodes) when is_list(to) do
Enum.map(1..Enum.count(to), fn(_) -> from end)
|> Enum.zip(to)
|> Enum.each(&(connect_pair(&1, nodes)))
end

# Connects two nodes
@spec connect_pair({atom, atom}, %{key: [key: term]}) :: no_return
defp connect_pair({from, to}, nodes) when is_atom(to) do
pid_from = Keyword.get(nodes[from], :pid)
pid_to = Keyword.get(nodes[to], :pid)
GenServer.cast(pid_from, {:connect, pid_to})
nid_from = Keyword.get(nodes[from], :nid)
nid_to = Keyword.get(nodes[to], :nid)
GenServer.cast(nid_from, {:connect, nid_to})
end
end
Loading

0 comments on commit 6c73ad3

Please sign in to comment.