Hlavní navigace

JetStream: nová technologie brokeru NATS konkurující Kafce

22. 9. 2022
Doba čtení: 34 minut

Sdílet

 Autor: NATS Project
Už jsme si řekli, že tento broker podporuje, podobně jako Kafka, takzvaný stream processing. Tento koncept byl nedávno vylepšen a představena byla technologie nazvaná JetStreams, která dokáže konkurovat Kafce.

Obsah

1. JetStream – nová technologie přidaná do brokeru NATS

2. Komunikační strategie nabízené nejnovějším NATS serverem

3. Instalace NATS serveru

4. Instalace CLI klienta a současně i vývojového serveru

5. Spuštění vývojového serveru

6. Nastavení kontextu pro CLI klienta

7. Vytvoření nového streamu

8. Publikace zprávy nebo většího množství zpráv z příkazové řádky

9. Přečtení zpráv či zprávy z vybraného streamu

10. Dočasní konzumenti zpráv vs. stálí konzumenti

11. Získání podrobnějších operací a vybraném streamu

12. Konzumenti explicitně žádající o zprávy

13. Manuální potvrzení zkonzumované zprávy konzumentem, další možnosti konzumentů

14. Kde jsou uloženy zprávy poslané do témat?

15. Komunikace s brokerem NATS z Pythonu

16. Připojení k NATS serveru z Pythonu

17. Producenti a konzumenti zpráv založení na komunikační strategii PUB-SUB

18. Producenti a konzumenti zpráv založení na využití stream processingu

19. Repositář s demonstračními příklady

20. Odkazy na Internetu

1. JetStream – nová technologie přidaná do brokeru NATS

Na stránkách Roota jsme se v trojici článků Komunikace s message brokery z programovacího jazyka Go, Použití message brokeru NATS a NATS Streaming Server seznámili se zajímavým a potenciálně i velmi užitečným message brokerem nazvaným NATS. Připomeňme si ve stručnosti, že se jedná o poměrně úspěšný projekt, jenž je vyvinut v programovacím jazyku Go, což mu do jisté míry zajišťuje stabilitu i škálovatelnost (a taktéž velmi snadnou instalaci i tvorbu obrazů pro kontejnery). Stabilita a škálovatelnost jsou ostatně vlastnosti, které u message brokerů většinou očekáváme.

Původní varianta NATSu byla vytvořena Derekem Collisonem; zajímavé je, že tato varianta nebyla naprogramována v jazyce Go, ale v programovacím jazyce Ruby. Dnes se ovšem budeme zabývat moderní (a jedinou podporovanou) verzí systému NATS, která byla přeportována do jazyka Go. Celý systém NATS se skládá z několika komponent:

  1. V první řadě se jedná o samotný server, jenž se spouští příkazem nats-server (což si ostatně za chvíli vyzkoušíme). Server je naprogramovaný v jazyce Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (to má v době Dockeru a podobných nástrojů poměrně velký význam).
  2. Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje); viz též tabulky s podporovanými ekosystémy, které jsou zobrazeny pod tímto odstavcem.
  3. Třetí komponentou, kvůli které ostatně vznikl tento článek, jeJetStream, což je technologie umožňující využití stream processingu zpopularizovaného brokerem Apache Kafka. V porovnání s Kafkou je NATS + JetStream „odlehčenou“ alternativou, která ovšem navíc podporuje i mnohé další komunikační strategie, takže má šanci se stát ústřední technologií v architekturách založených na mikroslužbách.
  4. Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis). Touto komponentou se prozatím nebudeme zabývat.

Oficiálně v současnosti existují tato rozhraní pro broker NATS (bez JetStreamu):

1 Arduino NATS Arduino Client
2 Asio C++ NATS Asio C++ Client
3 Ballerina NATS Ballerina Client
4 C NATS C Client
5 .NET C# NATS .NET C# Client
6 Clojure NATS Clojure Client
7 Crystal NATS Crystal Client
8 Dart NATS Dart Client
9 Deno NATS Deno Client
10 Elixir NATS Elixir Client
11 Elm NATS Elm Client
12 Erlang NATS Erlang Client
13 Go NATS Go Client
14 Haskell NATS Haskell Client
15 Java NATS Java Client
16 Java Android NATS Java Android Client
17 LabVIEW NATS LabVIEW Client
18 Common Lisp NATS Common Lisp Client
19 Lua NATS Lua Client
20 MicroPython NATS MicroPython Client
21 NGINX NATS NGINX Client
22 Node.js NATS Node.js Client
23 Perl NATS Perl Client
24 PHP NATS PHP Client
25 Pure Ruby NATS Pure Ruby Client
26 Python NATS Python Client
27 Python Asyncio NATS Python Asyncio Client
28 Python Minimal NATS Python Minimal Client
29 Python Tornado NATS Python Tornado Client
30 Python Twisted NATS Python Twisted Client
31 C++/Qt/QML NATS C++/Qt/QML Client
32 Qt5 C++ NATS Qt5 C++ Client
33 Ruby NATS Ruby Client
34 Rust NATS Rust Client
35 Scala NATS Scala Client
36 Swift NATS Swift Client
37 Tcl NATS Tcl Client
38 WebSocket NATS WebSocket Client

Technologie JetStream je oficiálně podporována v menším množství ekosystémů, ovšem ty základní (C, Go, Java, Rust a Python) podporovány již jsou:

1 C NATS C Client
2 .NET C# NATS .NET C# Client
3 Deno NATS Deno Client
4 Go NATS Go Client
5 Java NATS Java Client
6 Node.js NATS Node.js Client
7 PHP NATS PHP Client
8 Pure Ruby NATS Pure Ruby Client
9 Python Asyncio NATS Python Asyncio Client
10 C++/Qt/QML NATS C++/Qt/QML Client
11 Rust NATS Rust Client
12 Tcl NATS Tcl Client
13 WebSocket NATS WebSocket Client

2. Komunikační strategie nabízené nejnovějším NATS serverem

Nejnovější NATS server s technologií JetStream nabízí uživatelům hned několik komunikačních strategií. Některé z těchto strategií jsme si již popsali v předchozích dvou článcích; nové strategie si postupně popíšeme. Pro větší přehlednost jsou všechny podporované komunikační strategie vizualizovány na osmici diagramů:

Obrázek 1: Komunikační strategie nazvaná PUB-SUB, v níž spolu komunikují dva typy uzlů: zdroje zpráv (publishers) a konzumenti zpráv (subscribers). Konzumenti se mohou k odebírání zpráv kdykoli přihlásit a kdykoli se také odpojit, což zdroj zpráv nijak neovlivní – ten bude posílat zprávy nezávisle na tom, kdo je přijímá.

Obrázek 2: Počet producentů není nijak omezen, takže se na strategii PUB-SUB můžeme dívat i jako na klasický message-bus.

Obrázek 3: Strategie PUSH-PULL je založena na použití fronty. Producent pošle zprávu message brokerovi, který ji uloží do fronty (každá fronta je pojmenovaná, resp. přesněji řečeno každé téma je reprezentované frontou), konzumenti se mohou k frontě (frontám) připojit a zprávy přečíst.

Obrázek 4: U strategie PUSH-PULL je možné zvolit, že všichni konzumenti dostanou stejné zprávy (a ideálně i ve stejném pořadí).

Obrázek 5: V tomto případě je konfigurace odlišná: zpráva se doručí jen jedinému konzumentovi. Pokud se k frontě připojí větší množství konzumentů, je typicky použit algoritmus round-robin pro (polo)spravedlivé rozložení zátěže.

Obrázek 6: Další velmi často používaná komunikační strategie se nazývá Request-Response nebo zkráceně REQ-REP popř. pouze REQREP. Při použití této strategie spolu komunikují dva typy uzlů – server a teoreticky neomezené množství klientů. Server přijímá požadavky (request) a odpovídá na ně (response), přičemž je možné, aby požadavky posílalo několik klientů (a jeden klient naopak může v případě potřeby posílat požadavky více serverům).

Obrázek 7: Nově je díky technologii JetStream podporován i streaming, jenž byl zpopularizován především Kafkou. JetStream podporuje dva typy připojení konzumentů. Zde je naznačeno připojení typu PUSH-PUSH, kdy jsou zprávy posílány konzumentovi serverem. Konzument tedy o zprávy nemusí explicitně žádat, pouze se stačí připojit k danému tématu. Indexy naposledy přijatých zpráv jsou (resp. mohou být) uloženy na serveru.

Obrázek 8: Alternativou k předchozí strategii je koncept PUSH-PULL, v němž jednotliví konzumenti explicitně žádají o zprávy server, a to ve chvíli, kdy jsou připraveni na příjem zprávy. Stále se však jedná o klasický streaming, tedy existuje zde možnost „přehrání“ starších zpráv.

3. Instalace NATS serveru

Lokální instalace NATS serveru je skutečně triviální, protože se jedná o jediný binární (přímo spustitelný) soubor, který navíc nevyžaduje ani žádnou konfiguraci (samozřejmě že je možné server v případě potřeby nakonfigurovat, ovšem bude funkční i bez konfigurace).

Nejprve je nutné ze stránky https://github.com/nats-io/nats-server/releases/tag/v2.9.0 stáhnout archiv s NATS serverem. Vzhledem k tomu, že se jedná o nativní aplikaci, musíme vybrat archiv pro danou platformu (v mém případě amd64/x86–64):

$ wget https://github.com/nats-io/nats-server/releases/download/v2.9.0/nats-server-v2.9.0-linux-amd64.tar.gz
 
--2022-09-18 17:47:57--  https://github.com/nats-io/nats-server/releases/download/v2.9.0/nats-server-v2.9.0-linux-amd64.tar.gz
...
...
...
2022-09-18 17:47:59 (3.78 MB/s) - ‘nats-server-v2.9.0-linux-amd64.tar.gz’ saved [4898804/4898804]
Poznámka: povšimněte si, že velikost archivu není na dnešní poměry nijak velká.

Dále právě stažený archiv běžným způsobem rozbalíme:

$ tar xvfz nats-server-v2.9.0-linux-amd64.tar.gz
 
nats-server-v2.9.0-linux-amd64/LICENSE
nats-server-v2.9.0-linux-amd64/README.md
nats-server-v2.9.0-linux-amd64/nats-server

Přesuneme se do vytvořeného adresáře:

$ cd nats-server-v2.9.0-linux-amd64

A prozkoumáme jeho obsah:

$ ls -la
 
total 12044
drwxrwxr-x. 2 ptisnovs ptisnovs      100 Sep 18 17:48 .
drwxrwxrwt. 8 root     root          180 Sep 18 17:48 ..
-rw-rw-r--. 1 ptisnovs ptisnovs    11357 Sep  9 16:49 LICENSE
-rwxrwxr-x. 1 ptisnovs ptisnovs 12315116 Sep  9 16:51 nats-server
-rw-rw-r--. 1 ptisnovs ptisnovs     3622 Sep  9 16:49 README.md
Poznámka: opět platí, že velikost binárního souboru není tak ohromující, zvláště když si uvědomíme, že je použito statické linkování (a tedy tento binární soubor má jen minimální závislosti).

Pro jistotu se přesvědčíme, že server lze spustit:

$ ./nats-server --version
 
nats-server: v2.9.0

A spustíme ho (což je operace, která je prakticky okamžitá):

$ ./nats-server 
 
[13644] 2022/09/18 19:05:37.697707 [INF] Starting nats-server
[13644] 2022/09/18 19:05:37.697822 [INF]   Version:  2.9.0
[13644] 2022/09/18 19:05:37.697825 [INF]   Git:      [517d9b3]
[13644] 2022/09/18 19:05:37.697832 [INF]   Name:     NCVERYFIQFOUACDK6AF7QR7VXMPBQFSAWXJFI2M3PAPYU3DRRWGCKISG
[13644] 2022/09/18 19:05:37.697834 [INF]   ID:       NCVERYFIQFOUACDK6AF7QR7VXMPBQFSAWXJFI2M3PAPYU3DRRWGCKISG
[13644] 2022/09/18 19:05:37.698641 [INF] Listening for client connections on 0.0.0.0:4222
[13644] 2022/09/18 19:05:37.698979 [INF] Server is ready

4. Instalace CLI klienta a současně i vývojového serveru

Pro komunikaci s NATS serverem dnes použijeme CLI klienta (tedy aplikaci ovládanou z příkazového řádku), pomocí níž budeme vytvářet témata a streamy, produkovat zprávy, přijímat zprávy a současně i sledovat stav serveru. CLI klient je opět nativní aplikací, kterou lze získat z repositáře dostupného na adrese https://github.com/nats-io/natscli.

Podobně jako v případě serveru, i nyní stáhneme archiv s aplikací určený pro námi používanou platformu:

$ wget https://github.com/nats-io/natscli/releases/download/v0.0.34/nats-0.0.34-linux-amd64.zip
...
...
...
2022-09-18 17:55:30 (4.53 MB/s) - ‘nats-0.0.34-linux-amd64.zip’ saved [7827956/7827956]

Archiv rozbalíme:

$ unzip nats-0.0.34-linux-amd64.zip
 
Archive:  nats-0.0.34-linux-amd64.zip
  inflating: nats-0.0.34-linux-amd64/LICENSE
  inflating: nats-0.0.34-linux-amd64/README.md
  inflating: nats-0.0.34-linux-amd64/nats

Přejdeme do adresáře s rozbaleným klientem:

$ cd nats-0.0.34-linux-amd64

A podíváme se, jaké soubory se rozbalily:

$ ls -la
 
total 19972
drwxrwxr-x.  2 ptisnovs ptisnovs      100 Sep 18 17:55 .
drwxrwxrwt. 10 root     root          220 Sep 18 17:55 ..
-rw-r--r--.  1 ptisnovs ptisnovs    11357 Apr  2 20:40 LICENSE
-rwxr-xr-x.  1 ptisnovs ptisnovs 20414464 Sep  9 17:09 nats
-rw-r--r--.  1 ptisnovs ptisnovs    24502 Aug 19 15:04 README.md
Poznámka: binární soubor s klientem je poněkud paradoxně větší, než samotný server. Je tomu tak z toho důvodu, že klient obsahuje i server, což si ostatně vyzkoušíme v dalších příkladech.

5. Spuštění vývojového serveru

Spuštění právě nainstalovaného NATS serveru ve vývojovém režimu je triviální. Pouze postačuje z adresáře, do kterého byl CLI klient rozbalen, spustit příkaz:

$ ./nats server run
 
Starting local development NATS Server instance: nats_development
 
        User Credentials: User: local   Password: WTJUCsUpDZIASz89c0I1F5HSeDWjmzMz Context: nats_development
     Service Credentials: User: service Password: HURHd7OEFuijnAwH9IwacQGhMyaK2FJs Context: nats_development_service
      System Credentials: User: system  Password: q0YT97I9TXpyUgz2bLSoTwenh5TPZF0t Context: nats_development_system
  Extending Demo Network: false
   Extending Remote NATS: false
                     URL: nats://0.0.0.0:44853
           Clean on Exit: false
 
 
NOTE: This is not a supported way to run a production NATS Server, view documentation
      at https://docs.nats.io/running-a-nats-service/introduction for production use.
 
[9305] [INF] Starting nats-server
[9305] [INF]   Version:  2.9.0
[9305] [INF]   Git:      [not set]
[9305] [INF]   Name:     nats_development
[9305] [INF]   ID:       NBQMRLOETYHYXZJKGBKPXOXGPSWH6IOCVXYAGJYPZ5AL4VSO2QAAIR57
[9305] [INF] Using configuration file: /tmp/ramdisk/nats-server-run-2951373316.cfg
[9305] [INF] Listening for client connections on 0.0.0.0:44853
[9305] [INF] Server is ready
Poznámka: samotné spuštění je velmi rychlé, z pohledu uživatele prakticky okamžité.

V dalším textu budeme využívat technologii JetStream. Tu je nutné explicitně povolit:

$ ./nats server run --jetstream
 
Starting local development NATS Server instance: nats_development
 
        User Credentials: User: local   Password: WTJUCsUpDZIASz89c0I1F5HSeDWjmzMz Context: nats_development
     Service Credentials: User: service Password: HURHd7OEFuijnAwH9IwacQGhMyaK2FJs Context: nats_development_service
      System Credentials: User: system  Password: q0YT97I9TXpyUgz2bLSoTwenh5TPZF0t Context: nats_development_system
  Extending Demo Network: false
   Extending Remote NATS: false
                     URL: nats://0.0.0.0:44853
           Clean on Exit: false
 
 
NOTE: This is not a supported way to run a production NATS Server, view documentation
      at https://docs.nats.io/running-a-nats-service/introduction for production use.
 
[9332] [INF] Starting nats-server
[9332] [INF]   Version:  2.9.0
[9332] [INF]   Git:      [not set]
[9332] [INF]   Name:     nats_development
[9332] [INF]   Node:     HhEiCaYr
[9332] [INF]   ID:       NBVG2F5TWP5N6UY3DVDYT7U6G7JD5CNBT7RX5X4QEZYE4X32QT6CENOZ
[9332] [INF] Using configuration file: /tmp/ramdisk/nats-server-run-3289015964.cfg
[9332] [INF] Starting JetStream
[9332] [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[9332] [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[9332] [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[9332] [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[9332] [INF]
[9332] [INF]          https://docs.nats.io/jetstream
[9332] [INF]
[9332] [INF] ---------------- JETSTREAM ----------------
[9332] [INF]   Max Memory:      11.33 GB
[9332] [INF]   Max Storage:     60.84 GB
[9332] [INF]   Store Directory: "/home/ptisnovs/.local/share/nats/nats_development/jetstream"
[9332] [INF] -------------------------------------------
[9332] [INF] Listening for client connections on 0.0.0.0:44853
[9332] [INF] Server is ready
Poznámka: jak nás samotný server správně upozorňuje, jedná se o variantu serveru určeného jen pro vývojové prostředí. Jak spustit server v prostředí produkčním si řekneme příště. Pro začátek postačuje přejít do adresáře se serverem (nats-server-v2.9.0-linux-amd64) a odtud spustit příkaz:
$ ./nats-server --jetstream

Povšimněte si, že v tomto případě se již nevypíše žádné varování, ovšem ani žádná hesla atd.:

[32528] 2022/09/21 08:15:29.778802 [INF] Starting nats-server
[32528] 2022/09/21 08:15:29.778849 [INF]   Version:  2.9.0
[32528] 2022/09/21 08:15:29.778852 [INF]   Git:      [517d9b3]
[32528] 2022/09/21 08:15:29.778856 [INF]   Name:     ND4O2ZQS6XGKN5J4ORYAML2TSG2HTLAPKEHQWTCJMBS5UFXIZRRC2HIE
[32528] 2022/09/21 08:15:29.778860 [INF]   Node:     NmzhvZB9
[32528] 2022/09/21 08:15:29.778862 [INF]   ID:       ND4O2ZQS6XGKN5J4ORYAML2TSG2HTLAPKEHQWTCJMBS5UFXIZRRC2HIE
[32528] 2022/09/21 08:15:29.779350 [INF] Starting JetStream
[32528] 2022/09/21 08:15:29.779614 [INF]     _ ___ _____ ___ _____ ___ ___   _   __  __
[32528] 2022/09/21 08:15:29.779621 [INF]  _ | | __|_   _/ __|_   _| _ \ __| /_\ |  \/  |
[32528] 2022/09/21 08:15:29.779623 [INF] | || | _|  | | \__ \ | | |   / _| / _ \| |\/| |
[32528] 2022/09/21 08:15:29.779625 [INF]  \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_|  |_|
[32528] 2022/09/21 08:15:29.779628 [INF]
[32528] 2022/09/21 08:15:29.779630 [INF]          https://docs.nats.io/jetstream
[32528] 2022/09/21 08:15:29.779632 [INF]
[32528] 2022/09/21 08:15:29.779634 [INF] ---------------- JETSTREAM ----------------
[32528] 2022/09/21 08:15:29.779638 [INF]   Max Memory:      11.33 GB
[32528] 2022/09/21 08:15:29.779642 [INF]   Max Storage:     725.26 MB
[32528] 2022/09/21 08:15:29.779644 [INF]   Store Directory: "/tmp/ramdisk/nats/jetstream"
[32528] 2022/09/21 08:15:29.779647 [INF] -------------------------------------------
[32528] 2022/09/21 08:15:29.780203 [INF] Listening for client connections on 0.0.0.0:4222
[32528] 2022/09/21 08:15:29.780593 [INF] Server is ready

6. Nastavení kontextu pro CLI klienta

Z terminálu, z něhož budeme spouštět producenty a konzumenty zpráv, popř. vytvářet streamy atd., je nutné specifikovat kontext, což současně vede k nastavení hesla. Při spuštění serveru byly vytvořeny tři kontexty nazvané nats_development, nats_development_service a nats_development_system. Použijeme první z těchto kontextů:

$ ./nats context select nats_development
 
NATS Configuration Context "nats_development"
 
      Description: Local user access for NATS Development instance
      Server URLs: nats://0.0.0.0:44853
         Username: local
         Password: *********
             Path: /home/ptisnovs/.config/nats/context/nats_development.json
Poznámka: od této chvíle budeme všechny dále uvedené příkazy spouštět v rámci tohoto terminálu.

7. Vytvoření nového streamu

V této kapitole si ukážeme, jak lze z příkazové řádky vytvořit nový stream. Nejprve se přesvědčme o tom, že žádný stream v daném okamžiku neexistuje:

$ ./nats stream ls
 
No Streams defined

Stav streamů můžeme sledovat neustále, například tímto příkazem:

$ watch ./nats stream ls

Stream se vytvoří příkazem stream add, kterému můžeme, ale také nemusíme předat další parametry:

$ ./nats stream add

V případě, že žádné další parametry nezadáme, bude se na ně klient postupně dotazovat – což je podle mého názoru zcela nejlepší chování aplikace ovládané z příkazového řádku (mnohem lepší, než se učit či opisovat desítku přepínačů, které se použijí jednou za rok). Nejdůležitějšími parametry je jméno streamu (použijeme dále) a taktéž téma (subject), které se obecně skládá ze slov oddělených tečkami. Namísto slova lze použít hvězdičku (jedná se o glob, nikoli o regulární výraz!):

? Stream Name foo
? Subjects foo.*
? Storage memory
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Per Subject Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 2m0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream (Y/n)
Poznámka: další parametry si podrobněji popíšeme příště.

Stream byl vytvořen s následujícími vlastnostmi:

Stream foo was created
 
Information for Stream foo created 2022-09-18 18:04:48
 
             Subjects: foo.*
             Replicas: 1
              Storage: Memory
 
Options:
 
            Retention: Limits
     Acknowledgements: true
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
 
Limits:
 
     Maximum Messages: unlimited
  Maximum Per Subject: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited
 
 
State:
 
             Messages: 0
                Bytes: 0 B
             FirstSeq: 0
              LastSeq: 0
     Active Consumers: 0

Příkazem stream ls se můžeme přesvědčit o tom, zda byl stream skutečně vytvořen:

$ ./nats stream ls
 
╭───────────────────────────────────────────────────────────────────────────╮
│                                  Streams                                  │
├──────┬─────────────┬─────────────────────┬──────────┬──────┬──────────────┤
│ Name │ Description │ Created             │ Messages │ Size │ Last Message │
├──────┼─────────────┼─────────────────────┼──────────┼──────┼──────────────┤
│ foo  │             │ 2022-09-18 18:04:48 │ 0        │ 0 B  │ never        │
╰──────┴─────────────┴─────────────────────┴──────────┴──────┴──────────────╯

8. Publikace zprávy nebo většího množství zpráv z příkazové řádky

Publikace zprávy z příkazové řádky je velmi jednoduchou operací. Použijeme příkaz pub, za nímž zadáme jméno tématu „foo.x“ (tedy konkrétní jméno, nikoli s hvězdičkou) i vlastní zprávu, která může být v nejjednodušším případě pouze řetězcem:

$ ./nats pub foo.x "test"
 
18:06:11 Published 4 bytes to "foo.x"

Podívejme se nyní, jestli se zpráva skutečně uložila do příslušného streamu:

$ ./nats stream ls
 
╭───────────────────────────────────────────────────────────────────────────╮
│                                  Streams                                  │
├──────┬─────────────┬─────────────────────┬──────────┬──────┬──────────────┤
│ Name │ Description │ Created             │ Messages │ Size │ Last Message │
├──────┼─────────────┼─────────────────────┼──────────┼──────┼──────────────┤
│ foo  │             │ 2022-09-18 18:04:48 │ 1        │ 25 B │ 18.87s       │
╰──────┴─────────────┴─────────────────────┴──────────┴──────┴──────────────╯

NATS nám dokonce umožňuje publikovat větší množství zpráv, přičemž každá z těchto zpráv může mít odlišný obsah. Povšimněte si, že tělo zprávy je vlastně šablonou:

$ ./nats pub foo.x "Message #{{.Count}}" --count 100
 
100 / 100 [======================================================]    0s

Opět se podívejme na obsah streamů:

$ ./nats stream ls
 
╭──────────────────────────────────────────────────────────────────────────────╮
│                                   Streams                                    │
├──────┬─────────────┬─────────────────────┬──────────┬─────────┬──────────────┤
│ Name │ Description │ Created             │ Messages │ Size    │ Last Message │
├──────┼─────────────┼─────────────────────┼──────────┼─────────┼──────────────┤
│ foo  │             │ 2022-09-18 18:04:48 │ 101      │ 3.1 KiB │ 32.54s       │
╰──────┴─────────────┴─────────────────────┴──────────┴─────────┴──────────────╯

A nakonec přidejme do streamu dalších sto zpráv, tentokrát ovšem s tématem „foo.y“ a nikoli „foo.x“:

$ ./nats pub foo.y "Message #{{.Count}}" --count 100
 
100 / 100 [======================================================]    0s
 
$ ./nats stream ls
 
╭──────────────────────────────────────────────────────────────────────────────╮
│                                   Streams                                    │
├──────┬─────────────┬─────────────────────┬──────────┬─────────┬──────────────┤
│ Name │ Description │ Created             │ Messages │ Size    │ Last Message │
├──────┼─────────────┼─────────────────────┼──────────┼─────────┼──────────────┤
│ foo  │             │ 2022-09-18 18:04:48 │ 201      │ 6.3 KiB │ 1.61s        │
╰──────┴─────────────┴─────────────────────┴──────────┴─────────┴──────────────╯

9. Přečtení zpráv či zprávy z vybraného streamu

Konzument zpráv se spouští příkazem sub, kterému navíc musíme předat buď jméno streamu nebo jméno tématu. Zkusme nejprve přečíst všechny zprávy ze streamu:

$ ./nats sub --stream foo
 
[#1] Received JetStream message: stream: foo seq 1 / subject: foo.x / time: 2022-09-18T18:06:11+02:00
test
 
[#2] Received JetStream message: stream: foo seq 2 / subject: foo.x / time: 2022-09-18T18:07:08+02:00
Message #1
 
[#3] Received JetStream message: stream: foo seq 3 / subject: foo.x / time: 2022-09-18T18:07:08+02:00
Message #2
 
[#4] Received JetStream message: stream: foo seq 4 / subject: foo.x / time: 2022-09-18T18:07:08+02:00
Message #3
 
...
...
...
 
[#199] Received JetStream message: stream: foo seq 199 / subject: foo.y / time: 2022-09-18T18:08:00+02:00
Message #98
 
[#200] Received JetStream message: stream: foo seq 200 / subject: foo.y / time: 2022-09-18T18:08:00+02:00
Message #99
 
[#201] Received JetStream message: stream: foo seq 201 / subject: foo.y / time: 2022-09-18T18:08:00+02:00
Message #100
Poznámka: přečetlo se tedy celkem 201 zpráv, což zcela odpovídá stavu streamu (viz též předchozí kapitolu).

Přečíst všechny zprávy (bez ohledu na aktuální hodnotu offsetu) můžeme tímto příkazem:

$ ./nats sub --stream foo --all

Naopak čekání pouze na nové zprávy zajistí příkaz:

$ ./nats sub --stream foo --new
 
18:11:24 Subscribing to JetStream Stream holding messages with subject foo.* delivering any new messages received

Velmi užitečná je možnost přečíst poslední zprávu ve streamu, samozřejmě za předpokladu, že stream není prázdný:

$ ./nats sub --stream foo --last
 
18:11:47 Subscribing to JetStream Stream holding messages with subject foo.* starting with the last message received
[#1] Received JetStream message: stream: foo seq 201 / subject: foo.y / time: 2022-09-18T18:08:00+02:00
Message #100

Zadat lze i explicitní offset zprávy, od které se má provést „replay“:

$ ./nats sub --stream foo --start-sequence=100

Popř. je možné uvést časové razítko zprávy, od které se má začít:

$ ./nats sub --stream foo --since DURATION

10. Dočasní konzumenti zpráv vs. stálí konzumenti

Server NATS rozlišuje mezi dočasnými konzumenty a stálými konzumenty (durable consumers). Pro dočasné konzumenty není nutné si pamatovat index poslední přečtené a/nebo potvrzené zprávy – tuto činnost server plně ponechává na samotných konzumentech zda a jak si uchovají informaci o tom, o které zprávy budou mít zájem (zde se tedy využívají přepínače uvedené v předchozí kapitole). Naproti tomu se pro stálé konzumenty pamatuje index poslední přečtené a/nebo potvrzené zprávy, a to přímo na NATS serveru (není tedy nutné spouštět a udržovat instanci další služby podobné ZooKeeperu).

Stálý konzument se vytvoří jednoduše – uvedením přepínače –durable, za nímž následuje jméno tématu a konečně jméno stálého konzumenta. Po zadání následujícího příkazu se konzument automaticky spustí:

$ ./nats sub --stream foo --durable foo_consumer1
 
[#1] Received JetStream message: stream: foo seq 1 / subject: foo.x / time: 2022-09-19T10:48:08+02:00
Message #1
 
[#2] Received JetStream message: stream: foo seq 2 / subject: foo.x / time: 2022-09-19T10:48:08+02:00
Message #2
 
[#3] Received JetStream message: stream: foo seq 3 / subject: foo.x / time: 2022-09-19T10:48:08+02:00
Message #3
 
[#99] Received JetStream message: stream: foo seq 99 / subject: foo.x / time: 2022-09-19T10:48:08+02:00
Message #99
 
[#100] Received JetStream message: stream: foo seq 100 / subject: foo.x / time: 2022-09-19T10:48:08+02:00
Message #100

Konzumenta zastavíme klávesou Ctrl+C.

Po svém dalším spuštění začne konzument nazvaný „consumer1“ číst jen prozatím nezpracované zprávy – server si tedy zapamatoval index poslední přečtené zprávy:

$ ./nats sub --stream foo --durable foo_consumer_1
...
...
...

Vzhledem k tomu, že stálý konzument je pojmenovaný, lze o něm snadno získat další informace:

$ ./nats consumer info foo foo_consumer_1
 
Information for Consumer foo > foo_consumer_1 created 2022-09-19T10:47:22+02:00
 
Configuration:
 
        Durable Name: foo_consumer_1
    Delivery Subject: _INBOX.1AZwC1SLqhNK2OzZ8em644
      Deliver Policy: All
          Ack Policy: None
       Replay Policy: Instant
      Idle Heartbeat: 5.00s
        Flow Control: true
 
State:
 
   Last Delivered Message: Consumer sequence: 100 Stream sequence: 100 Last delivery: 8m5s ago
     Unprocessed Messages: 10
          Active Interest: No interest
Poznámka: povšimněte si především informace o poslední doručené zprávě. Jedná se o prakticky jediný stav konzumenta, který si musí server dobře zapamatovat.

11. Získání podrobnějších operací a vybraném streamu

O libovolném streamu lze získat podrobnější informace zadáním příkazu stream info, kterému se předá jméno příslušného streamu. Podívejme se na jednoduchý příklad – získáme informaci o streamu „foo“:

$ ./nats stream info foo
 
Information for Stream foo created 2022-09-19 10:46:29
 
             Subjects: foo.*
             Replicas: 1
              Storage: Memory
 
Options:
 
            Retention: Limits
     Acknowledgements: true
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
 
Limits:
 
     Maximum Messages: unlimited
  Maximum Per Subject: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited
 
 
State:
 
             Messages: 100
                Bytes: 3.1 KiB
             FirstSeq: 1 @ 2022-09-19T08:48:08 UTC
              LastSeq: 100 @ 2022-09-19T08:48:08 UTC
     Active Consumers: 1
   Number of Subjects: 1
Poznámka: povšimněte si, že některé z těchto informací byly vypsány již při vytváření streamu.

12. Konzumenti explicitně žádající o zprávy

Ve druhé kapitole jsme se zmínili o tom, že streaming poskytovaný technologií JetStream nabízí možnost využití dvou komunikačních strategií – PUSH-PUSH (zprávy konzumentovi posílá server) nebo PUSH-PULL (zprávy si explicitně vyzvedává klient). První z těchto strategií je implicitní, druhou je nutné zvolit při konfiguraci nového konzumenta, a to nevyplněním položky Delivery target:

$ ./nats consumer create
 
? Consumer name foo_consumer_2
? Delivery target (empty for Pull Consumers) 
? Start policy (all, new, last, subject, 1h, msg sequence) all
? Acknowledgement policy explicit
? Replay policy instant
? Filter Stream by subject (blank for all)
? Maximum Allowed Deliveries -1
? Maximum Acknowledgements Pending 0
? Deliver headers only without bodies No
? Add a Retry Backoff Policy No
? Select a Stream foo
Information for Consumer foo > foo_consumer_2 created 2022-09-19T10:54:51+02:00
 
Configuration:
 
        Durable Name: foo_consumer_2
           Pull Mode: true
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512
 
State:
 
   Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
         Outstanding Acks: 0 out of maximum 1,000
     Redelivered Messages: 0
     Unprocessed Messages: 100
            Waiting Pulls: 0 of maximum 512

Jaká komunikační strategie je pro zvoleného konzumenta nakonfigurována, se dozvíme i při zadání nám již známého příkazu consumer info (viz zvýrazněný řádek):

$ ./nats consumer info foo foo_consumer_2
 
Information for Consumer foo > foo_consumer_2 created 2022-09-19T10:54:51+02:00
 
Configuration:
 
        Durable Name: foo_consumer_2
           Pull Mode: true
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 1,000
   Max Waiting Pulls: 512
 
State:
 
   Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
     Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
         Outstanding Acks: 0 out of maximum 1,000
     Redelivered Messages: 0
     Unprocessed Messages: 100
            Waiting Pulls: 0 of maximum 512

13. Manuální potvrzení zkonzumované zprávy konzumentem, další možnosti konzumentů

Konzumenti zpráv musí nějakým způsobem potvrdit, že zprávu získali popř. zpracovali. Implicitně je nastaveno automatické potvrzení doručení zprávy, ovšem to nemusí být v mnoha případech ideální (samotné doručení zprávy ještě nezaručuje její zpracování). Mnohdy totiž potřebujeme potvrdit až zpracování zprávy, nejenom její přijetí. Jedná se o konfiguraci, která může být pro každého konzumenta nastavena odlišně.

Potvrzování zpráv funguje dobře i pro konzumenty spouštěné z příkazového řádku:

$ ./nats consumer next foo foo_consumer_2 --count=1
 
[10:58:11] subj: foo.x / tries: 1 / cons seq: 1 / str seq: 1 / pending: 99
 
Message #1
 
Acknowledged message
Poznámka: manuální potvrzování zpráv si ukážeme později u klientů naprogramovaných v Pythonu a taktéž v Go.

Příkaz nats consumer ovšem uživatelům nabízí i další volby, například:

usage: nats consumer [<flags>] <command> [<args> ...]
 
JetStream Consumer management
 
Subcommands:
  consumer ls       List known Consumers
  consumer report   Reports on Consumer statistics
  consumer info     Consumer information
  consumer add      Creates a new Consumer
  consumer edit     Edits the configuration of a consumer
  consumer rm       Removes a Consumer
  consumer copy     Creates a new Consumer based on the configuration of another
  consumer next     Retrieves messages from Pull Consumers without interactive
                    prompts
  consumer sub      Retrieves messages from Consumers
  consumer cluster  Manages a clustered Consumer
 
Flags:
  -a, --all  Operate on all streams including system ones

Příklad tabulky získané příkazem nats consumer report:

╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                      Consumer report for bar with 1 consumers                                       │
├────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬───────────────────┤
│ Consumer   │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster           │
├────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼───────────────────┤
│ consumer-a │ Pull │ Explicit   │ 30.00s   │ 0           │ 0           │ 0           │ 0         │ nats_development* │
╰────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴───────────────────╯

14. Kde jsou uloženy zprávy poslané do témat?

Podívejme se nyní, kde jsou vlastně uloženy zprávy, jenž jsou posílané do jednotlivých témat. Zprávy jsou na základě uvedeného tématu rozděleny do streamů (pro jeden stream může existovat větší množství témat popř. neomezené množství témat) a pro každý stream je vytvořen zvláštní podadresář v adresáři zvoleném při spuštění nebo konfiguraci serveru. Informace o tomto adresáři se vypisuje během spuštění serveru:

[31151] [INF]   Store Directory: "/home/ptisnovs/.local/share/nats/nats_development/jetstream"
[31151] [INF] -------------------------------------------
[31151] [INF]   Starting restore for stream 'USER > bar'
[31151] [INF]   Restored 20 messages for stream 'USER > bar'
[31151] [INF]   Starting restore for stream 'USER > foo'
[31151] [INF]   Restored 104 messages for stream 'USER > foo'
[31151] [INF]   Recovering 1 consumers for stream - 'USER > bar'
Poznámka: toto umístění je samozřejmě plně konfigurovatelné a navíc záleží na tom, kolik replik streamu jsme vyžadovali při jeho konstrukci.

Celý strom s uloženými streamy vypadá takto:

└── USER
    └── streams
        ├── bar
        │   ├── meta.inf
        │   ├── meta.sum
        │   ├── msgs
        │   │   ├── 1.blk
        │   │   └── 1.idx
        │   └── obs
        ├── foo
        │   ├── meta.inf
        │   ├── meta.sum
        │   ├── msgs
        │   │   ├── 1.blk
        │   │   └── 1.idx
        │   └── obs
        ├── user-add-operation
        │   ├── meta.inf
        │   ├── meta.sum
        │   ├── msgs
        │   │   ├── 1.blk
        │   │   ├── 1.fss
        │   │   └── 1.idx
        │   └── obs
        └── user-delete-operation
            ├── meta.inf
            ├── meta.sum
            ├── msgs
            │   ├── 1.blk
            │   ├── 1.fss
            │   └── 1.idx
            └── obs
Poznámka: důležité jsou zejména soubory .idx s indexy a soubory .blk s bloky zpráv.

15. Komunikace s brokerem NATS z Pythonu

V rámci navazujících kapitol si ukážeme, jakým způsobem lze realizovat komunikaci s NATS z Pythonu. V úvodních dvou článcích jsme se zaměřili na jazyk Go, ovšem jak uvidíme dále, je realizace producentů i konzumentů zpráv v Pythonu ještě nepatrně jednodušší, zejména v případě, že využijeme možnosti standardní knihovny asyncio a nových klíčových slov async a await, které byly do Pythonu přidány právě pro podporu práce s korutinami a obecně asynchronně běžícími úlohami.

Nejprve musíme nainstalovat knihovnu, v níž je realizováno rozhraní (connector) k NATSu. Instalace této knihovny je snadná, neboť je dostupná přímo na PyPi:

$ pip3 install nats-py
 
Defaulting to user installation because normal site-packages is not writeable
Collecting nats-py
  Downloading nats-py-2.1.7.tar.gz (45 kB)
     |████████████████████████████████| 45 kB 952 kB/s
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: nats-py
  Building wheel for nats-py (setup.py) ... done
  Created wheel for nats-py: filename=nats_py-2.1.7-py3-none-any.whl size=55717 sha256=beedaa208a24cc31490abf3966285f43cf7566967c5cb5be26999133e0f42f28
  Stored in directory: /home/ptisnovs/.cache/pip/wheels/b3/c2/1d/cd05fddafb2d9960a8a748a3cfd4ab1b997cac79bd059a7ba3
Successfully built nats-py
Installing collected packages: nats-py
Successfully installed nats-py-2.1.7

Krátké otestování, zda je knihovna skutečně nainstalována:

$ python3
 
Python 3.8.10 (default, Jun 22 2022, 20:18:18)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import nats
>>> help("nats")
 
Help on package nats:
 
NAME
    nats
 
DESCRIPTION
    # Copyright 2016-2021 The NATS Authors
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #
    ...
    ...
    ...

16. Připojení k NATS serveru z Pythonu

Podívejme se nyní na způsob realizace připojení k NATS serveru ze skriptu, který je naprogramován v Pythonu. Tento skript používá výše uvedený balíček nats-py, jenž je založen na standardní knihovně asyncio. Samotné navázání spojení je realizováno korutinou (nikoli tedy standardní funkcí) nazvanou connect. Této korutině je nutné předat jak adresu počítače, na kterém NATS běží, ale i příslušný port, jméno uživatele a jeho heslo (poslední dvě hodnoty jsou vypsány samotným serverem při jeho spuštění, odkud je můžeme snadno zkopírovat). Pokud je spojení navázáno (tedy v případě, že nedojde k vyhození výjimky), lze spojení ukončit metodou/korutinou nazvanou natsConnection.close(). Vzhledem k tomu, že se volají asynchronně běžící korutiny, musí být samotná funkce main realizována taktéž korutinou a zavolána přes asyncio.run(main()):

import asyncio
import nats
 
URL = "192.168.1.34:44853"
NATS_USERNAME = "local"
NATS_PASSWORD = "--password--"
 
 
async def main():
    print(f"Connecting to NATS at address {URL}")
    nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD)
    print(f"Connected...{nats_connection}")
 
    print("Closing connection")
    await nats_connection.close()
    print("Connection closed")
 
 
if __name__ == '__main__':
    asyncio.run(main())

Vyzkoušejme si nyní použití tohoto skriptu, pochopitelně po zadání korektního hesla:

$ python3 nats_connection.py
 
Connecting to NATS at address 192.168.1.34:44853
Connected...<nats client v2.1.7>
Closing connection
Connection closed

17. Producenti a konzumenti zpráv založení na komunikační strategii PUB-SUB

Ukažme si nyní, jak snadno je možné v Pythonu realizovat producenty a konzumenty zpráv posílaných a přijímaných s využitím komunikační strategie PUB-SUB. Nejdříve si ukažme producenta zpráv, který je až triviálně jednoduchý – zprávu po navázání spojení s NATS pošleme korutinou nazvanou publish do vybraného tématu:

import asyncio
import nats
 
URL = "192.168.1.34:44853"
NATS_USERNAME = "local"
NATS_PASSWORD = "--password--"
 
SUBJECT_NAME = "foo"
 
 
async def main():
    print(f"Connecting to NATS at address {URL}")
    nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD)
    print("Connected...")
 
    print("Publishing message")
    await nats_connection.publish(SUBJECT_NAME, b"Hello from Python!")
    print("Published")
 
    print("Closing connection")
    await nats_connection.close()
    print("Connection closed")
 
 
if __name__ == '__main__':
    asyncio.run(main())

Konzument zpráv je jen nepatrně složitější, neboť zprávy se přijímají v nekonečné smyčce (to pro jednoduchost, jsou totiž i další způsoby, které například zajistí odpojení od NATS). Povšimněte si, že konzument se musí nejprve přihlásit k odebírání tématu a teprve poté může zprávy přijímat:

import asyncio
import nats
 
URL = "192.168.1.34:44853"
NATS_USERNAME = "local"
NATS_PASSWORD = "--password--"
 
SUBJECT_NAME = "foo"
 
 
async def main():
    print(f"Connecting to NATS at address {URL}")
    nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD)
    print("Connected...")
 
    print(f"Subscribing to stream with name '{SUBJECT_NAME}'")
    sub = await nats_connection.subscribe(SUBJECT_NAME)
    print("Subscribed")
 
    print("Waiting for messages")
 
    while True:
        message = await sub.next_msg(timeout=100)
        print("Received:", message)
 
    await nc.close()
 
 
if __name__ == '__main__':
    asyncio.run(main())

18. Producenti a konzumenti zpráv založení na využití stream processingu

Zajímavější bude zjistit, jakým způsobem lze realizovat producenty a konzumenty v případě, kdy budeme chtít zprávy posílat do streamu a posléze je ze streamu číst resp. „přehrávat“. V prvním příkladu je ukázán způsob přečtení informací o vybraných streamech:

import asyncio
import nats
from pprint import pprint
 
URL = "192.168.1.34:44853"
NATS_USERNAME = "local"
NATS_PASSWORD = "--password--"
 
STREAM_NAMES = ["foo", "bar", "baz"]
 
 
async def main():
    print(f"Connecting to NATS at address {URL}")
    nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD)
    print("Connected...")
 
    print("Retrieving JetStream object")
    jet_stream = nats_connection.jetstream()
    print(f"Retrieved {jet_stream}")
 
    for stream_name in STREAM_NAMES:
        print(f"Stream info for stream {stream_name}:")
        print("-" * 40)
 
        stream_info = await jet_stream.stream_info(stream_name)
        pprint(stream_info.__dict__)
        print()
 
    print("Closing connection")
    await nats_connection.close()
    print("Connection closed")
 
 
if __name__ == '__main__':
    asyncio.run(main())

Ve skutečnosti na mém NATS serveru existují pouze streamy „foo“ a „bar“, nikoli již stream „baz“, což znamená, že při zjišťování informací o neexistujícím streamu dojde k vyhození výjimky:

Connecting to NATS at address 192.168.1.34:44853
Connected...
Retrieving JetStream object
Retrieved <nats.js.client.JetStreamContext object at 0x7faf51cb0940>
 
Stream info for stream foo:
----------------------------------------
{'cluster': None,
 'config': StreamConfig(name='foo', description=None, subjects=['foo'], retention='limits', max_consumers=-1, max_msgs=-1, max_bytes=-1, discard='old', max_age=0.0, max_msgs_per_subject=-1, max_msg_size=-1, storage='memory', num_replicas=1, no_ack=False, template_owner=None, duplicate_window=120000000000, placement=None, mirror=None, sources=None, sealed=False, deny_delete=False, deny_purge=False, allow_rollup_hdrs=False),
 'did_create': None,
 'mirror': None,
 'sources': None,
 'state': StreamState(messages=0, bytes=0, first_seq=0, last_seq=0, consumer_count=0, deleted=None, num_deleted=None, lost=None)}
 
Stream info for stream bar:
----------------------------------------
{'cluster': None,
 'config': StreamConfig(name='bar', description=None, subjects=['bar'], retention='limits', max_consumers=-1, max_msgs=-1, max_bytes=-1, discard='old', max_age=0.0, max_msgs_per_subject=-1, max_msg_size=-1, storage='file', num_replicas=1, no_ack=False, template_owner=None, duplicate_window=120000000000, placement=None, mirror=None, sources=None, sealed=False, deny_delete=False, deny_purge=False, allow_rollup_hdrs=False),
 'did_create': None,
 'mirror': None,
 'sources': None,
 'state': StreamState(messages=0, bytes=0, first_seq=0, last_seq=0, consumer_count=0, deleted=None, num_deleted=None, lost=None)}
 
Stream info for stream baz:
----------------------------------------
Traceback (most recent call last):
  File "stream_info.py", line 37, in
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "stream_info.py", line 27, in main
    stream_info = await jet_stream.stream_info(stream_name)
  File "/home/ptisnovs/.local/lib/python3.8/site-packages/nats/js/manager.py", line 71, in stream_info
    resp = await self._api_request(
  File "/home/ptisnovs/.local/lib/python3.8/site-packages/nats/js/manager.py", line 267, in _api_request
    raise APIError.from_error(resp['error'])
  File "/home/ptisnovs/.local/lib/python3.8/site-packages/nats/js/errors.py", line 84, in from_error
    raise NotFoundError(**err)
nats.js.errors.NotFoundError: nats: NotFoundError: code=404 err_code=10059 description='stream not found'

Publikace zpráv do zvoleného tématu (subject) je opět přímočará, jak je to ostatně ukázáno v následujícím demonstračním příkladu. Pouze namísto korutiny nats_connection.publish zavoláme korutinu jet_stream.publish. Povšimněte si, že zpráva je reprezentována polem bajtů a může tedy obsahovat jakékoli hodnoty:

import asyncio
import nats
 
URL = "192.168.1.34:44853"
NATS_USERNAME = "local"
NATS_PASSWORD = "--password--"
 
SUBJECT_NAME = "bar"
 
 
async def main():
    print(f"Connecting to NATS at address {URL}")
    nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD)
    print("Connected...")
 
    print("Retrieving JetStream object")
    jet_stream = nats_connection.jetstream()
    print(f"Retrieved {jet_stream}")
 
    print("Publishing message to stream")
    ack = await jet_stream.publish(f"{SUBJECT_NAME}", b"Hello from Python!")
    print(f"Published and acked: stream={ack.stream}, sequence={ack.seq}")
 
    print("Closing connection")
    await nats_connection.close()
    print("Connection closed")
 
 
if __name__ == '__main__':
    asyncio.run(main())

A konečně se podívejme na konzumenta zpráv uložených do streamu a dostupných přes zvolené téma. Tento demonstrační příklad je založen na smyčce, ve které se volá korutina next_msg, přičemž se při přijetí zprávy zavolá korutina on_message (jedná se tedy opět o asynchronní kód):

skolení ELK

import asyncio
import nats
 
URL = "192.168.1.34:44853"
NATS_USERNAME = "local"
NATS_PASSWORD = "--password--"
 
SUBJECT_NAME = "bar"
 
 
async def on_message(message):
    print(f"Received message {message}")
    await message.ack()
 
 
async def main():
    print(f"Connecting to NATS at address {URL}")
    nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD)
    print("Connected...")
 
    print("Retrieving JetStream object")
    jet_stream = nats_connection.jetstream()
    print(f"Retrieved {jet_stream}")
 
    print("Waiting for messages")
    sub = await jet_stream.subscribe(SUBJECT_NAME, cb=on_message)
    while True:
        await sub.next_msg(timeout=1)
 
    print("Closing connection")
    await nats_connection.close()
    print("Connection closed")
 
 
if __name__ == '__main__':
    asyncio.run(main())
Poznámka: jedná se pouze o zcela základní způsoby použití. Příště si ukážeme složitější příklady, vylepšenou specifikaci témat atd.

19. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta
1 nats_connection.py navázání spojení se serverem NATS z Pythonu, posléze ukončení spojení https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/nats_connection.py
2 simple_publisher.py publikace zprávy či většího množství zpráv do zvoleného tématu https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/simple_publisher.py
3 simple_subscriber.py čtení (konzumace) zpráv ze zvoleného tématu https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/simple_subscriber.py
4 stream_info.py informace o trojici streamů (pokud existují) https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_info.py
5 stream_publisher.py producent zpráv do zvoleného tématu https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_publisher.py
6 stream_subscriber1.py nejjednodušší forma konzumenta zpráv https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_subscriber1.py
7 stream_subscriber2.py korektní reakce konzumenta zpráv na chyby https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_subscriber2.py
8 stream_subscriber3.py explicitní potvrzování zpráv https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_subscriber3.py
9 stream_subscriber4.py změna konfigurace konzumenta – přečtení poslední zprávy ze streamu https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_subscriber4.py
10 stream_subscriber5.py změna konfigurace konzumenta – specifikace offsetu zprávy https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_subscriber5.py
11 stream_subscriber6.py konzument akceptující všechny zprávy ze zvoleného streamu (nepoužívá se téma) https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_subscriber6.py
12 stream_subscriber7.py konzument se specifikací jak streamu, tak i konkrétního tématu https://github.com/tisnik/message-queues-examples/blob/master/jetstream-py/stream_subscriber7.py

Demonstrační příklady uvedené v následující tabulce jsme si již popsali v článcích Komunikace s message brokery z programovacího jazyka Go, Použití message brokeru NATS a NATS Streaming Server. Tyto příklady jsou naprogramovány v jazyku Go a (alespoň prozatím) nepokrývají technologii JetStream.

Příklad Skript/kód Popis Cesta
1 publisher.go základní varianta producenta zpráv bez kontroly chyb https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/01-simple-pub-sub/publisher.go
1 consumer.go základní varianta konzumenta zpráv bez kontroly chyb https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/01-simple-pub-sub/consumer.go
       
2 publisher.go vylepšená verze producenta zpráv s kontrolami a uzavíráním připojení https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/02-pub-sub-error-handling/publisher.go
2 consumer.go vylepšená verze konzumenta zpráv s kontrolami a uzavíráním připojení https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/02-pub-sub-error-handling/consumer.go
       
3 publisher.go producent deseti zpráv do zvoleného tématu https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/03-replay-all/publisher.go
3 consumer.go konzument přijímající (a přehrávající) všechny zprávy ze zvoleného tématu https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/03-replay-all/consumer.go
       
4 publisher.go producent deseti zpráv do zvoleného tématu (shodný s předchozím příkladem) https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/04-replay-from-last-received/publisher.go
4 consumer.go konzument přijímající zprávy od poslední přijaté zprávy https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/04-replay-from-last-received/consumer.go
       
5 publisher.go producent deseti zpráv do zvoleného tématu (shodný s předchozím příkladem) https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/05-start-at-sequence/publisher.go
5 consumer.go konzument, v němž je možné specifikovat pořadí zprávy v sekvenci https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/05-start-at-sequence/consumer.go
       
6 publisher.go producent deseti zpráv do zvoleného tématu (shodný s předchozím příkladem) https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/06-time-duration/publisher.go
6 consumer.go konzument, v němž se specifikuje časový okamžik, od kterého se má zpráva či zprávy přijmout https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/06-time-duration/consumer.go
       
7 publisher.go producent deseti zpráv do zvoleného tématu (opět shodný s předchozím příkladem) https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/07-durable-subscription/publisher.go
7 consumer.go konzument využívající tzv. durable subscription popsanou v šestnácté kapitole https://github.com/tisnik/message-queues-examples/blob/master/go-nats-streaming/07-durable-subscription/consumer.go

20. Odkazy na Internetu

  1. NATS
    https://nats.io/about/
  2. JetStream
    https://docs.nats.io/nats-concepts/jetstream
  3. NATS Introduction
    https://nats.io/documentation/
  4. NATS Protocols
    https://docs.nats.io/lega­cy/stan/streaming/protocol#pro­tocols
  5. NATS Server Clients
    https://docs.nats.io/running-a-nats-service/clients
  6. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  7. NATS Clients
    https://nats.io/download/#nats-clients
  8. NATS JetStream Enabled Clients
    https://nats.io/download/#nats-jetstream-enabled-clients
  9. Move over Kafka! Let's try NATS JetStream
    https://www.youtube.com/watch?v=EJJ2SG-cKyM
  10. NATS Jetstream Technical Overview
    https://www.youtube.com/wat­ch?v=w8fc44SmJDw
  11. Using NATS JetStream with Go
    https://www.youtube.com/wat­ch?v=uZ4fzg_eqSw
  12. The coolest OSS project you've never heard of: NATS Getting started!
    https://www.youtube.com/wat­ch?v=hjXIUPZ7ArM
  13. Komunikace s message brokery z programovacího jazyka Go
    https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/
  14. Použití message brokeru NATS
    https://www.root.cz/clanky/pouziti-message-brokeru-nats/
  15. NATS Streaming Server
    https://www.root.cz/clanky/nats-streaming-server/
  16. Microservices: The Rise Of Kafka
    https://movio.co/blog/microservices-rise-kafka/
  17. Building a Microservices Ecosystem with Kafka Streams and KSQL
    https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
  18. An introduction to Apache Kafka and microservices communication
    https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63
  19. kappa-architecture.com
    http://milinda.pathirage.org/kappa-architecture.com/
  20. Questioning the Lambda Architecture
    https://www.oreilly.com/i­deas/questioning-the-lambda-architecture
  21. Lambda architecture
    https://en.wikipedia.org/wi­ki/Lambda_architecture
  22. Apache Kafka: distribuovaná streamovací platforma
    https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/