[Python] asyncio 异步编程实战指南:并发、超时控制与最佳实践

Python asyncio: 异步编程实战指南

简介

随着并发需求的增加,Python 的 asyncio 模块成为处理高并发任务的首选工具。本文将深入解析 Python 3.12+ 中的最新异步编程特性。

基础语法与示例

创建异步函数

# ✅ 推荐使用 async/await
async def fetch_data(url: str) -> dict:
    """从 URL 获取数据"""
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# ❌ 避免同步代码阻塞异步上下文
def sync_function():  # 错误:应该用 async def
    time.sleep(5)  # 阻塞操作会冻结事件循环

Concurrently 执行多个任务

async def main():
    # 并发执行 3 个网络请求
    tasks = [fetch_data(f'https://api.example.com/{i}') for i in range(3)]
    results = await asyncio.gather(*tasks)

    return results

# 运行协程
asyncio.run(main())

进阶特性:Task 与 Future

from asyncio import Task, gather

# 创建独立任务
task = asyncio.create_task(fetch_data('https://api.example.com/data'))

# 等待多个任务完成
tasks = [asyncio.create_task(fetch_data(f'https://api.example.com/{i}')) 
         for i in range(5)]
await gather(*tasks, return_exceptions=True)  # 异常不会被中断其他任务

最佳实践

1. 合理的超时控制

from asyncio import wait_for, TimeoutError

async def fetch_with_timeout(url: str, timeout: float = 5.0):
    try:
        return await wait_for(fetch_data(url), timeout)
    except TimeoutError:
        print(f"请求超时:{url}")
        return None

2. 使用 asyncio.Semaphore 控制并发数

semaphore = asyncio.Semaphore(10)  # 限制最多 10 个并发任务

async def bounded_fetch(url: str):
    async with semaphore:  # 获取锁
        return await fetch_data(url)

3. 异常处理

try:
    result = await fetch_data('https://api.example.com/data')
except asyncio.TimeoutError as e:
    print(f"超时错误:{e}")
except ConnectionError as e:
    print(f"连接错误:{e}")
finally:
    print("清理资源")

常见问题与调试技巧

Q1: 如何查看任务执行顺序?

async def task_with_delay(name: str, duration: float):
    await asyncio.sleep(duration)
    print(f"[{name}] 完成")

# 并行执行但等待顺序完成
await gather(
    task_with_delay("Task A", 2.0),
    task_with_delay("Task B", 1.0),
    task_with_delay("Task C", 3.0)
)  # 输出顺序:A, B, C

Q2: 如何取消正在运行的任务?

async def long_running_task():
    try:
        for i in range(10):
            print(f"执行第 {i} 步")
            await asyncio.sleep(1)  # 每 1 秒执行一步
    except asyncio.CancelledError:
        print("任务被取消!")
        raise  # 重新抛出异常

# 取消任务
task = asyncio.create_task(long_running_task())
await asyncio.sleep(3)
task.cancel()  # 发送取消信号
try:
    await task  # 等待任务完成或被取消
except asyncio.CancelledError:
    print("任务已被取消")

性能优化建议

优化策略 说明 效果
asyncio.gather() 并发执行多个任务 ⚡ 2-5x 提升
asyncio.to_thread() 在事件循环外运行同步代码 ⚡ 避免阻塞
合理设置超时时间 防止长时间挂起 ✅ 提升稳定性

总结

Python asyncio 是处理高并发任务的核心工具。遵循以下最佳实践可显著提升性能:

  • ✅ 使用 async/await 编写异步函数
  • ✅ 通过 gather() 并发执行多个任务
  • ✅ 合理使用超时控制和异常处理
  • ✅ 使用 Semaphore 限制并发数防止资源耗尽

相关链接:

© 版权声明
THE END
喜欢就支持一下吧
点赞7 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容