Notifikačná služba

jednoduchá služba v jazyku Python na odosielanie notifikácií prostredníctvom protokolu MQTT

Motivácia

Súčasťou riešení Internetu vecí nie sú len chytré zariadenia, ale aj softvérové služby. A práve jednu takú jednoduchú notifikačnú službu si urobíme počas tohto cvičenia. Pomocou tejto služby budeme vedieť z našich zariadení posielať notifikácie prostredníctvom protokolu MQTT. Táto služba bude súčasťou našej infraštruktúry a bude umiestnená v cloude.

Smart Department: Architektúra

Službu vytvoríme v jazyku Python. Na posielanie notifikácií budeme používať knižnicu Apprise, vďaka čomu budeme vedieť využiť na notifikácie desiatky rôznych služieb.

Ciele

  1. naučiť sa základy práce s knižniciou Paho-MQTT

  2. naučiť sa vytvoriť poslednú vôľuv MQTT

  3. naučiť sa základy práce s knižnicou Apprise

  4. naučiť sa vytvárať modely pomocou knižnice Pydantic

Postup

Bootstrap

Predtým, ako sa vrhneme do vývoja samotnej služby, si pripravíme prostredie.

Task

Vo svojom vývojovom prostredí si vytvorte nový projekt s názvom Notifier.

Upravte si štruktúru súborov projektu nasledovne:

./
├── readme.md
├── requirements.txt
└── src/
    ├── main.py
    └── models.py

Do súboru readme.md napíšte jednoduchý opis projektu vo formáte Markdown. Jeho obsah môže vyzerať nasledovne:

# Notifier

Simple service for sending notifications over _MQTT_ protocol, which are
delivered with [Apprise](https://github.com/caronc/apprise) library.

Ostatné súbory zatiaľ nechajte prázdne.

Task

Do prostredia svojho projektu nainštalujte potrebné balíky pre vývoj služby.

Pre vytvorenie služby budeme potrebovať tieto balíky:

  • apprise - Knižnica pre posielanie push notifikácií.
  • loguru - Knižnica pre jednoduché logovanie v jazyku Python.
  • paho-mqtt - Knižnica pre komunikáciu pomocou protokolu MQTT.
  • pydantic - Knižnica pre validáciu údajov.
  • pydantic-settings - Rozšírenie knižnice Pydantic pre načítanie konfigurácie (nie len) z premenných prostredia.

Vytvorte si samostatný súbor requirements.txt, do ktorého zapíšte všetky potrebné balíky, ktoré chcete nainštalovať:

apprise
loguru
paho-mqtt
pydantic
pydantic-settings

Z príkazového riadku môžete potrebné balíky nainštalovať pomocou príkazu pip takto:

$ pip install apprise paho-mqtt pydantic pydantic-settings loguru

Rovnako ich však môžete nainštalovať pomocou vytvoreného súboru requirements.txt takto:

$ pip install --requirement requirements.txt

Application Settings

Ešte predtým, ako sa vrhneme na samotnú implementáciu služby, sa pripravíme - vytvoríme si model pre reprezentáciu nastavení. Podľa odporúčaní Twelve factor app sa budeme snažiť oddeliť konfiguráciu aplikácie od jej implementácie a vďaka použitiu knižnice Pydantic bude možné konfiguráciu nastaviť aj pomocou premenných prostredia.

The 12 Factor App

Task

V module models.py vytvorte nový model Settings, ktorý bude obsahovať konfiguráciu aplikácie.

Model Settings bude tentokrát potomkom triedy BaseSettings z modulu pydantic_settings. Do konfigurácie uložíme nasledovné informácie:

  • broker - adresa MQTT brokera
  • port - číslo portu, na ktorom MQTT broker komunikuje, predvolená hodnota nech je 1883
  • username - prihlasovacie meno, predvolená hodnota nech je None
  • password - prihlasovacie heslo, predvolená hodnota nech je None
  • ssl - príznak, ktorým zapneme/vypneme používania SSL/TLS pri komunikácii pomocou MQTT protokolu, predvolene nastavíme na hodnotu False
  • topic_prefix - nemenná časť tém, ktorá bude tvoriť základ tém služby, v našom prípade môžeme rovno nastaviť predvolenú hodnotu na services/notifier
  • uid - jedinečný študentský identifikátor, ktorý sa stane súčasťou tém a bude fungovať aj ako identifikátor klienta

Výsledný model bude vyzerať nasledovne:

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    broker: str
    port: int = 1883
    username: str | None = None
    password: str | None = None
    ssl: bool = False
    topic_prefix: str = 'services/notifier'
    uid: str

    model_config = SettingsConfigDict(
        env_prefix='NOTIFIER_',
        env_file_encoding='utf-8',
        env_file='.env',
        extra='ignore'
    )

Task

V module models.py vytvorte funkciu get_settings(), ktorá vráti inštanciu nastavení.

Keďže nastavenia stačí načítať len raz pri spustení aplikácie, funkciu dekorujte pomocou dekorátora @cache z modulu functools. Tým sa zabezpečí, že funkcia sa vykonaná len raz a výsledok bude uložený do cache. Pri každom ďalšom volaní funkcie sa použije hodnota, ktorá je v cache uložená.

Task

V triede Settings vytvorte inštančnú metódu base_topic(), ktorá vráti kompletnú podobu základnej témy.

Základná téma vznikne ako kompozícia prefixu tém a študentského identifikátoru.

Task

Overte správnosť svojej implementácie.

Vlastnosti služby je možné nastaviť pri spustení pomocou premenných prostredia. To je možné urobiť viacerými spôsobmi:

  1. Pomocou IDE - Premenné prostredia je možné nastaviť v dialógu pre spustenie aplikácie.

  2. Pomocou tzv. .env súboru - Jedná sa o súbor s premennými prostredia. To je výhodné najmä vtedy, ak je premenných prostredia veľa. Pre našu aplikáciu môže tento súbor vyzerať napríklad takto:

    # local.env
    # settings for notifier
    NOTIFIER_BROKER=147.232.205.176
    NOTIFIER_PORT=8883
    NOTIFIER_USERNAME=student
    NOTIFIER_PASSWORD=dgRk9cQ8sU4fCO
    NOTIFIER_SSL=true
    NOTIFIER_TOPIC_PREFIX=services/notifier
    NOTIFIER_UID=ab123cd
    
    # settings for loguru
    LOGURU_LEVEL=DEBUG

    Cestu k .env súboru je možné nastaviť priamo pomocou IDE. V konfigurácii triedy Settings je taktiež uvedené, že aplikácia sa pokúsi nájsť aj súbor s názvom .env v priečinku, v ktorom bola aplikácia spustená.

The Service Skeleton

V tomto kroku vytvoríme kostru aplikácie a pripojíme sa s ňou k MQTT brokeru. Na komunikáciu v jazyku Python s protokolom MQTT budeme používať knižnicu paho-mqtt.

Projekt paho

Task

Pomocou nasledujúceho kódu vytvorte kostru služby Notifier.

import ssl

from loguru import logger
import paho.mqtt.client as mqtt

from models import get_settings


def on_connect(client: mqtt.Client, userdata, flags, reason_code, properties):
    logger.debug(f"Connected with result code {reason_code}")


def on_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
    logger.debug(f'{msg.topic}: {msg.payload}')


def main():
    settings = get_settings()

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, settings.uid)
    client.username_pw_set(settings.username, settings.password)
    client.tls_set(cert_reqs=ssl.CERT_NONE)
    client.tls_insecure_set(settings.ssl)

    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(settings.broker, settings.port, 60)

    logger.info('Waiting for messages.')
    client.loop_forever()


if __name__ == '__main__':
    main()

Task

V callback funkcii on_connect() sa prihláste na odber správ tém services/notifier/ab123cd/notify a services/notifier/ab123cd/cmd.

Význam tém je nasledujúci:

  • services/notifier/ab123cd/notify - Pomocou tejto témy bude možné posielať vlastné notifikácie.
  • services/notifier/ab123cd/cmd - Pomocou tejto témy bude možné službe posielať príkazy na diaľku, čím ju bude možné ovládať.

Task

Overte správnosť svojej implementácie.

Na overenie použite MQTT klienta MQTT Explorer. Ak zašlete ľubovoľnú správu do ľubovoľnej z uvedených dvoch tém, v logoch aplikácie sa zobrazí prijatá správa spolu s informáciou o tom, do ktorej témy bola správa odoslaná.

Handling Commands

Službu, ktorú vytvárame, budeme môcť ovládať na diaľku. Na to bude slúžiť téma services/notifier/ab123cd/cmd. Príkaz bude odoslaný vo forme JSON dokumentu. V tomto kroku zabezpečíme ošetrenie príkazu shutdown na vypnutie služby, ale pripravíme ju na možnosť rozšíriť ju o ľubovoľný ďalší príkaz.

Task

Vytvorte kostru funkcie handle_command(), ktorá sa zavolá po prijatí správy v téme services/notifier/ab123cd/cmd.

Signatúra funkcie bude rovnaká, ako v prípade callback funkcie on_message(). Okrem logovania môžeme zabezpečiť konverziu prijatej správy na typ slovník v jazyku Python.

Task

Pomocou metódy .message_callback_add() nad objektom MQTT klienta nastavte vytvorenú funkciu handle_command() ako callback funkciu pre správy prichádzajúce do témy services/notifier/ab123cd/cmd. Implementáciu overte.

Po odoslaní správy do témy pre príkazy sa vám v logoch zobrazí príslušná správa.

Task

Zabezpečte, aby sa služba po prijatí príkazu shutdown korektne vypla.

Príkaz na vypnutie bude vyzerať nasledovne:

{
    "cmd": "shutdown"
}

Po prijatí príkazu shutdown službu ukončite. To môžete urobiť dvoma spôsobmi:

  • natvrdo zavolaním funkcií quit(0) a sys.exit(0), alebo
  • korektne odpojením klienta od brokera a ukončením aplikačnej slučky

Urobte všetko preto, aby ste službu ukončili korektne.

V prípade, že bol prijatý neznámy príkaz, túto udalosť vhodne zalogujte pomocou úrovne warning.

Task

Pomocou MQTT klienta MQTT Explorer overte, či sa služba naozaj korektne ukončí.

Službe pošlite správu na jej ukončenie. Ak ste postupovali správne, služba sa korektne ukončí. Rovnako overte, či sa v logoch zobrazí korektná správa v prípade prijatia neznámeho príkazu.

The Last Will

Keď sa služba odpojí alebo bude na diaľku vypnutá, bolo by dobré oznámiť všetkým ostatným, že už ďalej nepracuje. Vytvoríme preto samostatnú tému s postfixom /status, kde budeme informovať o jej aktuálnom stave. To znamená, že:

  • keď službu zapneme, pošleme do nej správu o tom, že sme v stave online
  • keď službu vypneme alebo z rozličných dôvodov dôjde k strate spojenia so službou, broker za nás nastaví jej nový stav na offline

Task

Po pripojení služby k brokeru pošlite do témy s postfixom /status správu {"status": "online"}.

Task

Pri inicializácii MQTT klienta nastavte pomocou metódy .will_set() poslednú vôľu služby.

Signatúra tejto metódy vyzerá nasledovne:

.will_set(topic, payload, qos, retain, properties)

Ich význam je nasledovný:

  • topic - téma pre odoslanie poslednej vôle
  • payload - samotná správa, ktorá bude reprezentovať poslednú vôľu
  • qos - úroveň QoS, predvolene je nastavená hodnota 0
  • retain - príznak retain, predvolene je nastavený na hodnotu False
  • properties - ďalšie nastavenia, ktorú sú predvolene prázdne

Vytvorenie poslednej vôle môže vyzerať nasledovne:

client.will_set(
    f'{settings.base_topic()}/status',
    '{"status": "offline"}',
    retain=True
)

Task

Overte správnosť svojej implementácie.

Overte, či správa v téme so stavom služby, sa po vypnutí služby korektne aktualizuje. Správanie overte pre všetky situácie:

  • stav online - zapnutie služby
  • stav offline - korektné vypnutie služby, ako aj nekorektné vypnutie služby (napr. jej vypnutím z vývojového prostredia)

Na overenie správania použite MQTT klienta MQTT Explorer.

The Notifications

V tomto kroku zabezpečíme samotné posielanie notifikácií a upozornení. Pre samotné správy, ktoré budu reprezentovať notifikáciu, vytvoríme model, pomocou ktorého budeme prijaté správy vedieť aj validovať. Na tento účel využijeme knižnicu Pydantic. Samotné notifikácie budeme posielať pomocou knižnice Apprise.

Knižnica Apprise

Task

V module models.py vytvorte model s názvom Notification, ktorý bude reprezentovať príslušnú notifkáciu.

Vytvorte samostatný modul s názvom models.py, do ktorého pridáme definície všetkých modelov, ktoré budeme používať. Model definujúci notifikáciu je uvedený nižšie:

from pydantic import BaseModel, AnyUrl

class Notification(BaseModel):
    urls: AnyUrl | list[AnyUrl]
    title: str = 'Alert'
    body: str
    attachments: list | None = None

Task

Vytvorte funkciu handle_notification(), ktorá bude slúžiť ako callback funkcia pre správy prichádzajúce do témy services/notifier/ab123cd/notify.

Funkcia bude mať rovnakú signatúru, ako callback funkcia on_message().

Task

Pomocou metódy .message_callback_add() nad objektom MQTT klienta nastavte vytvorenú funkciu handle_notification() ako callback funkciu pre správy prichádzajúce do témy services/notifier/ab123cd/notify.

Task

Overte správnosť svojej implementácie.

Správa reprezentujúca notifikáciu bude v našom prípade vyzerať nasledovne:

{
  "urls": "pbul://token-of-receiver",
  "title": "Hello world!",
  "body": "This message was sent with Apprise library, delivered by Paho MQTT Client and validated with Pydantic."
}

Po jej úspešnom odoslaní do témy pre notifikácie sa vám správa zobrazí v telefóne alebo v prehliadači. V prípade, že odošlete nevalídnu správu, služba sa ukončí s chybou validácie.

Ďalšie úlohy

  1. Vytvorte si vlastný validátor pre validáciu tvaru MQTT tém. Použite ho na validáciu nastavenia topic_prefix.

  2. Na nakonfigurovanie pripojenia k MQTT môžete využiť tzv. connection string. Jedná sa o jeden reťazec, v ktorom sa nachádzajú všetky potrebné informácie na vytvorenie pripojenia s MQTT brokerom. Jeho tvar je nasledovný:

    mqtts://username:password@broker.address.com:port/base/topic

    Upravte nastavenia tak, aby na základe premennej prostredia NOTIFIER_MQTT_URI vedel vytvoriť a správne naplniť jednotlivé položky pre pripojenie k MQTT brokeru.

  3. Knižnica Apprise umožňuje naraz poslať notifikáciu viacerým príjemcom. Model notifikácie podporu pre viacerých príjemcov už má, ale počas cvičenia sme túto funkcionalitu neimplementovali. Upravte ju teda tak, aby bolo možné prijatú správu poslať viacerým príjemcom na základe zoznamu príjemcov v prijatej správe cez protokol MQTT.

  4. Vo vytvorenej implementácii nijako nevalidujeme prijatý príkaz. Vytvorte preto samostatný model aj pre príkazy. Pozrite sa na možnosti knižnice Pydantic, ktoré je možné využiť na validovanie platných príkazov. Zatiaľ máme síce len jeden, ale rozšíriť ich zoznam o ďalšie by tým pádom nemal byť problém.

  5. Pokiaľ kedykoľvek dôjde k výnimke (najmä napr. pri validácii správ prijatých cez protokol MQTT), aplikácia sa ukončí. Ošetritre preto možné výskyty výnimiek, aby k ukončeniu aplikácie nedošlo, ale miesto toho vzniknuté chyby logujte.

Ďalšie zdroje

  1. HiveMQ: How to Use the Paho MQTT Client in Python - MQTT Client Library Encyclopedia

  2. Beginner’s Guide To Using Paho-MQTT, A Python MQTT Client

  3. Apprise- Push Notifications that work with just about every platform!

  4. Pydantic - The most widely used data validation library for Python.

  5. Eclipse Paho™ MQTT Python Client - Kompletná dokumentácia knižnice paho.

  6. functools — Higher-order functions and operations on callable objects - The functools module is for higher-order functions: functions that act on or return other functions.