multiprocessing.shared_memory
--- 可從進(jìn)程直接訪(fǎng)問(wèn)的共享內存?
源代碼: Lib/multiprocessing/shared_memory.py
3.8 新版功能.
該模塊提供了一個(gè) SharedMemory
類(lèi),用于分配和管理多核或對稱(chēng)多處理器(SMP)機器上進(jìn)程間的共享內存。為了協(xié)助管理不同進(jìn)程間的共享內存生命周期,multiprocessing.managers
模塊也提供了一個(gè) BaseManager
的子類(lèi): SharedMemoryManager
。
本模塊中,共享內存是指 "System V 類(lèi)型" 的共享內存塊(雖然可能和它實(shí)現方式不完全一致)而不是 “分布式共享內存”。這種類(lèi)型的的共享內存允許不同進(jìn)程讀寫(xiě)一片公共(或者共享)的易失性存儲區域。一般來(lái)說(shuō),進(jìn)程被限制只能訪(fǎng)問(wèn)屬于自己進(jìn)程空間的內存,但是共享內存允許跨進(jìn)程共享數據,從而避免通過(guò)進(jìn)程間發(fā)送消息的形式傳遞數據。相比通過(guò)磁盤(pán)、套接字或者其他要求序列化、反序列化和復制數據的共享形式,直接通過(guò)內存共享數據擁有更出色性能。
- class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)?
創(chuàng )建一個(gè)新的共享內存塊或者連接到一片已經(jīng)存在的共享內存塊。每個(gè)共享內存塊都被指定了一個(gè)全局唯一的名稱(chēng)。通過(guò)這種方式,進(jìn)程可以使用一個(gè)特定的名字創(chuàng )建共享內存區塊,然后其他進(jìn)程使用同樣的名字連接到這個(gè)共享內存塊。
作為一種跨進(jìn)程共享數據的方式,共享內存塊的壽命可能超過(guò)創(chuàng )建它的原始進(jìn)程。一個(gè)共享內存塊可能同時(shí)被多個(gè)進(jìn)程使用,當一個(gè)進(jìn)程不再需要訪(fǎng)問(wèn)這個(gè)共享內存塊的時(shí)候,應該調用
close()
方法。當一個(gè)共享內存塊不被任何進(jìn)程使用的時(shí)候,應該調用unlink()
方法以執行必要的清理。name 是共享內存的唯一名稱(chēng),字符串類(lèi)型。如果創(chuàng )建一個(gè)新共享內存塊的時(shí)候,名稱(chēng)指定為
None
(默認值),將會(huì )隨機產(chǎn)生一個(gè)新名稱(chēng)。create 指定創(chuàng )建一個(gè)新的共享內存塊 (
True
) 還是連接到已存在的共享內存塊 (False
) 。如果是新創(chuàng )建共享內存塊則 size 用于指定塊的大小為多少字節。由于某些平臺是以?xún)却骓?yè)大小為最小單位來(lái)分配內存的,最終得到的內存塊大小可能大于或等于要求的大小。如果是連接到已經(jīng)存在的共享內存塊,
size
參數會(huì )被忽略。- close()?
關(guān)閉實(shí)例對于共享內存的訪(fǎng)問(wèn)連接。所有實(shí)例確認自己不再需要使用共享內存的時(shí)候都應該調用
close()
,以保證必要的資源清理。調用close()
并不會(huì )銷(xiāo)毀共享內存區域。
- unlink()?
請求銷(xiāo)毀底層的共享內存塊。為了執行必要的資源清理, 在所有使用這個(gè)共享內存塊的進(jìn)程中,
unlink()
應該調用一次(且只能調用一次) 。發(fā)出此銷(xiāo)毀請求后,共享內存塊可能會(huì )、也可能不會(huì )立即銷(xiāo)毀,且此行為在不同操作系統之間可能不同。調用unlink()
后再?lài)L試方位其中的數據可能導致內存錯誤。注意: 最后一個(gè)關(guān)閉共享內存訪(fǎng)問(wèn)權限的進(jìn)程可以以任意順序調用unlink()
和close()
。
- buf?
共享內存塊內容的 memoryview 。
- name?
共享內存塊的唯一標識,只讀屬性。
- size?
共享內存塊的字節大小,只讀屬性。
以下示例展示了 SharedMemory
底層的用法:
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55]) # Modify multiple at once
>>> buffer[4] = 100 # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5]) # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy' # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5]) # Access via shm_a
b'howdy'
>>> shm_b.close() # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink() # Call unlink only once to release the shared memory
以下示例展示了一個(gè)現實(shí)中的例子,使用 SharedMemory
類(lèi)和 NumPy arrays 結合, 從兩個(gè) Python shell 中訪(fǎng)問(wèn)同一個(gè) numpy.ndarray
:
>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8]) # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:] # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name # We did not specify a name so one was chosen for us
'psm_21467_46075'
>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([ 1, 1, 2, 3, 5, 888])
>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([ 1, 1, 2, 3, 5, 888])
>>> # Clean up from within the second Python shell
>>> del c # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()
>>> # Clean up from within the first Python shell
>>> del b # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink() # Free and release the shared memory block at the very end
- class multiprocessing.managers.SharedMemoryManager([address[, authkey]])?
BaseManager
的子類(lèi),可用于管理跨進(jìn)程的共享內存塊。調用
SharedMemoryManager
實(shí)例上的start()
方法會(huì )啟動(dòng)一個(gè)新進(jìn)程。這個(gè)新進(jìn)程的唯一目的就是管理所有由它創(chuàng )建的共享內存塊的生命周期。想要釋放此進(jìn)程管理的所有共享內存塊,可以調用實(shí)例的shutdown()
方法。這會(huì )觸發(fā)執行它管理的所有SharedMemory
對象的SharedMemory.unlink()
方法,然后停止這個(gè)進(jìn)程。通過(guò)SharedMemoryManager
創(chuàng )建SharedMemory
實(shí)例,我們可以避免手動(dòng)跟蹤和釋放共享內存資源。這個(gè)類(lèi)提供了創(chuàng )建和返回
SharedMemory
實(shí)例的方法,以及以共享內存為基礎創(chuàng )建一個(gè)列表類(lèi)對象 (ShareableList
) 的方法。有關(guān)繼承的可選輸入參數 address 和 authkey 以及他們如何用于從進(jìn)程連接已經(jīng)存在的
SharedMemoryManager
服務(wù),參見(jiàn)multiprocessing.managers.BaseManager
。- SharedMemory(size)?
使用
size
參數,創(chuàng )建一個(gè)新的指定字節大小的SharedMemory
對象并返回。
- ShareableList(sequence)?
創(chuàng )建并返回一個(gè)新的
ShareableList
對象,通過(guò)輸入參數sequence
初始化。
下面的案例展示了 SharedMemoryManager
的基本機制:
>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start() # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown() # Calls unlink() on sl, raw_shm, and another_sl
以下案例展示了 SharedMemoryManager
對象的一種可能更方便的使用方式,通過(guò) with
語(yǔ)句來(lái)保證所有共享內存塊在使用完后被釋放。
>>> with SharedMemoryManager() as smm:
... sl = smm.ShareableList(range(2000))
... # Divide the work among two processes, storing partial results in sl
... p1 = Process(target=do_work, args=(sl, 0, 1000))
... p2 = Process(target=do_work, args=(sl, 1000, 2000))
... p1.start()
... p2.start() # A multiprocessing.Pool might be more efficient
... p1.join()
... p2.join() # Wait for all work to complete in both processes
... total_result = sum(sl) # Consolidate the partial results now in sl
在 with
語(yǔ)句中使用 SharedMemoryManager
對象的時(shí)候,使用這個(gè)管理器創(chuàng )建的共享內存塊會(huì )在 with
語(yǔ)句代碼塊結束后被釋放。
- class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)?
提供一個(gè)可修改的類(lèi) list 對象,其中所有值都存放在共享內存塊中。這限制了可被存儲在其中的值只能是
int
,float
,bool
,str
(每條數據小于10M),bytes
(每條數據小于10M)以及None
這些內置類(lèi)型。它另一個(gè)顯著(zhù)區別于內置list
類(lèi)型的地方在于它的長(cháng)度無(wú)法修改(比如,沒(méi)有 append, insert 等操作)且不支持通過(guò)切片操作動(dòng)態(tài)創(chuàng )建新的ShareableList
實(shí)例。sequence 會(huì )被用來(lái)為一個(gè)新的
ShareableList
填充值。 設為None
則會(huì )基于唯一的共享內存名稱(chēng)關(guān)聯(lián)到已經(jīng)存在的ShareableList
。name 是所請求的共享內存的唯一名稱(chēng),與
SharedMemory
的定義中所描述的一致。 當關(guān)聯(lián)到現有的ShareableList
時(shí),則指明其共享內存塊的唯一名稱(chēng)并將sequence
設為None
。- count(value)?
返回
value
出現的次數。
- index(value)?
返回
value
首次出現的位置,如果value
不存在, 則拋出ValueError
異常。
- shm?
存儲了值的
SharedMemory
實(shí)例。
下面的例子演示了 ShareableList
實(shí)例的基本用法:
>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice' # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a # Use of a ShareableList after call to unlink() is unsupported
下面的例子演示了一個(gè)、兩個(gè)或多個(gè)進(jìn)程如何通過(guò)提供下層的共享內存塊名稱(chēng)來(lái)訪(fǎng)問(wèn)同一個(gè) ShareableList
:
>>> b = shared_memory.ShareableList(range(5)) # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name) # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()
The following examples demonstrates that ShareableList
(and underlying SharedMemory
) objects
can be pickled and unpickled if needed.
Note, that it will still be the same shared object.
This happens, because the deserialized object has
the same unique name and is just attached to an existing
object with the same name (if the object is still alive):
>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()