Další možnosti poskytované knihovnou ØMQ

22. 1. 2019
Doba čtení: 39 minut

Sdílet

Ve druhém článku o knihovně ØMQ a rozhraní PyZMQ si ukážeme, jak korektně zavírat připojení s využitím správců kontextu, popř. dekorátorů. Dále si popíšeme přenos strukturovaných dat a komunikační strategii PUSH-PULL.

Obsah

1. Další možnosti poskytované knihovnou ØMQ

2. Problematika uzavírání prostředků: sockety a kontext ØMQ

3. Implementace serveru a klienta bez explicitního uzavírání prostředků

4. Explicitní kontrola chyb a uzavírání prostředků v blocích finally

5. Využití bloků with (správci kontextu v Pythonu)

6. Upravená implementace serveru a klienta

7. Použití dekorátorů

8. Třetí verze serveru a klienta

9. Současné využití obou dekorátorů contextsocket

10. Posílání složitějších datových struktur ve formátu JSON

11. Serializace a deserializace objektů

12. Model (komunikační strategie) PUSH-PULL

13. Producent a konzument používající strategii PUSH-PULL

14. Rozdělení zpráv mezi větší množství konzumentů ve strategii PUSH-PULL

15. Architektura producent-workeři-collector (fan-out a fan-in)

16. Implementace architektury producent-workeři-collector

17. Chování architektury s fan-out a fan-in

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu

20. Odkazy na Internetu

1. Další možnosti poskytované knihovnou ØMQ

Ve druhém článku o knihovně ØMQ navážeme na část předchozí, v níž jsme se seznámili se základními koncepty, na nichž je knihovna ØMQ postavena. Připomeňme si pouze, že se jedná o nativní knihovnu vyvinutou v programovacím jazyku C++, pro níž však vznikla rozhraní umožňující její použití z mnoha dalších programovacích jazyků, a to včetně rozhraní pro jazyk C, Javu (celou JVM, tj. včetně JRuby, Jythonu, Scaly, Groovy, Clojure apod.), Python atd. Při práci s touto knihovnou se využívají především dva typy objektů. Jedná se o takzvaný kontext (context) a socket. Kontext si zjednodušeně řečeno můžeme představit jako kontejner (řekněme seznam – i když je to velmi nepřesné) s jednotlivými sockety, které klient/server otevřel a používá. Typicky se vytváří pouze jediný kontext pro celou aplikaci, a to nezávisle na tom, zda se jedná o jednoduchého jednovláknového klienta či o server, který komunikaci po každém socketu obsluhuje v jiném vláknu (kontext se tedy typicky vytváří ještě před spuštěním jednotlivých vláken).

Samotné sockety sice svým jménem připomínají klasické Berkeley/BSD sockety, ve skutečnosti se ale v kontextu knihovny ØMQ jedná o objekt reprezentující logickou „zásuvku“, nikoli konkrétní „zásuvku“ síťovou. Jaký je mezi oběma koncepty rozdíl? U ØMQ socketů se musí již při jejich konstrukci specifikovat typ (či možná lépe řečeno strategie) komunikace, samotná obsluha přenosu dat je řešena ve vlastním vláknu (do toho běžný programátor nezasahuje) a navíc je k socketu většinou připojena i fronta (queue) umožňující posílání zpráv i ve chvíli, kdy druhá strana připojení není připravena na příjem dat. U socketů v podání ØMQ je navíc možné specifikovat větší množství konkrétních přípojných bodů, například více portů protokolu TCP, na nichž server očekává dotazy od klientů. Příkladem může být následující implementace velmi jednoduchého serveru, který současně naslouchá na TCP portech 5556 a 5557:

import zmq
import time
 
 
def bind(port1, port2, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    context = zmq.Context()
    socket = context.socket(connection_type)
    address1 = "tcp://*:{port}".format(port=port1)
    socket.bind(address1)
    print("Bound to address {a}".format(a=address1))
    address2 = "tcp://*:{port}".format(port=port2)
    socket.bind(address2)
    print("Bound to address {a}".format(a=address2))
    return socket
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
def start_server():
    """Spuštění serveru."""
    socket = bind(5556, 5557, zmq.PAIR)
    for i in range(10):
        send_message(socket, "Message #{i}".format(i=i))
        time.sleep(1)
 
 
start_server()
Poznámka: ve skutečnosti jsou však možnosti ještě větší – jediný socket může naslouchat na TCP portu, současně na interní připojení v rámci samotného serveru apod.

Komunikace s využitím ØMQ socketů může probíhat s využitím různých technologií. Typicky se používá protokol TCP (Transmission Control Protocol), ovšem setkat se můžeme i s použitím TIPC (Transparent Inter-process Communication), IPC nebo i se sockety používanými „pouze“ pro komunikaci v rámci jediné aplikace – a to konkrétně pro přenos dat mezi jednotlivými vlákny. Pokud totiž pro tuto činnost využijeme možnosti nabízené ØMQ, přiblížíme se technologii gorutin a kanálů představených v programovacím jazyku Go, a to navíc bez toho, aby docházelo k uváznutím a podobným nepříjemnostem, které se typicky projevují až při produkčním nasazení.

Ještě si připomeňme, že knihovna ØMQ podporuje čtyři základní komunikační strategie:

  1. PAIR – jednosměrné či obousměrné propojení dvou procesů, z nichž každý může běžet na odlišném počítači. Tato strategie se nejvíce přibližuje běžnému použití klasických Berkeley socketů a setkali jsme se s ní minule. Jak klient, tak i server použijí při konstrukci socketu stejný typ: zmq.PAIR.
  2. REQ-REP – jedná se o komunikaci typu požadavek-odpověď. Požadavky posílají klienti, odpovědi generuje server, který dokáže obsloužit prakticky libovolné množství klientů. I s touto komunikační strategií jsme se setkali v předchozím článku. Klient používá při konstrukci socketu typ zmq.REQ, server pak typ zmq.REP.
  3. PUB-SUB – server zde publikuje zprávy, k jejichž odběru se mohou přihlásit různí klienti. Zprávy je možné filtrovat na straně klientů (tato vlastnost se ovšem ve starších verzích ØMQ odlišuje). Základní způsob použití této komunikační strategie jsme si popsali minule. Jedná se o jednosměrnou komunikaci, při níž může dojít ke ztrátě zpráv v případě, že se odběratel nepřipojí či pokud se odpojí ve chvíli, kdy je publikována další zpráva. Server publikující zprávy používá socket typu zmq.PUB, všichni odběratelé pak zmq.SUB.
  4. PUSH-PULL – rozšíření předchozí strategie PUB-SUB: server či servery vytváří zprávy zpracovávané buď přímo připojenými workery nebo celou kolonou (pipeline) workerů. Nedochází zde ke ztrátě dat, protože se na obou koncích komunikačního kanálu používají fronty. Navíc je možné vytvořit „pipeline“ složenou z většího množství uzlů, z nichž každý přijímá data na jednom portu (v případě TCP) a posílá zpracovaná data na jiný port. Přijímající konec komunikačního kanálu využívá socket typu zmq.PULL, posílající/vysílající konec pak socket typu zmq.PUSH.

Obrázek 1: Jednosměrná komunikace využívající strategii PAIR-PAIR.

Obrázek 2: Obousměrná komunikace využívající strategii PAIR-PAIR.

Obrázek 3: Obousměrná komunikace využívající strategii REQ-REP.

Obrázek 4: Jednosměrná komunikace využívající strategii PUB-SUB.

2. Problematika uzavírání prostředků: sockety a kontext ØMQ

Ve většině až doposud ukázaných příkladů jsme se příliš nezabývali tím, v jakém okamžiku vlastně budou uzavřeny sockety (a tím pádem i inicializovaná připojení) a kdy se ukončí platnost celého kontextu ØMQ. U zmíněných demonstračních příkladů, které jsme chtěli mít co nejkratší a nejsrozumitelnější, se počítá s tím, že se připojení uzavře (odpojí) automaticky ve chvíli, kdy budou objekty typu socket uvolňovány z paměti automatickým správcem paměti (garbage collector), protože v této chvíli by se měla zavolat speciální metoda __del__ (takzvaný finalizér). Podobně je tomu u kontextu, který je taktéž ukončen před finalizér zavolaný správcem paměti.

V produkčním kódu je však většinou mnohem lepší nespoléhat se na správce paměti (ostatně jeho sémantika při volání konstruktorů se už minimálně jednou změnila a obecně nemusí být zaručeno, že se finalizér vůbec stihne zavolat) a raději explicitně ukončit připojení ve chvíli, kdy už není zapotřebí. Tím se automaticky uvolní i příslušný TCP port, pokud pro připojení používáme TCP. Prakticky totéž platí o samotném kontextu ØMQ.

Abychom si otestovali, v jakém okamžiku (a zda vůbec) se prostředky/objekty typu context a socket skutečně uzavírají, budeme muset nepatrně modifikovat dva soubory knihovny PyZMQ. Pokud jste dodrželi postup instalace ØMQ a PyZMQ zmíněný v předchozím článku, měly by být tyto soubory uloženy v adresáři ~/.local/lib/python3.podver­ze/zmq/sugar/. Samotné úpravy jsou ve skutečnosti triviální a týkají se přidání volání funkce print() do metod se specifickými názvy __del__ a __exit__.

Patch pro úpravu souboru context.py:

--- context.py  2019-01-18 17:36:03.000000000 +0100
+++ ~/.local/lib/python3.6/site-packages/zmq/sugar/context.py   2019-01-18 18:00:23.726179780 +0100
@@ -42,6 +42,7 @@
 
     def __del__(self):
         """deleting a Context should terminate it, without trying non-threadsafe destroy"""
+        print("Context.__del__")
         if not self._shadow and not _exiting:
             self.term()
  
@@ -49,6 +50,7 @@
         return self
 
     def __exit__(self, *args, **kwargs):
+        print("Context.__exit__")
         self.term()
 
     def __copy__(self, memo=None):

Patch pro úpravu souboru socket.py:

--- socket.py   2019-01-18 17:36:03.000000000 +0100
+++ ~/.local/lib/python3.6/site-packages/zmq/sugar/socket.py    2019-01-18 18:01:56.788181903 +0100
@@ -63,6 +63,7 @@
             self._shadow = False
 
     def __del__(self):
+        print("Socket.__del__")
         if not self._shadow:
             self.close()
 
@@ -75,6 +76,7 @@
         return self
 
     def __exit__(self, *args, **kwargs):
+        print("Socket.__exit__")
         self.close()
 
     #-------------------------------------------------------------------------

Po těchto úpravách by se na standardní výstup klienta, serveru, workera atd. měly mj. vypisovat i zprávy o to, že se bude uzavírat socket, popř. celý kontext ØMQ.

3. Implementace serveru a klienta bez explicitního uzavírání prostředků

V dnešním prvním demonstračním příkladu se vrátíme k použití komunikační strategie PAIR, v níž klient i server vystupují ve stejné roli – mohou používat otevřený socket k jednosměrné či k obousměrné komunikaci. Server se od klienta odlišuje jen v tom, že spojení otevírá metodou socket.bind(), kterou se naváže na předem známý TCP port, zatímco klient používá metodu socket.connect() a připojuje se k portu otevřeném serverem. V příkladu nebude použito žádné explicitní uzavření socketů ani kontextu ØMQ.

Nejprve si ukažme implementaci serveru:

import zmq
import time
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
def start_server():
    """Spuštění serveru."""
    context = zmq.Context()
    socket = context.socket(CONNECTION_TYPE)
    address = "tcp://*:{port}".format(port=PORT)
    socket.bind(address)
    print("Bound to address {a}".format(a=address))
 
    for i in range(10):
        send_message(socket, "Message #{i}".format(i=i))
        time.sleep(1)
 
 
start_server()

Následuje implementace klienta:

import zmq
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def start_client():
    """Spuštění klienta."""
    context = zmq.Context()
 
    socket = context.socket(CONNECTION_TYPE)
    address = "tcp://localhost:{port}".format(port=PORT)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message...")
    while True:
        message = socket.recv_string()
        print("Received message '{m}'".format(m=message))
 
 
start_client()

Po spuštění serveru se otevře port 5556, na němž server očekává připojení klienta. Po připojení odešle deset zpráv a ihned poté se automaticky ukončí:

$ python3 server.py 
 
Bound to address tcp://*:5556
Sending message 'Message #0'
Sending message 'Message #1'
Sending message 'Message #2'
Sending message 'Message #3'
Sending message 'Message #4'
Sending message 'Message #5'
Sending message 'Message #6'
Sending message 'Message #7'
Sending message 'Message #8'
Sending message 'Message #9'
Socket.__del__
Context.__del__
Poznámka: povšimněte si, že se finalizér zavolal nejprve pro socket a teprve poté pro kontext, což je logické, protože objekt představující socket je vlastněný kontextem.

Klient odebírá zprávy v nekonečné smyčce, takže ho musíme ukončit násilně:

$ python3 client.py 
 
Connected to tcp://localhost:5556
Waiting for message...
Received message 'Message #0'
Received message 'Message #1'
Received message 'Message #2'
Received message 'Message #3'
Received message 'Message #4'
Received message 'Message #5'
Received message 'Message #6'
Received message 'Message #7'
Received message 'Message #8'
Received message 'Message #9'
^CTraceback (most recent call last):
  File "client.py", line 23, in <module>
    start_client()
  File "client.py", line 19, in start_client
    message = socket.recv_string()
  File "/home/tester/.local/lib/python3.6/site-packages/zmq/sugar/socket.py", line 586, in recv_string
    msg = self.recv(flags=flags)
  File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 824, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc
KeyboardInterrupt
Socket.__del__
Context.__del__
Poznámka: opět si povšimněte, že socket i kontext byly řádně uzavřeny, i když to v obecném případě nemusí být zaručeno.

4. Explicitní kontrola chyb a uzavírání prostředků v blocích finally

Dnešní druhý demonstrační příklad bude mnohem delší, než příklad první, i když v něm implementujeme naprosto stejný algoritmus komunikace s využitím strategie PAIR. Tentokrát se ovšem kontroluje většina chyb, k nimž může dojít při vytváření kontextu, socketu, při posílání zpráv atd. Současně se explicitně provádí uzavírání socketů metodou socket.close() a kontextu metodou socket.term().

Nejprve si opět ukažme implementaci serveru:

import zmq
import time
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
def start_server():
    """Spuštění serveru."""
    context = zmq.Context()
 
    try:
        socket = context.socket(CONNECTION_TYPE)
        address = "tcp://*:{port}".format(port=PORT)
        socket.bind(address)
        try:
            print("Bound to address {a}".format(a=address))
 
            for i in range(10):
                send_message(socket, "Message #{i}".format(i=i))
                time.sleep(1)
        except Exception as e:
            print(e)
        finally:
            print("Closing socket")
            socket.close()
            print("Closed")
    except Exception as e:
        print(e)
    finally:
        print("Terminating context")
        context.term()
        print("Terminated")
 
 
start_server()

Implementace klienta vypadá následovně:

import zmq
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def start_client():
    """Spuštění klienta."""
    context = zmq.Context()
 
    try:
        socket = context.socket(CONNECTION_TYPE)
        address = "tcp://localhost:{port}".format(port=PORT)
        socket.connect(address)
        try:
            print("Connected to {a}".format(a=address))
 
            print("Waiting for message...")
            while True:
                message = socket.recv_string()
                print("Received message '{m}'".format(m=message))
        except Exception as e:
            print(e)
        finally:
            print("Closing socket")
            socket.close()
            print("Closed")
    except Exception as e:
        print(e)
    finally:
        print("Terminating context")
        context.term()
        print("Terminated")
 
 
start_client()

Z výpisu běhu serveru je patrné, že se nyní skutečně socket i kontext explicitně uvolní a teprve poté se pro oba objekty zavolá finalizér, který ovšem již další činnosti nebude provádět:

$ python3 server.py 
 
Bound to address tcp://*:5556
Sending message 'Message #0'
Sending message 'Message #1'
Sending message 'Message #2'
Sending message 'Message #3'
Sending message 'Message #4'
Sending message 'Message #5'
Sending message 'Message #6'
Sending message 'Message #7'
Sending message 'Message #8'
Sending message 'Message #9'
Closing socket
Closed
Terminating context
Terminated
Socket.__del__
Context.__del__

Prakticky totéž chování uvidíme i po spuštění klienta, samozřejmě s tím, rozdílem, že klienta ukončíme ručně klávesovou zkratkou Ctrl+C:

$ python3 client.py 
 
Connected to tcp://localhost:5556
Waiting for message...
Received message 'Message #0'
Received message 'Message #1'
Received message 'Message #2'
Received message 'Message #3'
Received message 'Message #4'
Received message 'Message #5'
Received message 'Message #6'
Received message 'Message #7'
Received message 'Message #8'
Received message 'Message #9'
^CClosing socket
Closed
Terminating context
Terminated
Traceback (most recent call last):
  File "client.py", line 37, in <module>
    start_client()
  File "client.py", line 21, in start_client
    message = socket.recv_string()
  File "/home/tester/.local/lib/python3.6/site-packages/zmq/sugar/socket.py", line 586, in recv_string
    msg = self.recv(flags=flags)
  File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 824, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc
KeyboardInterrupt
Socket.__del__
Context.__del__

5. Využití bloků with (správci kontextu v Pythonu)

Knihovna PyZMQ zajišťující rozhraní mezi vývojáři používajícími programovací jazyk Python a nativní knihovnou ØMQ je navržena takovým stylem, aby podporovala většinu idiomů známých v komunitě Pythonu. Jedním z těchto idiomů je používání správců kontextu a bloků with pro ty prostředky, které se mají automaticky uzavírat po odchodu z bloku with (a to jakýmkoli způsobem, včetně výskoku z funkce atd.):

with open('hello.txt', 'w') as fout:
    fout.write('Hi there!')

V praxi to znamená, že jak třída Context, tak i třída Socket implementují obě metody __enter__ a __exit__ nezbytné pro to, aby se jednalo o korektně naprogramované správce kontextu (context manager). Je tedy možné zapsat například tento kód:

    with zmq.Context() as context:
        with context.socket(CONNECTION_TYPE) as socket:
            address = "tcp://*:{port}".format(port=PORT)
            socket.bind(address)
            ...
            ...
            ...

Po opuštění vnitřního bloku with se zavře socket, po opuštění venkovního bloku pak dojde k ukončení celého kontextu ØMQ.

6. Upravená implementace serveru a klienta

Podívejme se nyní na způsob, jakým je možné správce kontextu (v praxi bloky with) použít při úpravě serveru a i klienta. Vlastní kód bude jednodušší, než v předchozím příkladu, protože nemusíme zapisovat konstrukci try-except-finally jen proto, aby bylo zaručeno uvolnění nějakého prostředku.

Upravená implementace serveru:

import zmq
import time
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
def start_server():
    """Spuštění serveru."""
 
    with zmq.Context() as context:
        with context.socket(CONNECTION_TYPE) as socket:
            address = "tcp://*:{port}".format(port=PORT)
            socket.bind(address)
            print("Bound to address {a}".format(a=address))
 
            for i in range(10):
                send_message(socket, "Message #{i}".format(i=i))
                time.sleep(1)
 
 
start_server()

Upravená implementace klienta:

import zmq
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def start_client():
    """Spuštění klienta."""
 
    with zmq.Context() as context:
        with context.socket(CONNECTION_TYPE) as socket:
            address = "tcp://localhost:{port}".format(port=PORT)
            socket.connect(address)
            print("Connected to {a}".format(a=address))
 
            print("Waiting for message...")
            while True:
                message = socket.recv_string()
                print("Received message '{m}'".format(m=message))
 
 
start_client()

Samozřejmě si pro úplnost ukážeme chování serveru i klienta po jejich spuštění:

$ python3 server.py 
 
Bound to address tcp://*:5556
Sending message 'Message #0'
Sending message 'Message #1'
Sending message 'Message #2'
Sending message 'Message #3'
Sending message 'Message #4'
Sending message 'Message #5'
Sending message 'Message #6'
Sending message 'Message #7'
Sending message 'Message #8'
Sending message 'Message #9'
Socket.__exit__
Context.__exit__
Socket.__del__
Context.__del__
Poznámka: povšimněte si, že se nyní volají jak metody __exit__(), tak i __del__(). To ve skutečnosti nevadí, protože PyZMQ automaticky hlídá, aby se už uzavřený prostředek (socket, context) neuzavřel podruhé.
$ python3 client.py 
 
Connected to tcp://localhost:5556
Waiting for message...
Received message 'Message #0'
Received message 'Message #1'
Received message 'Message #2'
Received message 'Message #3'
Received message 'Message #4'
Received message 'Message #5'
Received message 'Message #6'
Received message 'Message #7'
Received message 'Message #8'
Received message 'Message #9'
^CSocket.__exit__
Context.__exit__
Traceback (most recent call last):
  File "client.py", line 23, in <module>
    start_client()
  File "client.py", line 19, in start_client
    message = socket.recv_string()
  File "/home/tester/.local/lib/python3.6/site-packages/zmq/sugar/socket.py", line 586, in recv_string
    msg = self.recv(flags=flags)
  File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 824, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc
KeyboardInterrupt
Socket.__del__
Context.__del__

7. Použití dekorátorů

Kromě explicitní práce se správci kontextu (tj. v praxi použitím bloku with) máme v PyZMQ ještě jednu možnost automatické či poloautomatické správy objektů typu Context a Socket. Můžeme totiž použít takzvané dekorátory a zcela se vyhnout explicitní konstrukci zmíněných objektů a tím pádem i jejich explicitnímu zavírání. Podívejme se, jak se tyto dekorátory použijí v praxi.

Nejprve je nutné naimportovat příslušné dekorátory, v prvním případě pouze dekorátor pro kontext ØMQ:

from zmq.decorators import context

Následně můžeme upravit funkci pro start serveru (či klienta) takovým způsobem, že před ni zapíšeme příslušný dekorátor. Ten zajistí automatické vytvoření kontextu, který se předá do volané funkce jako její první parametr. Hlavičku funkce tedy budeme muset upravit následovně:

@context()
def start_server(context):
    """Spuštění serveru."""
    ...
    ...
    ...

Ovšem volání bude provedeno bez zmíněného parametru (ten je z pohledu programátora vytvořen a předán automaticky):

start_server()

Podobně můžeme stejnou funkci upravit ještě lépe – použijeme dekorátor jak pro kontext, tak i pro socket. Povšimněte si pořadí zápisu dekorátorů a parametrů:

CONNECTION_TYPE = zmq.PAIR
 
 
@context()
@socket(CONNECTION_TYPE)
def start_server(context, socket):
    """Spuštění serveru."""
 
    address = "tcp://*:{port}".format(port=PORT)
    socket.bind(address)
    ...
    ...
    ...

Volání této funkce v programovém kódu bude stále stejné:

start_server()
Poznámka: v dekorátoru je možné použít všechny v dané chvíli dostupné (viditelné) objekty. V našem konkrétním případě musíme specifikovat typ otevíraného socketu, který je představován kvazikonstantou pojmenovanou CONNECTION_TYPE.

8. Třetí verze serveru a klienta

Ve třetí (předposlední) variantě původního demonstračního příkladu použijeme dekorátor, ovšem pouze pro práci s kontextem ØMQ. Nejprve si opět ukážeme zdrojový kód serveru:

import zmq
from zmq.decorators import context
import time
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
def start_server(context):
    """Spuštění serveru."""
 
    with context.socket(CONNECTION_TYPE) as socket:
        address = "tcp://*:{port}".format(port=PORT)
        socket.bind(address)
        print("Bound to address {a}".format(a=address))
 
        for i in range(10):
            send_message(socket, "Message #{i}".format(i=i))
            time.sleep(1)
 
 
start_server()

Nový zdrojový kód klienta bude vypadat takto:

import zmq
from zmq.decorators import context
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
@context()
def start_client(context):
    """Spuštění klienta."""
 
    with context.socket(CONNECTION_TYPE) as socket:
        address = "tcp://localhost:{port}".format(port=PORT)
        socket.connect(address)
        print("Connected to {a}".format(a=address))
 
        print("Waiting for message...")
        while True:
            message = socket.recv_string()
            print("Received message '{m}'".format(m=message))
 
 
start_client()

Chování serveru a klienta po jejich spuštění se bude podobat předchozímu příkladu:

$ python3 server.py 
 
Bound to address tcp://*:5556
Sending message 'Message #0'
Sending message 'Message #1'
Sending message 'Message #2'
Sending message 'Message #3'
Sending message 'Message #4'
Sending message 'Message #5'
Sending message 'Message #6'
Sending message 'Message #7'
Sending message 'Message #8'
Sending message 'Message #9'
Socket.__exit__
Socket.__del__
Context.__exit__
Context.__del__
$ python3 client.py 
 
Connected to tcp://localhost:5556
Waiting for message...
Received message 'Message #0'
Received message 'Message #1'
Received message 'Message #2'
Received message 'Message #3'
Received message 'Message #4'
Received message 'Message #5'
Received message 'Message #6'
Received message 'Message #7'
Received message 'Message #8'
Received message 'Message #9'
^CSocket.__exit__
Context.__exit__
Traceback (most recent call last):
  File "client.py", line 24, in <module>
    start_client()
  File "/home/tester/.local/lib/python3.6/site-packages/zmq/decorators.py", line 75, in wrapper
    return func(*args, **kwargs)
  File "client.py", line 20, in start_client
    message = socket.recv_string()
  File "/home/tester/.local/lib/python3.6/site-packages/zmq/sugar/socket.py", line 586, in recv_string
    msg = self.recv(flags=flags)
  File "zmq/backend/cython/socket.pyx", line 788, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 824, in zmq.backend.cython.socket.Socket.recv
  File "zmq/backend/cython/socket.pyx", line 186, in zmq.backend.cython.socket._recv_copy
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc
KeyboardInterrupt
Socket.__del__
Context.__del__

9. Současné využití obou dekorátorů contextsocket

Jen v krátkosti se podívejme na způsob využití obou dekorátorů importovaných z balíčku zmq.decorators, tj. jak dekorátoru pro objekt představující kontext, tak i pro objekt představující socket.

Nová implementace serveru:

import zmq
from zmq.decorators import context, socket
import time
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_server(context, socket):
    """Spuštění serveru."""
 
    address = "tcp://*:{port}".format(port=PORT)
    socket.bind(address)
    print("Bound to address {a}".format(a=address))
 
    for i in range(10):
        send_message(socket, "Message #{i}".format(i=i))
        time.sleep(1)
 
 
start_server()

Nová implementace klienta:

import zmq
from zmq.decorators import context, socket
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
@context()
@socket(CONNECTION_TYPE)
def start_client(context, socket):
    """Spuštění klienta."""
 
    address = "tcp://localhost:{port}".format(port=PORT)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message...")
    while True:
        message = socket.recv_string()
        print("Received message '{m}'".format(m=message))
 
 
start_client()

10. Posílání složitějších datových struktur ve formátu JSON

Již v předchozím článku jsme se zmínili o tom, že v knihovně PyZMQ nalezneme metody, které nám umožní posílat kromě obecných sekvencí bajtů a řetězců taktéž složitější datové struktury. Ty mohou být buď převedeny do formátu JSON, popř. je možné vzít strukturu reprezentovanou pythoním objektem a poslat (a na druhé straně samozřejmě přijmout a zpracovat) serializovanou variantu tohoto objektu.

Nejprve se podívejme na způsob posílání dat ve formátu JSON. Ve skutečnosti je použití JSONu velmi jednoduché, protože můžeme použít metody Socket.send_json() a Socket.recv_json(). V mnoha případech si tak ušetříme nutnost parsování textových zpráv.

Server, který bude posílat složitější datové struktury ve formátu JSON, bude implementován následujícím způsobem:

import zmq
from zmq.decorators import context, socket
import time
from datetime import datetime
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def send_json_message(socket, message, i, timestamp):
    """Poslání zprávy."""
    print("Sending message #{i}'".format(i=i))
    payload = {
        "message": message,
        "number": i,
        "timestamp": str(timestamp)
    }
    socket.send_json(payload)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_server(context, socket):
    """Spuštění serveru."""
 
    address = "tcp://*:{port}".format(port=PORT)
    socket.bind(address)
    print("Bound to address {a}".format(a=address))
 
    for i in range(10):
        send_json_message(socket, "Message", i, datetime.now())
        time.sleep(1)
 
 
start_server()

Klient zprávu přijme a deserializuje ji do běžného slovníku, popř. seznamu (obecně se jedná o rekurzivní datovou strukturu):

import zmq
from zmq.decorators import context, socket
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
@context()
@socket(CONNECTION_TYPE)
def start_client(context, socket):
    """Spuštění klienta."""
 
    address = "tcp://localhost:{port}".format(port=PORT)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message...")
    while True:
        payload = socket.recv_json()
        print("Received message #{i} with timestamp {t}: '{m}'".format(
            i=payload["number"],
            t=payload["timestamp"],
            m=payload["message"]))
 
 
start_client()

Klient by měl po svém spuštění přijmout deset zpráv a zobrazovat o nich podrobnější informace, zejména jejich časové razítko, pořadí a vlastní text zprávy:

$ python3 client.py
 
Connected to tcp://localhost:5556
Waiting for message...
Received message #0 with timestamp 2019-01-21 17:14:27.687456: 'Message'
Received message #1 with timestamp 2019-01-21 17:14:28.774764: 'Message'
Received message #2 with timestamp 2019-01-21 17:14:29.776133: 'Message'
Received message #3 with timestamp 2019-01-21 17:14:30.777448: 'Message'
Received message #4 with timestamp 2019-01-21 17:14:31.778886: 'Message'
Received message #5 with timestamp 2019-01-21 17:14:32.780256: 'Message'
Received message #6 with timestamp 2019-01-21 17:14:33.781626: 'Message'
Received message #7 with timestamp 2019-01-21 17:14:34.783037: 'Message'
Received message #8 with timestamp 2019-01-21 17:14:35.784384: 'Message'
Received message #9 with timestamp 2019-01-21 17:14:36.785755: 'Message'

11. Serializace a deserializace objektů

V případě, že nám nebude dostačovat použití slovníků, popř. seznamů posílaných ve formátu JSON, můžeme využít další užitečnou vlastnost knihovny PyZMQ. Tou je možnost serializace a deserializace pythonních objektů. Pro vlastní serializaci se používá knihovna pickle, ovšem z pohledu běžného programátora je její volání zcela transparentní. Programátor pouze potřebuje zavolat metodu Socket.send_pyobj() a na druhé straně komunikačního kanálu metodu Socket.recv_pyobj().

Posílat budeme instance této třídy:

from datetime import datetime
 
 
class Message():
    def __init__(self, number):
        self.number = number
        self.timestamp = str(datetime.now())
        self.message = "Message"

Implementace serveru:

import zmq
from zmq.decorators import context, socket
import time
from message import Message
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
def send_serialized_object(socket, obj, i):
    """Poslání zprávy."""
    print("Sending message #{i}'".format(i=i))
    socket.send_pyobj(obj)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_server(context, socket):
    """Spuštění serveru."""
 
    address = "tcp://*:{port}".format(port=PORT)
    socket.bind(address)
    print("Bound to address {a}".format(a=address))
 
    for i in range(10):
        m = Message(i)
        send_serialized_object(socket, m, i)
        time.sleep(1)
 
 
start_server()

Implementace klienta:

import zmq
from zmq.decorators import context, socket
from message import Message
 
 
CONNECTION_TYPE = zmq.PAIR
PORT = 5556
 
 
@context()
@socket(CONNECTION_TYPE)
def start_client(context, socket):
    """Spuštění klienta."""
 
    address = "tcp://localhost:{port}".format(port=PORT)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message...")
    while True:
        payload = socket.recv_pyobj()
        print("Received message #{i} with timestamp {t}: '{m}'".format(
            i=payload.number,
            t=payload.timestamp,
            m=payload.message))
 
 
start_client()

12. Model (komunikační strategie) PUSH-PULL

V dalším textu se budeme zabývat způsoby využití komunikační strategie PUSH-PULL. Komunikace je v tomto případě jednosměrná, a to od serveru (ten se zde nazývá producent, producer) ke klientovi (konzument, consumer). Oproti podobně koncipované strategii PUB-SUB jsou však zprávy doručovány odlišným způsobem: vždy jen jednomu vybranému konzumentu (rozdělení zhruba round-robin) a navíc s tím, že pokud žádný konzument není v danou chvíli připravený na zpracování zprávy, je zpráva uložena do interně spravované fronty. Tato komunikační strategie se tedy přibližuje možnostem klasických message brokerů, ovšem jak uvidíme v další části, je zde stále co vylepšovat.

Obrázek 5: Jednosměrná komunikace využívající strategii PUSH-PUL.

Obrázek 6: Použití většího množství konzumentů při použití strategie PUSH-PUL.

13. Producent a konzument používající strategii PUSH-PULL

Podívejme se nejprve na implementaci producenta zpráv. Ten musí používat sockety typu zmq.PUSH a po svém spuštění vytvoří sto zpráv se zajištěním jejich přijetí nějakým konzumentem:

import zmq
from zmq.decorators import context, socket
import time
 
 
CONNECTION_TYPE = zmq.PUSH
PORT = 5556
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_producer(context, socket):
    """Spuštění serveru."""
 
    address = "tcp://*:{port}".format(port=PORT)
    # socket.set_hwm(1)
    socket.bind(address)
    print("Bound to address {a}".format(a=address))
 
    for i in range(100):
        send_message(socket, "Message #{i}".format(i=i))
        time.sleep(0.1)
 
 
start_producer()

Implementace konzumenta zpráv, který musí využívat sockety typu zmq.PULL:

import zmq
from zmq.decorators import context, socket
import time
 
 
CONNECTION_TYPE = zmq.PULL
PORT = 5556
 
 
@context()
@socket(CONNECTION_TYPE)
def start_consumer(context, socket):
    """Spuštění konzumenta."""
 
    address = "tcp://localhost:{port}".format(port=PORT)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message...")
    cnt = 0
    while True:
        message = socket.recv_string()
        cnt += 1
        print("Received message {c} of 100: '{m}'".format(c=cnt, m=message))
        time.sleep(0)
 
 
start_consumer()

14. Rozdělení zpráv mezi větší množství konzumentů ve strategii PUSH-PULL

Nyní si spustíme dva konzumenty a jediného producenta, aby bylo patrné, jak se budou zprávy přeposílat algoritmem round-robin:

$ python3 consumer.py
 
Connected to tcp://localhost:5556
Waiting for message...
Received message 1 of 100: 'Message #2'
Received message 2 of 100: 'Message #4'
...
...
...
Received message 49 of 100: 'Message #98'
$ python3 consumer.py
 
Connected to tcp://localhost:5556
Waiting for message...
Received message 1 of 100: 'Message #0'
Received message 2 of 100: 'Message #1'
...
...
...
Received message 51 of 100: 'Message #99'
$ python3 producent.py
 
Bound to address tcp://*:5556
Sending message 'Message #0'
Sending message 'Message #1'
...
...
...
Poznámka: prozatím se musíme spokojit s tím, že producent bude mezi jednotlivými zprávami na chvíli pozastaven. Tuto nedokonalost odstraníme příště.

15. Architektura producent-workeři-collector (fan-out a fan-in)

Předchozí demonstrační příklad ukazoval použití jednoho producenta zpráv a několika konzumentů (nebo též workerů, podle toho, o jakou aplikaci se jedná). V praxi ovšem mnohdy s takovou architekturou nevystačíme a budeme muset přidat ještě jeden typ uzlu, který se nazývá collector (shromažďovač). Typicky bývá producent i collector spuštěn pouze jedenkrát, workerů bývá větší množství. Zajímavé bude zjistit, který typ uzlu by měl být serverem s pevně nastavenými porty. Bývá dobrým zvykem, aby serverem byly ty uzly, které jsou nejvíce stabilní, což je v tomto případě producent i collector. Co to znamená z hlediska implementace: producent i collector budou při vytváření připojení používat metodu Socket.bind() zatímco workeři, které je možné připojovat a odpojovat podle potřeb aplikace, budou používat metodu Socket.connect. Dále je nutné použít dvě čísla portů – jeden pro komunikaci producent → workeři, druhý pro komunikaci workeři → collector.

Při pohledu na schéma této architektury je zřejmé, proč se první (horní) část nazývá fanout (též fan-out) a druhá fanin (fan-in, i když osobně bych spíš použit funnel):

Obrázek 7: Architektura producent-workeři-collector (fan-out a fan-in).

16. Implementace architektury producent-workeři-collector

Implementace producenta se vlastně nijak zásadně neliší od předchozího příkladu. Pouze si povšimněte, že producent používá socket typu zmq.PUSH a otevírá připojení na portu 5556:

import zmq
from zmq.decorators import context, socket
from time import sleep
 
 
CONNECTION_TYPE = zmq.PUSH
PORT = 5556
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_producer(context, socket):
    """Spuštění serveru."""
 
    address = "tcp://*:{port}".format(port=PORT)
    # socket.set_hwm(1)
    socket.bind(address)
    print("Bound to address {a}".format(a=address))
 
    for i in range(101):
        send_message(socket, "Message #{i}".format(i=i))
        sleep(0.1)
 
 
start_producer()

Nejsložitější je konzument neboli worker. Ten totiž musí pracovat se dvěma sockety různých typů. První socket je připojen na port 5556, je typu zmq.PULL a slouží pro příjem zprávy od producenta. Druhý socket je připojen na port 5557, je typu zmq.PUSH a používá se pro poslání výsledku do collectoru:

import zmq
from zmq.decorators import context, socket
from os import getpid
 
 
IN_PORT = 5556
OUT_PORT = 5557
 
 
@context()
@socket(zmq.PULL)
@socket(zmq.PUSH)
def start_consumer(context, in_socket, out_socket):
    """Spuštění konzumenta."""
 
    address = "tcp://localhost:{port}".format(port=IN_PORT)
    in_socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    address = "tcp://localhost:{port}".format(port=OUT_PORT)
    out_socket.connect(address)
    print("And to {a}".format(a=address))
 
    print("Waiting for message...")
    pid = getpid()
 
    while True:
        message = in_socket.recv_string()
        out_message = "Message from {pid}: '{m}'".format(pid=pid, m=message)
        print(out_message)
        out_socket.send_string(out_message)
 
 
start_consumer()

A konečně se na konci celé „pipeline“ nachází collector. Ten používá socket typu zmq.PULL, který se otevírá na portu 5557:

import zmq
from zmq.decorators import context, socket
from time import sleep
 
 
CONNECTION_TYPE = zmq.PULL
PORT = 5557
 
 
@context()
@socket(CONNECTION_TYPE)
def start_collector(context, socket):
    """Spuštění sběratele."""
 
    address = "tcp://*:{port}".format(port=PORT)
    socket.bind(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message...")
    while True:
        message = socket.recv_string()
        print("Collecting message: '{m}'".format(m=message))
        sleep(0)
 
 
start_collector()

17. Chování architektury s fan-out a fan-in

Chování výše implementovaného příkladu si ukážeme na této konfiguraci:

  • Jeden producent
  • Dva konzumenti/workeři (simulace rozdělení zátěže)
  • Jeden collector (shromažďovač)

Producent:

$ python3 producer.py 
 
Bound to address tcp://*:5556
Sending message 'Message #0'

Worker #1:

$ python3 consumer.py 
 
Connected to tcp://localhost:5556
And to tcp://localhost:5557
Waiting for message...
Message from 8050: 'Message #6'
Message from 8050: 'Message #8'
...
...
...

Worker #2:

$ python3 consumer.py
 
Connected to tcp://localhost:5556
And to tcp://localhost:5557
Waiting for message...
Message from 8047: 'Message #0'
Message from 8047: 'Message #1'
...
...
...

Collector:

$ python3 collector.py 
 
Connected to tcp://*:5557
Waiting for message...
Collecting message: 'Message from 8047: 'Message #0''
...
...
...
Collecting message: 'Message from 8050: 'Message #6''
...
...
...

Můžeme vidět, že collector skutečně postupně přijme zprávy ode všech workerů.

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v programovacím jazyku C byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

bitcoin školení listopad 24

Příklad Skript/kód Popis Cesta
1 client.py automatické uzavírání prostředků, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example06_pair_strate­gy/client.py
1 server.py automatické uzavírání prostředků, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example06_pair_strate­gy/server.py
       
2 client.py explicitní uzavírání prostředků, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example07_proper_close/cli­ent.py
2 server.py explicitní uzavírání prostředků, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example07_proper_close/ser­ver.py
       
3 client.py použití správce kontextu, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example08_context_mana­gers/client.py
3 server.py použití správce kontextu, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example08_proper_close/ser­ver.py
       
4 client.py využití dekorátorů, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example09_decorators/cli­ent.py
4 server.py využití dekorátorů, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example09_decorators/ser­ver.py
       
5 client.py využití dekorátorů, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example10_decorators/cli­ent.py
5 server.py využití dekorátorů, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example10_decorators/ser­ver.py
       
6 client.py posílání strukturovaných dat v JSONu, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example11_json_message/cli­ent.py
6 server.py posílání strukturovaných dat v JSONu, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example11_json_message/ser­ver.py
       
7 client.py posílání serializovaných objektů, implementace klienta https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example12_serializati­on/client.py
7 server.py posílání serializovaných objektů, implementace serveru https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example12_serializati­on/server.py
7 message.py posílání serializovaných objektů, posílaný objekt https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example12_serializati­on/message.py
       
8 producer.py producent v PUSH-PULL modelu https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example13_push_pull/
8 consumer.py konzument v PUSH-PULL modelu https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example13_push_pull/con­sumer.py
       
9 producer.py producent v modelu fan-out, fan-in https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example14_push_pull_fa­nout_fanin/producer.py
9 consumer.py konzument v modelu fan-out, fan-in https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example14_push_pull_fa­nout_fanin/consumer.py
9 collector.py shromažďovač výsledků v modelu fan-out, fan-in https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example14_push_pull_fa­nout_fanin/collector.py
       
10 producer.py producent na začátku pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/producer.py
10 worker1.py první worker jako součást pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/worker1.py
10 worker2.py druhý worker jako součást pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/worker2.py
10 collector.py shromažďovač výsledků na konci pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/collector.py

Pro úplnost ještě přidejme odkazy na dva patche, kterými je možné upravit knihovnu PyZMQ takovým způsobem, aby bylo možné snadno sledovat uzavírání prostředků context a socket:

19. Odkazy na předchozí části seriálu

V této kapitole jsou uvedeny odkazy na všech šest předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/

20. Odkazy na Internetu

  1. ØMQ – Distributed Messaging
    http://zeromq.org/
  2. ØMQ Community
    http://zeromq.org/community
  3. Get The Software
    http://zeromq.org/intro:get-the-software
  4. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  5. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  6. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  7. ZeroMQ RFC
    https://rfc.zeromq.org/
  8. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  9. zeromq/czmq
    https://github.com/zeromq/czmq
  10. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  11. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  12. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  13. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  14. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  15. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  16. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  17. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  18. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  19. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  20. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  21. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  22. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  23. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  24. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  25. Python binding
    http://zeromq.org/bindings:python
  26. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  27. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  28. About Nanomsg
    https://nanomsg.org/
  29. Advanced Message Queuing Protocol
    https://www.amqp.org/
  30. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  31. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  32. RabbitMQ
    https://www.rabbitmq.com/
  33. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  34. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  35. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  36. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  37. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  38. Erlang
    http://www.erlang.org/
  39. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  40. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  41. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  42. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  43. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  44. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  45. celery na PyPi
    https://pypi.org/project/celery/
  46. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  47. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  48. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  49. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  50. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  51. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  52. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  53. node-celery
    https://github.com/mher/node-celery
  54. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  55. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  56. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  57. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  58. Stránky projektu Redis
    https://redis.io/
  59. Introduction to Redis
    https://redis.io/topics/introduction
  60. Try Redis
    http://try.redis.io/
  61. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  62. Python Redis
    https://redislabs.com/lp/python-redis/
  63. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  64. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  65. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  66. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  67. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  68. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  69. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  70. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  71. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  72. Redis Persistence
    https://redis.io/topics/persistence
  73. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  74. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  75. Ost (knihovna)
    https://github.com/soveran/ost
  76. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  77. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  78. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  79. What Is Sharding?
    https://btcmanager.com/what-sharding/
  80. Redis clients
    https://redis.io/clients
  81. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  82. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  83. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  84. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  85. Resque
    https://github.com/resque/resque
  86. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  87. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  88. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  89. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  90. Pub/Sub
    https://redis.io/topics/pubsub
  91. ZeroMQ distributed messaging
    http://zeromq.org/
  92. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  93. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  94. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  95. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  96. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  97. Redis Protocol specification
    https://redis.io/topics/protocol
  98. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  99. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  100. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  101. Apache Kafka
    https://kafka.apache.org/
  102. Iron
    http://www.iron.io/mq
  103. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  104. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  105. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  106. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  107. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  108. Enqueueing internals
    http://python-rq.org/contrib/
  109. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  110. Queues
    http://queues.io/
  111. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  112. RestMQ
    http://restmq.com/
  113. ActiveMQ
    http://activemq.apache.org/
  114. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  115. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  116. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  117. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  118. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  119. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  120. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  121. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  122. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  123. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  124. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  125. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  126. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  127. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  128. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  129. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  130. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  131. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  132. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  133. Jupyter
    https://jupyter.org/
  134. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  135. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.