src.nth.io/

summaryrefslogtreecommitdiff
path: root/doorbell-viewport/files/doorbell-viewport.py
diff options
context:
space:
mode:
Diffstat (limited to 'doorbell-viewport/files/doorbell-viewport.py')
-rw-r--r--doorbell-viewport/files/doorbell-viewport.py138
1 files changed, 41 insertions, 97 deletions
diff --git a/doorbell-viewport/files/doorbell-viewport.py b/doorbell-viewport/files/doorbell-viewport.py
index 555685c..6fa67c0 100644
--- a/doorbell-viewport/files/doorbell-viewport.py
+++ b/doorbell-viewport/files/doorbell-viewport.py
@@ -10,13 +10,14 @@ States:
ACTIVE — display on, live RTSP stream playing via mpv
Transitions:
- ring event : IDLE -> ACTIVE (or extend timer if already ACTIVE)
- touch (idle) : IDLE -> ACTIVE
+ ring event : IDLE -> ACTIVE (or extend timer if already ACTIVE)
+ touch (idle) : IDLE -> ACTIVE
touch (active): ACTIVE -> IDLE (immediate)
- timeout : ACTIVE -> IDLE (after doorbell_viewport_timeout seconds)
+ timeout : ACTIVE -> IDLE (after doorbell_viewport_timeout seconds)
"""
import asyncio
+import inspect
import json
import logging
import os
@@ -30,8 +31,6 @@ import zlib
from enum import Enum
from pathlib import Path
-import inspect
-
import evdev
import requests
import urllib3
@@ -68,22 +67,19 @@ class Config:
self.camera_id = os.environ["DOORBELL_VIEWPORT_CAMERA_ID"]
self.timeout = int(os.environ.get("DOORBELL_VIEWPORT_TIMEOUT", "45"))
self.touch_match = os.environ.get("DOORBELL_VIEWPORT_TOUCH_MATCH", "")
- self.display_backend = os.environ.get("DOORBELL_VIEWPORT_DISPLAY_BACKEND", "vcgencmd")
self.orientation = int(os.environ.get("DOORBELL_VIEWPORT_ORIENTATION", "270"))
self.drm_device = os.environ.get("DOORBELL_VIEWPORT_DRM_DEVICE", "/dev/dri/card1")
self.drm_connector = os.environ.get("DOORBELL_VIEWPORT_DRM_CONNECTOR", "HDMI-A-1")
- self.drm_mode = os.environ.get("DOORBELL_VIEWPORT_DRM_MODE", "848x480")
+ self.drm_mode = os.environ.get("DOORBELL_VIEWPORT_DRM_MODE", "")
self.rtsp_url = None
def log_config(self):
log.info(
"Config: protect_host=%s camera_id=%s timeout=%ds "
- "display_backend=%s orientation=%d "
- "drm_device=%s drm_connector=%s drm_mode=%s",
+ "orientation=%d drm_device=%s drm_connector=%s drm_mode=%s",
self.protect_host,
self.camera_id,
self.timeout,
- self.display_backend,
self.orientation,
self.drm_device,
self.drm_connector,
@@ -92,50 +88,18 @@ class Config:
class DisplayController:
- """
- Controls display backlight power.
-
- Backends:
- vcgencmd — vcgencmd display_power 0/1 (Raspberry Pi firmware)
- drm — /sys/class/backlight sysfs (DRM/KMS kernel interface)
- panel — /sys/class/backlight sysfs (alias for drm, panel-specific path)
- """
-
- def __init__(self, backend: str):
- self.backend = backend
- log.info("Display backend: %s", backend)
+ """Controls display backlight via /sys/class/backlight sysfs (DRM/KMS)."""
def on(self):
- log.info("Display power: ON [backend=%s]", self.backend)
- try:
- if self.backend == "vcgencmd":
- subprocess.run(
- ["/usr/bin/vcgencmd", "display_power", "1"],
- check=True,
- capture_output=True,
- )
- elif self.backend in ("drm", "panel"):
- self._sysfs_set(True)
- else:
- log.warning("Unknown display backend: %s", self.backend)
- except Exception as exc:
- log.error("Display ON failed: %s", exc)
+ self._set(True)
def off(self):
- log.info("Display power: OFF [backend=%s]", self.backend)
- try:
- if self.backend == "vcgencmd":
- subprocess.run(
- ["/usr/bin/vcgencmd", "display_power", "0"],
- check=True,
- capture_output=True,
- )
- elif self.backend in ("drm", "panel"):
- self._sysfs_set(False)
- else:
- log.warning("Unknown display backend: %s", self.backend)
- except Exception as exc:
- log.error("Display OFF failed: %s", exc)
+ self._set(False)
+
+ def _set(self, enabled: bool):
+ state = "ON" if enabled else "OFF"
+ log.info("Display power: %s", state)
+ self._sysfs_set(enabled)
def _sysfs_set(self, enabled: bool):
paths = sorted(Path("/sys/class/backlight").glob("*"))
@@ -143,19 +107,16 @@ class DisplayController:
log.warning("No backlight device found in /sys/class/backlight")
return
path = paths[0]
- if enabled:
- try:
+ try:
+ if enabled:
max_b = int((path / "max_brightness").read_text().strip())
(path / "brightness").write_text(str(max_b))
log.info("Backlight %s: brightness -> %d (max)", path.name, max_b)
- except Exception as exc:
- log.error("Backlight on failed: %s", exc)
- else:
- try:
+ else:
(path / "brightness").write_text("0")
log.info("Backlight %s: brightness -> 0", path.name)
- except Exception as exc:
- log.error("Backlight off failed: %s", exc)
+ except Exception as exc:
+ log.error("Backlight %s failed: %s", "on" if enabled else "off", exc)
class ProtectClient:
@@ -232,11 +193,11 @@ def decode_protect_packets(data: bytes) -> list:
Each WebSocket message contains one or more concatenated packets.
Each packet header (8 bytes):
- [0] packet_type 1=action, 2=data
- [1] payload_format 1=JSON, 2=UTF8, 3=buffer
- [2] deflated 0 or 1
+ [0] packet_type 1=action, 2=data
+ [1] payload_format 1=JSON, 2=UTF8, 3=buffer
+ [2] deflated 0 or 1
[3] unused
- [4:8] payload_size big-endian uint32
+ [4:8] payload_size big-endian uint32
Followed by payload_size bytes of payload.
"""
@@ -260,10 +221,7 @@ def decode_protect_packets(data: bytes) -> list:
continue
if payload_format in (1, 2):
try:
- packets.append({
- "packet_type": packet_type,
- "payload": json.loads(payload_bytes),
- })
+ packets.append({"packet_type": packet_type, "payload": json.loads(payload_bytes)})
except Exception:
pass
offset = end
@@ -274,7 +232,7 @@ class DoorbellViewport:
def __init__(self, config: Config):
self.config = config
self.state = State.IDLE
- self.display = DisplayController(config.display_backend)
+ self.display = DisplayController()
self.protect = ProtectClient(config)
self.mpv_proc = None
self.timer_task = None
@@ -284,16 +242,8 @@ class DoorbellViewport:
log.info("doorbell-viewport starting")
self.config.log_config()
- # Ensure display starts off regardless of prior state
self.display.off()
-
- # Authenticate and obtain RTSP URL
- loop = asyncio.get_event_loop()
- ok = await loop.run_in_executor(None, self.protect.authenticate)
- if ok:
- url = await loop.run_in_executor(None, self.protect.get_camera_rtsp_url)
- if url:
- self.config.rtsp_url = url
+ await self._login_and_fetch_rtsp()
if not self.config.rtsp_url:
log.warning("No RTSP URL at startup; will retry after reconnect")
@@ -303,10 +253,16 @@ class DoorbellViewport:
self.touch_listener(),
)
+ async def _login_and_fetch_rtsp(self):
+ ok = await asyncio.to_thread(self.protect.authenticate)
+ if ok and not self.config.rtsp_url:
+ url = await asyncio.to_thread(self.protect.get_camera_rtsp_url)
+ if url:
+ self.config.rtsp_url = url
+
async def activate(self):
"""Transition IDLE->ACTIVE, or extend timer if already ACTIVE."""
was_idle = self.state == State.IDLE
- # Set state before any await so concurrent coroutines see updated state
self.state = State.ACTIVE
if self.timer_task and not self.timer_task.done():
@@ -321,10 +277,9 @@ class DoorbellViewport:
log.info("State: ACTIVE -> timer extended")
async def deactivate(self):
- """Transition ACTIVE->IDLE: immediately stop playback and kill display."""
+ """Transition ACTIVE->IDLE: stop playback and turn off display."""
if self.state == State.IDLE:
return
- # Set state before any await
self.state = State.IDLE
log.info("State: ACTIVE -> IDLE")
@@ -387,7 +342,7 @@ class DoorbellViewport:
self.mpv_proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=subprocess.DEVNULL,
- stderr=None, # pass through to journal
+ stderr=subprocess.DEVNULL,
)
log.info("mpv started (pid=%d)", self.mpv_proc.pid)
asyncio.create_task(self._watch_mpv())
@@ -409,7 +364,7 @@ class DoorbellViewport:
self.mpv_proc = None
async def _watch_mpv(self):
- """Restart mpv once on unexpected exit."""
+ """Restart mpv on unexpected exit while display is active."""
proc = self.mpv_proc
if not proc:
return
@@ -440,28 +395,21 @@ class DoorbellViewport:
log.info("Protect: reconnecting in %ds", backoff)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 60)
- loop = asyncio.get_event_loop()
- ok = await loop.run_in_executor(None, self.protect.authenticate)
- if ok and not self.config.rtsp_url:
- url = await loop.run_in_executor(None, self.protect.get_camera_rtsp_url)
- if url:
- self.config.rtsp_url = url
+ await self._login_and_fetch_rtsp()
async def _connect_protect_ws(self):
log.info("Protect: connecting to WebSocket")
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
- loop = asyncio.get_event_loop()
- last_update_id = await loop.run_in_executor(None, self.protect.get_last_update_id)
+ last_update_id = await asyncio.to_thread(self.protect.get_last_update_id)
ws_url = f"wss://{self.config.protect_host}/proxy/protect/ws/updates"
if last_update_id:
ws_url += f"?lastUpdateId={last_update_id}"
- headers = self.protect.ws_headers()
async with websockets.connect(
ws_url,
- **{_WS_HEADERS_KWARG: headers},
+ **{_WS_HEADERS_KWARG: self.protect.ws_headers()},
ssl=ssl_ctx,
ping_interval=20,
ping_timeout=10,
@@ -510,7 +458,6 @@ class DoorbellViewport:
if match and match in dev.name.lower():
log.info("Touch: matched by name: %s (%s)", dev.name, path)
return dev
- # Fallback: any device advertising multitouch absolute axes
caps = dev.capabilities()
if evdev.ecodes.EV_ABS in caps:
axis_codes = [code for code, _ in caps[evdev.ecodes.EV_ABS]]
@@ -523,8 +470,7 @@ class DoorbellViewport:
return None
async def _monitor_touch_device(self):
- loop = asyncio.get_event_loop()
- dev = await loop.run_in_executor(None, self._find_touch_device_sync)
+ dev = await asyncio.to_thread(self._find_touch_device_sync)
if not dev:
log.warning("Touch: no device found")
await asyncio.sleep(10)
@@ -538,12 +484,10 @@ class DoorbellViewport:
break
triggered = False
if event.type == evdev.ecodes.EV_ABS:
- # Multitouch: new contact (tracking ID assigned)
if (event.code == evdev.ecodes.ABS_MT_TRACKING_ID
and event.value >= 0):
triggered = True
elif event.type == evdev.ecodes.EV_KEY:
- # Single-touch BTN_TOUCH press
if event.code == evdev.ecodes.BTN_TOUCH and event.value == 1:
triggered = True
if triggered:
@@ -563,7 +507,7 @@ async def main():
sys.exit(1)
viewport = DoorbellViewport(config)
- loop = asyncio.get_event_loop()
+ loop = asyncio.get_running_loop()
def shutdown(sig):
log.info("Signal %s received, shutting down", sig.name)