-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
executable file
·77 lines (62 loc) · 2.35 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python3
# Combining coroutines running in an asyncio event loop with
# blocking tasks in thread pool and process pool executors.
#
# Based on https://pymotw.com/3/asyncio/executors.html, but this version runs both
# threads and processes at the same time and interleaves them with asyncio coroutines.
#
# All appears to be working.
#
import uasyncio
import concurrent.futures
import logging
import sys
import time
def block_task(prefix, n):
"""A blocking task to be executed in a thread or process executor"""
log = logging.getLogger(f'{prefix}_blocks({n})')
log.info('running')
time.sleep(1.0)
log.info('done')
return f'b{n ** 2}'
async def async_task(prefix, n):
"""A coroutine intended to run in the asyncio event loop to verify that it
works concurrently with the blocking tasks"""
log = logging.getLogger(f'{prefix}_asyncio({n})')
for i in range(5):
log.info(f'running {i}')
await uasyncio.sleep(0.5)
log.info('done')
return f'a{n ** 2}'
async def run_tasks(prefix, executor):
"""Runs blocking tasks in the executor and spawns off a few coroutines to run
concurrently with the blocking tasks."""
log = logging.getLogger(f'{prefix}_run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
loop = uasyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, block_task, prefix, i)
for i in range(6)
] + [async_task(prefix, i) for i in range(3)]
log.info('waiting for executor tasks')
completed, pending = await uasyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
if __name__ == '__main__':
logging.basicConfig(
level=logging.INFO,
format='PID %(process)5s %(threadName)-25s %(name)-25s: %(message)s',
stream=sys.stderr,
)
th_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
pr_executor = concurrent.futures.ProcessPoolExecutor(max_workers=3)
event_loop = uasyncio.get_event_loop()
try:
w = uasyncio.wait([run_tasks('th', th_executor),
run_tasks('pr', pr_executor)
])
event_loop.run_until_complete(w)
finally:
event_loop.close()