本文共 6039 字,大约阅读时间需要 20 分钟。
事件循环是asyncio提供的核心运行机制。
和concurrent.futures.Future类似,通过Future对象可以了解任务执行的状态数据。事件循环来监控Future对象是否完成。
Task类是Tuture类的子类,它的作用就是把协程包装成一个Future对象。
import asyncio@asyncio.coroutinedef a(): for i in range(3): print('a.x', i) yield return 'a() return 1000'@asyncio.coroutinedef b(): for i in 'abcdefg': print('b.x', i) yield return 'b() return 2000'# x = a()# y = b()# print(asyncio.iscoroutinefunction(a))# print(asyncio.iscoroutine(x))## for j in range(3): # 循环,生成器函数# next(x)# next(y)loop = asyncio.get_event_loop() # 大循环def cb(fut): print(fut.result())ts = []for x in (a(), b()): t = asyncio.ensure_future(x) t.add_done_callback(cb) ts.append(t)task1 = loop.create_task(a())task2 = loop.create_task(b())# wait 迭代所有的coroutine对象,将他们封装成一个future对象# loop.run_until_complete(asyncio.wait((task1, task2)))ret = loop.run_until_complete(asyncio.wait(ts)) # 相当于遍历列表中的元素print(ret) # 结果为二元组,第一个是set,里面有2个task对象for p in ret[0]: print(p.result())
import asyncio@asyncio.coroutinedef sleep(): # count = 0 # for i in range(3): # yield count # count += 1 for i in range(3): yield from asyncio.sleep(1) # 相当于沉睡1秒 print('+++++++++++') return 'my return value = {}'.format(1000)print(asyncio.iscoroutinefunction(sleep)) # Trueprint(asyncio.iscoroutine(sleep())) # True,注意sleep的括号不能丢,不然结果为Falsedef cb(fut): print('future = {} ~~~'.format(fut))loop = asyncio.get_event_loop()print(type(loop))# Future =>Task # Task是Future的子类# future = asyncio.ensure_future(sleep()) # 确保得到一个future对象,对协程对象进行封装## print(1, future)# loop.run_until_complete(future) # 一定会带你执行下ensure_future# print(2, future)## print(future.result())# task = asyncio.ensure_future(sleep())# task = asyncio.create_task(sleep()) # python3.7版本以上才能使用task = loop.create_task(sleep())task.add_done_callback(cb)print(3, task)loop.run_until_complete(task) # 只能放一个future对象,以后会有很多任务,部分任务执行完了,会调用回调函数print(4, task)print(task.result(), '~~~~~~~')loop.close()
python3.4引入asyncio,使用装饰器,将生成器函数转换成协程函数,就可以在事件循环总执行了。ensure_future(coro_future),如果参数已经是future了,直接返回,如果是协程,则使用loop.create_task创建task,并返回task。
future对象都可以调用add_done_callback(fn)增加回调函数,回调函数是单参的,参数就是future对象。
注意:run_until_complete方法的返回结果,必须所有任务执行完才能看。
python3.5版本开始,python提供关键字async、await,在语言上原生支持协程。
import asyncioasync def a(): # async def 中不能出现yield,必须和await配合使用 for i in range(3): print('a.x', i) # await wait() await asyncio.sleep(0.1) return 'a() return 1000'async def b(): # 不能再出现yield for i in 'abc': print('b.x', i) # await wait() # await后往往是阻塞的函数,相当于yield from wait() await asyncio.sleep(0.1) return 'b() return 2000'loop = asyncio.get_event_loop()def cb(fut): print(fut.result())ts = []for x in (a(), b()): t = asyncio.ensure_future(x) t.add_done_callback(cb) ts.append(t)task1 = loop.create_task(a())task2 = loop.create_task(b())# 相当于遍历元组中的元素ret = loop.run_until_complete(asyncio.wait(ts))print(ret) # 结果为二元组,第一个是set,里面有2个task对象for p in ret[0]: print(p.result())
async def用来定义协程函数,iscoroutinefunction()返回True。协程函数总可以不包含,协程函数中可以不包含await、async关键字,但不能使用yield关键字。如同生成器函数调用返回生成器对象一样,协程函数调用也会返回一个协程对象,iscoroutine返回True。await语句之后是awaitable对象,可以是协程或者是实现了__await__()方法的对象,await会暂停当前协程执行,使loop调度其他协程。
import asynciofrom aiohttp import ClientSessionasync def get_html(src: str): async with ClientSession() as session: # 异步的with,会执行半截就暂停,执行其他的语句 async with session.get(src) as res: print(res.status) return await res.text()async def main(): url = await get_html('http://127.0.0.1:9988/12345?age=20&name=curry') print(url)loop = asyncio.get_event_loop()loop.run_until_complete(main())loop.close()"""import aiohttpimport asyncioasync def fetch(session, url): async with session.get(url) as response: return await response.text()async def main(): async with aiohttp.ClientSession() as session: html = await fetch(session, 'http://python.org') print(html)loop = asyncio.get_event_loop()loop.run_until_complete(main())"""
# https://aiohttp.readthedocs.io/en/stable/, aiohttp是小型的网页设计库,比flask、Djiango要小巧from aiohttp import webroutes = web.RouteTableDef() # 改为装饰器# https://aiohttp.readthedocs.io/en/stable/web_quickstart.html#run-a-simple-web-server"""routes = web.RouteTableDef()@routes.get('/')async def hello(request): return web.Response(text="Hello, world")app = web.Application()app.add_routes(routes)web.run_app(app)"""@routes.get('/')async def handle(request): # name = request.match_info.get('name', "Anonymous") # text = "Hello, " + name text = "{}
".format('你好,中国!') print(request) return web.Response(body=text, status=201, content_type='text/html', charset='utf-8') # 201是成功的把数据加入到了数据库,content_type默认是纯文本格式,现在设置为html@routes.get(r'/{id:\d+}') # r的意思是把后面当做原始字符串,不转义,\d+是正则表达式async def id_handle(request: web.Request): print(request.match_info.get('id', '0000')) # 获取不到,用默认值0000 text = 'path={}, qs={}'.format(request.path, request.query_string) # qs为查询条件(查询字符串) return web.Response(body=text, status=200) # 默认的状态码可以省略app = web.Application()# app.add_routes([web.get('/', handle), # 路径映射# # web.get('/{name}', handle),# web.get('/{id}', id_handle)]) # {id}为字典,利用正则表达式匹配app.add_routes(routes)if __name__ == "__main__": web.run_app(app, host='127.0.0.1', port=9988)"""async def handle(request): name = request.match_info.get('name', "Anonymous") text = "Hello, " + name return web.Response(text=text)app = web.Application()app.add_routes([web.get('/', handle), web.get('/{name}', handle)])web.run_app(app)"""
转载地址:http://epfvi.baihongyu.com/