queue --- 一個(gè)同步的隊列類(lèi)?

源代碼: Lib/queue.py


queue 模塊實(shí)現了多生產(chǎn)者、多消費者隊列。這特別適用于消息必須安全地在多線(xiàn)程間交換的線(xiàn)程編程。模塊中的 Queue 類(lèi)實(shí)現了所有所需的鎖定語(yǔ)義。

模塊實(shí)現了三種類(lèi)型的隊列,它們的區別僅僅是條目取回的順序。在 FIFO 隊列中,先添加的任務(wù)先取回。在 LIFO 隊列中,最近被添加的條目先取回(操作類(lèi)似一個(gè)堆棧)。優(yōu)先級隊列中,條目將保持排序( 使用 heapq 模塊 ) 并且最小值的條目第一個(gè)返回。

在內部,這三個(gè)類(lèi)型的隊列使用鎖來(lái)臨時(shí)阻塞競爭線(xiàn)程;然而,它們并未被設計用于線(xiàn)程的重入性處理。

此外,模塊實(shí)現了一個(gè) "簡(jiǎn)單的" FIFO 隊列類(lèi)型, SimpleQueue ,這個(gè)特殊實(shí)現為小功能在交換中提供額外的保障。

queue 模塊定義了下列類(lèi)和異常:

class queue.Queue(maxsize=0)?

Constructor for a FIFO queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

class queue.LifoQueue(maxsize=0)?

LIFO 隊列構造函數。 maxsize 是個(gè)整數,用于設置可以放入隊列中的項目數的上限。當達到這個(gè)大小的時(shí)候,插入操作將阻塞至隊列中的項目被消費掉。如果 maxsize 小于等于零,隊列尺寸為無(wú)限大。

class queue.PriorityQueue(maxsize=0)?

優(yōu)先級隊列構造函數。 maxsize 是個(gè)整數,用于設置可以放入隊列中的項目數的上限。當達到這個(gè)大小的時(shí)候,插入操作將阻塞至隊列中的項目被消費掉。如果 maxsize 小于等于零,隊列尺寸為無(wú)限大。

最小值先被取出( 最小值條目是由 sorted(list(entries))[0] 返回的條目)。條目的典型模式是一個(gè)以下形式的元組: (priority_number, data) 。

如果 data 元素沒(méi)有可比性,數據將被包裝在一個(gè)類(lèi)中,忽略數據值,僅僅比較優(yōu)先級數字 :

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue?

無(wú)界的 FIFO 隊列構造函數。簡(jiǎn)單的隊列,缺少任務(wù)跟蹤等高級功能。

3.7 新版功能.

exception queue.Empty?

對空的 Queue 對象,調用非阻塞的 get() (or get_nowait()) 時(shí),引發(fā)的異常。

exception queue.Full?

對滿(mǎn)的 Queue 對象,調用非阻塞的 put() (or put_nowait()) 時(shí),引發(fā)的異常。

Queue對象?

隊列對象 (Queue, LifoQueue, 或者 PriorityQueue) 提供下列描述的公共方法。

Queue.qsize()?

返回隊列的大致大小。注意,qsize() > 0 不保證后續的 get() 不被阻塞,qsize() < maxsize 也不保證 put() 不被阻塞。

Queue.empty()?

如果隊列為空,返回 True ,否則返回 False 。如果 empty() 返回 True ,不保證后續調用的 put() 不被阻塞。類(lèi)似的,如果 empty() 返回 False ,也不保證后續調用的 get() 不被阻塞。

Queue.full()?

如果隊列是滿(mǎn)的返回 True ,否則返回 False 。如果 full() 返回 True 不保證后續調用的 get() 不被阻塞。類(lèi)似的,如果 full() 返回 False 也不保證后續調用的 put() 不被阻塞。

Queue.put(item, block=True, timeout=None)?

item 放入隊列。如果可選參數 block 是 true 并且 timeoutNone (默認),則在必要時(shí)阻塞至有空閑插槽可用。如果 timeout 是個(gè)正數,將最多阻塞 timeout 秒,如果在這段時(shí)間沒(méi)有可用的空閑插槽,將引發(fā) Full 異常。反之 (block 是 false),如果空閑插槽立即可用,則把 item 放入隊列,否則引發(fā) Full 異常 ( 在這種情況下,timeout 將被忽略)。

Queue.put_nowait(item)?

Equivalent to put(item, block=False).

Queue.get(block=True, timeout=None)?

從隊列中移除并返回一個(gè)項目。如果可選參數 block 是 true 并且 timeoutNone (默認值),則在必要時(shí)阻塞至項目可得到。如果 timeout 是個(gè)正數,將最多阻塞 timeout 秒,如果在這段時(shí)間內項目不能得到,將引發(fā) Empty 異常。反之 (block 是 false) , 如果一個(gè)項目立即可得到,則返回一個(gè)項目,否則引發(fā) Empty 異常 (這種情況下,timeout 將被忽略)。

POSIX系統3.0之前,以及所有版本的Windows系統中,如果 block 是 true 并且 timeoutNone , 這個(gè)操作將進(jìn)入基礎鎖的不間斷等待。這意味著(zhù),沒(méi)有異常能發(fā)生,尤其是 SIGINT 將不會(huì )觸發(fā) KeyboardInterrupt 異常。

Queue.get_nowait()?

相當于 get(False) 。

提供了兩個(gè)方法,用于支持跟蹤 排隊的任務(wù) 是否 被守護的消費者線(xiàn)程 完整的處理。

Queue.task_done()?

表示前面排隊的任務(wù)已經(jīng)被完成。被隊列的消費者線(xiàn)程使用。每個(gè) get() 被用于獲取一個(gè)任務(wù), 后續調用 task_done() 告訴隊列,該任務(wù)的處理已經(jīng)完成。

如果 join() 當前正在阻塞,在所有條目都被處理后,將解除阻塞(意味著(zhù)每個(gè) put() 進(jìn)隊列的條目的 task_done() 都被收到)。

如果被調用的次數多于放入隊列中的項目數量,將引發(fā) ValueError 異常 。

Queue.join()?

阻塞至隊列中所有的元素都被接收和處理完畢。

當條目添加到隊列的時(shí)候,未完成任務(wù)的計數就會(huì )增加。每當消費者線(xiàn)程調用 task_done() 表示這個(gè)條目已經(jīng)被回收,該條目所有工作已經(jīng)完成,未完成計數就會(huì )減少。當未完成計數降到零的時(shí)候, join() 阻塞被解除。

如何等待排隊的任務(wù)被完成的示例:

import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

SimpleQueue 對象?

SimpleQueue 對象提供下列描述的公共方法。

SimpleQueue.qsize()?

返回隊列的大致大小。注意,qsize() > 0 不保證后續的 get() 不被阻塞。

SimpleQueue.empty()?

如果隊列為空,返回 True ,否則返回 False 。如果 empty() 返回 False ,不保證后續調用的 get() 不被阻塞。

SimpleQueue.put(item, block=True, timeout=None)?

item 放入隊列。此方法永不阻塞,始終成功(除了潛在的低級錯誤,例如內存分配失?。???蛇x參數 blocktimeout 僅僅是為了保持 Queue.put() 的兼容性而提供,其值被忽略。

CPython implementation detail: This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __del__ methods or weakref callbacks.

SimpleQueue.put_nowait(item)?

Equivalent to put(item, block=False), provided for compatibility with Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)?

從隊列中移除并返回一個(gè)項目。如果可選參數 block 是 true 并且 timeoutNone (默認值),則在必要時(shí)阻塞至項目可得到。如果 timeout 是個(gè)正數,將最多阻塞 timeout 秒,如果在這段時(shí)間內項目不能得到,將引發(fā) Empty 異常。反之 (block 是 false) , 如果一個(gè)項目立即可得到,則返回一個(gè)項目,否則引發(fā) Empty 異常 (這種情況下,timeout 將被忽略)。

SimpleQueue.get_nowait()?

相當于 get(False) 。

參見(jiàn)

類(lèi) multiprocessing.Queue

一個(gè)用于多進(jìn)程上下文的隊列類(lèi)(而不是多線(xiàn)程)。

collections.deque 是無(wú)界隊列的一個(gè)替代實(shí)現,具有快速的不需要鎖并且支持索引的原子化 append()popleft() 操作。