发布于 2026-02-25
https://docs.python.org/3/library/asyncio.html
Python 的异步核心是 asyncio,它是一个内置的标准库。它跟 JS 都是 “事件循环 + 非阻塞 IO” 的结构,可对照学习其异同。本文涵盖 async await 语法、创建任务、并发运行、异步生成器、并发数控制、错误处理等方面。
在 I/O 密集场景、需要高并发的网络服务时,适合使用异步。
如果是 CPU 密集任务时,在 Python 中通常可以使用多进程,或把计算丢到独立服务。
与 JS 类似,Python 也是通过 async await 语法进行异步处理:
async def 定义的函数,返回协程对象asyncio.sleep 这样的异步方法而不是同步方法,使用第三方库时也应该主动使用异步库,否则会阻塞 event loop最小示例:
import asyncio
async def fetch():
await asyncio.sleep(1)
return "data"
async def main():
result = await fetch()
print(result)
asyncio.run(main())
直接 await fetch() 时,当前协程会暂停,直到 fetch() 完成才继续执行。
可通过 asyncio.create_task 创建任务,把协程立刻注册到事件循环中并并发运行,并返回一个 Task 对象。此时 fetch() 立即开始执行,但当前协程不必马上等它,后续可在合适时机 await task 。
import asyncio
async def main():
a_task = asyncio.create_task(fetch())
b_task = asyncio.create_task(fetch())
a = await a_task
b = await b_task
print(a, b)
asyncio.run(main())
类似在 JS 中这样写:
async function main() {
const a = fetch();
const b = fetch();
console.log(await a, await b);
}
可并发地运行多个异步任务,并收集它们的返回结果。类似于 JS 的 Promise.all 方法。
import asyncio
async def main():
result = await asyncio.gather(
fetch(),
fetch()
)
print(result)
asyncio.run(main())
JS 的事件循环是自动运行的,Python 需要 asyncio.run 显示启动。
跟 JS 类似,Python 的默认事件循环是单线程的,它能在大量 I/O 任务里保持高吞吐,因为 CPU 不会空等,而是不断切换到可继续执行的协程。
相比较 JS 事件循环有宏任务和微任务队列等情况,Python 的事件循环没有这么复杂,可以把它理解成一个协程调度器。基本逻辑:取一个任务 -> 执行到 await -> 挂起 -> 换下一个。
可以使用异步生成器,用来一边等待异步操作,一边逐步产生数据。在流式数据中很常见,如:
async for chunk in stream:
yield chunk
定义一个异步生成器,可以理解成:async + yield 的组合:
async def gen():
yield 1
await sleep(1)
yield 2
使用:
async for x in gen():
yield x
类似于 JS 中的 async function* + for await:
async function* gen() {
yield 1
await new Promise(r => setTimeout(r, 1000))
yield 2
}
for await (const x of gen()) {
console.log(x)
}
可以用 Semaphore 限制并发量:
import asyncio
sem = asyncio.Semaphore(3)
async def bounded_fetch(i):
async with sem:
await asyncio.sleep(1)
return i
async def main():
tasks = [asyncio.create_task(bounded_fetch(i)) for i in range(10)]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
在 JS 中,本身不支持这个并发控制特性,可以通过 p-limit 这样的库控制并发上限。
在 Python 异步(async/await) 里处理错误,和同步代码很像,用 try / except 包住 await ,跟 JS 用法类似。
import asyncio
async def task():
await asyncio.sleep(1)
raise ValueError("error")
async def main():
try:
await task()
except Exception as e:
print("catch:", e)
asyncio.run(main())
在 gather 并发任务中,默认只要一个任务报错,其他任务就可能会被取消,整个 gather 抛异常,这个效果也是类似于 Promise.all()。
可通过 return_exceptions=True 参数将异常返回,然后自己处理,类似于 JS 的 Promise.allSettled()。
async def ok():
await asyncio.sleep(0.5)
return "ok"
async def bad():
await asyncio.sleep(0.2)
raise ValueError("boom")
async def main():
results = await asyncio.gather(ok(), bad(), return_exceptions=True)
for r in results:
if isinstance(r, Exception):
print("task error:", r)
else:
print(r)
asyncio.run(main())
create_task 任务,里面报错可能不会立刻看到异常,错误处理