일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Windows
- multiprocessing
- 퀀트
- DataFrame
- pycharm
- 의사 클래스
- 구현
- 하이라이트
- idxmax
- OpenAPI+
- HTML
- pywinauto
- 필기
- 금결원
- 웹크롤링
- Python
- line number
- 코드블럭
- Tistory
- highlight.js
- 금융결제원
- 멀티 로그인
- QueueHandler
- 티스토리
- idxmin
- freeze_support
- CSS
- 진행 상황
- 파이썬
- 우리FIS
- Today
- Total
맨땅에 헤딩하는 사람
Python multiprocessing에서 logging 사용하기 (QueueHandler) 본문
logging
은 파이썬에서 자체적으로 제공하는 로그 패키지다. 여러가지 기능을 제공하므로 굳이 다른 패키지를 찾지 않더라도 유용하게 사용이 가능하지만 멀티프로세스 환경을 자체적으로 보장하지 못하는 문제를 가지고 있다. (멀티 쓰레드에서는 안전하게 동작한다.) logging Cookbook에서는 이러한 점을 언급하며 예제 코드를 제공하고 있으며 이를 참고하여 내가 사용할 클래스를 구현하였다.
QueueHandler
logging
패키지는 QueueHandler
를 제공한다. QueueHandler
란 말 그대로 Queue
에 logging 할 수 있게 구현된 Handler
다. 이 자체는 FileHandler
, StreamHandler
와 다르게 log 기록 시 파일이나 표준 출력에 기록되지 않고 Queue
에 출력이 저장된다. 그 후 위 Handler
에서 Queue
에 저장된 log를 꺼내 기록하는 과정으로 logging을 할 수 있다. 만약 multiprocess라면 여러 프로세스가 다중 생산자로 Queue
에 log 내역을 저장하고, 하나의 쓰레드 혹은 프로세스가 Queue
에서 log를 get()
해서 기록하는 역할을 수행하는 것이다. 아래 그림은 구현된 Class를 도식화한 것이다.
구현 코드
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
import logging
import logging.handlers
import multiprocessing
from threading import Thread
from random import choice, random
import time
import platform
'''
Class Log performs logger configuration, creation, multiprocess listener.
'''
class Log():
def __init__(self):
self.th = None
def get_logger(self, name):
return logging.getLogger(name)
def listener_start(self, file_path, name, queue):
self.th = Thread(target=self._proc_log_queue, args=(file_path, name, queue))
self.th.start()
def listener_end(self, queue):
queue.put(None)
self.th.join()
print('log listener end...')
def _proc_log_queue(self, file_path, name, queue):
self.config_log(file_path, name)
logger = self.get_logger(name)
while True:
try:
record = queue.get()
if record is None:
break
logger.handle(record)
except Exception:
import sys, traceback
print('listener problem', file=sys.stderr)
traceback.print_exc(file=sys.stderr)
def config_queue_log(self, queue, name):
'''
if you use multiprocess logging,
call this in multiprocess as logging producer.
logging consumer function is [self.listener_start] and [self.listener_end]
it returns logger and you can use this logger to log
'''
qh = logging.handlers.QueueHandler(queue)
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.addHandler(qh)
return logger
def config_log(self, file_path, name):
'''
it returns FileHandler and StreamHandler logger
if you do not need to use multiprocess logging,
just call this function and use returned logger.
'''
# err file handler
fh_err = logging.handlers.RotatingFileHandler(file_path + '_error.log', 'a', 300, 10)
fh_err.setLevel(logging.WARNING)
# file handler
fh_dbg = logging.handlers.RotatingFileHandler(file_path + '_debug.log', 'a', 300, 10)
fh_dbg.setLevel(logging.DEBUG)
# console handler
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)
# logging format setting
ff = logging.Formatter('''[%(asctime)s] %(levelname)s : %(message)s''')
sf = logging.Formatter('''[%(levelname)s] %(message)s''')
fh_err.setFormatter(ff)
fh_dbg.setFormatter(ff)
sh.setFormatter(sf)
if platform.system() == 'Windows':
import msvcrt
import win32api
import win32con
win32api.SetHandleInformation(msvcrt.get_osfhandle(fh_dbg.stream.fileno()),
win32con.HANDLE_FLAG_INHERIT, 0)
win32api.SetHandleInformation(msvcrt.get_osfhandle(fh_err.stream.fileno()),
win32con.HANDLE_FLAG_INHERIT, 0)
# create logger, assign handler
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.addHandler(fh_err)
logger.addHandler(fh_dbg)
logger.addHandler(sh)
return logger
'''
The code below tests the multiprocess logging.
Main process and child process produce log messasge. (put message into queue.)
(random choice in variable LEVEL, MESSAGES)
Listener process produced by main process consume log message. (write log message in stdout and file)
'''
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]
MESSAGES = ['Random message #1',
'Random message #2',
'Random message #3',
]
def worker(queue):
# multi process log producer start
logger = Log().config_queue_log(queue, 'mp')
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, f"{name} - {message}")
print('Worker finished: %s' % name)
# multi process log producer end
def main():
queue = multiprocessing.Queue(-1)
listener = Log()
listener.listener_start('test', 'listener', queue) # log consumer thread start
workers = []
for i in range(10): # multiprocess loop
w = multiprocessing.Process(target=worker, args=(queue,))
workers.append(w)
w.start()
# main process log producer start
logger = Log().config_queue_log(queue, 'mp')
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
for i in range(10):
time.sleep(random())
level = choice(LEVELS)
message = choice(MESSAGES)
logger.log(level, f"{name} - {message}")
print('Worker finished: %s' % name)
# main process log producer end
for w in workers:
w.join()
listener.listener_end(queue) # log consumer thread end
if __name__ == '__main__':
main()
|
cs |
- line 18 - 39 :
listener_start()
함수는_proc_log_queue()
를Thread
로 실행시킨다._proc_log_queue
는Queue
에서 log data를get()
하고logger.handle()
을 통해 기록하는 동작을 계속 반복한다.Queue.get()
함수는timeout
parameter가 없을 경우Queue
에 데이터가 없으면 블록된다(waiting 상태).get()
한 데이터가None
인 경우 종료된다. 종료는Queue
에None
데이터를 삽입하는listener_end()
함수를 통해 이루어진다. - line 41 - 52 :
QueueHandler
를 할당받은logger
를 선언하고 반환한다. - line 54 - 90 :
FileHandler
와StreamHandler
를 할당받은logger
를 선언하고 반환한다. 만약multiprocessing
을 사용하지 않는다면 이logger
만 사용하여 log를 수행할 수 있다. - line 75 - 82 : 위 코드에서는
config_log()
가_proc_log_queue()
에서만 사용된다. 하지만 단일 프로세스를 사용하면서config_log()
를 사용하다가multiprocessing
을 사용하며listener_start()
를 호출해_proc_log_queue()
가 호출된다면 단일 프로세스와listener_start()
를 사용한 쓰레드(코드에서는Thread()
지만Process()
로 사용할 수도 있다.)가 동일한 log file을 참조하게 된다. 이에 따라Handler
가 log file의 이름을 자동적으로 바꿔줄 때 윈도우 운영체제를 사용할 경우 에러가 발생하는데 이에 대한 참조를 허용하는 코드이다. - line 91 - : logging level과 message를 무작위로 선택하여 로그를 기록하는 코드다. 10개의 child process와 main process에서
QueueHandler
로Queue
에 log를 기록하고listener.start()
를 통해Queue
에서 log 내역을 받아 file 및 stdout 출력을 하는 Thread를 생성한다.
logging
패키지를 사용하며 굳이 logger
의 이름을 정의할 필요가 있을까란 의문을 잠깐 했었다. logging
패키지는 한 번 logger
의 name을 할당하여 생성되면 메모리 공간에 남아 프로세스 내에서 언제든 호출할 경우 같은 logger
를 획득하게 된다. 따라서 실제로 logging
패키지를 사용하면서 다양한 Handler
나 다양한 조건에 따른 log 출력을 원할 경우 각기 다른 logger
설정이 필요하므로 name을 통해 logger
를 쉽게 할당받는 방법은 아주 효율적이라 생각한다.
참고
logging Cookbook
'파이썬 > 이론' 카테고리의 다른 글
Python DataFrame 최대 scalar 값의 index, column 구하기 (0) | 2020.08.31 |
---|---|
[Python] pandas DataFrame 최적화 (삽입, 생성, 반복, 문자열) (0) | 2020.08.25 |
[Python] tqdm을 사용하여 for 반복, 함수 진행률 출력하기 (0) | 2020.08.23 |
[Python] Windows GUI 자동화 pywinauto 사용법 (4) | 2020.08.22 |
Windows 환경 python multiprocessing 시 freeze_support() (0) | 2020.08.09 |