# vigilar/vigilar/camera/worker.py
#
# Last commit 713d16d445 — "Fix lint issues in pet detection integration"
# (Aaron D. Lee, 2026-04-03):
#   - Import PetsConfig directly instead of forward ref string
#   - Fix import sorting in worker.py, queries.py, constants.py
#   - Remove unused imports in profiles.py, trainer.py, processor.py
#
# Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

"""Per-camera worker process.
Each camera runs in its own multiprocessing.Process. Responsibilities:
1. Open RTSP stream via OpenCV with reconnect logic
2. Run MOG2 motion detection on every frame (downscaled)
3. Manage adaptive FPS recording (2 FPS idle → 30 FPS on motion)
4. Feed frames to HLS streamer for live viewing
5. Publish motion events and heartbeats via MQTT
"""
import logging
import signal
import time
import cv2
from vigilar.bus import MessageBus
from vigilar.camera.hls import HLSStreamer, RemoteHLSStreamer
from vigilar.camera.motion import MotionDetector
from vigilar.camera.recorder import AdaptiveRecorder
from vigilar.camera.ring_buffer import RingBuffer
from vigilar.config import CameraConfig, MQTTConfig, PetsConfig, RemoteConfig
from vigilar.constants import Topics
from vigilar.detection.pet_id import PetIDClassifier
from vigilar.detection.wildlife import classify_wildlife_threat
from vigilar.detection.yolo import YOLODetector
log = logging.getLogger(__name__)
class CameraState:
    """Mutable runtime status for a single camera worker process."""

    def __init__(self) -> None:
        # Stream health / reconnect bookkeeping.
        self.connected: bool = False
        self.reconnect_count: int = 0
        # Motion state machine: active flag plus the monotonic timestamps
        # of when the current burst started and when motion was last seen.
        self.motion_active: bool = False
        self.motion_start_time: float = 0
        self.last_motion_time: float = 0
        # Frame accounting (measured FPS, total frames, idle-skip counter).
        self.fps_actual: float = 0
        self.frame_count: int = 0
        self.idle_frame_skip: int = 0
def run_camera_worker(
    camera_cfg: CameraConfig,
    mqtt_cfg: MQTTConfig,
    recordings_dir: str,
    hls_dir: str,
    remote_cfg: RemoteConfig | None = None,
    pets_cfg: PetsConfig | None = None,
) -> None:
    """Main entry point for a camera worker process.

    Runs until SIGTERM: opens the RTSP stream (with exponential-backoff
    reconnect), runs motion detection on every frame, drives adaptive
    recording (idle FPS vs. full FPS on motion), feeds local and optional
    remote HLS streamers, optionally runs YOLO pet/wildlife detection on
    motion frames, and publishes motion/heartbeat/detection events on MQTT.

    Args:
        camera_cfg: Per-camera settings (RTSP URL, fps, zones, buffers, ...).
        mqtt_cfg: Broker connection settings for the message bus.
        recordings_dir: Directory where recorded segments are written.
        hls_dir: Directory where HLS playlists/segments are written.
        remote_cfg: Optional lower-quality remote streaming settings.
        pets_cfg: Optional object-detection / pet-ID settings.
    """
    camera_id = camera_cfg.id
    log.info("Camera worker starting: %s (%s)", camera_id, camera_cfg.rtsp_url)

    # Setup logging for this process — each camera runs in its own
    # multiprocessing.Process, so the parent's logging config does not apply.
    logging.basicConfig(
        level=logging.INFO,
        format=f"%(asctime)s [{camera_id}] %(levelname)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # MQTT bus
    bus = MessageBus(mqtt_cfg, client_id=f"camera-{camera_id}")
    bus.connect()

    # Subscribe to threshold update commands so sensitivity / min-area can be
    # tuned at runtime without restarting the worker.
    # NOTE(review): `detector` is bound a few statements below; the closure
    # relies on late binding, so a config message delivered before that
    # assignment would raise NameError — confirm MessageBus delivery timing.
    def on_threshold_update(topic: str, payload: dict) -> None:
        if "sensitivity" in payload:
            detector.update_sensitivity(payload["sensitivity"])
        if "min_area" in payload:
            detector.update_min_area(payload["min_area"])

    bus.subscribe(f"vigilar/camera/{camera_id}/config", on_threshold_update)

    # Components
    # Ring buffer keeps pre-motion frames so recordings include the seconds
    # leading up to a motion event.
    ring_buf = RingBuffer(
        duration_s=camera_cfg.pre_motion_buffer_s,
        max_fps=camera_cfg.motion_fps,
    )
    detector = MotionDetector(
        sensitivity=camera_cfg.motion_sensitivity,
        min_area_px=camera_cfg.motion_min_area_px,
        zones=camera_cfg.motion_zones,
        resolution=tuple(camera_cfg.resolution_motion),
    )
    recorder = AdaptiveRecorder(
        camera_id=camera_id,
        recordings_dir=recordings_dir,
        idle_fps=camera_cfg.idle_fps,
        motion_fps=camera_cfg.motion_fps,
        resolution=tuple(camera_cfg.resolution_capture),
    )
    hls = HLSStreamer(
        camera_id=camera_id,
        hls_dir=hls_dir,
        fps=15,  # HLS stream at 15fps for bandwidth
        resolution=(640, 360),
    )

    # Remote HLS streamer (lower quality for WireGuard tunnel)
    remote_hls: RemoteHLSStreamer | None = None
    if remote_cfg and remote_cfg.enabled:
        remote_hls = RemoteHLSStreamer(
            camera_id=camera_id,
            hls_dir=hls_dir,
            fps=remote_cfg.remote_hls_fps,
            resolution=tuple(remote_cfg.remote_hls_resolution),
            bitrate_kbps=remote_cfg.remote_hls_bitrate_kbps,
        )

    # Object detection (YOLOv8 unified detector); pet-ID classification is a
    # second, optional stage applied to crops of detected domestic animals.
    yolo_detector = None
    pet_classifier = None
    if pets_cfg and pets_cfg.enabled:
        yolo_detector = YOLODetector(
            model_path=pets_cfg.model_path,
            confidence_threshold=pets_cfg.confidence_threshold,
        )
        if pets_cfg.pet_id_enabled:
            pet_classifier = PetIDClassifier(
                model_path=pets_cfg.pet_id_model_path,
                high_threshold=pets_cfg.pet_id_threshold,
                low_threshold=pets_cfg.pet_id_low_confidence,
            )

    state = CameraState()
    shutdown = False

    # SIGTERM (sent by the parent process) flips the flag; the main loop
    # exits at the top of its next iteration.
    def handle_signal(signum, frame):
        nonlocal shutdown
        shutdown = True

    signal.signal(signal.SIGTERM, handle_signal)

    # Camera capture loop
    cap = None
    last_heartbeat = 0  # 0 forces a heartbeat on the first frame
    fps_timer = time.monotonic()
    fps_frame_count = 0
    native_fps = camera_cfg.motion_fps  # assumed until we read from stream
    # Idle frame skip: at 30fps native, skip factor 15 gives 2fps written
    idle_skip_factor = max(1, native_fps // camera_cfg.idle_fps)

    while not shutdown:
        # Connect / reconnect with exponential backoff capped at 60s.
        if cap is None or not cap.isOpened():
            state.connected = False
            backoff = min(2 ** state.reconnect_count, 60)
            if state.reconnect_count > 0:
                log.info("Reconnecting to %s in %ds...", camera_id, backoff)
                bus.publish_event(Topics.camera_error(camera_id), error="disconnected", reconnect_in=backoff)
            # NOTE(review): this sleep also runs on the very first attempt
            # (reconnect_count == 0 → 1s startup delay) — confirm intended.
            time.sleep(backoff)
            cap = cv2.VideoCapture(camera_cfg.rtsp_url, cv2.CAP_FFMPEG)
            if not cap.isOpened():
                state.reconnect_count += 1
                log.warning("Failed to connect to %s (%s)", camera_id, camera_cfg.rtsp_url)
                continue
            state.connected = True
            state.reconnect_count = 0
            # CAP_PROP_FPS can return 0 for some streams; fall back to config.
            native_fps = cap.get(cv2.CAP_PROP_FPS) or camera_cfg.motion_fps
            idle_skip_factor = max(1, int(native_fps / camera_cfg.idle_fps))
            log.info("Connected to %s (native %d fps, idle skip %d)", camera_id, native_fps, idle_skip_factor)
            # Start HLS streamers
            hls.start()
            if remote_hls:
                remote_hls.start()
            # Start idle recording if continuous recording is enabled
            if camera_cfg.record_continuous:
                recorder.start_idle_recording()

        # Read frame; on failure tear everything down and fall back into the
        # reconnect branch on the next iteration.
        ret, frame = cap.read()
        if not ret:
            log.warning("Frame read failed for %s", camera_id)
            cap.release()
            cap = None
            hls.stop()
            if remote_hls:
                remote_hls.stop()
            recorder.stop_recording()
            continue

        state.frame_count += 1
        fps_frame_count += 1
        now = time.monotonic()

        # Calculate actual FPS every second
        if now - fps_timer >= 1.0:
            state.fps_actual = fps_frame_count / (now - fps_timer)
            fps_frame_count = 0
            fps_timer = now

        # Always push to ring buffer (full FPS for pre-motion context)
        ring_buf.push(frame.copy())

        # Feed HLS streamers
        hls.write_frame(frame)
        if remote_hls:
            remote_hls.write_frame(frame)

        # Run motion detection
        motion_detected, rects, confidence = detector.detect(frame)

        if motion_detected and not state.motion_active:
            # --- Motion START ---
            state.motion_active = True
            state.motion_start_time = now
            state.last_motion_time = now
            log.info("Motion START on %s (confidence=%.3f, zones=%d)", camera_id, confidence, len(rects))
            bus.publish_event(
                Topics.camera_motion_start(camera_id),
                confidence=confidence,
                zones=len(rects),
            )
            # Switch to motion recording with pre-buffer
            if camera_cfg.record_on_motion:
                pre_frames = ring_buf.flush()
                recorder.start_motion_recording(pre_buffer=pre_frames)
        elif motion_detected and state.motion_active:
            # Ongoing motion — update timestamp
            state.last_motion_time = now
        elif not motion_detected and state.motion_active:
            # Check if silence period exceeded; motion only "ends" once no
            # motion has been seen for post_motion_buffer_s seconds.
            silence = now - state.last_motion_time
            if silence >= camera_cfg.post_motion_buffer_s:
                # --- Motion END ---
                duration = now - state.motion_start_time
                state.motion_active = False
                log.info("Motion END on %s (duration=%.1fs)", camera_id, duration)
                segment = recorder.stop_recording()
                bus.publish_event(
                    Topics.camera_motion_end(camera_id),
                    duration_s=round(duration, 1),
                    clip_path=segment.file_path if segment else "",
                )
                # Resume idle recording if continuous mode
                if camera_cfg.record_continuous:
                    recorder.start_idle_recording()

        # Write frame to recorder (adaptive FPS)
        if recorder.is_recording:
            if state.motion_active:
                # Motion: write every frame (full FPS)
                recorder.write_frame(frame)
            else:
                # Idle: write every Nth frame
                if state.frame_count % idle_skip_factor == 0:
                    recorder.write_frame(frame)

        # Run object detection on motion frames only — keeps the YOLO cost
        # off the idle path.
        if state.motion_active and yolo_detector and yolo_detector.is_loaded:
            detections = yolo_detector.detect(frame)
            for det in detections:
                category = YOLODetector.classify(det)
                if category == "domestic_animal":
                    # Crop for pet ID (clamped at the top-left; the slice end
                    # is naturally clamped by numpy slicing semantics)
                    x, y, w, h = det.bbox
                    crop = frame[max(0, y):y + h, max(0, x):x + w]
                    pet_result = None
                    if pet_classifier and pet_classifier.is_loaded and crop.size > 0:
                        pet_result = pet_classifier.identify(crop, species=det.class_name)
                    payload = {
                        "species": det.class_name,
                        "confidence": round(det.confidence, 3),
                        "camera_location": camera_cfg.location,
                    }
                    if pet_result and pet_result.is_identified:
                        payload["pet_id"] = pet_result.pet_id
                        payload["pet_name"] = pet_result.pet_name
                        payload["pet_confidence"] = round(pet_result.confidence, 3)
                        bus.publish_event(
                            Topics.pet_location(pet_result.pet_name.lower()),
                            camera_id=camera_id,
                            camera_location=camera_cfg.location,
                        )
                    bus.publish_event(Topics.camera_pet_detected(camera_id), **payload)
                elif category == "wildlife" and pets_cfg:
                    # Frame area lets the classifier weigh apparent size of
                    # the detection when grading the threat.
                    frame_area = frame.shape[0] * frame.shape[1]
                    threat_level, species = classify_wildlife_threat(
                        det, pets_cfg.wildlife, frame_area,
                    )
                    bus.publish_event(
                        Topics.camera_wildlife_detected(camera_id),
                        species=species,
                        threat_level=threat_level,
                        confidence=round(det.confidence, 3),
                        camera_location=camera_cfg.location,
                    )

        # Heartbeat every 10 seconds
        if now - last_heartbeat >= 10:
            last_heartbeat = now
            bus.publish_event(
                Topics.camera_heartbeat(camera_id),
                connected=state.connected,
                fps=round(state.fps_actual, 1),
                motion=state.motion_active,
                recording=recorder.is_recording,
                recording_fps=recorder.current_fps,
                resolution=list(frame.shape[:2][::-1]),  # [w, h]
            )

    # Shutdown: release capture, stop recorder/streamers, disconnect MQTT.
    # stop_recording() is called unconditionally — presumably a no-op when
    # nothing is recording; verify against AdaptiveRecorder.
    log.info("Camera worker shutting down: %s", camera_id)
    if cap and cap.isOpened():
        cap.release()
    recorder.stop_recording()
    hls.stop()
    if remote_hls:
        remote_hls.stop()
    bus.disconnect()