-
-
Notifications
You must be signed in to change notification settings - Fork 348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add EventStream class based on 'pulse' prototype #1990
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -784,3 +784,97 @@ def statistics(self): | |
return _ConditionStatistics( | ||
tasks_waiting=len(self._lot), lock_statistics=self._lock.statistics() | ||
) | ||
|
||
|
||
@attr.s | ||
class EventStream(metaclass=Final): | ||
"""A concurrency primitive for a sequence of events. | ||
|
||
Multiple tasks can subscribe for events on the stream using an ``async | ||
for`` loop:: | ||
|
||
events = EventStream() | ||
|
||
... | ||
|
||
async for _ in events.subscribe(): | ||
... | ||
|
||
On each loop iteration, a subcriber will be blocked if there are no new | ||
events on the stream. An event can be "fired" on a stream, which causes | ||
subscribers to awake:: | ||
|
||
events.fire() | ||
|
||
By default, events are coalesced, but will never be lost. That is, if any | ||
events are fired while a subscriber is processing its last wakeup, that | ||
subscriber will not block on the next loop iteration. | ||
|
||
Note that EventStream does not hold any data items associated with events. | ||
However subscribe() does yield integer indices that indicate a position | ||
in the event stream, which could be used. fire() returns the index of the | ||
event added to the stream. | ||
|
||
""" | ||
_write_cursor = attr.ib(default=-1) | ||
_wakeup = attr.ib(default=None) | ||
_closed = attr.ib(default=False) | ||
|
||
def close(self): | ||
"""Close the stream. | ||
|
||
This causes all subscribers to terminate once they have consumed | ||
all events. | ||
""" | ||
self._closed = True | ||
self._wake() | ||
|
||
def _wake(self): | ||
"""Wake blocked tasks.""" | ||
if self._wakeup is not None: | ||
self._wakeup.set() | ||
self._wakeup = None | ||
|
||
def fire(self): | ||
"""Fire an event on the stream.""" | ||
if self._closed: | ||
raise RuntimeError( | ||
"Cannot fire an event on a closed event stream." | ||
) | ||
self._write_cursor += 1 | ||
self._wake() | ||
return self._write_cursor | ||
|
||
async def _wait(self): | ||
"""Wait for the next wakeup. | ||
|
||
We lazily create the Event object to block on if one does not yet | ||
exist; this avoids creating event objects that are never awaited. | ||
|
||
""" | ||
if self._wakeup is None: | ||
self._wakeup = trio.Event() | ||
await self._wakeup.wait() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you're worried about efficiency, then also consider implementing this with a |
||
|
||
async def subscribe(self, from_start=False, coalesce=True): | ||
"""Subscribe for events on the stream. | ||
|
||
If from_start is True, then subscribe for events from the start of | ||
the stream. | ||
|
||
If coalesce is True, then each iteration 'consumes' all previous | ||
events; otherwise, each iteration consumes just one event. | ||
""" | ||
read_cursor = -1 if from_start else self._write_cursor | ||
while True: | ||
if self._write_cursor > read_cursor: | ||
if coalesce: | ||
read_cursor = self._write_cursor | ||
else: | ||
read_cursor += 1 | ||
yield read_cursor | ||
else: | ||
if self._closed: | ||
return | ||
else: | ||
await self._wait() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ | |
|
||
from .. import _core | ||
from .. import _timeouts | ||
from .._timeouts import sleep_forever, move_on_after | ||
from .._timeouts import sleep_forever, move_on_after, sleep | ||
from .._sync import * | ||
|
||
|
||
|
@@ -568,3 +568,126 @@ async def lock_taker(): | |
await wait_all_tasks_blocked() | ||
assert record == ["started"] | ||
lock_like.release() | ||
|
||
|
||
async def test_EventStream_basics(): | ||
p = EventStream() | ||
|
||
wakeups = 0 | ||
|
||
async def background(): | ||
nonlocal wakeups | ||
async for i in p.subscribe(): | ||
wakeups += 1 | ||
|
||
async with _core.open_nursery() as nursery: | ||
nursery.start_soon(background) | ||
|
||
# The event stream starts in a blocked state (no event fired) | ||
await wait_all_tasks_blocked() | ||
assert wakeups == 0 | ||
|
||
# Calling fire() lets it run: | ||
p.fire() | ||
await wait_all_tasks_blocked() | ||
assert wakeups == 1 | ||
|
||
# Multiple events are coalesced into one: | ||
p.fire() | ||
p.fire() | ||
p.fire() | ||
await wait_all_tasks_blocked() | ||
assert wakeups == 2 | ||
|
||
p.close() | ||
|
||
|
||
async def test_EventStream_while_task_is_elsewhere(autojump_clock): | ||
p = EventStream() | ||
|
||
wakeups = 0 | ||
|
||
async def background(): | ||
nonlocal wakeups | ||
async for _ in p.subscribe(): | ||
wakeups += 1 | ||
await sleep(10) | ||
|
||
async with _core.open_nursery() as nursery: | ||
nursery.start_soon(background) | ||
|
||
# Double-check that it's all idle and settled waiting for a event | ||
await sleep(5) | ||
assert wakeups == 0 | ||
await sleep(10) | ||
assert wakeups == 0 | ||
|
||
# Wake it up | ||
p.fire() | ||
|
||
# Now it's sitting in sleep()... | ||
await sleep(5) | ||
assert wakeups == 1 | ||
|
||
# ...when another event arrives. | ||
p.fire() | ||
|
||
# It still wakes up though | ||
await sleep(10) | ||
assert wakeups == 2 | ||
|
||
p.close() | ||
|
||
|
||
async def test_EventStream_subscribe_independence(autojump_clock): | ||
p = EventStream() | ||
|
||
wakeups = [0, 0] | ||
|
||
async def background(i, sleep_time): | ||
nonlocal wakeups | ||
async for _ in p.subscribe(): | ||
wakeups[i] += 1 | ||
await sleep(sleep_time) | ||
|
||
try: | ||
async with _core.open_nursery() as nursery: | ||
nursery.start_soon(background, 0, 10) | ||
nursery.start_soon(background, 1, 100) | ||
|
||
# Initially blocked, no event fired | ||
await sleep(200) | ||
assert wakeups == [0, 0] | ||
|
||
# Firing an event wakes both tasks | ||
p.fire() | ||
await sleep(5) | ||
assert wakeups == [1, 1] | ||
|
||
# Now | ||
# task 0 is sleeping for 5 more seconds | ||
# task 1 is sleeping for 95 more seconds | ||
|
||
# Fire events at a 10s interval; task 0 will wake up for each | ||
# task 1 will only wake up after its sleep | ||
p.fire() | ||
await sleep(10) | ||
p.fire() | ||
assert wakeups == [2, 1] | ||
await sleep(100) | ||
assert wakeups == [3, 2] | ||
|
||
# Now task 0 is blocked on the next event | ||
# Task 1 is sleeping for 100s | ||
|
||
p.fire() | ||
await sleep(1) | ||
assert wakeups == [4, 2] | ||
await sleep(100) | ||
assert wakeups == [4, 3] | ||
|
||
p.close() | ||
except: | ||
import traceback | ||
traceback.print_exc() | ||
raise | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was pytest not printing the traceback properly somehow while you were developing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it was reporting a ResourceWarning about an async generator being disposed:
I didn't dig into what you're doing here, and it sounds a bit odd to me, but I rolled with it. This is one reason I added the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Huh, ok. A lot going on here :-) I don't see how the Also, that warning ought to be harmless in this case. It's pointing out that if the async generator is holding any resources like file descriptors, then they are being cleaned up by the GC rather than explicitly. Is this a useful thing to point out? tbh I'm not sure. @oremanj, any thoughts you want to add? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd put the docs for this right after the docs for
Event
-- "if you clicked here because you were looking for multiple events, then maybe you want...". (Maybe add a cross-ref at the top of theEvent
docs too.)