Skip to content

Commit

Permalink
Raise error in Choice on duplicate outlets
Browse files Browse the repository at this point in the history
  • Loading branch information
gtopper committed Nov 13, 2024
1 parent 22765a0 commit d8416ff
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from asyncio import Task
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Union
from typing import Any, Callable, Collection, Dict, Iterable, List, Optional, Set, Union

import aiohttp

Expand Down Expand Up @@ -363,19 +363,21 @@ def _init(self):
# TODO: hacky way of supporting mlrun preview, which replaces targets with a DFTarget
self._passthrough_for_preview = list(self._name_to_outlet) == ["dataframe"]

def select_outlets(self, event) -> List[str]:
def select_outlets(self, event) -> Collection[str]:
"""
Override this method to route events based on a customer logic. The default implementation will route all
events to all outlets.
"""
return list(self._name_to_outlet.keys())
return self._name_to_outlet.keys()

async def _do(self, event):
if event is _termination_obj:
return await self._do_downstream(_termination_obj)
else:
event_body = event if self._full_event else event.body
outlet_names = self.select_outlets(event_body)
if len(set(outlet_names)) != len(outlet_names):
raise ValueError(f"select_outlets() returned duplicate outlets: {outlet_names}")
outlets = []
if self._passthrough_for_preview:
outlet = self._name_to_outlet["dataframe"]
Expand Down

0 comments on commit d8416ff

Please sign in to comment.