from datetime import timedelta
from pathlib import Path
from my_iot.services.file_ import FloatValueFile
from my_iot.types_ import Unit
# Service instances declared by this configuration module.
SERVICES = [
    # Float value read from the first thermal zone's sysfs file.
    FloatValueFile(
        Path('/sys/class/thermal/thermal_zone0/temp'),
        'cpu_temperature',  # presumably the channel name -- confirm against FloatValueFile
        timedelta(seconds=5.0),  # presumably the polling interval -- confirm
        Unit.CELSIUS,
        0.001,  # presumably a scale factor (sysfs reports millidegrees Celsius) -- confirm
        'CPU Temperature',  # presumably the human-readable title -- confirm
    ),
]
# `...` (Ellipsis) is a placeholder here; presumably each value is meant to be
# filled in with a real credential/identifier before this module is used.
# Passed as `token=` / `chat_id=` to the Telegram actions in the handlers below.
TELEGRAM_TOKEN = ...
TELEGRAM_CHAT_ID = ...
NEST_STRUCTURE_ID = ...
# Channel name derived from the structure ID; the camera handler below passes it
# to `if_actual_value_not_equal` together with 'home'.
AWAY_CHANNEL = f'nest:structure:{NEST_STRUCTURE_ID}:away'
from typing import Any
from aiohttp import ClientSession
from my_iot.actions.telegram import ParseMode, send_animation
from my_iot.routing import (
router,
if_actual_value_not_equal,
if_channel_like,
if_newer,
)
from my_iot.types_ import Event
@router
@if_channel_like(r'.*:last_animated_image_url')
@if_newer
@if_actual_value_not_equal(AWAY_CHANNEL, 'home')
async def on_away_camera_event(*, event: Event, session: ClientSession, **_: Any):
    """Send the event's value to Telegram as an animation without a notification.

    The caption is the event title wrapped in Markdown asterisks, followed on
    the next line by the event timestamp converted to the local time zone.

    NOTE(review): judging by their names, the decorators limit this handler to
    channels ending in ``:last_animated_image_url``, to newer events, and to
    the case where ``AWAY_CHANNEL`` is not ``'home'`` -- confirm against
    ``my_iot.routing``, which is not visible here.

    Args:
        event: The routed event; ``value``, ``title`` and ``timestamp`` are read.
        session: HTTP client session handed to ``send_animation``.
        **_: Any other keyword arguments are accepted and ignored.
    """
    caption = f'*{event.title}*\n{event.timestamp.astimezone():%b %d, %H:%M:%S}'
    await send_animation(
        session=session,
        token=TELEGRAM_TOKEN,
        chat_id=TELEGRAM_CHAT_ID,
        animation=event.value,
        disable_notification=True,
        caption=caption,
        parse_mode=ParseMode.MARKDOWN,
    )
# NOTE(review): both names are already assigned earlier in this file; these lines
# rebind them to the same `...` placeholder -- confirm the duplication is intended.
TELEGRAM_TOKEN = ...
TELEGRAM_CHAT_ID = ...
from typing import Any
from aiohttp import ClientSession
from my_iot.actions.telegram import send_message
from my_iot.routing import (
if_channel_like,
if_changed,
if_not_equal,
router,
)
@router
@if_channel_like(r'.*:wwn_security_state')
@if_changed
@if_not_equal('ok')
async def on_deter(*, session: ClientSession, **_: Any):
    """Send a fixed intrusion alert message to the configured Telegram chat.

    NOTE(review): judging by their names, the decorators limit this handler to
    channels ending in ``:wwn_security_state`` whose value changed and is not
    ``'ok'`` -- confirm against ``my_iot.routing``, which is not visible here.

    Args:
        session: HTTP client session handed to ``send_message``.
        **_: Any other keyword arguments are accepted and ignored.
    """
    alert_text = '🔴 Intrusion detected'
    await send_message(
        session=session,
        token=TELEGRAM_TOKEN,
        chat_id=TELEGRAM_CHAT_ID,
        text=alert_text,
    )