Hlavní navigace

Témata, oddíly a replikace v systému Apache Kafka

7. 3. 2023
Doba čtení: 35 minut

Sdílet

 Autor: Depositphotos
Systému Apache Kafka jsme se již věnovali v několika článcích. Prozatím jsme si ovšem neukázali, jak se Kafka chová v případě, že se používají témata s replikovanými oddíly uloženými ve více brokerech.

Obsah

1. Témata, oddíly a replikace v systému Apache Kafka

2. Základní koncepty, na nichž je Apache Kafka postavena

3. Nejjednodušší konfigurace tématu: jediný broker, jeden oddíl, bez replikace

4. Téma rozdělené na větší počet oddílů

5. Téma s jedním replikovaným oddílem

6. Téma s větším množstvím replikovaných oddílů

7. Praktická část

8. Instalace Apache Kafky

9. Konfigurace a spuštění Zookeepera, test zda Zookeeper přijímá příkazy

10. Konfigurace a spuštění brokera

11. Komunikace přes téma s jediným oddílem a bez replikace

12. Větší množství konzumentů z jedné skupiny vs. z více skupin

13. Komunikace přes téma s větším množstvím oddílů

14. Souběžná konzumace zpráv konzumenty z jedné skupiny

15. Spuštění většího množství brokerů

16. Vytvoření a použití tématu s jedním replikovaným oddílem

17. Chování při nedostupnosti brokeru (brokerů)

18. Vytvoření a použití tématu s několika replikovanými oddíly

19. Repositář s pomocnými skripty a konfiguračními soubory

20. Odkazy na Internetu

1. Témata, oddíly a replikace v systému Apache Kafka

Systém Apache Kafka je v současnosti velmi rozšířen a používá se v mnoha oblastech IT. Někdy se setkáme s tím, že je Apache Kafka nasazena a využívána jako pouhý „vylepšený“ message broker, tj. jako centrální část celé architektury sloužící pro komunikaci mezi jednotlivými (mikro)službami a nástroji. Ovšem možnosti Apache Kafky jsou ve skutečnosti poněkud větší, a to díky poměrně unikátnímu způsobu práce s tzv. tématy (topic). Navíc Apache Kafka dokáže zajistit svoji velkou dostupnost a odolnost vůči pádům jednotlivých komponent či síťové infrastruktury (resilience). Tomuto tématu, s nímž do značné míry souvisí tzv. replikace oddílů a systém leader-follower(s), se budeme věnovat v dnešním článku.

Obrázek 1: Logo nástroje Apache Kafka, kterému se budeme dnes věnovat.

2. Základní koncepty, na nichž je Apache Kafka postavena

V této kapitole si ve stručnosti vysvětlíme základní koncepty, na nichž je Apache Kafka postavena. Systém Apache Kafka umožňuje ukládání zpráv (zde se ovšem poměrně často taktéž používá termín záznam – record) do různých témat (topic), přičemž každé téma je obecně rozděleno do oddílů neboli partition. Samozřejmě je možné pro téma vyhradit pouze jediný oddíl (což je ostatně výchozí nastavení, které se asi nejvíce podobá klasickým message brokerům) a tvářit se, že máme k dispozici „vylepšenou“ frontu – ostatně přesně takto lze s Kafkou začít a pro mnohé účely může být tato konfigurace dostatečná. Rozdělení tématu do většího množství oddílů se provádí z několika důvodů. Jedním z nich je snaha o rozdělení zátěže (load balancing), protože jednotlivé oddíly mohou být provozovány na různých počítačích v mnohdy i velmi rozsáhlém clusteru (většinou se jedná o zátěž disků, nikoli CPU).

kappa

Obrázek 2: Kafka nemusí být nasazena jako „pouhý“ message broker, ale může sloužit i jako primární zdroj dat pro další mikroslužby. To je základ pro architekturu Kappa.

Dále se dělení provádí z toho důvodu, že každý oddíl obsahuje sekvenci neměnných (immutable) zpráv, přičemž nové zprávy se pouze připojují na konec oddílu (append-only log). Zprávy z oddílů je možné číst (konzumovat) nezávisle na ostatních oddílech a zajistit tak potřebný load balancing (jak uvidíme dále, je tato možnost realizována přes skupiny konzumentů – consumer groups). Každá zpráva uložená do oddílu má přiřazen jednoznačný offset (reprezentovaný v Javě typem long, což je dostatečně vysoká hodnota na to, aby v reálném nasazení nedošlo k jejímu přetečení).

U většiny reálných nasazení Apache Kafky se taktéž počítá s využitím většího množství instancí brokerů, z nichž je vytvořen cluster (nazývaný Kafka Cluster). A právě při takovém uspořádání se setkáme s důležitým termínem replikace – každý oddíl je totiž typicky replikován na několika message brokerech v clusteru (ovšem nemusí se jednat o všechny brokery, replikace se provádí například na tři brokery ve větším clusteru, což si ostatně vyzkoušíme v dalších kapitolách).

To však není vše, jelikož je ve skutečnosti konfigurace poněkud složitější resp. může být složitější – každý oddíl totiž může být replikován na více počítačích, přičemž jeden z těchto oddílů je takzvaným „leaderem“ a ostatní jsou „followeři“. Zápis nových zpráv popř. čtení se provádí vždy jen v rámci leaderu, ovšem změny jsou replikovány na všechny kopie oddílu. Ve chvíli, kdy z nějakého (libovolného) důvodu dojde k pádu „leadera“, převezme jeho roli jeden z dalších uzlů. Pokud tedy existuje N uzlů s replikou oddílu, bude systém funkční i ve chvíli, kdy zhavaruje N-1 uzlů! (i to si vyzkoušíme).

3. Nejjednodušší konfigurace tématu: jediný broker, jeden oddíl, bez replikace

Podívejme se nejdříve na tu nejjednodušší možnou konfiguraci tématu. Jedná se o konfiguraci, v níž je téma spravováno jediným brokerem a není prováděna žádná replikace. Zprávy (události) jsou tedy fyzicky uloženy pouze v jediném souboru, přičemž zápis je všemi producenty prováděn na konec (což je očekávané chování), zatímco čtení zpráv může být konzumenty provedeno od libovolného offsetu. Příkladem je situace, kdy je konzument zpráv opožděn za producentem zpráv, protože čte zprávu na offsetu 4 zatímco producent bude zapisovat zprávu s offsetem 9:

                                     write
                                       |
+---+---+---+---+---+---+---+---+---+  v
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
+---+---+---+---+---+---+---+---+---+
                  ^
                  |
                read

Druhá situace nastane ve chvíli, kdy je producent pomalejší než konzument a konzument dojde na konec tématu. V tomto případě konzument bude čekat na příchod (resp. přesněji řečeno na připojení) nové zprávy na konec tématu:

                                     write
                                       |
+---+---+---+---+---+---+---+---+---+  v
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
+---+---+---+---+---+---+---+---+---+
                                       ^
                                       |
                                     read

A samozřejmě je možné k tématu připojit větší množství konzumentů. Pokud bude každý z konzumentů součástí jiné skupiny konzumentů, bude čtení zpráv probíhat pro každou skupinu nezávisle na ostatních skupinách. Jinými slovy – každá skupina konzumentů si „schraňuje“ svůj offset, jenž se může lišit od offsetu ostatních skupin:

                                     write
                                       |
+---+---+---+---+---+---+---+---+---+  v
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
+---+---+---+---+---+---+---+---+---+  ^
      ^           ^               ^    |
      |           |               |    |
      |       read-group-1        |    |
      |                           |    |
  read-group-2                read-group-4
                                       |
                                   read-group-3
Poznámka: právě tato vlastnost odlišuje Apache Kafku od klasických message brokerů, kteří jsou založeni na použití front.

4. Téma rozdělené na větší počet oddílů

Poněkud komplikovanější, ale velmi častá je taková konfigurace tématu, kde dochází k rozdělení zpráv do několika oddílů. V takovém případě producent či producenti nezapisují zprávy do jednoho oddílu (samozřejmě na konec), ale zápis je proveden pouze do jednoho z vybraných oddílů. O tom, do kterého oddílu bude zápis (resp. připojení) zprávy proveden, se rozhoduje na základě klíče připojeného ke zprávě. Samotná zpráva je totiž chápána jako dvě sekvence bajtů – klíč zprávy a tělo zprávy. A právě na základě klíče se vypočítá hash a algoritmus implementovaný v samotném brokeru rozhodne, do kterého oddílu bude zpráva uložena:

              +---+---+---+---+---+---+
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 | ...
              +---+---+---+---+---+---+
partition #1  | 0 | 1 | 2 | ...
              +---+---+---+
partition #2  | ...
              +---+---+---+---+---+---+---+---+---+
partition #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
              +---+---+---+---+---+---+---+---+---+

Po přijetí nové zprávy tedy může být zápis proveden do tohoto místa:

                                   write
                                     |
              +---+---+---+---+---+---+  v
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 | ...
              +---+---+---+---+---+---+
partition #1  | 0 | 1 | 2 | ...
              +---+---+---+
partition #2  | ...
              +---+---+---+---+---+---+---+---+---+
partition #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
              +---+---+---+---+---+---+---+---+---+

Nebo se může broker rozhodnout pro připojení zprávy do posledního oddílu atd. atd.:

              +---+---+---+---+---+---+
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 | ...
              +---+---+---+---+---+---+
partition #1  | 0 | 1 | 2 | ...
              +---+---+---+
partition #2  | ...
              +---+---+---+---+---+---+---+---+---+
partition #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
              +---+---+---+---+---+---+---+---+---+  ^
                                                 |
                                               write

A jak se provádí čtení? Kafka ve chvíli, kdy je téma rozděleno do větví, musí přiřadit jednotlivé konzumenty (z jedné skupiny konzumentů) k nějakému oddílu. Nejjednodušší je situace, kdy má nadá skupina konzumentů stejný počet konzumentů, jako je počet oddílů, což je ostatně doporučované řešení. Pak je každý konzument přiřazen jednomu oddílu a konzumuje tedy pouze podmnožinu zpráv:

              +---+---+---+---+---+---+
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1
              +---+---+---+---+---+---+
partition #1  | 0 | 1 | 2 | ........................... konzument #3
              +---+---+---+
partition #2  | ....................................... konzument #2
              +---+---+---+---+---+---+---+---+---+
partition #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |.... konzument #4
              +---+---+---+---+---+---+---+---+---+
Poznámka: konzumenti jsou obecně přiřazeni k oddílům „náhodně“.

Konzumentů v jedné skupině ovšem může být méně, než je počet oddílů. V takovém případě musí některý konzument číst zprávy z většího množství oddílů:

              +---+---+---+---+---+---+
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 |          ........... konzument #1
              +---+---+---+---+---+---+          :
partition #1  | 0 | 1 | 2 | ................................ konzument #3
              +---+---+---+
partition #2  | ............................................ konzument #2
              +---+---+---+---+---+---+---+---+---+       :
partition #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |........
              +---+---+---+---+---+---+---+---+---+

Konzumentů ovšem může být i více. Tehdy nějaký konzument v daný okamžik nepracuje, tj. nepřijímá zprávy a situace může vypadat takto:

              +---+---+---+---+---+---+
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1
              +---+---+---+---+---+---+
partition #1  | 0 | 1 | 2 | ........................... konzument #3
              +---+---+---+
partition #2  | ....................................... konzument #2
              +---+---+---+---+---+---+---+---+---+
partition #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |.... konzument #4
              +---+---+---+---+---+---+---+---+---+
                                                        konzument #5

                                                        konzument #6

                                                        konzument #7
Konzumenti z dalších skupin konzumentů se budou připojovat nezávisle na ostatních skupinách. Tj. může existovat skupina konzumentů s menším počtem konzumentů, než je počet oddílů, další skupina může mít přesný počet konzumentů a třetí skupina naopak větší počet konzumentů. Vše bude funkční, protože jednotlivé skupiny konzumentů jsou na sobě nezávislé.

5. Téma s jedním replikovaným oddílem

Další možná konfigurace tématu může vypadat tak, že téma má sice jediný oddíl, ovšem tento oddíl je replikován mezi několika brokery. Příkladem může být oddíl replikovaný mezi trojicí brokerů. V takovém případě je jeden z těchto oddílů nazvaný leader a veškeré operace viděné zvnějšku Kafky (tedy posílání zpráv a jejich konzumace) probíhá právě s leaderem. Ostatní repliky jsou nazvané follower(s), protože pouze sledují leadera a synchronizují svůj obsah s leaderem:

                                     write
                                       |
+---+---+---+---+---+---+---+---+---+  v
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...    (leader)
+---+---+---+---+---+---+---+---+---+
                  ^               ^
                  |               |
                read              |
                                 sync
                                  |
                                  |
                                  v
+---+---+---+---+---+---+---+---+---+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |        (follower)
+---+---+---+---+---+---+---+---+---+
                                  ^
                                  |
                                  |
                                 sync
                                  |
                                  |
                                  v
+---+---+---+---+---+---+---+---+---+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |        (follower)
+---+---+---+---+---+---+---+---+---+

K čemu je to však dobré? V případě, že nějaký broker z celého Kafka clusteru zhavaruje a bude obsahovat oddíl typu follower, bude komunikace pokračovat dál a teprve po znovupřipojení brokera se follower postupně sesynchronizuje s leaderem. Zajímavější situace nastane ve chvíli, kdy zhavaruje samotný leader. V takovém případě Kafka „povýší“nějakého followera za nového leadera. V případě, že téma (resp. oddíl) je replikováno na N brokerů, může jich zhavarovat N-1 a systém bude stále funkční. I toto chování si pochopitelně postupně otestujeme.

6. Téma s větším množstvím replikovaných oddílů

Možnosti popsané ve čtvrté a v páté kapitole je pochopitelně možné v případě potřeby zkombinovat a vytvořit tak konfiguraci tématu, které bude rozděleno na větší množství oddílů a tyto oddíly budou replikovány mezi větší množství brokerů:

          +---+---+---+---+---+---+
oddíl #0  | 0 | 1 | 2 | 3 | 4 | 5 | ...
          +---+---+---+---+---+---+
oddíl #1  | 0 | 1 | 2 | ...
          +---+---+---+                               (leader)
oddíl #2  | ...
          +---+---+---+---+---+---+---+---+---+
oddíl #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
          +---+---+---+---+---+---+---+---+---+
                                  ^
                                  |
                                  |
                                 sync
                                  |
                                  |
                                  v
          +---+---+---+---+---+---+
oddíl #0  | 0 | 1 | 2 | 3 | 4 | 5 | ...
          +---+---+---+---+---+---+
oddíl #1  | 0 | 1 | 2 | ...
          +---+---+---+
oddíl #2  | ...                                       (follower)
          +---+---+---+---+---+---+---+---+---+
oddíl #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
          +---+---+---+---+---+---+---+---+---+

Opět zde můžeme vidět, že oddíl na jednom z brokerů bude zvolen za leadera a ostatní oddíly budou sledovat změny leadera a aplikovat je na své straně. A samozřejmě při výpadku leadera je jeden z následovníků zvolen za leadera a činnost celé infrastruktury Apache Kafky tak může pokračovat dál.

7. Praktická část

Druhá část dnešního článku bude zaměřená více prakticky. Nejdříve vytvoříme tu nejjednodušší instanci Kafka clusteru, která se skládá z jednoho běžícího Zookeepera a jednoho brokera. Na této instanci si otestujeme způsoby vytváření témat, chování většího množství konzumentů při připojení k tématu, použití většího množství oddílů pro téma atd. Ovšem zajímavější je situace, kdy je spuštěno větší množství brokerů a kdy je navíc nějaký oddíl replikován mezi tyto brokery. V této chvíli by mělo být možné brokera (či brokery) zastavit s tím, že zbývající brokeři se postarají o zachování funkcionality Apache Kafky (resp. přesněji řečeno celého Kafka clusteru). To, zda je tomu skutečně tak, si taktéž ověříme.

8. Instalace Apache Kafky

V praktické části budeme brokery Apache Kafky i Zookeeper spouštět lokálně (popř. z Dockeru), takže je nejdříve nutné Kafku nainstalovat. Není to vůbec nic složitého. V případě, že je na počítači nainstalováno JRE (běhové prostředí Javy), je instalace Kafky pro testovací účely triviální. V článku si ukážeme instalaci verze 3.3.2, ovšem můžete si stáhnout i nejnovější verzi 3.4.0, která byla vydána prakticky přesně před měsícem. Tarball s instalací Kafky lze získat z adresy https://downloads.apache.or­g/kafka/3.3.2/kafka2.13–3.3.2.tgz. Stažení a rozbalení tarballu:

$ wget https://downloads.apache.org/kafka/3.3.2/kafka_2.13-3.3.2.tgz
$ tar xvfz kafka_2.13-3.3.2.tgz
$ cd kafka_2.13-3.3.2/

Po rozbalení staženého tarballu získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin). Pro spuštění Zookeepera a brokerů je zapotřebí mít nainstalovánu JRE (Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).

.
├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
└── site-docs
 
7 directories

Mezi důležité soubory, které budeme používat v rámci dalších kapitol, patří především skripty pro spouštění jednotlivých služeb, konfiguraci témat, produkci zpráv či pro jejich konzumaci. Tyto skripty jsou uloženy v podadresáři bin (a pro Windows ještě v dalším podadresáři windows):

Skript Stručný popis
bin/kafka-server-start.sh spuštění brokera
bin/zookeeper-server-start.sh spuštění Zookeepera
   
bin/kafka-configs.sh konfigurace brokerů
bin/kafka-topics.sh konfigurace témat, zjištění informace o tématech atd.
bin/kafka-consumer-groups.sh konfigurace popř. zjištění informací o skupinách konzumentů
bin/kafka-run-class.sh spuštění konkrétní třídy z Apache Kafky (například pro zjištění informací o skupinách konzumentů)
   
bin/kafka-console-producer.sh jednoduchý producent zpráv
bin/kafka-console-consumer.sh jednoduchý konzument zpráv

Používat budeme i několik konfiguračních souborů. Ty jsou pro změnu uloženy v podadresáři config a jedná se o soubory ve formátu Java property (file), tj. vlastně se jedná o sekvence dvojic klíč:hodnota (s podporou zápisu poznámek):

Konfigurační soubor Stručný popis
config/server.properties konfigurace brokeru
config/zookeeper.properties konfigurace Zookeepera

9. Konfigurace a spuštění Zookeepera, test zda Zookeeper přijímá příkazy

Po (doufejme že úspěšné) instalaci Kafky již můžeme spustit Zookeeper a jednu instanci brokera (a to přesně v tomto pořadí!). Konfigurace Zookeepera je uložena ve výše zmíněném souboru config/zookeeper.properties a zajímat nás budou především následující tři konfigurační volby – adresář, kam ZooKeeper ukládá svoje data, port, který použijí brokeři a omezení počtu připojení jednoho klienta v daný okamžik:

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
Poznámka: hodnota maxClientCnxns v tomto případě neznamená, že by se nemohli připojit žádní klienti, ale je že vypnutý mechanismus, který zabezpečuje infrastrukturu Kafky před některými typy DOS útoků. Na disku, kde je adresář dataDir by také mělo být dostatek místa, protože Zookeeper v některých případech mívá větší nároky. Další informace lze nalézt na stránce https://zookeeper.apache.or­g/doc/r3.8.1/index.html.

Nyní již můžeme Zookeepera spustit:

$ cd kafka/kafka_2.12-3.3.2/
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Průběh inicializace Zookeepera je vypisován na terminál:

[2023-02-04 08:37:49,555] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
...
...
[2023-02-04 08:37:49,591] INFO  (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-04 08:37:49,591] INFO   ______                  _                                           (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-04 08:37:49,591] INFO  |___  /                 | |                                          (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-04 08:37:49,591] INFO     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __    (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-04 08:37:49,592] INFO    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-04 08:37:49,592] INFO   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |     (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-04 08:37:49,592] INFO  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_| (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-04 08:37:49,592] INFO                                               | |                      (org.apache.zookeeper.server.ZooKeeperServer)
[2023-02-04 08:37:49,592] INFO                                               |_|                      (org.apache.zookeeper.server.ZooKeeperServer)
...
...
...
[2023-02-04 08:37:49,691] INFO zookeeper.request_throttler.shutdownTimeout = 10000 (org.apache.zookeeper.server.RequestThrottler)
[2023-02-04 08:37:49,706] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)

Ověření, zda Zookeeper běží a přijímá požadavky, můžeme provést standardním nástrojem telnet. Nejprve se k Zookeeperovi připojíme, a to konkrétně na port 2181:

$ telnet localhost 2181
 
Trying 192.168.1.34...
Connected to 192.168.1.34.
Escape character is '^]'.

Připojit se pochopitelně můžeme i ke vzdálenému stroji s běžícím Zookeeperem, pokud nám to umožňuje konfigurace sítě a nastavení firewallů:

$ telnet 192.168.1.34 2181
 
Trying 192.168.1.34...
Connected to 192.168.1.34.
Escape character is '^]'.

Nyní Zookeeper čeká na zadání four letter word command, tedy příkazu, který je zapsán formou čtyř znaků. Příkladem může být příkaz srvr. Po jeho zápisu Zookeeper odpoví a odpojí se:

srvr
 
Zookeeper version: 3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT
Latency min/avg/max: 0/0.0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 5
Connection closed by foreign host.

Mezi další podporované čtyřznakové příkazy patří:

Hodnota 32bitového slova Odpovídající příkaz
1936881266 srvr
1937006964 stat
2003003491 wchc
1685417328 dump
1668445044 crst
1936880500 srst
1701738089 envi
1668247142 conf
1751217000 hash
2003003507 wchs
2003003504 wchp
1684632179 dirs
1668247155 cons
1835955314 mntr
1769173615 isro
1920298859 ruok
1735683435 gtmk
1937010027 stmk
Poznámka: kód je odvozen jednoduše tak, že všechny čtyři znaky (resp. přesněji řečeno jejich ASCII kódy) uložíme do jednoho 32bitového slova, jehož číselnou hodnotu lze poté snadno přečíst.

10. Konfigurace a spuštění brokera

Nyní, když již běží jedna instance Zookeepera, si můžeme spustit brokera. Podívejme se ovšem nejdříve na jeho konfiguraci. Výchozí konfigurace jednoho brokera je uložená v souboru config/server.properties. Samotný konfigurační soubor obsahuje několik sekcí:

  1. Port, na kterém broker naslouchá
  2. Jednoznačné (unikátní) ID brokera
  3. Počet použitých vláken pro IO operace a počet vláken pro komunikaci.
  4. Velikost bufferů, maximální povolená velikost požadavků (což omezuje velikost zprávy) atd.
  5. Nastavení počtu partitions
  6. Nastavení retence dat
  7. Připojení k Zookeeperovi

Takto vypadá výchozí konfigurace (po odstranění původních komentářů a přidání komentářů vlastních):

#
########################### Jednoznačná identifikace brokera ####################
#
broker.id=0
num.io.threads=8
#
######################## Komunikace s producenty i konzumenty ###################
#
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
#
########################### Adresář se soubory s daty oddílů ####################
#
log.dirs=/tmp/kafka-logs
#
################### Výchozí konfigurace oddílů pro nová témata ##################
#
num.partitions=1
num.recovery.threads.per.data.dir=1
#
######################## Interní témata s offsety a transakcemi #################
#
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#
############################# Log Retention Policy ##############################
#
log.retention.hours=168
log.retention.check.interval.ms=300000
#
############################# Zookeeper #########################################
#
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
#
############################# Group Coordinator Settings ########################
#
group.initial.rebalance.delay.ms=0
Poznámka: povšimněte si, že se broker bude snažit komunikovat se Zookeeperem běžícím na stejném stroji.
Poznámka2: velikost adresáře log.dirs roste, a to mnohdy velmi rychle, takže se vyplatí sledovat příslušné metriky.

Spuštění jednoho brokera vypadá i probíhá jednoduše:

$ cd kafka/kafka_2.12-3.3.2/
$ bin/kafka-server-start.sh config/server.properties

Broker by měl vypsat minimálně informaci o tom, že se připojil k Zookeeperu a že dokázal inicializovat adresář pro uložení dat (zpráv) oddílů:

[2023-02-04 08:41:47,105] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2023-02-04 08:41:47,506] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2023-02-04 08:41:47,587] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2023-02-04 08:41:47,589] INFO starting (kafka.server.KafkaServer)
[2023-02-04 08:41:47,590] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2023-02-04 08:41:47,606] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
...
...
...
[2023-02-04 08:49:52,076] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2023-02-04 08:49:52,167] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use node localhost:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-02-04 08:49:52,184] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use node localhost:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

Alternativně je možné ZooKeepera i Kafku (jednu instanci brokera) spustit v Dockeru:

$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
Poznámka: předchozí nastavení předpokládá, že současně na stejném stroji nepoběží žádná další instance Kafky ani Zookeepera. Pokud budete potřebovat spustit větší množství brokerů, je nutné minimálně změnit mapování portů (přepínače -p) a taktéž změnit ID brokeru – to vše si pochopitelně ukážeme v dalším textu.

11. Komunikace přes téma s jediným oddílem a bez replikace

Nejprve si ukažme komunikaci mezi producenty zpráv a jejich konzumenty v případě, že jak producenti, tak i konzumenti používají společné téma (topic) s jediným oddílem, který navíc není replikovaný. Jedná se tedy o konfiguraci, která byla popsána ve třetí kapitole. V dalším textu budeme předpokládat, že je již spuštěn jak jeden Zookeeper, tak i jeden Kafka broker.

Budeme potřebovat další tři terminály (nebo okna spravovaná přes tmux atd.):

  1. V prvním terminálu budeme spouštět příkazy pro konfiguraci Kafky a pro zjišťování jejího stavu
  2. Ve druhém terminálu spustíme producenta zpráv.
  3. Ve třetím terminálu spustíme konzumenta zpráv.

Pro větší přehlednost bude mít každý z terminálů nastaven odlišnou výzvu (prompt):

kafka $
producer $
consumer $

Vypíšeme si seznam témat; ten by měl být prázdný:

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
 

Vytvoříme nové téma nazvané topic1 a necháme si opět vypsat seznam témat:

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic1
Created topic topic1.
 
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
topic1

Získáme i podrobnější informace o tématu (s jediným oddílem):

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic1
 
Topic: topic1   TopicId: 3Xfn9Hu1QhmRAdXKmEua-w PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Vypíšeme si (prozatím prázdný) seznam skupin konzumentů:

kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
 

V dalším terminálu spustíme producenta zpráv a pošleme do tématu topic1 tři zprávy s hodnotou (tělem) „foo“, „bar“ a „baz“:

producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1
>foo
>bar
>baz
<Ctrl+D>

Spustíme konzumenta zpráv a zajistíme, aby zprávy četl od začátku tématu (jinak by čekal na nejnovější zprávy a již uložené zprávy by ignoroval):

consumer $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning
foo
bar
baz

Opět si zobrazíme seznam skupin konzumentů:

kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-73416

A vypíšeme si o nich i podrobnější informace:

kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
 
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
console-consumer-73416 topic1          0          -               3               -               console-consumer-938ac3be-f3b6-4e93-aab4-74e34b9a3ac5 /192.168.1.34   console-consumer

Lepší však bude vytvořit konzumenta s explicitně specifikovaným jménem skupiny konzumentů:

consumer $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --group topic1-group1

Nyní bude výpis informací o skupinách konzumentů vypadat takto:

kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
 
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
topic1-group1   topic1          0          -               3               -               console-consumer-3dc1ad8a-14fb-4758-b25e-9319436b409b /192.168.1.34   console-consumer
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups

Po poslání další zprávy provede konzument její zpracování a upraví si „svůj“ offset, díky čemuž může Kafka spočítat i lag, tj. zpoždění konzumentů vůči producentovi (měřené v počtu nepřečtených zpráv):

kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
 
GROUP          TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                           HOST            CLIENT-ID
topic1-group1  topic1   0          4               4               0     console-consumer-3dc1ad8a-14fb-4758-b25e-9319436b409b /192.168.1.34   console-consumer

12. Větší množství konzumentů z jedné skupiny vs. z více skupin

V této kapitole si ukážeme chování Kafky ve chvíli, kdy se k vybranému tématu připojí větší množství konzumentů, kteří mohou patřit buď do jedné skupiny nebo do většího množství skupin.

Nejprve nové téma vytvoříme. Není to sice nutné, protože téma dokáže vytvořit i připojený klient, ale proč nebýt explicitní:

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic2
Created topic topic2.

Vypíšeme si podrobnější informace o právě vytvořeném tématu. Vidíme, že téma má jediný oddíl a jediného leadera (což je ten samý oddíl):

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic2
 
Topic: topic2   TopicId: 43_uP1TFTuOd-lXgKxbIaA PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: topic2   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

V samostatných terminálech spustíme dvojici konzumentů, kteří budou patřit do stejné skupiny nazvané topic2-group1:

consumer1 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --group topic2-group1
consumer2 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --group topic2-group1

A samozřejmě v dalším terminálu spustíme producenta, který bude do tématu předávat zprávy:

producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic2
>foo
>bar
>baz
<Ctrl+D>

Povšimněte si, že pouze jeden z producentů (náhodně vybraný) bude dostávat zprávy, zatímco druhý bude pouze čekat na zprávy, které nedojdou. Jedná se tedy o následující situaci, kdy v rámci jedné skupiny může být k jedinému oddílu připojen jediný konzument:

              +---+---+---+---+---+---+
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1
              +---+---+---+---+---+---+
                                                        konzument #2
                                                           ...
                                                           ...
                                                           ...
                                                        konzument #N

Co se však bude dít ve chvíli, kdy budeme mít dvojici konzumentů, ovšem každý bude patřit do jiné skupiny? I to si pochopitelně otestujeme. Vytvoříme pro tento účel nové téma:

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic3
Created topic topic3.
 
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic3
Topic: topic3   TopicId: o1Szo6FoQLu67SUK4m9QsA PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: topic3   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

V samostatných terminálech spustíme dvojici konzumentů, každý ovšem bude patřit do jiné skupiny:

consumer1 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic3 --group topic3-group1
consumer2 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic3 --group topic3-group2

Spustíme producenta zpráv:

producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic3
>foo
>bar
>baz
<Ctrl+D>

Nyní by oba konzumenti měli dostávat stejné zprávy! Tj. bude se jednat o tuto konfiguraci:

                                     write
                                       |
+---+---+---+---+---+---+---+---+---+  v
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
+---+---+---+---+---+---+---+---+---+
                  ^               ^
                  |               |
            topic3-group1-        |
                                  |
                              topic3-group2

O tom, že konzumenti z různých skupin mají uložen svůj offset nezávisle na dalších skupinách, se lze snadno přesvědčit:

kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
 
GROUP          TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST            CLIENT-ID
topic3-group1  topic3  0          3               3               0    console-consumer-bec8112f-8c21-46e0-8a22-997ec663615f /192.168.1.34   console-consumer
 
GROUP          TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST            CLIENT-ID
topic3-group2  topic3  0          3               3               0    console-consumer-feb4ff84-460c-4ddb-959c-f949fc18968f /192.168.1.34   console-consumer
Poznámka: v tomto konkrétním případě mají obě skupiny konzumentů nulový lag, ovšem v praxi může být lagodlišný, neboť konzumenti z jedné skupiny mohou být (z různých důvodů) rychlejší, než je tomu u dalších skupin.

13. Komunikace přes téma s větším množstvím oddílů

Vyzkoušejme si nyní další situaci, konkrétně stav, kdy je jedno téma rozděleno na dva oddíly na jediném Kafka brokeru. Takové téma je nutné vytvořit explicitně s použitím parametru –partitions:

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic4 --partitions 2
Created topic topic4.

Nyní bude konfigurace tématu vypsaná samotnou Kafkou vypadat odlišně, protože se vypíšou informace o obou oddílech:

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic4
Topic: topic4   TopicId: 4qaw38skShK9elEG-gwEIA PartitionCount: 2       ReplicationFactor: 1    Configs:
        Topic: topic4   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic4   Partition: 1    Leader: 0       Replicas: 0     Isr: 0

Spusťme opět dva konzumenty patřící do stejné skupiny:

consumer1 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic4 --group topic4-group1
consumer2 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic4 --group topic4-group1

Kafka broker v tomto případě konzumenty rozdělí po jednotlivých oddílech:

              +---+---+---+---+---+---+
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1
              +---+---+---+---+---+---+
partition #1  | 0 | 1 | 2 | ........................... konzument #2
              +---+---+---+

O tom se ostatně můžeme velmi snadno přesvědčit:

kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups 
 
GROUP          TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST            CLIENT-ID
topic4-group1  topic4  0          0               0               0    console-consumer-00d59376-f63e-4833-adfa-b0578c275244 /192.168.1.34   console-consumer
topic4-group1  topic4  1          0               0               0    console-consumer-22ddccc4-6ea4-4b97-9c32-e85755788e35 /192.168.1.34   console-consumer
Poznámka: povšimněte si, že každý z konzumentů je nyní skutečně připojen ke „svému“ oddílu a pamatuje si offset v rámci tohoto oddílu.

Teoreticky by nyní měly být zprávy rozdělovány „spravedlivě“ mezi oba oddíly:

producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic4
>foo
>bar
>baz
<Ctrl+D>

Ve skutečnosti však k rozdělení nedojde, o čemž se lze snadno přesvědčit – jeden z konzumentů (nevíme který) bude dostávat všechny zprávy, druhý konzument nebude zprávy dostávat žádné!

              +---+---+---+---+---+---+
partition #0  | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1
              +---+---+---+---+---+---+
partition #1  | ....................................... konzument #2
              +

14. Souběžná konzumace zpráv konzumenty z jedné skupiny

Zprávy se (ve výchozím nastavení) rozdělují do jednotlivých oddílů v tématu na základě svého klíče. Ten lze u posílaných (resp. produkovaných) zpráv specifikovat podobně jako tělo zprávy. Musíme však producenta ovládaného z příkazové řádky vhodným způsobem nakonfigurovat tak, aby věděl, jakým způsobem je klíč (sekvence bajtů) oddělen od těla zprávy (což je taktéž sekvence bajtů). Jako oddělovač můžeme použít například dvojtečku. Producent se nakonfiguruje následovně:

producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic4 --property parse.key=true --property key.separator=":"

Do tématu nyní pošleme šest zpráv, každou s odlišným klíčem (zadán před dvojtečkou):

>1:a
>2:b
>3:c
>foo:1
>bar:2
>baz:3
<Ctrl+D>

Pokud jsou současně spuštění dva konzumenti patřící do stejné skupiny, uvidíme, že zprávy by měly být mezi konzumenty rozděleny a to zhruba férově:

consumer1 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic4 --group topic4-group1
b
1
3
consumer2 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic4 --group topic4-group1
a
c
2

Dosáhli jsme tedy tohoto stavu:

              +---+---+---+
partition #0  | 0 | 1 | 2 | ................ konzument #1
              +---+---+---+
partition #1  | 0 | 1 | 2 | ................ konzument #2
              +---+---+---+
Poznámka: rozdělení nemusí být zcela férové ve chvíli, kdy se používá jen malý počet klíčů popř. když heše klíčů nemají uniformní rozdělení.

O stavu jednotlivých oddílů se můžeme snadno přesvědčit zadáním následujícího příkazu:

kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
 
GROUP          TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST            CLIENT-ID
topic4-group1  topic4  0          3               3               0    console-consumer-00d59376-f63e-4833-adfa-b0578c275244 /192.168.1.34   console-consumer
topic4-group1  topic4  1          6               6               0    console-consumer-22ddccc4-6ea4-4b97-9c32-e85755788e35 /192.168.1.34   console-consumer

Z výpisu je patrné, že ve druhém oddílu je větší množství zpráv. To je způsobeno zprávami, které jsme do stejného tématu poslali v rámci předchozí kapitoly. Tyto zprávy neměly klíče a proto byly umístěny do jediného oddílu.

15. Spuštění většího množství brokerů

V dalším textu si ukážeme, jakým způsobem je možné vytvořit oddíly replikované mezi větší množství brokerů. Ovšem nejprve musíme tyto brokery spustit. Každý broker musí mít unikátní ID a musí běžet na svém vlastním (prozatím neobsazeném) portu. Pro spuštění tří brokerů tedy musíme mít tři konfigurační soubory, které postupně vypadají následovně (důležité řádky jsou zvýrazněny):

Konfigurační soubor server1.properties:

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

Konfigurační soubor server2.properties:

broker.id=1
listeners=PLAINTEXT://:9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-2
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

Konfigurační soubor server3.properties:

broker.id=2
listeners=PLAINTEXT://:9094
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

Jednotlivé instance Kafka brokerů postupně spustíme s využitím následujících skriptů:

Skript pro první broker:

bin/kafka-server-start.sh config/server1.properties

Skript pro druhý broker:

bin/kafka-server-start.sh config/server2.properties

Skript pro třetí broker:

bin/kafka-server-start.sh config/server3.properties
Poznámka: v následujících kapitolách budeme předpokládat, že je spuštěn jeden Zookeeper a Kafka brokery s výše uvedenou konfigurací.

16. Vytvoření a použití tématu s jedním replikovaným oddílem

Nyní, když máme spuštěny tři brokery, si vytvoříme téma s jediným oddílem, který bude replikován na dva vybrané brokery:

kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic5 --replica-assignment 0:1
Created topic topic5.
Poznámka: zpráva o vytvoření nového tématu by se ihned měla objevit v logu prvního a druhého serveru. Proč tomu tak je? V parametru –replica-assignment jsme předali specifikaci 0:1, což není číselný rozsah, ale ID prvního a druhého serveru (viz konfiguraci uvedenou výše).

Nyní si vypišme konfiguraci nového tématu tak, jak ji vidí Apache Kafka:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic5
Topic: topic5   TopicId: 3XcqjGiHT-OpkO1jIuqKcQ PartitionCount: 1       ReplicationFactor: 2    Configs:
        Topic: topic5   Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1

Informace, které jsme dříve do jisté míry ignorovali, se nyní stávají důležité:

  • V sekci Leader je uvedeno ID brokeru, který je v daný okamžik leaderem
  • V sekci Replicas vidíme zápis 0,1, což jsou ID brokerů, kde jsou uloženy repliky prvního oddílu (s indexem 0)
  • A konečně v sekci Isr taktéž vidíme zápis 0,1, což jsou ID brokerů, kteří obsahují synchronizované repliky oddílů (in-sync replicas)

Vytvořili jsme tedy téma s touto konfigurací:

                                     write
                                       |
+---+---+---+---+---+---+---+---+---+  v
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...    (leader)
+---+---+---+---+---+---+---+---+---+
                  ^               ^
                  |               |
                read              |
                                 sync
                                  |
                                  |
                                  v
+---+---+---+---+---+---+---+---+---+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |        (follower)
+---+---+---+---+---+---+---+---+---+

Pro zajímavost si vytvořme další téma, které bude obsahovat jeden oddíl replikovaný na všechny tři servery:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic6 --replica-assignment 0:1:2
Created topic topic6.

Konfigurace tohoto tématu:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic6
Topic: topic6   TopicId: 1kEi0YYaQ5uYot7xAsN3Eg PartitionCount: 1       ReplicationFactor: 3    Configs:
        Topic: topic6   Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2

Jedná se tedy o tuto strukturu:

                                     write
                                       |
+---+---+---+---+---+---+---+---+---+  v
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...    (leader)
+---+---+---+---+---+---+---+---+---+
                  ^               ^
                  |               |
                read              |
                                 sync
                                  |
                                  |
                                  v
+---+---+---+---+---+---+---+---+---+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |        (follower)
+---+---+---+---+---+---+---+---+---+
                                  ^
                                  |
                                  |
                                 sync
                                  |
                                  |
                                  v
+---+---+---+---+---+---+---+---+---+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |        (follower)
+---+---+---+---+---+---+---+---+---+

A nakonec – téma nazvané topic7 bude taktéž obsahovat jediný oddíl replikovaný na všechny tři brokery, ovšem nyní zvolíme jako leadera třetí broker (s ID nastaveným na 2):

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic7 --replica-assignment 2:0:1
Created topic topic7.

Konfigurace tohoto tématu:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic7
Topic: topic7   TopicId: 55pDYK4gSYSB2Cc4Me3eIA PartitionCount: 1       ReplicationFactor: 3    Configs:
        Topic: topic7   Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1

V tomto případě se jedná o tuto strukturu:

+---+---+---+---+---+---+---+---+---+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |        (follower)
+---+---+---+---+---+---+---+---+---+
                                  ^
                                  |
                                  |
                                 sync
                                  |
                                  |
                                  v
+---+---+---+---+---+---+---+---+---+
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |        (follower)
+---+---+---+---+---+---+---+---+---+
                                  ^
                                  |
                                  |
                read             sync
                  |               |
                  |               |   write
                  v               v     |
+---+---+---+---+---+---+---+---+---+   v
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |  ...   (leader)
+---+---+---+---+---+---+---+---+---+

17. Chování při nedostupnosti brokeru (brokerů)

V této kapitole použijeme téma nazvané topic7, které bylo vytvořeno v rámci předchozí kapitoly. Do tohoto tématu budeme posílat zprávy konzumentem ovládaným z příkazové řádky:

producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic7
>foo
>bar
>baz

Konzument se připojí – což je možná zvláštní – k prvnímu brokeru, i když ve skutečnosti je leaderem třetí broker. To je však interní záležitost Apache Kafky:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic7 --group topic7-group1
foo
bar
baz

Pokud nyní klávesou Ctrl+C nebo příkazem kill ukončíme činnost třetího brokera, který je leaderem, vypíše producent toto varování:

[2023-03-06 13:07:58,594] WARN [Producer clientId=console-producer] Connection to node 2 (unknown.home/192.168.1.34:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Podobné varování vypíše i konzument:

[2023-03-06 13:07:41,241] WARN [Consumer clientId=console-consumer, groupId=topic7-group1] Connection to node 2 (unknown.home/192.168.1.34:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Současně však dojde k přepojení na jiného brokera, který se stane leaderem. Který prober to je zjistíme opět z konfigurace tématu:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic7
Topic: topic7   TopicId: 55pDYK4gSYSB2Cc4Me3eIA PartitionCount: 1       ReplicationFactor: 3    Configs:
        Topic: topic7   Partition: 0    Leader: 0       Replicas: 2,0,1 Isr: 0,1

Nyní se tedy leaderem stává broker číslo 0, repliky jsou stále nakonfigurovány pro brokery 0, 1 i 2, ovšem synchronizováni jsou jen brokeři 0 a 1 (což je logické, protože zbývající broker neběží).

Nyní třetí broker opět nastartujeme. V logu brokerů by se měly objevit zprávy o synchronizaci témat (což může nějakou dobu trvat):

[2023-03-06 13:08:45,041] INFO [Partition topic7-0 broker=0] ISR updated to 0,1,2 and version updated to 2 (kafka.cluster.Partition)

A v konfiguraci tématu by opět měli být v sekci Isr vypsáni všichni brokeři (ovšem leader se už nezmění – není k tomu ostatně důvod):

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic7
Topic: topic7   TopicId: 55pDYK4gSYSB2Cc4Me3eIA PartitionCount: 1       ReplicationFactor: 3    Configs:
        Topic: topic7   Partition: 0    Leader: 0       Replicas: 2,0,1 Isr: 0,1,2
Poznámka: producenti i konzumenti budou v průběhu těchto operací stále plně funkční!

18. Vytvoření a použití tématu s několika replikovanými oddíly

V závěrečné kapitole si ukažme, jak se vytvoří téma s několika oddíly, které jsou replikovány. Jedná se tedy o nejsložitější možnou konfiguraci, která umožňuje souběžnou činnost více konzumentů a navíc zajišťuje dostupnost oddílů i v případě, kdy nějaký broker zhavaruje. Téma bude mít tuto konfiguraci:

  • První oddíl bude replikován na prvním a druhém brokeru
  • Druhý oddíl bude replikován na prvním a třetím brokeru
  • Třetí oddíl bude replikován na druhém a třetím brokeru

Čísla oddílů i ID brokerů začínají od nuly, takže téma vytvoříme takto (povšimněte si syntaxe zápisu posledního parametru):

CS24 tip temata

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic8 --replica-assignment 0:1,0:2,1:2
Created topic topic8.
Poznámka: počet replik musí být konstantní! Není tedy možné, aby jeden z oddílů měl více replik, než oddíl jiný.

Samotná konfigurace tématu se nyní vypíše v této poměrně dobře pochopitelné podobě:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic8
Topic: topic8   TopicId: sLky-pgtQAibNaoguwwDVA PartitionCount: 3       ReplicationFactor: 2    Configs:
        Topic: topic8   Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: topic8   Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
        Topic: topic8   Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
Poznámka: jedná se o ideální stav, kdy údaje v sekci Replicas odpovídají údajům v sekci Isr – všichni brokeři tedy běží a současně jsou repliky synchronizovány.

19. Repositář s pomocnými skripty a konfiguračními soubory

Zdrojové kódy dnes použitých producentů a konzumentů byly společně s konfiguračními soubory Zookeeperu a brokerů uloženy do repositáře, jenž je dostupný na adrese https://github.com/tisnik/slides/. V případě, že nebudete chtít klonovat celý repositář, můžete namísto toho použít odkazy na jednotlivé demonstrační příklady i další soubory, které naleznete v následující tabulce:

# Soubor Stručný popis Adresa
1 server1.properties konfigurace prvního brokera https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/server1.properties
2 server2.properties konfigurace druhého brokera https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/server2.properties
3 server3.properties konfigurace třetího brokera https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/server3.properties
4 zookeeper.properties konfigurace Zookeepera https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/zookeeper.properties
5 server1.sh skript pro spuštění prvního brokera https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/server1.sh
6 server2.sh skript pro spuštění druhého brokera https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/server2.sh
7 server3.sh skript pro spuštění třetího brokera https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/server3.sh
8 zookeeper.sh skript pro spuštění Zookeepera https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/zookeeper.sh

20. Odkazy na Internetu

  1. Kcli: is a kafka read only command line browser.
    https://github.com/cswank/kcli
  2. Kcli: a kafka command line browser
    https://go.libhunt.com/kcli-alternatives
  3. Kafka Connect and Schemas
    https://rmoff.net/2020/01/22/kafka-connect-and-schemas/
  4. JSON and schemas
    https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas
  5. What, why, when to use Apache Kafka, with an example
    https://www.startdataengi­neering.com/post/what-why-and-how-apache-kafka/
  6. When NOT to use Apache Kafka?
    https://www.kai-waehner.de/blog/2022/01/04/when-not-to-use-apache-kafka/
  7. Microservices: The Rise Of Kafka
    https://movio.co/blog/microservices-rise-kafka/
  8. Building a Microservices Ecosystem with Kafka Streams and KSQL
    https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
  9. An introduction to Apache Kafka and microservices communication
    https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63
  10. kappa-architecture.com
    http://milinda.pathirage.org/kappa-architecture.com/
  11. Questioning the Lambda Architecture
    https://www.oreilly.com/i­deas/questioning-the-lambda-architecture
  12. Lambda architecture
    https://en.wikipedia.org/wi­ki/Lambda_architecture
  13. Kafka – ecosystem (Wiki)
    https://cwiki.apache.org/con­fluence/display/KAFKA/Eco­system
  14. The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
    http://cloudurable.com/blog/kafka-ecosystem/index.html
  15. A Kafka Operator for Kubernetes
    https://github.com/krallistic/kafka-operator
  16. Kafka Streams
    https://cwiki.apache.org/con­fluence/display/KAFKA/Kaf­ka+Streams
  17. Kafka Streams
    http://kafka.apache.org/do­cumentation/streams/
  18. Kafka Streams (FAQ)
    https://cwiki.apache.org/con­fluence/display/KAFKA/FAQ#FAQ-Streams
  19. Event stream processing
    https://en.wikipedia.org/wi­ki/Event_stream_processing
  20. Part 1: Apache Kafka for beginners – What is Apache Kafka?
    https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html
  21. What are some alternatives to Apache Kafka?
    https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka
  22. What is the best alternative to Kafka?
    https://www.slant.co/opti­ons/961/alternatives/~kaf­ka-alternatives
  23. A super quick comparison between Kafka and Message Queues
    https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0
  24. Kafka Queuing: Kafka as a Messaging System
    https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system
  25. Apache Kafka Logs: A Comprehensive Guide
    https://hevodata.com/learn/apache-kafka-logs-a-comprehensive-guide/
  26. Microservices – Not a free lunch!
    http://highscalability.com/blog/2014/4/8/mi­croservices-not-a-free-lunch.html
  27. Microservices, Monoliths, and NoOps
    http://blog.arungupta.me/microservices-monoliths-noops/
  28. Microservice Design Patterns
    http://blog.arungupta.me/microservice-design-patterns/
  29. REST vs Messaging for Microservices – Which One is Best?
    https://solace.com/blog/experience-awesomeness-event-driven-microservices/
  30. Kappa Architecture Our Experience
    https://events.static.linux­found.org/sites/events/fi­les/slides/ASPgems%20-%20Kappa%20Architecture.pdf
  31. Apache Kafka Streams and Tables, the stream-table duality
    https://towardsdatascience.com/apache-kafka-streams-and-tables-the-stream-table-duality-ee904251a7e?gi=f22a29cd1854
  32. Configure Self-Managed Connectors
    https://docs.confluent.io/kafka-connectors/self-managed/configuring.html#configure-self-managed-connectors
  33. Schema Evolution and Compatibility
    https://docs.confluent.io/plat­form/current/schema-registry/avro.html#schema-evolution-and-compatibility
  34. Configuring Key and Value Converters
    https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configuring-key-and-value-converters
  35. Introduction to Kafka Connectors
    https://www.baeldung.com/kafka-connectors-guide
  36. Kafka CLI: command to list all consumer groups for a topic?
    https://stackoverflow.com/qu­estions/63883999/kafka-cli-command-to-list-all-consumer-groups-for-a-topic
  37. Java Property File Processing
    https://www.w3resource.com/java-tutorial/java-propertyfile-processing.php
  38. Skipping bad records with the Kafka Connect JDBC sink connector
    https://rmoff.net/2019/10/15/skipping-bad-records-with-the-kafka-connect-jdbc-sink-connector/
  39. Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
    https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
  40. Errors and Dead Letter Queues
    https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/
  41. Confluent Cloud Dead Letter Queue
    https://docs.confluent.io/clou­d/current/connectors/dead-letter-queue.html
  42. Dead Letter Queues (DLQs) in Kafka
    https://medium.com/@sannidhi.s.t/dead-letter-queues-dlqs-in-kafka-afb4b6835309
  43. Deserializer
    https://docs.confluent.io/plat­form/current/schema-registry/serdes-develop/serdes-json.html#json-schema-serializer-and-deserializer
  44. JSON, Kafka, and the need for schema
    https://mikemybytes.com/2022/07/11/json-kafka-and-the-need-for-schema/
  45. Using Kafka Connect with Schema Registry
    https://docs.confluent.io/plat­form/current/schema-registry/connect.html
  46. Zpracování dat reprezentovaných ve formátu JSON nástrojem jq
    https://www.root.cz/clanky/zpracovani-dat-reprezentovanych-ve-formatu-json-nastrojem-jq/
  47. Repositář projektu jq (GitHub)
    https://github.com/stedolan/jq
  48. GitHub stránky projektu jq
    https://stedolan.github.io/jq/
  49. 5 modern alternatives to essential Linux command-line tools
    https://opensource.com/ar­ticle/20/6/modern-linux-command-line-tools
  50. Návod k nástroji jq
    https://stedolan.github.i­o/jq/tutorial/
  51. jq Manual (development version)
    https://stedolan.github.io/jq/manual/
  52. Introducing JSON
    https://www.json.org/json-en.html
  53. Understanding JSON schema
    https://json-schema.org/understanding-json-schema/index.html
  54. JDBC Sink Connector for Confluent Platform
    https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#jdbc-sink-connector-for-cp
  55. JDBC Connector (Source and Sink)
    https://www.confluent.io/hub/con­fluentinc/kafka-connect-jdbc
Seriál: Message brokery

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.