Python asyncio: 异步编程实战指南
简介
随着并发需求的增加,Python 的 asyncio 模块成为处理高并发任务的首选工具。本文将深入解析 Python 3.12+ 中的最新异步编程特性。
基础语法与示例
创建异步函数
# ✅ 推荐使用 async/await
async def fetch_data(url: str) -> dict:
"""从 URL 获取数据"""
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# ❌ 避免同步代码阻塞异步上下文
def sync_function(): # 错误:应该用 async def
time.sleep(5) # 阻塞操作会冻结事件循环
Concurrently 执行多个任务
async def main():
# 并发执行 3 个网络请求
tasks = [fetch_data(f'https://api.example.com/{i}') for i in range(3)]
results = await asyncio.gather(*tasks)
return results
# 运行协程
asyncio.run(main())
进阶特性:Task 与 Future
from asyncio import Task, gather
# 创建独立任务
task = asyncio.create_task(fetch_data('https://api.example.com/data'))
# 等待多个任务完成
tasks = [asyncio.create_task(fetch_data(f'https://api.example.com/{i}'))
for i in range(5)]
await gather(*tasks, return_exceptions=True) # 异常不会被中断其他任务
最佳实践
1. 合理的超时控制
from asyncio import wait_for, TimeoutError
async def fetch_with_timeout(url: str, timeout: float = 5.0):
try:
return await wait_for(fetch_data(url), timeout)
except TimeoutError:
print(f"请求超时:{url}")
return None
2. 使用 asyncio.Semaphore 控制并发数
semaphore = asyncio.Semaphore(10) # 限制最多 10 个并发任务
async def bounded_fetch(url: str):
async with semaphore: # 获取锁
return await fetch_data(url)
3. 异常处理
try:
result = await fetch_data('https://api.example.com/data')
except asyncio.TimeoutError as e:
print(f"超时错误:{e}")
except ConnectionError as e:
print(f"连接错误:{e}")
finally:
print("清理资源")
常见问题与调试技巧
Q1: 如何查看任务执行顺序?
async def task_with_delay(name: str, duration: float):
await asyncio.sleep(duration)
print(f"[{name}] 完成")
# 并行执行但等待顺序完成
await gather(
task_with_delay("Task A", 2.0),
task_with_delay("Task B", 1.0),
task_with_delay("Task C", 3.0)
) # 输出顺序:A, B, C
Q2: 如何取消正在运行的任务?
async def long_running_task():
try:
for i in range(10):
print(f"执行第 {i} 步")
await asyncio.sleep(1) # 每 1 秒执行一步
except asyncio.CancelledError:
print("任务被取消!")
raise # 重新抛出异常
# 取消任务
task = asyncio.create_task(long_running_task())
await asyncio.sleep(3)
task.cancel() # 发送取消信号
try:
await task # 等待任务完成或被取消
except asyncio.CancelledError:
print("任务已被取消")
性能优化建议
| 优化策略 | 说明 | 效果 |
|---|---|---|
asyncio.gather() |
并发执行多个任务 | ⚡ 2-5x 提升 |
asyncio.to_thread() |
在事件循环外运行同步代码 | ⚡ 避免阻塞 |
| 合理设置超时时间 | 防止长时间挂起 | ✅ 提升稳定性 |
总结
Python asyncio 是处理高并发任务的核心工具。遵循以下最佳实践可显著提升性能:
- ✅ 使用
async/await编写异步函数 - ✅ 通过
gather()并发执行多个任务 - ✅ 合理使用超时控制和异常处理
- ✅ 使用 Semaphore 限制并发数防止资源耗尽
相关链接:
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END





暂无评论内容