/
/
/
1#!/usr/bin/env python3
2import os
3import time
4import threading
5import json
6import signal
7import sys
8
9import paho.mqtt.client as mqtt
10import RPi.GPIO as GPIO
11from rpi_ws281x import PixelStrip, Color
12
13
14# =========================
15# CONFIG FROM ENVIRONMENT
16# =========================
17
18MQTT_HOST = os.getenv("MQTT_HOST", "localhost")
19MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
20MQTT_USERNAME = os.getenv("MQTT_USERNAME", "")
21MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", "")
22MQTT_CLIENT_ID = os.getenv("MQTT_CLIENT_ID", "doorbell-controller")
23MQTT_BASE_TOPIC = os.getenv("MQTT_BASE_TOPIC", "doorbell/front_door")
24
25BUTTON_GPIO = int(os.getenv("BUTTON_GPIO", "4"))
26LED_GPIO = int(os.getenv("LED_GPIO", "18"))
27LED_COUNT = int(os.getenv("LED_COUNT", "2"))
28LED_BRIGHTNESS = float(os.getenv("LED_BRIGHTNESS", "0.8"))
29
30TOPIC_BUTTON_STATE = f"{MQTT_BASE_TOPIC}/button/state"
31TOPIC_LED_STATE = f"{MQTT_BASE_TOPIC}/led/state"
32TOPIC_LED_COMMAND = f"{MQTT_BASE_TOPIC}/led/set"
33
34PRESS_DEBOUNCE_MS = 200
35LED_FLASH_SECONDS = 10
36
37
38# =========================
39# LED STRIP (WS281x)
40# =========================
41
42strip = PixelStrip(
43 LED_COUNT,
44 LED_GPIO,
45 800000, # frequency: 800kHz
46 10, # DMA channel
47 False, # invert signal
48 int(255 * LED_BRIGHTNESS),
49 0, # channel
50 0 # strip_type (0 = default GRB)
51)
52strip.begin()
53
54
55def led_off():
56 for i in range(LED_COUNT):
57 strip.setPixelColor(i, Color(0, 0, 0))
58 strip.show()
59
60
61def led_teal():
62 for i in range(LED_COUNT):
63 strip.setPixelColor(i, Color(0, 255, 180)) # teal-ish
64 strip.show()
65
66
67def set_led_rgb(r, g, b):
68 for i in range(LED_COUNT):
69 strip.setPixelColor(i, Color(r, g, b))
70 strip.show()
71
72
73# =========================
74# MQTT SETUP
75# =========================
76
77# Use v2 API to avoid deprecation warning
78client = mqtt.Client(
79 mqtt.CallbackAPIVersion.VERSION2,
80 client_id=MQTT_CLIENT_ID
81)
82
83if MQTT_USERNAME:
84 client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
85
86
87def publish_led_state(r, g, b):
88 payload = json.dumps({"r": r, "g": g, "b": b})
89 client.publish(TOPIC_LED_STATE, payload, qos=1, retain=False)
90
91
92def publish_button_state(state: str):
93 client.publish(TOPIC_BUTTON_STATE, state, qos=1, retain=False)
94
95
96def on_connect(c, userdata, flags, reason_code, properties=None):
97 print("Connected to MQTT, reason_code:", reason_code)
98 # Subscribe for LED commands from Home Assistant
99 c.subscribe(TOPIC_LED_COMMAND)
100 # Publish Home Assistant discovery after connection
101 publish_discovery_messages()
102
103
104def on_message(c, userdata, msg):
105 if msg.topic == TOPIC_LED_COMMAND:
106 try:
107 data = json.loads(msg.payload.decode())
108 r, g, b = int(data.get("r", 0)), int(data.get("g", 0)), int(data.get("b", 0))
109 set_led_rgb(r, g, b)
110 publish_led_state(r, g, b)
111 except Exception as e:
112 print("LED command error:", e)
113
114
115client.on_connect = on_connect
116client.on_message = on_message
117
118
119def publish_discovery_messages():
120 """
121 Publish Home Assistant MQTT Discovery configs so the device & entities
122 show up automatically.
123 """
124 device_info = {
125 "identifiers": ["doorbell_front_door"],
126 "manufacturer": "Yannick",
127 "model": "Custom Pi Zero Doorbell",
128 "name": "Front Door Doorbell",
129 }
130
131 # ---- Button as binary_sensor ----
132 btn_config = {
133 "name": "Doorbell Button",
134 "unique_id": "doorbell_front_door_button",
135 "state_topic": TOPIC_BUTTON_STATE,
136 "device_class": "occupancy",
137 "payload_on": "pressed",
138 "payload_off": "released",
139 "device": device_info,
140 }
141
142 client.publish(
143 "homeassistant/binary_sensor/doorbell_front_door/button/config",
144 json.dumps(btn_config),
145 retain=True,
146 )
147
148 # ---- LED Ring as MQTT JSON light ----
149 light_config = {
150 "name": "Doorbell LED Ring",
151 "unique_id": "doorbell_front_door_led",
152 "command_topic": TOPIC_LED_COMMAND,
153 "state_topic": TOPIC_LED_STATE,
154 "schema": "json",
155 "brightness": False,
156 "rgb": True,
157 "device": device_info,
158 }
159
160 client.publish(
161 "homeassistant/light/doorbell_front_door/led/config",
162 json.dumps(light_config),
163 retain=True,
164 )
165
166 print("Home Assistant MQTT discovery messages published.")
167
168
169# =========================
170# BUTTON (RPi.GPIO, POLLING)
171# =========================
172
173GPIO.setwarnings(False)
174GPIO.setmode(GPIO.BCM)
175GPIO.setup(BUTTON_GPIO, GPIO.IN, pull_up_down=GPIO.PUD_UP)
176
177_last_press_time_ms = 0
178
179
180def handle_button_press():
181 """Called when we detect a button press."""
182 global _last_press_time_ms
183 now_ms = time.time() * 1000.0
184 if now_ms - _last_press_time_ms < PRESS_DEBOUNCE_MS:
185 return
186 _last_press_time_ms = now_ms
187
188 print("Doorbell button pressed")
189 publish_button_state("pressed")
190
191 # Local LED behavior: teal for N seconds
192 led_teal()
193 publish_led_state(0, 255, 180)
194
195 def turn_off():
196 time.sleep(LED_FLASH_SECONDS)
197 led_off()
198 publish_led_state(0, 0, 0)
199
200 threading.Thread(target=turn_off, daemon=True).start()
201
202
203# =========================
204# CLEAN SHUTDOWN
205# =========================
206
207def cleanup_and_exit(signum=None, frame=None):
208 print("Shutting down doorbell app...")
209 try:
210 led_off()
211 except Exception:
212 pass
213 GPIO.cleanup()
214 client.loop_stop()
215 client.disconnect()
216 sys.exit(0)
217
218
219signal.signal(signal.SIGINT, cleanup_and_exit)
220signal.signal(signal.SIGTERM, cleanup_and_exit)
221
222
223# =========================
224# MAIN LOOP
225# =========================
226
227if __name__ == "__main__":
228 led_off()
229
230 # Connect & start MQTT loop (non-blocking)
231 client.connect(MQTT_HOST, MQTT_PORT, 60)
232 client.loop_start()
233
234 print("Doorbell controller running (polling button).")
235
236 try:
237 last_state = GPIO.input(BUTTON_GPIO)
238
239 while True:
240 state = GPIO.input(BUTTON_GPIO)
241 # Button wired to GND, internal pull-up:
242 # idle = HIGH, pressed = LOW
243 if last_state == GPIO.HIGH and state == GPIO.LOW:
244 handle_button_press()
245 elif last_state == GPIO.LOW and state == GPIO.HIGH:
246 # Release
247 publish_button_state("released")
248
249 last_state = state
250 time.sleep(0.01) # 10 ms polling
251 except KeyboardInterrupt:
252 cleanup_and_exit()
253
254
255