Hlavní navigace

Použití message brokeru NATS

21. 3. 2019
Doba čtení: 34 minut

Sdílet

 Autor: NATS Project
V dnešní části seriálu o message brokerech se seznámíme se základními možnostmi nabízenými systémem NATS. Ukážeme si jak instalaci a spuštění serveru NATS, tak i komunikaci s ním s využitím klientů naprogramovaných v jazyce Go.

Obsah

1. Použití message brokeru NATS

2. Instalace serveru systému NATS

3. Instalace rozhraní NATSu pro aplikace vytvořené v programovacím jazyku Go

4. Nejjednodušší forma producenta zpráv

5. Malá odbočka – standardní balíček sync

6. Konzument zpráv

7. Vylepšení producenta i konzumenta zpráv

8. Automatické odhlášení konzumenta po přijetí zadaného množství zpráv

9. Využití kanálů při posílání zpráv na straně producenta

10. Využití kanálů při příjmu zpráv na straně konzumenta

11. Implementace jednoduchého komunikačního protokolu

12. Komunikace přes dva kanály: producent zpráv

13. Komunikace přes dva kanály: konzument zpráv

14. Rozdělování zpráv mezi příjemce patřící do stejné skupiny (Queueing)

15. Projekt používající queueing

16. Chování systému po připojení pětice příjemců zpráv

17. Obsah následující části seriálu

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu

20. Odkazy na Internetu

1. Použití message brokeru NATS

V předchozí části seriálu o message brokerech jsme se ve stručnosti seznámili s projektem NATS. Připomeňme si, že se jedná o poměrně úspěšný a často nasazovaný projekt, jenž je vyvinut v programovacím jazyku Go, což mu do jisté míry zajišťuje stabilitu i škálovatelnost. To jsou vlastnosti, které u message brokerů většinou očekáváme.

Také jsme si řekli, že se systém NATS skládá z několika komponent:

  1. V první řadě se jedná o samotný server, jenž se spouští příkazem gnatsd. Server je naprogramovaný v Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (to má v době Dockeru a podobných nástrojů poměrně velký význam). Způsobem instalace a spuštění serveru se budeme zabývat hned v navazující kapitole; ostatně ve skutečnosti se nejedná o nijak složitou operaci.
  2. Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje); viz též tabulky s podporovanými ekosystémy, které jsou zobrazeny pod tímto seznamem. Programátorské rozhraní určené pro jazyk Go bude popsáno ve třetí kapitole i v kapitolách navazujících.
  3. Třetí komponentou je NATS Streaming Server, který je opět naprogramován v jazyce Go. Způsobem využití Streaming Serveru a vůbec teorií, na které streamování stojí, se budeme věnovat v dalším článku.
  4. Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis). I této problematice bude věnován samostatný článek.

Oficiálně jsou podporována rozhraní pro následující ekosystémy:

# Programovací jazyk/ekosystém
1 C
2 C#
3 Elixir
4 Go
5 Java
6 NGINX
7 Node.js
8 Python Asyncio
9 Python Tornado
10 Ruby
11 TypeScript

2. Instalace serveru systému NATS

Předkompilovaná verze serveru NATS je dostupná na stránce https://nats.io/download/nats-io/gnatsd/.

Ovšem vzhledem k tomu, že si možnosti systému NATS nejprve popíšeme na klientských aplikacích naprogramovaných v jazyce Go, předpokládám, že máte tento jazyk (překladač, linker, základní balíčky a další nástroje) nainstalovaný způsobem, který byl popsán v tomto článku. V takovém případě je lepší server systému NATS nainstalovat naprosto stejným způsobem, jako jakýkoli jiný externí balíček jazyka Go. Nejprve přejdeme do adresáře, na nějž ukazuje proměnná GOPATH, což typicky bývá adresář ~/go:

$ cd $GOPATH

Dále spustíme příkaz pro nainstalování balíčku, který obsahuje server NATSu:

$ go get github.com/nats-io/gnatsd

Po instalaci by měla adresářová struktura začínající adresářem $GOPATH obsahovat mj. i tyto podadresáře:

├── pkg
│   └── linux_amd64
│       └── github.com
│           └── nats-io
├── src
│   └── github.com
│       └── nats-io
│           ├── gnatsd
│           ├──
│           ├──
│           └──

Nejdůležitější však je, že se do podadresáře $GOPATH/bin nainstaloval spustitelný soubor gnatsd, který obsahuje celý NATS server. Můžeme si tedy otestovat jeho základní funkcionalitu. Nejprve přejdeme do adresáře, na který ukazuje proměnná prostředí $GOPATH/bin:

$ cd ~/go/bin
Poznámka: alternativně si můžete $GOPATH/bin přidat do proměnné PATH, takže spouštění všech binárních souborů vytvořených překladačem Go bude snadnější.

Poslední verze serveru NATS by měla být 2.0.0-RC-5, což si můžeme snadno ověřit:

$ ./gnatsd --version
 
nats-server: v2.0.0-RC5

K dispozici je samozřejmě nápověda se všemi podporovanými volbami, které lze předat přes příkazový řádek:

$ ./gnatsd --help
 
Usage: gnatsd [options]
 
Server Options:
    -a, --addr <host>                Bind to host address (default: 0.0.0.0)
    -p, --port <port>                Use port for clients (default: 4222)
    -P, --pid <file>                 File to store PID
    -m, --http_port <port>           Use port for http monitoring
    -ms,--https_port <port>          Use port for https monitoring
    -c, --config <file>              Configuration file
    -sl,--signal <signal>[=<pid>]    Send signal to gnatsd process (stop, quit, reopen, reload)
 
...
...
...

V tom nejjednodušším případě můžeme nechat server spuštěný s výchozími parametry:

$ ./gnatsd
 
[3918] 2019/03/16 16:40:26.554289 [INF] Starting nats-server version 2.0.0-RC5
[3918] 2019/03/16 16:40:26.554399 [INF] Git commit [not set]
[3918] 2019/03/16 16:40:26.554673 [INF] Listening for client connections on 0.0.0.0:4222
[3918] 2019/03/16 16:40:26.554707 [INF] Server id is NALRV2K2U77VDPICK3NM3XQV7TY4DM5UVG6FT6IJQWACUW3WA2BRM5SM
[3918] 2019/03/16 16:40:26.554726 [INF] Server is ready
Poznámka: server NATSu si prozatím nechte spuštěný na pozadí popř. na jiném terminálu, protože ho budeme volat ve všech dnešních demonstračních příkladech.

3. Instalace rozhraní NATSu pro aplikace vytvořené v programovacím jazyku Go

V úvodní kapitole jsme si řekli, že existuje hned několik oficiálně podporovaných rozhraní NATSu pro klienty psané v různých programovacích jazycích popř. ekosystémech. Dnes si ukážeme použití rozhraní pro klienty naprogramované v jazyce Go. Samotná instalace tohoto rozhraní se prakticky nijak neliší od samotné instalace serveru, pouze se (logicky) použije odlišný repositář s balíčkem:

$ cd $GOPATH
 
$ go get github.com/nats-io/go-nats

Po instalaci by měla adresářová struktura začínající adresářem $GOPATH obsahovat mj. tyto podadresáře:

├── pkg
│   └── linux_amd64
│       └── github.com
│           └── nats-io
├── src
│   └── github.com
│       └── nats-io
│           ├── gnatsd
│           ├── go-nats
│           ├──
│           └──

Ve všech dalších příkladech tedy bude import balíčku nazvaného „nats“ vypadat následovně (jméno balíčku je skutečně „nats“ a nikoli „go-nats“; to ostatně v Go není platný identifikátor):

import "github.com/nats-io/go-nats"

Popř. si můžeme explicitně zvolit i jmenný alias nats (což budeme důsledně používat v demonstračních příkladech):

import nats "github.com/nats-io/go-nats"
Poznámka: v tomto případě se skutečně nainstaluje pouze Go balíček; žádný spustitelný soubor v podadresáři bin nevznikne, na rozdíl od serveru.

4. Nejjednodušší forma producenta zpráv

Vytváření (resp. přesněji řečeno publikování) zpráv s jejich posláním na server systému NATS je relativně přímočaré. Nejprve se musíme připojit k serveru, což zajišťuje funkce pojmenovaná Connect, které se předá URL serveru a výslednými hodnotami je buď struktura Conn představující připojení k serveru popř. objekt s popisem chyby, pokud se připojení z nějakého důvodu nepodařilo:

func Connect(url string, options ...Option) (*Conn, error)

Pro připojení k lokálně běžícímu serveru NATS můžeme použít konstantu nazvanou DefaultURL, která ve skutečnosti obsahuje tento řetězec:

"nats://localhost:4222"

Vlastní připojení bez kontroly případných chyb tedy může vypadat takto:

conn, _ := nats.Connect(nats.DefaultURL)

Jakmile je připojení navázáno, je možné zprávu poslat s využitím těchto metod datového typu Conn:

func (nc *Conn) Publish(subj string, data []byte) error
func (nc *Conn) PublishMsg(m *Msg) error
func (nc *Conn) PublishRequest(subj, reply string, data []byte) error

Nejjednodušší je použití první metody, která umožňuje poslat libovolnou sekvenci bajtů s určitým tématem (topic, subject). Převod řetězce na pole/řez bajtů známe, takže:

conn.Publish(Subject, []byte("Hello World"))

Na konci pro jistotu zavoláme i metodu Flush, která zajistí skutečné poslání zprávy před odpojením klienta:

conn.Flush()

Celý zdrojový kód producenta, který po svém spuštění pošle jednu zprávu s nastaveným tématem, vypadá takto:

package main
 
import nats "github.com/nats-io/go-nats"
 
const Subject = "test1"
 
func main() {
        conn, _ := nats.Connect(nats.DefaultURL)
 
        conn.Publish(Subject, []byte("Hello World"))
 
        conn.Flush()
}

Pokud tento příklad spustíte příkazem go run publisher.go, bude zpráva poslána serveru NATS, který se tuto zprávu bude snažit doručit případnému příjemci či více příjemcům (jejich počet není prakticky omezen). Ovšem v případě, že žádný příjemce nebude v daný okamžik připojen, bude zpráva zahozena, což je ovšem výchozí (a žádoucí) chování komunikační strategie typu pub-sub.

Poznámka: předchozí příklad neprovádí korektní ukončení připojení ani nekontroluje případné chyby, ke kterým může dojít. Tento nedostatek odstraníme v rámci následujících kapitol.

5. Malá odbočka – standardní balíček sync

Příjemce zpráv je realizován callback funkcí, která je asynchronně zavolána v samostatné gorutině ve chvíli, kdy je přijata nějaká zpráva. Registrace takové callback funkce (zde se jedná o funkci anonymní) může vypadat například takto:

conn.Subscribe(Subject, func(m *nats.Msg) {
        fmt.Printf("Received a message: %s\n", string(m.Data))
})

Problém ovšem nastává v tom, že pokud dojde k ukončení funkce main, skončí i běh celého programu. Musíme tedy zajistit, aby se na konci funkce main čekalo na příjem zprávy. K tomuto účelu je možné použít možnosti nabízené standardním balíčkem sync.

Využijeme strukturu WaitGroup sloužící pro čekání na další gorutiny, přesněji řečeno na přesně stanovený počet gorutin. Při čekání na jedinou gorutinu použijeme konstrukci:

wg := sync.WaitGroup{}
wg.Add(1)
...
...
...
spuštění gorutiny, která má přístup k proměnné wg
...
...
...
wg.Wait() // zde se běh zastaví, čeká se na wg.Done()

Samotná gorutina musí zavolat wg.Done, čímž se sníží počitadlo synchronizační struktury a jakmile se dojde k nule, je metoda wg.Wait ukončena.

6. Konzument zpráv

Nyní již máme k dispozici všechny informace, které jsou zapotřebí pro implementaci jednoduchého konzumenta zpráv. Nejprve se opět musíme připojit k NATS serveru funkcí Connect, kterou jsme si již popsali výše:

conn, _ := nats.Connect(nats.DefaultURL)

Následně vytvoříme synchronizační strukturu, která bude při zavolání metody Wait čekat na jedno volání metody Done (pochopitelně bude toto volání provedeno v jiné gorutině):

wg := sync.WaitGroup{}
wg.Add(1)

Nyní se již klient může přihlásit k odebrání zprávy s nastaveným tématem. Ve chvíli, kdy bude zpráva přijata, zavolá se specifikovaná anonymní funkce, která je druhým parametrem metody Subscribe:

conn.Subscribe(Subject, func(m *nats.Msg) {
        fmt.Printf("Received a message: %s\n", string(m.Data))
        wg.Done()
})

Povšimněte si, že po přijetí zprávy skutečně zavoláme metodu Done, čímž se sníží interní počitadlo v synchronizační struktuře.

Poslední volání ve funkci main zajistí čekání na příjem (jediné) zprávy:

wg.Wait()

Úplný zdrojový kód konzumenta, který po svém spuštění přijme jedinou zprávu s nastaveným tématem, vypadá následovně:

package main
 
import (
        "fmt"
        nats "github.com/nats-io/go-nats"
        "sync"
)
 
const Subject = "test1"
 
func main() {
        conn, _ := nats.Connect(nats.DefaultURL)
 
        wg := sync.WaitGroup{}
        wg.Add(1)
 
        conn.Subscribe(Subject, func(m *nats.Msg) {
                fmt.Printf("Received a message: %s\n", string(m.Data))
                wg.Done()
        })
        wg.Wait()
}

V této chvíli si můžeme zkusit ve dvou terminálech příjemce zpráv spustit:

$ go run subscriber.go

Oba příjemci by měli vyčkávat na doručení první zprávy.

V okamžiku, kdy se spustí producent zpráv:

$ go run publisher.go

…dostanou oba příjemci tu stejnou zprávu a automaticky se ukončí.

7. Vylepšení producenta i konzumenta zpráv

Naše první implementace producenta a konzumenta zcela jistě nebyla připravena pro produkční nasazení. Je tomu tak mj. i z toho důvodu, že nedocházelo ke korektní detekci chyb při připojení i při posílání zpráv a taktéž proto, že se spojení s NATS serverem automaticky neuzavíralo. Oba problémy můžeme snadno napravit:

  1. Zkontrolujeme chybovou hodnotu vrácenou funkcí Connect
  2. Blokem defer zajistíme, že se připojení vždy uzavře
  3. Zkontrolujeme chybovou hodnotu vrácenou metodou Connection.Publish
  4. U konzumenta zajistíme automatické odhlášení zavoláním metody sub.Unsubscribe

Výsledkem je upravený kód producenta:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        println("Connected")
 
        err2 := conn.Publish(Subject, []byte("Hello World"))
 
        if err2 != nil {
                log.Fatal(err2)
        }
 
        conn.Flush()
 
        println("Message sent")
}

Podobně je upraven i konzument/příjemce zpráv:

package main
 
import (
        "fmt"
        nats "github.com/nats-io/go-nats"
        "log"
        "sync"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        wg := sync.WaitGroup{}
        wg.Add(1)
 
        sub, err2 := conn.Subscribe(Subject, func(m *nats.Msg) {
                fmt.Printf("Received a message: %s\n", string(m.Data))
                wg.Done()
        })
 
        if err2 != nil {
                log.Fatal(err2)
        }
 
        println("Subscribed", sub)
 
        wg.Wait()
 
        println("Finished waiting for message")
 
        err3 := sub.Unsubscribe()
        if err3 != nil {
                log.Fatal(err3)
        }
 
        println("Unsubscribed")
}

8. Automatické odhlášení konzumenta po přijetí zadaného množství zpráv

V této kapitole si ukážeme, jak je možné implementovat automatické odhlášení konzumenta po přijetí předem známého množství zpráv.

Nejprve bude uveden zdrojový kód producenta zpráv, po jehož spuštění se do message brokera pošle deset zpráv s tématem nastaveným na „test1“:

package main
 
import (
        "fmt"
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        println("Connected")
 
        for i := 1; i < 10; i++ {
                message := fmt.Sprintf("Hello World #%d", i)
                err2 := conn.Publish(Subject, []byte(message))
                println("Published", message)
 
                if err2 != nil {
                        log.Fatal(err2)
                }
 
                conn.Flush()
        }
 
        println("All messages sent")
}

Konzumenta, který se musí po přijetí určitého množství zpráv automaticky odpojit, je nutné na několika místech modifikovat. Nejprve nastavíme počitadlo pro synchronizační strukturu:

wg := sync.WaitGroup{}
wg.Add(5)

Dále po navázání připojení a přihlášení k odběru zpráv nastavíme maximální počet přijímaných zpráv s využitím metody AutoUnsubscribe, a to následujícím způsobem:

err3 := sub.AutoUnsubscribe(5)
 
if err3 != nil {
        log.Fatal(err3)
}

Další kód již může zůstat zachován – nemusíme nikde použít počítanou smyčku ani další podobné řídicí struktury.

Zdrojový kód takto upraveného konzumenta vypadá následovně:

package main
 
import (
        "fmt"
        nats "github.com/nats-io/go-nats"
        "log"
        "sync"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        wg := sync.WaitGroup{}
        wg.Add(5)
 
        sub, err2 := conn.Subscribe(Subject, func(m *nats.Msg) {
                fmt.Printf("Received a message: %s\n", string(m.Data))
                wg.Done()
        })
 
        if err2 != nil {
                log.Fatal(err2)
        }
 
        println("Subscribed", sub)
 
        err3 := sub.AutoUnsubscribe(5)
 
        if err3 != nil {
                log.Fatal(err3)
        }
 
        println("Automatic unsubscribe after 5 messages")
 
        wg.Wait()
 
        println("Finished waiting for messages")
}

9. Využití kanálů při posílání zpráv na straně producenta

V praxi se poměrně často setkáme s tím, že zprávy jsou odesílány, popř. naopak přijímány s využitím kanálů. Je to pochopitelné, neboť se jedná o velmi elegantní přístup využívající možnosti poskytované samotným jazykem.

Při posílání zpráv se kanál vytvoří (zjednodušeně) takto:

conn, _ := nats.Connect(nats.DefaultURL)
 
econn, _ := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
channel := make(chan string)
econn.BindSendChan(Subject, channel)

Musíme tedy nejdříve specifikovat objekt používaný při kódování zpráv ze vstupního formátu (řetězec, JSON, …) do pole bajtů a následně již můžeme vytvořit a zaregistrovat kanál daného typu.

Samotné poslání zprávy je realizováno posláním řetězce do kanálu:

channel <- "Hello World #1"
channel <- "Hello World #2"
channel <- "Hello World #3"

Upravený producent zpráv tedy může vypadat následovně:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        println("Connected")
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err)
        }
 
        defer econn.Close()
 
        channel := make(chan string)
        econn.BindSendChan(Subject, channel)
 
        println("Channel created")
 
        channel <- "Hello World #1"
        channel <- "Hello World #2"
        channel <- "Hello World #3"
 
        println("All messages sent")
}

10. Využití kanálů při příjmu zpráv na straně konzumenta

Podobným způsobem je realizován i konzument zpráv, který se tak mohl značně zjednodušit (odpadla například potřeba práce se synchronizační strukturou atd.). Následující implementace konzumenta zpráv po svém spuštění přijme trojici zpráv (a to opět přes kanál) a následně se ukončí:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err)
        }
 
        defer econn.Close()
 
        channel := make(chan string)
        econn.BindRecvChan(Subject, channel)
 
        println("Channel created")
 
        println(<-channel)
        println(<-channel)
        println(<-channel)
}

11. Implementace jednoduchého komunikačního protokolu

V některých typech aplikací se přes message brokera neposílají pouze „obyčejné“ zprávy obsahující data, ale i zprávy s příkazy, které nějakým způsobem ovlivňují či řídí příjemce zpráv. Můžeme tak vytvořit libovolně jednoduchý či naopak komplikovaný komunikační a řídicí protokol. Ukažme si nyní velmi primitivní implementaci takového protokolu, v němž se bude rozeznávat pouze jediný příkaz „EXIT“, jímž se ukončí všichni příjemci zprávy. Zprávy s jiným obsahem budou považovány za datové zprávy.

Poznámka: toto řešení je samozřejmě tak triviální, že může vést k nesprávnému chování, protože rozlišení významu zprávy je provedeno na základě jejího obsahu a nikoli (například) tématu či dalších příznaků posílaných v hlavičce. Nicméně se jedná pouze o první přiblížení.

Samotná implementace zdroje zpráv se nebude žádným zásadním způsobem odlišovat od již známých příkladů. Pouze nesmíme zapomenout poslat zprávu s tělem „EXIT“, aby ji dostali všichni připojení příjemci:

channel <- "Hello World #1"
channel <- "Hello World #2"
channel <- "Hello World #3"
channel <- "EXIT"

Celá implementace zdroje zpráv bude vypadat následovně:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        println("Connected")
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err)
        }
 
        defer econn.Close()
 
        channel := make(chan string)
        econn.BindSendChan(Subject, channel)
 
        println("Channel created")
 
        channel <- "Hello World #1"
        channel <- "Hello World #2"
        channel <- "Hello World #3"
        channel <- "EXIT"
 
        conn.Flush()
 
        println("All messages sent")
}

Příjemce zprávy bude muset být upraven takovým způsobem, aby dokázat rozeznat tělo zprávy s textem „EXIT“ a po jejím přijetí se ukončil. O uzavření připojení se postará kód deklarovaný v blocích defer:

for {
        message := <-channel
        println(message)
        if message == "EXIT" {
                println("Received EXIT message...")
                break
        }
}

Samozřejmě si opět ukážeme úplnou implementaci příjemce zpráv, která může vypadat následovně:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err)
        }
 
        defer econn.Close()
 
        channel := make(chan string)
        econn.BindRecvChan(Subject, channel)
 
        println("Channel created")
 
        for {
                message := <-channel
                println(message)
                if message == "EXIT" {
                        println("Received EXIT message...")
                        break
                }
        }
}

12. Komunikace přes dva kanály: producent zpráv

Protokol je možné vylepšit oddělením datových zpráv od zpráv, které slouží k řízení klienta (vzdáleně to připomíná FTP). V následujícím příkladu je použita dvojice témat nazvaných „test1“ a „test2“, přičemž zprávy s prvním tématem budou datové a zprávy s tématem druhým řídicí. Pro posílání zpráv jsou otevřeny dva oddělené kanály data_channel a control_channel:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
        "time"
)
 
const Subject = "test1"
const Control = "test2"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        println("Connected")
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err2)
        }
 
        defer econn.Close()
 
        cconn, err3 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err3 != nil {
                log.Fatal(err3)
        }
 
        defer cconn.Close()
 
        data_channel := make(chan string)
        econn.BindSendChan(Subject, data_channel)
 
        println("Data channel created")
 
        control_channel := make(chan string)
        cconn.BindSendChan(Control, control_channel)
 
        println("Control channel created")
 
        data_channel <- "Hello World #1"
        data_channel <- "Hello World #2"
        data_channel <- "Hello World #3"
        data_channel <- "EXIT"
        time.Sleep(2 * time.Second)
        control_channel <- "EXIT"
 
        conn.Flush()
 
        println("All messages sent")
}

13. Komunikace přes dva kanály: konzument zpráv

Klient přijímající zprávy se dvěma tématy je již implementován složitějším způsobem, protože je zde nutné využít programovou konstrukci select-case, která vybere ten kanál, v němž jsou k dispozici data (popř. vybere náhodný kanál, pokud jsou data přístupná ve více kanálech). Při příjmu řídicí zprávy se smyčka ukončí:

MESSAGE_LOOP:
        for {
                select {
                case message := <-data_channel:
                        println("Received data message", message)
                case control := <-control_channel:
                        println("Received control message", control)
                        if control == "EXIT" {
                                break MESSAGE_LOOP
                        }
                }
                println("--------")
        }

Zdrojový kód takto upraveného klienta vypadá následovně:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
const Control = "test2"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err2)
        }
 
        defer econn.Close()
 
        cconn, err3 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err3 != nil {
                log.Fatal(err3)
        }
 
        defer cconn.Close()
 
        data_channel := make(chan string)
        econn.BindRecvChan(Subject, data_channel)
 
        println("Data channel created")
 
        control_channel := make(chan string)
        cconn.BindRecvChan(Control, control_channel)
 
        println("Control channel created")
 
MESSAGE_LOOP:
        for {
                select {
                case message := <-data_channel:
                        println("Received data message", message)
                case control := <-control_channel:
                        println("Received control message", control)
                        if control == "EXIT" {
                                break MESSAGE_LOOP
                        }
                }
                println("--------")
        }
}

14. Rozdělování zpráv mezi příjemce patřící do stejné skupiny (Queueing)

V systému NATS se setkáme s pojmem queueing [1]. Nejedná se ovšem o skutečnou implementaci fronty v message brokerovi, ale o koncept, jakým jsou zprávy rozesílány příjemcům. V příkladech uvedených v předchozích kapitolách se používala klasická komunikační strategie nazvaná pub-sub, která je založena na tom, že zpráva, která je poslána do message brokera, je následně rozeslána všem příjemcům, kteří jsou v daný okamžik připojeni a přijímají zprávy se zvoleným tématem.

Pokud je tedy k message brokerovi připojeno deset příjemců zpráv s tématem (subject, topic) odpovídajícím poslané zprávě, bude zpráva přeposlána právě těmto deseti příjemcům. Alternativou je právě zmíněný queueing, jehož princip spočívá v tom, že se zpráva pošle pouze jedinému příjemci patřícímu do nějaké skupiny (queue group), a to na základě algoritmu, který se snaží o load balancing zpráv. To, do jaké skupiny příjemce patří, je řízeno pouze příjemcem, nikoli zdrojem zpráv (ten vlastně ani nemusí vědět, komu bude zpráva přeposlána).

Poznámka: velmi zjednodušeně řečeno se na queueing můžeme dívat jako na frontu, v níž stojí jednotliví příjemci zpráv. Zprávu vždy dostane příjemce, který je na začátku fronty, ihned po zpracování se zařadí zpět na konec fronty. Důležité je si uvědomit, že použití slova queue v tomto případě neznamená, že by message broker obsahoval přímou podporu pro (perzistentní) zprávy (to umí až NATS Streaming).

15. Projekt používající queueing

Ukažme si nyní, jakým způsobem je možné queueing otestovat v praxi. Nejdříve vytvoříme producenta zpráv, který po svém spuštění vytvoří a pošle přesně sto zpráv s tématem nastaveným na „test“. Zdrojový kód tohoto producenta vypadá následovně:

package main
 
import (
        "fmt"
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        println("Connected")
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err2)
        }
 
        defer econn.Close()
 
        channel := make(chan string)
        econn.BindSendChan(Subject, channel)
 
        println("Channel created")
 
        // poslat 100 zprav
        for i := 1; i < 100; i++ {
                message := fmt.Sprintf("Hello World #%d", i)
                channel <- message
                conn.Flush()
        }
 
        println("All messages sent")
}

V první implementaci konzumenta zpráv se použije metoda BindRecvQueueChan určená pro zaregistrování konzumenta k zadanému tématu společně se zařazením konzumenta do fronty nazvané „q1“:

const Queue = "q1"
...
...
...
channel := make(chan string)
econn.BindRecvQueueChan(Subject, Queue, channel)

Úplný zdrojový kód první varianty konzumenta zpráv:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
const Queue = "q1"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err2)
        }
 
        defer econn.Close()
 
        channel := make(chan string)
        econn.BindRecvQueueChan(Subject, Queue, channel)
 
        println("Channel created")
 
        for i := 1; i < 20; i++ {
                println(<-channel)
        }
}

Druhá implementace je prakticky totožná, ovšem používá odlišné jméno fronty:

const Queue = "X"
...
...
...
channel := make(chan string)
econn.BindRecvQueueChan(Subject, Queue, channel)

Úplný zdrojový kód druhé varianty konzumenta zpráv:

package main
 
import (
        nats "github.com/nats-io/go-nats"
        "log"
)
 
const Subject = "test1"
const Queue = "X"
 
func main() {
        conn, err := nats.Connect(nats.DefaultURL)
 
        if err != nil {
                log.Fatal(err)
        }
 
        defer conn.Close()
 
        econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER)
 
        if err2 != nil {
                log.Fatal(err2)
        }
 
        defer econn.Close()
 
        channel := make(chan string)
        econn.BindRecvQueueChan(Subject, Queue, channel)
 
        println("Channel created")
 
        for i := 1; i < 20; i++ {
                println(<-channel)
        }
}

16. Chování systému po připojení pětice příjemců zpráv

Chování systému se dvěma frontami konzumentů zpráv si můžeme relativně snadno otestovat. Použijeme k tomu následující skript, který spustí pět konzumentů zařazených do fronty „q1“ a jednoho konzumenta zařazeného do fronty „X“:

for i in `seq 1 5`
do
    nohup go run subscriber.go > subscriber${i}.log &
done
 
nohup go run subscriberX.go > subscriberX.log &

V jiném terminálu spustíme producenta 100 zpráv:

$ go run producent.go

Měla by vzniknout šestice souborů s informacemi o tom, které zprávy jaký konzument přijal. Pořadí zpráv sice nemusí být přesně zachováno, ovšem každý konzument by měl přijmout přesně dvacet zpráv.

Konzument #1 z fronty „q1“:

Channel created
Hello World #9
Hello World #19
Hello World #20
Hello World #34
Hello World #37
Hello World #40
Hello World #47
Hello World #64
Hello World #66
Hello World #73
Hello World #83
Hello World #93
Hello World #94
Hello World #95
Hello World #96
Hello World #97

Konzument #2 z fronty „q1“:

Channel created
Hello World #1
Hello World #3
Hello World #6
Hello World #10
Hello World #15
Hello World #18
Hello World #21
Hello World #24
Hello World #25
Hello World #43
Hello World #45
Hello World #46
Hello World #48
Hello World #51
Hello World #53
Hello World #54
Hello World #65
Hello World #68
Hello World #69

Konzument #3 z fronty „q1“:

Channel created
Hello World #7
Hello World #13
Hello World #16
Hello World #22
Hello World #42
Hello World #50
Hello World #52
Hello World #58
Hello World #67
Hello World #76
Hello World #77
Hello World #78
Hello World #82
Hello World #84
Hello World #85
Hello World #88
Hello World #89
Hello World #91
Hello World #98

Konzument #4 z fronty „q1“:

Channel created
Hello World #2
Hello World #11
Hello World #23
Hello World #27
Hello World #28
Hello World #29
Hello World #30
Hello World #31
Hello World #35
Hello World #36
Hello World #39
Hello World #49
Hello World #55
Hello World #57
Hello World #59
Hello World #63
Hello World #71
Hello World #74
Hello World #79

Konzument #5 z fronty „q1“:

Channel created
Hello World #4
Hello World #5
Hello World #8
Hello World #12
Hello World #14
Hello World #17
Hello World #26
Hello World #32
Hello World #33
Hello World #38
Hello World #41
Hello World #44
Hello World #56
Hello World #60
Hello World #61
Hello World #62
Hello World #70
Hello World #72
Hello World #75

Jediný konzument z fronty „X“:

Channel created
Hello World #1
Hello World #2
Hello World #3
Hello World #4
Hello World #5
Hello World #6
Hello World #7
Hello World #8
Hello World #9
Hello World #10
Hello World #11
Hello World #12
Hello World #13
Hello World #14
Hello World #15
Hello World #16
Hello World #17
Hello World #18
Hello World #19
Poznámka: vidíme, že prvních pět konzumentů si zprávy více méně spravedlivě rozdělilo, zatímco poslední konzument získal všechny zprávy v pořadí, v jakém byly poslány (mohl jich přijmout sto, ovšem po prvních dvaceti byl ukončen).

17. Obsah následující části seriálu

V navazující části tohoto seriálu si ukážeme některé další možnosti, které systém NATS vývojářům složitějších aplikací nabízí. Popíšeme si například komunikační strategii req-rep (request-response), která je řešena s využitím takzvaných inboxů a kterou lze v případě potřeby zkombinovat s výše popsaným queueingem (což není zdaleka triviální). Ovšem důležitější bude popis systému pojmenovaného NATS Streaming, protože se bude jednat o první podobný systém pro streamování zpráv, s nímž se v tomto seriálu podrobněji seznámíme (ze známějších systémů jmenujme například projekt Apache Kafka).

root_podpora

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta
1 publisher.go producent jediné zprávy s využitím strategie pub-sub https://github.com/tisnik/message-queues-examples/blob/master/go-nats/01-simple-pub-sub/publisher.go
1 subscriber.go příjemce/konzument jediné zprávy s využitím strategie pub-sub https://github.com/tisnik/message-queues-examples/blob/master/go-nats/01-simple-pub-sub/subscriber.go
       
2 publisher.go vylepšení prvního příkladu: lepší kontrola chyb a reakce na ně https://github.com/tisnik/message-queues-examples/blob/master/go-nats/02-better-pub-sub/publisher.go
2 subscriber.go vylepšení prvního příkladu: lepší kontrola chyb a reakce na ně https://github.com/tisnik/message-queues-examples/blob/master/go-nats/02-better-pub-sub/subscriber.go
       
3 publisher.go producent deseti zpráv https://github.com/tisnik/message-queues-examples/blob/master/go-nats/03-automatic-unsubscribe/publisher.go
3 subscriber.go příjemce zpráv, který se dokáže od message brokera automaticky odpojit po přijení N zpráv https://github.com/tisnik/message-queues-examples/blob/master/go-nats/03-automatic-unsubscribe/subscriber.go
       
4 publisher.go producent zpráv posílaných přes kanál https://github.com/tisnik/message-queues-examples/blob/master/go-nats/04-use-channels/publisher.go
4 subscriber.go konzument zpráv přijímaných přes kanál https://github.com/tisnik/message-queues-examples/blob/master/go-nats/04-use-channels/subscriber.go
       
5 publisher.go implementace jednoduchého komunikačního protokolu s využitím jediného kanálu https://github.com/tisnik/message-queues-examples/blob/master/go-nats/05-simple-protocol/publisher.go
5 subscriber.go dtto, strana příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/go-nats/05-simple-protocol/subscriber.go
       
6 publisher.go implementace jednoduchého komunikačního protokolu s využitím dvou samostatných kanálů a dvou samostatných témat https://github.com/tisnik/message-queues-examples/blob/master/go-nats/06-two-channels/publisher.go
6 subscriber.go dtto, strana příjemce zpráv https://github.com/tisnik/message-queues-examples/blob/master/go-nats/06-two-channels/subscriber.go
       
7 publisher.go zdroj mnoha zpráv posílaných se stejným tématem https://github.com/tisnik/message-queues-examples/blob/master/go-nats/07-queueing/publisher.go
7 subscriber.go příjemce zpráv zařazený do fronty příjemců https://github.com/tisnik/message-queues-examples/blob/master/go-nats/07-queueing/subscriber.go
7 subscriberX.go další příjemce zpráv, ovšem zařazený do odlišné fronty https://github.com/tisnik/message-queues-examples/blob/master/go-nats/07-queueing/subscriberX.go
7 run_subscribers.sh skript pro spuštění několika konzumentů zpráv a shrnutí výsledků https://github.com/tisnik/message-queues-examples/blob/master/go-nats/07-queueing/run_subscribers.sh

19. Odkazy na předchozí části seriálu

V této kapitole jsou uvedeny odkazy na všech dvanáct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
  8. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
    https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/
  9. Apache ActiveMQ – další systém implementující message brokera
    https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/
  10. Použití Apache ActiveMQ s protokolem STOMP
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/
  11. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/
  12. Komunikace s message brokery z programovacího jazyka Go
    https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/

20. Odkazy na Internetu

  1. NATS
    https://nats.io/about/
  2. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  3. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  4. NATS Introduction
    https://nats.io/documentation/
  5. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  6. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  7. Stránka Apache Software Foundation
    http://www.apache.org/
  8. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  9. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  10. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  11. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  12. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  13. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  14. Qpid Proton
    http://qpid.apache.org/proton/
  15. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  16. Apache ActiveMQ
    http://activemq.apache.org/
  17. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  18. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  19. KahaDB
    http://activemq.apache.or­g/kahadb.html
  20. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  21. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  22. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  23. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  24. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  25. Apache Camel
    https://camel.apache.org/
  26. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  27. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  28. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  29. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  30. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  31. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  32. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  33. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  34. ØMQ – Distributed Messaging
    http://zeromq.org/
  35. ØMQ Community
    http://zeromq.org/community
  36. Get The Software
    http://zeromq.org/intro:get-the-software
  37. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  38. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  39. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  40. ZeroMQ RFC
    https://rfc.zeromq.org/
  41. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  42. zeromq/czmq
    https://github.com/zeromq/czmq
  43. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  44. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  45. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  46. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  47. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  48. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  49. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  50. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  51. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  52. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  53. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  54. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  55. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  56. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  57. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  58. Python binding
    http://zeromq.org/bindings:python
  59. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  60. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  61. About Nanomsg
    https://nanomsg.org/
  62. Advanced Message Queuing Protocol
    https://www.amqp.org/
  63. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  64. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  65. RabbitMQ
    https://www.rabbitmq.com/
  66. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  67. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  68. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  69. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  70. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  71. Erlang
    http://www.erlang.org/
  72. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  73. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  74. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  75. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  76. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  77. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  78. celery na PyPi
    https://pypi.org/project/celery/
  79. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  80. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  81. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  82. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  83. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  84. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  85. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  86. node-celery
    https://github.com/mher/node-celery
  87. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  88. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  89. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  90. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  91. Stránky projektu Redis
    https://redis.io/
  92. Introduction to Redis
    https://redis.io/topics/introduction
  93. Try Redis
    http://try.redis.io/
  94. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  95. Python Redis
    https://redislabs.com/lp/python-redis/
  96. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  97. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  98. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  99. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  100. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  101. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  102. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  103. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  104. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  105. Redis Persistence
    https://redis.io/topics/persistence
  106. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  107. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  108. Ost (knihovna)
    https://github.com/soveran/ost
  109. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  110. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  111. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  112. What Is Sharding?
    https://btcmanager.com/what-sharding/
  113. Redis clients
    https://redis.io/clients
  114. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  115. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  116. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  117. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  118. Resque
    https://github.com/resque/resque
  119. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  120. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  121. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  122. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  123. Pub/Sub
    https://redis.io/topics/pubsub
  124. ZeroMQ distributed messaging
    http://zeromq.org/
  125. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  126. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  127. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  128. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  129. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  130. Redis Protocol specification
    https://redis.io/topics/protocol
  131. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  132. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  133. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  134. Apache Kafka
    https://kafka.apache.org/
  135. Iron
    http://www.iron.io/mq
  136. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  137. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  138. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  139. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  140. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  141. Enqueueing internals
    http://python-rq.org/contrib/
  142. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  143. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  144. Queues
    http://queues.io/
  145. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  146. RestMQ
    http://restmq.com/
  147. ActiveMQ
    http://activemq.apache.org/
  148. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  149. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  150. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  151. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  152. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  153. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  154. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  155. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  156. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  157. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  158. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  159. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  160. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  161. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  162. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  163. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  164. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  165. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  166. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  167. Jupyter
    https://jupyter.org/
  168. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  169. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html

Byl pro vás článek přínosný?