/
/
/
1"""
2Logging utilities.
3
4A lot in this file has been copied from Home Assistant:
5https://github.com/home-assistant/core/blob/e5ccd85e7e26c167d0b73669a88bc3a7614dd456/homeassistant/util/logging.py#L78
6
7All rights reserved.
8"""
9
10from __future__ import annotations
11
12import asyncio
13import inspect
14import logging
15import logging.handlers
16import queue
17import traceback
18from collections.abc import Callable, Coroutine
19from functools import partial, wraps
20from typing import Any, cast, overload
21
22
23class LoggingQueueHandler(logging.handlers.QueueHandler):
24 """Process the log in another thread."""
25
26 listener: logging.handlers.QueueListener | None = None
27
28 def prepare(self, record: logging.LogRecord) -> logging.LogRecord:
29 """Prepare a record for queuing.
30
31 This is added as a workaround for https://bugs.python.org/issue46755
32 """
33 record = super().prepare(record)
34 record.stack_info = None
35 return record
36
37 def handle(self, record: logging.LogRecord) -> Any:
38 """Conditionally emit the specified logging record.
39
40 Depending on which filters have been added to the handler, push the new
41 records onto the backing Queue.
42
43 The default python logger Handler acquires a lock
44 in the parent class which we do not need as
45 SimpleQueue is already thread safe.
46
47 See https://bugs.python.org/issue24645
48 """
49 return_value = self.filter(record)
50 if return_value:
51 self.emit(record)
52 return return_value
53
54 def close(self) -> None:
55 """Tidy up any resources used by the handler.
56
57 This adds shutdown of the QueueListener
58 """
59 super().close()
60 if not self.listener:
61 return
62 self.listener.stop()
63 self.listener = None
64
65
66def activate_log_queue_handler() -> None:
67 """Migrate the existing log handlers to use the queue.
68
69 This allows us to avoid blocking I/O and formatting messages
70 in the event loop as log messages are written in another thread.
71 """
72 simple_queue: queue.SimpleQueue[logging.Handler] = queue.SimpleQueue()
73 queue_handler = LoggingQueueHandler(simple_queue)
74 logging.root.addHandler(queue_handler)
75
76 migrated_handlers: list[logging.Handler] = []
77 for handler in logging.root.handlers[:]:
78 if handler is queue_handler:
79 continue
80 logging.root.removeHandler(handler)
81 migrated_handlers.append(handler)
82
83 listener = logging.handlers.QueueListener(simple_queue, *migrated_handlers)
84 queue_handler.listener = listener
85
86 listener.start()
87
88
89def log_exception(format_err: Callable[..., Any], *args: Any) -> None:
90 """Log an exception with additional context."""
91 module = inspect.getmodule(inspect.stack(context=0)[1].frame)
92 if module is not None: # noqa: SIM108
93 module_name = module.__name__
94 else:
95 # If Python is unable to access the sources files, the call stack frame
96 # will be missing information, so let's guard.
97 # https://github.com/home-assistant/core/issues/24982
98 module_name = __name__
99
100 # Do not print the wrapper in the traceback
101 frames = len(inspect.trace()) - 1
102 exc_msg = traceback.format_exc(-frames)
103 friendly_msg = format_err(*args)
104 logging.getLogger(module_name).error("%s\n%s", friendly_msg, exc_msg)
105
106
107@overload
108def catch_log_exception(
109 func: Callable[..., Coroutine[Any, Any, Any]], format_err: Callable[..., Any]
110) -> Callable[..., Coroutine[Any, Any, None]]: ...
111
112
113@overload
114def catch_log_exception(
115 func: Callable[..., Any], format_err: Callable[..., Any]
116) -> Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]: ...
117
118
119def catch_log_exception(
120 func: Callable[..., Any], format_err: Callable[..., Any]
121) -> Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]:
122 """Decorate a function func to catch and log exceptions.
123
124 If func is a coroutine function, a coroutine function will be returned.
125 If func is a callback, a callback will be returned.
126 """
127 # Check for partials to properly determine if coroutine function
128 check_func = func
129 while isinstance(check_func, partial):
130 check_func = check_func.func
131
132 wrapper_func: Callable[..., None] | Callable[..., Coroutine[Any, Any, None]]
133 if asyncio.iscoroutinefunction(check_func):
134 async_func = cast("Callable[..., Coroutine[Any, Any, None]]", func)
135
136 @wraps(async_func)
137 async def async_wrapper(*args: Any) -> None:
138 """Catch and log exception."""
139 try:
140 await async_func(*args)
141 except Exception:
142 log_exception(format_err, *args)
143
144 wrapper_func = async_wrapper
145
146 else:
147
148 @wraps(func)
149 def wrapper(*args: Any) -> None:
150 """Catch and log exception."""
151 try:
152 func(*args)
153 except Exception:
154 log_exception(format_err, *args)
155
156 wrapper_func = wrapper
157 return wrapper_func
158