multiprocessing
--- 基于進(jìn)程的并行?
概述?
multiprocessing
是一個(gè)支持使用與 threading
模塊類(lèi)似的 API 來(lái)產(chǎn)生進(jìn)程的包。 multiprocessing
包同時(shí)提供了本地和遠程并發(fā)操作,通過(guò)使用子進(jìn)程而非線(xiàn)程有效地繞過(guò)了 全局解釋器鎖。 因此,multiprocessing
模塊允許程序員充分利用給定機器上的多個(gè)處理器。 它在 Unix 和 Windows 上均可運行。
multiprocessing
模塊還引入了在 threading
模塊中沒(méi)有的API。一個(gè)主要的例子就是 Pool
對象,它提供了一種快捷的方法,賦予函數并行化處理一系列輸入值的能力,可以將輸入數據分配給不同進(jìn)程處理(數據并行)。下面的例子演示了在模塊中定義此類(lèi)函數的常見(jiàn)做法,以便子進(jìn)程可以成功導入該模塊。這個(gè)數據并行的基本例子使用了 Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
將在標準輸出中打印
[1, 4, 9]
Process
類(lèi)?
在 multiprocessing
中,通過(guò)創(chuàng )建一個(gè) Process
對象然后調用它的 start()
方法來(lái)生成進(jìn)程。 Process
和 threading.Thread
API 相同。 一個(gè)簡(jiǎn)單的多進(jìn)程程序示例是:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
要顯示所涉及的各個(gè)進(jìn)程ID,這是一個(gè)擴展示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
關(guān)于為什么 if __name__ == '__main__'
部分是必需的解釋?zhuān)垍⒁?jiàn) 編程指導。
上下文和啟動(dòng)方法?
根據不同的平臺, multiprocessing
支持三種啟動(dòng)進(jìn)程的方法。這些 啟動(dòng)方法 有
- spawn
父進(jìn)程會(huì )啟動(dòng)一個(gè)全新的 python 解釋器進(jìn)程。 子進(jìn)程將只繼承那些運行進(jìn)程對象的
run()
方法所必需的資源。 特別地,來(lái)自父進(jìn)程的非必需文件描述符和句柄將不會(huì )被繼承。 使用此方法啟動(dòng)進(jìn)程相比使用 fork 或 forkserver 要慢上許多。可在Unix和Windows上使用。 Windows上的默認設置。
- fork
父進(jìn)程使用
os.fork()
來(lái)產(chǎn)生 Python 解釋器分叉。子進(jìn)程在開(kāi)始時(shí)實(shí)際上與父進(jìn)程相同。父進(jìn)程的所有資源都由子進(jìn)程繼承。請注意,安全分叉多線(xiàn)程進(jìn)程是棘手的。只存在于Unix。Unix中的默認值。
- forkserver
程序啟動(dòng)并選擇* forkserver * 啟動(dòng)方法時(shí),將啟動(dòng)服務(wù)器進(jìn)程。從那時(shí)起,每當需要一個(gè)新進(jìn)程時(shí),父進(jìn)程就會(huì )連接到服務(wù)器并請求它分叉一個(gè)新進(jìn)程。分叉服務(wù)器進(jìn)程是單線(xiàn)程的,因此使用
os.fork()
是安全的。沒(méi)有不必要的資源被繼承。可在Unix平臺上使用,支持通過(guò)Unix管道傳遞文件描述符。
在 3.8 版更改: 對于 macOS,spawn 啟動(dòng)方式是默認方式。 因為 fork 可能導致subprocess崩潰,被認為是不安全的,查看 bpo-33725 。
在 3.4 版更改: 在所有unix平臺上添加支持了 spawn ,并且為一些unix平臺添加了 forkserver 。在Windows上子進(jìn)程不再繼承所有可繼承的父進(jìn)程句柄。
在 Unix 上通過(guò) spawn 和 forkserver 方式啟動(dòng)多進(jìn)程會(huì )同時(shí)啟動(dòng)一個(gè) 資源追蹤 進(jìn)程,負責追蹤當前程序的進(jìn)程產(chǎn)生的、并且不再被使用的命名系統資源(如命名信號量以及 SharedMemory
對象)。當所有進(jìn)程退出后,資源追蹤會(huì )負責釋放這些仍被追蹤的的對象。通常情況下是不會(huì )有這種對象的,但是假如一個(gè)子進(jìn)程被某個(gè)信號殺死,就可能存在這一類(lèi)資源的“泄露”情況。(泄露的信號量以及共享內存不會(huì )被釋放,直到下一次系統重啟,對于這兩類(lèi)資源來(lái)說(shuō),這是一個(gè)比較大的問(wèn)題,因為操作系統允許的命名信號量的數量是有限的,而共享內存也會(huì )占據主內存的一片空間)
要選擇一個(gè)啟動(dòng)方法,你應該在主模塊的 if __name__ == '__main__'
子句中調用 set_start_method()
。例如:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
在程序中 set_start_method()
不應該被多次調用。
或者,你可以使用 get_context()
來(lái)獲取上下文對象。上下文對象與 multiprocessing 模塊具有相同的API,并允許在同一程序中使用多種啟動(dòng)方法。:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
請注意,對象在不同上下文創(chuàng )建的進(jìn)程間可能并不兼容。 特別是,使用 fork 上下文創(chuàng )建的鎖不能傳遞給使用 spawn 或 forkserver 啟動(dòng)方法啟動(dòng)的進(jìn)程。
想要使用特定啟動(dòng)方法的庫應該使用 get_context()
以避免干擾庫用戶(hù)的選擇。
警告
'spawn'
和 'forkserver'
啟動(dòng)方法當前不能在Unix上和“凍結的”可執行內容一同使用(例如,有類(lèi)似 PyInstaller 和 cx_Freeze 的包產(chǎn)生的二進(jìn)制文件)。 'fork'
啟動(dòng)方法可以使用。
在進(jìn)程之間交換對象?
multiprocessing
支持進(jìn)程之間的兩種通信通道:
隊列
Queue
類(lèi)是一個(gè)近似queue.Queue
的克隆。 例如:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()隊列是線(xiàn)程和進(jìn)程安全的。
管道
Pipe()
函數返回一個(gè)由管道連接的連接對象,默認情況下是雙工(雙向)。例如:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()返回的兩個(gè)連接對象
Pipe()
表示管道的兩端。每個(gè)連接對象都有send()
和recv()
方法(相互之間的)。請注意,如果兩個(gè)進(jìn)程(或線(xiàn)程)同時(shí)嘗試讀取或寫(xiě)入管道的 同一 端,則管道中的數據可能會(huì )損壞。當然,在不同進(jìn)程中同時(shí)使用管道的不同端的情況下不存在損壞的風(fēng)險。
進(jìn)程間同步?
multiprocessing
包含來(lái)自 threading
的所有同步原語(yǔ)的等價(jià)物。例如,可以使用鎖來(lái)確保一次只有一個(gè)進(jìn)程打印到標準輸出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
不使用鎖的情況下,來(lái)自于多進(jìn)程的輸出很容易產(chǎn)生混淆。
使用工作進(jìn)程?
Pool
類(lèi)表示一個(gè)工作進(jìn)程池。它具有允許以幾種不同方式將任務(wù)分配到工作進(jìn)程的方法。
例如:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 seconds
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
請注意,進(jìn)程池的方法只能由創(chuàng )建它的進(jìn)程使用。
備注
這個(gè)包中的功能要求子進(jìn)程可以導入 __main__
模塊。雖然這在 編程指導 中有描述,但還是需要提前說(shuō)明一下。這意味著(zhù)一些示例在交互式解釋器中不起作用,比如 multiprocessing.pool.Pool
示例。例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果嘗試執行上面的代碼,它會(huì )以一種半隨機的方式將三個(gè)完整的堆棧內容交替輸出,然后你只能以某種方式停止父進(jìn)程。)
參考?
multiprocessing
包主要復制了 threading
模塊的API。
Process
和異常?
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)?
進(jìn)程對象表示在單獨進(jìn)程中運行的活動(dòng)。
Process
類(lèi)擁有和threading.Thread
等價(jià)的大部分方法。應始終使用關(guān)鍵字參數調用構造函數。 group 應該始終是
None
;它僅用于兼容threading.Thread
。 target 是由run()
方法調用的可調用對象。它默認為None
,意味著(zhù)什么都沒(méi)有被調用。 name 是進(jìn)程名稱(chēng)(有關(guān)詳細信息,請參閱name
)。 args 是目標調用的參數元組。 kwargs 是目標調用的關(guān)鍵字參數字典。如果提供,則鍵參數 daemon 將進(jìn)程daemon
標志設置為True
或False
。如果是None
(默認值),則該標志將從創(chuàng )建的進(jìn)程繼承。By default, no arguments are passed to target. The args argument, which defaults to
()
, can be used to specify a list or tuple of the arguments to pass to target.如果子類(lèi)重寫(xiě)構造函數,它必須確保它在對進(jìn)程執行任何其他操作之前調用基類(lèi)構造函數(
Process.__init__()
)。在 3.3 版更改: 加入 daemon 參數。
- run()?
表示進(jìn)程活動(dòng)的方法。
你可以在子類(lèi)中重載此方法。標準
run()
方法調用傳遞給對象構造函數的可調用對象作為目標參數(如果有),分別從 args 和 kwargs 參數中獲取順序和關(guān)鍵字參數。Using a list or tuple as the args argument passed to
Process
achieves the same effect.Example:
>>> from multiprocessing import Process >>> p = Process(target=print, args=[1]) >>> p.run() 1 >>> p = Process(target=print, args=(1,)) >>> p.run() 1
- start()?
啟動(dòng)進(jìn)程活動(dòng)。
這個(gè)方法每個(gè)進(jìn)程對象最多只能調用一次。它會(huì )將對象的
run()
方法安排在一個(gè)單獨的進(jìn)程中調用。
- join([timeout])?
如果可選參數 timeout 是
None
(默認值),則該方法將阻塞,直到調用join()
方法的進(jìn)程終止。如果 timeout 是一個(gè)正數,它最多會(huì )阻塞 timeout 秒。請注意,如果進(jìn)程終止或方法超時(shí),則該方法返回None
。檢查進(jìn)程的exitcode
以確定它是否終止。一個(gè)進(jìn)程可以被 join 多次。
進(jìn)程無(wú)法join自身,因為這會(huì )導致死鎖。嘗試在啟動(dòng)進(jìn)程之前join進(jìn)程是錯誤的。
- name?
進(jìn)程的名稱(chēng)。該名稱(chēng)是一個(gè)字符串,僅用于識別目的。它沒(méi)有語(yǔ)義??梢詾槎鄠€(gè)進(jìn)程指定相同的名稱(chēng)。
初始名稱(chēng)由構造器設定。 如果沒(méi)有為構造器提供顯式名稱(chēng),則會(huì )構造一個(gè)形式為 'Process-N1:N2:...:Nk' 的名稱(chēng),其中每個(gè) Nk 是其父親的第 N 個(gè)孩子。
- is_alive()?
返回進(jìn)程是否還活著(zhù)。
粗略地說(shuō),從
start()
方法返回到子進(jìn)程終止之前,進(jìn)程對象仍處于活動(dòng)狀態(tài)。
- daemon?
進(jìn)程的守護標志,一個(gè)布爾值。這必須在
start()
被調用之前設置。初始值繼承自創(chuàng )建進(jìn)程。
當進(jìn)程退出時(shí),它會(huì )嘗試終止其所有守護進(jìn)程子進(jìn)程。
請注意,不允許在守護進(jìn)程中創(chuàng )建子進(jìn)程。這是因為當守護進(jìn)程由于父進(jìn)程退出而中斷時(shí),其子進(jìn)程會(huì )變成孤兒進(jìn)程。 另外,這些 不是 Unix 守護進(jìn)程或服務(wù),它們是正常進(jìn)程,如果非守護進(jìn)程已經(jīng)退出,它們將被終止(并且不被合并)。
除了
threading.Thread
API ,Process
對象還支持以下屬性和方法:- pid?
返回進(jìn)程ID。在生成該進(jìn)程之前,這將是
None
。
- exitcode?
The child's exit code. This will be
None
if the process has not yet terminated.If the child's
run()
method returned normally, the exit code will be 0. If it terminated viasys.exit()
with an integer argument N, the exit code will be N.If the child terminated due to an exception not caught within
run()
, the exit code will be 1. If it was terminated by signal N, the exit code will be the negative value -N.
- authkey?
進(jìn)程的身份驗證密鑰(字節字符串)。
當
multiprocessing
初始化時(shí),主進(jìn)程使用os.urandom()
分配一個(gè)隨機字符串。當創(chuàng )建
Process
對象時(shí),它將繼承其父進(jìn)程的身份驗證密鑰,盡管可以通過(guò)將authkey
設置為另一個(gè)字節字符串來(lái)更改。參見(jiàn) 認證密碼 。
- sentinel?
系統對象的數字句柄,當進(jìn)程結束時(shí)將變?yōu)?"ready" 。
如果要使用
multiprocessing.connection.wait()
一次等待多個(gè)事件,可以使用此值。否則調用join()
更簡(jiǎn)單。在Windows上,這是一個(gè)操作系統句柄,可以與
WaitForSingleObject
和WaitForMultipleObjects
系列API調用一起使用。在Unix上,這是一個(gè)文件描述符,可以使用來(lái)自select
模塊的原語(yǔ)。3.3 新版功能.
- terminate()?
終止進(jìn)程。 在Unix上,這是使用
SIGTERM
信號完成的;在Windows上使用TerminateProcess()
。 請注意,不會(huì )執行退出處理程序和finally子句等。請注意,進(jìn)程的后代進(jìn)程將不會(huì )被終止 —— 它們將簡(jiǎn)單地變成孤立的。
警告
如果在關(guān)聯(lián)進(jìn)程使用管道或隊列時(shí)使用此方法,則管道或隊列可能會(huì )損壞,并可能無(wú)法被其他進(jìn)程使用。類(lèi)似地,如果進(jìn)程已獲得鎖或信號量等,則終止它可能導致其他進(jìn)程死鎖。
- kill()?
與
terminate()
相同,但在Unix上使用SIGKILL
信號。3.7 新版功能.
- close()?
關(guān)閉
Process
對象,釋放與之關(guān)聯(lián)的所有資源。如果底層進(jìn)程仍在運行,則會(huì )引發(fā)ValueError
。一旦close()
成功返回,Process
對象的大多數其他方法和屬性將引發(fā)ValueError
。3.7 新版功能.
注意
start()
、join()
、is_alive()
、terminate()
和exitcode
方法只能由創(chuàng )建進(jìn)程對象的進(jìn)程調用。Process
一些方法的示例用法:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError?
所有
multiprocessing
異常的基類(lèi)。
- exception multiprocessing.BufferTooShort?
當提供的緩沖區對象太小而無(wú)法讀取消息時(shí),
Connection.recv_bytes_into()
引發(fā)的異常。如果
e
是一個(gè)BufferTooShort
實(shí)例,那么e.args[0]
將把消息作為字節字符串給出。
- exception multiprocessing.AuthenticationError?
出現身份驗證錯誤時(shí)引發(fā)。
- exception multiprocessing.TimeoutError?
有超時(shí)的方法超時(shí)時(shí)引發(fā)。
管道和隊列?
使用多進(jìn)程時(shí),一般使用消息機制實(shí)現進(jìn)程間通信,盡可能避免使用同步原語(yǔ),例如鎖。
消息機制包含: Pipe()
(可以用于在兩個(gè)進(jìn)程間傳遞消息),以及隊列(能夠在多個(gè)生產(chǎn)者和消費者之間通信)。
Queue
, SimpleQueue
以及 JoinableQueue
都是多生產(chǎn)者,多消費者,并且實(shí)現了 FIFO 的隊列類(lèi)型,其表現與標準庫中的 queue.Queue
類(lèi)相似。 不同之處在于 Queue
缺少標準庫的 queue.Queue
從 Python 2.5 開(kāi)始引入的 task_done()
和 join()
方法。
如果你使用了 JoinableQueue
,那么你 必須 對每個(gè)已經(jīng)移出隊列的任務(wù)調用 JoinableQueue.task_done()
。 不然的話(huà)用于統計未完成任務(wù)的信號量最終會(huì )溢出并拋出異常。
另外還可以通過(guò)使用一個(gè)管理器對象創(chuàng )建一個(gè)共享隊列,詳見(jiàn) 管理器 。
備注
multiprocessing
使用了普通的 queue.Empty
和 queue.Full
異常去表示超時(shí)。 你需要從 queue
中導入它們,因為它們并不在 multiprocessing
的命名空間中。
備注
當一個(gè)對象被放入一個(gè)隊列中時(shí),這個(gè)對象首先會(huì )被一個(gè)后臺線(xiàn)程用 pickle 序列化,并將序列化后的數據通過(guò)一個(gè)底層管道的管道傳遞到隊列中。 這種做法會(huì )有點(diǎn)讓人驚訝,但一般不會(huì )出現什么問(wèn)題。 如果它們確實(shí)妨礙了你,你可以使用一個(gè)由管理器 manager 創(chuàng )建的隊列替換它。
將一個(gè)對象放入一個(gè)空隊列后,可能需要極小的延遲,隊列的方法
empty()
才會(huì )返回False
。而get_nowait()
可以不拋出queue.Empty
直接返回。如果有多個(gè)進(jìn)程同時(shí)將對象放入隊列,那么在隊列的另一端接受到的對象可能是無(wú)序的。但是由同一個(gè)進(jìn)程放入的多個(gè)對象的順序在另一端輸出時(shí)總是一樣的。
警告
如果一個(gè)進(jìn)程在嘗試使用 Queue
期間被 Process.terminate()
或 os.kill()
調用終止了,那么隊列中的數據很可能被破壞。 這可能導致其他進(jìn)程在嘗試使用該隊列時(shí)發(fā)生異常。
警告
正如剛才提到的,如果一個(gè)子進(jìn)程將一些對象放進(jìn)隊列中 (并且它沒(méi)有用 JoinableQueue.cancel_join_thread
方法),那么這個(gè)進(jìn)程在所有緩沖區的對象被刷新進(jìn)管道之前,是不會(huì )終止的。
這意味著(zhù),除非你確定所有放入隊列中的對象都已經(jīng)被消費了,否則如果你試圖等待這個(gè)進(jìn)程,你可能會(huì )陷入死鎖中。相似地,如果該子進(jìn)程不是后臺進(jìn)程,那么父進(jìn)程可能在試圖等待所有非后臺進(jìn)程退出時(shí)掛起。
注意用管理器創(chuàng )建的隊列不存在這個(gè)問(wèn)題,詳見(jiàn) 編程指導 。
該 例子 展示了如何使用隊列實(shí)現進(jìn)程間通信。
- multiprocessing.Pipe([duplex])?
返回一對
Connection
對象(conn1, conn2)
, 分別表示管道的兩端。如果 duplex 被置為
True
(默認值),那么該管道是雙向的。如果 duplex 被置為False
,那么該管道是單向的,即conn1
只能用于接收消息,而conn2
僅能用于發(fā)送消息。
- class multiprocessing.Queue([maxsize])?
返回一個(gè)使用一個(gè)管道和少量鎖和信號量實(shí)現的共享隊列實(shí)例。當一個(gè)進(jìn)程將一個(gè)對象放進(jìn)隊列中時(shí),一個(gè)寫(xiě)入線(xiàn)程會(huì )啟動(dòng)并將對象從緩沖區寫(xiě)入管道中。
一旦超時(shí),將拋出標準庫
queue
模塊中常見(jiàn)的異常queue.Empty
和queue.Full
。除了
task_done()
和join()
之外,Queue
實(shí)現了標準庫類(lèi)queue.Queue
中所有的方法。- qsize()?
返回隊列的大致長(cháng)度。由于多線(xiàn)程或者多進(jìn)程的上下文,這個(gè)數字是不可靠的。
Note that this may raise
NotImplementedError
on Unix platforms like macOS wheresem_getvalue()
is not implemented.
- empty()?
如果隊列是空的,返回
True
,反之返回False
。 由于多線(xiàn)程或多進(jìn)程的環(huán)境,該狀態(tài)是不可靠的。
- full()?
如果隊列是滿(mǎn)的,返回
True
,反之返回False
。 由于多線(xiàn)程或多進(jìn)程的環(huán)境,該狀態(tài)是不可靠的。
- put(obj[, block[, timeout]])?
將 obj 放入隊列。如果可選參數 block 是
True
(默認值) 而且 timeout 是None
(默認值), 將會(huì )阻塞當前進(jìn)程,直到有空的緩沖槽。如果 timeout 是正數,將會(huì )在阻塞了最多 timeout 秒之后還是沒(méi)有可用的緩沖槽時(shí)拋出queue.Full
異常。反之 (block 是False
時(shí)),僅當有可用緩沖槽時(shí)才放入對象,否則拋出queue.Full
異常 (在這種情形下 timeout 參數會(huì )被忽略)。在 3.8 版更改: 如果隊列已經(jīng)關(guān)閉,會(huì )拋出
ValueError
而不是AssertionError
。
- put_nowait(obj)?
相當于
put(obj, False)
。
- get([block[, timeout]])?
從隊列中取出并返回對象。如果可選參數 block 是
True
(默認值) 而且 timeout 是None
(默認值), 將會(huì )阻塞當前進(jìn)程,直到隊列中出現可用的對象。如果 timeout 是正數,將會(huì )在阻塞了最多 timeout 秒之后還是沒(méi)有可用的對象時(shí)拋出queue.Empty
異常。反之 (block 是False
時(shí)),僅當有可用對象能夠取出時(shí)返回,否則拋出queue.Empty
異常 (在這種情形下 timeout 參數會(huì )被忽略)。在 3.8 版更改: 如果隊列已經(jīng)關(guān)閉,會(huì )拋出
ValueError
而不是OSError
。
- get_nowait()?
相當于
get(False)
。
multiprocessing.Queue
類(lèi)有一些在queue.Queue
類(lèi)中沒(méi)有出現的方法。這些方法在大多數情形下并不是必須的。- close()?
指示當前進(jìn)程將不會(huì )再往隊列中放入對象。一旦所有緩沖區中的數據被寫(xiě)入管道之后,后臺的線(xiàn)程會(huì )退出。這個(gè)方法在隊列被gc回收時(shí)會(huì )自動(dòng)調用。
- join_thread()?
等待后臺線(xiàn)程。這個(gè)方法僅在調用了
close()
方法之后可用。這會(huì )阻塞當前進(jìn)程,直到后臺線(xiàn)程退出,確保所有緩沖區中的數據都被寫(xiě)入管道中。默認情況下,如果一個(gè)不是隊列創(chuàng )建者的進(jìn)程試圖退出,它會(huì )嘗試等待這個(gè)隊列的后臺線(xiàn)程。這個(gè)進(jìn)程可以使用
cancel_join_thread()
讓join_thread()
方法什么都不做直接跳過(guò)。
- cancel_join_thread()?
防止
join_thread()
方法阻塞當前進(jìn)程。具體而言,這防止進(jìn)程退出時(shí)自動(dòng)等待后臺線(xiàn)程退出。詳見(jiàn)join_thread()
。這個(gè)方法更好的名字可能是
allow_exit_without_flush()
。 這可能會(huì )導致已排入隊列的數據丟失,幾乎可以肯定你將不需要用到這個(gè)方法。 實(shí)際上它僅適用于當你需要當前進(jìn)程立即退出而不必等待將已排入的隊列更新到下層管道,并且你不擔心丟失數據的時(shí)候。
備注
該類(lèi)的功能依賴(lài)于宿主操作系統具有可用的共享信號量實(shí)現。否則該類(lèi)將被禁用,任何試圖實(shí)例化一個(gè)
Queue
對象的操作都會(huì )拋出ImportError
異常,更多信息詳見(jiàn) bpo-3770 。后續說(shuō)明的任何專(zhuān)用隊列對象亦如此。
- class multiprocessing.SimpleQueue?
這是一個(gè)簡(jiǎn)化的
Queue
類(lèi)的實(shí)現,很像帶鎖的Pipe
。- empty()?
如果隊列為空返回
True
,否則返回False
。
- get()?
從隊列中移出并返回一個(gè)對象。
- put(item)?
將 item 放入隊列。
- class multiprocessing.JoinableQueue([maxsize])?
JoinableQueue
類(lèi)是Queue
的子類(lèi),額外添加了task_done()
和join()
方法。- task_done()?
指出之前進(jìn)入隊列的任務(wù)已經(jīng)完成。由隊列的消費者進(jìn)程使用。對于每次調用
get()
獲取的任務(wù),執行完成后調用task_done()
告訴隊列該任務(wù)已經(jīng)處理完成。如果
join()
方法正在阻塞之中,該方法會(huì )在所有對象都被處理完的時(shí)候返回 (即對之前使用put()
放進(jìn)隊列中的所有對象都已經(jīng)返回了對應的task_done()
) 。如果被調用的次數多于放入隊列中的項目數量,將引發(fā)
ValueError
異常 。
- join()?
阻塞至隊列中所有的元素都被接收和處理完畢。
當條目添加到隊列的時(shí)候,未完成任務(wù)的計數就會(huì )增加。每當消費者進(jìn)程調用
task_done()
表示這個(gè)條目已經(jīng)被回收,該條目所有工作已經(jīng)完成,未完成計數就會(huì )減少。當未完成計數降到零的時(shí)候,join()
阻塞被解除。
雜項?
- multiprocessing.active_children()?
返回當前進(jìn)程存活的子進(jìn)程的列表。
調用該方法有“等待”已經(jīng)結束的進(jìn)程的副作用。
- multiprocessing.cpu_count()?
返回系統的CPU數量。
該數量不同于當前進(jìn)程可以使用的CPU數量??捎玫腃PU數量可以由
len(os.sched_getaffinity(0))
方法獲得。When the number of CPUs cannot be determined a
NotImplementedError
is raised.參見(jiàn)
- multiprocessing.parent_process()?
返回父進(jìn)程
Process
對象,和父進(jìn)程調用current_process()
返回的對象一樣。如果一個(gè)進(jìn)程已經(jīng)是主進(jìn)程,parent_process
會(huì )返回None
.3.8 新版功能.
- multiprocessing.freeze_support()?
為使用了
multiprocessing
的程序,提供凍結以產(chǎn)生 Windows 可執行文件的支持。(在 py2exe, PyInstaller 和 cx_Freeze 上測試通過(guò))需要在 main 模塊的
if __name__ == '__main__'
該行之后馬上調用該函數。例如:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
如果沒(méi)有調用
freeze_support()
在嘗試運行被凍結的可執行文件時(shí)會(huì )拋出RuntimeError
異常。對
freeze_support()
的調用在非 Windows 平臺上是無(wú)效的。如果該模塊在 Windows 平臺的 Python 解釋器中正常運行 (該程序沒(méi)有被凍結), 調用``freeze_support()`` 也是無(wú)效的。
- multiprocessing.get_all_start_methods()?
返回支持的啟動(dòng)方法的列表,該列表的首項即為默認選項??赡艿膯?dòng)方法有
'fork'
,'spawn'
和``'forkserver'`` 。在 Windows 中,只有'spawn'
是可用的。 Unix 平臺總是支持'fork'
和'spawn'
,且'fork'
是默認值。3.4 新版功能.
- multiprocessing.get_context(method=None)?
返回一個(gè) Context 對象。該對象具有和
multiprocessing
模塊相同的API。如果 method 設置成
None
那么將返回默認上下文對象。否則 method 應該是'fork'
,'spawn'
,'forkserver'
。 如果指定的啟動(dòng)方法不存在,將拋出ValueError
異常。3.4 新版功能.
- multiprocessing.get_start_method(allow_none=False)?
返回啟動(dòng)進(jìn)程時(shí)使用的啟動(dòng)方法名。
如果啟動(dòng)方法已經(jīng)固定,并且 allow_none 被設置成 False ,那么啟動(dòng)方法將被固定為默認的啟動(dòng)方法,并且返回其方法名。如果啟動(dòng)方法沒(méi)有設定,并且 allow_none 被設置成 True ,那么將返回
None
。The return value can be
'fork'
,'spawn'
,'forkserver'
orNone
.'fork'
is the default on Unix, while'spawn'
is the default on Windows and macOS.
在 3.8 版更改: 對于 macOS,spawn 啟動(dòng)方式是默認方式。 因為 fork 可能導致subprocess崩潰,被認為是不安全的,查看 bpo-33725 。
3.4 新版功能.
- multiprocessing.set_executable(executable)?
Set the path of the Python interpreter to use when starting a child process. (By default
sys.executable
is used). Embedders will probably need to do some thing likeset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
以使他們可以創(chuàng )建子進(jìn)程。
在 3.4 版更改: 現在在 Unix 平臺上使用
'spawn'
啟動(dòng)方法時(shí)支持調用該方法。在 3.11 版更改: Accepts a path-like object.
- multiprocessing.set_start_method(method)?
設置啟動(dòng)子進(jìn)程的方法。 method 可以是
'fork'
,'spawn'
或者'forkserver'
。注意這最多只能調用一次,并且需要藏在 main 模塊中,由
if __name__ == '__main__'
保護著(zhù)。3.4 新版功能.
備注
multiprocessing
并沒(méi)有包含類(lèi)似 threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, 或者 threading.local
的方法和類(lèi)。
連接對象(Connection)?
Connection 對象允許收發(fā)可以序列化的對象或字符串。它們可以看作面向消息的連接套接字。
通常使用 Pipe
創(chuàng )建 Connection 對象。詳見(jiàn) : 監聽(tīng)器及客戶(hù)端.
- class multiprocessing.connection.Connection?
- send(obj)?
將一個(gè)對象發(fā)送到連接的另一端,可以用
recv()
讀取。發(fā)送的對象必須是可以序列化的,過(guò)大的對象 ( 接近 32MiB+ ,這個(gè)值取決于操作系統 ) 有可能引發(fā)
ValueError
異常。
- recv()?
返回一個(gè)由另一端使用
send()
發(fā)送的對象。該方法會(huì )一直阻塞直到接收到對象。 如果對端關(guān)閉了連接或者沒(méi)有東西可接收,將拋出EOFError
異常。
- fileno()?
返回由連接對象使用的描述符或者句柄。
- close()?
關(guān)閉連接對象。
當連接對象被垃圾回收時(shí)會(huì )自動(dòng)調用。
- poll([timeout])?
返回連接對象中是否有可以讀取的數據。
如果未指定 timeout ,此方法會(huì )馬上返回。如果 timeout 是一個(gè)數字,則指定了最大阻塞的秒數。如果 timeout 是
None
,那么將一直等待,不會(huì )超時(shí)。注意通過(guò)使用
multiprocessing.connection.wait()
可以一次輪詢(xún)多個(gè)連接對象。
- send_bytes(buffer[, offset[, size]])?
從一個(gè) bytes-like object 對象中取出字節數組并作為一條完整消息發(fā)送。
如果由 offset 給定了在 buffer 中讀取數據的位置。 如果給定了 size ,那么將會(huì )從緩沖區中讀取多個(gè)字節。 過(guò)大的緩沖區 ( 接近 32MiB+ ,此值依賴(lài)于操作系統 ) 有可能引發(fā)
ValueError
異常。
- recv_bytes([maxlength])?
以字符串形式返回一條從連接對象另一端發(fā)送過(guò)來(lái)的字節數據。此方法在接收到數據前將一直阻塞。 如果連接對象被對端關(guān)閉或者沒(méi)有數據可讀取,將拋出
EOFError
異常。如果給定了 maxlength 并且消息長(cháng)于 maxlength 那么將拋出
OSError
并且該連接對象將不再可讀。
- recv_bytes_into(buffer[, offset])?
將一條完整的字節數據消息讀入 buffer 中并返回消息的字節數。 此方法在接收到數據前將一直阻塞。 如果連接對象被對端關(guān)閉或者沒(méi)有數據可讀取,將拋出
EOFError
異常。buffer must be a writable bytes-like object. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).
如果緩沖區太小,則將引發(fā)
BufferTooShort
異常,并且完整的消息將會(huì )存放在異常實(shí)例e
的e.args[0]
中。
在 3.3 版更改: 現在連接對象自身可以通過(guò)
Connection.send()
和Connection.recv()
在進(jìn)程之間傳遞。3.3 新版功能: 連接對象現已支持上下文管理協(xié)議 -- 參見(jiàn) see 上下文管理器類(lèi)型 。
__enter__()
返回連接對象,__exit__()
會(huì )調用close()
。
例如:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()
方法會(huì )自動(dòng)解封它收到的數據,除非你能夠信任發(fā)送消息的進(jìn)程,否則此處可能有安全風(fēng)險。
因此, 除非連接對象是由 Pipe()
產(chǎn)生的,否則你應該僅在使用了某種認證手段之后才使用 recv()
和 send()
方法。 參考 認證密碼。
警告
如果一個(gè)進(jìn)程在試圖讀寫(xiě)管道時(shí)被終止了,那么管道中的數據很可能是不完整的,因為此時(shí)可能無(wú)法確定消息的邊界。
同步原語(yǔ)?
通常來(lái)說(shuō)同步原語(yǔ)在多進(jìn)程環(huán)境中并不像它們在多線(xiàn)程環(huán)境中那么必要。參考 threading
模塊的文檔。
注意可以使用管理器對象創(chuàng )建同步原語(yǔ),參考 管理器 。
- class multiprocessing.Barrier(parties[, action[, timeout]])?
類(lèi)似
threading.Barrier
的柵欄對象。3.3 新版功能.
- class multiprocessing.BoundedSemaphore([value])?
非常類(lèi)似
threading.BoundedSemaphore
的有界信號量對象。一個(gè)小小的不同在于,它的
acquire
方法的第一個(gè)參數名是和Lock.acquire()
一樣的 block 。備注
On macOS, this is indistinguishable from
Semaphore
becausesem_getvalue()
is not implemented on that platform.
- class multiprocessing.Condition([lock])?
條件變量:
threading.Condition
的別名。指定的 lock 參數應該是
multiprocessing
模塊中的Lock
或者RLock
對象。在 3.3 版更改: 新增了
wait_for()
方法。
- class multiprocessing.Event?
A clone of
threading.Event
.
- class multiprocessing.Lock?
原始鎖(非遞歸鎖)對象,類(lèi)似于
threading.Lock
。一旦一個(gè)進(jìn)程或者線(xiàn)程拿到了鎖,后續的任何其他進(jìn)程或線(xiàn)程的其他請求都會(huì )被阻塞直到鎖被釋放。任何進(jìn)程或線(xiàn)程都可以釋放鎖。除非另有說(shuō)明,否則multiprocessing.Lock
用于進(jìn)程或者線(xiàn)程的概念和行為都和threading.Lock
一致。注意
Lock
實(shí)際上是一個(gè)工廠(chǎng)函數。它返回由默認上下文初始化的multiprocessing.synchronize.Lock
對象。Lock
supports the context manager protocol and thus may be used inwith
statements.- acquire(block=True, timeout=None)?
可以阻塞或非阻塞地獲得鎖。
如果 block 參數被設為
True
( 默認值 ) , 對該方法的調用在鎖處于釋放狀態(tài)之前都會(huì )阻塞,然后將鎖設置為鎖住狀態(tài)并返回True
。需要注意的是第一個(gè)參數名與threading.Lock.acquire()
的不同。如果 block 參數被設置成
False
,方法的調用將不會(huì )阻塞。 如果鎖當前處于鎖住狀態(tài),將返回False
; 否則將鎖設置成鎖住狀態(tài),并返回True
。當 timeout 是一個(gè)正浮點(diǎn)數時(shí),會(huì )在等待鎖的過(guò)程中最多阻塞等待 timeout 秒,當 timeout 是負數時(shí),效果和 timeout 為0時(shí)一樣,當 timeout 是
None
(默認值)時(shí),等待時(shí)間是無(wú)限長(cháng)。需要注意的是,對于 timeout 參數是負數和None
的情況, 其行為與threading.Lock.acquire()
是不一樣的。當 block 參數 為False
時(shí), timeout 并沒(méi)有實(shí)際用處,會(huì )直接忽略。否則,函數會(huì )在拿到鎖后返回True
或者 超時(shí)沒(méi)拿到鎖后返回False
。
- release()?
釋放鎖,可以在任何進(jìn)程、線(xiàn)程使用,并不限于鎖的擁有者。
當嘗試釋放一個(gè)沒(méi)有被持有的鎖時(shí),會(huì )拋出
ValueError
異常,除此之外其行為與threading.Lock.release()
一樣。
- class multiprocessing.RLock?
遞歸鎖對象: 類(lèi)似于
threading.RLock
。遞歸鎖必須由持有線(xiàn)程、進(jìn)程親自釋放。如果某個(gè)進(jìn)程或者線(xiàn)程拿到了遞歸鎖,這個(gè)進(jìn)程或者線(xiàn)程可以再次拿到這個(gè)鎖而不需要等待。但是這個(gè)進(jìn)程或者線(xiàn)程的拿鎖操作和釋放鎖操作的次數必須相同。注意
RLock
是一個(gè)工廠(chǎng)函數,調用后返回一個(gè)使用默認 context 初始化的multiprocessing.synchronize.RLock
實(shí)例。RLock
支持 context manager 協(xié)議,因此可在with
語(yǔ)句內使用。- acquire(block=True, timeout=None)?
可以阻塞或非阻塞地獲得鎖。
當 block 參數設置為
True
時(shí),會(huì )一直阻塞直到鎖處于空閑狀態(tài)(沒(méi)有被任何進(jìn)程、線(xiàn)程擁有),除非當前進(jìn)程或線(xiàn)程已經(jīng)擁有了這把鎖。然后當前進(jìn)程/線(xiàn)程會(huì )持有這把鎖(在鎖沒(méi)有其他持有者的情況下),鎖內的遞歸等級加一,并返回True
. 注意, 這個(gè)函數第一個(gè)參數的行為和threading.RLock.acquire()
的實(shí)現有幾個(gè)不同點(diǎn),包括參數名本身。當 block 參數是
False
, 將不會(huì )阻塞,如果此時(shí)鎖被其他進(jìn)程或者線(xiàn)程持有,當前進(jìn)程、線(xiàn)程獲取鎖操作失敗,鎖的遞歸等級也不會(huì )改變,函數返回False
, 如果當前鎖已經(jīng)處于釋放狀態(tài),則當前進(jìn)程、線(xiàn)程則會(huì )拿到鎖,并且鎖內的遞歸等級加一,函數返回True
。timeout 參數的使用方法及行為與
Lock.acquire()
一樣。但是要注意 timeout 的其中一些行為和threading.RLock.acquire()
中實(shí)現的行為是不同的。
- release()?
釋放鎖,使鎖內的遞歸等級減一。如果釋放后鎖內的遞歸等級降低為0,則會(huì )重置鎖的狀態(tài)為釋放狀態(tài)(即沒(méi)有被任何進(jìn)程、線(xiàn)程持有),重置后如果有有其他進(jìn)程和線(xiàn)程在等待這把鎖,他們中的一個(gè)會(huì )獲得這個(gè)鎖而繼續運行。如果釋放后鎖內的遞歸等級還沒(méi)到達0,則這個(gè)鎖仍將保持未釋放狀態(tài)且當前進(jìn)程和線(xiàn)程仍然是持有者。
只有當前進(jìn)程或線(xiàn)程是鎖的持有者時(shí),才允許調用這個(gè)方法。如果當前進(jìn)程或線(xiàn)程不是這個(gè)鎖的擁有者,或者這個(gè)鎖處于已釋放的狀態(tài)(即沒(méi)有任何擁有者),調用這個(gè)方法會(huì )拋出
AssertionError
異常。注意這里拋出的異常類(lèi)型和threading.RLock.release()
中實(shí)現的行為不一樣。
- class multiprocessing.Semaphore([value])?
一種信號量對象: 類(lèi)似于
threading.Semaphore
.一個(gè)小小的不同在于,它的
acquire
方法的第一個(gè)參數名是和Lock.acquire()
一樣的 block 。
備注
On macOS, sem_timedwait
is unsupported, so calling acquire()
with
a timeout will emulate that function's behavior using a sleeping loop.
備注
假如信號 SIGINT 是來(lái)自于 Ctrl-C ,并且主線(xiàn)程被 BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
或 Condition.wait()
阻塞,則調用會(huì )立即中斷同時(shí)拋出 KeyboardInterrupt
異常。
這和 threading
的行為不同,此模塊中當執行對應的阻塞式調用時(shí),SIGINT 會(huì )被忽略。
備注
這個(gè)包的某些功能依賴(lài)于宿主機系統的共享信號量的實(shí)現,如果系統沒(méi)有這個(gè)特性, multiprocessing.synchronize
會(huì )被禁用,嘗試導入這個(gè)模塊會(huì )引發(fā) ImportError
異常,詳細信息請查看 bpo-3770 。
管理器?
管理器提供了一種創(chuàng )建共享數據的方法,從而可以在不同進(jìn)程中共享,甚至可以通過(guò)網(wǎng)絡(luò )跨機器共享數據。管理器維護一個(gè)用于管理 共享對象 的服務(wù)。其他進(jìn)程可以通過(guò)代理訪(fǎng)問(wèn)這些共享對象。
- multiprocessing.Manager()?
返回一個(gè)已啟動(dòng)的
SyncManager
管理器對象,這個(gè)對象可以用于在不同進(jìn)程中共享數據。返回的管理器對象對應了一個(gè)已經(jīng)啟動(dòng)的子進(jìn)程,并且擁有一系列方法可以用于創(chuàng )建共享對象、返回對應的代理。
當管理器被垃圾回收或者父進(jìn)程退出時(shí),管理器進(jìn)程會(huì )立即退出。管理器類(lèi)定義在 multiprocessing.managers
模塊:
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)?
創(chuàng )建一個(gè) BaseManager 對象。
一旦創(chuàng )建,應該及時(shí)調用
start()
或者get_server().serve_forever()
以確保管理器對象對應的管理進(jìn)程已經(jīng)啟動(dòng)。address 是管理器服務(wù)進(jìn)程監聽(tīng)的地址。如果 address 是
None
,則允許和任意主機的請求建立連接。authkey 是認證標識,用于檢查連接服務(wù)進(jìn)程的請求合法性。如果 authkey 是
None
, 則會(huì )使用current_process().authkey
, 否則,就使用 authkey , 需要保證它必須是 byte 類(lèi)型的字符串。serializer must be
'pickle'
(usepickle
serialization) or'xmlrpclib'
(usexmlrpc.client
serialization).ctx is a context object, or
None
(use the current context). See theget_context()
function.shutdown_timeout is a timeout in seconds used to wait until the process used by the manager completes in the
shutdown()
method. If the shutdown times out, the process is terminated. If terminating the process also times out, the process is killed.- start([initializer[, initargs]])?
為管理器開(kāi)啟一個(gè)子進(jìn)程,如果 initializer 不是
None
, 子進(jìn)程在啟動(dòng)時(shí)將會(huì )調用initializer(*initargs)
。
- get_server()?
返回一個(gè)
Server
對象,它是管理器在后臺控制的真實(shí)的服務(wù)。Server
對象擁有serve_forever()
方法。>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
額外擁有一個(gè)address
屬性。
- connect()?
將本地管理器對象連接到一個(gè)遠程管理器進(jìn)程:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])?
一個(gè) classmethod,可以將一個(gè)類(lèi)型或者可調用對象注冊到管理器類(lèi)。
typeid 是一種 "類(lèi)型標識符",用于唯一表示某種共享對象類(lèi)型,必須是一個(gè)字符串。
callable 是一個(gè)用來(lái)為此類(lèi)型標識符創(chuàng )建對象的可調用對象。如果一個(gè)管理器實(shí)例將使用
connect()
方法連接到服務(wù)器,或者 create_method 參數為False
,那么這里可留下None
。proxytype 是
BaseProxy
的子類(lèi),可以根據 typeid 為共享對象創(chuàng )建一個(gè)代理,如果是None
, 則會(huì )自動(dòng)創(chuàng )建一個(gè)代理類(lèi)。exposed 是一個(gè)函數名組成的序列,用來(lái)指明只有這些方法可以使用
BaseProxy._callmethod()
代理。(如果 exposed 是None
, 則會(huì )在proxytype._exposed_
存在的情況下轉而使用它) 當暴露的方法列表沒(méi)有指定的時(shí)候,共享對象的所有 “公共方法” 都會(huì )被代理。(這里的“公共方法”是指所有擁有__call__()
方法并且不是以'_'
開(kāi)頭的屬性)method_to_typeid 是一個(gè)映射,用來(lái)指定那些應該返回代理對象的暴露方法所返回的類(lèi)型。(如果 method_to_typeid 是
None
, 則proxytype._method_to_typeid_
會(huì )在存在的情況下被使用)如果方法名稱(chēng)不在這個(gè)映射中或者映射是None
,則方法返回的對象會(huì )是一個(gè)值拷貝。create_method 指明,是否要創(chuàng )建一個(gè)以 typeid 命名并返回一個(gè)代理對象的方法,這個(gè)函數會(huì )被服務(wù)進(jìn)程用于創(chuàng )建共享對象,默認為
True
。
BaseManager
實(shí)例也有一個(gè)只讀屬性。- address?
管理器所用的地址。
在 3.3 版更改: 管理器對象支持上下文管理協(xié)議 - 查看 上下文管理器類(lèi)型 。
__enter__()
啟動(dòng)服務(wù)進(jìn)程(如果它還沒(méi)有啟動(dòng))并且返回管理器對象,__exit__()
會(huì )調用shutdown()
。在之前的版本中,如果管理器服務(wù)進(jìn)程沒(méi)有啟動(dòng),
__enter__()
不會(huì )負責啟動(dòng)它。
- class multiprocessing.managers.SyncManager?
BaseManager
的子類(lèi),可用于進(jìn)程的同步。這個(gè)類(lèi)型的對象使用multiprocessing.Manager()
創(chuàng )建。它擁有一系列方法,可以為大部分常用數據類(lèi)型創(chuàng )建并返回 代理對象 代理,用于進(jìn)程間同步。甚至包括共享列表和字典。
- Barrier(parties[, action[, timeout]])?
創(chuàng )建一個(gè)共享的
threading.Barrier
對象并返回它的代理。3.3 新版功能.
- BoundedSemaphore([value])?
創(chuàng )建一個(gè)共享的
threading.BoundedSemaphore
對象并返回它的代理。
- Condition([lock])?
創(chuàng )建一個(gè)共享的
threading.Condition
對象并返回它的代理。如果提供了 lock 參數,那它必須是
threading.Lock
或threading.RLock
的代理對象。在 3.3 版更改: 新增了
wait_for()
方法。
- Event()?
創(chuàng )建一個(gè)共享的
threading.Event
對象并返回它的代理。
- Lock()?
創(chuàng )建一個(gè)共享的
threading.Lock
對象并返回它的代理。
- Queue([maxsize])?
創(chuàng )建一個(gè)共享的
queue.Queue
對象并返回它的代理。
- RLock()?
創(chuàng )建一個(gè)共享的
threading.RLock
對象并返回它的代理。
- Semaphore([value])?
創(chuàng )建一個(gè)共享的
threading.Semaphore
對象并返回它的代理。
- Array(typecode, sequence)?
創(chuàng )建一個(gè)數組并返回它的代理。
- Value(typecode, value)?
創(chuàng )建一個(gè)具有可寫(xiě)
value
屬性的對象并返回它的代理。
在 3.6 版更改: 共享對象能夠嵌套。例如, 共享的容器對象如共享列表,可以包含另一個(gè)共享對象,他們全都會(huì )在
SyncManager
中進(jìn)行管理和同步。
- class multiprocessing.managers.Namespace?
一個(gè)可以注冊到
SyncManager
的類(lèi)型。命名空間對象沒(méi)有公共方法,但是擁有可寫(xiě)的屬性。直接print會(huì )顯示所有屬性的值。
值得一提的是,當對命名空間對象使用代理的時(shí)候,訪(fǎng)問(wèn)所有名稱(chēng)以
'_'
開(kāi)頭的屬性都只是代理器上的屬性,而不是命名空間對象的屬性。>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
自定義管理器?
要創(chuàng )建一個(gè)自定義的管理器,需要新建一個(gè) BaseManager
的子類(lèi),然后使用這個(gè)管理器類(lèi)上的 register()
類(lèi)方法將新類(lèi)型或者可調用方法注冊上去。例如:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
使用遠程管理器?
可以將管理器服務(wù)運行在一臺機器上,然后使用客戶(hù)端從其他機器上訪(fǎng)問(wèn)。(假設它們的防火墻允許)
運行下面的代碼可以啟動(dòng)一個(gè)服務(wù),此付包含了一個(gè)共享隊列,允許遠程客戶(hù)端訪(fǎng)問(wèn):
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
遠程客戶(hù)端可以通過(guò)下面的方式訪(fǎng)問(wèn)服務(wù):
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
也可以通過(guò)下面的方式:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地進(jìn)程也可以訪(fǎng)問(wèn)這個(gè)隊列,利用上面的客戶(hù)端代碼通過(guò)遠程方式訪(fǎng)問(wèn):
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理對象?
代理是一個(gè) 指向 其他共享對象的對象,這個(gè)對象(很可能)在另外一個(gè)進(jìn)程中。共享對象也可以說(shuō)是代理 指涉 的對象。多個(gè)代理對象可能指向同一個(gè)指涉對象。
代理對象代理了指涉對象的一系列方法調用(雖然并不是指涉對象的每個(gè)方法都有必要被代理)。通過(guò)這種方式,代理的使用方法可以和它的指涉對象一樣:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
注意,對代理使用 str()
函數會(huì )返回指涉對象的字符串表示,但是 repr()
卻會(huì )返回代理本身的內部字符串表示。
被代理的對象很重要的一點(diǎn)是必須可以被序列化,這樣才能允許他們在進(jìn)程間傳遞。因此,指涉對象可以包含 代理對象 。這允許管理器中列表、字典或者其他 代理對象 對象之間的嵌套。
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
類(lèi)似地,字典和列表代理也可以相互嵌套:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
如果指涉對象包含了普通 list
或 dict
對象,對這些內部可變對象的修改不會(huì )通過(guò)管理器傳播,因為代理無(wú)法得知被包含的值什么時(shí)候被修改了。但是把存放在容器代理中的值本身是會(huì )通過(guò)管理器傳播的(會(huì )觸發(fā)代理對象中的 __setitem__
)從而有效修改這些對象,所以可以把修改過(guò)的值重新賦值給容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
在大多是使用情形下,這種實(shí)現方式并不比嵌套 代理對象 方便,但是依然演示了對于同步的一種控制級別。
備注
multiprocessing
中的代理類(lèi)并沒(méi)有提供任何對于代理值比較的支持。所以,我們會(huì )得到如下結果:
>>> manager.list([1,2,3]) == [1,2,3]
False
當需要比較值的時(shí)候,應該替換為使用指涉對象的拷貝。
- class multiprocessing.managers.BaseProxy?
代理對象是
BaseProxy
派生類(lèi)的實(shí)例。- _callmethod(methodname[, args[, kwds]])?
調用指涉對象的方法并返回結果。
如果
proxy
是一個(gè)代理且其指涉的是obj
, 那么下面的表達式:proxy._callmethod(methodname, args, kwds)
相當于求取以下表達式的值:
getattr(obj, methodname)(*args, **kwds)
于管理器進(jìn)程。
返回結果會(huì )是一個(gè)值拷貝或者一個(gè)新的共享對象的代理 - 見(jiàn)函數
BaseManager.register()
中關(guān)于參數 method_to_typeid 的文檔。如果這個(gè)調用熬出了異常,則這個(gè)異常會(huì )被
_callmethod()
透傳出來(lái)。如果是管理器進(jìn)程本身拋出的一些其他異常,則會(huì )被_callmethod()
轉換為RemoteError
異常重新拋出。特別注意,如果 methodname 沒(méi)有 暴露 出來(lái),將會(huì )引發(fā)一個(gè)異常。
_callmethod()
的一個(gè)使用示例:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()?
返回指涉對象的一份拷貝。
如果指涉對象無(wú)法序列化,則會(huì )拋出一個(gè)異常。
- __repr__()?
返回代理對象的內部字符串表示。
- __str__()?
返回指涉對象的內部字符串表示。
清理?
代理對象使用了一個(gè)弱引用回調函數,當它被垃圾回收時(shí),會(huì )將自己從擁有此指涉對象的管理器上反注冊,
當共享對象沒(méi)有被任何代理器引用時(shí),會(huì )被管理器進(jìn)程刪除。
進(jìn)程池?
可以創(chuàng )建一個(gè)進(jìn)程池,它將使用 Pool
類(lèi)執行提交給它的任務(wù)。
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])?
一個(gè)進(jìn)程池對象,它控制可以提交作業(yè)的工作進(jìn)程池。它支持帶有超時(shí)和回調的異步結果,以及一個(gè)并行的 map 實(shí)現。
processes 是要使用的工作進(jìn)程數目。如果 processes 為
None
,則使用os.cpu_count()
返回的值。如果 initializer 不為
None
,則每個(gè)工作進(jìn)程將會(huì )在啟動(dòng)時(shí)調用initializer(*initargs)
。maxtasksperchild 是一個(gè)工作進(jìn)程在它退出或被一個(gè)新的工作進(jìn)程代替之前能完成的任務(wù)數量,為了釋放未使用的資源。默認的 maxtasksperchild 是
None
,意味著(zhù)工作進(jìn)程壽與池齊。context 可被用于指定啟動(dòng)的工作進(jìn)程的上下文。通常一個(gè)進(jìn)程池是使用函數
multiprocessing.Pool()
或者一個(gè)上下文對象的Pool()
方法創(chuàng )建的。在這兩種情況下, context 都是適當設置的。注意,進(jìn)程池對象的方法只有創(chuàng )建它的進(jìn)程能夠調用。
警告
multiprocessing.pool
對象具有需要正確管理的內部資源 (像任何其他資源一樣),具體方式是將進(jìn)程池用作上下文管理器,或者手動(dòng)調用close()
和terminate()
。 未做此類(lèi)操作將導致進(jìn)程在終結階段掛起。請注意依賴(lài)垃圾回收器來(lái)銷(xiāo)毀進(jìn)程池是 不正確的 做法,因為 CPython 并不保證進(jìn)程池終結器會(huì )被調用(請參閱
object.__del__()
來(lái)了解詳情)。3.2 新版功能: maxtasksperchild
3.4 新版功能: context
備注
通常來(lái)說(shuō),
Pool
中的 Worker 進(jìn)程的生命周期和進(jìn)程池的工作隊列一樣長(cháng)。一些其他系統中(如 Apache, mod_wsgi 等)也可以發(fā)現另一種模式,他們會(huì )讓工作進(jìn)程在完成一些任務(wù)后退出,清理、釋放資源,然后啟動(dòng)一個(gè)新的進(jìn)程代替舊的工作進(jìn)程。Pool
的 maxtasksperchild 參數給用戶(hù)提供了這種能力。- apply(func[, args[, kwds]])?
使用 args 參數以及 kwds 命名參數調用 func , 它會(huì )返回結果前阻塞。這種情況下,
apply_async()
更適合并行化工作。另外 func 只會(huì )在一個(gè)進(jìn)程池中的一個(gè)工作進(jìn)程中執行。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])?
apply()
方法的一個(gè)變種,返回一個(gè)AsyncResult
對象。如果指定了 callback , 它必須是一個(gè)接受單個(gè)參數的可調用對象。當執行成功時(shí), callback 會(huì )被用于處理執行后的返回結果,否則,調用 error_callback 。
如果指定了 error_callback , 它必須是一個(gè)接受單個(gè)參數的可調用對象。當目標函數執行失敗時(shí), 會(huì )將拋出的異常對象作為參數傳遞給 error_callback 執行。
回調函數應該立即執行完成,否則會(huì )阻塞負責處理結果的線(xiàn)程。
- map(func, iterable[, chunksize])?
內置
map()
函數的并行版本 (但它只支持一個(gè) iterable 參數,對于多個(gè)可迭代對象請參閱starmap()
)。 它會(huì )保持阻塞直到獲得結果。這個(gè)方法會(huì )將可迭代對象分割為許多塊,然后提交給進(jìn)程池??梢詫?chunksize 設置為一個(gè)正整數從而(近似)指定每個(gè)塊的大小可以。
注意對于很長(cháng)的迭代對象,可能消耗很多內存??梢钥紤]使用
imap()
或imap_unordered()
并且顯示指定 chunksize 以提升效率。
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])?
map()
方法的一個(gè)變種,返回一個(gè)AsyncResult
對象。如果指定了 callback , 它必須是一個(gè)接受單個(gè)參數的可調用對象。當執行成功時(shí), callback 會(huì )被用于處理執行后的返回結果,否則,調用 error_callback 。
如果指定了 error_callback , 它必須是一個(gè)接受單個(gè)參數的可調用對象。當目標函數執行失敗時(shí), 會(huì )將拋出的異常對象作為參數傳遞給 error_callback 執行。
回調函數應該立即執行完成,否則會(huì )阻塞負責處理結果的線(xiàn)程。
- imap(func, iterable[, chunksize])?
map()
的延遲執行版本。chunksize 參數的作用和
map()
方法的一樣。對于很長(cháng)的迭代器,給 chunksize 設置一個(gè)很大的值會(huì )比默認值1
極大 地加快執行速度。同樣,如果 chunksize 是
1
, 那么imap()
方法所返回的迭代器的next()
方法擁有一個(gè)可選的 timeout 參數: 如果無(wú)法在 timeout 秒內執行得到結果,則``next(timeout)`` 會(huì )拋出multiprocessing.TimeoutError
異常。
- imap_unordered(func, iterable[, chunksize])?
和
imap()
相同,只不過(guò)通過(guò)迭代器返回的結果是任意的。(當進(jìn)程池中只有一個(gè)工作進(jìn)程的時(shí)候,返回結果的順序才能認為是"有序"的)
- starmap(func, iterable[, chunksize])?
Like
map()
except that the elements of the iterable are expected to be iterables that are unpacked as arguments.比如可迭代對象
[(1,2), (3, 4)]
會(huì )轉化為等價(jià)于[func(1,2), func(3,4)]
的調用。3.3 新版功能.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])?
相當于
starmap()
與map_async()
的結合,迭代 iterable 的每一項,解包作為 func 的參數并執行,返回用于獲取結果的對象。3.3 新版功能.
- close()?
阻止后續任務(wù)提交到進(jìn)程池,當所有任務(wù)執行完成后,工作進(jìn)程會(huì )退出。
- terminate()?
不必等待未完成的任務(wù),立即停止工作進(jìn)程。當進(jìn)程池對象被垃圾回收時(shí),會(huì )立即調用
terminate()
。
- join()?
等待工作進(jìn)程結束。調用
join()
前必須先調用close()
或者terminate()
。
3.3 新版功能: 進(jìn)程池對象現在支持上下文管理器協(xié)議 - 參見(jiàn) 上下文管理器類(lèi)型 。
__enter__()
返回進(jìn)程池對象,__exit__()
會(huì )調用terminate()
。
- class multiprocessing.pool.AsyncResult?
Pool.apply_async()
和Pool.map_async()
返回對象所屬的類(lèi)。- get([timeout])?
用于獲取執行結果。如果 timeout 不是
None
并且在 timeout 秒內仍然沒(méi)有執行完得到結果,則拋出multiprocessing.TimeoutError
異常。如果遠程調用發(fā)生異常,這個(gè)異常會(huì )通過(guò)get()
重新拋出。
- wait([timeout])?
阻塞,直到返回結果,或者 timeout 秒后超時(shí)。
- ready()?
返回執行狀態(tài),是否已經(jīng)完成。
- successful()?
判斷調用是否已經(jīng)完成并且未引發(fā)異常。 如果還未獲得結果則將引發(fā)
ValueError
。在 3.7 版更改: 如果沒(méi)有執行完,會(huì )拋出
ValueError
異常而不是AssertionError
。
下面的例子演示了進(jìn)程池的用法:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
監聽(tīng)器及客戶(hù)端?
通常情況下,進(jìn)程間通過(guò)隊列或者 Pipe()
返回的 Connection
傳遞消息。
不過(guò),multiprocessing.connection
模塊其實(shí)提供了一些更靈活的特性。最基礎的用法是通過(guò)它抽象出來(lái)的高級API來(lái)操作socket或者Windows命名管道。也提供一些高級用法,如通過(guò) hmac
模塊來(lái)支持 摘要認證,以及同時(shí)監聽(tīng)多個(gè)管道連接。
- multiprocessing.connection.deliver_challenge(connection, authkey)?
發(fā)送一個(gè)隨機生成的消息到另一端,并等待回復。
如果收到的回復與使用 authkey 作為鍵生成的信息摘要匹配成功,就會(huì )發(fā)送一個(gè)歡迎信息給管道另一端。否則拋出
AuthenticationError
異常。
- multiprocessing.connection.answer_challenge(connection, authkey)?
接收一條信息,使用 authkey 作為鍵計算信息摘要,然后將摘要發(fā)送回去。
如果沒(méi)有收到歡迎消息,就拋出
AuthenticationError
異常。
- multiprocessing.connection.Client(address[, family[, authkey]])?
嘗試使用 address 地址上的監聽(tīng)器建立一個(gè)連接,返回
Connection
。連接的類(lèi)型取決于 family 參數,但是通??梢允÷?,因為可以通過(guò) address 的格式推導出來(lái)。(查看 地址格式 )
如果提供了 authkey 參數并且不是 None,那它必須是一個(gè)字符串并且會(huì )被當做基于 HMAC 認證的密鑰。如果 authkey 是None 則不會(huì )有認證行為。認證失敗拋出
AuthenticationError
異常,請查看 See 認證密碼 。
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])?
可以監聽(tīng)連接請求,是對于綁定套接字或者 Windows 命名管道的封裝。
address 是監聽(tīng)器對象中的綁定套接字或命名管道使用的地址。
備注
如果使用 '0.0.0.0' 作為監聽(tīng)地址,那么在Windows上這個(gè)地址無(wú)法建立連接。想要建立一個(gè)可連接的端點(diǎn),應該使用 '127.0.0.1' 。
family 是套接字(或者命名管道)使用的類(lèi)型。它可以是以下一種:
'AF_INET'
( TCP 套接字類(lèi)型),'AF_UNIX'
( Unix 域套接字) 或者'AF_PIPE'
( Windows 命名管道)。其中只有第一個(gè)保證各平臺可用。如果 family 是None
,那么 family 會(huì )根據 address 的格式自動(dòng)推導出來(lái)。如果 address 也是None
, 則取默認值。默認值為可用類(lèi)型中速度最快的。見(jiàn) 地址格式 。注意,如果 family 是'AF_UNIX'
而address是``None`` ,套接字會(huì )在一個(gè)tempfile.mkstemp()
創(chuàng )建的私有臨時(shí)目錄中創(chuàng )建。如果監聽(tīng)器對象使用了套接字,backlog (默認值為1) 會(huì )在套接字綁定后傳遞給它的
listen()
方法。如果提供了 authkey 參數并且不是 None,那它必須是一個(gè)字符串并且會(huì )被當做基于 HMAC 認證的密鑰。如果 authkey 是None 則不會(huì )有認證行為。認證失敗拋出
AuthenticationError
異常,請查看 See 認證密碼 。- accept()?
接受一個(gè)連接并返回一個(gè)
Connection
對象,其連接到的監聽(tīng)器對象已綁定套接字或者命名管道。如果已經(jīng)嘗試過(guò)認證并且失敗了,則會(huì )拋出AuthenticationError
異常。
- close()?
關(guān)閉監聽(tīng)器對象上的綁定套接字或者命名管道。此函數會(huì )在監聽(tīng)器被垃圾回收后自動(dòng)調用。不過(guò)仍然建議顯式調用函數關(guān)閉。
監聽(tīng)器對象擁有下列只讀屬性:
- address?
監聽(tīng)器對象使用的地址。
- last_accepted?
最后一個(gè)連接所使用的地址。如果沒(méi)有的話(huà)就是
None
。
3.3 新版功能: 監聽(tīng)器對象現在支持了上下文管理協(xié)議 - 見(jiàn) 上下文管理器類(lèi)型 。
__enter__()
返回一個(gè)監聽(tīng)器對象,__exit__()
會(huì )調用close()
。
- multiprocessing.connection.wait(object_list, timeout=None)?
一直等待直到 object_list 中某個(gè)對象處于就緒狀態(tài)。返回 object_list 中處于就緒狀態(tài)的對象。如果 timeout 是一個(gè)浮點(diǎn)型,該方法會(huì )最多阻塞這么多秒。如果 timeout 是
None
,則會(huì )允許阻塞的事件沒(méi)有限制。timeout為負數的情況下和為0的情況相同。對于 Unix 和 Windows ,滿(mǎn)足下列條件的對象可以出現在 object_list 中
可讀的
Connection
對象;一個(gè)已連接并且可讀的
socket.socket
對象;或者
當一個(gè)連接或者套接字對象擁有有效的數據可被讀取的時(shí)候,或者另一端關(guān)閉后,這個(gè)對象就處于就緒狀態(tài)。
Unix:
wait(object_list, timeout)
和select.select(object_list, [], [], timeout)
幾乎相同。差別在于,如果select.select()
被信號中斷,它會(huì )拋出一個(gè)附帶錯誤號為EINTR
的OSError
異常,而wait()
不會(huì )。Windows: object_list 中的元素必須是一個(gè)表示為整數的可等待的句柄(按照 Win32 函數
WaitForMultipleObjects()
的文檔中所定義) 或者一個(gè)擁有fileno()
方法的對象,這個(gè)對象返回一個(gè)套接字句柄或者管道句柄。(注意管道和套接字兩種句柄 不是 可等待的句柄)3.3 新版功能.
示例
下面的服務(wù)代碼創(chuàng )建了一個(gè)使用 'secret password'
作為認證密碼的監聽(tīng)器。它會(huì )等待連接然后發(fā)送一些數據給客戶(hù)端:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
下面的代碼連接到服務(wù)然后從服務(wù)器上j接收一些數據:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
下面的代碼使用了 wait()
,以便在同時(shí)等待多個(gè)進(jìn)程發(fā)來(lái)消息。
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
地址格式?
'AF_INET'
地址是(hostname, port)
形式的元組類(lèi)型,其中 hostname 是一個(gè)字符串,port 是整數。'AF_UNIX'
地址是文件系統上文件名的字符串。'AF_PIPE'
地址是一個(gè)r'\.\pipe{PipeName}'
形式的字符串。 要使用Client()
來(lái)連接到遠程計算機上一個(gè)名為 ServerName 的命名管道,則應當改用r'\ServerName\pipe{PipeName}'
形式的地址。
注意,使用兩個(gè)反斜線(xiàn)開(kāi)頭的字符串默認被當做 'AF_PIPE'
地址而不是 'AF_UNIX'
。
認證密碼?
當使用 Connection.recv
接收數據時(shí),數據會(huì )自動(dòng)被反序列化。不幸的是,對于一個(gè)不可信的數據源發(fā)來(lái)的數據,反序列化是存在安全風(fēng)險的。所以 Listener
和 Client()
之間使用 hmac
模塊進(jìn)行摘要認證。
認證密鑰是一個(gè) byte 類(lèi)型的字符串,可以認為是和密碼一樣的東西,連接建立好后,雙方都會(huì )要求另一方證明知道認證密鑰。(這個(gè)證明過(guò)程不會(huì )通過(guò)連接發(fā)送密鑰)
如果要求認證但是沒(méi)有指定認證密鑰,則會(huì )使用 current_process().authkey
的返回值 (參見(jiàn) Process
)。 這個(gè)值將被當前進(jìn)程所創(chuàng )建的任何 Process
對象自動(dòng)繼承。 這意味著(zhù) (在默認情況下) 一個(gè)包含多進(jìn)程的程序中的所有進(jìn)程會(huì )在相互間建立連接的時(shí)候共享單個(gè)認證密鑰。
os.urandom()
也可以用來(lái)生成合適的認證密鑰。
日志記錄?
當前模塊也提供了一些對 logging 的支持。注意, logging
模塊本身并沒(méi)有使用進(jìn)程間共享的鎖,所以來(lái)自于多個(gè)進(jìn)程的日志可能(具體取決于使用的日志 handler 類(lèi)型)相互覆蓋或者混雜。
- multiprocessing.get_logger()?
返回
multiprocessing
使用的 logger,必要的話(huà)會(huì )創(chuàng )建一個(gè)新的。如果創(chuàng )建的首個(gè) logger 日志級別為
logging.NOTSET
并且沒(méi)有默認 handler。通過(guò)這個(gè) logger 打印的消息不會(huì )傳遞到根 logger。注意在 Windows 上,子進(jìn)程只會(huì )繼承父進(jìn)程 logger 的日志級別 - 對于logger的其他自定義項不會(huì )繼承。
- multiprocessing.log_to_stderr(level=None)?
This function performs a call to
get_logger()
but in addition to returning the logger created by get_logger, it adds a handler which sends output tosys.stderr
using format'[%(levelname)s/%(processName)s] %(message)s'
. You can modifylevelname
of the logger by passing alevel
argument.
下面是一個(gè)在交互式解釋器中打開(kāi)日志功能的例子:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
要查看日志等級的完整列表,見(jiàn) logging
模塊。
multiprocessing.dummy
模塊?
multiprocessing.dummy
復制了 multiprocessing
的 API,不過(guò)是在 threading
模塊之上包裝了一層。
特別地,multiprocessing.dummy
所提供的 Pool
函數會(huì )返回一個(gè) ThreadPool
的實(shí)例,該類(lèi)是 Pool
的子類(lèi),它支持所有相同的方法調用但會(huì )使用一個(gè)工作線(xiàn)程池而非工作進(jìn)程池。
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])?
一個(gè)線(xiàn)程池對象,用來(lái)控制可向其提交任務(wù)的工作線(xiàn)程池。
ThreadPool
實(shí)例與Pool
實(shí)例是完全接口兼容的,并且它們的資源也必須被正確地管理,或者是將線(xiàn)程池作為上下文管理器來(lái)使用,或者是通過(guò)手動(dòng)調用close()
和terminate()
。processes 是要使用的工作線(xiàn)程數目。 如果 processes 為
None
,則使用os.cpu_count()
返回的值。如果 initializer 不為
None
,則每個(gè)工作進(jìn)程將會(huì )在啟動(dòng)時(shí)調用initializer(*initargs)
。不同于
Pool
,maxtasksperchild 和 context 不可被提供。備注
ThreadPool
具有與Pool
相同的接口,它圍繞一個(gè)進(jìn)程池進(jìn)行設計并且先于concurrent.futures
模塊的引入。 因此,它繼承了一些對于基于線(xiàn)程的池來(lái)說(shuō)沒(méi)有意義的操作,并且它具有自己的用于表示異步任務(wù)狀態(tài)的類(lèi)型AsyncResult
,該類(lèi)型不為任何其他庫所知。用戶(hù)通常應該傾向于使用
concurrent.futures.ThreadPoolExecutor
,它擁有從一開(kāi)始就圍繞線(xiàn)程進(jìn)行設計的更簡(jiǎn)單接口,并且返回與許多其他庫相兼容的concurrent.futures.Future
實(shí)例,包括asyncio
庫。
編程指導?
使用 multiprocessing
時(shí),應遵循一些指導原則和習慣用法。
所有start方法?
下面這些適用于所有start方法。
避免共享狀態(tài)
應該盡可能避免在進(jìn)程間傳遞大量數據,越少越好。
最好堅持使用隊列或者管道進(jìn)行進(jìn)程間通信,而不是底層的同步原語(yǔ)。
可序列化
保證所代理的方法的參數是可以序列化的。
代理的線(xiàn)程安全性
不要在多線(xiàn)程中同時(shí)使用一個(gè)代理對象,除非你用鎖保護它。
(而在不同進(jìn)程中使用 相同 的代理對象卻沒(méi)有問(wèn)題。)
使用 Join 避免僵尸進(jìn)程
在 Unix 上,如果一個(gè)進(jìn)程執行完成但是沒(méi)有被 join,就會(huì )變成僵尸進(jìn)程。一般來(lái)說(shuō),僵尸進(jìn)程不會(huì )很多,因為每次新啟動(dòng)進(jìn)程(或者
active_children()
被調用)時(shí),所有已執行完成且沒(méi)有被 join 的進(jìn)程都會(huì )自動(dòng)被 join,而且對一個(gè)執行完的進(jìn)程調用Process.is_alive
也會(huì ) join 這個(gè)進(jìn)程。盡管如此,對自己?jiǎn)?dòng)的進(jìn)程顯式調用 join 依然是最佳實(shí)踐。
繼承優(yōu)于序列化、反序列化
當使用 spawn 或者 forkserver 的啟動(dòng)方式時(shí),
multiprocessing
中的許多類(lèi)型都必須是可序列化的,這樣子進(jìn)程才能使用它們。但是通常我們都應該避免使用管道和隊列發(fā)送共享對象到另外一個(gè)進(jìn)程,而是重新組織代碼,對于其他進(jìn)程創(chuàng )建出來(lái)的共享對象,讓那些需要訪(fǎng)問(wèn)這些對象的子進(jìn)程可以直接將這些對象從父進(jìn)程繼承過(guò)來(lái)。
避免殺死進(jìn)程
聽(tīng)過(guò)
Process.terminate
停止一個(gè)進(jìn)程很容易導致這個(gè)進(jìn)程正在使用的共享資源(如鎖、信號量、管道和隊列)損壞或者變得不可用,無(wú)法在其他進(jìn)程中繼續使用。所以,最好只對那些從來(lái)不使用共享資源的進(jìn)程調用
Process.terminate
。
Join 使用隊列的進(jìn)程
記住,往隊列放入數據的進(jìn)程會(huì )一直等待直到隊列中所有項被"feeder" 線(xiàn)程傳給底層管道。(子進(jìn)程可以調用隊列的
Queue.cancel_join_thread
方法禁止這種行為)這意味著(zhù),任何使用隊列的時(shí)候,你都要確保在進(jìn)程join之前,所有存放到隊列中的項將會(huì )被其他進(jìn)程、線(xiàn)程完全消費。否則不能保證這個(gè)寫(xiě)過(guò)隊列的進(jìn)程可以正常終止。記住非精靈進(jìn)程會(huì )自動(dòng) join 。
下面是一個(gè)會(huì )導致死鎖的例子:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()交換最后兩行可以修復這個(gè)問(wèn)題(或者直接刪掉
p.join()
)。
顯式傳遞資源給子進(jìn)程
在Unix上,使用 fork 方式啟動(dòng)的子進(jìn)程可以使用父進(jìn)程中全局創(chuàng )建的共享資源。不過(guò),最好是顯式將資源對象通過(guò)參數的形式傳遞給子進(jìn)程。
除了(部分原因)讓代碼兼容 Windows 以及其他的進(jìn)程啟動(dòng)方式外,這種形式還保證了在子進(jìn)程生命期這個(gè)對象是不會(huì )被父進(jìn)程垃圾回收的。如果父進(jìn)程中的某些對象被垃圾回收會(huì )導致資源釋放,這就變得很重要。
所以對于實(shí)例:
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()應當重寫(xiě)成這樣:
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
謹防將 sys.stdin
數據替換為 “類(lèi)似文件的對象”
multiprocessing
原本會(huì )無(wú)條件地這樣調用:os.close(sys.stdin.fileno())在
multiprocessing.Process._bootstrap()
方法中 —— 這會(huì )導致與"進(jìn)程中的進(jìn)程"相關(guān)的一些問(wèn)題。這已經(jīng)被修改成了:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)它解決了進(jìn)程相互沖突導致文件描述符錯誤的根本問(wèn)題,但是對使用帶緩沖的“文件類(lèi)對象”替換
sys.stdin()
作為輸出的應用程序造成了潛在的危險。如果多個(gè)進(jìn)程調用了此文件類(lèi)對象的close()
方法,會(huì )導致相同的數據多次刷寫(xiě)到此對象,損壞數據。如果你寫(xiě)入文件類(lèi)對象并實(shí)現了自己的緩存,可以在每次追加緩存數據時(shí)記錄當前進(jìn)程id,從而將其變成 fork 安全的,當發(fā)現進(jìn)程id變化后舍棄之前的緩存,例如:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
spawn 和 forkserver 啟動(dòng)方式?
相對于 fork 啟動(dòng)方式,有一些額外的限制。
更依賴(lài)序列化
Process.__init__()
的所有參數都必須可序列化。同樣的,當你繼承Process
時(shí),需要保證當調用Process.start
方法時(shí),實(shí)例可以被序列化。
全局變量
記住,如果子進(jìn)程中的代碼嘗試訪(fǎng)問(wèn)一個(gè)全局變量,它所看到的值(如果有)可能和父進(jìn)程中執行
Process.start
那一刻的值不一樣。當全局變量知識模塊級別的常量時(shí),是不會(huì )有問(wèn)題的。
安全導入主模塊
確保主模塊可以被新啟動(dòng)的Python解釋器安全導入而不會(huì )引發(fā)什么副作用(比如又啟動(dòng)了一個(gè)子進(jìn)程)
例如,使用 spawn 或 forkserver 啟動(dòng)方式執行下面的模塊,會(huì )引發(fā)
RuntimeError
異常而失敗。from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()應該通過(guò)下面的方法使用
if __name__ == '__main__':
,從而保護程序"入口點(diǎn)":from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(如果程序將正常運行而不是凍結,則可以省略
freeze_support()
行)這允許新啟動(dòng)的 Python 解釋器安全導入模塊然后運行模塊中的
foo()
函數。如果主模塊中創(chuàng )建了進(jìn)程池或者管理器,這個(gè)規則也適用。
例子?
創(chuàng )建和使用自定義管理器、代理的示例:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
使用 Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
一個(gè)演示如何使用隊列來(lái)向一組工作進(jìn)程提供任務(wù)并收集結果的例子:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()