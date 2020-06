11. Implementace příjemce zpráv

12. Úplný zdrojový kód příkladu se zdrojem i příjemcem zpráv

13. Chování systému ve chvíli, kdy je připojeno větší množství příjemců

14. Příjemce používající kanál jazyka Go

15. Využití pipeline nabízené Redisem

16. Zdrojový kód příkladu používajícího pipeline

17. Skripty naprogramované v jazyku Lua a spouštěné na serveru

18. Příklad jednoduchého skriptu spouštěného na straně serveru

19. Repositář s demonstračními příklady

20. Odkazy na Internetu

1. Posílání zpráv v aplikacích založených na mikroslužbách

„It's hard to get messaging right“

Programovací jazyk Go se používá v několika oblastech a jednou z těchto oblastí jsou i mikroslužby. Ty spolu musí vhodným způsobem komunikovat – a to s využitím nějakých zpráv, které typicky nebývají posílané přímo mezi jednotlivými mikroslužbami, ale využívá se nějaký centralizovaný či decentralizovaný message broker. Namísto specializovaného message brokeru si však mnohdy vystačíme i s jednodušším a přitom dobře škálovatelným řešením založeným na Redisu. A právě z tohoto důvodu je kombinace Go+Redis velmi často využívaná v praxi. V úvodních kapitolách si připomeneme, jaké dva typy komunikace se používá, ovšem s tím, že třetí typ (QUERY) není zmíněn, protože pro něj nepotřebujeme použít Redis. Ovšem pro komunikace typu PUSH-PULL a PUBLISH-SUBSCRIBE je Redis v mnoha případech poměrně dobrým řešením.

Obrázek 1: Příklad aplikace používající architekturu kappa. Teoreticky je možné i zde použít Redis, ovšem častěji se setkáme se specializovaným řešením založeným na projektu Apache Kafka (méně často s NATS).

V souvislosti se zprávami i s celým návrhem architektury aplikace založené na mikroslužbách se často setkáme s termíny CQS a CQRS. Termín CQS znamená Command–query separation a CQRS je zkratkou vzniklou z Command-query responsibility segregation (nebo separation). CQR se používá při vývoji a znamená takový návrh aplikace (typicky založené na OOP, ale není to nutné), kdy každá funkce či metoda provádí buď nějaký příkaz (command) nebo slouží k získání dat (query); žádná z metod by neměla provádět obě akce. Zatímco se metodika CQR typicky aplikuje na jednotlivé třídy, tedy na relativně malou a izolovanou část aplikace, je druhá z metodik CQRS aplikována na celou architekturu služeb a mikroslužeb, přičemž command je ta část modelu aplikace, která slouží ke změně stavu a query druhá část modelu používaná pro agregaci dat.

2. Zpráva požadující provedení úlohy: COMMAND

„Microservice – “small autonomous services modelled around business domain that work together““

Sam Newman, jeden z původních autorů myšlenky mikroslužeb

Zprávy typu COMMAND obecně slouží ke změně stavu aplikace a obecně tedy provádí nějaký side-effect, typicky modifikaci dat (kromě odpovědi se změní i nějaká další část aplikace, například se zapíše záznam do databáze atd.). Můžeme se ale setkat i s takovými zprávami typu COMMAND, které stav aplikace nezmění. Poměrně dobrým příkladem může být žádost o poslání e-mailu uživateli – zde se tedy mění spíše stav okolního systému (mailboxu příjemce).

Zprávy typu COMMAND jsou typické tím, že většinou existuje pouze jediná komponenta, která může daný příkaz provést. To, o kterou komponentu ve funkci příjemce zprávy se konkrétně jedná, však nemusí zdrojová komponenta (tj. komponenta, která příkaz posílá) řešit, resp. přesněji řečeno by to ani ve správně navržené aplikaci neměla řešit, protože by se jednalo o zbytečně těsné svázání obou komponent (mikroslužeb).

Většinou taktéž požadujeme, aby přijímající komponenta poslala odpověď na zprávu typu COMMAND. V naprosté většině případů se jedná o jednoduchou stavovou informaci typu OK/Not OK popř. ACK/NACK, jen výjimečně s dalšími daty (například s ID vytvořeného požadavku); v případě, že by v odpovědi byla další data, jednalo by se pravděpodobně o porušení CQRS. Tento typ odpovědi může být poslán synchronně či asynchronně.

Existuje několik způsobů směrování (routing) používaných pro zprávy typu COMMAND. Typicky je možné rozhodnout přímo na základě příslušného příkazu, která komponenta má příkaz zpracovat (příklady příkazů: create_new_user, send_notification_email atd.). V tom nejjednodušším případě se příkaz ihned přepošle cílové komponentě, ovšem většinou se setkáme s využitím front zpráv (message queue), které slouží jak pro zajištění persistence zpráv v případě, že přijímající komponenta není spuštěna či pokud je přetížena, tak i případně pro load balancin. Použitou komunikační strategií je tedy strategie PUSH-PULL.

Obrázek 2: Komunikační strategie typu PUSH-PULL bez použití fronty.

3. Zpráva poslaná ve chvíli, kdy došlo k nějaké události – EVENT

Druhý typ zpráv nazvaný EVENT je prakticky libovolnými komponentami posílán ve chvíli, kdy dojde k určité události, o nichž chce komponenta informovat okolní systém. Může se jednat o prakticky libovolnou událost (tedy nikoli pouze o událost na GUI). Příkladem může být detekce změny některých dat, zjištění, že došlo k překročení nějakého časového limitu, informace o přetížení určitého uzlu v clusteru, informace o překročení nastaveného limitu databáze, opakované pokusy o přihlášení atd. Můžeme sem řadit i komponenty/mikroslužby typu cron, které slouží právě k posílání informací o naplánovaných událostech, ať již periodických (zaslání e-mailu se žádostí o změnu hesla, pravidelná kontrola jiné služby přes její API), tak i neperiodických (ad-hoc události). Typicky komponenta pouze oznámí, že došlo k nějaké události a neočekává žádné odpovědi. Z implementačního hlediska je posílání a zpracování těchto zpráv nejjednodušší.

Zprávy typu EVENT se většinou směrují odlišným způsobem, než zprávy typu COMMAND. Je tomu tak z toho důvodu, že na události může reagovat obecně větší množství komponent, nikoli jediný typ komponenty. Z tohoto důvodu se používá komunikační strategie PUBLISH-SUBSCRIBE neboli PUB-SUB, která je podporována většinou message brokerů. Ovšem můžeme se setkat i s dalšími konfiguracemi, například s takzvanými soupeřícími konzumenty (competing consumers) nebo s balanced consumers.

Obrázek 3: Komunikační strategie typu PUBLISH-SUBSCRIBE používaná při vzniku událostí.

Poznámka: stále častěji se ovšem setkáme s tím, že se namísto klasické komunikační strategie PUBLISH-SUBSCRIBE použije technologie streamingu událostí implementovaná v již zmíněném NATS Streaming Serveru nebo v Apache Kafka. V takovém případě se záznam o události neztratí (pokud ho komponenty nestihnou přijmout), ale je uložen do rostoucího logu s neměnitelnými záznamy předchozích událostí. Toto řešení je pro některé typy problémů takřka ideální, ovšem u některých aplikací můžeme vidět i opačný problém – nasazování streamingu i tam, kde by neměl být použit resp. tam, kde jeho použití pouze přináší větší nároky na systémové zdroje.

4. Implementace komunikačních strategií PUSH-PULL a PUBLISH-SUBSCRIBE

Obě výše zmíněné komunikační strategie lze v Go (ale samozřejmě i v dalších jazycích) relativně snadno implementovat s využitím Redisu, přičemž se využijí jak jeho základní podporované datové typy (zejména seznamy), tak i možnosti poskytované knihovnou pro jazyk Go, která nabízí realizaci komunikační strategie PubSub (přesněji PUBLISH-SUBSCRIBE) takovým způsobem, že lze snadno využít kanály. Dále uvedené demonstrační příklady jsou vždy realizovány jediným zdrojovým kódem, v němž je naprogramován jak producent (producer) či zdroj (publisher) zpráv, tak i jejich konzument (consumer) či příjemce (subscriber). V praxi jsou však tyto role většinou odděleny, což lze v případě dále popsaných demonstračních příkladů realizovat poměrně snadným způsobem.

5. Jednoduchý message broker implementující strategii PUSH-PULL

Již v závěru předchozího článku jsme si ukázali funkce naprogramované v jazyku Go, které je možné využít při implementaci jednoduchého message brokera s frontou, který implementuje komunikační strategii PUSH-PULL: producenti zpráv používají operaci typu PUSH, konzumenti pak typu PULL. Jak producenta tak i konzumenta zpráv lze realizovat jediným programem, ovšem (pochopitelně) s tím, že producent poběží v jiné gorutině než konzument:

// spustíme producenta zpráv go producer(client, context, queueName, 0, 10) timeout, err := time.ParseDuration("10s") if err != nil { panic(err) } // nyní již můžeme spustit konzumenta zpráv consumer(client, context, queueName, timeout)

Takový systém bude funkční a jeho úplný programový kód je zobrazen pod tímto odstavcem:

package main import ( "context" "fmt" "time" "github.com/go-redis/redis/v8" ) // adresa určující službu Redisu, která se má použít const redisAddress = "localhost:6379" // jméno hodnoty použité pro implementaci jednoduché fronty const queueName = "fronta" // printQueueLength vypíše aktuální délku fronty, samotná délka je přitom // získána jiným způsobem (vložením prvku, použitím LLen atd.) func printQueueLength(length int64) { fmt.Printf("Queue length after enqueuing is %d

", length) } // mustEnqueue zajistí vložení prvku do fronty popř. pád aplikace v případě, // kdy vložení není možné provést (Redis je odpojen atd.) func mustEnqueueInteger(client *redis.Client, context context.Context, key string, value int) { fmt.Printf("Enqueuing %d into queue named '%s'

", value, key) // přidání prvku do seznamu length, err := client.LPush(context, key, value).Result() if err != nil { panic(err) } printQueueLength(length) } func producer(client *redis.Client, context context.Context, key string, from int, to int) { // postupné vložení prvků do fronty for i := from; i < to; i++ { mustEnqueueInteger(client, context, queueName, i) time.Sleep(1 * time.Second) } } func consumer(client *redis.Client, context context.Context, key string, timeout time.Duration) { // přečtení všech hodnot z fronty for { // pokus o přečtení hodnoty z fronty keyValue, err := client.BRPop(context, timeout, queueName).Result() // vyhodnocení předchozí operace switch { case err == redis.Nil: fmt.Println("no value found") return case err != nil: panic(err) default: key := keyValue[0] value := keyValue[1] fmt.Printf( "Value dequed from queue named '%s': '%s'

", key, value) } length := client.LLen(context, queueName).Val() printQueueLength(length) fmt.Println() } } // vstupní bod do demonstračního příkladu func main() { // vytvoření nového klienta s předáním konfiguračních parametrů client := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: "", // no password set DB: 0, // use default DB }) // neměli bychom zapomenout na ukončení práce s klientem defer func() { err := client.Close() if err != nil { panic(err) } }() // získáme kontext context := client.Context() // pokus o klasický handshake typu PING-PONG _, err := client.Ping(context).Result() if err != nil { panic(err) } // smazání seznamu, pokud existoval client.Del(context, queueName) // spustíme producenta zpráv go producer(client, context, queueName, 0, 10) timeout, err := time.ParseDuration("10s") if err != nil { panic(err) } // nyní již můžeme spustit konzumenta zpráv consumer(client, context, queueName, timeout) }

6. Otestování činnosti message brokera

Po překladu zdrojového kódu a spuštění procesu by se měly objevit zprávy, které ukazují, že konzument začíná zprávy číst ihned poté, co jsou připraveny producentem:

Enqueuing 0 into queue named 'fronta' Value dequed from queue named 'fronta': '0' Queue length after enqueuing is 1 Queue length after enqueuing is 0 Enqueuing 1 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '1' Queue length after enqueuing is 0 Enqueuing 2 into queue named 'fronta' Value dequed from queue named 'fronta': '2' Queue length after enqueuing is 1 Queue length after enqueuing is 0 Enqueuing 3 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '3' Queue length after enqueuing is 0 Enqueuing 4 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '4' Queue length after enqueuing is 0 Enqueuing 5 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '5' Queue length after enqueuing is 0 Enqueuing 6 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '6' Queue length after enqueuing is 0 Enqueuing 7 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '7' Queue length after enqueuing is 0 Enqueuing 8 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '8' Queue length after enqueuing is 0 Enqueuing 9 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '9' Queue length after enqueuing is 0 no value found

7. Chování brokera při pozdějším spuštění konzumenta zpráv

Taktéž si můžeme otestovat chování brokera ve chvíli, kdy je konzument zpráv spuštěn později (což je ostatně v praxi velmi častý příklad). Upravíme tedy konec předchozího demonstračního příkladu vložením time.Sleep, aby konzument začal zpracovávat zprávy přibližně až po sedmi sekundách a nikoli okamžitě:

// spustíme producenta zpráv go producer(client, context, queueName, 0, 10) timeout, err := time.ParseDuration("10s") if err != nil { panic(err) } // konzument (jeho start) bude zpomalen time.Sleep(7 * time.Second) // nyní již můžeme spustit konzumenta zpráv consumer(client, context, queueName, timeout)

Výsledek nyní bude podle očekávání vypadat odlišně, protože se nejdříve začnou zprávy hromadit ve frontě, poté je konzument rychle zpracuje a následně bude čekat na zbývající tři zprávy:

Enqueuing 0 into queue named 'fronta' Queue length after enqueuing is 1 Enqueuing 1 into queue named 'fronta' Queue length after enqueuing is 2 Enqueuing 2 into queue named 'fronta' Queue length after enqueuing is 3 Enqueuing 3 into queue named 'fronta' Queue length after enqueuing is 4 Enqueuing 4 into queue named 'fronta' Queue length after enqueuing is 5 Enqueuing 5 into queue named 'fronta' Queue length after enqueuing is 6 Enqueuing 6 into queue named 'fronta' Queue length after enqueuing is 7 Value dequed from queue named 'fronta': '0' Queue length after enqueuing is 6 Value dequed from queue named 'fronta': '1' Queue length after enqueuing is 5 Value dequed from queue named 'fronta': '2' Queue length after enqueuing is 4 Value dequed from queue named 'fronta': '3' Queue length after enqueuing is 3 Value dequed from queue named 'fronta': '4' Queue length after enqueuing is 2 Value dequed from queue named 'fronta': '5' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '6' Queue length after enqueuing is 0 Enqueuing 7 into queue named 'fronta' Value dequed from queue named 'fronta': '7' Queue length after enqueuing is 1 Queue length after enqueuing is 0 Enqueuing 8 into queue named 'fronta' Queue length after enqueuing is 1 Value dequed from queue named 'fronta': '8' Queue length after enqueuing is 0 Enqueuing 9 into queue named 'fronta' Value dequed from queue named 'fronta': '9' Queue length after enqueuing is 1 Queue length after enqueuing is 0 no value found

8. Vylepšení příkladu: použití session se všemi informacemi o připojení k Redisu

Při vývoji aplikací v Go nebývá zvykem předávat do funkcí či metod příliš velké množství parametrů (ostatně pravidla formátování jsou v tomto ohledu poněkud nepříjemná, takže zápis parametrů pod sebe je nepřehledný), takže se namísto toho příbuzné parametry předávají v jediné datové struktuře popř. v odkazu na strukturu. Toto řešení můžeme použít i při úpravě předchozího příkladu vytvořením struktury nesoucí informace o „sezení“ Redisu, tedy s odkazem na klienta a současně i na kontext:

type redisSession struct { client *redis.Client context context.Context }

Zdrojový kód demonstračního příkladu po úpravě bude vypadat následovně:

package main import ( "context" "fmt" "time" "github.com/go-redis/redis/v8" ) // adresa určující službu Redisu, která se má použít const redisAddress = "localhost:6379" // jméno hodnoty použité pro implementaci jednoduché fronty const queueName = "fronta" type redisSession struct { client *redis.Client context context.Context } // printQueueLength vypíše aktuální délku fronty, samotná délka je přitom // získána jiným způsobem (vložením prvku, použitím LLen atd.) func printQueueLength(length int64) { fmt.Printf("Queue length after enqueuing is %d

", length) } // mustEnqueue zajistí vložení prvku do fronty popř. pád aplikace v případě, // kdy vložení není možné provést (Redis je odpojen atd.) func mustEnqueueInteger(session redisSession, key string, value int) { fmt.Printf("Enqueuing %d into queue named '%s'

", value, key) // přidání prvku do seznamu length, err := session.client.LPush( session.context, key, value).Result() if err != nil { panic(err) } printQueueLength(length) } func producer(session redisSession, key string, from int, to int) { // postupné vložení prvků do fronty for i := from; i < to; i++ { mustEnqueueInteger(session, queueName, i) time.Sleep(1 * time.Second) } } func consumer(session redisSession, key string, timeout time.Duration) { // přečtení všech hodnot z fronty for { // pokus o přečtení hodnoty z fronty keyValue, err := session.client.BRPop( session.context, timeout, queueName).Result() // vyhodnocení předchozí operace switch { case err == redis.Nil: fmt.Println("no value found") return case err != nil: panic(err) default: key := keyValue[0] value := keyValue[1] fmt.Printf( "Value dequed from queue named '%s': '%s'

", key, value) } length := session.client.LLen(session.context, queueName).Val() printQueueLength(length) fmt.Println() } } // vstupní bod do demonstračního příkladu func main() { // vytvoření nového klienta s předáním konfiguračních parametrů client := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: "", // no password set DB: 0, // use default DB }) // neměli bychom zapomenout na ukončení práce s klientem defer func() { err := client.Close() if err != nil { panic(err) } }() // získáme kontext context := client.Context() session := redisSession{ client: client, context: context, } // pokus o klasický handshake typu PING-PONG _, err := client.Ping(context).Result() if err != nil { panic(err) } // smazání seznamu, pokud existoval client.Del(context, queueName) // spustíme producenta zpráv go producer(session, queueName, 0, 10) // je možné pustit více producentů // go producer(session, queueName, 11, 20) timeout, err := time.ParseDuration("10s") if err != nil { panic(err) } // nyní již můžeme spustit konzumenta zpráv consumer(session, queueName, timeout) }

Poznámka: chování konzumenta ani producenta by se nemělo nijak změnit.

9. Komunikační strategie PUBLISH-SUBSCRIBE

Komunikační strategie PUBLISH-SUBSCRIBE se v několika ohledech odlišuje od strategie PUSH-PULL. Ve strategii PUSH-PULL jednu zprávu přijme a zpracuje pouze jeden příjemce, zatímco v PUBLISH-SUBSCRIBE to může být libovolné množství příjemců. Navíc se v PUSH-PULL používá fronta, takže pokud není žádný příjemce připojen, je zpráva (mezi)uložena do fronty. V PUBLISH-SUBSCRIBE je tomu jinak – zpráva je dodána aktuálně připojeným příjemcům a pokud takoví neexistují, není vůbec předána. Zprávy jsou navíc posílány do takzvaného kanálu, který je obdobou fronty ve strategii PUSH-PULL. Redis a knihovna s rozhraním pro Go navíc umožňují, aby příjemce byl připojen k více kanálům, resp. podle správné terminologie aby byl odběratelem zpráv z většího množství kanálů. Tato komunikační strategie je tedy určena pro oddělení zdroje zpráv od příjemce, což je v mnoha případech velmi dobré řešení.

10. Implementace zdroje zpráv

Vzhledem k tomu, že samotný Redis podporuje operace PUBLISH i SUBSCRIBE, je implementace zdroje zpráv, tedy procesu nebo gorutiny, která zprávy posílá, jednoduchá – postačuje použít metodu Publish, které se kromě kontextu musí předat i jméno kanálu (řetězec) a vlastní zpráva (hodnota prakticky jakéhokoli datového typu). Pro otestování, zda se publikování zprávy podařilo, postačuje zavolat metodu Err objektu/struktury IntCmd. Implementace je tedy jen několikařádková:

// zdroj zpráv func publisher(session redisSession, channel string, from int, to int) { for i := from; i < to; i++ { err := session.client.Publish(session.context, channel, i).Err() if err != nil { panic(err) } time.Sleep(1 * time.Second) } }

Poznámka: povšimněte si, že se jedná o neblokující operaci, tudíž se u ní ani neuvádí timeout.

11. Implementace příjemce zpráv

Příjemce zpráv neboli subscriber je podobně jednoduchý, neboť zaregistrování a příjem zpráv je opět plně podporováno samotným Redisem. Nejdříve je nutné se zaregistrovat k příjmu zpráv z určitého kanálu (metoda Subscribe) a následně zprávy přijímat, například v nekonečné smyčce, metodou ReceiveMessage. Vrátí se dvě hodnoty – samotná zpráva (data) a informace o chybě:

// příjemce zpráv func subscriber(session redisSession, channel string) { pubsub := session.client.Subscribe(session.context, channel) for { message, err := pubsub.ReceiveMessage(session.context) if err != nil { panic(err) } fmt.Printf("Channel: %s Message: '%s'

", message.Channel, message.Payload) } }

Poznámka: tento způsob má jednu zásadní nevýhodu – umožňuje příjem zpráv z jednoho kanálu. Ovšem v dalších kapitolách si ukážeme řešení použitelné pro příjem zpráv z více kanálů.

12. Úplný zdrojový kód příkladu se zdrojem i příjemcem zpráv

Pro úplnost si ukažme úplný zdrojový kód příkladu, který používá zdroje i příjemce zpráv popsané v kapitole 10 a taktéž v kapitole 11. Příjemce pochopitelně musí běžet v jiném vláknu, než zdroj zpráv:

package main import ( "context" "fmt" "time" "github.com/go-redis/redis/v8" ) // adresa určující službu Redisu, která se má použít const redisAddress = "localhost:6379" // jméno kanálu const channelName = "c1" // struktura obsahující "session" type redisSession struct { client *redis.Client context context.Context } // zdroj zpráv func publisher(session redisSession, channel string, from int, to int) { for i := from; i < to; i++ { err := session.client.Publish(session.context, channel, i).Err() if err != nil { panic(err) } time.Sleep(1 * time.Second) } } // příjemce zpráv func subscriber(session redisSession, channel string) { pubsub := session.client.Subscribe(session.context, channel) for { message, err := pubsub.ReceiveMessage(session.context) if err != nil { panic(err) } fmt.Printf("Channel: %s Message: '%s'

", message.Channel, message.Payload) } } // vstupní bod do demonstračního příkladu func main() { // vytvoření nového klienta s předáním konfiguračních parametrů client := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: "", // no password set DB: 0, // use default DB }) // neměli bychom zapomenout na ukončení práce s klientem defer func() { err := client.Close() if err != nil { panic(err) } }() // získáme kontext context := client.Context() // vytvoříme session session := redisSession{ client: client, context: context, } // pokus o klasický handshake typu PING-PONG _, err := client.Ping(context).Result() if err != nil { panic(err) } // smazání kanálu, pokud existoval client.Del(context, channelName) // spustíme zdroj zpráv go publisher(session, channelName, 0, 10) // nyní můžeme spustit příjemce zpráv subscriber(session, channelName) }

Poznámka: celý zdrojový kód tohoto příkladu je také dostupný na adrese https://github.com/tisnik/go-root/blob/master/article 66 /02_pub_sub/02_pub_­sub.go

13. Chování systému ve chvíli, kdy je připojeno větší množství příjemců

Příklad si můžeme nepatrně upravit takovým způsobem, aby se spustilo větší množství příjemců zpráv, pochopitelně každý ve své vlastní gorutině. Upravit ve skutečnosti postačuje jen posledních pět řádků funkce main, a to takto:

// nyní můžeme spustit několik příjemců zpráv go subscriber(session, channelName) go subscriber(session, channelName) go subscriber(session, channelName) // spustíme zdroj zpráv publisher(session, channelName, 0, 10)

V praxi to znamená, že na stejném kanálu bude naslouchat několik příjemců a každý z těchto příjemců získá stejné zprávy – ostatně právě proto se jedná o komunikační strategii typu PUBLISH-SUBSCRIBE. Zdroj zpráv (či více zdrojů) přitom nemá žádnou přímou vazbu na příjemce, takže je zajištěno oddělení (decoupling) těchto modulů. Po spuštění příkladu by se stejná zpráva (představovaná pro jednoduchost celým číslem) měla přeposlat všem třem příjemcům:

Channel: c1 Message: '1' Channel: c1 Message: '1' Channel: c1 Message: '1' Channel: c1 Message: '2' Channel: c1 Message: '2' Channel: c1 Message: '2' Channel: c1 Message: '3' Channel: c1 Message: '3' Channel: c1 Message: '3' Channel: c1 Message: '4' Channel: c1 Message: '4' Channel: c1 Message: '4' Channel: c1 Message: '5' Channel: c1 Message: '5' Channel: c1 Message: '5' Channel: c1 Message: '6' Channel: c1 Message: '6' Channel: c1 Message: '6' Channel: c1 Message: '7' Channel: c1 Message: '7' Channel: c1 Message: '7' Channel: c1 Message: '8' Channel: c1 Message: '8' Channel: c1 Message: '8' Channel: c1 Message: '9' Channel: c1 Message: '9' Channel: c1 Message: '9'

Upravený demonstrační příklad vypadá takto:

package main import ( "context" "fmt" "time" "github.com/go-redis/redis/v8" ) // adresa určující službu Redisu, která se má použít const redisAddress = "localhost:6379" // jméno kanálu const channelName = "c1" // struktura obsahující "session" type redisSession struct { client *redis.Client context context.Context } // zdroj zpráv func publisher(session redisSession, channel string, from int, to int) { for i := from; i < to; i++ { err := session.client.Publish(session.context, channel, i).Err() if err != nil { panic(err) } time.Sleep(1 * time.Second) } } // příjemce zpráv func subscriber(session redisSession, channel string) { pubsub := session.client.Subscribe(session.context, channel) for { message, err := pubsub.ReceiveMessage(session.context) if err != nil { panic(err) } fmt.Printf("Channel: %s Message: '%s'

", message.Channel, message.Payload) } } // vstupní bod do demonstračního příkladu func main() { // vytvoření nového klienta s předáním konfiguračních parametrů client := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: "", // no password set DB: 0, // use default DB }) // neměli bychom zapomenout na ukončení práce s klientem defer func() { err := client.Close() if err != nil { panic(err) } }() // získáme kontext context := client.Context() // vytvoříme session session := redisSession{ client: client, context: context, } // pokus o klasický handshake typu PING-PONG _, err := client.Ping(context).Result() if err != nil { panic(err) } // smazání kanálu, pokud existoval client.Del(context, channelName) // nyní můžeme spustit několik příjemců zpráv go subscriber(session, channelName) go subscriber(session, channelName) go subscriber(session, channelName) // spustíme zdroj zpráv publisher(session, channelName, 0, 10) }

14. Příjemce používající kanál jazyka Go

Předchozí implementace příjemce s využitím nekonečné smyčky, v níž se vždy čeká na další zprávu, má jeden nepříjemný důsledek – bylo by poměrně složité zařídit, aby se zprávy četly z většího množství kanálů Redisu, což je ovšem na druhou stranu poměrně běžný požadavek. Řešení tohoto problému existuje a spočívá ve využití kanálů programovacího jazyka Go (což je něco zcela jiného než pojmenovaný kanál Redisu). Příjemce lze upravit takto:

// příjemce zpráv func subscriber(session redisSession, channel string) { pubsub := session.client.Subscribe(session.context, channel) ch := pubsub.Channel() for message := range ch { fmt.Printf("Channel: %s Message: '%s'

", message.Channel, message.Payload) } }

Koncepce kanálů v jazyku Go je velmi mocná, protože nám umožňuje například čekat na zprávu, která může dojít do libovolného kanálu:

select { case <-ch1: fmt.Println("Data z kanálu 1") case <-ch2: fmt.Println("Data z kanálu 2") }

Tuto programovou konstrukci můžeme rozšířit o větev vyvolanou ve chvíli, kdy žádné zprávy nejsou v určitém časovém úseku přijaty:

select { case <-ch1: fmt.Println("Data z kanálu 1") case <-ch2: fmt.Println("Data z kanálu 2") case <-time.After(10 * time.Second): fmt.Println("Timeout!") }

Poznámka: v praxi to znamená, že se celé schéma poskládání mikroslužeb může stát komplikovanější, protože jedna mikroslužba může odebírat zprávy z většího množství kanálů, nebo naopak může zprávy produkovat do více kanálů.

Příklad s upravenou funkcí příjemce může vypadat následovně (ostatní kód se nijak nezměnil):

package main import ( "context" "fmt" "time" "github.com/go-redis/redis/v8" ) // adresa určující službu Redisu, která se má použít const redisAddress = "localhost:6379" // jméno kanálu const channelName = "c1" // struktura obsahující "session" type redisSession struct { client *redis.Client context context.Context } // zdroj zpráv func publisher(session redisSession, channel string, from int, to int) { for i := from; i < to; i++ { err := session.client.Publish(session.context, channel, i).Err() if err != nil { panic(err) } time.Sleep(1 * time.Second) } } // příjemce zpráv func subscriber(session redisSession, channel string) { pubsub := session.client.Subscribe(session.context, channel) ch := pubsub.Channel() for message := range ch { fmt.Printf("Channel: %s Message: '%s'

", message.Channel, message.Payload) } } // vstupní bod do demonstračního příkladu func main() { // vytvoření nového klienta s předáním konfiguračních parametrů client := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: "", // no password set DB: 0, // use default DB }) // neměli bychom zapomenout na ukončení práce s klientem defer func() { err := client.Close() if err != nil { panic(err) } }() // získáme kontext context := client.Context() // vytvoříme session session := redisSession{ client: client, context: context, } // pokus o klasický handshake typu PING-PONG _, err := client.Ping(context).Result() if err != nil { panic(err) } // smazání kanálu, pokud existoval client.Del(context, channelName) // nyní můžeme spustit několik příjemců zpráv go subscriber(session, channelName) go subscriber(session, channelName) go subscriber(session, channelName) // spustíme zdroj zpráv publisher(session, channelName, 0, 10) }

15. Využití pipeline nabízené Redisem

V Redisu je možné využít i takzvané pipeline, což jsou vlastně sekvence příkazů zpracované sekvenčně v takovém pořadí, v jakém jsou specifikovány. Nejedná se však o transakce a příkazy provedené v rámci pipeline ani nesplňují kritéria ACID. Samotné příkazy vkládané do pipeline se vytváří pomocí rozhraní Pipeliner, jehož předpis vypadá následovně:

type Pipeliner interface { StatefulCmdable Do(ctx context.Context, args ...interface{}) *Cmd Process(ctx context.Context, cmd Cmder) error Close() error Discard() error Exec(ctx context.Context) ([]Cmder, error) }

Příklad deklarace nové pipeline:

_, err = client.Pipelined(context, func(pipe redis.Pipeliner) error { ... ... ... return nil }) if err != nil { panic(err) }

V pipeline, která je reprezentována anonymní funkcí, lze pochopitelně používat i proměnné deklarované vně funkce a viditelné uvnitř jejího těla (Go podporuje uzávěry). Proto můžeme vytvořit pipeline se třemi příkazy, které ovlivní obsah vázaných proměnných counter1, counter2 a accumulator:

var counter1 *redis.IntCmd var counter2 *redis.IntCmd var accumulator *redis.FloatCmd _, err = client.Pipelined(context, func(pipe redis.Pipeliner) error { counter1 = pipe.Incr(context, "counter1") counter2 = pipe.Decr(context, "counter2") accumulator = pipe.IncrByFloat(context, "accumulator", 3.14) return nil }) if err != nil { panic(err) }

16. Zdrojový kód příkladu používajícího pipeline

V následujícím (dnes již předposledním) demonstračním příkladu je celá pipeline zavolána pětkrát za sebou, pokaždé s následným výpisem obsahu obou čítačů i akumulátoru:

package main import ( "fmt" "github.com/go-redis/redis/v8" ) // adresa určující službu Redisu, která se má použít const redisAddress = "localhost:6379" func main() { // vytvoření nového klienta s předáním konfiguračních parametrů client := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: "", // no password set DB: 0, // use default DB }) // neměli bychom zapomenout na ukončení práce s klientem defer func() { err := client.Close() if err != nil { panic(err) } }() // získáme kontext context := client.Context() // pokus o klasický handshake typu PING-PONG _, err := client.Ping(context).Result() if err != nil { panic(err) } // smazání hodnoty, pokud existovala client.Del(context, "counter1") client.Del(context, "counter2") client.Del(context, "accumulator") var counter1 *redis.IntCmd var counter2 *redis.IntCmd var accumulator *redis.FloatCmd for i := 0; i < 5; i++ { _, err = client.Pipelined(context, func(pipe redis.Pipeliner) error { counter1 = pipe.Incr(context, "counter1") counter2 = pipe.Decr(context, "counter2") accumulator = pipe.IncrByFloat(context, "accumulator", 3.14) return nil }) if err != nil { panic(err) } fmt.Printf("1st counter: %d

", counter1.Val()) fmt.Printf("2nd counter: %d

", counter2.Val()) fmt.Printf("accumulator: %f



", accumulator.Val()) } }

17. Skripty naprogramované v jazyku Lua a spouštěné na serveru

Jednou z poměrně velkých předností Redisu je fakt, že od verze 2.6 je možné dokonce využít i podporu pro přímé spouštění příkazů Redisu ze skriptů napsaných v programovacím jazyce Lua, který se skutečně velmi dobře hodí pro skriptování aplikací. Důležité přitom je, že skripty lze nahrát přímo do serveru Redisu a spustit je až ve chvíli, kdy jsou skutečně zapotřebí. Pro často prováděné operace tak může dojít k poměrně významnému ušetření přenosové kapacity, nehledě na to, že skripty psané v jazyku Lua mohou pro aplikace představovat rozhraní, které aplikace odstíní od nízkoúrovňového přístupu Redisu. Tímto způsobem by například bylo možné realizovat operace s frontou, prioritní frontou či specializovanější operace prováděné v transakci (lze zařídit i ACID).

18. Příklad jednoduchého skriptu spouštěného na straně serveru

Podívejme se nyní na jednoduchý skript naprogramovaný v jazyku Lua. Tento skript byl získán přímo z dokumentace k rozhraní k Redisu a vrací výsledek zvýšení nějakého čítače (hodnoty uložené pod klíčem) o zadaný offset:

if redis.call("GET", KEYS[1]) ~= false then return redis.call("INCRBY", KEYS[1], ARGV[1]) end return false

Skript načteme běžnými funkcemi ze standardní knihovny jazyka Go:

func mustLoadScriptSource(filename string) string { bytes, err := ioutil.ReadFile(filename) if err != nil { panic(err) } return string(bytes) } scriptSource := mustLoadScriptSource("script.lua")

Převedeme zdrojový kód na objekt reprezentující spustitelný skript:

script := redis.NewScript(scriptSource)

A použijeme ho:

n, err := script.Run(context, client, []string{counterKey}, 2).Result() fmt.Println(n, err)

Poznámka: povšimněte si způsobu předávání parametrů skriptu (pole řetězců, což odpovídá zpracování parametru).

Úplný zdrojový kód takto vytvořeného demonstračního příkladu vypadá následovně:

package main import ( "fmt" "io/ioutil" "github.com/go-redis/redis/v8" ) // adresa určující službu Redisu, která se má použít const redisAddress = "localhost:6379" const counterKey = "counter2" func mustLoadScriptSource(filename string) string { bytes, err := ioutil.ReadFile(filename) if err != nil { panic(err) } return string(bytes) } func main() { // vytvoření nového klienta s předáním konfiguračních parametrů client := redis.NewClient(&redis.Options{ Addr: redisAddress, Password: "", // no password set DB: 0, // use default DB }) // neměli bychom zapomenout na ukončení práce s klientem defer func() { err := client.Close() if err != nil { panic(err) } }() // získáme kontext context := client.Context() // pokus o klasický handshake typu PING-PONG _, err := client.Ping(context).Result() if err != nil { panic(err) } scriptSource := mustLoadScriptSource("script.lua") fmt.Printf("Loaded script:

%s



", scriptSource) script := redis.NewScript(scriptSource) n, err := script.Run(context, client, []string{counterKey}, 2).Result() fmt.Println(n, err) err = client.Set(context, counterKey, "40", 0).Err() if err != nil { panic(err) } n, err = script.Run(context, client, []string{counterKey}, 2).Result() fmt.Println(n, err) }

19. Repositář s demonstračními příklady

Zdrojové kódy všech dnes použitých demonstračních příkladů byly uloženy do nového Git repositáře, který je dostupný na adrese https://github.com/tisnik/go-root (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má přibližně stovku kilobajtů), můžete namísto toho použít odkazy na jednotlivé demonstrační příklady, které naleznete v následující tabulce:

Poznámka: jednotlivé zdrojové kódy jsou umístěny ve vlastních adresářích, protože musíme využít systém modulů programovacího jazyka Go

20. Odkazy na Internetu