From 7a8abd789b10f9a50bb770cb30e65804d089bc05 Mon Sep 17 00:00:00 2001 From: mrkaspa Date: Mon, 7 Mar 2016 16:55:25 -0500 Subject: [PATCH] added funnels and broadcasts --- lib/exstreme/stream_graph.ex | 81 ++++++++++++++++++++++++--- test/exstreme/stream_graph_test.exs | 87 ++++++++++++++++++++++------- 2 files changed, 140 insertions(+), 28 deletions(-) diff --git a/lib/exstreme/stream_graph.ex b/lib/exstreme/stream_graph.ex index b35b158..31211d7 100644 --- a/lib/exstreme/stream_graph.ex +++ b/lib/exstreme/stream_graph.ex @@ -13,6 +13,25 @@ defmodule Exstreme.StreamGraph do """ def create_node(graph = %Graph{nodes: nodes}, params \\ []) do key = next_node_key(nodes) + + new_graph = update_in(graph.nodes, &(Map.put(&1, key, params))) + {new_graph, key} + end + + @doc """ + """ + def create_broadcast(graph = %Graph{nodes: nodes}, params \\ []) do + key = next_broadcast_key(nodes) + + new_graph = update_in(graph.nodes, &(Map.put(&1, key, params))) + {new_graph, key} + end + + @doc """ + """ + def create_funnel(graph = %Graph{nodes: nodes}, params \\ []) do + key = next_funnel_key(nodes) + new_graph = update_in(graph.nodes, &(Map.put(&1, key, params))) {new_graph, key} end @@ -20,10 +39,8 @@ defmodule Exstreme.StreamGraph do @doc """ """ def add_connection(graph = %Graph{nodes: nodes}, start, finish) when start != finish do - if Map.has_key?(nodes, start) and Map.has_key?(nodes, finish) do + if has_node(nodes, start) && has_node(nodes, finish) do update_in(graph.connections, store_connection_fn(start, finish)) - else - raise ArgumentError, message: "nodes not found" end end @@ -31,12 +48,22 @@ defmodule Exstreme.StreamGraph do defp store_connection_fn(start, finish) do fn(connections) -> + add_connection = fn(keywords, start, finish) -> + Keyword.update(keywords, start, finish, fn(value)-> + if is_list(value) do + [finish | value] + else + [finish, value] + end + end) + end + connections |> validate_repeated(start, finish) |> validate_repeated(finish, start) |> validate_position(start, :start) |> validate_position(finish, :end) - |> Keyword.put(start, finish) + |> add_connection.(start, finish) end end @@ -51,39 +78,75 @@ defmodule Exstreme.StreamGraph do defp validate_position(connections, node, position) do case node|> Atom.to_string |> String.first do "n" -> validate_position_node(connections, node, position) - # "b" -> - # "f" -> + "b" -> validate_position_broadcast(connections, node, position) + "f" -> validate_position_funnel(connections, node, position) _ -> raise ArgumentError, message: "invalid node" end end defp validate_position_node(connections, node, :start) do + validate_position_start(connections, node,"the node can't be twice at start position #{node}") + end + + defp validate_position_node(connections, node, :end) do + validate_position_end(connections, node,"the node can't be twice at end position") + end + + defp validate_position_broadcast(connections, _, :start), do: connections + + defp validate_position_broadcast(connections, bct, :end) do + validate_position_end(connections, bct, "the broadcast can't be twice at end position") + end + + defp validate_position_funnel(connections, node, :start) do + validate_position_start(connections, node,"the funnel can't be twice at start position #{node}") + end + + defp validate_position_funnel(connections, node, :end), do: connections + + defp validate_position_start(connections, node, msg) do exist = connections |> Keyword.has_key?(node) if exist do - raise ArgumentError, message: "the node can't be twice at start position" + raise ArgumentError, message: msg else connections end end - defp validate_position_node(connections, node, :end) do + defp validate_position_end(connections, node, msg) do exist = connections |> Keyword.values |> Enum.member?(node) if exist do - raise ArgumentError, message: "the node can't be twice at end position" + raise ArgumentError, message: msg else connections end end + defp has_node(nodes, node) do + if Map.has_key?(nodes, node) do + true + else + raise ArgumentError, message: "node #{node} not found" + end + end + defp next_node_key(nodes) do next_key(nodes, "n") end + defp next_broadcast_key(nodes) do + next_key(nodes, "b") + end + + defp next_funnel_key(nodes) do + next_key(nodes, "f") + end + defp next_key(map, letter) do count = map diff --git a/test/exstreme/stream_graph_test.exs b/test/exstreme/stream_graph_test.exs index 369f3cb..ddeaf9b 100644 --- a/test/exstreme/stream_graph_test.exs +++ b/test/exstreme/stream_graph_test.exs @@ -7,21 +7,20 @@ defmodule Exstreme.StreamGraphTest do def params, do: [] def create_graph do - graph = StreamGraph.create_graph(params) - - {graph, n1} = graph |> StreamGraph.create_node(params) - {graph, n2} = graph |> StreamGraph.create_node(params) - - graph - |> StreamGraph.add_connection(n1, n2) + with( + graph <- StreamGraph.create_graph(params), + {graph, n1} <- StreamGraph.create_node(graph, params), + {graph, n2} <- StreamGraph.create_node(graph, params), + do: StreamGraph.add_connection(graph, n1, n2) + ) end test "creates a valid graph struct" do - graph = %Graph{ + compare_graph = %Graph{ nodes: %{n1: [], n2: []}, connections: [{:n1, :n2}] } - assert create_graph == graph + assert create_graph == compare_graph end test "throws an error when adding again the relation" do @@ -45,15 +44,65 @@ defmodule Exstreme.StreamGraphTest do end end - test "can create n3 and add a relatio between n2 and n3" do - graph = %Graph{ - nodes: %{n1: [], n2: []}, - connections: [{:n1, :n2}] - } + test "can create n3 and add a relation between n2 and n3" do + compare_graph = %Graph{ + nodes: %{n1: [], n2: [], n3: []}, + connections: [{:n1, :n2}, {:n2, :n3}] + } - {new_graph, _} = create_graph |> StreamGraph.create_node(params) - new_graph = - new_graph - |> StreamGraph.add_connection(:n2, :n3) - end + new_graph = + with( + {graph, n3} <- StreamGraph.create_node(create_graph, params), + do: graph |> StreamGraph.add_connection(:n2, n3) + ) + + assert new_graph == compare_graph + end + + test "can add a broadcast an many nodes to the broadcast" do + compare_graph = %Graph{ + nodes: %{n1: [], n2: [], b1: [], n3: [], n4: []}, + connections: [{:n1, :n2}, {:n2, :b1}, {:b1, [:n4, :n3]}] + } + + new_graph = + with( + {graph, b1} <- StreamGraph.create_broadcast(create_graph, params), + {graph, n3} <- StreamGraph.create_node(graph, params), + {graph, n4} <- StreamGraph.create_node(graph, params) + ) do + graph + |> StreamGraph.add_connection(:n2, b1) + |> StreamGraph.add_connection(b1, n3) + |> StreamGraph.add_connection(b1, n4) + end + + assert new_graph == compare_graph + end + + test "can add a funnel" do + compare_graph = %Graph{ + nodes: %{n1: [], n2: [], b1: [], n3: [], n4: [], f1: [], n5: []}, + connections: [{:n1, :n2}, {:n2, :b1}, {:b1, [:n4, :n3]}, {:n3, :f1}, {:n4, :f1}, {:f1, :n5}] + } + + new_graph = + with( + {graph, b1} <- StreamGraph.create_broadcast(create_graph, params), + {graph, n3} <- StreamGraph.create_node(graph, params), + {graph, n4} <- StreamGraph.create_node(graph, params), + {graph, f1} <- StreamGraph.create_funnel(graph, params), + {graph, n5} <- StreamGraph.create_node(graph, params) + ) do + graph + |> StreamGraph.add_connection(:n2, b1) + |> StreamGraph.add_connection(b1, n3) + |> StreamGraph.add_connection(b1, n4) + |> StreamGraph.add_connection(n3, f1) + |> StreamGraph.add_connection(n4, f1) + |> StreamGraph.add_connection(f1, n5) + end + + assert new_graph == compare_graph + end end