tornado.locks – 同步原语

版本 4.2 新增。

使用类似于标准库为线程提供的同步原语来协调协程。这些类与标准库中提供的 asyncio 包 中提供的类非常相似。

警告

请注意,这些原语实际上不是线程安全的,不能用作标准库中 threading 模块中的原语的替代品——它们旨在在一个线程应用程序中协调 Tornado 协程,而不是在一个多线程应用程序中保护共享对象。

Condition

class tornado.locks.Condition[source]

Condition 允许一个或多个协程等待直到被通知。

与标准的 threading.Condition 相似,但不需要底层锁,不需要获取和释放锁。

使用 Condition,协程可以等待被其他协程通知。

import asyncio
from tornado import gen
from tornado.locks import Condition

condition = Condition()

async def waiter():
    print("I'll wait right here")
    await condition.wait()
    print("I'm done waiting")

async def notifier():
    print("About to notify")
    condition.notify()
    print("Done notifying")

async def runner():
    # Wait for waiter() and notifier() in parallel
    await gen.multi([waiter(), notifier()])

asyncio.run(runner())
I'll wait right here
About to notify
Done notifying
I'm done waiting

wait 接收一个可选的 timeout 参数,它可以是一个绝对时间戳

io_loop = IOLoop.current()

# Wait up to 1 second for a notification.
await condition.wait(timeout=io_loop.time() + 1)

…或者是一个 datetime.timedelta,用于指定相对于当前时间的超时时间。

# Wait up to 1 second.
await condition.wait(timeout=datetime.timedelta(seconds=1))

如果在截止日期前没有收到通知,该方法将返回 False。

版本 5.0 中更改: 之前,等待者可以在 notify 中同步地收到通知。现在,通知总是会在 IOLoop 的下一次迭代中接收。

wait(timeout: Optional[Union[float, timedelta]] = None) Awaitable[bool][source]

等待 notify

返回一个 Future,如果 Condition 被通知,则解析为 True,或者在超时后解析为 False

notify(n: int = 1) None[source]

唤醒 n 个等待者。

notify_all() None[source]

唤醒所有等待者。

Event

class tornado.locks.Event[source]

Event 会阻塞协程,直到其内部标志设置为 True。

类似于 threading.Event

协程可以等待事件被设置。一旦被设置,对 yield event.wait() 的调用将不会阻塞,除非事件已经被清除。

import asyncio
from tornado import gen
from tornado.locks import Event

event = Event()

async def waiter():
    print("Waiting for event")
    await event.wait()
    print("Not waiting this time")
    await event.wait()
    print("Done")

async def setter():
    print("About to set the event")
    event.set()

async def runner():
    await gen.multi([waiter(), setter()])

asyncio.run(runner())
Waiting for event
About to set the event
Not waiting this time
Done
is_set() bool[source]

如果内部标志为 true,则返回 True

set() None[source]

将内部标志设置为 True。所有等待者都被唤醒。

一旦标志被设置,调用 wait 将不会阻塞。

clear() None[source]

将内部标志重置为 False

wait 的调用将阻塞,直到调用 set

wait(timeout: Optional[Union[float, timedelta]] = None) Awaitable[None][source]

阻塞,直到内部标志为 true。

返回一个可等待对象,它在超时后会引发 tornado.util.TimeoutError

Semaphore

class tornado.locks.Semaphore(value: int = 1)[source]

一个可以在阻塞之前被获取固定次数的锁。

Semaphore 管理一个计数器,表示 release 调用次数减去 acquire 调用次数,再加上初始值。如果需要,acquire 方法会阻塞,直到它可以在不使计数器变为负数的情况下返回。

Semaphore 限制对共享资源的访问。要允许两个工作进程同时访问

import asyncio
from tornado import gen
from tornado.locks import Semaphore

sem = Semaphore(2)

async def worker(worker_id):
    await sem.acquire()
    try:
        print("Worker %d is working" % worker_id)
        await use_some_resource()
    finally:
        print("Worker %d is done" % worker_id)
        sem.release()

async def runner():
    # Join all workers.
    await gen.multi([worker(i) for i in range(3)])

asyncio.run(runner())
Worker 0 is working
Worker 1 is working
Worker 0 is done
Worker 2 is working
Worker 1 is done
Worker 2 is done

工作进程 0 和 1 可以并发运行,但工作进程 2 必须等待工作进程 0 释放信号量。

信号量可以作为异步上下文管理器使用

async def worker(worker_id):
    async with sem:
        print("Worker %d is working" % worker_id)
        await use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

为了与旧版本的 Python 兼容,acquire 是一个上下文管理器,所以 worker 也可以写成

@gen.coroutine
def worker(worker_id):
    with (yield sem.acquire()):
        print("Worker %d is working" % worker_id)
        yield use_some_resource()

    # Now the semaphore has been released.
    print("Worker %d is done" % worker_id)

在版本 4.3 中更改: 在 Python 3.5 中添加了 async with 支持。

release() None[source]

递增计数器并唤醒一个等待者。

acquire(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_ReleasingContextManager][source]

递减计数器。返回一个可等待对象。

如果计数器为零,则阻塞并等待 release。可等待对象在截止时间后会引发 TimeoutError

BoundedSemaphore

class tornado.locks.BoundedSemaphore(value: int = 1)[source]

一个防止 release 被调用太多次的信号量。

如果 release 会使信号量的值超过初始值,它会引发 ValueError。信号量主要用于保护容量有限的资源,因此如果信号量释放了太多次,则表明存在错误。

release() None[source]

递增计数器并唤醒一个等待者。

acquire(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_ReleasingContextManager]

递减计数器。返回一个可等待对象。

如果计数器为零,则阻塞并等待 release。可等待对象在截止时间后会引发 TimeoutError

Lock

class tornado.locks.Lock[source]

协程的锁。

Lock 从未锁定状态开始,acquire 会立即锁定它。在它被锁定的情况下,任何 yield acquire 的协程都会等待另一个协程调用 release

释放未锁定的锁会引发 RuntimeError

Lock 可以用 async with 语句作为异步上下文管理器使用

>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> async def f():
...    async with lock:
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

为了与旧版本的 Python 兼容,acquire 方法会异步返回一个常规的上下文管理器

>>> async def f2():
...    with (yield lock.acquire()):
...        # Do something holding the lock.
...        pass
...
...    # Now the lock is released.

在版本 4.3 中更改: 在 Python 3.5 中添加了 async with 支持。

acquire(timeout: Optional[Union[float, timedelta]] = None) Awaitable[_ReleasingContextManager][source]

尝试锁定。返回一个可等待对象。

返回一个可等待对象,它在超时后会引发 tornado.util.TimeoutError

release() None[source]

解锁。

等待 acquire 的第一个协程会获得锁。

如果未锁定,则引发 RuntimeError