Combining Coroutines with Threads and Processes
Many existing libraries are not ready to be used with asyncio natively. They may block, or depend on concurrency features not available through the module. It is still possible to use those libraries in an application based on asyncio by using an executor from concurrent.futures to run their code in either a separate thread or a separate process.
Threads
The run_in_executor() method of the event loop takes an executor instance, a regular callable to invoke, and any arguments to be passed to the callable. It returns a Future that can be used to wait for the function to finish its work and return something. If no executor is passed in, a ThreadPoolExecutor is created. This example explicitly creates an executor to limit the number of worker threads it will have available.
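As a quick illustration of the default behavior, passing None as the executor hands the callable to the loop's default ThreadPoolExecutor. This is a minimal sketch, not part of the example below; blocking_call() is a made-up stand-in for any blocking library function.

import asyncio
import time


def blocking_call():
    # Placeholder for a blocking library call.
    time.sleep(0.1)
    return 'finished'


async def main():
    loop = asyncio.get_event_loop()
    # None selects the event loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, blocking_call)
    print(result)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main())
finally:
    event_loop.close()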
A ThreadPoolExecutor starts its worker threads and then calls each of the provided functions once in a thread. This example shows how to combine run_in_executor() and wait() to have a coroutine yield control to the event loop while blocking functions run in separate threads, and then wake back up when those functions are finished.
import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('running')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')


if __name__ == '__main__':
    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited thread pool.
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()
asyncio_executor_thread.py uses logging to conveniently indicate which thread and function are producing each log message. Because a separate logger is used in each call to blocks(), the output clearly shows the same threads being reused to call multiple copies of the function with different arguments.
$ python3 asyncio_executor_thread.py
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0 blocks(0): running
ThreadPoolExecutor-0_1 blocks(1): running
ThreadPoolExecutor-0_2 blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0 blocks(0): done
ThreadPoolExecutor-0_1 blocks(1): done
ThreadPoolExecutor-0_2 blocks(2): done
ThreadPoolExecutor-0_0 blocks(3): running
ThreadPoolExecutor-0_1 blocks(4): running
ThreadPoolExecutor-0_2 blocks(5): running
ThreadPoolExecutor-0_0 blocks(3): done
ThreadPoolExecutor-0_2 blocks(5): done
ThreadPoolExecutor-0_1 blocks(4): done
MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exiting
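The results appear out of order because asyncio.wait() returns the completed tasks as a set, in whatever order they finished. When the results are needed in the same order the tasks were submitted, asyncio.gather() can be used instead. The following is a minimal sketch of that variation; the name run_blocking_tasks_ordered is made up for illustration and is not part of the example above.

# hypothetical variation on run_blocking_tasks() from
# asyncio_executor_thread.py
async def run_blocking_tasks_ordered(executor):
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    # gather() returns the result values directly, in the same
    # order the awaitables were passed in.
    results = await asyncio.gather(*blocking_tasks)
    return results  # [0, 1, 4, 9, 16, 25]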
Processes
A ProcessPoolExecutor works in much the same way, creating a set of worker processes instead of threads. Using separate processes requires more system resources, but for computationally intensive operations it can make sense to run a separate task on each CPU core.
# changes from asyncio_executor_thread.py

if __name__ == '__main__':
    # Configure logging to show the id of the process
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited process pool.
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()
The only change needed to move from threads to processes is to create a different type of executor. This example also changes the logging format string to include the process id instead of the thread name, to demonstrate that the tasks are in fact running in separate processes.
$ python3 asyncio_executor_process.py
PID 40498 run_blocking_tasks: starting
PID 40498 run_blocking_tasks: creating executor tasks
PID 40498 run_blocking_tasks: waiting for executor tasks
PID 40499 blocks(0): running
PID 40500 blocks(1): running
PID 40501 blocks(2): running
PID 40499 blocks(0): done
PID 40500 blocks(1): done
PID 40501 blocks(2): done
PID 40500 blocks(3): running
PID 40499 blocks(4): running
PID 40501 blocks(5): running
PID 40499 blocks(4): done
PID 40500 blocks(3): done
PID 40501 blocks(5): done
PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25]
PID 40498 run_blocking_tasks: exiting
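One caveat when moving to processes is that the callable and its arguments are pickled and sent to the worker processes, so blocks() must be defined at module level, as it is here. On Python 3.7 and later the event loop boilerplate can also be reduced with asyncio.run(); the following is a minimal sketch of the same pattern written that way, not part of the original example.

import asyncio
import concurrent.futures
import time


def blocks(n):
    # Must be defined at module level so it can be pickled
    # and sent to the worker processes.
    time.sleep(0.1)
    return n ** 2


async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        tasks = [
            loop.run_in_executor(executor, blocks, i)
            for i in range(6)
        ]
        results = await asyncio.gather(*tasks)
    print(results)


if __name__ == '__main__':
    asyncio.run(main())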