Hlavní navigace

Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři

Pavel Tišnovský

Dnes si nejdříve ukážeme, jaký je možné komunikovat se systémem Apache ActiveMQ při použití protokolu AMQP. Druhá část článku je věnována spolupráci mezi Apache ActiveMQ a aplikacemi naprogramovanými v jazyku Go.

Doba čtení: 34 minut

Sdílet

11. Jednoduchý nástroj pro sledování speciálních témat

12. Analýza výsledků práce nástroje

13. Použití message brokerů z programovacího jazyka Go s využitím protokolu STOMP

14. Připojení k message brokerovi, práce se zprávami

15. Implementace producenta zpráv (publishera)

16. Implementace konzumenta zpráv

17. Úprava producenta a konzumenta tak, aby komunikace s message brokerem probíhala v gorutině

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu

20. Odkazy na Internetu

1. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři

Dnešní část seriálu o systémech implementujících fronty zpráv bude rozdělena do dvou částí. V první části si na několika co nejjednodušších demonstračních příkladech ukážeme, jakým způsobem je možné komunikovat se systémem Apache ActiveMQ s využitím protokolu AMQP. Příklady budou naprogramovány v Pythonu, přičemž vlastní realizaci komunikačního protokolu bude zajišťovat knihovna pojmenovaná Qpid Proton, kterou nalezneme na adrese https://qpid.apache.org/proton/ (tato knihovna podporuje například i již popsaného message brokera RabbitMQ). Zajímavé je, že programátorské rozhraní knihovny Qpid Proton je zcela odlišné od rozhraní protokolu STOMP, které bylo realizováno v knihovně stomp.py, viz též předchozí část tohoto seriálu s popisem možností poskytovaných touto knihovnou.

Druhá část článku je věnována nepatrně odlišnému tématu. Ukážeme si v ní způsob realizace klientů vyvinutých v programovacím jazyce Go, kterému se podrobněji věnujeme v paralelně běžícím seriálu. Uvidíme, že díky některým vlastnostem programovacího jazyka Go, zejména díky kanálům a gorutinám, je implementace klientů message brokera v mnoha ohledech velmi elegantní.

Poznámka: před spuštěním demonstračních příkladů si zkontrolujte, zda soubor conf/activemq.xml
obsahuje konfiguraci protokolů STOMP i AMQP – viz zvýrazněné části:
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

2. Instalace knihovny Qpid Proton

V demonstračních příkladech, které si popíšeme a ukážeme v navazujících kapitolách, použijeme pro komunikaci se systémem Apache ActiveMQ knihovnu nazvanou Qpid Proton, která je určena pro programovací jazyk Python. Tato knihovna je – jak je dnes již dobrým zvykem – dostupná na PyPi (Python Package Indexu), ovšem před její instalací je nutné nainstalovat i Cython, a to v samostatném kroku. Cython, s nímž jsme se seznámili v jiném článku, nainstalujeme pro právě přihlášeného uživatele následujícím příkazem:

$ pip3 install --user cython
 
Collecting cython
  Downloading https://files.pythonhosted.org/packages/16/98/49aa24054e99e9c7734e49d6996662f547e4e2faae0051d35fbbc461afa4/Cython-0.29.5-cp36-cp36m-manylinux1_x86_64.whl (2.1MB)
    100% |████████████████████████████████| 2.1MB 603kB/s
Installing collected packages: cython
Successfully installed cython-0.29.5

Po (doufejme že úspěšné) instalaci Cythonu již můžeme bez problémů nainstalovat i knihovnu Qpid Proton, a to takto:

$ pip3 install --user python-qpid-proton
 
Collecting python-qpid-proton
  Downloading https://files.pythonhosted.org/packages/fe/e3/58379a2262a31788cfa037bde2c0c77273a74f34943f73abf20030c71f17/python-qpid-proton-0.26.0.zip (563kB)
    100% |████████████████████████████████| 573kB 1.1MB/s
Installing collected packages: python-qpid-proton
  Running setup.py install for python-qpid-proton ... done
Successfully installed python-qpid-proton-0.26.0
Poznámka: instalace proběhla pro Python 3, i když by knihovnu mělo být možné použít i společně s Pythonem 2.

Základní kontrolu instalace můžeme provést přímo z interaktivní smyčky jazyka Python. Použijeme interpret Pythonu 3, protože i instalace byla provedena pro Python 3:

$ python3
 
Python 3.6.3 (default, Oct  9 2017, 12:11:29)
[GCC 7.2.1 20170915 (Red Hat 7.2.1-2)] on linux
Type "help", "copyright", "credits" or "license" for more information.
 
>>> import proton
>>> help("proton")

Po potvrzení posledního příkazu by se měla zobrazit nápověda:

Help on package proton:
 
NAME
    proton
 
DESCRIPTION
    The proton module defines a suite of APIs that implement the AMQP 1.0
    protocol.
 
    The proton APIs consist of the following classes:
 
     - L{Message}   -- A class for creating and/or accessing AMQP message content.
     - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded
                      data.
 
PACKAGE CONTENTS

3. Producent zpráv publikovaných pod zvoleným tématem

Instalaci knihovny Qpid Proton jsme dokončili, takže si nyní můžeme ukázat způsob implementace jednoduchého producenta zpráv, které budou přes message brokera posílány s využitím strategie PUB-SUB – zprávy tedy nebudou ukládány do fronty, ale budou přímo přeposlány všem posluchačům, kteří jsou v daný okamžik k message brokerovi připojeni.

Ve výchozím nastavení systému Apache ActiveMQ je pro komunikaci s využitím protokolu AMQP používán port 5672, který je dokonce k tomuto účelu rezervován. Doménové jméno serveru tedy společně s portem 5672 použijeme pro určení jeho adresy ve formátu, v jakém ho očekává knihovna Qpid:

ADDRESS = "localhost:5672"

nebo:

ADDRESS = "192.168.1.42:5672"

popř.:

ADDRESS = "foo.bar.baz:5672"

Určit musíme i cíl zprávy. Ten typicky začíná prefixem „topic://“ nebo „queue://“. V prvním příkladu použijeme téma (topic) nazvané „event“, takže se cíl zapíše následovně (vypadá trošku jako URL i s uvedením „protokolu“):

TARGET = "topic://event"

Samotný producent zprávy se inicializuje a následně spustí takto (ve vlastním kontejneru, což je ovšem termín knihovny Qpid):

publisher = Publisher(ADDRESS, TARGET)
container = Container(publisher)
container.run()

Samozřejmě nám zbývá implementovat vlastní logiku producenta, tj. musíme naprogramovat třídu nazvanou Publisher, která byla zmíněna v předchozích programových řádcích. Tato třída je odvozena od třídy MessagingHandler, v níž je předepsáno několik callback metod volaných samotnou knihovnou Qpid Proton na základě různých událostí (Event), které nastávají v průběhu života producenta zpráv. Prozatím nás budou zajímat čtyři typy událostí:

  1. on_start – spuštění producenta v kontejneru
  2. on_accepted – akceptace poslané zprávy
  3. on_disconnected – odpojení od message brokera
  4. on_sendable – systém je připraven na poslání další zprávy

V nejjednodušším případě – což je výchozí stav – budou handlery prázdné, ovšem povšimněte si, že se každému handleru skutečně předává objekt, který reprezentuje vzniklou událost:

class Publisher(MessagingHandler):
 
    def __init__(self, url, target):
        pass
 
    def on_start(self, event):
        pass
 
    def on_sendable(self, event):
        pass
 
    def on_accepted(self, event):
        pass
 
    def on_disconnected(self, event):
        pass

Konkrétní implementace producenta zpráv samozřejmě bude složitější, což je patrné i při pohledu na jeho úplný zdrojový kód:

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
TARGET = "topic://event"
 
 
class Publisher(MessagingHandler):
 
    def __init__(self, url, target):
        super(Publisher, self).__init__()
        self.url = url
        self.target = target
        self.message_sent = False
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        event.container.create_sender(connection, self.target)
 
    def on_sendable(self, event):
        print("on_sendable()")
        if not self.message_sent:
            message = Message(id=1, body="Hello world1")
            event.sender.send(message)
            self.message_sent = True
            print("Message has been sent")
        else:
            print("Already sent... do nothing")
 
    def on_accepted(self, event):
        print("on_accepted()")
        if self.message_sent:
            event.connection.close()
 
    def on_disconnected(self, event):
        print("on_disconnected()")
 
 
publisher = Publisher(ADDRESS, TARGET)
container = Container(publisher)
container.run()
Poznámka: důležité je počkat na skutečné poslání a akceptaci zprávy. Pokud producent skončí dřív, než byla zpráva akceptována, nemusí dojít k jejímu doručení.

4. Příjemce zpráv s nastaveným tématem

Příjemce zprávy se vytvoří velmi podobným způsobem jako její producent:

subscriber = Subscriber(ADDRESS, SOURCE)
container = Container(subscriber)
container.run()

V nejjednodušším případě bude implementace třídy Subscriber odvozené od nám již známé třídy MessagingHandler obsahovat jen dvě callback metody:

class Subscriber(MessagingHandler):
 
    def __init__(self, url, source):
        pass
 
    def on_start(self, event):
        pass
 
    def on_message(self, event):
        pass

Ve chvíli, kdy je automaticky zavolána metoda on_start, se můžeme připojit k message brokerovi a přihlásit se k odebírání zpráv s nějakým tématem:

connection = event.container.connect(self.url)
event.container.create_receiver(connection, self.source)

Samotný příjem zprávy je již snadný, protože pro každou zprávu, kterou z message brokera získáme, je zavolána callback metoda on_message. Zpráva je součástí parametru event, text zprávy pak v atributu body:

def on_message(self, event):
    message = event.message
 
    print("Received message '{m}'".format(m=message.body))
 
    event.receiver.close()
    event.connection.close()
Poznámka: tento příjemce zprávy je velmi jednoduchý a akceptuje pouze jedinou zprávu. Proto ihned po jejím přijetí ukončí připojení.

Opět si ukažme úplný zdrojový kód tohoto příkladu:

from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
SOURCE = "topic://event"
 
 
class Subscriber(MessagingHandler):
 
    def __init__(self, url, source):
        super(Subscriber, self).__init__()
        self.url = url
        self.source = source
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        event.container.create_receiver(connection, self.source)
 
    def on_message(self, event):
        message = event.message
 
        print("Received message '{m}'".format(m=message.body))
 
        event.receiver.close()
        event.connection.close()
 
 
subscriber = Subscriber(ADDRESS, SOURCE)
container = Container(subscriber)
container.run()

5. Producent zpráv posílaných do fronty

Pokud budeme chtít namísto komunikace typu PUB-SUB použít strategii PUSH-PULL budou změny provedené v producentovi i konzumentovi minimální, i když samotný transport zpráv bude naprosto odlišný (to je však záležitostí samotného message brokera, nikoli klientů, kteří se k němu připojují). Změní se pouze cíl, a to z:

TARGET = "topic://event"

na:

TARGET = "queue://test"
Poznámka: „test“ je v tomto případě jménem fronty, do níž se budou zprávy posílat.

Upravený zdrojový kód producenta zpráv posílaných do fronty „test“ bude vypadat následovně:

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
TARGET = "queue://test"
 
 
class Publisher(MessagingHandler):
 
    def __init__(self, url, target):
        super(Publisher, self).__init__()
        self.url = url
        self.target = target
        self.message_sent = False
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        event.container.create_sender(connection, self.target)
 
    def on_sendable(self, event):
        print("on_sendable()")
        if not self.message_sent:
            message = Message(id=1, body="Hello world1")
            event.sender.send(message)
            self.message_sent = True
            print("Message has been sent")
        else:
            print("Already sent... do nothing")
 
    def on_accepted(self, event):
        print("on_accepted()")
        if self.message_sent:
            event.connection.close()
 
    def on_disconnected(self, event):
        print("on_disconnected()")
 
 
publisher = Publisher(ADDRESS, TARGET)
container = Container(publisher)
container.run()

6. Konzument zpráv z vybrané fronty

Kód konzumenta, který bude postupně vybírat zprávy z fronty nazvané „test“, nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample02_publisher_subcri­ber_queue/subscriber.py:

from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
SOURCE = "queue://test"
 
 
class Subscriber(MessagingHandler):
 
    def __init__(self, url, source):
        super(Subscriber, self).__init__()
        self.url = url
        self.source = source
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        event.container.create_receiver(connection, self.source)
 
    def on_message(self, event):
        message = event.message
 
        print("Received message '{m}'".format(m=message.body))
 
        event.receiver.close()
        event.connection.close()
 
 
subscriber = Subscriber(ADDRESS, SOURCE)
container = Container(subscriber)
container.run()

7. Logika implementace producenta většího množství zpráv

Nyní si ukažme, jak se implementuje producent většího množství zpráv. Kvůli tomu, že producent veškerou komunikaci řídí z callback metod, bude implementace nepatrně složitější, než tomu bylo při použití jiných protokolů. Nejdříve ve třídě Publisher v konstruktoru vytvoříme dva pomocné atributy s počtem poslaných zpráv a počtem zpráv akceptovaných:

self.message_sent = 0
self.message_accepted = 0

Dále budeme v callback metodě on_sendable postupně posílat běžné zprávy a nakonec i zprávu s textem „exit“, kterou budeme řídit konzumenta zpráv:

if self.message_sent < MAX_MESSAGES:
    self.message_sent += 1
    message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent))
    event.sender.send(message)
elif self.message_sent == MAX_MESSAGES:
    self.message_sent += 1
    message = Message(id=self.message_sent, body="exit")
    event.sender.send(message)

V callback metodě on_accepted pouze zajistíme, aby se připojení ukončilo, ale až po odeslání všech zpráv (ne dříve!):

self.message_accepted += 1
if self.message_accepted == self.message_sent:
    print("All messages confirmed")
    event.connection.close()

Upravený zdrojový kód producenta zpráv bude vypadat následovně:

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
TARGET = "queue://test"
 
MAX_MESSAGES = 10
 
class Publisher(MessagingHandler):
 
    def __init__(self, url, target):
        super(Publisher, self).__init__()
        self.url = url
        self.target = target
        self.message_sent = 0
        self.message_accepted = 0
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        event.container.create_sender(connection, self.target)
 
    def on_sendable(self, event):
        print("on_sendable()")
        if self.message_sent < MAX_MESSAGES:
            self.message_sent += 1
            message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent))
            event.sender.send(message)
            print("Message #{n} has been sent".format(n=self.message_sent))
        elif self.message_sent == MAX_MESSAGES:
            self.message_sent += 1
            message = Message(id=self.message_sent, body="exit")
            event.sender.send(message)
            print("Exit message")
 
    def on_accepted(self, event):
        print("on_accepted()")
        self.message_accepted += 1
        if self.message_accepted == self.message_sent:
            print("All messages confirmed")
            event.connection.close()
        print("Number of accepted messages: {n}".format(n=self.message_accepted))
 
    def on_disconnected(self, event):
        print("on_disconnected()")
 
 
publisher = Publisher(ADDRESS, TARGET)
container = Container(publisher)
container.run()

8. Konzument zpráv ukončovaný explicitně poslanou zprávou EXIT

Samotný konzument zpráv se příliš nezmění, ovšem odlišná bude logika naprogramovaná v callback metodě on_message. V této metodě totiž spojení ukončíme pouze v tom případě, že byla přijata zpráva „exit“:

message = event.message
 
print("Received message '{m}'".format(m=message.body))
 
if message.body == "exit":
    print("Last message for me...good bye")
    event.receiver.close()
    event.connection.close()

Samozřejmě si opět ukážeme celý zdrojový kód konzumenta:

from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
SOURCE = "queue://test"
 
 
class Subscriber(MessagingHandler):
 
    def __init__(self, url, source):
        super(Subscriber, self).__init__()
        self.url = url
        self.source = source
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        event.container.create_receiver(connection, self.source)
 
    def on_message(self, event):
        message = event.message
 
        print("Received message '{m}'".format(m=message.body))
 
        if message.body == "exit":
            print("Last message for me...good bye")
            event.receiver.close()
            event.connection.close()
 
 
subscriber = Subscriber(ADDRESS, SOURCE)
container = Container(subscriber)
container.run()

9. Speciální témata vytvářená automaticky systémem AMQ

Pokud se podíváme na administrátorské rozhraní systému Apache ActiveMQ ve chvíli, kdy se používá komunikační protokol AMQP, zjistíme, že se v sekci „Topics“ automaticky vytvořila čtyři nová témata:

ActiveMQ.Advisory.Connection
ActiveMQ.Advisory.Queue
ActiveMQ.Advisory.Topic
ActiveMQ.Advisory.MasterBroker

Pro každou používanou frontu se vytvoří ještě další dvojice témat, takže například pokud používáme frontu nazvanou „test“, uvidíme tato dvě témata:

ActiveMQ.Advisory.Consumer.Queue.test
ActiveMQ.Advisory.Producer.Queue.test

Tato témata, resp. přesněji řečeno zprávy, které se zde publikují, je možné využít pro sledování aktuálního stavu celého message brokera. Zpracování těchto zpráv je jednoduché, pouze si musíme ukázat jejich formát.

10. Příprava běžného producenta a konzumenta zpráv

Nejprve si připravíme běžného producenta zpráv, který se nijak neliší od producentů předchozích:

from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
TARGET = "queue://test"
 
MAX_MESSAGES = 100
 
 
class Publisher(MessagingHandler):
 
    def __init__(self, url, target):
        super(Publisher, self).__init__()
        self.url = url
        self.target = target
        self.message_sent = 0
        self.message_accepted = 0
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        event.container.create_sender(connection, self.target)
 
    def on_sendable(self, event):
        print("on_sendable()")
        if self.message_sent < MAX_MESSAGES:
            self.message_sent += 1
            message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent))
            event.sender.send(message)
            print("Message #{n} has been sent".format(n=self.message_sent))
        elif self.message_sent == MAX_MESSAGES:
            self.message_sent += 1
            message = Message(id=self.message_sent, body="exit")
            event.sender.send(message)
            print("Exit message")
 
    def on_accepted(self, event):
        print("on_accepted()")
        self.message_accepted += 1
        if self.message_accepted == self.message_sent:
            print("All messages confirmed")
            event.connection.close()
        print("Number of accepted messages: {n}".format(n=self.message_accepted))
 
    def on_disconnected(self, event):
        print("on_disconnected()")
 
 
publisher = Publisher(ADDRESS, TARGET)
container = Container(publisher)
container.run()

Konzument zpráv je upravený nepatrně – každou zprávu bude zpracovávat minimálně jednu sekundu, aby bylo možné mezitím sledovat stav speciálních témat:

import time
 
from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
SOURCE = "queue://test"
 
 
class Subscriber(MessagingHandler):
 
    def __init__(self, url, source):
        super(Subscriber, self).__init__()
        self.url = url
        self.source = source
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        event.container.create_receiver(connection, self.source)
 
    def on_message(self, event):
        message = event.message
 
        print("Received message '{m}'".format(m=message.body))
 
        if message.body == "exit":
            print("Last message for me...good bye")
            event.receiver.close()
            event.connection.close()
        time.sleep(1)
 
 
subscriber = Subscriber(ADDRESS, SOURCE)
container = Container(subscriber)
container.run()
Poznámka: podobným způsobem producenta zpráv upravit nelze, protože reakce na události v callback metodách on_sendable a on_accepted musí být velmi rychlé.

11. Jednoduchý nástroj pro sledování speciálních témat

Samotný nástroj, který bude speciální témata sledovat, se vlastně žádným zásadním způsobem neodlišuje od konzumenta zpráv. Pouze musíme zajistit, aby došlo k sledování všech zajímavých témat; nikdo nám samozřejmě nebrání sledovat více témat současně:

TOPICS = (
    "ActiveMQ.Advisory.Connection",
    "ActiveMQ.Advisory.Consumer.Queue.test",
    "ActiveMQ.Advisory.MasterBroker",
    "ActiveMQ.Advisory.Producer.Queue.test",
    "ActiveMQ.Advisory.Queue",
    "ActiveMQ.Advisory.Topic"
)

Přihlášení ke všem tématům je provedeno v callback metodě on_start:

connection = event.container.connect(self.url)
for topic in TOPICS:
    event.container.create_receiver(connection, "topic://" + topic)

Následuje výpis zdrojového kódu tohoto nástroje:

from proton.handlers import MessagingHandler
from proton.reactor import Container
 
ADDRESS = "localhost:5672"
 
TOPICS = (
    "ActiveMQ.Advisory.Connection",
    "ActiveMQ.Advisory.Consumer.Queue.test",
    "ActiveMQ.Advisory.MasterBroker",
    "ActiveMQ.Advisory.Producer.Queue.test",
    "ActiveMQ.Advisory.Queue",
    "ActiveMQ.Advisory.Topic"
)
 
 
class Subscriber(MessagingHandler):
 
    def __init__(self, url):
        super(Subscriber, self).__init__()
        self.url = url
 
    def on_start(self, event):
        print("on_start()")
        connection = event.container.connect(self.url)
        for topic in TOPICS:
            event.container.create_receiver(connection, "topic://" + topic)
 
    def on_message(self, event):
        message = event.message
        print(message)
 
 
subscriber = Subscriber(ADDRESS)
container = Container(subscriber)
container.run()

12. Analýza výsledků práce nástroje

Po spuštění producenta zpráv, jejich konzumenta a současně i nástroje pro sledování speciálních témat by tento nástroj měl zobrazit tyto zprávy:

Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:374", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}}
Message{address="topic://ActiveMQ.Advisory.Producer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:375", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "producerCount"=1, "originBrokerName"="localhost"}}
Message{address="topic://ActiveMQ.Advisory.Producer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:376", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "producerCount"=0, "originBrokerName"="localhost"}}
Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:377", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}}
Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:378", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}}
Message{address="topic://ActiveMQ.Advisory.Consumer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:379", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "consumerCount"=1, "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}}
Message{address="topic://ActiveMQ.Advisory.Consumer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:380", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "consumerCount"=0, "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}}
Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:381", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}}

Povšimněte si, že zprávy neobsahují tělo, ale pouze předmět, identifikaci message brokera atd. Už samotné téma totiž podává mnohdy dostatečnou informaci o tom, k jaké události došlo:

ActiveMQ.Advisory.Connection připojení či odpojení klienta
ActiveMQ.Advisory.Producer.Queue.test informace o připojení či odpojení producenta
ActiveMQ.Advisory.Consumer.Queue.test informace o připojení či odpojení konzumenta

V posledních dvou případech se mění atribut „producerCount“ popř. „consumerCount“.

13. Použití message brokerů z programovacího jazyka Go s využitím protokolu STOMP

Ve druhé části dnešního článku se seznámíme s tím, jakým způsobem je možné použít message brokera Apache ActiveMQ z programovacího jazyka Go. Pro jednoduchost použijeme protokol STOMP (Streaming Text Oriented Messaging Protocol), který, jak již víme, sice nedokáže využít všechny možnosti AMQ, ovšem základní architekturu systému založeného na kombinaci AMQ+STOMP celkem bez problémů dokážeme postavit (ostatně největší problémy nás většinou čekají při návrhu subsystému pro persistenci zpráv popř. pro clusterování a konfiguraci load balancingu). Implementaci knihovny s podporou protokolu STOMP určenou pro jazyk Go nalezneme v GitHub repositáři https://github.com/go-stomp/stomp, takže samotná instalace bude jednoduchá. Nejdříve přejdeme do adresáře, na nějž ukazuje proměnná prostředí GOPATH; typicky se jedná o adresář ~/go. Zde zadáme následující příkaz:

$ go get github.com/go-stomp/stomp

V adresářové struktuře by se měl objevit podadresář src/github.com/go-stomp/stomp a taktéž pkg/linux_amd64/github.com/go-stomp/stomp s přibližně následujícím obsahem:

.
├── pkg
│   └── linux_amd64
│       └── github.com
│           └── go-stomp
│               └── stomp.a
├── src
│   ├── github.com
│   │   └── go-stomp
│   │       └── stomp
│   │           ├── ack.go
│   │           ├── breaking_changes.md
│   │           ├── conn.go
│   │           ├── conn_options.go
│   │           ├── conn_test.go
│   │           │   └── testutil.go
...
...
...
...
...
...
│   │           ├── transaction.go
│   │           ├── validator.go
│   │           ├── version.go
│   │           └── version_test.go

14. Připojení k message brokerovi, práce se zprávami

Použití balíčku go-stomp/stomp je poměrně snadné. Nejdříve si ukážeme způsob připojení k message brokerovi s využitím protokolu STOMP. První důležitou věcí je pochopitelně import balíčku:

import "github.com/go-stomp/stomp"

Připojení je realizováno funkcí Dial, které se předá způsob připojení (zde přes TCP) a adresa serveru s portem (pro STOMP „localhost:61613“). Ve třetím parametru je možné v případě potřeby přidat další údaje nutné pro přihlášení, ale pro nezabezpečenou variantu STOMPu zde můžeme použít jen nil. Samozřejmě je nutné zkontrolovat případné chyby, ke kterým může dojít:

conn, err := stomp.Dial("tcp", serverAddr, nil)
if err != nil {
        println("cannot connect to server", err.Error())
        return
} else {
        println("connected to server", serverAddr)
}

Dále zajistíme, že spojení bude na konci příslušné funkce automaticky ukončeno. K tomu slouží příkaz defer:

defer conn.Disconnect()
Poznámka: popis příkazu defer naleznete zde.

Samotné poslání zprávy je již jednoduché – musíme jen specifikovat jméno fronty, MIME typ posílané zprávy (viz předchozí článek), samotnou zprávu reprezentovanou polem/řezem bajtů (konverzi je nutné provést explicitně) a případné další parametry (hlavičky atd.):

err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)

Příjem zprávy je realizován odlišně a elegantně. Nejdříve se přihlásíme k nějaké frontě a zkontrolujeme, zda se přihlášení skutečně podařilo:

sub, err := conn.Subscribe(queueName, stomp.AckAuto)
if err != nil {
        println("cannot subscribe to", queueName, err.Error())
        return
}
close(subscribed)

První návratovou hodnotou funkce Subscribe je struktura obsahující mj. i kanál, z něhož můžeme provést (blokující) čtení, a to takto:

for {
        msg := <-sub.C
        text := string(msg.Body)
        ...
        ...
        ...
}

Pokud přijde nová zpráva, bude výsledkem msg := ← sub.C nová zpráva uložená do proměnné msg, v opačném případě se bude čekat na příchod další zprávy. Případné další paralelně běžící gorutiny nejsou touto operací nijak omezeny.

Poznámka: samozřejmě je možné, aby příjemce zpráv současně vykonával i další komunikaci. K tomu slouží mj. i konstrukce select popsaná v tomto článku.

15. Implementace producenta zpráv (publishera)

Všechny potřebné informace o základním použití protokolu STOMP v jazyku Go již máme, takže se podívejme na způsob implementace producenta deseti běžných zpráv, za nimiž následuje zpráva „EXIT“, která ukončí případného konzumenta zpráv:

package main
 
import (
        "fmt"
 
        "github.com/go-stomp/stomp"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
const messageCount = 10
 
func sendMessages() {
        conn, err := stomp.Dial("tcp", serverAddr, nil)
        if err != nil {
                println("cannot connect to server", err.Error())
                return
        } else {
                println("connected to server", serverAddr)
        }
        defer conn.Disconnect()
 
        for i := 1; i <= messageCount; i++ {
                text := fmt.Sprintf("Message #%d", i)
                err = conn.Send(queueName, "text/plain", []byte(text), nil)
                if err != nil {
                        println("failed to send to server", err)
                        return
                } else {
                        println("message sent")
                }
        }
        println("sending EXIT message")
        err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)
        if err != nil {
                println("failed to send EXIT message to server", err)
                return
        } else {
                println("message sent")
        }
        println("sender finished")
}
 
func main() {
        sendMessages()
}

16. Implementace konzumenta zpráv

Naproti tomu konzument zpráv, který navíc dokáže zareagovat na zprávu s tělem „EXIT“, může vypadat následovně:

package main
 
import (
        "github.com/go-stomp/stomp"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
func receiveMessages() {
        conn, err := stomp.Dial("tcp", serverAddr, nil)
 
        if err != nil {
                println("cannot connect to server", err.Error())
                return
        } else {
                println("connected to server", serverAddr)
        }
 
        defer conn.Disconnect()
 
        sub, err := conn.Subscribe(queueName, stomp.AckAuto)
        if err != nil {
                println("cannot subscribe to", queueName, err.Error())
                return
        }
        for {
                msg := <-sub.C
                text := string(msg.Body)
                if text != "EXIT" {
                        println("Received message", text)
                } else {
                        println("Received EXIT command")
                        break
                }
        }
        println("receiver finished")
}
 
func main() {
        receiveMessages()
}

17. Úprava producenta a konzumenta tak, aby komunikace s message brokerem probíhala v gorutině

Příště si ukážeme některé další možnosti, které nám propojení Go+STOMP nabízí. Jen pro ukázku bude uveden producent zpráv, kde vlastní posílání zpráv probíhá ve vyhrazené gorutině, na jejíž ukončení čeká hlavní vlákno s využitím kanálu nazvaného stop:

package main
 
import (
        "fmt"
 
        "github.com/go-stomp/stomp"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
const messageCount = 10
 
var stop = make(chan bool)
 
func sendMessages() {
        defer func() {
                stop <- true
        }()
 
        conn, err := stomp.Dial("tcp", serverAddr, nil)
        if err != nil {
                println("cannot connect to server", err.Error())
                return
        } else {
                println("connected to server", serverAddr)
        }
        defer conn.Disconnect()
 
        for i := 1; i <= messageCount; i++ {
                text := fmt.Sprintf("Message #%d", i)
                err = conn.Send(queueName, "text/plain", []byte(text), nil)
                if err != nil {
                        println("failed to send to server", err)
                        return
                } else {
                        println("message sent")
                }
        }
        println("sending EXIT message")
        err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)
        if err != nil {
                println("failed to send EXIT message to server", err)
                return
        } else {
                println("message sent")
        }
        println("sender finished")
}
 
func main() {
        go sendMessages()
 
        <-stop
}

Podobným způsobem můžeme vyřešit i konzumenta zpráv, který je upraven nepatrně odlišným způsobem:

package main
 
import (
        "github.com/go-stomp/stomp"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
var stop = make(chan bool)
 
func receiveMessages(subscribed chan bool) {
        defer func() {
                stop <- true
        }()
 
        conn, err := stomp.Dial("tcp", serverAddr, nil)
 
        if err != nil {
                println("cannot connect to server", err.Error())
                return
        } else {
                println("connected to server", serverAddr)
        }
 
        defer conn.Disconnect()
 
        sub, err := conn.Subscribe(queueName, stomp.AckAuto)
        if err != nil {
                println("cannot subscribe to", queueName, err.Error())
                return
        }
        close(subscribed)
 
        for {
                msg := <-sub.C
                text := string(msg.Body)
                if text != "EXIT" {
                        println("Received message", text)
                } else {
                        println("Received EXIT command")
                        break
                }
        }
        println("receiver finished")
}
 
func main() {
        subscribed := make(chan bool)
        go receiveMessages(subscribed)
 
        <-subscribed
 
        <-stop
}

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta
1 publisher.py producent jediné zprávy používající protokol AMQP https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample01_publisher_subcri­ber_topic/publisher.py
1 subscriber.py příjemce jediné zprávy používající protokol AMQP https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample01_publisher_subcri­ber_topic/subscriber.py
       
2 publisher.py producent používající strategii PUSH-PULL https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample02_publisher_subcri­ber_queue/publisher.py
2 subscriber.py konzument používající strategii PUSH-PULL https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample02_publisher_subcri­ber_queue/subscriber.py
       
3 publisher.py producent deseti běžných zpráv a zprávy „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample03_publisher_subcri­ber_all_messages/publisher­.py
3 subscriber.py konzument zpráv ukončený zprávou „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample03_publisher_subcri­ber_all_messages/subscriber­.py
       
4 publisher.py producent deseti běžných zpráv a zprávy „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample04_special_events/pu­blisher.py
4 subscriber.py producent deseti běžných zpráv a zprávy „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample04_special_events/sub­scriber.py
4 special_events.py nástroj pro sledování speciálních témat https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample04_special_events/spe­cial_events.py
       
5 publisher.go producent zpráv naprogramovaný v Go https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull/publisher.go
5 subscriber.go konzument zpráv naprogramovaný v Go https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull/subscriber.go
       
6 publisher.go producent zpráv naprogramovaný v Go využívající gorutiny https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull-goroutines/publisher.go
6 subscriber.go konzument zpráv naprogramovaný v Go využívající gorutiny https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull-goroutines/subscriber.go

19. Odkazy na předchozí části seriálu

V této kapitole jsou uvedeny odkazy na všech deset předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
  8. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
    https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/
  9. Apache ActiveMQ – další systém implementující message brokera
    https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/
  10. Použití Apache ActiveMQ s protokolem STOMP
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/

20. Odkazy na Internetu

  1. Stránka Apache Software Foundation
    http://www.apache.org/
  2. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  3. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  4. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  5. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  6. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  7. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  8. Qpid Proton
    http://qpid.apache.org/proton/
  9. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  10. Apache ActiveMQ
    http://activemq.apache.org/
  11. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  12. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  13. KahaDB
    http://activemq.apache.or­g/kahadb.html
  14. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  15. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  16. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  17. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  18. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  19. Apache Camel
    https://camel.apache.org/
  20. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  21. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  22. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  23. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  24. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  25. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  26. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  27. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  28. ØMQ – Distributed Messaging
    http://zeromq.org/
  29. ØMQ Community
    http://zeromq.org/community
  30. Get The Software
    http://zeromq.org/intro:get-the-software
  31. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  32. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  33. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  34. ZeroMQ RFC
    https://rfc.zeromq.org/
  35. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  36. zeromq/czmq
    https://github.com/zeromq/czmq
  37. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  38. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  39. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  40. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  41. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  42. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  43. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  44. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  45. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  46. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  47. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  48. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  49. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  50. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  51. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  52. Python binding
    http://zeromq.org/bindings:python
  53. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  54. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  55. About Nanomsg
    https://nanomsg.org/
  56. Advanced Message Queuing Protocol
    https://www.amqp.org/
  57. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  58. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  59. RabbitMQ
    https://www.rabbitmq.com/
  60. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  61. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  62. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  63. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  64. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  65. Erlang
    http://www.erlang.org/
  66. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  67. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  68. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  69. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  70. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  71. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  72. celery na PyPi
    https://pypi.org/project/celery/
  73. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  74. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  75. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  76. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  77. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  78. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  79. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  80. node-celery
    https://github.com/mher/node-celery
  81. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  82. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  83. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  84. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  85. Stránky projektu Redis
    https://redis.io/
  86. Introduction to Redis
    https://redis.io/topics/introduction
  87. Try Redis
    http://try.redis.io/
  88. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  89. Python Redis
    https://redislabs.com/lp/python-redis/
  90. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  91. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  92. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  93. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  94. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  95. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  96. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  97. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  98. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  99. Redis Persistence
    https://redis.io/topics/persistence
  100. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  101. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  102. Ost (knihovna)
    https://github.com/soveran/ost
  103. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  104. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  105. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  106. What Is Sharding?
    https://btcmanager.com/what-sharding/
  107. Redis clients
    https://redis.io/clients
  108. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  109. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  110. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  111. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  112. Resque
    https://github.com/resque/resque
  113. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  114. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  115. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  116. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  117. Pub/Sub
    https://redis.io/topics/pubsub
  118. ZeroMQ distributed messaging
    http://zeromq.org/
  119. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  120. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  121. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  122. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  123. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  124. Redis Protocol specification
    https://redis.io/topics/protocol
  125. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  126. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  127. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  128. Apache Kafka
    https://kafka.apache.org/
  129. Iron
    http://www.iron.io/mq
  130. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  131. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  132. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  133. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  134. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  135. Enqueueing internals
    http://python-rq.org/contrib/
  136. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  137. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  138. Queues
    http://queues.io/
  139. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  140. RestMQ
    http://restmq.com/
  141. ActiveMQ
    http://activemq.apache.org/
  142. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  143. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  144. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  145. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  146. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  147. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  148. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  149. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  150. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  151. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  152. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  153. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  154. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  155. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  156. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  157. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  158. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  159. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  160. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  161. Jupyter
    https://jupyter.org/
  162. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  163. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html