#!/usr/bin/env python3
"""
doorbell-viewport: UniFi Protect doorbell display daemon

Connects to UniFi Protect, listens for doorbell ring events, and manages
display power + video playback on a Raspberry Pi portrait touchscreen.

States:
  IDLE   — display backlight off; no playback, except that in "warm"
           prebuffer mode mpv keeps running behind the dark display
  ACTIVE — display on, live RTSP stream playing via mpv

Transitions:
  ring event    : IDLE -> ACTIVE  (or extend timer if already ACTIVE)
  touch (idle)  : IDLE -> ACTIVE
  touch (active): ACTIVE -> IDLE  (immediate)
  timeout       : ACTIVE -> IDLE  (after doorbell_viewport_timeout seconds)
"""

import asyncio
import inspect
import json
import logging
import os
import signal
import ssl
import struct
import subprocess
import sys
import time
import zlib
from enum import Enum
from pathlib import Path

import evdev
import requests
import urllib3
import websockets

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Parameter name for passing headers to websockets.connect() varies by version.
# Detect it at import time rather than assuming based on version number.
_WS_HEADERS_KWARG = (
    "additional_headers"
    if "additional_headers" in inspect.signature(websockets.connect).parameters
    else "extra_headers"
)

# Protect packet header: type, payload format, deflated flag, pad byte, size.
_PACKET_HEADER = struct.Struct(">BBBxI")
# Payload formats that are parsed as JSON (1=JSON, 2=UTF8); 3=buffer is skipped.
_TEXT_PAYLOAD_FORMATS = (1, 2)

_HTTP_TIMEOUT = 10            # seconds, per Protect REST request
_DISPLAY_CMD_TIMEOUT = 5      # seconds, guards against a hung vcgencmd
_MPV_MODESET_DELAY = 3        # seconds to wait for mpv DRM mode-setting
_MPV_TERMINATE_TIMEOUT = 3.0  # seconds between SIGTERM and SIGKILL
_MPV_RESTART_DELAY = 2        # seconds before restarting an exited mpv
_WS_BACKOFF_MAX = 60          # upper bound for the reconnect delay, seconds
_WS_STABLE_SECONDS = 60       # a connection this long-lived resets the backoff
_TOUCH_RETRY_DELAY = 10       # seconds between touch device discovery attempts
_TOUCH_DEBOUNCE = 0.5         # seconds; touches closer together are ignored

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    stream=sys.stdout,
)
log = logging.getLogger("doorbell-viewport")


class State(Enum):
    """Viewport state machine states."""

    IDLE = "idle"
    ACTIVE = "active"


class Config:
    """Runtime configuration read from DOORBELL_VIEWPORT_* environment variables.

    Raises:
        KeyError: a required variable (host, username, password, camera id)
            is missing.
        ValueError: DOORBELL_VIEWPORT_TIMEOUT or DOORBELL_VIEWPORT_ORIENTATION
            is not an integer.
    """

    def __init__(self):
        env = os.environ
        self.protect_host = env["DOORBELL_VIEWPORT_PROTECT_HOST"]
        self.protect_username = env["DOORBELL_VIEWPORT_PROTECT_USERNAME"]
        self.protect_password = env["DOORBELL_VIEWPORT_PROTECT_PASSWORD"]
        self.camera_id = env["DOORBELL_VIEWPORT_CAMERA_ID"]
        self.timeout = int(env.get("DOORBELL_VIEWPORT_TIMEOUT", "45"))
        self.touch_match = env.get("DOORBELL_VIEWPORT_TOUCH_MATCH", "")
        self.prebuffer_mode = env.get("DOORBELL_VIEWPORT_PREBUFFER_MODE", "warm")
        self.display_backend = env.get("DOORBELL_VIEWPORT_DISPLAY_BACKEND", "vcgencmd")
        self.orientation = int(env.get("DOORBELL_VIEWPORT_ORIENTATION", "270"))
        self.drm_device = env.get("DOORBELL_VIEWPORT_DRM_DEVICE", "/dev/dri/card1")
        self.drm_connector = env.get("DOORBELL_VIEWPORT_DRM_CONNECTOR", "HDMI-A-1")
        self.drm_mode = env.get("DOORBELL_VIEWPORT_DRM_MODE", "848x480")
        # Filled in once the camera's RTSP alias has been fetched from Protect.
        self.rtsp_url = None

    def log_config(self):
        """Log the non-secret configuration values."""
        log.info(
            "Config: protect_host=%s camera_id=%s timeout=%ds "
            "prebuffer_mode=%s display_backend=%s orientation=%d "
            "drm_device=%s drm_connector=%s drm_mode=%s",
            self.protect_host,
            self.camera_id,
            self.timeout,
            self.prebuffer_mode,
            self.display_backend,
            self.orientation,
            self.drm_device,
            self.drm_connector,
            self.drm_mode,
        )


class DisplayController:
    """
    Controls display backlight power.

    Backends:
      vcgencmd — vcgencmd display_power 0/1 (Raspberry Pi firmware)
      drm      — /sys/class/backlight sysfs (DRM/KMS kernel interface)
      panel    — /sys/class/backlight sysfs (alias for drm, panel-specific path)

    Failures are logged and never raised: display control is best-effort.
    """

    _VCGENCMD = "/usr/bin/vcgencmd"
    _BACKLIGHT_ROOT = Path("/sys/class/backlight")

    def __init__(self, backend: str):
        self.backend = backend
        log.info("Display backend: %s", backend)

    def on(self):
        """Power the display on."""
        self._set_power(True)

    def off(self):
        """Power the display off."""
        self._set_power(False)

    def _set_power(self, enabled: bool) -> None:
        """Dispatch a power change to the configured backend."""
        label = "ON" if enabled else "OFF"
        log.info("Display power: %s [backend=%s]", label, self.backend)
        try:
            if self.backend == "vcgencmd":
                subprocess.run(
                    [self._VCGENCMD, "display_power", "1" if enabled else "0"],
                    check=True,
                    capture_output=True,
                    # Called from the event loop thread: never block forever.
                    timeout=_DISPLAY_CMD_TIMEOUT,
                )
            elif self.backend in ("drm", "panel"):
                self._sysfs_set(enabled)
            else:
                log.warning("Unknown display backend: %s", self.backend)
        except (OSError, subprocess.SubprocessError) as exc:
            log.error("Display %s failed: %s", label, exc)

    def _sysfs_set(self, enabled: bool):
        """Write full or zero brightness to the first sysfs backlight device."""
        devices = sorted(self._BACKLIGHT_ROOT.glob("*"))
        if not devices:
            log.warning("No backlight device found in /sys/class/backlight")
            return

        device = devices[0]
        log.debug("Backlight device: %s", device)
        try:
            if enabled:
                level = int((device / "max_brightness").read_text().strip())
            else:
                level = 0
            (device / "brightness").write_text(str(level))
        except (OSError, ValueError) as exc:
            log.error("Backlight %s failed: %s", "on" if enabled else "off", exc)


class ProtectClient:
    """Handles UniFi Protect authentication and camera info retrieval."""

    def __init__(self, config: Config):
        self.config = config
        self.session = requests.Session()
        # NOTE: certificate verification is disabled on purpose; the console
        # is reached by host/IP and is presumably using a self-signed cert.
        self.session.verify = False
        self.base_url = f"https://{config.protect_host}"

    def authenticate(self) -> bool:
        """Log in and store the session cookies; return True on success."""
        log.info("Protect: authenticating at %s", self.config.protect_host)
        try:
            resp = self.session.post(
                f"{self.base_url}/api/auth/login",
                json={
                    "username": self.config.protect_username,
                    "password": self.config.protect_password,
                },
                timeout=_HTTP_TIMEOUT,
            )
            resp.raise_for_status()
        except requests.RequestException as exc:
            log.error("Protect: authentication failed: %s", exc)
            return False
        log.info("Protect: authentication successful")
        return True

    def get_last_update_id(self) -> str | None:
        """Return ``lastUpdateId`` from the bootstrap document, or None."""
        try:
            resp = self.session.get(
                f"{self.base_url}/proxy/protect/api/bootstrap",
                timeout=_HTTP_TIMEOUT,
            )
            resp.raise_for_status()
            bootstrap = resp.json()
        except (requests.RequestException, ValueError) as exc:
            log.warning("Protect: failed to fetch lastUpdateId: %s", exc)
            return None

        uid = bootstrap.get("lastUpdateId") if isinstance(bootstrap, dict) else None
        log.info("Protect: lastUpdateId=%s", uid)
        return uid

    def get_camera_rtsp_url(self) -> str | None:
        """Return the RTSP URL of the first RTSP-enabled channel, or None."""
        camera_id = self.config.camera_id
        log.info("Protect: fetching camera info for id=%s", camera_id)
        try:
            resp = self.session.get(
                f"{self.base_url}/proxy/protect/api/cameras/{camera_id}",
                timeout=_HTTP_TIMEOUT,
            )
            resp.raise_for_status()
            camera = resp.json()
        except (requests.RequestException, ValueError) as exc:
            log.error("Protect: failed to fetch camera info: %s", exc)
            return None

        # Tolerate unexpected shapes instead of raising on remote data.
        channels = camera.get("channels") if isinstance(camera, dict) else None
        for channel in channels or []:
            if not isinstance(channel, dict) or not channel.get("isRtspEnabled"):
                continue
            alias = channel.get("rtspAlias")
            if alias:
                url = f"rtsp://{self.config.protect_host}:7447/{alias}"
                log.info("Protect: RTSP URL: %s", url)
                return url

        log.error("Protect: no enabled RTSP channel for camera %s", camera_id)
        return None

    def ws_headers(self) -> dict:
        """Build the Cookie header carrying the authenticated session."""
        cookies = self.session.cookies.get_dict()
        return {"Cookie": "; ".join(f"{k}={v}" for k, v in cookies.items())}


def decode_protect_packets(data: bytes) -> list:
    """
    Decode all binary UniFi Protect WebSocket packets from a single message.

    Each WebSocket message contains one or more concatenated packets.
    Each packet header (8 bytes):
      [0]   packet_type    1=action, 2=data
      [1]   payload_format 1=JSON, 2=UTF8, 3=buffer
      [2]   deflated       0 or 1
      [3]   unused
      [4:8] payload_size   big-endian uint32
    Followed by payload_size bytes of payload.

    Returns:
        A list of ``{"packet_type": int, "payload": <decoded JSON>}`` dicts
        for every packet whose payload could be parsed as JSON. Packets that
        fail to inflate or parse, and buffer-format packets, are skipped.
        Decoding stops at the first truncated packet.
    """
    packets = []
    header_size = _PACKET_HEADER.size
    offset = 0
    while offset + header_size <= len(data):
        packet_type, payload_format, deflated, payload_size = (
            _PACKET_HEADER.unpack_from(data, offset)
        )
        end = offset + header_size + payload_size
        if end > len(data):
            log.debug("Truncated packet at offset %d; stopping decode", offset)
            break
        payload_bytes = data[offset + header_size:end]
        offset = end

        if deflated:
            try:
                payload_bytes = zlib.decompress(payload_bytes)
            except zlib.error as exc:
                log.debug("Packet decompression failed: %s", exc)
                continue

        if payload_format not in _TEXT_PAYLOAD_FORMATS:
            continue
        try:
            payload = json.loads(payload_bytes)
        except (ValueError, RecursionError) as exc:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            log.debug("Packet payload is not valid JSON: %s", exc)
            continue
        packets.append({"packet_type": packet_type, "payload": payload})
    return packets


class DoorbellViewport:
    """State machine tying Protect events and touch input to display + mpv."""

    def __init__(self, config: Config):
        self.config = config
        self.state = State.IDLE
        self.display = DisplayController(config.display_backend)
        self.protect = ProtectClient(config)
        self.mpv_proc = None
        self.timer_task = None
        self._running = True
        # Strong references to fire-and-forget tasks; the event loop itself
        # only holds weak references to tasks.
        self._background_tasks = set()

    def _spawn(self, coro) -> asyncio.Task:
        """Run *coro* as a background task that cannot be garbage-collected."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _authenticate_and_resolve_rtsp(self) -> bool:
        """Authenticate, then fetch the RTSP URL if it is not known yet.

        The blocking ``requests`` calls run in a worker thread.

        Returns:
            True only when a previously unknown RTSP URL was obtained.
        """
        if not await asyncio.to_thread(self.protect.authenticate):
            return False
        if self.config.rtsp_url:
            return False
        url = await asyncio.to_thread(self.protect.get_camera_rtsp_url)
        if not url:
            return False
        self.config.rtsp_url = url
        return True

    async def _warm_start_mpv(self):
        """Start mpv for warm prebuffering and re-blank the display if idle.

        mpv's DRM mode setting re-enables the HDMI output, overriding any
        prior vcgencmd state, so the display is switched off again after the
        mode-set has had time to complete, unless the viewport became ACTIVE
        in the meantime.
        """
        await self.start_mpv()
        await asyncio.sleep(_MPV_MODESET_DELAY)
        if self.state == State.IDLE:
            self.display.off()

    async def run(self):
        """Initialise display/stream state and run both listeners forever."""
        log.info("doorbell-viewport starting")
        self.config.log_config()

        # Ensure display starts off regardless of prior state
        self.display.off()

        # Authenticate and obtain RTSP URL
        await self._authenticate_and_resolve_rtsp()
        if not self.config.rtsp_url:
            log.warning("No RTSP URL at startup; will retry after reconnect")

        # Warm prebuffer: mpv starts immediately, rendering to framebuffer
        # while display remains off — zero-latency activation later.
        if self.config.prebuffer_mode == "warm" and self.config.rtsp_url:
            await self._warm_start_mpv()
            log.info("Warm prebuffer active: mpv running, display off")

        await asyncio.gather(
            self.protect_listener(),
            self.touch_listener(),
        )

    async def activate(self):
        """Transition IDLE->ACTIVE, or extend timer if already ACTIVE."""
        was_idle = self.state == State.IDLE
        # Set state before any await so concurrent coroutines see updated state
        self.state = State.ACTIVE

        if self.timer_task and not self.timer_task.done():
            self.timer_task.cancel()
        self.timer_task = asyncio.create_task(self._timeout_task())

        if not was_idle:
            log.info("State: ACTIVE -> timer extended")
            return

        log.info("State: IDLE -> ACTIVE")
        if self.config.prebuffer_mode != "warm":
            await self.start_mpv()
            if self.state != State.ACTIVE:
                # A touch deactivated the viewport while mpv was starting;
                # do not light the display for a state we already left.
                await self.stop_mpv()
                return
        # Warm mode: mpv is already buffering; just enable the display.
        self.display.on()

    async def deactivate(self):
        """Transition ACTIVE->IDLE: immediately stop playback and kill display."""
        if self.state == State.IDLE:
            return
        # Set state before any await
        self.state = State.IDLE
        log.info("State: ACTIVE -> IDLE")

        timer = self.timer_task
        self.timer_task = None
        # When the timeout itself triggered this call, the timer IS the
        # current task; cancelling it would abort the stop_mpv() await below.
        if timer and not timer.done() and timer is not asyncio.current_task():
            timer.cancel()

        self.display.off()
        if self.config.prebuffer_mode != "warm":
            await self.stop_mpv()

    async def on_ring(self):
        """Handle a doorbell ring: activate or extend the active period."""
        log.info("Event: doorbell ring")
        await self.activate()

    async def on_touch(self):
        """Handle a debounced touch: toggle between IDLE and ACTIVE."""
        log.info("Event: touch input (state=%s)", self.state.value)
        if self.state == State.IDLE:
            await self.activate()
        else:
            await self.deactivate()

    async def _timeout_task(self):
        """Deactivate after the configured timeout unless cancelled first."""
        await asyncio.sleep(self.config.timeout)
        log.info("Timeout: %ds elapsed", self.config.timeout)
        await self.deactivate()

    async def start_mpv(self):
        """Launch mpv on the RTSP stream unless it is already running."""
        if self.mpv_proc and self.mpv_proc.returncode is None:
            log.debug("mpv already running (pid=%d)", self.mpv_proc.pid)
            return
        if not self.config.rtsp_url:
            log.error("Cannot start mpv: no RTSP URL")
            return

        cmd = [
            "mpv",
            "--vo=drm",
            f"--drm-device={self.config.drm_device}",
            f"--drm-connector={self.config.drm_connector}",
            *(
                [f"--drm-mode={self.config.drm_mode}"]
                if self.config.drm_mode
                else []
            ),
            f"--video-rotate={self.config.orientation}",
            "--fullscreen",
            "--no-border",
            "--no-osc",
            "--no-audio",
            "--no-input-default-bindings",
            "--no-config",
            "--really-quiet",
            "--loop=inf",
            "--cache=yes",
            "--cache-secs=3",
            "--demuxer-max-bytes=50M",
            "--hwdec=no",
            self.config.rtsp_url,
        ]
        log.info("Starting mpv: %s", " ".join(cmd))
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except OSError as exc:
            log.error("Failed to start mpv: %s", exc)
            return
        self.mpv_proc = proc
        log.info("mpv started (pid=%d)", proc.pid)
        # Hand the watcher the exact process it is responsible for.
        self._spawn(self._watch_mpv(proc))

    async def stop_mpv(self):
        """Terminate mpv, escalating to SIGKILL if it does not exit in time."""
        proc = self.mpv_proc
        if not proc or proc.returncode is not None:
            return
        pid = proc.pid
        log.info("Stopping mpv (pid=%d)", pid)
        try:
            proc.terminate()
            await asyncio.wait_for(proc.wait(), timeout=_MPV_TERMINATE_TIMEOUT)
        except asyncio.TimeoutError:
            log.warning("mpv (pid=%d) did not terminate cleanly, killing", pid)
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the timeout and the kill
            await proc.wait()
        except ProcessLookupError:
            log.debug("mpv (pid=%d) had already exited", pid)
        # Do not clobber a newer process started while we were waiting.
        if self.mpv_proc is proc:
            self.mpv_proc = None

    async def _watch_mpv(self, proc=None):
        """Wait for an mpv process to exit and restart playback if needed.

        Args:
            proc: the process to watch; defaults to the current
                ``self.mpv_proc`` for backward compatibility.

        A restart is attempted after a short delay when warm prebuffering is
        enabled or the viewport is ACTIVE. Every started process gets its own
        watcher, so repeated exits lead to repeated restarts.
        """
        proc = proc or self.mpv_proc
        if not proc:
            return
        await proc.wait()
        rc = proc.returncode
        if not self._running:
            return
        if rc != 0:
            log.warning("mpv exited unexpectedly (rc=%d)", rc)
        if self.mpv_proc is proc:
            self.mpv_proc = None

        await asyncio.sleep(_MPV_RESTART_DELAY)
        if not self._running:
            return
        if self.config.prebuffer_mode == "warm":
            log.info("Restarting mpv after unexpected exit")
            await self._warm_start_mpv()
        elif self.state == State.ACTIVE:
            log.info("Restarting mpv after unexpected exit")
            await self.start_mpv()

    async def protect_listener(self):
        """Maintain UniFi Protect WebSocket connection with exponential backoff."""
        backoff = 1
        while self._running:
            attempt_started = time.monotonic()
            try:
                await self._connect_protect_ws()
                backoff = 1
            except Exception as exc:
                log.error("Protect WebSocket error: %s", exc)
                # A connection that stayed up for a while was healthy; do not
                # penalise its eventual drop with an escalated delay.
                if time.monotonic() - attempt_started >= _WS_STABLE_SECONDS:
                    backoff = 1

            if not self._running:
                break

            log.info("Protect: reconnecting in %ds", backoff)
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, _WS_BACKOFF_MAX)

            got_new_url = await self._authenticate_and_resolve_rtsp()
            if got_new_url and self.config.prebuffer_mode == "warm":
                # Background task: the mode-set delay must not hold up the
                # WebSocket reconnect.
                self._spawn(self._warm_start_mpv())

    async def _connect_protect_ws(self):
        """Open the updates WebSocket and dispatch action/data packet pairs.

        Returns when the server closes the connection; connection errors
        propagate to the caller.
        """
        log.info("Protect: connecting to WebSocket")

        # NOTE: certificate verification is disabled on purpose, matching
        # the REST session in ProtectClient.
        ssl_ctx = ssl.create_default_context()
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE

        last_update_id = await asyncio.to_thread(self.protect.get_last_update_id)
        ws_url = f"wss://{self.config.protect_host}/proxy/protect/ws/updates"
        if last_update_id:
            ws_url += f"?lastUpdateId={last_update_id}"

        headers = self.protect.ws_headers()
        async with websockets.connect(
            ws_url,
            **{_WS_HEADERS_KWARG: headers},
            ssl=ssl_ctx,
            ping_interval=20,
            ping_timeout=10,
        ) as ws:
            log.info("Protect: WebSocket connected")
            # An action packet (type 1) is followed by its data packet (type 2).
            pending_action = None
            async for msg in ws:
                if not isinstance(msg, bytes):
                    continue
                for pkt in decode_protect_packets(msg):
                    if pkt["packet_type"] == 1:
                        pending_action = pkt["payload"]
                    elif pkt["packet_type"] == 2 and pending_action is not None:
                        await self._handle_protect_event(
                            pending_action, pkt["payload"]
                        )
                        pending_action = None

    async def _handle_protect_event(self, action: dict, data: dict):
        """Trigger on_ring() for a newly added ring event of our camera."""
        # Payloads are remote JSON; anything but an object cannot be an event.
        if not isinstance(action, dict) or not isinstance(data, dict):
            return
        if action.get("action") != "add":
            return
        if action.get("modelKey") != "event":
            return

        event_type = data.get("type")
        camera = data.get("camera") or data.get("cameraId") or ""

        if event_type == "ring" and camera == self.config.camera_id:
            log.info("Protect: ring event from camera %s", camera)
            await self.on_ring()

    async def touch_listener(self):
        """Monitor evdev touch input with automatic device re-discovery."""
        while self._running:
            try:
                await self._monitor_touch_device()
            except Exception as exc:
                log.error("Touch device error: %s", exc)
            if not self._running:
                break
            log.info("Touch: retrying device discovery in %ds", _TOUCH_RETRY_DELAY)
            await asyncio.sleep(_TOUCH_RETRY_DELAY)

    @staticmethod
    def _is_multitouch(dev) -> bool:
        """Return True if *dev* advertises the ABS_MT_POSITION_X axis."""
        abs_axes = dev.capabilities().get(evdev.ecodes.EV_ABS, [])
        return any(code == evdev.ecodes.ABS_MT_POSITION_X for code, _ in abs_axes)

    def _find_touch_device_sync(self):
        """Find touch device by name match or multitouch capability.

        A device whose name contains ``touch_match`` always wins; the first
        multitouch-capable device is only used when no name matches (or no
        match string is configured). Every opened device that is not returned
        is closed again.

        Returns:
            An open ``evdev.InputDevice``, or None if nothing suitable exists.
        """
        match = self.config.touch_match.lower()
        fallback = None
        for path in evdev.list_devices():
            try:
                dev = evdev.InputDevice(path)
            except OSError as exc:
                log.debug("Touch: cannot open %s: %s", path, exc)
                continue

            keep = False
            try:
                if match and match in dev.name.lower():
                    log.info("Touch: matched by name: %s (%s)", dev.name, path)
                    keep = True
                    if fallback is not None:
                        fallback.close()
                    return dev
                # Fallback: any device advertising multitouch absolute axes
                if fallback is None and self._is_multitouch(dev):
                    fallback = dev
                    keep = True
            except OSError as exc:
                log.debug("Touch: cannot inspect %s: %s", path, exc)
            finally:
                if not keep:
                    dev.close()

        if fallback is not None:
            log.info(
                "Touch: matched by capability: %s (%s)",
                fallback.name,
                fallback.path,
            )
        return fallback

    async def _monitor_touch_device(self):
        """Read events from the touch device until it fails or goes away.

        Returns immediately when no device is found; the caller
        (touch_listener) owns the retry delay.
        """
        dev = await asyncio.to_thread(self._find_touch_device_sync)
        if not dev:
            log.warning("Touch: no device found")
            return

        log.info("Touch: monitoring %s (%s)", dev.name, dev.path)
        last_touch_time = 0.0
        ecodes = evdev.ecodes

        try:
            async for event in dev.async_read_loop():
                if not self._running:
                    break

                if event.type == ecodes.EV_ABS:
                    # Multitouch: new contact (tracking ID assigned)
                    triggered = (
                        event.code == ecodes.ABS_MT_TRACKING_ID
                        and event.value >= 0
                    )
                elif event.type == ecodes.EV_KEY:
                    # Single-touch BTN_TOUCH press
                    triggered = event.code == ecodes.BTN_TOUCH and event.value == 1
                else:
                    triggered = False

                if not triggered:
                    continue
                now = time.monotonic()
                if now - last_touch_time > _TOUCH_DEBOUNCE:
                    last_touch_time = now
                    await self.on_touch()
        finally:
            dev.close()


async def main():
    """Load configuration, install signal handlers and run the daemon."""
    try:
        config = Config()
    except KeyError as exc:
        log.error("Missing required environment variable: %s", exc)
        sys.exit(1)
    except ValueError as exc:
        # Non-integer DOORBELL_VIEWPORT_TIMEOUT / DOORBELL_VIEWPORT_ORIENTATION.
        log.error("Invalid configuration value: %s", exc)
        sys.exit(1)

    viewport = DoorbellViewport(config)
    loop = asyncio.get_running_loop()

    def shutdown(sig):
        log.info("Signal %s received, shutting down", sig.name)
        viewport._running = False
        for task in asyncio.all_tasks(loop):
            task.cancel()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown, sig)

    try:
        await viewport.run()
    except asyncio.CancelledError:
        pass
    finally:
        log.info("Shutting down")
        await viewport.stop_mpv()
        viewport.display.off()
        log.info("doorbell-viewport stopped")


if __name__ == "__main__":
    asyncio.run(main())