Práca s protokolom MQTT

pripojenie sa, publikovanie správ, prihlásenie sa na odber správ, nastavenie poslednej vôle

Komunikačný protokol MQTT patrí medzi najpopulárnejšie a najpoužívanejšie protokoly v IoT riešeniach. To je dané jeho jednoduchosťou a výrazne nižším množstvom prenášaných dát v porovnaní s inými protokolmi.

V tomto návode sa pozrieme na to, ako ho použiť v jazyku MicroPython.

Podpora protokolu v jazyku MicroPython

Modul pre prácu s MQTT sa nenachádza v každom firmvéri s jazykom MicroPython. Nájdete ho napr. vo firmvéri pre mikrokontrolér ESP32, ale nenájdete ho vo firmévri pre mikrokontrolér RP2040. To, či ho máte alebo nemáte k dispozícii, si môžete overiť vypísaním zoznamu všetkých dostupných modulov príkazom:

>>> help('modules')

Ak ho nemáte, môžete ho nainštalovať primo z prostredia REPL jazyka MicroPython týmito príkazmi:

>>> import mip
>>> mip.install('umqtt.simple')

Pripojenie k brokerovi

Aby sme sa mohli pripojiť na MQTT broker, potrebujeme vytvoriť objekt typu MQTTClient a pri vytváraní mu dodať potrebné informácie pre pripojenie. Trieda MQTTClient sa nachádza v balíku umqtt.simple:

from umqtt.simple import MQTTClient

Vytvorenie MQTT klienta vyzerá nasledovne:

client = MQTTClient(
    client_id,
    server,
    port=0,
    user=None,
    password=None,
    keepalive=0,
    ssl=None
)

Význam jednotlivých parametrov je nasledovný:

  • client_id - Jednoznačný identifikátor klienta, ktorým sa odlišuje od ostatných klientov pripojených k rovnakému MQTT brokerovi. Jedná sa o povinný parameter.
  • server - URL adresa alebo IP adresa MQTT brokra. Jedná sa o povinný parameter.
  • port - Port, na ktorom broker počúva. Ak ho nezadáme, použije sa port 1883, čo je štandardný port pre komunikáciu pomocou protokolu MQTT alebo sa použije port 8883, ak je nastavený parameter ssl.
  • user a password - V prípade, že pripojenie k MQTT brokerovi si vyžaduje prihlasovacie meno a heslo, zadáte ich pomocou týchto dvoch parametrov.
  • keepalive -
  • ssl -

Pre vygenerovanie jedinečného identifikátora klienta môžete využiť napr. tento fragment kódu:

import binascii
import machine

CLIENT_ID = binascii.hexlify(machine.unique_id())

Pripojenie sa k brokerovi

Po vytvorení klienta sa môžete pokúsiť pripojiť pomocou volania metódy .connect(). Signatúra metódy je nasledovná:

.connect(self, clean_session=True)

Parameter clean_session

client = MQTTClient(CLIENT_ID, server)
client.connect()
# tu príde komunikácia

Ak sa pripojiť nepodarí, dôjde k vyvolaniu výnimky MQTTException z balíka umqtt.simple. Ak sa pripojenie podarí, metóda vráti hodnotu 0.

Odpojenie sa od brokera

Odpojiť sa od brokera sa môžete zavolaním metódy .disconnect() nad objektom klienta. Táto metóda nemá žiadny parameter.

client = MQTTClient(CLIENT_ID, server)
client.connect()
# tu príde komunikácia
client.disconnect()

Ak klient nie je pripojený, dôjde k vyvolaniu výnimky OSError.

Nastavenie poslednej vôle

Pred pripojením k MQTT brokeru je možné nastaviť aj poslednú vôľu zariadenia. Správa, ktorá je uvedená ako posledná vôľa, sa pošle do zadanej témy v prípade, keď dôjde k neočakávanej strate spojenia.

Poslednú vôľu je možné zadať zavolaním metódy .set_last_will() nad objektom klienta. Jej signatúra je nasledovná:

.set_last_will(self, topic, msg, retain=False, qos=0)

Význam jednotlivých parametrov metódy je nasledovný:

  • topic - Názov témy, do ktorej bude posledná vôľa odoslaná.
  • msg - Samotná správa, ktorá reprezentuje poslednú vôľu.
  • retain - Nastavenie príznaku retain pre správu poslednej vôle.
  • qos - Úroveň QoS.

Odoslanie dát na MQTT broker

Pre odoslanie správy (tzv. publikovanie) použijeme metódu nad MQTT klientom s názvom .publish(). Signatúra tejto metódy vyzerá nasledovne:

.publish(self, topic, msg, retain=False, qos=0)

Význam jednotlivých parametrov metódy je nasledovný:

  • topic - Názov témy, do ktorej bude správa poslaná. Musí byť serializovateľný do poľa bytov.
  • msg - Samotná správa. Musí byť serializovateľná do poľa bytov.
  • retain - Označenie správy príznakom retain. Predvolene správa nie je s týmto príznakom odoslaná.
  • qos - Úroveň QoS, ktorá môže byť 0, 1 alebo 2. Predvolená hodnota je 0.

Je dôležité, aby parametre topic, ako aj msg boli serializovateľné do poľa bytov. Ak by ste sa napríklad pokúsili odoslať priamo slovník, došlo by k vyvolaniu výnimky TypeError. Slovník preto pred odoslaním prevedte na reťazec pomocou balíka json.

Príklad použitia môže vyzerať takto:

topic = 'kpi/kronos/humidity/ab123cd'
message = {
  'dt': '2024-10-30T16:55:06Z',
  'metrics': [
    {
      'dt': '2024-10-30T16:50:00Z',
      'name': 'humidity',
      'unit': 'percent',
      'value': 77
    }
  ]
}
client.publish(topic, json.dumps(message))

Prijímanie správ z MQTT brokera

Pokiaľ chceme správy z MQTT brokera prijímať, musíme sa prihlásiť na ich odber z konkrétnej témy. Samozrejme naraz je možné byť prihlásený na odber správ z viacerých tém.

Na zabezpečenie prijímania správ je potrebné spraviť nasledovné:

  1. vytvoriť tzv. callback funkciu, ktorá sa zavolá vtedy, keď zariadenie dostane správu
  2. prihlásiť sa na odber správ do príslušnej témy
  3. skontrolovať dostupnosť správ na brokeri

Vytvorenie callback funkcie

Callback funkcia je normálna funkcia, ktorá však musí mať dva parametre:

  • topic - názov témy, v ktorej došlo k prijatiu správy, a
  • message - samotná prijatá správa.

Oba parametre sú reprezentované postupnosťou bytov. Preto ich netreba zabudnúť konvertovať do požadovaných údajových typov.

Funkcia nič nevracia.

Jednoduchá podoba callback funkcie, v ktorej je téma vo formáte reťazec a správa je prenášaná vo formáte JSON, môže vyzerať nasledovne:

def on_message(topic: bytes, message: bytes):
    topic = topic.decode('utf-8')
    message = json.loads( message )
    print(f'>>> {topic}: {message}')

Callback funkciu je potrebné zaregistrovať u klienta ešte predtým, ako dôjde k pripojeniu k MQTT brokeru, resp. predtým, ako dôjde k prihláseniu sa na odber správ z konkrétnej témy. To je možné zabezpečiť volaním metódy .set_callback() nad objektom klienta. Táto metóda má jediný parameter, ktorým je adresa callback funkcie.

Zaregistrovanie vytvorenej callback funkcie on_message() bude vyzerať takto:

client = MQTTClient(CLIENT_ID, server)
client.set_callback(on_message)
client.connect()

Prihlásenie sa na odber správ

Prihlásiť sa na odber tém je možné pomocou metódy .subscribe() nad objektom klienta. Táto metóda má dva parametre:

  • topic - názov témy, do ktorej sa chceme prihlásiť na odber správ, a
  • qos - úroveň QoS, ktorá môže byť 0, 1 alebo 2.

Funkcia nič nevracia.

Prihlásiť sa na odber je možné až potom, keď je klient pripojený k MQTT brokeru. Okrem toho musí mať klient zadefinovanú callback funkciu. Ak to tak nie je, prihlásenie na odber skončí s výnimkou.

Príklad prihlásenia sa na odber správ z témy kpi/hyperion/switch/cd456ef bude vyzerať takto:

client = MQTTClient(CLIENT_ID, server)
client.set_callback(on_message)
client.connect()

client.subscribe('kpi/hyperion/switch/cd456ef')

Kontrola doručených správ

Overiť dostupnosť správ je však potrebné vykonať ručne. To je možné vykonať dvoma funkciami nad objektom MQTT klienta:

  • .check_msg() - Neblokujúca funkcia. Skontroluje, či sa na serveri nenachádza správa určená pre klienta. Ak sa správa na serveri nenachádza, funkcia skončí. Ak sa správa na serveri nachádza, spracuje ju pomocou callback funkcie a funkcia skončí.

  • .wait_msg() - Blokujúca funkcia. Zastaví vykonávanie programu a bude čakať na prijatie jednej správy. Tú následne spracuje pomocou callback funkcie a následne funkcia skončí.

Funkciu .wait_msg() je možné s výhodou využiť v prípadoch, keď funkcionalita zariadenia je vyslovene závislá od správ, ktoré prijme zo servera. V tomto prípade môžeme hovoriť o okamžitom spracovaní prijatých správ.

Príklad jej použitia môže vyzerať nasledovne:

while True:
  print('Waiting for message...')
  client.wait_msg()  # blocking call
  print('Message was processed')

Naopak funkcia .check_msg() je výhodná vtedy, ak je potrebné len rýchlo overiť a spracovať prijatú správu a následne napr. zaspať. V tomto prípade je veľmi dôležité, aby správa mala v danej téme na serveri príznak retain. Mohlo by sa totiž stať, že k overeniu dôjde v dobe, keď sa už správa na serveri nenachádza, pretože ju broker rozposlal všetkým známym klientom.

Príklad jej použitia môže vyzerať nasledovne:

client.check_msg()  # non-blocking call
machine.deepsleep(10 * 1000)

Ďalšie zdroje

  • MicroPython: umqtt.simple - GitHub repozitár s implementáciou modulu umqtt.simple