Obsah
1. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
2. Instalace knihovny Qpid Proton
3. Producent zpráv publikovaných pod zvoleným tématem
4. Příjemce zpráv s nastaveným tématem
5. Producent zpráv posílaných do fronty
6. Konzument zpráv z vybrané fronty
7. Logika implementace producenta většího množství zpráv
8. Konzument zpráv ukončovaný explicitně poslanou zprávou EXIT
9. Speciální témata vytvářená automaticky systémem AMQ
10. Příprava běžného producenta a konzumenta zpráv
11. Jednoduchý nástroj pro sledování speciálních témat
12. Analýza výsledků práce nástroje
13. Použití message brokerů z programovacího jazyka Go s využitím protokolu STOMP
14. Připojení k message brokerovi, práce se zprávami
15. Implementace producenta zpráv (publishera)
16. Implementace konzumenta zpráv
17. Úprava producenta a konzumenta tak, aby komunikace s message brokerem probíhala v gorutině
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu
1. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
Dnešní část seriálu o systémech implementujících fronty zpráv bude rozdělena do dvou částí. V první části si na několika co nejjednodušších demonstračních příkladech ukážeme, jakým způsobem je možné komunikovat se systémem Apache ActiveMQ s využitím protokolu AMQP. Příklady budou naprogramovány v Pythonu, přičemž vlastní realizaci komunikačního protokolu bude zajišťovat knihovna pojmenovaná Qpid Proton, kterou nalezneme na adrese https://qpid.apache.org/proton/ (tato knihovna podporuje například i již popsaného message brokera RabbitMQ). Zajímavé je, že programátorské rozhraní knihovny Qpid Proton je zcela odlišné od rozhraní protokolu STOMP, které bylo realizováno v knihovně stomp.py, viz též předchozí část tohoto seriálu s popisem možností poskytovaných touto knihovnou.
Druhá část článku je věnována nepatrně odlišnému tématu. Ukážeme si v ní způsob realizace klientů vyvinutých v programovacím jazyce Go, kterému se podrobněji věnujeme v paralelně běžícím seriálu. Uvidíme, že díky některým vlastnostem programovacího jazyka Go, zejména díky kanálům a gorutinám, je implementace klientů message brokera v mnoha ohledech velmi elegantní.
<transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
2. Instalace knihovny Qpid Proton
V demonstračních příkladech, které si popíšeme a ukážeme v navazujících kapitolách, použijeme pro komunikaci se systémem Apache ActiveMQ knihovnu nazvanou Qpid Proton, která je určena pro programovací jazyk Python. Tato knihovna je – jak je dnes již dobrým zvykem – dostupná na PyPi (Python Package Indexu), ovšem před její instalací je nutné nainstalovat i Cython, a to v samostatném kroku. Cython, s nímž jsme se seznámili v jiném článku, nainstalujeme pro právě přihlášeného uživatele následujícím příkazem:
$ pip3 install --user cython Collecting cython Downloading https://files.pythonhosted.org/packages/16/98/49aa24054e99e9c7734e49d6996662f547e4e2faae0051d35fbbc461afa4/Cython-0.29.5-cp36-cp36m-manylinux1_x86_64.whl (2.1MB) 100% |████████████████████████████████| 2.1MB 603kB/s Installing collected packages: cython Successfully installed cython-0.29.5
Po (doufejme že úspěšné) instalaci Cythonu již můžeme bez problémů nainstalovat i knihovnu Qpid Proton, a to takto:
$ pip3 install --user python-qpid-proton Collecting python-qpid-proton Downloading https://files.pythonhosted.org/packages/fe/e3/58379a2262a31788cfa037bde2c0c77273a74f34943f73abf20030c71f17/python-qpid-proton-0.26.0.zip (563kB) 100% |████████████████████████████████| 573kB 1.1MB/s Installing collected packages: python-qpid-proton Running setup.py install for python-qpid-proton ... done Successfully installed python-qpid-proton-0.26.0
Základní kontrolu instalace můžeme provést přímo z interaktivní smyčky jazyka Python. Použijeme interpret Pythonu 3, protože i instalace byla provedena pro Python 3:
$ python3 Python 3.6.3 (default, Oct 9 2017, 12:11:29) [GCC 7.2.1 20170915 (Red Hat 7.2.1-2)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import proton >>> help("proton")
Po potvrzení posledního příkazu by se měla zobrazit nápověda:
Help on package proton: NAME proton DESCRIPTION The proton module defines a suite of APIs that implement the AMQP 1.0 protocol. The proton APIs consist of the following classes: - L{Message} -- A class for creating and/or accessing AMQP message content. - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded data. PACKAGE CONTENTS
3. Producent zpráv publikovaných pod zvoleným tématem
Instalaci knihovny Qpid Proton jsme dokončili, takže si nyní můžeme ukázat způsob implementace jednoduchého producenta zpráv, které budou přes message brokera posílány s využitím strategie PUB-SUB – zprávy tedy nebudou ukládány do fronty, ale budou přímo přeposlány všem posluchačům, kteří jsou v daný okamžik k message brokerovi připojeni.
Ve výchozím nastavení systému Apache ActiveMQ je pro komunikaci s využitím protokolu AMQP používán port 5672, který je dokonce k tomuto účelu rezervován. Doménové jméno serveru tedy společně s portem 5672 použijeme pro určení jeho adresy ve formátu, v jakém ho očekává knihovna Qpid:
ADDRESS = "localhost:5672"
nebo:
ADDRESS = "192.168.1.42:5672"
popř.:
ADDRESS = "foo.bar.baz:5672"
Určit musíme i cíl zprávy. Ten typicky začíná prefixem „topic://“ nebo „queue://“. V prvním příkladu použijeme téma (topic) nazvané „event“, takže se cíl zapíše následovně (vypadá trošku jako URL i s uvedením „protokolu“):
TARGET = "topic://event"
Samotný producent zprávy se inicializuje a následně spustí takto (ve vlastním kontejneru, což je ovšem termín knihovny Qpid):
publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()
Samozřejmě nám zbývá implementovat vlastní logiku producenta, tj. musíme naprogramovat třídu nazvanou Publisher, která byla zmíněna v předchozích programových řádcích. Tato třída je odvozena od třídy MessagingHandler, v níž je předepsáno několik callback metod volaných samotnou knihovnou Qpid Proton na základě různých událostí (Event), které nastávají v průběhu života producenta zpráv. Prozatím nás budou zajímat čtyři typy událostí:
- on_start – spuštění producenta v kontejneru
- on_accepted – akceptace poslané zprávy
- on_disconnected – odpojení od message brokera
- on_sendable – systém je připraven na poslání další zprávy
V nejjednodušším případě – což je výchozí stav – budou handlery prázdné, ovšem povšimněte si, že se každému handleru skutečně předává objekt, který reprezentuje vzniklou událost:
class Publisher(MessagingHandler): def __init__(self, url, target): pass def on_start(self, event): pass def on_sendable(self, event): pass def on_accepted(self, event): pass def on_disconnected(self, event): pass
Konkrétní implementace producenta zpráv samozřejmě bude složitější, což je patrné i při pohledu na jeho úplný zdrojový kód:
from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TARGET = "topic://event" class Publisher(MessagingHandler): def __init__(self, url, target): super(Publisher, self).__init__() self.url = url self.target = target self.message_sent = False def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_sender(connection, self.target) def on_sendable(self, event): print("on_sendable()") if not self.message_sent: message = Message(id=1, body="Hello world1") event.sender.send(message) self.message_sent = True print("Message has been sent") else: print("Already sent... do nothing") def on_accepted(self, event): print("on_accepted()") if self.message_sent: event.connection.close() def on_disconnected(self, event): print("on_disconnected()") publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()
4. Příjemce zpráv s nastaveným tématem
Příjemce zprávy se vytvoří velmi podobným způsobem jako její producent:
subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()
V nejjednodušším případě bude implementace třídy Subscriber odvozené od nám již známé třídy MessagingHandler obsahovat jen dvě callback metody:
class Subscriber(MessagingHandler): def __init__(self, url, source): pass def on_start(self, event): pass def on_message(self, event): pass
Ve chvíli, kdy je automaticky zavolána metoda on_start, se můžeme připojit k message brokerovi a přihlásit se k odebírání zpráv s nějakým tématem:
connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source)
Samotný příjem zprávy je již snadný, protože pro každou zprávu, kterou z message brokera získáme, je zavolána callback metoda on_message. Zpráva je součástí parametru event, text zprávy pak v atributu body:
def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) event.receiver.close() event.connection.close()
Opět si ukažme úplný zdrojový kód tohoto příkladu:
from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" SOURCE = "topic://event" class Subscriber(MessagingHandler): def __init__(self, url, source): super(Subscriber, self).__init__() self.url = url self.source = source def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source) def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) event.receiver.close() event.connection.close() subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()
5. Producent zpráv posílaných do fronty
Pokud budeme chtít namísto komunikace typu PUB-SUB použít strategii PUSH-PULL budou změny provedené v producentovi i konzumentovi minimální, i když samotný transport zpráv bude naprosto odlišný (to je však záležitostí samotného message brokera, nikoli klientů, kteří se k němu připojují). Změní se pouze cíl, a to z:
TARGET = "topic://event"
na:
TARGET = "queue://test"
Upravený zdrojový kód producenta zpráv posílaných do fronty „test“ bude vypadat následovně:
from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TARGET = "queue://test" class Publisher(MessagingHandler): def __init__(self, url, target): super(Publisher, self).__init__() self.url = url self.target = target self.message_sent = False def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_sender(connection, self.target) def on_sendable(self, event): print("on_sendable()") if not self.message_sent: message = Message(id=1, body="Hello world1") event.sender.send(message) self.message_sent = True print("Message has been sent") else: print("Already sent... do nothing") def on_accepted(self, event): print("on_accepted()") if self.message_sent: event.connection.close() def on_disconnected(self, event): print("on_disconnected()") publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()
6. Konzument zpráv z vybrané fronty
Kód konzumenta, který bude postupně vybírat zprávy z fronty nazvané „test“, nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/amqp/example02_publisher_subcriber_queue/subscriber.py:
from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" SOURCE = "queue://test" class Subscriber(MessagingHandler): def __init__(self, url, source): super(Subscriber, self).__init__() self.url = url self.source = source def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source) def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) event.receiver.close() event.connection.close() subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()
7. Logika implementace producenta většího množství zpráv
Nyní si ukažme, jak se implementuje producent většího množství zpráv. Kvůli tomu, že producent veškerou komunikaci řídí z callback metod, bude implementace nepatrně složitější, než tomu bylo při použití jiných protokolů. Nejdříve ve třídě Publisher v konstruktoru vytvoříme dva pomocné atributy s počtem poslaných zpráv a počtem zpráv akceptovaných:
self.message_sent = 0 self.message_accepted = 0
Dále budeme v callback metodě on_sendable postupně posílat běžné zprávy a nakonec i zprávu s textem „exit“, kterou budeme řídit konzumenta zpráv:
if self.message_sent < MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent)) event.sender.send(message) elif self.message_sent == MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="exit") event.sender.send(message)
V callback metodě on_accepted pouze zajistíme, aby se připojení ukončilo, ale až po odeslání všech zpráv (ne dříve!):
self.message_accepted += 1 if self.message_accepted == self.message_sent: print("All messages confirmed") event.connection.close()
Upravený zdrojový kód producenta zpráv bude vypadat následovně:
from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TARGET = "queue://test" MAX_MESSAGES = 10 class Publisher(MessagingHandler): def __init__(self, url, target): super(Publisher, self).__init__() self.url = url self.target = target self.message_sent = 0 self.message_accepted = 0 def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_sender(connection, self.target) def on_sendable(self, event): print("on_sendable()") if self.message_sent < MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent)) event.sender.send(message) print("Message #{n} has been sent".format(n=self.message_sent)) elif self.message_sent == MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="exit") event.sender.send(message) print("Exit message") def on_accepted(self, event): print("on_accepted()") self.message_accepted += 1 if self.message_accepted == self.message_sent: print("All messages confirmed") event.connection.close() print("Number of accepted messages: {n}".format(n=self.message_accepted)) def on_disconnected(self, event): print("on_disconnected()") publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()
8. Konzument zpráv ukončovaný explicitně poslanou zprávou EXIT
Samotný konzument zpráv se příliš nezmění, ovšem odlišná bude logika naprogramovaná v callback metodě on_message. V této metodě totiž spojení ukončíme pouze v tom případě, že byla přijata zpráva „exit“:
message = event.message print("Received message '{m}'".format(m=message.body)) if message.body == "exit": print("Last message for me...good bye") event.receiver.close() event.connection.close()
Samozřejmě si opět ukážeme celý zdrojový kód konzumenta:
from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" SOURCE = "queue://test" class Subscriber(MessagingHandler): def __init__(self, url, source): super(Subscriber, self).__init__() self.url = url self.source = source def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source) def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) if message.body == "exit": print("Last message for me...good bye") event.receiver.close() event.connection.close() subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()
9. Speciální témata vytvářená automaticky systémem AMQ
Pokud se podíváme na administrátorské rozhraní systému Apache ActiveMQ ve chvíli, kdy se používá komunikační protokol AMQP, zjistíme, že se v sekci „Topics“ automaticky vytvořila čtyři nová témata:
ActiveMQ.Advisory.Connection |
ActiveMQ.Advisory.Queue |
ActiveMQ.Advisory.Topic |
ActiveMQ.Advisory.MasterBroker |
Pro každou používanou frontu se vytvoří ještě další dvojice témat, takže například pokud používáme frontu nazvanou „test“, uvidíme tato dvě témata:
ActiveMQ.Advisory.Consumer.Queue.test |
ActiveMQ.Advisory.Producer.Queue.test |
Tato témata, resp. přesněji řečeno zprávy, které se zde publikují, je možné využít pro sledování aktuálního stavu celého message brokera. Zpracování těchto zpráv je jednoduché, pouze si musíme ukázat jejich formát.
10. Příprava běžného producenta a konzumenta zpráv
Nejprve si připravíme běžného producenta zpráv, který se nijak neliší od producentů předchozích:
from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TARGET = "queue://test" MAX_MESSAGES = 100 class Publisher(MessagingHandler): def __init__(self, url, target): super(Publisher, self).__init__() self.url = url self.target = target self.message_sent = 0 self.message_accepted = 0 def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_sender(connection, self.target) def on_sendable(self, event): print("on_sendable()") if self.message_sent < MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent)) event.sender.send(message) print("Message #{n} has been sent".format(n=self.message_sent)) elif self.message_sent == MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="exit") event.sender.send(message) print("Exit message") def on_accepted(self, event): print("on_accepted()") self.message_accepted += 1 if self.message_accepted == self.message_sent: print("All messages confirmed") event.connection.close() print("Number of accepted messages: {n}".format(n=self.message_accepted)) def on_disconnected(self, event): print("on_disconnected()") publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()
Konzument zpráv je upravený nepatrně – každou zprávu bude zpracovávat minimálně jednu sekundu, aby bylo možné mezitím sledovat stav speciálních témat:
import time from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" SOURCE = "queue://test" class Subscriber(MessagingHandler): def __init__(self, url, source): super(Subscriber, self).__init__() self.url = url self.source = source def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source) def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) if message.body == "exit": print("Last message for me...good bye") event.receiver.close() event.connection.close() time.sleep(1) subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()
11. Jednoduchý nástroj pro sledování speciálních témat
Samotný nástroj, který bude speciální témata sledovat, se vlastně žádným zásadním způsobem neodlišuje od konzumenta zpráv. Pouze musíme zajistit, aby došlo k sledování všech zajímavých témat; nikdo nám samozřejmě nebrání sledovat více témat současně:
TOPICS = ( "ActiveMQ.Advisory.Connection", "ActiveMQ.Advisory.Consumer.Queue.test", "ActiveMQ.Advisory.MasterBroker", "ActiveMQ.Advisory.Producer.Queue.test", "ActiveMQ.Advisory.Queue", "ActiveMQ.Advisory.Topic" )
Přihlášení ke všem tématům je provedeno v callback metodě on_start:
connection = event.container.connect(self.url) for topic in TOPICS: event.container.create_receiver(connection, "topic://" + topic)
Následuje výpis zdrojového kódu tohoto nástroje:
from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TOPICS = ( "ActiveMQ.Advisory.Connection", "ActiveMQ.Advisory.Consumer.Queue.test", "ActiveMQ.Advisory.MasterBroker", "ActiveMQ.Advisory.Producer.Queue.test", "ActiveMQ.Advisory.Queue", "ActiveMQ.Advisory.Topic" ) class Subscriber(MessagingHandler): def __init__(self, url): super(Subscriber, self).__init__() self.url = url def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) for topic in TOPICS: event.container.create_receiver(connection, "topic://" + topic) def on_message(self, event): message = event.message print(message) subscriber = Subscriber(ADDRESS) container = Container(subscriber) container.run()
12. Analýza výsledků práce nástroje
Po spuštění producenta zpráv, jejich konzumenta a současně i nástroje pro sledování speciálních témat by tento nástroj měl zobrazit tyto zprávy:
Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:374", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Producer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:375", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "producerCount"=1, "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Producer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:376", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "producerCount"=0, "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:377", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:378", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Consumer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:379", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "consumerCount"=1, "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Consumer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:380", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "consumerCount"=0, "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:381", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}}
Povšimněte si, že zprávy neobsahují tělo, ale pouze předmět, identifikaci message brokera atd. Už samotné téma totiž podává mnohdy dostatečnou informaci o tom, k jaké události došlo:
ActiveMQ.Advisory.Connection | připojení či odpojení klienta |
ActiveMQ.Advisory.Producer.Queue.test | informace o připojení či odpojení producenta |
ActiveMQ.Advisory.Consumer.Queue.test | informace o připojení či odpojení konzumenta |
V posledních dvou případech se mění atribut „producerCount“ popř. „consumerCount“.
13. Použití message brokerů z programovacího jazyka Go s využitím protokolu STOMP
Ve druhé části dnešního článku se seznámíme s tím, jakým způsobem je možné použít message brokera Apache ActiveMQ z programovacího jazyka Go. Pro jednoduchost použijeme protokol STOMP (Streaming Text Oriented Messaging Protocol), který, jak již víme, sice nedokáže využít všechny možnosti AMQ, ovšem základní architekturu systému založeného na kombinaci AMQ+STOMP celkem bez problémů dokážeme postavit (ostatně největší problémy nás většinou čekají při návrhu subsystému pro persistenci zpráv popř. pro clusterování a konfiguraci load balancingu). Implementaci knihovny s podporou protokolu STOMP určenou pro jazyk Go nalezneme v GitHub repositáři https://github.com/go-stomp/stomp, takže samotná instalace bude jednoduchá. Nejdříve přejdeme do adresáře, na nějž ukazuje proměnná prostředí GOPATH; typicky se jedná o adresář ~/go. Zde zadáme následující příkaz:
$ go get github.com/go-stomp/stomp
V adresářové struktuře by se měl objevit podadresář src/github.com/go-stomp/stomp a taktéž pkg/linux_amd64/github.com/go-stomp/stomp s přibližně následujícím obsahem:
. ├── pkg │ └── linux_amd64 │ └── github.com │ └── go-stomp │ └── stomp.a ├── src │ ├── github.com │ │ └── go-stomp │ │ └── stomp │ │ ├── ack.go │ │ ├── breaking_changes.md │ │ ├── conn.go │ │ ├── conn_options.go │ │ ├── conn_test.go │ │ │ └── testutil.go ... ... ... ... ... ... │ │ ├── transaction.go │ │ ├── validator.go │ │ ├── version.go │ │ └── version_test.go
14. Připojení k message brokerovi, práce se zprávami
Použití balíčku go-stomp/stomp je poměrně snadné. Nejdříve si ukážeme způsob připojení k message brokerovi s využitím protokolu STOMP. První důležitou věcí je pochopitelně import balíčku:
import "github.com/go-stomp/stomp"
Připojení je realizováno funkcí Dial, které se předá způsob připojení (zde přes TCP) a adresa serveru s portem (pro STOMP „localhost:61613“). Ve třetím parametru je možné v případě potřeby přidat další údaje nutné pro přihlášení, ale pro nezabezpečenou variantu STOMPu zde můžeme použít jen nil. Samozřejmě je nutné zkontrolovat případné chyby, ke kterým může dojít:
conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) }
Dále zajistíme, že spojení bude na konci příslušné funkce automaticky ukončeno. K tomu slouží příkaz defer:
defer conn.Disconnect()
Samotné poslání zprávy je již jednoduché – musíme jen specifikovat jméno fronty, MIME typ posílané zprávy (viz předchozí článek), samotnou zprávu reprezentovanou polem/řezem bajtů (konverzi je nutné provést explicitně) a případné další parametry (hlavičky atd.):
err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)
Příjem zprávy je realizován odlišně a elegantně. Nejdříve se přihlásíme k nějaké frontě a zkontrolujeme, zda se přihlášení skutečně podařilo:
sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("cannot subscribe to", queueName, err.Error()) return } close(subscribed)
První návratovou hodnotou funkce Subscribe je struktura obsahující mj. i kanál, z něhož můžeme provést (blokující) čtení, a to takto:
for { msg := <-sub.C text := string(msg.Body) ... ... ... }
Pokud přijde nová zpráva, bude výsledkem msg := ← sub.C nová zpráva uložená do proměnné msg, v opačném případě se bude čekat na příchod další zprávy. Případné další paralelně běžící gorutiny nejsou touto operací nijak omezeny.
15. Implementace producenta zpráv (publishera)
Všechny potřebné informace o základním použití protokolu STOMP v jazyku Go již máme, takže se podívejme na způsob implementace producenta deseti běžných zpráv, za nimiž následuje zpráva „EXIT“, která ukončí případného konzumenta zpráv:
package main import ( "fmt" "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" const messageCount = 10 func sendMessages() { conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() for i := 1; i <= messageCount; i++ { text := fmt.Sprintf("Message #%d", i) err = conn.Send(queueName, "text/plain", []byte(text), nil) if err != nil { println("failed to send to server", err) return } else { println("message sent") } } println("sending EXIT message") err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil) if err != nil { println("failed to send EXIT message to server", err) return } else { println("message sent") } println("sender finished") } func main() { sendMessages() }
16. Implementace konzumenta zpráv
Naproti tomu konzument zpráv, který navíc dokáže zareagovat na zprávu s tělem „EXIT“, může vypadat následovně:
package main import ( "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" func receiveMessages() { conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("cannot subscribe to", queueName, err.Error()) return } for { msg := <-sub.C text := string(msg.Body) if text != "EXIT" { println("Received message", text) } else { println("Received EXIT command") break } } println("receiver finished") } func main() { receiveMessages() }
17. Úprava producenta a konzumenta tak, aby komunikace s message brokerem probíhala v gorutině
Příště si ukážeme některé další možnosti, které nám propojení Go+STOMP nabízí. Jen pro ukázku bude uveden producent zpráv, kde vlastní posílání zpráv probíhá ve vyhrazené gorutině, na jejíž ukončení čeká hlavní vlákno s využitím kanálu nazvaného stop:
package main import ( "fmt" "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" const messageCount = 10 var stop = make(chan bool) func sendMessages() { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() for i := 1; i <= messageCount; i++ { text := fmt.Sprintf("Message #%d", i) err = conn.Send(queueName, "text/plain", []byte(text), nil) if err != nil { println("failed to send to server", err) return } else { println("message sent") } } println("sending EXIT message") err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil) if err != nil { println("failed to send EXIT message to server", err) return } else { println("message sent") } println("sender finished") } func main() { go sendMessages() <-stop }
Podobným způsobem můžeme vyřešit i konzumenta zpráv, který je upraven nepatrně odlišným způsobem:
package main import ( "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" var stop = make(chan bool) func receiveMessages(subscribed chan bool) { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("cannot subscribe to", queueName, err.Error()) return } close(subscribed) for { msg := <-sub.C text := string(msg.Body) if text != "EXIT" { println("Received message", text) } else { println("Received EXIT command") break } } println("receiver finished") } func main() { subscribed := make(chan bool) go receiveMessages(subscribed) <-subscribed <-stop }
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
19. Odkazy na předchozí části seriálu
V této kapitole jsou uvedeny odkazy na všech deset předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/ - Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/ - Apache ActiveMQ – další systém implementující message brokera
https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/ - Použití Apache ActiveMQ s protokolem STOMP
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/
20. Odkazy na Internetu
- Stránka Apache Software Foundation
http://www.apache.org/ - Informace o portu 5672
http://www.tcp-udp-ports.com/port-5672.htm - Třída MessagingHandler knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._handlers.MessagingHandler-class.html - Třída Event knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._events.Event-class.html - package stomp (Go)
https://godoc.org/github.com/go-stomp/stomp - Go language library for STOMP protocol
https://github.com/go-stomp/stomp - python-qpid-proton 0.26.0 na PyPi
https://pypi.org/project/python-qpid-proton/ - Qpid Proton
http://qpid.apache.org/proton/ - Using the AMQ Python Client
https://access.redhat.com/documentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/ - Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html