/
/
/
Ansible role that deploys services on my runner machine
1"""Frigate Manager - Profile switching and snapshot capture service.
2
3Listens on MQTT for profile switch commands (home/away) and captures
4periodic snapshots from enabled cameras. Profile switching replaces
5the active Frigate config and restarts Frigate to load/unload the
6GPU detection model.
7"""
8
9import json
10import logging
11import os
12import shutil
13import threading
14import time
15from datetime import datetime
16from pathlib import Path
17
18import paho.mqtt.client as mqtt
19import requests
20
21# ---------------------------------------------------------------------------
22# Configuration from environment
23# ---------------------------------------------------------------------------
# Endpoints and credentials (all overridable through the environment).
FRIGATE_URL = os.getenv("FRIGATE_URL", "http://frigate:5000")
MQTT_HOST = os.getenv("MQTT_HOST", "localhost")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_USER = os.getenv("MQTT_USER", "")
MQTT_PASS = os.getenv("MQTT_PASS", "")

# Snapshot capture: period in seconds and a comma-separated camera list.
# Blank entries (e.g. from a trailing comma) are dropped.
SNAPSHOT_INTERVAL = int(os.getenv("SNAPSHOT_INTERVAL", "1800"))
SNAPSHOT_CAMERAS = [
    name
    for name in map(str.strip, os.getenv("SNAPSHOT_CAMERAS", "").split(","))
    if name
]

# Maximum number of seconds to wait for Frigate after a restart.
SWITCH_TIMEOUT = int(os.getenv("SWITCH_TIMEOUT", "120"))
CONFIG_DIR = Path(os.getenv("CONFIG_DIR", "/config"))
SNAPSHOT_DIR = Path(os.getenv("SNAPSHOT_DIR", "/snapshots"))
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

# MQTT topics
TOPIC_PROFILE_SET = "frigate/profile/set"
TOPIC_PROFILE_STATUS = "frigate/profile/status"

# Files inside CONFIG_DIR: the persisted profile name and the config that
# gets overwritten on a profile switch.
MARKER_FILE = CONFIG_DIR / ".current_profile"
ACTIVE_CONFIG = CONFIG_DIR / "config.yml"

# Unknown level names fall back to INFO.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=getattr(logging, LOG_LEVEL, logging.INFO),
)
log = logging.getLogger("frigate-manager")
52
53
54# ---------------------------------------------------------------------------
55# Profile Switcher
56# ---------------------------------------------------------------------------
class ProfileSwitcher:
    """Switch Frigate between profile configs and report progress over MQTT.

    A switch copies ``config.<profile>.yml`` over the active config, asks
    Frigate to restart through its HTTP API, polls the API until it answers
    again, then persists the profile name to the marker file. Every step is
    announced on ``TOPIC_PROFILE_STATUS`` as a retained JSON message.
    """

    def __init__(self, mqtt_client: mqtt.Client):
        # Held for the whole duration of a switch; acquired non-blocking so a
        # concurrent request is rejected instead of queued.
        self._lock = threading.Lock()
        self._mqtt = mqtt_client
        # True while a switch is running; SnapshotScheduler reads this flag.
        self.is_switching = False
        self.current_profile = self._detect_current_profile()

    def _detect_current_profile(self) -> str:
        """Return the persisted profile name, or a default when none is stored.

        The default is the first of ``away``/``home`` whose config file
        exists, and ``away`` when neither does. An unreadable marker file is
        treated like a missing one instead of aborting start-up.
        """
        try:
            profile = MARKER_FILE.read_text().strip()
        except FileNotFoundError:
            profile = ""
        except (OSError, UnicodeDecodeError) as exc:
            log.warning("Could not read profile marker %s: %s", MARKER_FILE, exc)
            profile = ""

        if profile:
            log.info("Detected persisted profile: %s", profile)
            return profile

        # Find the default profile from available config files
        for name in ("away", "home"):
            if (CONFIG_DIR / f"config.{name}.yml").exists():
                log.info("No marker found, defaulting to profile: %s", name)
                return name
        return "away"

    def _publish_status(self, profile: str, state: str, progress: str = ""):
        """Publish a retained status message; ``progress`` is omitted if empty."""
        payload = {"profile": profile, "state": state}
        if progress:
            payload["progress"] = progress
        self._mqtt.publish(
            TOPIC_PROFILE_STATUS, json.dumps(payload), qos=1, retain=True
        )

    def switch(self, target_profile: str):
        """Switch Frigate to ``target_profile``.

        Returns immediately (with a warning) if another switch is already
        running. Failures are logged and published as an ``error`` status;
        nothing is raised to the caller for the handled failure modes.
        """
        if not self._lock.acquire(blocking=False):
            log.warning("Switch already in progress, ignoring request for %s", target_profile)
            return

        try:
            self.is_switching = True
            self._run_switch(target_profile)
        finally:
            self.is_switching = False
            self._lock.release()

    def _run_switch(self, target_profile: str):
        """Perform the switch steps; the caller must hold ``self._lock``."""
        source = CONFIG_DIR / f"config.{target_profile}.yml"

        if not source.exists():
            log.error("Profile config not found: %s", source)
            self._publish_status(target_profile, "error", "config_not_found")
            return

        if target_profile == self.current_profile:
            log.info("Already on profile %s, no switch needed", target_profile)
            self._publish_status(target_profile, "active")
            return

        log.info("Switching from %s to %s", self.current_profile, target_profile)
        self._publish_status(target_profile, "switching", "copying_config")

        # Copy profile config to active config
        try:
            shutil.copy2(str(source), str(ACTIVE_CONFIG))
        except OSError as exc:
            log.error("Failed to copy config: %s", exc)
            self._publish_status(target_profile, "error", "copy_failed")
            return

        # NOTE: from here on the active config is already the target's. The
        # failure paths below do not restore it and leave current_profile
        # unchanged, so a repeated request for the same target retries the
        # restart.

        # Restart Frigate via API
        self._publish_status(target_profile, "switching", "restarting")
        try:
            resp = requests.post(f"{FRIGATE_URL}/api/restart", timeout=30)
            resp.raise_for_status()
        except requests.RequestException as exc:
            log.error("Failed to restart Frigate: %s", exc)
            self._publish_status(target_profile, "error", "restart_failed")
            return

        # Wait for Frigate to come back healthy
        self._publish_status(target_profile, "switching", "waiting_healthy")
        if not self._wait_until_healthy():
            log.error(
                "Frigate did not become healthy within %ds after profile switch",
                SWITCH_TIMEOUT,
            )
            self._publish_status(target_profile, "error", "health_timeout")
            return

        # Persist and announce success. Frigate is already running the new
        # profile, so a marker write failure must not stop the "active"
        # announcement; it only costs persistence across manager restarts.
        self.current_profile = target_profile
        try:
            MARKER_FILE.write_text(target_profile)
        except OSError as exc:
            log.warning("Failed to persist profile marker %s: %s", MARKER_FILE, exc)
        self._publish_status(target_profile, "active")
        log.info("Profile switch to %s completed successfully", target_profile)

    def _wait_until_healthy(self) -> bool:
        """Poll ``/api/config`` until it returns 200 or SWITCH_TIMEOUT expires."""
        deadline = time.monotonic() + SWITCH_TIMEOUT
        # Give Frigate a moment to begin restarting
        time.sleep(5)
        while time.monotonic() < deadline:
            try:
                status = requests.get(f"{FRIGATE_URL}/api/config", timeout=5).status_code
            except requests.RequestException as exc:
                # Expected while Frigate is down; keep polling.
                log.debug("Frigate not reachable yet: %s", exc)
            else:
                if status == 200:
                    return True
            time.sleep(3)
        return False
159
160
161# ---------------------------------------------------------------------------
162# Snapshot Scheduler
163# ---------------------------------------------------------------------------
class SnapshotScheduler:
    """Periodically save the latest frame of each enabled snapshot camera.

    A daemon ``threading.Timer`` fires every ``SNAPSHOT_INTERVAL`` seconds and
    is re-armed after each cycle. Cameras whose last reported
    ``frigate/<cam>/enabled/state`` is not ``ON`` are skipped; cameras with no
    reported state are treated as enabled.
    """

    def __init__(self, switcher: ProfileSwitcher, mqtt_client: mqtt.Client):
        self._switcher = switcher
        self._mqtt = mqtt_client
        self._timer: threading.Timer | None = None
        # Last payload seen per camera on its enabled/state topic.
        self._camera_states: dict[str, str] = {}
        self._stop = threading.Event()

        # Subscribe to camera enabled states
        # NOTE(review): these subscriptions are issued only once, here. They
        # are presumably not renewed after a broker reconnect with a clean
        # session - confirm, and consider subscribing from on_connect.
        for cam in SNAPSHOT_CAMERAS:
            topic = f"frigate/{cam}/enabled/state"
            mqtt_client.subscribe(topic, qos=1)
            mqtt_client.message_callback_add(topic, self._on_camera_state)

    def _on_camera_state(self, _client, _userdata, msg):
        """Record the enabled state published for one camera."""
        cam = msg.topic.split("/")[1]
        # Lenient decode: an invalid byte sequence must not raise inside the
        # MQTT callback. A garbled state simply won't equal "ON".
        self._camera_states[cam] = msg.payload.decode(errors="replace").strip()

    def start(self):
        """Arm the first timer; the first capture happens one interval later."""
        self._stop.clear()
        self._schedule_next()

    def stop(self):
        """Prevent further scheduling and cancel the pending timer, if any."""
        self._stop.set()
        if self._timer:
            self._timer.cancel()

    def _schedule_next(self):
        """Arm a new daemon timer unless ``stop()`` has been called."""
        if self._stop.is_set():
            return
        self._timer = threading.Timer(SNAPSHOT_INTERVAL, self._capture_all)
        self._timer.daemon = True
        self._timer.start()

    def _capture_all(self):
        """Run one capture cycle and always re-arm the timer afterwards."""
        try:
            if self._switcher.is_switching:
                log.info("Profile switch in progress, skipping snapshot cycle")
                return

            for cam in SNAPSHOT_CAMERAS:
                state = self._camera_states.get(cam, "ON")
                if state != "ON":
                    log.debug("Camera %s not enabled (state=%s), skipping", cam, state)
                    continue
                self._capture_snapshot(cam)
        finally:
            # Re-arm even if this cycle raised; otherwise a single unexpected
            # error would silently end all future snapshots.
            self._schedule_next()

    def _capture_snapshot(self, cam: str):
        """Fetch ``latest.jpg`` for ``cam`` and store it under SNAPSHOT_DIR.

        Files are written to ``<SNAPSHOT_DIR>/<cam>/<YYYY-MM-DD>/``. HTTP and
        filesystem errors are logged as warnings and never raised.
        """
        # One timestamp for both directory and file name, so they cannot
        # disagree when the capture straddles midnight.
        now = datetime.now()
        out_dir = SNAPSHOT_DIR / cam / now.strftime("%Y-%m-%d")
        out_file = out_dir / f"{cam}_{now.strftime('%Y%m%d_%H%M%S')}.jpg"

        try:
            out_dir.mkdir(parents=True, exist_ok=True)
            resp = requests.get(
                f"{FRIGATE_URL}/api/{cam}/latest.jpg", timeout=15
            )
            resp.raise_for_status()
            out_file.write_bytes(resp.content)
            log.info("Saved snapshot %s", out_file)
        except (requests.RequestException, OSError) as exc:
            # OSError covers mkdir/write failures (disk full, permissions).
            log.warning("Snapshot failed for %s: %s", cam, exc)
229
230
231# ---------------------------------------------------------------------------
232# MQTT Setup
233# ---------------------------------------------------------------------------
def create_mqtt_client(switcher_ref: list) -> mqtt.Client:
    """Build the MQTT client with connect and message callbacks installed.

    Args:
        switcher_ref: One-element list whose slot 0 holds the ProfileSwitcher
            (or None until it has been created). Callbacks read the slot at
            message time, so the switcher may be assigned after this call.

    Returns:
        A configured client that has not been connected yet.
    """
    client = mqtt.Client(
        client_id="frigate-manager",
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )

    if MQTT_USER:
        client.username_pw_set(MQTT_USER, MQTT_PASS)

    def on_connect(client, _userdata, _flags, rc, _properties=None):
        if rc == 0:
            log.info("Connected to MQTT broker %s:%d", MQTT_HOST, MQTT_PORT)
            # Subscribing here renews the subscription on every (re)connect.
            client.subscribe(TOPIC_PROFILE_SET, qos=1)
        else:
            # With callback API v2, rc is a reason-code object rather than a
            # plain int, so format it with %s (works for both) instead of %d.
            log.error("MQTT connection failed with code %s", rc)

    def on_message(_client, _userdata, msg):
        if msg.topic != TOPIC_PROFILE_SET:
            return

        # Lenient decode: the payload is external input and a strict decode
        # could raise inside the network-loop callback. Garbled payloads fall
        # through to the "unknown profile" branch.
        target = msg.payload.decode(errors="replace").strip().lower()
        if target not in ("home", "away"):
            log.warning("Unknown profile requested: %s", target)
            return

        switcher = switcher_ref[0]
        if not switcher:
            log.warning("Profile switcher not ready, ignoring request for %s", target)
            return

        # Run the switch off the MQTT thread so the callback returns at once.
        threading.Thread(
            target=switcher.switch,
            args=(target,),
            name=f"switch-{target}",
            daemon=True,
        ).start()

    client.on_connect = on_connect
    client.on_message = on_message
    client.reconnect_delay_set(min_delay=1, max_delay=60)

    return client
270
271
272# ---------------------------------------------------------------------------
273# Main
274# ---------------------------------------------------------------------------
def main():
    """Start the MQTT client, profile switcher and snapshot scheduler.

    Blocks until interrupted with Ctrl-C, then stops the scheduler and shuts
    the MQTT client down.
    """
    log.info("Frigate Manager starting")
    log.info("Config dir: %s, Snapshot dir: %s", CONFIG_DIR, SNAPSHOT_DIR)
    log.info("Snapshot cameras: %s, interval: %ds", SNAPSHOT_CAMERAS, SNAPSHOT_INTERVAL)

    # One-slot holder: the MQTT callbacks are created before the switcher
    # exists and look it up through this list at message time.
    holder: list = [None]
    broker = create_mqtt_client(holder)
    broker.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
    broker.loop_start()

    # Give the background loop time to connect before anything is published.
    time.sleep(2)

    profile_switcher = ProfileSwitcher(broker)
    holder[0] = profile_switcher

    # Announce the profile we start with as a retained status message.
    initial_status = json.dumps(
        {"profile": profile_switcher.current_profile, "state": "active"}
    )
    broker.publish(TOPIC_PROFILE_STATUS, initial_status, qos=1, retain=True)
    log.info("Initial profile: %s", profile_switcher.current_profile)

    snapshots = SnapshotScheduler(profile_switcher, broker)
    snapshots.start()
    log.info("Snapshot scheduler started (interval=%ds)", SNAPSHOT_INTERVAL)

    # All work happens on the MQTT and timer threads; the main thread idles.
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        log.info("Shutting down")
    finally:
        snapshots.stop()
        broker.loop_stop()
        broker.disconnect()


if __name__ == "__main__":
    main()
319