Python异步编程完全指南:从asyncio基础到高性能并发实战

今天 4阅读

Python的异步编程(asyncio)是提升应用性能的利器,但对很多开发者来说,它的概念和工作原理并不直观。本文从零开始,系统讲解Python异步编程的核心概念和实战技巧。

为什么需要异步编程?

假设你需要从10个不同的API获取数据。同步方式:依次发送请求,每个请求等待响应,总耗时=10×单次响应时间。异步方式:同时发送10个请求,等待所有响应返回,总耗时≈单次响应时间。性能提升可达10倍。

但异步编程不只是快——它的核心价值在于高效利用I/O等待时间。当程序在等待网络响应或磁盘读写时,CPU完全可以去处理其他任务。

核心概念:协程、事件循环、Future

协程(Coroutine):用async def定义的函数,可以在执行中途暂停并让出控制权。这是异步编程的最小执行单元。

事件循环(Event Loop):异步程序的调度器,负责管理所有协程的执行顺序。它不断检查哪些协程可以继续执行,哪些还需要等待。

Future/Task:一个占位符对象,代表一个尚未完成的异步操作的结果。

import asyncio

async def fetch_data(url):
    print(f"开始获取 {url}")
    await asyncio.sleep(1)  # 模拟网络请求
    print(f"完成获取 {url}")
    return f"data from {url}"

async def main():
    # 并发执行3个协程
    tasks = [
        fetch_data("https://api1.com"),
        fetch_data("https://api2.com"),
        fetch_data("https://api3.com"),
    ]
    results = await asyncio.gather(*tasks)
    print(results)  # 大约1秒完成,而非3秒

asyncio.run(main())

实战一:高性能HTTP客户端

使用aiohttp构建一个每秒能处理数千请求的异步爬虫:

import aiohttp
import asyncio
from typing import List

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls: List[str], concurrency: int = 50):
    connector = aiohttp.TCPConnector(limit=concurrency)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

# 使用
urls = [f"https://api.example.com/item/{i}" for i in range(1000)]
results = asyncio.run(fetch_all(urls, concurrency=100))

关键点:TCPConnector(limit=concurrency)限制了并发连接数,防止对目标服务器造成过大压力。

实战二:异步数据库操作

使用asyncpg与PostgreSQL进行高性能异步交互:

import asyncpg
import asyncio

async def query_users(pool: asyncpg.Pool, min_age: int):
    async with pool.acquire() as conn:
        return await conn.fetch(
            "SELECT id, name, email FROM users WHERE age > $1",
            min_age
        )

async def main():
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/db",
        min_size=5, max_size=20
    )
    # 并发执行10个查询
    tasks = [query_users(pool, age) for age in range(18, 28)]
    results = await asyncio.gather(*tasks)
    await pool.close()

asyncio.run(main())

常见陷阱与解决方案

陷阱1:在协程中调用同步阻塞函数

如果在协程中调用time.sleep()或同步HTTP请求库(如requests),会阻塞整个事件循环。解决方案:

# 错误
async def bad():
    time.sleep(5)  # 阻塞事件循环!

# 正确
async def good():
    await asyncio.sleep(5)  # 不阻塞

# 或在线程池中运行同步代码
async def run_blocking():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, requests.get, "https://example.com")

陷阱2:忘记await

调用协程函数时忘记await只会得到一个coroutine对象,不会实际执行:

# 错误
async def main():
    fetch_data("url")  # 返回coroutine对象,不执行!
    
# 正确
async def main():
    await fetch_data("url")  # 实际执行

陷阱3:Signal和Cancellation未处理

异步程序收到SIGTERM信号时需要优雅关闭:

async def main():
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()
    
    def signal_handler():
        stop_event.set()
    
    loop.add_signal_handler(signal.SIGTERM, signal_handler)
    
    try:
        await stop_event.wait()
    finally:
        # 优雅关闭:取消所有任务,关闭连接
        await cleanup()

异步 vs 多线程 vs 多进程

异步:适合I/O密集型任务(网络请求、数据库查询、文件读写)。单线程,无GIL竞争,资源消耗最低。

多线程:适合需要并行执行但受GIL限制不大的场景。注意线程安全。

多进程:适合CPU密集型任务(数学计算、图像处理)。绕过GIL,但进程间通信成本高。

现代Python应用的最佳实践是混合使用:异步处理I/O,多进程处理CPU密集型计算。

性能调优建议

  • 使用uvloop替代标准事件循环,性能提升2-4倍
  • 合理设置TCPConnectorlimit参数,避免连接数过多
  • 使用asyncio.Semaphore控制并发速率
  • 监控事件循环延迟,及时发现阻塞问题
  • 生产环境使用orjson替代标准json库进行异步序列化

掌握异步编程后,你会发现Python的性能瓶颈并不在语言本身,而在于如何使用它。

文章版权声明:除非注明,否则均为极派博客原创文章,转载或复制请以超链接形式并注明出处。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码