vlambda博客
学习文章列表

​进化之旅 - Python 异步编程学习笔记

进化之旅 -  Python 异步编程学习笔记

  花了点时间在b站上看完了一个Python异步编程入门的视频,讲的蛮好的,代码示例也非常简单易懂,技术涵盖的面也是日常经常接触到的,为了后面方便回顾,做了一下笔记。

0x01 认识协程

协程, 又名微线程,是一种用户态的上下文切换技术。简而言之,其实就是在单个线程中,实现代码相互切换执行。

实现协程的4种方式:

  1. greenlet
  2. yield 方式
  3. asyncio 装饰器(py3.4)
  4. async、await 关键字(py3.5) [官方比较推荐使用的方式]

前三种方式,我直接略过了,主要学习第4种方式的用法。

async & await

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio


async def func1():
    print("func2 print 1")
    await asyncio.sleep(4)
    print("func2 print 3")


async def func2():
    print("func2 print 2")
    await asyncio.sleep(2)
    print("func2 print 4")


def main():
    tasks = [
        asyncio.ensure_future(func1()),
        asyncio.ensure_future(func2()),
    ]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print("OK, Done!")


if __name__ == '__main__':
    main()

Output:

func2 print 1
func2 print 2
func2 print 4
func2 print 3
OK, Done!
[Finished in 4.1s]

0x02 协程意义

协程的意义,其本质是异步编程方式的实现。

优势:

单线程,占用更少的资源,资源利用也更充分,能够在IO等待的时间切换去执行其他代码。

0x03 异步编程

0x3.1 事件循环

你可以理解为后台有个死循环,一直在判断任务列表是否有人需要执行,还是需要等待,执行完的任务则从任务列表中删除。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio

def main():
    # 获取事件循环
    loop = asyncio.get_event_loop()
    # 将任务放到任务列表
    loop.run_until_complete(任务)

if __name__ == '__main__':
    main()

0x3.2 快速上手

定义协程函数:

async def coroutine_func():
  print("coroutine_func!")

协程对象,协程函数返回值就是一个协程对象。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio

async def coroutine_func():
  print("coroutine_func!")

result = coroutine_func()
print(type(result))

注意: 执行协程函数创建协程对象,函数内部代码不会执行。

执行协程内部代码,需要将协程对象提交给事件循环。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio


async def coroutine_func():
    print("coroutine_func!")


def main():
    result = coroutine_func()
    # 将协程对象提交给事件循环
    loop = asyncio.get_event_loop()
    loop.run_until_complete(result)


if __name__ == '__main__':
    main()

值得注意的是,在python3.7之后,可以简化事件循环的代码。

def main():
    result = coroutine_func()
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(result)
    # asyncio.run内部就是做的上面这两行代码的事情。
    asyncio.run(result)

0x3.3 await 关键字

await + 可等待对象(Future 、Task对象、协程对象,-> IO等待)

可以用于解决依赖上一步的返回结果然后再继续向后执行的情况。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio


async def step_1(msg=None):
    print(msg)
    print("step_1 start")
    await asyncio.sleep(3)
    print("step_1 end")
    return True


async def step_2():
    # await + 协程对象
    response = await step_1()
    if response:
        print("Success!")
    else:
        print("Failed!")
    # 协程函数内部能够使用多个await进行串联
    _response = await step_1(msg="_response")

def main():
    asyncio.run(step_2())


if __name__ == '__main__':
    main()

output:

None
step_1 start
step_1 end
Success!
_response
step_1 start
step_1 end
[Finished in 6.2s]

0x3.4 tasks对象

文档:https://docs.python.org/3/library/asyncio-task.html#asyncio.Task

A Future-like object that runs a Python coroutine. Not thread-safe.

Tasks are used to run coroutines in event loops. If a coroutine awaits on a Future, the Task suspends the execution of the coroutine and waits for the completion of the Future. When the Future is done, the execution of the wrapped coroutine resumes.

Event loops use cooperative scheduling: an event loop runs one Task at a time. While a Task awaits for the completion of a Future, the event loop runs other Tasks, callbacks, or performs IO operations.

说实话,可能我英文比较差,不是很理解这个,我将Tasks简单理解为,用于wrap 协程对象,从而实现能够在事件循环中添加多个任务。

Tasks 内部可以实现调度协程来执行不同的任务,从而实现多任务并发进行。

Use the high-level asyncio.create_task() function to create Tasks, or the low-level loop.create_task() or ensure_future() functions. Manual instantiation of Tasks is discouraged.

推荐使用asyncio.create_task来创建一个Task对象,还有其他两个更为底层的方法,但是不建议手动实例化Tasks对象。

注意:asyncio.create_task函数在Python3.7才加入。在Python3.7 之前,可以改用其他更加底层的asyncio.ensure_future,当然我一般不建议使用低版本的方式,要用就用最新的,我既然选择Python作为主力开发语言,就要有这种实时更新的觉悟。

Example1:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio


async def func(msg=None):
    print("1")
    await asyncio.sleep(3)
    print("2")
    return msg


async def main():
    task1 = asyncio.create_task(func("tasks"))
    task2 = asyncio.create_task(func("task2"))
    result1 = await task1
    result2 =  await task2
    print(result1)
    print(result2)


if __name__ == '__main__':
    asyncio.run(main())

这种方式调用有点多余,一般不用。

Example2:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio


async def func(msg=None):
    print("1")
    await asyncio.sleep(3)
    print("2")
    return msg


async def main():
  # 直接添加到事件循环,进行执行
    tasks = [
        asyncio.create_task(func("tasks"), name="task1"),
        asyncio.create_task(func("task2"), name="task2")
    ]
    # await阻塞,等待获取结果。
    done, pending = await asyncio.wait(tasks)
    print(done)

if __name__ == '__main__':
    asyncio.run(main())

Example3:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio

async def func(msg=None):
    print("1")
    await asyncio.sleep(3)
    print("2")
    return msg

tasks = [
    func("task1"),
    func("task2")
]

if __name__ == '__main__':
   # 统一执行
    result = asyncio.run(asyncio.wait(tasks))
    print(result)

0x3.5 future对象

Task是继承于Future的,Task对象内部await结果是基于Future对象。

Future内部会维护一个_state来说明任务的执行状态。

asyncio.future 在用户级代码中用的很少,并且需要手动设置返回值,故很少使用,不展开叙述。

0x3.6 concurrent.futures.Future 对象

python3.2引入了concurrent.futures库实现了使用线程池、进城池实现异步操作,比较好的一个官方说明例子:

线程池:

import asyncio
import concurrent.futures


def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom''rb'as f:
        return f.read(100)


def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))


async def main():
    loop = asyncio.get_running_loop()

    # Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

if __name__ == '__main__':
    asyncio.run(main())

多任务用法:

https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

import concurrent.futures
import time


def test_sleep(second):
    print(f"{second} sleep!")
    time.sleep(second)
    return second


def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=10as executor:
        future_to_url = {executor.submit(
            test_sleep, second): second for second in [12124534]}
        for future in concurrent.futures.as_completed(future_to_url):
            second = future_to_url[future]
            try:
                second = future.result()
            except Exception as exc:
                print(f"generated an exception: {exc}")
            else:
                print(f"Success, sleep {second}")


if __name__ == '__main__':
    main()

很漂亮的输出。

​进化之旅 - Python 异步编程学习笔记

0x3.7 异步和非异步混合方案

这种情况还是比较常见的,比如我们日常使用较多的requests库就不支持异步,所以我们需要采用混合的方式来让requests达到伪异步的效果。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio
import requests


async def download(url):
    loop = asyncio.get_event_loop()
    
    future = loop.run_in_executor(None, requests.get, url)
    try:
        response = await future
        print("Dowload Success!")
        print(response.headers)
    except Exception as exc:
        print(f"generated an execption: {exc}")


def main():
    urls = ['http://baidu.com'"http://qq.com""http://zhihu.com""http://ixasfa2xxx.com"]
    tasks = []
    for url in urls:
        tasks.append(download(url))
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(asyncio.wait(tasks))
    asyncio.run(asyncio.wait(tasks))


if __name__ == '__main__':
    main()

核心的点在于使用loop.run_in_executor(sync_func, param...)

0x3.8  异步迭代器

迭代器对象:

从技术层面而言,内部必须实现__iter__(iter方法触发,返回一个迭代器对象)和__next__方法。

异步迭代器:

实现了__aiter____anext__方法的对象,必须返回awaitable对象,async_for支持处理异步迭代器。

其中__anext__方法返回下一个等待的对象,直至引发一个stopAsyncIteration异常,这个改动由PEP 492 引入。

其中__aiter__方法一个asynchronous_iterator(异步迭代器),这个改动由PEP 492 引入。

有时候看到一些库会用这种用法,代码例子:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio


class Reader(object):
    """docstring for Reader"""

    def __init__(self):
        self.count = 0

    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise StopAsyncIteration
        return val


async def main():
    obj = Reader()
    async for item in obj:
        print(item)

if __name__ == '__main__':
    asyncio.run(main())

0x3.9 异步上下文管理

此种对象通过定义__aenter____aexit__方法来对async with语句中的环境进行控制,由PEP 492 引入。

这两种方法,分别可用于初始化环境和执行完毕后清理环境。一般常见支持的异步库都会实现这两种方法,可以直接使用async with

代码示例:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio


class AsyncContentManager(object):
    """docstring for AsyncContentManager"""

    def __init__(self):
        pass

    async def do_something(self):
        return "ok"

    async def __aenter__(self):
        # try conect database
        print("__aener__")
        self.conn = await asyncio.sleep(1)
        return self

    async def __aexit__(self, exec_type, exc, tb):
        print("__aexit__")
        # close database conect
        await asyncio.sleep(2)


async def main():
    async with AsyncContentManager() as f:
        result = await f.do_something()
        print(result)

if __name__ == '__main__':
    asyncio.run(main())

0x04 uvloop

uvloop是asyncio默认事件循环的的一种替代方案,使用Cython实现,能够有效地提高性能2-4倍,让其并发性能能够接近GO,而且引用也非常简单,值得去使用。

安装uvloop:

pip3 install uvloop

注意:  uvicorn框架例如FastAPI/Django之所以快也是因为其内部使用了uvloop

Example:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import asyncio
import uvloop

# 替换掉默认的asyncio事件循环为uvloop的!
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

async def main():
    await asyncio.sleep(1)
    print("run main")

if __name__ == '__main__':
    asyncio.run(main())

0x05 异步操作Mysql和Redis

能够提高用于连接多个实例和多个SQL查询时的性能。

Mysql

文档:https://aiomysql.readthedocs.io/en/latest/

pip3 install aiomysql
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import aiomysql
import asyncio


async def execute(user, password):
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                       user=user, password=password, db='mysql'
                                  )
    # IO
    cur = await conn.cursor()
    # IO
    await cur.execute("SELECT Host,User FROM user")
    # Print
    print(cur.description)
    # IO
    r = await cur.fetchall()
    print(r)
    # IO
    await cur.close()
    conn.close()

asyncio.run(execute("root""123456"))

Redis

文档:https://aioredis.readthedocs.io/en/v0.3.4/

pip3 install aioredis
import asyncio
import aioredis


def main():
    loop = asyncio.get_event_loop()

    async def go():
        conn = await aioredis.create_connection(
            ('localhost'6379), encoding='utf-8')

        ok = await conn.execute('set''my-key''some value')
        assert ok == 'OK', ok

        str_value = await conn.execute('get''my-key')
        raw_value = await conn.execute('get''my-key', encoding=None)
        assert str_value == 'some value'
        assert raw_value == b'some value'

        print('str value:', str_value)
        print('raw value:', raw_value)

        # optionally close connection
        conn.close()
    loop.run_until_complete(go())


if __name__ == '__main__':
    main()

0x06 FastAPI框架

安装框架

pip3 install fastapi
pip3 install uvicorn (支持异步的asgi,基于uvloop实现)

示例:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import asyncio
import uvicorn
from time import sleep
from fastapi import FastAPI
app = FastAPI()


@app.get("/")
def index():
    return {"Message""Hello world!"}


@app.get("/demo")
async def demo():
    print("request coming!")
    await asyncio.sleep(10)
    return {"Message""demo ok!"}


if __name__ == '__main__':
    uvicorn.run("fast_api:app", host="127.0.0.1", port=9091)

0x07 异步爬虫

这个案例可以帮助我们入门aiohttp库的使用。

文档:https://docs.aiohttp.org/en/stable/

pip3 install aiohttp
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import aiohttp
import asyncio


async def fetch(session, url):
    async with session.get(url, verify_ssl=Falseas response:
        print("Status:", response.status)
        print("Content-type:", response.headers['content-type'])

        html = await response.text()
        print("Body:", html[:15], "...")
        return html


async def main():

    async with aiohttp.ClientSession() as session:
        urls = ["http://python.org""http://baidu.com""http://xxxx2asfa.com"]
        tasks = [asyncio.create_task(fetch(session, url)) for url in urls]
        done, pneding = await asyncio.wait(tasks)
        # for r in done:
        #     print(r.result())

if __name__ == '__main__':
    asyncio.run(main())

0x08 总结

  经过几个小时的学习,虽然依然不是很理解其底层具体实现逻辑,但已经能够简单运用上异步编程思维来解决一些问题,后面会花更多时间改善自己的扫描器平台,共勉!