Hlavní navigace

Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě

Pavel Tišnovský

Dnes si ukážeme některé další použití knihovny ØMQ. Nejprve dokončíme popis komunikační strategie PUSH-PULL, ukážeme si zařízení PyZMQ a pak si řekneme, jak je možné použít knihovnu ØMQ v programovacím jazyce Java.

Doba čtení: 31 minut

11. Komunikace typu REQ-REP: klient naprogramovaný v Pythonu a server v Javě

12. Otestování komunikace klient-server

13. Portace klienta do Javy

14. Komunikační strategie typu PUSH-PULL: přepis producenta zpráv do Javy

15. Komunikační strategie typu PUSH-PULL: přepis konzumenta zpráv do Javy

16. Rozdělení zátěže mezi větší počet konzumentů (workerů)

17. Repositář s demonstračními příklady

18. Odkazy na předchozí části seriálu

19. Odkazy na Internetu

1. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě

Dnešní článek o knihovně ØMQ je rozdělen na tři části. V úvodní části dokončíme popis využití komunikační strategie typu PUSH-PULL, s níž jsme se již v tomto seriálu několikrát setkali, ovšem prozatím jsme si neukázali všechny možnosti, které nám tato strategie nabízí. Na tuto část navážeme popisem vlastností zařízení PyZMQ (PyZMQ devices), která se používají jednodušeji, než zařízení samotné knihovny ØMQ (ØMQ devices). A konečně bude závěrečná část věnována základním způsobům použití knihovny ØMQ, přesněji řečeno její varianty nazvané JeroMQ, z programovacího jazyka Java. Díky závěrečné části článku se nám tedy podaří propojit témata tohoto seriálu a poněkud nepravidelně vycházejících článků o Javě a o virtuálním stroji Javy.

2. Pipeline využívající strategii PUSH-PULL

V této kapitole se na chvíli vrátíme k architektuře používající komunikační strategii PUSH-PULL.

Obrázek 1: Jednosměrná komunikace využívající strategii PUSH-PULL.

Minule jsme si ukázali, jakým způsobem je možné implementovat rozložení zátěže (úkolů) mezi větší množství workerů pomocí fan-out (fanout) a následně všechny výsledky práce workerů sloučit a popř. dále zpracovat (uložit atd.) v jediném uzlu (fan-in).

Obrázek 2: Využití většího množství workerů pomocí fan-in a fan-out.

Ovšem s využitím strategie PUSH-PULL je možné realizovat i „kolonu“ (pipeline) složenou z většího množství workerů, kteří si navzájem posílají výsledky své práce. Pojmenování pipeline je v tomto případě příznačné, protože prakticky tu stejnou funkci nám poskytuje shell při použití rour a FIFO. Ukažme si nyní, jak je možné realizovat relativně jednoduchou a přímočarou pipeline, v níž existuje jeden zdroj úkolů, workery dvou typů a na konci sběratel (collector) výsledků:

Obrázek 3: Jednoduchá pipeline.

Ve výše zobrazeném schématu se prozatím nenachází žádné zařízení, což je obecně poněkud problematické, protože to znamená, že jeden z workerů musí vystupovat v roli serveru (a přitom víme, že workeři jsou obecně nestabilním prvkem celého systému, jelikož je typicky připojujeme a odpojujeme podle aktuální zátěže, workeři mohou být restartováni po pádu, po zjištění velké spotřeby paměti atd.).

Implementace uzlu, který vytváří úkoly (job, task) pro jednotlivé workery a používá ve svém zdrojovém kódu dekorátory, vypadá následovně:

import zmq
from zmq.decorators import context, socket
from time import sleep
 
 
CONNECTION_TYPE = zmq.PUSH
PORT = 5556
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_producer(context, socket):
    """Spuštění serveru."""
 
    address = "tcp://*:{port}".format(port=PORT)
    # socket.set_hwm(1)
    socket.bind(address)
    print("Bound to address {a}".format(a=address))
 
    for i in range(11):
        send_message(socket, "Message #{i}".format(i=i))
        sleep(0.1)
 
 
start_producer()

Implementace prvního workera, který je umístěn do levé části pipeline:

import zmq
from zmq.decorators import context, socket
from os import getpid
 
 
IN_PORT = 5556
OUT_PORT = 5557
 
 
@context()
@socket(zmq.PULL)
@socket(zmq.PUSH)
def start_worker(context, in_socket, out_socket):
    """Spuštění workera."""
 
    address = "tcp://localhost:{port}".format(port=IN_PORT)
    in_socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    address = "tcp://*:{port}".format(port=OUT_PORT)
    out_socket.bind(address)
    print("And to {a}".format(a=address))
 
    print("Waiting for message from producer...")
    pid = getpid()
 
    while True:
        message = in_socket.recv_string()
        out_message = "Message from {pid}: '{m}'".format(pid=pid, m=message)
        print(out_message)
        out_socket.send_string(out_message)
 
 
start_worker()

Implementace druhého workera, který je umístěn do pravé části pipeline:

import zmq
from zmq.decorators import context, socket
from os import getpid
 
 
IN_PORT = 5557
OUT_PORT = 5558
 
 
@context()
@socket(zmq.PULL)
@socket(zmq.PUSH)
def start_worker(context, in_socket, out_socket):
    """Spuštění workera."""
 
    address = "tcp://localhost:{port}".format(port=IN_PORT)
    in_socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    address = "tcp://localhost:{port}".format(port=OUT_PORT)
    out_socket.connect(address)
    print("And to {a}".format(a=address))
 
    print("Waiting for message from worker #1...")
    pid = getpid()
 
    while True:
        message = in_socket.recv_string()
        out_message = "Message from {pid}: '{m}'".format(pid=pid, m=message)
        print(out_message)
        out_socket.send_string(out_message)
 
 
start_worker()

A nakonec si ukažme implementaci sběratele výsledků:

import zmq
from zmq.decorators import context, socket
from time import sleep
 
 
CONNECTION_TYPE = zmq.PULL
PORT = 5558
 
 
@context()
@socket(CONNECTION_TYPE)
def start_collector(context, socket):
    """Spuštění sběratele."""
 
    address = "tcp://*:{port}".format(port=PORT)
    socket.bind(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message from worker #2 ...")
    while True:
        message = socket.recv_string()
        print("Collecting message: '{m}'".format(m=message))
        sleep(0)
 
 
start_collector()
Poznámka: povšimněte si, že se používají tři porty:
  1. 5556: od producenta k prvnímu workeru
  2. 5557: od prvního workera ke druhému workeru
  3. 5558: od druhého workera k procesu, který výsledky zpracuje

3. Zařízení typu Streamer

Posledním typem zařízení, s nímž se v dnešním článku seznámíme, je zařízení nazvané ZMQ.STREAMER. Toto zařízení se používá ve chvíli, kdy spolu jednotlivé uzly komunikují s využitím strategie PUSH-PULL. Streamer se do celého procesu zpracování přidává jednoduše, protože se na přední straně (frontend) chová jako běžný příjemze zpráv (PULL) a na zadní straně (backend) jako jejich zdroj. Na rozdíl od zařízení Queue tedy nebylo nutné přidat nové typy socketů typu XREP a XREQ:

Obrázek 4: Použití zařízení typu Streamer.

Opět si ukažme princip použití tohoto zařízení. Nejprve vytvoříme skripty se zdrojem zpráv a s jejich cílovým zpracováním. V obou skriptech se bude používat metoda Socket.connect, nikoli Socket.bind. Navíc se budou používat odlišná čísla portů, protože mezi producentem a konzumentem bude umístěno právě zařízení typu Streamer.

Implementace producenta zpráv:

import zmq
from zmq.decorators import context, socket
import time
 
 
CONNECTION_TYPE = zmq.PUSH
PORT = 5550
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@context()
@socket(CONNECTION_TYPE)
def start_producer(port, context, socket):
    """Spuštění serveru."""
 
    address = "tcp://localhost:{port}".format(port=port)
    # socket.set_hwm(1)
    socket.connect(address)
    print("Connected to address {a}".format(a=address))
 
    for i in range(100):
        send_message(socket, "Message #{i}".format(i=i))
        time.sleep(0.2)
 
 
start_producer(PORT)

Implementace konzumenta zpráv:

import zmq
from zmq.decorators import context, socket
import time
 
 
CONNECTION_TYPE = zmq.PULL
PORT = 5551
 
 
@context()
@socket(CONNECTION_TYPE)
def start_consumer(port, context, socket):
    """Spuštění konzumenta."""
 
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message...")
    cnt = 0
    while True:
        message = socket.recv_string()
        cnt += 1
        print("Received message {c} of 100: '{m}'".format(c=cnt, m=message))
        time.sleep(0)
 
 
start_consumer(PORT)

4. Připojení k zařízení typu Streamer

Vlastní implementace streameru je vlastně velmi jednoduchá a prakticky se neliší například od implementace zařízení typu Queue. Ostatně se můžeme podívat na zdrojový kód skriptu s implementací, v němž se na portu 5550 navazuje připojení typu PULL a na portu 5551 připojení typu PUSH, tj. přesně tak, jak to bylo naznačeno na čtvrtém obrázku. Na obou stranách se používá metoda Socket.bind, protože streamer bude ústředním a stabilním prvkem celé architektury:

import zmq
from zmq.decorators import context, socket
 
PULL_PORT = 5550
PUSH_PORT = 5551
 
 
@context()
@socket(zmq.PULL)
@socket(zmq.PUSH)
def create_streamer(pull_port, push_port, context, frontend, backend):
    """Vytvoření streameru."""
    context = zmq.Context()
 
    address = "tcp://*:{port}".format(port=pull_port)
    frontend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=pull_port))
 
    address = "tcp://*:{port}".format(port=push_port)
    backend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=push_port))
 
    zmq.device(zmq.STREAMER, frontend, backend)
 
 
create_streamer(PULL_PORT, PUSH_PORT)

5. Zařízení PyZMQ

V předchozím článku jsme se mj. zabývali i popisem takzvaných zařízení ØMQ. Připomeňme si, že samotná knihovna ØMQ nabízí vývojářům následující zařízení, která se většinou používají při návrhu systémů se složitější architekturou a s požadavkem na jejich větší robustnost:

Zařízení Stručný popis
ZMQ.QUEUE prostředník používaný především v komunikaci REQ-REP (klasický klient, server)
ZMQ.FORWARDER používá se jako prostředník mezi zdroji zpráv a jejich příjemci PUB-SUB
ZMQ.STREAMER používá se v komunikační strategii PUSH-PULL

Knihovna PyZMQ, která slouží jako rozhraní mezi nativní knihovnou ØMQ a programovacím jazykem Python, programátorům nabízí ještě jednu možnost – takzvaná zařízení PyZMQ (PyZMQ devices). Jejich základní význam je stejný, jako u zařízení ØMQ, ovšem zatímco u zařízení ØMQ jsme museli explicitně otevírat (popř. i zavírat) sockety, u zařízení PyZMQ se pracuje pouze s typy socketů. Všechny operace typu vytvoření socketu, vytvoření zařízení ØMQ atd., knihovna PyZMQ udělá za vývojáře sama, takže výsledný zdrojový kód je mnohem jednodušší a taktéž idiomatičtější (navíc se v naprosté většině případů vyhneme i použití dekorátorů, dokonce ani není nutné explicitně vytvářet instanci kontextu ØMQ).

Navíc je kód reprezentující zařízení spuštěn v samostatném (automaticky vytvořeném) vláknu, což znamená, že budeme moci velmi snadno v jednom skriptu implementovat jak příslušné zařízení, tak například i server či producenta. Z dalších pohledů však zařízení PyZMQ svými vlastnostmi plně odpovídají zařízením ØMQ:

Zařízení ØMQ Zařízení PyZMQ
zmq.device(zmq.QUEUE, socket, socket) ProcessDevice(zmq.QUEUE, typ_socketu, typ_socketu)
zmq.device(zmq.FORWARDER, socket, socket) ProcessDevice(zmq.FORWARDER, typ_socketu, typ_socketu)
zmq.device(zmq.STREAMER, socket, socket) ProcessDevice(zmq.STREAMER, typ_socketu, typ_socketu)

6. Základní rozdíly mezi zařízeními 0MQ a PyZMQ

Podívejme se nyní na praktické rozdíly mezi zařízeními knihovny ØMQ a zařízeními rozhraní PyZMQ z pohledu běžného vývojáře.

Nejdříve si připomeňme, jakým způsobem jsme použili zařízení Streamer, které se používá při komunikaci typu PUSH-PULL. Toto zařízení bude vloženo mezi producenta zpráv a mezi jejich konzumenta.

Samotnou implementaci meziuzlu reprezentovaného zařízením Streamer můžeme přepsat s využitím zařízení PyZMQ. Nejprve musíme importovat třídu ProcessDevice z modulu zmq.devices.basedevice:

from zmq.devices.basedevice import ProcessDevice

Dále zkonstruujeme instanci třídy ProcessDevice, přičemž budeme muset specifikovat jak typ zařízení (Streamer), tak i typ socketu na frontendu i na backendu (skutečně se specifikují pouze typy socketů, nikoli samotné sockety – ty nemusíme explicitně vytvářet):

streamer_device = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)

Jakmile je zařízení vytvořeno, musíme nastavit adresy, na nichž zařízení typu Streamer naslouchá. K tomu se používají metody nazvané bind_in a bind_out:

frontend_address = "tcp://*:{port}".format(port=pull_port)
backend_address = "tcp://*:{port}".format(port=push_port)
 
streamer_device.bind_in(frontend_address)
streamer_device.bind_out(backend_address)

Nakonec pouze zařízení spustíme, takže se interně v novém vláknu (tedy z našeho pohledu na pozadí) spustí nekonečná smyčka zpracovávající zprávy přicházející na frontend a posílající zprávy na backend:

streamer_device.start()

Úplný zdrojový kód meziuzlu se zařízením typu Streamer a současně i s producentem zpráv vypadá takto:

import zmq
from zmq.decorators import context, socket
from zmq.devices.basedevice import ProcessDevice
import time
 
PULL_PORT = 5550
PUSH_PORT = 5551
 
PRODUCER_CONNECTION_TYPE = zmq.PUSH
PRODUCER_PORT = 5550
 
 
def create_streamer(pull_port, push_port):
    """Vytvoření streameru."""
    streamer_device = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
 
    frontend_address = "tcp://*:{port}".format(port=pull_port)
    backend_address = "tcp://*:{port}".format(port=push_port)
 
    streamer_device.bind_in(frontend_address)
    streamer_device.bind_out(backend_address)
 
    print("Bound to {a} on port {p}".format(a=frontend_address, p=pull_port))
    print("Bound to {a} on port {p}".format(a=backend_address, p=push_port))
 
    streamer_device.start()
    print("Device started in background")
 
 
def send_message(socket, message):
    """Poslání zprávy."""
    print("Sending message '{m}'".format(m=message))
    socket.send_string(message)
 
 
@socket(PRODUCER_CONNECTION_TYPE)
def start_producer(port, context, socket):
    """Spuštění zdroje zprav."""
 
    address = "tcp://localhost:{port}".format(port=port)
    # socket.set_hwm(1)
    socket.connect(address)
    print("Connected to address {a}".format(a=address))
 
    for i in range(100):
        send_message(socket, "Message #{i}".format(i=i))
        time.sleep(0.2)
 
 
@context()
def start_device_and_producer(context):
    create_streamer(PULL_PORT, PUSH_PORT)
    start_producer(PRODUCER_PORT, context)
 
 
start_device_and_producer()

Kód konzumenta se žádným způsobem nezmění, takže jen pro úplnost:

import zmq
from zmq.decorators import context, socket
import time
 
 
CONNECTION_TYPE = zmq.PULL
PORT = 5551
 
 
@context()
@socket(CONNECTION_TYPE)
def start_consumer(port, context, socket):
    """Spuštění konzumenta."""
 
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
 
    print("Waiting for message...")
    cnt = 0
    while True:
        message = socket.recv_string()
        cnt += 1
        print("Received message {c} of 100: '{m}'".format(c=cnt, m=message))
        time.sleep(0)
 
 
start_consumer(PORT)

7. PyZMQ zařízení typu Queue versus ØMQ Queue

Druhým typem zařízení rozhraní PyZMQ, s nímž se dnes seznámíme, je zařízení typu Queue, které se využívá především v komunikační strategii REP-REQ, s níž jsme se již taktéž setkali. Opět si nejdříve ukažme implementaci jednoduchého serveru a klienta, mezi něž je vložena fronta vytvořená klasicky pomocí funkcí 0MQ:

Server vypadal takto:

import zmq
from math import factorial
 
PORT = 5557
 
 
def connect(port, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    context = zmq.Context()
    socket = context.socket(connection_type)
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
    return socket
 
 
def send_response(socket, response):
    """Odeslání odpovědi."""
    print("Sending response '{r}'".format(r=response))
    socket.send_string(response)
 
 
def receive_request(socket):
    """Zpracování požadavku klienta."""
    request = socket.recv_string()
    print("Received request from client: '{r}'".format(r=request))
    return request
 
 
def start_server():
    """Spuštění serveru."""
    socket = connect(PORT, zmq.REP)
    while True:
        request = receive_request(socket)
        try:
            n = int(request)
            fact = factorial(n)
            send_response(socket, "{n}! = {f}".format(n=n, f=fact))
        except Exception as e:
            send_response(socket, "Wrong input")
 
 
start_server()

Zdrojový kód klienta:

import zmq
 
PORT = 5556
 
 
def connect(port, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    context = zmq.Context()
    socket = context.socket(connection_type)
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
    return socket
 
 
def send_request(socket, request):
    """Poslání požadavku."""
    print("Sending request '{r}'".format(r=request))
    socket.send_string(request)
 
 
def start_client():
    """Spuštění klienta."""
    socket = connect(PORT, zmq.REQ)
 
    send_request(socket, "1")
    print(socket.recv_string())
    print()
 
    send_request(socket, "10")
    print(socket.recv_string())
    print()
 
    send_request(socket, "xyzzy")
    print(socket.recv_string())
    print()
 
    send_request(socket, "-10")
    print(socket.recv_string())
    print()
 
    send_request(socket, "100")
    print(socket.recv_string())
    print()
 
 
start_client()

A konečně vlastní zařízení typu Queue:

import zmq
 
XREP_PORT = 5556
XREQ_PORT = 5557
 
 
def create_queue(xrep_port, xreq_port):
    """Vytvoření fronty."""
    context = zmq.Context()
 
    frontend = context.socket(zmq.XREP)
    address = "tcp://*:{port}".format(port=xrep_port)
    frontend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=xrep_port))
 
    backend = context.socket(zmq.XREQ)
    address = "tcp://*:{port}".format(port=xreq_port)
    backend.bind(address)
    print("Bound to {a} on port {p}".format(a=address, p=xreq_port))
 
    zmq.device(zmq.QUEUE, frontend, backend)
 
 
create_queue(XREP_PORT, XREQ_PORT)

8. Úprava fronty takovým způsobem, aby se použilo zařízení PyZMQ

Úprava při použití zařízení PyZMQ bude spočívat se sloučení kódu klienta a fronty do následujícího skriptu:

import zmq
from zmq.decorators import context
from zmq.devices.basedevice import ProcessDevice
from math import factorial
 
XREP_PORT = 5556
XREQ_PORT = 5557
 
SERVER_PORT = 5557
 
 
def create_queue(xrep_port, xreq_port):
    """Vytvoření fronty."""
    queue_device = ProcessDevice(zmq.QUEUE, zmq.XREP, zmq.XREQ)
 
    frontend_address = "tcp://*:{port}".format(port=xrep_port)
    backend_address = "tcp://*:{port}".format(port=xreq_port)
 
    queue_device.bind_in(frontend_address)
    queue_device.bind_out(backend_address)
 
    print("Bound to {a} on port {p}".format(a=frontend_address, p=xrep_port))
    print("Bound to {a} on port {p}".format(a=backend_address, p=xreq_port))
 
    queue_device.start()
    print("Queue device started in background")
 
 
def connect(context, port, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    socket = context.socket(connection_type)
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
    return socket
 
 
def send_response(socket, response):
    """Odeslání odpovědi."""
    print("Sending response '{r}'".format(r=response))
    socket.send_string(response)
 
 
def receive_request(socket):
    """Zpracování požadavku klienta."""
    request = socket.recv_string()
    print("Received request from client: '{r}'".format(r=request))
    return request
 
 
def start_server(context, port):
    """Spuštění serveru."""
    socket = connect(context, port, zmq.REP)
    while True:
        request = receive_request(socket)
        try:
            n = int(request)
            fact = factorial(n)
            send_response(socket, "{n}! = {f}".format(n=n, f=fact))
        except Exception as e:
            send_response(socket, "Wrong input")
 
 
@context()
def start_device_and_server(context):
    create_queue(XREP_PORT, XREQ_PORT)
    start_server(context, SERVER_PORT)
 
 
start_device_and_server()

9. Kooperace mezi 0MQ a aplikacemi naprogramovanými v Javě

V závěrečné části dnešního článku si ukážeme, jakým způsobem je možné ØMQ využít v Javě a (nepřímo) i v dalších programovacích jazycích, které jsou postaveny nad virtuálním strojem Javy (Groovy, Scale, Clojure, pravděpodobně i dnes již poněkud obstarožní Jython atd.). Připomeňme si, že ØMQ je naprogramována v jazyce C++ a existuje pro ni nativní rozhraní jak pro C, tak i pro C++. Díky tomu je relativně přímočaré použít rozhraní JNI (Java Native Interface) pro volání nativního kódu z Javy. Tento přístup je použit v knihovně JZMQ, jejíž zdrojové kódy naleznete v GitHub repositáři https://github.com/zeromq/jzmq. Předností tohoto způsobu je automaticky zaručená kompatibilita s ostatními aplikacemi a nástroji, které ØMQ používají. Nevýhodou pak především složitější překlad a instalace JZMQ (ostatně většinou se při kombinaci nativních knihoven a JVM dostaneme do podobné situace).

10. Knihovna JeroMQ

Pokud vám z nějakého důvodu knihovna JZMQ nebude vyhovovat, například při požadavku na to, že se může nasazovat jen bajtkód JVM, můžete využít alternativní knihovnu nazvanou JeroMQ. Tato knihovna, jejíž repositář je umístěn na adrese https://github.com/zeromq/jeromq, reimplementuje celou infrastrukturu ØMQ, ovšem na rozdíl od jazyka C++ je použita Java. Tím se podařilo obejít JNI, zjednodušit celý proces překladu a instalace knihovny atd. Možná poněkud paradoxní je, že výsledek nemusí být pomalejší, než nativní varianta ØMQ, a to z toho důvodu, že JNI je samo o sobě dosti problematické a pomalé, takže záleží především na tom, jak často se nativní funkce volají a jak velké datové struktury se jim předávají. Viz též několik porovnání dostupných na wiki https://github.com/zeromq/je­romq/wiki/Performance.

Knihovnu JeroMQ můžeme buď přidat do svého pom.xml (pokud používáte Maven popř. systém postavený na Mavenu), popř. si pro pouhé otestování můžete stáhnout již připravený java archiv s přeloženou knihovnou:

wget https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/jeromq-0.4.4-20180323.141003-1.jar

Tento druhý způsob využijeme v dalších příkladech.

11. Komunikace typu REQ-REP: klient naprogramovaný v Pythonu a server v Javě

Ukažme si nyní, jak je možné komunikovat mezi klientem naprogramovaným v Pythonu a používajícím nativní ØMQ na jedné straně, se serverem naprogramovaným v Javě a používajícím JeroMQ na straně druhé. Využijeme základní komunikační strategii REQ-REP, s níž jsme již dobře seznámeni.

Jak jsme si již řekli v úvodním odstavci, je klient vytvořen v Pythonu a prozatím v něm nenajdeme žádnou novou funkcionalitu:

import zmq
 
 
def connect(port, connection_type):
    """Otevření socketu se specifikovaným typem spojení."""
    context = zmq.Context()
    socket = context.socket(connection_type)
    address = "tcp://localhost:{port}".format(port=port)
    socket.connect(address)
    print("Connected to {a}".format(a=address))
    return socket
 
 
def send_request(socket, request):
    """Poslání požadavku."""
    print("Sending request '{r}'".format(r=request))
    socket.send_string(request)
 
 
def start_client():
    """Spuštění klienta."""
    socket = connect(5555, zmq.REQ)
 
    send_request(socket, "1")
    print(socket.recv_string())
    print()
 
    send_request(socket, "10")
    print(socket.recv_string())
    print()
 
    send_request(socket, "xyzzy")
    print(socket.recv_string())
    print()
 
 
start_client()

Naproti tomu pro implementaci serveru použijeme Javu. Server bude přijímat požadavky a pokud bude v požadavku předáno celé číslo (zapsané do řetězce), vypočte server faktoriál tohoto čísla a vrátí výsledek. V případě jakéhokoli problému se vrátí zpráva s informací o tom, proč k problému došlo. Zdrojový kód serveru může vypadat následovně:

import org.zeromq.ZMQ;
import org.zeromq.ZContext;
 
public class Server
{
    public static long factorial(long n)
    {
        int i, result=1;
        for(i=2; i<=n; i++) {
            result *= i;
        }
        return result;
    }
 
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket socket = context.createSocket(ZMQ.REP);
            socket.bind("tcp://*:5555");
 
            while (!Thread.currentThread().isInterrupted()) {
                byte[] raw_request = socket.recv(0);
                String request = new String(raw_request, ZMQ.CHARSET);
                String response = null;
 
                System.out.println("Received: " + request);
 
                try {
                    long n = Integer.parseInt(request);
                    if (n < 0) {
                        response = "Invalid input!";
                    } else {
                        response = n + "! = " + Server.factorial(n);
                    }
                }
                catch (Exception e) {
                    response = "Wrong input!";
                }
 
                socket.send(response.getBytes(ZMQ.CHARSET), 0);
            }
        }
    }
}

Oproti kódu naprogramovanému v Pythonu je zde provedeno několik změn, které se týkají kódování a dekódování řetězců. Musíme mít totiž na paměti, že v ØMQ se přenáší zprávy obsahující pouze sekvenci bajtů a o případnou interpretaci těchto bajtů se musíme postarat sami. Týká se to především interpretace konce řetězce a potom taktéž kódování, resp. přesněji řečeno mapování mezi bajty a znaky (typicky v Unicode). Proto můžeme v kódu výše vidět dekódování přijatého požadavku a na konci kódování řetězce se zprávou zpět na pole bajtů (byte[]).

12. Otestování komunikace klient-server

V případě, že jste si již v rámci předchozí kapitoly stáhli java archiv s přeloženou knihovnou JeroMQ, může překlad serveru proběhnout takto:

javac -cp jeromq-0.4.4-20180323.141003-1.jar Server.java

Spuštění serveru:

java -cp .:jeromq-0.4.4-20180323.141003-1.jar Server

Nyní můžeme spustit klienta a sledovat, že se serverem skutečně dokáže bez problémů komunikovat, i když se na každé komunikující straně používá zcela odlišná implementace ØMQ:

$ python3 client.py
 
Connected to tcp://localhost:5555
Sending request '1'
1! = 1
 
Sending request '10'
10! = 3628800
 
Sending request 'xyzzy'
Wrong input!

Logy na straně serveru:

Received: 1
Received: 10
Received: xyzzy

13. Portace klienta do Javy

Samozřejmě nám nic nebrání přepsat do Javy i samotného klienta. Jedna z možných reimplementací může vypadat následovně:

import org.zeromq.ZMQ;
import org.zeromq.ZContext;
 
public class Client
{
 
    static void send_request(ZMQ.Socket socket, int number)
    {
        String request = String.valueOf(number);
 
        socket.send(request.getBytes(ZMQ.CHARSET), 0);
        System.out.println("Sent: " + request);
 
        byte[] raw_response = socket.recv(0);
        String response = new String(raw_response, ZMQ.CHARSET);
        System.out.println("Received from server: " + response);
    }
 
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket socket = context.createSocket(ZMQ.REQ);
            socket.connect("tcp://localhost:5555");
            for (int i=0; i<10; i++) {
                send_request(socket, i);
            }
            send_request(socket, -10);
        }
    }
}

Překlad klienta:

java -cp .:jeromq-0.4.4-20180323.141003-1.jar Client

Spuštění nové verze klienta:

java -cp .:jeromq-0.4.4-20180323.141003-1.jar Client

14. Komunikační strategie typu PUSH-PULL: přepis producenta zpráv do Javy

Další příklad, který přepíšeme z Pythonu do Javy, se skládá z producenta a konzumenta zpráv, kteří mezi sebou komunikují s využitím strategie PUSH-PULL, kterou jsme si připomněli mj. i v úvodních kapitolách. Javovská implementace producenta zpráv vznikla prakticky přímým přepisem původního kódu napsaného v Pythonu (samozřejmě až na větší „ukecanost“Javy a striktní typování):

import org.zeromq.ZMQ;
import org.zeromq.ZContext;
 
public class Producer
{
    static final int PORT = 5555;
 
    static void send_message(ZMQ.Socket socket, String message)
    {
        socket.send(message.getBytes(ZMQ.CHARSET), 0);
    }
 
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket socket = context.createSocket(ZMQ.PUSH);
 
            String address = "tcp://*:" + PORT;
            socket.bind(address);
            System.out.println("Bound to address " + address);
 
            for (int i=0; i<100; i++) {
                String message = "Messsage #" + i;
                send_message(socket, message);
                try {
                    Thread.sleep(50);
                }
                catch (InterruptedException e) {
                    System.out.println("Interrupted");
                    return;
                }
            }
        }
    }
}

Spuštění producenta se nebude nijak lišit od předchozích implementací serveru či klienta – pouze musíme zajistit, aby na classpath byl uložen mj. i Java archiv s knihovnou JeroMQ:

java -cp .:jeromq-0.4.4-20180323.141003-1.jar Producer

15. Komunikační strategie typu PUSH-PULL: přepis konzumenta zpráv do Javy

I při přepisu konzumenta zpráv jsme vycházeli z originálního skriptu vytvořeného v Pythonu. Povšimněte si, že se v javovské verzi konzumenta opět převádí pole bajtů na řetězec:

import org.zeromq.ZMQ;
import org.zeromq.ZContext;
 
public class Consumer
{
    static final int PORT = 5555;
 
    static void receive_messages(ZMQ.Socket socket)
    {
        int cnt = 0;
        while (true) {
            byte[] raw_message = socket.recv(0);
            cnt++;
            String message = new String(raw_message, ZMQ.CHARSET);
            System.out.println("Received message: " + message);
        }
    }
 
    public static void main(String[] args) throws Exception
    {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket socket = context.createSocket(ZMQ.PULL);
 
            String address = "tcp://localhost:" + PORT;
            socket.connect(address);
            System.out.println("Connected to " + address);
 
            System.out.println("Waiting for message...");
 
            receive_messages(socket);
        }
    }
}

Spuštění konzumenta:

java -cp .:jeromq-0.4.4-20180323.141003-1.jar Consumer

Ukázka vzájemného posílání zpráv od producenta ke konzumentovi:

$ ./run_producer.sh 
Bound to address tcp://*:5555
$ ./run_consumer.sh 
Connected to tcp://localhost:5555
Waiting for message...
Received message: Messsage #0
Received message: Messsage #1
Received message: Messsage #2
...
...
...
Received message: Messsage #97
Received message: Messsage #98
Received message: Messsage #99

16. Rozdělení zátěže mezi větší počet konzumentů (workerů)

Nic nám samozřejmě nebrání spustit větší množství konzumentů zpráv (workerů) a tím pádem rozdělit zátěž – úkoly vytvářené producentem. Zkusme si například pustit tři workery a sledovat algoritmus round robin v praxi:

Worker #1:

$ ./run_consumer.sh 
 
Connected to tcp://localhost:5555
Waiting for message...
Received message: Messsage #0
Received message: Messsage #1
Received message: Messsage #4
Received message: Messsage #7
...
...
...
Received message: Messsage #91
Received message: Messsage #94
Received message: Messsage #97

Worker #2:

$ ./run_consumer.sh 
 
Connected to tcp://localhost:5555
Waiting for message...
Received message: Messsage #2
Received message: Messsage #5
Received message: Messsage #8
...
...
...
Received message: Messsage #92
Received message: Messsage #95
Received message: Messsage #98

Worker #3:

$ ./run_consumer.sh 
 
Connected to tcp://localhost:5555
Waiting for message...
Received message: Messsage #3
Received message: Messsage #6
Received message: Messsage #9
...
...
...
Received message: Messsage #93
Received message: Messsage #96
Received message: Messsage #99

17. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v programovacím jazyku C byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta
1 producer.py producent na začátku pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/producer.py
1 worker1.py první worker jako součást pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/worker1.py
1 worker2.py druhý worker jako součást pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/worker2.py
1 collector.py shromažďovač výsledků na konci pipeline https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example15_push_pull_pi­peline/collector.py
       
2 streamer.py použití zařízení typu streamer https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/streamer.py
2 producer.py komunikační strategie PUSH-PULL, producent https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/producer.py
2 consumer.py komunikační strategie PUSH-PULL, konzument https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example23_push_pull_stre­amer/consumer.py
       
3 streamer.py použití zařízení typu streamer (producent a streamer současně) https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example24_push_pull_stre­amer_device/streamer.py
3 consumer.py komunikační strategie PUSH-PULL, konzument https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example24_push_pull_stre­amer_device/consumer.py
       
4 server.py použití zařízení typu queue (server a queue současně) https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example25_queue_device/ser­ver.py
4 client.py komunikační strategie REQ-REP, klient https://github.com/tisnik/message-queues-examples/blob/master/0mq/pyt­hon/example25_queue_device/cli­ent.py
       
5 client.py klient napsaný v Pythonu (REQ-REP) https://github.com/tisnik/message-queues-examples/blob/master/0mq/ja­va/client.py
5 Server.java server napsaný v Javě (REQ-REP) https://github.com/tisnik/message-queues-examples/blob/master/0mq/ja­va/Server.java
       
6 Client.java klient napsaný v Javě (REQ-REP) https://github.com/tisnik/message-queues-examples/blob/master/0mq/java/req-rep-pure-java/Client.java
6 Server.java server napsaný v Javě (REQ-REP) https://github.com/tisnik/message-queues-examples/blob/master/0mq/java/req-rep-pure-java/Server.java
       
7 Producer.java producent zpráv naprogramovaný v Javě https://github.com/tisnik/message-queues-examples/blob/master/0mq/java/push-pull/Producer.java
7 Consumer.java konzument zpráv naprogramovaný v Javě https://github.com/tisnik/message-queues-examples/blob/master/0mq/java/push-pull/Consumer.java

18. Odkazy na předchozí části seriálu

V této kapitole jsou uvedeny odkazy na všech osm předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/

19. Odkazy na Internetu

  1. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  2. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  3. ØMQ – Distributed Messaging
    http://zeromq.org/
  4. ØMQ Community
    http://zeromq.org/community
  5. Get The Software
    http://zeromq.org/intro:get-the-software
  6. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  7. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  8. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  9. ZeroMQ RFC
    https://rfc.zeromq.org/
  10. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  11. zeromq/czmq
    https://github.com/zeromq/czmq
  12. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  13. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  14. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  15. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  16. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  17. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  18. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  19. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  20. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  21. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  22. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  23. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  24. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  25. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  26. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  27. Python binding
    http://zeromq.org/bindings:python
  28. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  29. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  30. About Nanomsg
    https://nanomsg.org/
  31. Advanced Message Queuing Protocol
    https://www.amqp.org/
  32. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  33. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  34. RabbitMQ
    https://www.rabbitmq.com/
  35. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  36. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  37. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  38. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  39. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  40. Erlang
    http://www.erlang.org/
  41. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  42. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  43. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  44. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  45. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  46. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  47. celery na PyPi
    https://pypi.org/project/celery/
  48. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  49. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  50. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  51. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  52. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  53. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  54. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  55. node-celery
    https://github.com/mher/node-celery
  56. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  57. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  58. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  59. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  60. Stránky projektu Redis
    https://redis.io/
  61. Introduction to Redis
    https://redis.io/topics/introduction
  62. Try Redis
    http://try.redis.io/
  63. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  64. Python Redis
    https://redislabs.com/lp/python-redis/
  65. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  66. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  67. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  68. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  69. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  70. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  71. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  72. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  73. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  74. Redis Persistence
    https://redis.io/topics/persistence
  75. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  76. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  77. Ost (knihovna)
    https://github.com/soveran/ost
  78. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  79. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  80. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  81. What Is Sharding?
    https://btcmanager.com/what-sharding/
  82. Redis clients
    https://redis.io/clients
  83. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  84. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  85. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  86. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  87. Resque
    https://github.com/resque/resque
  88. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  89. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  90. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  91. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  92. Pub/Sub
    https://redis.io/topics/pubsub
  93. ZeroMQ distributed messaging
    http://zeromq.org/
  94. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  95. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  96. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  97. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  98. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  99. Redis Protocol specification
    https://redis.io/topics/protocol
  100. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  101. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  102. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  103. Apache Kafka
    https://kafka.apache.org/
  104. Iron
    http://www.iron.io/mq
  105. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  106. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  107. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  108. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  109. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  110. Enqueueing internals
    http://python-rq.org/contrib/
  111. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  112. Queues
    http://queues.io/
  113. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  114. RestMQ
    http://restmq.com/
  115. ActiveMQ
    http://activemq.apache.org/
  116. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  117. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  118. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  119. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  120. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  121. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  122. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  123. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  124. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  125. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  126. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  127. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  128. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  129. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  130. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  131. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  132. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  133. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  134. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  135. Jupyter
    https://jupyter.org/
  136. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  137. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html
Našli jste v článku chybu?