Obsah
1. JetStream – nová technologie přidaná do brokeru NATS
2. Komunikační strategie nabízené nejnovějším NATS serverem
4. Instalace CLI klienta a současně i vývojového serveru
5. Spuštění vývojového serveru
6. Nastavení kontextu pro CLI klienta
8. Publikace zprávy nebo většího množství zpráv z příkazové řádky
9. Přečtení zpráv či zprávy z vybraného streamu
10. Dočasní konzumenti zpráv vs. stálí konzumenti
11. Získání podrobnějších operací a vybraném streamu
12. Konzumenti explicitně žádající o zprávy
13. Manuální potvrzení zkonzumované zprávy konzumentem, další možnosti konzumentů
14. Kde jsou uloženy zprávy poslané do témat?
15. Komunikace s brokerem NATS z Pythonu
16. Připojení k NATS serveru z Pythonu
17. Producenti a konzumenti zpráv založení na komunikační strategii PUB-SUB
18. Producenti a konzumenti zpráv založení na využití stream processingu
19. Repositář s demonstračními příklady
1. JetStream – nová technologie přidaná do brokeru NATS
Na stránkách Roota jsme se v trojici článků Komunikace s message brokery z programovacího jazyka Go, Použití message brokeru NATS a NATS Streaming Server seznámili se zajímavým a potenciálně i velmi užitečným message brokerem nazvaným NATS. Připomeňme si ve stručnosti, že se jedná o poměrně úspěšný projekt, jenž je vyvinut v programovacím jazyku Go, což mu do jisté míry zajišťuje stabilitu i škálovatelnost (a taktéž velmi snadnou instalaci i tvorbu obrazů pro kontejnery). Stabilita a škálovatelnost jsou ostatně vlastnosti, které u message brokerů většinou očekáváme.
Původní varianta NATSu byla vytvořena Derekem Collisonem; zajímavé je, že tato varianta nebyla naprogramována v jazyce Go, ale v programovacím jazyce Ruby. Dnes se ovšem budeme zabývat moderní (a jedinou podporovanou) verzí systému NATS, která byla přeportována do jazyka Go. Celý systém NATS se skládá z několika komponent:
- V první řadě se jedná o samotný server, jenž se spouští příkazem nats-server (což si ostatně za chvíli vyzkoušíme). Server je naprogramovaný v jazyce Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (to má v době Dockeru a podobných nástrojů poměrně velký význam).
- Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje); viz též tabulky s podporovanými ekosystémy, které jsou zobrazeny pod tímto odstavcem.
- Třetí komponentou, kvůli které ostatně vznikl tento článek, jeJetStream, což je technologie umožňující využití stream processingu zpopularizovaného brokerem Apache Kafka. V porovnání s Kafkou je NATS + JetStream „odlehčenou“ alternativou, která ovšem navíc podporuje i mnohé další komunikační strategie, takže má šanci se stát ústřední technologií v architekturách založených na mikroslužbách.
- Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis). Touto komponentou se prozatím nebudeme zabývat.
Oficiálně v současnosti existují tato rozhraní pro broker NATS (bez JetStreamu):
1 | Arduino | NATS Arduino Client |
2 | Asio C++ | NATS Asio C++ Client |
3 | Ballerina | NATS Ballerina Client |
4 | C | NATS C Client |
5 | .NET C# | NATS .NET C# Client |
6 | Clojure | NATS Clojure Client |
7 | Crystal | NATS Crystal Client |
8 | Dart | NATS Dart Client |
9 | Deno | NATS Deno Client |
10 | Elixir | NATS Elixir Client |
11 | Elm | NATS Elm Client |
12 | Erlang | NATS Erlang Client |
13 | Go | NATS Go Client |
14 | Haskell | NATS Haskell Client |
15 | Java | NATS Java Client |
16 | Java Android | NATS Java Android Client |
17 | LabVIEW | NATS LabVIEW Client |
18 | Common Lisp | NATS Common Lisp Client |
19 | Lua | NATS Lua Client |
20 | MicroPython | NATS MicroPython Client |
21 | NGINX | NATS NGINX Client |
22 | Node.js | NATS Node.js Client |
23 | Perl | NATS Perl Client |
24 | PHP | NATS PHP Client |
25 | Pure Ruby | NATS Pure Ruby Client |
26 | Python | NATS Python Client |
27 | Python Asyncio | NATS Python Asyncio Client |
28 | Python Minimal | NATS Python Minimal Client |
29 | Python Tornado | NATS Python Tornado Client |
30 | Python Twisted | NATS Python Twisted Client |
31 | C++/Qt/QML | NATS C++/Qt/QML Client |
32 | Qt5 C++ | NATS Qt5 C++ Client |
33 | Ruby | NATS Ruby Client |
34 | Rust | NATS Rust Client |
35 | Scala | NATS Scala Client |
36 | Swift | NATS Swift Client |
37 | Tcl | NATS Tcl Client |
38 | WebSocket | NATS WebSocket Client |
Technologie JetStream je oficiálně podporována v menším množství ekosystémů, ovšem ty základní (C, Go, Java, Rust a Python) podporovány již jsou:
1 | C | NATS C Client |
2 | .NET C# | NATS .NET C# Client |
3 | Deno | NATS Deno Client |
4 | Go | NATS Go Client |
5 | Java | NATS Java Client |
6 | Node.js | NATS Node.js Client |
7 | PHP | NATS PHP Client |
8 | Pure Ruby | NATS Pure Ruby Client |
9 | Python Asyncio | NATS Python Asyncio Client |
10 | C++/Qt/QML | NATS C++/Qt/QML Client |
11 | Rust | NATS Rust Client |
12 | Tcl | NATS Tcl Client |
13 | WebSocket | NATS WebSocket Client |
2. Komunikační strategie nabízené nejnovějším NATS serverem
Nejnovější NATS server s technologií JetStream nabízí uživatelům hned několik komunikačních strategií. Některé z těchto strategií jsme si již popsali v předchozích dvou článcích; nové strategie si postupně popíšeme. Pro větší přehlednost jsou všechny podporované komunikační strategie vizualizovány na osmici diagramů:

Obrázek 1: Komunikační strategie nazvaná PUB-SUB, v níž spolu komunikují dva typy uzlů: zdroje zpráv (publishers) a konzumenti zpráv (subscribers). Konzumenti se mohou k odebírání zpráv kdykoli přihlásit a kdykoli se také odpojit, což zdroj zpráv nijak neovlivní – ten bude posílat zprávy nezávisle na tom, kdo je přijímá.

Obrázek 2: Počet producentů není nijak omezen, takže se na strategii PUB-SUB můžeme dívat i jako na klasický message-bus.

Obrázek 3: Strategie PUSH-PULL je založena na použití fronty. Producent pošle zprávu message brokerovi, který ji uloží do fronty (každá fronta je pojmenovaná, resp. přesněji řečeno každé téma je reprezentované frontou), konzumenti se mohou k frontě (frontám) připojit a zprávy přečíst.

Obrázek 4: U strategie PUSH-PULL je možné zvolit, že všichni konzumenti dostanou stejné zprávy (a ideálně i ve stejném pořadí).

Obrázek 5: V tomto případě je konfigurace odlišná: zpráva se doručí jen jedinému konzumentovi. Pokud se k frontě připojí větší množství konzumentů, je typicky použit algoritmus round-robin pro (polo)spravedlivé rozložení zátěže.

Obrázek 6: Další velmi často používaná komunikační strategie se nazývá Request-Response nebo zkráceně REQ-REP, popř. pouze REQREP. Při použití této strategie spolu komunikují dva typy uzlů – server a teoreticky neomezené množství klientů. Server přijímá požadavky (request) a odpovídá na ně (response), přičemž je možné, aby požadavky posílalo několik klientů (a jeden klient naopak může v případě potřeby posílat požadavky více serverům).

Obrázek 7: Nově je díky technologii JetStream podporován i streaming, jenž byl zpopularizován především Kafkou. JetStream podporuje dva typy připojení konzumentů. Zde je naznačeno připojení typu PUSH-PUSH, kdy jsou zprávy posílány konzumentovi serverem. Konzument tedy o zprávy nemusí explicitně žádat, pouze se stačí připojit k danému tématu. Indexy naposledy přijatých zpráv jsou (resp. mohou být) uloženy na serveru.

Obrázek 8: Alternativou k předchozí strategii je koncept PUSH-PULL, v němž jednotliví konzumenti explicitně žádají o zprávy server, a to ve chvíli, kdy jsou připraveni na příjem zprávy. Stále se však jedná o klasický streaming, tedy existuje zde možnost „přehrání“ starších zpráv.
3. Instalace NATS serveru
Lokální instalace NATS serveru je skutečně triviální, protože se jedná o jediný binární (přímo spustitelný) soubor, který navíc nevyžaduje ani žádnou konfiguraci (samozřejmě že je možné server v případě potřeby nakonfigurovat, ovšem bude funkční i bez konfigurace).
Nejprve je nutné ze stránky https://github.com/nats-io/nats-server/releases/tag/v2.9.0 stáhnout archiv s NATS serverem. Vzhledem k tomu, že se jedná o nativní aplikaci, musíme vybrat archiv pro danou platformu (v mém případě amd64/x86–64):
$ wget https://github.com/nats-io/nats-server/releases/download/v2.9.0/nats-server-v2.9.0-linux-amd64.tar.gz --2022-09-18 17:47:57-- https://github.com/nats-io/nats-server/releases/download/v2.9.0/nats-server-v2.9.0-linux-amd64.tar.gz ... ... ... 2022-09-18 17:47:59 (3.78 MB/s) - ‘nats-server-v2.9.0-linux-amd64.tar.gz’ saved [4898804/4898804]
Dále právě stažený archiv běžným způsobem rozbalíme:
$ tar xvfz nats-server-v2.9.0-linux-amd64.tar.gz nats-server-v2.9.0-linux-amd64/LICENSE nats-server-v2.9.0-linux-amd64/README.md nats-server-v2.9.0-linux-amd64/nats-server
Přesuneme se do vytvořeného adresáře:
$ cd nats-server-v2.9.0-linux-amd64
A prozkoumáme jeho obsah:
$ ls -la total 12044 drwxrwxr-x. 2 ptisnovs ptisnovs 100 Sep 18 17:48 . drwxrwxrwt. 8 root root 180 Sep 18 17:48 .. -rw-rw-r--. 1 ptisnovs ptisnovs 11357 Sep 9 16:49 LICENSE -rwxrwxr-x. 1 ptisnovs ptisnovs 12315116 Sep 9 16:51 nats-server -rw-rw-r--. 1 ptisnovs ptisnovs 3622 Sep 9 16:49 README.md
Pro jistotu se přesvědčíme, že server lze spustit:
$ ./nats-server --version nats-server: v2.9.0
A spustíme ho (což je operace, která je prakticky okamžitá):
$ ./nats-server [13644] 2022/09/18 19:05:37.697707 [INF] Starting nats-server [13644] 2022/09/18 19:05:37.697822 [INF] Version: 2.9.0 [13644] 2022/09/18 19:05:37.697825 [INF] Git: [517d9b3] [13644] 2022/09/18 19:05:37.697832 [INF] Name: NCVERYFIQFOUACDK6AF7QR7VXMPBQFSAWXJFI2M3PAPYU3DRRWGCKISG [13644] 2022/09/18 19:05:37.697834 [INF] ID: NCVERYFIQFOUACDK6AF7QR7VXMPBQFSAWXJFI2M3PAPYU3DRRWGCKISG [13644] 2022/09/18 19:05:37.698641 [INF] Listening for client connections on 0.0.0.0:4222 [13644] 2022/09/18 19:05:37.698979 [INF] Server is ready
4. Instalace CLI klienta a současně i vývojového serveru
Pro komunikaci s NATS serverem dnes použijeme CLI klienta (tedy aplikaci ovládanou z příkazového řádku), pomocí níž budeme vytvářet témata a streamy, produkovat zprávy, přijímat zprávy a současně i sledovat stav serveru. CLI klient je opět nativní aplikací, kterou lze získat z repositáře dostupného na adrese https://github.com/nats-io/natscli.
Podobně jako v případě serveru, i nyní stáhneme archiv s aplikací určený pro námi používanou platformu:
$ wget https://github.com/nats-io/natscli/releases/download/v0.0.34/nats-0.0.34-linux-amd64.zip ... ... ... 2022-09-18 17:55:30 (4.53 MB/s) - ‘nats-0.0.34-linux-amd64.zip’ saved [7827956/7827956]
Archiv rozbalíme:
$ unzip nats-0.0.34-linux-amd64.zip Archive: nats-0.0.34-linux-amd64.zip inflating: nats-0.0.34-linux-amd64/LICENSE inflating: nats-0.0.34-linux-amd64/README.md inflating: nats-0.0.34-linux-amd64/nats
Přejdeme do adresáře s rozbaleným klientem:
$ cd nats-0.0.34-linux-amd64
A podíváme se, jaké soubory se rozbalily:
$ ls -la total 19972 drwxrwxr-x. 2 ptisnovs ptisnovs 100 Sep 18 17:55 . drwxrwxrwt. 10 root root 220 Sep 18 17:55 .. -rw-r--r--. 1 ptisnovs ptisnovs 11357 Apr 2 20:40 LICENSE -rwxr-xr-x. 1 ptisnovs ptisnovs 20414464 Sep 9 17:09 nats -rw-r--r--. 1 ptisnovs ptisnovs 24502 Aug 19 15:04 README.md
5. Spuštění vývojového serveru
Spuštění právě nainstalovaného NATS serveru ve vývojovém režimu je triviální. Pouze postačuje z adresáře, do kterého byl CLI klient rozbalen, spustit příkaz:
$ ./nats server run Starting local development NATS Server instance: nats_development User Credentials: User: local Password: WTJUCsUpDZIASz89c0I1F5HSeDWjmzMz Context: nats_development Service Credentials: User: service Password: HURHd7OEFuijnAwH9IwacQGhMyaK2FJs Context: nats_development_service System Credentials: User: system Password: q0YT97I9TXpyUgz2bLSoTwenh5TPZF0t Context: nats_development_system Extending Demo Network: false Extending Remote NATS: false URL: nats://0.0.0.0:44853 Clean on Exit: false NOTE: This is not a supported way to run a production NATS Server, view documentation at https://docs.nats.io/running-a-nats-service/introduction for production use. [9305] [INF] Starting nats-server [9305] [INF] Version: 2.9.0 [9305] [INF] Git: [not set] [9305] [INF] Name: nats_development [9305] [INF] ID: NBQMRLOETYHYXZJKGBKPXOXGPSWH6IOCVXYAGJYPZ5AL4VSO2QAAIR57 [9305] [INF] Using configuration file: /tmp/ramdisk/nats-server-run-2951373316.cfg [9305] [INF] Listening for client connections on 0.0.0.0:44853 [9305] [INF] Server is ready
V dalším textu budeme využívat technologii JetStream. Tu je nutné explicitně povolit:
$ ./nats server run --jetstream Starting local development NATS Server instance: nats_development User Credentials: User: local Password: WTJUCsUpDZIASz89c0I1F5HSeDWjmzMz Context: nats_development Service Credentials: User: service Password: HURHd7OEFuijnAwH9IwacQGhMyaK2FJs Context: nats_development_service System Credentials: User: system Password: q0YT97I9TXpyUgz2bLSoTwenh5TPZF0t Context: nats_development_system Extending Demo Network: false Extending Remote NATS: false URL: nats://0.0.0.0:44853 Clean on Exit: false NOTE: This is not a supported way to run a production NATS Server, view documentation at https://docs.nats.io/running-a-nats-service/introduction for production use. [9332] [INF] Starting nats-server [9332] [INF] Version: 2.9.0 [9332] [INF] Git: [not set] [9332] [INF] Name: nats_development [9332] [INF] Node: HhEiCaYr [9332] [INF] ID: NBVG2F5TWP5N6UY3DVDYT7U6G7JD5CNBT7RX5X4QEZYE4X32QT6CENOZ [9332] [INF] Using configuration file: /tmp/ramdisk/nats-server-run-3289015964.cfg [9332] [INF] Starting JetStream [9332] [INF] _ ___ _____ ___ _____ ___ ___ _ __ __ [9332] [INF] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ | [9332] [INF] | || | _| | | \__ \ | | | / _| / _ \| |\/| | [9332] [INF] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_| [9332] [INF] [9332] [INF] https://docs.nats.io/jetstream [9332] [INF] [9332] [INF] ---------------- JETSTREAM ---------------- [9332] [INF] Max Memory: 11.33 GB [9332] [INF] Max Storage: 60.84 GB [9332] [INF] Store Directory: "/home/ptisnovs/.local/share/nats/nats_development/jetstream" [9332] [INF] ------------------------------------------- [9332] [INF] Listening for client connections on 0.0.0.0:44853 [9332] [INF] Server is ready
$ ./nats-server --jetstream
Povšimněte si, že v tomto případě se již nevypíše žádné varování, ovšem ani žádná hesla atd.:
[32528] 2022/09/21 08:15:29.778802 [INF] Starting nats-server [32528] 2022/09/21 08:15:29.778849 [INF] Version: 2.9.0 [32528] 2022/09/21 08:15:29.778852 [INF] Git: [517d9b3] [32528] 2022/09/21 08:15:29.778856 [INF] Name: ND4O2ZQS6XGKN5J4ORYAML2TSG2HTLAPKEHQWTCJMBS5UFXIZRRC2HIE [32528] 2022/09/21 08:15:29.778860 [INF] Node: NmzhvZB9 [32528] 2022/09/21 08:15:29.778862 [INF] ID: ND4O2ZQS6XGKN5J4ORYAML2TSG2HTLAPKEHQWTCJMBS5UFXIZRRC2HIE [32528] 2022/09/21 08:15:29.779350 [INF] Starting JetStream [32528] 2022/09/21 08:15:29.779614 [INF] _ ___ _____ ___ _____ ___ ___ _ __ __ [32528] 2022/09/21 08:15:29.779621 [INF] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ | [32528] 2022/09/21 08:15:29.779623 [INF] | || | _| | | \__ \ | | | / _| / _ \| |\/| | [32528] 2022/09/21 08:15:29.779625 [INF] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_| [32528] 2022/09/21 08:15:29.779628 [INF] [32528] 2022/09/21 08:15:29.779630 [INF] https://docs.nats.io/jetstream [32528] 2022/09/21 08:15:29.779632 [INF] [32528] 2022/09/21 08:15:29.779634 [INF] ---------------- JETSTREAM ---------------- [32528] 2022/09/21 08:15:29.779638 [INF] Max Memory: 11.33 GB [32528] 2022/09/21 08:15:29.779642 [INF] Max Storage: 725.26 MB [32528] 2022/09/21 08:15:29.779644 [INF] Store Directory: "/tmp/ramdisk/nats/jetstream" [32528] 2022/09/21 08:15:29.779647 [INF] ------------------------------------------- [32528] 2022/09/21 08:15:29.780203 [INF] Listening for client connections on 0.0.0.0:4222 [32528] 2022/09/21 08:15:29.780593 [INF] Server is ready
6. Nastavení kontextu pro CLI klienta
Z terminálu, z něhož budeme spouštět producenty a konzumenty zpráv, popř. vytvářet streamy atd., je nutné specifikovat kontext, což současně vede k nastavení hesla. Při spuštění serveru byly vytvořeny tři kontexty nazvané nats_development, nats_development_service a nats_development_system. Použijeme první z těchto kontextů:
$ ./nats context select nats_development NATS Configuration Context "nats_development" Description: Local user access for NATS Development instance Server URLs: nats://0.0.0.0:44853 Username: local Password: ********* Path: /home/ptisnovs/.config/nats/context/nats_development.json
7. Vytvoření nového streamu
V této kapitole si ukážeme, jak lze z příkazové řádky vytvořit nový stream. Nejprve se přesvědčme o tom, že žádný stream v daném okamžiku neexistuje:
$ ./nats stream ls No Streams defined
Stav streamů můžeme sledovat neustále, například tímto příkazem:
$ watch ./nats stream ls
Stream se vytvoří příkazem stream add, kterému můžeme, ale také nemusíme předat další parametry:
$ ./nats stream add
V případě, že žádné další parametry nezadáme, bude se na ně klient postupně dotazovat – což je podle mého názoru zcela nejlepší chování aplikace ovládané z příkazového řádku (mnohem lepší, než se učit či opisovat desítku přepínačů, které se použijí jednou za rok). Nejdůležitějšími parametry je jméno streamu (použijeme dále) a taktéž téma (subject), které se obecně skládá ze slov oddělených tečkami. Namísto slova lze použít hvězdičku (jedná se o glob, nikoli o regulární výraz!):
? Stream Name foo ? Subjects foo.* ? Storage memory ? Replication 1 ? Retention Policy Limits ? Discard Policy Old ? Stream Messages Limit -1 ? Per Subject Messages Limit -1 ? Total Stream Size -1 ? Message TTL -1 ? Max Message Size -1 ? Duplicate tracking time window 2m0s ? Allow message Roll-ups No ? Allow message deletion Yes ? Allow purging subjects or the entire stream (Y/n)
Stream byl vytvořen s následujícími vlastnostmi:
Stream foo was created Information for Stream foo created 2022-09-18 18:04:48 Subjects: foo.* Replicas: 1 Storage: Memory Options: Retention: Limits Acknowledgements: true Discard Policy: Old Duplicate Window: 2m0s Allows Msg Delete: true Allows Purge: true Allows Rollups: false Limits: Maximum Messages: unlimited Maximum Per Subject: unlimited Maximum Bytes: unlimited Maximum Age: unlimited Maximum Message Size: unlimited Maximum Consumers: unlimited State: Messages: 0 Bytes: 0 B FirstSeq: 0 LastSeq: 0 Active Consumers: 0
Příkazem stream ls se můžeme přesvědčit o tom, zda byl stream skutečně vytvořen:
$ ./nats stream ls ╭───────────────────────────────────────────────────────────────────────────╮ │ Streams │ ├──────┬─────────────┬─────────────────────┬──────────┬──────┬──────────────┤ │ Name │ Description │ Created │ Messages │ Size │ Last Message │ ├──────┼─────────────┼─────────────────────┼──────────┼──────┼──────────────┤ │ foo │ │ 2022-09-18 18:04:48 │ 0 │ 0 B │ never │ ╰──────┴─────────────┴─────────────────────┴──────────┴──────┴──────────────╯
8. Publikace zprávy nebo většího množství zpráv z příkazové řádky
Publikace zprávy z příkazové řádky je velmi jednoduchou operací. Použijeme příkaz pub, za nímž zadáme jméno tématu „foo.x“ (tedy konkrétní jméno, nikoli s hvězdičkou) i vlastní zprávu, která může být v nejjednodušším případě pouze řetězcem:
$ ./nats pub foo.x "test" 18:06:11 Published 4 bytes to "foo.x"
Podívejme se nyní, jestli se zpráva skutečně uložila do příslušného streamu:
$ ./nats stream ls ╭───────────────────────────────────────────────────────────────────────────╮ │ Streams │ ├──────┬─────────────┬─────────────────────┬──────────┬──────┬──────────────┤ │ Name │ Description │ Created │ Messages │ Size │ Last Message │ ├──────┼─────────────┼─────────────────────┼──────────┼──────┼──────────────┤ │ foo │ │ 2022-09-18 18:04:48 │ 1 │ 25 B │ 18.87s │ ╰──────┴─────────────┴─────────────────────┴──────────┴──────┴──────────────╯
NATS nám dokonce umožňuje publikovat větší množství zpráv, přičemž každá z těchto zpráv může mít odlišný obsah. Povšimněte si, že tělo zprávy je vlastně šablonou:
$ ./nats pub foo.x "Message #{{.Count}}" --count 100 100 / 100 [======================================================] 0s
Opět se podívejme na obsah streamů:
$ ./nats stream ls ╭──────────────────────────────────────────────────────────────────────────────╮ │ Streams │ ├──────┬─────────────┬─────────────────────┬──────────┬─────────┬──────────────┤ │ Name │ Description │ Created │ Messages │ Size │ Last Message │ ├──────┼─────────────┼─────────────────────┼──────────┼─────────┼──────────────┤ │ foo │ │ 2022-09-18 18:04:48 │ 101 │ 3.1 KiB │ 32.54s │ ╰──────┴─────────────┴─────────────────────┴──────────┴─────────┴──────────────╯
A nakonec přidejme do streamu dalších sto zpráv, tentokrát ovšem s tématem „foo.y“ a nikoli „foo.x“:
$ ./nats pub foo.y "Message #{{.Count}}" --count 100 100 / 100 [======================================================] 0s $ ./nats stream ls ╭──────────────────────────────────────────────────────────────────────────────╮ │ Streams │ ├──────┬─────────────┬─────────────────────┬──────────┬─────────┬──────────────┤ │ Name │ Description │ Created │ Messages │ Size │ Last Message │ ├──────┼─────────────┼─────────────────────┼──────────┼─────────┼──────────────┤ │ foo │ │ 2022-09-18 18:04:48 │ 201 │ 6.3 KiB │ 1.61s │ ╰──────┴─────────────┴─────────────────────┴──────────┴─────────┴──────────────╯
9. Přečtení zpráv či zprávy z vybraného streamu
Konzument zpráv se spouští příkazem sub, kterému navíc musíme předat buď jméno streamu nebo jméno tématu. Zkusme nejprve přečíst všechny zprávy ze streamu:
$ ./nats sub --stream foo [#1] Received JetStream message: stream: foo seq 1 / subject: foo.x / time: 2022-09-18T18:06:11+02:00 test [#2] Received JetStream message: stream: foo seq 2 / subject: foo.x / time: 2022-09-18T18:07:08+02:00 Message #1 [#3] Received JetStream message: stream: foo seq 3 / subject: foo.x / time: 2022-09-18T18:07:08+02:00 Message #2 [#4] Received JetStream message: stream: foo seq 4 / subject: foo.x / time: 2022-09-18T18:07:08+02:00 Message #3 ... ... ... [#199] Received JetStream message: stream: foo seq 199 / subject: foo.y / time: 2022-09-18T18:08:00+02:00 Message #98 [#200] Received JetStream message: stream: foo seq 200 / subject: foo.y / time: 2022-09-18T18:08:00+02:00 Message #99 [#201] Received JetStream message: stream: foo seq 201 / subject: foo.y / time: 2022-09-18T18:08:00+02:00 Message #100
Přečíst všechny zprávy (bez ohledu na aktuální hodnotu offsetu) můžeme tímto příkazem:
$ ./nats sub --stream foo --all
Naopak čekání pouze na nové zprávy zajistí příkaz:
$ ./nats sub --stream foo --new 18:11:24 Subscribing to JetStream Stream holding messages with subject foo.* delivering any new messages received
Velmi užitečná je možnost přečíst poslední zprávu ve streamu, samozřejmě za předpokladu, že stream není prázdný:
$ ./nats sub --stream foo --last 18:11:47 Subscribing to JetStream Stream holding messages with subject foo.* starting with the last message received [#1] Received JetStream message: stream: foo seq 201 / subject: foo.y / time: 2022-09-18T18:08:00+02:00 Message #100
Zadat lze i explicitní offset zprávy, od které se má provést „replay“:
$ ./nats sub --stream foo --start-sequence=100
Popř. je možné uvést časové razítko zprávy, od které se má začít:
$ ./nats sub --stream foo --since DURATION
10. Dočasní konzumenti zpráv vs. stálí konzumenti
Server NATS rozlišuje mezi dočasnými konzumenty a stálými konzumenty (durable consumers). Pro dočasné konzumenty není nutné si pamatovat index poslední přečtené a/nebo potvrzené zprávy – tuto činnost server plně ponechává na samotných konzumentech zda a jak si uchovají informaci o tom, o které zprávy budou mít zájem (zde se tedy využívají přepínače uvedené v předchozí kapitole). Naproti tomu se pro stálé konzumenty pamatuje index poslední přečtené a/nebo potvrzené zprávy, a to přímo na NATS serveru (není tedy nutné spouštět a udržovat instanci další služby podobné ZooKeeperu).
Stálý konzument se vytvoří jednoduše – uvedením přepínače –durable, za nímž následuje jméno tématu a konečně jméno stálého konzumenta. Po zadání následujícího příkazu se konzument automaticky spustí:
$ ./nats sub --stream foo --durable foo_consumer1 [#1] Received JetStream message: stream: foo seq 1 / subject: foo.x / time: 2022-09-19T10:48:08+02:00 Message #1 [#2] Received JetStream message: stream: foo seq 2 / subject: foo.x / time: 2022-09-19T10:48:08+02:00 Message #2 [#3] Received JetStream message: stream: foo seq 3 / subject: foo.x / time: 2022-09-19T10:48:08+02:00 Message #3 [#99] Received JetStream message: stream: foo seq 99 / subject: foo.x / time: 2022-09-19T10:48:08+02:00 Message #99 [#100] Received JetStream message: stream: foo seq 100 / subject: foo.x / time: 2022-09-19T10:48:08+02:00 Message #100
Konzumenta zastavíme klávesou Ctrl+C.
Po svém dalším spuštění začne konzument nazvaný „consumer1“ číst jen prozatím nezpracované zprávy – server si tedy zapamatoval index poslední přečtené zprávy:
$ ./nats sub --stream foo --durable foo_consumer_1 ... ... ...
Vzhledem k tomu, že stálý konzument je pojmenovaný, lze o něm snadno získat další informace:
$ ./nats consumer info foo foo_consumer_1 Information for Consumer foo > foo_consumer_1 created 2022-09-19T10:47:22+02:00 Configuration: Durable Name: foo_consumer_1 Delivery Subject: _INBOX.1AZwC1SLqhNK2OzZ8em644 Deliver Policy: All Ack Policy: None Replay Policy: Instant Idle Heartbeat: 5.00s Flow Control: true State: Last Delivered Message: Consumer sequence: 100 Stream sequence: 100 Last delivery: 8m5s ago Unprocessed Messages: 10 Active Interest: No interest
11. Získání podrobnějších operací a vybraném streamu
O libovolném streamu lze získat podrobnější informace zadáním příkazu stream info, kterému se předá jméno příslušného streamu. Podívejme se na jednoduchý příklad – získáme informaci o streamu „foo“:
$ ./nats stream info foo Information for Stream foo created 2022-09-19 10:46:29 Subjects: foo.* Replicas: 1 Storage: Memory Options: Retention: Limits Acknowledgements: true Discard Policy: Old Duplicate Window: 2m0s Allows Msg Delete: true Allows Purge: true Allows Rollups: false Limits: Maximum Messages: unlimited Maximum Per Subject: unlimited Maximum Bytes: unlimited Maximum Age: unlimited Maximum Message Size: unlimited Maximum Consumers: unlimited State: Messages: 100 Bytes: 3.1 KiB FirstSeq: 1 @ 2022-09-19T08:48:08 UTC LastSeq: 100 @ 2022-09-19T08:48:08 UTC Active Consumers: 1 Number of Subjects: 1
12. Konzumenti explicitně žádající o zprávy
Ve druhé kapitole jsme se zmínili o tom, že streaming poskytovaný technologií JetStream nabízí možnost využití dvou komunikačních strategií – PUSH-PUSH (zprávy konzumentovi posílá server) nebo PUSH-PULL (zprávy si explicitně vyzvedává klient). První z těchto strategií je implicitní, druhou je nutné zvolit při konfiguraci nového konzumenta, a to nevyplněním položky Delivery target:
$ ./nats consumer create ? Consumer name foo_consumer_2 ? Delivery target (empty for Pull Consumers) ? Start policy (all, new, last, subject, 1h, msg sequence) all ? Acknowledgement policy explicit ? Replay policy instant ? Filter Stream by subject (blank for all) ? Maximum Allowed Deliveries -1 ? Maximum Acknowledgements Pending 0 ? Deliver headers only without bodies No ? Add a Retry Backoff Policy No ? Select a Stream foo Information for Consumer foo > foo_consumer_2 created 2022-09-19T10:54:51+02:00 Configuration: Durable Name: foo_consumer_2 Pull Mode: true Deliver Policy: All Ack Policy: Explicit Ack Wait: 30s Replay Policy: Instant Max Ack Pending: 1,000 Max Waiting Pulls: 512 State: Last Delivered Message: Consumer sequence: 0 Stream sequence: 0 Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0 Outstanding Acks: 0 out of maximum 1,000 Redelivered Messages: 0 Unprocessed Messages: 100 Waiting Pulls: 0 of maximum 512
Jaká komunikační strategie je pro zvoleného konzumenta nakonfigurována, se dozvíme i při zadání nám již známého příkazu consumer info (viz zvýrazněný řádek):
$ ./nats consumer info foo foo_consumer_2 Information for Consumer foo > foo_consumer_2 created 2022-09-19T10:54:51+02:00 Configuration: Durable Name: foo_consumer_2 Pull Mode: true Deliver Policy: All Ack Policy: Explicit Ack Wait: 30s Replay Policy: Instant Max Ack Pending: 1,000 Max Waiting Pulls: 512 State: Last Delivered Message: Consumer sequence: 0 Stream sequence: 0 Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0 Outstanding Acks: 0 out of maximum 1,000 Redelivered Messages: 0 Unprocessed Messages: 100 Waiting Pulls: 0 of maximum 512
13. Manuální potvrzení zkonzumované zprávy konzumentem, další možnosti konzumentů
Konzumenti zpráv musí nějakým způsobem potvrdit, že zprávu získali, popř. zpracovali. Implicitně je nastaveno automatické potvrzení doručení zprávy, ovšem to nemusí být v mnoha případech ideální (samotné doručení zprávy ještě nezaručuje její zpracování). Mnohdy totiž potřebujeme potvrdit až zpracování zprávy, nejenom její přijetí. Jedná se o konfiguraci, která může být pro každého konzumenta nastavena odlišně.
Potvrzování zpráv funguje dobře i pro konzumenty spouštěné z příkazového řádku:
$ ./nats consumer next foo foo_consumer_2 --count=1 [10:58:11] subj: foo.x / tries: 1 / cons seq: 1 / str seq: 1 / pending: 99 Message #1 Acknowledged message
Příkaz nats consumer ovšem uživatelům nabízí i další volby, například:
usage: nats consumer [<flags>] <command> [<args> ...] JetStream Consumer management Subcommands: consumer ls List known Consumers consumer report Reports on Consumer statistics consumer info Consumer information consumer add Creates a new Consumer consumer edit Edits the configuration of a consumer consumer rm Removes a Consumer consumer copy Creates a new Consumer based on the configuration of another consumer next Retrieves messages from Pull Consumers without interactive prompts consumer sub Retrieves messages from Consumers consumer cluster Manages a clustered Consumer Flags: -a, --all Operate on all streams including system ones
Příklad tabulky získané příkazem nats consumer report:
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ │ Consumer report for bar with 1 consumers │ ├────────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬───────────────────┤ │ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │ ├────────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼───────────────────┤ │ consumer-a │ Pull │ Explicit │ 30.00s │ 0 │ 0 │ 0 │ 0 │ nats_development* │ ╰────────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴───────────────────╯
14. Kde jsou uloženy zprávy poslané do témat?
Podívejme se nyní, kde jsou vlastně uloženy zprávy, jenž jsou posílané do jednotlivých témat. Zprávy jsou na základě uvedeného tématu rozděleny do streamů (pro jeden stream může existovat větší množství témat, popř. neomezené množství témat) a pro každý stream je vytvořen zvláštní podadresář v adresáři zvoleném při spuštění nebo konfiguraci serveru. Informace o tomto adresáři se vypisuje během spuštění serveru:
[31151] [INF] Store Directory: "/home/ptisnovs/.local/share/nats/nats_development/jetstream" [31151] [INF] ------------------------------------------- [31151] [INF] Starting restore for stream 'USER > bar' [31151] [INF] Restored 20 messages for stream 'USER > bar' [31151] [INF] Starting restore for stream 'USER > foo' [31151] [INF] Restored 104 messages for stream 'USER > foo' [31151] [INF] Recovering 1 consumers for stream - 'USER > bar'
Celý strom s uloženými streamy vypadá takto:
└── USER └── streams ├── bar │ ├── meta.inf │ ├── meta.sum │ ├── msgs │ │ ├── 1.blk │ │ └── 1.idx │ └── obs ├── foo │ ├── meta.inf │ ├── meta.sum │ ├── msgs │ │ ├── 1.blk │ │ └── 1.idx │ └── obs ├── user-add-operation │ ├── meta.inf │ ├── meta.sum │ ├── msgs │ │ ├── 1.blk │ │ ├── 1.fss │ │ └── 1.idx │ └── obs └── user-delete-operation ├── meta.inf ├── meta.sum ├── msgs │ ├── 1.blk │ ├── 1.fss │ └── 1.idx └── obs
15. Komunikace s brokerem NATS z Pythonu
V rámci navazujících kapitol si ukážeme, jakým způsobem lze realizovat komunikaci s NATS z Pythonu. V úvodních dvou článcích jsme se zaměřili na jazyk Go, ovšem jak uvidíme dále, je realizace producentů i konzumentů zpráv v Pythonu ještě nepatrně jednodušší, zejména v případě, že využijeme možnosti standardní knihovny asyncio a nových klíčových slov async a await, které byly do Pythonu přidány právě pro podporu práce s korutinami a obecně asynchronně běžícími úlohami.
Nejprve musíme nainstalovat knihovnu, v níž je realizováno rozhraní (connector) k NATSu. Instalace této knihovny je snadná, neboť je dostupná přímo na PyPi:
$ pip3 install nats-py Defaulting to user installation because normal site-packages is not writeable Collecting nats-py Downloading nats-py-2.1.7.tar.gz (45 kB) |████████████████████████████████| 45 kB 952 kB/s Preparing metadata (setup.py) ... done Building wheels for collected packages: nats-py Building wheel for nats-py (setup.py) ... done Created wheel for nats-py: filename=nats_py-2.1.7-py3-none-any.whl size=55717 sha256=beedaa208a24cc31490abf3966285f43cf7566967c5cb5be26999133e0f42f28 Stored in directory: /home/ptisnovs/.cache/pip/wheels/b3/c2/1d/cd05fddafb2d9960a8a748a3cfd4ab1b997cac79bd059a7ba3 Successfully built nats-py Installing collected packages: nats-py Successfully installed nats-py-2.1.7
Krátké otestování, zda je knihovna skutečně nainstalována:
$ python3 Python 3.8.10 (default, Jun 22 2022, 20:18:18) [GCC 9.4.0] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import nats >>> help("nats") Help on package nats: NAME nats DESCRIPTION # Copyright 2016-2021 The NATS Authors # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ... ... ...
16. Připojení k NATS serveru z Pythonu
Podívejme se nyní na způsob realizace připojení k NATS serveru ze skriptu, který je naprogramován v Pythonu. Tento skript používá výše uvedený balíček nats-py, jenž je založen na standardní knihovně asyncio. Samotné navázání spojení je realizováno korutinou (nikoli tedy standardní funkcí) nazvanou connect. Této korutině je nutné předat jak adresu počítače, na kterém NATS běží, ale i příslušný port, jméno uživatele a jeho heslo (poslední dvě hodnoty jsou vypsány samotným serverem při jeho spuštění, odkud je můžeme snadno zkopírovat). Pokud je spojení navázáno (tedy v případě, že nedojde k vyhození výjimky), lze spojení ukončit metodou/korutinou nazvanou natsConnection.close(). Vzhledem k tomu, že se volají asynchronně běžící korutiny, musí být samotná funkce main realizována taktéž korutinou a zavolána přes asyncio.run(main()):
import asyncio import nats URL = "192.168.1.34:44853" NATS_USERNAME = "local" NATS_PASSWORD = "--password--" async def main(): print(f"Connecting to NATS at address {URL}") nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD) print(f"Connected...{nats_connection}") print("Closing connection") await nats_connection.close() print("Connection closed") if __name__ == '__main__': asyncio.run(main())
Vyzkoušejme si nyní použití tohoto skriptu, pochopitelně po zadání korektního hesla:
$ python3 nats_connection.py Connecting to NATS at address 192.168.1.34:44853 Connected...<nats client v2.1.7> Closing connection Connection closed
17. Producenti a konzumenti zpráv založení na komunikační strategii PUB-SUB
Ukažme si nyní, jak snadno je možné v Pythonu realizovat producenty a konzumenty zpráv posílaných a přijímaných s využitím komunikační strategie PUB-SUB. Nejdříve si ukažme producenta zpráv, který je až triviálně jednoduchý – zprávu po navázání spojení s NATS pošleme korutinou nazvanou publish do vybraného tématu:
import asyncio import nats URL = "192.168.1.34:44853" NATS_USERNAME = "local" NATS_PASSWORD = "--password--" SUBJECT_NAME = "foo" async def main(): print(f"Connecting to NATS at address {URL}") nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD) print("Connected...") print("Publishing message") await nats_connection.publish(SUBJECT_NAME, b"Hello from Python!") print("Published") print("Closing connection") await nats_connection.close() print("Connection closed") if __name__ == '__main__': asyncio.run(main())
Konzument zpráv je jen nepatrně složitější, neboť zprávy se přijímají v nekonečné smyčce (to pro jednoduchost, jsou totiž i další způsoby, které například zajistí odpojení od NATS). Povšimněte si, že konzument se musí nejprve přihlásit k odebírání tématu a teprve poté může zprávy přijímat:
import asyncio import nats URL = "192.168.1.34:44853" NATS_USERNAME = "local" NATS_PASSWORD = "--password--" SUBJECT_NAME = "foo" async def main(): print(f"Connecting to NATS at address {URL}") nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD) print("Connected...") print(f"Subscribing to stream with name '{SUBJECT_NAME}'") sub = await nats_connection.subscribe(SUBJECT_NAME) print("Subscribed") print("Waiting for messages") while True: message = await sub.next_msg(timeout=100) print("Received:", message) await nc.close() if __name__ == '__main__': asyncio.run(main())
18. Producenti a konzumenti zpráv založení na využití stream processingu
Zajímavější bude zjistit, jakým způsobem lze realizovat producenty a konzumenty v případě, kdy budeme chtít zprávy posílat do streamu a posléze je ze streamu číst, resp. „přehrávat“. V prvním příkladu je ukázán způsob přečtení informací o vybraných streamech:
import asyncio import nats from pprint import pprint URL = "192.168.1.34:44853" NATS_USERNAME = "local" NATS_PASSWORD = "--password--" STREAM_NAMES = ["foo", "bar", "baz"] async def main(): print(f"Connecting to NATS at address {URL}") nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD) print("Connected...") print("Retrieving JetStream object") jet_stream = nats_connection.jetstream() print(f"Retrieved {jet_stream}") for stream_name in STREAM_NAMES: print(f"Stream info for stream {stream_name}:") print("-" * 40) stream_info = await jet_stream.stream_info(stream_name) pprint(stream_info.__dict__) print() print("Closing connection") await nats_connection.close() print("Connection closed") if __name__ == '__main__': asyncio.run(main())
Ve skutečnosti na mém NATS serveru existují pouze streamy „foo“ a „bar“, nikoli již stream „baz“, což znamená, že při zjišťování informací o neexistujícím streamu dojde k vyhození výjimky:
Connecting to NATS at address 192.168.1.34:44853 Connected... Retrieving JetStream object Retrieved <nats.js.client.JetStreamContext object at 0x7faf51cb0940> Stream info for stream foo: ---------------------------------------- {'cluster': None, 'config': StreamConfig(name='foo', description=None, subjects=['foo'], retention='limits', max_consumers=-1, max_msgs=-1, max_bytes=-1, discard='old', max_age=0.0, max_msgs_per_subject=-1, max_msg_size=-1, storage='memory', num_replicas=1, no_ack=False, template_owner=None, duplicate_window=120000000000, placement=None, mirror=None, sources=None, sealed=False, deny_delete=False, deny_purge=False, allow_rollup_hdrs=False), 'did_create': None, 'mirror': None, 'sources': None, 'state': StreamState(messages=0, bytes=0, first_seq=0, last_seq=0, consumer_count=0, deleted=None, num_deleted=None, lost=None)} Stream info for stream bar: ---------------------------------------- {'cluster': None, 'config': StreamConfig(name='bar', description=None, subjects=['bar'], retention='limits', max_consumers=-1, max_msgs=-1, max_bytes=-1, discard='old', max_age=0.0, max_msgs_per_subject=-1, max_msg_size=-1, storage='file', num_replicas=1, no_ack=False, template_owner=None, duplicate_window=120000000000, placement=None, mirror=None, sources=None, sealed=False, deny_delete=False, deny_purge=False, allow_rollup_hdrs=False), 'did_create': None, 'mirror': None, 'sources': None, 'state': StreamState(messages=0, bytes=0, first_seq=0, last_seq=0, consumer_count=0, deleted=None, num_deleted=None, lost=None)} Stream info for stream baz: ---------------------------------------- Traceback (most recent call last): File "stream_info.py", line 37, in asyncio.run(main()) File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run return loop.run_until_complete(main) File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete return future.result() File "stream_info.py", line 27, in main stream_info = await jet_stream.stream_info(stream_name) File "/home/ptisnovs/.local/lib/python3.8/site-packages/nats/js/manager.py", line 71, in stream_info resp = await self._api_request( File "/home/ptisnovs/.local/lib/python3.8/site-packages/nats/js/manager.py", line 267, in _api_request raise APIError.from_error(resp['error']) File "/home/ptisnovs/.local/lib/python3.8/site-packages/nats/js/errors.py", line 84, in from_error raise NotFoundError(**err) nats.js.errors.NotFoundError: nats: NotFoundError: code=404 err_code=10059 description='stream not found'
Publikace zpráv do zvoleného tématu (subject) je opět přímočará, jak je to ostatně ukázáno v následujícím demonstračním příkladu. Pouze namísto korutiny nats_connection.publish zavoláme korutinu jet_stream.publish. Povšimněte si, že zpráva je reprezentována polem bajtů a může tedy obsahovat jakékoli hodnoty:
import asyncio import nats URL = "192.168.1.34:44853" NATS_USERNAME = "local" NATS_PASSWORD = "--password--" SUBJECT_NAME = "bar" async def main(): print(f"Connecting to NATS at address {URL}") nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD) print("Connected...") print("Retrieving JetStream object") jet_stream = nats_connection.jetstream() print(f"Retrieved {jet_stream}") print("Publishing message to stream") ack = await jet_stream.publish(f"{SUBJECT_NAME}", b"Hello from Python!") print(f"Published and acked: stream={ack.stream}, sequence={ack.seq}") print("Closing connection") await nats_connection.close() print("Connection closed") if __name__ == '__main__': asyncio.run(main())
A konečně se podívejme na konzumenta zpráv uložených do streamu a dostupných přes zvolené téma. Tento demonstrační příklad je založen na smyčce, ve které se volá korutina next_msg, přičemž se při přijetí zprávy zavolá korutina on_message (jedná se tedy opět o asynchronní kód):
import asyncio import nats URL = "192.168.1.34:44853" NATS_USERNAME = "local" NATS_PASSWORD = "--password--" SUBJECT_NAME = "bar" async def on_message(message): print(f"Received message {message}") await message.ack() async def main(): print(f"Connecting to NATS at address {URL}") nats_connection = await nats.connect(URL, user=NATS_USERNAME, password=NATS_PASSWORD) print("Connected...") print("Retrieving JetStream object") jet_stream = nats_connection.jetstream() print(f"Retrieved {jet_stream}") print("Waiting for messages") sub = await jet_stream.subscribe(SUBJECT_NAME, cb=on_message) while True: await sub.next_msg(timeout=1) print("Closing connection") await nats_connection.close() print("Connection closed") if __name__ == '__main__': asyncio.run(main())
19. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
Demonstrační příklady uvedené v následující tabulce jsme si již popsali v článcích Komunikace s message brokery z programovacího jazyka Go, Použití message brokeru NATS a NATS Streaming Server. Tyto příklady jsou naprogramovány v jazyku Go a (alespoň prozatím) nepokrývají technologii JetStream.
20. Odkazy na Internetu
- NATS
https://nats.io/about/ - JetStream
https://docs.nats.io/nats-concepts/jetstream - NATS Introduction
https://nats.io/documentation/ - NATS Protocols
https://docs.nats.io/legacy/stan/streaming/protocol#protocols - NATS Server Clients
https://docs.nats.io/running-a-nats-service/clients - NATS Messaging (Wikipedia)
https://en.wikipedia.org/wiki/NATS_Messaging - NATS Clients
https://nats.io/download/#nats-clients - NATS JetStream Enabled Clients
https://nats.io/download/#nats-jetstream-enabled-clients - Move over Kafka! Let's try NATS JetStream
https://www.youtube.com/watch?v=EJJ2SG-cKyM - NATS Jetstream Technical Overview
https://www.youtube.com/watch?v=w8fc44SmJDw - Using NATS JetStream with Go
https://www.youtube.com/watch?v=uZ4fzg_eqSw - The coolest OSS project you've never heard of: NATS Getting started!
https://www.youtube.com/watch?v=hjXIUPZ7ArM - Komunikace s message brokery z programovacího jazyka Go
https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/ - Použití message brokeru NATS
https://www.root.cz/clanky/pouziti-message-brokeru-nats/ - NATS Streaming Server
https://www.root.cz/clanky/nats-streaming-server/ - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Apache Kafka: distribuovaná streamovací platforma
https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/