3-4,协程&asyncio&异步编程补充
3.6 concurrent.futures.Future对象
- 使用线程池、进程池实现异步操作时使用到的对象
- 示例:
import timefrom concurrent.futures import Futurefrom concurrent.futures.thread import ThreadPoolExecutorfrom concurrent.futures.process import ProcessPoolExecutordef func(value):time.sleep(1)print(value)return 123#创建线程池pool = ThreadPoolExecutor(max_workers=5)#创建进程池# pool = ProcessPoolExecutor(max_workers=5)for i in range(10):fut = pool.submit(func,i)print(fut)
输出为
<Future at 0x9feea90 state=running><Future at 0xa82dd30 state=running><Future at 0xa83d0a0 state=running><Future at 0xa83d400 state=running><Future at 0xa83d790 state=running><Future at 0xa83db20 state=pending><Future at 0xa83dc40 state=pending><Future at 0xa83dd30 state=pending><Future at 0xa83de20 state=pending><Future at 0xa83df10 state=pending>0123457689
import timeimport asyncioimport concurrent.futuresdef func1():time.sleep(2)return 'SB'async def main():loop = asyncio.get_running_loop()#1.Run in the default loop's executor(默认为ThreadPoolExecutor)#第一步:内部会先调用ThreadPoolExecutor的submit方法去线程池申请一个线程去执行#func1函数,并返回一个concurrent.futures.Future对象#第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为#asyncio.Future对象。因为concurrent.futures.Future对象不支持await语法,#所以需要包装为asyncio.Future对象才能使用fut = loop.run_in_executor(None,func1)result = await futprint("default thread pool",result)# 2,run in a custom thread pool# with concurrent.futures.ThreadPoolExecutor() as pool:# result = await loop.run_in_executor(pool,func1)# print('custom thread pool',result)## #3, run in a custom process pool# with concurrent.futures.ThreadPoolExecutor() as pool:# result = await loop.run_in_executor(pool, func1)# print('custom process pool', result)asyncio.run(main())
import requestsimport timeimport asynciodef download_image(url):# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)print('开始下载:',url)loop = asyncio.get_event_loop()# requests模块默认不支持异步操作,所以使用线程池来配合实现future = loop.run_in_executor(None,requests.get,url)response = await future #三个url的IO请求都发送,三个线程池在此等待print('下载完成') # 效果相同,浪费资源更多# 图片保存到本地file_name = url.split('/')[-1]with open(file_name,mode='wb') as fp:fp.write(response.content)if __name__ == '__main__':start_time = time.time()url_list = ['http://pic.netbian.com/uploads/allimg/180826/113958-153525479855be.jpg','http://img.netbian.com/file/2020/0413/ccb961b600fceffd457d557fcac09cad.jpg','http://img.netbian.com/file/2021/0104/a1177d8604e27cdaf0cbf34f5713e2a6.jpg']tasks = [download_image(url) for url in url_list]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))end_time = time.time()t = end_time - start_timeprint("下载时间为:", t)
3.7 异步迭代器(不重要)
import asyncioclass Reader(object):"""自定义异步迭代器(同时也是异步可迭代对象)"""def __init__(self):self.count = 0async def readline(self):await asyncio.sleep(1)self.count += 1if self.count == 100:return Nonereturn self.countdef __aiter__(self):return selfasync def __anext__(self):val = await self.readline()if val == None:raise StopIterationreturn valasync def func():obj = Reader()async for item in obj:# async for 必须写在协程函数里print(item)asyncio.run( func() )
3.8 异步上下文管理器
import asyncioclass AsyncContextManager:def __init__(self):# self.conn = connprint('')async def do_something(self):# 异步操作数据库return 666async def __aenter__(self):# 异步链接数据库self.conn = await asyncio.sleep(1)return selfasync def __aexit__(self,exc_type,exc,tb):# 异步关闭数据库链接await asyncio.sleep(1)async def main():async with AsyncContextManager() as f: # async with不能单独存在,必须嵌套在async函数中result = await f.do_something()print(result)asyncio.run(main())
输出
666
4,uvloop
pip install uvloop
import asyncioimport uvloopasyncio.set_event_loop_policy(uvloop.EventLoopPolicy())#编写asyncio的代码,和之前写的代码一致#内部的事件循环自动化变为uvloopasyncio.run(...)
注意:asgi(支持异步的web网络输出接口)->uvicorn基于uvloop效率高
5,实战案例
5.1 异步操作redis
import asyncioimport aioredisasync def execute(address,password):print('开始执行',address)#网络IO操作:创建redis链接redis = await aioredis.create_redis(address,password=password)#网络IO操作,在redis中设置哈希值car,内部设三个键值对,即# redis={car:{key1:1,key2:2,key3:3}}await redis.hmset_dict('car',key1=1,key2=2,key3=3)#网络IO操作:去Redis中获取值result = await redis.hgetall('car',encoding='utf-8')print(result)redis.close()#网络IO操作:关闭redis连接await redis.wait_closed()print('结束',address)asyncio.run(execute('redis://47.93.4.198:6379','root12345'))
import asyncioimport aioredisasync def execute(address,password):print('开始执行',address)#网络IO操作:先去连接47.93.4.198:6379,遇到IO则自动切换任务redis = await aioredis.create_redis(address,password=password)#网络IO操作,遇到IO则自动切换任务# 在redis中设置哈希值car,内部设三个键值对,即# redis={car:{key1:1,key2:2,key3:3}}await redis.hmset_dict('car',key1=1,key2=2,key3=3)#网络IO操作:遇到IO则自动切换任务# 去Redis中获取值result = await redis.hgetall('car',encoding='utf-8')print(result)redis.close()#网络IO操作:遇到IO则自动切换任务# 关闭redis连接await redis.wait_closed()print('结束',address)task_list = [execute('redis://47.93.4.198:6379','root12345'),execute('redis://47.93.4.198:6379','root12345')]asyncio.run(asyncio.wait(task_list))
5.2 异步MySQL
pip install aiomysql
import asyncioimport aiomysqlasync def execute():# 网络IO操作:连接MySQLconn = await aiomysql.connect(host='127.0.0.1',port='3306',user='root',password='123456',db='mysql')# 网络IO操作:创建cursorcur = await conn.cursor()# 网络IO操作:执行SQLawait cur.execute("SELECT Host,User FROM user")# 网络IO操作:获取SQL结果result = await cur.fetchall()print(result)# 网络IO操作:关闭链接await cur.close()conn.close()asyncio.run(execute())
import asyncioimport aiomysqlasync def execute(host,password):print('开始',host)# 网络IO操作:先去连接47.93.4.198:6379,遇到IO则自动切换任务conn = await aiomysql.connect(host='127.0.0.1', port='3306', user='root', password='123456', db='mysql')# 网络IO操作:遇到IO则自动切换任务cur = await conn.cursor()# 网络IO操作:遇到IO则自动切换任务await cur.execute("SELECT Host,User FROM user")# 网络IO操作:遇到IO则自动切换任务result = await cur.fetchall()print(result)# 网络IO操作:遇到IO则自动切换任务await cur.close()conn.close()print('结束',host)task_list = [execute('47.93.41.197','root12345'),execute('47.93.40.197','root12345')]asyncio.run(asyncio.wait(task_list))
5.3 fastAPI框架
pip install fastapi,uvicorn #(asgi,内部基于uvloop)
# 同步import asyncioimport uvicornfrom fastapi import FastAPIapp = FastAPI()def index():"""普通操作接口"""# 某个IO操作10sreturn {'message':'Hello World!'}if __name__ =="__main__":uvicorn.run("lufy:app",host="127.0.0.1",port='3306',log_level="info")
import asyncioimport uvicornimport aioredisfrom aioredis import Redisfrom fastapi import FastAPIapp = FastAPI()REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379',password='root123',minsize=1,maxsize=10)def index():"""普通操作接口"""# 某个IO操作10sreturn {'message':'Hello World!'}async def red():"""异步操作接口"""print('请求来了')await asyncio.sleep(1)#连接池获取一个连接conn = await REDIS_POOL.acquire()redis = Redis(conn)#设置值await redis.hmset_dict('car',key1=1,key2=2,key3=3)#读取值result = await redis.hgetall('car', encoding='utf-8')print(result)#连接归还连接池REDIS_POOL.release(conn)return resultif __name__ =="__main__":uvicorn.run("lufy:app",host="127.0.0.1",port='3306',log_level="info")
5.4 爬虫
pip install aiohttp
import aiohttpimport asyncioasync def fetch(session,url):print('发送请求',url)async with session.get(url,verify_ssl=False) as response:text = await response.textprint('得到结果',url,len(text))return textasync def main():async with aiohttp.ClientSession() as session:url_list = ['http://python.org','http://www.baidu.com','http://www.pythonav.com']tasks = [asyncio.create_task(fetch(session,url)) for url in url_list]done,pending = await asyncio.wait(tasks)if __name__ =="__main__":asyncio.run(main())
