Python进阶6-协程
# 1. Using coroutines
import asyncio


async def a():
    """Print a start message, suspend for two seconds, print a done message, return 'A'."""
    print('Coro a is running')
    await asyncio.sleep(2)
    print('Coro a is done')
    return 'A'


async def b():
    """Print a start message, suspend for one second, print a done message, return 'B'."""
    print('Coro b is running')
    await asyncio.sleep(1)
    print('Coro b is done')
    return 'B'


if __name__ == '__main__':
    # main() is not defined in this snippet; it has to be provided separately.
    asyncio.run(main())
    # Lower-level equivalent that drives an explicit event loop:
    #     loop = asyncio.get_event_loop()
    #     loop.run_until_complete(main())
    #     loop.close()
async def main():
    """Run a() and b() through asyncio.gather and print the two gathered values."""
    # return_exceptions=True makes gather hand back raised exceptions as values.
    outcomes = await asyncio.gather(a(), b(), return_exceptions=True)
    return_value_a, return_value_b = outcomes
    print(return_value_a, return_value_b)
async def main():
    """Start a() and b() as tasks, wait until both finish, and print each result."""
    task1 = asyncio.create_task(a())
    task2 = asyncio.create_task(b())
    # ALL_COMPLETED is taken from the public asyncio namespace rather than
    # the asyncio.tasks submodule.
    done, _pending = await asyncio.wait(
        [task1, task2], return_when=asyncio.ALL_COMPLETED
    )
    # `done` is a set, so tasks are not necessarily reported in creation order.
    for task in done:
        print(f'Task {task.get_name()} result: {task.result()}')
async def main():
    """Wrap a() and b() in tasks with asyncio.create_task, then await each task."""
    first = asyncio.create_task(a())
    second = asyncio.create_task(b())
    # Both tasks exist before the first await, so the awaits only collect them.
    for pending_task in (first, second):
        await pending_task
async def main():
    """Wrap a() and b() with asyncio.ensure_future, then await each in turn."""
    first = asyncio.ensure_future(a())
    second = asyncio.ensure_future(b())
    for scheduled in (first, second):
        await scheduled
async def main():
    """Create tasks for a() and b() on the loop, cancel the b() task, await the a() task."""
    event_loop = asyncio.get_event_loop()
    kept = event_loop.create_task(a())
    dropped = event_loop.create_task(b())
    # The b() task is cancelled right after creation and is never awaited here.
    dropped.cancel()
    await kept
async def main():
    """Create a task for a(), wrap b() in asyncio.shield, cancel the shield, await a()."""
    event_loop = asyncio.get_event_loop()
    kept = event_loop.create_task(a())
    shielded = asyncio.shield(b())
    # cancel() is called on the object returned by shield(), not on b() itself.
    # NOTE(review): per the asyncio.shield docs the wrapped awaitable keeps
    # running in that case - confirm against the Python version in use.
    shielded.cancel()
    await kept
# 2. How coroutines work
"""
1. produce() starts the generator with c.send(None).
2. produce() switches to consumer() with c.send(n).
3. consumer() receives the sent message through yield, then uses yield again
   to switch back to produce() and hand over the result.
4. produce() gets the consumer's result as the return value of c.send(n) and
   goes on to produce the next message.
5. produce() shuts the consumer down with c.close(), which ends the run.
"""
import time


def consumer():
    """Generator that receives items via send() and answers each one with 'OK'.

    The first value yielded is the empty string; a falsy item ends the generator.
    """
    reply = ''
    while True:
        item = yield reply
        if not item:
            return
        print('[CONSUMER] Consuming %s...' % item)
        reply = 'OK'


def produce(c):
    """Prime generator *c*, send it the numbers 1 through 5, then close it."""
    c.send(None)
    for n in range(1, 6):
        print('[PRODUCER] Producing %s...' % n)
        answer = c.send(n)
        print('[PRODUCER] Consumer return: %s' % answer)
    c.close()


c = consumer()
produce(c)


def grep(pattern):
    """Generator that prints every line sent to it which contains *pattern*."""
    while True:
        line = (yield)
        if pattern not in line:
            continue
        print(line)


search = grep('coroutine')
next(search)
search.send("I love you")
search.send("I love coroutine instead!")
search.close()
# task
import asyncio


class Task(asyncio.futures.Future):
    """Teaching-sized task: a Future that drives a coroutine to completion.

    The coroutine is advanced one step at a time from event-loop callbacks.
    Its return value (or exception) becomes the result of this Future.
    """

    def __init__(self, gen, *, loop):
        """Bind to *loop* and schedule the first step of coroutine *gen*."""
        super().__init__(loop=loop)
        self._gen = gen
        self._loop.call_soon(self._step)

    def _step(self, val=None, exc=None):
        """Advance the coroutine once, sending *val* or throwing *exc* into it."""
        try:
            if exc is not None:
                f = self._gen.throw(exc)
            else:
                f = self._gen.send(val)
        except StopIteration as e:
            # The coroutine returned; its return value travels on StopIteration.
            self.set_result(e.value)
        except Exception as e:
            self.set_exception(e)
        else:
            if f is None:
                # A bare yield (e.g. `await asyncio.sleep(0)`) only gives up
                # control; there is no future to wait on, so resume on the
                # next loop iteration instead of calling a method on None.
                self._loop.call_soon(self._step)
            else:
                f.add_done_callback(self._wakeup)

    def _wakeup(self, fut):
        """Resume the coroutine with the outcome of the future it awaited."""
        try:
            res = fut.result()
        except Exception as e:
            self._step(None, e)
        else:
            self._step(res, None)


async def foo():
    """Sleep two seconds, then print a greeting."""
    await asyncio.sleep(2)
    print('Hello Foo')


async def bar():
    """Sleep one second, then print a greeting."""
    await asyncio.sleep(1)
    print('Hello Bar')


if __name__ == '__main__':
    # An explicitly created loop avoids the deprecated implicit loop creation
    # of asyncio.get_event_loop() outside a running loop.
    loop = asyncio.new_event_loop()
    try:
        tasks = [Task(foo(), loop=loop), loop.create_task(bar())]
        loop.run_until_complete(asyncio.wait(tasks))
    finally:
        loop.close()
# loop
import asyncio
from collections import deque


def done_callback(fut):
    """Stop the loop that owns *fut*; installed by Loop.run_until_complete()."""
    fut.get_loop().stop()


class Loop:
    """Teaching-sized event loop: a FIFO queue of ready callbacks, nothing else.

    There are no timers and no I/O polling, so only coroutines that never
    wait on time or sockets can be driven by it.
    """

    def __init__(self):
        self._ready = deque()  # entries are (callback, args, context)
        self._stopping = False

    def create_task(self, coro, **kwargs):
        """Wrap *coro* in an asyncio Task bound to this loop.

        Extra keyword arguments are forwarded to the Task constructor.
        """
        return asyncio.tasks.Task(coro, loop=self, **kwargs)

    def create_future(self):
        """Return a new Future bound to this loop (asyncio.wait needs one)."""
        return asyncio.Future(loop=self)

    def run_until_complete(self, fut):
        """Run until *fut* (a coroutine or future) is done and return its result.

        Raises:
            RuntimeError: if the loop ran out of work before *fut* finished.
        """
        fut = asyncio.ensure_future(fut, loop=self)
        fut.add_done_callback(done_callback)
        try:
            self.run_forever()
        finally:
            fut.remove_done_callback(done_callback)
        if not fut.done():
            raise RuntimeError('Event loop stopped before Future completed.')
        return fut.result()

    def run_forever(self):
        """Process ready callbacks until stop() is called or the queue is empty."""
        # Coroutines look the loop up via asyncio.get_running_loop(), so it
        # has to be registered as the running loop while callbacks execute.
        asyncio.events._set_running_loop(self)
        try:
            while True:
                self._run_once()
                # With no timers or I/O, an empty queue can never refill;
                # leave instead of spinning forever.
                if self._stopping or not self._ready:
                    break
        finally:
            self._stopping = False
            asyncio.events._set_running_loop(None)

    def call_soon(self, cb, *args, context=None):
        """Queue *cb* to be called with *args* on a later loop iteration.

        *context* is accepted because asyncio Task and Future pass it as a
        keyword argument when they schedule their callbacks.
        """
        self._ready.append((cb, args, context))

    def _run_once(self):
        """Run the callbacks that were ready when this iteration started."""
        for _ in range(len(self._ready)):
            cb, args, context = self._ready.popleft()
            if context is None:
                cb(*args)
            else:
                context.run(cb, *args)

    def stop(self):
        """Ask run_forever() to return after the current iteration."""
        self._stopping = True

    def close(self):
        """Drop any callbacks that are still queued."""
        self._ready.clear()

    def call_exception_handler(self, c):
        """Ignore the error context *c*; present so asyncio objects can call it."""
        pass

    def get_debug(self):
        """Report that debug mode is off."""
        return False


async def foo():
    """Print a greeting."""
    print('Hello Foo')


async def bar():
    """Print a greeting."""
    print('Hello Bar')


if __name__ == '__main__':
    loop = Loop()
    tasks = [loop.create_task(foo()), loop.create_task(bar())]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
yield from
`yield from x` 表达式对 x 对象所做的第一件事是调用 iter(x),从中获取迭代器。
>>> def gen():
...     yield from 'AB'
...     yield from range(1, 3)
...
>>> list(gen())
['A', 'B', 1, 2]
3.asynccontextmanager asynccontextmanager 是 Python 中 contextlib 模块提供的一个装饰器,用于创建异步上下文管理器。异步上下文管理器类似于同步上下文管理器,但可以在异步代码中使用。
from contextlib import asynccontextmanager
import asyncio


@asynccontextmanager
async def async_manager():
    """Async context manager that yields the string "resource".

    It pauses for one second before yielding and for one second on exit.
    """
    print("Entering async context")
    try:
        await asyncio.sleep(1)
        yield "resource"
    finally:
        # Runs on exit whether or not the body raised.
        await asyncio.sleep(1)
        print("Exiting async context")


async def main():
    """Enter async_manager() and print the value it yields."""
    async with async_manager() as handle:
        print("Inside async context:", handle)


asyncio.run(main())

# Output:
# $ Entering async context
# Inside async context: resource
# Exiting async context
# 4. Async decorators
import asyncio
import functools


def async_decorator(func):
    """Decorate coroutine function *func* so each call is logged before and after.

    Returns a coroutine function that awaits *func* with the same arguments
    and returns its result.
    """

    # wraps() copies __name__, __doc__ and friends from func onto the wrapper,
    # so the decorated function still identifies as the original.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        print("Starting asynchronous operation...")
        result = await func(*args, **kwargs)
        print("Asynchronous operation completed: ", result)
        return result

    return wrapper


@async_decorator
async def my_function():
    """Sleep one second, print a message, and return a greeting."""
    await asyncio.sleep(1)
    print("my_function completed.")
    return "Hello, world!"


if __name__ == "__main__":
    asyncio.run(my_function())

# Output:
# $ Starting asynchronous operation...
# my_function completed.
# Asynchronous operation completed: Hello, world!
import asyncio
import functools


def async_decorator_with_args(arg1, arg2):
    """Build a decorator for coroutine functions that logs *arg1* and *arg2*.

    The returned decorator wraps a coroutine function so that each call
    prints a start line mentioning both arguments, awaits the function,
    prints the result, and returns it.
    """

    def inner_decorator(func):
        # wraps() keeps the decorated function's own name and docstring.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            print(f"Starting asynchronous operation with {arg1} and {arg2}...")
            result = await func(*args, **kwargs)
            print("Asynchronous operation completed: ", result)
            return result

        return wrapper

    return inner_decorator


@async_decorator_with_args("Parameter 1", "Parameter 2")
async def my_function():
    """Sleep one second, print a message, and return a greeting."""
    await asyncio.sleep(1)
    print("my_function completed.")
    return "Hello, world!"


if __name__ == "__main__":
    asyncio.run(my_function())

# Output:
# $ Starting asynchronous operation with Parameter 1 and Parameter 2...
# my_function completed.
# Asynchronous operation completed: Hello, world!
# 5. Running a callback
import asyncio
from functools import partial


def callback(future, n):
    """Print the result of the finished *future* followed by *n*."""
    # The format matches the sample output below: "Result: A, 1".
    print(f'Result: {future.result()}, {n}')


async def a():
    """Sleep one second and return 'A'."""
    await asyncio.sleep(1)
    return 'A'


async def main():
    """Run a() as a task with callback(..., n=1) attached as its done-callback."""
    task = asyncio.create_task(a())
    # add_done_callback passes only the task, so n is pre-bound with partial.
    task.add_done_callback(partial(callback, n=1))
    await task


if __name__ == '__main__':
    asyncio.run(main())

# Output:
# $ Result: A, 1
# 6. Running synchronous code
import asyncio
import time


def a():
    """Blocking function: sleep one second with time.sleep, then return 'A'."""
    time.sleep(1)
    return 'A'


async def b():
    """Coroutine: sleep one second with asyncio.sleep, then return 'B'."""
    await asyncio.sleep(1)
    return 'B'


def show_perf(func):
    """Run coroutine function *func* under asyncio.run and print the elapsed time."""
    print('*' * 20)
    began = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - began} ')


async def c1():
    """Run blocking a() in the loop's default executor alongside coroutine b()."""
    running_loop = asyncio.get_running_loop()
    # Passing None as the executor selects the loop's default executor.
    in_executor = running_loop.run_in_executor(None, a)
    await asyncio.gather(in_executor, b())


show_perf(c1)

# Output:
# ********************
# c1 Cost: 1.0054080998525023
# Lock
import asyncio
import functools


def unlock(lock):
    """Release *lock*; scheduled as a plain loop callback by main()."""
    print('callback releasing lock')
    lock.release()


async def test(locker, lock):
    """Acquire *lock*, report it under the label *locker*, then release it."""
    print('{} waiting for the lock'.format(locker))
    # `async with` replaces `with await lock`, which Python 3.9 removed.
    async with lock:
        print('{} acquired lock'.format(locker))
    print('{} released lock'.format(locker))


# The name starts with "test", so pytest would try to collect this demo
# coroutine as a test case; opt out explicitly.
test.__test__ = False


async def main(loop):
    """Hold a lock, release it from a timer callback, and let two waiters take turns.

    Args:
        loop: the event loop running this coroutine; used for call_later and
            for creating the waiter tasks.
    """
    lock = asyncio.Lock()
    await lock.acquire()
    loop.call_later(0.1, functools.partial(unlock, lock))
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap them.
    waiters = [
        loop.create_task(test('l1', lock)),
        loop.create_task(test('l2', lock)),
    ]
    await asyncio.wait(waiters)


if __name__ == '__main__':
    # An explicitly created loop avoids the deprecated implicit loop creation
    # of asyncio.get_event_loop() outside a running loop.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
# Condition
import asyncio
import functools


async def consumer(cond, name, second):
    """Sleep *second* seconds, then wait on *cond* and report under *name*."""
    await asyncio.sleep(second)
    # `async with` replaces `with await cond`, which Python 3.9 removed.
    async with cond:
        await cond.wait()
        print('{}: Resource is available to consumer'.format(name))


async def producer(cond):
    """After two seconds, notify 1 waiter and then 2 waiters, 0.1 s apart."""
    await asyncio.sleep(2)
    for n in range(1, 3):
        async with cond:
            print('notifying consumer {}'.format(n))
            cond.notify(n=n)
        await asyncio.sleep(0.1)


async def producer2(cond):
    """After two seconds, wake every waiter on *cond* at once."""
    await asyncio.sleep(2)
    async with cond:
        print('Making resource available')
        cond.notify_all()


async def _run_round(loop, condition, make_producer):
    """Run one producer against consumers 'c1' and 'c2' until both finish."""
    task = loop.create_task(make_producer(condition))
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap them.
    consumers = [
        loop.create_task(consumer(condition, name, index))
        for index, name in enumerate(('c1', 'c2'))
    ]
    await asyncio.wait(consumers)
    task.cancel()


async def main(loop):
    """Run the demo twice on one Condition: first producer, then producer2.

    Args:
        loop: the event loop running this coroutine; used to create tasks.
    """
    condition = asyncio.Condition()
    await _run_round(loop, condition, producer)
    await _run_round(loop, condition, producer2)


if __name__ == '__main__':
    # An explicitly created loop avoids the deprecated implicit loop creation
    # of asyncio.get_event_loop() outside a running loop.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
# event
import asyncio
import functools


def set_event(event):
    """Set *event*; scheduled as a plain loop callback by main()."""
    print('setting event in callback')
    event.set()


async def test(name, event):
    """Wait until *event* is set and report it under the label *name*."""
    print('{} waiting for event'.format(name))
    await event.wait()
    print('{} triggered'.format(name))


# The name starts with "test", so pytest would try to collect this demo
# coroutine as a test case; opt out explicitly.
test.__test__ = False


async def main(loop):
    """Create an Event, set it from a timer callback, and release two waiters.

    Args:
        loop: the event loop running this coroutine; used for call_later and
            for creating the waiter tasks.
    """
    event = asyncio.Event()
    print('event start state: {}'.format(event.is_set()))
    loop.call_later(0.1, functools.partial(set_event, event))
    # asyncio.wait() rejects bare coroutines on Python 3.11+, so wrap them.
    waiters = [
        loop.create_task(test('e1', event)),
        loop.create_task(test('e2', event)),
    ]
    await asyncio.wait(waiters)
    print('event end state: {}'.format(event.is_set()))


if __name__ == '__main__':
    # An explicitly created loop avoids the deprecated implicit loop creation
    # of asyncio.get_event_loop() outside a running loop.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()
# queue
import asyncio
import random

import aiohttp

NUMBERS = random.sample(range(100), 7)
URL = 'http://httpbin.org/get?a={}'
# NOTE(review): created at import time; on Python 3.10+ asyncio primitives no
# longer bind to a loop at construction - confirm the target version.
sema = asyncio.Semaphore(3)


async def fetch_async(a):
    """GET the URL built from *a* and return the echoed 'a' query argument."""
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.json()
    return data['args']['a']


async def collect_result(a):
    """Fetch *a* while holding a slot of the module-level semaphore."""
    # `async with` replaces `with (await sema)`, which Python 3.9 removed.
    async with sema:
        return await fetch_async(a)


async def produce(queue):
    """Put a (num, num) tuple on *queue* for every entry of NUMBERS."""
    for num in NUMBERS:
        print('producing {}'.format(num))
        item = (num, num)
        await queue.put(item)


async def consume(queue):
    """Forever take items off *queue*, fetch each one, and mark it done."""
    while True:
        item = await queue.get()
        num = item[0]
        rs = await collect_result(num)
        print('consuming {}...'.format(rs))
        queue.task_done()


async def run():
    """Start one consumer, enqueue all numbers, wait for the queue to drain."""
    queue = asyncio.PriorityQueue()
    consumer = asyncio.ensure_future(consume(queue))
    await produce(queue)
    await queue.join()
    # consume() loops forever, so it is cancelled once every item is done.
    consumer.cancel()


if __name__ == '__main__':
    # asyncio.run() creates and closes the loop itself, replacing the
    # deprecated get_event_loop()/run_until_complete()/close() sequence.
    asyncio.run(run())
# 7. Multiprocessing + coroutines
import time
import asyncio
import aiohttp
from multiprocessing import Pool

all_urls = ['https://www.baidu.com'] * 400


async def get_html(url, sem):
    """Download *url* while holding semaphore *sem* and return the body text."""
    async with sem:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()


async def _crawl(urls):
    """Fetch every URL in *urls* with at most 10 downloads in flight."""
    sem = asyncio.Semaphore(10)
    # gather() accepts bare coroutines, unlike asyncio.wait() on Python 3.11+.
    # return_exceptions=True keeps one failed download from aborting the rest.
    await asyncio.gather(
        *(get_html(url, sem) for url in urls), return_exceptions=True
    )


def main(urls):
    """Worker-process entry point: crawl *urls* on a fresh event loop."""
    asyncio.run(_crawl(urls))


if __name__ == '__main__':
    start = time.time()
    p = Pool(4)
    # NOTE(review): the AsyncResult objects are discarded, so an exception
    # raised inside a worker is never surfaced here - confirm this is intended.
    for i in range(4):
        p.apply_async(main, args=(all_urls[i * 100:(i + 1) * 100],))
    p.close()
    p.join()
    print(time.time() - start)
# Multithreading + coroutines
import time
import requests
from concurrent.futures import ThreadPoolExecutor
import asyncio

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'


def fetch(a):
    """Blocking GET of the URL built from *a*; return the echoed 'a' argument."""
    r = requests.get(URL.format(a))
    return r.json()['args']['a']


async def run_scraper_tasks(executor):
    """Run fetch() for every entry of NUMBERS on *executor* and print the results.

    Args:
        executor: a concurrent.futures executor handed to loop.run_in_executor.
    """
    loop = asyncio.get_running_loop()
    # Map each future to its input number instead of attaching an ad-hoc
    # attribute to the future object.
    num_by_task = {
        loop.run_in_executor(executor, fetch, num): num for num in NUMBERS
    }
    completed, _pending = await asyncio.wait(num_by_task.keys())
    results = {num_by_task[t]: t.result() for t in completed}
    for num, result in sorted(results.items()):
        print('fetch({}) = {}'.format(num, result))


if __name__ == '__main__':
    start = time.time()
    # The with-block shuts the thread pool down once the scrape is finished.
    with ThreadPoolExecutor(3) as executor:
        asyncio.run(run_scraper_tasks(executor))
    print('Use asyncio+requests+ThreadPoolExecutor cost: {}'.format(time.time() - start))