Hlavní navigace

Využití zařízení v knihovně ØMQ při tvorbě systému se složitější architekturou

Pavel Tišnovský

V další části seriálu o knihovně ØMQ a rozhraní PyZMQ se budeme zabývat využitím takzvaných zařízení (device), která se poměrně často používají při implementaci systémů se složitější architekturou a s větším počtem uzlů.

Doba čtení: 38 minut

11. Připojení zdroje zpráv i jejich příjemce k forwarderu

12. Spuštění tří producentů zpráv a tří příjemců připojených na jediný forwarder

13. Praktická ukázka filtrace zpráv: tři producenti a čtyři příjemci zpráv

14. Výsledky činnosti předchozí aplikace

15. Repositář s demonstračními příklady

16. Odkazy na předchozí části seriálu

17. Odkazy na Internetu

1. Využití zařízení v knihovně ØMQ při tvorbě systému se složitější architekturou

Ve třetím článku o knihovně ØMQ i o rozhraní PyZMQ určeném pro použití knihovny ØMQ z Pythonu si ukážeme, jakým způsobem je možné použít takzvaná zařízení (device). Tímto názvem se v ØMQ označují jednoduché služby zajišťující roli prostředníka (intermediary) mezi jednotlivými komunikujícími uzly implementovaného systému, umístěné například mezi klienta a server popř. mezi zdroji zpráv (publisher) a jejími příjemci (subscriber). Na tomto místě se asi můžete ptát, proč vlastně potřebujeme nějaké prostředníky mezi jednotlivými částmi aplikace? Není jednodušší přímo propojit klienty se serverem a zdroje zpráv přímo s příjemci? V případě, že je architektura navrhovaného systému složena jen z několika komunikujících uzlů a jejich vzájemná konfigurace je relativně neměnná, není (alespoň většinou) zapotřebí prostředníky používat. Typickým příkladem může být systém s jedním zdrojem zpráv, ke kterému se připojuje několik (předem neomezený počet) příjemců, popř. jednoduchá pipeline s jedním producentem a několika konzumenty založená na komunikační strategii PUSH-PULL.

Obrázek 1: Jednosměrná komunikace využívající strategii PUSH-PULL.

V předchozí části tohoto seriálu jsme si navíc řekli, že se komunikující uzly většinou v praxi rozlišují podle toho, zda se jedná o uzly (relativně) stabilní či o uzly, které se mohou dynamicky připojovat a odpojovat podle aktuálních potřeb aplikace. První skupina uzlů typicky používá připojení s využitím metody Socket.bind(), protože právě tyto uzly by měly používat předem známá čísla portů. Naopak druhá skupina uzlů vytváří druhý konec komunikačního kanálu s využitím metody Socket.connect(), protože se připojuje k již otevřenému a předem známému číslu portu. Nejtypičtějším příkladem je komunikační strategie REQ-REP, v níž server (popř. větší množství serverů) otevírá sockety na předem známých portech, na nichž očekává požadavky od klientů. Naopak klienti se k těmto portům připojují, což znamená, že klienti musí mít k dispozici konfiguraci se seznamem serverů, k nimž se mohou připojovat a při změně serverů je nutné o této skutečnosti klienty informovat. Toto již dříve popsané schéma je zobrazeno na následujícím diagramu:

Obrázek 2: Systém s uzly, které mezi sebou komunikují s využitím strategie REQ-REP.

Poznámka: v tomto článku, podobně jako v celém seriálu, se setkáme s relativně velkým množstvím anglických pojmů, které nemají svůj ustálený český ekvivalent. Do závěrečné části seriálu je naplánován stručný výkladový slovník, který ovšem ještě není dokončen, takže prozatím prosím o laskavé strpení.

2. Role zařízení v knihovně ØMQ

Poněkud méně jasná může být role jednotlivých komunikujících uzlů při použití strategie PUSH-PULL, zejména tehdy, pokud tuto strategii použijeme současně s rozesíláním zpráv mezi větší množství workerů (fan-out) a s akumulací výsledků workerů v jednom koncovém uzlu typu collector (fan-in). Prozatím jsme tento problém řešili takovým způsobem, že první uzel, který vytváří úkoly pro workery, vystupuje v roli serveru, který použije socket typu PUSH. Workeři otevírají dva sockety – jeden vstupní (PULL) pro získání parametrů úkolu a druhý výstupní (PUSH) pro poslání výsledků. Vzhledem k tomu, že workeři se typicky připojují a odpojují podle zátěže aplikace, vystupují v roli klienta. A konečně poslední uzel, který sbírá výsledky práce workerů, používá socket typu PULL a musí se jednat o server, aby se k němu mohli workeři připojovat. Opět se podívejme na to, jak vlastně celá architektura aplikace vypadá:

Obrázek 3: Architektura, ve které se používá rozesílání zpráv mezi větší množství workerů (fan-out) a následný sběr zpráv v collectoru.

V praxi se ovšem setkáme i se složitějšími situacemi. Představme si například škálovatelný systém s komunikační strategií typu REQ-REP, v němž budeme potřebovat přidávat a ubírat servery podle toho, jak se bude měnit zátěž aplikace. V takovém případě nastane problém – jakým způsobem vlastně informovat klienty, ke kterým portům se mají připojit? A je vůbec vhodné, aby se po přidání/ubrání serveru museli všichni klienti znovu konfigurovat? Tyto (velmi časté, ovšem někdy přehlížené) případy je možné řešit následujícím způsobem: vzhledem k tomu, že jak klienti, tak i servery jsou z hlediska architektury aplikace „pohyblivé“ (nestabilní) uzly, musíme mezi ně přidat nějakého prostředníka, který bude zprostředkovávat komunikaci – v tom nejjednodušším případě bude pouze přeposílat data mezi klienty a servery. Tím pádem bude právě tento prostředník stabilní částí, která bude sockety vytvářet metodou Socket.bind() a ostatní uzly – jak klienti, tak i servery – se budou k tomuto prostředníku připojovat pomocí Socket.connect().

Nic nám samozřejmě nebrání si takové prostředníky naprogramovat, ovšem je to zbytečně složité a hlavně není zapotřebí znovuobjevovat kolo, protože knihovna ØMQ některé typické implementace prostředníků již obsahuje. Říká se jim, jak již ostatně víme z úvodní kapitoly, zařízení:

Zařízení Stručný popis
ZMQ.QUEUE prostředník používaný především v komunikaci REQ-REP (klasický klient, server)
ZMQ.FORWARDER používá se jako prostředník mezi zdroji zpráv a jejich příjemci PUB-SUB
ZMQ.STREAMER používá se v komunikační strategii PUSH-PULL

V dalších kapitolách si ukážeme, jak se jednotlivé typy zařízení mohou použít v praxi.

Zprostředkovatele ostatně známe i z reálného světa. Typickým příkladem „reálné implementace“ zařízení typu ZMQ.QUEUE jsou obchody, k nimž se „připojují“ jak zákazníci (klienti), tak i výrobci (servery). Současně obchodníci implementují i frontu zmíněnou v názvu zařízení, a to jak frontu jednostrannou (sklad zboží), tak mnohdy i oboustrannou (seznam objednávek). Povšimněte si, že tato – samozřejmě že jen nepřesná – analogie nevyžaduje, aby výrobci znali adresy zákazníků a naopak: jediným styčným bodem je právě obchod. Současně je, podobně jako v knihovně ØMQ, zajištěna i situace, kdy do obchodu začnou chodit noví zákazníci, staří naopak odejdou, naveze se zboží od dalšího výrobce atd. atd.

3. Komunikační strategie REQ-REP bez použití zařízení typu „Queue“

Nejprve si připomeňme, jakým způsobem jsme implementovali jednoduchého klienta a server, u nichž se pro vzájemnou komunikaci používala strategie REQ-REP. To znamená, že server naslouchá na předem známém portu na požadavky (request) klienta, na něž by měl odpovědět (response). Server se po svém spuštění navazuje na předem známý port a tudíž používá metodu Socket.bind(). Naproti tomu klient se k tomuto portu připojuje s využitím metody Socket.connect().

Implementace serveru využívajícího socket typu REP vypadá následovně:

import zmq
from math import factorial
 
 
def bind(port, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    context = zmq.Context()
    socket = context.socket(connection_type)
    address = "tcp://*:{port}".format(port=port)
    socket.bind(address)
    print("Bound to address {a}".format(a=address))
    return socket
 
 
def send_response(socket, response):
    """Odeslání odpovědi."""
    print("Sending response '{r}'".format(r=response))
    socket.send_string(response)
 
 
def receive_request(socket):
    """Zpracování požadavku klienta."""
    request = socket.recv_string()
    print("Received request from client: '{r}'".format(r=request))
    return request
 
 
def start_server():
    """Spuštění serveru."""
    socket = bind(5556, zmq.REP)
    while True:
        request = receive_request(socket)
        try:
            n = int(request)
            fact = factorial(n)
            send_response(socket, "{n}! = {f}".format(n=n, f=fact))
        except Exception as e:
            send_response(socket, "Wrong input")
 
 
start_server()

Samozřejmě si ukážeme i implementaci klienta se socketem typu REQ:

import zmq
 
 
def connect(port, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    context = zmq.Context()
    socket = context.socket(connection_type)
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
    return socket
 
 
def send_request(socket, request):
    """Poslání požadavku."""
    print("Sending request '{r}'".format(r=request))
    socket.send_string(request)
 
 
def start_client():
    """Spuštění klienta."""
    socket = connect(5556, zmq.REQ)
 
    send_request(socket, "1")
    print(socket.recv_string())
    print()
 
    send_request(socket, "10")
    print(socket.recv_string())
    print()
 
    send_request(socket, "xyzzy")
    print(socket.recv_string())
    print()
 
 
start_client()

4. Použití zařízení typu „Queue“

Jak jsme si již řekli ve druhé kapitole, je možné a v mnoha případech i velmi žádoucí mezi klienty a server(y) vložit prostředníka, který je v knihovně ØMQ představován zařízením typu ZMQ.QUEUE. Celá konfigurace aplikace se změní, i když změny nejsou nijak zásadní:

  1. Server se již nebude navazovat na port pomocí metodySocket.bind(), ale použije běžné „klientské“ připojení Socket.connect(). V našem konkrétním případě se použije port 5555.
  2. Klient stále bude navazovat připojení přesSocket.connect(), ovšem na odlišné číslo portu. Pro jednoduchost použijeme port 5556, tedy port s číslem o jedničku vyšším, než je port, na který se připojuje server.
  3. A konečně – mezi klienty a serverem (servery) bude ležet zprostředkovatel implementovaný zařízením typu ZMQ.QUEUE. Tento zprostředkovatel bude navázán na oba výše zmíněné porty 5555 i 5556, protože se na něj z jedné strany budou připojovat servery a ze strany druhé klient (připomeňme si, že sockety v ØMQ umožňují oboustrannou komunikaci).

Zprostředkovatel na straně, která komunikuje se serverem, použije socket typu XREQ a na straně klientů socket typu XREP. Původní komunikační schéma REQ-REP se tedy změní na REQ-XREP/XREQ-REP.

Obrázek 4: Komunikační schéma REQ-XREP/XREQ-REP.

První implementace prostředníka může vypadat následovně:

import zmq
 
XREP_PORT = 5556
XREQ_PORT = 5557
 
 
def create_queue(xrep_port, xreq_port):
    """Vytvoření fronty."""
    context = zmq.Context()
 
    frontend = context.socket(zmq.XREP)
    address = "tcp://*:{port}".format(port=xrep_port)
    frontend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=xrep_port))
 
    backend = context.socket(zmq.XREQ)
    address = "tcp://*:{port}".format(port=xreq_port)
    backend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=xreq_port))
 
    zmq.device(zmq.QUEUE, frontend, backend)
 
 
create_queue(5556, 5557)

5. Klient a server, kteří se k frontě připojují

V implementaci klienta pošleme serveru (zprostředkovaně) několik požadavků na výpočet faktoriálu. Konkrétně budeme chtít, aby server vypočítal a vrátil faktoriál jedné, deseti, řetězce „xyzzy“ (což je samozřejmě chyba), –10 (taktéž chyba) a konečně faktoriál hodnoty 100. Odpověď serveru se přímo vypíše na standardní výstup (terminál):

import zmq
 
PORT = 5556
 
 
def connect(port, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    context = zmq.Context()
    socket = context.socket(connection_type)
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
    return socket
 
 
def send_request(socket, request):
    """Poslání požadavku."""
    print("Sending request '{r}'".format(r=request))
    socket.send_string(request)
 
 
def start_client():
    """Spuštění klienta."""
    socket = connect(PORT, zmq.REQ)
 
    send_request(socket, "1")
    print(socket.recv_string())
    print()
 
    send_request(socket, "10")
    print(socket.recv_string())
    print()
 
    send_request(socket, "xyzzy")
    print(socket.recv_string())
    print()
 
    send_request(socket, "-10")
    print(socket.recv_string())
    print()
 
    send_request(socket, "100")
    print(socket.recv_string())
    print()
 
 
start_client()

V implementaci serveru naproti tomu očekáváme požadavky na portu 5557 (nikoli 5556!) a pokud se jedná o celé číslo, pokusíme se vypočítat jeho faktoriál a vrátit výsledek zpět klientovi. Ve skutečnosti se výsledky opět budou vracet nepřímo přes zařízení ZMQ.QUEUE:

import zmq
from math import factorial
 
PORT = 5557
 
 
def connect(port, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    context = zmq.Context()
    socket = context.socket(connection_type)
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
    return socket
 
 
def send_response(socket, response):
    """Odeslání odpovědi."""
    print("Sending response '{r}'".format(r=response))
    socket.send_string(response)
 
 
def receive_request(socket):
    """Zpracování požadavku klienta."""
    request = socket.recv_string()
    print("Received request from client: '{r}'".format(r=request))
    return request
 
 
def start_server():
    """Spuštění serveru."""
    socket = connect(PORT, zmq.REP)
    while True:
        request = receive_request(socket)
        try:
            n = int(request)
            fact = factorial(n)
            send_response(socket, "{n}! = {f}".format(n=n, f=fact))
        except Exception as e:
            send_response(socket, "Wrong input")
 
 
start_server()

Podívejme se na chování celého systému ve chvíli, kdy spustíme zprostředkovatele, dále server a nakonec klienta (či několik klientů). Následují výpisy generované jednotlivými uzly na standardní výstup.

Zprávy vypsané zprostředkovatelem:

$ python3 queue.py 
 
Bound to tcp://*:5556 on port 5556
Bound to tcp://*:5557 on port 5557

Zprávy vypisované serverem:

$ python3 server.py 
 
Connected to tcp://localhost:5557
Received request from client: '1'
Sending response '1! = 1'
Received request from client: '10'
Sending response '10! = 3628800'
Received request from client: 'xyzzy'
Sending response 'Wrong input'
Received request from client: '-10'
Sending response 'Wrong input'
Received request from client: '100'
Sending response '100! = 93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000'

Zprávy vypisované klientem:

$ python3 client.py 
 
Connected to tcp://localhost:5556
Sending request '1'
1! = 1
 
Sending request '10'
10! = 3628800
 
Sending request 'xyzzy'
Wrong input
 
Sending request '-10'
Wrong input
 
Sending request '100'
100! = 93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000

6. Vylepšení předchozího příkladu použitím dekorátorů

Předchozí příklad s implementací klienta, serveru a zprostředkovatele typu ZMQ.QUEUE ještě upravíme takovým způsobem, aby se v jednotlivých skriptech nemusel explicitně vytvářet (a rušit) kontext ØMQ popř. vytvořené sockety. Využijeme přitom dekorátory, s jejichž použitím jsme se seznámili minule.

Implementace vlastního zprostředkovatele:

import zmq
from zmq.decorators import context, socket
 
XREP_PORT = 5556
XREQ_PORT = 5557
 
 
@context()
@socket(zmq.XREP)
@socket(zmq.XREQ)
def create_queue(xrep_port, xreq_port, context, frontend, backend):
    """Vytvoření fronty."""
    context = zmq.Context()
 
    address = "tcp://*:{port}".format(port=xrep_port)
    frontend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=xrep_port))
 
    address = "tcp://*:{port}".format(port=xreq_port)
    backend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=xreq_port))
 
    zmq.device(zmq.QUEUE, frontend, backend)
 
 
create_queue(5556, 5557)

Implementace klienta může vypadat následovně:

import zmq
from zmq.decorators import context, socket
 
CONNECTION_TYPE = zmq.REQ
PORT = 5556
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
def send_request(socket, request):
    """Poslání požadavku."""
    print("Sending request '{r}'".format(r=request))
    socket.send_string(request)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_client(port, context, socket):
    """Spuštění klienta."""
    connect(socket, port)
 
    send_request(socket, "1")
    print(socket.recv_string())
    print()
 
    send_request(socket, "10")
    print(socket.recv_string())
    print()
 
    send_request(socket, "xyzzy")
    print(socket.recv_string())
    print()
 
    send_request(socket, "-10")
    print(socket.recv_string())
    print()
 
    send_request(socket, "100")
    print(socket.recv_string())
    print()
 
 
start_client(PORT)

Implementace serveru vypadá takto:

import zmq
from zmq.decorators import context, socket
from math import factorial
 
CONNECTION_TYPE = zmq.REP
PORT = 5557
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
def send_response(socket, response):
    """Odeslání odpovědi."""
    print("Sending response '{r}'".format(r=response))
    socket.send_string(response)
 
 
def receive_request(socket):
    """Zpracování požadavku klienta."""
    request = socket.recv_string()
    print("Received request from client: '{r}'".format(r=request))
    return request
 
 
@context()
@socket(CONNECTION_TYPE)
def start_server(port, context, socket):
    """Spuštění serveru."""
    connect(socket, port)
    while True:
        request = receive_request(socket)
        try:
            n = int(request)
            fact = factorial(n)
            send_response(socket, "{n}! = {f}".format(n=n, f=fact))
        except Exception as e:
            send_response(socket, "Wrong input")
 
 
start_server(PORT)

7. Klasická architektura PUB-SUB a její omezení

V této části článku se budeme zabývat komunikační strategií PUB-SUB a role zprostředkovatele při použití této strategie. Nejprve si ukažme klasický příklad té nejjednodušší strategie PUB-SUB s jediným zdrojem zpráv a s jediným příjemcem.

Takto vypadá zdroj zpráv, který otevírá port 5556 a na tento port každou sekundu pošle zprávu bez ohledu na to, kdo a kdy (a zda vůbec) zprávu přijme:

import zmq
from zmq.decorators import context, socket
 
from time import sleep
from os import getpid
 
 
CONNECTION_TYPE = zmq.PUB
PORT = 5556
 
 
def bind(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://*:{port}".format(port=port)
    socket.bind(address)
    print("Bound to {a}".format(a=address))
 
 
def publish_message(socket, message):
    """Publikování zprávy zprávy."""
    print("Publishing message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_publisher(port, context, socket):
    """Spuštění publisheru."""
    pid = getpid()
    print("Publisher PID={pid}".format(pid=pid))
 
    bind(socket, port)
    for i in range(100):
        message = "Message #{i} from {pid}".format(i=i, pid=pid)
        publish_message(socket, message)
        sleep(1)
 
 
start_publisher(PORT)

Příjemce zpráv očekává zprávy na portu 5556. Filtrace je sice prováděna (což je u komunikační strategie PUB-SUB nutnost), ovšem filtr je nastaven na prázdný řetězec, takže ve skutečnosti jsou akceptovány všechny zprávy:

import zmq
from zmq.decorators import context, socket
 
CONNECTION_TYPE = zmq.SUB
PORT = 5556
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
@context()
@socket(CONNECTION_TYPE)
def start_subscriber(port, context, socket):
    """Spuštění příjemce."""
    connect(socket, port)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
 
    print("Waiting for messages...")
 
    while True:
        message = socket.recv_string()
        print("Received message '{m}'".format(m=message))
 
 
start_subscriber(PORT)

Následuje ukázka komunikace mezi zdrojem zpráv a jejich příjemcem:

$ python3 subscriber.py 
 
Connected to tcp://localhost:5556
Waiting for messages...
Received message 'Message #1 from 4330'
Received message 'Message #2 from 4330'
Received message 'Message #3 from 4330'
Received message 'Message #4 from 4330'
$ python3 publisher.py 
 
Publisher PID=4330
Bound to tcp://*:5556
Publishing message 'Message #0 from 4330'
Publishing message 'Message #1 from 4330'
Publishing message 'Message #2 from 4330'
Publishing message 'Message #3 from 4330'
Publishing message 'Message #4 from 4330'
...
...
...

8. Připojení příjemce zpráv k většímu množství publisherů

Ve skutečnosti je možné nejjednodušší architekturu z předchozí kapitoly s jedním zdrojem zpráv a jedním příjemcem rozšířit, a to jak o více příjemců (což je triviální), tak i o větší množství zdrojů zpráv. Ukažme si tuto druhou možnost, která je z implementačního hlediska mnohem zajímavější.

Nejprve bude uveden zdrojový kód prvního publisheru, který bude posílat zprávy na port číslo 5556. Mezi jednotlivými zprávami bude pomlka o trvání přibližně jedné sekundy:

import zmq
from zmq.decorators import context, socket
 
from time import sleep
from os import getpid
 
 
CONNECTION_TYPE = zmq.PUB
PORT = 5556
 
 
def bind(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://*:{port}".format(port=port)
    socket.bind(address)
    print("Bound to {a}".format(a=address))
 
 
def publish_message(socket, message):
    """Publikování zprávy zprávy."""
    print("Publishing message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_publisher(port, context, socket):
    """Spuštění publisheru."""
    pid = getpid()
    print("Publisher PID={pid}".format(pid=pid))
 
    bind(socket, port)
    for i in range(100):
        message = "Message #{i} from {pid}".format(i=i, pid=pid)
        publish_message(socket, message)
        sleep(1)
 
 
start_publisher(PORT)

Druhý publisher se do značné míry podobá publisheru prvnímu, ovšem zprávy se v tomto případě posílají na port 5557 a nikoli na port 5556. Zprávy se taktéž posílají rychleji, přibližně každých 300 milisekund:

import zmq
from zmq.decorators import context, socket
 
from time import sleep
from os import getpid
 
 
CONNECTION_TYPE = zmq.PUB
PORT = 5557
 
 
def bind(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://*:{port}".format(port=port)
    socket.bind(address)
    print("Bound to {a}".format(a=address))
 
 
def publish_message(socket, message):
    """Publikování zprávy zprávy."""
    print("Publishing message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_publisher(port, context, socket):
    """Spuštění publisheru."""
    pid = getpid()
    print("Publisher PID={pid}".format(pid=pid))
 
    bind(socket, port)
    for i in range(100):
        message = "Message #{i} from {pid}".format(i=i, pid=pid)
        publish_message(socket, message)
        sleep(0.3)
 
 
start_publisher(PORT)

Nejzajímavější je příjemce zpráv, který nyní musí zprávy očekávat na dvou portech 5556 a současně i 5557. Ve skutečnosti je ovšem implementace příjemce stále velmi jednoduchá, a to z toho důvodu, že zobecněné sockety v knihovně ØMQ umožňují současné připojení k většímu množství portů:

ports = (5556, 5557)
 
for port in ports:
    connect(socket, port)

Úplný zdrojový kód tohoto příjemce zpráv vypadá následovně:

import zmq
from zmq.decorators import context, socket
 
CONNECTION_TYPE = zmq.SUB
PORT1 = 5556
PORT2 = 5557
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
@context()
@socket(CONNECTION_TYPE)
def start_subscriber(ports, context, socket):
    """Spuštění příjemce."""
    for port in ports:
        connect(socket, port)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
 
    print("Waiting for messages...")
 
    while True:
        message = socket.recv_string()
        print("Received message '{m}'".format(m=message))
 
 
start_subscriber((PORT1, PORT2))

9. Otestování chování aplikace se dvěma publishery a jedním příjemcem zpráv

Ukažme si nyní, jakým způsobem vlastně bude příjemce získávat a vypisovat zprávy vytvářené dvěma producenty.

Spustíme prvního producenta zpráv:

$ python3 publisher1.py 
 
Publisher PID=4396
Bound to tcp://*:5556
Publishing message 'Message #0 from 4396'
Publishing message 'Message #1 from 4396'
Publishing message 'Message #2 from 4396'
Publishing message 'Message #3 from 4396'
Publishing message 'Message #4 from 4396'
Publishing message 'Message #5 from 4396'
...
...
...

Ihned poté spustíme druhého producenta zpráv:

$ python3 publisher2.py 
 
Publisher PID=4399
Bound to tcp://*:5557
Publishing message 'Message #0 from 4399'
Publishing message 'Message #1 from 4399'
Publishing message 'Message #2 from 4399'
Publishing message 'Message #3 from 4399'
Publishing message 'Message #4 from 4399'
Publishing message 'Message #5 from 4399'
Publishing message 'Message #6 from 4399'
Publishing message 'Message #7 from 4399'
Publishing message 'Message #8 from 4399'
Publishing message 'Message #9 from 4399'
...
...
...

Pokud nyní spustíme konzumenta (příjemce) zpráv, měl by vypisovat zprávy, které postupně přicházejí od obou producentů:

$ python3 subscriber.py 
 
Connected to tcp://localhost:5556
Connected to tcp://localhost:5557
Waiting for messages...
Received message 'Message #1 from 4396'
Received message 'Message #2 from 4396'
Received message 'Message #3 from 4396'
Received message 'Message #4 from 4396'
Received message 'Message #1 from 4399'
Received message 'Message #2 from 4399'
Received message 'Message #3 from 4399'
Received message 'Message #5 from 4396'
Received message 'Message #4 from 4399'
Received message 'Message #5 from 4399'
Received message 'Message #6 from 4399'
Received message 'Message #7 from 4399'
Received message 'Message #6 from 4396'
Received message 'Message #8 from 4399'
Received message 'Message #9 from 4399'
...
...
...
Poznámka: povšimněte si, že zprávy ze druhého zdroje zpráv (PID=4399) skutečně dostáváme s přibližně třikrát větší frekvencí než od zdroje prvního.

10. Vylepšení architektury s využitím tzv. forwarderu

Dalším typem zařízení, s nímž se v dnešním článku seznámíme, je zařízení typu forwarder (ZMQ.FORWARDER). Toto zařízení se používá ve chvíli, kdy jednotlivé uzlu komunikují se strategií PUB-SUB, tj. pokud máme uzly produkující zprávy (producers) a uzly zprávy přijímající (consumers). Připomeňme si, že producenti zpráv nijak nekontrolují kdo a kdy zprávy vlastně přijme – dokonce se může stát (a je to zcela legální a vlastně i očekávané chování), že se některé zprávy ztratí. Vzhledem k tomu, že komunikační strategie typu PUB-SUB je velmi jednoduchá, nepoužívá potvrzování atd., je i použití zařízení ZMQ.FORWARDER přímočaré, což je patrné z následujícího schématu:

Obrázek 5: Zařízení typu ZMQ.FORWARDER.

Vidíme, že forwarder přijímá zprávy socketem typu SUB a posílá je socketem typu PUB, takže se původní komunikační strategie PUB-SUB změní na PUB-SUB/PUB-SUB. Nesmíme ovšem zapomenout na to, že zprávy je možné filtrovat, což platí i pro forwarder. Pro jednoduchost si naimplementujeme forwarder, který přeposílá všechny zprávy, tj. jeho filtr bude prázdný:

import zmq
from zmq.decorators import context, socket
 
SUB_PORT = 5556
PUB_PORT = 5557
 
 
@context()
@socket(zmq.SUB)
@socket(zmq.PUB)
def create_queue(sub_port, pub_port, context, frontend, backend):
    """Vytvoření forwarderu."""
    context = zmq.Context()
 
    address = "tcp://*:{port}".format(port=sub_port)
    frontend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=sub_port))
    frontend.setsockopt_string(zmq.SUBSCRIBE, "")
 
    address = "tcp://*:{port}".format(port=pub_port)
    backend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=pub_port))
 
    zmq.device(zmq.FORWARDER, frontend, backend)
 
 
create_queue(SUB_PORT, PUB_PORT)

11. Připojení zdroje zpráv i jejich příjemce k forwarderu

Samotného producenta zpráv bude nutné upravit pouze nepatrně – změníme metodu připojení ze Socket.bind() a na Socket.connect(). Výsledná podoba producenta (zdroje zpráv) bude vypadat takto:

import zmq
from zmq.decorators import context, socket
 
from time import sleep
from os import getpid
 
 
CONNECTION_TYPE = zmq.PUB
PORT = 5556
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
def publish_message(socket, message):
    """Publikování zprávy zprávy."""
    print("Publishing message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_publisher(port, context, socket):
    """Spuštění publisheru."""
    pid = getpid()
    print("Publisher PID={pid}".format(pid=pid))
 
    connect(socket, port)
    for i in range(100):
        message = "Message #{i} from {pid}".format(i=i, pid=pid)
        publish_message(socket, message)
        sleep(1)
 
 
start_publisher(PORT)

Konzument, tj. příjemce zpráv, se prakticky nezmění, pouze nesmíme zapomenout na to, že se nebude připojovat přímo k producentovi na portu 5556, ale k forwarderu na portu 5557. Opět se podívejme na jeho zdrojový kód:

import zmq
from zmq.decorators import context, socket
 
CONNECTION_TYPE = zmq.SUB
PORT = 5557
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
@context()
@socket(CONNECTION_TYPE)
def start_subscriber(port, context, socket):
    """Spuštění příjemce."""
    connect(socket, port)
    socket.setsockopt_string(zmq.SUBSCRIBE, "")
 
    print("Waiting for messages...")
 
    while True:
        message = socket.recv_string()
        print("Received message '{m}'".format(m=message))
 
 
start_subscriber(PORT)

12. Spuštění tří producentů zpráv a tří příjemců připojených na jediný forwarder

Nyní si ukažme, jak bude systém reagovat ve chvíli, kdy spustíme tři producenty zpráv a současně i tři příjemce. Mezi producenty a příjemcem bude vložen forwarder popsaný v předchozích kapitolách. Nejprve nepatrně upravíme producenta zpráv takovým způsobem, abychom mohli specifikovat jeho jméno a taktéž interval mezi zprávami na příkazovém řádku:

import zmq
from zmq.decorators import context, socket
 
from time import sleep
from sys import argv, exit
 
 
CONNECTION_TYPE = zmq.PUB
PORT = 5556
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
def publish_message(socket, message):
    """Publikování zprávy zprávy."""
    print("Publishing message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_publisher(name, delay, port, context, socket):
    """Spuštění publisheru."""
    print("Publisher '{name}'".format(name=name))
 
    connect(socket, port)
    for i in range(100):
        message = "Message #{i} from {name}".format(i=i, name=name)
        publish_message(socket, message)
        sleep(delay)
 
 
if len(argv) <= 2:
    print('Please provide publisher name and sleep amount on the CLI')
    exit(1)
 
 
name = argv[1]
delay = float(argv[2])
start_publisher(name, delay, PORT)

Celý test spustíme následujícím skriptem:

export PYTHONUNBUFFERED=yes
 
python3 forwarder.py > forwarder.out &
forwarder_pid=$!
 
python3 publisher.py Publisher1 1 > publisher1.out 2>&1 &
publisher1_pid=$!
 
python3 publisher.py Publisher2 0.5 > publisher2.out 2>&1 &
publisher2_pid=$!
 
python3 publisher.py Publisher3 0.2 > publisher3.out 2>&1 &
publisher3_pid=$!
 
python3 subscriber.py > subscriber1.out &
subscriber1_pid=$!
 
python3 subscriber.py > subscriber2.out &
subscriber2_pid=$!
 
python3 subscriber.py > subscriber3.out &
subscriber3_pid=$!
 
sleep 20
 
kill $forwarder_pid
kill $publisher1_pid
kill $publisher2_pid
kill $publisher3_pid
kill $subscriber1_pid
kill $subscriber2_pid
kill $subscriber3_pid

Ukázka výstupu z prvního příjemce zpráv:

Connected to tcp://localhost:5557
Waiting for messages...
Received message 'Message #1 from Publisher3'
Received message 'Message #2 from Publisher3'
Received message 'Message #1 from Publisher2'
Received message 'Message #3 from Publisher3'
Received message 'Message #4 from Publisher3'
Received message 'Message #1 from Publisher1'
Received message 'Message #5 from Publisher3'
Received message 'Message #2 from Publisher2'
Received message 'Message #6 from Publisher3'
Received message 'Message #7 from Publisher3'
Received message 'Message #3 from Publisher2'
Received message 'Message #8 from Publisher3'
Received message 'Message #9 from Publisher3'
Received message 'Message #2 from Publisher1'

13. Praktická ukázka filtrace zpráv: tři producenti a čtyři příjemci zpráv

Předchozí příklad si ještě upravíme, a to takovým způsobem, aby se nepatrně pozměnil formát zpráv – ty nyní budou začínat jménem zdroje zpráv a navíc konzumenti zpráv budou obsahovat filtr konfigurovatelný z příkazové řádky. Samotný forwarder je stále stejný:

import zmq
from zmq.decorators import context, socket
 
SUB_PORT = 5556
PUB_PORT = 5557
 
 
@context()
@socket(zmq.SUB)
@socket(zmq.PUB)
def create_queue(sub_port, pub_port, context, frontend, backend):
    """Vytvoření forwarderu."""
    context = zmq.Context()
 
    address = "tcp://*:{port}".format(port=sub_port)
    frontend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=sub_port))
    frontend.setsockopt_string(zmq.SUBSCRIBE, "")
 
    address = "tcp://*:{port}".format(port=pub_port)
    backend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=pub_port))
 
    zmq.device(zmq.FORWARDER, frontend, backend)
 
 
create_queue(SUB_PORT, PUB_PORT)

Zdroj zpráv bude na začátek zprávy připisovat svoje jméno nastavitelné na příkazové řádce:

import zmq
from zmq.decorators import context, socket
 
from time import sleep
from sys import argv, exit
 
 
CONNECTION_TYPE = zmq.PUB
PORT = 5556
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
def publish_message(socket, message):
    """Publikování zprávy zprávy."""
    print("Publishing message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_publisher(name, delay, port, context, socket):
    """Spuštění publisheru."""
    print("Publisher '{name}'".format(name=name))
 
    connect(socket, port)
    for i in range(100):
        message = "{name}: Message #{i}".format(name=name, i=i)
        publish_message(socket, message)
        sleep(delay)
 
 
if len(argv) <= 2:
    print('Please provide publisher name and sleep amount on the CLI')
    exit(1)
 
 
name = argv[1]
delay = float(argv[2])
start_publisher(name, delay, PORT)

Naproti tomu příjemce zpráv obsahuje konfigurovatelný filtr, opět specifikovatelný z příkazové řádky:

import zmq
from zmq.decorators import context, socket
from sys import argv, exit
 
 
CONNECTION_TYPE = zmq.SUB
PORT = 5557
 
 
def connect(socket, port):
    """Otevření socketu se specifikovaným typem spojení."""
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
 
@context()
@socket(CONNECTION_TYPE)
def start_subscriber(filter, port, context, socket):
    """Spuštění příjemce."""
    connect(socket, port)
    socket.setsockopt_string(zmq.SUBSCRIBE, filter)
 
    print("Waiting for messages...")
 
    while True:
        message = socket.recv_string()
        print("Received message '{m}'".format(m=message))
 
 
if len(argv) <= 1:
    print('Please provide filter on the CLI')
    exit(1)
 
 
start_subscriber(argv[1], PORT)

14. Výsledky činnosti předchozí aplikace

Pro otestování předchozí aplikace (resp. systému) použijeme upravený skript, ve kterém se při spuštění publisherů specifikují jejich jména a u jednotlivých příjemců zpráv pak filtry (v podstatě prefixy řetězců zpráv):

export PYTHONUNBUFFERED=yes
 
python3 forwarder.py > forwarder.out &
forwarder_pid=$!
 
python3 publisher.py Publisher1 1 > publisher1.out 2>&1 &
publisher1_pid=$!
 
python3 publisher.py Publisher2 0.5 > publisher2.out 2>&1 &
publisher2_pid=$!
 
python3 publisher.py Publisher3 0.2 > publisher3.out 2>&1 &
publisher3_pid=$!
 
python3 subscriber.py "" > subscriber1.out &
subscriber1_pid=$!
 
python3 subscriber.py "Publisher1" > subscriber2.out &
subscriber2_pid=$!
 
python3 subscriber.py "Publisher2" > subscriber3.out &
subscriber3_pid=$!
 
python3 subscriber.py "PublisherX" > subscriber4.out &
subscriber4_pid=$!
 
sleep 20
 
kill $forwarder_pid
kill $publisher1_pid
kill $publisher2_pid
kill $publisher3_pid
kill $subscriber1_pid
kill $subscriber2_pid
kill $subscriber3_pid
kill $subscriber4_pid

Výsledky pro prvního příjemce zpráv s prázdným filtrem:

Connected to tcp://localhost:5557
Waiting for messages...
Received message 'Publisher3: Message #1'
Received message 'Publisher3: Message #2'
Received message 'Publisher2: Message #1'
Received message 'Publisher3: Message #3'
Received message 'Publisher3: Message #4'
Received message 'Publisher3: Message #5'
Received message 'Publisher1: Message #1'
Received message 'Publisher2: Message #2'
Received message 'Publisher3: Message #6'
...
...
...

Výsledky pro druhého příjemce zpráv s filtrem nastaveným na „Publisher1“:

Connected to tcp://localhost:5557
Waiting for messages...
Received message 'Publisher1: Message #1'
Received message 'Publisher1: Message #2'
Received message 'Publisher1: Message #3'
...
...
...

Výsledky pro třetího příjemce zpráv s filtrem nastaveným na „Publisher2“:

Connected to tcp://localhost:5557
Waiting for messages...
Received message 'Publisher2: Message #1'
Received message 'Publisher2: Message #2'
Received message 'Publisher2: Message #3'
...
...
...

Výsledky pro čtvrtého příjemce zpráv s filtrem nastaveným na „PublisherX“:

Connected to tcp://localhost:5557
Waiting for messages...
Poznámka: ze zobrazených výsledků je patrné, že filtrace je skutečně funkční a že probíhá na straně příjemce, nikoli na straně odesilatele. Ovšem ve starších verzích knihovny ØMQ byla sémantika PUB-SUB odlišná, proto je někdy zapotřebí přepsat starší aplikace takovým způsobem, aby používaly filtraci korektně.

15. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v programovacím jazyku C byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta
1 client.py komunikační strategie REQ-REP, klientská část https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example05/client.py
1 server.py komunikační strategie REQ-REP, serverová část https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example05/server.py
       
2 client.py komunikační schéma REQ-XREP/XREQ-REP, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example16_queue_device/cli­ent.py
2 server.py komunikační schéma REQ-XREP/XREQ-REP, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example16_queue_device/ser­ver.py
2 queue.py komunikační schéma REQ-XREP/XREQ-REP, implementace fronty https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example16_queue_device/qu­eue.py
       
3 client.py komunikační schéma REQ-XREP/XREQ-REP, implementace klienta, použití dekorátorů https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example17_queue_device_pro­per_close/client.py
3 server.py komunikační schéma REQ-XREP/XREQ-REP, implementace serveru, použití dekorátorů https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example17_queue_device_pro­per_close/server.py
3 queue.py komunikační schéma REQ-XREP/XREQ-REP, implementace fronty, použití dekorátorů https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example17_queue_device_pro­per_close/queue.py
       
4 publisher.py komunikační schéma PUB-SUB, implementace publishera https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example18_publisher_sub­scriber/publisher.py
4 subscriber.py komunikační schéma PUB-SUB, implementace subscribera https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example18_publisher_sub­scriber/subscriber.py
       
5 publisher1.py komunikační schéma PUB-SUB, první publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example19_more_publisher­s/publisher1.py
5 publisher2.py komunikační schéma PUB-SUB, druhý publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example19_more_publisher­s/publisher2.py
5 subscriber.py komunikační schéma PUB-SUB, příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example19_more_publisher­s/subscriber.py
       
6 forwarder.py komunikační schéma PUB-SUB, zařízení typu forwarder https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example20_forwarder/for­warder.py
6 publisher.py komunikační schéma PUB-SUB, publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example20_forwarder/pu­blisher.py
6 subscriber.py komunikační schéma PUB-SUB, příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example20_forwarder/sub­scriber.py
       
7 forwarder.py komunikační schéma PUB-SUB, zařízení typu forwarder https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example21_more_pubs_sub­s_and_forwarder/forwarder­.py
7 publisher.py komunikační schéma PUB-SUB, publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example21_more_pubs_sub­s_and_forwarder/publisher­.py
7 subscriber.py komunikační schéma PUB-SUB, příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example21_more_pubs_sub­s_and_forwarder/subscriber­.py
7 run.sh skript pro spuštění testu https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example21_more_pubs_sub­s_and_forwarder/run.sh
       
8 forwarder.py komunikační schéma PUB-SUB, zařízení typu forwarder https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example22_sub_filterin­g/forwarder.py
8 publisher.py komunikační schéma PUB-SUB, publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example22_sub_filterin­g/publisher.py
8 subscriber.py komunikační schéma PUB-SUB, příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example22_sub_filterin­g/subscriber.py
8 run.sh skript pro spuštění testu https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example22_sub_filterin­g/run.sh
       
9* streamer.py použití zařízení typu streamer https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/streamer.py
9* producer.py komunikační strategie PUSH-PULL, producent https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/producer.py
9* consumer.py komunikační strategie PUSH-PULL, konzument https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/consumer.py
       
10* producer.py producent na začátku pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/producer.py
10* worker1.py první worker jako součást pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/worker1.py
10* worker2.py druhý worker jako součást pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/worker2.py
10* collector.py shromažďovač výsledků na konci pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/collector.py
Poznámka: příklady označené hvězdičkou sice již v repositáři najdete, ovšem jejich podrobnější popis bude uveden v navazující části tohoto seriálu.

16. Odkazy na předchozí části seriálu

V této kapitole jsou uvedeny odkazy na všech sedm předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/

17. Odkazy na Internetu

  1. ØMQ – Distributed Messaging
    http://zeromq.org/
  2. ØMQ Community
    http://zeromq.org/community
  3. Get The Software
    http://zeromq.org/intro:get-the-software
  4. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  5. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  6. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  7. ZeroMQ RFC
    https://rfc.zeromq.org/
  8. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  9. zeromq/czmq
    https://github.com/zeromq/czmq
  10. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  11. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  12. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  13. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  14. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  15. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  16. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  17. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  18. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  19. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  20. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  21. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  22. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  23. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  24. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  25. Python binding
    http://zeromq.org/bindings:python
  26. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  27. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  28. About Nanomsg
    https://nanomsg.org/
  29. Advanced Message Queuing Protocol
    https://www.amqp.org/
  30. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  31. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  32. RabbitMQ
    https://www.rabbitmq.com/
  33. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  34. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  35. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  36. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  37. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  38. Erlang
    http://www.erlang.org/
  39. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  40. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  41. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  42. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  43. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  44. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  45. celery na PyPi
    https://pypi.org/project/celery/
  46. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  47. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  48. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  49. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  50. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  51. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  52. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  53. node-celery
    https://github.com/mher/node-celery
  54. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  55. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  56. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  57. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  58. Stránky projektu Redis
    https://redis.io/
  59. Introduction to Redis
    https://redis.io/topics/introduction
  60. Try Redis
    http://try.redis.io/
  61. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  62. Python Redis
    https://redislabs.com/lp/python-redis/
  63. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  64. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  65. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  66. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  67. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  68. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  69. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  70. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  71. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  72. Redis Persistence
    https://redis.io/topics/persistence
  73. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  74. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  75. Ost (knihovna)
    https://github.com/soveran/ost
  76. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  77. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  78. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  79. What Is Sharding?
    https://btcmanager.com/what-sharding/
  80. Redis clients
    https://redis.io/clients
  81. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  82. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  83. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  84. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  85. Resque
    https://github.com/resque/resque
  86. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  87. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  88. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  89. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  90. Pub/Sub
    https://redis.io/topics/pubsub
  91. ZeroMQ distributed messaging
    http://zeromq.org/
  92. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  93. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  94. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  95. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  96. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  97. Redis Protocol specification
    https://redis.io/topics/protocol
  98. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  99. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  100. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  101. Apache Kafka
    https://kafka.apache.org/
  102. Iron
    http://www.iron.io/mq
  103. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  104. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  105. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  106. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  107. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  108. Enqueueing internals
    http://python-rq.org/contrib/
  109. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  110. Queues
    http://queues.io/
  111. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  112. RestMQ
    http://restmq.com/
  113. ActiveMQ
    http://activemq.apache.org/
  114. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  115. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  116. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  117. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  118. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  119. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  120. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  121. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  122. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  123. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  124. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  125. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  126. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  127. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  128. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  129. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  130. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  131. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  132. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  133. Jupyter
    https://jupyter.org/
  134. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  135. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html
Našli jste v článku chybu?