日志操作手冊?
- 作者
Vinay Sajip <vinay_sajip at red-dove dot com>
本頁(yè)包含了許多日志記錄相關(guān)的概念,這些概念在過(guò)去一直被認為很有用。
在多個(gè)模塊中記錄日志?
無(wú)論對 logging.getLogger('someLogger')
進(jìn)行多少次調用,都會(huì )返回同一個(gè) logger 對象的引用。不僅在同一個(gè)模塊內如此,只要是在同一個(gè) Python 解釋器進(jìn)程中,跨模塊調用也是一樣。同樣是引用同一個(gè)對象,應用程序也可以在一個(gè)模塊中定義和配置一個(gè)父 logger,而在另一個(gè)單獨的模塊中創(chuàng )建(但不配置)子 logger,對于子 logger 的所有調用都會(huì )傳給父 logger。以下是主模塊:
import logging
import auxiliary_module
# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')
以下是輔助模塊:
import logging
# create logger
module_logger = logging.getLogger('spam_application.auxiliary')
class Auxiliary:
def __init__(self):
self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
self.logger.info('creating an instance of Auxiliary')
def do_something(self):
self.logger.info('doing something')
a = 1 + 1
self.logger.info('done doing something')
def some_function():
module_logger.info('received a call to "some_function"')
輸出結果會(huì )像這樣:
2005-03-23 23:47:11,663 - spam_application - INFO -
creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
done with auxiliary_module.some_function()
在多個(gè)線(xiàn)程中記錄日志?
多線(xiàn)程記錄日志并不需要特殊處理,以下示例演示了在主線(xiàn)程(起始線(xiàn)程)和其他線(xiàn)程中記錄日志的過(guò)程:
import logging
import threading
import time
def worker(arg):
while not arg['stop']:
logging.debug('Hi from myfunc')
time.sleep(0.5)
def main():
logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
info = {'stop': False}
thread = threading.Thread(target=worker, args=(info,))
thread.start()
while True:
try:
logging.debug('Hello from main')
time.sleep(0.75)
except KeyboardInterrupt:
info['stop'] = True
break
thread.join()
if __name__ == '__main__':
main()
腳本會(huì )運行輸出類(lèi)似下面的內容:
0 Thread-1 Hi from myfunc
3 MainThread Hello from main
505 Thread-1 Hi from myfunc
755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc
以上如期顯示了不同線(xiàn)程的日志是交替輸出的。當然更多的線(xiàn)程也會(huì )如此。
多個(gè) handler 和多種 formatter?
日志是個(gè)普通的 Python 對象。 addHandler()
方法可加入不限數量的日志 handler。有時(shí)候,應用程序需把嚴重錯誤信息記入文本文件,而將一般錯誤或其他級別的信息輸出到控制臺。若要進(jìn)行這樣的設定,只需多配置幾個(gè)日志 handler 即可,應用程序的日志調用代碼可以保持不變。下面對之前的分模塊日志示例略做修改:
import logging
logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)
# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
需要注意的是,“應用程序”內的代碼并不關(guān)心是否存在多個(gè)日志 handler。示例中所做的改變,只是新加入并配置了一個(gè)名為 fh 的 handler。
在編寫(xiě)和測試應用程序時(shí),若能創(chuàng )建日志 handler 對不同嚴重級別的日志信息進(jìn)行過(guò)濾,這將十分有用。調試時(shí)無(wú)需用多條 print
語(yǔ)句,而是采用 logger.debug
:print 語(yǔ)句以后還得注釋或刪掉,而 logger.debug 語(yǔ)句可以原樣留在源碼中保持靜默。當需要再次調試時(shí),只要改變日志對象或 handler 的嚴重級別即可。
在多個(gè)地方記錄日志?
假定要根據不同的情況將日志以不同的格式寫(xiě)入控制臺和文件。比如把 DEBUG 以上級別的日志信息寫(xiě)于文件,并且把 INFO 以上的日志信息輸出到控制臺。再假設日志文件需要包含時(shí)間戳,控制臺信息則不需要。以下演示了做法:
import logging
# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
datefmt='%m-%d %H:%M',
filename='/temp/myapp.log',
filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
當運行后,你會(huì )看到控制臺如下所示
root : INFO Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO How quickly daft jumping zebras vex.
myapp.area2 : WARNING Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR The five boxing wizards jump quickly.
而日志文件將如下所示:
10-22 22:19 root INFO Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1 INFO How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2 ERROR The five boxing wizards jump quickly.
如您所見(jiàn),DEBUG 級別的日志信息只出現在了文件中,而其他信息則兩個(gè)地方都會(huì )輸出。
上述示例只用到了控制臺和文件 handler,當然還可以自由組合任意數量的日志 handler。
日志配置服務(wù)器示例?
以下是一個(gè)用到了日志配置服務(wù)器的模塊示例:
import logging
import logging.config
import time
import os
# read initial config file
logging.config.fileConfig('logging.conf')
# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()
logger = logging.getLogger('simpleExample')
try:
# loop through logging calls to see the difference
# new configurations make, until Ctrl+C is pressed
while True:
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
time.sleep(5)
except KeyboardInterrupt:
# cleanup
logging.config.stopListening()
t.join()
以下腳本將接受文件名作為參數,然后將此文件發(fā)送到服務(wù)器,前面加上文件的二進(jìn)制編碼長(cháng)度,做為新的日志配置:
#!/usr/bin/env python
import socket, sys, struct
with open(sys.argv[1], 'rb') as f:
data_to_send = f.read()
HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')
處理日志 handler 的阻塞?
有時(shí)你必須讓日志記錄處理程序的運行不會(huì )阻塞你要記錄日志的線(xiàn)程。 這在 Web 應用程序中是很常見(jiàn),當然在其他場(chǎng)景中也可能發(fā)生。
有一種原因往往會(huì )讓程序表現遲鈍,這就是 SMTPHandler
:由于很多因素是開(kāi)發(fā)人員無(wú)法控制的(例如郵件或網(wǎng)絡(luò )基礎設施的性能不佳),發(fā)送電子郵件可能需要很長(cháng)時(shí)間。不過(guò)幾乎所有網(wǎng)絡(luò ) handler 都可能會(huì )發(fā)生阻塞:即使是 SocketHandler
操作也可能在后臺執行 DNS 查詢(xún),而這種查詢(xún)實(shí)在太慢了(并且 DNS 查詢(xún)還可能在很底層的套接字庫代碼中,位于 Python 層之下,超出了可控范圍)。
有一種解決方案是分成兩部分實(shí)現。第一部分,針對那些對性能有要求的關(guān)鍵線(xiàn)程,只為日志對象連接一個(gè) QueueHandler
。日志對象只需簡(jiǎn)單地寫(xiě)入隊列即可,可為隊列設置足夠大的容量,或者可以在初始化時(shí)不設置容量上限。盡管為以防萬(wàn)一,可能需要在代碼中捕獲 queue.Full
異常,不過(guò)隊列寫(xiě)入操作通常會(huì )很快得以處理。如果要開(kāi)發(fā)庫代碼,包含性能要求較高的線(xiàn)程,為了讓使用該庫的開(kāi)發(fā)人員受益,請務(wù)必在開(kāi)發(fā)文檔中進(jìn)行標明(包括建議僅連接 QueueHandlers
)。
解決方案的另一部分就是 QueueListener
,它被設計為 QueueHandler
的對應部分。QueueListener
非常簡(jiǎn)單:傳入一個(gè)隊列和一些 handler,并啟動(dòng)一個(gè)內部線(xiàn)程,用于偵聽(tīng) QueueHandlers``(或其他 ``LogRecords
源)發(fā)送的 LogRecord 隊列。LogRecords
會(huì )從隊列中移除并傳給 handler 處理。
QueueListener
作為單獨的類(lèi),好處就是可以用同一個(gè)實(shí)例為多個(gè) QueueHandlers
服務(wù)。這比把現有 handler 類(lèi)線(xiàn)程化更加資源友好,后者會(huì )每個(gè) handler 會(huì )占用一個(gè)線(xiàn)程,卻沒(méi)有特別的好處。
以下是這兩個(gè)類(lèi)的運用示例(省略了 import 語(yǔ)句):
que = queue.Queue(-1) # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()
在運行后會(huì )產(chǎn)生:
MainThread: Look out!
在 3.5 版更改: 在 Python 3.5 之前,QueueListener
總會(huì )把由隊列接收到的每條信息都傳遞給已初始化的每個(gè)處理程序。(因為這里假定級別過(guò)濾操作已在寫(xiě)入隊列時(shí)完成了。)從 3.5 版開(kāi)始,可以修改這種處理方式,只要將關(guān)鍵字參數 respect_handler_level=True
傳給偵聽(tīng)器的構造函數即可。這樣偵聽(tīng)器將會(huì )把每條信息的級別與 handler 的級別進(jìn)行比較,只在適配時(shí)才會(huì )將信息傳給 handler 。
通過(guò)網(wǎng)絡(luò )收發(fā)日志事件?
假定現在要通過(guò)網(wǎng)絡(luò )發(fā)送日志事件,并在接收端進(jìn)行處理。有一種簡(jiǎn)單的方案,就是在發(fā)送端的根日志對象連接一個(gè) SocketHandler
實(shí)例:
import logging, logging.handlers
rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)
# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')
# Now, define a couple of other loggers which might represent areas in your
# application:
logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')
logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')
在接收端,可以用 socketserver
模塊設置一個(gè)接收器。簡(jiǎn)要示例如下:
import pickle
import logging
import logging.handlers
import socketserver
import struct
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Handler for a streaming logging request.
This basically logs the record using whatever logging policy is
configured locally.
"""
def handle(self):
"""
Handle multiple requests - each expected to be a 4-byte length,
followed by the LogRecord in pickle format. Logs the record
according to whatever policy is configured locally.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack('>L', chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data):
return pickle.loads(data)
def handleLogRecord(self, record):
# if a name is specified, we use the named logger rather than the one
# implied by the record.
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
logger = logging.getLogger(name)
# N.B. EVERY record gets logged. This is because Logger.handle
# is normally called AFTER logger-level filtering. If you want
# to do filtering, do it at the client end to save wasting
# cycles and network bandwidth!
logger.handle(record)
class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
"""
Simple TCP socket-based logging receiver suitable for testing.
"""
allow_reuse_address = True
def __init__(self, host='localhost',
port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler):
socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
self.abort = 0
self.timeout = 1
self.logname = None
def serve_until_stopped(self):
import select
abort = 0
while not abort:
rd, wr, ex = select.select([self.socket.fileno()],
[], [],
self.timeout)
if rd:
self.handle_request()
abort = self.abort
def main():
logging.basicConfig(
format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
tcpserver = LogRecordSocketReceiver()
print('About to start TCP server...')
tcpserver.serve_until_stopped()
if __name__ == '__main__':
main()
先運行服務(wù)端,再運行客戶(hù)端??蛻?hù)端控制臺不會(huì )顯示什么信息;在服務(wù)端應該會(huì )看到如下內容:
About to start TCP server...
59 root INFO Jackdaws love my big sphinx of quartz.
59 myapp.area1 DEBUG Quick zephyrs blow, vexing daft Jim.
69 myapp.area1 INFO How quickly daft jumping zebras vex.
69 myapp.area2 WARNING Jail zesty vixen who grabbed pay from quack.
69 myapp.area2 ERROR The five boxing wizards jump quickly.
請注意,某些時(shí)候 pickle 會(huì )存在一些安全問(wèn)題。若有問(wèn)題可換用自己的序列化方案,只要覆蓋 makePickle()
方法即可,并調整上述腳本以采用自己的序列化方案。
Running a logging socket listener in production?
To run a logging listener in production, you may need to use a process-management tool such as Supervisor. Here is a Gist which provides the bare-bones files to run the above functionality using Supervisor: you will need to change the /path/to/ parts in the Gist to reflect the actual paths you want to use.
在自己的輸出日志中添加上下文信息?
有時(shí),除了調用日志對象時(shí)傳入的參數之外,還希望日志輸出中能包含上下文信息。 比如在網(wǎng)絡(luò )應用程序中,可能需要在日志中記錄某客戶(hù)端的信息(如遠程客戶(hù)端的用戶(hù)名或 IP 地址)。 這雖然可以用 extra 參數實(shí)現,但傳遞起來(lái)并不總是很方便。 雖然為每個(gè)網(wǎng)絡(luò )連接都創(chuàng )建 Logger
實(shí)例貌似不錯,但并不是個(gè)好主意,因為這些實(shí)例不會(huì )被垃圾回收。 雖然在實(shí)踐中不是問(wèn)題,但當 Logger
實(shí)例的數量取決于應用程序要采用的日志粒度時(shí),如果 Logger
實(shí)例的數量實(shí)際上是無(wú)限的,則有可能難以管理。
利用 LoggerAdapter 傳遞上下文信息?
要傳遞上下文信息和日志事件信息,有一種簡(jiǎn)單方案是利用 LoggerAdapter
類(lèi)。這個(gè)類(lèi)設計得類(lèi)似 Logger
,所以可以直接調用 debug()
、info()
、 warning()
、 error()
、exception()
、 critical()
和 log()
。這些方法的簽名與 Logger
對應的方法相同,所以這兩類(lèi)實(shí)例可以交換使用。
當你創(chuàng )建一個(gè) LoggerAdapter
的實(shí)例時(shí),你會(huì )傳入一個(gè) Logger
的實(shí)例和一個(gè)包含了上下文信息的字典對象。當你調用一個(gè) LoggerAdapter
實(shí)例的方法時(shí),它會(huì )把調用委托給內部的 Logger
的實(shí)例,并為其整理相關(guān)的上下文信息。這是 LoggerAdapter
的一個(gè)代碼片段:
def debug(self, msg, /, *args, **kwargs):
"""
Delegate a debug call to the underlying logger, after adding
contextual information from this adapter instance.
"""
msg, kwargs = self.process(msg, kwargs)
self.logger.debug(msg, *args, **kwargs)
LoggerAdapter
的 process()
方法是將上下文信息添加到日志的輸出中。 它傳入日志消息和日志調用的關(guān)鍵字參數,并傳回(隱式的)這些修改后的內容去調用底層的日志記錄器。此方法的默認參數只是一個(gè)消息字段,但留有一個(gè) 'extra' 的字段作為關(guān)鍵字參數傳給構造器。當然,如果你在調用適配器時(shí)傳入了一個(gè) 'extra' 字段的參數,它會(huì )被靜默覆蓋。
使用 'extra' 的優(yōu)點(diǎn)是這些鍵值對會(huì )被傳入 LogRecord
實(shí)例的 __dict__ 中,讓你通過(guò) Formatter
的實(shí)例直接使用定制的字符串,實(shí)例能找到這個(gè)字典類(lèi)對象的鍵。 如果你需要一個(gè)其他的方法,比如說(shuō),想要在消息字符串前后增加上下文信息,你只需要創(chuàng )建一個(gè) LoggerAdapter
的子類(lèi),并覆蓋它的 process()
方法來(lái)做你想做的事情,以下是一個(gè)簡(jiǎn)單的示例:
class CustomAdapter(logging.LoggerAdapter):
"""
This example adapter expects the passed in dict-like object to have a
'connid' key, whose value in brackets is prepended to the log message.
"""
def process(self, msg, kwargs):
return '[%s] %s' % (self.extra['connid'], msg), kwargs
你可以這樣使用:
logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})
然后,你記錄在適配器中的任何事件消息前將添加``some_conn_id``的值。
使用除字典之外的其它對象傳遞上下文信息?
你不需要將一個(gè)實(shí)際的字典傳遞給 LoggerAdapter
-你可以傳入一個(gè)實(shí)現了``__getitem__`` 和``__iter__``的類(lèi)的實(shí)例,這樣它就像是一個(gè)字典。這對于你想動(dòng)態(tài)生成值(而字典中的值往往是常量)將很有幫助。
使用過(guò)濾器傳遞上下文信息?
你也可以使用一個(gè)用戶(hù)定義的類(lèi) Filter
在日志輸出中添加上下文信息。Filter
的實(shí)例是被允許修改傳入的 LogRecords
,包括添加其他的屬性,然后可以使用合適的格式化字符串輸出,或者可以使用一個(gè)自定義的類(lèi) Formatter
。
例如,在一個(gè)web應用程序中,正在處理的請求(或者至少是請求的一部分),可以存儲在一個(gè)線(xiàn)程本地 (threading.local
) 變量中,然后從``Filter`` 中去訪(fǎng)問(wèn)。請求中的信息,如IP地址和用戶(hù)名將被存儲在``LogRecord``中,使用上例 LoggerAdapter
中的 'ip' 和 'user' 屬性名。在這種情況下,可以使用相同的格式化字符串來(lái)得到上例中類(lèi)似的輸出結果。這是一段示例代碼:
import logging
from random import choice
class ContextFilter(logging.Filter):
"""
This is a filter which injects contextual information into the log.
Rather than use actual contextual information, we just use random
data in this demo.
"""
USERS = ['jim', 'fred', 'sheila']
IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']
def filter(self, record):
record.ip = choice(ContextFilter.IPS)
record.user = choice(ContextFilter.USERS)
return True
if __name__ == '__main__':
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
a1 = logging.getLogger('a.b.c')
a2 = logging.getLogger('d.e.f')
f = ContextFilter()
a1.addFilter(f)
a2.addFilter(f)
a1.debug('A debug message')
a1.info('An info message with %s', 'some parameters')
for x in range(10):
lvl = choice(levels)
lvlname = logging.getLevelName(lvl)
a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')
在運行時(shí),產(chǎn)生如下內容:
2010-09-06 22:38:15,292 a.b.c DEBUG IP: 123.231.231.123 User: fred A debug message
2010-09-06 22:38:15,300 a.b.c INFO IP: 192.168.0.1 User: sheila An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 127.0.0.1 User: jim A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 127.0.0.1 User: sheila A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR IP: 123.231.231.123 User: fred A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1 User: jim A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1 User: sheila A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG IP: 192.168.0.1 User: jim A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR IP: 127.0.0.1 User: sheila A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG IP: 123.231.231.123 User: fred A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO IP: 123.231.231.123 User: fred A message at INFO level with 2 parameters
從多個(gè)進(jìn)程記錄至單個(gè)文件?
盡管 logging 是線(xiàn)程安全的,將單個(gè)進(jìn)程中的多個(gè)線(xiàn)程日志記錄至單個(gè)文件也 是 受支持的,但將 多個(gè)進(jìn)程 中的日志記錄至單個(gè)文件則 不是 受支持的,因為在 Python 中并沒(méi)有在多個(gè)進(jìn)程中實(shí)現對單個(gè)文件訪(fǎng)問(wèn)的序列化的標準方案。 如果你需要將多個(gè)進(jìn)程中的日志記錄至單個(gè)文件,有一個(gè)方案是讓所有進(jìn)程都將日志記錄至一個(gè) SocketHandler
,然后用一個(gè)實(shí)現了套接字服務(wù)器的單獨進(jìn)程一邊從套接字中讀取一邊將日志記錄至文件。 (如果愿意的話(huà),你可以在一個(gè)現有進(jìn)程中專(zhuān)門(mén)開(kāi)一個(gè)線(xiàn)程來(lái)執行此項功能。) 這一部分 文檔對此方式有更詳細的介紹,并包含一個(gè)可用的套接字接收器,你自己的應用可以在此基礎上進(jìn)行適配。
你也可以編寫(xiě)你自己的處理程序,讓其使用 multiprocessing
模塊中的 Lock
類(lèi)來(lái)順序訪(fǎng)問(wèn)你的多個(gè)進(jìn)程中的文件。 現有的 FileHandler
及其子類(lèi)目前并不使用 multiprocessing
,盡管它們將來(lái)可能會(huì )這樣做。 請注意在目前,multiprocessing
模塊并未在所有平臺上都提供可用的鎖功能 (參見(jiàn) https://bugs.python.org/issue3770)。
或者,你也可以使用 Queue
和 QueueHandler
將所有的日志事件發(fā)送至你的多進(jìn)程應用的一個(gè)進(jìn)程中。 以下示例腳本演示了如何執行此操作。 在示例中,一個(gè)單獨的監聽(tīng)進(jìn)程負責監聽(tīng)其他進(jìn)程的日志事件,并根據自己的配置記錄。 盡管示例只演示了這種方法(例如你可能希望使用單獨的監聽(tīng)線(xiàn)程而非監聽(tīng)進(jìn)程 —— 它們的實(shí)現是類(lèi)似的),但你也可以在應用程序的監聽(tīng)進(jìn)程和其他進(jìn)程使用不同的配置,它可以作為滿(mǎn)足你特定需求的一個(gè)基礎:
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
# Next two import lines for this demo only
from random import choice, random
import time
#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
h.setFormatter(f)
root.addHandler(h)
# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
configurer()
while True:
try:
record = queue.get()
if record is None: # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Exception:
import sys, traceback
print('Whoops! Problem:', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
MESSAGES = [
'Random message #1',
'Random message #2',
'Random message #3',
]
# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
h = logging.handlers.QueueHandler(queue) # Just the one handler needed
root = logging.getLogger()
root.addHandler(h)
# send all messages, for demo; no other level or filter logic applied.
root.setLevel(logging.DEBUG)
# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
configurer(queue)
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
logger = logging.getLogger(choice(LOGGERS))
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, message)
print('Worker finished: %s' % name)
# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
queue = multiprocessing.Queue(-1)
listener = multiprocessing.Process(target=listener_process,
args=(queue, listener_configurer))
listener.start()
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
queue.put_nowait(None)
listener.join()
if __name__ == '__main__':
main()
上面腳本的一個(gè)變種,仍然在主進(jìn)程中記錄日志,但使用一個(gè)單獨的線(xiàn)程:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time
def logger_thread(q):
while True:
record = q.get()
if record is None:
break
logger = logging.getLogger(record.name)
logger.handle(record)
def worker_process(q):
qh = logging.handlers.QueueHandler(q)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
if __name__ == '__main__':
q = Queue()
d = {
'version': 1,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO',
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed',
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed',
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'level': 'ERROR',
'formatter': 'detailed',
},
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console', 'file', 'errors']
},
}
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
workers.append(wp)
wp.start()
logging.config.dictConfig(d)
lp = threading.Thread(target=logger_thread, args=(q,))
lp.start()
# At this point, the main process could do some useful work of its own
# Once it's done that, it can wait for the workers to terminate...
for wp in workers:
wp.join()
# And now tell the logging thread to finish up, too
q.put(None)
lp.join()
這段變種的代碼展示了如何使用特定的日志記錄配置 - 例如``foo``記錄器使用了特殊的處理程序,將 foo
子系統中所有的事件記錄至一個(gè)文件 mplog-foo.log
。在主進(jìn)程(即使是在工作進(jìn)程中產(chǎn)生的日志事件)的日志記錄機制中將直接使用恰當的配置。
concurrent.futures.ProcessPoolExecutor 的用法?
若要利用 concurrent.futures.ProcessPoolExecutor
啟動(dòng)工作進(jìn)程,創(chuàng )建隊列的方式應稍有不同。不能是:
queue = multiprocessing.Queue(-1)
而應是:
queue = multiprocessing.Manager().Queue(-1) # also works with the examples above
然后就可以將以下工作進(jìn)程的創(chuàng )建過(guò)程:
workers = []
for i in range(10):
worker = multiprocessing.Process(target=worker_process,
args=(queue, worker_configurer))
workers.append(worker)
worker.start()
for w in workers:
w.join()
改為 (記得要先導入 concurrent.futures
):
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
for i in range(10):
executor.submit(worker_process, queue, worker_configurer)
Deploying Web applications using Gunicorn and uWSGI?
When deploying Web applications using Gunicorn or uWSGI (or similar), multiple worker
processes are created to handle client requests. In such environments, avoid creating
file-based handlers directly in your web application. Instead, use a
SocketHandler
to log from the web application to a listener in a separate
process. This can be set up using a process management tool such as Supervisor - see
Running a logging socket listener in production for more details.
輪換日志文件?
有時(shí),你希望當日志文件不斷記錄增長(cháng)至一定大小時(shí),打開(kāi)一個(gè)新的文件接著(zhù)記錄。 你可能希望只保留一定數量的日志文件,當不斷的創(chuàng )建文件到達該數量時(shí),又覆蓋掉最開(kāi)始的文件形成循環(huán)。 對于這種使用場(chǎng)景,日志包提供了 RotatingFileHandler
:
import glob
import logging
import logging.handlers
LOG_FILENAME = 'logging_rotatingfile_example.out'
# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)
# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
LOG_FILENAME, maxBytes=20, backupCount=5)
my_logger.addHandler(handler)
# Log some messages
for i in range(20):
my_logger.debug('i = %d' % i)
# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)
for filename in logfiles:
print(filename)
結果應該是6個(gè)單獨的文件,每個(gè)文件都包含了應用程序的部分歷史日志:
logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5
最新的文件始終是:file:logging_rotatingfile_example.out,每次到達大小限制時(shí),都會(huì )使用后綴``.1``重命名。每個(gè)現有的備份文件都會(huì )被重命名并增加其后綴(例如``.1`` 變?yōu)閌`.2``),而``.6``文件會(huì )被刪除掉。
顯然,這個(gè)例子將日志長(cháng)度設置得太小,這是一個(gè)極端的例子。 你可能希望將 maxBytes 設置為一個(gè)合適的值。
使用其他日志格式化方式?
當日志模塊被添加至 Python 標準庫時(shí),只有一種格式化消息內容的方法即 %-formatting。 在那之后,Python 又增加了兩種格式化方法: string.Template
(在 Python 2.4 中新增) 和 str.format()
(在 Python 2.6 中新增)。
日志(從 3.2 開(kāi)始)為這兩種格式化方式提供了更多支持。Formatter
類(lèi)可以添加一個(gè)額外的可選關(guān)鍵字參數 style
。它的默認值是 '%'
,其他的值 '{'
和 '$'
也支持,對應了其他兩種格式化樣式。其保持了向后兼容(如您所愿),但通過(guò)顯示指定樣式參數,你可以指定格式化字符串的方式是使用 str.format()
或 string.Template
。 這里是一個(gè)控制臺會(huì )話(huà)的示例,展示了這些方式:
>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
... style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
... style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>
請注意最終輸出到日志的消息格式完全獨立于單條日志消息的構造方式。 它仍然可以使用 %-formatting,如下所示:
>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>
日志調用(logger.debug()
、logger.info()
等)接受的位置參數只會(huì )用于日志信息本身,而關(guān)鍵字參數僅用于日志調用的可選處理參數(如關(guān)鍵字參數 exc_info
表示應記錄跟蹤信息, extra
則標識了需要加入日志的額外上下文信息)。所以不能直接用 str.format()
或 string.Template
語(yǔ)法進(jìn)行日志調用,因為日志包在內部使用 %-f 格式來(lái)合并格式串和參數變量。在保持向下兼容性時(shí),這一點(diǎn)不會(huì )改變,因為已有代碼中的所有日志調用都會(huì )使用%-f 格式串。
還有一種方法可以構建自己的日志信息,就是利用 {}- 和 $- 格式?;叵胍幌?,任意對象都可用為日志信息的格式串,日志包將會(huì )調用該對象的 str()
方法,以獲取最終的格式串。不妨看下一下兩個(gè)類(lèi):
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
上述兩個(gè)類(lèi)均可代替格式串,使得能用 {}- 或 $-formatting 構建最終的“日志信息”部分,這些信息將出現在格式化后的日志輸出中,替換 %(message)s 或“{message}”或“$message”。每次寫(xiě)入日志時(shí)都要使用類(lèi)名,有點(diǎn)不大實(shí)用,但如果用上 __ 之類(lèi)的別名就相當合適了(雙下劃線(xiàn) --- 不要與 _ 混淆,單下劃線(xiàn)用作 gettext.gettext()
或相關(guān)函數的同義詞/別名 )。
Python 并沒(méi)有上述兩個(gè)類(lèi),當然復制粘貼到自己的代碼中也很容易。用法可如下所示(假定在名為 wherever
的模塊中聲明):
>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
... point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
上述示例用了 print()
演示格式化輸出的過(guò)程,實(shí)際記錄日志時(shí)當然會(huì )用類(lèi)似 logger.debug()
的方法來(lái)應用。
值得注意的是,上述做法對性能并沒(méi)什么影響:格式化過(guò)程其實(shí)不是在日志記錄調用時(shí)發(fā)生的,而是在日志信息即將由 handler 輸出到日志時(shí)發(fā)生。因此,唯一可能讓人困惑的稍不尋常的地方,就是包裹在格式串和參數外面的括號,而不是格式串。因為 __ 符號只是對 XXXMessage 類(lèi)的構造函數調用的語(yǔ)法糖。
只要愿意,上述類(lèi)似的效果即可用 LoggerAdapter
實(shí)現,如下例所示:
import logging
class Message:
def __init__(self, fmt, args):
self.fmt = fmt
self.args = args
def __str__(self):
return self.fmt.format(*self.args)
class StyleAdapter(logging.LoggerAdapter):
def __init__(self, logger, extra=None):
super().__init__(logger, extra or {})
def log(self, level, msg, /, *args, **kwargs):
if self.isEnabledFor(level):
msg, kwargs = self.process(msg, kwargs)
self.logger._log(level, Message(msg, args), (), **kwargs)
logger = StyleAdapter(logging.getLogger(__name__))
def main():
logger.debug('Hello, {}', 'world!')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
main()
在用 Python 3.2 以上版本運行時(shí),上述代碼應該會(huì )把 Hello, world!
寫(xiě)入日志。
自定義 LogRecord
?
每條日志事件都由一個(gè) LogRecord
實(shí)例表示。當某事件要記入日志并且沒(méi)有被某級別過(guò)濾掉時(shí),就會(huì )創(chuàng )建一個(gè) LogRecord
對象,并將有關(guān)事件的信息填入,傳給該日志對象的 handler(及其祖先,直至對象禁止向上傳播為止)。在 Python 3.2 之前,只有兩個(gè)地方會(huì )進(jìn)行事件的創(chuàng )建:
Logger.makeRecord()
,在事件正常記入日志的過(guò)程中調用。這會(huì )直接調用LogRecord
來(lái)創(chuàng )建一個(gè)實(shí)例。makeLogRecord()
,調用時(shí)會(huì )帶上一個(gè)字典參數,其中存放著(zhù)要加入 LogRecord 的屬性。這通常在通過(guò)網(wǎng)絡(luò )接收到合適的字典時(shí)調用(如通過(guò)SocketHandler
以 pickle 形式,或通過(guò)HTTPHandler
以 JSON 形式)。
于是這意味著(zhù)若要對 LogRecord
進(jìn)行定制,必須進(jìn)行下述某種操作。
創(chuàng )建
Logger
自定義子類(lèi),重寫(xiě)Logger.makeRecord()
,并在實(shí)例化所需日志對象之前用setLoggerClass()
進(jìn)行設置。為日志對象添加
Filter
或 handler,當其filter()
方法被調用時(shí),會(huì )執行必要的定制操作。
比如說(shuō)在有多個(gè)不同庫要完成不同操作的場(chǎng)景下,第一種方式會(huì )有點(diǎn)笨拙。 每次都要嘗試設置自己的 Logger
子類(lèi),而起作用的是最后一次嘗試。
第二種方式在多數情況下效果都比較良好,但不允許你使用特殊化的 LogRecord
子類(lèi)。 庫開(kāi)發(fā)者可以為他們的日志記錄器設置合適的過(guò)濾器,但他們應當要記得每次引入新的日志記錄器時(shí)都需如此(他們只需通過(guò)添加新的包或模塊并執行以下操作即可):
logger = logging.getLogger(__name__)
或許這樣要顧及太多事情。開(kāi)發(fā)人員還可以將過(guò)濾器附加到其頂級日志對象的 NullHandler
中,但如果應用程序開(kāi)發(fā)人員將 handler 附加到較底層庫的日志對象,則不會(huì )調用該過(guò)濾器 --- 所以 handler 輸出的內容不會(huì )符合庫開(kāi)發(fā)人員的預期。
在 Python 3.2 以上版本中,LogRecord
的創(chuàng )建是通過(guò)工廠(chǎng)對象完成的,工廠(chǎng)對象可以指定。工廠(chǎng)對象只是一個(gè)可調用對象,可以用 setLogRecordFactory()
進(jìn)行設置,并用 getLogRecordFactory()
進(jìn)行查詢(xún)。工廠(chǎng)對象的調用參數與 LogRecord
的構造函數相同,因為 LogRecord
是工廠(chǎng)對象的默認設置。
這種方式可以讓自定義工廠(chǎng)對象完全控制 LogRecord 的創(chuàng )建過(guò)程。比如可以返回一個(gè)子類(lèi),或者在創(chuàng )建的日志對象中加入一些額外的屬性,使用方式如下所示:
old_factory = logging.getLogRecordFactory()
def record_factory(*args, **kwargs):
record = old_factory(*args, **kwargs)
record.custom_attribute = 0xdecafbad
return record
logging.setLogRecordFactory(record_factory)
這種模式允許不同的庫將多個(gè)工廠(chǎng)對象鏈在一起,只要不會(huì )覆蓋彼此的屬性或標準屬性,就不會(huì )出現意外。但應記住,工廠(chǎng)鏈中的每個(gè)節點(diǎn)都會(huì )增加日志操作的運行開(kāi)銷(xiāo),本技術(shù)僅在采用 Filter
無(wú)法達到目標時(shí)才應使用。
子類(lèi)化 QueueHandler - ZeroMQ 示例?
你可以使用 QueueHandler
子類(lèi)將消息發(fā)送給其他類(lèi)型的隊列 ,比如 ZeroMQ 'publish' 套接字。 在以下示例中,套接字將單獨創(chuàng )建并傳給處理句柄 (作為它的 'queue'):
import zmq # using pyzmq, the Python binding for ZeroMQ
import json # for serializing records portably
ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB) # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556') # or wherever
class ZeroMQSocketHandler(QueueHandler):
def enqueue(self, record):
self.queue.send_json(record.__dict__)
handler = ZeroMQSocketHandler(sock)
當然還有其他方案,比如通過(guò) hander 傳入所需數據,以創(chuàng )建 socket:
class ZeroMQSocketHandler(QueueHandler):
def __init__(self, uri, socktype=zmq.PUB, ctx=None):
self.ctx = ctx or zmq.Context()
socket = zmq.Socket(self.ctx, socktype)
socket.bind(uri)
super().__init__(socket)
def enqueue(self, record):
self.queue.send_json(record.__dict__)
def close(self):
self.queue.close()
子類(lèi)化 QueueListener —— ZeroMQ 示例?
你還可以子類(lèi)化 QueueListener
來(lái)從其他類(lèi)型的隊列中獲取消息,比如從 ZeroMQ 'subscribe' 套接字。 下面是一個(gè)例子:
class ZeroMQSocketListener(QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
self.ctx = kwargs.get('ctx') or zmq.Context()
socket = zmq.Socket(self.ctx, zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '') # subscribe to everything
socket.connect(uri)
super().__init__(socket, *handlers, **kwargs)
def dequeue(self):
msg = self.queue.recv_json()
return logging.makeLogRecord(msg)
參見(jiàn)
- 模塊
logging
日志記錄模塊的 API 參考。
logging.config
模塊日志記錄模塊的配置 API 。
logging.handlers
模塊日志記錄模塊附帶的有用處理器。
基于字典進(jìn)行日志配置的示例?
以下是日志配置字典的一個(gè)示例——它取自 Django 項目的`文檔<https://docs.djangoproject.com/en/stable/topics/logging/#configuring-logging>`_。此字典將被傳給 dictConfig()
以使配置生效:
LOGGING = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'verbose': {
'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
},
'simple': {
'format': '%(levelname)s %(message)s'
},
},
'filters': {
'special': {
'()': 'project.logging.SpecialFilter',
'foo': 'bar',
}
},
'handlers': {
'null': {
'level':'DEBUG',
'class':'django.utils.log.NullHandler',
},
'console':{
'level':'DEBUG',
'class':'logging.StreamHandler',
'formatter': 'simple'
},
'mail_admins': {
'level': 'ERROR',
'class': 'django.utils.log.AdminEmailHandler',
'filters': ['special']
}
},
'loggers': {
'django': {
'handlers':['null'],
'propagate': True,
'level':'INFO',
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
},
'myproject.custom': {
'handlers': ['console', 'mail_admins'],
'level': 'INFO',
'filters': ['special']
}
}
}
有關(guān)本配置的更多信息,請參閱 Django 文檔的 有關(guān)章節 。
利用 rotator 和 namer 自定義日志輪換操作?
以下代碼給出了定義 namer 和 rotator 的示例,其中演示了基于 zlib 的日志文件壓縮過(guò)程:
def namer(name):
return name + ".gz"
def rotator(source, dest):
with open(source, "rb") as sf:
data = sf.read()
compressed = zlib.compress(data, 9)
with open(dest, "wb") as df:
df.write(compressed)
os.remove(source)
rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer
這些不是“真正的” .gz 文件,因為他們只是純壓縮數據,缺少真正 gzip 文件中的“容器”。此段代碼只是用于演示。
更加詳細的多道處理示例?
以下可運行的示例顯示了如何利用配置文件在多進(jìn)程中應用日志。這些配置相當簡(jiǎn)單,但足以說(shuō)明如何在真實(shí)的多進(jìn)程場(chǎng)景中實(shí)現較為復雜的配置。
上述示例中,主進(jìn)程產(chǎn)生一個(gè)偵聽(tīng)器進(jìn)程和一些工作進(jìn)程。每個(gè)主進(jìn)程、偵聽(tīng)器進(jìn)程和工作進(jìn)程都有三種獨立的日志配置(工作進(jìn)程共享同一套配置)。大家可以看到主進(jìn)程的日志記錄過(guò)程、工作線(xiàn)程向 QueueHandler 寫(xiě)入日志的過(guò)程,以及偵聽(tīng)器實(shí)現 QueueListener 和較為復雜的日志配置,如何將由隊列接收到的事件分發(fā)給配置指定的 handler。請注意,這些配置純粹用于演示,但應該能調整代碼以適用于自己的場(chǎng)景。
以下是代碼——但愿文檔字符串和注釋能有助于理解其工作原理:
import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time
class MyHandler:
"""
A simple handler for logging events. It runs in the listener process and
dispatches events to loggers based on the name in the received record,
which then get dispatched, by the logging system, to the handlers
configured for those loggers.
"""
def handle(self, record):
if record.name == "root":
logger = logging.getLogger()
else:
logger = logging.getLogger(record.name)
if logger.isEnabledFor(record.levelno):
# The process name is transformed just to show that it's the listener
# doing the logging to files and console
record.processName = '%s (for %s)' % (current_process().name, record.processName)
logger.handle(record)
def listener_process(q, stop_event, config):
"""
This could be done in the main process, but is just done in a separate
process for illustrative purposes.
This initialises logging according to the specified configuration,
starts the listener and waits for the main process to signal completion
via the event. The listener is then stopped, and the process exits.
"""
logging.config.dictConfig(config)
listener = logging.handlers.QueueListener(q, MyHandler())
listener.start()
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
stop_event.wait()
listener.stop()
def worker_process(config):
"""
A number of these are spawned for the purpose of illustration. In
practice, they could be a heterogeneous bunch of processes rather than
ones which are identical to each other.
This initialises logging according to the specified configuration,
and logs a hundred messages with random levels to randomly selected
loggers.
A small sleep is added to allow other processes a chance to run. This
is not strictly needed, but it mixes the output from the different
processes a bit more than if it's left out.
"""
logging.config.dictConfig(config)
levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL]
loggers = ['foo', 'foo.bar', 'foo.bar.baz',
'spam', 'spam.ham', 'spam.ham.eggs']
if os.name == 'posix':
# On POSIX, the setup logger will have been configured in the
# parent process, but should have been disabled following the
# dictConfig call.
# On Windows, since fork isn't used, the setup logger won't
# exist in the child, so it would be created and the message
# would appear - hence the "if posix" clause.
logger = logging.getLogger('setup')
logger.critical('Should not appear, because of disabled logger ...')
for i in range(100):
lvl = random.choice(levels)
logger = logging.getLogger(random.choice(loggers))
logger.log(lvl, 'Message no. %d', i)
time.sleep(0.01)
def main():
q = Queue()
# The main process gets a simple configuration which prints to the console.
config_initial = {
'version': 1,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'INFO'
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG'
}
}
# The worker process configuration is just a QueueHandler attached to the
# root logger, which allows all messages to be sent to the queue.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_worker = {
'version': 1,
'disable_existing_loggers': True,
'handlers': {
'queue': {
'class': 'logging.handlers.QueueHandler',
'queue': q
}
},
'root': {
'handlers': ['queue'],
'level': 'DEBUG'
}
}
# The listener process configuration shows that the full flexibility of
# logging configuration is available to dispatch events to handlers however
# you want.
# We disable existing loggers to disable the "setup" logger used in the
# parent process. This is needed on POSIX because the logger will
# be there in the child following a fork().
config_listener = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'detailed': {
'class': 'logging.Formatter',
'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
},
'simple': {
'class': 'logging.Formatter',
'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'simple',
'level': 'INFO'
},
'file': {
'class': 'logging.FileHandler',
'filename': 'mplog.log',
'mode': 'w',
'formatter': 'detailed'
},
'foofile': {
'class': 'logging.FileHandler',
'filename': 'mplog-foo.log',
'mode': 'w',
'formatter': 'detailed'
},
'errors': {
'class': 'logging.FileHandler',
'filename': 'mplog-errors.log',
'mode': 'w',
'formatter': 'detailed',
'level': 'ERROR'
}
},
'loggers': {
'foo': {
'handlers': ['foofile']
}
},
'root': {
'handlers': ['console', 'file', 'errors'],
'level': 'DEBUG'
}
}
# Log some initial events, just to show that logging in the parent works
# normally.
logging.config.dictConfig(config_initial)
logger = logging.getLogger('setup')
logger.info('About to create workers ...')
workers = []
for i in range(5):
wp = Process(target=worker_process, name='worker %d' % (i + 1),
args=(config_worker,))
workers.append(wp)
wp.start()
logger.info('Started worker: %s', wp.name)
logger.info('About to create listener ...')
stop_event = Event()
lp = Process(target=listener_process, name='listener',
args=(q, stop_event, config_listener))
lp.start()
logger.info('Started listener')
# We now hang around for the workers to finish their work.
for wp in workers:
wp.join()
# Workers all done, listening can now stop.
# Logging in the parent still works normally.
logger.info('Telling listener to stop ...')
stop_event.set()
lp.join()
logger.info('All done.')
if __name__ == '__main__':
main()
在發(fā)送給 SysLogHandler 的信息中插入一個(gè) BOM。?
RFC 5424 要求,Unicode 信息應采用字節流形式發(fā)送到系統 syslog 守護程序,字節流結構如下所示:可選的純 ASCII部分,后跟 UTF-8 字節序標記(BOM),然后是采用 UTF-8 編碼的 Unicode。(參見(jiàn) 相關(guān)規范 。)
在 Python 3.1 的 SysLogHandler
中,已加入了在日志信息中插入 BOM 的代碼,但不幸的是,代碼并不正確,BOM 出現在了日志信息的開(kāi)頭,因此在它之前就不允許出現純 ASCII 內容了。
由于無(wú)法正常工作, Python 3.2.4 以上版本已刪除了出錯的插入 BOM 代碼。但已有版本的代碼不會(huì )被替換,若要生成與 RFC 5424 兼容的日志信息,包括一個(gè) BOM 符,前面有可選的純 ASCII 字節流,后面為 UTF-8 編碼的任意 Unicode,那么 需要執行以下操作:
為
SysLogHandler
實(shí)例串上一個(gè)Formatter
實(shí)例,格式串可如下:'ASCII section\ufeffUnicode section'
用 UTF-8 編碼時(shí),Unicode 碼位 U+FEFF 將會(huì )編碼為 UTF-8 BOM——字節串
b'\xef\xbb\xbf'
。用任意占位符替換 ASCII 部分,但要保證替換之后的數據一定是 ASCII 碼(這樣在 UTF-8 編碼后就會(huì )維持不變)。
用任意占位符替換 Unicode 部分;如果替換后的數據包含超出 ASCII 范圍的字符,沒(méi)問(wèn)題——他們將用 UTF-8 進(jìn)行編碼。
SysLogHandler
將 對格式化后的日志信息進(jìn)行 UTF-8 編碼。如果遵循上述規則,應能生成符合 RFC 5424 的日志信息。否則,日志記錄過(guò)程可能不會(huì )有什么反饋,但日志信息將不與 RFC 5424 兼容,syslog 守護程序可能會(huì )有出錯反應。
結構化日志的實(shí)現代碼?
大多數日志信息是供人閱讀的,所以機器解析起來(lái)并不容易,但某些時(shí)候可能希望以結構化的格式輸出,以 能夠 被程序解析(無(wú)需用到復雜的正則表達式)。這可以直接用 logging 包實(shí)現。實(shí)現方式有很多,以下是一種比較簡(jiǎn)單的方案,利用 JSON 以機器可解析的方式對事件信息進(jìn)行序列化:
import json
import logging
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
return '%s >>> %s' % (self.message, json.dumps(self.kwargs))
_ = StructuredMessage # optional, to improve readability
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))
上述代碼運行后的結果是:
message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}
請注意,根據 Python 版本的不同,各項數據的輸出順序可能會(huì )不一樣。
若需進(jìn)行更為定制化的處理,可以使用自定義 JSON 編碼對象,下面給出完整示例:
import json
import logging
class Encoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, set):
return tuple(o)
elif isinstance(o, str):
return o.encode('unicode_escape').decode('ascii')
return super().default(o)
class StructuredMessage:
def __init__(self, message, /, **kwargs):
self.message = message
self.kwargs = kwargs
def __str__(self):
s = Encoder().encode(self.kwargs)
return '%s >>> %s' % (self.message, s)
_ = StructuredMessage # optional, to improve readability
def main():
logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))
if __name__ == '__main__':
main()
上述代碼運行后的結果是:
message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}
請注意,根據 Python 版本的不同,各項數據的輸出順序可能會(huì )不一樣。
利用 dictConfig()
自定義 handler?
有時(shí)需要以特定方式自定義日志 handler,如果采用 dictConfig()
,可能無(wú)需生成子類(lèi)就可以做到。比如要設置日志文件的所有權。在 POSIX 上,可以利用 shutil.chown()
輕松完成,但 stdlib 中的文件 handler 并不提供內置支持。于是可以用普通函數自定義 handler 的創(chuàng )建,例如:
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
然后,你可以在傳給 dictConfig()
的日志配置中指定通過(guò)調用此函數來(lái)創(chuàng )建日志處理程序:
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
出于演示目的,以下示例設置用戶(hù)和用戶(hù)組為 pulse
。代碼置于一個(gè)可運行的腳本文件 chowntest.py
中:
import logging, logging.config, os, shutil
def owned_file_handler(filename, mode='a', encoding=None, owner=None):
if owner:
if not os.path.exists(filename):
open(filename, 'a').close()
shutil.chown(filename, *owner)
return logging.FileHandler(filename, mode, encoding)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'default': {
'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
},
},
'handlers': {
'file':{
# The values below are popped from this dictionary and
# used to create the handler, set the handler's level and
# its formatter.
'()': owned_file_handler,
'level':'DEBUG',
'formatter': 'default',
# The values below are passed to the handler creator callable
# as keyword arguments.
'owner': ['pulse', 'pulse'],
'filename': 'chowntest.log',
'mode': 'w',
'encoding': 'utf-8',
},
},
'root': {
'handlers': ['file'],
'level': 'DEBUG',
},
}
logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')
可能需要 root
權限才能運行:
$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log
請注意此示例用的是 Python 3.3,因為 shutil.chown()
是從此版本開(kāi)始出現的。 此方式應當適用于任何支持 dictConfig()
的 Python 版本 —— 例如 Python 2.7, 3.2 或更新的版本。 對于 3.3 之前的版本,你應當使用 os.chown()
之類(lèi)的函數來(lái)實(shí)現實(shí)際的所有權修改。
實(shí)際應用中,handler 的創(chuàng )建函數可能位于項目的工具模塊中。以下配置:
'()': owned_file_handler,
應使用:
'()': 'ext://project.util.owned_file_handler',
這里的 project.util
可以換成函數所在包的實(shí)際名稱(chēng)。 在上述的可用腳本中,應該可以使用 'ext://__main__.owned_file_handler'
。 在這里,實(shí)際的可調用對象是由 dictConfig()
從 ext://
說(shuō)明中解析出來(lái)的。
上述示例還指明了其他的文件修改類(lèi)型的實(shí)現方案 —— 比如同樣利用 os.chmod()
設置 POSIX 訪(fǎng)問(wèn)權限位。
當然,以上做法也可以擴展到 FileHandler
之外的其他類(lèi)型的 handler ——比如某個(gè)輪換文件 handler,或類(lèi)型完全不同的其他 handler。
生效于整個(gè)應用程序的格式化樣式?
在 Python 3.2 中,Formatter
增加了一個(gè) style
關(guān)鍵字形參,它默認為 %
以便向下兼容,但是允許采用 {
或 {TX-PL-LABEL}#x60;
來(lái)支持 str.format()
和 string.Template
所支持的格式化方式。 請注意此形參控制著(zhù)用用于最終輸出到日志的日志消息格式,并且與單獨日志消息的構造方式完全無(wú)關(guān)。
日志函數(debug()
, info()
等)只會(huì )讀取位置參數獲取日志信息本身,而關(guān)鍵字參數僅用于確定日志函數的工作選項(比如關(guān)鍵字參數 exc_info
表示應將跟蹤信息記入日志,關(guān)鍵字參數 extra
則給出了需加入日志的額外上下文信息)。所以不能直接使用 str.format()
或 string.Template
這種語(yǔ)法進(jìn)行日志調用,因為日志包在內部使用 %-f 格式來(lái)合并格式串和可變參數。因為尚需保持向下兼容,這一點(diǎn)不會(huì )改變,已有代碼中的所有日志調用都將采用 %-f 格式串。
有人建議將格式化樣式與特定的日志對象進(jìn)行關(guān)聯(lián),但其實(shí)也會(huì )遇到向下兼容的問(wèn)題,因為已有代碼可能用到了某日志對象并采用了 %-f 格式串。
為了讓第三方庫和自編代碼都能夠交互使用日志功能,需要決定在單次日志記錄調用級別采用什么格式。于是就出現了其他幾種格式化樣式方案。
LogRecord 工廠(chǎng)的用法?
在 Python 3.2 中,伴隨著(zhù) Formatter
的上述變化,logging 包增加了允許用戶(hù)使用 setLogRecordFactory()
函數來(lái)。設置自己的 LogRecord
子類(lèi)的功能。 你可以使用此功能來(lái)設置自己的 LogRecord
子類(lèi),它會(huì )通過(guò)重載 getMessage()
方法來(lái)完成適當的操作。 msg % args
格式化是在此方法的基類(lèi)實(shí)現中進(jìn)行的,你可以在那里用你自己的格式化操作來(lái)替換;但是,你應當注意要支持全部的格式化樣式并允許將 %-formatting 作為默認樣式,以確保與其他代碼進(jìn)行配合。 還應當注意調用 str(self.msg)
,正如基類(lèi)實(shí)現所做的一樣。
更多信息請參閱 setLogRecordFactory()
和 LogRecord
的參考文檔。
自定義信息對象的使用?
另一種方案可能更為簡(jiǎn)單,可以利用 {}- 和 $- 格式構建自己的日志消息。大家或許還記得(來(lái)自 使用任意對象作為消息),可以用任意對象作為日志信息的格式串,日志包將調用該對象上 str()
獲取實(shí)際的格式串??聪乱韵聝蓚€(gè)類(lèi):
class BraceMessage:
def __init__(self, fmt, /, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self):
return self.fmt.format(*self.args, **self.kwargs)
class DollarMessage:
def __init__(self, fmt, /, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
def __str__(self):
from string import Template
return Template(self.fmt).substitute(**self.kwargs)
以上兩個(gè)類(lèi)均都可用于替代格式串,以便用 {}- 或 $-formatting 構建實(shí)際的“日志信息”部分,此部分將出現在格式化后的日志輸出中,替換 %(message)s 、“{message}”或“$message”。每次要寫(xiě)入日志時(shí)都使用類(lèi)名,如果覺(jué)得使用不便,可以采用 M
或 _
之類(lèi)的別名(如果將 _
用于本地化操作,則可用 __
)。
下面給出示例。 首先用 str.format()
進(jìn)行格式化:
>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)
然后,用 string.Template
格式化:
>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>
值得注意的是,上述做法對性能并沒(méi)什么影響:格式化過(guò)程其實(shí)不是在日志調用時(shí)發(fā)生的,而是在日志信息即將由 handler 輸出到日志時(shí)發(fā)生。因此,唯一可能讓人困惑的稍不尋常的地方,就是包裹在格式串和參數外面的括號,而不是格式串。因為 __ 符號只是對 XXXMessage
類(lèi)的構造函數調用的語(yǔ)法糖。
利用 dictConfig()
定義過(guò)濾器?
用 dictConfig()
可以 對日志過(guò)濾器進(jìn)行設置,盡管乍一看做法并不明顯(所以才需要本秘籍)。 由于 Filter
是標準庫中唯一的日志過(guò)濾器類(lèi),不太可能滿(mǎn)足眾多的要求(它只是作為基類(lèi)存在),通常需要定義自己的 Filter
子類(lèi),并重寫(xiě) filter()
方法。為此,請在過(guò)濾器的配置字典中設置 ()
鍵,指定要用于創(chuàng )建過(guò)濾器的可調用對象(最明顯可用的就是給出一個(gè)類(lèi),但也可以提供任何一個(gè)可調用對象,只要能返回 Filter
實(shí)例即可)。下面是一個(gè)完整的例子:
import logging
import logging.config
import sys
class MyFilter(logging.Filter):
def __init__(self, param=None):
self.param = param
def filter(self, record):
if self.param is None:
allow = True
else:
allow = self.param not in record.msg
if allow:
record.msg = 'changed: ' + record.msg
return allow
LOGGING = {
'version': 1,
'filters': {
'myfilter': {
'()': MyFilter,
'param': 'noshow',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'filters': ['myfilter']
}
},
'root': {
'level': 'DEBUG',
'handlers': ['console']
},
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.debug('hello')
logging.debug('hello - noshow')
以上示例展示了將配置數據傳給構造實(shí)例的可調用對象,形式是關(guān)鍵字參數。運行后將會(huì )輸出:
changed: hello
這說(shuō)明過(guò)濾器按照配置的參數生效了。
需要額外注意的地方:
如果在配置中無(wú)法直接引用可調用對象(比如位于不同的模塊中,并且不能在配置字典所在的位置直接導入),則可以采用
ext://...
的形式,正如 訪(fǎng)問(wèn)外部對象 所述。例如,在上述示例中可以使用文本'ext://__main__.MyFilter'
而不是MyFilter
對象。與過(guò)濾器一樣,上述技術(shù)還可用于配置自定義 handler 和格式化對象。有關(guān)如何在日志配置中使用用戶(hù)自定義對象的更多信息,請參閱 用戶(hù)定義對象,以及上述 利用 dictConfig() 自定義 handler 的其他指南。
異常信息的自定義格式化?
有時(shí)可能需要設置自定義的異常信息格式——考慮到會(huì )用到參數,假定要讓每條日志事件只占一行,即便存在異常信息也一樣。這可以用自定義格式化類(lèi)來(lái)實(shí)現,如下所示:
import logging
class OneLineExceptionFormatter(logging.Formatter):
def formatException(self, exc_info):
"""
Format an exception so that it prints on a single line.
"""
result = super().formatException(exc_info)
return repr(result) # or format into one line however you want to
def format(self, record):
s = super().format(record)
if record.exc_text:
s = s.replace('\n', '') + '|'
return s
def configure_logging():
fh = logging.FileHandler('output.txt', 'w')
f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
'%d/%m/%Y %H:%M:%S')
fh.setFormatter(f)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(fh)
def main():
configure_logging()
logging.info('Sample message')
try:
x = 1 / 0
except ZeroDivisionError as e:
logging.exception('ZeroDivisionError: %s', e)
if __name__ == '__main__':
main()
運行后將會(huì )生成只有兩行信息的文件:
28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n File "logtest7.py", line 30, in main\n x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|
雖然上述處理方式很簡(jiǎn)單,但也給出了根據喜好對異常信息進(jìn)行格式化輸出的方案?;蛟S traceback
模塊能滿(mǎn)足更專(zhuān)門(mén)的需求。
語(yǔ)音播報日志信息?
有時(shí)可能需要以聲音的形式呈現日志消息。如果系統自帶了文本轉語(yǔ)音 (TTS)功能,即便沒(méi)與 Python 關(guān)聯(lián)也很容易做到。大多數 TTS 系統都有一個(gè)可運行的命令行程序,在 handler 中可以用 subprocess
進(jìn)行調用。這里假定 TTS 命令行程序不會(huì )與用戶(hù)交互,或需要很長(cháng)時(shí)間才會(huì )執行完畢,寫(xiě)入日志的信息也不會(huì )多到影響用戶(hù)查看,并且可以接受每次播報一條信息,以下示例實(shí)現了等一條信息播完再處理下一條,可能會(huì )導致其他 handler 的等待。這個(gè)簡(jiǎn)短示例僅供演示,假定 espeak
TTS 包已就緒:
import logging
import subprocess
import sys
class TTSHandler(logging.Handler):
def emit(self, record):
msg = self.format(record)
# Speak slowly in a female English voice
cmd = ['espeak', '-s150', '-ven+f3', msg]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# wait for the program to finish
p.communicate()
def configure_logging():
h = TTSHandler()
root = logging.getLogger()
root.addHandler(h)
# the default formatter just returns the message
root.setLevel(logging.DEBUG)
def main():
logging.info('Hello')
logging.debug('Goodbye')
if __name__ == '__main__':
configure_logging()
sys.exit(main())
運行后將會(huì )以女聲播報“Hello”和“Goodbye”。
當然,上述方案也適用于其他 TTS 系統,甚至可以通過(guò)利用命令行運行的外部程序來(lái)處理消息。
緩沖日志消息并有條件地輸出它們?
在某些情況下,你可能希望在臨時(shí)區域中記錄日志消息,并且只在發(fā)生某種特定的情況下才輸出它們。 例如,你可能希望起始在函數中記錄調試事件,如果函數執行完成且沒(méi)有錯誤,你不希望輸出收集的調試信息以避免造成日志混亂,但如果出現錯誤,那么你希望所有調試以及錯誤消息被輸出。
下面是一個(gè)示例,展示如何在你的日志記錄函數上使用裝飾器以實(shí)現這一功能。該示例使用 logging.handlers.MemoryHandler
,它允許緩沖已記錄的事件直到某些條件發(fā)生,緩沖的事件才會(huì )被刷新(flushed
) - 傳遞給另一個(gè)處理程序( target
handler)進(jìn)行處理。 默認情況下, MemoryHandler
在其緩沖區被填滿(mǎn)時(shí)被刷新,或者看到一個(gè)級別大于或等于指定閾值的事件。 如果想要自定義刷新行為,你可以通過(guò)更專(zhuān)業(yè)的 MemoryHandler
子類(lèi)來(lái)使用這個(gè)秘訣。
這個(gè)示例腳本有一個(gè)簡(jiǎn)單的函數 foo
,它只是在所有的日志級別中循環(huán)運行,寫(xiě)到 sys.stderr
,說(shuō)明它要記錄在哪個(gè)級別上,然后在這個(gè)級別上實(shí)際記錄一個(gè)消息。你可以給 foo
傳遞一個(gè)參數,如果為 true ,它將在ERROR和CRITICAL級別記錄,否則,它只在DEBUG、INFO和WARNING級別記錄。
腳本只是使用了一個(gè)裝飾器來(lái)裝飾 foo
,這個(gè)裝飾器將記錄執行所需的條件。裝飾器使用一個(gè)記錄器作為參數,并在調用被裝飾的函數期間附加一個(gè)內存處理程序。裝飾器可以使用目標處理程序、記錄級別和緩沖區的容量(緩沖記錄的數量)來(lái)附加參數。這些參數分別默認為寫(xiě)入``sys.stderr`` 的 StreamHandler
, logging.ERROR
和 100
。
以下是腳本:
import logging
from logging.handlers import MemoryHandler
import sys
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
if target_handler is None:
target_handler = logging.StreamHandler()
if flush_level is None:
flush_level = logging.ERROR
if capacity is None:
capacity = 100
handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)
def decorator(fn):
def wrapper(*args, **kwargs):
logger.addHandler(handler)
try:
return fn(*args, **kwargs)
except Exception:
logger.exception('call failed')
raise
finally:
super(MemoryHandler, handler).flush()
logger.removeHandler(handler)
return wrapper
return decorator
def write_line(s):
sys.stderr.write('%s\n' % s)
def foo(fail=False):
write_line('about to log at DEBUG ...')
logger.debug('Actually logged at DEBUG')
write_line('about to log at INFO ...')
logger.info('Actually logged at INFO')
write_line('about to log at WARNING ...')
logger.warning('Actually logged at WARNING')
if fail:
write_line('about to log at ERROR ...')
logger.error('Actually logged at ERROR')
write_line('about to log at CRITICAL ...')
logger.critical('Actually logged at CRITICAL')
return fail
decorated_foo = log_if_errors(logger)(foo)
if __name__ == '__main__':
logger.setLevel(logging.DEBUG)
write_line('Calling undecorated foo with False')
assert not foo(False)
write_line('Calling undecorated foo with True')
assert foo(True)
write_line('Calling decorated foo with False')
assert not decorated_foo(False)
write_line('Calling decorated foo with True')
assert decorated_foo(True)
運行此腳本時(shí),應看到以下輸出:
Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL
如你所見(jiàn),實(shí)際日志記錄輸出僅在消息等級為ERROR或更高的事件時(shí)發(fā)生,但在這種情況下,任何之前較低消息等級的事件還會(huì )被記錄。
你當然可以使用傳統的裝飾方法:
@log_if_errors(logger)
def foo(fail=False):
...
通過(guò)配置使用UTC (GMT) 格式化時(shí)間?
有時(shí)候,你希望使用UTC來(lái)格式化時(shí)間,這可以通過(guò)使用一個(gè)類(lèi)來(lái)實(shí)現,例如`UTCFormatter`,如下所示:
import logging
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
然后你可以在你的代碼中使用 UTCFormatter
,而不是 Formatter
。 如果你想通過(guò)配置來(lái)實(shí)現這一功能,你可以使用 dictConfig()
API 來(lái)完成,該方法在以下完整示例中展示:
import logging
import logging.config
import time
class UTCFormatter(logging.Formatter):
converter = time.gmtime
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'utc': {
'()': UTCFormatter,
'format': '%(asctime)s %(message)s',
},
'local': {
'format': '%(asctime)s %(message)s',
}
},
'handlers': {
'console1': {
'class': 'logging.StreamHandler',
'formatter': 'utc',
},
'console2': {
'class': 'logging.StreamHandler',
'formatter': 'local',
},
},
'root': {
'handlers': ['console1', 'console2'],
}
}
if __name__ == '__main__':
logging.config.dictConfig(LOGGING)
logging.warning('The local time is %s', time.asctime())
腳本會(huì )運行輸出類(lèi)似下面的內容:
2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015
展示了如何將時(shí)間格式化為本地時(shí)間和UTC兩種形式,其中每種形式對應一個(gè)日志處理器 。
使用上下文管理器的可選的日志記錄?
有時(shí)候,我們需要暫時(shí)更改日志配置,并在執行某些操作后將其還原。為此,上下文管理器是實(shí)現保存和恢復日志上下文的最明顯的方式。這是一個(gè)關(guān)于上下文管理器的簡(jiǎn)單例子,它允許你在上下文管理器的作用域內更改日志記錄等級以及增加日志處理器:
import logging
import sys
class LoggingContext:
def __init__(self, logger, level=None, handler=None, close=True):
self.logger = logger
self.level = level
self.handler = handler
self.close = close
def __enter__(self):
if self.level is not None:
self.old_level = self.logger.level
self.logger.setLevel(self.level)
if self.handler:
self.logger.addHandler(self.handler)
def __exit__(self, et, ev, tb):
if self.level is not None:
self.logger.setLevel(self.old_level)
if self.handler:
self.logger.removeHandler(self.handler)
if self.handler and self.close:
self.handler.close()
# implicit return of None => don't swallow exceptions
如果指定上下文管理器的日志記錄等級屬性,則在上下文管理器的with語(yǔ)句所涵蓋的代碼中,日志記錄器的記錄等級將臨時(shí)設置為上下文管理器所配置的日志記錄等級。 如果指定上下文管理的日志處理器屬性,則該句柄在進(jìn)入上下文管理器的上下文時(shí)添加到記錄器中,并在退出時(shí)被刪除。 如果你再也不需要該日志處理器時(shí),你可以讓上下文管理器在退出上下文管理器的上下文時(shí)關(guān)閉它。
為了說(shuō)明它是如何工作的,我們可以在上面添加以下代碼塊:
if __name__ == '__main__':
logger = logging.getLogger('foo')
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
logger.info('1. This should appear just once on stderr.')
logger.debug('2. This should not appear.')
with LoggingContext(logger, level=logging.DEBUG):
logger.debug('3. This should appear once on stderr.')
logger.debug('4. This should not appear.')
h = logging.StreamHandler(sys.stdout)
with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
logger.debug('5. This should appear twice - once on stderr and once on stdout.')
logger.info('6. This should appear just once on stderr.')
logger.debug('7. This should not appear.')
我們最初設置日志記錄器的消息等級為 INFO
,因此消息#1出現,消息#2沒(méi)有出現。在接下來(lái)的 with``代碼塊中我們暫時(shí)將消息等級變更為 ``DEBUG
,從而消息 #3 出現。在這一代碼塊退出后,日志記錄器的消息等級恢復為 INFO
,從而消息 #4 沒(méi)有出現。在下一個(gè) with
代碼塊中,我們再一次將設置消息等級設置為 DEBUG
,同時(shí)添加一個(gè)將消息寫(xiě)入 sys.stdout
的日志處理器。因此,消息#5在控制臺出現兩次 (分別通過(guò) stderr
和 stdout
)。在 with
語(yǔ)句完成后,狀態(tài)與之前一樣,因此消息 #6 出現(類(lèi)似消息 #1),而消息 #7 沒(méi)有出現(類(lèi)似消息 #2)。
如果我們運行生成的腳本,結果如下:
$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
我們將``stderr``標準錯誤重定向到``/dev/null``,我再次運行生成的腳步,唯一被寫(xiě)入``stdout``標準輸出的消息,即我們所能看見(jiàn)的消息,如下:
$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.
再一次,將 stdout
標準輸出重定向到 /dev/null
,我獲得如下結果:
$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.
在這種情況下,與預期一致,打印到 stdout
標準輸出的消息#5不會(huì )出現。
當然,這里描述的方法可以被推廣,例如臨時(shí)附加日志記錄過(guò)濾器。 請注意,上面的代碼適用于Python 2以及Python 3。
命令行日志應用起步?
下面的示例提供了如下功能:
根據命令行參數確定日志級別
在單獨的文件中分發(fā)多條子命令,同一級別的子命令均以一致的方式記錄。
最簡(jiǎn)單的配置用法
假定有一個(gè)命令行應用程序,用于停止、啟動(dòng)或重新啟動(dòng)某些服務(wù)。為了便于演示,不妨將 app.py
作為應用程序的主代碼文件,并在 start.py
、 stop.py``和 ``restart.py
中實(shí)現單獨的命令。再假定要通過(guò)命令行參數控制應用程序的日志粒度,默認為 logging.INFO
。以下是 app.py
的一個(gè)示例:
import argparse
import importlib
import logging
import os
import sys
def main(args=None):
scriptname = os.path.basename(__file__)
parser = argparse.ArgumentParser(scriptname)
levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
parser.add_argument('--log-level', default='INFO', choices=levels)
subparsers = parser.add_subparsers(dest='command',
help='Available commands:')
start_cmd = subparsers.add_parser('start', help='Start a service')
start_cmd.add_argument('name', metavar='NAME',
help='Name of service to start')
stop_cmd = subparsers.add_parser('stop',
help='Stop one or more services')
stop_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to stop')
restart_cmd = subparsers.add_parser('restart',
help='Restart one or more services')
restart_cmd.add_argument('names', metavar='NAME', nargs='+',
help='Name of service to restart')
options = parser.parse_args()
# the code to dispatch commands could all be in this file. For the purposes
# of illustration only, we implement each command in a separate module.
try:
mod = importlib.import_module(options.command)
cmd = getattr(mod, 'command')
except (ImportError, AttributeError):
print('Unable to find the code for command \'%s\'' % options.command)
return 1
# Could get fancy here and load configuration from file or dictionary
logging.basicConfig(level=options.log_level,
format='%(levelname)s %(name)s %(message)s')
cmd(options)
if __name__ == '__main__':
sys.exit(main())
start
、stop
和 restart
命令可以在單獨的模塊中實(shí)現,啟動(dòng)命令的代碼可如下:
# start.py
import logging
logger = logging.getLogger(__name__)
def command(options):
logger.debug('About to start %s', options.name)
# actually do the command processing here ...
logger.info('Started the \'%s\' service.', options.name)
然后是停止命令的代碼:
# stop.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to stop %s', services)
# actually do the command processing here ...
logger.info('Stopped the %s service%s.', services, plural)
重啟命令類(lèi)似:
# restart.py
import logging
logger = logging.getLogger(__name__)
def command(options):
n = len(options.names)
if n == 1:
plural = ''
services = '\'%s\'' % options.names[0]
else:
plural = 's'
services = ', '.join('\'%s\'' % name for name in options.names)
i = services.rfind(', ')
services = services[:i] + ' and ' + services[i + 2:]
logger.debug('About to restart %s', services)
# actually do the command processing here ...
logger.info('Restarted the %s service%s.', services, plural)
如果以默認日志級別運行該程序,會(huì )得到以下結果:
$ python app.py start foo
INFO start Started the 'foo' service.
$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
第一個(gè)單詞是日志級別,第二個(gè)單詞是日志事件所在的模塊或包的名稱(chēng)。
如果修改了日志級別,發(fā)送給日志的信息就能得以改變。如要顯示更多信息,則可:
$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.
$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.
$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.
若要顯示的信息少一些,則:
$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz
這里的命令不會(huì )向控制臺輸出任何信息,因為沒(méi)有記錄 WARNING
以上級別的日志。
Qt GUI 日志示例?
GUI 應用程序如何記錄日志,這是個(gè)常見(jiàn)的問(wèn)題。 Qt 框架是一個(gè)流行的跨平臺 UI 框架,采用的是 PySide2 或 PyQt5 庫。
下面的例子演示了將日志寫(xiě)入 Qt GUI 程序的過(guò)程。這里引入了一個(gè)簡(jiǎn)單的 QtHandler
類(lèi),參數是一個(gè)可調用對象,其應為嵌入主線(xiàn)程某個(gè)“槽位”中運行的,因為GUI 的更新由主線(xiàn)程完成。這里還創(chuàng )建了一個(gè)工作線(xiàn)程,以便演示由 UI(通過(guò)人工點(diǎn)擊日志按鈕)和后臺工作線(xiàn)程(此處只是記錄級別和時(shí)間間隔均隨機生成的日志信息)將日志寫(xiě)入 GUI 的過(guò)程。
該工作線(xiàn)程是用 Qt 的 QThread
類(lèi)實(shí)現的,而不是 threading
模塊,因為某些情況下只能采用 `QThread
,它與其他 Qt
組件的集成性更好一些。
以下代碼應能適用于最新版的 PySide2
或 PyQt5
。對于低版本的 Qt 應該也能適用。更多詳情,請參閱代碼注釋。
import datetime
import logging
import random
import sys
import time
# Deal with minor differences between PySide2 and PyQt5
try:
from PySide2 import QtCore, QtGui, QtWidgets
Signal = QtCore.Signal
Slot = QtCore.Slot
except ImportError:
from PyQt5 import QtCore, QtGui, QtWidgets
Signal = QtCore.pyqtSignal
Slot = QtCore.pyqtSlot
logger = logging.getLogger(__name__)
#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
signal = Signal(str, logging.LogRecord)
#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
def __init__(self, slotfunc, *args, **kwargs):
super().__init__(*args, **kwargs)
self.signaller = Signaller()
self.signaller.signal.connect(slotfunc)
def emit(self, record):
s = self.format(record)
self.signaller.signal.emit(s, record)
#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
return QtCore.QThread.currentThread().objectName()
#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
@Slot()
def start(self):
extra = {'qThreadName': ctname() }
logger.debug('Started work', extra=extra)
i = 1
# Let the thread run until interrupted. This allows reasonably clean
# thread termination.
while not QtCore.QThread.currentThread().isInterruptionRequested():
delay = 0.5 + random.random() * 2
time.sleep(delay)
level = random.choice(LEVELS)
logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
i += 1
#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):
COLORS = {
logging.DEBUG: 'black',
logging.INFO: 'blue',
logging.WARNING: 'orange',
logging.ERROR: 'red',
logging.CRITICAL: 'purple',
}
def __init__(self, app):
super().__init__()
self.app = app
self.textedit = te = QtWidgets.QPlainTextEdit(self)
# Set whatever the default monospace font is for the platform
f = QtGui.QFont('nosuchfont')
f.setStyleHint(f.Monospace)
te.setFont(f)
te.setReadOnly(True)
PB = QtWidgets.QPushButton
self.work_button = PB('Start background work', self)
self.log_button = PB('Log a message at a random level', self)
self.clear_button = PB('Clear log window', self)
self.handler = h = QtHandler(self.update_status)
# Remember to use qThreadName rather than threadName in the format string.
fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
formatter = logging.Formatter(fs)
h.setFormatter(formatter)
logger.addHandler(h)
# Set up to terminate the QThread when we exit
app.aboutToQuit.connect(self.force_quit)
# Lay out all the widgets
layout = QtWidgets.QVBoxLayout(self)
layout.addWidget(te)
layout.addWidget(self.work_button)
layout.addWidget(self.log_button)
layout.addWidget(self.clear_button)
self.setFixedSize(900, 400)
# Connect the non-worker slots and signals
self.log_button.clicked.connect(self.manual_update)
self.clear_button.clicked.connect(self.clear_display)
# Start a new worker thread and connect the slots for the worker
self.start_thread()
self.work_button.clicked.connect(self.worker.start)
# Once started, the button should be disabled
self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))
def start_thread(self):
self.worker = Worker()
self.worker_thread = QtCore.QThread()
self.worker.setObjectName('Worker')
self.worker_thread.setObjectName('WorkerThread') # for qThreadName
self.worker.moveToThread(self.worker_thread)
# This will start an event loop in the worker thread
self.worker_thread.start()
def kill_thread(self):
# Just tell the worker to stop, then tell it to quit and wait for that
# to happen
self.worker_thread.requestInterruption()
if self.worker_thread.isRunning():
self.worker_thread.quit()
self.worker_thread.wait()
else:
print('worker has already exited.')
def force_quit(self):
# For use when the window is closed
if self.worker_thread.isRunning():
self.kill_thread()
# The functions below update the UI and run in the main thread because
# that's where the slots are set up
@Slot(str, logging.LogRecord)
def update_status(self, status, record):
color = self.COLORS.get(record.levelno, 'black')
s = '<pre><font color="%s">%s</font></pre>' % (color, status)
self.textedit.appendHtml(s)
@Slot()
def manual_update(self):
# This function uses the formatted message passed in, but also uses
# information from the record to format the message in an appropriate
# color according to its severity (level).
level = random.choice(LEVELS)
extra = {'qThreadName': ctname() }
logger.log(level, 'Manually logged!', extra=extra)
@Slot()
def clear_display(self):
self.textedit.clear()
def main():
QtCore.QThread.currentThread().setObjectName('MainThread')
logging.getLogger().setLevel(logging.DEBUG)
app = QtWidgets.QApplication(sys.argv)
example = Window(app)
example.show()
sys.exit(app.exec_())
if __name__=='__main__':
main()
理應避免的用法?
前幾節雖介紹了幾種方案,描述了可能需要處理的操作,但值得一提的是,有些用法是 沒(méi)有好處 的,大多數情況下應該避免使用。下面幾節的順序不分先后。
多次打開(kāi)同一個(gè)日志文件?
因會(huì )導致 "文件被其他進(jìn)程占用 "錯誤,所以在 Windows 中一般無(wú)法多次打開(kāi)同一個(gè)文件。但在 POSIX 平臺中,多次打開(kāi)同一個(gè)文件不會(huì )報任何錯誤。這種操作可能是意外發(fā)生的,比如:
多次添加指向同一文件的 handler(比如通過(guò)復制/粘貼,或忘記修改)。
打開(kāi)兩個(gè)貌似不同(文件名不一樣)的文件,但一個(gè)是另一個(gè)的符號鏈接,所以其實(shí)是同一個(gè)文件。
進(jìn)程 fork,然后父進(jìn)程和子進(jìn)程都有對同一文件的引用。 例如,這可能是通過(guò)使用
multiprocessing
模塊實(shí)現的。
在大多數情況下,多次打開(kāi)同一個(gè)文件 貌似 一切正常,但實(shí)際會(huì )導致很多問(wèn)題。
由于多個(gè)線(xiàn)程或進(jìn)程會(huì )嘗試寫(xiě)入同一個(gè)文件,日志輸出可能會(huì )出現亂碼。盡管日志對象可以防止多個(gè)線(xiàn)程同時(shí)使用同一個(gè) handler 實(shí)例,但如果兩個(gè)不同的線(xiàn)程使用不同的 handler 實(shí)例同時(shí)寫(xiě)入文件,而這兩個(gè) handler 又恰好指向同一個(gè)文件,那么就失去了這種防護。
An attempt to delete a file (e.g. during file rotation) silently fails, because there is another reference pointing to it. This can lead to confusion and wasted debugging time - log entries end up in unexpected places, or are lost altogether. Or a file that was supposed to be moved remains in place, and grows in size unexpectedly despite size-based rotation being supposedly in place.
請用 從多個(gè)進(jìn)程記錄至單個(gè)文件 中介紹的技術(shù)來(lái)避免上述問(wèn)題。
將日志對象用作屬性或傳遞參數?
雖然特殊情況下可能有必要如此,但一般來(lái)說(shuō)沒(méi)有意義,因為日志是單實(shí)例對象。代碼總是可以通過(guò) logging.getLogger(name)
用名稱(chēng)訪(fǎng)問(wèn)一個(gè)已有的日志對象實(shí)例,因此將實(shí)例作為參數來(lái)傳遞,或作為屬性留存,都是毫無(wú)意義的。請注意,在其他語(yǔ)言中,如 Java 和 C#,日志對象通常是靜態(tài)類(lèi)屬性。但在 Python 中是沒(méi)有意義的,因為軟件拆分的單位是模塊(而不是類(lèi))。
給日志庫代碼添加 NullHandler
之外的其他 handler?
通過(guò)添加 handler、formatter 和 filter 來(lái)配置日志,這是應用程序開(kāi)發(fā)人員的責任,而不是庫開(kāi)發(fā)人員該做的。如果正在維護一個(gè)庫,請確保不要向任何日志對象添加 NullHandler
實(shí)例以外的 handler。
創(chuàng )建大量的日志對象?
日志是單實(shí)例對象,在代碼執行過(guò)程中不會(huì )被釋放,因此創(chuàng )建大量的日志對象會(huì )占用很多內存,而這些內存又不能被釋放。與其為每個(gè)文件或網(wǎng)絡(luò )連接創(chuàng )建一個(gè)日志,還不如利用 已有機制 將上下文信息傳給自定義日志對象,并將創(chuàng )建的日志對象限制在應用程序內的指定區域(通常是模塊,但偶爾會(huì )再精細些)使用。