Práca s protokolom MQTT
pripojenie sa, publikovanie správ, prihlásenie sa na odber správ, nastavenie poslednej vôle
Komunikačný protokol MQTT patrí medzi najpopulárnejšie a najpoužívanejšie protokoly v IoT riešeniach. To je dané jeho jednoduchosťou a výrazne nižším množstvom prenášaných dát v porovnaní s inými protokolmi.
V tomto návode sa pozrieme na to, ako ho použiť v jazyku MicroPython.
Podpora protokolu v jazyku MicroPython
Modul pre prácu s MQTT sa nenachádza v každom firmvéri s jazykom MicroPython. Nájdete ho napr. vo firmvéri pre mikrokontrolér ESP32, ale nenájdete ho vo firmévri pre mikrokontrolér RP2040. To, či ho máte alebo nemáte k dispozícii, si môžete overiť vypísaním zoznamu všetkých dostupných modulov príkazom:
>>> help('modules')
Ak ho nemáte, môžete ho nainštalovať primo z prostredia REPL jazyka MicroPython týmito príkazmi:
>>> import mip
>>> mip.install('umqtt.simple')
Poznámka
Aby inštalácia prebehla úspešne, musíte byť s mikrokontrolérom pripojení do internetu.
Pripojenie k brokerovi
Aby sme sa mohli pripojiť na MQTT broker, potrebujeme
vytvoriť objekt typu MQTTClient
a pri vytváraní mu dodať
potrebné informácie pre pripojenie. Trieda MQTTClient
sa
nachádza v balíku umqtt.simple
:
from umqtt.simple import MQTTClient
Vytvorenie MQTT klienta vyzerá nasledovne:
= MQTTClient(
client
client_id,
server,=0,
port=None,
user=None,
password=0,
keepalive=None
ssl )
Význam jednotlivých parametrov je nasledovný:
client_id
- Jednoznačný identifikátor klienta, ktorým sa odlišuje od ostatných klientov pripojených k rovnakému MQTT brokerovi. Jedná sa o povinný parameter.server
- URL adresa alebo IP adresa MQTT brokra. Jedná sa o povinný parameter.port
- Port, na ktorom broker počúva. Ak ho nezadáme, použije sa port1883
, čo je štandardný port pre komunikáciu pomocou protokolu MQTT alebo sa použije port8883
, ak je nastavený parameterssl
.user
apassword
- V prípade, že pripojenie k MQTT brokerovi si vyžaduje prihlasovacie meno a heslo, zadáte ich pomocou týchto dvoch parametrov.keepalive
-ssl
-
Pre vygenerovanie jedinečného identifikátora klienta môžete využiť napr. tento fragment kódu:
import binascii
import machine
= binascii.hexlify(machine.unique_id()) CLIENT_ID
Pripojenie sa k brokerovi
Po vytvorení klienta sa môžete pokúsiť pripojiť pomocou volania
metódy .connect()
. Signatúra metódy je nasledovná:
connect(self, clean_session=True) .
Parameter clean_session
= MQTTClient(CLIENT_ID, server)
client connect()
client.# tu príde komunikácia
Ak sa pripojiť nepodarí, dôjde k vyvolaniu výnimky
MQTTException
z balíka umqtt.simple
. Ak sa
pripojenie podarí, metóda vráti hodnotu 0
.
Odpojenie sa od brokera
Odpojiť sa od brokera sa môžete zavolaním metódy
.disconnect()
nad objektom klienta. Táto metóda nemá žiadny
parameter.
= MQTTClient(CLIENT_ID, server)
client connect()
client.# tu príde komunikácia
client.disconnect()
Ak klient nie je pripojený, dôjde k vyvolaniu výnimky
OSError
.
Nastavenie poslednej vôle
Pred pripojením k MQTT brokeru je možné nastaviť aj poslednú vôľu zariadenia. Správa, ktorá je uvedená ako posledná vôľa, sa pošle do zadanej témy v prípade, keď dôjde k neočakávanej strate spojenia.
Poslednú vôľu je možné zadať zavolaním metódy
.set_last_will()
nad objektom klienta. Jej signatúra je
nasledovná:
self, topic, msg, retain=False, qos=0) .set_last_will(
Význam jednotlivých parametrov metódy je nasledovný:
topic
- Názov témy, do ktorej bude posledná vôľa odoslaná.msg
- Samotná správa, ktorá reprezentuje poslednú vôľu.retain
- Nastavenie príznakuretain
pre správu poslednej vôle.qos
- Úroveň QoS.
Odoslanie dát na MQTT broker
Pre odoslanie správy (tzv. publikovanie) použijeme metódu nad
MQTT klientom s názvom .publish()
. Signatúra tejto
metódy vyzerá nasledovne:
self, topic, msg, retain=False, qos=0) .publish(
Význam jednotlivých parametrov metódy je nasledovný:
topic
- Názov témy, do ktorej bude správa poslaná. Musí byť serializovateľný do poľa bytov.msg
- Samotná správa. Musí byť serializovateľná do poľa bytov.retain
- Označenie správy príznakomretain
. Predvolene správa nie je s týmto príznakom odoslaná.qos
- Úroveň QoS, ktorá môže byť0
,1
alebo2
. Predvolená hodnota je0
.
Je dôležité, aby parametre topic
, ako aj
msg
boli serializovateľné do poľa bytov. Ak by ste sa
napríklad pokúsili odoslať priamo slovník, došlo by k vyvolaniu výnimky
TypeError
. Slovník preto pred odoslaním prevedte na reťazec
pomocou balíka json
.
Príklad použitia môže vyzerať takto:
= 'kpi/kronos/humidity/ab123cd'
topic = {
message 'dt': '2024-10-30T16:55:06Z',
'metrics': [
{'dt': '2024-10-30T16:50:00Z',
'name': 'humidity',
'unit': 'percent',
'value': 77
}
]
} client.publish(topic, json.dumps(message))
Prijímanie správ z MQTT brokera
Pokiaľ chceme správy z MQTT brokera prijímať, musíme sa prihlásiť na ich odber z konkrétnej témy. Samozrejme naraz je možné byť prihlásený na odber správ z viacerých tém.
Na zabezpečenie prijímania správ je potrebné spraviť nasledovné:
- vytvoriť tzv. callback funkciu, ktorá sa zavolá vtedy, keď zariadenie dostane správu
- prihlásiť sa na odber správ do príslušnej témy
- skontrolovať dostupnosť správ na brokeri
Vytvorenie callback funkcie
Callback funkcia je normálna funkcia, ktorá však musí mať dva parametre:
topic
- názov témy, v ktorej došlo k prijatiu správy, amessage
- samotná prijatá správa.
Oba parametre sú reprezentované postupnosťou bytov. Preto ich netreba zabudnúť konvertovať do požadovaných údajových typov.
Funkcia nič nevracia.
Jednoduchá podoba callback funkcie, v ktorej je téma vo formáte reťazec a správa je prenášaná vo formáte JSON, môže vyzerať nasledovne:
def on_message(topic: bytes, message: bytes):
= topic.decode('utf-8')
topic = json.loads( message )
message print(f'>>> {topic}: {message}')
Callback funkciu je potrebné zaregistrovať u klienta ešte
predtým, ako dôjde k pripojeniu k MQTT brokeru, resp. predtým,
ako dôjde k prihláseniu sa na odber správ z konkrétnej témy. To je možné
zabezpečiť volaním metódy .set_callback()
nad objektom
klienta. Táto metóda má jediný parameter, ktorým je adresa
callback funkcie.
Zaregistrovanie vytvorenej callback funkcie
on_message()
bude vyzerať takto:
= MQTTClient(CLIENT_ID, server)
client
client.set_callback(on_message)connect() client.
Prihlásenie sa na odber správ
Prihlásiť sa na odber tém je možné pomocou metódy
.subscribe()
nad objektom klienta. Táto metóda má dva
parametre:
topic
- názov témy, do ktorej sa chceme prihlásiť na odber správ, aqos
- úroveň QoS, ktorá môže byť0
,1
alebo2
.
Funkcia nič nevracia.
Prihlásiť sa na odber je možné až potom, keď je klient pripojený k MQTT brokeru. Okrem toho musí mať klient zadefinovanú callback funkciu. Ak to tak nie je, prihlásenie na odber skončí s výnimkou.
Príklad prihlásenia sa na odber správ z témy
kpi/hyperion/switch/cd456ef
bude vyzerať takto:
= MQTTClient(CLIENT_ID, server)
client
client.set_callback(on_message)connect()
client.
'kpi/hyperion/switch/cd456ef') client.subscribe(
Kontrola doručených správ
Overiť dostupnosť správ je však potrebné vykonať ručne. To je možné vykonať dvoma funkciami nad objektom MQTT klienta:
.check_msg()
- Neblokujúca funkcia. Skontroluje, či sa na serveri nenachádza správa určená pre klienta. Ak sa správa na serveri nenachádza, funkcia skončí. Ak sa správa na serveri nachádza, spracuje ju pomocou callback funkcie a funkcia skončí..wait_msg()
- Blokujúca funkcia. Zastaví vykonávanie programu a bude čakať na prijatie jednej správy. Tú následne spracuje pomocou callback funkcie a následne funkcia skončí.
Funkciu .wait_msg()
je možné s výhodou využiť v
prípadoch, keď funkcionalita zariadenia je vyslovene závislá od správ,
ktoré prijme zo servera. V tomto prípade môžeme hovoriť o okamžitom
spracovaní prijatých správ.
Príklad jej použitia môže vyzerať nasledovne:
while True:
print('Waiting for message...')
# blocking call
client.wait_msg() print('Message was processed')
Naopak funkcia .check_msg()
je výhodná vtedy, ak je
potrebné len rýchlo overiť a spracovať prijatú správu a následne napr.
zaspať. V tomto prípade je veľmi dôležité, aby správa mala v danej téme
na serveri príznak retain
. Mohlo by sa totiž stať, že k
overeniu dôjde v dobe, keď sa už správa na serveri nenachádza, pretože
ju broker rozposlal všetkým známym klientom.
Príklad jej použitia môže vyzerať nasledovne:
# non-blocking call
client.check_msg() 10 * 1000) machine.deepsleep(
Poznámka
Môže sa stať, že pri overovaní dostupnosti správ pomocou volania
funkcie .check_msg()
nedôjde k prijatiu správy aj napriek
tomu, že sa správa na brokeri nachádza. V tom prípade si pred jej
použitím na chvíľu “zdriemnite” zavolaním funkcie sleep()
alebo funkciu zavolajte dvakrát po sebe:
0.1)
sleep(# non-blocking call
client.check_msg() 10 * 1000) machine.deepsleep(
Ďalšie zdroje
- MicroPython: umqtt.simple
- GitHub repozitár s implementáciou modulu
umqtt.simple