Obsah
1. Využití zařízení v knihovně ØMQ při tvorbě systému se složitější architekturou
2. Role zařízení v knihovně ØMQ
3. Komunikační strategie REQ-REP bez použití zařízení typu „Queue“
4. Použití zařízení typu „Queue“
5. Klient a server, kteří se k frontě připojují
6. Vylepšení předchozího příkladu použitím dekorátorů
7. Klasická architektura PUB-SUB a její omezení
8. Připojení příjemce zpráv k většímu množství publisherů
9. Otestování chování aplikace se dvěma publishery a jedním příjemcem zpráv
10. Vylepšení architektury s využitím tzv. forwarderu
11. Připojení zdroje zpráv i jejich příjemce k forwarderu
12. Spuštění tří producentů zpráv a tří příjemců připojených na jediný forwarder
13. Praktická ukázka filtrace zpráv: tři producenti a čtyři příjemci zpráv
14. Výsledky činnosti předchozí aplikace
15. Repositář s demonstračními příklady
16. Odkazy na předchozí části seriálu
1. Využití zařízení v knihovně ØMQ při tvorbě systému se složitější architekturou
Ve třetím článku o knihovně ØMQ i o rozhraní PyZMQ určeném pro použití knihovny ØMQ z Pythonu si ukážeme, jakým způsobem je možné použít takzvaná zařízení (device). Tímto názvem se v ØMQ označují jednoduché služby zajišťující roli prostředníka (intermediary) mezi jednotlivými komunikujícími uzly implementovaného systému, umístěné například mezi klienta a server popř. mezi zdroji zpráv (publisher) a jejími příjemci (subscriber). Na tomto místě se asi můžete ptát, proč vlastně potřebujeme nějaké prostředníky mezi jednotlivými částmi aplikace? Není jednodušší přímo propojit klienty se serverem a zdroje zpráv přímo s příjemci? V případě, že je architektura navrhovaného systému složena jen z několika komunikujících uzlů a jejich vzájemná konfigurace je relativně neměnná, není (alespoň většinou) zapotřebí prostředníky používat. Typickým příkladem může být systém s jedním zdrojem zpráv, ke kterému se připojuje několik (předem neomezený počet) příjemců, popř. jednoduchá pipeline s jedním producentem a několika konzumenty založená na komunikační strategii PUSH-PULL.

Obrázek 1: Jednosměrná komunikace využívající strategii PUSH-PULL.
V předchozí části tohoto seriálu jsme si navíc řekli, že se komunikující uzly většinou v praxi rozlišují podle toho, zda se jedná o uzly (relativně) stabilní či o uzly, které se mohou dynamicky připojovat a odpojovat podle aktuálních potřeb aplikace. První skupina uzlů typicky používá připojení s využitím metody Socket.bind(), protože právě tyto uzly by měly používat předem známá čísla portů. Naopak druhá skupina uzlů vytváří druhý konec komunikačního kanálu s využitím metody Socket.connect(), protože se připojuje k již otevřenému a předem známému číslu portu. Nejtypičtějším příkladem je komunikační strategie REQ-REP, v níž server (popř. větší množství serverů) otevírá sockety na předem známých portech, na nichž očekává požadavky od klientů. Naopak klienti se k těmto portům připojují, což znamená, že klienti musí mít k dispozici konfiguraci se seznamem serverů, k nimž se mohou připojovat a při změně serverů je nutné o této skutečnosti klienty informovat. Toto již dříve popsané schéma je zobrazeno na následujícím diagramu:

Obrázek 2: Systém s uzly, které mezi sebou komunikují s využitím strategie REQ-REP.
2. Role zařízení v knihovně ØMQ
Poněkud méně jasná může být role jednotlivých komunikujících uzlů při použití strategie PUSH-PULL, zejména tehdy, pokud tuto strategii použijeme současně s rozesíláním zpráv mezi větší množství workerů (fan-out) a s akumulací výsledků workerů v jednom koncovém uzlu typu collector (fan-in). Prozatím jsme tento problém řešili takovým způsobem, že první uzel, který vytváří úkoly pro workery, vystupuje v roli serveru, který použije socket typu PUSH. Workeři otevírají dva sockety – jeden vstupní (PULL) pro získání parametrů úkolu a druhý výstupní (PUSH) pro poslání výsledků. Vzhledem k tomu, že workeři se typicky připojují a odpojují podle zátěže aplikace, vystupují v roli klienta. A konečně poslední uzel, který sbírá výsledky práce workerů, používá socket typu PULL a musí se jednat o server, aby se k němu mohli workeři připojovat. Opět se podívejme na to, jak vlastně celá architektura aplikace vypadá:

Obrázek 3: Architektura, ve které se používá rozesílání zpráv mezi větší množství workerů (fan-out) a následný sběr zpráv v collectoru.
V praxi se ovšem setkáme i se složitějšími situacemi. Představme si například škálovatelný systém s komunikační strategií typu REQ-REP, v němž budeme potřebovat přidávat a ubírat servery podle toho, jak se bude měnit zátěž aplikace. V takovém případě nastane problém – jakým způsobem vlastně informovat klienty, ke kterým portům se mají připojit? A je vůbec vhodné, aby se po přidání/ubrání serveru museli všichni klienti znovu konfigurovat? Tyto (velmi časté, ovšem někdy přehlížené) případy je možné řešit následujícím způsobem: vzhledem k tomu, že jak klienti, tak i servery jsou z hlediska architektury aplikace „pohyblivé“ (nestabilní) uzly, musíme mezi ně přidat nějakého prostředníka, který bude zprostředkovávat komunikaci – v tom nejjednodušším případě bude pouze přeposílat data mezi klienty a servery. Tím pádem bude právě tento prostředník stabilní částí, která bude sockety vytvářet metodou Socket.bind() a ostatní uzly – jak klienti, tak i servery – se budou k tomuto prostředníku připojovat pomocí Socket.connect().
Nic nám samozřejmě nebrání si takové prostředníky naprogramovat, ovšem je to zbytečně složité a hlavně není zapotřebí znovuobjevovat kolo, protože knihovna ØMQ některé typické implementace prostředníků již obsahuje. Říká se jim, jak již ostatně víme z úvodní kapitoly, zařízení:
Zařízení | Stručný popis |
---|---|
ZMQ.QUEUE | prostředník používaný především v komunikaci REQ-REP (klasický klient, server) |
ZMQ.FORWARDER | používá se jako prostředník mezi zdroji zpráv a jejich příjemci PUB-SUB |
ZMQ.STREAMER | používá se v komunikační strategii PUSH-PULL |
V dalších kapitolách si ukážeme, jak se jednotlivé typy zařízení mohou použít v praxi.
3. Komunikační strategie REQ-REP bez použití zařízení typu „Queue“
Nejprve si připomeňme, jakým způsobem jsme implementovali jednoduchého klienta a server, u nichž se pro vzájemnou komunikaci používala strategie REQ-REP. To znamená, že server naslouchá na předem známém portu na požadavky (request) klienta, na něž by měl odpovědět (response). Server se po svém spuštění navazuje na předem známý port a tudíž používá metodu Socket.bind(). Naproti tomu klient se k tomuto portu připojuje s využitím metody Socket.connect().
Implementace serveru využívajícího socket typu REP vypadá následovně:
import zmq from math import factorial def bind(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://*:{port}".format(port=port) socket.bind(address) print("Bound to address {a}".format(a=address)) return socket def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request def start_server(): """Spuštění serveru.""" socket = bind(5556, zmq.REP) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") start_server()
Samozřejmě si ukážeme i implementaci klienta se socketem typu REQ:
import zmq def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) def start_client(): """Spuštění klienta.""" socket = connect(5556, zmq.REQ) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() start_client()
4. Použití zařízení typu „Queue“
Jak jsme si již řekli ve druhé kapitole, je možné a v mnoha případech i velmi žádoucí mezi klienty a server(y) vložit prostředníka, který je v knihovně ØMQ představován zařízením typu ZMQ.QUEUE. Celá konfigurace aplikace se změní, i když změny nejsou nijak zásadní:
- Server se již nebude navazovat na port pomocí metodySocket.bind(), ale použije běžné „klientské“ připojení Socket.connect(). V našem konkrétním případě se použije port 5555.
- Klient stále bude navazovat připojení přesSocket.connect(), ovšem na odlišné číslo portu. Pro jednoduchost použijeme port 5556, tedy port s číslem o jedničku vyšším, než je port, na který se připojuje server.
- A konečně – mezi klienty a serverem (servery) bude ležet zprostředkovatel implementovaný zařízením typu ZMQ.QUEUE. Tento zprostředkovatel bude navázán na oba výše zmíněné porty 5555 i 5556, protože se na něj z jedné strany budou připojovat servery a ze strany druhé klient (připomeňme si, že sockety v ØMQ umožňují oboustrannou komunikaci).
Zprostředkovatel na straně, která komunikuje se serverem, použije socket typu XREQ a na straně klientů socket typu XREP. Původní komunikační schéma REQ-REP se tedy změní na REQ-XREP/XREQ-REP.

Obrázek 4: Komunikační schéma REQ-XREP/XREQ-REP.
První implementace prostředníka může vypadat následovně:
import zmq XREP_PORT = 5556 XREQ_PORT = 5557 def create_queue(xrep_port, xreq_port): """Vytvoření fronty.""" context = zmq.Context() frontend = context.socket(zmq.XREP) address = "tcp://*:{port}".format(port=xrep_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xrep_port)) backend = context.socket(zmq.XREQ) address = "tcp://*:{port}".format(port=xreq_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xreq_port)) zmq.device(zmq.QUEUE, frontend, backend) create_queue(5556, 5557)
5. Klient a server, kteří se k frontě připojují
V implementaci klienta pošleme serveru (zprostředkovaně) několik požadavků na výpočet faktoriálu. Konkrétně budeme chtít, aby server vypočítal a vrátil faktoriál jedné, deseti, řetězce „xyzzy“ (což je samozřejmě chyba), –10 (taktéž chyba) a konečně faktoriál hodnoty 100. Odpověď serveru se přímo vypíše na standardní výstup (terminál):
import zmq PORT = 5556 def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) def start_client(): """Spuštění klienta.""" socket = connect(PORT, zmq.REQ) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() send_request(socket, "-10") print(socket.recv_string()) print() send_request(socket, "100") print(socket.recv_string()) print() start_client()
V implementaci serveru naproti tomu očekáváme požadavky na portu 5557 (nikoli 5556!) a pokud se jedná o celé číslo, pokusíme se vypočítat jeho faktoriál a vrátit výsledek zpět klientovi. Ve skutečnosti se výsledky opět budou vracet nepřímo přes zařízení ZMQ.QUEUE:
import zmq from math import factorial PORT = 5557 def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request def start_server(): """Spuštění serveru.""" socket = connect(PORT, zmq.REP) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") start_server()
Podívejme se na chování celého systému ve chvíli, kdy spustíme zprostředkovatele, dále server a nakonec klienta (či několik klientů). Následují výpisy generované jednotlivými uzly na standardní výstup.
Zprávy vypsané zprostředkovatelem:
$ python3 queue.py Bound to tcp://*:5556 on port 5556 Bound to tcp://*:5557 on port 5557
Zprávy vypisované serverem:
$ python3 server.py Connected to tcp://localhost:5557 Received request from client: '1' Sending response '1! = 1' Received request from client: '10' Sending response '10! = 3628800' Received request from client: 'xyzzy' Sending response 'Wrong input' Received request from client: '-10' Sending response 'Wrong input' Received request from client: '100' Sending response '100! = 93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000'
Zprávy vypisované klientem:
$ python3 client.py Connected to tcp://localhost:5556 Sending request '1' 1! = 1 Sending request '10' 10! = 3628800 Sending request 'xyzzy' Wrong input Sending request '-10' Wrong input Sending request '100' 100! = 93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000
6. Vylepšení předchozího příkladu použitím dekorátorů
Předchozí příklad s implementací klienta, serveru a zprostředkovatele typu ZMQ.QUEUE ještě upravíme takovým způsobem, aby se v jednotlivých skriptech nemusel explicitně vytvářet (a rušit) kontext ØMQ popř. vytvořené sockety. Využijeme přitom dekorátory, s jejichž použitím jsme se seznámili minule.
Implementace vlastního zprostředkovatele:
import zmq from zmq.decorators import context, socket XREP_PORT = 5556 XREQ_PORT = 5557 @context() @socket(zmq.XREP) @socket(zmq.XREQ) def create_queue(xrep_port, xreq_port, context, frontend, backend): """Vytvoření fronty.""" context = zmq.Context() address = "tcp://*:{port}".format(port=xrep_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xrep_port)) address = "tcp://*:{port}".format(port=xreq_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xreq_port)) zmq.device(zmq.QUEUE, frontend, backend) create_queue(5556, 5557)
Implementace klienta může vypadat následovně:
import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.REQ PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) @context() @socket(CONNECTION_TYPE) def start_client(port, context, socket): """Spuštění klienta.""" connect(socket, port) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() send_request(socket, "-10") print(socket.recv_string()) print() send_request(socket, "100") print(socket.recv_string()) print() start_client(PORT)
Implementace serveru vypadá takto:
import zmq from zmq.decorators import context, socket from math import factorial CONNECTION_TYPE = zmq.REP PORT = 5557 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request @context() @socket(CONNECTION_TYPE) def start_server(port, context, socket): """Spuštění serveru.""" connect(socket, port) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") start_server(PORT)
7. Klasická architektura PUB-SUB a její omezení
V této části článku se budeme zabývat komunikační strategií PUB-SUB a role zprostředkovatele při použití této strategie. Nejprve si ukažme klasický příklad té nejjednodušší strategie PUB-SUB s jediným zdrojem zpráv a s jediným příjemcem.
Takto vypadá zdroj zpráv, který otevírá port 5556 a na tento port každou sekundu pošle zprávu bez ohledu na to, kdo a kdy (a zda vůbec) zprávu přijme:
import zmq from zmq.decorators import context, socket from time import sleep from os import getpid CONNECTION_TYPE = zmq.PUB PORT = 5556 def bind(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://*:{port}".format(port=port) socket.bind(address) print("Bound to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(port, context, socket): """Spuštění publisheru.""" pid = getpid() print("Publisher PID={pid}".format(pid=pid)) bind(socket, port) for i in range(100): message = "Message #{i} from {pid}".format(i=i, pid=pid) publish_message(socket, message) sleep(1) start_publisher(PORT)
Příjemce zpráv očekává zprávy na portu 5556. Filtrace je sice prováděna (což je u komunikační strategie PUB-SUB nutnost), ovšem filtr je nastaven na prázdný řetězec, takže ve skutečnosti jsou akceptovány všechny zprávy:
import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.SUB PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) @context() @socket(CONNECTION_TYPE) def start_subscriber(port, context, socket): """Spuštění příjemce.""" connect(socket, port) socket.setsockopt_string(zmq.SUBSCRIBE, "") print("Waiting for messages...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_subscriber(PORT)
Následuje ukázka komunikace mezi zdrojem zpráv a jejich příjemcem:
$ python3 subscriber.py Connected to tcp://localhost:5556 Waiting for messages... Received message 'Message #1 from 4330' Received message 'Message #2 from 4330' Received message 'Message #3 from 4330' Received message 'Message #4 from 4330'
$ python3 publisher.py Publisher PID=4330 Bound to tcp://*:5556 Publishing message 'Message #0 from 4330' Publishing message 'Message #1 from 4330' Publishing message 'Message #2 from 4330' Publishing message 'Message #3 from 4330' Publishing message 'Message #4 from 4330' ... ... ...
8. Připojení příjemce zpráv k většímu množství publisherů
Ve skutečnosti je možné nejjednodušší architekturu z předchozí kapitoly s jedním zdrojem zpráv a jedním příjemcem rozšířit, a to jak o více příjemců (což je triviální), tak i o větší množství zdrojů zpráv. Ukažme si tuto druhou možnost, která je z implementačního hlediska mnohem zajímavější.
Nejprve bude uveden zdrojový kód prvního publisheru, který bude posílat zprávy na port číslo 5556. Mezi jednotlivými zprávami bude pomlka o trvání přibližně jedné sekundy:
import zmq from zmq.decorators import context, socket from time import sleep from os import getpid CONNECTION_TYPE = zmq.PUB PORT = 5556 def bind(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://*:{port}".format(port=port) socket.bind(address) print("Bound to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(port, context, socket): """Spuštění publisheru.""" pid = getpid() print("Publisher PID={pid}".format(pid=pid)) bind(socket, port) for i in range(100): message = "Message #{i} from {pid}".format(i=i, pid=pid) publish_message(socket, message) sleep(1) start_publisher(PORT)
Druhý publisher se do značné míry podobá publisheru prvnímu, ovšem zprávy se v tomto případě posílají na port 5557 a nikoli na port 5556. Zprávy se taktéž posílají rychleji, přibližně každých 300 milisekund:
import zmq from zmq.decorators import context, socket from time import sleep from os import getpid CONNECTION_TYPE = zmq.PUB PORT = 5557 def bind(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://*:{port}".format(port=port) socket.bind(address) print("Bound to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(port, context, socket): """Spuštění publisheru.""" pid = getpid() print("Publisher PID={pid}".format(pid=pid)) bind(socket, port) for i in range(100): message = "Message #{i} from {pid}".format(i=i, pid=pid) publish_message(socket, message) sleep(0.3) start_publisher(PORT)
Nejzajímavější je příjemce zpráv, který nyní musí zprávy očekávat na dvou portech 5556 a současně i 5557. Ve skutečnosti je ovšem implementace příjemce stále velmi jednoduchá, a to z toho důvodu, že zobecněné sockety v knihovně ØMQ umožňují současné připojení k většímu množství portů:
ports = (5556, 5557) for port in ports: connect(socket, port)
Úplný zdrojový kód tohoto příjemce zpráv vypadá následovně:
import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.SUB PORT1 = 5556 PORT2 = 5557 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) @context() @socket(CONNECTION_TYPE) def start_subscriber(ports, context, socket): """Spuštění příjemce.""" for port in ports: connect(socket, port) socket.setsockopt_string(zmq.SUBSCRIBE, "") print("Waiting for messages...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_subscriber((PORT1, PORT2))
9. Otestování chování aplikace se dvěma publishery a jedním příjemcem zpráv
Ukažme si nyní, jakým způsobem vlastně bude příjemce získávat a vypisovat zprávy vytvářené dvěma producenty.
Spustíme prvního producenta zpráv:
$ python3 publisher1.py Publisher PID=4396 Bound to tcp://*:5556 Publishing message 'Message #0 from 4396' Publishing message 'Message #1 from 4396' Publishing message 'Message #2 from 4396' Publishing message 'Message #3 from 4396' Publishing message 'Message #4 from 4396' Publishing message 'Message #5 from 4396' ... ... ...
Ihned poté spustíme druhého producenta zpráv:
$ python3 publisher2.py Publisher PID=4399 Bound to tcp://*:5557 Publishing message 'Message #0 from 4399' Publishing message 'Message #1 from 4399' Publishing message 'Message #2 from 4399' Publishing message 'Message #3 from 4399' Publishing message 'Message #4 from 4399' Publishing message 'Message #5 from 4399' Publishing message 'Message #6 from 4399' Publishing message 'Message #7 from 4399' Publishing message 'Message #8 from 4399' Publishing message 'Message #9 from 4399' ... ... ...
Pokud nyní spustíme konzumenta (příjemce) zpráv, měl by vypisovat zprávy, které postupně přicházejí od obou producentů:
$ python3 subscriber.py Connected to tcp://localhost:5556 Connected to tcp://localhost:5557 Waiting for messages... Received message 'Message #1 from 4396' Received message 'Message #2 from 4396' Received message 'Message #3 from 4396' Received message 'Message #4 from 4396' Received message 'Message #1 from 4399' Received message 'Message #2 from 4399' Received message 'Message #3 from 4399' Received message 'Message #5 from 4396' Received message 'Message #4 from 4399' Received message 'Message #5 from 4399' Received message 'Message #6 from 4399' Received message 'Message #7 from 4399' Received message 'Message #6 from 4396' Received message 'Message #8 from 4399' Received message 'Message #9 from 4399' ... ... ...
10. Vylepšení architektury s využitím tzv. forwarderu
Dalším typem zařízení, s nímž se v dnešním článku seznámíme, je zařízení typu forwarder (ZMQ.FORWARDER). Toto zařízení se používá ve chvíli, kdy jednotlivé uzlu komunikují se strategií PUB-SUB, tj. pokud máme uzly produkující zprávy (producers) a uzly zprávy přijímající (consumers). Připomeňme si, že producenti zpráv nijak nekontrolují kdo a kdy zprávy vlastně přijme – dokonce se může stát (a je to zcela legální a vlastně i očekávané chování), že se některé zprávy ztratí. Vzhledem k tomu, že komunikační strategie typu PUB-SUB je velmi jednoduchá, nepoužívá potvrzování atd., je i použití zařízení ZMQ.FORWARDER přímočaré, což je patrné z následujícího schématu:

Obrázek 5: Zařízení typu ZMQ.FORWARDER.
Vidíme, že forwarder přijímá zprávy socketem typu SUB a posílá je socketem typu PUB, takže se původní komunikační strategie PUB-SUB změní na PUB-SUB/PUB-SUB. Nesmíme ovšem zapomenout na to, že zprávy je možné filtrovat, což platí i pro forwarder. Pro jednoduchost si naimplementujeme forwarder, který přeposílá všechny zprávy, tj. jeho filtr bude prázdný:
import zmq from zmq.decorators import context, socket SUB_PORT = 5556 PUB_PORT = 5557 @context() @socket(zmq.SUB) @socket(zmq.PUB) def create_queue(sub_port, pub_port, context, frontend, backend): """Vytvoření forwarderu.""" context = zmq.Context() address = "tcp://*:{port}".format(port=sub_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=sub_port)) frontend.setsockopt_string(zmq.SUBSCRIBE, "") address = "tcp://*:{port}".format(port=pub_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=pub_port)) zmq.device(zmq.FORWARDER, frontend, backend) create_queue(SUB_PORT, PUB_PORT)
11. Připojení zdroje zpráv i jejich příjemce k forwarderu
Samotného producenta zpráv bude nutné upravit pouze nepatrně – změníme metodu připojení ze Socket.bind() a na Socket.connect(). Výsledná podoba producenta (zdroje zpráv) bude vypadat takto:
import zmq from zmq.decorators import context, socket from time import sleep from os import getpid CONNECTION_TYPE = zmq.PUB PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(port, context, socket): """Spuštění publisheru.""" pid = getpid() print("Publisher PID={pid}".format(pid=pid)) connect(socket, port) for i in range(100): message = "Message #{i} from {pid}".format(i=i, pid=pid) publish_message(socket, message) sleep(1) start_publisher(PORT)
Konzument, tj. příjemce zpráv, se prakticky nezmění, pouze nesmíme zapomenout na to, že se nebude připojovat přímo k producentovi na portu 5556, ale k forwarderu na portu 5557. Opět se podívejme na jeho zdrojový kód:
import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.SUB PORT = 5557 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) @context() @socket(CONNECTION_TYPE) def start_subscriber(port, context, socket): """Spuštění příjemce.""" connect(socket, port) socket.setsockopt_string(zmq.SUBSCRIBE, "") print("Waiting for messages...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_subscriber(PORT)
12. Spuštění tří producentů zpráv a tří příjemců připojených na jediný forwarder
Nyní si ukažme, jak bude systém reagovat ve chvíli, kdy spustíme tři producenty zpráv a současně i tři příjemce. Mezi producenty a příjemcem bude vložen forwarder popsaný v předchozích kapitolách. Nejprve nepatrně upravíme producenta zpráv takovým způsobem, abychom mohli specifikovat jeho jméno a taktéž interval mezi zprávami na příkazovém řádku:
import zmq from zmq.decorators import context, socket from time import sleep from sys import argv, exit CONNECTION_TYPE = zmq.PUB PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(name, delay, port, context, socket): """Spuštění publisheru.""" print("Publisher '{name}'".format(name=name)) connect(socket, port) for i in range(100): message = "Message #{i} from {name}".format(i=i, name=name) publish_message(socket, message) sleep(delay) if len(argv) <= 2: print('Please provide publisher name and sleep amount on the CLI') exit(1) name = argv[1] delay = float(argv[2]) start_publisher(name, delay, PORT)
Celý test spustíme následujícím skriptem:
export PYTHONUNBUFFERED=yes python3 forwarder.py > forwarder.out & forwarder_pid=$! python3 publisher.py Publisher1 1 > publisher1.out 2>&1 & publisher1_pid=$! python3 publisher.py Publisher2 0.5 > publisher2.out 2>&1 & publisher2_pid=$! python3 publisher.py Publisher3 0.2 > publisher3.out 2>&1 & publisher3_pid=$! python3 subscriber.py > subscriber1.out & subscriber1_pid=$! python3 subscriber.py > subscriber2.out & subscriber2_pid=$! python3 subscriber.py > subscriber3.out & subscriber3_pid=$! sleep 20 kill $forwarder_pid kill $publisher1_pid kill $publisher2_pid kill $publisher3_pid kill $subscriber1_pid kill $subscriber2_pid kill $subscriber3_pid
Ukázka výstupu z prvního příjemce zpráv:
Connected to tcp://localhost:5557 Waiting for messages... Received message 'Message #1 from Publisher3' Received message 'Message #2 from Publisher3' Received message 'Message #1 from Publisher2' Received message 'Message #3 from Publisher3' Received message 'Message #4 from Publisher3' Received message 'Message #1 from Publisher1' Received message 'Message #5 from Publisher3' Received message 'Message #2 from Publisher2' Received message 'Message #6 from Publisher3' Received message 'Message #7 from Publisher3' Received message 'Message #3 from Publisher2' Received message 'Message #8 from Publisher3' Received message 'Message #9 from Publisher3' Received message 'Message #2 from Publisher1'
13. Praktická ukázka filtrace zpráv: tři producenti a čtyři příjemci zpráv
Předchozí příklad si ještě upravíme, a to takovým způsobem, aby se nepatrně pozměnil formát zpráv – ty nyní budou začínat jménem zdroje zpráv a navíc konzumenti zpráv budou obsahovat filtr konfigurovatelný z příkazové řádky. Samotný forwarder je stále stejný:
import zmq from zmq.decorators import context, socket SUB_PORT = 5556 PUB_PORT = 5557 @context() @socket(zmq.SUB) @socket(zmq.PUB) def create_queue(sub_port, pub_port, context, frontend, backend): """Vytvoření forwarderu.""" context = zmq.Context() address = "tcp://*:{port}".format(port=sub_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=sub_port)) frontend.setsockopt_string(zmq.SUBSCRIBE, "") address = "tcp://*:{port}".format(port=pub_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=pub_port)) zmq.device(zmq.FORWARDER, frontend, backend) create_queue(SUB_PORT, PUB_PORT)
Zdroj zpráv bude na začátek zprávy připisovat svoje jméno nastavitelné na příkazové řádce:
import zmq from zmq.decorators import context, socket from time import sleep from sys import argv, exit CONNECTION_TYPE = zmq.PUB PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(name, delay, port, context, socket): """Spuštění publisheru.""" print("Publisher '{name}'".format(name=name)) connect(socket, port) for i in range(100): message = "{name}: Message #{i}".format(name=name, i=i) publish_message(socket, message) sleep(delay) if len(argv) <= 2: print('Please provide publisher name and sleep amount on the CLI') exit(1) name = argv[1] delay = float(argv[2]) start_publisher(name, delay, PORT)
Naproti tomu příjemce zpráv obsahuje konfigurovatelný filtr, opět specifikovatelný z příkazové řádky:
import zmq from zmq.decorators import context, socket from sys import argv, exit CONNECTION_TYPE = zmq.SUB PORT = 5557 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) @context() @socket(CONNECTION_TYPE) def start_subscriber(filter, port, context, socket): """Spuštění příjemce.""" connect(socket, port) socket.setsockopt_string(zmq.SUBSCRIBE, filter) print("Waiting for messages...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) if len(argv) <= 1: print('Please provide filter on the CLI') exit(1) start_subscriber(argv[1], PORT)
14. Výsledky činnosti předchozí aplikace
Pro otestování předchozí aplikace (resp. systému) použijeme upravený skript, ve kterém se při spuštění publisherů specifikují jejich jména a u jednotlivých příjemců zpráv pak filtry (v podstatě prefixy řetězců zpráv):
export PYTHONUNBUFFERED=yes python3 forwarder.py > forwarder.out & forwarder_pid=$! python3 publisher.py Publisher1 1 > publisher1.out 2>&1 & publisher1_pid=$! python3 publisher.py Publisher2 0.5 > publisher2.out 2>&1 & publisher2_pid=$! python3 publisher.py Publisher3 0.2 > publisher3.out 2>&1 & publisher3_pid=$! python3 subscriber.py "" > subscriber1.out & subscriber1_pid=$! python3 subscriber.py "Publisher1" > subscriber2.out & subscriber2_pid=$! python3 subscriber.py "Publisher2" > subscriber3.out & subscriber3_pid=$! python3 subscriber.py "PublisherX" > subscriber4.out & subscriber4_pid=$! sleep 20 kill $forwarder_pid kill $publisher1_pid kill $publisher2_pid kill $publisher3_pid kill $subscriber1_pid kill $subscriber2_pid kill $subscriber3_pid kill $subscriber4_pid
Výsledky pro prvního příjemce zpráv s prázdným filtrem:
Connected to tcp://localhost:5557 Waiting for messages... Received message 'Publisher3: Message #1' Received message 'Publisher3: Message #2' Received message 'Publisher2: Message #1' Received message 'Publisher3: Message #3' Received message 'Publisher3: Message #4' Received message 'Publisher3: Message #5' Received message 'Publisher1: Message #1' Received message 'Publisher2: Message #2' Received message 'Publisher3: Message #6' ... ... ...
Výsledky pro druhého příjemce zpráv s filtrem nastaveným na „Publisher1“:
Connected to tcp://localhost:5557 Waiting for messages... Received message 'Publisher1: Message #1' Received message 'Publisher1: Message #2' Received message 'Publisher1: Message #3' ... ... ...
Výsledky pro třetího příjemce zpráv s filtrem nastaveným na „Publisher2“:
Connected to tcp://localhost:5557 Waiting for messages... Received message 'Publisher2: Message #1' Received message 'Publisher2: Message #2' Received message 'Publisher2: Message #3' ... ... ...
Výsledky pro čtvrtého příjemce zpráv s filtrem nastaveným na „PublisherX“:
Connected to tcp://localhost:5557 Waiting for messages...
15. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v programovacím jazyku C byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
16. Odkazy na předchozí části seriálu
V této kapitole jsou uvedeny odkazy na všech sedm předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
17. Odkazy na Internetu
- ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html