Skip to content

Commit

Permalink
working
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkaspa committed Jun 14, 2016
1 parent 7f1074a commit ef653c5
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 77 deletions.
59 changes: 30 additions & 29 deletions lib/exstreme/gnode/behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,44 @@ defmodule Exstreme.GNode.Behaviour do
defmacro __using__(_) do
quote do
use GenServer
import Exstreme.GNode.Behaviour
end
end

defmodule Data do
@moduledoc """
"""
alias __MODULE__
defmodule Data do
@moduledoc """
"""
alias __MODULE__

@type graph_func :: ((term, Data.t) -> {:ok, term} | :error)
@type graph_func :: ((term, Data.t) -> {:ok, term} | :error)

@type t :: %Data{next: [pid], pid: pid, nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]}
defstruct [next: [], pid: nil, nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []]
@type t :: %Data{next: [pid], pid: pid, nid: atom, func: graph_func, funnel_queue: [term], received_counter: non_neg_integer, sent_counter: non_neg_integer, opts: [key: term]}
defstruct [next: [], pid: nil, nid: nil, func: nil, funnel_queue: [], received_counter: 0, sent_counter: 0, opts: []]

#TODO use counters
end
#TODO use counters
end

def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts)
end

def init(params = [func: func, nid: nid]) do
opts = Keyword.drop(params, [:func, :type, :nid])
{:ok, %Data{func: func, pid: self, nid: nid, opts: opts}}
end
def init(params) do
func = Keyword.get(params, :func)
nid = Keyword.get(params, :nid)
opts = Keyword.drop(params, [:func, :type, :nid])
{:ok, %Data{func: func, pid: self, nid: nid, opts: opts}}
end

def handle_cast({:connect, to_pid}, data) do
new_data = update_in(data.next, fn(next) -> [to_pid | next] end)
{:noreply, new_data}
end
def handle_cast({:connect, to_pid}, data) do
new_data = update_in(data.next, fn(next) -> [to_pid | next] end)
{:noreply, new_data}
end

def handle_cast({:send_next, next, msg}, data) do
Enum.each(next, &(GenServer.cast(&1, {:next, self, msg})))
{:noreply, data}
end
def handle_cast({:send_next, next, msg}, data) do
Enum.each(next, &(GenServer.cast(&1, {:next, data, msg})))
{:noreply, data}
end

def send_next(pid, next, msg) do
GenServer.cast(pid, {:send_next, next, msg})
def send_next(pid, next, msg) do
GenServer.cast(pid, {:send_next, next, msg})
end
end
end
end
20 changes: 9 additions & 11 deletions lib/exstreme/gnode/funnel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ defmodule Exstreme.GNode.Funnel do
use Exstreme.GNode.Behaviour

def handle_cast({:next, from_data, msg}, data) do
{res, new_queue} =
{result, new_queue} =
data.funnel_queue
|> add_queue(from_data.nid, msg)
|> get_queue(data.opts[:before_nodes])
send_message(res, data.next)
{:noreply, update_in(data.funnel_queue, new_queue)}
send_message(result, data)
{:noreply, update_in(data.funnel_queue, fn(_) -> new_queue end)}
end

#private

defp send_message(res, next) do
if res != nil do
new_msg =
res
|> Map.values
|> List.to_tuple
send_next(self, next, new_msg)
defp send_message(result, data) do
if result != nil do
new_msg = Map.values(result)
{:ok, result} = data.func.(new_msg, data)
send_next(self, data.next, result)
end
end

Expand All @@ -27,7 +25,7 @@ defmodule Exstreme.GNode.Funnel do
if idx != nil do
List.update_at(queue, idx, &(Map.put(&1, from, msg)))
else
[queue | Map.new |> Map.put(from, msg)]
[Map.new |> Map.put(from, msg) | queue]
end
end

Expand Down
37 changes: 26 additions & 11 deletions lib/exstreme/graph.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,40 @@ defmodule Exstreme.Graph do
|> Enum.filter(is_first?)
end

@doc """
"""
@spec find_last_node(t) :: [atom]
def find_last_node(%Graph{nodes: nodes, connections: connections}) do
is_last? =
fn(node) ->
not(at_first?(connections, node)) and at_last?(connections, node)
end

nodes
|> Map.keys
|> Enum.filter(is_last?)
end

@spec get_before_nodes(t, atom) :: [atom]
def get_before_nodes(%Graph{connections: connections}, node) do
compare_func =
fn(current_node, {_from, to}) ->
current_node == to
fn(current_node, {from, to}) ->
{current_node == to, from}
end
Enum.reduce(connections, [], fn(connection, res) ->
res ++ get_nodes_func(node, connection, res, compare_func)
end)
List.flatten(get_nodes_func(node, connection, res, compare_func), res)
end) |> Enum.uniq
end

@spec get_after_nodes(t, atom) :: [atom]
def get_after_nodes(%Graph{nodes: nodes, connections: connections}, node) do
compare_func =
fn(current_node, {from, _to}) ->
current_node == from
fn(current_node, {from, to}) ->
{current_node == from, to}
end
Enum.reduce(connections, [], fn(connection, res) ->
res ++ get_nodes_func(node, connection, res, compare_func)
end)
end) |> Enum.uniq
end

# private
Expand Down Expand Up @@ -108,14 +122,15 @@ defmodule Exstreme.Graph do
defp get_nodes_func(node, pair = {from, to}, res, func) do
case to do
to when is_atom(to) ->
if func.(node, pair) do
[res | node]
{ok, add_node} = func.(node, pair)
if ok do
[add_node | res]
else
res
end
to when is_list(to) ->
Enum.map(to, fn(current_to) ->
get_nodes_func(node, {from, current_to}, res, func)
Enum.reduce(to, res, fn(current_to, new_res) ->
List.flatten(get_nodes_func(node, {from, current_to}, new_res, func), new_res)
end)
end
end
Expand Down
9 changes: 5 additions & 4 deletions lib/exstreme/graph_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ defmodule Exstreme.GraphBuilder do
end)
end

@spec start_node({atom, [key: any]}) :: {atom, [key: any]}
defp start_node({node, params = [type: type]}) do
@spec start_node({atom, [term: any]}) :: {atom, [key: any]}
defp start_node({node, params}) do
type = Keyword.get(params, :type)
params = Keyword.put(params, :nid, node)
{:ok, pid} =
case type do
Expand All @@ -69,8 +70,8 @@ defmodule Exstreme.GraphBuilder do

@spec connect_pair({atom, atom}, %{key: [key: term]}) :: no_return
defp connect_pair({from, to}, nodes) when is_atom(to) do
[pid: pid_from] = nodes[from]
[pid: pid_to] = nodes[to]
pid_from = Keyword.get(nodes[from], :pid)
pid_to = Keyword.get(nodes[to], :pid)
GenServer.cast(pid_from, {:connect, pid_to})
end
end
33 changes: 32 additions & 1 deletion test/exstreme/graph_builder_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,39 @@ defmodule Exstreme.GraphBuilderTest do
use ExUnit.Case
use Exstreme.Common
alias Exstreme.GraphBuilder
alias Exstreme.Graph
doctest Exstreme.GraphBuilder

test "" do
test "creates a graph" do
graph_built = GraphBuilder.build(create_graph)
assert graph_built != create_graph
Enum.each(graph_built.nodes, fn({_, params}) ->
assert Keyword.has_key?(params, :pid)
assert Keyword.has_key?(params, :after_nodes)
assert Keyword.has_key?(params, :before_nodes)
assert Keyword.get(params, :pid) != nil
end)
end

test "sends a message to the graph with common nodes" do
graph_built = GraphBuilder.build(create_graph)
[start_node] = Graph.find_start_node(graph_built)
[last_node] = Graph.find_last_node(graph_built)
start_node_pid = Keyword.get(graph_built.nodes[start_node], :pid)
last_node_pid = Keyword.get(graph_built.nodes[last_node], :pid)
GenServer.cast(last_node_pid, {:connect, self})
GenServer.cast(start_node_pid, {:next, self, {:sum, 0}})
assert_receive {_, {:next, last_node_pid, {:sum, 2}}}
end

test "sends a message to the graph with many nodes" do
graph_built = GraphBuilder.build(graph_many_nodes)
[start_node] = Graph.find_start_node(graph_built)
[last_node] = Graph.find_last_node(graph_built)
start_node_pid = Keyword.get(graph_built.nodes[start_node], :pid)
last_node_pid = Keyword.get(graph_built.nodes[last_node], :pid)
GenServer.cast(last_node_pid, {:connect, self})
GenServer.cast(start_node_pid, {:next, self, {:sum, 0}})
assert_receive {_, {:next, last_node_pid, {:sum, 7}}}
end
end
14 changes: 7 additions & 7 deletions test/exstreme/graph_creator_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Exstreme.GraphCreatorTest do

test "creates a valid graph struct" do
compare_graph = %Graph{
nodes: %{n1: [], n2: []},
nodes: %{n1: params, n2: params},
connections: %{n1: :n2}
}
assert create_graph == compare_graph
Expand Down Expand Up @@ -36,7 +36,7 @@ defmodule Exstreme.GraphCreatorTest do

test "can create n3 and add a relation between n2 and n3" do
compare_graph = %Graph{
nodes: %{n1: [], n2: [], n3: []},
nodes: %{n1: params, n2: params, n3: params},
connections: %{n1: :n2, n2: :n3}
}

Expand All @@ -48,11 +48,11 @@ defmodule Exstreme.GraphCreatorTest do

test "can add a broadcast an many nodes to the broadcast" do
compare_graph = %Graph{
nodes: %{n1: [], n2: [], b1: [], n3: [], n4: []},
nodes: %{n1: params, n2: params, b1: params_broadcast, n3: params, n4: params},
connections: %{n1: :n2, n2: :b1, b1: [:n4, :n3]}
}

{graph, b1} = GraphCreator.create_broadcast(create_graph, params)
{graph, b1} = GraphCreator.create_broadcast(create_graph, params_broadcast)
{graph, n3} = GraphCreator.create_node(graph, params)
{graph, n4} = GraphCreator.create_node(graph, params)

Expand All @@ -67,14 +67,14 @@ defmodule Exstreme.GraphCreatorTest do

test "can add a funnel" do
compare_graph = %Graph{
nodes: %{n1: [], n2: [], b1: [], n3: [], n4: [], f1: [], n5: []},
nodes: %{n1: params, n2: params, b1: params_broadcast, n3: params, n4: params, f1: params_funnel, n5: params},
connections: %{n1: :n2, n2: :b1, b1: [:n4, :n3], n3: :f1, n4: :f1, f1: :n5}
}

{graph, b1} = GraphCreator.create_broadcast(create_graph, params)
{graph, b1} = GraphCreator.create_broadcast(create_graph, params_broadcast)
{graph, n3} = GraphCreator.create_node(graph, params)
{graph, n4} = GraphCreator.create_node(graph, params)
{graph, f1} = GraphCreator.create_funnel(graph, params)
{graph, f1} = GraphCreator.create_funnel(graph, params_funnel)
{graph, n5} = GraphCreator.create_node(graph, params)

new_graph =
Expand Down
14 changes: 12 additions & 2 deletions test/exstreme/graph_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,17 @@ defmodule Exstreme.GraphTest do
assert Graph.find_start_node(graph_many_nodes) == [:n1]
end

test "" do
assert Graph.get_before_nodes(graph_many_nodes, :f1) == [:n3, :n4]
test "the last node is n5" do
assert Graph.find_last_node(graph_many_nodes) == [:n5]
end

test "the nodes before f1 are n4 and n3" do
res = Graph.get_before_nodes(graph_many_nodes, :f1)
assert res == [:n4, :n3]
end

test "the nodes after f1 are n4 and n3" do
res = Graph.get_after_nodes(graph_many_nodes, :f1)
assert res == [:n5]
end
end
28 changes: 16 additions & 12 deletions test/support/common.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ defmodule Exstreme.Common do
using do
quote do
def graph_many_nodes do
graph = GraphCreator.create_graph(params)
graph = GraphCreator.create_graph([])
{graph, n1} = GraphCreator.create_node(graph, params)
{graph, n2} = GraphCreator.create_node(graph, params)
{graph, b1} = GraphCreator.create_broadcast(graph, params)
{graph, b1} = GraphCreator.create_broadcast(graph, params_broadcast)
{graph, n3} = GraphCreator.create_node(graph, params)
{graph, n4} = GraphCreator.create_node(graph, params)
{graph, f1} = GraphCreator.create_funnel(graph, params)
{graph, f1} = GraphCreator.create_funnel(graph, params_funnel)
{graph, n5} = GraphCreator.create_node(graph, params)

graph
Expand All @@ -27,43 +27,47 @@ defmodule Exstreme.Common do
# invalid graphs

def graph_one_node_no_connections do
graph = GraphCreator.create_graph(params)
graph = GraphCreator.create_graph([])
{graph, _n1} = GraphCreator.create_node(graph, params)
graph
end

def graph_no_connections, do: GraphCreator.create_graph(params)
def graph_no_connections, do: GraphCreator.create_graph([])

def graph_start_with_broadcast do
graph = GraphCreator.create_graph(params)
{graph, b1} = GraphCreator.create_broadcast(graph, params)
graph = GraphCreator.create_graph([])
{graph, b1} = GraphCreator.create_broadcast(graph, params_broadcast)
{graph, n1} = GraphCreator.create_node(graph, params)
GraphCreator.add_connection(graph, b1, n1)
end

def graph_start_with_funnnel do
graph = GraphCreator.create_graph(params)
{graph, f1} = GraphCreator.create_funnel(graph, params)
graph = GraphCreator.create_graph([])
{graph, f1} = GraphCreator.create_funnel(graph, params_funnel)
{graph, n1} = GraphCreator.create_node(graph, params)
GraphCreator.add_connection(graph, f1, n1)
end

def graph_unconnected_nodes do
graph = GraphCreator.create_graph(params)
graph = GraphCreator.create_graph([])
{graph, n1} = GraphCreator.create_node(graph, params)
{graph, n2} = GraphCreator.create_node(graph, params)
{graph, _n3} = GraphCreator.create_node(graph, params)
GraphCreator.add_connection(graph, n1, n2)
end

defp create_graph do
graph = GraphCreator.create_graph(params)
graph = GraphCreator.create_graph([])
{graph, n1} = GraphCreator.create_node(graph, params)
{graph, n2} = GraphCreator.create_node(graph, params)
GraphCreator.add_connection(graph, n1, n2)
end

def params, do: []
def params, do: [type: :common, func: fn({:sum, acc}, _) -> {:ok, {:sum, acc + 1}} end]

def params_funnel, do: [type: :funnel, func: fn(values, _) -> {:ok, {:sum, Enum.reduce(values, 0, fn({:sum, num}, acc) -> num + acc end)}} end]

def params_broadcast, do: [type: :broadcast]
end
end
end

0 comments on commit ef653c5

Please sign in to comment.