Hlavní navigace

NSQ – systém pro doručování zpráv bez centrálního message brokera (dokončení)

Pavel Tišnovský

Ve druhém článku o nástroji NSQ si ukážeme některé možné konfigurace používané v praxi. Zaměříme se i na ty, které do značné míry vylučují existenci SPOF, tedy jednoho uzlu (počítače, služby, procesu).

Doba čtení: 39 minut

Sdílet

11. Paralelně a nezávisle na sobě běžící služby nsqd

12. Chování dvou nezávislých služeb nsqd

13. Konzument zpráv připojený k více službám nsqd

14. Chování dvou nezávislých služeb nsqd s konzumentem připojeným k oběma službám

15. Využití adresářové služby nsqlookupd

16. Úprava klienta (konzumenta) pro použití nsqlookupd

17. Otestování chování klienta

18. Konzument současně přijímající zprávy z více služeb nsqd

19. Repositář s demonstračními příklady

20. Odkazy na Internetu

1. NSQ – systém pro doručování zpráv bez centrálního message brokera (dokončení)

V prvním článku o nástroji NSQ jsme se seznámili se základními vlastnostmi tohoto systému pro doručování zpráv vyvinutého v programovacím jazyce Go. Připomeňme si, že tento systém je založen na myšlence decentralizovaného doručování zpráv, což znamená, že se nejedná o klasického message brokera, ale o distribuovaný systém s větším množstvím uzlů, k nimž se mohou připojit jak producenti zpráv, tak i jejich konzumenti (příjemci zpráv). Celý systém je přitom navržen takovým způsobem, aby se (pochopitelně při správné konfiguraci a pokud je to vůbec vyžadováno) výpadek nějakého uzlu neprojevil na pádu či nedostupnosti celého systému. Na druhou stranu je však možné, že jedna zpráva bude nějakým příjemcem přijata vícekrát, což je daň, kterou musíme zaplatit za decentralizovanou architekturu (jinými slovy to znamená, že by příjemci měli být idempotentní, což si někdy vyžádá nutnost použití databáze či jiného datového úložiště s „pamětí“ již zpracovaných zpráv).

Celý systém NSQ je složen z několika typů uzlů, přičemž jednotlivé uzly mohou být provozovány na různých počítačích, geograficky distribuovány atd. Jedná se o následující typy uzlů:

  1. nsqlookupd – jedná se o takzvanou directory service, tj. o službu, do které se registrují všechny ostatní uzly a která tak má přehled o umístění (adresa+číslo portu) i o stavu jednotlivých uzlů. Konzumenti se typicky nejprve připojují právě k nsqlookupd, aby zjistili, které brokery jsou schopny jim dodávat zprávy požadovaných témat (topic). Existuje ovšem možnost nsqlookupd vůbec nevyužívat a připojovat se přímo ke službě nsqd.
  2. nsqd – tyto uzly, jichž může běžet libovolné množství, zajišťují vlastní příjem zpráv, jejich ukládání do front a následné doručení konzumentům. Zprávy jsou perzistentní, tj. ve chvíli, kdy je uzel nsqd zastavován, jsou uloženy do souboru a po znovuspuštění jsou ze souboru načteny do paměti a popř. zaslány konzumentům.
  3. nsqadmin – tato služba poskytuje (webové) uživatelské rozhraní, ze kterého je možné sledovat činnost celého systému. Samotné sledování je primárně založeno na komunikaci s prvním typem uzlu – nsqlookupd a sekundárně s uzly nsqd.

Obrázek 1: Klasický message broker využívaný dalšími systémy.

Připomeňme si navíc, že zprávy se do NSQ posílají s nastaveným tématem (topic). Výběr (či možná lépe řečeno odběr) zpráv je nepatrně složitější, protože je nutné specifikovat jak téma odebíraných zpráv, tak i takzvaný kanál (channel), ze kterého se zprávy vybírají. Pokud pro nějaké schéma existuje větší množství kanálů, bude zpráva přeposlána (distribuována) do všech kanálů s tímto tématem. V případě, že se k jednomu kanálu připojí více příjemců, bude zpráva doručena jen jednomu z nich (buď náhodně nebo prvnímu volnému příjemci). Případný load-balancing se tedy provádí na úrovni kanálů, které můžeme považovat za obdobu front zpráv (message queue), jež známe z popisu klasických message brokerů (ve skutečnosti se ovšem fronta interně vytváří i na úrovni samotného tématu, tj. na vstupu do nsqd). Některé možné konfigurace systému NSQ si naznačíme na schématech v navazujících kapitolách.

Obrázek 2: Počet producentů ani konzumentů není v podstatě nijak omezen.

2. Zdrojové kódy producentů a konzumentů zpráv použitých v dnešním článku

Pro otestování základních funkcí různých konfigurací systému NSQ použijeme producenta zpráv naprogramovaného v jazyce Go. Tento producent bude s frekvencí přibližně jedné sekundy vytvářet zprávy, které budou posílány do lokálně běžící služby nsqd. Zprávy budou směrovány do tématu „test“. Zdrojový kód tohoto producenta naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nsq/producer3.go:

package main
 
import (
        "fmt"
        "github.com/nsqio/go-nsq"
        "log"
        "time"
)
 
const Address = "127.0.0.1:4150"
 
const Topic = "test"
 
func main() {
        config := nsq.NewConfig()
 
        producer, err := nsq.NewProducer(Address, config)
        if err != nil {
                log.Panic("Producer can't be constructed")
        }
        defer producer.Stop()
 
        i := 0
 
        for {
                message := fmt.Sprintf("Zprava z Go #%d", i)
                log.Print("Sending message: ", message)
                err = producer.Publish(Topic, []byte(message))
                if err != nil {
                        log.Panic("Could not connect")
                }
                i++
                time.Sleep(1 * time.Second)
        }
}

Taktéž pochopitelně použijeme konzumenta, který bude opět vyvinut v programovacím jazyce Go. První varianta konzumenta, kterou naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nsq/con­sumer4_chan_A.go, se bude připojovat na lokálně běžící službu nsqd (tedy na stejnou službu, jako konzument) a bude odebírat zprávy s tématem „test“ z kanálu, jehož jméno je z důvodu co největší jednoduchosti nastaveno na „A“ (ovšem může se jednat o libovolný řetězec):

package main
 
import (
        "github.com/nsqio/go-nsq"
        "log"
)
 
const Address = "127.0.0.1:4150"
 
const Topic = "test"
 
const Channel = "A"
 
func main() {
        config := nsq.NewConfig()
 
        consumer, err := nsq.NewConsumer(Topic, Channel, config)
        if err != nil {
                log.Panic("Consumer can't be constructed")
        }
 
        done := make(chan bool)
 
        consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
                log.Printf("Received a message: %s", string(message.Body))
                // done <- true
                return nil
        }))
 
        err = consumer.ConnectToNSQD(Address)
        if err != nil {
                log.Panic("Could not connect")
        }
 
        log.Println("Waiting for message")
        <-done
}

Druhý konzument se odlišuje jen v tom, že se připojuje na kanál B. Zdrojový kód tohoto konzumenta je uložen na adrese https://github.com/tisnik/message-queues-examples/blob/master/nsq/con­sumer4_chan_B.go:

package main
 
import (
        "github.com/nsqio/go-nsq"
        "log"
)
 
const Address = "127.0.0.1:4150"
 
const Topic = "test"
 
const Channel = "B"
 
func main() {
        config := nsq.NewConfig()
 
        consumer, err := nsq.NewConsumer(Topic, Channel, config)
        if err != nil {
                log.Panic("Consumer can't be constructed")
        }
 
        done := make(chan bool)
 
        consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
                log.Printf("Received a message: %s", string(message.Body))
                // done <- true
                return nil
        }))
 
        err = consumer.ConnectToNSQD(Address)
        if err != nil {
                log.Panic("Could not connect")
        }
 
        log.Println("Waiting for message")
        <-done
}

3. Nejjednodušší konfigurace s jediným nsqd, využití serializace zpráv na disk

V závislosti na tom, jaké chování a vlastnosti od systému doručování zpráv vyžadujeme, je možné NSQ nakonfigurovat několika možnými způsoby. Nejprve si pro úplnost uvedeme tu nejjednodušší možnou konfiguraci, v níž vystupuje pouze jediný uzel typu nsqd, k němuž se připojují jak ty služby, které zprávy vytváří, tak i konzumenti zpráv. V případě, že bude použito jen jediné téma (se jménem „“) a jediný kanál (se jménem „A“), bude celá architektura připomínat jednoduchého message brokera s jedinou frontou zpráv:

Obrázek 3: Nejjednodušší konfigurace systému NSQ s jediným nsqd.

Předchozí nastavení NSQ bylo skutečně triviální, ovšem v praxi se setkáme s mnoha dalšími požadavky na službu pro doručování zpráv, které je nutné nějakým způsobem implementovat. Poměrně častý a přitom jednoduše řešitelný je požadavek na to, aby se zprávy nehromadily v operační paměti alokované službou nsqd. Při startu nsqd je možné určit maximální kapacitu paměti použitou pro zapamatování zpráv. Tato kapacita (resp. přesněji řečeno mezní hodnota) se označuje termínem „high water mark“. Ve chvíli, kdy by mělo dojít k překročení této kapacity, budou zprávy uloženy na disk, takže se paměťové požadavky uzlu nsqd mohou udržet ve stanovených mezích. Z pohledu producentů a konzumentů zpráv ovšem nedojde k žádné podstatné změně:

Obrázek 4: Konfigurace systému NSQ s jediným nsqd a případnou serializací zpráv na disk.

Poznámka: na tomto místě si připomeňme, že pokud dojde k ukončení běhu služby nsqd, budou zprávy taktéž serializovány, a to do té chvíle, než se služba znovu nastartuje a dojde k jejich opětovné deserializaci. Může se ovšem stát, že některá zpráva bude konzumentům doručena vícekrát (nsqd se totiž může ukončit ve chvíli, kdy ještě nedojde k potvrzení doručení posledních zpráv odeslaných konzumentům, popř. potvrzení dojde, ale informace o něm se již nestačí serializovat).

4. Připojení většího množství producentů zpráv

Samozřejmě nejsme omezeni pouze na jednoho producenta zpráv (pro zvolený topic). Producentů může existovat libovolné množství a jedinou informaci, kterou musí znát, je adresa služby nsqd a jméno tématu:

Obrázek 5: Větší množství producentů zpráv připojených do jediného nsqd.

Chování takto nastaveného systému si můžeme snadno odzkoušet. Po spuštění služby nsqd (ta musí běžet) nastartujeme prvního producenta zpráv:

$ ./producer3
 
2019/11/11 23:32:12 Sending message: Zprava z Go #0
2019/11/11 23:32:12 INF    1 (127.0.0.1:4150) connecting to nsqd
2019/11/11 23:32:13 Sending message: Zprava z Go #1
2019/11/11 23:32:14 Sending message: Zprava z Go #2
2019/11/11 23:32:15 Sending message: Zprava z Go #3
2019/11/11 23:32:16 Sending message: Zprava z Go #4
2019/11/11 23:32:17 Sending message: Zprava z Go #5
2019/11/11 23:32:18 Sending message: Zprava z Go #6
2019/11/11 23:32:19 Sending message: Zprava z Go #7
2019/11/11 23:32:20 Sending message: Zprava z Go #8
2019/11/11 23:32:21 Sending message: Zprava z Go #9
2019/11/11 23:32:22 Sending message: Zprava z Go #10

Současně ve druhém terminálu spustíme druhého producenta zpráv (se stejným tématem):

$ ./producer3B
 
2019/11/11 23:32:12 Sending message: Message from Go #0
2019/11/11 23:32:12 INF    1 (127.0.0.1:4150) connecting to nsqd
2019/11/11 23:32:13 Sending message: Message from Go #1
2019/11/11 23:32:14 Sending message: Message from Go #2
2019/11/11 23:32:15 Sending message: Message from Go #3
2019/11/11 23:32:16 Sending message: Message from Go #4
2019/11/11 23:32:17 Sending message: Message from Go #5
2019/11/11 23:32:18 Sending message: Message from Go #6
2019/11/11 23:32:19 Sending message: Message from Go #7
2019/11/11 23:32:20 Sending message: Message from Go #8
2019/11/11 23:32:21 Sending message: Message from Go #9

Ve třetím terminálu spustíme konzumenta zpráv připojeného k tématu „test“ a (jedinému) kanálu „A“. Vidíme, že konzument zpracovává zprávy z obou producentů:

$ ./consumer4_chan_A
 
2019/11/11 23:32:02 INF    1 [test/A] (127.0.0.1:4150) connecting to nsqd
2019/11/11 23:32:02 Waiting for message
2019/11/11 23:32:12 Received a message: Zprava z Go #0
2019/11/11 23:32:12 Received a message: Message from Go #0
2019/11/11 23:32:13 Received a message: Zprava z Go #1
2019/11/11 23:32:13 Received a message: Message from Go #1
2019/11/11 23:32:14 Received a message: Zprava z Go #2
2019/11/11 23:32:14 Received a message: Message from Go #2
2019/11/11 23:32:15 Received a message: Zprava z Go #3
2019/11/11 23:32:15 Received a message: Message from Go #3
2019/11/11 23:32:16 Received a message: Zprava z Go #4
2019/11/11 23:32:16 Received a message: Message from Go #4
2019/11/11 23:32:17 Received a message: Zprava z Go #5
2019/11/11 23:32:17 Received a message: Message from Go #5
2019/11/11 23:32:18 Received a message: Zprava z Go #6

5. Load-balancing konzumentů zpráv připojených ke stejnému kanálu

Podobně je možné – a velmi často se s takovou situací setkáme – že existuje několik konzumentů zpráv zaregistrovaných k odběru stejného tématu na shodném kanálu. V takovém případě dojde k již výše zmíněnému load-balancingu – zpráva je poslána jen jednomu zvolenému klientovi:

Obrázek 6: Větší množství konzumentů zpráv připojených do jediného nsqd.

Toto chování si opět můžeme snadno otestovat.

Opět spustíme producenta zpráv:

$ ./producer3
 
2019/11/11 23:32:12 Sending message: Zprava z Go #0
2019/11/11 23:32:12 INF    1 (127.0.0.1:4150) connecting to nsqd
2019/11/11 23:32:13 Sending message: Zprava z Go #1
2019/11/11 23:32:14 Sending message: Zprava z Go #2
2019/11/11 23:32:15 Sending message: Zprava z Go #3
2019/11/11 23:32:16 Sending message: Zprava z Go #4
2019/11/11 23:32:17 Sending message: Zprava z Go #5
2019/11/11 23:32:18 Sending message: Zprava z Go #6
2019/11/11 23:32:19 Sending message: Zprava z Go #7
2019/11/11 23:32:20 Sending message: Zprava z Go #8
2019/11/11 23:32:21 Sending message: Zprava z Go #9
2019/11/11 23:32:22 Sending message: Zprava z Go #10

A následně spustíme dva konzumenty, kteří se připojí ke stejnému tématu i shodnému kanálu:

$ ./consumer4_chan_A 
 
2019/11/10 19:33:49 INF    1 [test/A] (127.0.0.1:4150) connecting to nsqd
2019/11/10 19:33:49 Waiting for message
2019/11/10 19:33:56 Received a message: Zprava z Go #0
2019/11/10 19:33:58 Received a message: Zprava z Go #2
2019/11/10 19:34:00 Received a message: Zprava z Go #4
2019/11/10 19:34:02 Received a message: Zprava z Go #6
2019/11/10 19:34:04 Received a message: Zprava z Go #8
2019/11/10 19:34:06 Received a message: Zprava z Go #10
2019/11/10 19:34:08 Received a message: Zprava z Go #12
2019/11/10 19:34:10 Received a message: Zprava z Go #14
$ ./consumer4_chan_A
 
2019/11/10 19:33:52 INF    1 [test/A] (127.0.0.1:4150) connecting to nsqd
2019/11/10 19:33:52 Waiting for message
2019/11/10 19:33:57 Received a message: Zprava z Go #1
2019/11/10 19:33:59 Received a message: Zprava z Go #3
2019/11/10 19:34:01 Received a message: Zprava z Go #5
2019/11/10 19:34:03 Received a message: Zprava z Go #7
2019/11/10 19:34:05 Received a message: Zprava z Go #9
2019/11/10 19:34:07 Received a message: Zprava z Go #11
2019/11/10 19:34:09 Received a message: Zprava z Go #13
2019/11/10 19:34:11 Received a message: Zprava z Go #15
Poznámka: můžeme vidět, že se konzumenti skutečně střídají v získávání a zpracovávání zpráv.

6. Předání stejných zpráv několika konzumentům (distribuce)

Ovšem můžeme požadovat i odlišné chování – předání shodných zpráv několika konzumentům. Řešení tohoto požadavku je ve skutečnosti velmi snadné, protože postačuje zajistit, aby každý z konzumentů používal odlišný kanál tak, jak je to naznačeno na dalším schématu:

Obrázek 7: Příjem zpráv se stejným tématem, ale z odlišných kanálů.

Otestování – spustíme producenta zpráv:

$ ./producer3
 
2019/11/11 23:32:12 Sending message: Zprava z Go #0
2019/11/11 23:32:12 INF    1 (127.0.0.1:4150) connecting to nsqd
2019/11/11 23:32:13 Sending message: Zprava z Go #1
2019/11/11 23:32:14 Sending message: Zprava z Go #2
2019/11/11 23:32:15 Sending message: Zprava z Go #3
2019/11/11 23:32:16 Sending message: Zprava z Go #4
2019/11/11 23:32:17 Sending message: Zprava z Go #5
2019/11/11 23:32:18 Sending message: Zprava z Go #6
2019/11/11 23:32:19 Sending message: Zprava z Go #7
2019/11/11 23:32:20 Sending message: Zprava z Go #8
2019/11/11 23:32:21 Sending message: Zprava z Go #9
2019/11/11 23:32:22 Sending message: Zprava z Go #10

První konzument bude přijímat zprávy z kanálu A:

$ ./consumer4_chan_A 
 
2019/11/10 19:35:25 INF    1 [test/A] (127.0.0.1:4150) connecting to nsqd
2019/11/10 19:35:25 Waiting for message
2019/11/10 19:35:32 Received a message: Zprava z Go #0
2019/11/10 19:35:33 Received a message: Zprava z Go #1
2019/11/10 19:35:34 Received a message: Zprava z Go #2
2019/11/10 19:35:35 Received a message: Zprava z Go #3
2019/11/10 19:35:36 Received a message: Zprava z Go #4
2019/11/10 19:35:37 Received a message: Zprava z Go #5
2019/11/10 19:35:38 Received a message: Zprava z Go #6
2019/11/10 19:35:39 Received a message: Zprava z Go #7
2019/11/10 19:35:40 Received a message: Zprava z Go #8
2019/11/10 19:35:41 Received a message: Zprava z Go #9
2019/11/10 19:35:42 Received a message: Zprava z Go #10

Druhý konzument bude přijímat zprávy z kanálu B:

$ ./consumer4_chan_B
 
2019/11/10 19:35:29 INF    1 [test/B] (127.0.0.1:4150) connecting to nsqd
2019/11/10 19:35:29 Waiting for message
2019/11/10 19:35:32 Received a message: Zprava z Go #0
2019/11/10 19:35:33 Received a message: Zprava z Go #1
2019/11/10 19:35:34 Received a message: Zprava z Go #2
2019/11/10 19:35:35 Received a message: Zprava z Go #3
2019/11/10 19:35:36 Received a message: Zprava z Go #4
2019/11/10 19:35:37 Received a message: Zprava z Go #5
2019/11/10 19:35:38 Received a message: Zprava z Go #6
2019/11/10 19:35:39 Received a message: Zprava z Go #7
2019/11/10 19:35:40 Received a message: Zprava z Go #8
2019/11/10 19:35:41 Received a message: Zprava z Go #9
2019/11/10 19:35:42 Received a message: Zprava z Go #10

Mimochodem: v pracovním adresáři služby nsqd se vytvoří soubor obsahující informace o tématech i o kanálech:

{
  "topics": [
    {
      "channels": [
        {
          "name": "A",
          "paused": false
        },
        {
          "name": "B",
          "paused": false
        }
      ],
      "name": "test",
      "paused": false
    }
  ],
  "version": "1.2.1-alpha"
}

7. Archivace zpráv utilitou nsq_to_file

V mnoha situacích se setkáme s požadavkem archivace zpráv, které byly do systému poslány. Buď se může jednat o všechny zprávy nebo o zprávy s určitým tématem. Samozřejmě je možné naprogramovat konzumenta, který bude archivaci nějakým způsobem provádět, ovšem přesně pro tento účel existuje standardní utilita, která je součástí standardní instalace systému NSQ. Tato utilita se jmenuje příznačně nsq_to_file a při jejím volání musíme určit jak téma zpráv, které mají být archivovány, tak i jméno kanálu (a pochopitelně i adresu nsqd použitou pro příjem zpráv):

$ nsq_to_file --nsqd-tcp-address=localhost:4150 --topic test --channel archive

Přitom je nutné zajistit, aby se k danému kanálu nepřipojovali žádní běžní konzumenti zpráv, neboť již víme, že v takovém případě by se zpráva poslala jen jednomu zvolenému konzumentovi (podle jejich aktuálního vytížení). V případě, že utilita nsq_to_file bude naopak připojena k unikátnímu kanálu, mohou být ostatní konzumenti připojeni ke kanálu jinému a stále bude zaručeno, že se všechny zprávy s daným tématem budou archivovat. Toto chování si můžeme velmi snadno otestovat, protože postačuje:

  1. Spustit nsqd
  2. Spustit konzumenta zpráv
  3. Spustit zde popisovanou utilitu nsq_to_file
  4. A nakonec spustit producenta zpráv

Výsledky po odeslání jedenácti zpráv vypadají následovně.

Konzument:

$ ./consumer4_chan_A 
 
2019/11/10 19:39:29 INF    1 [test/A] (127.0.0.1:4150) connecting to nsqd
2019/11/10 19:39:29 Waiting for message
2019/11/10 19:39:31 Received a message: Zprava z Go #0
2019/11/10 19:39:32 Received a message: Zprava z Go #1
2019/11/10 19:39:33 Received a message: Zprava z Go #2
2019/11/10 19:39:34 Received a message: Zprava z Go #3
2019/11/10 19:39:35 Received a message: Zprava z Go #4
2019/11/10 19:39:36 Received a message: Zprava z Go #5
2019/11/10 19:39:37 Received a message: Zprava z Go #6
2019/11/10 19:39:38 Received a message: Zprava z Go #7
2019/11/10 19:39:39 Received a message: Zprava z Go #8
2019/11/10 19:39:40 Received a message: Zprava z Go #9
2019/11/10 19:39:41 Received a message: Zprava z Go #10

Chování archivátoru zpráv:

$ ./nsq_to_file --nsqd-tcp-address=localhost:4150 --topic test --channel archive
 
2019/11/10 19:39:18 INF    1 [test/archive] (localhost:4150) connecting to nsqd
[nsq_to_file] 2019/11/10 19:39:31.461572 INFO: [test/archive] opening /tmp/test.tester-ThinkPad-T410.2019-11-10_19.log
[nsq_to_file] 2019/11/10 19:39:31.461687 INFO: [test/archive] syncing 1 records to disk
[nsq_to_file] 2019/11/10 19:39:48.314973 INFO: [test/archive] syncing 10 records to disk
^C2019/11/10 19:40:02 INF    1 [test/archive] stopping...
2019/11/10 19:40:02 INF    1 [test/archive] (localhost:4150) received CLOSE_WAIT from nsqd
2019/11/10 19:40:02 INF    1 [test/archive] (localhost:4150) beginning close
2019/11/10 19:40:02 INF    1 [test/archive] (localhost:4150) readLoop exiting
2019/11/10 19:40:02 INF    1 [test/archive] (localhost:4150) breaking out of writeLoop
2019/11/10 19:40:02 INF    1 [test/archive] (localhost:4150) writeLoop exiting
2019/11/10 19:40:03 INF    1 [test/archive] (localhost:4150) finished draining, cleanup exiting
2019/11/10 19:40:03 INF    1 [test/archive] (localhost:4150) clean close complete
2019/11/10 19:40:03 WRN    1 [test/archive] there are 0 connections left alive
2019/11/10 19:40:03 INF    1 [test/archive] stopping handlers
2019/11/10 19:40:03 INF    1 [test/archive] rdyLoop exiting

Obsah souboru, do kterého se archivují zprávy (implicitně je tento soubor umístěn v adresáři /tmp, ovšem v případě potřeby lze určit jeho odlišné umístění):

$ cat /tmp/test.tester-ThinkPad-T410.2019-11-10_19.log 
 
Zprava z Go #0
Zprava z Go #1
Zprava z Go #2
Zprava z Go #3
Zprava z Go #4
Zprava z Go #5
Zprava z Go #6
Zprava z Go #7
Zprava z Go #8
Zprava z Go #9
Zprava z Go #10
Poznámka: můžeme zde vidět, že konzument skutečně získal všechny zprávy a současně byly všechny zprávy uloženy na disk. To je většinou přesně to chování, které od systému NSQ při archivaci zpráv vyžadujeme.

8. Špatné nastavení archivace

Pro zajímavost si můžeme otestovat jinou situaci – jak konzument zpráv, tak i archivační utilita nsq_to_file budou zprávy odebírat ze stejného kanálu. Systém NSQ by v takovém případě měl zprávy mezi oba producenty rozdělit, protože z jeho pohledu je i archivační utilita běžným příjemcem zpráv.

Výsledky po odeslání jedenácti zpráv vypadají následovně.

Konzument:

$ ./consumer4_chan_A 
 
2019/11/10 19:42:02 INF    1 [test/A] (127.0.0.1:4150) connecting to nsqd
2019/11/10 19:42:02 Waiting for message
2019/11/10 19:42:04 Received a message: Zprava z Go #1
2019/11/10 19:42:06 Received a message: Zprava z Go #3
2019/11/10 19:42:08 Received a message: Zprava z Go #5
2019/11/10 19:42:10 Received a message: Zprava z Go #7
2019/11/10 19:42:12 Received a message: Zprava z Go #9
2019/11/10 19:42:14 Received a message: Zprava z Go #11

Utilita pro archivaci zpráv je nyní spuštěna s volbou kanálu A:

$ ./nsq_to_file --nsqd-tcp-address=localhost:4150 --topic test --channel A
 
2019/11/10 19:41:58 INF    1 [test/A] (localhost:4150) connecting to nsqd
[nsq_to_file] 2019/11/10 19:42:03.902131 INFO: [test/A] opening /tmp/test.tester-ThinkPad-T410.2019-11-10_19.log
[nsq_to_file] 2019/11/10 19:42:03.902605 INFO: [test/A] syncing 1 records to disk
[nsq_to_file] 2019/11/10 19:42:28.905202 INFO: [test/A] syncing 5 records to disk

Obsah souboru, do kterého se archivují zprávy:

$ cat /tmp/test.tester-ThinkPad-T410.2019-11-10_19.log 
 
Zprava z Go #0
Zprava z Go #1
Zprava z Go #2
Zprava z Go #3
Zprava z Go #4
Zprava z Go #5
Zprava z Go #6
Zprava z Go #7
Zprava z Go #8
Zprava z Go #9
    ↑
... ↑ původní obsah archivu z předchozího příklad
...
... ↓ nový obsah archivu
    ↓
Zprava z Go #10
Zprava z Go #0
Zprava z Go #2
Zprava z Go #4
Zprava z Go #6
Zprava z Go #8
Zprava z Go #10
Poznámka: v tomto případě je patrné, že se zprávy skutečně rozdělily principem round-robin mezi běžného příjemce zpráv a mezi utilitu nsq_to_file. V naprosté většině případů se nejedná o to chování, které je v praxi vyžadováno. Jediným důvodem, kdy by tato konfigurace mohla být použita, je situace, kdy nechceme nezpracované zprávy ponechat v nsqd, ale současně je ani nechceme ztratit, například tehdy, pokud jsou všichni příjemci zpráv vytížení.

9. Chování systému ve chvíli, kdy zpráva nemůže být doručena

V této kapitole si popíšeme chování systému NSQ ve chvíli, kdy zpráva nemůže být z nějakého důvodu doručena. Systém NSQ v takové situaci používá dvě techniky nazvané retry a exponential backoff. Ve skutečnosti se nejedná o nic složitého:

  1. retry: v případě, že dojde na straně konzumenta zpráv k nějaké chybě, informuje o ni (přes příslušnou knihovnu – rozhraní pro NSQ) službu nsqd příkazem REQ neboli re-queue. Podobně pokud služba nsqd nedostane v určeném čase informaci o zpracování zprávy, dojde k timeoutu následovaném opět operací typu REQ. Nutno říci, že tento systém trpí všemi nedostatky komunikační strategie příkaz-ack, protože konzument ve skutečnosti zprávu mohl korektně zpracovat, ovšem k chybě došlo až při posílání ack, tedy při potvrzování, že zpráva byla zpracována. To je jeden z důvodů, proč by měly být producenti zpráv idempotentní, protože jim taková zpráva bude poslána znovu.
  2. exponential backoff: pokud není zpráva korektně zpracována v konzumentovi zpráv, bude žádost o další zprávy pozdržena o dobu, která postupně (s dalšími případnými pády) exponenciálně roste. A naopak – ve chvíli, kdy je nějaká zpráva zpracována korektně, bude se doba mezi příjmem další zprávy postupně (nelineárně) snižovat až na nulu. Díky tomu se (z nějakého důvodu problémový) konzument zpráv v systému postupně zpomaluje a po jeho opravě zase postupně zrychluje. Dává se tím prostor pro další konzumenty, které jsou v korektním stavu – ty budou zprávy zpracovávat nejrychlejším možným způsobem, což ovšem neznamená, že by se opravený uzel nemohl znovu stát plnohodnotným uzlem v celém systému – akorát musí svoji funkci zpracovávat zprávy několikrát dokázat.

Obrázek 8: Složitější konfigurace se dvěma kanály.

10. Důvody vedoucí k využití více služeb nsqd

Všechny předchozí konfigurace systému NSQ, které jsme si popsali, byly založeny na jediné službě nsqd, ke které se připojovali jak producenti zpráv, tak i jejich konzumenti (popř. nástroj pro archivaci zpráv). Konfigurace s jedinou službou nsqd je nejjednodušší a do značné míry se podobá konfiguraci klasických message brokerů. V praxi je pochopitelně možné nsqd využívat i tímto způsobem, ovšem musíme se přitom smířit s tím, že se případný pád či dokonce jen restart služby nsqd může negativně projevit na činnosti celého systému, a to z toho důvodu, že message brokeři bývají ústřední částí celého messagingu. Částečně lze tento problém (alespoň zdánlivě) vyřešit tím, že se v různých částech systému budou používat odlišné instance message brokerů – jeden pro rezervaci zboží, druhý pro vyskladňování atd. Pád/restart jednoho z těchto message brokerů ovlivní jen určitou část systému (pochopitelně jen v případě, že zbylé části systému nejsou vnitřně provázány a používají pro vzájemnou komunikaci jen message broker).

Současně je nutné myslet na to, že může nastat situace, v níž se NSQ stane úzkým hrdlem celého systému, ať již proto, že nebude mít dostatečný výkon CPU (nepravděpodobné), nižší propustnost sítě (pravděpodobné) či nedostatek RAM a diskového prostoru pro meziuložení všech zpracovávaných zpráv (pravděpodobné). A právě v tomto okamžiku je možné využít nejzajímavější funkci celého systému NSQ – schopnost provozovat větší množství služeb nsqd, které budou buď centrálně spravovány jednou službou nsqlookupd či v případě potřeby větším množstvím těchto služeb. Jednotlivé nsqd mohou běžet na jednom počítači (přímo na holém železe či v kontejneru), v rámci jednoho clusteru či naopak mohou být navzájem vzdálené (a to i geograficky). A právě některé konfigurace systému NSQ založené na dvou či více službách nsqd si popíšeme v navazujících podkapitolách.

Obrázek 9: Nepatrně složitější architektura, v níž si klienti vybírají službu nsqd.

11. Paralelně a nezávisle na sobě běžící služby nsqd

V navazujících kapitolách použijeme upravený kód producentů i konzumentů zpráv. Úprava spočívá v tom, že se port pro připojení bude získávat z parametru předaného na příkazovém řádku.

Upravený producent:

package main
 
import (
        "fmt"
        "github.com/nsqio/go-nsq"
        "log"
        "os"
        "time"
)
 
const Topic = "test"
 
func main() {
        if len(os.Args) < 2 {
                log.Panic("nsqd address needs to be specified on CLI")
        }
 
        address := os.Args[1]
 
        config := nsq.NewConfig()
 
        producer, err := nsq.NewProducer(address, config)
        if err != nil {
                log.Panic("Producer can't be constructed")
        }
        defer producer.Stop()
 
        i := 0
 
        for {
                message := fmt.Sprintf("Zprava z Go #%d", i)
                log.Print("Sending message: ", message)
                err = producer.Publish(Topic, []byte(message))
                if err != nil {
                        log.Panic("Could not connect")
                }
                i++
                time.Sleep(1 * time.Second)
        }
}

Upravený konzument:

package main
 
import (
        "github.com/nsqio/go-nsq"
        "log"
        "os"
)
 
const Topic = "test"
 
const Channel = "A"
 
func main() {
        if len(os.Args) < 2 {
                log.Panic("nsqd address needs to be specified on CLI")
        }
 
        address := os.Args[1]
        config := nsq.NewConfig()
 
        consumer, err := nsq.NewConsumer(Topic, Channel, config)
        if err != nil {
                log.Panic("Consumer can't be constructed")
        }
 
        done := make(chan bool)
 
        consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
                log.Printf("Received a message: %s", string(message.Body))
                // done <- true
                return nil
        }))
 
        err = consumer.ConnectToNSQD(address)
        if err != nil {
                log.Panic("Could not connect")
        }
 
        log.Println("Waiting for message")
        <-done
}

12. Chování dvou nezávislých služeb nsqd

Nejprve se podívejme na situaci, kdy jsou v provozu dvě služby nsqd, ovšem nsqlookupd není provozována. To tedy znamená, že se jednotliví producenti zpráv sami rozhodují, ke kterému nsqd se budou připojovat. Totéž platí i pro konzumenty zpráv. Toto řešení je sice plně funkční, ovšem konzumenti i producenti zpráv musí znát „své“ služby nsqd a není zde zajištěn stav, kdy jedna ze služeb nsqd není (z libovolného důvodu) dostupná:

Chování takto nakonfigurovaného systému si můžeme vyzkoušet, a to velmi snadno (i na jediném počítači). Postačuje spustit dvě služby nsqd, každou na jiném portu a ideálně v odlišném terminálu (ve Screenu/Tmuxu atd.), aby byly vidět zprávy, které služby v průběhu své činnosti generují:

$ nsqd -tcp-address localhost:4150 -http-address localhost:4151
$ nsqd -tcp-address localhost:5150 -http-address localhost:5151 --data-path=/tmp
Poznámka: povšimněte si, že u druhé služby musíme specifikovat umístění souborů s konfigurací, stavem kanálů a serializovaných zpráv. Jinak by došlo ke kolizi s již běžící první službou.

Dále spustíme konzumenty zpráv, z nichž první se bude připojovat k první službě a druhý naopak ke službě druhé:

$ nohup ./consumer5 localhost:4150 > consumer1.log &
$ nohup ./consumer5 localhost:5150 > consumer2.log &

Zbývá nám spustit producenty zpráv, jednoho pro první službu (a standardní topic i kanál), druhého pro službu druhou:

$ nohup ./producer4 localhost:4150 > producer1.log &
$ nohup ./producer4 localhost:5150 > producer2.log &

Obě služby nsqd pracují nezávisle na sobě, takže se jedná o dva zcela nezávislé systémy, které mezi sebou ani nesdílí žádná data. Je to ostatně patrné i ze zpráv, které jednotlivé prvky systému vypisují do terminálu:

$ cat producer1.log
 
2019/11/12 22:32:45 Sending message: Zprava z Go #0
2019/11/12 22:32:45 INF    1 (localhost:4150) connecting to nsqd
2019/11/12 22:32:46 Sending message: Zprava z Go #1
2019/11/12 22:32:47 Sending message: Zprava z Go #2
2019/11/12 22:32:48 Sending message: Zprava z Go #3
2019/11/12 22:32:49 Sending message: Zprava z Go #4
2019/11/12 22:32:50 Sending message: Zprava z Go #5
2019/11/12 22:32:51 Sending message: Zprava z Go #6
2019/11/12 22:32:52 Sending message: Zprava z Go #7
2019/11/12 22:32:53 Sending message: Zprava z Go #8
$ cat producer2.log
 
2019/11/12 22:32:50 Sending message: Zprava z Go #0
2019/11/12 22:32:50 INF    1 (localhost:5150) connecting to nsqd
2019/11/12 22:32:51 Sending message: Zprava z Go #1
2019/11/12 22:32:52 Sending message: Zprava z Go #2
2019/11/12 22:32:53 Sending message: Zprava z Go #3
2019/11/12 22:32:54 Sending message: Zprava z Go #4
2019/11/12 22:32:55 Sending message: Zprava z Go #5
2019/11/12 22:32:56 Sending message: Zprava z Go #6
$ cat consumer1.log
 
2019/11/12 22:32:19 INF    1 [test/A] (localhost:4150) connecting to nsqd
2019/11/12 22:32:19 Waiting for message
2019/11/12 22:32:45 Received a message: Zprava z Go #0
2019/11/12 22:32:47 Received a message: Zprava z Go #1
2019/11/12 22:32:50 Received a message: Zprava z Go #2
2019/11/12 22:32:50 Received a message: Zprava z Go #3
2019/11/12 22:32:50 Received a message: Zprava z Go #4
...
...
...
$ cat consumer2.log
 
2019/11/12 22:32:19 INF    1 [test/A] (localhost:4150) connecting to nsqd
2019/11/12 22:32:19 Waiting for message
2019/11/12 22:32:45 Received a message: Zprava z Go #0
2019/11/12 22:32:47 Received a message: Zprava z Go #1
2019/11/12 22:32:50 Received a message: Zprava z Go #2
2019/11/12 22:32:50 Received a message: Zprava z Go #3
2019/11/12 22:32:50 Received a message: Zprava z Go #4
...
...
...

Na konec nezapomeňte producenty i konzumenty zpráv ukončit:

$ killall consumer5
$ killall producer4

13. Konzument zpráv připojený k více službám nsqd

Nyní se můžeme pokusit vytvořit konzumenta, který se dokáže připojit k více službám nsqd podle obrázku číslo 9. Je to snadné, protože namísto volání:

consumer.ConnectToNSQD(jedna_adresa)

můžeme použít funkci:

consumer.ConnectToNSQDs([]string{adresa1, adresa2, adresa3...})

Zdrojový kód klienta se nepatrně změní, protože se budeme připojovat ke dvojici služeb nsqd, jejichž adresy se zadávají na příkazovém řádku:

package main
 
import (
        "github.com/nsqio/go-nsq"
        "log"
        "os"
)
 
const Topic = "test"
 
const Channel = "A"
 
func main() {
        if len(os.Args) < 3 {
                log.Panic("two nsqd addresses needs to be specified on CLI")
        }
 
        address1 := os.Args[1]
        address2 := os.Args[2]
        config := nsq.NewConfig()
 
        consumer, err := nsq.NewConsumer(Topic, Channel, config)
        if err != nil {
                log.Panic("Consumer can't be constructed")
        }
 
        done := make(chan bool)

        consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
                log.Printf("Received a message: %s", string(message.Body))
                // done <- true
                return nil
        }))
 
        err = consumer.ConnectToNSQDs([]string{address1, address2})
        if err != nil {
                log.Panic("Could not connect")
        }
 
        log.Println("Waiting for message")
        <-done
}

14. Chování dvou nezávislých služeb nsqd s konzumentem připojeným k oběma službám

Chování klienta (konzumenta zpráv) připojeného ke dvěma službám bude záviset na tom, zda obě služby běží či nikoli. Na začátku si klient vybere (například) první službu a bude zpracovávat zprávy, které z této služby přichází:

$ ./consumer6 localhost:4150 localhost:5150
 
2019/11/12 22:57:45 INF    1 [test/A] (localhost:4150) connecting to nsqd
2019/11/12 22:57:45 INF    1 [test/A] (localhost:5150) connecting to nsqd
2019/11/12 22:57:45 Waiting for message
2019/11/12 22:58:03 Received a message: Zprava z Go #0
2019/11/12 22:58:04 Received a message: Zprava z Go #1
2019/11/12 22:58:05 Received a message: Zprava z Go #2
2019/11/12 22:58:06 Received a message: Zprava z Go #3
2019/11/12 22:58:07 Received a message: Zprava z Go #4
2019/11/12 22:58:08 Received a message: Zprava z Go #5
2019/11/12 22:58:09 Received a message: Zprava z Go #6
2019/11/12 22:58:10 Received a message: Zprava z Go #7
2019/11/12 22:58:11 Received a message: Zprava z Go #8
...
...
...

Pokud ovšem první službu restartujeme popř. ji ukončíme (kill atd.), připojí se klient ke službě druhé a začne zpracovávat zprávy, které do ní přichází:

2019/11/12 22:58:29 Received a message: Zprava z Go #26
2019/11/12 22:58:30 Received a message: Zprava z Go #27
2019/11/12 22:58:36 ERR    1 [test/A] (localhost:4150) IO error - EOF
2019/11/12 22:58:36 INF    1 [test/A] (localhost:4150) beginning close
2019/11/12 22:58:36 INF    1 [test/A] (localhost:4150) readLoop exiting
2019/11/12 22:58:36 INF    1 [test/A] (localhost:4150) breaking out of writeLoop
2019/11/12 22:58:36 INF    1 [test/A] (localhost:4150) writeLoop exiting
2019/11/12 22:58:36 INF    1 [test/A] (localhost:4150) finished draining, cleanup exiting
2019/11/12 22:58:36 INF    1 [test/A] (localhost:4150) clean close complete
2019/11/12 22:58:36 WRN    1 [test/A] there are 1 connections left alive
2019/11/12 22:58:36 INF    1 [test/A] (localhost:4150) re-connecting in 1m0s
2019/11/12 22:58:40 Received a message: Zprava z Go #0
2019/11/12 22:58:40 Received a message: Zprava z Go #1
2019/11/12 22:58:40 Received a message: Zprava z Go #2
2019/11/12 22:58:40 Received a message: Zprava z Go #3
Poznámka: povšimněte si zpráv typu ERRor a INFo, které nám oznamují přepnutí na službu druhou.

15. Využití adresářové služby nsqlookupd

Předchozí konfiguraci můžeme ještě upravit, a to spuštěním služby nsqlookupd, která dokáže sledovat jednotlivé služby nsqd a poskytovat tyto informace klientům. Tuto službu (běžící na odlišném portu, než jednotlivé nsqd) spustíme jako první a necháme nsqd, aby se k ní přihlásily:

$ ./nsqlookupd 
 
[nsqlookupd] 2019/11/13 20:25:10.392683 INFO: nsqlookupd v1.2.1-alpha (built w/go1.11.2)
[nsqlookupd] 2019/11/13 20:25:10.394993 INFO: TCP: listening on [::]:4160
[nsqlookupd] 2019/11/13 20:25:10.395000 INFO: HTTP: listening on [::]:4161

První služba:

$ ./nsqd -tcp-address localhost:4150 -http-address localhost:4151 --lookupd-tcp-address=127.0.0.1:4160 -broadcast-address=127.0.0.1

Druhá služba:

$ ./nsqd -tcp-address localhost:5150 -http-address localhost:5151 --lookupd-tcp-address=127.0.0.1:4160 -broadcast-address=127.0.0.1 --data-path=/tmp/
Poznámka: volbu -broadcast-address je v tomto případě nutné použít, jinak se klient (konzument) nedokáže k nsqd připojit.

Otestujeme, zda obě služby nahlašují svůj stav do adresářové služby:

$ alias pp='python -mjson.tool'
 
$ curl -s localhost:4161/nodes |pp
{
    "producers": [
        {
            "broadcast_address": "tester-ThinkPad-T410",
            "hostname": "tester-ThinkPad-T410",
            "http_port": 5151,
            "remote_address": "127.0.0.1:43230",
            "tcp_port": 5150,
            "tombstones": [],
            "topics": [],
            "version": "1.2.1-alpha"
        },
        {
            "broadcast_address": "tester-ThinkPad-T410",
            "hostname": "tester-ThinkPad-T410",
            "http_port": 4151,
            "remote_address": "127.0.0.1:43231",
            "tcp_port": 4150,
            "tombstones": [],
            "topics": [],
            "version": "1.2.1-alpha"
        }
    ]
}

16. Úprava klienta (konzumenta) pro použití nsqlookupd

Jak se tato změna dotkne klientů, kteří spolu potřebují komunikovat pomocí zpráv? Klienty je možné upravit takovým způsobem, aby se zeptaly služby nsqlookupd, na kterou konkrétní službu nsqd se mají připojit. Pokud by došlo k restartu či dokonce k pádu (odpojení) některé nsqd, mohou se klienti připojit k jiné službě (což ovšem neznamená, že nemůže dojít k tomu, že některé zprávy dojdou až po opětovném spuštění dané nsqd). Konzumenta upravíme snadno, ostatně podobný zdrojový kód jsme již viděli v předchozí části tohoto seriálu:

package main
 
import (
        "github.com/nsqio/go-nsq"
        "log"
)
 
const Topic = "test"
 
const Channel = "A"
 
func main() {
        config := nsq.NewConfig()
 
        consumer, err := nsq.NewConsumer(Topic, Channel, config)
        if err != nil {
                log.Panic("Consumer can't be constructed")
        }
 
        done := make(chan bool)
 
        consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
                log.Printf("Received a message: %s", string(message.Body))
                return nil
        }))
 
        err = consumer.ConnectToNSQLookupd("127.0.0.1:4161")
        if err != nil {
                log.Panic("Could not connect")
        }
        defer consumer.DisconnectFromNSQLookupd("127.0.0.1:4160")
 
        log.Println("Waiting for message")
        <-done
}

17. Otestování chování klienta

Nového klienta můžeme otestovat velmi snadno. V dalším výpisu je ukázána situace, kdy se první služba odpoví (havaruje) a namísto ní se začnou zpracovávat zprávy ze služby druhé:

2019/11/13 20:43:00 Received a message: Zprava z Go #17
2019/11/13 20:43:01 Received a message: Zprava z Go #18
2019/11/13 20:43:02 Received a message: Zprava z Go #19
2019/11/13 20:43:03 Received a message: Zprava z Go #20
2019/11/13 20:43:04 Received a message: Zprava z Go #21
2019/11/13 20:43:04 ERR    1 [test/A] (127.0.0.1:4150) IO error - EOF
2019/11/13 20:43:04 INF    1 [test/A] (127.0.0.1:4150) beginning close
2019/11/13 20:43:04 INF    1 [test/A] (127.0.0.1:4150) readLoop exiting
2019/11/13 20:43:04 INF    1 [test/A] (127.0.0.1:4150) breaking out of writeLoop
2019/11/13 20:43:04 INF    1 [test/A] (127.0.0.1:4150) writeLoop exiting
2019/11/13 20:43:04 INF    1 [test/A] (127.0.0.1:4150) finished draining, cleanup exiting
2019/11/13 20:43:04 INF    1 [test/A] (127.0.0.1:4150) clean close complete
2019/11/13 20:43:04 WRN    1 [test/A] there are 1 connections left alive
2019/11/13 20:43:04 INF    1 [test/A] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test
2019/11/13 20:43:05 Received a message: Zprava z Go #0
2019/11/13 20:43:05 Received a message: Zprava z Go #1
2019/11/13 20:43:05 Received a message: Zprava z Go #2
2019/11/13 20:43:05 Received a message: Zprava z Go #3
2019/11/13 20:43:05 Received a message: Zprava z Go #4
2019/11/13 20:43:05 Received a message: Zprava z Go #5

18. Konzument současně přijímající zprávy z více služeb nsqd

V případě, že je nutné přijímat zprávy (a to současně) z více služeb nsqd, musí se kód konzumenta nepatrně upravit takovým způsobem, aby byl handler (pro příjem zpráv) skutečně zaregistrován pro všechny služby nsqd. Jednoduché řešení (které ovšem nepočítá s pádem služby) může vypadat následovně:

Diners Vánoce 2019

package main
 
import (
        "github.com/nsqio/go-nsq"
        "log"
        "os"
)
 
const Topic = "test"
 
const Channel = "A"
 
func main() {
        if len(os.Args) < 3 {
                log.Panic("two nsqd addresses needs to be specified on CLI")
        }
 
        config := nsq.NewConfig()
 
        for i := 0; i <= 1; i++ {
                consumer, err := nsq.NewConsumer(Topic, Channel, config)
                if err != nil {
                        log.Panic("Consumer can't be constructed")
                }
                consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
                        log.Printf("Received a message from nsqd #%d: %s", i+1, string(message.Body))
                        return nil
                }))
                err = consumer.ConnectToNSQD(os.Args[i+1])
                if err != nil {
                        log.Panicf("Could not connect to nsqd #%d", i)
                }
        }
 
        log.Println("Waiting for message")
        done := make(chan bool)
 
        <-done
}

Po spuštění konzumenta (a dvou producentů) lze snadno zjistit, že konzument skutečně přijímá zprávy z obou služeb nsqd:

$ ./consumer7 localhost:4150 localhost:5150
 
2019/11/13 20:55:34 INF    1 [test/A] (localhost:4150) connecting to nsqd
2019/11/13 20:55:34 INF    2 [test/A] (localhost:5150) connecting to nsqd
2019/11/13 20:55:34 Waiting for message
2019/11/13 20:55:36 Received a message from nsqd #3: Zprava z Go #0
2019/11/13 20:55:37 Received a message from nsqd #3: Zprava z Go #1
2019/11/13 20:55:37 Received a message from nsqd #3: Zprava z Go #0
2019/11/13 20:55:38 Received a message from nsqd #3: Zprava z Go #2
2019/11/13 20:55:38 Received a message from nsqd #3: Zprava z Go #1
2019/11/13 20:55:39 Received a message from nsqd #3: Zprava z Go #3
2019/11/13 20:55:39 Received a message from nsqd #3: Zprava z Go #2
2019/11/13 20:55:40 Received a message from nsqd #3: Zprava z Go #4
2019/11/13 20:55:40 Received a message from nsqd #3: Zprava z Go #3
2019/11/13 20:55:41 Received a message from nsqd #3: Zprava z Go #5
2019/11/13 20:55:41 Received a message from nsqd #3: Zprava z Go #4

19. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má stále ještě doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:

Příklad Skript Stručný popis Cesta
1 producer3.go producent sekvence zpráv naprogramovaný v jazyce Go s blokem defer https://github.com/tisnik/message-queues-examples/blob/master/nsq/producer3.go
2 producer3B.go producent sekvence zpráv s jiným obsahem naprogramovaný taktéž v jazyce Go s blokem defer https://github.com/tisnik/message-queues-examples/blob/master/nsq/producer3B.go
3 producer3C.go rychlejší posílání zpráv https://github.com/tisnik/message-queues-examples/blob/master/nsq/producer3C.go
4 producer4.go producent se specifikací adresy nsqd https://github.com/tisnik/message-queues-examples/blob/master/nsq/producer4.go
5 consumer4_chan_A.go konzument naprogramovaný v Go, který zpracuje všechny jemu dostupné zprávy (kanál A) https://github.com/tisnik/message-queues-examples/blob/master/nsq/con­sumer4_chan_A.go
6 consumer4_chan_B.go konzument naprogramovaný v Go, který zpracuje všechny jemu dostupné zprávy (kanál B) https://github.com/tisnik/message-queues-examples/blob/master/nsq/con­sumer4_chan_B.go
7 consumer5.go konzument se specifikací adresy nsqd https://github.com/tisnik/message-queues-examples/blob/master/nsq/consumer5.go
8 consumer6.go konzument připojený k více službám nsqd, vybírá vždy zprávy z jediné služby https://github.com/tisnik/message-queues-examples/blob/master/nsq/consumer6.go
9 consumer7.go konzument připojený k více službám nsqd a přijímající současně zprávy z obou služeb https://github.com/tisnik/message-queues-examples/blob/master/nsq/consumer7.go

20. Odkazy na Internetu

  1. Stránka projektu NSQ
    https://nsq.io/
  2. Dokumentace k projektu NSQ
    https://nsq.io/overview/design.html
  3. Dokumentace ke klientovi pro Go
    https://godoc.org/github.com/nsqio/go-nsq
  4. Dokumentace ke klientovi pro Python
    https://pynsq.readthedocs­.io/en/latest/
  5. Binární tarbally s NSQ
    https://nsq.io/deployment/in­stalling.html
  6. GitHub repositář projektu NSQ
    https://github.com/nsqio/nsq
  7. Klienti pro NSQ
    https://nsq.io/clients/cli­ent_libraries.html
  8. Klient pro Go
    https://github.com/nsqio/go-nsq
  9. Klient pro Python
    https://github.com/nsqio/pynsq
  10. An Example of Using NSQ From Go
    http://tleyden.github.io/blog/2014/11/12/an-example-of-using-nsq-from-go/
  11. Go Go Gadget
    https://word.bitly.com/pos­t/29550171827/go-go-gadget
  12. Simplehttp
    https://github.com/bitly/simplehttp
  13. Dramatiq: simple task processing
    https://dramatiq.io/
  14. Cookbook (for Dramatiq)
    https://dramatiq.io/cookbook.html
  15. Balíček dramatiq na PyPi
    https://pypi.org/project/dramatiq/
  16. Dramatiq dashboard
    https://github.com/Bogdan­p/dramatiq_dashboard
  17. Dramatiq na Redditu
    https://www.reddit.com/r/dramatiq/
  18. A Dramatiq broker that can be used with Amazon SQS
    https://github.com/Bogdan­p/dramatiq_sqs
  19. nanomsg na GitHubu
    https://github.com/nanomsg/nanomsg
  20. Referenční příručka knihovny nanomsg
    https://nanomsg.org/v1.1.5/na­nomsg.html
  21. nng (nanomsg-next-generation)
    https://github.com/nanomsg/nng
  22. Differences between nanomsg and ZeroMQ
    https://nanomsg.org/documentation-zeromq.html
  23. NATS
    https://nats.io/about/
  24. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  25. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  26. NATS Introduction
    https://nats.io/documentation/
  27. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  28. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  29. Stránka Apache Software Foundation
    http://www.apache.org/
  30. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  31. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  32. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  33. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  34. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  35. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  36. Qpid Proton
    http://qpid.apache.org/proton/
  37. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  38. Apache ActiveMQ
    http://activemq.apache.org/
  39. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  40. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  41. KahaDB
    http://activemq.apache.or­g/kahadb.html
  42. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  43. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  44. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  45. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  46. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  47. Apache Camel
    https://camel.apache.org/
  48. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  49. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  50. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  51. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  52. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  53. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  54. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  55. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  56. ØMQ – Distributed Messaging
    http://zeromq.org/
  57. ØMQ Community
    http://zeromq.org/community
  58. Get The Software
    http://zeromq.org/intro:get-the-software
  59. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  60. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  61. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  62. ZeroMQ RFC
    https://rfc.zeromq.org/
  63. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  64. zeromq/czmq
    https://github.com/zeromq/czmq
  65. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  66. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  67. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  68. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  69. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  70. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  71. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  72. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  73. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  74. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  75. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  76. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  77. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  78. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  79. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  80. Python binding
    http://zeromq.org/bindings:python
  81. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  82. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  83. About Nanomsg
    https://nanomsg.org/
  84. Advanced Message Queuing Protocol
    https://www.amqp.org/
  85. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  86. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  87. RabbitMQ
    https://www.rabbitmq.com/
  88. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  89. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  90. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  91. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  92. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  93. Erlang
    http://www.erlang.org/
  94. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  95. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  96. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  97. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  98. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  99. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  100. celery na PyPi
    https://pypi.org/project/celery/
  101. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  102. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  103. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  104. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  105. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  106. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  107. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  108. node-celery
    https://github.com/mher/node-celery
  109. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  110. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  111. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  112. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  113. Stránky projektu Redis
    https://redis.io/
  114. Introduction to Redis
    https://redis.io/topics/introduction
  115. Try Redis
    http://try.redis.io/
  116. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  117. Python Redis
    https://redislabs.com/lp/python-redis/
  118. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  119. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  120. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  121. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  122. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  123. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  124. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  125. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  126. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  127. Redis Persistence
    https://redis.io/topics/persistence
  128. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  129. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  130. Ost (knihovna)
    https://github.com/soveran/ost
  131. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  132. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  133. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  134. What Is Sharding?
    https://btcmanager.com/what-sharding/
  135. Redis clients
    https://redis.io/clients
  136. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  137. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  138. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  139. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  140. Resque
    https://github.com/resque/resque
  141. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  142. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  143. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  144. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  145. Pub/Sub
    https://redis.io/topics/pubsub
  146. ZeroMQ distributed messaging
    http://zeromq.org/
  147. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  148. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  149. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  150. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  151. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  152. Redis Protocol specification
    https://redis.io/topics/protocol
  153. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  154. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  155. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  156. Apache Kafka
    https://kafka.apache.org/
  157. Iron
    http://www.iron.io/mq
  158. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  159. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  160. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  161. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  162. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  163. Enqueueing internals
    http://python-rq.org/contrib/
  164. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  165. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  166. Queues
    http://queues.io/
  167. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  168. RestMQ
    http://restmq.com/
  169. ActiveMQ
    http://activemq.apache.org/
  170. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  171. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  172. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  173. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  174. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  175. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  176. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  177. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  178. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  179. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  180. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  181. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  182. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  183. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  184. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  185. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  186. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  187. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  188. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  189. Jupyter
    https://jupyter.org/
  190. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  191. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html