/
/
/
1import os
2import time
3import threading
4import json
5
6import paho.mqtt.client as mqtt
7from gpiozero import Button
8
9import board
10import neopixel
11
12
# ---- Config from env ----
# Every setting is read from the process environment and falls back to the
# default shown here when the variable is not set.

# MQTT broker connection
MQTT_HOST = os.environ.get("MQTT_HOST", "localhost")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USERNAME = os.environ.get("MQTT_USERNAME", "")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD", "")
MQTT_CLIENT_ID = os.environ.get("MQTT_CLIENT_ID", "doorbell-controller")
MQTT_BASE_TOPIC = os.environ.get("MQTT_BASE_TOPIC", "doorbell")

# Hardware wiring and LED strip parameters
BUTTON_GPIO = int(os.environ.get("BUTTON_GPIO", "4"))
LED_GPIO = int(os.environ.get("LED_GPIO", "18"))
LED_COUNT = int(os.environ.get("LED_COUNT", "2"))
LED_BRIGHTNESS = float(os.environ.get("LED_BRIGHTNESS", "0.8"))

# Topics (all rooted at MQTT_BASE_TOPIC)
TOPIC_BUTTON_STATE = MQTT_BASE_TOPIC + "/button/state"
TOPIC_LED_STATE = MQTT_BASE_TOPIC + "/led/state"
TOPIC_LED_COMMAND = MQTT_BASE_TOPIC + "/led/set"  # optional: remote control
30
31
# ---- LED setup ----
# The data pin is looked up on the `board` module by name ("D" + GPIO number).
_led_pin = getattr(board, "D" + str(LED_GPIO))

# auto_write=True: every fill() is sent to the strip without a separate show().
pixels = neopixel.NeoPixel(
    _led_pin,
    LED_COUNT,
    auto_write=True,
    brightness=LED_BRIGHTNESS,
)
39
40
def led_off():
    """Set every pixel to black, i.e. switch the LEDs off."""
    black = (0, 0, 0)
    pixels.fill(black)
43
44
def led_teal():
    """Set every pixel to the bright teal used as the doorbell indicator."""
    teal = (0, 255, 180)  # (R, G, B)
    pixels.fill(teal)
48
49
# ---- MQTT setup ----
# paho-mqtt 2.x takes a callback API version as the first constructor
# argument (2.0 raises without it, 2.1 only warns), while 1.x has no such
# parameter. The on_connect callback in this file accepts the optional
# `properties` argument, so it matches the VERSION2 signature; request
# VERSION2 when the enum exists and fall back to the 1.x constructor
# otherwise.
_client_kwargs = {"client_id": MQTT_CLIENT_ID, "clean_session": True}
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, **_client_kwargs)
else:
    client = mqtt.Client(**_client_kwargs)
52
53
def on_connect(client, userdata, flags, rc, properties=None):
    """Connect callback: log the result code and subscribe to LED commands.

    `properties` defaults to None so the callback can be invoked with either
    four or five arguments after `client`.
    """
    print("MQTT connected with rc =", rc)
    # The LED command topic is optional remote control for the pixels.
    client.subscribe(topic=TOPIC_LED_COMMAND)
58
59
def _clamp_channel(value):
    """Coerce *value* to int and clamp it into the 0-255 colour range."""
    return max(0, min(255, int(value)))


def on_message(client, userdata, msg):
    """Message callback: apply an LED colour command received over MQTT.

    Only messages on TOPIC_LED_COMMAND are handled. The payload is expected
    to be a UTF-8 JSON object of the form {"r": 0-255, "g": 0-255, "b": 0-255};
    missing channels default to 0 and out-of-range values are clamped. On
    success the pixels are filled with the colour and the new state is
    published. Malformed commands are logged and ignored.
    """
    # Optional: allow HA to control LED
    if msg.topic != TOPIC_LED_COMMAND:
        return

    try:
        data = json.loads(msg.payload.decode("utf-8"))
        if not isinstance(data, dict):
            raise ValueError("expected a JSON object with r/g/b keys")
        rgb = tuple(_clamp_channel(data.get(key, 0)) for key in ("r", "g", "b"))
    except (ValueError, TypeError, OverflowError) as e:
        # ValueError also covers JSON and UTF-8 decode errors (both subclass
        # it); OverflowError covers int(float("inf")).
        print("Failed to parse LED command:", e)
        return

    try:
        pixels.fill(rgb)
        publish_led_state(rgb)
    except Exception as e:
        # Best-effort: keep hardware/publish failures from propagating out of
        # the callback, but report them separately from parse errors.
        print("Failed to apply LED command:", e)
74
75
# Register the callbacks before connecting.
client.on_message = on_message
client.on_connect = on_connect

# Credentials are only configured when a username was supplied.
if MQTT_USERNAME:
    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)

client.connect(host=MQTT_HOST, port=MQTT_PORT, keepalive=60)
83
84
def publish_button_state(state: str):
    """Publish the button state on TOPIC_BUTTON_STATE (QoS 1, not retained).

    `state` is sent as the payload unchanged: "pressed" or "released".
    """
    client.publish(topic=TOPIC_BUTTON_STATE, payload=state, qos=1, retain=False)
88
89
def publish_led_state(rgb):
    """Publish an LED colour on TOPIC_LED_STATE (QoS 1, not retained).

    `rgb` must unpack into exactly three values; they are sent as a JSON
    object with the keys "r", "g" and "b".
    """
    red, green, blue = rgb
    message = json.dumps({"r": red, "g": green, "b": blue})
    client.publish(topic=TOPIC_LED_STATE, payload=message, qos=1, retain=False)
94
95
# ---- Button handling ----
# Internal pull-up enabled; 50 ms bounce time.
button = Button(
    pin=BUTTON_GPIO,
    pull_up=True,
    bounce_time=0.05,
)
98
99
# How long the LEDs stay teal after a press, in seconds.
_LED_ON_SECONDS = 10

# The single pending "switch off" timer, guarded by a lock because presses
# and the timer callback run on different threads.
_led_off_timer = None
_led_off_timer_lock = threading.Lock()


def _led_timeout():
    """Switch the LEDs off and publish the off state (timer callback)."""
    led_off()
    publish_led_state((0, 0, 0))


def handle_press():
    """Handle a doorbell press: publish it and light the LEDs teal.

    The LEDs are switched off again _LED_ON_SECONDS after the most recent
    press. A pending switch-off from an earlier press is cancelled first, so
    pressing again while the LEDs are lit restarts the full period instead
    of being cut short by the earlier press's timeout.
    """
    global _led_off_timer

    print("Doorbell button pressed")
    publish_button_state("pressed")

    # Local LED behavior: teal for 10 seconds
    led_teal()
    publish_led_state((0, 255, 180))

    with _led_off_timer_lock:
        if _led_off_timer is not None:
            _led_off_timer.cancel()
        _led_off_timer = threading.Timer(_LED_ON_SECONDS, _led_timeout)
        # Daemon thread: a pending timer must not keep the process alive.
        _led_off_timer.daemon = True
        _led_off_timer.start()


button.when_pressed = handle_press
117
118
# ---- Main loop ----
def _run_forever():
    """Start with the LEDs off, start the MQTT loop, then idle until interrupted."""
    led_off()
    print("Doorbell app started.")
    client.loop_start()
    while True:
        time.sleep(1)


def _shutdown():
    """Switch the LEDs off, stop the MQTT loop and disconnect from the broker."""
    led_off()
    client.loop_stop()
    client.disconnect()


try:
    _run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the app; fall through to cleanup.
    pass
finally:
    _shutdown()
132
133