맨땅에 헤딩하는 사람

Python multiprocessing에서 logging 사용하기 (QueueHandler) 본문

파이썬/이론

Python multiprocessing에서 logging 사용하기 (QueueHandler)

purplechip 2020. 8. 30. 23:42

logging은 파이썬에서 자체적으로 제공하는 로그 패키지다. 여러가지 기능을 제공하므로 굳이 다른 패키지를 찾지 않더라도 유용하게 사용이 가능하지만 멀티프로세스 환경을 자체적으로 보장하지 못하는 문제를 가지고 있다. (멀티 쓰레드에서는 안전하게 동작한다.) logging Cookbook에서는 이러한 점을 언급하며 예제 코드를 제공하고 있으며 이를 참고하여 내가 사용할 클래스를 구현하였다.

 

QueueHandler

logging 패키지는 QueueHandler를 제공한다. QueueHandler란 말 그대로 Queue에 logging 할 수 있게 구현된 Handler다. 이 자체는 FileHandler, StreamHandler와 다르게 log 기록 시 파일이나 표준 출력에 기록되지 않고 Queue에 출력이 저장된다. 그 후 위 Handler에서 Queue에 저장된 log를 꺼내 기록하는 과정으로 logging을 할 수 있다. 만약 multiprocess라면 여러 프로세스가 다중 생산자로 Queue에 log 내역을 저장하고, 하나의 쓰레드 혹은 프로세스가 Queue에서 log를 get() 해서 기록하는 역할을 수행하는 것이다. 아래 그림은 구현된 Class를 도식화한 것이다.

[그림 1. QueueHandler logging 사용 구조]

 

구현 코드 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import logging
import logging.handlers
import multiprocessing
from threading import Thread
from random import choice, random
import time
import platform
'''
Class Log performs logger configuration, creation, multiprocess listener.
'''
class Log():
    def __init__(self):
        self.th = None
 
    def get_logger(self, name):
        return logging.getLogger(name)
 
    def listener_start(self, file_path, name, queue):
        self.th = Thread(target=self._proc_log_queue, args=(file_path, name, queue))
        self.th.start()
 
    def listener_end(self, queue):
        queue.put(None)
        self.th.join()
        print('log listener end...')
 
    def _proc_log_queue(self, file_path, name, queue):
        self.config_log(file_path, name)
        logger = self.get_logger(name)
        while True:
            try:
                record = queue.get()
                if record is None:
                    break
                logger.handle(record)
            except Exception:
                import sys, traceback
                print('listener problem'file=sys.stderr)
                traceback.print_exc(file=sys.stderr)
 
    def config_queue_log(self, queue, name):
        '''
        if you use multiprocess logging,
        call this in multiprocess as logging producer.
        logging consumer function is [self.listener_start] and [self.listener_end]
        it returns logger and you can use this logger to log
        '''
        qh = logging.handlers.QueueHandler(queue)
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)
        logger.addHandler(qh)
        return logger
 
    def config_log(self, file_path, name):
        '''
        it returns FileHandler and StreamHandler logger
        if you do not need to use multiprocess logging,
        just call this function and use returned logger.
        '''
        # err file handler
        fh_err = logging.handlers.RotatingFileHandler(file_path + '_error.log''a'30010)
        fh_err.setLevel(logging.WARNING)
        # file handler
        fh_dbg = logging.handlers.RotatingFileHandler(file_path + '_debug.log''a'30010)
        fh_dbg.setLevel(logging.DEBUG)
        # console handler
        sh = logging.StreamHandler()
        sh.setLevel(logging.INFO)
        # logging format setting
        ff = logging.Formatter('''[%(asctime)s] %(levelname)s : %(message)s''')
        sf = logging.Formatter('''[%(levelname)s] %(message)s''')
        fh_err.setFormatter(ff)
        fh_dbg.setFormatter(ff)
        sh.setFormatter(sf)
        if platform.system() == 'Windows':
            import msvcrt
            import win32api
            import win32con
            win32api.SetHandleInformation(msvcrt.get_osfhandle(fh_dbg.stream.fileno()),
                                          win32con.HANDLE_FLAG_INHERIT, 0)
            win32api.SetHandleInformation(msvcrt.get_osfhandle(fh_err.stream.fileno()),
                                          win32con.HANDLE_FLAG_INHERIT, 0)
        # create logger, assign handler
        logger = logging.getLogger(name)
        logger.setLevel(logging.DEBUG)
        logger.addHandler(fh_err)
        logger.addHandler(fh_dbg)
        logger.addHandler(sh)
        return logger
 
'''
The code below tests the multiprocess logging.
Main process and child process produce log messasge. (put message into queue.) 
(random choice in variable LEVEL, MESSAGES)
Listener process produced by main process consume log message. (write log message in stdout and file)
'''
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]
MESSAGES = ['Random message #1'
            'Random message #2',
            'Random message #3',
           ]
 
 
def worker(queue):
    # multi process log producer start
    logger = Log().config_queue_log(queue, 'mp')
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, f"{name} - {message}")
    print('Worker finished: %s' % name)
    # multi process log producer end
 
 
def main():
    queue = multiprocessing.Queue(-1)
    listener = Log()
    listener.listener_start('test''listener', queue)  # log consumer thread start
    
    workers = []
    for i in range(10):  # multiprocess loop
        w = multiprocessing.Process(target=worker, args=(queue,))
        workers.append(w)
        w.start()
    # main process log producer start
    logger = Log().config_queue_log(queue, 'mp')
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, f"{name} - {message}")
    print('Worker finished: %s' % name)
    # main process log producer end
    for w in workers:
        w.join()
    listener.listener_end(queue)  # log consumer thread end
 
 
if __name__ == '__main__':
    main()
 
cs
  • line 18 - 39 : listener_start() 함수는 _proc_log_queue()Thread로 실행시킨다. _proc_log_queueQueue에서 log data를 get()하고 logger.handle()을 통해 기록하는 동작을 계속 반복한다. Queue.get() 함수는 timeout parameter가 없을 경우 Queue에 데이터가 없으면 블록된다(waiting 상태).  get()한 데이터가 None인 경우 종료된다. 종료는 QueueNone 데이터를 삽입하는 listener_end() 함수를 통해 이루어진다.
  • line 41 - 52 : QueueHandler를 할당받은 logger를 선언하고 반환한다.
  • line 54 - 90 : FileHandlerStreamHandler를 할당받은 logger를 선언하고 반환한다. 만약 multiprocessing을 사용하지 않는다면 이 logger만 사용하여 log를 수행할 수 있다.
  • line 75 - 82 : 위 코드에서는 config_log()_proc_log_queue()에서만 사용된다. 하지만 단일 프로세스를 사용하면서 config_log()를 사용하다가 multiprocessing을 사용하며 listener_start()를 호출해 _proc_log_queue()가 호출된다면 단일 프로세스와 listener_start()를 사용한 쓰레드(코드에서는 Thread()지만 Process()로 사용할 수도 있다.)가 동일한 log file을 참조하게 된다. 이에 따라 Handler가 log file의 이름을 자동적으로 바꿔줄 때 윈도우 운영체제를 사용할 경우 에러가 발생하는데 이에 대한 참조를 허용하는 코드이다. 
  • line 91 - : logging level과 message를 무작위로 선택하여 로그를 기록하는 코드다. 10개의 child process와 main process에서 QueueHandlerQueue에 log를 기록하고 listener.start()를 통해 Queue에서 log 내역을 받아 file 및 stdout 출력을 하는 Thread를 생성한다.

logging 패키지를 사용하며 굳이 logger의 이름을 정의할 필요가 있을까란 의문을 잠깐 했었다. logging 패키지는 한 번 logger의 name을 할당하여 생성되면 메모리 공간에 남아 프로세스 내에서 언제든 호출할 경우 같은 logger를 획득하게 된다. 따라서 실제로 logging 패키지를 사용하면서 다양한 Handler나 다양한 조건에 따른 log 출력을 원할 경우 각기 다른 logger 설정이 필요하므로 name을 통해 logger를 쉽게 할당받는 방법은 아주 효율적이라 생각한다.

 

참고

logging Cookbook

https://docs.python.org/ko/3/howto/logging-cookbook.html

Comments