黑白梦黑白梦

toggle navtoggle nav
  • 文章
  • 专栏
  • 文章
  • 专栏

Python 异步语法入门(对照 JS)

发布于 2026-02-25

https://docs.python.org/3/library/asyncio.html

Python 的异步核心是 asyncio,它是一个内置的标准库。它跟 JS 都是 “事件循环 + 非阻塞 IO” 的结构,可对照学习其异同。本文涵盖 async await 语法、创建任务、并发运行、异步生成器、并发数控制、错误处理等方面。

何时用异步

在 I/O 密集场景、需要高并发的网络服务时,适合使用异步。

如果是 CPU 密集任务时,在 Python 中通常可以使用多进程,或把计算丢到独立服务。

async await 语法

与 JS 类似,Python 也是通过 async await 语法进行异步处理:

  • 协程: async def 定义的函数,返回协程对象
  • await: 暂停当前协程,等待另一个协程完成
  • asnyc 方法中应该使用 asyncio.sleep 这样的异步方法而不是同步方法,使用第三方库时也应该主动使用异步库,否则会阻塞 event loop

最小示例:

import asyncio

async def fetch():
    await asyncio.sleep(1)
    return "data"

async def main():
    result = await fetch()
    print(result)

asyncio.run(main())

create_task 创建任务

直接 await fetch() 时,当前协程会暂停,直到 fetch() 完成才继续执行。

可通过 asyncio.create_task 创建任务,把协程立刻注册到事件循环中并并发运行,并返回一个 Task 对象。此时 fetch() 立即开始执行,但当前协程不必马上等它,后续可在合适时机 await task 。

import asyncio

async def main():
    a_task = asyncio.create_task(fetch())
    b_task = asyncio.create_task(fetch())
    a = await a_task
    b = await b_task
    print(a, b)

asyncio.run(main())

类似在 JS 中这样写:

async function main() {
  const a = fetch();
  const b = fetch();
  console.log(await a, await b);
}

gather 并发运行

可并发地运行多个异步任务,并收集它们的返回结果。类似于 JS 的 Promise.all 方法。

import asyncio

async def main():
    result = await asyncio.gather(
        fetch(),
        fetch()
    )
    print(result)

asyncio.run(main())

事件循环与任务调度

JS 的事件循环是自动运行的,Python 需要 asyncio.run 显示启动。

跟 JS 类似,Python 的默认事件循环是单线程的,它能在大量 I/O 任务里保持高吞吐,因为 CPU 不会空等,而是不断切换到可继续执行的协程。

相比较 JS 事件循环有宏任务和微任务队列等情况,Python 的事件循环没有这么复杂,可以把它理解成一个协程调度器。基本逻辑:取一个任务 -> 执行到 await -> 挂起 -> 换下一个。

async generator 异步生成器

可以使用异步生成器,用来一边等待异步操作,一边逐步产生数据。在流式数据中很常见,如:

async for chunk in stream:
    yield chunk

定义一个异步生成器,可以理解成:async + yield 的组合:

async def gen():
    yield 1
    await sleep(1)
    yield 2

使用:

async for x in gen():
    yield x

类似于 JS 中的 async function* + for await:

async function* gen() {
  yield 1
  await new Promise(r => setTimeout(r, 1000))
  yield 2
}

for await (const x of gen()) {
  console.log(x)
}

并发数控制

可以用 Semaphore 限制并发量:

import asyncio

sem = asyncio.Semaphore(3)

async def bounded_fetch(i):
    async with sem:
        await asyncio.sleep(1)
        return i

async def main():
    tasks = [asyncio.create_task(bounded_fetch(i)) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

在 JS 中,本身不支持这个并发控制特性,可以通过 p-limit 这样的库控制并发上限。

错误处理

在 Python 异步(async/await) 里处理错误,和同步代码很像,用 try / except 包住 await ,跟 JS 用法类似。

import asyncio

async def task():
    await asyncio.sleep(1)
    raise ValueError("error")

async def main():
    try:
        await task()
    except Exception as e:
        print("catch:", e)

asyncio.run(main())

在 gather 并发任务中,默认只要一个任务报错,其他任务就可能会被取消,整个 gather 抛异常,这个效果也是类似于 Promise.all()。

可通过 return_exceptions=True 参数将异常返回,然后自己处理,类似于 JS 的 Promise.allSettled()。

async def ok():
    await asyncio.sleep(0.5)
    return "ok"

async def bad():
    await asyncio.sleep(0.2)
    raise ValueError("boom")

async def main():
    results = await asyncio.gather(ok(), bad(), return_exceptions=True)

    for r in results:
        if isinstance(r, Exception):
            print("task error:", r)
        else:
            print(r)

asyncio.run(main())

create_task 任务,里面报错可能不会立刻看到异常,错误处理

©2015-2026 黑白梦 粤ICP备15018165号

联系: heibaimeng@foxmail.com