tornado.queues – 协程队列

版本 4.2 中新增。

用于协程的异步队列。这些类与标准库中的 asyncio 包 中提供的类非常相似。

警告

与标准库中的 queue 模块不同,此处定义的类不是线程安全的。要在另一个线程中使用这些队列,请使用 IOLoop.add_callback 将控制权转移到 IOLoop 线程,然后再调用任何队列方法。

Queue

class tornado.queues.Queue(maxsize: int = 0)[source]

协调生产者和消费者协程。

如果 maxsize 为 0(默认值),则队列大小不受限制。

import asyncio
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await asyncio.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    for item in range(5):
        await q.put(item)
        print('Put %s' % item)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')

asyncio.run(main())
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

在没有原生协程的 Python 版本中(3.5 之前),consumer() 可以写成

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

版本 4.3 中的更改: 在 Python 3.5 中添加了 async for 支持。

property maxsize: int

队列中允许的项目数。

qsize() int[source]

队列中的项目数。

put(item: _T, timeout: Optional[Union[float, timedelta]] = None) Future[None][source]

将项目放入队列,可能需要等待直到有空间。

返回一个 Future,在超时后会引发 tornado.util.TimeoutError

timeout 可以是表示时间的数字(与 tornado.ioloop.IOLoop.time 相同的比例尺,通常是 time.time),或者一个 datetime.timedelta 对象,表示相对于当前时间的截止时间。

put_nowait(item: _T) None[source]

将项目放入队列而不阻塞。

如果立即没有空闲插槽,则引发 QueueFull

get(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_T][source]

从队列中移除并返回一个项目。

返回一个可等待对象,该对象在项目可用时解析,或者在超时后引发 tornado.util.TimeoutError

timeout 可以是表示时间的数字(与 tornado.ioloop.IOLoop.time 相同的比例尺,通常是 time.time),或者一个 datetime.timedelta 对象,表示相对于当前时间的截止时间。

注意

此方法的 timeout 参数不同于标准库中的 queue.Queue.get。该方法将数字值解释为相对超时;此方法将它们解释为绝对截止时间,并需要 timedelta 对象来表示相对超时(与 Tornado 中的其他超时保持一致)。

get_nowait() _T[source]

从队列中移除并返回一个项目而不阻塞。

如果立即有项目可用,则返回一个项目,否则引发 QueueEmpty

task_done() None[source]

表明一个以前排队的任务已完成。

供队列消费者使用。对于每个用于获取任务的 get,随后的对 task_done 的调用告诉队列,该任务的处理已完成。

如果一个 join 正在阻塞,则当所有项目都已处理完时,它将恢复;也就是说,当每个 put 都与一个 task_done 匹配时。

如果调用的次数超过 put 的次数,则引发 ValueError

join(timeout: Optional[Union[float, timedelta]] = None) Awaitable[None][source]

阻塞直到队列中的所有项目都被处理。

返回一个可等待对象,在超时后会引发 tornado.util.TimeoutError

PriorityQueue

class tornado.queues.PriorityQueue(maxsize: int = 0)[source]

一个 Queue,它按照优先级顺序检索条目,最低的优先级在最前面。

条目通常是类似 (priority number, data) 的元组。

import asyncio
from tornado.queues import PriorityQueue

async def main():
    q = PriorityQueue()
    q.put((1, 'medium-priority item'))
    q.put((0, 'high-priority item'))
    q.put((10, 'low-priority item'))

    print(await q.get())
    print(await q.get())
    print(await q.get())

asyncio.run(main())
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

LifoQueue

class tornado.queues.LifoQueue(maxsize: int = 0)[source]

一个 Queue,它首先检索最最近放入的项目。

import asyncio
from tornado.queues import LifoQueue

async def main():
    q = LifoQueue()
    q.put(3)
    q.put(2)
    q.put(1)

    print(await q.get())
    print(await q.get())
    print(await q.get())

asyncio.run(main())
1
2
3

异常

QueueEmpty

exception tornado.queues.QueueEmpty[source]

当队列中没有项目时,由 Queue.get_nowait 引发。

QueueFull

exception tornado.queues.QueueFull[source]

当队列达到其最大大小时,由 Queue.put_nowait 引发。