00282 协程与任务


前言

本节将简述用于协程与任务的高层级 API。

Operating System: Ubuntu 22.04.4 LTS

参考文档

  1. 协程与任务

协程

通过 async/await 语法来声明 协程 是编写 asyncio 应用的推荐方式。 例如,以下代码段会打印 “hello”,等待 1 秒,再打印 “world”:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

注意:简单地调用一个协程并不会使其被调度执行

>>> main()
<coroutine object main at 0x1053bb7c8>

要实际运行一个协程,asyncio 提供了以下几种机制:

  • asyncio.run() 函数用来运行最高层级的入口点 “main()” 函数 (参见上面的示例。)

  • 对协程执行 await。以下代码段会在等待 1 秒后打印 “hello”,然后 再次 等待 2 秒后打印 “world”:

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

预期的输出:

started at 17:13:52
hello
world
finished at 17:13:55

asyncio.create_task() 函数用来并发运行作为 asyncio 任务 的多个协程。

让我们修改以上示例,并发 运行两个 say_after 协程:

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待直到两个任务都完成
    # (会花费约 2 秒钟。)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

注意,预期的输出显示代码段的运行时间比之前快了 1 秒:

started at 17:14:32
hello
world
finished at 17:14:34

asyncio.TaskGroup 类提供了 create_task() 的更现代化的替代。 使用此 API,之前的例子将变为:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            say_after(1, 'hello'))

        task2 = tg.create_task(
            say_after(2, 'world'))

        print(f"started at {time.strftime('%X')}")

    # 当存在上下文管理器时 await 是隐式执行的。

    print(f"finished at {time.strftime('%X')}")

用时和输出结果应当与之前的版本相同。

Added in version 3.11: asyncio.TaskGroup。

可等待对象

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。

可等待 对象有三种主要类型: 协程, 任务 和 Future.

协程

Python 协程属于 可等待 对象,因此可以在其他协程中被等待:

import asyncio

async def nested():
    return 42

async def main():
    # 如果我们只调用 "nested()" 则无事发生。
    # 一个协程对象会被创建但是不会被等待,
    # 因此它 *根本不会运行*。
    nested()  # 将引发 "RuntimeWarning"。

    # 现在让我们改为等待它:
    print(await nested())  # 将打印 "42"。

asyncio.run(main())

重要: 在本文档中 “协程” 可用来表示两个紧密关联的概念:

  • 协程函数: 定义形式为 async def 的函数;

  • 协程对象: 调用 协程函数 所返回的对象。

任务

任务 被用来“并行的”调度协程

当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务,该协程会被自动调度执行:

import asyncio

async def nested():
    return 42

async def main():
    # 将 nested() 加入计划任务
    # 立即与 "main()" 并发运行。
    task = asyncio.create_task(nested())

    # 现在可以使用 "task" 来取消 "nested()",or
    # 或简单地等待它直到它被完成:
    await task

asyncio.run(main())

Futures

Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果。

当一个 Future 对象 被等待,这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。

在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。

通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:

async def main():
    await function_that_returns_a_future_object()

    # 这样也可以:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一个很好的返回对象的低层级函数的示例是 loop.run_in_executor()。

休眠

coroutine asyncio.sleep(delay, result=None)

阻塞 delay 指定的秒数。

如果指定了 result,则当协程完成时将其返回给调用者。

sleep() 总是会挂起当前任务,以允许其他任务运行。

将 delay 设为 0 将提供一个经优化的路径以允许其他任务运行。 这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环。

以下协程示例运行 5 秒,每秒显示一次当前日期:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

在 3.10 版本发生变更: 移除了 loop 形参。

在 3.13 版本发生变更: 如果 delay 为 nan 则会引发 ValueError。

并发运行任务

awaitable asyncio.gather(*aws, return_exceptions=False)

并发 运行 aws 序列中的 可等待对象。

如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。

如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。

如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待对象也会 被取消。

如果 aws 序列中的任一 Task 或 Future 对象 被取消,它将被当作引发了 CancelledError 一样处理 – 在此情况下 gather() 调用 不会 被取消。这是为了防止一个已提交的 Task/Future 被取消导致其他 Tasks/Future 也被取消。

备注: 一个创建然后并发地运行任务等待它们完成的新选择是 asyncio.TaskGroup。 TaskGroup 提供了针对调度嵌套子任务的比 gather 更强的安全保证:如果一个任务(或子任务,即由一个任务调度的任务)引发了异常,TaskGroup 将取消剩余的已排期任务)。

示例:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # 将三个调用 *并发地* 加入计划任务:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# 预期的输出:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

备注: 如果 return_exceptions 为假值,则在 gather() 被标记为完成后取消它将不会取消任何已提交的可等待对象。 例如,在将一个异常传播给调用者之后,gather 可被标记为已完成,因此,在从 gather 捕获一个(由可等待对象所引发的)异常之后调用 gather.cancel() 将不会取消任何其他可等待对象。

在 3.7 版本发生变更: 如果 gather 本身被取消,则无论 return_exceptions 取值为何,消息都会被传播。

在 3.10 版本发生变更: 移除了 loop 形参。

自 3.10 版本弃用: 如果未提供位置参数或者并非所有位置参数均为 Future 类对象并且没有正在运行的事件循环则会发出弃用警告。

结语

第二百八十二篇博文写完,开心!!!!

今天,也是充满希望的一天。


文章作者: LuYF-Lemon-love
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 LuYF-Lemon-love !
  目录