Obsah
1. Použití message brokeru NATS
2. Instalace serveru systému NATS
3. Instalace rozhraní NATSu pro aplikace vytvořené v programovacím jazyku Go
4. Nejjednodušší forma producenta zpráv
5. Malá odbočka – standardní balíček sync
7. Vylepšení producenta i konzumenta zpráv
8. Automatické odhlášení konzumenta po přijetí zadaného množství zpráv
9. Využití kanálů při posílání zpráv na straně producenta
10. Využití kanálů při příjmu zpráv na straně konzumenta
11. Implementace jednoduchého komunikačního protokolu
12. Komunikace přes dva kanály: producent zpráv
13. Komunikace přes dva kanály: konzument zpráv
14. Rozdělování zpráv mezi příjemce patřící do stejné skupiny (Queueing)
15. Projekt používající queueing
16. Chování systému po připojení pětice příjemců zpráv
17. Obsah následující části seriálu
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu
1. Použití message brokeru NATS
V předchozí části seriálu o message brokerech jsme se ve stručnosti seznámili s projektem NATS. Připomeňme si, že se jedná o poměrně úspěšný a často nasazovaný projekt, jenž je vyvinut v programovacím jazyku Go, což mu do jisté míry zajišťuje stabilitu i škálovatelnost. To jsou vlastnosti, které u message brokerů většinou očekáváme.
Také jsme si řekli, že se systém NATS skládá z několika komponent:
- V první řadě se jedná o samotný server, jenž se spouští příkazem gnatsd. Server je naprogramovaný v Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (to má v době Dockeru a podobných nástrojů poměrně velký význam). Způsobem instalace a spuštění serveru se budeme zabývat hned v navazující kapitole; ostatně ve skutečnosti se nejedná o nijak složitou operaci.
- Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje); viz též tabulky s podporovanými ekosystémy, které jsou zobrazeny pod tímto seznamem. Programátorské rozhraní určené pro jazyk Go bude popsáno ve třetí kapitole i v kapitolách navazujících.
- Třetí komponentou je NATS Streaming Server, který je opět naprogramován v jazyce Go. Způsobem využití Streaming Serveru a vůbec teorií, na které streamování stojí, se budeme věnovat v dalším článku.
- Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis). I této problematice bude věnován samostatný článek.
Oficiálně jsou podporována rozhraní pro následující ekosystémy:
# | Programovací jazyk/ekosystém |
---|---|
1 | C |
2 | C# |
3 | Elixir |
4 | Go |
5 | Java |
6 | NGINX |
7 | Node.js |
8 | Python Asyncio |
9 | Python Tornado |
10 | Ruby |
11 | TypeScript |
2. Instalace serveru systému NATS
Předkompilovaná verze serveru NATS je dostupná na stránce https://nats.io/download/nats-io/gnatsd/.
Ovšem vzhledem k tomu, že si možnosti systému NATS nejprve popíšeme na klientských aplikacích naprogramovaných v jazyce Go, předpokládám, že máte tento jazyk (překladač, linker, základní balíčky a další nástroje) nainstalovaný způsobem, který byl popsán v tomto článku. V takovém případě je lepší server systému NATS nainstalovat naprosto stejným způsobem, jako jakýkoli jiný externí balíček jazyka Go. Nejprve přejdeme do adresáře, na nějž ukazuje proměnná GOPATH, což typicky bývá adresář ~/go:
$ cd $GOPATH
Dále spustíme příkaz pro nainstalování balíčku, který obsahuje server NATSu:
$ go get github.com/nats-io/gnatsd
Po instalaci by měla adresářová struktura začínající adresářem $GOPATH obsahovat mj. i tyto podadresáře:
├── pkg │ └── linux_amd64 │ └── github.com │ └── nats-io ├── src │ └── github.com │ └── nats-io │ ├── gnatsd │ ├── │ ├── │ └──
Nejdůležitější však je, že se do podadresáře $GOPATH/bin nainstaloval spustitelný soubor gnatsd, který obsahuje celý NATS server. Můžeme si tedy otestovat jeho základní funkcionalitu. Nejprve přejdeme do adresáře, na který ukazuje proměnná prostředí $GOPATH/bin:
$ cd ~/go/bin
Poslední verze serveru NATS by měla být 2.0.0-RC-5, což si můžeme snadno ověřit:
$ ./gnatsd --version nats-server: v2.0.0-RC5
K dispozici je samozřejmě nápověda se všemi podporovanými volbami, které lze předat přes příkazový řádek:
$ ./gnatsd --help Usage: gnatsd [options] Server Options: -a, --addr <host> Bind to host address (default: 0.0.0.0) -p, --port <port> Use port for clients (default: 4222) -P, --pid <file> File to store PID -m, --http_port <port> Use port for http monitoring -ms,--https_port <port> Use port for https monitoring -c, --config <file> Configuration file -sl,--signal <signal>[=<pid>] Send signal to gnatsd process (stop, quit, reopen, reload) ... ... ...
V tom nejjednodušším případě můžeme nechat server spuštěný s výchozími parametry:
$ ./gnatsd [3918] 2019/03/16 16:40:26.554289 [INF] Starting nats-server version 2.0.0-RC5 [3918] 2019/03/16 16:40:26.554399 [INF] Git commit [not set] [3918] 2019/03/16 16:40:26.554673 [INF] Listening for client connections on 0.0.0.0:4222 [3918] 2019/03/16 16:40:26.554707 [INF] Server id is NALRV2K2U77VDPICK3NM3XQV7TY4DM5UVG6FT6IJQWACUW3WA2BRM5SM [3918] 2019/03/16 16:40:26.554726 [INF] Server is ready
3. Instalace rozhraní NATSu pro aplikace vytvořené v programovacím jazyku Go
V úvodní kapitole jsme si řekli, že existuje hned několik oficiálně podporovaných rozhraní NATSu pro klienty psané v různých programovacích jazycích popř. ekosystémech. Dnes si ukážeme použití rozhraní pro klienty naprogramované v jazyce Go. Samotná instalace tohoto rozhraní se prakticky nijak neliší od samotné instalace serveru, pouze se (logicky) použije odlišný repositář s balíčkem:
$ cd $GOPATH $ go get github.com/nats-io/go-nats
Po instalaci by měla adresářová struktura začínající adresářem $GOPATH obsahovat mj. tyto podadresáře:
├── pkg │ └── linux_amd64 │ └── github.com │ └── nats-io ├── src │ └── github.com │ └── nats-io │ ├── gnatsd │ ├── go-nats │ ├── │ └──
Ve všech dalších příkladech tedy bude import balíčku nazvaného „nats“ vypadat následovně (jméno balíčku je skutečně „nats“ a nikoli „go-nats“; to ostatně v Go není platný identifikátor):
import "github.com/nats-io/go-nats"
Popř. si můžeme explicitně zvolit i jmenný alias nats (což budeme důsledně používat v demonstračních příkladech):
import nats "github.com/nats-io/go-nats"
4. Nejjednodušší forma producenta zpráv
Vytváření (resp. přesněji řečeno publikování) zpráv s jejich posláním na server systému NATS je relativně přímočaré. Nejprve se musíme připojit k serveru, což zajišťuje funkce pojmenovaná Connect, které se předá URL serveru a výslednými hodnotami je buď struktura Conn představující připojení k serveru popř. objekt s popisem chyby, pokud se připojení z nějakého důvodu nepodařilo:
func Connect(url string, options ...Option) (*Conn, error)
Pro připojení k lokálně běžícímu serveru NATS můžeme použít konstantu nazvanou DefaultURL, která ve skutečnosti obsahuje tento řetězec:
"nats://localhost:4222"
Vlastní připojení bez kontroly případných chyb tedy může vypadat takto:
conn, _ := nats.Connect(nats.DefaultURL)
Jakmile je připojení navázáno, je možné zprávu poslat s využitím těchto metod datového typu Conn:
func (nc *Conn) Publish(subj string, data []byte) error func (nc *Conn) PublishMsg(m *Msg) error func (nc *Conn) PublishRequest(subj, reply string, data []byte) error
Nejjednodušší je použití první metody, která umožňuje poslat libovolnou sekvenci bajtů s určitým tématem (topic, subject). Převod řetězce na pole/řez bajtů známe, takže:
conn.Publish(Subject, []byte("Hello World"))
Na konci pro jistotu zavoláme i metodu Flush, která zajistí skutečné poslání zprávy před odpojením klienta:
conn.Flush()
Celý zdrojový kód producenta, který po svém spuštění pošle jednu zprávu s nastaveným tématem, vypadá takto:
package main import nats "github.com/nats-io/go-nats" const Subject = "test1" func main() { conn, _ := nats.Connect(nats.DefaultURL) conn.Publish(Subject, []byte("Hello World")) conn.Flush() }
Pokud tento příklad spustíte příkazem go run publisher.go, bude zpráva poslána serveru NATS, který se tuto zprávu bude snažit doručit případnému příjemci či více příjemcům (jejich počet není prakticky omezen). Ovšem v případě, že žádný příjemce nebude v daný okamžik připojen, bude zpráva zahozena, což je ovšem výchozí (a žádoucí) chování komunikační strategie typu pub-sub.
5. Malá odbočka – standardní balíček sync
Příjemce zpráv je realizován callback funkcí, která je asynchronně zavolána v samostatné gorutině ve chvíli, kdy je přijata nějaká zpráva. Registrace takové callback funkce (zde se jedná o funkci anonymní) může vypadat například takto:
conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) })
Problém ovšem nastává v tom, že pokud dojde k ukončení funkce main, skončí i běh celého programu. Musíme tedy zajistit, aby se na konci funkce main čekalo na příjem zprávy. K tomuto účelu je možné použít možnosti nabízené standardním balíčkem sync.
Využijeme strukturu WaitGroup sloužící pro čekání na další gorutiny, přesněji řečeno na přesně stanovený počet gorutin. Při čekání na jedinou gorutinu použijeme konstrukci:
wg := sync.WaitGroup{} wg.Add(1) ... ... ... spuštění gorutiny, která má přístup k proměnné wg ... ... ... wg.Wait() // zde se běh zastaví, čeká se na wg.Done()
Samotná gorutina musí zavolat wg.Done, čímž se sníží počitadlo synchronizační struktury a jakmile se dojde k nule, je metoda wg.Wait ukončena.
6. Konzument zpráv
Nyní již máme k dispozici všechny informace, které jsou zapotřebí pro implementaci jednoduchého konzumenta zpráv. Nejprve se opět musíme připojit k NATS serveru funkcí Connect, kterou jsme si již popsali výše:
conn, _ := nats.Connect(nats.DefaultURL)
Následně vytvoříme synchronizační strukturu, která bude při zavolání metody Wait čekat na jedno volání metody Done (pochopitelně bude toto volání provedeno v jiné gorutině):
wg := sync.WaitGroup{} wg.Add(1)
Nyní se již klient může přihlásit k odebrání zprávy s nastaveným tématem. Ve chvíli, kdy bude zpráva přijata, zavolá se specifikovaná anonymní funkce, která je druhým parametrem metody Subscribe:
conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) wg.Done() })
Povšimněte si, že po přijetí zprávy skutečně zavoláme metodu Done, čímž se sníží interní počitadlo v synchronizační struktuře.
Poslední volání ve funkci main zajistí čekání na příjem (jediné) zprávy:
wg.Wait()
Úplný zdrojový kód konzumenta, který po svém spuštění přijme jedinou zprávu s nastaveným tématem, vypadá následovně:
package main import ( "fmt" nats "github.com/nats-io/go-nats" "sync" ) const Subject = "test1" func main() { conn, _ := nats.Connect(nats.DefaultURL) wg := sync.WaitGroup{} wg.Add(1) conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) wg.Done() }) wg.Wait() }
V této chvíli si můžeme zkusit ve dvou terminálech příjemce zpráv spustit:
$ go run subscriber.go
Oba příjemci by měli vyčkávat na doručení první zprávy.
V okamžiku, kdy se spustí producent zpráv:
$ go run publisher.go
…dostanou oba příjemci tu stejnou zprávu a automaticky se ukončí.
7. Vylepšení producenta i konzumenta zpráv
Naše první implementace producenta a konzumenta zcela jistě nebyla připravena pro produkční nasazení. Je tomu tak mj. i z toho důvodu, že nedocházelo ke korektní detekci chyb při připojení i při posílání zpráv a taktéž proto, že se spojení s NATS serverem automaticky neuzavíralo. Oba problémy můžeme snadno napravit:
- Zkontrolujeme chybovou hodnotu vrácenou funkcí Connect
- Blokem defer zajistíme, že se připojení vždy uzavře
- Zkontrolujeme chybovou hodnotu vrácenou metodou Connection.Publish
- U konzumenta zajistíme automatické odhlášení zavoláním metody sub.Unsubscribe
Výsledkem je upravený kód producenta:
package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") err2 := conn.Publish(Subject, []byte("Hello World")) if err2 != nil { log.Fatal(err2) } conn.Flush() println("Message sent") }
Podobně je upraven i konzument/příjemce zpráv:
package main import ( "fmt" nats "github.com/nats-io/go-nats" "log" "sync" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() wg := sync.WaitGroup{} wg.Add(1) sub, err2 := conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) wg.Done() }) if err2 != nil { log.Fatal(err2) } println("Subscribed", sub) wg.Wait() println("Finished waiting for message") err3 := sub.Unsubscribe() if err3 != nil { log.Fatal(err3) } println("Unsubscribed") }
8. Automatické odhlášení konzumenta po přijetí zadaného množství zpráv
V této kapitole si ukážeme, jak je možné implementovat automatické odhlášení konzumenta po přijetí předem známého množství zpráv.
Nejprve bude uveden zdrojový kód producenta zpráv, po jehož spuštění se do message brokera pošle deset zpráv s tématem nastaveným na „test1“:
package main import ( "fmt" nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") for i := 1; i < 10; i++ { message := fmt.Sprintf("Hello World #%d", i) err2 := conn.Publish(Subject, []byte(message)) println("Published", message) if err2 != nil { log.Fatal(err2) } conn.Flush() } println("All messages sent") }
Konzumenta, který se musí po přijetí určitého množství zpráv automaticky odpojit, je nutné na několika místech modifikovat. Nejprve nastavíme počitadlo pro synchronizační strukturu:
wg := sync.WaitGroup{} wg.Add(5)
Dále po navázání připojení a přihlášení k odběru zpráv nastavíme maximální počet přijímaných zpráv s využitím metody AutoUnsubscribe, a to následujícím způsobem:
err3 := sub.AutoUnsubscribe(5) if err3 != nil { log.Fatal(err3) }
Další kód již může zůstat zachován – nemusíme nikde použít počítanou smyčku ani další podobné řídicí struktury.
Zdrojový kód takto upraveného konzumenta vypadá následovně:
package main import ( "fmt" nats "github.com/nats-io/go-nats" "log" "sync" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() wg := sync.WaitGroup{} wg.Add(5) sub, err2 := conn.Subscribe(Subject, func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) wg.Done() }) if err2 != nil { log.Fatal(err2) } println("Subscribed", sub) err3 := sub.AutoUnsubscribe(5) if err3 != nil { log.Fatal(err3) } println("Automatic unsubscribe after 5 messages") wg.Wait() println("Finished waiting for messages") }
9. Využití kanálů při posílání zpráv na straně producenta
V praxi se poměrně často setkáme s tím, že zprávy jsou odesílány, popř. naopak přijímány s využitím kanálů. Je to pochopitelné, neboť se jedná o velmi elegantní přístup využívající možnosti poskytované samotným jazykem.
Při posílání zpráv se kanál vytvoří (zjednodušeně) takto:
conn, _ := nats.Connect(nats.DefaultURL) econn, _ := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) channel := make(chan string) econn.BindSendChan(Subject, channel)
Musíme tedy nejdříve specifikovat objekt používaný při kódování zpráv ze vstupního formátu (řetězec, JSON, …) do pole bajtů a následně již můžeme vytvořit a zaregistrovat kanál daného typu.
Samotné poslání zprávy je realizováno posláním řetězce do kanálu:
channel <- "Hello World #1" channel <- "Hello World #2" channel <- "Hello World #3"
Upravený producent zpráv tedy může vypadat následovně:
package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err) } defer econn.Close() channel := make(chan string) econn.BindSendChan(Subject, channel) println("Channel created") channel <- "Hello World #1" channel <- "Hello World #2" channel <- "Hello World #3" println("All messages sent") }
10. Využití kanálů při příjmu zpráv na straně konzumenta
Podobným způsobem je realizován i konzument zpráv, který se tak mohl značně zjednodušit (odpadla například potřeba práce se synchronizační strukturou atd.). Následující implementace konzumenta zpráv po svém spuštění přijme trojici zpráv (a to opět přes kanál) a následně se ukončí:
package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err) } defer econn.Close() channel := make(chan string) econn.BindRecvChan(Subject, channel) println("Channel created") println(<-channel) println(<-channel) println(<-channel) }
11. Implementace jednoduchého komunikačního protokolu
V některých typech aplikací se přes message brokera neposílají pouze „obyčejné“ zprávy obsahující data, ale i zprávy s příkazy, které nějakým způsobem ovlivňují či řídí příjemce zpráv. Můžeme tak vytvořit libovolně jednoduchý či naopak komplikovaný komunikační a řídicí protokol. Ukažme si nyní velmi primitivní implementaci takového protokolu, v němž se bude rozeznávat pouze jediný příkaz „EXIT“, jímž se ukončí všichni příjemci zprávy. Zprávy s jiným obsahem budou považovány za datové zprávy.
Samotná implementace zdroje zpráv se nebude žádným zásadním způsobem odlišovat od již známých příkladů. Pouze nesmíme zapomenout poslat zprávu s tělem „EXIT“, aby ji dostali všichni připojení příjemci:
channel <- "Hello World #1" channel <- "Hello World #2" channel <- "Hello World #3" channel <- "EXIT"
Celá implementace zdroje zpráv bude vypadat následovně:
package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err) } defer econn.Close() channel := make(chan string) econn.BindSendChan(Subject, channel) println("Channel created") channel <- "Hello World #1" channel <- "Hello World #2" channel <- "Hello World #3" channel <- "EXIT" conn.Flush() println("All messages sent") }
Příjemce zprávy bude muset být upraven takovým způsobem, aby dokázat rozeznat tělo zprávy s textem „EXIT“ a po jejím přijetí se ukončil. O uzavření připojení se postará kód deklarovaný v blocích defer:
for { message := <-channel println(message) if message == "EXIT" { println("Received EXIT message...") break } }
Samozřejmě si opět ukážeme úplnou implementaci příjemce zpráv, která může vypadat následovně:
package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err) } defer econn.Close() channel := make(chan string) econn.BindRecvChan(Subject, channel) println("Channel created") for { message := <-channel println(message) if message == "EXIT" { println("Received EXIT message...") break } } }
12. Komunikace přes dva kanály: producent zpráv
Protokol je možné vylepšit oddělením datových zpráv od zpráv, které slouží k řízení klienta (vzdáleně to připomíná FTP). V následujícím příkladu je použita dvojice témat nazvaných „test1“ a „test2“, přičemž zprávy s prvním tématem budou datové a zprávy s tématem druhým řídicí. Pro posílání zpráv jsou otevřeny dva oddělené kanály data_channel a control_channel:
package main import ( nats "github.com/nats-io/go-nats" "log" "time" ) const Subject = "test1" const Control = "test2" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() cconn, err3 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err3 != nil { log.Fatal(err3) } defer cconn.Close() data_channel := make(chan string) econn.BindSendChan(Subject, data_channel) println("Data channel created") control_channel := make(chan string) cconn.BindSendChan(Control, control_channel) println("Control channel created") data_channel <- "Hello World #1" data_channel <- "Hello World #2" data_channel <- "Hello World #3" data_channel <- "EXIT" time.Sleep(2 * time.Second) control_channel <- "EXIT" conn.Flush() println("All messages sent") }
13. Komunikace přes dva kanály: konzument zpráv
Klient přijímající zprávy se dvěma tématy je již implementován složitějším způsobem, protože je zde nutné využít programovou konstrukci select-case, která vybere ten kanál, v němž jsou k dispozici data (popř. vybere náhodný kanál, pokud jsou data přístupná ve více kanálech). Při příjmu řídicí zprávy se smyčka ukončí:
MESSAGE_LOOP: for { select { case message := <-data_channel: println("Received data message", message) case control := <-control_channel: println("Received control message", control) if control == "EXIT" { break MESSAGE_LOOP } } println("--------") }
Zdrojový kód takto upraveného klienta vypadá následovně:
package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" const Control = "test2" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() cconn, err3 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err3 != nil { log.Fatal(err3) } defer cconn.Close() data_channel := make(chan string) econn.BindRecvChan(Subject, data_channel) println("Data channel created") control_channel := make(chan string) cconn.BindRecvChan(Control, control_channel) println("Control channel created") MESSAGE_LOOP: for { select { case message := <-data_channel: println("Received data message", message) case control := <-control_channel: println("Received control message", control) if control == "EXIT" { break MESSAGE_LOOP } } println("--------") } }
14. Rozdělování zpráv mezi příjemce patřící do stejné skupiny (Queueing)
V systému NATS se setkáme s pojmem queueing [1]. Nejedná se ovšem o skutečnou implementaci fronty v message brokerovi, ale o koncept, jakým jsou zprávy rozesílány příjemcům. V příkladech uvedených v předchozích kapitolách se používala klasická komunikační strategie nazvaná pub-sub, která je založena na tom, že zpráva, která je poslána do message brokera, je následně rozeslána všem příjemcům, kteří jsou v daný okamžik připojeni a přijímají zprávy se zvoleným tématem.
Pokud je tedy k message brokerovi připojeno deset příjemců zpráv s tématem (subject, topic) odpovídajícím poslané zprávě, bude zpráva přeposlána právě těmto deseti příjemcům. Alternativou je právě zmíněný queueing, jehož princip spočívá v tom, že se zpráva pošle pouze jedinému příjemci patřícímu do nějaké skupiny (queue group), a to na základě algoritmu, který se snaží o load balancing zpráv. To, do jaké skupiny příjemce patří, je řízeno pouze příjemcem, nikoli zdrojem zpráv (ten vlastně ani nemusí vědět, komu bude zpráva přeposlána).
15. Projekt používající queueing
Ukažme si nyní, jakým způsobem je možné queueing otestovat v praxi. Nejdříve vytvoříme producenta zpráv, který po svém spuštění vytvoří a pošle přesně sto zpráv s tématem nastaveným na „test“. Zdrojový kód tohoto producenta vypadá následovně:
package main import ( "fmt" nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() println("Connected") econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() channel := make(chan string) econn.BindSendChan(Subject, channel) println("Channel created") // poslat 100 zprav for i := 1; i < 100; i++ { message := fmt.Sprintf("Hello World #%d", i) channel <- message conn.Flush() } println("All messages sent") }
V první implementaci konzumenta zpráv se použije metoda BindRecvQueueChan určená pro zaregistrování konzumenta k zadanému tématu společně se zařazením konzumenta do fronty nazvané „q1“:
const Queue = "q1" ... ... ... channel := make(chan string) econn.BindRecvQueueChan(Subject, Queue, channel)
Úplný zdrojový kód první varianty konzumenta zpráv:
package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" const Queue = "q1" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() channel := make(chan string) econn.BindRecvQueueChan(Subject, Queue, channel) println("Channel created") for i := 1; i < 20; i++ { println(<-channel) } }
Druhá implementace je prakticky totožná, ovšem používá odlišné jméno fronty:
const Queue = "X" ... ... ... channel := make(chan string) econn.BindRecvQueueChan(Subject, Queue, channel)
Úplný zdrojový kód druhé varianty konzumenta zpráv:
package main import ( nats "github.com/nats-io/go-nats" "log" ) const Subject = "test1" const Queue = "X" func main() { conn, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatal(err) } defer conn.Close() econn, err2 := nats.NewEncodedConn(conn, nats.DEFAULT_ENCODER) if err2 != nil { log.Fatal(err2) } defer econn.Close() channel := make(chan string) econn.BindRecvQueueChan(Subject, Queue, channel) println("Channel created") for i := 1; i < 20; i++ { println(<-channel) } }
16. Chování systému po připojení pětice příjemců zpráv
Chování systému se dvěma frontami konzumentů zpráv si můžeme relativně snadno otestovat. Použijeme k tomu následující skript, který spustí pět konzumentů zařazených do fronty „q1“ a jednoho konzumenta zařazeného do fronty „X“:
for i in `seq 1 5` do nohup go run subscriber.go > subscriber${i}.log & done nohup go run subscriberX.go > subscriberX.log &
V jiném terminálu spustíme producenta 100 zpráv:
$ go run producent.go
Měla by vzniknout šestice souborů s informacemi o tom, které zprávy jaký konzument přijal. Pořadí zpráv sice nemusí být přesně zachováno, ovšem každý konzument by měl přijmout přesně dvacet zpráv.
Konzument #1 z fronty „q1“:
Channel created Hello World #9 Hello World #19 Hello World #20 Hello World #34 Hello World #37 Hello World #40 Hello World #47 Hello World #64 Hello World #66 Hello World #73 Hello World #83 Hello World #93 Hello World #94 Hello World #95 Hello World #96 Hello World #97
Konzument #2 z fronty „q1“:
Channel created Hello World #1 Hello World #3 Hello World #6 Hello World #10 Hello World #15 Hello World #18 Hello World #21 Hello World #24 Hello World #25 Hello World #43 Hello World #45 Hello World #46 Hello World #48 Hello World #51 Hello World #53 Hello World #54 Hello World #65 Hello World #68 Hello World #69
Konzument #3 z fronty „q1“:
Channel created Hello World #7 Hello World #13 Hello World #16 Hello World #22 Hello World #42 Hello World #50 Hello World #52 Hello World #58 Hello World #67 Hello World #76 Hello World #77 Hello World #78 Hello World #82 Hello World #84 Hello World #85 Hello World #88 Hello World #89 Hello World #91 Hello World #98
Konzument #4 z fronty „q1“:
Channel created Hello World #2 Hello World #11 Hello World #23 Hello World #27 Hello World #28 Hello World #29 Hello World #30 Hello World #31 Hello World #35 Hello World #36 Hello World #39 Hello World #49 Hello World #55 Hello World #57 Hello World #59 Hello World #63 Hello World #71 Hello World #74 Hello World #79
Konzument #5 z fronty „q1“:
Channel created Hello World #4 Hello World #5 Hello World #8 Hello World #12 Hello World #14 Hello World #17 Hello World #26 Hello World #32 Hello World #33 Hello World #38 Hello World #41 Hello World #44 Hello World #56 Hello World #60 Hello World #61 Hello World #62 Hello World #70 Hello World #72 Hello World #75
Jediný konzument z fronty „X“:
Channel created Hello World #1 Hello World #2 Hello World #3 Hello World #4 Hello World #5 Hello World #6 Hello World #7 Hello World #8 Hello World #9 Hello World #10 Hello World #11 Hello World #12 Hello World #13 Hello World #14 Hello World #15 Hello World #16 Hello World #17 Hello World #18 Hello World #19
17. Obsah následující části seriálu
V navazující části tohoto seriálu si ukážeme některé další možnosti, které systém NATS vývojářům složitějších aplikací nabízí. Popíšeme si například komunikační strategii req-rep (request-response), která je řešena s využitím takzvaných inboxů a kterou lze v případě potřeby zkombinovat s výše popsaným queueingem (což není zdaleka triviální). Ovšem důležitější bude popis systému pojmenovaného NATS Streaming, protože se bude jednat o první podobný systém pro streamování zpráv, s nímž se v tomto seriálu podrobněji seznámíme (ze známějších systémů jmenujme například projekt Apache Kafka).
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
19. Odkazy na předchozí části seriálu
V této kapitole jsou uvedeny odkazy na všech dvanáct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/ - Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/ - Apache ActiveMQ – další systém implementující message brokera
https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/ - Použití Apache ActiveMQ s protokolem STOMP
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/ - Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/ - Komunikace s message brokery z programovacího jazyka Go
https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/
20. Odkazy na Internetu
- NATS
https://nats.io/about/ - NATS Streaming Concepts
https://nats.io/documentation/streaming/nats-streaming-intro/ - NATS Streaming Server
https://nats.io/download/nats-io/nats-streaming-server/ - NATS Introduction
https://nats.io/documentation/ - NATS Client Protocol
https://nats.io/documentation/internals/nats-protocol/ - NATS Messaging (Wikipedia)
https://en.wikipedia.org/wiki/NATS_Messaging - Stránka Apache Software Foundation
http://www.apache.org/ - Informace o portu 5672
http://www.tcp-udp-ports.com/port-5672.htm - Třída MessagingHandler knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._handlers.MessagingHandler-class.html - Třída Event knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._events.Event-class.html - package stomp (Go)
https://godoc.org/github.com/go-stomp/stomp - Go language library for STOMP protocol
https://github.com/go-stomp/stomp - python-qpid-proton 0.26.0 na PyPi
https://pypi.org/project/python-qpid-proton/ - Qpid Proton
http://qpid.apache.org/proton/ - Using the AMQ Python Client
https://access.redhat.com/documentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/ - Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html