3-4,协程&asyncio&异步编程补充
3.6 concurrent.futures.Future对象
- 使用线程池、进程池实现异步操作时使用到的对象
- 示例:
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
return 123
#创建线程池
pool = ThreadPoolExecutor(max_workers=5)
#创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func,i)
print(fut)
输出为
<Future at 0x9feea90 state=running>
<Future at 0xa82dd30 state=running>
<Future at 0xa83d0a0 state=running>
<Future at 0xa83d400 state=running>
<Future at 0xa83d790 state=running>
<Future at 0xa83db20 state=pending>
<Future at 0xa83dc40 state=pending>
<Future at 0xa83dd30 state=pending>
<Future at 0xa83de20 state=pending>
<Future at 0xa83df10 state=pending>
0
1
2
3
4
5
7
6
8
9
import time
import asyncio
import concurrent.futures
def func1():
time.sleep(2)
return 'SB'
async def main():
loop = asyncio.get_running_loop()
#1.Run in the default loop's executor(默认为ThreadPoolExecutor)
#第一步:内部会先调用ThreadPoolExecutor的submit方法去线程池申请一个线程去执行
#func1函数,并返回一个concurrent.futures.Future对象
#第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为
#asyncio.Future对象。因为concurrent.futures.Future对象不支持await语法,
#所以需要包装为asyncio.Future对象才能使用
fut = loop.run_in_executor(None,func1)
result = await fut
print("default thread pool",result)
# 2,run in a custom thread pool
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(pool,func1)
# print('custom thread pool',result)
#
# #3, run in a custom process pool
# with concurrent.futures.ThreadPoolExecutor() as pool:
# result = await loop.run_in_executor(pool, func1)
# print('custom process pool', result)
asyncio.run(main())
import requests
import time
import asyncio
def download_image(url):
# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)
print('开始下载:',url)
loop = asyncio.get_event_loop()
# requests模块默认不支持异步操作,所以使用线程池来配合实现
future = loop.run_in_executor(None,requests.get,url)
response = await future #三个url的IO请求都发送,三个线程池在此等待
print('下载完成') # 效果相同,浪费资源更多
# 图片保存到本地
file_name = url.split('/')[-1]
with open(file_name,mode='wb') as fp:
fp.write(response.content)
if __name__ == '__main__':
start_time = time.time()
url_list = [
'http://pic.netbian.com/uploads/allimg/180826/113958-153525479855be.jpg',
'http://img.netbian.com/file/2020/0413/ccb961b600fceffd457d557fcac09cad.jpg',
'http://img.netbian.com/file/2021/0104/a1177d8604e27cdaf0cbf34f5713e2a6.jpg'
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end_time = time.time()
t = end_time - start_time
print("下载时间为:", t)
3.7 异步迭代器(不重要)
import asyncio
class Reader(object):
"""自定义异步迭代器(同时也是异步可迭代对象)"""
def __init__(self):
self.count = 0
async def readline(self):
await asyncio.sleep(1)
self.count += 1
if self.count == 100:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopIteration
return val
async def func():
obj = Reader()
async for item in obj:# async for 必须写在协程函数里
print(item)
asyncio.run( func() )
3.8 异步上下文管理器
import asyncio
class AsyncContextManager:
def __init__(self):
# self.conn = conn
print('')
async def do_something(self):
# 异步操作数据库
return 666
async def __aenter__(self):
# 异步链接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self,exc_type,exc,tb):
# 异步关闭数据库链接
await asyncio.sleep(1)
async def main():
async with AsyncContextManager() as f: # async with不能单独存在,必须嵌套在async函数中
result = await f.do_something()
print(result)
asyncio.run(main())
输出
666
4,uvloop
pip install uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
#编写asyncio的代码,和之前写的代码一致
#内部的事件循环自动化变为uvloop
asyncio.run(...)
注意:asgi(支持异步的web网络输出接口)->uvicorn基于uvloop效率高
5,实战案例
5.1 异步操作redis
import asyncio
import aioredis
async def execute(address,password):
print('开始执行',address)
#网络IO操作:创建redis链接
redis = await aioredis.create_redis(address,password=password)
#网络IO操作,在redis中设置哈希值car,内部设三个键值对,即
# redis={car:{key1:1,key2:2,key3:3}}
await redis.hmset_dict('car',key1=1,key2=2,key3=3)
#网络IO操作:去Redis中获取值
result = await redis.hgetall('car',encoding='utf-8')
print(result)
redis.close()
#网络IO操作:关闭redis连接
await redis.wait_closed()
print('结束',address)
asyncio.run(execute('redis://47.93.4.198:6379','root12345'))
import asyncio
import aioredis
async def execute(address,password):
print('开始执行',address)
#网络IO操作:先去连接47.93.4.198:6379,遇到IO则自动切换任务
redis = await aioredis.create_redis(address,password=password)
#网络IO操作,遇到IO则自动切换任务
# 在redis中设置哈希值car,内部设三个键值对,即
# redis={car:{key1:1,key2:2,key3:3}}
await redis.hmset_dict('car',key1=1,key2=2,key3=3)
#网络IO操作:遇到IO则自动切换任务
# 去Redis中获取值
result = await redis.hgetall('car',encoding='utf-8')
print(result)
redis.close()
#网络IO操作:遇到IO则自动切换任务
# 关闭redis连接
await redis.wait_closed()
print('结束',address)
task_list = [
execute('redis://47.93.4.198:6379','root12345'),
execute('redis://47.93.4.198:6379','root12345')
]
asyncio.run(asyncio.wait(task_list))
5.2 异步MySQL
pip install aiomysql
import asyncio
import aiomysql
async def execute():
# 网络IO操作:连接MySQL
conn = await aiomysql.connect(host='127.0.0.1',port='3306'
,user='root',password='123456',db='mysql')
# 网络IO操作:创建cursor
cur = await conn.cursor()
# 网络IO操作:执行SQL
await cur.execute("SELECT Host,User FROM user")
# 网络IO操作:获取SQL结果
result = await cur.fetchall()
print(result)
# 网络IO操作:关闭链接
await cur.close()
conn.close()
asyncio.run(execute())
import asyncio
import aiomysql
async def execute(host,password):
print('开始',host)
# 网络IO操作:先去连接47.93.4.198:6379,遇到IO则自动切换任务
conn = await aiomysql.connect(host='127.0.0.1', port='3306'
, user='root', password='123456', db='mysql')
# 网络IO操作:遇到IO则自动切换任务
cur = await conn.cursor()
# 网络IO操作:遇到IO则自动切换任务
await cur.execute("SELECT Host,User FROM user")
# 网络IO操作:遇到IO则自动切换任务
result = await cur.fetchall()
print(result)
# 网络IO操作:遇到IO则自动切换任务
await cur.close()
conn.close()
print('结束',host)
task_list = [
execute('47.93.41.197','root12345'),
execute('47.93.40.197','root12345')
]
asyncio.run(asyncio.wait(task_list))
5.3 fastAPI框架
pip install fastapi,uvicorn #(asgi,内部基于uvloop)
# 同步
import asyncio
import uvicorn
from fastapi import FastAPI
app = FastAPI()
def index():
"""普通操作接口"""
# 某个IO操作10s
return {'message':'Hello World!'}
if __name__ =="__main__":
uvicorn.run("lufy:app",host="127.0.0.1",port='3306',log_level="info")
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379',password='root123',
minsize=1,maxsize=10)
def index():
"""普通操作接口"""
# 某个IO操作10s
return {'message':'Hello World!'}
async def red():
"""异步操作接口"""
print('请求来了')
await asyncio.sleep(1)
#连接池获取一个连接
conn = await REDIS_POOL.acquire()
redis = Redis(conn)
#设置值
await redis.hmset_dict('car',key1=1,key2=2,key3=3)
#读取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
#连接归还连接池
REDIS_POOL.release(conn)
return result
if __name__ =="__main__":
uvicorn.run("lufy:app",host="127.0.0.1",port='3306',log_level="info")
5.4 爬虫
pip install aiohttp
import aiohttp
import asyncio
async def fetch(session,url):
print('发送请求',url)
async with session.get(url,verify_ssl=False) as response:
text = await response.text
print('得到结果',url,len(text))
return text
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'http://python.org',
'http://www.baidu.com',
'http://www.pythonav.com'
]
tasks = [asyncio.create_task(fetch(session,url)) for url in url_list]
done,pending = await asyncio.wait(tasks)
if __name__ =="__main__":
asyncio.run(main())