Skip to content

Commit

Permalink
added funnels and broadcasts
Browse files Browse the repository at this point in the history
  • Loading branch information
mrkaspa committed Mar 7, 2016
1 parent d68bef5 commit 7a8abd7
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 28 deletions.
81 changes: 72 additions & 9 deletions lib/exstreme/stream_graph.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,57 @@ defmodule Exstreme.StreamGraph do
"""
def create_node(graph = %Graph{nodes: nodes}, params \\ []) do
key = next_node_key(nodes)

new_graph = update_in(graph.nodes, &(Map.put(&1, key, params)))
{new_graph, key}
end

@doc """
"""
def create_broadcast(graph = %Graph{nodes: nodes}, params \\ []) do
key = next_broadcast_key(nodes)

new_graph = update_in(graph.nodes, &(Map.put(&1, key, params)))
{new_graph, key}
end

@doc """
"""
def create_funnel(graph = %Graph{nodes: nodes}, params \\ []) do
key = next_funnel_key(nodes)

new_graph = update_in(graph.nodes, &(Map.put(&1, key, params)))
{new_graph, key}
end

@doc """
"""
def add_connection(graph = %Graph{nodes: nodes}, start, finish) when start != finish do
if Map.has_key?(nodes, start) and Map.has_key?(nodes, finish) do
if has_node(nodes, start) && has_node(nodes, finish) do
update_in(graph.connections, store_connection_fn(start, finish))
else
raise ArgumentError, message: "nodes not found"
end
end

#private

defp store_connection_fn(start, finish) do
fn(connections) ->
add_connection = fn(keywords, start, finish) ->
Keyword.update(keywords, start, finish, fn(value)->
if is_list(value) do
[finish | value]
else
[finish, value]
end
end)
end

connections
|> validate_repeated(start, finish)
|> validate_repeated(finish, start)
|> validate_position(start, :start)
|> validate_position(finish, :end)
|> Keyword.put(start, finish)
|> add_connection.(start, finish)
end
end

Expand All @@ -51,39 +78,75 @@ defmodule Exstreme.StreamGraph do
defp validate_position(connections, node, position) do
case node|> Atom.to_string |> String.first do
"n" -> validate_position_node(connections, node, position)
# "b" ->
# "f" ->
"b" -> validate_position_broadcast(connections, node, position)
"f" -> validate_position_funnel(connections, node, position)
_ -> raise ArgumentError, message: "invalid node"
end
end

defp validate_position_node(connections, node, :start) do
validate_position_start(connections, node,"the node can't be twice at start position #{node}")
end

defp validate_position_node(connections, node, :end) do
validate_position_end(connections, node,"the node can't be twice at end position")
end

defp validate_position_broadcast(connections, _, :start), do: connections

defp validate_position_broadcast(connections, bct, :end) do
validate_position_end(connections, bct, "the broadcast can't be twice at end position")
end

defp validate_position_funnel(connections, node, :start) do
validate_position_start(connections, node,"the funnel can't be twice at start position #{node}")
end

defp validate_position_funnel(connections, node, :end), do: connections

defp validate_position_start(connections, node, msg) do
exist =
connections
|> Keyword.has_key?(node)
if exist do
raise ArgumentError, message: "the node can't be twice at start position"
raise ArgumentError, message: msg
else
connections
end
end

defp validate_position_node(connections, node, :end) do
defp validate_position_end(connections, node, msg) do
exist =
connections
|> Keyword.values
|> Enum.member?(node)
if exist do
raise ArgumentError, message: "the node can't be twice at end position"
raise ArgumentError, message: msg
else
connections
end
end

defp has_node(nodes, node) do
if Map.has_key?(nodes, node) do
true
else
raise ArgumentError, message: "node #{node} not found"
end
end

defp next_node_key(nodes) do
next_key(nodes, "n")
end

defp next_broadcast_key(nodes) do
next_key(nodes, "b")
end

defp next_funnel_key(nodes) do
next_key(nodes, "f")
end

defp next_key(map, letter) do
count =
map
Expand Down
87 changes: 68 additions & 19 deletions test/exstreme/stream_graph_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ defmodule Exstreme.StreamGraphTest do
def params, do: []

def create_graph do
graph = StreamGraph.create_graph(params)

{graph, n1} = graph |> StreamGraph.create_node(params)
{graph, n2} = graph |> StreamGraph.create_node(params)

graph
|> StreamGraph.add_connection(n1, n2)
with(
graph <- StreamGraph.create_graph(params),
{graph, n1} <- StreamGraph.create_node(graph, params),
{graph, n2} <- StreamGraph.create_node(graph, params),
do: StreamGraph.add_connection(graph, n1, n2)
)
end

test "creates a valid graph struct" do
graph = %Graph{
compare_graph = %Graph{
nodes: %{n1: [], n2: []},
connections: [{:n1, :n2}]
}
assert create_graph == graph
assert create_graph == compare_graph
end

test "throws an error when adding again the relation" do
Expand All @@ -45,15 +44,65 @@ defmodule Exstreme.StreamGraphTest do
end
end

test "can create n3 and add a relatio between n2 and n3" do
graph = %Graph{
nodes: %{n1: [], n2: []},
connections: [{:n1, :n2}]
}
test "can create n3 and add a relation between n2 and n3" do
compare_graph = %Graph{
nodes: %{n1: [], n2: [], n3: []},
connections: [{:n1, :n2}, {:n2, :n3}]
}

{new_graph, _} = create_graph |> StreamGraph.create_node(params)
new_graph =
new_graph
|> StreamGraph.add_connection(:n2, :n3)
end
new_graph =
with(
{graph, n3} <- StreamGraph.create_node(create_graph, params),
do: graph |> StreamGraph.add_connection(:n2, n3)
)

assert new_graph == compare_graph
end

test "can add a broadcast an many nodes to the broadcast" do
compare_graph = %Graph{
nodes: %{n1: [], n2: [], b1: [], n3: [], n4: []},
connections: [{:n1, :n2}, {:n2, :b1}, {:b1, [:n4, :n3]}]
}

new_graph =
with(
{graph, b1} <- StreamGraph.create_broadcast(create_graph, params),
{graph, n3} <- StreamGraph.create_node(graph, params),
{graph, n4} <- StreamGraph.create_node(graph, params)
) do
graph
|> StreamGraph.add_connection(:n2, b1)
|> StreamGraph.add_connection(b1, n3)
|> StreamGraph.add_connection(b1, n4)
end

assert new_graph == compare_graph
end

test "can add a funnel" do
compare_graph = %Graph{
nodes: %{n1: [], n2: [], b1: [], n3: [], n4: [], f1: [], n5: []},
connections: [{:n1, :n2}, {:n2, :b1}, {:b1, [:n4, :n3]}, {:n3, :f1}, {:n4, :f1}, {:f1, :n5}]
}

new_graph =
with(
{graph, b1} <- StreamGraph.create_broadcast(create_graph, params),
{graph, n3} <- StreamGraph.create_node(graph, params),
{graph, n4} <- StreamGraph.create_node(graph, params),
{graph, f1} <- StreamGraph.create_funnel(graph, params),
{graph, n5} <- StreamGraph.create_node(graph, params)
) do
graph
|> StreamGraph.add_connection(:n2, b1)
|> StreamGraph.add_connection(b1, n3)
|> StreamGraph.add_connection(b1, n4)
|> StreamGraph.add_connection(n3, f1)
|> StreamGraph.add_connection(n4, f1)
|> StreamGraph.add_connection(f1, n5)
end

assert new_graph == compare_graph
end
end

0 comments on commit 7a8abd7

Please sign in to comment.