协程

协程是编写 Tornado 中异步代码的推荐方式。协程使用 Python 的 await 关键字来挂起和恢复执行,而不是回调链(在像 gevent 这样的框架中看到的协作轻量级线程有时也被称为协程,但在 Tornado 中,所有协程都使用显式上下文切换,并被调用为异步函数)。

协程几乎与同步代码一样简单,但没有线程的开销。它们还 使并发更容易 通过减少上下文切换可能发生的次数来进行推理。

示例

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

原生协程与装饰协程

Python 3.5 引入了 asyncawait 关键字(使用这些关键字的函数也称为“原生协程”)。为了与 Python 的旧版本兼容,您可以使用 tornado.gen.coroutine 装饰器使用“装饰”或“基于 yield”的协程。

原生协程是尽可能推荐的形式。只有在需要与 Python 的旧版本兼容时才使用装饰协程。Tornado 文档中的示例通常会使用原生形式。

两种形式之间的转换通常很简单

# Decorated:                    # Native:

# Normal function declaration
# with decorator                # "async def" keywords
@gen.coroutine
def a():                        async def a():
    # "yield" all async funcs       # "await" all async funcs
    b = yield c()                   b = await c()
    # "return" and "yield"
    # cannot be mixed in
    # Python 2, so raise a
    # special exception.            # Return normally
    raise gen.Return(b)             return b

两种协程形式之间的其他区别在下面概述。

  • 原生协程

    • 通常更快。

    • 可以使用 async forasync with 语句,这使得某些模式变得更加简单。

    • 除非您 awaityield 它们,否则根本不会运行。装饰协程可以在被调用后立即“在后台”开始运行。请注意,对于两种协程,使用 awaityield 很重要,这样任何异常都有地方可以去。

  • 装饰协程

    • concurrent.futures 包具有额外的集成,允许直接生成 executor.submit 的结果。对于原生协程,请使用 IOLoop.run_in_executor 代替。

    • 支持通过生成列表或字典来等待多个对象的某些简写。使用 tornado.gen.multi 在原生协程中执行此操作。

    • 可以通过转换函数的注册表支持与其他包(包括 Twisted)的集成。要访问原生协程中的此功能,请使用 tornado.gen.convert_yielded

    • 始终返回 Future 对象。原生协程返回一个可等待对象,它不是 Future。在 Tornado 中,两者基本可以互换。

工作原理

本节解释装饰协程的操作。原生协程在概念上类似,但由于与 Python 运行时的额外集成,因此稍微复杂一些。

包含 yield 的函数是生成器。所有生成器都是异步的;当被调用时,它们返回一个生成器对象,而不是运行到完成。 @gen.coroutine 装饰器通过 yield 表达式与生成器进行通信,并通过返回一个 Future 与协程的调用者进行通信。

以下是对协程装饰器内部循环的简化版本

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

装饰器从生成器接收一个 Future,等待(不阻塞)该 Future 完成,然后“解包” Future 并将结果发送回生成器作为 yield 表达式的结果。大多数异步代码从不直接接触 Future 类,除了立即将异步函数返回的 Future 传递给 yield 表达式。

如何调用协程

协程不会以正常的方式引发异常:它们引发的任何异常都会被捕获在可等待对象中,直到它被生成。这意味着以正确的方式调用协程很重要,否则您可能会有未被注意到的错误

async def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

在几乎所有情况下,任何调用协程的函数本身都必须是一个协程,并且在调用中使用 awaityield 关键字。当您覆盖超类中定义的方法时,请查阅文档以查看是否允许协程(文档应说明该方法“可能是协程”或“可能返回 Future”)。

async def good_call():
    # await will unwrap the object returned by divide() and raise
    # the exception.
    await divide(1, 0)

有时您可能想“启动并忘记”一个协程,而无需等待它的结果。在这种情况下,建议使用 IOLoop.spawn_callback,它使 IOLoop 负责调用。如果失败,IOLoop 会记录一个堆栈跟踪

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

以这种方式使用 IOLoop.spawn_callback 建议用于使用 @gen.coroutine 的函数,但它需要用于使用 async def 的函数(否则协程运行器将不会启动)。

最后,在程序的顶层,如果 IOLoop 尚未运行,您可以启动 IOLoop,运行协程,然后使用 IOLoop.run_sync 方法停止 IOLoop。这通常用于启动面向批处理的程序的 main 函数

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

协程模式

调用阻塞函数

从协程调用阻塞函数的最简单方法是使用 IOLoop.run_in_executor,它返回与协程兼容的 Futures

async def call_blocking():
    await IOLoop.current().run_in_executor(None, blocking_func, args)

并行性

multi 函数接受值为 Futures 的列表和字典,并并行等待所有这些 Futures

from tornado.gen import multi

async def parallel_fetch(url1, url2):
    resp1, resp2 = await multi([http_client.fetch(url1),
                                http_client.fetch(url2)])

async def parallel_fetch_many(urls):
    responses = await multi ([http_client.fetch(url) for url in urls])
    # responses is a list of HTTPResponses in the same order

async def parallel_fetch_dict(urls):
    responses = await multi({url: http_client.fetch(url)
                             for url in urls})
    # responses is a dict {url: HTTPResponse}

在装饰协程中,可以直接 yield 列表或字典

@gen.coroutine
def parallel_fetch_decorated(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

交织

有时保存 Future 而不是立即生成它很有用,这样您可以在等待之前启动另一个操作。

from tornado.gen import convert_yielded

async def get(self):
    # convert_yielded() starts the native coroutine in the background.
    # This is equivalent to asyncio.ensure_future() (both work in Tornado).
    fetch_future = convert_yielded(self.fetch_next_chunk())
    while True:
        chunk = await fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = convert_yielded(self.fetch_next_chunk())
        await self.flush()

在装饰协程中,这更容易做到,因为它们在被调用时立即开始

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

循环

在原生协程中,可以使用 async for。在 Python 的旧版本中,协程的循环很棘手,因为没有办法在 forwhile 循环的每次迭代中 yield 并捕获 yield 的结果。相反,您需要将循环条件与访问结果分开,如来自 Motor 的此示例所示

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

在后台运行

作为 PeriodicCallback 的替代方案,协程可以包含一个 while True: 循环,并使用 tornado.gen.sleep

async def minute_loop():
    while True:
        await do_something()
        await gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有时可能需要更复杂的循环。例如,前面的循环每隔 60+N 秒运行一次,其中 Ndo_something() 的运行时间。要精确地每隔 60 秒运行一次,请使用上面的交错模式

async def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        await do_something()  # Run while the clock is ticking.
        await nxt             # Wait for the timer to run out.