Hlavní navigace

Implementace různých komunikačních strategií s využitím knihovny nanomsg

Pavel Tišnovský

V seriálu o message brokerech a k nim přidružených technologiích jsme se mj. seznámili i s knihovnou ZeroMQ. Ideovým následovníkem této knihovny je projekt nazvaný nanomsg s jehož základními možnostmi se seznámíme v dnešním článku.

Doba čtení: 43 minut

Sdílet

11. Vylepšení obou uzlů – přidání kontroly návratových kódů všech funkcí

12. Oboustranná komunikace mezi dvěma uzly (strategie PAIR)

13. Realizace prvního uzlu (klienta)

14. Realizace druhého uzlu (serveru)

15. Refaktoring klienta i serveru

16. Chování uzlů ve chvíli, kdy zvolíme špatnou komunikační strategii

17. Využití komunikační strategie PUBSUB

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu o message brokerech

20. Odkazy na Internetu

1. Implementace různých komunikačních strategií s využitím knihovny nanomsg

V dnešní části seriálu o message brokerech i o technologiích, které s message brokery souvisí, se seznámíme s projektem nazvaným nanomsg. Jedná se o následovníka (stále velmi často používané) knihovny ZeroMQ neboli ØMQ, s níž jsme se seznámili v trojici článků [1] [2] a taktéž [3]. Připomeňme si ve stručnosti, k čemu je vlastně knihovna ØMQ určena. Jedná se o relativně nízkoúrovňovou knihovnu vyvinutou v programovacím jazyce C++, která vývojářům nabízí implementaci různých tzv. komunikačních strategií. Tyto strategie je možné využít a popř. i vhodně zkombinovat při implementaci aplikací se složitější architekturou, v níž mezi sebou jednotlivé části komunikují s využitím synchronních či asynchronních zpráv, popř. přes takzvané brokery.

Poznámka: ØMQ je skutečná nativní knihovna, nikoli služba či démon. Proto se nijak nespouští; její funkce jsou volány konkrétními implementacemi serverů a klientů.

Na knihovnu ØMQ se ovšem můžeme dívat i z jiného pohledu, protože se jedná o abstrakci nad klasickými Berkeley sockety, ovšem s mnoha vylepšeními. V ØMQ je totiž možné zprávy odesílat asynchronně; samotné zpracování zpráv je provedeno na pozadí (ve vlastním vláknu), nemusíme se starat o délku zpráv ani o jejich případnou fragmentaci a do určité míry jsme odstíněni od toho, jaký konkrétní protokol bude pro komunikaci použit (IPC, TCP, atd.). Toto zjednodušení se ještě více projeví v těch programovacích jazycích, které se mohou postarat o automatické uvolňování prostředků (což je mj. i případ Pythonu, v němž je vytvořeno prvních pět demonstračních příkladů, s nimiž se seznámíme v navazujících kapitolách).

Poznámka: ve skutečnosti není knihovna nanomsg jediným následovníkem projektu ZeroMQ, protože stejný autor pracuje na novějším (prozatím ne zcela dokončeném) projektu s výmluvným názvem nng neboli nanomsg-next-generation, který naleznete na GitHubu, konkrétně na stránce https://github.com/nanomsg/nng.

2. Rozdíly mezi projekty nanomsg a ZeroMQ

Mezi projekty nanomsg a ZeroMQ přirozeně najdeme několik rozdílů, které jsou velmi pěkně shrnuty na stránce https://nanomsg.org/documentation-zeromq.html. Jedná se o následující podstatné rozdíly:

  1. Nepoužívá se žádný kontext, tj. programátor nemusí volat funkcezmq_ctx_new a zmq_ctx_destroy.
  2. Samotné sockety vytvářené funkcí nn_socket jsou reprezentovány celým číslem; záporné číslo značí chybu.
  3. Sémantika funkcí nn_send, nn_recv atd. odpovídá POSIXu.
  4. Změnil se i samotný implementační jazyk, protože se přešlo od C++ k čistému céčku (což je minimálně zajímavý směr vývoje).
  5. Počet komunikačních strategií se rozšířil na šest, viz též navazující kapitoly.
  6. Při práci se zprávami je podporováno „zero-copy“, což je technika, kterou si vysvětlíme příště.

3. Základní koncepty, na nichž je knihovna nanomsg postavena

V knihovně nanomsg jsou připraveny funkce navržené takovým způsobem, aby se s jejich využitím daly implementovat protokoly založené na jedné ze šesti základních komunikačních strategiích (neboli komunikačních vzorech). Jména a stručné charakteristiky těchto strategií naleznete v následující tabulce:

# Strategie/vzor Stručný popis významu strategie
1 PAIR jedna z nejjednodušších komunikačních strategií s dvojicí uzlů a vazbou 1:1; komunikace je obecně obousměrná (samozřejmě lze použít i komunikaci jednosměrnou)
2 BUS složitější strategie, v níž se používá obecnější vazba M:N; tuto strategii si popíšeme příště
3 PUBSUB klasická komunikační strategie PUB-SUB neboli PUBLISH-SUBSCRIBE
4 REQREP klasická komunikační strategie REQ-REP neboli REQUEST-RESPONSE, opět bude popsána příště
5 PIPELINE jednosměrná komunikace buď s vazbami 1:1 (jeden vysílač a jeden přijímač), popř. mezi více vysílači a několika přijímači
6 SURVEY speciální strategie umožňující získat stav více uzlů (procesů) jediným dotazem a mnoha odpovědmi; tato zcela nová strategie bude popsána v navazujícím článku

U jednotlivých strategií/vzorů se v čase běhu aplikace kontroluje, zda jsou použity správně. Příkladem může být strategie pojmenovaná PIPELINE, v níž jeden komunikující uzel používá socket typu PUSH pro vysílání zpráv a druhý uzel naopak socket typu PULL pro příjem zpráv. V případě, že se pokusíme poslat zprávu opačným směrem, dojde k běhové chybě, kterou lze snadno detekovat, což si ukážeme v šestnácté kapitole.

Poznámka: s jednotlivými strategiemi se podrobněji seznámíme při popisu jednotlivých demonstračních příkladů v navazujících kapitolách.

Jen pro připomenutí, jaké strategie poskytuje původní knihovna ØMQ:

  1. PAIR – jednosměrné či obousměrné propojení dvou procesů, z nichž každý může běžet na odlišném počítači. Tato strategie se nejvíce přibližuje běžnému použití klasických Berkeley socketů. Prakticky stejná strategie, i když implementovaná odlišným způsobem, je součástí knihovny nanomsg.
  2. REQ-REP – jedná se o komunikaci typu požadavek-odpověď. Požadavky posílají klienti, odpovědi generuje server, který dokáže obsloužit prakticky libovolné množství klientů. Podobnou strategii nalezneme i v dnes popisované knihovně nanomsg pod názvem REQREP.
  3. PUB-SUB – server zde publikuje zprávy, k jejichž odběru se mohou přihlásit různí klienti. Zprávy je možné filtrovat na straně klientů (tato vlastnost se ovšem ve starších verzích ØMQ odlišuje). I tuto strategii nalezneme v knihovně nanomsg, pouze pod nepatrně odlišným názvem.
  4. PUSH-PULL – rozšíření předchozí strategie PUB-SUB: server či servery vytváří zprávy zpracovávané buď přímo připojenými workery nebo celou kolonou (pipeline) workerů. Tato strategie je v knihovně nanomsg rozšířena a zobecněna ve strategii pojmenované PIPELINE.

4. Podporované přenosové mechanismy

Aplikace (tj. jak klienti, tak i servery), které spolu mají komunikovat s využitím knihovny nanomsg, mohou využívat různé přenosové mechanismy. Ty jsou vypsány v následující tabulce, přičemž jsou jednotlivé mechanismy seřazeny podle své „lokality“. Například přenosový mechanismus nazvaný INPROC je použitelný pouze v případě, že spolu komunikují jednotlivé části aplikace běžící v rámci stejného procesu, typicky každá ve svém vlastním vláknu. Menší „lokalitu“ nabízí přenosový mechanismus pojmenovaný IPC, jenž umožňuje komunikovat mezi několika procesy, které ovšem musí běžet na stejném počítači. Pro komunikaci mezi aplikacemi běžícími na různých počítačích v rámci intranetu či internetu se používá buď klasické TCP (tj. skutečné přenosy dat přes protokol TCP) nebo WS (web sockety):

# Přenosový mechanismus Stručný popis
1 INPROC komunikace v rámci jednoho procesu, například typizovaná komunikace mezi několika vlákny (obecně nejrychlejší řešení)
2 IPC komunikace mezi několika procesy běžícími na jednom počítači
3 TCP komunikace mezi procesy běžícími na různých počítačích s využitím protokolu TCP
4 WS komunikace mezi procesy běžícími na různých počítačích s využitím web socketů
Poznámka: v dnešních demonstračních příkladech využijeme mechanismus IPC, protože komunikující uzly budou spouštěny na jednom počítači a navíc nebudeme muset řešit problematiku nastavení firewallu. Jednoduchou úpravou URL je však možné příklady pozměnit takovým způsobem, aby spolu komunikovaly například s využitím protokolu TCP.

5. Rozhraní pro další programovací jazyky

Jak jsme si již řekli v úvodních kapitolách, je knihovna nanomsg vyvinuta v programovacím jazyku C a z tohoto důvodu je jejím primárním rozhraním právě API vázané na céčko. Ovšem to pochopitelně v žádném případě neznamená, že by tuto užitečnou knihovnu nebylo možné použít i v dalších programovacích jazycích. Ve skutečnosti je tomu právě naopak, protože existuje celá řada již hotových a otestovaných rozhraní, které nanomsg zpřístupní i vývojářům, kteří z různých důvodů preferují jiné programovací jazyky. Tato rozhraní jsou vypsána v následující tabulce:

Jazyk (platforma) Knihovna/projekt s rozhraním
C nanomsg
  NNG (nová reimplementace, viz poznámka v úvodní kapitole)
C++ nanomsgxx
  cppnanomsg
  nngpp (pro NNG)
Clojure jnanomsg (voláno přes standardní Java interop)
D nanomsg-wrapper
Dylan nanomsg-dylan
Erlang enm
Fortran nanofort
Go mangos (reimplementace v Go)
  mangos v2 (druhá verze)
  go-nanomsg
Haskell nanomsg-haskell
  nanomsg
Haxe hx-nanomsg
Swift swiftc nanomsg
Java jnano
  jnanomsg
  nngjvm (pro NNG)
JavaScript (Node.js) node-nanomsg
Lua lua-nanomsg
  luajit-nanomsg (pro systém LuaJIT)
  luananomsg
.NET NNanomsg
Ocaml onanomsg
Perl NanoMsg::Raw
PHP php-nano
PicoLisp picolisp-nanomsg FFI bindings
Python nanomsg-python
  pynanomsg
  nnpy
  pynng (pro NNG, prozatím ve vývoji)
R rnanomsg
Ruby nn-core
  nanomsg
Rust rust-nanomsg
  nng-rs (opět pro NNG)
Scheme (CHICKEN) chicken-nanomsg
Smalltalk NanoStrand
Poznámka: ve skutečnosti jsou některé výše uvedené projekty reimplementacemi knihovny nanomsg. Jedná se především o projekt mangos v2
, k němuž se ještě v tomto seriálu vrátíme.

6. Instalace nanomsg ze zdrojových kódů

Knihovnu nanomsg i k ní příslušející nástroje lze nainstalovat buď ze zdrojových kódů nebo z repositářů Linuxové distribuce. Vzhledem k tomu, že v distribučních balíčcích nemusí být dostupná vždy poslední verze této knihovny, si ukážeme, jak se nanomsg překládá ze zdrojových kódů. Není to ve skutečnosti nic těžkého; budeme potřebovat pouze překladač céčka (například GCC), linker a taktéž nástroje make a CMake. Pokud budeme chtít, aby se při překladu vytvořily i soubory s nápovědou, je vyžadován nástroj asciidoctor, ovšem v tomto případě se jedná pouze o volitelný krok (překlad i bez asciidoctoru proběhne bez problémů).

Nejdříve je nutné stáhnout archiv se zdrojovými kódy knihovny a dalších k ní příslušejících nástrojů. V dnešním článku se zabýváme poslední stabilní verzí 1.1.5, která je dostupná na adrese https://github.com/nanomsg/na­nomsg/archive/1.1.5.zip. Z této adresy archiv získáme takto:

$ wget https://github.com/nanomsg/nanomsg/archive/1.1.5.zip
 
--2019-04-11 20:03:44--  https://github.com/nanomsg/nanomsg/archive/1.1.5.zip
Resolving github.com (github.com)... 192.30.253.113, 192.30.253.112
Connecting to github.com (github.com)|192.30.253.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://codeload.github.com/nanomsg/nanomsg/zip/1.1.5 [following]
--2019-04-11 20:03:45--  https://codeload.github.com/nanomsg/nanomsg/zip/1.1.5
Resolving codeload.github.com (codeload.github.com)... 192.30.253.120, 192.30.253.121
Connecting to codeload.github.com (codeload.github.com)|192.30.253.120|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [application/zip]
Saving to: ‘1.1.5.zip’
 
2019-04-11 20:03:46 (1,16 MB/s) - ‘1.1.5.zip’ saved [661276]

Ve druhém kroku archiv běžným způsobem rozbalíme, což je triviální:

$ unzip 1.1.5.zip 
 
Archive:  1.1.5.zip
1749fd7b039165a91b8d556b4df18e3e632ad830
   creating: nanomsg-1.1.5/

Následně přejdeme do adresáře nanomsg-1.1.5, který vznikl po rozbalení archivu:

$ cd nanomsg-1.1.5/

Nyní je již možné přistoupit k vlastnímu překladu. Necháme si vygenerovat soubor Makefile s využitím utility cmake:

$ cmake .
 
-- The C compiler identification is GNU 7.3.1
 
-- Performing Test NN_HAVE_GCC_ATOMIC_BUILTINS
-- Performing Test NN_HAVE_GCC_ATOMIC_BUILTINS - Success
CMake Warning at CMakeLists.txt:294 (message):
  Could not find asciidoctor: skipping docs
 
 
  -- Configuring done
  -- Generating done
  -- Build files have been written to: /home/ptisnovs/nanomsg-1.1.5
Poznámka: případného upozornění na to, že není možné vygenerovat dokumentaci, se můžeme zbavit nainstalováním asciidoctoru (ovšem na vývojovém počítači tento nástroj z několika důvodů nemám).

Nyní již máme soubor Makefile vytvořený, takže je možné spustit nástroj make, který provede vlastní překlad a slinkování výsledné knihovny i podpůrných nástrojů (samotný překlad bude na moderním HW trvat několik sekund):

$ make
 
Scanning dependencies of target nanomsg
[  1%] Building C object src/CMakeFiles/nanomsg.dir/core/ep.c.o
[  1%] Building C object src/CMakeFiles/nanomsg.dir/core/global.c.o
[  2%] Building C object src/CMakeFiles/nanomsg.dir/core/pipe.c.o
[  2%] Building C object src/CMakeFiles/nanomsg.dir/core/poll.c.o
[  3%] Building C object src/CMakeFiles/nanomsg.dir/core/sock.c.o
...
...
...
[100%] Linking C executable symbol
[100%] Built target symbol

Výsledkem překladu by měly být minimálně tyto soubory:

Soubor Stručný popis
/usr/local/lib64/libnanomsg.so.5.1.0 samotná knihovna s implementací všech přenosových mechanismů
/usr/local/lib64/libnanomsg.so.5 symbolický odkaz na předchozí soubor
/usr/local/lib64/libnanomsg.so symbolický odkaz na předchozí soubor, typicky se právě tento soubor předává linkeru
   
/usr/local/include/nanomsg/nn.h hlavičkový soubor se základními funkcemi a datovými typy knihovny nanomsg
   
/usr/local/include/nanomsg/pair.h konstanty a funkce používané v komunikační strategii PAIR
/usr/local/include/nanomsg/pipeline.h konstanty a funkce používané v komunikační strategii PIPELINE
/usr/local/include/nanomsg/pubsub.h konstanty a funkce používané v komunikační strategii PUBSUB
/usr/local/include/nanomsg/survey.h konstanty a funkce používané v komunikační strategii SURVEY
/usr/local/include/nanomsg/reqrep.h konstanty a funkce používané v komunikační strategii REQREP
/usr/local/include/nanomsg/bus.h konstanty a funkce používané v komunikační strategii BUS
Poznámka: výše uvedené cesty jsou pochopitelně platné na 64bitových architekturách. Na architekturách 32bitových se namísto lib64 použije prosté lib.

7. Demonstrační příklady ukazující základní komunikační vzory (strategie)

V navazujících kapitolách si ukážeme demonstrační příklady, v nichž postupně použijeme základní komunikační vzory (neboli komunikační strategie), které nám knihovna nanomsg nabízí. Všechny příklady budou vyvinuty v programovacím jazyku C (jsou kompatibilní s ANSI C), protože rozhraními pro další programovací jazyky se budeme zabývat v navazující části tohoto seriálu. Příklady jsou navíc napsány s ohledem na co největší stručnost zápisu, takže u některých z nich nejsou provedeny všechny kontroly chyb, jež by se ovšem v produkčním kódu pochopitelně měly použít (na případné možnosti vylepšení se ovšem taktéž zaměříme).

Pro překlad, slinkování a spuštění je u každého příkladu použit Makefile, takže využijete příkaz/nástroj make, který by měl být nainstalován (ovšem bez tohoto nástroje nebude možné zkompilovat ani nainstalovat samotnou knihovnu nanomsg, viz též předchozí kapitolu).

Ve všech dále popisovaných aplikacích se používá stejný postup:

  1. Nejprve je nutné vytvořit a inicializovat socket, a to s využitím funkce nazvané nn_socket.
  2. Socket se na straně serveru otevře funkcí nn_bind, zatímco u klienta se používá funkce nn_connect. Rozdíl mezi oběma funkcemi odpovídá postupu při navazování připojení: server naslouchá na určitém portu (pokud se jedná o TCP), zatímco klient se k portu připojuje. To, který komunikující uzel bude serverem a který klientem, je většinou ponecháno na rozhodnutí programátora.
  3. Následně je již možné posílat zprávy, a to buď jedním směrem (strategiePIPELINE apod.) nebo směry oběma (strategie PAIR apod.). Pro posílání zpráv je určena funkce nn_send, pro jejich příjem pak funkce pojmenovaná nn_recv. Použití „recv“ namísto celého slova „receive“ vychází ze snahy sémanticky se přiblížit klasickým Berkeley socketům.
  4. Nakonec se spojení ukončí zavoláním funkcenn_shutdown.

Jen pro připomenutí si ukažme, do jaké míry je odlišné pojetí knihovny ØMQ. Tato knihovna totiž používá takzvaný kontext, který je nutné vytvořit ještě před konstrukcí socketu a na konci aplikace je ho nutné korektně deaktivovat. Příkladem může být jednoduchý klient přijímající zprávy na lokální adrese a portu 5556:

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <zmq.h>
 
#define BUFFER_LENGTH 32
 
int main()
{
    char buffer[BUFFER_LENGTH];
    char *address = "tcp://localhost:5556";
 
    void *context = zmq_ctx_new();
    void *socket = zmq_socket(context, ZMQ_PAIR);
 
    zmq_connect(socket, address);
    printf("Connected to address %s\n", address);
 
    while (1)
    {
        int num = zmq_recv(socket, buffer, BUFFER_LENGTH-1, 0);
        buffer[num] = '\0';
        printf("Received '%s'\n", buffer);
    }
 
    zmq_close(socket);
    zmq_ctx_destroy(context);
 
    return 0;
}

Vidíme, že základní postup sice skutečně zůstává zachován: zmq_socket, zmq_connect, zmq_send/zmq_recv a konečně zmq_close, ovšem navíc se musíme postarat o kontext pomocí funkcí zmq_ctx_new a zmq_ctx_destroy.

Poznámka: názvy funkcí knihovny ØMQ začínají prefixem „zmq_“, zatímco v případě knihovny nanomsg je to prefix „nn_“.

8. Aplikace posílající zprávy s využitím strategie PIPELINE

Strategie nazvaná PIPELINE zajišťuje jednosměrný přenos zpráv od vysílající aplikace (vlákna, procesu) k aplikaci přijímající. V tom nejjednodušším případě existuje pouze jediný vysílač (neboli zdroj zpráv, producent) a jediný přijímač (konzument). Nejprve se podívejme na implementaci producenta, která je nepatrně jednodušší, zejména s ohledem na to, že se nemusí pracovat s bufferem pro příjem zprávy. Zdrojový kód naleznete na adrese úplný zdrojový kód producenta; podrobnější popis jednotlivých kroků bude uveden pod tímto výpisem:

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
 
const char *URL = "ipc:///tmp/example1";
 
void sender(const char *url, const char *message)
{
    int message_size = strlen(message) + 1;
    int socket;
    int endpoint;
    int bytes;
 
    socket = nn_socket(AF_SP, NN_PUSH);
    puts("Socket created");
 
    endpoint = nn_connect(socket, url);
    puts("Remote endpoint added to the socket");
 
    printf("Sending message '%s'\n", message);
    bytes = nn_send(socket, message, message_size, 0);
 
    printf("Message with length %d bytes sent, flushing", bytes);
    sleep(1);
    puts("Done");
 
    nn_shutdown(socket, endpoint);
}
 
int main(const int argc, const char **argv)
{
    sender(URL, "Hello");
    sender(URL, "world");
    sender(URL, "!");
 
    return 0;
}

Popišme si nyní jednotlivé části producenta. Zajímavé je už určení URL použité pro připojení ke konzumentovi. Povšimněte si, že se URL skládá z určení komunikačního (přenosového) mechanismu zmíněného ve čtvrté kapitole, za nímž následují znaky „://“ (bez uvozovek). Za těmito znaky již následuje konkrétní určení adresy, které je ovšem závislé na použitém přenosovém mechanismu. U IPC se jedná o jméno speciálního souboru, v našem případě o jméno „/tmp/example1“ (proto se v URL nachází trojice lomítek za sebou, i když jsou podporovány i relativní cesty):

const char *URL = "ipc:///tmp/example1";
Poznámka: na systému Windows se používají pojmenované roury, takže například zápis adresy „ipc://test“ ve skutečnosti znamená pojmenovanou rouru „\\.\pipe\test“.

Samotná implementace producenta se nachází ve funkci sender. Nejprve vytvoříme socket s uvedením jeho typu, což je u strategie PIPELINE typ NN_PUSH u producenta a NN_PULL u konzumenta:

socket = nn_socket(AF_SP, NN_PUSH);

Dále k socketu přiřadíme koncový bod specifikovaný adresou (URL):

endpoint = nn_connect(socket, url);

V této chvíli je již možné přes socket posílat zprávy případným konzumentům. Poslání zprávy je realizováno funkcí nn_send, které se předá jak socket, tak i vlastní zpráva současně s její délkou. Vzhledem k tomu, že zpráva je považována za sekvenci bajtů a nikoli za řetězec ukončený nulou (ASCIIZ), je nutné délku skutečně vypočítat a specifikovat:

bytes = nn_send(socket, message, message_size, 0);

Dále – což je ovšem velké zjednodušení – je nutné počkat, až se zpráva skutečně přenese z interního bufferu, který je knihovnou nanomsg udržován. Pro dosažení co největší jednoduchosti použijeme funkci sleep:

sleep(1);

Zcela poslední operací je uzavření připojení a všech alokovaných prostředků, což zajišťuje funkce nn_shutdown:

nn_shutdown(socket, endpoint);

Tímto způsobem jsou poslány celkem tři zprávy.

9. Aplikace přijímající zprávy s využitím strategie PIPELINE

Podobným způsobem je možné realizovat klienta, který bude zprávy přijímat. Vysílací strana používá socket typu NN_PUSH, strana přijímací tedy použije socket NN_PULL:

socket = nn_socket(AF_SP, NN_PULL);

Odlišné bude i navázání připojení. Ve vysílači jsme použili nn_connect, zde tedy musíme pro zachování symetrie použít nn_bind (popř. volání otočit, podle toho, který z klientů je stabilnější a který tedy má plnit funkci serveru):

nn_bind(socket, url);

Zpráva se čte/přijímá funkcí nn_recv, které musíme předat identifikátor socketu, adresu proměnné, která bude obsahovat ukazatel na automaticky alokovaný buffer (jedná se tedy o ukazatel na ukazatel), délku zprávy popř. konstantu NN_MSG pokud se má buffer alokovat automaticky a konečně případné příznaky:

char *message = NULL;
int bytes = nn_recv(socket, &message, NN_MSG, 0);

Vzhledem k tomu, že byl buffer alokován automaticky knihovnou nanomsg, musíme zaručit i jeho pozdější dealokaci, a to zavoláním:

nn_freemsg(message);

Úplný zdrojový kód přijímače naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/01_simple_sender_recei­ver/receiver.c. Jeho ukončení se provede například pomocí Ctrl+C nebo příkazem kill:

#include <stdio.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
 
const char *URL = "ipc:///tmp/example1";
 
void receiver(const char *url)
{
    int socket;
 
    socket = nn_socket(AF_SP, NN_PULL);
    puts("Socket created");
 
    nn_bind(socket, url);
    puts("Endpoint bound to socket");
 
    puts("Waiting for messages...");
    while (1) {
        char *message = NULL;
        int bytes = nn_recv(socket, &message, NN_MSG, 0);
        printf("Received message '%s' with length %d bytes\n", message, bytes);
        nn_freemsg(message);
    }
}
 
int main(int argc, char **argv)
{
    receiver(URL);
    return 0;
}

10. Soubor Makefile pro překlad obou klientů

Na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/01_simple_sender_recei­ver/Makefile naleznete soubor Makefile určený pro překlad obou klientů, tj. jak vysílače, tak i přijímače:

CC=gcc
LINKER=gcc
 
LIBS=nanomsg
 
CFLAGS=-O0 -Wall -ansi -pedantic
LFLAGS=-l$(LIBS)
 
LIBRARY_PATH=/usr/local/lib64/
 
.PHONY: clean run_sender run_receiver
 
all:    sender receiver
 
%.o:    %.c
        $(CC) -c -o $@ $(CFLAGS) $<
 
sender: sender.o
        $(CC) -o $@ $(LFLAGS) $<
 
receiver:       receiver.o
        $(CC) -o $@ $(LFLAGS) $<
 
clean:
        rm -f sender.o \
        rm -f receiver.o \
        rm -f sender \
        rm -f receiver
 
run_sender:
        LD_LIBRARY_PATH=$(LIBRARY_PATH) ./sender
 
run_receiver:
        LD_LIBRARY_PATH=$(LIBRARY_PATH) ./receiver

Samotný překlad obou klientů zajistí příkaz:

$ make

Spuštění přijímače v jednom terminálu se provede příkazem:

$ make run_receiver

Spuštění vysílače pak příkazem:

$ make run_sender
Poznámka: tyto dva cíle jsou použity z toho důvodu, aby se správně nastavila proměnná prostředí LD_LIBRARY_PATH, protože jinak by nemusela být knihovna libnanomsg.5 nalezena (ovšem zde již hodně záleží na nastavení systému i způsobu instalace knihovny).

11. Vylepšení obou uzlů – přidání kontroly návratových kódů všech funkcí

Oba uzlů je vhodné vylepšit, a to takovým způsobem, že se přidají kontroly návratových kódů všech funkcí z knihovny nanomsg. Většina těchto funkcí vrací záporné číslo v případě chyby, což je stav, který můžeme velmi snadno otestovat. Navíc ještě přidáme jak do vysílače, tak i do přijímače volání funkce nn_shutdown, aby se skutečně provedlo korektní odpojení obou klientů (to sice není zcela striktně vyžadováno, ovšem jedná se o dobré vychování programátora).

Nejprve si ukažme upravený zdrojový kód vysílače:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
 
const char *URL = "ipc:///tmp/example2";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void sender(const char *url, const char *message)
{
    int message_size = strlen(message) + 1;
    int socket;
    int endpoint;
    int bytes;
 
    if ((socket = nn_socket(AF_SP, NN_PUSH)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Remote endpoint added to the socket");
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, message_size, 0)) < 0) {
        report_error("nn_send");
    }
 
    printf("Message with length %d bytes sent, flushing", bytes);
    sleep(1);
    puts("Done");
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
}
 
int main(const int argc, const char **argv)
{
    sender(URL, "Hello");
    sender(URL, "world");
    sender(URL, "!");
 
    return 0;
}

Následuje upravený zdrojový kód přijímače:

#include <stdlib.h>
#include <stdio.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
 
const char *URL = "ipc:///tmp/example2";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receiver(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_PULL)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Endpoint bound to socket");
 
    puts("Waiting for messages...");
    while (1) {
        char *message = NULL;
        int bytes;
        if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
            report_error("nn_recv");
        }
        printf("Received message '%s' with length %d bytes\n", message, bytes);
        if (nn_freemsg(message) < 0) {
            report_error("nn_freemsg");
        }
    }
}
 
int main(int argc, char **argv)
{
    receiver(URL);
    return 0;
}

12. Oboustranná komunikace mezi dvěma uzly (strategie PAIR)

Druhá komunikační strategie, s níž se v dnešním článku alespoň ve stručnosti seznámíme, se jmenuje PAIR. Tato strategie umožňuje, aby mezi sebou jednotlivé uzly komunikovaly oboustranně, což nebylo při použití strategie PIPELINE možné (přesněji možné bylo, ovšem otevřením dvou komunikačních kanálů, což je velmi křehké řešení). I u strategie PAIR vystupuje jeden z komunikujících uzlů ve funkci serveru (otevírá svůj port a očekává, že se na něj připojí klient) a druhý uzel ve funkci klienta. Ovšem až na tento rozdíl jsou si po navázání spojení oba uzly rovnocenné, tj. každý z nich může vysílat i přijímat zprávy, a to libovolným způsobem, který si zvolí sám programátor. Ten například může implementovat jednoduchý systém typu dotaz-odpověď (což je ovšem lepší realizovat strategií REQREP) či skutečně použít plnohodnotný oboustranný komunikační kanál.

13. Realizace prvního uzlu (klienta)

Podívejme se nejprve na způsob realizace prvního uzlu, který je naprogramován jako klient, protože používá funkci nn_connect a nikoli nn_bind. Po navázání připojení pošle klient zprávu serveru a očekává jeho odpověď, opět ve funkci běžné zprávy. To, že se jedná o odpověď, je tedy řešeno čistě logikou aplikace. Pořadí volání funkcí knihovny nanomsg:

  1. nn_socket
  2. nn_connect
  3. nn_send
  4. nn_recv + nn_freemsg
  5. nn_shutdown

Úplný zdrojový kód klienta naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/03_simple_pair_communi­cation/first.c:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pair.h>
 
const char *URL = "ipc:///tmp/example3";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void first(const char *url)
{
    int socket;
    int endpoint;
    int bytes;
    char *message = NULL;
    char *response = NULL;
 
    if ((socket = nn_socket(AF_SP, NN_PAIR)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Remote endpoint added to the socket");
 
    message = "Hello from 'first'!";
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
 
    printf("Message with length %d bytes sent, flushing\n", bytes);
    sleep(1);
 
    puts("Waiting for response...");
    response = NULL;
    if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received response '%s' with length %d bytes\n", response, bytes);
    if (nn_freemsg(response) < 0) {
        report_error("nn_freemsg");
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    first(URL);
    return 0;
}

14. Realizace druhého uzlu (serveru)

Druhý uzel je realizován jako server, opět z toho důvodu, že používá funkci nn_bind a nikoli nn_connect. Jedná se o velmi primitivní server, který přijme zprávu od klienta a následně mu pošle textovou odpověď „ACK!“. Pořadí volání funkcí knihovny nanomsg je nyní odlišné:

  1. nn_socket
  2. nn_bind
  3. nn_recv + nn_freemsg
  4. nn_send
  5. nn_shutdown

Úplný zdrojový kód serveru naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/03_simple_pair_communi­cation/second.c:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pair.h>
 
const char *URL = "ipc:///tmp/example3";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void second(const char *url)
{
    int socket;
    int endpoint;
    int bytes;
    char *message = NULL;
    char *response = NULL;
 
    if ((socket = nn_socket(AF_SP, NN_PAIR)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Endpoint bound to socket");
 
    puts("Waiting for message...");
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
 
    response = "ACK!";
 
    printf("Sending response '%s'\n", response);
    if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) {
        report_error("nn_send");
    }
 
    printf("Response with length %d bytes sent, flushing\n", bytes);
    sleep(1);
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    second(URL);
    return 0;
}

15. Refaktoring klienta i serveru

Oba výše realizované uzly, tj. jak klienta, tak i server, můžeme upravit takovým způsobem, aby se namísto dlouhé „špagety“ s voláním funkcí knihovny nanomsg použilo několik specializovaných uživatelských funkcí. Například klient bude používat funkce se sémanticky správnými názvy send_message a receive_response. Jeho úplný zdrojový kód bude vypadat následovně:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pair.h>
 
const char *URL = "ipc:///tmp/example4";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_message(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void receive_response(socket)
{
    char *response = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received response '%s' with length %d bytes\n", response, bytes);
    if (nn_freemsg(response) < 0) {
        report_error("nn_freemsg");
    }
}
 
void first(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_PAIR)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Remote endpoint added to the socket");
 
    send_message(socket, "Hello from 'first'!");
 
    puts("Waiting for response...");
    receive_response(socket);
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    first(URL);
    return 0;
}

V případě serveru použijeme naopak uživatelské funkce pojmenované receive_message a send_response, takže jeho modifikovaný zdrojový kód bude vypadat takto:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pair.h>
 
const char *URL = "ipc:///tmp/example4";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_message(socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void send_response(const int socket, const char *response)
{
    int bytes;
 
    printf("Sending response '%s'\n", response);
    if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Response with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void second(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_PAIR)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Endpoint bound to socket");
 
    puts("Waiting for message...");
    receive_message(socket);
 
    send_response(socket, "ACK!");
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    second(URL);
    return 0;
}

16. Chování uzlů ve chvíli, kdy zvolíme špatnou komunikační strategii

Nyní si zkusíme otestovat, co se stane ve chvíli, kdy při implementaci jednotlivých uzlů zvolíme špatnou komunikační strategii. Příkladem může být klient z předchozího příkladu, u nějž ovšem namísto strategie PAIR zvolíme strategii PIPELINE. Úprava (resp. přesněji řečeno rozbití) zdrojového kódu je přímočará:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
 
const char *URL = "ipc:///tmp/example5";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_message(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void receive_response(socket)
{
    char *response = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received response '%s' with length %d bytes\n", response, bytes);
    if (nn_freemsg(response) < 0) {
        report_error("nn_freemsg");
    }
}
 
void first(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_PULL)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Remote endpoint added to the socket");
 
    send_message(socket, "Hello from 'first'!");
 
    puts("Waiting for response...");
    receive_response(socket);
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    first(URL);
    return 0;
}

Při pokusu o spuštění klienta získáme pouze informaci o tom, že zvolená kombinace není správná – ve strategii PIPELINE není možné uplatnit obousměrnou komunikaci:

Socket created
Remote endpoint added to the socket
Sending message 'Hello from 'first'!'
nn_send: Operation not supported

Podobným způsobem samozřejmě můžeme upravit/rozbít i serverovou část:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pipeline.h>
 
const char *URL = "ipc:///tmp/example5";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_message(socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void send_response(const int socket, const char *response)
{
    int bytes;
 
    printf("Sending response '%s'\n", response);
    if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Response with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void second(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_PUSH)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Endpoint bound to socket");
 
    puts("Waiting for message...");
    receive_message(socket);
 
    send_response(socket, "ACK!");
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    second(URL);
    return 0;
}

Ani pokus o spuštění serveru nedopadne zcela korektně, což je patrné z následujícího výpisu:

Socket created
Endpoint bound to socket
Waiting for message...
nn_recv: Operation not supported

17. Využití komunikační strategie PUBSUB

V samotném závěru článku si ještě ukážeme způsob využití komunikační strategie PUBSUB, s níž jsme se již v tomto seriálu mnohokrát setkali. Tato strategie umožňuje rozesílat zprávy libovolnému množství příjemců. Zdroj zpráv bude v tomto případě implementován jako server a typ použitého socketu bude NN_PUB. Podívejme se na zdrojový kód producenta/zdroje zpráv. Vidíme, že producent posílá PIN všech platebních karet:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
 
const char *URL = "ipc:///tmp/example6";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_message(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void publisher(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_PUB)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Remote endpoint bound to the socket");
 
    while (1) {
        char buffer[45];
        int number = rand() % 10000;
        sprintf(buffer, "Hello, this is my top secret PIN: %04d", number);
        send_message(socket, buffer);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    publisher(URL);
    return 0;
}

Konzumenta zpráv je již nutné realizovat nepatrně složitějším způsobem, protože musíme specifikovat téma (topic), které má být přijímáno. Pro jednoduchost se přihlásíme k příjmu všech témat:

if (nn_setsockopt(socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
    report_error("nn_setsockopt");
}

Samotné zprávy jsou již přijímány klasicky v nekonečné smyčce:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
 
const char *URL = "ipc:///tmp/example6";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_message(socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void subscriber(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_SUB)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if (nn_setsockopt(socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
        report_error("nn_setsockopt");
    }
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Endpoint connected to socket");
 
    puts("Waiting for messages...");
 
    while (1) {
        receive_message(socket);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    subscriber(URL);
    return 0;
}

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce. Každý příklad se skládá ze dvou samostatně překládaných a spouštěných souborů – producenta zpráv a konzumenta zpráv:

Příklad Skript/kód Popis Cesta
1 sender.c klient vysílající zprávy se strategií PIPELINE https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/01_simple_sender_recei­ver/sender.c
1 receiver.c klient přijímající zprávy se strategií PIPELINE https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/01_simple_sender_recei­ver/receiver.c
1 Makefile soubor pro překlad a spuštění obou klientů https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/01_simple_sender_recei­ver/Makefile
       
2 sender.c vylepšený klient vysílající zprávy se strategií PIPELINE https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/02_better_sender_recei­ver/sender.c
2 receiver.c vylepšený klient přijímající zprávy se strategií PIPELINE https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/02_better_sender_recei­ver/receiver.c
2 Makefile soubor pro překlad a spuštění obou klientů https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/02_better_sender_recei­ver/Makefile
       
3 first.c první uzel (klient) používající strategii PAIR https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/03_simple_pair_communi­cation/first.c
3 second.c druhý uzel (server) používající strategii PAIR https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/03_simple_pair_communi­cation/second.c
3 Makefile soubor pro překlad a spuštění obou klientů https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/03_simple_pair_communi­cation/Makefile
       
4 first.c vylepšený první uzel (klient) používající strategii PAIR https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/04_better_pair_communi­cation/first.c
4 second.c vylepšený druhý uzel (server) používající strategii PAIR https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/04_better_pair_communi­cation/second.c
4 Makefile soubor pro překlad a spuštění obou klientů https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/04_better_pair_communi­cation/Makefile
       
5 first.c první uzel se špatně zvolenou komunikační strategií https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/05_pair_communication_wron­g_socket/first.c
5 second.c druhý uzel se špatně zvolenou komunikační strategií https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/05_pair_communication_wron­g_socket/second.c
5 Makefile soubor pro překlad a spuštění obou klientů https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/05_pair_communication_wron­g_socket/Makefile
       
6 publisher.c zdroj zpráv využívající komunikační strategii PUBSUB https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/06_publisher_subscriber/pu­blisher.c
6 subscriber.c příjemce zpráv využívající komunikační strategii PUBSUB https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/06_publisher_subscriber/sub­scriber.c
6 Makefile soubor pro překlad a spuštění obou klientů https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/06_publisher_subscriber/Ma­kefile

19. Odkazy na předchozí části seriálu o message brokerech

V této kapitole jsou uvedeny odkazy na všech čtrnáct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií message brokerů:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
  8. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
    https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/
  9. Apache ActiveMQ – další systém implementující message brokera
    https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/
  10. Použití Apache ActiveMQ s protokolem STOMP
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/
  11. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/
  12. Komunikace s message brokery z programovacího jazyka Go
    https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/
  13. Použití message brokeru NATS
    https://www.root.cz/clanky/pouziti-message-brokeru-nats/
  14. NATS Streaming Server
    https://www.root.cz/clanky/nats-streaming-server/

20. Odkazy na Internetu

  1. nanomsg na GitHubu
    https://github.com/nanomsg/nanomsg
  2. Referenční příručka knihovny nanomsg
    https://nanomsg.org/v1.1.5/na­nomsg.html
  3. nng (nanomsg-next-generation)
    https://github.com/nanomsg/nng
  4. Differences between nanomsg and ZeroMQ
    https://nanomsg.org/documentation-zeromq.html
  5. NATS
    https://nats.io/about/
  6. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  7. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  8. NATS Introduction
    https://nats.io/documentation/
  9. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  10. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  11. Stránka Apache Software Foundation
    http://www.apache.org/
  12. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  13. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  14. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  15. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  16. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  17. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  18. Qpid Proton
    http://qpid.apache.org/proton/
  19. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  20. Apache ActiveMQ
    http://activemq.apache.org/
  21. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  22. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  23. KahaDB
    http://activemq.apache.or­g/kahadb.html
  24. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  25. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  26. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  27. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  28. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  29. Apache Camel
    https://camel.apache.org/
  30. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  31. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  32. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  33. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  34. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  35. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  36. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  37. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  38. ØMQ – Distributed Messaging
    http://zeromq.org/
  39. ØMQ Community
    http://zeromq.org/community
  40. Get The Software
    http://zeromq.org/intro:get-the-software
  41. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  42. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  43. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  44. ZeroMQ RFC
    https://rfc.zeromq.org/
  45. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  46. zeromq/czmq
    https://github.com/zeromq/czmq
  47. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  48. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  49. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  50. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  51. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  52. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  53. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  54. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  55. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  56. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  57. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  58. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  59. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  60. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  61. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  62. Python binding
    http://zeromq.org/bindings:python
  63. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  64. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  65. About Nanomsg
    https://nanomsg.org/
  66. Advanced Message Queuing Protocol
    https://www.amqp.org/
  67. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  68. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  69. RabbitMQ
    https://www.rabbitmq.com/
  70. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  71. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  72. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  73. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  74. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  75. Erlang
    http://www.erlang.org/
  76. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  77. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  78. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  79. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  80. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  81. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  82. celery na PyPi
    https://pypi.org/project/celery/
  83. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  84. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  85. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  86. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  87. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  88. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  89. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  90. node-celery
    https://github.com/mher/node-celery
  91. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  92. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  93. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  94. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  95. Stránky projektu Redis
    https://redis.io/
  96. Introduction to Redis
    https://redis.io/topics/introduction
  97. Try Redis
    http://try.redis.io/
  98. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  99. Python Redis
    https://redislabs.com/lp/python-redis/
  100. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  101. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  102. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  103. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  104. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  105. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  106. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  107. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  108. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  109. Redis Persistence
    https://redis.io/topics/persistence
  110. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  111. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  112. Ost (knihovna)
    https://github.com/soveran/ost
  113. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  114. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  115. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  116. What Is Sharding?
    https://btcmanager.com/what-sharding/
  117. Redis clients
    https://redis.io/clients
  118. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  119. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  120. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  121. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  122. Resque
    https://github.com/resque/resque
  123. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  124. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  125. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  126. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  127. Pub/Sub
    https://redis.io/topics/pubsub
  128. ZeroMQ distributed messaging
    http://zeromq.org/
  129. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  130. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  131. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  132. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  133. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  134. Redis Protocol specification
    https://redis.io/topics/protocol
  135. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  136. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  137. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  138. Apache Kafka
    https://kafka.apache.org/
  139. Iron
    http://www.iron.io/mq
  140. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  141. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  142. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  143. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  144. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  145. Enqueueing internals
    http://python-rq.org/contrib/
  146. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  147. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  148. Queues
    http://queues.io/
  149. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  150. RestMQ
    http://restmq.com/
  151. ActiveMQ
    http://activemq.apache.org/
  152. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  153. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  154. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  155. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  156. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  157. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  158. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  159. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  160. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  161. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  162. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  163. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  164. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  165. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  166. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  167. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  168. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  169. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  170. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  171. Jupyter
    https://jupyter.org/
  172. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  173. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html