threading --- 基于線(xiàn)程的并行?

源代碼: Lib/threading.py


這個(gè)模塊在較低級的模塊 _thread 基礎上建立較高級的線(xiàn)程接口。參見(jiàn): queue 模塊。

在 3.7 版更改: 這個(gè)模塊曾經(jīng)為可選項,但現在總是可用。

備注

在 Python 2.x 系列中,此模塊包含有某些方法和函數 camelCase 形式的名稱(chēng)。 它們在 Python 3.10 中已棄用,但為了與 Python 2.5 及更舊版本的兼容性而仍受到支持。

CPython implementation detail: 在 CPython 中,由于存在 全局解釋器鎖,同一時(shí)刻只有一個(gè)線(xiàn)程可以執行 Python 代碼(雖然某些性能導向的庫可能會(huì )去除此限制)。 如果你想讓你的應用更好地利用多核心計算機的計算資源,推薦你使用 multiprocessingconcurrent.futures.ProcessPoolExecutor。 但是,如果你想要同時(shí)運行多個(gè) I/O 密集型任務(wù),則多線(xiàn)程仍然是一個(gè)合適的模型。

這個(gè)模塊定義了以下函數:

threading.active_count()?

返回當前存活的 Thread 對象的數量。 返回值與 enumerate() 所返回的列表長(cháng)度一致。

函數 activeCount 是此函數的已棄用別名。

threading.current_thread()?

返回當前對應調用者的控制線(xiàn)程的 Thread 對象。如果調用者的控制線(xiàn)程不是利用 threading 創(chuàng )建,會(huì )返回一個(gè)功能受限的虛擬線(xiàn)程對象。

函數 currentThread 是此函數的已棄用別名。

threading.excepthook(args, /)?

處理由 Thread.run() 引發(fā)的未捕獲異常。

args 參數具有以下屬性:

  • exc_type: 異常類(lèi)型

  • exc_value: 異常值,可以是 None.

  • exc_traceback: 異?;厮?,可以是 None.

  • thread: 引發(fā)異常的線(xiàn)程,可以為 None。

如果 exc_typeSystemExit,則異常會(huì )被靜默地忽略。 在其他情況下,異常將被打印到 sys.stderr。

如果此函數引發(fā)了異常,則會(huì )調用 sys.excepthook() 來(lái)處理它。

threading.excepthook() 可以被重載以控制由 Thread.run() 引發(fā)的未捕獲異常的處理方式。

使用定制鉤子存放 exc_value 可能會(huì )創(chuàng )建引用循環(huán)。 它應當在不再需要異常時(shí)被顯式地清空以打破引用循環(huán)。

如果一個(gè)對象正在被銷(xiāo)毀,那么使用自定義的鉤子儲存 thread 可能會(huì )將其復活。請在自定義鉤子生效后避免儲存 thread,以避免對象的復活。

參見(jiàn)

sys.excepthook() 處理未捕獲的異常。

3.8 新版功能.

threading.__excepthook__?

保存 threading.excepthook() 的原始值。 它被保存以便在原始值碰巧被已損壞或替代對象所替換的情況下可被恢復。

3.10 新版功能.

threading.get_ident()?

返回當前線(xiàn)程的 “線(xiàn)程標識符”。它是一個(gè)非零的整數。它的值沒(méi)有直接含義,主要是用作 magic cookie,比如作為含有線(xiàn)程相關(guān)數據的字典的索引。線(xiàn)程標識符可能會(huì )在線(xiàn)程退出,新線(xiàn)程創(chuàng )建時(shí)被復用。

3.3 新版功能.

threading.get_native_id()?

返回內核分配給當前線(xiàn)程的原生集成線(xiàn)程 ID。 這是一個(gè)非負整數。 它的值可被用來(lái)在整個(gè)系統中唯一地標識這個(gè)特定線(xiàn)程(直到線(xiàn)程終結,在那之后該值可能會(huì )被 OS 回收再利用)。

Availability: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD.

3.8 新版功能.

threading.enumerate()?

返回當前所有存活的 Thread 對象的列表。 該列表包括守護線(xiàn)程以及 current_thread() 創(chuàng )建的空線(xiàn)程。 它不包括已終結的和尚未開(kāi)始的線(xiàn)程。 但是,主線(xiàn)程將總是結果的一部分,即使是在已終結的時(shí)候。

threading.main_thread()?

返回主 Thread 對象。一般情況下,主線(xiàn)程是Python解釋器開(kāi)始時(shí)創(chuàng )建的線(xiàn)程。

3.4 新版功能.

threading.settrace(func)?

為所有 threading 模塊開(kāi)始的線(xiàn)程設置追蹤函數。在每個(gè)線(xiàn)程的 run() 方法被調用前,func 會(huì )被傳遞給 sys.settrace() 。

threading.gettrace()?

返回由 settrace() 設置的跟蹤函數。

3.10 新版功能.

threading.setprofile(func)?

為所有 threading 模塊開(kāi)始的線(xiàn)程設置性能測試函數。在每個(gè)線(xiàn)程的 run() 方法被調用前,func 會(huì )被傳遞給 sys.setprofile() 。

threading.getprofile()?

返回由 setprofile() 設置的性能分析函數。

3.10 新版功能.

threading.stack_size([size])?

返回創(chuàng )建線(xiàn)程時(shí)使用的堆棧大小??蛇x參數 size 指定之后新建的線(xiàn)程的堆棧大小,而且一定要是0(根據平臺或者默認配置)或者最小是32,768(32KiB)的一個(gè)正整數。如果 size 沒(méi)有指定,默認是0。如果不支持改變線(xiàn)程堆棧大小,會(huì )拋出 RuntimeError 錯誤。如果指定的堆棧大小不合法,會(huì )拋出 ValueError 錯誤并且不會(huì )修改堆棧大小。32KiB是當前最小的能保證解釋器有足夠堆??臻g的堆棧大小。需要注意的是部分平臺對于堆棧大小會(huì )有特定的限制,例如要求大于32KiB的堆棧大小或者需要根據系統內存頁(yè)面的整數倍進(jìn)行分配 - 應當查閱平臺文檔有關(guān)詳細信息(4KiB頁(yè)面比較普遍,在沒(méi)有更具體信息的情況下,建議的方法是使用4096的倍數作為堆棧大?。?。

適用于: Windows,具有 POSIX 線(xiàn)程的系統。

這個(gè)模塊同時(shí)定義了以下常量:

threading.TIMEOUT_MAX?

阻塞函數( Lock.acquire(), RLock.acquire(), Condition.wait(), ...)中形參 timeout 允許的最大值。傳入超過(guò)這個(gè)值的 timeout 會(huì )拋出 OverflowError 異常。

3.2 新版功能.

這個(gè)模塊定義了許多類(lèi),詳見(jiàn)以下部分。

該模塊的設計基于 Java的線(xiàn)程模型。 但是,在Java里面,鎖和條件變量是每個(gè)對象的基礎特性,而在Python里面,這些被獨立成了單獨的對象。 Python 的 Thread 類(lèi)只是 Java 的 Thread 類(lèi)的一個(gè)子集;目前還沒(méi)有優(yōu)先級,沒(méi)有線(xiàn)程組,線(xiàn)程還不能被銷(xiāo)毀、停止、暫停、恢復或中斷。 Java 的 Thread 類(lèi)的靜態(tài)方法在實(shí)現時(shí)會(huì )映射為模塊級函數。

下述方法的執行都是原子性的。

線(xiàn)程本地數據?

線(xiàn)程本地數據是特定線(xiàn)程的數據。管理線(xiàn)程本地數據,只需要創(chuàng )建一個(gè) local (或者一個(gè)子類(lèi)型)的實(shí)例并在實(shí)例中儲存屬性:

mydata = threading.local()
mydata.x = 1

在不同的線(xiàn)程中,實(shí)例的值會(huì )不同。

class threading.local?

一個(gè)代表線(xiàn)程本地數據的類(lèi)。

更多相關(guān)細節和大量示例,參見(jiàn) _threading_local 模塊的文檔。

線(xiàn)程對象?

The Thread class represents an activity that is run in a separate thread of control. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass. No other methods (except for the constructor) should be overridden in a subclass. In other words, only override the __init__() and run() methods of this class.

當線(xiàn)程對象一但被創(chuàng )建,其活動(dòng)一定會(huì )因調用線(xiàn)程的 start() 方法開(kāi)始。這會(huì )在獨立的控制線(xiàn)程調用 run() 方法。

一旦線(xiàn)程活動(dòng)開(kāi)始,該線(xiàn)程會(huì )被認為是 '存活的' 。當它的 run() 方法終結了(不管是正常的還是拋出未被處理的異常),就不是'存活的'。 is_alive() 方法用于檢查線(xiàn)程是否存活。

其他線(xiàn)程可以調用一個(gè)線(xiàn)程的 join() 方法。這會(huì )阻塞調用該方法的線(xiàn)程,直到被調用 join() 方法的線(xiàn)程終結。

線(xiàn)程有名字。名字可以傳遞給構造函數,也可以通過(guò) name 屬性讀取或者修改。

如果 run() 方法引發(fā)了異常,則會(huì )調用 threading.excepthook() 來(lái)處理它。 在默認情況下,threading.excepthook() 會(huì )靜默地忽略 SystemExit。

一個(gè)線(xiàn)程可以被標記成一個(gè)“守護線(xiàn)程”。 這個(gè)標識的意義是,當剩下的線(xiàn)程都是守護線(xiàn)程時(shí),整個(gè) Python 程序將會(huì )退出。 初始值繼承于創(chuàng )建線(xiàn)程。 這個(gè)標識可以通過(guò) daemon 特征屬性或者 daemon 構造器參數來(lái)設置。

備注

守護線(xiàn)程在程序關(guān)閉時(shí)會(huì )突然關(guān)閉。他們的資源(例如已經(jīng)打開(kāi)的文檔,數據庫事務(wù)等等)可能沒(méi)有被正確釋放。如果你想你的線(xiàn)程正常停止,設置他們成為非守護模式并且使用合適的信號機制,例如: Event。

有個(gè) "主線(xiàn)程" 對象;這對應Python程序里面初始的控制線(xiàn)程。它不是一個(gè)守護線(xiàn)程。

There is the possibility that "dummy thread objects" are created. These are thread objects corresponding to "alien threads", which are threads of control started outside the threading module, such as directly from C code. Dummy thread objects have limited functionality; they are always considered alive and daemonic, and cannot be joined. They are never deleted, since it is impossible to detect the termination of alien threads.

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)?

應當始終使用關(guān)鍵字參數調用此構造函數。 參數如下:

group 應該為 None;為了日后擴展 ThreadGroup 類(lèi)實(shí)現而保留。

target 是用于 run() 方法調用的可調用對象。默認是 None,表示不需要調用任何方法。

name 是線(xiàn)程名稱(chēng)。 在默認情況下,會(huì )以 "Thread-N" 的形式構造唯一名稱(chēng),其中 N 為一個(gè)較小的十進(jìn)制數值,或是 "Thread-N (target)" 的形式,其中 "target" 為 target.__name__,如果指定了 target 參數的話(huà)。

args is a list or tuple of arguments for the target invocation. Defaults to ().

kwargs 是用于調用目標函數的關(guān)鍵字參數字典。默認是 {}。

如果不是 None,daemon 參數將顯式地設置該線(xiàn)程是否為守護模式。 如果是 None (默認值),線(xiàn)程將繼承當前線(xiàn)程的守護模式屬性。

如果子類(lèi)型重載了構造函數,它一定要確保在做任何事前,先發(fā)起調用基類(lèi)構造器(Thread.__init__())。

在 3.10 版更改: 使用 target 名稱(chēng),如果 name 參數被省略的話(huà)。

在 3.3 版更改: 加入 daemon 參數。

start()?

開(kāi)始線(xiàn)程活動(dòng)。

它在一個(gè)線(xiàn)程里最多只能被調用一次。它安排對象的 run() 方法在一個(gè)獨立的控制進(jìn)程中調用。

如果同一個(gè)線(xiàn)程對象中調用這個(gè)方法的次數大于一次,會(huì )拋出 RuntimeError 。

run()?

代表線(xiàn)程活動(dòng)的方法。

你可以在子類(lèi)型里重載這個(gè)方法。 標準的 run() 方法會(huì )對作為 target 參數傳遞給該對象構造器的可調用對象(如果存在)發(fā)起調用,并附帶從 argskwargs 參數分別獲取的位置和關(guān)鍵字參數。

Using list or tuple as the args argument which passed to the Thread could achieve the same effect.

Example:

>>>
>>> from threading import Thread
>>> t = Thread(target=print, args=[1])
>>> t.run()
1
>>> t = Thread(target=print, args=(1,))
>>> t.run()
1
join(timeout=None)?

等待,直到線(xiàn)程終結。這會(huì )阻塞調用這個(gè)方法的線(xiàn)程,直到被調用 join() 的線(xiàn)程終結 -- 不管是正常終結還是拋出未處理異常 -- 或者直到發(fā)生超時(shí),超時(shí)選項是可選的。

timeout 參數存在而且不是 None 時(shí),它應該是一個(gè)用于指定操作超時(shí)的以秒為單位的浮點(diǎn)數或者分數。因為 join() 總是返回 None ,所以你一定要在 join() 后調用 is_alive() 才能判斷是否發(fā)生超時(shí) -- 如果線(xiàn)程仍然存活,則 join() 超時(shí)。

timeout 參數不存在或者是 None ,這個(gè)操作會(huì )阻塞直到線(xiàn)程終結。

A thread can be joined many times.

如果嘗試加入當前線(xiàn)程會(huì )導致死鎖, join() 會(huì )引起 RuntimeError 異常。如果嘗試 join() 一個(gè)尚未開(kāi)始的線(xiàn)程,也會(huì )拋出相同的異常。

name?

只用于識別的字符串。它沒(méi)有語(yǔ)義。多個(gè)線(xiàn)程可以賦予相同的名稱(chēng)。 初始名稱(chēng)由構造函數設置。

getName()?
setName()?

已被棄用的 name 的取值/設值 API;請改為直接以特征屬性方式使用它。

3.10 版后已移除.

ident?

這個(gè)線(xiàn)程的 '線(xiàn)程標識符',如果線(xiàn)程尚未開(kāi)始則為 None 。這是個(gè)非零整數。參見(jiàn) get_ident() 函數。當一個(gè)線(xiàn)程退出而另外一個(gè)線(xiàn)程被創(chuàng )建,線(xiàn)程標識符會(huì )被復用。即使線(xiàn)程退出后,仍可得到標識符。

native_id?

此線(xiàn)程的線(xiàn)程 ID (TID),由 OS (內核) 分配。 這是一個(gè)非負整數,或者如果線(xiàn)程還未啟動(dòng)則為 None。 請參閱 get_native_id() 函數。 這個(gè)值可被用來(lái)在全系統范圍內唯一地標識這個(gè)特定線(xiàn)程 (直到線(xiàn)程終結,在那之后該值可能會(huì )被 OS 回收再利用)。

備注

類(lèi)似于進(jìn)程 ID,線(xiàn)程 ID 的有效期(全系統范圍內保證唯一)將從線(xiàn)程被創(chuàng )建開(kāi)始直到線(xiàn)程被終結。

可用性: 需要 get_native_id() 函數。

3.8 新版功能.

is_alive()?

返回線(xiàn)程是否存活。

run() 方法剛開(kāi)始直到 run() 方法剛結束,這個(gè)方法返回 True 。模塊函數 enumerate() 返回包含所有存活線(xiàn)程的列表。

daemon?

A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

當沒(méi)有存活的非守護線(xiàn)程時(shí),整個(gè)Python程序才會(huì )退出。

isDaemon()?
setDaemon()?

已被棄用的 daemon 的取值/設值 API;請改為直接以特征屬性方式使用它。

3.10 版后已移除.

鎖對象?

原始鎖是一個(gè)在鎖定時(shí)不屬于特定線(xiàn)程的同步基元組件。在Python中,它是能用的最低級的同步基元組件,由 _thread 擴展模塊直接實(shí)現。

原始鎖處于 "鎖定" 或者 "非鎖定" 兩種狀態(tài)之一。它被創(chuàng )建時(shí)為非鎖定狀態(tài)。它有兩個(gè)基本方法, acquire()release() 。當狀態(tài)為非鎖定時(shí), acquire() 將狀態(tài)改為 鎖定 并立即返回。當狀態(tài)是鎖定時(shí), acquire() 將阻塞至其他線(xiàn)程調用 release() 將其改為非鎖定狀態(tài),然后 acquire() 調用重置其為鎖定狀態(tài)并返回。 release() 只在鎖定狀態(tài)下調用; 它將狀態(tài)改為非鎖定并立即返回。如果嘗試釋放一個(gè)非鎖定的鎖,則會(huì )引發(fā) RuntimeError  異常。

鎖同樣支持 上下文管理協(xié)議。

當多個(gè)線(xiàn)程在 acquire() 等待狀態(tài)轉變?yōu)槲存i定被阻塞,然后 release() 重置狀態(tài)為未鎖定時(shí),只有一個(gè)線(xiàn)程能繼續執行;至于哪個(gè)等待線(xiàn)程繼續執行沒(méi)有定義,并且會(huì )根據實(shí)現而不同。

所有方法的執行都是原子性的。

class threading.Lock?

實(shí)現原始鎖對象的類(lèi)。一旦一個(gè)線(xiàn)程獲得一個(gè)鎖,會(huì )阻塞隨后嘗試獲得鎖的線(xiàn)程,直到它被釋放;任何線(xiàn)程都可以釋放它。

需要注意的是 Lock 其實(shí)是一個(gè)工廠(chǎng)函數,返回平臺支持的具體鎖類(lèi)中最有效的版本的實(shí)例。

acquire(blocking=True, timeout=- 1)?

可以阻塞或非阻塞地獲得鎖。

當調用時(shí)參數 blocking 設置為 True (缺省值),阻塞直到鎖被釋放,然后將鎖鎖定并返回 True 。

在參數 blocking 被設置為 False 的情況下調用,將不會(huì )發(fā)生阻塞。如果調用時(shí) blocking 設為 True 會(huì )阻塞,并立即返回 False ;否則,將鎖鎖定并返回 True。

When invoked with the floating-point timeout argument set to a positive value, block for at most the number of seconds specified by timeout and as long as the lock cannot be acquired. A timeout argument of -1 specifies an unbounded wait. It is forbidden to specify a timeout when blocking is False.

如果成功獲得鎖,則返回 True,否則返回 False (例如發(fā)生 超時(shí) 的時(shí)候)。

在 3.2 版更改: 新的 timeout 形參。

在 3.2 版更改: 現在如果底層線(xiàn)程實(shí)現支持,則可以通過(guò)POSIX上的信號中斷鎖的獲取。

release()?

釋放一個(gè)鎖。這個(gè)方法可以在任何線(xiàn)程中調用,不單指獲得鎖的線(xiàn)程。

當鎖被鎖定,將它重置為未鎖定,并返回。如果其他線(xiàn)程正在等待這個(gè)鎖解鎖而被阻塞,只允許其中一個(gè)允許。

當在未鎖定的鎖上發(fā)起調用時(shí),會(huì )引發(fā) RuntimeError。

沒(méi)有返回值。

locked()?

Return True if the lock is acquired.

遞歸鎖對象?

重入鎖是一個(gè)可以被同一個(gè)線(xiàn)程多次獲取的同步基元組件。在內部,它在基元鎖的鎖定/非鎖定狀態(tài)上附加了 "所屬線(xiàn)程" 和 "遞歸等級" 的概念。在鎖定狀態(tài)下,某些線(xiàn)程擁有鎖 ; 在非鎖定狀態(tài)下, 沒(méi)有線(xiàn)程擁有它。

若要鎖定鎖,線(xiàn)程調用其 acquire() 方法;一旦線(xiàn)程擁有了鎖,方法將返回。若要解鎖,線(xiàn)程調用 release() 方法。 acquire()/release() 對可以嵌套;只有最終 release() (最外面一對的 release() ) 將鎖解開(kāi),才能讓其他線(xiàn)程繼續處理 acquire() 阻塞。

遞歸鎖也支持 上下文管理協(xié)議。

class threading.RLock?

此類(lèi)實(shí)現了重入鎖對象。重入鎖必須由獲取它的線(xiàn)程釋放。一旦線(xiàn)程獲得了重入鎖,同一個(gè)線(xiàn)程再次獲取它將不阻塞;線(xiàn)程必須在每次獲取它時(shí)釋放一次。

需要注意的是 RLock 其實(shí)是一個(gè)工廠(chǎng)函數,返回平臺支持的具體遞歸鎖類(lèi)中最有效的版本的實(shí)例。

acquire(blocking=True, timeout=- 1)?

可以阻塞或非阻塞地獲得鎖。

當無(wú)參數調用時(shí): 如果這個(gè)線(xiàn)程已經(jīng)擁有鎖,遞歸級別增加一,并立即返回。否則,如果其他線(xiàn)程擁有該鎖,則阻塞至該鎖解鎖。一旦鎖被解鎖(不屬于任何線(xiàn)程),則搶奪所有權,設置遞歸等級為一,并返回。如果多個(gè)線(xiàn)程被阻塞,等待鎖被解鎖,一次只有一個(gè)線(xiàn)程能搶到鎖的所有權。在這種情況下,沒(méi)有返回值。

When invoked with the blocking argument set to True, do the same thing as when called without arguments, and return True.

When invoked with the blocking argument set to False, do not block. If a call without an argument would block, return False immediately; otherwise, do the same thing as when called without arguments, and return True.

When invoked with the floating-point timeout argument set to a positive value, block for at most the number of seconds specified by timeout and as long as the lock cannot be acquired. Return True if the lock has been acquired, False if the timeout has elapsed.

在 3.2 版更改: 新的 timeout 形參。

release()?

釋放鎖,自減遞歸等級。如果減到零,則將鎖重置為非鎖定狀態(tài)(不被任何線(xiàn)程擁有),并且,如果其他線(xiàn)程正被阻塞著(zhù)等待鎖被解鎖,則僅允許其中一個(gè)線(xiàn)程繼續。如果自減后,遞歸等級仍然不是零,則鎖保持鎖定,仍由調用線(xiàn)程擁有。

只有當前線(xiàn)程擁有鎖才能調用這個(gè)方法。如果鎖被釋放后調用這個(gè)方法,會(huì )引起 RuntimeError 異常。

沒(méi)有返回值。

條件對象?

條件變量總是與某種類(lèi)型的鎖對象相關(guān)聯(lián),鎖對象可以通過(guò)傳入獲得,或者在缺省的情況下自動(dòng)創(chuàng )建。當多個(gè)條件變量需要共享同一個(gè)鎖時(shí),傳入一個(gè)鎖很有用。鎖是條件對象的一部分,你不必單獨地跟蹤它。

條件變量遵循 上下文管理協(xié)議 :使用 with 語(yǔ)句會(huì )在它包圍的代碼塊內獲取關(guān)聯(lián)的鎖。 acquire()release() 方法也能調用關(guān)聯(lián)鎖的相關(guān)方法。

其它方法必須在持有關(guān)聯(lián)的鎖的情況下調用。 wait() 方法釋放鎖,然后阻塞直到其它線(xiàn)程調用 notify() 方法或 notify_all() 方法喚醒它。一旦被喚醒, wait() 方法重新獲取鎖并返回。它也可以指定超時(shí)時(shí)間。

The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notify_all() method wakes up all threads waiting for the condition variable.

注意: notify() 方法和 notify_all() 方法并不會(huì )釋放鎖,這意味著(zhù)被喚醒的線(xiàn)程不會(huì )立即從它們的 wait() 方法調用中返回,而是會(huì )在調用了 notify() 方法或 notify_all() 方法的線(xiàn)程最終放棄了鎖的所有權后返回。

使用條件變量的典型編程風(fēng)格是將鎖用于同步某些共享狀態(tài)的權限,那些對狀態(tài)的某些特定改變感興趣的線(xiàn)程,它們重復調用 wait() 方法,直到看到所期望的改變發(fā)生;而對于修改狀態(tài)的線(xiàn)程,它們將當前狀態(tài)改變?yōu)榭赡苁堑却咚诖男聽(tīng)顟B(tài)后,調用 notify() 方法或者 notify_all() 方法。例如,下面的代碼是一個(gè)通用的無(wú)限緩沖區容量的生產(chǎn)者-消費者情形:

# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

使用 while 循環(huán)檢查所要求的條件成立與否是有必要的,因為 wait() 方法可能要經(jīng)過(guò)不確定長(cháng)度的時(shí)間后才會(huì )返回,而此時(shí)導致 notify() 方法調用的那個(gè)條件可能已經(jīng)不再成立。這是多線(xiàn)程編程所固有的問(wèn)題。 wait_for() 方法可自動(dòng)化條件檢查,并簡(jiǎn)化超時(shí)計算。

# Consume an item
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

選擇 notify() 還是 notify_all() ,取決于一次狀態(tài)改變是只能被一個(gè)還是能被多個(gè)等待線(xiàn)程所用。例如在一個(gè)典型的生產(chǎn)者-消費者情形中,添加一個(gè)項目到緩沖區只需喚醒一個(gè)消費者線(xiàn)程。

class threading.Condition(lock=None)?

實(shí)現條件變量對象的類(lèi)。一個(gè)條件變量對象允許一個(gè)或多個(gè)線(xiàn)程在被其它線(xiàn)程所通知之前進(jìn)行等待。

如果給出了非 Nonelock 參數,則它必須為 Lock 或者 RLock 對象,并且它將被用作底層鎖。否則,將會(huì )創(chuàng )建新的 RLock 對象,并將其用作底層鎖。

在 3.3 版更改: 從工廠(chǎng)函數變?yōu)轭?lèi)。

acquire(*args)?

請求底層鎖。此方法調用底層鎖的相應方法,返回值是底層鎖相應方法的返回值。

release()?

釋放底層鎖。此方法調用底層鎖的相應方法。沒(méi)有返回值。

wait(timeout=None)?

等待直到被通知或發(fā)生超時(shí)。如果線(xiàn)程在調用此方法時(shí)沒(méi)有獲得鎖,將會(huì )引發(fā) RuntimeError 異常。

這個(gè)方法釋放底層鎖,然后阻塞,直到在另外一個(gè)線(xiàn)程中調用同一個(gè)條件變量的 notify()notify_all() 喚醒它,或者直到可選的超時(shí)發(fā)生。一旦被喚醒或者超時(shí),它重新獲得鎖并返回。

當提供了 timeout 參數且不是 None 時(shí),它應該是一個(gè)浮點(diǎn)數,代表操作的超時(shí)時(shí)間,以秒為單位(可以為小數)。

當底層鎖是個(gè) RLock ,不會(huì )使用它的 release() 方法釋放鎖,因為當它被遞歸多次獲取時(shí),實(shí)際上可能無(wú)法解鎖。相反,使用了 RLock 類(lèi)的內部接口,即使多次遞歸獲取它也能解鎖它。 然后,在重新獲取鎖時(shí),使用另一個(gè)內部接口來(lái)恢復遞歸級別。

返回 True ,除非提供的 timeout 過(guò)期,這種情況下返回 False。

在 3.2 版更改: 很明顯,方法總是返回 None。

wait_for(predicate, timeout=None)?

等待,直到條件計算為真。 predicate 應該是一個(gè)可調用對象而且它的返回值可被解釋為一個(gè)布爾值??梢蕴峁?timeout 參數給出最大等待時(shí)間。

這個(gè)實(shí)用方法會(huì )重復地調用 wait() 直到滿(mǎn)足判斷式或者發(fā)生超時(shí)。返回值是判斷式最后一個(gè)返回值,而且如果方法發(fā)生超時(shí)會(huì )返回 False 。

忽略超時(shí)功能,調用此方法大致相當于編寫(xiě):

while not predicate():
    cv.wait()

因此,規則同樣適用于 wait() :鎖必須在被調用時(shí)保持獲取,并在返回時(shí)重新獲取。 隨著(zhù)鎖定執行判斷式。

3.2 新版功能.

notify(n=1)?

默認喚醒一個(gè)等待這個(gè)條件的線(xiàn)程。如果調用線(xiàn)程在沒(méi)有獲得鎖的情況下調用這個(gè)方法,會(huì )引發(fā) RuntimeError 異常。

這個(gè)方法喚醒最多 n 個(gè)正在等待這個(gè)條件變量的線(xiàn)程;如果沒(méi)有線(xiàn)程在等待,這是一個(gè)空操作。

當前實(shí)現中,如果至少有 n 個(gè)線(xiàn)程正在等待,準確喚醒 n 個(gè)線(xiàn)程。但是依賴(lài)這個(gè)行為并不安全。未來(lái),優(yōu)化的實(shí)現有時(shí)會(huì )喚醒超過(guò) n 個(gè)線(xiàn)程。

注意:被喚醒的線(xiàn)程并沒(méi)有真正恢復到它調用的 wait() ,直到它可以重新獲得鎖。 因為 notify() 不釋放鎖,其調用者才應該這樣做。

notify_all()?

喚醒所有正在等待這個(gè)條件的線(xiàn)程。這個(gè)方法行為與 notify() 相似,但并不只喚醒單一線(xiàn)程,而是喚醒所有等待線(xiàn)程。如果調用線(xiàn)程在調用這個(gè)方法時(shí)沒(méi)有獲得鎖,會(huì )引發(fā) RuntimeError 異常。

notifyAll 方法是此方法的已棄用別名。

信號量對象?

這是計算機科學(xué)史上最古老的同步原語(yǔ)之一,早期的荷蘭科學(xué)家 Edsger W. Dijkstra 發(fā)明了它。(他使用名稱(chēng) P()V() 而不是 acquire()release() )。

一個(gè)信號量管理一個(gè)內部計數器,該計數器因 acquire() 方法的調用而遞減,因 release() 方法的調用而遞增。 計數器的值永遠不會(huì )小于零;當 acquire() 方法發(fā)現計數器為零時(shí),將會(huì )阻塞,直到其它線(xiàn)程調用 release() 方法。

信號量對象也支持 上下文管理協(xié)議 。

class threading.Semaphore(value=1)?

該類(lèi)實(shí)現信號量對象。信號量對象管理一個(gè)原子性的計數器,代表 release() 方法的調用次數減去 acquire() 的調用次數再加上一個(gè)初始值。如果需要, acquire() 方法將會(huì )阻塞直到可以返回而不會(huì )使得計數器變成負數。在沒(méi)有顯式給出 value 的值時(shí),默認為1。

可選參數 value 賦予內部計數器初始值,默認值為 1 。如果 value 被賦予小于0的值,將會(huì )引發(fā) ValueError 異常。

在 3.3 版更改: 從工廠(chǎng)函數變?yōu)轭?lèi)。

acquire(blocking=True, timeout=None)?

獲取一個(gè)信號量。

在不帶參數的情況下調用時(shí):

  • 如果在進(jìn)入時(shí)內部計數器的值大于零,則將其減一并立即返回 True。

  • 如果在進(jìn)入時(shí)內部計數器的值為零,則將會(huì )阻塞直到被對 release() 的調用喚醒。 一旦被喚醒(并且計數器的值大于 0),則將計數器減 1 并返回 True。 每次對 release() 的調用將只喚醒一個(gè)線(xiàn)程。 線(xiàn)程被喚醒的次序是不可確定的。

When invoked with blocking set to False, do not block. If a call without an argument would block, return False immediately; otherwise, do the same thing as when called without arguments, and return True.

當發(fā)起調用時(shí)如果 timeout 不為 None,則它將阻塞最多 timeout 秒。 請求在此時(shí)段時(shí)未能成功完成獲取則將返回 False。 在其他情況下返回 True。

在 3.2 版更改: 新的 timeout 形參。

release(n=1)?

釋放一個(gè)信號量,將內部計數器的值增加 n。 當進(jìn)入時(shí)值為零且有其他線(xiàn)程正在等待它再次變?yōu)榇笥诹銜r(shí),則喚醒那 n 個(gè)線(xiàn)程。

在 3.9 版更改: 增加了 n 形參以一次性釋放多個(gè)等待線(xiàn)程。

class threading.BoundedSemaphore(value=1)?

該類(lèi)實(shí)現有界信號量。有界信號量通過(guò)檢查以確保它當前的值不會(huì )超過(guò)初始值。如果超過(guò)了初始值,將會(huì )引發(fā) ValueError 異常。在大多情況下,信號量用于保護數量有限的資源。如果信號量被釋放的次數過(guò)多,則表明出現了錯誤。沒(méi)有指定時(shí), value 的值默認為1。

在 3.3 版更改: 從工廠(chǎng)函數變?yōu)轭?lèi)。

Semaphore 例子?

信號量通常用于保護數量有限的資源,例如數據庫服務(wù)器。在資源數量固定的任何情況下,都應該使用有界信號量。在生成任何工作線(xiàn)程前,應該在主線(xiàn)程中初始化信號量。

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

工作線(xiàn)程生成后,當需要連接服務(wù)器時(shí),這些線(xiàn)程將調用信號量的 acquire 和 release 方法:

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

使用有界信號量能減少這種編程錯誤:信號量的釋放次數多于其請求次數。

事件對象?

這是線(xiàn)程之間通信的最簡(jiǎn)單機制之一:一個(gè)線(xiàn)程發(fā)出事件信號,而其他線(xiàn)程等待該信號。

一個(gè)事件對象管理一個(gè)內部標識,調用 set() 方法可將其設置為 true ,調用 clear() 方法可將其設置為 false ,調用 wait() 方法將進(jìn)入阻塞直到標識為 true 。

class threading.Event?

實(shí)現事件對象的類(lèi)。事件對象管理一個(gè)內部標識,調用 set() 方法可將其設置為true。調用 clear() 方法可將其設置為 false 。調用 wait() 方法將進(jìn)入阻塞直到標識為true。這個(gè)標識初始時(shí)為 false 。

在 3.3 版更改: 從工廠(chǎng)函數變?yōu)轭?lèi)。

is_set()?

當且僅當內部標識為 true 時(shí)返回 True 。

isSet 方法是此方法的已棄用別名。

set()?

將內部標識設置為 true 。所有正在等待這個(gè)事件的線(xiàn)程將被喚醒。當標識為 true 時(shí),調用 wait() 方法的線(xiàn)程不會(huì )被被阻塞。

clear()?

將內部標識設置為 false 。之后調用 wait() 方法的線(xiàn)程將會(huì )被阻塞,直到調用 set() 方法將內部標識再次設置為 true 。

wait(timeout=None)?

阻塞線(xiàn)程直到內部變量為 true 。如果調用時(shí)內部標識為 true,將立即返回。否則將阻塞線(xiàn)程,直到調用 set() 方法將標識設置為 true 或者發(fā)生可選的超時(shí)。

當提供了timeout參數且不是 None 時(shí),它應該是一個(gè)浮點(diǎn)數,代表操作的超時(shí)時(shí)間,以秒為單位(可以為小數)。

當且僅當內部旗標在等待調用之前或者等待開(kāi)始之后被設為真值時(shí)此方法將返回 True,也就是說(shuō),它將總是返回 True 除非設定了超時(shí)且操作發(fā)生了超時(shí)。

在 3.1 版更改: 很明顯,方法總是返回 None。

定時(shí)器對象?

此類(lèi)表示一個(gè)操作應該在等待一定的時(shí)間之后運行 --- 相當于一個(gè)定時(shí)器。 Timer 類(lèi)是 Thread 類(lèi)的子類(lèi),因此可以像一個(gè)自定義線(xiàn)程一樣工作。

與線(xiàn)程一樣,通過(guò)調用 start() 方法啟動(dòng)定時(shí)器。而 cancel() 方法可以停止計時(shí)器(在計時(shí)結束前), 定時(shí)器在執行其操作之前等待的時(shí)間間隔可能與用戶(hù)指定的時(shí)間間隔不完全相同。

例如:

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed
class threading.Timer(interval, function, args=None, kwargs=None)?

創(chuàng )建一個(gè)定時(shí)器,在經(jīng)過(guò) interval 秒的間隔事件后,將會(huì )用參數 args 和關(guān)鍵字參數 kwargs 調用 function。如果 argsNone (默認值),則會(huì )使用一個(gè)空列表。如果 kwargsNone (默認值),則會(huì )使用一個(gè)空字典。

在 3.3 版更改: 從工廠(chǎng)函數變?yōu)轭?lèi)。

cancel()?

停止定時(shí)器并取消執行計時(shí)器將要執行的操作。僅當計時(shí)器仍處于等待狀態(tài)時(shí)有效。

柵欄對象?

3.2 新版功能.

柵欄類(lèi)提供一個(gè)簡(jiǎn)單的同步原語(yǔ),用于應對固定數量的線(xiàn)程需要彼此相互等待的情況。線(xiàn)程調用 wait() 方法后將阻塞,直到所有線(xiàn)程都調用了 wait() 方法。此時(shí)所有線(xiàn)程將被同時(shí)釋放。

柵欄對象可以被多次使用,但進(jìn)程的數量不能改變。

這是一個(gè)使用簡(jiǎn)便的方法實(shí)現客戶(hù)端進(jìn)程與服務(wù)端進(jìn)程同步的例子:

b = Barrier(2, timeout=5)

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)
class threading.Barrier(parties, action=None, timeout=None)?

創(chuàng )建一個(gè)需要 parties 個(gè)線(xiàn)程的柵欄對象。如果提供了可調用的 action 參數,它會(huì )在所有線(xiàn)程被釋放時(shí)在其中一個(gè)線(xiàn)程中自動(dòng)調用。 timeout 是默認的超時(shí)時(shí)間,如果沒(méi)有在 wait() 方法中指定超時(shí)時(shí)間的話(huà)。

wait(timeout=None)?

沖出柵欄。當柵欄中所有線(xiàn)程都已經(jīng)調用了這個(gè)函數,它們將同時(shí)被釋放。如果提供了 timeout 參數,這里的 timeout 參數優(yōu)先于創(chuàng )建柵欄對象時(shí)提供的 timeout 參數。

函數返回值是一個(gè)整數,取值范圍在0到 parties -- 1,在每個(gè)線(xiàn)程中的返回值不相同??捎糜趶乃芯€(xiàn)程中選擇唯一的一個(gè)線(xiàn)程執行一些特別的工作。例如:

i = barrier.wait()
if i == 0:
    # Only one thread needs to print this
    print("passed the barrier")

如果創(chuàng )建柵欄對象時(shí)在構造函數中提供了 action 參數,它將在其中一個(gè)線(xiàn)程釋放前被調用。如果此調用引發(fā)了異常,柵欄對象將進(jìn)入損壞態(tài)。

如果發(fā)生了超時(shí),柵欄對象將進(jìn)入破損態(tài)。

如果柵欄對象進(jìn)入破損態(tài),或重置柵欄時(shí)仍有線(xiàn)程等待釋放,將會(huì )引發(fā) BrokenBarrierError 異常。

reset()?

重置柵欄為默認的初始態(tài)。如果柵欄中仍有線(xiàn)程等待釋放,這些線(xiàn)程將會(huì )收到 BrokenBarrierError 異常。

請注意使用此函數時(shí),如果存在狀態(tài)未知的其他線(xiàn)程,則可能需要執行外部同步。 如果柵欄已損壞則最好將其廢棄并新建一個(gè)。

abort()?

使柵欄處于損壞狀態(tài)。 這將導致任何現有和未來(lái)對 wait() 的調用失敗并引發(fā) BrokenBarrierError。 例如可以在需要中止某個(gè)線(xiàn)程時(shí)使用此方法,以避免應用程序的死鎖。

更好的方式是:創(chuàng )建柵欄時(shí)提供一個(gè)合理的超時(shí)時(shí)間,來(lái)自動(dòng)避免某個(gè)線(xiàn)程出錯。

parties?

沖出柵欄所需要的線(xiàn)程數量。

n_waiting?

當前時(shí)刻正在柵欄中阻塞的線(xiàn)程數量。

broken?

一個(gè)布爾值,值為 True 表明柵欄為破損態(tài)。

exception threading.BrokenBarrierError?

異常類(lèi),是 RuntimeError 異常的子類(lèi),在 Barrier 對象重置時(shí)仍有線(xiàn)程阻塞時(shí)和對象進(jìn)入破損態(tài)時(shí)被引發(fā)。

with 語(yǔ)句中使用鎖、條件和信號量?

這個(gè)模塊提供的帶有 acquire()release() 方法的對象,可以被用作 with 語(yǔ)句的上下文管理器。當進(jìn)入語(yǔ)句塊時(shí) acquire() 方法會(huì )被調用,退出語(yǔ)句塊時(shí) release() 會(huì )被調用。因此,以下片段:

with some_lock:
    # do something...

相當于:

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

現在 Lock 、 RLock 、 Condition 、 SemaphoreBoundedSemaphore 對象可以用作 with 語(yǔ)句的上下文管理器。