Hlavní navigace

NATS Streaming Server

Pavel Tišnovský

V předchozí části seriálu jsme se seznámili s nástrojem nazvaným NATS, který lze použít ve funkci výkonného brokera zpráv. Nad tímto nástrojem je postavený NATS Streaming Server, jehož popisem se budeme zabývat dnes.

Doba čtení: 44 minut

Sdílet

11. Vylepšení producenta i konzumenta zpráv – test všech chybových stavů, použití defer pro uzavírání prostředků

12. Přehrání všech zpráv poslaných do zvoleného tématu

13. Příjem zpráv od poslední přijaté zprávy

14. Specifikace pořadí zprávy v sekvenci

15. Specifikace časového okamžiku, od kterého se má zpráva či zprávy přijmout

16. Zajištění trvalého odběru zpráv zvoleným konzumentem (durable subscription)

17. Termíny používané v souvislosti s message brokery

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu o message brokerech

20. Odkazy na Internetu

1. NATS Streaming Server

V dnešním článku navážeme na téma, kterému jsme se věnovali minule. Připomeňme si, že jsme se zabývali popisem message brokera nazvaného NATS, který je vytvořen v programovacím jazyku Go, takže výsledkem je server, jenž je poměrně nenáročný na systémové zdroje (především na operační paměť a taktéž na diskový prostor), ovšem i přesto podporuje rychlé přeposílání a popř. rozesílání zpráv. Ve své základní konfiguraci se NATS používá především pro implementaci komunikační strategie publish-subscribe neboli zkráceně pub-sub.

Dnes se seznámíme s projektem nazvaným NATS Streaming Server, jehož zdrojové kódy i základní dokumentaci naleznete na adrese https://github.com/nats-io/nats-streaming-server. Tento projekt je určen pro takzvaný streaming zpráv, podobně jako známější a pravděpodobně i častěji nasazovaný projekt Apache Kafka, na nějž jsme pochopitelně nezapomněli – popíšeme si ho v některém dalším dílu seriálu o message brokerech.

NATS Streaming Server používá klasický systém (server) NATS, který navíc doplňuje o takzvaný storage, tj. o technologii určenou pro ukládání zpráv (někdy nazývaných i záznamy – record) do perzistentního úložiště, kterým může být relační databáze či soubor (resp. skupina souborů). Samotný NATS, jak již víme z předchozího článku, se skládá z několika komponent:

  1. V první řadě se jedná o samotný server, jenž se spouští příkazem gnatsd. Server je naprogramovaný v jazyce Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (viz úvodní odstavec).
  2. Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje). Prozatím jsme se seznámili s rozhraním určeným přímo pro jazyk Go.
  3. Třetí komponentou je dnes popisovaný NATS Streaming Server, který je opět naprogramován v jazyce Go. K tomuto serveru ještě musíme připočítat příslušné rozhraní dodávané (v případě jazyka Go) ve formě samostatného balíčku.
  4. Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis). I této zajímavé problematice bude později věnován samostatný článek.

2. Klasičtí message brokeři a jejich komunikační strategie

V navazujících kapitolách se budeme věnovat takzvanému streamingu zpráv (či záznamů), což je možné pokládat za rozšíření služeb klasických message brokerů o možnost „přehrání“ starších zpráv kombinovanou s možností, aby tu samou zprávu zpracovávalo větší množství konzumentů. Pro upřesnění, v čemž se vlastně streaming odlišuje od běžných message brokerů si ještě jednou (v tomto seriálu naposledy :-) zopakujme dvě komunikační strategie, které většinou nalezneme u běžných message brokerů (Redis Queue, RabbitMQ, ActiveMQ atd.).

Většina message brokerů, s nimiž jsme se až doposud v tomto seriálu seznámili, podporovala dvě základní komunikační strategie. První z těchto strategií se nazývá publish-subscribe (neboli zkráceně pub-sub) a spočívá v tom, že se zprávy s nastaveným tématem (topic, subject) posílají do message brokera, který tyto zprávy ihned přeposílá konzumentům přihlášeným k danému tématu. V případě, že žádný takový konzument neexistuje, je zpráva zahozena. Pokud konzumentů daného tématu naopak existuje větší množství, je zpráva doručena všem takovým konzumentům. Samotný message broker v tomto případě nepotřebuje zprávy ukládat, takže se většinou nesetkáme ani s podporou pro persistenci zpráv. Příkladem takového systému je právě NATS (bez dalších rozšíření).

Obrázek 1: Komunikační strategie publish-subscribe.

Druhá komunikační strategie, která se někdy nazývá push-pull či pouze queueing, je založena na pojmenovaných frontách zpráv (message queue) implementovaných v message brokerovi. Zpráva bývá v tomto případě doručena jen jedinému konzumentovi a pokud žádný konzument není k frontě připojen, zůstane zpráva ve frontě uložena (teoreticky) po libovolně dlouhou dobu. Message broker v takovém případě typicky podporuje perzistenci zpráv, které tak dokážou přečkat jeho případné restarty. Mnoho message brokerů navíc umožňuje, aby konzumenti potvrzovali zpracování zprávy, popř. je dokonce možné provádět transakce. K pojmenovaným frontám se potom přidávají další specializované fronty na nezpracované zprávy (DLQ – Dead Letters Queue.

Obrázek 2: Komunikační strategie push-pull.

V klasickým message brokerech orientovaných na komunikaci s využitím zpráv je každá zpráva většinou spravována izolovaně od ostatních zpráv. I když některé implementace message brokerů podporují prioritní fronty, není obecně vyžadováno (a ani se to neočekává), aby se zprávy doručovaly v takovém pořadí, v jakém je message broker přijímá (což je u pojmenovaných front problematické). Toto chování pro mnohé účely plně dostačuje a z hlediska klientů (tedy producentů zpráv i jejich konzumentů) je použití takových message brokerů velmi jednoduché, protože si klienti vlastně vůbec nemusí pamatovat stav zpráv (seznam zpracovaných zpráv, přijatých ale nezpracovaných zpráv atd.).

Obrázek 3: Složitější komunikační strategie, které jsou některými message brokery podporovány.

Ovšem existují problémy, které se s použitím klasickým message brokerů implementují velmi složitě popř. je nelze (na rozumném HW) implementovat vůbec. Příkladem může být systém pro zaznamenání událostí, ovšem v tom pořadí, v jakém události vznikly a s možností zpětného přehrávání (replay) zpráv/událostí. A právě pro řešení těchto problémů vznikl streaming zpráv.

3. Alternativní přístup – streaming

Představme si například situaci, kdy je nutné zpracovávat informace získávané z nějakého „živého systému“, tyto informace nějakým způsobem upravit a následně je například uložit do logovacího souboru, ovšem s tím, že zůstane zachováno pořadí čtených a zpracovávaných informací. Popř. se může jednat o skutečný streaming nějakých dat, například o rastrové obrázky čtené z několika kamer, které je taktéž zapotřebí zpracovat, katalogizovat a popř. na konci celého řetězce uložit. K tomu můžeme chtít použít takzvaný replay, tj. možnost vrátit se k nějaké sekvenci zpráv (událostí, obrázků) z minulosti, ovšem se zachováním jejich pořadí. Právě v těchto případech může být výhodnější namísto klasických message brokerů použít brokery podporující streaming.

Streaming v první řadě nabízí uživatelům kombinaci možností klasických komunikačních strategií push-pull a pub-sub, protože umožňuje:

  1. Implementovat skupinu konzumentů zpracovávajících zvolené téma způsobem, který pracuje podobně jako klasická fronta (queue), tj. tak, že zprávy jsou postupně vybírány a zpracovávány v takovém pořadí, v jakém byly do message brokera poslány.
  2. Současně je ovšem možné jednu zprávu přeposílat většímu množství konzumentů či jejich skupin, což je vlastnost, která je očekávána od strategie pub-sub. Přeposílání může být v případě potřeby okamžité a je zcela řízena konzumenty, nikoli message brokerem.

Až doposud se koncept streamingu až nápadně podobá použití klasických front zpráv, ovšem rozdíly nalezneme v tom, jak jsou zprávy organizovány na message brokerovi a jak s nimi nakládají konzumenti (z pohledu producentů zpráv se většinou nic nemusí měnit, ovšem někdy je zapotřebí u zpráv explicitně specifikovat takzvaný oddíl – partition).

Například v systému Apache Kafka jsou jednotlivá témata rozdělena do oddílů (partitions), přičemž každý oddíl obsahuje neměnnou sekvenci zpráv. Oddíly pro jednotlivá témata lze zpracovávat v několika brokerech umístěných do clusteru a tak zajistit potřebný load balancing, případnou replikaci atd. Každá zpráva uložená do oddílu má přiřazen jednoznačný offset (buď celé číslo nebo časové razítko, v závislosti na konkrétní implementaci). Navíc je možné – podle konkrétního brokera – aby se pro každé téma udržovalo několik logů (partition logs), což umožňuje připojení většího množství konzumentů zpráv k jednomu tématu s tím, že tito konzumenti budou pracovat paralelně.

Navíc je většinou možné používat větší množství instancí brokerů, z nichž je vytvořen cluster. Zde se setkáme s důležitým termínem replikace – každý oddíl je typicky replikován na několika message brokerech v clusteru (ovšem nemusí se jednat o všechny brokery, replikace se provádí například na tři brokery v clusteru).

4. Koncepty, na nichž je streaming založen

To, že každá zpráva uložená do oddílu má přiřazen offset, umožňuje, aby konzumenti zpráv specifikovali, od jakého offsetu potřebují zprávy přijímat. Díky tomu je možné, aby se konzumenti připojovali a odpojovali v jakýkoli okamžik a přitom měli možnost řídit, s jakými zprávami budou pracovat. Konzument se například po pozdějším připojení může rozhodnout, že bude zpracovávat nejnovější data a na případná starší data si (možná) vyhradí svůj strojový čas později. Umožněn je i takzvaný replay, tj. zpracování určité sekvence zpráv z minulosti. Postačuje znát jen offset první zprávy ze sekvence.

Samotné posílání zpráv konzumentů si přitom řídí sami konzumenti, kteří si zprávy (téma+oddíl+offset) musí vyžádat. Díky tomu si sami konzumenti určují, kolik zpráv zpracují a jak tedy budou zatíženi.

To však ve skutečnosti není vše, protože zprávy poslané do message brokera s podporou streamingu se v čase chovají odlišně, než v případě použití běžné fronty. Zprávy jsou totiž rozšířeny o své pořadové číslo a většinou i o časové razítko. Takto rozšířené zprávy jsou uloženy do takzvaného logu s daným tématem, kde jsou uchovány buď po neomezeně dlouhou dobu, nebo až do chvíle, kdy je překročena kapacita logu popř. maximální doba životnosti zprávy. Pokud dojde k alespoň jedné této události, budou nejstarší zprávy smazány (kapacita logu je většinou zadána jak celkovým objemem, tak i maximálním počtem zpráv, což je ostatně i případ NATS Streaming Serveru). Nezáleží tedy na tom, zda a kolika konzumenty byla zpráva přečtena a zpracována – přečtení a zpracování zprávy nemá vliv na její umístění v logu, což vlastně znamená, že většinou dochází k určitému zesložitění konzumentů, kteří si musí pamatovat, které zprávy s daným tématem již zpracovaly a které nikoli.

Aby bylo možné konfigurovat a řídit, které zprávy mají být na message brokeru uloženy a které již smazány, specifikuje se takzvaný retention time zajišťující, aby počet zpráv/záznamů nepřekročil časovou mez. Mnohé streaming servery dokážou omezit i celkový počet zpráv, počet zpráv v tématu a/nebo počet zpráv na jednom serveru v clusteru. Totéž omezení je možné aplikovat na celkovou velikost použitého paměťového či diskového prostoru. Ostatně tyto informace uvidíme i při spuštění NATS Streaming Serveru v sedmé kapitole.

Dále se u streaming serverů setkáme s možností zapojení více serverů do clusteru, což podporuje jak dnes popisovaný NATS Streaming Server, tak i Apache Kafka.

5. Nejznámější systémy podporující streaming

Mezi nejznámější a pravděpodobně nejčastěji nasazované systémy podporující streaming patří projekt Apache Kafka a taktéž dnes popisovaný NATS Streaming Server. Vzhledem k tomu, že NATS Streaming Server je popsán v navazujících kapitolách, řekneme si v této kapitole základní informace o „konkurenčním“ systému Apache Kafka (slovo „konkurenční“ je zapsáno v uvozovkách především z toho důvodu, že každý systém má svoji niku, ve které se používá a tyto niky se překrývají jen částečně).

Apache Kafka umožňuje ukládání zpráv (zde se ovšem používá termín záznam – record) do různých témat, přičemž každé téma je rozděleno do oddílů neboli partition (samozřejmě je možné pro téma vyhradit pouze jediný oddíl). Rozdělení do oddílů se provádí z několika důvodů. Jedním z nich je rozdělení zátěže, protože jednotlivé oddíly mohou být provozovány na různých počítačích v clusteru. To však není vše, jelikož je ve skutečnosti konfigurace poněkud složitější – každý oddíl totiž může být replikován na více počítačích, přičemž jeden z těchto oddílů je „leaderem“ a ostatní jsou „followeři“. Zápis nových zpráv, popř. čtení se provádí vždy jen v leaderu, ovšem změny jsou replikovány na všechny kopie oddílu. Ve chvíli, kdy dojde k pádu „leadera“, převezme jeho roli jeden z dalších uzlů. Pokud tedy existuje N uzlů s replikou oddílu, bude systém funkční i ve chvíli, kdy zhavaruje N-1 uzlů!

Téma zpracovávané Kafkou může na clusteru vypadat například následovně:

          +---+---+---+---+---+---+
oddíl #0  | 0 | 1 | 2 | 3 | 4 | 5 | ...
          +---+---+---+---+---+---+
oddíl #1  | 0 | 1 | 2 | ...
          +---+---+---+
oddíl #2  | ...
          +---+---+---+---+---+---+---+---+---+
oddíl #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
          +---+---+---+---+---+---+---+---+---+

Boxy s čísly odpovídají jednotlivým zprávám, kterým jsou tato pořadová čísla v sekvenci postupně přiřazována. Zápis nových zpráv je prováděn do oblastí označených třemi tečkami. Z tohoto diagramu můžeme odvodit, že každý oddíl obsahuje vlastní sekvenci zpráv/záznamů, ke kterým se postupně připojují záznamy další.

Pozor si musíme dát především na to, že pořadí záznamů je zachováno a garantováno pouze v rámci jednoho oddílu a nikoli pro celé téma. V případě, že požadujeme, aby všechny zprávy v tématu měly zaručené pořadí, používá se pouze jeden oddíl pro celé téma, což má ovšem vliv na další vlastnosti řešení – především se snižuje výkonnost a možnost rozkládání zátěže v rámci clusteru.

Producent při posílání zprávy volí jak téma (což je logické), tak i oddíl. Volbu oddílu je možné provést na základě mnoha kritérií. Například lze použít jednoduchý algoritmus round robin a používat oddíly čistě pro rozložení zátěže. Nebo se může oddíl zvolit například na základě ID přihlášeného uživatele, poslední části IP počítače, na němž je spuštěn producent atd. atd.

Podobně jako u dalších message brokerů s podporou streamování se i v systému Apache Kafka o výběr zpráv, které se mají zpracovat, starají přímo konzumenti, kteří mohou specifikovat offset zprávy. Konzumenti se spojují do skupin, přičemž zprávu získá vždy jeden z konzumentů ze skupiny (což se podobá strategii pub-sub).

Apache Kafka podporuje i speciální API používané ve chvíli, kdy potřebujeme implementovat uzel (proces), který získá zprávu, nějakým způsobem ji zpracuje a poté ji pošle zpět do message brokera. Takový uzel je samozřejmě možné naimplementovat tak, že se bude současně jednat o konzumenta i producenta, ovšem využití speciálního API je efektivnější.

6. Instalace systému NATS Streaming Server

Nyní si ve stručnosti ukážeme, jak vlastně probíhá instalace systému NATS Streaming Server. V této kapitole se předpokládá, že již máte nainstalován NATS Server, který byl popsán v předchozím článku. Pokud tomu tak není, postupujte následujícími dvěma kroky. Nejprve se přesuneme do adresáře, na který odkazuje proměnná prostředí $GOPATH, což je typicky adresář „~/go“:

$ cd $GOPATH

Dále spustíme příkaz pro nainstalování balíčku, který obsahuje server NATSu:

$ go get github.com/nats-io/gnatsd
Poznámka: ve skutečnosti ovšem nebudeme, na rozdíl od předchozího článku, takto nainstalovaný NATS server spouštět ručně. Tuto činnost zajistí NATS Streaming server automaticky.

Nyní již konečně můžeme přistoupit k instalaci samotného NATS Streaming Serveru. Budeme postupovat podobně, jako při instalaci NATS Serveru, tj. nejdříve se přesuneme do adresáře, na který odkazuje proměnná prostředí $GOPATH:

$ cd $GOPATH

Instalace balíčku se streaming serverem:

$ go get github.com/nats-io/nats-streaming-server

Po instalaci zdrojových kódů musíme NATS Streaming Server přeložit, a to tímto příkazem:

$ cd $GOPATH/src/github.com/nats-io/nats-streaming-server
$ go build

Po překladu by měl v aktuálním adresáři vzniknout soubor pojmenovaný nats-streaming-server. Zkusíme si zobrazit jeho nápovědu:

$ ./nats-streaming-server --help

S výsledkem:

Usage: nats-streaming-server [options]
 
Streaming Server Options:
    -cid, --cluster_id  <string>         Cluster ID (default: test-cluster)
    -st,  --store <string>               Store type: MEMORY|FILE|SQL (default: MEMORY)
          --dir <string>                 For FILE store type, this is the root directory
    -mc,  --max_channels <int>           Max number of channels (0 for unlimited)
    -msu, --max_subs <int>               Max number of subscriptions per channel (0 for unlimited)
    -mm,  --max_msgs <int>               Max number of messages per channel (0 for unlimited)
    -mb,  --max_bytes <size>             Max messages total size per channel (0 for unlimited)
    -ma,  --max_age <duration>           Max duration a message can be stored ("0s" for unlimited)
    -mi,  --max_inactivity <duration>    Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
    -ns,  --nats_server <string>         Connect to this external NATS Server URL (embedded otherwise)
    -sc,  --stan_config <string>         Streaming server configuration file
    -hbi, --hb_interval <duration>       Interval at which server sends heartbeat to a client
    -hbt, --hb_timeout <duration>        How long server waits for a heartbeat response
    -hbf, --hb_fail_count <int>          Number of failed heartbeats before server closes the client connection
          --ft_group <string>            Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
    -sl,  --signal <signal>[=<pid>]      Send signal to nats-streaming-server process (stop, quit, reopen)
          --encrypt <bool>               Specify if server should use encryption at rest
          --encryption_cipher <string>   Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
          --encryption_key <sting>       Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

Streaming Server Clustering Options:
    --clustered <bool>                   Run the server in a clustered configuration (default: false)
    --cluster_node_id <string>           ID of the node within the cluster if there is no stored ID (default: random UUID)
    --cluster_bootstrap <bool>           Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
    --cluster_peers <string, ...>        Comma separated list of cluster peer node IDs to bootstrap cluster state
    --cluster_log_path <string>          Directory to store log replication data
    --cluster_log_cache_size <int>       Number of log entries to cache in memory to reduce disk IO (default: 512)
    --cluster_log_snapshots <int>        Number of log snapshots to retain (default: 2)
    --cluster_trailing_logs <int>        Number of log entries to leave after a snapshot and compaction
    --cluster_sync <bool>                Do a file sync after every write to the replication log and message store
    --cluster_raft_logging <bool>        Enable logging from the Raft library (disabled by default)
 
Streaming Server File Store Options:
    --file_compact_enabled <bool>        Enable file compaction
    --file_compact_frag <int>            File fragmentation threshold for compaction
    --file_compact_interval <int>        Minimum interval (in seconds) between file compactions
    --file_compact_min_size <size>       Minimum file size for compaction
    --file_buffer_size <size>            File buffer size (in bytes)
    --file_crc <bool>                    Enable file CRC-32 checksum
    --file_crc_poly <int>                Polynomial used to make the table used for CRC-32 checksum
    --file_sync <bool>                   Enable File.Sync on Flush
    --file_slice_max_msgs <int>          Maximum number of messages per file slice (subject to channel limits)
    --file_slice_max_bytes <size>        Maximum file slice size - including index file (subject to channel limits)
    --file_slice_max_age <duration>      Maximum file slice duration starting when the first message is stored (subject to channel limits)
    --file_slice_archive_script <string> Path to script to use if you want to archive a file slice being removed
    --file_fds_limit <int>               Store will try to use no more file descriptors than this given limit
    --file_parallel_recovery <int>       On startup, number of channels that can be recovered in parallel
    --file_truncate_bad_eof <bool>       Truncate files for which there is an unexpected EOF on recovery, dataloss may occur
 
Streaming Server SQL Store Options:
    --sql_driver <string>            Name of the SQL Driver ("mysql" or "postgres")
    --sql_source <string>            Datasource used when opening an SQL connection to the database
    --sql_no_caching <bool>          Enable/Disable caching for improved performance
    --sql_max_open_conns <int>       Maximum number of opened connections to the database
 
Streaming Server TLS Options:
    -secure <bool>                   Use a TLS connection to the NATS server without
                                     verification; weaker than specifying certificates.
    -tls_client_key <string>         Client key for the streaming server
    -tls_client_cert <string>        Client certificate for the streaming server
    -tls_client_cacert <string>      Client certificate CA for the streaming server
 
Streaming Server Logging Options:
    -SD, --stan_debug=<bool>         Enable STAN debugging output
    -SV, --stan_trace=<bool>         Trace the raw STAN protocol
    -SDV                             Debug and trace STAN
         --syslog_name               On Windows, when running several servers as a service, use this name for the event source
    (See additional NATS logging options below)
 
Embedded NATS Server Options:
    -a, --addr <string>              Bind to host address (default: 0.0.0.0)
    -p, --port <int>                 Use port for clients (default: 4222)
    -P, --pid <string>               File to store PID
    -m, --http_port <int>            Use port for http monitoring
    -ms,--https_port <int>           Use port for https monitoring
    -c, --config <string>            Configuration file
 
Logging Options:
    -l, --log <string>               File to redirect log output
    -T, --logtime=<bool>             Timestamp log entries (default: true)
    -s, --syslog <string>            Enable syslog as log method
    -r, --remote_syslog <string>     Syslog server addr (udp://localhost:514)
    -D, --debug=<bool>               Enable debugging output
    -V, --trace=<bool>               Trace the raw protocol
    -DV                              Debug and trace
 
Authorization Options:
        --user <string>              User required for connections
        --pass <string>              Password required for connections
        --auth <string>              Authorization token required for connections
 
TLS Options:
        --tls=<bool>                 Enable TLS, do not verify clients (default: false)
        --tlscert <string>           Server certificate file
        --tlskey <string>            Private key for server certificate
        --tlsverify=<bool>           Enable TLS, verify client certificates
        --tlscacert <string>         Client certificate CA for verification
 
NATS Clustering Options:
        --routes <string, ...>       Routes to solicit and connect
        --cluster <string>           Cluster URL for solicited routes
 
Common Options:
    -h, --help                       Show this message
    -v, --version                    Show version
        --help_tls                   TLS help.

7. Spuštění NATS Streaming Serveru

V případě, že se nápověda skutečně zobrazila, můžeme se pokusit spustit NATS Streaming Server s výchozí konfigurací. Každé (další) spuštění serveru by mělo vypadat následovně:

$ cd $GOPATH/src/github.com/nats-io/nats-streaming-server
 
$ ./nats-streaming-server

Na terminálu by se měly zobrazit přibližně následující informace, z nichž některé budou velmi důležité pro naše další pokusy:

[3568] 2019/04/05 17:50:18.681037 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.2
[3568] 2019/04/05 17:50:18.681156 [INF] STREAM: ServerID: fxLHdxa4OjkdnXtNZaYHSC
[3568] 2019/04/05 17:50:18.681178 [INF] STREAM: Go version: go1.11.2
[3568] 2019/04/05 17:50:18.681621 [INF] STREAM: Git commit: [not set]
[3568] 2019/04/05 17:50:18.683122 [INF] Starting nats-server version 1.4.1
[3568] 2019/04/05 17:50:18.683172 [INF] Git commit [not set]
[3568] 2019/04/05 17:50:18.683422 [INF] Listening for client connections on 0.0.0.0:4222
[3568] 2019/04/05 17:50:18.683447 [INF] Server is ready
[3568] 2019/04/05 17:50:18.712436 [INF] STREAM: Recovering the state...
[3568] 2019/04/05 17:50:18.712549 [INF] STREAM: No recovered state
[3568] 2019/04/05 17:50:18.963128 [INF] STREAM: Message store is MEMORY
[3568] 2019/04/05 17:50:18.963226 [INF] STREAM: ---------- Store Limits ----------
[3568] 2019/04/05 17:50:18.963255 [INF] STREAM: Channels:                  100 *
[3568] 2019/04/05 17:50:18.963271 [INF] STREAM: --------- Channels Limits --------
[3568] 2019/04/05 17:50:18.963291 [INF] STREAM:   Subscriptions:          1000 *
[3568] 2019/04/05 17:50:18.963307 [INF] STREAM:   Messages     :       1000000 *
[3568] 2019/04/05 17:50:18.963323 [INF] STREAM:   Bytes        :     976.56 MB *
[3568] 2019/04/05 17:50:18.963339 [INF] STREAM:   Age          :     unlimited *
[3568] 2019/04/05 17:50:18.963360 [INF] STREAM:   Inactivity   :     unlimited *
[3568] 2019/04/05 17:50:18.963375 [INF] STREAM: ----------------------------------

Za povšimnutí stojí především čtyři zvýrazněné informace:

  1. Důležité je především jméno clusteru, v rámci něhož server běží. Jedná se o důležitou informaci z toho důvodu, že jméno clusteru budeme potřebovat při připojování klientů.
  2. Dále si povšimněte portu, na který se budou připojovat klienti. Pokud se výchozí hodnota portu nezmění, nemusíme ji explicitně v klientech specifikovat.
  3. Pravděpodobně nejdůležitější je informace o tom, že zprávy jsou drženy pouze v operační paměti a nejsou meziukládány na disk. Toto nastavení je možné změnit a využít pro ukládání zpráv například relační databáze či přímo souborový systém. Výchozí hodnota MEMORY znamená, že po restartu serveru budou zprávy ztraceny.
  4. Hodnota unlimited vlastnosti Age říká, že předem neomezujeme maximální životnost zpráv, což v praxi znamená, že pokud bude zpráv méně než jeden milion (informace o dva řádky výše) a nepřesáhne se maximální kapacita kanálu, nebudou zprávy smazány.

Server nechte spuštěný v samostatném terminálu, protože se ještě vrátíme k dalším zprávám, které budou na tento terminál zapsány po připojení prvního producenta zpráv.

8. Instalace balíčku potřebného pro implementaci klientů

Kromě běžícího serveru pro streaming zpráv budeme při tvorbě demonstračních příkladů potřebovat ještě jeden balíček, který se jmenuje go-nats-streaming. Tento balíček zajišťuje rozhraní mezi klienty a NATS Streaming Serverem. Nainstalujeme ho naprosto stejným způsobem jako jakýkoli jiný balíček určený pro programovací jazyk Go:

$ cd $GOPATH
 
$ go get github.com/nats-io/go-nats-streaming

Import tohoto balíčku v demonstračních klientech bude vypadat takto:

import streaming "github.com/nats-io/go-nats-streaming"
Poznámka: z pohledu jazyka Go se tento balíček jmenuje stan (nezáleží totiž na jménu adresáře, ale identifikátoru za package), my ovšem použijeme jmenný alias streaming.

9. Nejjednodušší varianta zdroje zpráv (producenta)

Nejprve si ukážeme, jakým způsobem je možné vytvořit producenta zpráv. Ten se svou činností vlastně vůbec neodlišuje od producentů zpráv posílaných do běžných message brokerů – samotná zpráva musí mít pouze nastaveno téma (topic) a na tělo zprávy nejsou kladena žádná omezení, protože se jedná o sekvenci bajtů.

V tom nejjednodušším případě se pouze připojíme k zadanému clusteru (viz též sedmou kapitolu, kde jsme si ukázali, kde jméno clusteru hledat) a specifikujeme jméno klienta, které by ideálně mělo být jednoznačné:

stream, _ := streaming.Connect(clusterId, clientId)

Zpráva s libovolným obsahem (sekvencí bajtů) se publikuje metodou Publish:

stream.Publish(topic, []byte("Hello World"))

Nakonec nesmíme zapomenout uzavřít připojení k serveru:

stream.Close()

Úplný zdrojový kód producenta zprávy naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/01-simple-pub-sub/publisher.go:

package main
 
import streaming "github.com/nats-io/go-nats-streaming"
 
const clusterId = "test-cluster"
const clientId = "publisher1"
const topic = "topic1"
 
func main() {
        stream, _ := streaming.Connect(clusterId, clientId)
 
        stream.Publish(topic, []byte("Hello World"))
 
        stream.Close()
}

Pokud klienta přeložíte a spustíte, můžete si v terminálu, kde běží NATS Streaming Server povšimnout nové zprávy:

[3568] 2019/04/05 17:50:42.121546 [INF] STREAM: Channel "topic1" has been created

Vidíme, že server správně poznal, že poslaná zpráva obsahuje prozatím neexistující téma, pro které byl ihned vytvořen takzvaný kanál.

10. Konzument zpráv

Jak je to u systémů, v nichž se posílají zprávy, běžné, je implementace konzumenta zpráv nepatrně složitější, než u jejich producenta. Je tomu tak proto, že konzument dopředu neví, kolik zpráv získá a musí na ně čekat – buď v explicitně zapsané smyčce (což nebude náš případ), nebo s využitím nějaké formy smyčky událostí.

Připojení k serveru se provádí stejným způsobem, jako u producenta. V nejjednodušším případě nám postačuje znát jméno clusteru a zadat libovolné jméno klienta:

stream, _ := streaming.Connect(clusterId, clientId)

K odběru zpráv je nutné se přihlásit metodou Subscribe, které se předá téma, které konzumenta zajímá a taktéž reference na callback funkci zavolanou ve chvíli, kdy bude zpráva přijata:

sub, _ := stream.Subscribe(topic, onReceive)

Samotná callback funkce je po příjmu zprávy zavolána s jediným parametrem, jímž je samotná zpráva:

func onReceive(m *streaming.Msg) {
        println("Received message: ", string(m.Data))
}

Na konci se odhlásíme od odběru zpráv a uzavřeme připojení k serveru:

sub.Unsubscribe()
 
stream.Close()

Problém ovšem spočívá v tom, že samotné čekání na zprávy je prováděno asynchronně k běhu gorutiny s funkcí main. Proto musíme zajistit, aby funkce main nebyla ihned ukončena. K tomu může posloužit známý trik s kanálem, z něhož budeme číst hodnotu, kterou ovšem ve skutečnosti nikdo nepošle:

c := make(chan bool)
 
sub, _ := stream.Subscribe(topic, onReceive)
 
<-c

Opět se podívejme na úplný zdrojový kód konzumenta zpráv, který nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/01-simple-pub-sub/consumer.go:

package main
 
import streaming "github.com/nats-io/go-nats-streaming"
 
const clusterId = "test-cluster"
const clientId = "consumer1"
const topic = "topic1"
 
func onReceive(m *streaming.Msg) {
        println("Received message: ", string(m.Data))
}
 
func main() {
        stream, _ := streaming.Connect(clusterId, clientId)
 
        c := make(chan bool)
 
        sub, _ := stream.Subscribe(topic, onReceive)
 
        <-c
 
        sub.Unsubscribe()
 
        stream.Close()
}

11. Vylepšení producenta i konzumenta zpráv – test všech chybových stavů, použití defer pro uzavírání prostředků

Předchozí implementace obou klientů, tj. jak producenta, tak i konzumenta zpráv, byly naprogramovány takovým způsobem, aby byly co nejjednodušší a nejkratší. Proto jsme vynechali i kontroly případných chyb a taktéž zavírání prostředků (připojení k serveru) je provedeno nekorektně a nutno říci, že ne zcela idiomatickým způsobem (minimálně pro projekt naprogramovaný v Go). Zdrojové kódy je však možné vylepšit. Po pokusu o připojení k systému NATS Streaming zkontrolujeme případný chybový stav a s využitím bloku defer zajistíme, že se klient před odchodem z funkce main automaticky odpojí:

stream, err := streaming.Connect(clusterId, clientId)
if err != nil {
        log.Fatalf("Can not connect to cluster %s\n", clusterId)
}
defer stream.Close()

Chyba pochopitelně může nastat i ve chvíli, kdy se budeme snažit publikovat nějakou zprávu, tj. poslat ji do určeného tématu. Proto otestujeme i hodnotu (jediné) návratové hodnoty z metody Publish:

err = stream.Publish(topic, []byte("Hello World"))
if err != nil {
        log.Fatalf("Error during publish: %v\n", err)
} else {
        log.Println("Message published")
}

Nezávisle na tom, zda se zavolá funkce log.Fatalf dojde k ukončení připojení, protože se blok defer zavolá i v takovém případě.

Následuje zdrojový kód upraveného producenta zpráv:

package main
 
import (
        streaming "github.com/nats-io/go-nats-streaming"
        "log"
)
 
const clusterId = "test-cluster"
const clientId = "publisher1"
const topic = "topic1"
 
func main() {
        stream, err := streaming.Connect(clusterId, clientId)
        if err != nil {
                log.Fatalf("Can not connect to cluster %s\n", clusterId)
        }
        defer stream.Close()
 
        err = stream.Publish(topic, []byte("Hello World"))
        if err != nil {
                log.Fatalf("Error during publish: %v\n", err)
        } else {
                log.Println("Message published")
        }
}

U konzumenta zpráv nesmíme zapomenout především na to, aby se za každých okolností podařilo odhlášení odběru zpráv. Tuto činnost opět zajistíme použitím bloku uvedeného za klíčovým slovem defer. Samozřejmě přidáme i podmínku testující, zda se podařilo přihlášení klienta k odběru zpráv z určitého tématu:

sub, err := stream.Subscribe(topic, onReceive)
if err != nil {
        log.Fatalf("Can not subscribe to topic %s\n", topic)
}
defer sub.Unsubscribe()

Opět se podívejme, jak vypadá upravená varianta konzumenta zpráv:

package main
 
import (
        streaming "github.com/nats-io/go-nats-streaming"
        "log"
)
 
const clusterId = "test-cluster"
const clientId = "consumer1"
const topic = "topic1"
 
func onReceive(m *streaming.Msg) {
        println("Received message: ", string(m.Data))
}
 
func main() {
        stream, err := streaming.Connect(clusterId, clientId)
        if err != nil {
                log.Fatalf("Can not connect to cluster %s\n", clusterId)
        }
        defer stream.Close()
 
        c := make(chan bool)
 
        sub, err := stream.Subscribe(topic, onReceive)
        if err != nil {
                log.Fatalf("Can not subscribe to topic %s\n", topic)
        }
        defer sub.Unsubscribe()
 
        <-c
}

Chování obou klientů by mělo být v případě, že nedojde k žádné chybě, shodné s prvním demonstračním příkladem.

12. Přehrání všech zpráv poslaných do zvoleného tématu

V této kapitole si ukážeme, jakým způsobem lze naprogramovat klienta, který po svém připojení k NATSu zpracuje všechny zprávy, které se nachází ve zvoleném tématu. Povšimněte si, že tato operace není v běžných message brokerech dostupná, protože jednou zpracované zprávy jsou z fronty odstraněny a nelze se k nim tedy vracet. Nejprve však upravíme producenta zpráv takovým způsobem, aby po svém spuštění vytvořil deset zpráv s různým tělem (odlišným řetězcem zakódovaným do sekvence bajtů):

for i := 1; i <= 10; i++ {
        message := fmt.Sprintf("Hello World #%d", i)
        err = stream.Publish(topic, []byte(message))
        if err != nil {
                log.Fatalf("Error during publish: %v\n", err)
        } else {
                log.Println("Message published: %s", message)
        }
}

Upravený zdrojový kód producenta zpráv je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/03-replay-all/publisher.go:

package main
 
import (
        "fmt"
        streaming "github.com/nats-io/go-nats-streaming"
        "log"
)
 
const clusterId = "test-cluster"
const clientId = "publisher1"
const topic = "topic1"
 
func main() {
        stream, err := streaming.Connect(clusterId, clientId)
        if err != nil {
                log.Fatalf("Can not connect to cluster %s\n", clusterId)
        }
        defer stream.Close()
 
        for i := 1; i <= 10; i++ {
                message := fmt.Sprintf("Hello World #%d", i)
                err = stream.Publish(topic, []byte(message))
                if err != nil {
                        log.Fatalf("Error during publish: %v\n", err)
                } else {
                        log.Println("Message published: %s", message)
                }
        }
}

Konzument zpráv musí být naprogramován odlišně od „klasického“ konzumenta, který pouze očekává nové zprávy. Před přihlášením k nějakému tématu (topicu) je totiž nutné specifikovat, že budeme chtít přehrát všechny zprávy v tématu. Provede se to s využitím nepovinného parametru předaného do metody Subscribe, kterému předáme výsledek volání funkce DeliverAllAvailable():

startOpt := streaming.DeliverAllAvailable()
sub, err := stream.Subscribe(topic, onReceive, startOpt)

Následuje výpis úplného zdrojového kódu konzumenta:

package main
 
import (
        streaming "github.com/nats-io/go-nats-streaming"
        "log"
)
 
const clusterId = "test-cluster"
const clientId = "consumer1"
const topic = "topic1"
 
func onReceive(m *streaming.Msg) {
        println("Received message: ", string(m.Data))
}
 
func main() {
        stream, err := streaming.Connect(clusterId, clientId)
        if err != nil {
                log.Fatalf("Can not connect to cluster %s\n", clusterId)
        }
        defer stream.Close()
 
        c := make(chan bool)
 
        startOpt := streaming.DeliverAllAvailable()
        sub, err := stream.Subscribe(topic, onReceive, startOpt)
        if err != nil {
                log.Fatalf("Can not subscribe to topic %s\n", topic)
        }
        defer sub.Unsubscribe()
 
        <-c
}

Nyní můžeme provést následující operace:

  1. Spustíte dvakrát či třikrát producenta zpráv, který při každém běhu vytvoří deset nových zpráv.
  2. Spustíte konzumenta, jenž by měl postupně načíst a vypsat všechny zprávy.
  3. Spustíte konzumenta znovu, opět by se měly přehrát všechny zprávy.

13. Příjem zpráv od poslední přijaté zprávy

Relativně snadno lze docílit toho, aby konzument po svém připojení získal poslední přijatou zprávu a samozřejmě i všechny zprávy následující. Sice by se mohlo zdát, že získání zprávy, která již byla jednou poslána, je nadbytečné, ale zde se počítá s tím, že konzument při zpracování této zprávy zhavaroval a proto se mu pro jistotu pošle znovu. Tohoto chování docílíme takto:

startOpt := streaming.StartWithLastReceived()
sub, err := stream.Subscribe(topic, onReceive, startOpt)
if err != nil {
        log.Fatalf("Can not subscribe to topic %s\n", topic)
}
defer sub.Unsubscribe()

Pokud pro řešení nějakého problému vyhovuje spíše takové chování, kdy se pošle až následující zpráva, podívejte se na demonstrační příklad popsaný v šestnácté kapitole.

Následuje výpis upraveného zdrojového kódu konzumenta:

package main
 
import (
        streaming "github.com/nats-io/go-nats-streaming"
        "log"
)
 
const clusterId = "test-cluster"
const clientId = "consumer1"
const topic = "topic1"
 
func onReceive(m *streaming.Msg) {
        println("Received message: ", string(m.Data))
}
 
func main() {
        stream, err := streaming.Connect(clusterId, clientId)
        if err != nil {
                log.Fatalf("Can not connect to cluster %s\n", clusterId)
        }
        defer stream.Close()
 
        c := make(chan bool)
 
        // jeste jednou ziska posledni zpravu zpracovanou minule
        startOpt := streaming.StartWithLastReceived()
        sub, err := stream.Subscribe(topic, onReceive, startOpt)
        if err != nil {
                log.Fatalf("Can not subscribe to topic %s\n", topic)
        }
        defer sub.Unsubscribe()
 
        <-c
}

Zdrojový kód producenta si již v článku ukazovat nebudeme, protože je shodný s producentem předchozím.

14. Specifikace pořadí zprávy v sekvenci

V úvodních kapitolách jsme si řekli, že zprávám zařazovaným do zvoleného tématu je přiřazováno celé číslo odpovídající pořadí zprávy (v Apache Kafka je toto číslo přiřazováno v rámci oddílu, nikoli celého tématu). Klienta, přesněji řečeno konzumenta zpráv, lze naprogramovat takovým způsobem, aby začal zprávy zpracovávat od zvoleného offsetu:

startOpt := streaming.StartAtSequence(40)
sub, err := stream.Subscribe(topic, onReceive, startOpt)
if err != nil {
        log.Fatalf("Can not subscribe to topic %s\n", topic)
}
 
defer sub.Unsubscribe()

Můžeme si to snadno vyzkoušet:

  1. Spusťte ještě několikrát producenta zpráv, aby se do tématu „topic1“ vložilo minimálně padesát zpráv.
  2. Spusťte upraveného konzumenta, který by měl začít zpracovávat až čtyřicátou první zprávu.

Alternativně je možné producenta a konzumenta upravit takovým způsobem, aby začali používat odlišné téma.

Upravený konzument zpráv je uložen na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/05-start-at-sequence/consumer.go:

package main
 
import (
        streaming "github.com/nats-io/go-nats-streaming"
        "log"
)
 
const clusterId = "test-cluster"
const clientId = "consumer1"
const topic = "topic1"
 
func onReceive(m *streaming.Msg) {
        println("Received message: ", string(m.Data))
}
 
func main() {
        stream, err := streaming.Connect(clusterId, clientId)
        if err != nil {
                log.Fatalf("Can not connect to cluster %s\n", clusterId)
        }
        defer stream.Close()
 
        c := make(chan bool)
 
        startOpt := streaming.StartAtSequence(40)
        sub, err := stream.Subscribe(topic, onReceive, startOpt)
        if err != nil {
                log.Fatalf("Can not subscribe to topic %s\n", topic)
        }
        defer sub.Unsubscribe()
 
        <-c
}

15. Specifikace časového okamžiku, od kterého se má zpráva či zprávy přijmout

V dalším demonstračním příkladu, jehož zdrojový kód naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/06-time-duration/consumer.go je ukázáno, že při čtení a zpracovávání zpráv je možné určit i časový okamžik určující, že budeme chtít získat všechny zprávy, které jsou novější/mladší, než uvedený okamžik (každá zpráva má přiřazeno časové razítko). Užitečné přitom je, že standardní balíček time obsahuje funkci pro parsing řetězců s uvedením času v lidské podobě, například „5d“, „3m“ atd. Tento řetězec se převede na časový údaj (interval), který je použit pro výpočet časového offsetu:

package main
 
import (
        streaming "github.com/nats-io/go-nats-streaming"
        "log"
        "os"
        "time"
)
 
const clusterId = "test-cluster"
const clientId = "consumer1"
const topic = "topic1"
 
func onReceive(m *streaming.Msg) {
        println("Received message: ", string(m.Data))
}
 
func main() {
        stream, err := streaming.Connect(clusterId, clientId)
        if err != nil {
                log.Fatalf("Can not connect to cluster %s\n", clusterId)
        }
        defer stream.Close()
 
        c := make(chan bool)
 
        ago := "5m"
        timeDelta, err := time.ParseDuration(ago)
        if err != nil {
                log.Fatalf("Unable to parse duration '%s'", ago)
                os.Exit(-1)
        }
        println("About to receive all messages newer than", timeDelta)
 
        startOpt := streaming.StartAtTimeDelta(timeDelta)
        sub, err := stream.Subscribe(topic, onReceive, startOpt)
        if err != nil {
                log.Fatalf("Can not subscribe to topic %s\n", topic)
        }
        defer sub.Unsubscribe()
 
        <-c
}

16. Zajištění trvalého odběru zpráv zvoleným konzumentem (durable subscription)

Již ve třinácté kapitole jsme se seznámili se způsobem implementace klienta, který začne zpracovávat zprávy od poslední poslané zprávy (nezávisle na tom, o jakého klienta se jednalo). Existuje však ještě jeden způsob zajištění, aby klient i po svém pádu či pouhém znovupřipojení získal všechny zprávy, které ještě nestihl zpracovat. Tento způsob se v systému NATS Streaming nazývá durable subscription a je založen na jménu, pod kterým si NATS pamatuje poslední poslanou zprávu. Pokud každý klient použije jednoznačné jméno, bude celý proces funkční podle předpokladů – každý klient bude mít vlastní „stream“ zpráv:

const durableName = "durableXYZZY"
 
startOpt := streaming.DurableName(durableName)
 
sub, err := stream.Subscribe(topic, onReceive, startOpt)
if err != nil {
        log.Fatalf("Can not subscribe to topic %s\n", topic)
}
 
defer sub.Unsubscribe()
Poznámka: povšimněte si, že jméno klienta nemusí být ztotožněno se jménem předaným do funkce DurableName. To znamená, že několik klientů může sdílet stejné durableName.

Opět si samozřejmě ukážeme zdrojový kód konzumenta zpráv:

package main
 
import (
        streaming "github.com/nats-io/go-nats-streaming"
        "log"
)
 
const clusterId = "test-cluster"
const clientId = "consumer1"
const topic = "topic1"
const durableName = "durableXYZZY"
 
func onReceive(m *streaming.Msg) {
        println("Received message: ", string(m.Data))
}
 
func main() {
        stream, err := streaming.Connect(clusterId, clientId)
        if err != nil {
                log.Fatalf("Can not connect to cluster %s\n", clusterId)
        }
        defer stream.Close()
 
        c := make(chan bool)
 
        startOpt := streaming.DurableName(durableName)
 
        sub, err := stream.Subscribe(topic, onReceive, startOpt)
        if err != nil {
                log.Fatalf("Can not subscribe to topic %s\n", topic)
        }
        defer sub.Unsubscribe()
 
        <-c
}

17. Termíny používané v souvislosti s message brokery

V této kapitole jsou zmíněny významy některých termínů, s nimiž se v souvislosti s message brokery a se streamingem často setkáme. České překlady jsou ovšem pouze přibližné, protože oficiální terminologie pravděpodobně ještě neexistuje:

# Původní termín Přibližný český překlad Stručný popis významu termínu
1 message zpráva ucelená informace či data posílaná mezi producentem a konzumentem přes message brokera popř. větší množství message brokerů
2 record záznam alternativní pojmenování pro zprávu, používané především v některých streamovacích message brokerech, například v Apache Kafka
3 producer producent aplikace/proces, která vytváří zprávy a posílá je do message brokera
4 consumer konzument aplikace/proces, která zprávy z message brokera přijímá
5 consumer group skupina konzumentů skupina konzumentů přijímajících zprávy z jednoho společného tématu; používáno například v systému Apache Kafka
6 topic téma kategorie či značka, pod kterou je zpráva v brokeru uložena a publikována; používá se pro směrování a/nebo pro ukládání zpráv
7 subject téma alternativní označení pro topic; použito pouze v některých message brokerech
8 topic partition rozdělení tématu na oddíly, které mohou být rozmístěny na různé brokery v clusteru
9 replica replika/duplikát záložní kopie oddílu, typicky uložená na jiném brokeru
10 offset ofset unikátní identifikátor platný v rámci oddílu (popř. tématu), který umožňuje konzumentům specifikovat, kterou zprávu mají zpracovávat

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce. Každý příklad se skládá ze dvou samostatně překládaných a spouštěných souborů – producenta zpráv a konzumenta zpráv:

Příklad Skript/kód Popis Cesta
1 publisher.go základní varianta producenta zpráv bez kontroly chyb https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/01-simple-pub-sub/publisher.go
1 consumer.go základní varianta konzumenta zpráv bez kontroly chyb https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/01-simple-pub-sub/consumer.go
       
2 publisher.go vylepšená verze producenta zpráv s kontrolami a uzavíráním připojení https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/02-pub-sub-error-handling/publisher.go
2 consumer.go vylepšená verze konzumenta zpráv s kontrolami a uzavíráním připojení https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/02-pub-sub-error-handling/consumer.go
       
3 publisher.go producent deseti zpráv do zvoleného tématu https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/03-replay-all/publisher.go
3 consumer.go konzument přijímající (a přehrávající) všechny zprávy ze zvoleného tématu https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/03-replay-all/consumer.go
       
4 publisher.go producent deseti zpráv do zvoleného tématu (shodný s předchozím příkladem) https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/04-replay-from-last-received/publisher.go
4 consumer.go konzument přijímající zprávy od poslední přijaté zprávy https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/04-replay-from-last-received/consumer.go
       
5 publisher.go producent deseti zpráv do zvoleného tématu (shodný s předchozím příkladem) https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/05-start-at-sequence/publisher.go
5 consumer.go konzument, v němž je možné specifikovat pořadí zprávy v sekvenci https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/05-start-at-sequence/consumer.go
       
6 publisher.go producent deseti zpráv do zvoleného tématu (shodný s předchozím příkladem) https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/06-time-duration/publisher.go
6 consumer.go konzument, v němž se specifikuje časový okamžik, od kterého se má zpráva či zprávy přijmout https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/06-time-duration/consumer.go
       
7 publisher.go producent deseti zpráv do zvoleného tématu (opět shodný s předchozím příkladem) https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/07-durable-subscription/publisher.go
7 consumer.go konzument využívající tzv. durable subscription popsanou v šestnácté kapitole https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/07-durable-subscription/consumer.go

19. Odkazy na předchozí části seriálu o message brokerech

V této kapitole jsou uvedeny odkazy na všech třináct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií message brokerů:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
  8. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
    https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/
  9. Apache ActiveMQ – další systém implementující message brokera
    https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/
  10. Použití Apache ActiveMQ s protokolem STOMP
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/
  11. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/
  12. Komunikace s message brokery z programovacího jazyka Go
    https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/
  13. Použití message brokeru NATS
    https://www.root.cz/clanky/pouziti-message-brokeru-nats/

20. Odkazy na Internetu

  1. NATS
    https://nats.io/about/
  2. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  3. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  4. NATS Introduction
    https://nats.io/documentation/
  5. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  6. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  7. Stránka Apache Software Foundation
    http://www.apache.org/
  8. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  9. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  10. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  11. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  12. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  13. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  14. Qpid Proton
    http://qpid.apache.org/proton/
  15. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  16. Apache ActiveMQ
    http://activemq.apache.org/
  17. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  18. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  19. KahaDB
    http://activemq.apache.or­g/kahadb.html
  20. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  21. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  22. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  23. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  24. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  25. Apache Camel
    https://camel.apache.org/
  26. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  27. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  28. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  29. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  30. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  31. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  32. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  33. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  34. ØMQ – Distributed Messaging
    http://zeromq.org/
  35. ØMQ Community
    http://zeromq.org/community
  36. Get The Software
    http://zeromq.org/intro:get-the-software
  37. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  38. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  39. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  40. ZeroMQ RFC
    https://rfc.zeromq.org/
  41. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  42. zeromq/czmq
    https://github.com/zeromq/czmq
  43. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  44. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  45. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  46. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  47. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  48. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  49. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  50. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  51. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  52. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  53. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  54. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  55. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  56. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  57. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  58. Python binding
    http://zeromq.org/bindings:python
  59. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  60. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  61. About Nanomsg
    https://nanomsg.org/
  62. Advanced Message Queuing Protocol
    https://www.amqp.org/
  63. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  64. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  65. RabbitMQ
    https://www.rabbitmq.com/
  66. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  67. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  68. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  69. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  70. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  71. Erlang
    http://www.erlang.org/
  72. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  73. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  74. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  75. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  76. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  77. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  78. celery na PyPi
    https://pypi.org/project/celery/
  79. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  80. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  81. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  82. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  83. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  84. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  85. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  86. node-celery
    https://github.com/mher/node-celery
  87. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  88. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  89. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  90. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  91. Stránky projektu Redis
    https://redis.io/
  92. Introduction to Redis
    https://redis.io/topics/introduction
  93. Try Redis
    http://try.redis.io/
  94. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  95. Python Redis
    https://redislabs.com/lp/python-redis/
  96. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  97. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  98. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  99. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  100. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  101. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  102. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  103. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  104. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  105. Redis Persistence
    https://redis.io/topics/persistence
  106. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  107. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  108. Ost (knihovna)
    https://github.com/soveran/ost
  109. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  110. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  111. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  112. What Is Sharding?
    https://btcmanager.com/what-sharding/
  113. Redis clients
    https://redis.io/clients
  114. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  115. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  116. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  117. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  118. Resque
    https://github.com/resque/resque
  119. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  120. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  121. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  122. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  123. Pub/Sub
    https://redis.io/topics/pubsub
  124. ZeroMQ distributed messaging
    http://zeromq.org/
  125. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  126. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  127. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  128. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  129. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  130. Redis Protocol specification
    https://redis.io/topics/protocol
  131. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  132. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  133. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  134. Apache Kafka
    https://kafka.apache.org/
  135. Iron
    http://www.iron.io/mq
  136. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  137. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  138. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  139. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  140. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  141. Enqueueing internals
    http://python-rq.org/contrib/
  142. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  143. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  144. Queues
    http://queues.io/
  145. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  146. RestMQ
    http://restmq.com/
  147. ActiveMQ
    http://activemq.apache.org/
  148. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  149. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  150. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  151. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  152. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  153. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  154. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  155. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  156. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  157. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  158. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  159. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  160. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  161. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  162. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  163. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  164. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  165. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  166. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  167. Jupyter
    https://jupyter.org/
  168. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  169. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html