vlambda博客
学习文章列表

3-4,协程&asyncio&异步编程补充



3.6 concurrent.futures.Future对象

    - 使用线程池、进程池实现异步操作时使用到的对象

    - 示例:

import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor


def func(value, delay=1):
    """Simulate a blocking task: sleep, print *value*, then return 123.

    :param value: value printed once the simulated work is done.
    :param delay: seconds to sleep before printing (defaults to 1, as in the demo).
    :return: always 123.
    """
    time.sleep(delay)
    print(value)
    return 123


if __name__ == "__main__":
    # Create a thread pool. To use a process pool instead, swap in
    # ProcessPoolExecutor(max_workers=5); the __main__ guard above is what makes
    # that safe on platforms that spawn worker processes.
    with ThreadPoolExecutor(max_workers=5) as pool:
        for i in range(10):
            # submit() returns a concurrent.futures.Future immediately; with 5
            # workers, later submissions typically show state=pending.
            fut = pool.submit(func, i)
            print(fut)

输出为

<Future at 0x9feea90 state=running>
<Future at 0xa82dd30 state=running>
<Future at 0xa83d0a0 state=running>
<Future at 0xa83d400 state=running>
<Future at 0xa83d790 state=running>
<Future at 0xa83db20 state=pending>
<Future at 0xa83dc40 state=pending>
<Future at 0xa83dd30 state=pending>
<Future at 0xa83de20 state=pending>
<Future at 0xa83df10 state=pending>
0
1
2
3
4
5
7
6
8
9
实际编写代码时,两种Future对象可能会交叉使用。例如:crm项目80%都是基于协程的异步编程,但如果使用的MySQL模块不支持asyncio,就需要借助线程或进程来实现异步。
import time
import asyncio
import concurrent.futures


def func1(delay=2):
    """Blocking (non-async) function: sleep *delay* seconds, then return 'SB'."""
    time.sleep(delay)
    return 'SB'


async def main():
    """Run the blocking func1 in an executor without blocking the event loop."""
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor (a ThreadPoolExecutor by default).
    # Step 1: the executor's submit() runs func1 on a pool thread and yields a
    #         concurrent.futures.Future.
    # Step 2: that future is wrapped (asyncio.wrap_future) into an asyncio.Future,
    #         because a concurrent.futures.Future cannot be awaited directly.
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print("default thread pool", result)

    # 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print('custom thread pool', result)

    # 3. Run in a custom process pool (ProcessPoolExecutor, not ThreadPoolExecutor):
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(pool, func1)
    #     print('custom process pool', result)


if __name__ == "__main__":
    asyncio.run(main())
案例:asyncio + 不支持异步的模块
import requests
import time
import asyncio


async def download_image(url):
    """Download the image at *url* and save it in the current directory.

    ``requests`` is blocking and does not support asyncio, so the request is
    handed to the event loop's default thread pool via ``run_in_executor``;
    awaiting the resulting future lets the other downloads proceed meanwhile.

    :param url: image URL; the part after the last '/' is used as the file name.
    :raises requests.HTTPError: if the server answers with an error status.
    """
    print('开始下载:', url)
    # Inside a coroutine, get_running_loop() is the correct way to get the loop.
    loop = asyncio.get_running_loop()
    future = loop.run_in_executor(None, requests.get, url)
    # While this await is pending, the other URLs' requests are sent too; each
    # in-flight request occupies one pool thread. Same effect as a native async
    # HTTP client, but it costs more resources.
    response = await future
    # Don't write an HTTP error page to disk as if it were an image.
    response.raise_for_status()
    print('下载完成')

    # Save the image locally.
    file_name = url.split('/')[-1]
    with open(file_name, mode='wb') as fp:
        fp.write(response.content)


async def _download_all(url_list):
    """Download every URL in *url_list* concurrently, reporting failures per URL."""
    results = await asyncio.gather(
        *(download_image(url) for url in url_list), return_exceptions=True
    )
    for url, result in zip(url_list, results):
        if isinstance(result, Exception):
            print('下载失败:', url, result)


if __name__ == '__main__':
    start_time = time.time()
    url_list = [
        'http://pic.netbian.com/uploads/allimg/180826/113958-153525479855be.jpg',
        'http://img.netbian.com/file/2020/0413/ccb961b600fceffd457d557fcac09cad.jpg',
        'http://img.netbian.com/file/2021/0104/a1177d8604e27cdaf0cbf34f5713e2a6.jpg',
    ]

    # asyncio.run replaces the deprecated get_event_loop()/run_until_complete
    # pattern; gather accepts coroutines, whereas asyncio.wait rejects bare
    # coroutines on Python 3.11+.
    asyncio.run(_download_all(url_list))

    end_time = time.time()
    t = end_time - start_time
    print("下载时间为:", t)

3.7 异步迭代器(不重要)

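什么是异步迭代器:实现了 `__aiter__()` 和 `__anext__()` 方法的对象。`__anext__()` 必须返回一个可等待(awaitable)对象;`async for` 会不断等待 `__anext__()` 返回的可等待对象,直到其引发 `StopAsyncIteration` 异常为止。
什么是异步可迭代对象:可在 `async for` 语句中使用的对象,其 `__aiter__()` 方法必须返回一个异步迭代器。(见官方定义,PEP 492)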
import asyncio
class Reader(object):
    """Custom asynchronous iterator (and therefore also an asynchronous iterable).

    Yields 1, 2, ..., limit - 1, awaiting ``delay`` seconds before each value.
    With the defaults this yields 1..99, one value per second.
    """

    def __init__(self, limit=100, delay=1):
        # limit: readline() reports end-of-stream (None) once count reaches this value.
        # delay: seconds awaited on every readline() call, simulating IO.
        self.count = 0
        self.limit = limit
        self.delay = delay

    async def readline(self):
        """Simulate an async read; return the next counter value, or None at the end."""
        await asyncio.sleep(self.delay)
        self.count += 1
        # >= (not ==) keeps the stream exhausted if called again after the end.
        if self.count >= self.limit:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            # Async iterators must signal exhaustion with StopAsyncIteration;
            # a StopIteration raised inside a coroutine is turned into RuntimeError.
            raise StopAsyncIteration
        return val
async def func():
    """Consume a Reader with ``async for`` and print every item it yields."""
    obj = Reader()
    # `async for` is only valid inside a coroutine function.
    async for item in obj:
        print(item)


if __name__ == "__main__":
    asyncio.run(func())

3.8 异步上下文管理器

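官方定义:异步上下文管理器是一种能够在其 `__aenter__()` 和 `__aexit__()` 方法中暂停执行的上下文管理器,通过定义这两个方法来控制 `async with` 语句中的环境(见 PEP 492)。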

import asyncio
class AsyncContextManager:
    """Demo asynchronous context manager for use with ``async with``.

    Opening and closing a database connection are simulated with asyncio.sleep.
    """

    def __init__(self):
        # A real implementation might take a connection here (self.conn = conn).
        print('')

    async def __aenter__(self):
        """Simulate connecting to the database asynchronously; hand back self."""
        # asyncio.sleep(1) resolves to None, so conn is only a placeholder.
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Simulate closing the database connection asynchronously."""
        await asyncio.sleep(1)

    async def do_something(self):
        """Stand-in for an async database operation; the answer is always 666."""
        return 666
async def main():
    """Enter AsyncContextManager with ``async with`` and print do_something()'s result."""
    # `async with` cannot stand alone; it must be nested inside an async function.
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)


if __name__ == "__main__":
    asyncio.run(main())

输出


666

4,uvloop

uvloop可理解为asyncio默认事件循环的替代方案,其效率高于asyncio默认的事件循环。
pip install uvloop
import asyncio
import uvloop

# Make asyncio create uvloop event loops instead of its default ones.
# NOTE(review): on uvloop >= 0.18, uvloop.run(main()) is the recommended entry
# point, and asyncio's event-loop policy API is deprecated from Python 3.14.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


async def main():
    """Placeholder coroutine: write asyncio code here exactly as before."""
    # The event loop running this coroutine is now uvloop's.
    pass


if __name__ == "__main__":
    # asyncio.run needs a coroutine object; a literal `...` raises ValueError.
    asyncio.run(main())

注意:uvicorn是ASGI(支持异步的Web服务器网关接口)服务器,内部基于uvloop,因此效率很高。

5,实战案例

5.1 异步操作redis

在通过代码操作redis时,连接、操作、断开连接都是IO操作。(pip install "aioredis<2",以下示例使用的是aioredis 1.x的API)
- 示例1:
import asyncio
import aioredis


async def execute(address, password):
    """Connect to Redis at *address*, write the hash 'car', read it back and print it.

    Uses the aioredis 1.x API (create_redis / hmset_dict).
    """
    print('开始执行', address)
    # Network IO: create the Redis connection
    redis = await aioredis.create_redis(address, password=password)
    try:
        # Network IO: set hash 'car' with three fields,
        # i.e. redis = {car: {key1: 1, key2: 2, key3: 3}}
        await redis.hmset_dict('car', key1=1, key2=2, key3=3)
        # Network IO: read the hash back from Redis
        result = await redis.hgetall('car', encoding='utf-8')
        print(result)
    finally:
        # Network IO: close the connection even if a command above failed
        redis.close()
        await redis.wait_closed()
    print('结束', address)


if __name__ == '__main__':
    # NOTE(review): address and password are hard-coded; consider config/env vars.
    asyncio.run(execute('redis://47.93.4.198:6379', 'root12345'))
- 示例2:
import asyncio
import aioredis


async def execute(address, password):
    """Connect to Redis at *address*, write the hash 'car', read it back and print it.

    Each ``await`` below is network IO; while it is pending the event loop
    switches to the other execute() task. Uses the aioredis 1.x API.
    """
    print('开始执行', address)
    # Network IO: connect to the Redis server
    redis = await aioredis.create_redis(address, password=password)
    try:
        # Network IO: set hash 'car' with three fields,
        # i.e. redis = {car: {key1: 1, key2: 2, key3: 3}}
        await redis.hmset_dict('car', key1=1, key2=2, key3=3)

        # Network IO: read the hash back from Redis
        result = await redis.hgetall('car', encoding='utf-8')
        print(result)
    finally:
        # Network IO: close the connection even if a command above failed
        redis.close()
        await redis.wait_closed()

    print('结束', address)


async def _main():
    """Run two execute() calls concurrently."""
    task_list = [
        execute('redis://47.93.4.198:6379', 'root12345'),
        execute('redis://47.93.4.198:6379', 'root12345'),
    ]
    # gather accepts coroutines; asyncio.wait rejects bare coroutines on 3.11+.
    await asyncio.gather(*task_list)


if __name__ == '__main__':
    asyncio.run(_main())

5.2 异步MySQL

pip install aiomysql
- 示例1:
import asyncio
import aiomysql


async def execute():
    """Connect to the local MySQL server, query Host/User from mysql.user and print the rows."""
    # Network IO: connect to MySQL (aiomysql documents `port` as an int)
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root',
                                  password='123456', db='mysql')
    try:
        # Network IO: create a cursor; `async with` closes it on exit
        async with conn.cursor() as cur:
            # Network IO: run the SQL
            await cur.execute("SELECT Host,User FROM user")
            # Network IO: fetch the SQL result
            result = await cur.fetchall()
            print(result)
    finally:
        # Close the connection even if the query failed.
        conn.close()


if __name__ == '__main__':
    asyncio.run(execute())
- 示例2:
import asyncio
import aiomysql


async def execute(host, password):
    """Connect to the MySQL server on *host*, query mysql.user and print the rows.

    :param host: MySQL server address to connect to.
    :param password: password for the 'root' user.
    """
    print('开始', host)
    # Network IO: connect to MySQL on *host*; while waiting, the loop switches tasks
    conn = await aiomysql.connect(host=host, port=3306, user='root',
                                  password=password, db='mysql')
    try:
        # Network IO: create a cursor; `async with` closes it on exit
        async with conn.cursor() as cur:
            # Network IO: run the query
            await cur.execute("SELECT Host,User FROM user")
            # Network IO: fetch the result
            result = await cur.fetchall()
            print(result)
    finally:
        # Close the connection even if the query failed.
        conn.close()
    print('结束', host)


async def _main():
    """Query two MySQL servers concurrently."""
    task_list = [
        execute('47.93.41.197', 'root12345'),
        execute('47.93.40.197', 'root12345'),
    ]
    # gather accepts coroutines; asyncio.wait rejects bare coroutines on 3.11+.
    await asyncio.gather(*task_list)


if __name__ == '__main__':
    asyncio.run(_main())

5.3 fastAPI框架

pip install fastapi uvicorn #(uvicorn是ASGI服务器,内部基于uvloop)
- 示例1:
# Synchronous endpoint example
import asyncio
import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def index():
    """Plain (synchronous) endpoint returning a static greeting."""
    # Stand-in for some IO operation that takes e.g. 10s
    return {'message': 'Hello World!'}


if __name__ == "__main__":
    # "lufy:app" assumes this file is saved as lufy.py.
    # uvicorn's port is an int; 3306 is MySQL's port (used by the MySQL examples
    # on this same host), so serve on uvicorn's default 8000 instead.
    uvicorn.run("lufy:app", host="127.0.0.1", port=8000, log_level="info")
- 示例2:lufy.py
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI

app = FastAPI()
# NOTE(review): the pool is created at import time, outside any running event
# loop; with aioredis 1.x consider creating it in a startup handler via
# `await aioredis.create_pool(...)` — verify against the installed version.
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password='root123',
                                      minsize=1, maxsize=10)


@app.get("/")
def index():
    """Plain (synchronous) endpoint returning a static greeting."""
    # Stand-in for some IO operation that takes e.g. 10s
    return {'message': 'Hello World!'}


@app.get('/red')
async def red():
    """Async endpoint: write the hash 'car' to Redis via the pool and return it."""
    print('请求来了')
    # Simulated async IO, to show that other requests are served meanwhile.
    await asyncio.sleep(1)
    # Borrow a connection from the pool
    conn = await REDIS_POOL.acquire()
    try:
        redis = Redis(conn)
        # Set the values
        await redis.hmset_dict('car', key1=1, key2=2, key3=3)
        # Read them back
        result = await redis.hgetall('car', encoding='utf-8')
        print(result)
    finally:
        # Always return the connection to the pool, even if a command failed.
        REDIS_POOL.release(conn)
    return result


if __name__ == "__main__":
    # uvicorn's port is an int; 3306 is MySQL's port, so use uvicorn's default 8000.
    uvicorn.run("lufy:app", host="127.0.0.1", port=8000, log_level="info")

5.4 爬虫

pip install aiohttp
- 示例
import aiohttp
import asyncio


async def fetch(session, url):
    """GET *url* with *session*, print the body length and return the body text."""
    print('发送请求', url)
    # ssl=False skips certificate verification (replaces the deprecated verify_ssl=False).
    async with session.get(url, ssl=False) as response:
        # text() is a coroutine method: it must be called, then awaited.
        text = await response.text()
        print('得到结果', url, len(text))
        return text


async def main():
    """Fetch several pages concurrently with one shared ClientSession."""
    async with aiohttp.ClientSession() as session:
        url_list = [
            'http://python.org',
            'http://www.baidu.com',
            'http://www.pythonav.com',
        ]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        done, pending = await asyncio.wait(tasks)


if __name__ == "__main__":
    asyncio.run(main())
协程最大的意义:在一个线程内,利用IO等待的时间去执行其他任务。
本篇是根据B站up主:路飞学城IT上传的“2020年Python爬虫全套课程(学完可做项目)”编写