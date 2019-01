11. Připojení zdroje zpráv i jejich příjemce k forwarderu

12. Spuštění tří producentů zpráv a tří příjemců připojených na jediný forwarder

13. Praktická ukázka filtrace zpráv: tři producenti a čtyři příjemci zpráv

14. Výsledky činnosti předchozí aplikace

15. Repositář s demonstračními příklady

16. Odkazy na předchozí části seriálu

17. Odkazy na Internetu

1. Využití zařízení v knihovně ØMQ při tvorbě systému se složitější architekturou

Ve třetím článku o knihovně ØMQ i o rozhraní PyZMQ určeném pro použití knihovny ØMQ z Pythonu si ukážeme, jakým způsobem je možné použít takzvaná zařízení (device). Tímto názvem se v ØMQ označují jednoduché služby zajišťující roli prostředníka (intermediary) mezi jednotlivými komunikujícími uzly implementovaného systému, umístěné například mezi klienta a server popř. mezi zdroji zpráv (publisher) a jejími příjemci (subscriber). Na tomto místě se asi můžete ptát, proč vlastně potřebujeme nějaké prostředníky mezi jednotlivými částmi aplikace? Není jednodušší přímo propojit klienty se serverem a zdroje zpráv přímo s příjemci? V případě, že je architektura navrhovaného systému složena jen z několika komunikujících uzlů a jejich vzájemná konfigurace je relativně neměnná, není (alespoň většinou) zapotřebí prostředníky používat. Typickým příkladem může být systém s jedním zdrojem zpráv, ke kterému se připojuje několik (předem neomezený počet) příjemců, popř. jednoduchá pipeline s jedním producentem a několika konzumenty založená na komunikační strategii PUSH-PULL.

Obrázek 1: Jednosměrná komunikace využívající strategii PUSH-PULL.

V předchozí části tohoto seriálu jsme si navíc řekli, že se komunikující uzly většinou v praxi rozlišují podle toho, zda se jedná o uzly (relativně) stabilní či o uzly, které se mohou dynamicky připojovat a odpojovat podle aktuálních potřeb aplikace. První skupina uzlů typicky používá připojení s využitím metody Socket.bind(), protože právě tyto uzly by měly používat předem známá čísla portů. Naopak druhá skupina uzlů vytváří druhý konec komunikačního kanálu s využitím metody Socket.connect(), protože se připojuje k již otevřenému a předem známému číslu portu. Nejtypičtějším příkladem je komunikační strategie REQ-REP, v níž server (popř. větší množství serverů) otevírá sockety na předem známých portech, na nichž očekává požadavky od klientů. Naopak klienti se k těmto portům připojují, což znamená, že klienti musí mít k dispozici konfiguraci se seznamem serverů, k nimž se mohou připojovat a při změně serverů je nutné o této skutečnosti klienty informovat. Toto již dříve popsané schéma je zobrazeno na následujícím diagramu:

Obrázek 2: Systém s uzly, které mezi sebou komunikují s využitím strategie REQ-REP.

Poznámka: v tomto článku, podobně jako v celém seriálu, se setkáme s relativně velkým množstvím anglických pojmů, které nemají svůj ustálený český ekvivalent. Do závěrečné části seriálu je naplánován stručný výkladový slovník, který ovšem ještě není dokončen, takže prozatím prosím o laskavé strpení.

2. Role zařízení v knihovně ØMQ

Poněkud méně jasná může být role jednotlivých komunikujících uzlů při použití strategie PUSH-PULL, zejména tehdy, pokud tuto strategii použijeme současně s rozesíláním zpráv mezi větší množství workerů (fan-out) a s akumulací výsledků workerů v jednom koncovém uzlu typu collector (fan-in). Prozatím jsme tento problém řešili takovým způsobem, že první uzel, který vytváří úkoly pro workery, vystupuje v roli serveru, který použije socket typu PUSH. Workeři otevírají dva sockety – jeden vstupní (PULL) pro získání parametrů úkolu a druhý výstupní (PUSH) pro poslání výsledků. Vzhledem k tomu, že workeři se typicky připojují a odpojují podle zátěže aplikace, vystupují v roli klienta. A konečně poslední uzel, který sbírá výsledky práce workerů, používá socket typu PULL a musí se jednat o server, aby se k němu mohli workeři připojovat. Opět se podívejme na to, jak vlastně celá architektura aplikace vypadá:

Obrázek 3: Architektura, ve které se používá rozesílání zpráv mezi větší množství workerů (fan-out) a následný sběr zpráv v collectoru.

V praxi se ovšem setkáme i se složitějšími situacemi. Představme si například škálovatelný systém s komunikační strategií typu REQ-REP, v němž budeme potřebovat přidávat a ubírat servery podle toho, jak se bude měnit zátěž aplikace. V takovém případě nastane problém – jakým způsobem vlastně informovat klienty, ke kterým portům se mají připojit? A je vůbec vhodné, aby se po přidání/ubrání serveru museli všichni klienti znovu konfigurovat? Tyto (velmi časté, ovšem někdy přehlížené) případy je možné řešit následujícím způsobem: vzhledem k tomu, že jak klienti, tak i servery jsou z hlediska architektury aplikace „pohyblivé“ (nestabilní) uzly, musíme mezi ně přidat nějakého prostředníka, který bude zprostředkovávat komunikaci – v tom nejjednodušším případě bude pouze přeposílat data mezi klienty a servery. Tím pádem bude právě tento prostředník stabilní částí, která bude sockety vytvářet metodou Socket.bind() a ostatní uzly – jak klienti, tak i servery – se budou k tomuto prostředníku připojovat pomocí Socket.connect().

Nic nám samozřejmě nebrání si takové prostředníky naprogramovat, ovšem je to zbytečně složité a hlavně není zapotřebí znovuobjevovat kolo, protože knihovna ØMQ některé typické implementace prostředníků již obsahuje. Říká se jim, jak již ostatně víme z úvodní kapitoly, zařízení:

Zařízení Stručný popis ZMQ.QUEUE prostředník používaný především v komunikaci REQ-REP (klasický klient, server) ZMQ.FORWARDER používá se jako prostředník mezi zdroji zpráv a jejich příjemci PUB-SUB ZMQ.STREAMER používá se v komunikační strategii PUSH-PULL

V dalších kapitolách si ukážeme, jak se jednotlivé typy zařízení mohou použít v praxi.

ZMQ.QUEUE jsou obchody, k nimž se „připojují“ jak zákazníci (klienti), tak i výrobci (servery). Současně obchodníci implementují i frontu zmíněnou v názvu zařízení, a to jak frontu jednostrannou (sklad zboží), tak mnohdy i oboustrannou (seznam objednávek). Povšimněte si, že tato – samozřejmě že jen nepřesná – analogie nevyžaduje, aby výrobci znali adresy zákazníků a naopak: jediným styčným bodem je právě obchod. Současně je, podobně jako v knihovně ØMQ, zajištěna i situace, kdy do obchodu začnou chodit noví zákazníci, staří naopak odejdou, naveze se zboží od dalšího výrobce atd. atd. Zprostředkovatele ostatně známe i z reálného světa. Typickým příkladem „reálné implementace“ zařízení typujsou obchody, k nimž se „připojují“ jak zákazníci (klienti), tak i výrobci (servery). Současně obchodníci implementují i frontu zmíněnou v názvu zařízení, a to jak frontu jednostrannou (sklad zboží), tak mnohdy i oboustrannou (seznam objednávek). Povšimněte si, že tato – samozřejmě že jen nepřesná – analogie nevyžaduje, aby výrobci znali adresy zákazníků a naopak: jediným styčným bodem je právě obchod. Současně je, podobně jako v knihovně ØMQ, zajištěna i situace, kdy do obchodu začnou chodit noví zákazníci, staří naopak odejdou, naveze se zboží od dalšího výrobce atd. atd.

3. Komunikační strategie REQ-REP bez použití zařízení typu „Queue“

Nejprve si připomeňme, jakým způsobem jsme implementovali jednoduchého klienta a server, u nichž se pro vzájemnou komunikaci používala strategie REQ-REP. To znamená, že server naslouchá na předem známém portu na požadavky (request) klienta, na něž by měl odpovědět (response). Server se po svém spuštění navazuje na předem známý port a tudíž používá metodu Socket.bind(). Naproti tomu klient se k tomuto portu připojuje s využitím metody Socket.connect().

Implementace serveru využívajícího socket typu REP vypadá následovně:

import zmq from math import factorial def bind(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://*:{port}".format(port=port) socket.bind(address) print("Bound to address {a}".format(a=address)) return socket def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request def start_server(): """Spuštění serveru.""" socket = bind(5556, zmq.REP) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") start_server()

Samozřejmě si ukážeme i implementaci klienta se socketem typu REQ:

import zmq def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) def start_client(): """Spuštění klienta.""" socket = connect(5556, zmq.REQ) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() start_client()

4. Použití zařízení typu „Queue“

Jak jsme si již řekli ve druhé kapitole, je možné a v mnoha případech i velmi žádoucí mezi klienty a server(y) vložit prostředníka, který je v knihovně ØMQ představován zařízením typu ZMQ.QUEUE. Celá konfigurace aplikace se změní, i když změny nejsou nijak zásadní:

Server se již nebude navazovat na port pomocí metodySocket.bind(), ale použije běžné „klientské“ připojení Socket.connect(). V našem konkrétním případě se použije port 5555. Klient stále bude navazovat připojení přesSocket.connect(), ovšem na odlišné číslo portu. Pro jednoduchost použijeme port 5556, tedy port s číslem o jedničku vyšším, než je port, na který se připojuje server. A konečně – mezi klienty a serverem (servery) bude ležet zprostředkovatel implementovaný zařízením typu ZMQ.QUEUE. Tento zprostředkovatel bude navázán na oba výše zmíněné porty 5555 i 5556, protože se na něj z jedné strany budou připojovat servery a ze strany druhé klient (připomeňme si, že sockety v ØMQ umožňují oboustrannou komunikaci).

Zprostředkovatel na straně, která komunikuje se serverem, použije socket typu XREQ a na straně klientů socket typu XREP. Původní komunikační schéma REQ-REP se tedy změní na REQ-XREP/XREQ-REP.

Obrázek 4: Komunikační schéma REQ-XREP/XREQ-REP.

První implementace prostředníka může vypadat následovně:

import zmq XREP_PORT = 5556 XREQ_PORT = 5557 def create_queue(xrep_port, xreq_port): """Vytvoření fronty.""" context = zmq.Context() frontend = context.socket(zmq.XREP) address = "tcp://*:{port}".format(port=xrep_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xrep_port)) backend = context.socket(zmq.XREQ) address = "tcp://*:{port}".format(port=xreq_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xreq_port)) zmq.device(zmq.QUEUE, frontend, backend) create_queue(5556, 5557)

5. Klient a server, kteří se k frontě připojují

V implementaci klienta pošleme serveru (zprostředkovaně) několik požadavků na výpočet faktoriálu. Konkrétně budeme chtít, aby server vypočítal a vrátil faktoriál jedné, deseti, řetězce „xyzzy“ (což je samozřejmě chyba), –10 (taktéž chyba) a konečně faktoriál hodnoty 100. Odpověď serveru se přímo vypíše na standardní výstup (terminál):

import zmq PORT = 5556 def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) def start_client(): """Spuštění klienta.""" socket = connect(PORT, zmq.REQ) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() send_request(socket, "-10") print(socket.recv_string()) print() send_request(socket, "100") print(socket.recv_string()) print() start_client()

V implementaci serveru naproti tomu očekáváme požadavky na portu 5557 (nikoli 5556!) a pokud se jedná o celé číslo, pokusíme se vypočítat jeho faktoriál a vrátit výsledek zpět klientovi. Ve skutečnosti se výsledky opět budou vracet nepřímo přes zařízení ZMQ.QUEUE:

import zmq from math import factorial PORT = 5557 def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request def start_server(): """Spuštění serveru.""" socket = connect(PORT, zmq.REP) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") start_server()

Podívejme se na chování celého systému ve chvíli, kdy spustíme zprostředkovatele, dále server a nakonec klienta (či několik klientů). Následují výpisy generované jednotlivými uzly na standardní výstup.

Zprávy vypsané zprostředkovatelem:

$ python3 queue.py Bound to tcp://*:5556 on port 5556 Bound to tcp://*:5557 on port 5557

Zprávy vypisované serverem:

$ python3 server.py Connected to tcp://localhost:5557 Received request from client: '1' Sending response '1! = 1' Received request from client: '10' Sending response '10! = 3628800' Received request from client: 'xyzzy' Sending response 'Wrong input' Received request from client: '-10' Sending response 'Wrong input' Received request from client: '100' Sending response '100! = 93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000'

Zprávy vypisované klientem:

$ python3 client.py Connected to tcp://localhost:5556 Sending request '1' 1! = 1 Sending request '10' 10! = 3628800 Sending request 'xyzzy' Wrong input Sending request '-10' Wrong input Sending request '100' 100! = 93326215443944152681699238856266700490715968264381621468592963895217599993229915608941463976156518286253697920827223758251185210916864000000000000000000000000

6. Vylepšení předchozího příkladu použitím dekorátorů

Předchozí příklad s implementací klienta, serveru a zprostředkovatele typu ZMQ.QUEUE ještě upravíme takovým způsobem, aby se v jednotlivých skriptech nemusel explicitně vytvářet (a rušit) kontext ØMQ popř. vytvořené sockety. Využijeme přitom dekorátory, s jejichž použitím jsme se seznámili minule.

Implementace vlastního zprostředkovatele:

import zmq from zmq.decorators import context, socket XREP_PORT = 5556 XREQ_PORT = 5557 @context() @socket(zmq.XREP) @socket(zmq.XREQ) def create_queue(xrep_port, xreq_port, context, frontend, backend): """Vytvoření fronty.""" context = zmq.Context() address = "tcp://*:{port}".format(port=xrep_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xrep_port)) address = "tcp://*:{port}".format(port=xreq_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xreq_port)) zmq.device(zmq.QUEUE, frontend, backend) create_queue(5556, 5557)

Implementace klienta může vypadat následovně:

import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.REQ PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) @context() @socket(CONNECTION_TYPE) def start_client(port, context, socket): """Spuštění klienta.""" connect(socket, port) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() send_request(socket, "-10") print(socket.recv_string()) print() send_request(socket, "100") print(socket.recv_string()) print() start_client(PORT)

Implementace serveru vypadá takto:

import zmq from zmq.decorators import context, socket from math import factorial CONNECTION_TYPE = zmq.REP PORT = 5557 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request @context() @socket(CONNECTION_TYPE) def start_server(port, context, socket): """Spuštění serveru.""" connect(socket, port) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") start_server(PORT)

7. Klasická architektura PUB-SUB a její omezení

V této části článku se budeme zabývat komunikační strategií PUB-SUB a role zprostředkovatele při použití této strategie. Nejprve si ukažme klasický příklad té nejjednodušší strategie PUB-SUB s jediným zdrojem zpráv a s jediným příjemcem.

Takto vypadá zdroj zpráv, který otevírá port 5556 a na tento port každou sekundu pošle zprávu bez ohledu na to, kdo a kdy (a zda vůbec) zprávu přijme:

import zmq from zmq.decorators import context, socket from time import sleep from os import getpid CONNECTION_TYPE = zmq.PUB PORT = 5556 def bind(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://*:{port}".format(port=port) socket.bind(address) print("Bound to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(port, context, socket): """Spuštění publisheru.""" pid = getpid() print("Publisher PID={pid}".format(pid=pid)) bind(socket, port) for i in range(100): message = "Message #{i} from {pid}".format(i=i, pid=pid) publish_message(socket, message) sleep(1) start_publisher(PORT)

Příjemce zpráv očekává zprávy na portu 5556. Filtrace je sice prováděna (což je u komunikační strategie PUB-SUB nutnost), ovšem filtr je nastaven na prázdný řetězec, takže ve skutečnosti jsou akceptovány všechny zprávy:

import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.SUB PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) @context() @socket(CONNECTION_TYPE) def start_subscriber(port, context, socket): """Spuštění příjemce.""" connect(socket, port) socket.setsockopt_string(zmq.SUBSCRIBE, "") print("Waiting for messages...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_subscriber(PORT)

Následuje ukázka komunikace mezi zdrojem zpráv a jejich příjemcem:

$ python3 subscriber.py Connected to tcp://localhost:5556 Waiting for messages... Received message 'Message #1 from 4330' Received message 'Message #2 from 4330' Received message 'Message #3 from 4330' Received message 'Message #4 from 4330'

$ python3 publisher.py Publisher PID=4330 Bound to tcp://*:5556 Publishing message 'Message #0 from 4330' Publishing message 'Message #1 from 4330' Publishing message 'Message #2 from 4330' Publishing message 'Message #3 from 4330' Publishing message 'Message #4 from 4330' ... ... ...

8. Připojení příjemce zpráv k většímu množství publisherů

Ve skutečnosti je možné nejjednodušší architekturu z předchozí kapitoly s jedním zdrojem zpráv a jedním příjemcem rozšířit, a to jak o více příjemců (což je triviální), tak i o větší množství zdrojů zpráv. Ukažme si tuto druhou možnost, která je z implementačního hlediska mnohem zajímavější.

Nejprve bude uveden zdrojový kód prvního publisheru, který bude posílat zprávy na port číslo 5556. Mezi jednotlivými zprávami bude pomlka o trvání přibližně jedné sekundy:

import zmq from zmq.decorators import context, socket from time import sleep from os import getpid CONNECTION_TYPE = zmq.PUB PORT = 5556 def bind(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://*:{port}".format(port=port) socket.bind(address) print("Bound to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(port, context, socket): """Spuštění publisheru.""" pid = getpid() print("Publisher PID={pid}".format(pid=pid)) bind(socket, port) for i in range(100): message = "Message #{i} from {pid}".format(i=i, pid=pid) publish_message(socket, message) sleep(1) start_publisher(PORT)

Druhý publisher se do značné míry podobá publisheru prvnímu, ovšem zprávy se v tomto případě posílají na port 5557 a nikoli na port 5556. Zprávy se taktéž posílají rychleji, přibližně každých 300 milisekund:

import zmq from zmq.decorators import context, socket from time import sleep from os import getpid CONNECTION_TYPE = zmq.PUB PORT = 5557 def bind(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://*:{port}".format(port=port) socket.bind(address) print("Bound to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(port, context, socket): """Spuštění publisheru.""" pid = getpid() print("Publisher PID={pid}".format(pid=pid)) bind(socket, port) for i in range(100): message = "Message #{i} from {pid}".format(i=i, pid=pid) publish_message(socket, message) sleep(0.3) start_publisher(PORT)

Nejzajímavější je příjemce zpráv, který nyní musí zprávy očekávat na dvou portech 5556 a současně i 5557. Ve skutečnosti je ovšem implementace příjemce stále velmi jednoduchá, a to z toho důvodu, že zobecněné sockety v knihovně ØMQ umožňují současné připojení k většímu množství portů:

ports = (5556, 5557) for port in ports: connect(socket, port)

Úplný zdrojový kód tohoto příjemce zpráv vypadá následovně:

import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.SUB PORT1 = 5556 PORT2 = 5557 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) @context() @socket(CONNECTION_TYPE) def start_subscriber(ports, context, socket): """Spuštění příjemce.""" for port in ports: connect(socket, port) socket.setsockopt_string(zmq.SUBSCRIBE, "") print("Waiting for messages...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_subscriber((PORT1, PORT2))

9. Otestování chování aplikace se dvěma publishery a jedním příjemcem zpráv

Ukažme si nyní, jakým způsobem vlastně bude příjemce získávat a vypisovat zprávy vytvářené dvěma producenty.

Spustíme prvního producenta zpráv:

$ python3 publisher1.py Publisher PID=4396 Bound to tcp://*:5556 Publishing message 'Message #0 from 4396' Publishing message 'Message #1 from 4396' Publishing message 'Message #2 from 4396' Publishing message 'Message #3 from 4396' Publishing message 'Message #4 from 4396' Publishing message 'Message #5 from 4396' ... ... ...

Ihned poté spustíme druhého producenta zpráv:

$ python3 publisher2.py Publisher PID=4399 Bound to tcp://*:5557 Publishing message 'Message #0 from 4399' Publishing message 'Message #1 from 4399' Publishing message 'Message #2 from 4399' Publishing message 'Message #3 from 4399' Publishing message 'Message #4 from 4399' Publishing message 'Message #5 from 4399' Publishing message 'Message #6 from 4399' Publishing message 'Message #7 from 4399' Publishing message 'Message #8 from 4399' Publishing message 'Message #9 from 4399' ... ... ...

Pokud nyní spustíme konzumenta (příjemce) zpráv, měl by vypisovat zprávy, které postupně přicházejí od obou producentů:

$ python3 subscriber.py Connected to tcp://localhost:5556 Connected to tcp://localhost:5557 Waiting for messages... Received message 'Message #1 from 4396' Received message 'Message #2 from 4396' Received message 'Message #3 from 4396' Received message 'Message #4 from 4396' Received message 'Message #1 from 4399' Received message 'Message #2 from 4399' Received message 'Message #3 from 4399' Received message 'Message #5 from 4396' Received message 'Message #4 from 4399' Received message 'Message #5 from 4399' Received message 'Message #6 from 4399' Received message 'Message #7 from 4399' Received message 'Message #6 from 4396' Received message 'Message #8 from 4399' Received message 'Message #9 from 4399' ... ... ...

Poznámka: povšimněte si, že zprávy ze druhého zdroje zpráv (PID=4399) skutečně dostáváme s přibližně třikrát větší frekvencí než od zdroje prvního.

10. Vylepšení architektury s využitím tzv. forwarderu

Dalším typem zařízení, s nímž se v dnešním článku seznámíme, je zařízení typu forwarder (ZMQ.FORWARDER). Toto zařízení se používá ve chvíli, kdy jednotlivé uzlu komunikují se strategií PUB-SUB, tj. pokud máme uzly produkující zprávy (producers) a uzly zprávy přijímající (consumers). Připomeňme si, že producenti zpráv nijak nekontrolují kdo a kdy zprávy vlastně přijme – dokonce se může stát (a je to zcela legální a vlastně i očekávané chování), že se některé zprávy ztratí. Vzhledem k tomu, že komunikační strategie typu PUB-SUB je velmi jednoduchá, nepoužívá potvrzování atd., je i použití zařízení ZMQ.FORWARDER přímočaré, což je patrné z následujícího schématu:

Obrázek 5: Zařízení typu ZMQ.FORWARDER.

Vidíme, že forwarder přijímá zprávy socketem typu SUB a posílá je socketem typu PUB, takže se původní komunikační strategie PUB-SUB změní na PUB-SUB/PUB-SUB. Nesmíme ovšem zapomenout na to, že zprávy je možné filtrovat, což platí i pro forwarder. Pro jednoduchost si naimplementujeme forwarder, který přeposílá všechny zprávy, tj. jeho filtr bude prázdný:

import zmq from zmq.decorators import context, socket SUB_PORT = 5556 PUB_PORT = 5557 @context() @socket(zmq.SUB) @socket(zmq.PUB) def create_queue(sub_port, pub_port, context, frontend, backend): """Vytvoření forwarderu.""" context = zmq.Context() address = "tcp://*:{port}".format(port=sub_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=sub_port)) frontend.setsockopt_string(zmq.SUBSCRIBE, "") address = "tcp://*:{port}".format(port=pub_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=pub_port)) zmq.device(zmq.FORWARDER, frontend, backend) create_queue(SUB_PORT, PUB_PORT)

11. Připojení zdroje zpráv i jejich příjemce k forwarderu

Samotného producenta zpráv bude nutné upravit pouze nepatrně – změníme metodu připojení ze Socket.bind() a na Socket.connect(). Výsledná podoba producenta (zdroje zpráv) bude vypadat takto:

import zmq from zmq.decorators import context, socket from time import sleep from os import getpid CONNECTION_TYPE = zmq.PUB PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(port, context, socket): """Spuštění publisheru.""" pid = getpid() print("Publisher PID={pid}".format(pid=pid)) connect(socket, port) for i in range(100): message = "Message #{i} from {pid}".format(i=i, pid=pid) publish_message(socket, message) sleep(1) start_publisher(PORT)

Konzument, tj. příjemce zpráv, se prakticky nezmění, pouze nesmíme zapomenout na to, že se nebude připojovat přímo k producentovi na portu 5556, ale k forwarderu na portu 5557. Opět se podívejme na jeho zdrojový kód:

import zmq from zmq.decorators import context, socket CONNECTION_TYPE = zmq.SUB PORT = 5557 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) @context() @socket(CONNECTION_TYPE) def start_subscriber(port, context, socket): """Spuštění příjemce.""" connect(socket, port) socket.setsockopt_string(zmq.SUBSCRIBE, "") print("Waiting for messages...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) start_subscriber(PORT)

12. Spuštění tří producentů zpráv a tří příjemců připojených na jediný forwarder

Nyní si ukažme, jak bude systém reagovat ve chvíli, kdy spustíme tři producenty zpráv a současně i tři příjemce. Mezi producenty a příjemcem bude vložen forwarder popsaný v předchozích kapitolách. Nejprve nepatrně upravíme producenta zpráv takovým způsobem, abychom mohli specifikovat jeho jméno a taktéž interval mezi zprávami na příkazovém řádku:

import zmq from zmq.decorators import context, socket from time import sleep from sys import argv, exit CONNECTION_TYPE = zmq.PUB PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(name, delay, port, context, socket): """Spuštění publisheru.""" print("Publisher '{name}'".format(name=name)) connect(socket, port) for i in range(100): message = "Message #{i} from {name}".format(i=i, name=name) publish_message(socket, message) sleep(delay) if len(argv) <= 2: print('Please provide publisher name and sleep amount on the CLI') exit(1) name = argv[1] delay = float(argv[2]) start_publisher(name, delay, PORT)

Celý test spustíme následujícím skriptem:

export PYTHONUNBUFFERED=yes python3 forwarder.py > forwarder.out & forwarder_pid=$! python3 publisher.py Publisher1 1 > publisher1.out 2>&1 & publisher1_pid=$! python3 publisher.py Publisher2 0.5 > publisher2.out 2>&1 & publisher2_pid=$! python3 publisher.py Publisher3 0.2 > publisher3.out 2>&1 & publisher3_pid=$! python3 subscriber.py > subscriber1.out & subscriber1_pid=$! python3 subscriber.py > subscriber2.out & subscriber2_pid=$! python3 subscriber.py > subscriber3.out & subscriber3_pid=$! sleep 20 kill $forwarder_pid kill $publisher1_pid kill $publisher2_pid kill $publisher3_pid kill $subscriber1_pid kill $subscriber2_pid kill $subscriber3_pid

Ukázka výstupu z prvního příjemce zpráv:

Connected to tcp://localhost:5557 Waiting for messages... Received message 'Message #1 from Publisher3' Received message 'Message #2 from Publisher3' Received message 'Message #1 from Publisher2' Received message 'Message #3 from Publisher3' Received message 'Message #4 from Publisher3' Received message 'Message #1 from Publisher1' Received message 'Message #5 from Publisher3' Received message 'Message #2 from Publisher2' Received message 'Message #6 from Publisher3' Received message 'Message #7 from Publisher3' Received message 'Message #3 from Publisher2' Received message 'Message #8 from Publisher3' Received message 'Message #9 from Publisher3' Received message 'Message #2 from Publisher1'

13. Praktická ukázka filtrace zpráv: tři producenti a čtyři příjemci zpráv

Předchozí příklad si ještě upravíme, a to takovým způsobem, aby se nepatrně pozměnil formát zpráv – ty nyní budou začínat jménem zdroje zpráv a navíc konzumenti zpráv budou obsahovat filtr konfigurovatelný z příkazové řádky. Samotný forwarder je stále stejný:

import zmq from zmq.decorators import context, socket SUB_PORT = 5556 PUB_PORT = 5557 @context() @socket(zmq.SUB) @socket(zmq.PUB) def create_queue(sub_port, pub_port, context, frontend, backend): """Vytvoření forwarderu.""" context = zmq.Context() address = "tcp://*:{port}".format(port=sub_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=sub_port)) frontend.setsockopt_string(zmq.SUBSCRIBE, "") address = "tcp://*:{port}".format(port=pub_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=pub_port)) zmq.device(zmq.FORWARDER, frontend, backend) create_queue(SUB_PORT, PUB_PORT)

Zdroj zpráv bude na začátek zprávy připisovat svoje jméno nastavitelné na příkazové řádce:

import zmq from zmq.decorators import context, socket from time import sleep from sys import argv, exit CONNECTION_TYPE = zmq.PUB PORT = 5556 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) def publish_message(socket, message): """Publikování zprávy zprávy.""" print("Publishing message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_publisher(name, delay, port, context, socket): """Spuštění publisheru.""" print("Publisher '{name}'".format(name=name)) connect(socket, port) for i in range(100): message = "{name}: Message #{i}".format(name=name, i=i) publish_message(socket, message) sleep(delay) if len(argv) <= 2: print('Please provide publisher name and sleep amount on the CLI') exit(1) name = argv[1] delay = float(argv[2]) start_publisher(name, delay, PORT)

Naproti tomu příjemce zpráv obsahuje konfigurovatelný filtr, opět specifikovatelný z příkazové řádky:

import zmq from zmq.decorators import context, socket from sys import argv, exit CONNECTION_TYPE = zmq.SUB PORT = 5557 def connect(socket, port): """Otevření socketu se specifikovaným typem spojení.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) @context() @socket(CONNECTION_TYPE) def start_subscriber(filter, port, context, socket): """Spuštění příjemce.""" connect(socket, port) socket.setsockopt_string(zmq.SUBSCRIBE, filter) print("Waiting for messages...") while True: message = socket.recv_string() print("Received message '{m}'".format(m=message)) if len(argv) <= 1: print('Please provide filter on the CLI') exit(1) start_subscriber(argv[1], PORT)

14. Výsledky činnosti předchozí aplikace

Pro otestování předchozí aplikace (resp. systému) použijeme upravený skript, ve kterém se při spuštění publisherů specifikují jejich jména a u jednotlivých příjemců zpráv pak filtry (v podstatě prefixy řetězců zpráv):

export PYTHONUNBUFFERED=yes python3 forwarder.py > forwarder.out & forwarder_pid=$! python3 publisher.py Publisher1 1 > publisher1.out 2>&1 & publisher1_pid=$! python3 publisher.py Publisher2 0.5 > publisher2.out 2>&1 & publisher2_pid=$! python3 publisher.py Publisher3 0.2 > publisher3.out 2>&1 & publisher3_pid=$! python3 subscriber.py "" > subscriber1.out & subscriber1_pid=$! python3 subscriber.py "Publisher1" > subscriber2.out & subscriber2_pid=$! python3 subscriber.py "Publisher2" > subscriber3.out & subscriber3_pid=$! python3 subscriber.py "PublisherX" > subscriber4.out & subscriber4_pid=$! sleep 20 kill $forwarder_pid kill $publisher1_pid kill $publisher2_pid kill $publisher3_pid kill $subscriber1_pid kill $subscriber2_pid kill $subscriber3_pid kill $subscriber4_pid

Výsledky pro prvního příjemce zpráv s prázdným filtrem:

Connected to tcp://localhost:5557 Waiting for messages... Received message 'Publisher3: Message #1' Received message 'Publisher3: Message #2' Received message 'Publisher2: Message #1' Received message 'Publisher3: Message #3' Received message 'Publisher3: Message #4' Received message 'Publisher3: Message #5' Received message 'Publisher1: Message #1' Received message 'Publisher2: Message #2' Received message 'Publisher3: Message #6' ... ... ...

Výsledky pro druhého příjemce zpráv s filtrem nastaveným na „Publisher1“:

Connected to tcp://localhost:5557 Waiting for messages... Received message 'Publisher1: Message #1' Received message 'Publisher1: Message #2' Received message 'Publisher1: Message #3' ... ... ...

Výsledky pro třetího příjemce zpráv s filtrem nastaveným na „Publisher2“:

Connected to tcp://localhost:5557 Waiting for messages... Received message 'Publisher2: Message #1' Received message 'Publisher2: Message #2' Received message 'Publisher2: Message #3' ... ... ...

Výsledky pro čtvrtého příjemce zpráv s filtrem nastaveným na „PublisherX“:

Connected to tcp://localhost:5557 Waiting for messages...

Poznámka: ze zobrazených výsledků je patrné, že filtrace je skutečně funkční a že probíhá na straně příjemce, nikoli na straně odesilatele. Ovšem ve starších verzích knihovny ØMQ byla sémantika PUB-SUB odlišná, proto je někdy zapotřebí přepsat starší aplikace takovým způsobem, aby používaly filtraci korektně.

15. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v programovacím jazyku C byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta 1 client.py komunikační strategie REQ-REP, klientská část https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example05/client.py 1 server.py komunikační strategie REQ-REP, serverová část https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example05/server.py 2 client.py komunikační schéma REQ-XREP/XREQ-REP, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example16_queue_device/cli­ent.py 2 server.py komunikační schéma REQ-XREP/XREQ-REP, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example16_queue_device/ser­ver.py 2 queue.py komunikační schéma REQ-XREP/XREQ-REP, implementace fronty https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example16_queue_device/qu­eue.py 3 client.py komunikační schéma REQ-XREP/XREQ-REP, implementace klienta, použití dekorátorů https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example17_queue_device_pro­per_close/client.py 3 server.py komunikační schéma REQ-XREP/XREQ-REP, implementace serveru, použití dekorátorů https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example17_queue_device_pro­per_close/server.py 3 queue.py komunikační schéma REQ-XREP/XREQ-REP, implementace fronty, použití dekorátorů https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example17_queue_device_pro­per_close/queue.py 4 publisher.py komunikační schéma PUB-SUB, implementace publishera https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example18_publisher_sub­scriber/publisher.py 4 subscriber.py komunikační schéma PUB-SUB, implementace subscribera https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example18_publisher_sub­scriber/subscriber.py 5 publisher1.py komunikační schéma PUB-SUB, první publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example19_more_publisher­s/publisher1.py 5 publisher2.py komunikační schéma PUB-SUB, druhý publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example19_more_publisher­s/publisher2.py 5 subscriber.py komunikační schéma PUB-SUB, příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example19_more_publisher­s/subscriber.py 6 forwarder.py komunikační schéma PUB-SUB, zařízení typu forwarder https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example20_forwarder/for­warder.py 6 publisher.py komunikační schéma PUB-SUB, publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example20_forwarder/pu­blisher.py 6 subscriber.py komunikační schéma PUB-SUB, příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example20_forwarder/sub­scriber.py 7 forwarder.py komunikační schéma PUB-SUB, zařízení typu forwarder https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example21_more_pubs_sub­s_and_forwarder/forwarder­.py 7 publisher.py komunikační schéma PUB-SUB, publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example21_more_pubs_sub­s_and_forwarder/publisher­.py 7 subscriber.py komunikační schéma PUB-SUB, příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example21_more_pubs_sub­s_and_forwarder/subscriber­.py 7 run.sh skript pro spuštění testu https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example21_more_pubs_sub­s_and_forwarder/run.sh 8 forwarder.py komunikační schéma PUB-SUB, zařízení typu forwarder https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example22_sub_filterin­g/forwarder.py 8 publisher.py komunikační schéma PUB-SUB, publisher https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example22_sub_filterin­g/publisher.py 8 subscriber.py komunikační schéma PUB-SUB, příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example22_sub_filterin­g/subscriber.py 8 run.sh skript pro spuštění testu https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example22_sub_filterin­g/run.sh 9* streamer.py použití zařízení typu streamer https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/streamer.py 9* producer.py komunikační strategie PUSH-PULL, producent https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/producer.py 9* consumer.py komunikační strategie PUSH-PULL, konzument https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/consumer.py 10* producer.py producent na začátku pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/producer.py 10* worker1.py první worker jako součást pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/worker1.py 10* worker2.py druhý worker jako součást pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/worker2.py 10* collector.py shromažďovač výsledků na konci pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/collector.py

Poznámka: příklady označené hvězdičkou sice již v repositáři najdete, ovšem jejich podrobnější popis bude uveden v navazující části tohoto seriálu.

16. Odkazy na předchozí části seriálu

V této kapitole jsou uvedeny odkazy na všech sedm předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:

Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí

https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ Celery: systém implementující asynchronní fronty úloh pro Python

https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)

https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ RabbitMQ: jedna z nejúspěšnějších implementací brokera

https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ Pokročilejší operace nabízené systémem RabbitMQ

https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/ ØMQ: knihovna pro asynchronní předávání zpráv

https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ Další možnosti poskytované knihovnou ØMQ

https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/

17. Odkazy na Internetu