Python进阶5-多线程
多线程 RLock(可重入锁):acquire() 可以被同一个线程不加阻塞地重复调用多次;但要注意,release() 必须被调用与 acquire() 相同的次数,锁才会被真正释放。(注意:下面的示例代码演示的是普通的 threading.Lock,不可重入。)
import threading

lock = threading.Lock()
num = 0


def func(n):
    """Atomically increment the shared counter ``num`` and print it.

    :param n: label identifying the caller (printed next to the new value).
    :returns: the counter value after this increment. The original demo
        implicitly returned ``None``; returning the value is
        backward-compatible and makes the function verifiable.
    """
    global num
    # ``with lock`` guarantees the lock is released even if print() raises;
    # the original explicit acquire()/release() pair would leak the lock.
    with lock:
        num += 1
        print('{}: '.format(n), num)
        return num


if __name__ == '__main__':
    # Guarded so importing this module does not spawn threads as a side effect.
    t1 = threading.Thread(target=func, args=('Tom',), name='Thread1', daemon=True)
    t2 = threading.Thread(target=func, args=('Bob',), name='Thread2', daemon=True)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
threadlocal:local_school = threading.local() 创建一个全局对象(在函数内使用它时无需 global 声明),多线程对它的读写互不干扰,可以随意添加属性(子线程中访问不到主线程设置的属性),例如 local_school.name = 'abc'。
import threading
import time

# Thread-local storage: each thread sees its own independent ``var``.
local_obj = threading.local()


def func(iterations=100):
    """Initialise this thread's counter and run the increment loop.

    :param iterations: number of increments to perform (default 100,
        matching the original hard-coded value).
    """
    local_obj.var = 0
    func_print(iterations)


def func_print(iterations=100):
    """Increment this thread's ``local_obj.var`` and print the final value.

    :param iterations: number of increments (default 100 as in the original).
    """
    for k in range(iterations):
        time.sleep(0.01)
        local_obj.var += 1
    print(f'线程id:{threading.get_ident()},thread-local数据:{local_obj.var}')


if __name__ == '__main__':
    # Guarded so importing this module does not start threads.  Every thread
    # prints 100: the thread-local counters never interfere with each other.
    for th in range(3):
        threading.Thread(target=func).start()

# 输出:
# 线程id:15952,thread-local数据:100
# 线程id:7152,thread-local数据:100
# 线程id:13588,thread-local数据:100
LocalProxy 使用threading.local()对象虽然可以基于线程存储全局变量,但是在Web应用中可能会存在如下问题:
有些应用使用的是greenlet协程,这种情况下无法保证协程之间数据的隔离,因为不同的协程可以在同一个线程当中。
即使使用的是线程,WSGI应用也无法保证每个http请求使用的都是不同的线程,因为后一个http请求可能使用的是之前的http请求的线程,这样的话存储于thread local中的数据可能是之前残留的数据。
==使用==
from werkzeug.local import Local, LocalManager
# Request/Response were undefined names in the original snippet.
from werkzeug.wrappers import Request, Response

local = Local()
local_manager = LocalManager([local])


def application(environ, start_response):
    """WSGI application that stashes the current Request in
    greenlet/thread-local storage before answering.

    A WSGI app must return an iterable of bytes; the original snippet
    returned None, which is invalid.
    """
    local.request = request = Request(environ)
    return Response('OK')(environ, start_response)


# make_middleware wraps the app so that everything stored on the locals
# managed by local_manager is released after each request -- the manual
# release_local(local) call from the original (itself an undefined name
# in the snippet) becomes unnecessary.
application = local_manager.make_middleware(application)
==Local的实现==
# Prefer the greenlet id when greenlet is installed so that data is isolated
# per coroutine; otherwise fall back to the thread id.
try:
    from greenlet import getcurrent as get_ident
except ImportError:
    try:
        from thread import get_ident          # Python 2
    except ImportError:
        from _thread import get_ident         # Python 3


class Local(object):
    """A namespace whose attributes are stored per thread (or greenlet).

    Every value lives inside ``__storage__``, a dict keyed by the identity
    returned from ``__ident_func__`` (thread/greenlet id), so code running
    in different threads never sees each other's attributes.
    """

    __slots__ = ('__storage__', '__ident_func__')

    def __init__(self):
        # object.__setattr__ is used directly because our own __setattr__
        # would try to put these bookkeeping slots into __storage__ itself.
        object.__setattr__(self, '__storage__', {})
        object.__setattr__(self, '__ident_func__', get_ident)

    def __iter__(self):
        # Yields (ident, namespace-dict) pairs for every known context.
        return iter(self.__storage__.items())

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        # Drop everything stored for the *current* thread/greenlet only.
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        bucket = self.__storage__.get(self.__ident_func__())
        if bucket is None or name not in bucket:
            raise AttributeError(name)
        return bucket[name]

    def __setattr__(self, name, value):
        # Lazily create the namespace dict for the current context.
        self.__storage__.setdefault(self.__ident_func__(), {})[name] = value

    def __delattr__(self, name):
        bucket = self.__storage__.get(self.__ident_func__(), {})
        if name not in bucket:
            raise AttributeError(name)
        del bucket[name]
==LocalStack== LocalStack与Local对象类似,都是可以基于Greenlet协程或者线程进行全局存储的存储空间(实际LocalStack是对Local进行了二次封装),区别在于其数据结构是栈的形式。
1 2 3 4 5 6 7 8 9 10 11 12 >>> ls = LocalStack()>>> ls.push(42 )>>> ls.top42 >>> ls.push(23 )>>> ls.top23 >>> ls.pop()23 >>> ls.top42
==LocalProxy== LocalProxy用于代理Local对象和LocalStack对象,而所谓代理就是作为中间的代理人来处理所有针对被代理对象的操作,
初始化:通过 Local 或者 LocalStack 对象的 __call__ 方法
# Creating proxies via Local.__call__ / LocalStack.__call__.
from werkzeug.local import Local

l = Local()
# l('request') returns LocalProxy(l, 'request'): every attribute access on
# the proxy is forwarded to the current thread/greenlet's l.request.
request = l('request')
user = l('user')

from werkzeug.local import LocalStack

_response_local = LocalStack()
# Calling a LocalStack with no argument returns a proxy bound to whatever
# object is on top of the stack at access time.
response = _response_local()
初始化:通过LocalProxy类进行初始化
# Equivalent to l('request'): construct the proxy class directly.
# (Local and LocalProxy are assumed imported from werkzeug.local,
# as in the previous snippet.)
l = Local()
request = LocalProxy(l, 'request')
初始化:使用callable对象作为参数 通过传递一个函数,我们可以自定义如何返回Local或LocalStack对象
# Pass the callable itself, NOT its return value: the proxy re-invokes
# get_current_request() on every access, so it always resolves to the
# request of the current context.  The original wrote
# LocalProxy(get_current_request()), which calls the function once and
# freezes that single request object into the proxy.
request = LocalProxy(get_current_request)
代码解析:
@implements_bool
class LocalProxy(object):
    """Forward every operation to the object currently bound behind a
    Local/LocalStack attribute (or returned by a wrapped callable).

    NOTE(review): ``implements_bool`` and ``PY2`` come from werkzeug's
    py2/py3 compatibility module; this excerpt does not define them.
    """

    # __local stores the bound Local/LocalStack/callable; the name is
    # mangled (see __init__) so it cannot collide with attributes that are
    # forwarded to the proxied object.
    __slots__ = ('__local', '__dict__', '__name__', '__wrapped__')

    def __init__(self, local, name=None):
        # object.__setattr__ bypasses the overridden __setattr__ below,
        # which would otherwise forward these writes to the proxied object.
        object.__setattr__(self, '_LocalProxy__local', local)
        object.__setattr__(self, '__name__', name)
        if callable(local) and not hasattr(local, '__release_local__'):
            # A plain callable (not a Local/LocalStack) was bound:
            # remember it as the wrapped function.
            object.__setattr__(self, '__wrapped__', local)

    def _get_current_object(self):
        """Return the current object.  This is useful if you want the real
        object behind the proxy at a time for performance reasons or because
        you want to pass the object into a different context.
        """
        if not hasattr(self.__local, '__release_local__'):
            # A bare callable was bound: call it to obtain the object.
            return self.__local()
        try:
            # A Local/LocalStack was bound: look the stored name up on it.
            return getattr(self.__local, self.__name__)
        except AttributeError:
            raise RuntimeError('no object bound to %s' % self.__name__)

    @property
    def __dict__(self):
        # Expose the proxied object's __dict__; unbound proxies report the
        # attribute as missing rather than raising RuntimeError.
        try:
            return self._get_current_object().__dict__
        except RuntimeError:
            raise AttributeError('__dict__')

    def __getattr__(self, name):
        # Attribute reads are resolved against the *current* object.
        if name == '__members__':
            return dir(self._get_current_object())
        return getattr(self._get_current_object(), name)

    def __setitem__(self, key, value):
        self._get_current_object()[key] = value

    def __delitem__(self, key):
        del self._get_current_object()[key]

    # Slice dunders only exist on Python 2.
    if PY2:
        __getslice__ = lambda x, i, j: x._get_current_object()[i:j]

        def __setslice__(self, i, j, seq):
            self._get_current_object()[i:j] = seq

        def __delslice__(self, i, j):
            del self._get_current_object()[i:j]

    # All remaining protocol methods simply delegate to the current object
    # (the real werkzeug source defines many more of these).
    __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
    __delattr__ = lambda x, n: delattr(x._get_current_object(), n)
    __str__ = lambda x: str(x._get_current_object())
    __lt__ = lambda x, o: x._get_current_object() < o
    __le__ = lambda x, o: x._get_current_object() <= o
    __eq__ = lambda x, o: x._get_current_object() == o
动态更新:
# --- Without LocalProxy: get_user() is evaluated exactly once ---
from werkzeug.local import LocalStack, LocalProxy

user_stack = LocalStack()
user_stack.push({'name': 'Bob'})
user_stack.push({'name': 'John'})


def get_user():
    """Pop and return the user on top of the stack."""
    return user_stack.pop()


user = get_user()
print(user['name'])   # John
print(user['name'])   # John -- same dict, get_user() was called only once

# --- With LocalProxy: get_user() is re-evaluated on every access ---
# (The original fused both snippets together around a stray ```python
# markdown fence, which made the combined code a syntax error.)
user_stack = LocalStack()
user_stack.push({'name': 'Bob'})
user_stack.push({'name': 'John'})

user = LocalProxy(get_user)
print(user['name'])   # John -- first access pops John
print(user['name'])   # Bob  -- second access pops Bob
# 信号量 (Semaphore): allow at most N threads inside a section at once.
import time
from random import random
from threading import Thread, Semaphore

# At most three threads may hold the semaphore simultaneously.
sema = Semaphore(3)


def foo(tid, max_wait=2):
    """Acquire the semaphore, hold it for a random time, then release it.

    :param tid: identifier printed on acquire/release.
    :param max_wait: upper bound (seconds) on the random hold time;
        defaults to 2, the original hard-coded value.
    """
    # ``with sema`` releases the semaphore even if the body raises.
    with sema:
        print('{} acquire sema'.format(tid))
        wt = random() * max_wait
        time.sleep(wt)
        print('{} release sema'.format(tid))


if __name__ == '__main__':
    # Guarded so importing this module does not start threads.
    threads = []
    for i in range(5):
        t = Thread(target=foo, args=(i,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
Condition 一个线程等待特定条件,而另一个线程发出特定条件满足的信号。最好说明的例子就是「生产者 / 消费者」模型:
import time
import threading


def consumer(cond):
    """Block on *cond* until a producer signals the resource is available.

    :param cond: a threading.Condition shared with the producer.
    """
    t = threading.current_thread()
    with cond:
        # wait() releases the lock while blocking.  A notify that happens
        # before this thread reaches wait() is lost, so consumers must be
        # started (and given time to block) before the producer runs.
        cond.wait()
        print('{}: Resource is available to consumer'.format(t.name))


def producer(cond):
    """Wake every thread currently waiting on *cond*.

    :param cond: a threading.Condition shared with the consumers.
    """
    t = threading.current_thread()
    with cond:
        print('{}: Making resource available'.format(t.name))
        cond.notify_all()


if __name__ == '__main__':
    # Guarded so importing this module does not start threads.
    condition = threading.Condition()
    c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
    c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
    p = threading.Thread(name='p', target=producer, args=(condition,))

    c1.start()
    time.sleep(1)  # give c1 time to reach cond.wait() before notify_all()
    c2.start()
    time.sleep(1)  # likewise for c2
    p.start()
event 一个线程发送 / 传递事件,另外的线程等待事件的触发。
import time
import threading
from random import randint

# Seconds a consumer waits for the event before giving up one iteration.
TIMEOUT = 2


def consumer(event, l, rounds=None):
    """Pop integers from *l* whenever *event* is set.

    :param event: threading.Event signalled by the producer.
    :param l: shared list used as the data channel.
    :param rounds: number of wait iterations to run; ``None`` (the
        default) preserves the original run-forever behaviour.
    """
    t = threading.current_thread()
    while rounds is None or rounds > 0:
        event_is_set = event.wait(TIMEOUT)
        if event_is_set:
            try:
                integer = l.pop()
                print('{} popped from list by {}'.format(integer, t.name))
                event.clear()
            except IndexError:
                # Another consumer won the race for this item.
                pass
        if rounds is not None:
            rounds -= 1


def producer(event, l, rounds=None, delay=1):
    """Append random integers to *l* and signal *event* after each one.

    :param event: threading.Event the consumers wait on.
    :param l: shared list used as the data channel.
    :param rounds: number of items to produce; ``None`` (the default)
        preserves the original run-forever behaviour.
    :param delay: pause between items (original hard-coded 1 second).
    """
    t = threading.current_thread()
    while rounds is None or rounds > 0:
        integer = randint(10, 100)
        l.append(integer)
        print('{} appended to list by {}'.format(integer, t.name))
        event.set()
        time.sleep(delay)
        if rounds is not None:
            rounds -= 1


if __name__ == '__main__':
    # The original ran both loops forever and then join()ed non-daemon
    # threads, so the script could never terminate.  Bounded rounds keep
    # the demo finite; guarding avoids side effects on import.
    event = threading.Event()
    l = []
    threads = []
    for name in ('consumer1', 'consumer2'):
        t = threading.Thread(name=name, target=consumer,
                             args=(event, l), kwargs={'rounds': 5})
        t.start()
        threads.append(t)
    p = threading.Thread(name='producer1', target=producer,
                         args=(event, l), kwargs={'rounds': 5})
    p.start()
    threads.append(p)
    for t in threads:
        t.join()
# 线程通信: worker threads drain a shared queue.Queue.
import threading
import queue
import random
import time


def myqueue(queue, delay=2):
    """Drain items from *queue* until it is empty (or a None sentinel).

    :param queue: the queue.Queue to drain.  NOTE: the parameter name is
        kept for backward compatibility even though it shadows the module.
    :param delay: pause after each item; defaults to the original 2 seconds.
    """
    # Local import because the parameter shadows the queue module.
    from queue import Empty
    while True:
        try:
            # get_nowait() avoids the empty()/get() race of the original:
            # there, a worker could see the queue non-empty, lose the last
            # item to another worker, and then block forever in get().
            item = queue.get_nowait()
        except Empty:
            break
        if item is None:
            # Sentinel: stop this worker.
            break
        print("{} removed {} from the queue".format(threading.current_thread(), item))
        queue.task_done()
        time.sleep(delay)


if __name__ == '__main__':
    # Guarded so importing this module does not start threads.
    q = queue.Queue()
    for i in range(5):
        q.put(i)
    threads = []
    for i in range(4):
        thread = threading.Thread(target=myqueue, args=(q,))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
# 线程池 (thread pool) examples.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def square(n):
    # Simulated slow task: sleep 1s, then return the square of n.
    time.sleep(1)
    return n * n


if __name__ == '__main__':
    # Submit 10 tasks to a pool of 3 workers; as_completed yields futures
    # in completion order, not submission order.
    with ThreadPoolExecutor(max_workers=3) as executor:
        tasks = [executor.submit(square, num) for num in range(10)]
        for future in as_completed(tasks):
            print(future.result())


# Second pattern: block until ALL submitted tasks are finished.
# NOTE(review): media_list, user_id, pkg_id and upload_file are not defined
# anywhere in this snippet -- it is an illustrative fragment lifted from a
# real project and will not run standalone.
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

pool = ThreadPoolExecutor(max_workers=12)
tasks = []
for k, v in media_list.items():
    osspath = f"anki_source/{user_id}/apkg_{pkg_id}/{v}"
    tasks.append(pool.submit(upload_file, osspath))
wait(tasks, return_when=ALL_COMPLETED)
pool.shutdown()
Eventloop 可以说是 asyncio 应用的核心,Eventloop 实例提供了注册、取消和执行任务和回调的方法。
协程 (Coroutine) 本质上是一个函数,特点是在代码块中可以将执行权交给其他协程
Future 对象用于表示一个尚未完成的异步操作;异步操作结束后,其最终结果会被设置到对应的 Future 对象上。
开发者并不需要直接操作 Future 这种底层对象,而是用 Future 的子类 Task 协同的调度协程以实现并发。