Dnes si ukážeme některé další použití knihovny ØMQ. Nejprve dokončíme popis komunikační strategie PUSH-PULL, ukážeme si zařízení PyZMQ a pak si řekneme, jak je možné použít knihovnu ØMQ v programovacím jazyce Java.
Dnes si ukážeme některé další použití knihovny ØMQ. Nejprve dokončíme popis komunikační strategie PUSH-PULL, ukážeme si zařízení PyZMQ a pak si řekneme, jak je možné použít knihovnu ØMQ v programovacím jazyce Java.
1. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
2. Pipeline využívající strategii PUSH-PULL
4. Připojení k zařízení typu Streamer
6. Základní rozdíly mezi zařízeními 0MQ a PyZMQ
7. PyZMQ zařízení typu Queue versus ØMQ Queue
8. Úprava fronty takovým způsobem, aby se použilo zařízení PyZMQ
11. Komunikace typu REQ-REP: klient naprogramovaný v Pythonu a server v Javě
12. Otestování komunikace klient-server
14. Komunikační strategie typu PUSH-PULL: přepis producenta zpráv do Javy
15. Komunikační strategie typu PUSH-PULL: přepis konzumenta zpráv do Javy
16. Rozdělení zátěže mezi větší počet konzumentů (workerů)
17. Repositář s demonstračními příklady
18. Odkazy na předchozí části seriálu
Dnešní článek o knihovně ØMQ je rozdělen na tři části. V úvodní části dokončíme popis využití komunikační strategie typu PUSH-PULL, s níž jsme se již v tomto seriálu několikrát setkali, ovšem prozatím jsme si neukázali všechny možnosti, které nám tato strategie nabízí. Na tuto část navážeme popisem vlastností zařízení PyZMQ (PyZMQ devices), která se používají jednodušeji, než zařízení samotné knihovny ØMQ (ØMQ devices). A konečně bude závěrečná část věnována základním způsobům použití knihovny ØMQ, přesněji řečeno její varianty nazvané JeroMQ, z programovacího jazyka Java. Díky závěrečné části článku se nám tedy podaří propojit témata tohoto seriálu a poněkud nepravidelně vycházejících článků o Javě a o virtuálním stroji Javy.
V této kapitole se na chvíli vrátíme k architektuře používající komunikační strategii PUSH-PULL.
Obrázek 1: Jednosměrná komunikace využívající strategii PUSH-PULL.
Minule jsme si ukázali, jakým způsobem je možné implementovat rozložení zátěže (úkolů) mezi větší množství workerů pomocí fan-out (fanout) a následně všechny výsledky práce workerů sloučit a popř. dále zpracovat (uložit atd.) v jediném uzlu (fan-in).
Obrázek 2: Využití většího množství workerů pomocí fan-in a fan-out.
Ovšem s využitím strategie PUSH-PULL je možné realizovat i „kolonu“ (pipeline) složenou z většího množství workerů, kteří si navzájem posílají výsledky své práce. Pojmenování pipeline je v tomto případě příznačné, protože prakticky tu stejnou funkci nám poskytuje shell při použití rour a FIFO. Ukažme si nyní, jak je možné realizovat relativně jednoduchou a přímočarou pipeline, v níž existuje jeden zdroj úkolů, workery dvou typů a na konci sběratel (collector) výsledků:
Obrázek 3: Jednoduchá pipeline.
Ve výše zobrazeném schématu se prozatím nenachází žádné zařízení, což je obecně poněkud problematické, protože to znamená, že jeden z workerů musí vystupovat v roli serveru (a přitom víme, že workeři jsou obecně nestabilním prvkem celého systému, jelikož je typicky připojujeme a odpojujeme podle aktuální zátěže, workeři mohou být restartováni po pádu, po zjištění velké spotřeby paměti atd.).
Implementace uzlu, který vytváří úkoly (job, task) pro jednotlivé workery a používá ve svém zdrojovém kódu dekorátory, vypadá následovně:
import zmq from zmq.decorators import context, socket from time import sleep CONNECTION_TYPE = zmq.PUSH PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_producer(context, socket): """Spuštění serveru.""" address = "tcp://*:{port}".format(port=PORT) # socket.set_hwm(1) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(11): send_message(socket, "Message #{i}".format(i=i)) sleep(0.1) start_producer()
Implementace prvního workera, který je umístěn do levé části pipeline:
import zmq from zmq.decorators import context, socket from os import getpid IN_PORT = 5556 OUT_PORT = 5557 @context() @socket(zmq.PULL) @socket(zmq.PUSH) def start_worker(context, in_socket, out_socket): """Spuštění workera.""" address = "tcp://localhost:{port}".format(port=IN_PORT) in_socket.connect(address) print("Connected to {a}".format(a=address)) address = "tcp://*:{port}".format(port=OUT_PORT) out_socket.bind(address) print("And to {a}".format(a=address)) print("Waiting for message from producer...") pid = getpid() while True: message = in_socket.recv_string() out_message = "Message from {pid}: '{m}'".format(pid=pid, m=message) print(out_message) out_socket.send_string(out_message) start_worker()
Implementace druhého workera, který je umístěn do pravé části pipeline:
import zmq from zmq.decorators import context, socket from os import getpid IN_PORT = 5557 OUT_PORT = 5558 @context() @socket(zmq.PULL) @socket(zmq.PUSH) def start_worker(context, in_socket, out_socket): """Spuštění workera.""" address = "tcp://localhost:{port}".format(port=IN_PORT) in_socket.connect(address) print("Connected to {a}".format(a=address)) address = "tcp://localhost:{port}".format(port=OUT_PORT) out_socket.connect(address) print("And to {a}".format(a=address)) print("Waiting for message from worker #1...") pid = getpid() while True: message = in_socket.recv_string() out_message = "Message from {pid}: '{m}'".format(pid=pid, m=message) print(out_message) out_socket.send_string(out_message) start_worker()
A nakonec si ukažme implementaci sběratele výsledků:
import zmq from zmq.decorators import context, socket from time import sleep CONNECTION_TYPE = zmq.PULL PORT = 5558 @context() @socket(CONNECTION_TYPE) def start_collector(context, socket): """Spuštění sběratele.""" address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Connected to {a}".format(a=address)) print("Waiting for message from worker #2 ...") while True: message = socket.recv_string() print("Collecting message: '{m}'".format(m=message)) sleep(0) start_collector()
Posledním typem zařízení, s nímž se v dnešním článku seznámíme, je zařízení nazvané ZMQ.STREAMER. Toto zařízení se používá ve chvíli, kdy spolu jednotlivé uzly komunikují s využitím strategie PUSH-PULL. Streamer se do celého procesu zpracování přidává jednoduše, protože se na přední straně (frontend) chová jako běžný příjemze zpráv (PULL) a na zadní straně (backend) jako jejich zdroj. Na rozdíl od zařízení Queue tedy nebylo nutné přidat nové typy socketů typu XREP a XREQ:
Obrázek 4: Použití zařízení typu Streamer.
Opět si ukažme princip použití tohoto zařízení. Nejprve vytvoříme skripty se zdrojem zpráv a s jejich cílovým zpracováním. V obou skriptech se bude používat metoda Socket.connect, nikoli Socket.bind. Navíc se budou používat odlišná čísla portů, protože mezi producentem a konzumentem bude umístěno právě zařízení typu Streamer.
Implementace producenta zpráv:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PUSH PORT = 5550 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_producer(port, context, socket): """Spuštění serveru.""" address = "tcp://localhost:{port}".format(port=port) # socket.set_hwm(1) socket.connect(address) print("Connected to address {a}".format(a=address)) for i in range(100): send_message(socket, "Message #{i}".format(i=i)) time.sleep(0.2) start_producer(PORT)
Implementace konzumenta zpráv:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PULL PORT = 5551 @context() @socket(CONNECTION_TYPE) def start_consumer(port, context, socket): """Spuštění konzumenta.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") cnt = 0 while True: message = socket.recv_string() cnt += 1 print("Received message {c} of 100: '{m}'".format(c=cnt, m=message)) time.sleep(0) start_consumer(PORT)
Vlastní implementace streameru je vlastně velmi jednoduchá a prakticky se neliší například od implementace zařízení typu Queue. Ostatně se můžeme podívat na zdrojový kód skriptu s implementací, v němž se na portu 5550 navazuje připojení typu PULL a na portu 5551 připojení typu PUSH, tj. přesně tak, jak to bylo naznačeno na čtvrtém obrázku. Na obou stranách se používá metoda Socket.bind, protože streamer bude ústředním a stabilním prvkem celé architektury:
import zmq from zmq.decorators import context, socket PULL_PORT = 5550 PUSH_PORT = 5551 @context() @socket(zmq.PULL) @socket(zmq.PUSH) def create_streamer(pull_port, push_port, context, frontend, backend): """Vytvoření streameru.""" context = zmq.Context() address = "tcp://*:{port}".format(port=pull_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=pull_port)) address = "tcp://*:{port}".format(port=push_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=push_port)) zmq.device(zmq.STREAMER, frontend, backend) create_streamer(PULL_PORT, PUSH_PORT)
V předchozím článku jsme se mj. zabývali i popisem takzvaných zařízení ØMQ. Připomeňme si, že samotná knihovna ØMQ nabízí vývojářům následující zařízení, která se většinou používají při návrhu systémů se složitější architekturou a s požadavkem na jejich větší robustnost:
Zařízení | Stručný popis |
---|---|
ZMQ.QUEUE | prostředník používaný především v komunikaci REQ-REP (klasický klient, server) |
ZMQ.FORWARDER | používá se jako prostředník mezi zdroji zpráv a jejich příjemci PUB-SUB |
ZMQ.STREAMER | používá se v komunikační strategii PUSH-PULL |
Knihovna PyZMQ, která slouží jako rozhraní mezi nativní knihovnou ØMQ a programovacím jazykem Python, programátorům nabízí ještě jednu možnost – takzvaná zařízení PyZMQ (PyZMQ devices). Jejich základní význam je stejný, jako u zařízení ØMQ, ovšem zatímco u zařízení ØMQ jsme museli explicitně otevírat (popř. i zavírat) sockety, u zařízení PyZMQ se pracuje pouze s typy socketů. Všechny operace typu vytvoření socketu, vytvoření zařízení ØMQ atd., knihovna PyZMQ udělá za vývojáře sama, takže výsledný zdrojový kód je mnohem jednodušší a taktéž idiomatičtější (navíc se v naprosté většině případů vyhneme i použití dekorátorů, dokonce ani není nutné explicitně vytvářet instanci kontextu ØMQ).
Navíc je kód reprezentující zařízení spuštěn v samostatném (automaticky vytvořeném) vláknu, což znamená, že budeme moci velmi snadno v jednom skriptu implementovat jak příslušné zařízení, tak například i server či producenta. Z dalších pohledů však zařízení PyZMQ svými vlastnostmi plně odpovídají zařízením ØMQ:
Zařízení ØMQ | Zařízení PyZMQ |
---|---|
zmq.device(zmq.QUEUE, socket, socket) | ProcessDevice(zmq.QUEUE, typ_socketu, typ_socketu) |
zmq.device(zmq.FORWARDER, socket, socket) | ProcessDevice(zmq.FORWARDER, typ_socketu, typ_socketu) |
zmq.device(zmq.STREAMER, socket, socket) | ProcessDevice(zmq.STREAMER, typ_socketu, typ_socketu) |
Podívejme se nyní na praktické rozdíly mezi zařízeními knihovny ØMQ a zařízeními rozhraní PyZMQ z pohledu běžného vývojáře.
Nejdříve si připomeňme, jakým způsobem jsme použili zařízení Streamer, které se používá při komunikaci typu PUSH-PULL. Toto zařízení bude vloženo mezi producenta zpráv a mezi jejich konzumenta.
Samotnou implementaci meziuzlu reprezentovaného zařízením Streamer můžeme přepsat s využitím zařízení PyZMQ. Nejprve musíme importovat třídu ProcessDevice z modulu zmq.devices.basedevice:
from zmq.devices.basedevice import ProcessDevice
Dále zkonstruujeme instanci třídy ProcessDevice, přičemž budeme muset specifikovat jak typ zařízení (Streamer), tak i typ socketu na frontendu i na backendu (skutečně se specifikují pouze typy socketů, nikoli samotné sockety – ty nemusíme explicitně vytvářet):
streamer_device = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
Jakmile je zařízení vytvořeno, musíme nastavit adresy, na nichž zařízení typu Streamer naslouchá. K tomu se používají metody nazvané bind_in a bind_out:
frontend_address = "tcp://*:{port}".format(port=pull_port) backend_address = "tcp://*:{port}".format(port=push_port) streamer_device.bind_in(frontend_address) streamer_device.bind_out(backend_address)
Nakonec pouze zařízení spustíme, takže se interně v novém vláknu (tedy z našeho pohledu na pozadí) spustí nekonečná smyčka zpracovávající zprávy přicházející na frontend a posílající zprávy na backend:
streamer_device.start()
Úplný zdrojový kód meziuzlu se zařízením typu Streamer a současně i s producentem zpráv vypadá takto:
import zmq from zmq.decorators import context, socket from zmq.devices.basedevice import ProcessDevice import time PULL_PORT = 5550 PUSH_PORT = 5551 PRODUCER_CONNECTION_TYPE = zmq.PUSH PRODUCER_PORT = 5550 def create_streamer(pull_port, push_port): """Vytvoření streameru.""" streamer_device = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH) frontend_address = "tcp://*:{port}".format(port=pull_port) backend_address = "tcp://*:{port}".format(port=push_port) streamer_device.bind_in(frontend_address) streamer_device.bind_out(backend_address) print("Bound to {a} on port {p}".format(a=frontend_address, p=pull_port)) print("Bound to {a} on port {p}".format(a=backend_address, p=push_port)) streamer_device.start() print("Device started in background") def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @socket(PRODUCER_CONNECTION_TYPE) def start_producer(port, context, socket): """Spuštění zdroje zprav.""" address = "tcp://localhost:{port}".format(port=port) # socket.set_hwm(1) socket.connect(address) print("Connected to address {a}".format(a=address)) for i in range(100): send_message(socket, "Message #{i}".format(i=i)) time.sleep(0.2) @context() def start_device_and_producer(context): create_streamer(PULL_PORT, PUSH_PORT) start_producer(PRODUCER_PORT, context) start_device_and_producer()
Kód konzumenta se žádným způsobem nezmění, takže jen pro úplnost:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PULL PORT = 5551 @context() @socket(CONNECTION_TYPE) def start_consumer(port, context, socket): """Spuštění konzumenta.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") cnt = 0 while True: message = socket.recv_string() cnt += 1 print("Received message {c} of 100: '{m}'".format(c=cnt, m=message)) time.sleep(0) start_consumer(PORT)
Druhým typem zařízení rozhraní PyZMQ, s nímž se dnes seznámíme, je zařízení typu Queue, které se využívá především v komunikační strategii REP-REQ, s níž jsme se již taktéž setkali. Opět si nejdříve ukažme implementaci jednoduchého serveru a klienta, mezi něž je vložena fronta vytvořená klasicky pomocí funkcí 0MQ:
Server vypadal takto:
import zmq from math import factorial PORT = 5557 def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request def start_server(): """Spuštění serveru.""" socket = connect(PORT, zmq.REP) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") start_server()
import zmq PORT = 5556 def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) def start_client(): """Spuštění klienta.""" socket = connect(PORT, zmq.REQ) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() send_request(socket, "-10") print(socket.recv_string()) print() send_request(socket, "100") print(socket.recv_string()) print() start_client()
A konečně vlastní zařízení typu Queue:
import zmq XREP_PORT = 5556 XREQ_PORT = 5557 def create_queue(xrep_port, xreq_port): """Vytvoření fronty.""" context = zmq.Context() frontend = context.socket(zmq.XREP) address = "tcp://*:{port}".format(port=xrep_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xrep_port)) backend = context.socket(zmq.XREQ) address = "tcp://*:{port}".format(port=xreq_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xreq_port)) zmq.device(zmq.QUEUE, frontend, backend) create_queue(XREP_PORT, XREQ_PORT)
Úprava při použití zařízení PyZMQ bude spočívat se sloučení kódu klienta a fronty do následujícího skriptu:
import zmq from zmq.decorators import context from zmq.devices.basedevice import ProcessDevice from math import factorial XREP_PORT = 5556 XREQ_PORT = 5557 SERVER_PORT = 5557 def create_queue(xrep_port, xreq_port): """Vytvoření fronty.""" queue_device = ProcessDevice(zmq.QUEUE, zmq.XREP, zmq.XREQ) frontend_address = "tcp://*:{port}".format(port=xrep_port) backend_address = "tcp://*:{port}".format(port=xreq_port) queue_device.bind_in(frontend_address) queue_device.bind_out(backend_address) print("Bound to {a} on port {p}".format(a=frontend_address, p=xrep_port)) print("Bound to {a} on port {p}".format(a=backend_address, p=xreq_port)) queue_device.start() print("Queue device started in background") def connect(context, port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request def start_server(context, port): """Spuštění serveru.""" socket = connect(context, port, zmq.REP) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") @context() def start_device_and_server(context): create_queue(XREP_PORT, XREQ_PORT) start_server(context, SERVER_PORT) start_device_and_server()
V závěrečné části dnešního článku si ukážeme, jakým způsobem je možné ØMQ využít v Javě a (nepřímo) i v dalších programovacích jazycích, které jsou postaveny nad virtuálním strojem Javy (Groovy, Scale, Clojure, pravděpodobně i dnes již poněkud obstarožní Jython atd.). Připomeňme si, že ØMQ je naprogramována v jazyce C++ a existuje pro ni nativní rozhraní jak pro C, tak i pro C++. Díky tomu je relativně přímočaré použít rozhraní JNI (Java Native Interface) pro volání nativního kódu z Javy. Tento přístup je použit v knihovně JZMQ, jejíž zdrojové kódy naleznete v GitHub repositáři https://github.com/zeromq/jzmq. Předností tohoto způsobu je automaticky zaručená kompatibilita s ostatními aplikacemi a nástroji, které ØMQ používají. Nevýhodou pak především složitější překlad a instalace JZMQ (ostatně většinou se při kombinaci nativních knihoven a JVM dostaneme do podobné situace).
Pokud vám z nějakého důvodu knihovna JZMQ nebude vyhovovat, například při požadavku na to, že se může nasazovat jen bajtkód JVM, můžete využít alternativní knihovnu nazvanou JeroMQ. Tato knihovna, jejíž repositář je umístěn na adrese https://github.com/zeromq/jeromq, reimplementuje celou infrastrukturu ØMQ, ovšem na rozdíl od jazyka C++ je použita Java. Tím se podařilo obejít JNI, zjednodušit celý proces překladu a instalace knihovny atd. Možná poněkud paradoxní je, že výsledek nemusí být pomalejší, než nativní varianta ØMQ, a to z toho důvodu, že JNI je samo o sobě dosti problematické a pomalé, takže záleží především na tom, jak často se nativní funkce volají a jak velké datové struktury se jim předávají. Viz též několik porovnání dostupných na wiki https://github.com/zeromq/jeromq/wiki/Performance.
Knihovnu JeroMQ můžeme buď přidat do svého pom.xml (pokud používáte Maven popř. systém postavený na Mavenu), popř. si pro pouhé otestování můžete stáhnout již připravený java archiv s přeloženou knihovnou:
wget https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/jeromq-0.4.4-20180323.141003-1.jar
Tento druhý způsob využijeme v dalších příkladech.
Ukažme si nyní, jak je možné komunikovat mezi klientem naprogramovaným v Pythonu a používajícím nativní ØMQ na jedné straně, se serverem naprogramovaným v Javě a používajícím JeroMQ na straně druhé. Využijeme základní komunikační strategii REQ-REP, s níž jsme již dobře seznámeni.
Jak jsme si již řekli v úvodním odstavci, je klient vytvořen v Pythonu a prozatím v něm nenajdeme žádnou novou funkcionalitu:
import zmq def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) def start_client(): """Spuštění klienta.""" socket = connect(5555, zmq.REQ) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() start_client()
Naproti tomu pro implementaci serveru použijeme Javu. Server bude přijímat požadavky a pokud bude v požadavku předáno celé číslo (zapsané do řetězce), vypočte server faktoriál tohoto čísla a vrátí výsledek. V případě jakéhokoli problému se vrátí zpráva s informací o tom, proč k problému došlo. Zdrojový kód serveru může vypadat následovně:
import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Server { public static long factorial(long n) { int i, result=1; for(i=2; i<=n; i++) { result *= i; } return result; } public static void main(String[] args) throws Exception { try (ZContext context = new ZContext()) { ZMQ.Socket socket = context.createSocket(ZMQ.REP); socket.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] raw_request = socket.recv(0); String request = new String(raw_request, ZMQ.CHARSET); String response = null; System.out.println("Received: " + request); try { long n = Integer.parseInt(request); if (n < 0) { response = "Invalid input!"; } else { response = n + "! = " + Server.factorial(n); } } catch (Exception e) { response = "Wrong input!"; } socket.send(response.getBytes(ZMQ.CHARSET), 0); } } } }
Oproti kódu naprogramovanému v Pythonu je zde provedeno několik změn, které se týkají kódování a dekódování řetězců. Musíme mít totiž na paměti, že v ØMQ se přenáší zprávy obsahující pouze sekvenci bajtů a o případnou interpretaci těchto bajtů se musíme postarat sami. Týká se to především interpretace konce řetězce a potom taktéž kódování, resp. přesněji řečeno mapování mezi bajty a znaky (typicky v Unicode). Proto můžeme v kódu výše vidět dekódování přijatého požadavku a na konci kódování řetězce se zprávou zpět na pole bajtů (byte[]).
V případě, že jste si již v rámci předchozí kapitoly stáhli java archiv s přeloženou knihovnou JeroMQ, může překlad serveru proběhnout takto:
javac -cp jeromq-0.4.4-20180323.141003-1.jar Server.java
Spuštění serveru:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Server
Nyní můžeme spustit klienta a sledovat, že se serverem skutečně dokáže bez problémů komunikovat, i když se na každé komunikující straně používá zcela odlišná implementace ØMQ:
$ python3 client.py Connected to tcp://localhost:5555 Sending request '1' 1! = 1 Sending request '10' 10! = 3628800 Sending request 'xyzzy' Wrong input!
Logy na straně serveru:
Received: 1 Received: 10 Received: xyzzy
Samozřejmě nám nic nebrání přepsat do Javy i samotného klienta. Jedna z možných reimplementací může vypadat následovně:
import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Client { static void send_request(ZMQ.Socket socket, int number) { String request = String.valueOf(number); socket.send(request.getBytes(ZMQ.CHARSET), 0); System.out.println("Sent: " + request); byte[] raw_response = socket.recv(0); String response = new String(raw_response, ZMQ.CHARSET); System.out.println("Received from server: " + response); } public static void main(String[] args) throws Exception { try (ZContext context = new ZContext()) { ZMQ.Socket socket = context.createSocket(ZMQ.REQ); socket.connect("tcp://localhost:5555"); for (int i=0; i<10; i++) { send_request(socket, i); } send_request(socket, -10); } } }
Překlad klienta:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Client
Spuštění nové verze klienta:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Client
Další příklad, který přepíšeme z Pythonu do Javy, se skládá z producenta a konzumenta zpráv, kteří mezi sebou komunikují s využitím strategie PUSH-PULL, kterou jsme si připomněli mj. i v úvodních kapitolách. Javovská implementace producenta zpráv vznikla prakticky přímým přepisem původního kódu napsaného v Pythonu (samozřejmě až na větší „ukecanost“Javy a striktní typování):
import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Producer { static final int PORT = 5555; static void send_message(ZMQ.Socket socket, String message) { socket.send(message.getBytes(ZMQ.CHARSET), 0); } public static void main(String[] args) throws Exception { try (ZContext context = new ZContext()) { ZMQ.Socket socket = context.createSocket(ZMQ.PUSH); String address = "tcp://*:" + PORT; socket.bind(address); System.out.println("Bound to address " + address); for (int i=0; i<100; i++) { String message = "Messsage #" + i; send_message(socket, message); try { Thread.sleep(50); } catch (InterruptedException e) { System.out.println("Interrupted"); return; } } } } }
Spuštění producenta se nebude nijak lišit od předchozích implementací serveru či klienta – pouze musíme zajistit, aby na classpath byl uložen mj. i Java archiv s knihovnou JeroMQ:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Producer
I při přepisu konzumenta zpráv jsme vycházeli z originálního skriptu vytvořeného v Pythonu. Povšimněte si, že se v javovské verzi konzumenta opět převádí pole bajtů na řetězec:
import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Consumer { static final int PORT = 5555; static void receive_messages(ZMQ.Socket socket) { int cnt = 0; while (true) { byte[] raw_message = socket.recv(0); cnt++; String message = new String(raw_message, ZMQ.CHARSET); System.out.println("Received message: " + message); } } public static void main(String[] args) throws Exception { try (ZContext context = new ZContext()) { ZMQ.Socket socket = context.createSocket(ZMQ.PULL); String address = "tcp://localhost:" + PORT; socket.connect(address); System.out.println("Connected to " + address); System.out.println("Waiting for message..."); receive_messages(socket); } } }
Spuštění konzumenta:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Consumer
Ukázka vzájemného posílání zpráv od producenta ke konzumentovi:
$ ./run_producer.sh Bound to address tcp://*:5555
$ ./run_consumer.sh Connected to tcp://localhost:5555 Waiting for message... Received message: Messsage #0 Received message: Messsage #1 Received message: Messsage #2 ... ... ... Received message: Messsage #97 Received message: Messsage #98 Received message: Messsage #99
Nic nám samozřejmě nebrání spustit větší množství konzumentů zpráv (workerů) a tím pádem rozdělit zátěž – úkoly vytvářené producentem. Zkusme si například pustit tři workery a sledovat algoritmus round robin v praxi:
Worker #1:
$ ./run_consumer.sh Connected to tcp://localhost:5555 Waiting for message... Received message: Messsage #0 Received message: Messsage #1 Received message: Messsage #4 Received message: Messsage #7 ... ... ... Received message: Messsage #91 Received message: Messsage #94 Received message: Messsage #97
Worker #2:
$ ./run_consumer.sh Connected to tcp://localhost:5555 Waiting for message... Received message: Messsage #2 Received message: Messsage #5 Received message: Messsage #8 ... ... ... Received message: Messsage #92 Received message: Messsage #95 Received message: Messsage #98
Worker #3:
$ ./run_consumer.sh Connected to tcp://localhost:5555 Waiting for message... Received message: Messsage #3 Received message: Messsage #6 Received message: Messsage #9 ... ... ... Received message: Messsage #93 Received message: Messsage #96 Received message: Messsage #99
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v programovacím jazyku C byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
V této kapitole jsou uvedeny odkazy na všech osm předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:
Pavel Tišnovský vystudoval VUT FIT a v současné době pracuje ve společnosti Red Hat, kde vyvíjí nástroje pro OpenShift.io.
Internet Info Root.cz (www.root.cz)
Informace nejen ze světa Linuxu. ISSN 1212-8309
Copyright © 1998 – 2019 Internet Info, s.r.o. Všechna práva vyhrazena.