Skip to content

Commit

Permalink
starting to test
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkaspa committed May 17, 2016
1 parent d11322c commit 7f1074a
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 50 deletions.
63 changes: 35 additions & 28 deletions lib/exstreme/gnode/behaviour.ex
Original file line number Diff line number Diff line change
@@ -1,40 +1,47 @@
defmodule Exstreme.Gnode.Behaviour do
defmodule Exstreme.GNode.Behaviour do
@moduledoc """
"""

defmacro __using__(_) do
quote do
use GenServer
import Exstreme.GNode.Behaviour
end
end

defmodule Data do
@moduledoc """
"""
alias __MODULE__

@type t :: %Data{next: [pid], pid: pid, func: (term) -> no_return, opts: [key: term]}
defstruct [next: [], pid: nil, func: nil, opts: []]
@type graph_func :: ((term, Data.t) -> {:ok, term} | :error)

@type t :: %Data{next: [pid], pid: pid, nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]}
defstruct [next: [], pid: nil, nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []]

#TODO use counters
end

def __using__ do
quote do
use GenServer
alias Exstreme.Gnode.Behaviour.Data

def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end

def init(params = [func: func]) do
opts = Keyword.drop(params, [:func, :type])
{:ok, %Data{func: func, pid: self, opts: opts}}
end

def handle_cast({:connect, to_pid}, data) do
new_data = update_in(data.next, fn(next) -> [pid | next] end)
{:noreply, new_data}
end

def send_next(next, msg) do
Enum.each(next, fn(pid) ->
GenServer.cast(pid, msg)
do)
end
end
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end

def init(params = [func: func, nid: nid]) do
opts = Keyword.drop(params, [:func, :type, :nid])
{:ok, %Data{func: func, pid: self, nid: nid, opts: opts}}
end

def handle_cast({:connect, to_pid}, data) do
new_data = update_in(data.next, fn(next) -> [to_pid | next] end)
{:noreply, new_data}
end

def handle_cast({:send_next, next, msg}, data) do
Enum.each(next, &(GenServer.cast(&1, {:next, self, msg})))
{:noreply, data}
end

def send_next(pid, next, msg) do
GenServer.cast(pid, {:send_next, next, msg})
end
end
5 changes: 3 additions & 2 deletions lib/exstreme/gnode/broadcast.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
defmodule Exstreme.GNode.Broadcast do
use Exstreme.GNode.Behaviour

def handle_cast({:next, {_, msg}}, stats) do
send_next(stats.next, msg)
def handle_cast({:next, _, msg}, data) do
send_next(self, data.next, msg)
{:noreply, data}
end
end
7 changes: 4 additions & 3 deletions lib/exstreme/gnode/common.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
defmodule Exstreme.GNode.Common do
use Exstreme.GNode.Behaviour

def handle_cast({:next, {_, msg} }, stats) do
{:ok, result} = stats.func.(msg)
send_next(stats.next, result)
def handle_cast({:next, _, msg}, data) do
{:ok, result} = data.func.(msg, data)
send_next(self, data.next, result)
{:noreply, data}
end
end
37 changes: 36 additions & 1 deletion lib/exstreme/gnode/funnel.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,42 @@
defmodule Exstreme.GNode.Funnel do
use Exstreme.GNode.Behaviour

def handle_cast({:next, {from_data, msg}}, stats) do
def handle_cast({:next, from_data, msg}, data) do
{res, new_queue} =
data.funnel_queue
|> add_queue(from_data.nid, msg)
|> get_queue(data.opts[:before_nodes])
send_message(res, data.next)
{:noreply, update_in(data.funnel_queue, new_queue)}
end

#private

defp send_message(res, next) do
if res != nil do
new_msg =
res
|> Map.values
|> List.to_tuple
send_next(self, next, new_msg)
end
end

defp add_queue(queue, from, msg) do
idx = Enum.find_index(queue, &(!Map.has_key?(&1, from)))
if idx != nil do
List.update_at(queue, idx, &(Map.put(&1, from, msg)))
else
[queue | Map.new |> Map.put(from, msg)]
end
end

defp get_queue(queue = [head | tail], before_nodes) do
before_nodes_set = MapSet.new(before_nodes)
if MapSet.equal?(MapSet.new(Map.keys(head)), before_nodes_set) do
{head, tail}
else
{nil, queue}
end
end
end
38 changes: 38 additions & 0 deletions lib/exstreme/graph.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,28 @@ defmodule Exstreme.Graph do
|> Enum.filter(is_first?)
end

@spec get_before_nodes(t, atom) :: [atom]
def get_before_nodes(%Graph{connections: connections}, node) do
compare_func =
fn(current_node, {_from, to}) ->
current_node == to
end
Enum.reduce(connections, [], fn(connection, res) ->
res ++ get_nodes_func(node, connection, res, compare_func)
end)
end

@spec get_after_nodes(t, atom) :: [atom]
def get_after_nodes(%Graph{nodes: nodes, connections: connections}, node) do
compare_func =
fn(current_node, {from, _to}) ->
current_node == from
end
Enum.reduce(connections, [], fn(connection, res) ->
res ++ get_nodes_func(node, connection, res, compare_func)
end)
end

# private

@spec map_to_connections(t) :: [atom]
Expand Down Expand Up @@ -81,4 +103,20 @@ defmodule Exstreme.Graph do
|> List.flatten
|> Enum.member?(node)
end

@spec get_nodes_func(atom, {atom, atom}, [atom], ((atom, {atom, atom}) -> boolean)) :: [atom]
defp get_nodes_func(node, pair = {from, to}, res, func) do
case to do
to when is_atom(to) ->
if func.(node, pair) do
[res | node]
else
res
end
to when is_list(to) ->
Enum.map(to, fn(current_to) ->
get_nodes_func(node, {from, current_to}, res, func)
end)
end
end
end
53 changes: 37 additions & 16 deletions lib/exstreme/graph_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,47 @@ defmodule Exstreme.GraphBuilder do
alias Exstreme.GNode.Broadcast
alias Exstreme.GNode.Funnel
alias Exstreme.GNode.Common
alias Exstreme.Graph

@doc """
"""
@spec build(Graph.t) :: Graph.t
def build(graph) do
new_graph = update_in(graph.nodes, &start_nodes/1)
connect_nodes(new_graph)
new_graph
graph
|> update_nodes_relations
|> start_nodes
|> connect_nodes
end

#private

@spec start_nodes(%{key: [key: term]}) :: %{key: [key: term]}
defp start_nodes(nodes) do
nodes
|> Enum.map(&start_node/1)
|> Map.new
@spec update_nodes_relations(Graph.t) :: Graph.t
defp update_nodes_relations(graph) do
update_node_func =
fn({node, params}) ->
new_params =
params
|> Keyword.put(:before_nodes, Graph.get_before_nodes(graph, node))
|> Keyword.put(:after_nodes, Graph.get_after_nodes(graph, node))

{node, new_params}
end

update_in(graph.nodes, &(&1 |> Enum.map(update_node_func) |> Map.new))
end

@spec start_nodes(Graph.t) :: Graph.t
defp start_nodes(graph) do
update_in(graph.nodes, fn(nodes) ->
nodes
|> Enum.map(&start_node/1)
|> Map.new
end)
end

@spec start_node({atom, [key: any]}) :: {atom, [key: any]}
defp start_node({node, params = [type: type]}) do
params = Keyword.put(params, :nid, node)
{:ok, pid} =
case type do
:broadcast -> Broadcast.start_link(params)
Expand All @@ -34,20 +54,21 @@ defmodule Exstreme.GraphBuilder do
{node, Keyword.put(params, :pid, pid)}
end

@spec connect_nodes(Graph.t) :: no_return
defp connect_nodes(%Graph{nodes: nodes, connections: connections}) do
Enum.each(connections, &connect_pair/1)
@spec connect_nodes(Graph.t) :: Graph.t
defp connect_nodes(graph = %Graph{nodes: nodes, connections: connections}) do
Enum.each(connections, &(connect_pair(&1, nodes)))
graph
end

@spec connect_pair({atom, [atom]}) :: no_return
defp connect_pair({from, to}) when is_list(to) do
@spec connect_pair({atom, [atom]}, %{key: [key: term]}) :: no_return
defp connect_pair({from, to}, nodes) when is_list(to) do
Enum.map(1..Enum.count(to), fn(_) -> from end)
|> Enum.zip(to)
|> Enum.each(&connect_pair/1)
|> Enum.each(&(connect_pair(&1, nodes)))
end

@spec connect_pair({atom, atom}) :: no_return
defp connect_pair({from, to}) when is_atom(to) do
@spec connect_pair({atom, atom}, %{key: [key: term]}) :: no_return
defp connect_pair({from, to}, nodes) when is_atom(to) do
[pid: pid_from] = nodes[from]
[pid: pid_to] = nodes[to]
GenServer.cast(pid_from, {:connect, pid_to})
Expand Down
9 changes: 9 additions & 0 deletions test/exstreme/graph_builder_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Exstreme.GraphBuilderTest do
use ExUnit.Case
use Exstreme.Common
alias Exstreme.GraphBuilder
doctest Exstreme.GraphBuilder

test "" do
end
end
4 changes: 4 additions & 0 deletions test/exstreme/graph_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ defmodule Exstreme.GraphTest do
test "the start node is n1" do
assert Graph.find_start_node(graph_many_nodes) == [:n1]
end

test "" do
assert Graph.get_before_nodes(graph_many_nodes, :f1) == [:n3, :n4]
end
end

0 comments on commit 7f1074a

Please sign in to comment.