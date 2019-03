11. Implementace jednoduchého komunikačního protokolu

1. Použití message brokeru NATS

V předchozí části seriálu o message brokerech jsme se ve stručnosti seznámili s projektem NATS. Připomeňme si, že se jedná o poměrně úspěšný a často nasazovaný projekt, jenž je vyvinut v programovacím jazyku Go, což mu do jisté míry zajišťuje stabilitu i škálovatelnost. To jsou vlastnosti, které u message brokerů většinou očekáváme.

Také jsme si řekli, že se systém NATS skládá z několika komponent:

V první řadě se jedná o samotný server, jenž se spouští příkazem gnatsd. Server je naprogramovaný v Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (to má v době Dockeru a podobných nástrojů poměrně velký význam). Způsobem instalace a spuštění serveru se budeme zabývat hned v navazující kapitole; ostatně ve skutečnosti se nejedná o nijak složitou operaci. Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje); viz též tabulky s podporovanými ekosystémy, které jsou zobrazeny pod tímto seznamem. Programátorské rozhraní určené pro jazyk Go bude popsáno ve třetí kapitole i v kapitolách navazujících. Třetí komponentou je NATS Streaming Server, který je opět naprogramován v jazyce Go. Způsobem využití Streaming Serveru a vůbec teorií, na které streamování stojí, se budeme věnovat v dalším článku. Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis). I této problematice bude věnován samostatný článek.

Oficiálně jsou podporována rozhraní pro následující ekosystémy:

# Programovací jazyk/ekosystém 1 C 2 C# 3 Elixir 4 Go 5 Java 6 NGINX 7 Node.js 8 Python Asyncio 9 Python Tornado 10 Ruby 11 TypeScript

2. Instalace serveru systému NATS

Předkompilovaná verze serveru NATS je dostupná na stránce https://nats.io/download/nats-io/gnatsd/.

Ovšem vzhledem k tomu, že si možnosti systému NATS nejprve popíšeme na klientských aplikacích naprogramovaných v jazyce Go, předpokládám, že máte tento jazyk (překladač, linker, základní balíčky a další nástroje) nainstalovaný způsobem, který byl popsán v tomto článku. V takovém případě je lepší server systému NATS nainstalovat naprosto stejným způsobem, jako jakýkoli jiný externí balíček jazyka Go. Nejprve přejdeme do adresáře, na nějž ukazuje proměnná GOPATH, což typicky bývá adresář ~/go:

$ cd $GOPATH

Dále spustíme příkaz pro nainstalování balíčku, který obsahuje server NATSu:

$ go get github.com/nats-io/gnatsd

Po instalaci by měla adresářová struktura začínající adresářem $GOPATH obsahovat mj. i tyto podadresáře:

├── pkg │ └── linux_amd64 │ └── github.com │ └── nats-io ├── src │ └── github.com │ └── nats-io │ ├── gnatsd │ ├── │ ├── │ └──

Nejdůležitější však je, že se do podadresáře $GOPATH/bin nainstaloval spustitelný soubor gnatsd, který obsahuje celý NATS server. Můžeme si tedy otestovat jeho základní funkcionalitu. Nejprve přejdeme do adresáře, na který ukazuje proměnná prostředí $GOPATH/bin:

$ cd ~/go/bin

$GOPATH/bin přidat do proměnné PATH, takže spouštění všech binárních souborů vytvořených překladačem Go bude snadnější. Poznámka: alternativně si můžetepřidat do proměnné, takže spouštění všech binárních souborů vytvořených překladačem Go bude snadnější.

Poslední verze serveru NATS by měla být 2.0.0-RC-5, což si můžeme snadno ověřit:

$ ./gnatsd --version nats-server: v2.0.0-RC5

K dispozici je samozřejmě nápověda se všemi podporovanými volbami, které lze předat přes příkazový řádek:

$ ./gnatsd --help Usage: gnatsd [options] Server Options: -a, --addr <host> Bind to host address (default: 0.0.0.0) -p, --port <port> Use port for clients (default: 4222) -P, --pid <file> File to store PID -m, --http_port <port> Use port for http monitoring -ms,--https_port <port> Use port for https monitoring -c, --config <file> Configuration file -sl,--signal <signal>[=<pid>] Send signal to gnatsd process (stop, quit, reopen, reload) ... ... ...

V tom nejjednodušším případě můžeme nechat server spuštěný s výchozími parametry:

$ ./gnatsd [3918] 2019/03/16 16:40:26.554289 [INF] Starting nats-server version 2.0.0-RC5 [3918] 2019/03/16 16:40:26.554399 [INF] Git commit [not set] [3918] 2019/03/16 16:40:26.554673 [INF] Listening for client connections on 0.0.0.0:4222 [3918] 2019/03/16 16:40:26.554707 [INF] Server id is NALRV2K2U77VDPICK3NM3XQV7TY4DM5UVG6FT6IJQWACUW3WA2BRM5SM [3918] 2019/03/16 16:40:26.554726 [INF] Server is ready

Poznámka: server NATSu si prozatím nechte spuštěný na pozadí popř. na jiném terminálu, protože ho budeme volat ve všech dnešních demonstračních příkladech.

3. Instalace rozhraní NATSu pro aplikace vytvořené v programovacím jazyku Go

V úvodní kapitole jsme si řekli, že existuje hned několik oficiálně podporovaných rozhraní NATSu pro klienty psané v různých programovacích jazycích popř. ekosystémech. Dnes si ukážeme použití rozhraní pro klienty naprogramované v jazyce Go. Samotná instalace tohoto rozhraní se prakticky nijak neliší od samotné instalace serveru, pouze se (logicky) použije odlišný repositář s balíčkem:

$ cd $GOPATH $ go get github.com/nats-io/go-nats

Po instalaci by měla adresářová struktura začínající adresářem $GOPATH obsahovat mj. tyto podadresáře:

├── pkg │ └── linux_amd64 │ └── github.com │ └── nats-io ├── src │ └── github.com │ └── nats-io │ ├── gnatsd │ ├── go-nats │ ├── │ └──

Ve všech dalších příkladech tedy bude import balíčku nazvaného „go-nats“ vypadat následovně:

import "github.com/nats-io/go-nats"

Popř. si můžeme zvolit i jmenný alias nats (což budeme důsledně používat v demonstračních příkladech):

import nats "github.com/nats-io/go-nats"

bin nevznikne, na rozdíl od serveru. Poznámka: v tomto případě se skutečně nainstaluje pouze Go balíček; žádný spustitelný soubor v podadresářinevznikne, na rozdíl od serveru.

4. Nejjednodušší forma producenta zpráv

Vytváření (resp. přesněji řečeno publikování) zpráv s jejich posláním na server systému NATS je relativně přímočaré. Nejprve se musíme připojit k serveru, což zajišťuje funkce pojmenovaná Connect, které se předá URL serveru a výslednými hodnotami je buď struktura Conn představující připojení k serveru popř. objekt s popisem chyby, pokud se připojení z nějakého důvodu nepodařilo:

func Connect(url string, options ...Option) (*Conn, error)

Pro připojení k lokálně běžícímu serveru NATS můžeme použít konstantu nazvanou DefaultURL, která ve skutečnosti obsahuje tento řetězec:

"nats://localhost:4222"

Vlastní připojení bez kontroly případných chyb tedy může vypadat takto:

conn, _ := nats.Connect(nats.DefaultURL)

Jakmile je připojení navázáno, je možné zprávu poslat s využitím těchto metod datového typu Conn:

func (nc *Conn) Publish(subj string, data []byte) error func (nc *Conn) PublishMsg(m *Msg) error func (nc *Conn) PublishRequest(subj, reply string, data []byte) error

Nejjednodušší je použití první metody, která umožňuje poslat libovolnou sekvenci bajtů s určitým tématem (topic, subject). Převod řetězce na pole/řez bajtů známe, takže:

conn.Publish(Subject, []byte("Hello World"))

Na konci pro jistotu zavoláme i metodu Flush, která zajistí skutečné poslání zprávy před odpojením klienta:

conn.Flush()

Celý zdrojový kód producenta, který po svém spuštění pošle jednu zprávu s nastaveným tématem, vypadá takto:

package main import nats "github.com/nats-io/go-nats" const Subject = "test1" func main() { conn, _ := nats.Connect(nats.DefaultURL) conn.Publish(Subject, []byte("Hello World")) conn.Flush() }

Pokud tento příklad spustíte příkazem go run publisher.go, bude zpráva poslána serveru NATS, který se tuto zprávu bude snažit doručit případnému příjemci či více příjemcům (jejich počet není prakticky omezen). Ovšem v případě, že žádný příjemce nebude v daný okamžik připojen, bude zpráva zahozena, což je ovšem výchozí (a žádoucí) chování komunikační strategie typu pub-sub.

Poznámka: předchozí příklad neprovádí korektní ukončení připojení ani nekontroluje případné chyby, ke kterým může dojít. Tento nedostatek odstraníme v rámci následujících kapitol.

5. Malá odbočka – standardní balíček sync

Příjemce zpráv je realizován callback funkcí, která je asynchronně zavolána v samostatné gorutině ve chvíli, kdy je přijata nějaká zpráva. Registrace takové callback funkce (zde se jedná o funkci anonymní) může vypadat například takto:

conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s

", string(m.Data)) })

Problém ovšem nastává v tom, že pokud dojde k ukončení funkce main, skončí i běh celého programu. Musíme tedy zajistit, aby se na konci funkce main čekalo na příjem zprávy. K tomuto účelu je možné použít možnosti nabízené standardním balíčkem sync.

Využijeme strukturu WaitGroup sloužící pro čekání na další gorutiny, přesněji řečeno na přesně stanovený počet gorutin. Při čekání na jedinou gorutinu použijeme konstrukci:

wg := sync.WaitGroup{} wg.Add(1) ... ... ... spuštění gorutiny, která má přístup k proměnné wg ... ... ... wg.Wait() // zde se běh zastaví, čeká se na wg.Done()

Samotná gorutina musí zavolat wg.Done, čímž se sníží počitadlo synchronizační struktury a jakmile se dojde k nule, je metoda wg.Wait ukončena.

6. Konzument zpráv

Nyní již máme k dispozici všechny informace, které jsou zapotřebí pro implementaci jednoduchého konzumenta zpráv. Nejprve se opět musíme připojit k NATS serveru funkcí Connect, kterou jsme si již popsali výše:

conn, _ := nats.Connect(nats.DefaultURL)

Následně vytvoříme synchronizační strukturu, která bude při zavolání metody Wait čekat na jedno volání metody Done (pochopitelně bude toto volání provedeno v jiné gorutině):

wg := sync.WaitGroup{} wg.Add(1)

Nyní se již klient může přihlásit k odebrání zprávy s nastaveným tématem. Ve chvíli, kdy bude zpráva přijata, zavolá se specifikovaná anonymní funkce, která je druhým parametrem metody Subscribe:

conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s

", string(m.Data)) wg.Done() })

Povšimněte si, že po přijetí zprávy skutečně zavoláme metodu Done, čímž se sníží interní počitadlo v synchronizační struktuře.

Poslední volání ve funkci main zajistí čekání na příjem (jediné) zprávy:

wg.Wait()

Úplný zdrojový kód konzumenta, který po svém spuštění přijme jedinou zprávu s nastaveným tématem, vypadá následovně:

package main import ( "fmt" nats "github.com/nats-io/go-nats" "sync" ) const Subject = "test1" func main() { conn, _ := nats.Connect(nats.DefaultURL) wg := sync.WaitGroup{} wg.Add(1) conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s

", string(m.Data)) wg.Done() }) wg.Wait() }

V této chvíli si můžeme zkusit ve dvou terminálech příjemce zpráv spustit:

$ go run subscriber.go

Oba příjemci by měli vyčkávat na doručení první zprávy.

V okamžiku, kdy se spustí producent zpráv:

$ go run publisher.go

…dostanou oba příjemci tu stejnou zprávu a automaticky se ukončí.

7. Vylepšení producenta i konzumenta zpráv

Naše první implementace producenta a konzumenta zcela jistě nebyla připravena pro produkční nasazení. Je tomu tak mj. i z toho důvodu, že nedocházelo ke korektní detekci chyb při připojení i při posílání zpráv a taktéž proto, že se spojení s NATS serverem automaticky neuzavíralo. Oba problémy můžeme snadno napravit:

Zkontrolujeme chybovou hodnotu vrácenou funkcí Connect Blokem defer zajistíme, že se připojení vždy uzavře Zkontrolujeme chybovou hodnotu vrácenou metodou Connection.Publish U konzumenta zajistíme automatické odhlášení zavoláním metody sub.Unsubscribe

Výsledkem je upravený kód producenta:

package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") err2 := conn.Publish(Subject, []byte("Hello World")) if err2 != nil { log.Fatal(err2) } conn.Flush() println("Message sent") }

Podobně je upraven i konzument/příjemce zpráv:

package main import ( "fmt" nats "github.com/nats-io/go-nats" "log" "sync" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() wg := sync.WaitGroup{} wg.Add(1) sub, err2 := conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s

", string(m.Data)) wg.Done() }) if err2 != nil { log.Fatal(err2) } println("Subscribed", sub) wg.Wait() println("Finished waiting for message") err3 := sub.Unsubscribe() if err3 != nil { log.Fatal(err3) } println("Unsubscribed") }

8. Automatické odhlášení konzumenta po přijetí zadaného množství zpráv

V této kapitole si ukážeme, jak je možné implementovat automatické odhlášení konzumenta po přijetí předem známého množství zpráv.

Nejprve bude uveden zdrojový kód producenta zpráv, po jehož spuštění se do message brokera pošle deset zpráv s tématem nastaveným na „test1“:

package main import ( "fmt" nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") for i := 1; i < 10; i++ { message := fmt.Sprintf("Hello World #%d", i) err2 := conn.Publish(Subject, []byte(message)) println("Published", message) if err2 != nil { log.Fatal(err2) } conn.Flush() } println("All messages sent") }

Konzumenta, který se musí po přijetí určitého množství zpráv automaticky odpojit, je nutné na několika místech modifikovat. Nejprve nastavíme počitadlo pro synchronizační strukturu:

wg := sync.WaitGroup{} wg.Add(5)

Dále po navázání připojení a přihlášení k odběru zpráv nastavíme maximální počet přijímaných zpráv s využitím metody AutoUnsubscribe, a to následujícím způsobem:

err3 := sub.AutoUnsubscribe(5) if err3 != nil { log.Fatal(err3) }

Další kód již může zůstat zachován – nemusíme nikde použít počítanou smyčku ani další podobné řídicí struktury.

Zdrojový kód takto upraveného konzumenta vypadá následovně:

package main import ( "fmt" nats "github.com/nats-io/go-nats" "log" "sync" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() wg := sync.WaitGroup{} wg.Add(5) sub, err2 := conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s

", string(m.Data)) wg.Done() }) if err2 != nil { log.Fatal(err2) } println("Subscribed", sub) err3 := sub.AutoUnsubscribe(5) if err3 != nil { log.Fatal(err3) } println("Automatic unsubscribe after 5 messages") wg.Wait() println("Finished waiting for messages") }

9. Využití kanálů při posílání zpráv na straně producenta

V praxi se poměrně často setkáme s tím, že zprávy jsou odesílány, popř. naopak přijímány s využitím kanálů. Je to pochopitelné, neboť se jedná o velmi elegantní přístup využívající možnosti poskytované samotným jazykem.

Při posílání zpráv se kanál vytvoří (zjednodušeně) takto:

conn, _ := nats.Connect(nats.DefaultURL) econn, _ := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) channel := make(chan string) econn.BindSendChan(Subject, channel)

Musíme tedy nejdříve specifikovat objekt používaný při kódování zpráv ze vstupního formátu (řetězec, JSON, …) do pole bajtů a následně již můžeme vytvořit a zaregistrovat kanál daného typu.

Samotné poslání zprávy je realizováno posláním řetězce do kanálu:

channel <- "Hello World #1" channel <- "Hello World #2" channel <- "Hello World #3"

Upravený producent zpráv tedy může vypadat následovně:

package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err) } defer econn.Close() channel := make(chan string) econn.BindSendChan(Subject, channel) println("Channel created") channel <- "Hello World #1" channel <- "Hello World #2" channel <- "Hello World #3" println("All messages sent") }

10. Využití kanálů při příjmu zpráv na straně konzumenta

Podobným způsobem je realizován i konzument zpráv, který se tak mohl značně zjednodušit (odpadla například potřeba práce se synchronizační strukturou atd.). Následující implementace konzumenta zpráv po svém spuštění přijme trojici zpráv (a to opět přes kanál) a následně se ukončí:

package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err) } defer econn.Close() channel := make(chan string) econn.BindRecvChan(Subject, channel) println("Channel created") println(<-channel) println(<-channel) println(<-channel) }

11. Implementace jednoduchého komunikačního protokolu

V některých typech aplikací se přes message brokera neposílají pouze „obyčejné“ zprávy obsahující data, ale i zprávy s příkazy, které nějakým způsobem ovlivňují či řídí příjemce zpráv. Můžeme tak vytvořit libovolně jednoduchý či naopak komplikovaný komunikační a řídicí protokol. Ukažme si nyní velmi primitivní implementaci takového protokolu, v němž se bude rozeznávat pouze jediný příkaz „EXIT“, jímž se ukončí všichni příjemci zprávy. Zprávy s jiným obsahem budou považovány za datové zprávy.

Poznámka: toto řešení je samozřejmě tak triviální, že může vést k nesprávnému chování, protože rozlišení významu zprávy je provedeno na základě jejího obsahu a nikoli (například) tématu či dalších příznaků posílaných v hlavičce. Nicméně se jedná pouze o první přiblížení.

Samotná implementace zdroje zpráv se nebude žádným zásadním způsobem odlišovat od již známých příkladů. Pouze nesmíme zapomenout poslat zprávu s tělem „EXIT“, aby ji dostali všichni připojení příjemci:

channel <- "Hello World #1" channel <- "Hello World #2" channel <- "Hello World #3" channel <- "EXIT"

Celá implementace zdroje zpráv bude vypadat následovně:

package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err) } defer econn.Close() channel := make(chan string) econn.BindSendChan(Subject, channel) println("Channel created") channel <- "Hello World #1" channel <- "Hello World #2" channel <- "Hello World #3" channel <- "EXIT" conn.Flush() println("All messages sent") }

Příjemce zprávy bude muset být upraven takovým způsobem, aby dokázat rozeznat tělo zprávy s textem „EXIT“ a po jejím přijetí se ukončil. O uzavření připojení se postará kód deklarovaný v blocích defer:

for { message := <-channel println(message) if message == "EXIT" { println("Received EXIT message...") break } }

Samozřejmě si opět ukážeme úplnou implementaci příjemce zpráv, která může vypadat následovně:

package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err) } defer econn.Close() channel := make(chan string) econn.BindRecvChan(Subject, channel) println("Channel created") for { message := <-channel println(message) if message == "EXIT" { println("Received EXIT message...") break } } }

12. Komunikace přes dva kanály: producent zpráv

Protokol je možné vylepšit oddělením datových zpráv od zpráv, které slouží k řízení klienta (vzdáleně to připomíná FTP). V následujícím příkladu je použita dvojice témat nazvaných „test1“ a „test2“, přičemž zprávy s prvním tématem budou datové a zprávy s tématem druhým řídicí. Pro posílání zpráv jsou otevřeny dva oddělené kanály data_channel a control_channel:

package main import ( nats "github.com/nats-io/go-nats" "log" "time" ) const Subject = "test1" const Control = "test2" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() cconn, err3 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err3 != nil { log.Fatal(err3) } defer cconn.Close() data_channel := make(chan string) econn.BindSendChan(Subject, data_channel) println("Data channel created") control_channel := make(chan string) cconn.BindSendChan(Control, control_channel) println("Control channel created") data_channel <- "Hello World #1" data_channel <- "Hello World #2" data_channel <- "Hello World #3" data_channel <- "EXIT" time.Sleep(2 * time.Second) control_channel <- "EXIT" conn.Flush() println("All messages sent") }

13. Komunikace přes dva kanály: konzument zpráv

Klient přijímající zprávy se dvěma tématy je již implementován složitějším způsobem, protože je zde nutné využít programovou konstrukci select-case, která vybere ten kanál, v němž jsou k dispozici data (popř. vybere náhodný kanál, pokud jsou data přístupná ve více kanálech). Při příjmu řídicí zprávy se smyčka ukončí:

MESSAGE_LOOP: for { select { case message := <-data_channel: println("Received data message", message) case control := <-control_channel: println("Received control message", control) if control == "EXIT" { break MESSAGE_LOOP } } println("--------") }

Zdrojový kód takto upraveného klienta vypadá následovně:

package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" const Control = "test2" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() cconn, err3 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err3 != nil { log.Fatal(err3) } defer cconn.Close() data_channel := make(chan string) econn.BindRecvChan(Subject, data_channel) println("Data channel created") control_channel := make(chan string) cconn.BindRecvChan(Control, control_channel) println("Control channel created") MESSAGE_LOOP: for { select { case message := <-data_channel: println("Received data message", message) case control := <-control_channel: println("Received control message", control) if control == "EXIT" { break MESSAGE_LOOP } } println("--------") } }

14. Rozdělování zpráv mezi příjemce patřící do stejné skupiny (Queueing)

V systému NATS se setkáme s pojmem queueing [1]. Nejedná se ovšem o skutečnou implementaci fronty v message brokerovi, ale o koncept, jakým jsou zprávy rozesílány příjemcům. V příkladech uvedených v předchozích kapitolách se používala klasická komunikační strategie nazvaná pub-sub, která je založena na tom, že zpráva, která je poslána do message brokera, je následně rozeslána všem příjemcům, kteří jsou v daný okamžik připojeni a přijímají zprávy se zvoleným tématem.

Pokud je tedy k message brokerovi připojeno deset příjemců zpráv s tématem (subject, topic) odpovídajícím poslané zprávě, bude zpráva přeposlána právě těmto deseti příjemcům. Alternativou je právě zmíněný queueing, jehož princip spočívá v tom, že se zpráva pošle pouze jedinému příjemci patřícímu do nějaké skupiny (queue group), a to na základě algoritmu, který se snaží o load balancing zpráv. To, do jaké skupiny příjemce patří, je řízeno pouze příjemcem, nikoli zdrojem zpráv (ten vlastně ani nemusí vědět, komu bude zpráva přeposlána).

Poznámka: velmi zjednodušeně řečeno se na queueing můžeme dívat jako na frontu, v níž stojí jednotliví příjemci zpráv. Zprávu vždy dostane příjemce, který je na začátku fronty, ihned po zpracování se zařadí zpět na konec fronty. Důležité je si uvědomit, že použití slova queue v tomto případě neznamená, že by message broker obsahoval přímou podporu pro (perzistentní) zprávy (to umí až NATS Streaming).

15. Projekt používající queueing

Ukažme si nyní, jakým způsobem je možné queueing otestovat v praxi. Nejdříve vytvoříme producenta zpráv, který po svém spuštění vytvoří a pošle přesně sto zpráv s tématem nastaveným na „test“. Zdrojový kód tohoto producenta vypadá následovně:

package main import ( "fmt" nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() channel := make(chan string) econn.BindSendChan(Subject, channel) println("Channel created") // poslat 100 zprav for i := 1; i < 100; i++ { message := fmt.Sprintf("Hello World #%d", i) channel <- message conn.Flush() } println("All messages sent") }

V první implementaci konzumenta zpráv se použije metoda BindRecvQueueChan určená pro zaregistrování konzumenta k zadanému tématu společně se zařazením konzumenta do fronty nazvané „q1“:

const Queue = "q1" ... ... ... channel := make(chan string) econn.BindRecvQueueChan(Subject, Queue, channel)

Úplný zdrojový kód první varianty konzumenta zpráv:

package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" const Queue = "q1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() channel := make(chan string) econn.BindRecvQueueChan(Subject, Queue, channel) println("Channel created") for i := 1; i < 20; i++ { println(<-channel) } }

Druhá implementace je prakticky totožná, ovšem používá odlišné jméno fronty:

const Queue = "X" ... ... ... channel := make(chan string) econn.BindRecvQueueChan(Subject, Queue, channel)

Úplný zdrojový kód druhé varianty konzumenta zpráv:

package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" const Queue = "X" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() channel := make(chan string) econn.BindRecvQueueChan(Subject, Queue, channel) println("Channel created") for i := 1; i < 20; i++ { println(<-channel) } }

16. Chování systému po připojení pětice příjemců zpráv

Chování systému se dvěma frontami konzumentů zpráv si můžeme relativně snadno otestovat. Použijeme k tomu následující skript, který spustí pět konzumentů zařazených do fronty „q1“ a jednoho konzumenta zařazeného do fronty „X“:

for i in `seq 1 5` do nohup go run subscriber.go > subscriber${i}.log & done nohup go run subscriberX.go > subscriberX.log &

V jiném terminálu spustíme producenta 100 zpráv:

$ go run producent.go

Měla by vzniknout šestice souborů s informacemi o tom, které zprávy jaký konzument přijal. Pořadí zpráv sice nemusí být přesně zachováno, ovšem každý konzument by měl přijmout přesně dvacet zpráv.

Konzument #1 z fronty „q1“:

Channel created Hello World #9 Hello World #19 Hello World #20 Hello World #34 Hello World #37 Hello World #40 Hello World #47 Hello World #64 Hello World #66 Hello World #73 Hello World #83 Hello World #93 Hello World #94 Hello World #95 Hello World #96 Hello World #97

Konzument #2 z fronty „q1“:

Channel created Hello World #1 Hello World #3 Hello World #6 Hello World #10 Hello World #15 Hello World #18 Hello World #21 Hello World #24 Hello World #25 Hello World #43 Hello World #45 Hello World #46 Hello World #48 Hello World #51 Hello World #53 Hello World #54 Hello World #65 Hello World #68 Hello World #69

Konzument #3 z fronty „q1“:

Channel created Hello World #7 Hello World #13 Hello World #16 Hello World #22 Hello World #42 Hello World #50 Hello World #52 Hello World #58 Hello World #67 Hello World #76 Hello World #77 Hello World #78 Hello World #82 Hello World #84 Hello World #85 Hello World #88 Hello World #89 Hello World #91 Hello World #98

Konzument #4 z fronty „q1“:

Channel created Hello World #2 Hello World #11 Hello World #23 Hello World #27 Hello World #28 Hello World #29 Hello World #30 Hello World #31 Hello World #35 Hello World #36 Hello World #39 Hello World #49 Hello World #55 Hello World #57 Hello World #59 Hello World #63 Hello World #71 Hello World #74 Hello World #79

Konzument #5 z fronty „q1“:

Channel created Hello World #4 Hello World #5 Hello World #8 Hello World #12 Hello World #14 Hello World #17 Hello World #26 Hello World #32 Hello World #33 Hello World #38 Hello World #41 Hello World #44 Hello World #56 Hello World #60 Hello World #61 Hello World #62 Hello World #70 Hello World #72 Hello World #75

Jediný konzument z fronty „X“:

Channel created Hello World #1 Hello World #2 Hello World #3 Hello World #4 Hello World #5 Hello World #6 Hello World #7 Hello World #8 Hello World #9 Hello World #10 Hello World #11 Hello World #12 Hello World #13 Hello World #14 Hello World #15 Hello World #16 Hello World #17 Hello World #18 Hello World #19

Poznámka: vidíme, že prvních pět konzumentů si zprávy více méně spravedlivě rozdělilo, zatímco poslední konzument získal všechny zprávy v pořadí, v jakém byly poslány (mohl jich přijmout sto, ovšem po prvních dvaceti byl ukončen).

