协程¶
协程是编写 Tornado 中异步代码的推荐方式。协程使用 Python 的 await
关键字来挂起和恢复执行,而不是回调链(在像 gevent 这样的框架中看到的协作轻量级线程有时也被称为协程,但在 Tornado 中,所有协程都使用显式上下文切换,并被调用为异步函数)。
协程几乎与同步代码一样简单,但没有线程的开销。它们还 使并发更容易 通过减少上下文切换可能发生的次数来进行推理。
示例
async def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = await http_client.fetch(url)
return response.body
原生协程与装饰协程¶
Python 3.5 引入了 async
和 await
关键字(使用这些关键字的函数也称为“原生协程”)。为了与 Python 的旧版本兼容,您可以使用 tornado.gen.coroutine
装饰器使用“装饰”或“基于 yield”的协程。
原生协程是尽可能推荐的形式。只有在需要与 Python 的旧版本兼容时才使用装饰协程。Tornado 文档中的示例通常会使用原生形式。
两种形式之间的转换通常很简单
# Decorated: # Native:
# Normal function declaration
# with decorator # "async def" keywords
@gen.coroutine
def a(): async def a():
# "yield" all async funcs # "await" all async funcs
b = yield c() b = await c()
# "return" and "yield"
# cannot be mixed in
# Python 2, so raise a
# special exception. # Return normally
raise gen.Return(b) return b
两种协程形式之间的其他区别在下面概述。
原生协程
通常更快。
可以使用
async for
和async with
语句,这使得某些模式变得更加简单。除非您
await
或yield
它们,否则根本不会运行。装饰协程可以在被调用后立即“在后台”开始运行。请注意,对于两种协程,使用await
或yield
很重要,这样任何异常都有地方可以去。
装饰协程
与
concurrent.futures
包具有额外的集成,允许直接生成executor.submit
的结果。对于原生协程,请使用IOLoop.run_in_executor
代替。支持通过生成列表或字典来等待多个对象的某些简写。使用
tornado.gen.multi
在原生协程中执行此操作。可以通过转换函数的注册表支持与其他包(包括 Twisted)的集成。要访问原生协程中的此功能,请使用
tornado.gen.convert_yielded
。始终返回
Future
对象。原生协程返回一个可等待对象,它不是Future
。在 Tornado 中,两者基本可以互换。
工作原理¶
本节解释装饰协程的操作。原生协程在概念上类似,但由于与 Python 运行时的额外集成,因此稍微复杂一些。
包含 yield
的函数是生成器。所有生成器都是异步的;当被调用时,它们返回一个生成器对象,而不是运行到完成。 @gen.coroutine
装饰器通过 yield
表达式与生成器进行通信,并通过返回一个 Future
与协程的调用者进行通信。
以下是对协程装饰器内部循环的简化版本
# Simplified inner loop of tornado.gen.Runner
def run(self):
# send(x) makes the current yield return x.
# It returns when the next yield is reached
future = self.gen.send(self.next)
def callback(f):
self.next = f.result()
self.run()
future.add_done_callback(callback)
装饰器从生成器接收一个 Future
,等待(不阻塞)该 Future
完成,然后“解包” Future
并将结果发送回生成器作为 yield
表达式的结果。大多数异步代码从不直接接触 Future
类,除了立即将异步函数返回的 Future
传递给 yield
表达式。
如何调用协程¶
协程不会以正常的方式引发异常:它们引发的任何异常都会被捕获在可等待对象中,直到它被生成。这意味着以正确的方式调用协程很重要,否则您可能会有未被注意到的错误
async def divide(x, y):
return x / y
def bad_call():
# This should raise a ZeroDivisionError, but it won't because
# the coroutine is called incorrectly.
divide(1, 0)
在几乎所有情况下,任何调用协程的函数本身都必须是一个协程,并且在调用中使用 await
或 yield
关键字。当您覆盖超类中定义的方法时,请查阅文档以查看是否允许协程(文档应说明该方法“可能是协程”或“可能返回 Future
”)。
async def good_call():
# await will unwrap the object returned by divide() and raise
# the exception.
await divide(1, 0)
有时您可能想“启动并忘记”一个协程,而无需等待它的结果。在这种情况下,建议使用 IOLoop.spawn_callback
,它使 IOLoop
负责调用。如果失败,IOLoop
会记录一个堆栈跟踪
# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
以这种方式使用 IOLoop.spawn_callback
建议用于使用 @gen.coroutine
的函数,但它需要用于使用 async def
的函数(否则协程运行器将不会启动)。
最后,在程序的顶层,如果 IOLoop 尚未运行,您可以启动 IOLoop
,运行协程,然后使用 IOLoop.run_sync
方法停止 IOLoop
。这通常用于启动面向批处理的程序的 main
函数
# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))
协程模式¶
调用阻塞函数¶
从协程调用阻塞函数的最简单方法是使用 IOLoop.run_in_executor
,它返回与协程兼容的 Futures
async def call_blocking():
await IOLoop.current().run_in_executor(None, blocking_func, args)
并行性¶
multi
函数接受值为 Futures
的列表和字典,并并行等待所有这些 Futures
from tornado.gen import multi
async def parallel_fetch(url1, url2):
resp1, resp2 = await multi([http_client.fetch(url1),
http_client.fetch(url2)])
async def parallel_fetch_many(urls):
responses = await multi ([http_client.fetch(url) for url in urls])
# responses is a list of HTTPResponses in the same order
async def parallel_fetch_dict(urls):
responses = await multi({url: http_client.fetch(url)
for url in urls})
# responses is a dict {url: HTTPResponse}
在装饰协程中,可以直接 yield
列表或字典
@gen.coroutine
def parallel_fetch_decorated(url1, url2):
resp1, resp2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
交织¶
有时保存 Future
而不是立即生成它很有用,这样您可以在等待之前启动另一个操作。
from tornado.gen import convert_yielded
async def get(self):
# convert_yielded() starts the native coroutine in the background.
# This is equivalent to asyncio.ensure_future() (both work in Tornado).
fetch_future = convert_yielded(self.fetch_next_chunk())
while True:
chunk = await fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = convert_yielded(self.fetch_next_chunk())
await self.flush()
在装饰协程中,这更容易做到,因为它们在被调用时立即开始
@gen.coroutine
def get(self):
fetch_future = self.fetch_next_chunk()
while True:
chunk = yield fetch_future
if chunk is None: break
self.write(chunk)
fetch_future = self.fetch_next_chunk()
yield self.flush()
循环¶
在原生协程中,可以使用 async for
。在 Python 的旧版本中,协程的循环很棘手,因为没有办法在 for
或 while
循环的每次迭代中 yield
并捕获 yield 的结果。相反,您需要将循环条件与访问结果分开,如来自 Motor 的此示例所示
import motor
db = motor.MotorClient().test
@gen.coroutine
def loop_example(collection):
cursor = db.collection.find()
while (yield cursor.fetch_next):
doc = cursor.next_object()
在后台运行¶
作为 PeriodicCallback
的替代方案,协程可以包含一个 while True:
循环,并使用 tornado.gen.sleep
async def minute_loop():
while True:
await do_something()
await gen.sleep(60)
# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)
有时可能需要更复杂的循环。例如,前面的循环每隔 60+N
秒运行一次,其中 N
是 do_something()
的运行时间。要精确地每隔 60 秒运行一次,请使用上面的交错模式
async def minute_loop2():
while True:
nxt = gen.sleep(60) # Start the clock.
await do_something() # Run while the clock is ticking.
await nxt # Wait for the timer to run out.