/
/
/
1"""
2Logging utilities.
3
4A lot in this file has been copied from Home Assistant:
5https://github.com/home-assistant/core/blob/e5ccd85e7e26c167d0b73669a88bc3a7614dd456/homeassistant/util/logging.py#L78
6
7All rights reserved.
8"""
9
10from __future__ import annotations
11
12import inspect
13import logging
14import logging.handlers
15import queue
16import traceback
17from collections.abc import Callable, Coroutine
18from functools import partial, wraps
19from typing import Any, cast, overload
20
21
class LoggingQueueHandler(logging.handlers.QueueHandler):
    """Queue handler whose records are processed by a listener in another thread."""

    # Listener attached to this handler; close() stops it and resets this to None.
    listener: logging.handlers.QueueListener | None = None

    def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
        """Return the base-class prepared record with its stack info cleared.

        Clearing ``stack_info`` is a workaround for
        https://bugs.python.org/issue46755
        """
        prepared = super().prepare(record)
        prepared.stack_info = None
        return prepared

    def handle(self, record: logging.LogRecord) -> Any:
        """Apply the handler's filters and emit the record if they accept it.

        Accepted records are pushed onto the backing queue via ``emit``.
        No handler lock is taken around the emit here, unlike the default
        ``Handler.handle``; it is not needed because SimpleQueue is already
        thread safe.

        See https://bugs.python.org/issue24645

        Returns whatever the filter step returned.
        """
        accepted = self.filter(record)
        if accepted:
            self.emit(record)
        return accepted

    def close(self) -> None:
        """Close the handler and shut down the attached QueueListener, if any."""
        super().close()
        if self.listener:
            self.listener.stop()
            self.listener = None
63
64
def activate_log_queue_handler() -> None:
    """Migrate the root logger's existing handlers behind a queue.

    A LoggingQueueHandler is installed on the root logger. Every other
    handler currently attached to the root logger is detached and handed to
    a QueueListener, which is then started. This avoids blocking I/O and
    message formatting in the event loop, because log messages are written
    in another thread.
    """
    # The queue transports log records from the queue handler to the listener.
    record_queue: queue.SimpleQueue[logging.LogRecord] = queue.SimpleQueue()
    queue_handler = LoggingQueueHandler(record_queue)
    # The queue handler is attached first, then the other handlers are moved.
    logging.root.addHandler(queue_handler)

    # Snapshot everything except the queue handler before mutating the
    # root logger's handler list.
    migrated_handlers: list[logging.Handler] = [
        handler
        for handler in logging.root.handlers
        if handler is not queue_handler
    ]
    for handler in migrated_handlers:
        logging.root.removeHandler(handler)

    listener = logging.handlers.QueueListener(record_queue, *migrated_handlers)
    # Stored on the handler so that closing the handler also stops the listener.
    queue_handler.listener = listener

    listener.start()
86
87
def log_exception(format_err: Callable[..., Any], *args: Any) -> None:
    """Log the exception being handled, prefixed with a friendly message.

    ``format_err`` is called with ``*args`` to build the friendly message.
    The formatted traceback follows it on a new line, and the result is
    logged at error level on the logger named after the caller's module.
    """
    caller_frame = inspect.stack(context=0)[1].frame
    caller_module = inspect.getmodule(caller_frame)
    # If Python cannot access the source files, the call stack frame will be
    # missing information, so fall back to this module's name.
    # https://github.com/home-assistant/core/issues/24982
    logger_name = __name__ if caller_module is None else caller_module.__name__

    # Keep the wrapper out of the printed traceback
    depth = len(inspect.trace()) - 1
    trace_text = traceback.format_exc(-depth)
    friendly_text = format_err(*args)
    target_logger = logging.getLogger(logger_name)
    target_logger.error("%s\n%s", friendly_text, trace_text)
104
105
@overload
def catch_log_exception(
    func: Callable[..., Coroutine[Any, Any, Any]], format_err: Callable[..., Any]
) -> Callable[..., Coroutine[Any, Any, None]]: ...


@overload
def catch_log_exception(
    func: Callable[..., Any], format_err: Callable[..., Any]
) -> Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]: ...


def catch_log_exception(
    func: Callable[..., Any], format_err: Callable[..., Any]
) -> Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]:
    """Wrap ``func`` so that exceptions it raises are logged instead of propagated.

    A coroutine function (also when wrapped in ``functools.partial``) yields
    a coroutine function; any other callable yields a plain callback. The
    wrapper accepts positional arguments only, discards the return value of
    ``func``, and on ``Exception`` calls ``log_exception`` with ``format_err``
    and the same positional arguments.
    """
    # Peel off partial layers so the coroutine check sees the real callable
    innermost = func
    while isinstance(innermost, partial):
        innermost = innermost.func

    if not inspect.iscoroutinefunction(innermost):

        @wraps(func)
        def wrapper(*args: Any) -> None:
            """Catch and log exception."""
            try:
                func(*args)
            except Exception:
                log_exception(format_err, *args)

        return wrapper

    async_func = cast("Callable[..., Coroutine[Any, Any, None]]", func)

    @wraps(async_func)
    async def async_wrapper(*args: Any) -> None:
        """Catch and log exception."""
        try:
            await async_func(*args)
        except Exception:
            log_exception(format_err, *args)

    return async_wrapper
157