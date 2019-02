11. Jednoduchý nástroj pro sledování speciálních témat

1. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři

Dnešní část seriálu o systémech implementujících fronty zpráv bude rozdělena do dvou částí. V první části si na několika co nejjednodušších demonstračních příkladech ukážeme, jakým způsobem je možné komunikovat se systémem Apache ActiveMQ s využitím protokolu AMQP. Příklady budou naprogramovány v Pythonu, přičemž vlastní realizaci komunikačního protokolu bude zajišťovat knihovna pojmenovaná Qpid Proton, kterou nalezneme na adrese https://qpid.apache.org/proton/ (tato knihovna podporuje například i již popsaného message brokera RabbitMQ). Zajímavé je, že programátorské rozhraní knihovny Qpid Proton je zcela odlišné od rozhraní protokolu STOMP, které bylo realizováno v knihovně stomp.py, viz též předchozí část tohoto seriálu s popisem možností poskytovaných touto knihovnou.

Druhá část článku je věnována nepatrně odlišnému tématu. Ukážeme si v ní způsob realizace klientů vyvinutých v programovacím jazyce Go, kterému se podrobněji věnujeme v paralelně běžícím seriálu. Uvidíme, že díky některým vlastnostem programovacího jazyka Go, zejména díky kanálům a gorutinám, je implementace klientů message brokera v mnoha ohledech velmi elegantní.

Poznámka: před spuštěním demonstračních příkladů si zkontrolujte, zda soubor conf/activemq.xml

<transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>

2. Instalace knihovny Qpid Proton

obsahuje konfiguraci protokolů STOMP i AMQP – viz zvýrazněné části:

V demonstračních příkladech, které si popíšeme a ukážeme v navazujících kapitolách, použijeme pro komunikaci se systémem Apache ActiveMQ knihovnu nazvanou Qpid Proton, která je určena pro programovací jazyk Python. Tato knihovna je – jak je dnes již dobrým zvykem – dostupná na PyPi (Python Package Indexu), ovšem před její instalací je nutné nainstalovat i Cython, a to v samostatném kroku. Cython, s nímž jsme se seznámili v jiném článku, nainstalujeme pro právě přihlášeného uživatele následujícím příkazem:

$ pip3 install --user cython Collecting cython Downloading https://files.pythonhosted.org/packages/16/98/49aa24054e99e9c7734e49d6996662f547e4e2faae0051d35fbbc461afa4/Cython-0.29.5-cp36-cp36m-manylinux1_x86_64.whl (2.1MB) 100% |████████████████████████████████| 2.1MB 603kB/s Installing collected packages: cython Successfully installed cython-0.29.5

Po (doufejme že úspěšné) instalaci Cythonu již můžeme bez problémů nainstalovat i knihovnu Qpid Proton, a to takto:

$ pip3 install --user python-qpid-proton Collecting python-qpid-proton Downloading https://files.pythonhosted.org/packages/fe/e3/58379a2262a31788cfa037bde2c0c77273a74f34943f73abf20030c71f17/python-qpid-proton-0.26.0.zip (563kB) 100% |████████████████████████████████| 573kB 1.1MB/s Installing collected packages: python-qpid-proton Running setup.py install for python-qpid-proton ... done Successfully installed python-qpid-proton-0.26.0

Poznámka: instalace proběhla pro Python 3, i když by knihovnu mělo být možné použít i společně s Pythonem 2.

Základní kontrolu instalace můžeme provést přímo z interaktivní smyčky jazyka Python. Použijeme interpret Pythonu 3, protože i instalace byla provedena pro Python 3:

$ python3 Python 3.6.3 (default, Oct 9 2017, 12:11:29) [GCC 7.2.1 20170915 (Red Hat 7.2.1-2)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import proton >>> help("proton")

Po potvrzení posledního příkazu by se měla zobrazit nápověda:

Help on package proton: NAME proton DESCRIPTION The proton module defines a suite of APIs that implement the AMQP 1.0 protocol. The proton APIs consist of the following classes: - L{Message} -- A class for creating and/or accessing AMQP message content. - L{Data} -- A class for creating and/or accessing arbitrary AMQP encoded data. PACKAGE CONTENTS

3. Producent zpráv publikovaných pod zvoleným tématem

Instalaci knihovny Qpid Proton jsme dokončili, takže si nyní můžeme ukázat způsob implementace jednoduchého producenta zpráv, které budou přes message brokera posílány s využitím strategie PUB-SUB – zprávy tedy nebudou ukládány do fronty, ale budou přímo přeposlány všem posluchačům, kteří jsou v daný okamžik k message brokerovi připojeny.

Ve výchozím nastavení systému Apache ActiveMQ je pro komunikaci s využitím protokolu AMQP používán port 5672, který je dokonce k tomuto účelu rezervován. Doménové jméno serveru tedy, společně s portem 5672, použijeme pro určení jeho adresy ve formátu, v jakém ho očekává knihovna Qpid:

ADDRESS = "localhost:5672"

nebo:

ADDRESS = "192.168.1.42:5672"

popř.:

ADDRESS = "foo.bar.baz:5672"

Určit musíme i cíl zprávy. Ten typicky začíná prefixem „topic://“ nebo „queue://“. V prvním příkladu použijeme téma (topic) nazvané „event“, takže se cíl zapíše následovně (vypadá trošku jako URL i s uvedením „protokolu“):

TARGET = "topic://event"

Samotný producent zprávy se inicializuje a následně spustí takto (ve vlastním kontejneru, což je ovšem termín knihovny Qpid):

publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()

Samozřejmě nám zbývá implementovat vlastní logiku producenta, tj. musíme naprogramovat třídu nazvanou Publisher, která byla zmíněna v předchozích programových řádcích. Tato třída je odvozena od třídy MessagingHandler, v níž je předepsáno několik callback metod volaných samotnou knihovnou Qpid Proton na základě různých událostí (Event), které nastávají v průběhu života producenta zpráv. Prozatím nás budou zajímat čtyři typy událostí:

on_start – spuštění producenta v kontejneru on_accepted – akceptace poslané zprávy on_disconnected – odpojení od message brokera on_sendable – systém je připraven na poslání další zprávy

V nejjednodušším případě – což je výchozí stav – budou handlery prázdné, ovšem povšimněte si, že se každému handleru skutečně předává objekt, který reprezentuje vzniklou událost:

class Publisher(MessagingHandler): def __init__(self, url, target): pass def on_start(self, event): pass def on_sendable(self, event): pass def on_accepted(self, event): pass def on_disconnected(self, event): pass

Konkrétní implementace producenta zpráv samozřejmě bude složitější, což je patrné i při pohledu na jeho úplný zdrojový kód:

from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TARGET = "topic://event" class Publisher(MessagingHandler): def __init__(self, url, target): super(Publisher, self).__init__() self.url = url self.target = target self.message_sent = False def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_sender(connection, self.target) def on_sendable(self, event): print("on_sendable()") if not self.message_sent: message = Message(id=1, body="Hello world1") event.sender.send(message) self.message_sent = True print("Message has been sent") else: print("Already sent... do nothing") def on_accepted(self, event): print("on_accepted()") if self.message_sent: event.connection.close() def on_disconnected(self, event): print("on_disconnected()") publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()

Poznámka: důležité je počkat na skutečné poslání a akceptaci zprávy. Pokud producent skončí dřív, než byla zpráva akceptována, nemusí dojít k jejímu doručení.

4. Příjemce zpráv s nastaveným tématem

Příjemce zprávy se vytvoří velmi podobným způsobem, jako její producent:

subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()

V nejjednodušším případě bude implementace třídy Subscriber odvozené od nám již známé třídy MessagingHandler obsahovat jen dvě callback metody:

class Subscriber(MessagingHandler): def __init__(self, url, source): pass def on_start(self, event): pass def on_message(self, event): pass

Ve chvíli, kdy je automaticky zavolána metoda on_start, se můžeme připojit k message brokerovi a přihlásit se k odebírání zpráv s nějakým tématem:

connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source)

Samotný příjem zprávy je již snadný, protože pro každou zprávu, kterou z message brokera získáme, je zavolána callback metoda on_message. Zpráva je součástí parametru event, text zprávy pak v atributu body:

def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) event.receiver.close() event.connection.close()

Poznámka: tento příjemce zprávy je velmi jednoduchý a akceptuje pouze jedinou zprávu. Proto ihned po jejím přijetí ukončí připojení.

Opět si ukažme úplný zdrojový kód tohoto příkladu:

from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" SOURCE = "topic://event" class Subscriber(MessagingHandler): def __init__(self, url, source): super(Subscriber, self).__init__() self.url = url self.source = source def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source) def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) event.receiver.close() event.connection.close() subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()

5. Producent zpráv posílaných do fronty

Pokud budeme chtít namísto komunikace typu PUB-SUB použít strategii PUSH-PULL budou změny provedené v producentovi i konzumentovi minimální, i když samotný transport zpráv bude naprosto odlišný (to je však záležitostí samotného message brokera, nikoli klientů, kteří se k němu připojují). Změní se pouze cíl, a to z:

TARGET = "topic://event"

na:

TARGET = "queue://test"

Poznámka: „test“ je v tomto případě jménem fronty, do níž se budou zprávy posílat.

Upravený zdrojový kód producenta zpráv posílaných do fronty „test“ bude vypadat následovně:

from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TARGET = "queue://test" class Publisher(MessagingHandler): def __init__(self, url, target): super(Publisher, self).__init__() self.url = url self.target = target self.message_sent = False def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_sender(connection, self.target) def on_sendable(self, event): print("on_sendable()") if not self.message_sent: message = Message(id=1, body="Hello world1") event.sender.send(message) self.message_sent = True print("Message has been sent") else: print("Already sent... do nothing") def on_accepted(self, event): print("on_accepted()") if self.message_sent: event.connection.close() def on_disconnected(self, event): print("on_disconnected()") publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()

6. Konzument zpráv z vybrané fronty

Kód konzumenta, který bude postupně vybírat zprávy z fronty nazvané „test“, nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample02_publisher_subcri­ber_queue/subscriber.py:

from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" SOURCE = "queue://test" class Subscriber(MessagingHandler): def __init__(self, url, source): super(Subscriber, self).__init__() self.url = url self.source = source def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source) def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) event.receiver.close() event.connection.close() subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()

7. Logika implementace producenta většího množství zpráv

Nyní si ukažme, jak se implementuje producent většího množství zpráv. Kvůli tomu, že producent veškerou komunikaci řídí z callback metod, bude implementace nepatrně složitější, než tomu bylo při použití jiných protokolů. Nejdříve ve třídě Publisher v konstruktoru vytvoříme dva pomocné atributy s počtem poslaných zpráv a počtem zpráv akceptovaných:

self.message_sent = 0 self.message_accepted = 0

Dále budeme v callback metodě on_sendable postupně posílat běžné zprávy a nakonec i zprávu s textem „exit“, kterou budeme řídit konzumenta zpráv:

if self.message_sent < MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent)) event.sender.send(message) elif self.message_sent == MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="exit") event.sender.send(message)

V callback metodě on_accepted pouze zajistíme, aby se připojení ukončilo, ale až po odeslání všech zpráv (ne dříve!):

self.message_accepted += 1 if self.message_accepted == self.message_sent: print("All messages confirmed") event.connection.close()

Upravený zdrojový kód producenta zpráv bude vypadat následovně:

from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TARGET = "queue://test" MAX_MESSAGES = 10 class Publisher(MessagingHandler): def __init__(self, url, target): super(Publisher, self).__init__() self.url = url self.target = target self.message_sent = 0 self.message_accepted = 0 def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_sender(connection, self.target) def on_sendable(self, event): print("on_sendable()") if self.message_sent < MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent)) event.sender.send(message) print("Message #{n} has been sent".format(n=self.message_sent)) elif self.message_sent == MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="exit") event.sender.send(message) print("Exit message") def on_accepted(self, event): print("on_accepted()") self.message_accepted += 1 if self.message_accepted == self.message_sent: print("All messages confirmed") event.connection.close() print("Number of accepted messages: {n}".format(n=self.message_accepted)) def on_disconnected(self, event): print("on_disconnected()") publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()

8. Konzument zpráv ukončovaný explicitně poslanou zprávou EXIT

Samotný konzument zpráv se příliš nezmění, ovšem odlišná bude logika naprogramovaná v callback metodě on_message. V této metodě totiž spojení ukončíme pouze v tom případě, že byla přijata zpráva „exit“:

message = event.message print("Received message '{m}'".format(m=message.body)) if message.body == "exit": print("Last message for me...good bye") event.receiver.close() event.connection.close()

Samozřejmě si opět ukážeme celý zdrojový kód konzumenta:

from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" SOURCE = "queue://test" class Subscriber(MessagingHandler): def __init__(self, url, source): super(Subscriber, self).__init__() self.url = url self.source = source def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source) def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) if message.body == "exit": print("Last message for me...good bye") event.receiver.close() event.connection.close() subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()

9. Speciální témata vytvářená automaticky systémem AMQ

Pokud se podíváme na administrátorské rozhraní systému Apache ActiveMQ ve chvíli, kdy se používá komunikační protokol AMQP, zjistíme, že se v sekci „Topics“ automaticky vytvořila čtyři nová témata:

ActiveMQ.Advisory.Connection ActiveMQ.Advisory.Queue ActiveMQ.Advisory.Topic ActiveMQ.Advisory.MasterBroker

Pro každou používanou frontu se vytvoří ještě další dvojice témat, takže například pokud používáme frontu nazvanou „test“, uvidíme tato dvě témata:

ActiveMQ.Advisory.Consumer.Queue.test ActiveMQ.Advisory.Producer.Queue.test

Tato témata, resp. přesněji řečeno zprávy, které se zde publikují, je možné využít pro sledování aktuálního stavu celého message brokera. Zpracování těchto zpráv je jednoduché, pouze si musíme ukázat jejich formát.

10. Příprava běžného producenta a konzumenta zpráv

Nejprve si připravíme běžného producenta zpráv, který se nijak neliší od producentů předchozích:

from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TARGET = "queue://test" MAX_MESSAGES = 100 class Publisher(MessagingHandler): def __init__(self, url, target): super(Publisher, self).__init__() self.url = url self.target = target self.message_sent = 0 self.message_accepted = 0 def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_sender(connection, self.target) def on_sendable(self, event): print("on_sendable()") if self.message_sent < MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="Hello world #{n}".format(n=self.message_sent)) event.sender.send(message) print("Message #{n} has been sent".format(n=self.message_sent)) elif self.message_sent == MAX_MESSAGES: self.message_sent += 1 message = Message(id=self.message_sent, body="exit") event.sender.send(message) print("Exit message") def on_accepted(self, event): print("on_accepted()") self.message_accepted += 1 if self.message_accepted == self.message_sent: print("All messages confirmed") event.connection.close() print("Number of accepted messages: {n}".format(n=self.message_accepted)) def on_disconnected(self, event): print("on_disconnected()") publisher = Publisher(ADDRESS, TARGET) container = Container(publisher) container.run()

Konzument zpráv je upravený nepatrně – každou zprávu bude zpracovávat minimálně jednu sekundu, aby bylo možné mezitím sledovat stav speciálních témat:

import time from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" SOURCE = "queue://test" class Subscriber(MessagingHandler): def __init__(self, url, source): super(Subscriber, self).__init__() self.url = url self.source = source def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) event.container.create_receiver(connection, self.source) def on_message(self, event): message = event.message print("Received message '{m}'".format(m=message.body)) if message.body == "exit": print("Last message for me...good bye") event.receiver.close() event.connection.close() time.sleep(1) subscriber = Subscriber(ADDRESS, SOURCE) container = Container(subscriber) container.run()

on_sendable a on_accepted musí být velmi rychlé. Poznámka: podobným způsobem producenta zpráv upravit nelze, protože reakce na události v callback metodáchmusí být velmi rychlé.

11. Jednoduchý nástroj pro sledování speciálních témat

Samotný nástroj, který bude speciální témata sledovat, se vlastně žádným zásadním způsobem neodlišuje od konzumenta zpráv. Pouze musíme zajistit, aby došlo k sledování všech zajímavých témat; nikdo nám samozřejmě nebrání sledovat více témat současně:

TOPICS = ( "ActiveMQ.Advisory.Connection", "ActiveMQ.Advisory.Consumer.Queue.test", "ActiveMQ.Advisory.MasterBroker", "ActiveMQ.Advisory.Producer.Queue.test", "ActiveMQ.Advisory.Queue", "ActiveMQ.Advisory.Topic" )

Přihlášení ke všem tématům je provedeno v callback metodě on_start:

connection = event.container.connect(self.url) for topic in TOPICS: event.container.create_receiver(connection, "topic://" + topic)

Následuje výpis zdrojového kódu tohoto nástroje:

from proton.handlers import MessagingHandler from proton.reactor import Container ADDRESS = "localhost:5672" TOPICS = ( "ActiveMQ.Advisory.Connection", "ActiveMQ.Advisory.Consumer.Queue.test", "ActiveMQ.Advisory.MasterBroker", "ActiveMQ.Advisory.Producer.Queue.test", "ActiveMQ.Advisory.Queue", "ActiveMQ.Advisory.Topic" ) class Subscriber(MessagingHandler): def __init__(self, url): super(Subscriber, self).__init__() self.url = url def on_start(self, event): print("on_start()") connection = event.container.connect(self.url) for topic in TOPICS: event.container.create_receiver(connection, "topic://" + topic) def on_message(self, event): message = event.message print(message) subscriber = Subscriber(ADDRESS) container = Container(subscriber) container.run()

12. Analýza výsledků práce nástroje

Po spuštění producenta zpráv, jejich konzumenta a současně i nástroje pro sledování speciálních témat by tento nástroj měl zobrazit tyto zprávy:

Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:374", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Producer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:375", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "producerCount"=1, "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Producer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:376", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "producerCount"=0, "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:377", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:378", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Consumer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:379", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "consumerCount"=1, "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Consumer.Queue.test", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:380", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "consumerCount"=0, "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}} Message{address="topic://ActiveMQ.Advisory.Connection", priority=0, id="ID:localhost.localdomain-37517-1550847645470-1:1:0:0:381", subject="Advisory", annotations={:"x-opt-jms-dest"=1}, properties={"originBrokerURL"="amqp://localhost.localdomain:5672", "originBrokerId"="ID:localhost.localdomain-37517-1550847645470-0:1", "originBrokerName"="localhost"}}

Povšimněte si, že zprávy neobsahují tělo, ale pouze předmět, identifikaci message brokera atd. Už samotné téma totiž podává mnohdy dostatečnou informaci o tom, k jaké události došlo:

ActiveMQ.Advisory.Connection připojení či odpojení klienta ActiveMQ.Advisory.Producer.Queue.test informace o připojení či odpojení producenta ActiveMQ.Advisory.Consumer.Queue.test informace o připojení či odpojení konzumenta

V posledních dvou případech se mění atribut „producerCount“ popř. „consumerCount“.

13. Použití message brokerů z programovacího jazyka Go s využitím protokolu STOMP

Ve druhé části dnešního článku se seznámíme s tím, jakým způsobem je možné použít message brokera Apache ActiveMQ z programovacího jazyka Go. Pro jednoduchost použijeme protokol STOMP (Streaming Text Oriented Messaging Protocol), který, jak již víme, sice nedokáže využít všechny možnosti AMQ, ovšem základní architekturu systému založeného na kombinaci AMQ+STOMP celkem bez problémů dokážeme postavit (ostatně největší problémy nás většinou čekají při návrhu subsystému pro persistenci zpráv popř. pro clusterování a konfiguraci load balancingu). Implementaci knihovny s podporou protokolu STOMP určenou pro jazyk Go nalezneme v GitHub repositáři https://github.com/go-stomp/stomp, takže samotná instalace bude jednoduchá. Nejdříve přejdeme do adresáře, na nějž ukazuje proměnná prostředí GOPATH; typicky se jedná o adresář ~/go. Zde zadáme následující příkaz:

$ go get github.com/go-stomp/stomp

V adresářové struktuře by se měl objevit podadresář src/github.com/go-stomp/stomp a taktéž pkg/linux_amd64/github.com/go-stomp/stomp s přibližně následujícím obsahem:

. ├── pkg │ └── linux_amd64 │ └── github.com │ └── go-stomp │ └── stomp.a ├── src │ ├── github.com │ │ └── go-stomp │ │ └── stomp │ │ ├── ack.go │ │ ├── breaking_changes.md │ │ ├── conn.go │ │ ├── conn_options.go │ │ ├── conn_test.go │ │ │ └── testutil.go ... ... ... ... ... ... │ │ ├── transaction.go │ │ ├── validator.go │ │ ├── version.go │ │ └── version_test.go

14. Připojení k message brokerovi, práce se zprávami

Použití balíčku go-stomp/stomp je poměrně snadné. Nejdříve si ukážeme způsob připojení k message brokerovi s využitím protokolu STOMP. První důležitou věcí je pochopitelně import balíčku:

import "github.com/go-stomp/stomp"

Připojení je realizováno funkcí Dial, které se předá způsob připojení (zde přes TCP) a adresa serveru s portem (pro STOMP „localhost:61613“). Ve třetím parametru je možné v případě potřeby přidat další údaje nutné pro přihlášení, ale pro nezabezpečenou variantu STOMPu zde můžeme použít jen nil. Samozřejmě je nutné zkontrolovat případné chyby, ke kterým může dojít:

conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) }

Dále zajistíme, že spojení bude na konci příslušné funkce automaticky ukončeno. K tomu slouží příkaz defer:

defer conn.Disconnect()

defer naleznete Poznámka: popis příkazunaleznete zde

Samotné poslání zprávy je již jednoduché – musíme jen specifikovat jméno fronty, MIME typ posílané zprávy (viz předchozí článek), samotnou zprávu reprezentovanou polem/řezem bajtů (konverzi je nutné provést explicitně) a případné další parametry (hlavičky atd.):

err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)

Příjem zprávy je realizován odlišně a elegantně. Nejdříve se přihlásíme k nějaké frontě a zkontrolujeme, zda se přihlášení skutečně podařilo:

sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("cannot subscribe to", queueName, err.Error()) return } close(subscribed)

První návratovou hodnotou funkce Subscribe je struktura obsahující mj. i kanál, z něhož můžeme provést (blokující) čtení, a to takto:

for { msg := <-sub.C text := string(msg.Body) ... ... ... }

Pokud přijde nová zpráva, bude výsledkem msg := ← sub.C nová zpráva uložená do proměnné msg, v opačném případě se bude čekat na příchod další zprávy. Případné další paralelně běžící gorutiny nejsou touto operací nijak omezeny.

select popsaná Poznámka: samozřejmě je možné, aby příjemce zpráv současně vykonával i další komunikaci. K tomu slouží mj. i konstrukcepopsaná v tomto článku

15. Implementace producenta zpráv (publishera)

Všechny potřebné informace o základním použití protokolu STOMP v jazyku Go již máme, takže se podívejme na způsob implementace producenta deseti běžných zpráv, za nimiž následuje zpráva „EXIT“, která ukončí případného konzumenta zpráv:

package main import ( "fmt" "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" const messageCount = 10 func sendMessages() { conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() for i := 1; i <= messageCount; i++ { text := fmt.Sprintf("Message #%d", i) err = conn.Send(queueName, "text/plain", []byte(text), nil) if err != nil { println("failed to send to server", err) return } else { println("message sent") } } println("sending EXIT message") err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil) if err != nil { println("failed to send EXIT message to server", err) return } else { println("message sent") } println("sender finished") } func main() { sendMessages() }

16. Implementace konzumenta zpráv

Naproti tomu konzument zpráv, který navíc dokáže zareagovat na zprávu s tělem „EXIT“, může vypadat následovně:

package main import ( "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" func receiveMessages() { conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("cannot subscribe to", queueName, err.Error()) return } for { msg := <-sub.C text := string(msg.Body) if text != "EXIT" { println("Received message", text) } else { println("Received EXIT command") break } } println("receiver finished") } func main() { receiveMessages() }

17. Úprava producenta a konzumenta tak, aby komunikace s message brokerem probíhala v gorutině

Příště si ukážeme některé další možnosti, které nám propojení Go+STOMP nabízí. Jen pro ukázku bude uveden producent zpráv, kde vlastní posílání zpráv probíhá ve vyhrazené gorutině, na jejíž ukončení čeká hlavní vlákno s využitím kanálu nazvaného stop:

package main import ( "fmt" "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" const messageCount = 10 var stop = make(chan bool) func sendMessages() { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() for i := 1; i <= messageCount; i++ { text := fmt.Sprintf("Message #%d", i) err = conn.Send(queueName, "text/plain", []byte(text), nil) if err != nil { println("failed to send to server", err) return } else { println("message sent") } } println("sending EXIT message") err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil) if err != nil { println("failed to send EXIT message to server", err) return } else { println("message sent") } println("sender finished") } func main() { go sendMessages() <-stop }

Podobným způsobem můžeme vyřešit i konzumenta zpráv, který je upraven nepatrně odlišným způsobem:

package main import ( "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" var stop = make(chan bool) func receiveMessages(subscribed chan bool) { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("cannot subscribe to", queueName, err.Error()) return } close(subscribed) for { msg := <-sub.C text := string(msg.Body) if text != "EXIT" { println("Received message", text) } else { println("Received EXIT command") break } } println("receiver finished") } func main() { subscribed := make(chan bool) go receiveMessages(subscribed) <-subscribed <-stop }

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta 1 publisher.py producent jediné zprávy používající protokol AMQP https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample01_publisher_subcri­ber_topic/publisher.py 1 subscriber.py příjemce jediné zprávy používající protokol AMQP https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample01_publisher_subcri­ber_topic/subscriber.py 2 publisher.py producent používající strategii PUSH-PULL https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample02_publisher_subcri­ber_queue/publisher.py 2 subscriber.py konzument používající strategii PUSH-PULL https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample02_publisher_subcri­ber_queue/subscriber.py 3 publisher.py producent deseti běžných zpráv a zprávy „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample03_publisher_subcri­ber_all_messages/publisher­.py 3 subscriber.py konzument zpráv ukončený zprávou „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample03_publisher_subcri­ber_all_messages/subscriber­.py 4 publisher.py producent deseti běžných zpráv a zprávy „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample04_special_events/pu­blisher.py 4 subscriber.py producent deseti běžných zpráv a zprávy „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample04_special_events/sub­scriber.py 4 special_events.py nástroj pro sledování speciálních témat https://github.com/tisnik/message-queues-examples/blob/master/amqp/e­xample04_special_events/spe­cial_events.py 5 publisher.go producent zpráv naprogramovaný v Go https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull/publisher.go 5 subscriber.go konzument zpráv naprogramovaný v Go https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull/subscriber.go 6 publisher.go producent zpráv naprogramovaný v Go využívající gorutiny https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull-goroutines/publisher.go 6 subscriber.go konzument zpráv naprogramovaný v Go využívající gorutiny https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull-goroutines/subscriber.go

