feat(F1): integrate Web Push notifications into event processor
Import send_alert into processor.py, store engine as self._engine after init_db(), extend _execute_action() to accept event_type/severity/source_id and call send_alert for alert_all and push_and_record actions, and pass those params from _handle_event(). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
35
tests/unit/test_processor_alerts.py
Normal file
35
tests/unit/test_processor_alerts.py
Normal file
@@ -0,0 +1,35 @@
|
||||
"""Test that event processor calls send_alert for alert actions."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from vigilar.config import VigilarConfig
|
||||
from vigilar.events.processor import EventProcessor
|
||||
|
||||
|
||||
def test_execute_action_calls_send_alert():
|
||||
cfg = VigilarConfig()
|
||||
processor = EventProcessor(cfg)
|
||||
processor._engine = MagicMock()
|
||||
mock_bus = MagicMock()
|
||||
|
||||
with patch("vigilar.events.processor.send_alert") as mock_send:
|
||||
processor._execute_action(
|
||||
action="alert_all", event_id=42, bus=mock_bus,
|
||||
payload={"species": "bear"},
|
||||
event_type="WILDLIFE_PREDATOR", severity="CRITICAL", source_id="front",
|
||||
)
|
||||
mock_send.assert_called_once()
|
||||
|
||||
|
||||
def test_execute_action_push_and_record():
|
||||
cfg = VigilarConfig()
|
||||
processor = EventProcessor(cfg)
|
||||
processor._engine = MagicMock()
|
||||
mock_bus = MagicMock()
|
||||
|
||||
with patch("vigilar.events.processor.send_alert") as mock_send:
|
||||
processor._execute_action(
|
||||
action="push_and_record", event_id=10, bus=mock_bus,
|
||||
payload={}, event_type="PERSON_DETECTED", severity="WARNING", source_id="cam1",
|
||||
)
|
||||
mock_send.assert_called_once()
|
||||
Reference in New Issue
Block a user