Obsah
2. Klasičtí message brokeři a jejich komunikační strategie
3. Alternativní přístup – streaming
4. Koncepty, na nichž je streaming založen
5. Nejznámější systémy podporující streaming
6. Instalace systému NATS Streaming Server
7. Spuštění NATS Streaming Serveru
8. Instalace balíčku potřebného pro implementaci klientů
9. Nejjednodušší varianta zdroje zpráv (producenta)
12. Přehrání všech zpráv poslaných do zvoleného tématu
13. Příjem zpráv od poslední přijaté zprávy
14. Specifikace pořadí zprávy v sekvenci
15. Specifikace časového okamžiku, od kterého se má zpráva či zprávy přijmout
16. Zajištění trvalého odběru zpráv zvoleným konzumentem (durable subscription)
17. Termíny používané v souvislosti s message brokery
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu o message brokerech
1. NATS Streaming Server
V dnešním článku navážeme na téma, kterému jsme se věnovali minule. Připomeňme si, že jsme se zabývali popisem message brokera nazvaného NATS, který je vytvořen v programovacím jazyku Go, takže výsledkem je server, jenž je poměrně nenáročný na systémové zdroje (především na operační paměť a taktéž na diskový prostor), ovšem i přesto podporuje rychlé přeposílání a popř. rozesílání zpráv. Ve své základní konfiguraci se NATS používá především pro implementaci komunikační strategie publish-subscribe neboli zkráceně pub-sub.
Dnes se seznámíme s projektem nazvaným NATS Streaming Server, jehož zdrojové kódy i základní dokumentaci naleznete na adrese https://github.com/nats-io/nats-streaming-server. Tento projekt je určen pro takzvaný streaming zpráv, podobně jako známější a pravděpodobně i častěji nasazovaný projekt Apache Kafka, na nějž jsme pochopitelně nezapomněli – popíšeme si ho v některém dalším dílu seriálu o message brokerech.
NATS Streaming Server používá klasický systém (server) NATS, který navíc doplňuje o takzvaný storage, tj. o technologii určenou pro ukládání zpráv (někdy nazývaných i záznamy – record) do perzistentního úložiště, kterým může být relační databáze či soubor (resp. skupina souborů). Samotný NATS, jak již víme z předchozího článku, se skládá z několika komponent:
- V první řadě se jedná o samotný server, jenž se spouští příkazem gnatsd. Server je naprogramovaný v jazyce Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (viz úvodní odstavec).
- Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje). Prozatím jsme se seznámili s rozhraním určeným přímo pro jazyk Go.
- Třetí komponentou je dnes popisovaný NATS Streaming Server, který je opět naprogramován v jazyce Go. K tomuto serveru ještě musíme připočítat příslušné rozhraní dodávané (v případě jazyka Go) ve formě samostatného balíčku.
- Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis). I této zajímavé problematice bude později věnován samostatný článek.
2. Klasičtí message brokeři a jejich komunikační strategie
V navazujících kapitolách se budeme věnovat takzvanému streamingu zpráv (či záznamů), což je možné pokládat za rozšíření služeb klasických message brokerů o možnost „přehrání“ starších zpráv kombinovanou s možností, aby tu samou zprávu zpracovávalo větší množství konzumentů. Pro upřesnění, v čemž se vlastně streaming odlišuje od běžných message brokerů si ještě jednou (v tomto seriálu naposledy :-) zopakujme dvě komunikační strategie, které většinou nalezneme u běžných message brokerů (Redis Queue, RabbitMQ, ActiveMQ atd.).
Většina message brokerů, s nimiž jsme se až doposud v tomto seriálu seznámili, podporovala dvě základní komunikační strategie. První z těchto strategií se nazývá publish-subscribe (neboli zkráceně pub-sub) a spočívá v tom, že se zprávy s nastaveným tématem (topic, subject) posílají do message brokera, který tyto zprávy ihned přeposílá konzumentům přihlášeným k danému tématu. V případě, že žádný takový konzument neexistuje, je zpráva zahozena. Pokud konzumentů daného tématu naopak existuje větší množství, je zpráva doručena všem takovým konzumentům. Samotný message broker v tomto případě nepotřebuje zprávy ukládat, takže se většinou nesetkáme ani s podporou pro persistenci zpráv. Příkladem takového systému je právě NATS (bez dalších rozšíření).
Obrázek 1: Komunikační strategie publish-subscribe.
Druhá komunikační strategie, která se někdy nazývá push-pull či pouze queueing, je založena na pojmenovaných frontách zpráv (message queue) implementovaných v message brokerovi. Zpráva bývá v tomto případě doručena jen jedinému konzumentovi a pokud žádný konzument není k frontě připojen, zůstane zpráva ve frontě uložena (teoreticky) po libovolně dlouhou dobu. Message broker v takovém případě typicky podporuje perzistenci zpráv, které tak dokážou přečkat jeho případné restarty. Mnoho message brokerů navíc umožňuje, aby konzumenti potvrzovali zpracování zprávy, popř. je dokonce možné provádět transakce. K pojmenovaným frontám se potom přidávají další specializované fronty na nezpracované zprávy (DLQ – Dead Letters Queue.
Obrázek 2: Komunikační strategie push-pull.
V klasickým message brokerech orientovaných na komunikaci s využitím zpráv je každá zpráva většinou spravována izolovaně od ostatních zpráv. I když některé implementace message brokerů podporují prioritní fronty, není obecně vyžadováno (a ani se to neočekává), aby se zprávy doručovaly v takovém pořadí, v jakém je message broker přijímá (což je u pojmenovaných front problematické). Toto chování pro mnohé účely plně dostačuje a z hlediska klientů (tedy producentů zpráv i jejich konzumentů) je použití takových message brokerů velmi jednoduché, protože si klienti vlastně vůbec nemusí pamatovat stav zpráv (seznam zpracovaných zpráv, přijatých ale nezpracovaných zpráv atd.).
Obrázek 3: Složitější komunikační strategie, které jsou některými message brokery podporovány.
Ovšem existují problémy, které se s použitím klasickým message brokerů implementují velmi složitě popř. je nelze (na rozumném HW) implementovat vůbec. Příkladem může být systém pro zaznamenání událostí, ovšem v tom pořadí, v jakém události vznikly a s možností zpětného přehrávání (replay) zpráv/událostí. A právě pro řešení těchto problémů vznikl streaming zpráv.
3. Alternativní přístup – streaming
Představme si například situaci, kdy je nutné zpracovávat informace získávané z nějakého „živého systému“, tyto informace nějakým způsobem upravit a následně je například uložit do logovacího souboru, ovšem s tím, že zůstane zachováno pořadí čtených a zpracovávaných informací. Popř. se může jednat o skutečný streaming nějakých dat, například o rastrové obrázky čtené z několika kamer, které je taktéž zapotřebí zpracovat, katalogizovat a popř. na konci celého řetězce uložit. K tomu můžeme chtít použít takzvaný replay, tj. možnost vrátit se k nějaké sekvenci zpráv (událostí, obrázků) z minulosti, ovšem se zachováním jejich pořadí. Právě v těchto případech může být výhodnější namísto klasických message brokerů použít brokery podporující streaming.
Streaming v první řadě nabízí uživatelům kombinaci možností klasických komunikačních strategií push-pull a pub-sub, protože umožňuje:
- Implementovat skupinu konzumentů zpracovávajících zvolené téma způsobem, který pracuje podobně jako klasická fronta (queue), tj. tak, že zprávy jsou postupně vybírány a zpracovávány v takovém pořadí, v jakém byly do message brokera poslány.
- Současně je ovšem možné jednu zprávu přeposílat většímu množství konzumentů či jejich skupin, což je vlastnost, která je očekávána od strategie pub-sub. Přeposílání může být v případě potřeby okamžité a je zcela řízena konzumenty, nikoli message brokerem.
Až doposud se koncept streamingu až nápadně podobá použití klasických front zpráv, ovšem rozdíly nalezneme v tom, jak jsou zprávy organizovány na message brokerovi a jak s nimi nakládají konzumenti (z pohledu producentů zpráv se většinou nic nemusí měnit, ovšem někdy je zapotřebí u zpráv explicitně specifikovat takzvaný oddíl – partition).
Například v systému Apache Kafka jsou jednotlivá témata rozdělena do oddílů (partitions), přičemž každý oddíl obsahuje neměnnou sekvenci zpráv. Oddíly pro jednotlivá témata lze zpracovávat v několika brokerech umístěných do clusteru a tak zajistit potřebný load balancing, případnou replikaci atd. Každá zpráva uložená do oddílu má přiřazen jednoznačný offset (buď celé číslo nebo časové razítko, v závislosti na konkrétní implementaci). Navíc je možné – podle konkrétního brokera – aby se pro každé téma udržovalo několik logů (partition logs), což umožňuje připojení většího množství konzumentů zpráv k jednomu tématu s tím, že tito konzumenti budou pracovat paralelně.
Navíc je většinou možné používat větší množství instancí brokerů, z nichž je vytvořen cluster. Zde se setkáme s důležitým termínem replikace – každý oddíl je typicky replikován na několika message brokerech v clusteru (ovšem nemusí se jednat o všechny brokery, replikace se provádí například na tři brokery v clusteru).
4. Koncepty, na nichž je streaming založen
To, že každá zpráva uložená do oddílu má přiřazen offset, umožňuje, aby konzumenti zpráv specifikovali, od jakého offsetu potřebují zprávy přijímat. Díky tomu je možné, aby se konzumenti připojovali a odpojovali v jakýkoli okamžik a přitom měli možnost řídit, s jakými zprávami budou pracovat. Konzument se například po pozdějším připojení může rozhodnout, že bude zpracovávat nejnovější data a na případná starší data si (možná) vyhradí svůj strojový čas později. Umožněn je i takzvaný replay, tj. zpracování určité sekvence zpráv z minulosti. Postačuje znát jen offset první zprávy ze sekvence.
Samotné posílání zpráv konzumentů si přitom řídí sami konzumenti, kteří si zprávy (téma+oddíl+offset) musí vyžádat. Díky tomu si sami konzumenti určují, kolik zpráv zpracují a jak tedy budou zatíženi.
To však ve skutečnosti není vše, protože zprávy poslané do message brokera s podporou streamingu se v čase chovají odlišně, než v případě použití běžné fronty. Zprávy jsou totiž rozšířeny o své pořadové číslo a většinou i o časové razítko. Takto rozšířené zprávy jsou uloženy do takzvaného logu s daným tématem, kde jsou uchovány buď po neomezeně dlouhou dobu, nebo až do chvíle, kdy je překročena kapacita logu popř. maximální doba životnosti zprávy. Pokud dojde k alespoň jedné této události, budou nejstarší zprávy smazány (kapacita logu je většinou zadána jak celkovým objemem, tak i maximálním počtem zpráv, což je ostatně i případ NATS Streaming Serveru). Nezáleží tedy na tom, zda a kolika konzumenty byla zpráva přečtena a zpracována – přečtení a zpracování zprávy nemá vliv na její umístění v logu, což vlastně znamená, že většinou dochází k určitému zesložitění konzumentů, kteří si musí pamatovat, které zprávy s daným tématem již zpracovaly a které nikoli.
Aby bylo možné konfigurovat a řídit, které zprávy mají být na message brokeru uloženy a které již smazány, specifikuje se takzvaný retention time zajišťující, aby počet zpráv/záznamů nepřekročil časovou mez. Mnohé streaming servery dokážou omezit i celkový počet zpráv, počet zpráv v tématu a/nebo počet zpráv na jednom serveru v clusteru. Totéž omezení je možné aplikovat na celkovou velikost použitého paměťového či diskového prostoru. Ostatně tyto informace uvidíme i při spuštění NATS Streaming Serveru v sedmé kapitole.
Dále se u streaming serverů setkáme s možností zapojení více serverů do clusteru, což podporuje jak dnes popisovaný NATS Streaming Server, tak i Apache Kafka.
5. Nejznámější systémy podporující streaming
Mezi nejznámější a pravděpodobně nejčastěji nasazované systémy podporující streaming patří projekt Apache Kafka a taktéž dnes popisovaný NATS Streaming Server. Vzhledem k tomu, že NATS Streaming Server je popsán v navazujících kapitolách, řekneme si v této kapitole základní informace o „konkurenčním“ systému Apache Kafka (slovo „konkurenční“ je zapsáno v uvozovkách především z toho důvodu, že každý systém má svoji niku, ve které se používá a tyto niky se překrývají jen částečně).
Apache Kafka umožňuje ukládání zpráv (zde se ovšem používá termín záznam – record) do různých témat, přičemž každé téma je rozděleno do oddílů neboli partition (samozřejmě je možné pro téma vyhradit pouze jediný oddíl). Rozdělení do oddílů se provádí z několika důvodů. Jedním z nich je rozdělení zátěže, protože jednotlivé oddíly mohou být provozovány na různých počítačích v clusteru. To však není vše, jelikož je ve skutečnosti konfigurace poněkud složitější – každý oddíl totiž může být replikován na více počítačích, přičemž jeden z těchto oddílů je „leaderem“ a ostatní jsou „followeři“. Zápis nových zpráv, popř. čtení se provádí vždy jen v leaderu, ovšem změny jsou replikovány na všechny kopie oddílu. Ve chvíli, kdy dojde k pádu „leadera“, převezme jeho roli jeden z dalších uzlů. Pokud tedy existuje N uzlů s replikou oddílu, bude systém funkční i ve chvíli, kdy zhavaruje N-1 uzlů!
Téma zpracovávané Kafkou může na clusteru vypadat například následovně:
+---+---+---+---+---+---+ oddíl #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ oddíl #1 | 0 | 1 | 2 | ... +---+---+---+ oddíl #2 | ... +---+---+---+---+---+---+---+---+---+ oddíl #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+
Boxy s čísly odpovídají jednotlivým zprávám, kterým jsou tato pořadová čísla v sekvenci postupně přiřazována. Zápis nových zpráv je prováděn do oblastí označených třemi tečkami. Z tohoto diagramu můžeme odvodit, že každý oddíl obsahuje vlastní sekvenci zpráv/záznamů, ke kterým se postupně připojují záznamy další.
Pozor si musíme dát především na to, že pořadí záznamů je zachováno a garantováno pouze v rámci jednoho oddílu a nikoli pro celé téma. V případě, že požadujeme, aby všechny zprávy v tématu měly zaručené pořadí, používá se pouze jeden oddíl pro celé téma, což má ovšem vliv na další vlastnosti řešení – především se snižuje výkonnost a možnost rozkládání zátěže v rámci clusteru.
Producent při posílání zprávy volí jak téma (což je logické), tak i oddíl. Volbu oddílu je možné provést na základě mnoha kritérií. Například lze použít jednoduchý algoritmus round robin a používat oddíly čistě pro rozložení zátěže. Nebo se může oddíl zvolit například na základě ID přihlášeného uživatele, poslední části IP počítače, na němž je spuštěn producent atd. atd.
Podobně jako u dalších message brokerů s podporou streamování se i v systému Apache Kafka o výběr zpráv, které se mají zpracovat, starají přímo konzumenti, kteří mohou specifikovat offset zprávy. Konzumenti se spojují do skupin, přičemž zprávu získá vždy jeden z konzumentů ze skupiny (což se podobá strategii pub-sub).
Apache Kafka podporuje i speciální API používané ve chvíli, kdy potřebujeme implementovat uzel (proces), který získá zprávu, nějakým způsobem ji zpracuje a poté ji pošle zpět do message brokera. Takový uzel je samozřejmě možné naimplementovat tak, že se bude současně jednat o konzumenta i producenta, ovšem využití speciálního API je efektivnější.
6. Instalace systému NATS Streaming Server
Nyní si ve stručnosti ukážeme, jak vlastně probíhá instalace systému NATS Streaming Server. V této kapitole se předpokládá, že již máte nainstalován NATS Server, který byl popsán v předchozím článku. Pokud tomu tak není, postupujte následujícími dvěma kroky. Nejprve se přesuneme do adresáře, na který odkazuje proměnná prostředí $GOPATH, což je typicky adresář „~/go“:
$ cd $GOPATH
Dále spustíme příkaz pro nainstalování balíčku, který obsahuje server NATSu:
$ go get github.com/nats-io/gnatsd
Nyní již konečně můžeme přistoupit k instalaci samotného NATS Streaming Serveru. Budeme postupovat podobně, jako při instalaci NATS Serveru, tj. nejdříve se přesuneme do adresáře, na který odkazuje proměnná prostředí $GOPATH:
$ cd $GOPATH
Instalace balíčku se streaming serverem:
$ go get github.com/nats-io/nats-streaming-server
Po instalaci zdrojových kódů musíme NATS Streaming Server přeložit, a to tímto příkazem:
$ cd $GOPATH/src/github.com/nats-io/nats-streaming-server $ go build
Po překladu by měl v aktuálním adresáři vzniknout soubor pojmenovaný nats-streaming-server. Zkusíme si zobrazit jeho nápovědu:
$ ./nats-streaming-server --help
S výsledkem:
Usage: nats-streaming-server [options] Streaming Server Options: -cid, --cluster_id <string> Cluster ID (default: test-cluster) -st, --store <string> Store type: MEMORY|FILE|SQL (default: MEMORY) --dir <string> For FILE store type, this is the root directory -mc, --max_channels <int> Max number of channels (0 for unlimited) -msu, --max_subs <int> Max number of subscriptions per channel (0 for unlimited) -mm, --max_msgs <int> Max number of messages per channel (0 for unlimited) -mb, --max_bytes <size> Max messages total size per channel (0 for unlimited) -ma, --max_age <duration> Max duration a message can be stored ("0s" for unlimited) -mi, --max_inactivity <duration> Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited) -ns, --nats_server <string> Connect to this external NATS Server URL (embedded otherwise) -sc, --stan_config <string> Streaming server configuration file -hbi, --hb_interval <duration> Interval at which server sends heartbeat to a client -hbt, --hb_timeout <duration> How long server waits for a heartbeat response -hbf, --hb_fail_count <int> Number of failed heartbeats before server closes the client connection --ft_group <string> Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore -sl, --signal <signal>[=<pid>] Send signal to nats-streaming-server process (stop, quit, reopen) --encrypt <bool> Specify if server should use encryption at rest --encryption_cipher <string> Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES --encryption_key <sting> Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead Streaming Server Clustering Options: --clustered <bool> Run the server in a clustered configuration (default: false) --cluster_node_id <string> ID of the node within the cluster if there is no stored ID (default: random UUID) --cluster_bootstrap <bool> Bootstrap the cluster if there is no existing state by electing self as leader (default: false) --cluster_peers <string, ...> Comma separated list of cluster peer node IDs to bootstrap cluster state --cluster_log_path <string> Directory to store log replication data --cluster_log_cache_size <int> Number of log entries to cache in memory to reduce disk IO (default: 512) --cluster_log_snapshots <int> Number of log snapshots to retain (default: 2) --cluster_trailing_logs <int> Number of log entries to leave after a snapshot and compaction --cluster_sync <bool> Do a file sync after every write to the replication log and message store --cluster_raft_logging <bool> Enable logging from the Raft library (disabled by default) Streaming Server File Store Options: --file_compact_enabled <bool> Enable file compaction --file_compact_frag <int> File fragmentation threshold for compaction --file_compact_interval <int> Minimum interval (in seconds) between file compactions --file_compact_min_size <size> Minimum file size for compaction --file_buffer_size <size> File buffer size (in bytes) --file_crc <bool> Enable file CRC-32 checksum --file_crc_poly <int> Polynomial used to make the table used for CRC-32 checksum --file_sync <bool> Enable File.Sync on Flush --file_slice_max_msgs <int> Maximum number of messages per file slice (subject to channel limits) --file_slice_max_bytes <size> Maximum file slice size - including index file (subject to channel limits) --file_slice_max_age <duration> Maximum file slice duration starting when the first message is stored (subject to channel limits) --file_slice_archive_script <string> Path to script to use if you want to archive a file slice being removed --file_fds_limit <int> Store will try to use no more file descriptors than this given limit --file_parallel_recovery <int> On startup, number of channels that can be recovered in parallel --file_truncate_bad_eof <bool> Truncate files for which there is an unexpected EOF on recovery, dataloss may occur Streaming Server SQL Store Options: --sql_driver <string> Name of the SQL Driver ("mysql" or "postgres") --sql_source <string> Datasource used when opening an SQL connection to the database --sql_no_caching <bool> Enable/Disable caching for improved performance --sql_max_open_conns <int> Maximum number of opened connections to the database Streaming Server TLS Options: -secure <bool> Use a TLS connection to the NATS server without verification; weaker than specifying certificates. -tls_client_key <string> Client key for the streaming server -tls_client_cert <string> Client certificate for the streaming server -tls_client_cacert <string> Client certificate CA for the streaming server Streaming Server Logging Options: -SD, --stan_debug=<bool> Enable STAN debugging output -SV, --stan_trace=<bool> Trace the raw STAN protocol -SDV Debug and trace STAN --syslog_name On Windows, when running several servers as a service, use this name for the event source (See additional NATS logging options below) Embedded NATS Server Options: -a, --addr <string> Bind to host address (default: 0.0.0.0) -p, --port <int> Use port for clients (default: 4222) -P, --pid <string> File to store PID -m, --http_port <int> Use port for http monitoring -ms,--https_port <int> Use port for https monitoring -c, --config <string> Configuration file Logging Options: -l, --log <string> File to redirect log output -T, --logtime=<bool> Timestamp log entries (default: true) -s, --syslog <string> Enable syslog as log method -r, --remote_syslog <string> Syslog server addr (udp://localhost:514) -D, --debug=<bool> Enable debugging output -V, --trace=<bool> Trace the raw protocol -DV Debug and trace Authorization Options: --user <string> User required for connections --pass <string> Password required for connections --auth <string> Authorization token required for connections TLS Options: --tls=<bool> Enable TLS, do not verify clients (default: false) --tlscert <string> Server certificate file --tlskey <string> Private key for server certificate --tlsverify=<bool> Enable TLS, verify client certificates --tlscacert <string> Client certificate CA for verification NATS Clustering Options: --routes <string, ...> Routes to solicit and connect --cluster <string> Cluster URL for solicited routes Common Options: -h, --help Show this message -v, --version Show version --help_tls TLS help.
7. Spuštění NATS Streaming Serveru
V případě, že se nápověda skutečně zobrazila, můžeme se pokusit spustit NATS Streaming Server s výchozí konfigurací. Každé (další) spuštění serveru by mělo vypadat následovně:
$ cd $GOPATH/src/github.com/nats-io/nats-streaming-server $ ./nats-streaming-server
Na terminálu by se měly zobrazit přibližně následující informace, z nichž některé budou velmi důležité pro naše další pokusy:
[3568] 2019/04/05 17:50:18.681037 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.2 [3568] 2019/04/05 17:50:18.681156 [INF] STREAM: ServerID: fxLHdxa4OjkdnXtNZaYHSC [3568] 2019/04/05 17:50:18.681178 [INF] STREAM: Go version: go1.11.2 [3568] 2019/04/05 17:50:18.681621 [INF] STREAM: Git commit: [not set] [3568] 2019/04/05 17:50:18.683122 [INF] Starting nats-server version 1.4.1 [3568] 2019/04/05 17:50:18.683172 [INF] Git commit [not set] [3568] 2019/04/05 17:50:18.683422 [INF] Listening for client connections on 0.0.0.0:4222 [3568] 2019/04/05 17:50:18.683447 [INF] Server is ready [3568] 2019/04/05 17:50:18.712436 [INF] STREAM: Recovering the state... [3568] 2019/04/05 17:50:18.712549 [INF] STREAM: No recovered state [3568] 2019/04/05 17:50:18.963128 [INF] STREAM: Message store is MEMORY [3568] 2019/04/05 17:50:18.963226 [INF] STREAM: ---------- Store Limits ---------- [3568] 2019/04/05 17:50:18.963255 [INF] STREAM: Channels: 100 * [3568] 2019/04/05 17:50:18.963271 [INF] STREAM: --------- Channels Limits -------- [3568] 2019/04/05 17:50:18.963291 [INF] STREAM: Subscriptions: 1000 * [3568] 2019/04/05 17:50:18.963307 [INF] STREAM: Messages : 1000000 * [3568] 2019/04/05 17:50:18.963323 [INF] STREAM: Bytes : 976.56 MB * [3568] 2019/04/05 17:50:18.963339 [INF] STREAM: Age : unlimited * [3568] 2019/04/05 17:50:18.963360 [INF] STREAM: Inactivity : unlimited * [3568] 2019/04/05 17:50:18.963375 [INF] STREAM: ----------------------------------
Za povšimnutí stojí především čtyři zvýrazněné informace:
- Důležité je především jméno clusteru, v rámci něhož server běží. Jedná se o důležitou informaci z toho důvodu, že jméno clusteru budeme potřebovat při připojování klientů.
- Dále si povšimněte portu, na který se budou připojovat klienti. Pokud se výchozí hodnota portu nezmění, nemusíme ji explicitně v klientech specifikovat.
- Pravděpodobně nejdůležitější je informace o tom, že zprávy jsou drženy pouze v operační paměti a nejsou meziukládány na disk. Toto nastavení je možné změnit a využít pro ukládání zpráv například relační databáze či přímo souborový systém. Výchozí hodnota MEMORY znamená, že po restartu serveru budou zprávy ztraceny.
- Hodnota unlimited vlastnosti Age říká, že předem neomezujeme maximální životnost zpráv, což v praxi znamená, že pokud bude zpráv méně než jeden milion (informace o dva řádky výše) a nepřesáhne se maximální kapacita kanálu, nebudou zprávy smazány.
Server nechte spuštěný v samostatném terminálu, protože se ještě vrátíme k dalším zprávám, které budou na tento terminál zapsány po připojení prvního producenta zpráv.
8. Instalace balíčku potřebného pro implementaci klientů
Kromě běžícího serveru pro streaming zpráv budeme při tvorbě demonstračních příkladů potřebovat ještě jeden balíček, který se jmenuje go-nats-streaming. Tento balíček zajišťuje rozhraní mezi klienty a NATS Streaming Serverem. Nainstalujeme ho naprosto stejným způsobem jako jakýkoli jiný balíček určený pro programovací jazyk Go:
$ cd $GOPATH $ go get github.com/nats-io/go-nats-streaming
Import tohoto balíčku v demonstračních klientech bude vypadat takto:
import streaming "github.com/nats-io/go-nats-streaming"
9. Nejjednodušší varianta zdroje zpráv (producenta)
Nejprve si ukážeme, jakým způsobem je možné vytvořit producenta zpráv. Ten se svou činností vlastně vůbec neodlišuje od producentů zpráv posílaných do běžných message brokerů – samotná zpráva musí mít pouze nastaveno téma (topic) a na tělo zprávy nejsou kladena žádná omezení, protože se jedná o sekvenci bajtů.
V tom nejjednodušším případě se pouze připojíme k zadanému clusteru (viz též sedmou kapitolu, kde jsme si ukázali, kde jméno clusteru hledat) a specifikujeme jméno klienta, které by ideálně mělo být jednoznačné:
stream, _ := streaming.Connect(clusterId, clientId)
Zpráva s libovolným obsahem (sekvencí bajtů) se publikuje metodou Publish:
stream.Publish(topic, []byte("Hello World"))
Nakonec nesmíme zapomenout uzavřít připojení k serveru:
stream.Close()
Úplný zdrojový kód producenta zprávy naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/01-simple-pub-sub/publisher.go:
package main import streaming "github.com/nats-io/go-nats-streaming" const clusterId = "test-cluster" const clientId = "publisher1" const topic = "topic1" func main() { stream, _ := streaming.Connect(clusterId, clientId) stream.Publish(topic, []byte("Hello World")) stream.Close() }
Pokud klienta přeložíte a spustíte, můžete si v terminálu, kde běží NATS Streaming Server povšimnout nové zprávy:
[3568] 2019/04/05 17:50:42.121546 [INF] STREAM: Channel "topic1" has been created
Vidíme, že server správně poznal, že poslaná zpráva obsahuje prozatím neexistující téma, pro které byl ihned vytvořen takzvaný kanál.
10. Konzument zpráv
Jak je to u systémů, v nichž se posílají zprávy, běžné, je implementace konzumenta zpráv nepatrně složitější, než u jejich producenta. Je tomu tak proto, že konzument dopředu neví, kolik zpráv získá a musí na ně čekat – buď v explicitně zapsané smyčce (což nebude náš případ), nebo s využitím nějaké formy smyčky událostí.
Připojení k serveru se provádí stejným způsobem, jako u producenta. V nejjednodušším případě nám postačuje znát jméno clusteru a zadat libovolné jméno klienta:
stream, _ := streaming.Connect(clusterId, clientId)
K odběru zpráv je nutné se přihlásit metodou Subscribe, které se předá téma, které konzumenta zajímá a taktéž reference na callback funkci zavolanou ve chvíli, kdy bude zpráva přijata:
sub, _ := stream.Subscribe(topic, onReceive)
Samotná callback funkce je po příjmu zprávy zavolána s jediným parametrem, jímž je samotná zpráva:
func onReceive(m *streaming.Msg) { println("Received message: ", string(m.Data)) }
Na konci se odhlásíme od odběru zpráv a uzavřeme připojení k serveru:
sub.Unsubscribe() stream.Close()
Problém ovšem spočívá v tom, že samotné čekání na zprávy je prováděno asynchronně k běhu gorutiny s funkcí main. Proto musíme zajistit, aby funkce main nebyla ihned ukončena. K tomu může posloužit známý trik s kanálem, z něhož budeme číst hodnotu, kterou ovšem ve skutečnosti nikdo nepošle:
c := make(chan bool) sub, _ := stream.Subscribe(topic, onReceive) <-c
Opět se podívejme na úplný zdrojový kód konzumenta zpráv, který nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/01-simple-pub-sub/consumer.go:
package main import streaming "github.com/nats-io/go-nats-streaming" const clusterId = "test-cluster" const clientId = "consumer1" const topic = "topic1" func onReceive(m *streaming.Msg) { println("Received message: ", string(m.Data)) } func main() { stream, _ := streaming.Connect(clusterId, clientId) c := make(chan bool) sub, _ := stream.Subscribe(topic, onReceive) <-c sub.Unsubscribe() stream.Close() }
11. Vylepšení producenta i konzumenta zpráv – test všech chybových stavů, použití defer pro uzavírání prostředků
Předchozí implementace obou klientů, tj. jak producenta, tak i konzumenta zpráv, byly naprogramovány takovým způsobem, aby byly co nejjednodušší a nejkratší. Proto jsme vynechali i kontroly případných chyb a taktéž zavírání prostředků (připojení k serveru) je provedeno nekorektně a nutno říci, že ne zcela idiomatickým způsobem (minimálně pro projekt naprogramovaný v Go). Zdrojové kódy je však možné vylepšit. Po pokusu o připojení k systému NATS Streaming zkontrolujeme případný chybový stav a s využitím bloku defer zajistíme, že se klient před odchodem z funkce main automaticky odpojí:
stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close()
Chyba pochopitelně může nastat i ve chvíli, kdy se budeme snažit publikovat nějakou zprávu, tj. poslat ji do určeného tématu. Proto otestujeme i hodnotu (jediné) návratové hodnoty z metody Publish:
err = stream.Publish(topic, []byte("Hello World")) if err != nil { log.Fatalf("Error during publish: %v\n", err) } else { log.Println("Message published") }
Nezávisle na tom, zda se zavolá funkce log.Fatalf dojde k ukončení připojení, protože se blok defer zavolá i v takovém případě.
Následuje zdrojový kód upraveného producenta zpráv:
package main import ( streaming "github.com/nats-io/go-nats-streaming" "log" ) const clusterId = "test-cluster" const clientId = "publisher1" const topic = "topic1" func main() { stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close() err = stream.Publish(topic, []byte("Hello World")) if err != nil { log.Fatalf("Error during publish: %v\n", err) } else { log.Println("Message published") } }
U konzumenta zpráv nesmíme zapomenout především na to, aby se za každých okolností podařilo odhlášení odběru zpráv. Tuto činnost opět zajistíme použitím bloku uvedeného za klíčovým slovem defer. Samozřejmě přidáme i podmínku testující, zda se podařilo přihlášení klienta k odběru zpráv z určitého tématu:
sub, err := stream.Subscribe(topic, onReceive) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe()
Opět se podívejme, jak vypadá upravená varianta konzumenta zpráv:
package main import ( streaming "github.com/nats-io/go-nats-streaming" "log" ) const clusterId = "test-cluster" const clientId = "consumer1" const topic = "topic1" func onReceive(m *streaming.Msg) { println("Received message: ", string(m.Data)) } func main() { stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close() c := make(chan bool) sub, err := stream.Subscribe(topic, onReceive) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe() <-c }
Chování obou klientů by mělo být v případě, že nedojde k žádné chybě, shodné s prvním demonstračním příkladem.
12. Přehrání všech zpráv poslaných do zvoleného tématu
V této kapitole si ukážeme, jakým způsobem lze naprogramovat klienta, který po svém připojení k NATSu zpracuje všechny zprávy, které se nachází ve zvoleném tématu. Povšimněte si, že tato operace není v běžných message brokerech dostupná, protože jednou zpracované zprávy jsou z fronty odstraněny a nelze se k nim tedy vracet. Nejprve však upravíme producenta zpráv takovým způsobem, aby po svém spuštění vytvořil deset zpráv s různým tělem (odlišným řetězcem zakódovaným do sekvence bajtů):
for i := 1; i <= 10; i++ { message := fmt.Sprintf("Hello World #%d", i) err = stream.Publish(topic, []byte(message)) if err != nil { log.Fatalf("Error during publish: %v\n", err) } else { log.Println("Message published: %s", message) } }
Upravený zdrojový kód producenta zpráv je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/03-replay-all/publisher.go:
package main import ( "fmt" streaming "github.com/nats-io/go-nats-streaming" "log" ) const clusterId = "test-cluster" const clientId = "publisher1" const topic = "topic1" func main() { stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close() for i := 1; i <= 10; i++ { message := fmt.Sprintf("Hello World #%d", i) err = stream.Publish(topic, []byte(message)) if err != nil { log.Fatalf("Error during publish: %v\n", err) } else { log.Println("Message published: %s", message) } } }
Konzument zpráv musí být naprogramován odlišně od „klasického“ konzumenta, který pouze očekává nové zprávy. Před přihlášením k nějakému tématu (topicu) je totiž nutné specifikovat, že budeme chtít přehrát všechny zprávy v tématu. Provede se to s využitím nepovinného parametru předaného do metody Subscribe, kterému předáme výsledek volání funkce DeliverAllAvailable():
startOpt := streaming.DeliverAllAvailable() sub, err := stream.Subscribe(topic, onReceive, startOpt)
Následuje výpis úplného zdrojového kódu konzumenta:
package main import ( streaming "github.com/nats-io/go-nats-streaming" "log" ) const clusterId = "test-cluster" const clientId = "consumer1" const topic = "topic1" func onReceive(m *streaming.Msg) { println("Received message: ", string(m.Data)) } func main() { stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close() c := make(chan bool) startOpt := streaming.DeliverAllAvailable() sub, err := stream.Subscribe(topic, onReceive, startOpt) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe() <-c }
Nyní můžeme provést následující operace:
- Spustíte dvakrát či třikrát producenta zpráv, který při každém běhu vytvoří deset nových zpráv.
- Spustíte konzumenta, jenž by měl postupně načíst a vypsat všechny zprávy.
- Spustíte konzumenta znovu, opět by se měly přehrát všechny zprávy.
13. Příjem zpráv od poslední přijaté zprávy
Relativně snadno lze docílit toho, aby konzument po svém připojení získal poslední přijatou zprávu a samozřejmě i všechny zprávy následující. Sice by se mohlo zdát, že získání zprávy, která již byla jednou poslána, je nadbytečné, ale zde se počítá s tím, že konzument při zpracování této zprávy zhavaroval a proto se mu pro jistotu pošle znovu. Tohoto chování docílíme takto:
startOpt := streaming.StartWithLastReceived() sub, err := stream.Subscribe(topic, onReceive, startOpt) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe()
Pokud pro řešení nějakého problému vyhovuje spíše takové chování, kdy se pošle až následující zpráva, podívejte se na demonstrační příklad popsaný v šestnácté kapitole.
Následuje výpis upraveného zdrojového kódu konzumenta:
package main import ( streaming "github.com/nats-io/go-nats-streaming" "log" ) const clusterId = "test-cluster" const clientId = "consumer1" const topic = "topic1" func onReceive(m *streaming.Msg) { println("Received message: ", string(m.Data)) } func main() { stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close() c := make(chan bool) // jeste jednou ziska posledni zpravu zpracovanou minule startOpt := streaming.StartWithLastReceived() sub, err := stream.Subscribe(topic, onReceive, startOpt) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe() <-c }
Zdrojový kód producenta si již v článku ukazovat nebudeme, protože je shodný s producentem předchozím.
14. Specifikace pořadí zprávy v sekvenci
V úvodních kapitolách jsme si řekli, že zprávám zařazovaným do zvoleného tématu je přiřazováno celé číslo odpovídající pořadí zprávy (v Apache Kafka je toto číslo přiřazováno v rámci oddílu, nikoli celého tématu). Klienta, přesněji řečeno konzumenta zpráv, lze naprogramovat takovým způsobem, aby začal zprávy zpracovávat od zvoleného offsetu:
startOpt := streaming.StartAtSequence(40) sub, err := stream.Subscribe(topic, onReceive, startOpt) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe()
Můžeme si to snadno vyzkoušet:
- Spusťte ještě několikrát producenta zpráv, aby se do tématu „topic1“ vložilo minimálně padesát zpráv.
- Spusťte upraveného konzumenta, který by měl začít zpracovávat až čtyřicátou první zprávu.
Alternativně je možné producenta a konzumenta upravit takovým způsobem, aby začali používat odlišné téma.
Upravený konzument zpráv je uložen na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/05-start-at-sequence/consumer.go:
package main import ( streaming "github.com/nats-io/go-nats-streaming" "log" ) const clusterId = "test-cluster" const clientId = "consumer1" const topic = "topic1" func onReceive(m *streaming.Msg) { println("Received message: ", string(m.Data)) } func main() { stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close() c := make(chan bool) startOpt := streaming.StartAtSequence(40) sub, err := stream.Subscribe(topic, onReceive, startOpt) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe() <-c }
15. Specifikace časového okamžiku, od kterého se má zpráva či zprávy přijmout
V dalším demonstračním příkladu, jehož zdrojový kód naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/06-time-duration/consumer.go je ukázáno, že při čtení a zpracovávání zpráv je možné určit i časový okamžik určující, že budeme chtít získat všechny zprávy, které jsou novější/mladší, než uvedený okamžik (každá zpráva má přiřazeno časové razítko). Užitečné přitom je, že standardní balíček time obsahuje funkci pro parsing řetězců s uvedením času v lidské podobě, například „5d“, „3m“ atd. Tento řetězec se převede na časový údaj (interval), který je použit pro výpočet časového offsetu:
package main import ( streaming "github.com/nats-io/go-nats-streaming" "log" "os" "time" ) const clusterId = "test-cluster" const clientId = "consumer1" const topic = "topic1" func onReceive(m *streaming.Msg) { println("Received message: ", string(m.Data)) } func main() { stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close() c := make(chan bool) ago := "5m" timeDelta, err := time.ParseDuration(ago) if err != nil { log.Fatalf("Unable to parse duration '%s'", ago) os.Exit(-1) } println("About to receive all messages newer than", timeDelta) startOpt := streaming.StartAtTimeDelta(timeDelta) sub, err := stream.Subscribe(topic, onReceive, startOpt) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe() <-c }
16. Zajištění trvalého odběru zpráv zvoleným konzumentem (durable subscription)
Již ve třinácté kapitole jsme se seznámili se způsobem implementace klienta, který začne zpracovávat zprávy od poslední poslané zprávy (nezávisle na tom, o jakého klienta se jednalo). Existuje však ještě jeden způsob zajištění, aby klient i po svém pádu či pouhém znovupřipojení získal všechny zprávy, které ještě nestihl zpracovat. Tento způsob se v systému NATS Streaming nazývá durable subscription a je založen na jménu, pod kterým si NATS pamatuje poslední poslanou zprávu. Pokud každý klient použije jednoznačné jméno, bude celý proces funkční podle předpokladů – každý klient bude mít vlastní „stream“ zpráv:
const durableName = "durableXYZZY" startOpt := streaming.DurableName(durableName) sub, err := stream.Subscribe(topic, onReceive, startOpt) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe()
Opět si samozřejmě ukážeme zdrojový kód konzumenta zpráv:
package main import ( streaming "github.com/nats-io/go-nats-streaming" "log" ) const clusterId = "test-cluster" const clientId = "consumer1" const topic = "topic1" const durableName = "durableXYZZY" func onReceive(m *streaming.Msg) { println("Received message: ", string(m.Data)) } func main() { stream, err := streaming.Connect(clusterId, clientId) if err != nil { log.Fatalf("Can not connect to cluster %s\n", clusterId) } defer stream.Close() c := make(chan bool) startOpt := streaming.DurableName(durableName) sub, err := stream.Subscribe(topic, onReceive, startOpt) if err != nil { log.Fatalf("Can not subscribe to topic %s\n", topic) } defer sub.Unsubscribe() <-c }
17. Termíny používané v souvislosti s message brokery
V této kapitole jsou zmíněny významy některých termínů, s nimiž se v souvislosti s message brokery a se streamingem často setkáme. České překlady jsou ovšem pouze přibližné, protože oficiální terminologie pravděpodobně ještě neexistuje:
# | Původní termín | Přibližný český překlad | Stručný popis významu termínu |
---|---|---|---|
1 | message | zpráva | ucelená informace či data posílaná mezi producentem a konzumentem přes message brokera popř. větší množství message brokerů |
2 | record | záznam | alternativní pojmenování pro zprávu, používané především v některých streamovacích message brokerech, například v Apache Kafka |
3 | producer | producent | aplikace/proces, která vytváří zprávy a posílá je do message brokera |
4 | consumer | konzument | aplikace/proces, která zprávy z message brokera přijímá |
5 | consumer group | skupina konzumentů | skupina konzumentů přijímajících zprávy z jednoho společného tématu; používáno například v systému Apache Kafka |
6 | topic | téma | kategorie či značka, pod kterou je zpráva v brokeru uložena a publikována; používá se pro směrování a/nebo pro ukládání zpráv |
7 | subject | téma | alternativní označení pro topic; použito pouze v některých message brokerech |
8 | topic partition | rozdělení tématu na oddíly, které mohou být rozmístěny na různé brokery v clusteru | |
9 | replica | replika/duplikát | záložní kopie oddílu, typicky uložená na jiném brokeru |
10 | offset | ofset | unikátní identifikátor platný v rámci oddílu (popř. tématu), který umožňuje konzumentům specifikovat, kterou zprávu mají zpracovávat |
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce. Každý příklad se skládá ze dvou samostatně překládaných a spouštěných souborů – producenta zpráv a konzumenta zpráv:
19. Odkazy na předchozí části seriálu o message brokerech
V této kapitole jsou uvedeny odkazy na všech třináct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií message brokerů:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/ - Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/ - Apache ActiveMQ – další systém implementující message brokera
https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/ - Použití Apache ActiveMQ s protokolem STOMP
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/ - Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/ - Komunikace s message brokery z programovacího jazyka Go
https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/ - Použití message brokeru NATS
https://www.root.cz/clanky/pouziti-message-brokeru-nats/
20. Odkazy na Internetu
- NATS
https://nats.io/about/ - NATS Streaming Concepts
https://nats.io/documentation/streaming/nats-streaming-intro/ - NATS Streaming Server
https://nats.io/download/nats-io/nats-streaming-server/ - NATS Introduction
https://nats.io/documentation/ - NATS Client Protocol
https://nats.io/documentation/internals/nats-protocol/ - NATS Messaging (Wikipedia)
https://en.wikipedia.org/wiki/NATS_Messaging - Stránka Apache Software Foundation
http://www.apache.org/ - Informace o portu 5672
http://www.tcp-udp-ports.com/port-5672.htm - Třída MessagingHandler knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._handlers.MessagingHandler-class.html - Třída Event knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._events.Event-class.html - package stomp (Go)
https://godoc.org/github.com/go-stomp/stomp - Go language library for STOMP protocol
https://github.com/go-stomp/stomp - python-qpid-proton 0.26.0 na PyPi
https://pypi.org/project/python-qpid-proton/ - Qpid Proton
http://qpid.apache.org/proton/ - Using the AMQ Python Client
https://access.redhat.com/documentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/ - Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html