src.nth.io/

summaryrefslogtreecommitdiff
path: root/doorbell-viewport/files/doorbell-viewport.py
diff options
context:
space:
mode:
Diffstat (limited to 'doorbell-viewport/files/doorbell-viewport.py')
-rw-r--r--doorbell-viewport/files/doorbell-viewport.py567
1 files changed, 567 insertions, 0 deletions
diff --git a/doorbell-viewport/files/doorbell-viewport.py b/doorbell-viewport/files/doorbell-viewport.py
new file mode 100644
index 0000000..1bdcc29
--- /dev/null
+++ b/doorbell-viewport/files/doorbell-viewport.py
@@ -0,0 +1,567 @@
+#!/usr/bin/env python3
+"""
+doorbell-viewport: UniFi Protect doorbell display daemon
+
+Connects to UniFi Protect, listens for doorbell ring events, and manages
+display power + video playback on a Raspberry Pi portrait touchscreen.
+
+States:
+ IDLE — display backlight off, no playback
+ ACTIVE — display on, live RTSP stream playing via mpv
+
+Transitions:
+ ring event : IDLE -> ACTIVE (or extend timer if already ACTIVE)
+ touch (idle) : IDLE -> ACTIVE
+ touch (active): ACTIVE -> IDLE (immediate)
+ timeout : ACTIVE -> IDLE (after doorbell_viewport_timeout seconds)
+"""
+
+import asyncio
+import json
+import logging
+import os
+import signal
+import ssl
+import struct
+import subprocess
+import sys
+import time
+import zlib
+from enum import Enum
+from pathlib import Path
+
+import evdev
+import requests
+import urllib3
+import websockets
+
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s %(levelname)s %(message)s",
+ stream=sys.stdout,
+)
+log = logging.getLogger("doorbell-viewport")
+
+
+class State(Enum):
+ IDLE = "idle"
+ ACTIVE = "active"
+
+
+class Config:
+ def __init__(self):
+ self.protect_host = os.environ["DOORBELL_VIEWPORT_PROTECT_HOST"]
+ self.protect_username = os.environ["DOORBELL_VIEWPORT_PROTECT_USERNAME"]
+ self.protect_password = os.environ["DOORBELL_VIEWPORT_PROTECT_PASSWORD"]
+ self.camera_id = os.environ["DOORBELL_VIEWPORT_CAMERA_ID"]
+ self.timeout = int(os.environ.get("DOORBELL_VIEWPORT_TIMEOUT", "45"))
+ self.touch_match = os.environ.get("DOORBELL_VIEWPORT_TOUCH_MATCH", "")
+ self.prebuffer_mode = os.environ.get("DOORBELL_VIEWPORT_PREBUFFER_MODE", "warm")
+ self.display_backend = os.environ.get("DOORBELL_VIEWPORT_DISPLAY_BACKEND", "vcgencmd")
+ self.orientation = int(os.environ.get("DOORBELL_VIEWPORT_ORIENTATION", "270"))
+ self.drm_device = os.environ.get("DOORBELL_VIEWPORT_DRM_DEVICE", "/dev/dri/card1")
+ self.drm_connector = os.environ.get("DOORBELL_VIEWPORT_DRM_CONNECTOR", "HDMI-A-1")
+ self.drm_mode = os.environ.get("DOORBELL_VIEWPORT_DRM_MODE", "848x480")
+ self.rtsp_url = None
+
+ def log_config(self):
+ log.info(
+ "Config: protect_host=%s camera_id=%s timeout=%ds "
+ "prebuffer_mode=%s display_backend=%s orientation=%d "
+ "drm_device=%s drm_connector=%s drm_mode=%s",
+ self.protect_host,
+ self.camera_id,
+ self.timeout,
+ self.prebuffer_mode,
+ self.display_backend,
+ self.orientation,
+ self.drm_device,
+ self.drm_connector,
+ self.drm_mode,
+ )
+
+
+class DisplayController:
+ """
+ Controls display backlight power.
+
+ Backends:
+ vcgencmd — vcgencmd display_power 0/1 (Raspberry Pi firmware)
+ drm — /sys/class/backlight sysfs (DRM/KMS kernel interface)
+ panel — /sys/class/backlight sysfs (alias for drm, panel-specific path)
+ """
+
+ def __init__(self, backend: str):
+ self.backend = backend
+ log.info("Display backend: %s", backend)
+
+ def on(self):
+ log.info("Display power: ON [backend=%s]", self.backend)
+ try:
+ if self.backend == "vcgencmd":
+ subprocess.run(
+ ["vcgencmd", "display_power", "1"],
+ check=True,
+ capture_output=True,
+ )
+ elif self.backend in ("drm", "panel"):
+ self._sysfs_set(True)
+ else:
+ log.warning("Unknown display backend: %s", self.backend)
+ except Exception as exc:
+ log.error("Display ON failed: %s", exc)
+
+ def off(self):
+ log.info("Display power: OFF [backend=%s]", self.backend)
+ try:
+ if self.backend == "vcgencmd":
+ subprocess.run(
+ ["vcgencmd", "display_power", "0"],
+ check=True,
+ capture_output=True,
+ )
+ elif self.backend in ("drm", "panel"):
+ self._sysfs_set(False)
+ else:
+ log.warning("Unknown display backend: %s", self.backend)
+ except Exception as exc:
+ log.error("Display OFF failed: %s", exc)
+
+ def _sysfs_set(self, enabled: bool):
+ paths = sorted(Path("/sys/class/backlight").glob("*"))
+ if not paths:
+ log.warning("No backlight device found in /sys/class/backlight")
+ return
+ path = paths[0]
+ log.debug("Backlight device: %s", path)
+ if enabled:
+ try:
+ max_b = int((path / "max_brightness").read_text().strip())
+ (path / "brightness").write_text(str(max_b))
+ except Exception as exc:
+ log.error("Backlight on failed: %s", exc)
+ else:
+ try:
+ (path / "brightness").write_text("0")
+ except Exception as exc:
+ log.error("Backlight off failed: %s", exc)
+
+
+class ProtectClient:
+ """Handles UniFi Protect authentication and camera info retrieval."""
+
+ def __init__(self, config: Config):
+ self.config = config
+ self.session = requests.Session()
+ self.session.verify = False
+ self.base_url = f"https://{config.protect_host}"
+
+ def authenticate(self) -> bool:
+ log.info("Protect: authenticating at %s", self.config.protect_host)
+ try:
+ resp = self.session.post(
+ f"{self.base_url}/api/auth/login",
+ json={
+ "username": self.config.protect_username,
+ "password": self.config.protect_password,
+ },
+ timeout=10,
+ )
+ resp.raise_for_status()
+ log.info("Protect: authentication successful")
+ return True
+ except Exception as exc:
+ log.error("Protect: authentication failed: %s", exc)
+ return False
+
+ def get_camera_rtsp_url(self) -> str | None:
+ log.info("Protect: fetching camera info for id=%s", self.config.camera_id)
+ try:
+ resp = self.session.get(
+ f"{self.base_url}/proxy/protect/api/cameras/{self.config.camera_id}",
+ timeout=10,
+ )
+ resp.raise_for_status()
+ data = resp.json()
+ for channel in data.get("channels", []):
+ if channel.get("isRtspEnabled"):
+ alias = channel.get("rtspAlias")
+ if alias:
+ url = f"rtsp://{self.config.protect_host}:7447/{alias}"
+ log.info("Protect: RTSP URL: %s", url)
+ return url
+ log.error("Protect: no enabled RTSP channel for camera %s", self.config.camera_id)
+ return None
+ except Exception as exc:
+ log.error("Protect: failed to fetch camera info: %s", exc)
+ return None
+
+ def ws_headers(self) -> dict:
+ cookies = self.session.cookies.get_dict()
+ return {"Cookie": "; ".join(f"{k}={v}" for k, v in cookies.items())}
+
+
+def decode_protect_packet(data: bytes) -> dict | None:
+ """
+ Decode a binary UniFi Protect WebSocket packet.
+
+ Header (8 bytes):
+ [0] packet_type 1=action, 2=data
+ [1] payload_format 1=JSON, 2=UTF8, 3=buffer
+ [2] deflated 0 or 1
+ [3] unused
+ [4:8] payload_size big-endian uint32
+
+ Followed by payload_size bytes of payload.
+ """
+ if len(data) < 8:
+ return None
+ packet_type = data[0]
+ payload_format = data[1]
+ deflated = bool(data[2])
+ payload_size = struct.unpack(">I", data[4:8])[0]
+ payload_bytes = data[8:8 + payload_size]
+ if deflated:
+ try:
+ payload_bytes = zlib.decompress(payload_bytes)
+ except Exception as exc:
+ log.debug("Packet decompression failed: %s", exc)
+ return None
+ if payload_format in (1, 2):
+ try:
+ return {
+ "packet_type": packet_type,
+ "payload": json.loads(payload_bytes),
+ }
+ except Exception:
+ return None
+ return None
+
+
+class DoorbellViewport:
+ def __init__(self, config: Config):
+ self.config = config
+ self.state = State.IDLE
+ self.display = DisplayController(config.display_backend)
+ self.protect = ProtectClient(config)
+ self.mpv_proc = None
+ self.timer_task = None
+ self._running = True
+
+ async def run(self):
+ log.info("doorbell-viewport starting")
+ self.config.log_config()
+
+ # Ensure display starts off regardless of prior state
+ self.display.off()
+
+ # Authenticate and obtain RTSP URL
+ loop = asyncio.get_event_loop()
+ ok = await loop.run_in_executor(None, self.protect.authenticate)
+ if ok:
+ url = await loop.run_in_executor(None, self.protect.get_camera_rtsp_url)
+ if url:
+ self.config.rtsp_url = url
+
+ if not self.config.rtsp_url:
+ log.warning("No RTSP URL at startup; will retry after reconnect")
+
+ # Warm prebuffer: mpv starts immediately, rendering to framebuffer
+ # while display remains off — zero-latency activation later
+ if self.config.prebuffer_mode == "warm" and self.config.rtsp_url:
+ await self.start_mpv()
+ log.info("Warm prebuffer active: mpv running, display off")
+
+ await asyncio.gather(
+ self.protect_listener(),
+ self.touch_listener(),
+ )
+
+ async def activate(self):
+ """Transition IDLE->ACTIVE, or extend timer if already ACTIVE."""
+ was_idle = self.state == State.IDLE
+ # Set state before any await so concurrent coroutines see updated state
+ self.state = State.ACTIVE
+
+ if self.timer_task and not self.timer_task.done():
+ self.timer_task.cancel()
+ self.timer_task = asyncio.create_task(self._timeout_task())
+
+ if was_idle:
+ log.info("State: IDLE -> ACTIVE")
+ if self.config.prebuffer_mode == "warm":
+ # mpv already buffering; just enable the display
+ self.display.on()
+ else:
+ await self.start_mpv()
+ self.display.on()
+ else:
+ log.info("State: ACTIVE -> timer extended")
+
+ async def deactivate(self):
+ """Transition ACTIVE->IDLE: immediately stop playback and kill display."""
+ if self.state == State.IDLE:
+ return
+ # Set state before any await
+ self.state = State.IDLE
+ log.info("State: ACTIVE -> IDLE")
+
+ if self.timer_task and not self.timer_task.done():
+ self.timer_task.cancel()
+ self.timer_task = None
+
+ self.display.off()
+ if self.config.prebuffer_mode != "warm":
+ await self.stop_mpv()
+
+ async def on_ring(self):
+ log.info("Event: doorbell ring")
+ await self.activate()
+
+ async def on_touch(self):
+ log.info("Event: touch input (state=%s)", self.state.value)
+ if self.state == State.IDLE:
+ await self.activate()
+ else:
+ await self.deactivate()
+
+ async def _timeout_task(self):
+ await asyncio.sleep(self.config.timeout)
+ log.info("Timeout: %ds elapsed", self.config.timeout)
+ await self.deactivate()
+
+ async def start_mpv(self):
+ if self.mpv_proc and self.mpv_proc.returncode is None:
+ log.debug("mpv already running (pid=%d)", self.mpv_proc.pid)
+ return
+ if not self.config.rtsp_url:
+ log.error("Cannot start mpv: no RTSP URL")
+ return
+
+ cmd = [
+ "mpv",
+ "--vo=drm",
+ f"--drm-device={self.config.drm_device}",
+ f"--drm-connector={self.config.drm_connector}",
+ f"--drm-mode={self.config.drm_mode}",
+ f"--video-rotate={self.config.orientation}",
+ "--fullscreen",
+ "--no-border",
+ "--no-osc",
+ "--no-input-default-bindings",
+ "--no-config",
+ "--really-quiet",
+ "--loop=inf",
+ "--cache=yes",
+ "--cache-secs=3",
+ "--demuxer-max-bytes=50M",
+ "--hwdec=auto",
+ self.config.rtsp_url,
+ ]
+ log.info("Starting mpv: %s", " ".join(cmd))
+ try:
+ self.mpv_proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ log.info("mpv started (pid=%d)", self.mpv_proc.pid)
+ asyncio.create_task(self._watch_mpv())
+ except Exception as exc:
+ log.error("Failed to start mpv: %s", exc)
+
+ async def stop_mpv(self):
+ if not self.mpv_proc or self.mpv_proc.returncode is not None:
+ return
+ pid = self.mpv_proc.pid
+ log.info("Stopping mpv (pid=%d)", pid)
+ try:
+ self.mpv_proc.terminate()
+ await asyncio.wait_for(self.mpv_proc.wait(), timeout=3.0)
+ except asyncio.TimeoutError:
+ log.warning("mpv (pid=%d) did not terminate cleanly, killing", pid)
+ self.mpv_proc.kill()
+ await self.mpv_proc.wait()
+ self.mpv_proc = None
+
+ async def _watch_mpv(self):
+ """Restart mpv once on unexpected exit."""
+ proc = self.mpv_proc
+ if not proc:
+ return
+ await proc.wait()
+ rc = proc.returncode
+ if not self._running:
+ return
+ if rc != 0:
+ log.warning("mpv exited unexpectedly (rc=%d)", rc)
+ if self.mpv_proc is proc:
+ self.mpv_proc = None
+ await asyncio.sleep(2)
+ if self.config.prebuffer_mode == "warm" or self.state == State.ACTIVE:
+ log.info("Restarting mpv after unexpected exit")
+ await self.start_mpv()
+
+ async def protect_listener(self):
+ """Maintain UniFi Protect WebSocket connection with exponential backoff."""
+ backoff = 1
+ while self._running:
+ try:
+ await self._connect_protect_ws()
+ backoff = 1
+ except Exception as exc:
+ log.error("Protect WebSocket error: %s", exc)
+ if not self._running:
+ break
+ log.info("Protect: reconnecting in %ds", backoff)
+ await asyncio.sleep(backoff)
+ backoff = min(backoff * 2, 60)
+ loop = asyncio.get_event_loop()
+ ok = await loop.run_in_executor(None, self.protect.authenticate)
+ if ok and not self.config.rtsp_url:
+ url = await loop.run_in_executor(None, self.protect.get_camera_rtsp_url)
+ if url:
+ self.config.rtsp_url = url
+ if self.config.prebuffer_mode == "warm":
+ await self.start_mpv()
+
+ async def _connect_protect_ws(self):
+ log.info("Protect: connecting to WebSocket")
+ ssl_ctx = ssl.create_default_context()
+ ssl_ctx.check_hostname = False
+ ssl_ctx.verify_mode = ssl.CERT_NONE
+ ws_url = f"wss://{self.config.protect_host}/proxy/protect/ws/updates"
+ headers = self.protect.ws_headers()
+
+ async with websockets.connect(
+ ws_url,
+ additional_headers=headers,
+ ssl=ssl_ctx,
+ ping_interval=20,
+ ping_timeout=10,
+ ) as ws:
+ log.info("Protect: WebSocket connected")
+ pending_action = None
+ async for msg in ws:
+ if isinstance(msg, bytes):
+ pkt = decode_protect_packet(msg)
+ if not pkt:
+ continue
+ if pkt["packet_type"] == 1:
+ pending_action = pkt["payload"]
+ elif pkt["packet_type"] == 2 and pending_action is not None:
+ await self._handle_protect_event(pending_action, pkt["payload"])
+ pending_action = None
+
+ async def _handle_protect_event(self, action: dict, data: dict):
+ if action.get("action") != "add":
+ return
+ if action.get("modelKey") != "event":
+ return
+ event_type = data.get("type")
+ # 'camera' field varies by Protect firmware version
+ camera = data.get("camera") or data.get("cameraId") or ""
+ if event_type == "ring" and camera == self.config.camera_id:
+ log.info("Protect: ring event from camera %s", camera)
+ await self.on_ring()
+
+ async def touch_listener(self):
+ """Monitor evdev touch input with automatic device re-discovery."""
+ while self._running:
+ try:
+ await self._monitor_touch_device()
+ except Exception as exc:
+ log.error("Touch device error: %s", exc)
+ if not self._running:
+ break
+ log.info("Touch: retrying device discovery in 10s")
+ await asyncio.sleep(10)
+
+ def _find_touch_device_sync(self):
+ """Find touch device by name match or multitouch capability."""
+ match = self.config.touch_match.lower()
+ for path in evdev.list_devices():
+ try:
+ dev = evdev.InputDevice(path)
+ if match and match in dev.name.lower():
+ log.info("Touch: matched by name: %s (%s)", dev.name, path)
+ return dev
+ # Fallback: any device advertising multitouch absolute axes
+ caps = dev.capabilities()
+ if evdev.ecodes.EV_ABS in caps:
+ axis_codes = [code for code, _ in caps[evdev.ecodes.EV_ABS]]
+ if evdev.ecodes.ABS_MT_POSITION_X in axis_codes:
+ log.info("Touch: matched by capability: %s (%s)", dev.name, path)
+ return dev
+ dev.close()
+ except Exception:
+ continue
+ return None
+
+ async def _monitor_touch_device(self):
+ loop = asyncio.get_event_loop()
+ dev = await loop.run_in_executor(None, self._find_touch_device_sync)
+ if not dev:
+ log.warning("Touch: no device found")
+ await asyncio.sleep(10)
+ return
+
+ log.info("Touch: monitoring %s (%s)", dev.name, dev.path)
+ last_touch_time = 0.0
+ try:
+ async for event in dev.async_read_loop():
+ if not self._running:
+ break
+ triggered = False
+ if event.type == evdev.ecodes.EV_ABS:
+ # Multitouch: new contact (tracking ID assigned)
+ if (event.code == evdev.ecodes.ABS_MT_TRACKING_ID
+ and event.value >= 0):
+ triggered = True
+ elif event.type == evdev.ecodes.EV_KEY:
+ # Single-touch BTN_TOUCH press
+ if event.code == evdev.ecodes.BTN_TOUCH and event.value == 1:
+ triggered = True
+ if triggered:
+ now = time.monotonic()
+ if now - last_touch_time > 0.5: # 500ms debounce
+ last_touch_time = now
+ await self.on_touch()
+ finally:
+ dev.close()
+
+
+async def main():
+ try:
+ config = Config()
+ except KeyError as exc:
+ log.error("Missing required environment variable: %s", exc)
+ sys.exit(1)
+
+ viewport = DoorbellViewport(config)
+ loop = asyncio.get_event_loop()
+
+ def shutdown(sig):
+ log.info("Signal %s received, shutting down", sig.name)
+ viewport._running = False
+ for task in asyncio.all_tasks(loop):
+ task.cancel()
+
+ for sig in (signal.SIGTERM, signal.SIGINT):
+ loop.add_signal_handler(sig, lambda s=sig: shutdown(s))
+
+ try:
+ await viewport.run()
+ except asyncio.CancelledError:
+ pass
+ finally:
+ log.info("Shutting down")
+ await viewport.stop_mpv()
+ viewport.display.off()
+ log.info("doorbell-viewport stopped")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())