src.nth.io/

summaryrefslogtreecommitdiff
path: root/doorbell-viewport/files/doorbell-viewport.py
diff options
context:
space:
mode:
authorLuke Hoersten <[email protected]>2026-04-15 20:00:04 -0500
committerLuke Hoersten <[email protected]>2026-04-15 20:00:04 -0500
commit4ec792319b0cc9ab9aa3410c454f4880515c62c0 (patch)
tree876e693da2e2be042a574838484027de8be6b23d /doorbell-viewport/files/doorbell-viewport.py
parent7d61034d91cc716ad6165f6867c49940ced94909 (diff)
Rename doorbell-viewport role to unifi-protect-viewport
Diffstat (limited to 'doorbell-viewport/files/doorbell-viewport.py')
-rw-r--r--doorbell-viewport/files/doorbell-viewport.py518
1 files changed, 0 insertions, 518 deletions
diff --git a/doorbell-viewport/files/doorbell-viewport.py b/doorbell-viewport/files/doorbell-viewport.py
deleted file mode 100644
index 455a3cd..0000000
--- a/doorbell-viewport/files/doorbell-viewport.py
+++ /dev/null
@@ -1,518 +0,0 @@
-#!/usr/bin/env python3
-"""
-doorbell-viewport: UniFi Protect doorbell display daemon
-
-Connects to UniFi Protect, listens for doorbell ring events, and manages
-display power + video playback on a Raspberry Pi portrait touchscreen.
-
-States:
- IDLE — display backlight off, no playback
- ACTIVE — display on, live RTSP stream playing via mpv
-
-Transitions:
- ring event : IDLE -> ACTIVE (or extend timer if already ACTIVE)
- touch (idle) : IDLE -> ACTIVE
- touch (active): ACTIVE -> IDLE (immediate)
- timeout : ACTIVE -> IDLE (after doorbell_viewport_timeout seconds)
-"""
-
-import asyncio
-import json
-import logging
-import os
-import signal
-import ssl
-import struct
-import subprocess
-import sys
-import time
-import zlib
-from enum import Enum
-from pathlib import Path
-
-import evdev
-import requests
-import urllib3
-import websockets
-
-urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
-
-_SSL_CTX = ssl.create_default_context()
-_SSL_CTX.check_hostname = False
-_SSL_CTX.verify_mode = ssl.CERT_NONE
-
-logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s %(levelname)s %(message)s",
- stream=sys.stdout,
-)
-log = logging.getLogger("doorbell-viewport")
-
-
-class State(Enum):
- IDLE = "idle"
- ACTIVE = "active"
-
-
-class Config:
- def __init__(self):
- self.protect_host = os.environ["DOORBELL_VIEWPORT_PROTECT_HOST"]
- self.protect_username = os.environ["DOORBELL_VIEWPORT_PROTECT_USERNAME"]
- self.protect_password = os.environ["DOORBELL_VIEWPORT_PROTECT_PASSWORD"]
- self.camera_id = os.environ["DOORBELL_VIEWPORT_CAMERA_ID"]
- self.timeout = int(os.environ.get("DOORBELL_VIEWPORT_TIMEOUT", "45"))
- self.touch_match = os.environ.get("DOORBELL_VIEWPORT_TOUCH_MATCH", "")
- self.orientation = int(os.environ.get("DOORBELL_VIEWPORT_ORIENTATION", "270"))
- self.drm_device = os.environ.get("DOORBELL_VIEWPORT_DRM_DEVICE", "/dev/dri/card1")
- self.drm_connector = os.environ.get("DOORBELL_VIEWPORT_DRM_CONNECTOR", "HDMI-A-1")
- self.drm_mode = os.environ.get("DOORBELL_VIEWPORT_DRM_MODE", "")
- self.rtsp_url = None
-
- def log_config(self):
- log.info(
- "Config: protect_host=%s camera_id=%s timeout=%ds "
- "orientation=%d drm_device=%s drm_connector=%s drm_mode=%s",
- self.protect_host,
- self.camera_id,
- self.timeout,
- self.orientation,
- self.drm_device,
- self.drm_connector,
- self.drm_mode,
- )
-
-
-class DisplayController:
- """Controls display backlight via /sys/class/backlight sysfs (DRM/KMS)."""
-
- def on(self):
- log.info("Display: ON")
- self._sysfs_set(True)
-
- def off(self):
- log.info("Display: OFF")
- self._sysfs_set(False)
-
- def _sysfs_set(self, enabled: bool):
- paths = sorted(Path("/sys/class/backlight").glob("*"))
- if not paths:
- log.warning("No backlight device found in /sys/class/backlight")
- return
- path = paths[0]
- try:
- if enabled:
- max_b = int((path / "max_brightness").read_text().strip())
- (path / "brightness").write_text(str(max_b))
- log.info("Backlight %s: brightness -> %d (max)", path.name, max_b)
- else:
- (path / "brightness").write_text("0")
- log.info("Backlight %s: brightness -> 0", path.name)
- except Exception as exc:
- log.error("Backlight %s failed: %s", "on" if enabled else "off", exc)
-
-
-class ProtectClient:
- """Handles UniFi Protect authentication and camera info retrieval."""
-
- def __init__(self, config: Config):
- self.config = config
- self.session = requests.Session()
- self.session.verify = False
- self.base_url = f"https://{config.protect_host}"
-
- def authenticate(self) -> bool:
- log.info("Protect: authenticating at %s", self.config.protect_host)
- try:
- resp = self.session.post(
- f"{self.base_url}/api/auth/login",
- json={
- "username": self.config.protect_username,
- "password": self.config.protect_password,
- },
- timeout=10,
- )
- resp.raise_for_status()
- log.info("Protect: authentication successful")
- return True
- except Exception as exc:
- log.error("Protect: authentication failed: %s", exc)
- return False
-
- def get_last_update_id(self) -> str | None:
- try:
- resp = self.session.get(
- f"{self.base_url}/proxy/protect/api/bootstrap",
- timeout=10,
- )
- resp.raise_for_status()
- uid = resp.json().get("lastUpdateId")
- log.info("Protect: lastUpdateId=%s", uid)
- return uid
- except Exception as exc:
- log.warning("Protect: failed to fetch lastUpdateId: %s", exc)
- return None
-
- def get_camera_rtsp_url(self) -> str | None:
- log.info("Protect: fetching camera info for id=%s", self.config.camera_id)
- try:
- resp = self.session.get(
- f"{self.base_url}/proxy/protect/api/cameras/{self.config.camera_id}",
- timeout=10,
- )
- resp.raise_for_status()
- data = resp.json()
- for channel in data.get("channels", []):
- if channel.get("isRtspEnabled"):
- alias = channel.get("rtspAlias")
- if alias:
- url = f"rtsp://{self.config.protect_host}:7447/{alias}"
- log.info("Protect: RTSP URL: %s", url)
- return url
- log.error("Protect: no enabled RTSP channel for camera %s", self.config.camera_id)
- return None
- except Exception as exc:
- log.error("Protect: failed to fetch camera info: %s", exc)
- return None
-
- def cookie_header(self) -> dict:
- cookies = self.session.cookies.get_dict()
- return {"Cookie": "; ".join(f"{k}={v}" for k, v in cookies.items())}
-
-
-def decode_protect_packets(data: bytes) -> list:
- """
- Decode all binary UniFi Protect WebSocket packets from a single message.
-
- Each WebSocket message contains one or more concatenated packets.
- Each packet header (8 bytes):
- [0] packet_type 1=action, 2=data
- [1] payload_format 1=JSON, 2=UTF8, 3=buffer
- [2] deflated 0 or 1
- [3] unused
- [4:8] payload_size big-endian uint32
-
- Followed by payload_size bytes of payload.
- """
- packets = []
- offset = 0
- while offset + 8 <= len(data):
- packet_type = data[offset]
- payload_format = data[offset + 1]
- deflated = bool(data[offset + 2])
- payload_size = struct.unpack(">I", data[offset + 4:offset + 8])[0]
- end = offset + 8 + payload_size
- if end > len(data):
- break
- payload_bytes = data[offset + 8:end]
- if deflated:
- try:
- payload_bytes = zlib.decompress(payload_bytes)
- except Exception as exc:
- log.debug("Packet decompression failed: %s", exc)
- offset = end
- continue
- if payload_format in (1, 2):
- try:
- packets.append({"packet_type": packet_type, "payload": json.loads(payload_bytes)})
- except Exception:
- pass
- offset = end
- return packets
-
-
-class DoorbellViewport:
- def __init__(self, config: Config):
- self.config = config
- self.state = State.IDLE
- self.display = DisplayController()
- self.protect = ProtectClient(config)
- self.mpv_proc = None
- self.timer_task = None
- self._running = True
-
- async def run(self):
- log.info("doorbell-viewport starting")
- self.config.log_config()
-
- self.display.off()
- await self._login_and_fetch_rtsp()
-
- if not self.config.rtsp_url:
- log.warning("No RTSP URL at startup; will retry after reconnect")
-
- await asyncio.gather(
- self.protect_listener(),
- self.touch_listener(),
- )
-
- async def _login_and_fetch_rtsp(self):
- ok = await asyncio.to_thread(self.protect.authenticate)
- if ok and not self.config.rtsp_url:
- url = await asyncio.to_thread(self.protect.get_camera_rtsp_url)
- if url:
- self.config.rtsp_url = url
-
- async def activate(self):
- """Transition IDLE->ACTIVE, or extend timer if already ACTIVE."""
- was_idle = self.state == State.IDLE
- self.state = State.ACTIVE
-
- if self.timer_task and not self.timer_task.done():
- self.timer_task.cancel()
- self.timer_task = asyncio.create_task(self._timeout_task())
-
- if was_idle:
- log.info("State: IDLE -> ACTIVE")
- await self.start_mpv()
- self.display.on()
- else:
- log.info("State: ACTIVE -> timer extended")
-
- async def deactivate(self):
- """Transition ACTIVE->IDLE: stop playback and turn off display."""
- if self.state == State.IDLE:
- return
- self.state = State.IDLE
- log.info("State: ACTIVE -> IDLE")
-
- if self.timer_task and not self.timer_task.done():
- self.timer_task.cancel()
- self.timer_task = None
-
- self.display.off()
- await self.stop_mpv()
-
- async def on_touch(self):
- log.info("Event: touch input (state=%s)", self.state.value)
- if self.state == State.IDLE:
- await self.activate()
- else:
- await self.deactivate()
-
- async def _timeout_task(self):
- await asyncio.sleep(self.config.timeout)
- log.info("Timeout: %ds elapsed", self.config.timeout)
- await self.deactivate()
-
- async def start_mpv(self):
- if self.mpv_proc and self.mpv_proc.returncode is None:
- log.debug("mpv already running (pid=%d)", self.mpv_proc.pid)
- return
- if not self.config.rtsp_url:
- log.error("Cannot start mpv: no RTSP URL")
- return
-
- cmd = [
- "mpv",
- "--vo=drm",
- f"--drm-device={self.config.drm_device}",
- f"--drm-connector={self.config.drm_connector}",
- *(
- [f"--drm-mode={self.config.drm_mode}"]
- if self.config.drm_mode else []
- ),
- f"--video-rotate={self.config.orientation}",
- "--fullscreen",
- "--no-border",
- "--no-osc",
- "--no-audio",
- "--no-input-default-bindings",
- "--no-config",
- "--really-quiet",
- "--loop=no",
- "--cache=yes",
- "--demuxer-max-bytes=1M",
- "--hwdec=no",
- self.config.rtsp_url,
- ]
- log.info("Starting mpv: %s", " ".join(cmd))
- try:
- self.mpv_proc = await asyncio.create_subprocess_exec(
- *cmd,
- stdout=subprocess.DEVNULL,
- stderr=subprocess.DEVNULL,
- )
- log.info("mpv started (pid=%d)", self.mpv_proc.pid)
- asyncio.create_task(self._watch_mpv())
- except Exception as exc:
- log.error("Failed to start mpv: %s", exc)
-
- async def stop_mpv(self):
- if not self.mpv_proc or self.mpv_proc.returncode is not None:
- return
- pid = self.mpv_proc.pid
- log.info("Stopping mpv (pid=%d)", pid)
- try:
- self.mpv_proc.terminate()
- await asyncio.wait_for(self.mpv_proc.wait(), timeout=3.0)
- except asyncio.TimeoutError:
- log.warning("mpv (pid=%d) did not terminate cleanly, killing", pid)
- self.mpv_proc.kill()
- await self.mpv_proc.wait()
- self.mpv_proc = None
-
- async def _watch_mpv(self):
- """Restart mpv on unexpected exit while display is active."""
- proc = self.mpv_proc
- if not proc:
- return
- await proc.wait()
- rc = proc.returncode
- if not self._running:
- return
- if rc != 0:
- log.warning("mpv exited unexpectedly (rc=%d)", rc)
- if self.mpv_proc is proc:
- self.mpv_proc = None
- await asyncio.sleep(2)
- if self.state == State.ACTIVE:
- log.info("Restarting mpv after unexpected exit")
- await self.start_mpv()
-
- async def protect_listener(self):
- """Maintain UniFi Protect WebSocket connection with exponential backoff."""
- backoff = 1
- while self._running:
- try:
- await self._connect_protect_ws()
- backoff = 1
- except Exception as exc:
- log.error("Protect WebSocket error: %s", exc)
- if not self._running:
- break
- log.info("Protect: reconnecting in %ds", backoff)
- await asyncio.sleep(backoff)
- backoff = min(backoff * 2, 60)
- await self._login_and_fetch_rtsp()
-
- async def _connect_protect_ws(self):
- log.info("Protect: connecting to WebSocket")
- last_update_id = await asyncio.to_thread(self.protect.get_last_update_id)
- ws_url = f"wss://{self.config.protect_host}/proxy/protect/ws/updates"
- if last_update_id:
- ws_url += f"?lastUpdateId={last_update_id}"
-
- async with websockets.connect(
- ws_url,
- extra_headers=self.protect.cookie_header(),
- ssl=_SSL_CTX,
- ping_interval=20,
- ping_timeout=10,
- ) as ws:
- log.info("Protect: WebSocket connected")
- pending_action = None
- async for msg in ws:
- if isinstance(msg, bytes):
- packets = decode_protect_packets(msg)
- for pkt in packets:
- if pkt["packet_type"] == 1:
- pending_action = pkt["payload"]
- elif pkt["packet_type"] == 2 and pending_action is not None:
- await self._handle_protect_event(pending_action, pkt["payload"])
- pending_action = None
-
- async def _handle_protect_event(self, action: dict, data: dict):
- if action.get("action") != "add":
- return
- if action.get("modelKey") != "event":
- return
- event_type = data.get("type")
- camera = data.get("camera") or data.get("cameraId") or ""
- if event_type == "ring" and camera == self.config.camera_id:
- log.info("Protect: ring event from camera %s", camera)
- await self.activate()
-
- async def touch_listener(self):
- """Monitor evdev touch input with automatic device re-discovery."""
- while self._running:
- try:
- await self._monitor_touch_device()
- except Exception as exc:
- log.error("Touch device error: %s", exc)
- if not self._running:
- break
- log.info("Touch: retrying device discovery in 10s")
- await asyncio.sleep(10)
-
- def _find_touch_device_sync(self):
- """Find touch device by name match or multitouch capability."""
- match = self.config.touch_match.lower()
- for path in evdev.list_devices():
- try:
- dev = evdev.InputDevice(path)
- if match and match in dev.name.lower():
- log.info("Touch: matched by name: %s (%s)", dev.name, path)
- return dev
- caps = dev.capabilities()
- if evdev.ecodes.EV_ABS in caps:
- axis_codes = [code for code, _ in caps[evdev.ecodes.EV_ABS]]
- if evdev.ecodes.ABS_MT_POSITION_X in axis_codes:
- log.info("Touch: matched by capability: %s (%s)", dev.name, path)
- return dev
- dev.close()
- except Exception:
- continue
- return None
-
- async def _monitor_touch_device(self):
- dev = await asyncio.to_thread(self._find_touch_device_sync)
- if not dev:
- log.warning("Touch: no device found")
- await asyncio.sleep(10)
- return
-
- log.info("Touch: monitoring %s (%s)", dev.name, dev.path)
- last_touch_time = 0.0
- try:
- async for event in dev.async_read_loop():
- if not self._running:
- break
- triggered = (
- (event.type == evdev.ecodes.EV_ABS
- and event.code == evdev.ecodes.ABS_MT_TRACKING_ID
- and event.value >= 0)
- or (event.type == evdev.ecodes.EV_KEY
- and event.code == evdev.ecodes.BTN_TOUCH
- and event.value == 1)
- )
- if triggered:
- now = time.monotonic()
- if now - last_touch_time > 0.5: # 500ms debounce
- last_touch_time = now
- await self.on_touch()
- finally:
- dev.close()
-
-
-async def main():
- try:
- config = Config()
- except KeyError as exc:
- log.error("Missing required environment variable: %s", exc)
- sys.exit(1)
-
- viewport = DoorbellViewport(config)
- loop = asyncio.get_running_loop()
-
- def shutdown(sig):
- log.info("Signal %s received, shutting down", sig.name)
- viewport._running = False
- for task in asyncio.all_tasks(loop):
- task.cancel()
-
- for sig in (signal.SIGTERM, signal.SIGINT):
- loop.add_signal_handler(sig, lambda s=sig: shutdown(s))
-
- try:
- await viewport.run()
- except asyncio.CancelledError:
- pass
- finally:
- log.info("Shutting down")
- await viewport.stop_mpv()
- viewport.display.off()
- log.info("doorbell-viewport stopped")
-
-
-if __name__ == "__main__":
- asyncio.run(main())