Obsah
1. Další možnosti poskytované knihovnou ØMQ
2. Problematika uzavírání prostředků: sockety a kontext ØMQ
3. Implementace serveru a klienta bez explicitního uzavírání prostředků
4. Explicitní kontrola chyb a uzavírání prostředků v blocích finally
5. Využití bloků with (správci kontextu v Pythonu)
6. Upravená implementace serveru a klienta
8. Třetí verze serveru a klienta
9. Současné využití obou dekorátorů context a socket
10. Posílání složitějších datových struktur ve formátu JSON
11. Serializace a deserializace objektů
12. Model (komunikační strategie) PUSH-PULL
13. Producent a konzument používající strategii PUSH-PULL
14. Rozdělení zpráv mezi větší množství konzumentů ve strategii PUSH-PULL
15. Architektura producent-workeři-collector (fan-out a fan-in)
16. Implementace architektury producent-workeři-collector
17. Chování architektury s fan-out a fan-in
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu
1. Další možnosti poskytované knihovnou ØMQ
Ve druhém článku o knihovně ØMQ navážeme na část předchozí, v níž jsme se seznámili se základními koncepty, na nichž je knihovna ØMQ postavena. Připomeňme si pouze, že se jedná o nativní knihovnu vyvinutou v programovacím jazyku C++, pro níž však vznikla rozhraní umožňující její použití z mnoha dalších programovacích jazyků, a to včetně rozhraní pro jazyk C, Javu (celou JVM, tj. včetně JRuby, Jythonu, Scaly, Groovy, Clojure apod.), Python atd. Při práci s touto knihovnou se využívají především dva typy objektů. Jedná se o takzvaný kontext (context) a socket. Kontext si zjednodušeně řečeno můžeme představit jako kontejner (řekněme seznam – i když je to velmi nepřesné) s jednotlivými sockety, které klient/server otevřel a používá. Typicky se vytváří pouze jediný kontext pro celou aplikaci, a to nezávisle na tom, zda se jedná o jednoduchého jednovláknového klienta či o server, který komunikaci po každém socketu obsluhuje v jiném vláknu (kontext se tedy typicky vytváří ještě před spuštěním jednotlivých vláken).
Samotné sockety sice svým jménem připomínají klasické Berkeley/BSD sockety, ve skutečnosti se ale v kontextu knihovny ØMQ jedná o objekt reprezentující logickou „zásuvku“, nikoli konkrétní „zásuvku“ síťovou. Jaký je mezi oběma koncepty rozdíl? U ØMQ socketů se musí již při jejich konstrukci specifikovat typ (či možná lépe řečeno strategie) komunikace, samotná obsluha přenosu dat je řešena ve vlastním vláknu (do toho běžný programátor nezasahuje) a navíc je k socketu většinou připojena i fronta (queue) umožňující posílání zpráv i ve chvíli, kdy druhá strana připojení není připravena na příjem dat. U socketů v podání ØMQ je navíc možné specifikovat větší množství konkrétních přípojných bodů, například více portů protokolu TCP, na nichž server očekává dotazy od klientů. Příkladem může být následující implementace velmi jednoduchého serveru, který současně naslouchá na TCP portech 5556 a 5557:
import zmq import time def bind(port1, port2, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address1 = "tcp://*:{port}".format(port=port1) socket.bind(address1) print("Bound to address {a}".format(a=address1)) address2 = "tcp://*:{port}".format(port=port2) socket.bind(address2) print("Bound to address {a}".format(a=address2)) return socket def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) def start_server(): """Spuštění serveru.""" socket = bind(5556, 5557, zmq.PAIR) for i in range(10): send_message(socket, "Message #{i}".format(i=i)) time.sleep(1) start_server()
Komunikace s využitím ØMQ socketů může probíhat s využitím různých technologií. Typicky se používá protokol TCP (Transmission Control Protocol), ovšem setkat se můžeme i s použitím TIPC (Transparent Inter-process Communication), IPC nebo i se sockety používanými „pouze“ pro komunikaci v rámci jediné aplikace – a to konkrétně pro přenos dat mezi jednotlivými vlákny. Pokud totiž pro tuto činnost využijeme možnosti nabízené ØMQ, přiblížíme se technologii gorutin a kanálů představených v programovacím jazyku Go, a to navíc bez toho, aby docházelo k uváznutím a podobným nepříjemnostem, které se typicky projevují až při produkčním nasazení.
Ještě si připomeňme, že knihovna ØMQ podporuje čtyři základní komunikační strategie:
- PAIR – jednosměrné či obousměrné propojení dvou procesů, z nichž každý může běžet na odlišném počítači. Tato strategie se nejvíce přibližuje běžnému použití klasických Berkeley socketů a setkali jsme se s ní minule. Jak klient, tak i server použijí při konstrukci socketu stejný typ: zmq.PAIR.
- REQ-REP – jedná se o komunikaci typu požadavek-odpověď. Požadavky posílají klienti, odpovědi generuje server, který dokáže obsloužit prakticky libovolné množství klientů. I s touto komunikační strategií jsme se setkali v předchozím článku. Klient používá při konstrukci socketu typ zmq.REQ, server pak typ zmq.REP.
- PUB-SUB – server zde publikuje zprávy, k jejichž odběru se mohou přihlásit různí klienti. Zprávy je možné filtrovat na straně klientů (tato vlastnost se ovšem ve starších verzích ØMQ odlišuje). Základní způsob použití této komunikační strategie jsme si popsali minule. Jedná se o jednosměrnou komunikaci, při níž může dojít ke ztrátě zpráv v případě, že se odběratel nepřipojí či pokud se odpojí ve chvíli, kdy je publikována další zpráva. Server publikující zprávy používá socket typu zmq.PUB, všichni odběratelé pak zmq.SUB.
- PUSH-PULL – rozšíření předchozí strategie PUB-SUB: server či servery vytváří zprávy zpracovávané buď přímo připojenými workery nebo celou kolonou (pipeline) workerů. Nedochází zde ke ztrátě dat, protože se na obou koncích komunikačního kanálu používají fronty. Navíc je možné vytvořit „pipeline“ složenou z většího množství uzlů, z nichž každý přijímá data na jednom portu (v případě TCP) a posílá zpracovaná data na jiný port. Přijímající konec komunikačního kanálu využívá socket typu zmq.PULL, posílající/vysílající konec pak socket typu zmq.PUSH.
Obrázek 1: Jednosměrná komunikace využívající strategii PAIR-PAIR.
Obrázek 2: Obousměrná komunikace využívající strategii PAIR-PAIR.
Obrázek 3: Obousměrná komunikace využívající strategii REQ-REP.
Obrázek 4: Jednosměrná komunikace využívající strategii PUB-SUB.
2. Problematika uzavírání prostředků: sockety a kontext ØMQ
Ve většině až doposud ukázaných příkladů jsme se příliš nezabývali tím, v jakém okamžiku vlastně budou uzavřeny sockety (a tím pádem i inicializovaná připojení) a kdy se ukončí platnost celého kontextu ØMQ. U zmíněných demonstračních příkladů, které jsme chtěli mít co nejkratší a nejsrozumitelnější, se počítá s tím, že se připojení uzavře (odpojí) automaticky ve chvíli, kdy budou objekty typu socket uvolňovány z paměti automatickým správcem paměti (garbage collector), protože v této chvíli by se měla zavolat speciální metoda __del__ (takzvaný finalizér). Podobně je tomu u kontextu, který je taktéž ukončen před finalizér zavolaný správcem paměti.
V produkčním kódu je však většinou mnohem lepší nespoléhat se na správce paměti (ostatně jeho sémantika při volání konstruktorů se už minimálně jednou změnila a obecně nemusí být zaručeno, že se finalizér vůbec stihne zavolat) a raději explicitně ukončit připojení ve chvíli, kdy už není zapotřebí. Tím se automaticky uvolní i příslušný TCP port, pokud pro připojení používáme TCP. Prakticky totéž platí o samotném kontextu ØMQ.
Abychom si otestovali, v jakém okamžiku (a zda vůbec) se prostředky/objekty typu context a socket skutečně uzavírají, budeme muset nepatrně modifikovat dva soubory knihovny PyZMQ. Pokud jste dodrželi postup instalace ØMQ a PyZMQ zmíněný v předchozím článku, měly by být tyto soubory uloženy v adresáři ~/.local/lib/python3.podverze/zmq/sugar/. Samotné úpravy jsou ve skutečnosti triviální a týkají se přidání volání funkce print() do metod se specifickými názvy __del__ a __exit__.
Patch pro úpravu souboru context.py:
--- context.py 2019-01-18 17:36:03.000000000 +0100 +++ ~/.local/lib/python3.6/site-packages/zmq/sugar/context.py 2019-01-18 18:00:23.726179780 +0100 @@ -42,6 +42,7 @@ def __del__(self): """deleting a Context should terminate it, without trying non-threadsafe destroy""" + print("Context.__del__") if not self._shadow and not _exiting: self.term() @@ -49,6 +50,7 @@ return self def __exit__(self, *args, **kwargs): + print("Context.__exit__") self.term() def __copy__(self, memo=None):
Patch pro úpravu souboru socket.py:
--- socket.py 2019-01-18 17:36:03.000000000 +0100 +++ ~/.local/lib/python3.6/site-packages/zmq/sugar/socket.py 2019-01-18 18:01:56.788181903 +0100 @@ -63,6 +63,7 @@ self._shadow = False def __del__(self): + print("Socket.__del__") if not self._shadow: self.close() @@ -75,6 +76,7 @@ return self def __exit__(self, *args, **kwargs): + print("Socket.__exit__") self.close() #-------------------------------------------------------------------------
Po těchto úpravách by se na standardní výstup klienta, serveru, workera atd. měly mj. vypisovat i zprávy o to, že se bude uzavírat socket, popř. celý kontext ØMQ.
3. Implementace serveru a klienta bez explicitního uzavírání prostředků
V dnešním prvním demonstračním příkladu se vrátíme k použití komunikační strategie PAIR, v níž klient i server vystupují ve stejné roli – mohou používat otevřený socket k jednosměrné či k obousměrné komunikaci. Server se od klienta odlišuje jen v tom, že spojení otevírá metodou socket.bind(), kterou se naváže na předem známý TCP port, zatímco klient používá metodu socket.connect() a připojuje se k portu otevřeném serverem. V příkladu nebude použito žádné explicitní uzavření socketů ani kontextu ØMQ.
Nejprve si ukažme implementaci serveru:
import zmq import time CONNECTION_TYPE = zmq.PAIR PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) def start_server(): """Spuštění serveru.""" context = zmq.Context() socket = context.socket(CONNECTION_TYPE) address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(10): send_message(socket, "Message #{i}".format(i=i)) time.sleep(1) start_server()
Následuje implementace klienta:
import zmq CONNECTION_TYPE = zmq.PAIR PORT = 5556 def start_client(): """Spuštění klienta.""" context = zmq.Context() socket = context.socket(CONNECTION_TYPE) address = "tcp://localhost:{port}".format(port=PORT) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_client()
Po spuštění serveru se otevře port 5556, na němž server očekává připojení klienta. Po připojení odešle deset zpráv a ihned poté se automaticky ukončí:
$ python3 server.py Bound to address tcp://*:5556 Sending message 'Message #0' Sending message 'Message #1' Sending message 'Message #2' Sending message 'Message #3' Sending message 'Message #4' Sending message 'Message #5' Sending message 'Message #6' Sending message 'Message #7' Sending message 'Message #8' Sending message 'Message #9' Socket.__del__ Context.__del__
Klient odebírá zprávy v nekonečné smyčce, takže ho musíme ukončit násilně:
$ python3 client.py Connected to tcp://localhost:5556 Waiting for message... Received message 'Message #0' Received message 'Message #1' Received message 'Message #2' Received message 'Message #3' Received message 'Message #4' Received message 'Message #5' Received message 'Message #6' Received message 'Message #7' Received message 'Message #8' Received message 'Message #9' ^CTraceback (most recent call last): File "client.py", line 23, in <module> start_client() File "client.py", line 19, in start_client message = socket.recv_string() File "/home/tester/.local/lib/python3.6/site-packages/zmq/sugar/socket.py", line 586, in recv_string msg = self.recv(flags=flags) File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 824, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc KeyboardInterrupt Socket.__del__ Context.__del__
4. Explicitní kontrola chyb a uzavírání prostředků v blocích finally
Dnešní druhý demonstrační příklad bude mnohem delší, než příklad první, i když v něm implementujeme naprosto stejný algoritmus komunikace s využitím strategie PAIR. Tentokrát se ovšem kontroluje většina chyb, k nimž může dojít při vytváření kontextu, socketu, při posílání zpráv atd. Současně se explicitně provádí uzavírání socketů metodou socket.close() a kontextu metodou socket.term().
Nejprve si opět ukažme implementaci serveru:
import zmq import time CONNECTION_TYPE = zmq.PAIR PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) def start_server(): """Spuštění serveru.""" context = zmq.Context() try: socket = context.socket(CONNECTION_TYPE) address = "tcp://*:{port}".format(port=PORT) socket.bind(address) try: print("Bound to address {a}".format(a=address)) for i in range(10): send_message(socket, "Message #{i}".format(i=i)) time.sleep(1) except Exception as e: print(e) finally: print("Closing socket") socket.close() print("Closed") except Exception as e: print(e) finally: print("Terminating context") context.term() print("Terminated") start_server()
Implementace klienta vypadá následovně:
import zmq CONNECTION_TYPE = zmq.PAIR PORT = 5556 def start_client(): """Spuštění klienta.""" context = zmq.Context() try: socket = context.socket(CONNECTION_TYPE) address = "tcp://localhost:{port}".format(port=PORT) socket.connect(address) try: print("Connected to {a}".format(a=address)) print("Waiting for message...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) except Exception as e: print(e) finally: print("Closing socket") socket.close() print("Closed") except Exception as e: print(e) finally: print("Terminating context") context.term() print("Terminated") start_client()
Z výpisu běhu serveru je patrné, že se nyní skutečně socket i kontext explicitně uvolní a teprve poté se pro oba objekty zavolá finalizér, který ovšem již další činnosti nebude provádět:
$ python3 server.py Bound to address tcp://*:5556 Sending message 'Message #0' Sending message 'Message #1' Sending message 'Message #2' Sending message 'Message #3' Sending message 'Message #4' Sending message 'Message #5' Sending message 'Message #6' Sending message 'Message #7' Sending message 'Message #8' Sending message 'Message #9' Closing socket Closed Terminating context Terminated Socket.__del__ Context.__del__
Prakticky totéž chování uvidíme i po spuštění klienta, samozřejmě s tím, rozdílem, že klienta ukončíme ručně klávesovou zkratkou Ctrl+C:
$ python3 client.py Connected to tcp://localhost:5556 Waiting for message... Received message 'Message #0' Received message 'Message #1' Received message 'Message #2' Received message 'Message #3' Received message 'Message #4' Received message 'Message #5' Received message 'Message #6' Received message 'Message #7' Received message 'Message #8' Received message 'Message #9' ^CClosing socket Closed Terminating context Terminated Traceback (most recent call last): File "client.py", line 37, in <module> start_client() File "client.py", line 21, in start_client message = socket.recv_string() File "/home/tester/.local/lib/python3.6/site-packages/zmq/sugar/socket.py", line 586, in recv_string msg = self.recv(flags=flags) File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 824, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc KeyboardInterrupt Socket.__del__ Context.__del__
5. Využití bloků with (správci kontextu v Pythonu)
Knihovna PyZMQ zajišťující rozhraní mezi vývojáři používajícími programovací jazyk Python a nativní knihovnou ØMQ je navržena takovým stylem, aby podporovala většinu idiomů známých v komunitě Pythonu. Jedním z těchto idiomů je používání správců kontextu a bloků with pro ty prostředky, které se mají automaticky uzavírat po odchodu z bloku with (a to jakýmkoli způsobem, včetně výskoku z funkce atd.):
with open('hello.txt', 'w') as fout: fout.write('Hi there!')
V praxi to znamená, že jak třída Context, tak i třída Socket implementují obě metody __enter__ a __exit__ nezbytné pro to, aby se jednalo o korektně naprogramované správce kontextu (context manager). Je tedy možné zapsat například tento kód:
with zmq.Context() as context: with context.socket(CONNECTION_TYPE) as socket: address = "tcp://*:{port}".format(port=PORT) socket.bind(address) ... ... ...
Po opuštění vnitřního bloku with se zavře socket, po opuštění venkovního bloku pak dojde k ukončení celého kontextu ØMQ.
6. Upravená implementace serveru a klienta
Podívejme se nyní na způsob, jakým je možné správce kontextu (v praxi bloky with) použít při úpravě serveru a i klienta. Vlastní kód bude jednodušší, než v předchozím příkladu, protože nemusíme zapisovat konstrukci try-except-finally jen proto, aby bylo zaručeno uvolnění nějakého prostředku.
Upravená implementace serveru:
import zmq import time CONNECTION_TYPE = zmq.PAIR PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) def start_server(): """Spuštění serveru.""" with zmq.Context() as context: with context.socket(CONNECTION_TYPE) as socket: address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(10): send_message(socket, "Message #{i}".format(i=i)) time.sleep(1) start_server()
Upravená implementace klienta:
import zmq CONNECTION_TYPE = zmq.PAIR PORT = 5556 def start_client(): """Spuštění klienta.""" with zmq.Context() as context: with context.socket(CONNECTION_TYPE) as socket: address = "tcp://localhost:{port}".format(port=PORT) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_client()
Samozřejmě si pro úplnost ukážeme chování serveru i klienta po jejich spuštění:
$ python3 server.py Bound to address tcp://*:5556 Sending message 'Message #0' Sending message 'Message #1' Sending message 'Message #2' Sending message 'Message #3' Sending message 'Message #4' Sending message 'Message #5' Sending message 'Message #6' Sending message 'Message #7' Sending message 'Message #8' Sending message 'Message #9' Socket.__exit__ Context.__exit__ Socket.__del__ Context.__del__
$ python3 client.py Connected to tcp://localhost:5556 Waiting for message... Received message 'Message #0' Received message 'Message #1' Received message 'Message #2' Received message 'Message #3' Received message 'Message #4' Received message 'Message #5' Received message 'Message #6' Received message 'Message #7' Received message 'Message #8' Received message 'Message #9' ^CSocket.__exit__ Context.__exit__ Traceback (most recent call last): File "client.py", line 23, in <module> start_client() File "client.py", line 19, in start_client message = socket.recv_string() File "/home/tester/.local/lib/python3.6/site-packages/zmq/sugar/socket.py", line 586, in recv_string msg = self.recv(flags=flags) File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 824, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc KeyboardInterrupt Socket.__del__ Context.__del__
7. Použití dekorátorů
Kromě explicitní práce se správci kontextu (tj. v praxi použitím bloku with) máme v PyZMQ ještě jednu možnost automatické či poloautomatické správy objektů typu Context a Socket. Můžeme totiž použít takzvané dekorátory a zcela se vyhnout explicitní konstrukci zmíněných objektů a tím pádem i jejich explicitnímu zavírání. Podívejme se, jak se tyto dekorátory použijí v praxi.
Nejprve je nutné naimportovat příslušné dekorátory, v prvním případě pouze dekorátor pro kontext ØMQ:
from zmq.decorators import context
Následně můžeme upravit funkci pro start serveru (či klienta) takovým způsobem, že před ni zapíšeme příslušný dekorátor. Ten zajistí automatické vytvoření kontextu, který se předá do volané funkce jako její první parametr. Hlavičku funkce tedy budeme muset upravit následovně:
@context() def start_server(context): """Spuštění serveru.""" ... ... ...
Ovšem volání bude provedeno bez zmíněného parametru (ten je z pohledu programátora vytvořen a předán automaticky):
start_server()
Podobně můžeme stejnou funkci upravit ještě lépe – použijeme dekorátor jak pro kontext, tak i pro socket. Povšimněte si pořadí zápisu dekorátorů a parametrů:
CONNECTION_TYPE = zmq.PAIR @context() @socket(CONNECTION_TYPE) def start_server(context, socket): """Spuštění serveru.""" address = "tcp://*:{port}".format(port=PORT) socket.bind(address) ... ... ...
Volání této funkce v programovém kódu bude stále stejné:
start_server()
8. Třetí verze serveru a klienta
Ve třetí (předposlední) variantě původního demonstračního příkladu použijeme dekorátor, ovšem pouze pro práci s kontextem ØMQ. Nejprve si opět ukážeme zdrojový kód serveru:
import zmq from zmq.decorators import context import time CONNECTION_TYPE = zmq.PAIR PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @context() def start_server(context): """Spuštění serveru.""" with context.socket(CONNECTION_TYPE) as socket: address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(10): send_message(socket, "Message #{i}".format(i=i)) time.sleep(1) start_server()
Nový zdrojový kód klienta bude vypadat takto:
import zmq from zmq.decorators import context CONNECTION_TYPE = zmq.PAIR PORT = 5556 @context() def start_client(context): """Spuštění klienta.""" with context.socket(CONNECTION_TYPE) as socket: address = "tcp://localhost:{port}".format(port=PORT) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_client()
Chování serveru a klienta po jejich spuštění se bude podobat předchozímu příkladu:
$ python3 server.py Bound to address tcp://*:5556 Sending message 'Message #0' Sending message 'Message #1' Sending message 'Message #2' Sending message 'Message #3' Sending message 'Message #4' Sending message 'Message #5' Sending message 'Message #6' Sending message 'Message #7' Sending message 'Message #8' Sending message 'Message #9' Socket.__exit__ Socket.__del__ Context.__exit__ Context.__del__
$ python3 client.py Connected to tcp://localhost:5556 Waiting for message... Received message 'Message #0' Received message 'Message #1' Received message 'Message #2' Received message 'Message #3' Received message 'Message #4' Received message 'Message #5' Received message 'Message #6' Received message 'Message #7' Received message 'Message #8' Received message 'Message #9' ^CSocket.__exit__ Context.__exit__ Traceback (most recent call last): File "client.py", line 24, in <module> start_client() File "/home/tester/.local/lib/python3.6/site-packages/zmq/decorators.py", line 75, in wrapper return func(*args, **kwargs) File "client.py", line 20, in start_client message = socket.recv_string() File "/home/tester/.local/lib/python3.6/site-packages/zmq/sugar/socket.py", line 586, in recv_string msg = self.recv(flags=flags) File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 824, in zmq.backend.cython.socket.Socket.recv File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc KeyboardInterrupt Socket.__del__ Context.__del__
9. Současné využití obou dekorátorů context a socket
Jen v krátkosti se podívejme na způsob využití obou dekorátorů importovaných z balíčku zmq.decorators, tj. jak dekorátoru pro objekt představující kontext, tak i pro objekt představující socket.
Nová implementace serveru:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PAIR PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_server(context, socket): """Spuštění serveru.""" address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(10): send_message(socket, "Message #{i}".format(i=i)) time.sleep(1) start_server()
Nová implementace klienta:
import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.PAIR PORT = 5556 @context() @socket(CONNECTION_TYPE) def start_client(context, socket): """Spuštění klienta.""" address = "tcp://localhost:{port}".format(port=PORT) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_client()
10. Posílání složitějších datových struktur ve formátu JSON
Již v předchozím článku jsme se zmínili o tom, že v knihovně PyZMQ nalezneme metody, které nám umožní posílat kromě obecných sekvencí bajtů a řetězců taktéž složitější datové struktury. Ty mohou být buď převedeny do formátu JSON, popř. je možné vzít strukturu reprezentovanou pythoním objektem a poslat (a na druhé straně samozřejmě přijmout a zpracovat) serializovanou variantu tohoto objektu.
Nejprve se podívejme na způsob posílání dat ve formátu JSON. Ve skutečnosti je použití JSONu velmi jednoduché, protože můžeme použít metody Socket.send_json() a Socket.recv_json(). V mnoha případech si tak ušetříme nutnost parsování textových zpráv.
Server, který bude posílat složitější datové struktury ve formátu JSON, bude implementován následujícím způsobem:
import zmq from zmq.decorators import context, socket import time from datetime import datetime CONNECTION_TYPE = zmq.PAIR PORT = 5556 def send_json_message(socket, message, i, timestamp): """Poslání zprávy.""" print("Sending message #{i}'".format(i=i)) payload = { "message": message, "number": i, "timestamp": str(timestamp) } socket.send_json(payload) @context() @socket(CONNECTION_TYPE) def start_server(context, socket): """Spuštění serveru.""" address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(10): send_json_message(socket, "Message", i, datetime.now()) time.sleep(1) start_server()
Klient zprávu přijme a deserializuje ji do běžného slovníku, popř. seznamu (obecně se jedná o rekurzivní datovou strukturu):
import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.PAIR PORT = 5556 @context() @socket(CONNECTION_TYPE) def start_client(context, socket): """Spuštění klienta.""" address = "tcp://localhost:{port}".format(port=PORT) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") while True: payload = socket.recv_json() print("Received message #{i} with timestamp {t}: '{m}'".format( i=payload["number"], t=payload["timestamp"], m=payload["message"])) start_client()
Klient by měl po svém spuštění přijmout deset zpráv a zobrazovat o nich podrobnější informace, zejména jejich časové razítko, pořadí a vlastní text zprávy:
$ python3 client.py Connected to tcp://localhost:5556 Waiting for message... Received message #0 with timestamp 2019-01-21 17:14:27.687456: 'Message' Received message #1 with timestamp 2019-01-21 17:14:28.774764: 'Message' Received message #2 with timestamp 2019-01-21 17:14:29.776133: 'Message' Received message #3 with timestamp 2019-01-21 17:14:30.777448: 'Message' Received message #4 with timestamp 2019-01-21 17:14:31.778886: 'Message' Received message #5 with timestamp 2019-01-21 17:14:32.780256: 'Message' Received message #6 with timestamp 2019-01-21 17:14:33.781626: 'Message' Received message #7 with timestamp 2019-01-21 17:14:34.783037: 'Message' Received message #8 with timestamp 2019-01-21 17:14:35.784384: 'Message' Received message #9 with timestamp 2019-01-21 17:14:36.785755: 'Message'
11. Serializace a deserializace objektů
V případě, že nám nebude dostačovat použití slovníků, popř. seznamů posílaných ve formátu JSON, můžeme využít další užitečnou vlastnost knihovny PyZMQ. Tou je možnost serializace a deserializace pythonních objektů. Pro vlastní serializaci se používá knihovna pickle, ovšem z pohledu běžného programátora je její volání zcela transparentní. Programátor pouze potřebuje zavolat metodu Socket.send_pyobj() a na druhé straně komunikačního kanálu metodu Socket.recv_pyobj().
Posílat budeme instance této třídy:
from datetime import datetime class Message(): def __init__(self, number): self.number = number self.timestamp = str(datetime.now()) self.message = "Message"
Implementace serveru:
import zmq from zmq.decorators import context, socket import time from message import Message CONNECTION_TYPE = zmq.PAIR PORT = 5556 def send_serialized_object(socket, obj, i): """Poslání zprávy.""" print("Sending message #{i}'".format(i=i)) socket.send_pyobj(obj) @context() @socket(CONNECTION_TYPE) def start_server(context, socket): """Spuštění serveru.""" address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(10): m = Message(i) send_serialized_object(socket, m, i) time.sleep(1) start_server()
Implementace klienta:
import zmq from zmq.decorators import context, socket from message import Message CONNECTION_TYPE = zmq.PAIR PORT = 5556 @context() @socket(CONNECTION_TYPE) def start_client(context, socket): """Spuštění klienta.""" address = "tcp://localhost:{port}".format(port=PORT) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") while True: payload = socket.recv_pyobj() print("Received message #{i} with timestamp {t}: '{m}'".format( i=payload.number, t=payload.timestamp, m=payload.message)) start_client()
12. Model (komunikační strategie) PUSH-PULL
V dalším textu se budeme zabývat způsoby využití komunikační strategie PUSH-PULL. Komunikace je v tomto případě jednosměrná, a to od serveru (ten se zde nazývá producent, producer) ke klientovi (konzument, consumer). Oproti podobně koncipované strategii PUB-SUB jsou však zprávy doručovány odlišným způsobem: vždy jen jednomu vybranému konzumentu (rozdělení zhruba round-robin) a navíc s tím, že pokud žádný konzument není v danou chvíli připravený na zpracování zprávy, je zpráva uložena do interně spravované fronty. Tato komunikační strategie se tedy přibližuje možnostem klasických message brokerů, ovšem jak uvidíme v další části, je zde stále co vylepšovat.
Obrázek 5: Jednosměrná komunikace využívající strategii PUSH-PUL.
Obrázek 6: Použití většího množství konzumentů při použití strategie PUSH-PUL.
13. Producent a konzument používající strategii PUSH-PULL
Podívejme se nejprve na implementaci producenta zpráv. Ten musí používat sockety typu zmq.PUSH a po svém spuštění vytvoří sto zpráv se zajištěním jejich přijetí nějakým konzumentem:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PUSH PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_producer(context, socket): """Spuštění serveru.""" address = "tcp://*:{port}".format(port=PORT) # socket.set_hwm(1) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(100): send_message(socket, "Message #{i}".format(i=i)) time.sleep(0.1) start_producer()
Implementace konzumenta zpráv, který musí využívat sockety typu zmq.PULL:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PULL PORT = 5556 @context() @socket(CONNECTION_TYPE) def start_consumer(context, socket): """Spuštění konzumenta.""" address = "tcp://localhost:{port}".format(port=PORT) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") cnt = 0 while True: message = socket.recv_string() cnt += 1 print("Received message {c} of 100: '{m}'".format(c=cnt, m=message)) time.sleep(0) start_consumer()
14. Rozdělení zpráv mezi větší množství konzumentů ve strategii PUSH-PULL
Nyní si spustíme dva konzumenty a jediného producenta, aby bylo patrné, jak se budou zprávy přeposílat algoritmem round-robin:
$ python3 consumer.py Connected to tcp://localhost:5556 Waiting for message... Received message 1 of 100: 'Message #2' Received message 2 of 100: 'Message #4' ... ... ... Received message 49 of 100: 'Message #98'
$ python3 consumer.py Connected to tcp://localhost:5556 Waiting for message... Received message 1 of 100: 'Message #0' Received message 2 of 100: 'Message #1' ... ... ... Received message 51 of 100: 'Message #99'
$ python3 producent.py Bound to address tcp://*:5556 Sending message 'Message #0' Sending message 'Message #1' ... ... ...
15. Architektura producent-workeři-collector (fan-out a fan-in)
Předchozí demonstrační příklad ukazoval použití jednoho producenta zpráv a několika konzumentů (nebo též workerů, podle toho, o jakou aplikaci se jedná). V praxi ovšem mnohdy s takovou architekturou nevystačíme a budeme muset přidat ještě jeden typ uzlu, který se nazývá collector (shromažďovač). Typicky bývá producent i collector spuštěn pouze jedenkrát, workerů bývá větší množství. Zajímavé bude zjistit, který typ uzlu by měl být serverem s pevně nastavenými porty. Bývá dobrým zvykem, aby serverem byly ty uzly, které jsou nejvíce stabilní, což je v tomto případě producent i collector. Co to znamená z hlediska implementace: producent i collector budou při vytváření připojení používat metodu Socket.bind() zatímco workeři, které je možné připojovat a odpojovat podle potřeb aplikace, budou používat metodu Socket.connect. Dále je nutné použít dvě čísla portů – jeden pro komunikaci producent → workeři, druhý pro komunikaci workeři → collector.
Při pohledu na schéma této architektury je zřejmé, proč se první (horní) část nazývá fanout (též fan-out) a druhá fanin (fan-in, i když osobně bych spíš použit funnel):
Obrázek 7: Architektura producent-workeři-collector (fan-out a fan-in).
16. Implementace architektury producent-workeři-collector
Implementace producenta se vlastně nijak zásadně neliší od předchozího příkladu. Pouze si povšimněte, že producent používá socket typu zmq.PUSH a otevírá připojení na portu 5556:
import zmq from zmq.decorators import context, socket from time import sleep CONNECTION_TYPE = zmq.PUSH PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_producer(context, socket): """Spuštění serveru.""" address = "tcp://*:{port}".format(port=PORT) # socket.set_hwm(1) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(101): send_message(socket, "Message #{i}".format(i=i)) sleep(0.1) start_producer()
Nejsložitější je konzument neboli worker. Ten totiž musí pracovat se dvěma sockety různých typů. První socket je připojen na port 5556, je typu zmq.PULL a slouží pro příjem zprávy od producenta. Druhý socket je připojen na port 5557, je typu zmq.PUSH a používá se pro poslání výsledku do collectoru:
import zmq from zmq.decorators import context, socket from os import getpid IN_PORT = 5556 OUT_PORT = 5557 @context() @socket(zmq.PULL) @socket(zmq.PUSH) def start_consumer(context, in_socket, out_socket): """Spuštění konzumenta.""" address = "tcp://localhost:{port}".format(port=IN_PORT) in_socket.connect(address) print("Connected to {a}".format(a=address)) address = "tcp://localhost:{port}".format(port=OUT_PORT) out_socket.connect(address) print("And to {a}".format(a=address)) print("Waiting for message...") pid = getpid() while True: message = in_socket.recv_string() out_message = "Message from {pid}: '{m}'".format(pid=pid, m=message) print(out_message) out_socket.send_string(out_message) start_consumer()
A konečně se na konci celé „pipeline“ nachází collector. Ten používá socket typu zmq.PULL, který se otevírá na portu 5557:
import zmq from zmq.decorators import context, socket from time import sleep CONNECTION_TYPE = zmq.PULL PORT = 5557 @context() @socket(CONNECTION_TYPE) def start_collector(context, socket): """Spuštění sběratele.""" address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") while True: message = socket.recv_string() print("Collecting message: '{m}'".format(m=message)) sleep(0) start_collector()
17. Chování architektury s fan-out a fan-in
Chování výše implementovaného příkladu si ukážeme na této konfiguraci:
- Jeden producent
- Dva konzumenti/workeři (simulace rozdělení zátěže)
- Jeden collector (shromažďovač)
Producent:
$ python3 producer.py Bound to address tcp://*:5556 Sending message 'Message #0'
Worker #1:
$ python3 consumer.py Connected to tcp://localhost:5556 And to tcp://localhost:5557 Waiting for message... Message from 8050: 'Message #6' Message from 8050: 'Message #8' ... ... ...
Worker #2:
$ python3 consumer.py Connected to tcp://localhost:5556 And to tcp://localhost:5557 Waiting for message... Message from 8047: 'Message #0' Message from 8047: 'Message #1' ... ... ...
Collector:
$ python3 collector.py Connected to tcp://*:5557 Waiting for message... Collecting message: 'Message from 8047: 'Message #0'' ... ... ... Collecting message: 'Message from 8050: 'Message #6'' ... ... ...
Můžeme vidět, že collector skutečně postupně přijme zprávy ode všech workerů.
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v programovacím jazyku C byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
Pro úplnost ještě přidejme odkazy na dva patche, kterými je možné upravit knihovnu PyZMQ takovým způsobem, aby bylo možné snadno sledovat uzavírání prostředků context a socket:
19. Odkazy na předchozí části seriálu
V této kapitole jsou uvedeny odkazy na všech šest předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
20. Odkazy na Internetu
- ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html