Hlavní navigace

Práce s Kafkou z příkazové řádky: nástroje Kafkacat a Kcli

17. 6. 2021
Doba čtení: 26 minut

Sdílet

 Autor: Apache Foundation
V několika předchozích článcích jsme si ukázali, jak lze využívat služeb Apache Kafky z několika programovacích jazyků. Ve skutečnosti lze ovšem s Kafkou pracovat i přímo z příkazové řádky.

Obsah

1. Práce s Kafkou z příkazové řádky: nástroje Kafkacat a Kcli

2. Instalace Kafky

3. Spuštění služby ZooKeeper a jedné instance brokera

4. Uložení logů na ramdisk pro vývojový počítač

5. Konfigurace většího množství brokerů

6. Nástroj kafkacat

7. Instalace nástroje kafkacat

8. Nástroj kcli

9. Instalace nástroje kcli

10. Připojení kcli k brokeru

11. Příprava dat – zpráv poslaných do témat

12. Prohlížení témat nástrojem kcli

13. Zápis a prohlížení strukturovaných dat

14. Vytvoření tématu s více oddíly

15. Zápis dat se specifikací oddílů

16. Výběr oddílu v nástroji kcli

17. Vyhledávání zpráv, přesun na zvolený offset a další možnosti nástroje kcli

18. Repositář s použitými datovými soubory i skripty

19. Odkazy na relevantní články na Rootu

20. Odkazy na Internetu

1. Práce s Kafkou z příkazové řádky: nástroje Kafkacat a Kcli

Na stránkách Roota jsme se již několikrát setkali s užitečnou službou Apache Kafka, která se v současnosti používá v relativně velkém množství aplikačních oblastí, od zpracování dat, přes klasické pipeliny, mikroslužby až po využití Kafky ve funkci message brokera popř. v architekturách lambda a kappa (resp. spíše původní stránka o tomto tématu). Typicky se s Kafkou komunikuje přímo z aplikací, které mohou být vyvinuty v různých programovacích jazycích (víme již, jak vytvořit klienta pro Python, Go a jazyk Clojure). Jedinou podmínkou je, aby pro daný jazyk (resp. přesněji řečeno pro ekosystém postavený okolo tohoto jazyka) existovala knihovna poskytující rozhraní ke komunikačnímu protokolu využívaného Apache Kafkou.

Obrázek 1: Logo nástroje Apache Kafka, kterému se budeme dnes věnovat.

Existuje ovšem i poněkud odlišný přístup k práci s Kafkou, který oceníme při učení, testování, hledání problémů při posílání či příjmu zpráv, popř. při administrativních úkonech, které s Kafkou souvisí. Tento přístup je založen na využití příkazového řádku a nástrojů, které jsou z něj spouštěny a dokážou s Kafkou pracovat (a to pochopitelně i vzdáleně). Nejedná se vlastně o žádnou žhavou novinku, protože i pro databáze (relační i postrelační) existují nástroje ovládané z příkazové řádky, které tvoří alternativu k databázovým ovladačům využívaných v aplikacích (psql, sqlite3, redis-cli atd.). V případě Apache Kafky se s úspěchem používá nástroj nazvaný Kafkacat, který může být s mnoha výhodami doplněn o poněkud méně známý nástroj pojmenovaný přímočaře Kcli. Se základními možnostmi poskytovanými oběma zmíněnými nástroji se seznámíme v navazujících kapitolách.

Obrázek 2: Příklad rozdělení témat (topic) v clusteru.

Poznámka: s nástrojem Kafkacat jsme se již na stránkách Roota ve stručnosti seznámili – viz odkazy na příslušné články uvedené v devatenácté kapitole. Ovšem pro úplnost si některé informace o Kafkacatu znovu popíšeme dnes, aby byly všechny důležité a (doufejme že) užitečné informace dostupné v jediném přehledovém článku.

Obrázek 3: Příklad použití ekosystému Kafky (Kafka Streams, konektory pro databáze atd.).

2. Instalace Kafky

Popišme si nyní ve stručnosti způsob lokální instalace Apache Kafky. V případě, že je na počítači nainstalováno JRE (tedy běhové prostředí Javy), je instalace Kafky pro testovací a vývojové účely triviální. Tarball s instalací Kafky lze získat z adresy https://www.apache.org/dyn/clo­ser.cgi?path=/kafka/2.8.0/kaf­ka2.12–2.8.0.tgz (verze platná v době vydání dnešního článku). Stažení a rozbalení tarballu se provede následovně:

$ wget http://apache.miloslavbrada.cz/kafka/2.8.0/kafka_2.12-2.8.0.tgz
$ tar -xzf kafka_2.12-2.8.0.tgz
$ cd kafka_2.12-2.8.0

Po rozbalení stáhnutého tarballu získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin). Pro spuštění Zookeepera a brokerů je zapotřebí mít nainstalovánu JRE (neboli již zmíněný Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).

Obrázek 4: Sledování činnosti brokeru přes standardní nástroj JConsole.

Prozatím nebudeme žádné další nastavení ani žádné další nástroje potřebovat, pochopitelně s výjimkou dále popsaných nástrojů Kafkacat a Kcli.

Obrázek 5: Sledování činnosti brokeru přes standardní nástroj JConsole.

3. Spuštění služby ZooKeeper a jedné instance brokera

Po (doufejme že úspěšné a bezproblémové) instalaci Kafky již můžeme spustit ZooKeeper a jednu instanci brokera (a to přesně v tomto pořadí! – tedy nejdříve ZooKeeper a posléze broker). Konfigurace ZooKeepera je uložena v souboru config/zookeeper.properties a zajímat nás budou především tyto volby – adresář, kam ZooKeeper ukládá svoje data, port, který použijí brokeři a omezení počtu připojení jednoho klienta v daný okamžik:

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
Poznámka: hodnota maxClientCnxns uložená v konfiguračním souboru v tomto případě neznamená, že by se nemohly připojit žádní klienti, ale je že vypnutý mechanismus, který zabezpečuje infrastrukturu Kafky před některými typy DOS útoků. Na disku, kde je adresář dataDir by také mělo být dostatek místa, protože ZooKeeper v některých případech mívá větší nároky. Další informace lze nalézt na stránce https://zookeeper.apache.or­g/doc/r3.5.6/.

Nyní již můžeme Zookeepera spustit z příkazové řádky (ideálně ve vyhrazeném terminálu):

$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2020-01-20 17:00:07,823] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2020-01-20 17:00:07,825] WARN config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2020-01-20 17:00:07,827] INFO clientPortAddress is 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
...
...
[2020-01-20 17:00:07,947] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
[2020-01-20 17:00:26,978] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)

Konfigurace jednoho brokera je uložená v souboru config/server.properties. Samotný konfigurační soubor obsahuje několik sekcí:

  1. Port, na kterém broker naslouchá, jeho ID, počet použitých vláken pro IO operace a počet vláken pro komunikaci.
  2. Velikost bufferů, maximální povolená velikost požadavků (což omezuje velikost zprávy) atd.
  3. Nastavení počtu partitions
  4. Nastavení retence dat
  5. Připojení k Zookeeperovi
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
 
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
 
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
 
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
Poznámka: i velikost adresáře log.dirs roste, a to mnohdy velmi rychle, takže se vyplatí sledovat příslušné metriky.

Spuštění jednoho brokera z příkazové řádky vypadá jednoduše:

$ bin/kafka-server-start.sh config/server.properties

Alternativně je možné Zookeepera i Kafku (jednu instanci brokera) spustit v Dockeru:

$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
Poznámka: předchozí nastavení předpokládá, že současně na stejném stroji nepoběží žádná další instance Kafky ani Zookeepera. Pokud budete potřebovat spustit větší množství brokerů, je nutné minimálně změnit mapování portů (přepínače -p).
kappa

Obrázek 6: Schéma aplikace založené na architektuře kappa.

4. Uložení logů na ramdisk pro vývojový počítač

Poznámka: tyto informace byly uvedeny i v předchozím článku.

Jak broker Apache Kafka, tak i ZooKeeper zapisují poměrně velké množství dat na disk. Pokud Kafku spouštíte lokálně pouze pro účely vývoje (a tedy bez reálných dat, které je zapotřebí zachovat), může být užitečné všechny zápisy provádět na ramdisk – výsledkem bude jak rychlejší start obou procesů (cca 2× urychleno), tak i menší „opotřebování“ SSD. Konkrétně na mém vývojovém počítači mám vytvořen ramdisk o velikosti jednoho gigabajtu, který je připojen do adresáře /tmp/ramdisk. Konfigurace Apache Kafky i ZooKeepera je nutné nepatrně pozměnit, aby se pracovní data ukládala do tohoto adresáře (resp. přesněji řečeno do podadresářů). Konkrétní nastavení je ukázáno v dalším odstavci.

Konfigurace ZooKeepera je uložena v souboru config/zookeeper.properties:

...
...
...
# the directory where the snapshot is stored.
dataDir=/tmp/ramdisk/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
 
# Enable snapshot.trust.empty config if the ZK upgrade from 3.4.X to 3.5.6 is failing
# with "java.io.IOException: No snapshot found, but there are log entries" error.
# Check upgrade docs for more details.
# snapshot.trust.empty=true
...
...
...

Konfigurace brokera je uložena v souboru :

...
...
...
############################# Log Basics #############################
 
# A comma separated list of directories under which to store log files
log.dirs=/tmp/ramdisk/kafka-logs
 
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
 
############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
...
...
...

5. Konfigurace většího množství brokerů

Spuštění většího množství brokerů je možné a v praxi i velmi časté. Postačuje pouze zkopírovat soubor config/server.properties do několika podobných souborů nazvaných například config/server0.properties, config/server1.properties atd. Dále je nutné provést následující úpravy v každém z těchto konfiguračních souborů:

  1. broker.id musí být unikátní hodnota (každý broker je nutné jednoznačně identifikovat), takže postačuje ID postupně zvyšovat: 0, 1, 2, atd.
  2. listeners musí obsahovat unikátní číslo portu, takže například 9092, 9192, 9292 atd. (ale můžete použít jakýkoli volný port větší než 1024 – porty s menším číslem jsou vyhrazeny pro roota).
  3. log.dirs by taktéž mělo ukazovat na unikátní adresář nesdílený s ostatními instancemi brokera. Pokud půjde o sdílený adresář, mohou sice brokeři zdánlivě pracovat, ovšem nikoli bezproblémově.

Po splnění těchto podmínek je možné brokery běžným způsobem spustit – každý broker jako samostatný proces:

$ nohup bin/kafka-server-start.sh config/server1.properties &
$ nohup bin/kafka-server-start.sh config/server2.properties &
$ nohup bin/kafka-server-start.sh config/server3.properties &
...
...
...

Obrázek 7: Sledování činnosti brokeru přes standardní nástroj JConsole.

6. Nástroj kafkacat

Součástí stále se rozšiřujícího a vylepšujícího se ekosystému vytvořeného okolo Apache Kafky je i užitečný nástroj nazvaný kafkacat (autoři ho taktéž označují jako „netcat for Kafka“, v kontextu REST API by se hodilo i „curl for Kafka“). Tento nástroj, který naleznete na adrese https://github.com/edenhill/kafkacat slouží pro komunikaci s lokálními i vzdáleně běžícími brokery přímo z příkazové řádky. Pochopitelně se s velkou pravděpodobností nebude jednat o řešení používané v produkčním kódu, ovšem možnost vytvořit producenta zpráv či jejich konzumenta přímo z příkazového řádku je použitelná jak při vývoji, tak i při řešení problémů, které mohou při běhu aplikace nastat. Tento nástroj budeme používat později, při ukázkách nasazení Apache Kafky, takže se v této kapitole krátce zmiňme o příkladech použití převzatých z oficiální dokumentace. Všechny ukázky předpokládají, že broker běží na lokálním počítači (localhost) na portu 9092.

Poznámka: jedná se o nativní aplikaci, což je velmi dobré řešení, protože se kafkacat spouští poměrně často. To je v ostrém kontrastu se samotnou Kafkou, která sice startuje (jako každá aplikace pod JVM) pomaleji, ovšem doba provozu se počítá spíše v měsících a nikoli v sekundách tak, jak je tomu u nástrojů spouštěných z interaktivní příkazové řádky.

Výpis informací o všech dostupných tématech (topic) a jejich konfigurace:

$ kafkacat -L -b localhost:9092

Spuštění nového producenta zpráv čtených ze souborů specifikovaných na příkazové řádce:

$ kafkacat -P -b localhost:9092 -t filedrop -p 0 file1.bin file2.txt /etc/motd dalsi_soubor.tgz

Producent zpráv zapisovaných na standardní vstup uživatelem (co zpráva, to jeden řádek):

$ kafkacat -P -b localhost:9092 -t "upload"

Dtto, ale u každé zprávy lze specifikovat i klíč oddělený od těla zprávy dvojtečkou:

$ kafkacat -P -b localhost:9092 -t "upload" -K:
Poznámka: v tomto případě musí být každá zpráva zapsána na jeden řádek ve formátu „klíč:hodnota“. Zadávání se ukončuje klasicky stiskem klávesové zkratky Ctrl+D.

Konzument zpráv posílaných do tématu „upload“:

$ kafkacat -C -b localhost:9092 -t "upload"

Přečtení posledních 1000 zpráv z tématu s názvem „téma1“. Po této operaci se konzument automaticky ukončí, tj. nebude čekat na další zprávy:

$ kafkacat -C -b localhost:9092 -t téma1 -p 0 -o -1000 -e

Spuštění konzumentů, kteří jsou přihlášení k tématu „téma1“:

$ kafkacat -b localhost:9092 -G skupina_konzumentů téma1

Přihlásit se lze i k odběru většího množství témat:

$ kafkacat -b localhost:9092 -G skupina_konzumentů téma1 téma2
Poznámka: možnosti nástroje kafkacat jsou pochopitelně mnohem větší; některé z nich si ještě popíšeme v dalším textu.

7. Instalace nástroje kafkacat

Nástroj kafkacat se skládá z několika komponent, které jsou většinou naprogramovány v jazyku C popř. v C++. Překlad a slinkování lze provést takovým způsobem, že výsledkem bude jediný spustitelný soubor nazvaný též „kafkacat“, který bude obsahovat i všechny potřebné (tedy staticky slinkované) knihovny, což zjednodušuje nasazení této utility. Na druhou stranu však nebude možné použít běžné prostředky operačního systému při updatu knihoven, například při opravách CVE atd.

Napřed se provede naklonování repositáře se zdrojovými kódy Kafkacatu:

$ git clone git@github.com:edenhill/kafkacat.git

po přesunu do naklonovaného repositáře:

$ cd kafkacat

se překlad provede běžnou trojkombinací příkazů configure+make+make install:

$ ./configure
$ make
$ sudo make install

Alternativně je ovšem možné použít připravený skript bootstrap.sh, který zajistí stažení všech potřebných knihoven (crypto atd.) s jejich překladem:

$ ./bootstrap.sh

Výsledkem je potom skutečně „tlustý“ binární soubor o velikosti přibližně dvaceti megabajtů:

$ ls -l ~/bin/kafkacat
-rwxrwxr-x. 1 ptisnovs ptisnovs 20987784 17. led 14.34 /home/ptisnovs/bin/kafkacat

Některé linuxové distribuce obsahují přímo ve svých repositářích balíček kafkacat, což samozřejmě celý proces instalace (a případných updatů) značně zjednodušuje. Například na systémech založených na Debianu postačuje použít:

$ apt-get install kafkacat
$ kafkacat 
Error: -b <broker,..> missing
 
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2019, Magnus Edenhill
Version 1.5.0-4-ge461fb (JSON, Avro, librdkafka 1.1.0 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer)
...
...
...

K dispozici je pochopitelně i vestavěná nápověda:

Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2019, Magnus Edenhill
Version 1.5.0-4-ge461fb (JSON, Avro, librdkafka 1.1.0 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer)
 
 
General options:
  -C | -P | -L | -Q  Mode: Consume, Produce, Metadata List, Query mode
  -G <group-id>      Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)
                     Expects a list of topics to subscribe to
  -t <topic>         Topic to consume from, produce to, or list
  -p <partition>     Partition
  -b <brokers,..>    Bootstrap broker(s) (host[:port])
  -D <delim>         Message delimiter character:
                     a-z.. | \r | \n | \t | \xNN
                     Default: \n
  -E                 Do not exit on non fatal error
  -K <delim>         Key delimiter (same format as -D)
  -c <cnt>           Limit message count
  -F <config-file>   Read configuration properties from file,
  ...
  ...
  ...

8. Nástroj kcli

Ve druhé části článku se zaměříme na popis nástroje nazvaného Kcli. Tento nástroj slouží k interaktivnímu prohlížení zpráv uložených v jednotlivých tématech (topic) Kafky. Kcli se tedy snaží o to, aby data byla přístupná nikoli pouze ve formě proudu (streamu) údajů (to již zajišťuje Kafkacat), ale v ucelené formě, podobně, jako je tomu v databázích. Navíc je podporováno i vyhledávání zpráv podle zadaných vzorů. Sice se v žádném případě nejedná o ten nejrychlejší možný způsob práce s Kafkou, ovšem na druhé straně je tento přístup v mnoha případech velmi užitečný – a do jisté míry zahlazuje dualitu pohledu na to, co vlastně data reprezentují: buď se jedná o uložení konkrétního stavu modelovaného světa, nebo naopak o sérii změn stavu tohoto světa – oba přístupy jsou přitom relevantní a mají své výhody, ale i zápory. Toto téma je rozpracováno například zde v souvislosti s technologiemi Kafka Streams a KTable.

Obrázek 8: Nápověda ke klávesovým zkratkám nástroje kcli.

9. Instalace nástroje kcli

Nástroj Kcli je kompletně naprogramován v jazyku Go, takže jeho instalace je velmi jednoduchá – v Go se totiž snadno tvoří staticky slinkované spustitelné binární soubory. Ty jsou pro všechny kombinace nejpoužívanější operační systém + nejpoužívanější architektura dostupné na adrese https://github.com/cswank/kcli/re­leases, odkud lze stáhnout tarball, rozbalit ho a ihned začít používat.

Alternativně je možné získat a přeložit poslední vývojářskou verzi nástroje Kcli. Pro tuto operaci je nutné mít nainstalován jazyk Go, a to minimálně ve verzi 1.11 (vyzkoušeno s Go 1.13, starší verze již nemám k dispozici). Stažení zdrojových kódů, jejich překlad a instalace výsledného spustitelného souboru se provede příkazem:

$ go get -u github.com/cswank/kcli
Poznámka: v tomto případě je výsledný spustitelný binární soubor uložen v adresáři, na který ukazuje $GOPATH/bin. Pokud je například proměnná prostředí GOPATH nastavena na /home/UŽIVATEL/go, bude výsledný spustitelný soubor nazvaný kcli uložen v adresáři /home/UŽIVATEL/go/bin. Buď si tedy nastavte proměnnou prostředí PATH tak, aby ukazovala i do tohoto adresáře, nebo kcli zkopírujte (popř. vytvořte symbolický odkaz) do libovolného adresáře zmíněného právě v $PATH.

10. Připojení kcli k brokeru

V případě, že broker běží na lokálním počítači na standardním portu (9092), je možné přímo kcli spustit, a to bez nutnosti specifikace dalších parametrů. Pokud je nutné se připojit ke Kafce běžící na odlišném portu (či na jiném počítači), použije se přepínač -a resp. –addresses. Už z pojmenování tohoto přepínače je zřejmé, že se lze připojit k většímu počtu brokerů:

$ kcli --help
 
usage: kcli [<flags>]
 
Flags:
      --help             Show context-sensitive help (also try --help-long and
                         --help-man).
  -a, --addresses=localhost:9092 ...
                         comma separated list of kafka addresses
  -l, --log=LOG          for debugging, set the log output to a file
  -t, --topic=TOPIC      go directly to a topic
  -p, --partition=-1     go directly to a partition of a topic
  -o, --offset=-1        go directly to a message
  -d, --decoder=DECODER  path to a plugin to decode kafka messages
Poznámka: v případě, že logy Apache Kafky neobsahují žádné téma, je nástroj kcli ihned po svém spuštění ukončen, takže zdánlivě nepracuje korektně. Tento „problém“ opravíme v následující kapitole.

11. Příprava dat – zpráv poslaných do témat

Abychom si mohli kcli odzkoušet v praxi, vytvoříme tři nová témata (topicy) a pošleme do nich nějaké zprávy. Téma se automaticky vytvoří s první poslanou zprávou, čímž se nám situace zjednodušuje. Vytvoříme tedy téma topic1 a na příkazové řádce zadáme trojici zpráv (jejich těl). Zadávání se ukončuje standardní klávesovou zkratkou Ctrl+D:

$ kafkacat -P -b localhost:9092 -t topic1
foo
bar
baz

Totéž provedeme pro téma  názvem topic2:

$ kafkacat -P -b localhost:9092 -t topic2
first
second
third

A do třetice pošleme tři zprávy do tématu s názvem topic3:

$ kafkacat -P -b localhost:9092 -t topic3
jedna
dva
tri

Pro zajímavost se nyní podívejme, jak vlastně vypadá struktura souborů a podadresářů vytvořených brokerem. V rámci úvodních kapitol jsme nastavili cestu k logům na adresář /tmp/kafka-logs, který nyní můžeme prozkoumat:

$ tree /tmp/kafka-logs
 
kafka-logs
├── cleaner-offset-checkpoint
├── log-start-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
├── replication-offset-checkpoint
├── topic1-0
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.log
│   ├── 00000000000000000000.timeindex
│   ├── 00000000000000000003.snapshot
│   └── leader-epoch-checkpoint
├── topic2-0
│   ├── 00000000000000000000.index
│   ├── 00000000000000000000.log
│   ├── 00000000000000000000.timeindex
│   ├── 00000000000000000003.snapshot
│   └── leader-epoch-checkpoint
└── topic3-0
    ├── 00000000000000000000.index
    ├── 00000000000000000000.log
    ├── 00000000000000000000.timeindex
    ├── 00000000000000000003.snapshot
    └── leader-epoch-checkpoint
 
3 directories, 20 files
Poznámka: povšimněte si, že pro každé téma je vytvořen samostatný podadresář s logy a dalšími pomocnými soubory.

Pracovní adresář je vytvořen i pro samotného ZooKeepera (což je ale téma mimo rámec dnešního článku):

$ tree zookeeper/
 
zookeeper/
└── version-2
    ├── log.1
    ├── log.1e
    ├── snapshot.0
    └── snapshot.1d
 
1 directory, 4 files

12. Prohlížení témat nástrojem kcli

Nyní již můžeme spustit nástroj kcli:

$ kcli -a localhost:9092

Po spuštění by se měla zobrazit obrazovka s nabídkou všech nalezených témat, která daný broker obsluhuje:

Obrázek 9: Seznam témat obsluhovaných brokerem.

Nyní vybereme první téma nazvané topic1. Zobrazí se nový seznam, tentokrát s oddíly. Dané téma není rozděleno na více oddílů, takže se zobrazí jediný řádek:

Obrázek 10: Výběr oddílů s informacemi o offsetu (čtení) a offsetu poslední (nejmladší) zprávy.

Pokud vybereme první (a jediný) oddíl, zobrazí se již seznam zpráv se začátkem jejich obsahu:

Obrázek 11: Zobrazení offsetu a zkráceného obsahu zpráv uložených ve vybraném tématu a oddílu.

13. Zápis a prohlížení strukturovaných dat

Nástrojem Kafkacat je možné do vybraného tématu poslat i obsahy souborů. Následující skript umožňuje přečíst všechny soubory s koncovkou „json“ umístěné v aktuálním adresáři a poslat jejich obsah do tématu pojmenovaného results:

# Update the port accordingly (this one is for Kafka running inside Docker)
KAFKA_PORT=9092
 
# Produce messages from current directory
# All JSON files in current directory will be sent to Kafka via Kafkacat
for file in *.json
do
    echo $file
    kafkacat -b localhost:${KAFKA_PORT} -P -t results $file
    # It is possible to change the sleep value (or remove it completely)
    sleep 1
done

V nástroji kcli po výběru tohoto tématu získáme seznam zpráv (nyní skutečně se zkráceným obsahem):

Obrázek 12: Zobrazení offsetu a zkráceného obsahu zpráv uložených ve vybraném tématu a oddílu – pozor ovšem na chybu v kcli, která způsobuje špatný přechod na novou zprávu u víceřádkového zobrazení.

Jednotlivé zprávy lze vybrat kurzorovými klávesami a po stisku Enter se zobrazí celá zpráva:

Obrázek 13: Tělo zprávy.

Poznámka: povšimněte si, že kcli dokáže zvýraznit JSON formát, což je velmi užitečné.

14. Vytvoření tématu s více oddíly

Systém Apache Kafka umožňuje ukládání zpráv (zde se ovšem, jak již víme, používá termín záznam – record) do různých témat, přičemž každé téma je rozděleno do oddílů neboli partition (samozřejmě je možné pro téma vyhradit pouze jediný oddíl a tvářit se, že máme k dispozici „vylepšenou“ frontu – ostatně přesně takto lze s Kafkou začít). Rozdělení do oddílů se provádí z několika důvodů. Jedním z nich je rozdělení zátěže, protože jednotlivé oddíly mohou být provozovány na různých počítačích v mnohdy i velmi rozsáhlém clusteru.

Dále se dělení provádí z toho důvodu, že každý oddíl obsahuje neměnnou (immutable) sekvenci zpráv. Oddíly pro jednotlivá témata lze zpracovávat v několika brokerech umístěných do clusteru a tak zajistit potřebný load balancing, případnou replikaci zpráv atd. Každá zpráva uložená do oddílu má přiřazen jednoznačný offset (reprezentovaný v Javě typem long). Navíc je možné, aby se pro každé téma udržovalo několik logů (partition logs), což umožňuje připojení většího množství konzumentů zpráv k jednomu tématu s tím, že tito konzumenti budou pracovat paralelně a nezávisle na sobě.

Obrázek 14: Příklad rozdělení témat v clusteru.

U většiny nasazení Kafky se taktéž počítá s využitím většího množství instancí brokerů, z nichž je vytvořen cluster. Zde se setkáme s důležitým termínem replikace – každý oddíl je typicky replikován na několika message brokerech v clusteru (ovšem nemusí se jednat o všechny brokery, replikace se provádí například na tři brokery ve větším clusteru).

To však není vše, jelikož je ve skutečnosti konfigurace poněkud složitější – každý oddíl totiž může být replikován na více počítačích, přičemž jeden z těchto oddílů je takzvaným „leaderem“ a ostatní jsou „followeři“. Zápis nových zpráv popř. čtení se provádí vždy jen v rámci leaderu, ovšem změny jsou replikovány na všechny kopie oddílu. Ve chvíli, kdy z nějakého (libovolného) důvodu dojde k pádu „leadera“, převezme jeho roli jeden z dalších uzlů. Pokud tedy existuje N uzlů s replikou oddílu, bude systém funkční i ve chvíli, kdy zhavaruje N-1 uzlů!

Poznámka: ve skutečnosti se lze snadno dostat do situace, kdy bude neustále prováděn rebalancing, ovšem to je již složitější téma související s reálným nasazením Kafky.

Téma zpracovávané Kafkou může na clusteru vypadat například následovně:

          +---+---+---+---+---+---+
oddíl #0  | 0 | 1 | 2 | 3 | 4 | 5 | ...
          +---+---+---+---+---+---+
oddíl #1  | 0 | 1 | 2 | ...
          +---+---+---+
oddíl #2  | ...
          +---+---+---+---+---+---+---+---+---+
oddíl #3  | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ...
          +---+---+---+---+---+---+---+---+---+

Vytvořme si nyní nové téma (tentokrát explicitně) s tím, že bude mít tři oddíly. Pro tento účel použijeme skript kafka-topics.sh, který je dodáván společně s Apache Kafkou (je v podadresáři bin):

$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic partitioned
 
Created topic partitioned.

15. Zápis dat se specifikací oddílů

Další pomocný skript zapíše obsahy souborů s koncovkou „.json“, které jsou nalezeny v aktuálním adresáři do stejného tématu, ovšem do všech třech vytvořených oddílů:

# Update the port accordingly (this one is for Kafka running inside Docker)
KAFKA_PORT=9092
 
# Produce messages from current directory
# All JSON files in current directory will be sent to Kafka via Kafkacat
for file in *.json
do
    echo $file
    kafkacat -b localhost:${KAFKA_PORT} -P -t partitioned -p 0 $file
    kafkacat -b localhost:${KAFKA_PORT} -P -t partitioned -p 1 $file
    kafkacat -b localhost:${KAFKA_PORT} -P -t partitioned -p 2 $file
    # It is possible to change the sleep value (or remove it completely)
    sleep 1
done

16. Výběr oddílu v nástroji kcli

Podívejme se nyní, jak se téma rozdělené do většího množství oddílů zobrazí v nástroji kcli:

Obrázek 15: Obrazovka s výběrem tématu zůstala zachována.

Obrázek 16: Téma partitioned je rozděleno na tři oddíly.

Obrázek 17: Zobrazení obsahu tématu partitioned, konkrétně oddílu číslo 1.

Obrázek 18: Opět lze (pochopitelně) zobrazit i celé tělo zprávy (navíc se zvýrazněnou syntaxí).

MIF temata

17. Vyhledávání zpráv, přesun na zvolený offset a další možnosti nástroje kcli

Klávesovou zkratkou / je možné spustit vyhledávání textu ve zprávách, což je velmi užitečná operace. Pokud je řetězec v nějaké zprávě nalezen, je upraven i offset v kcli (tedy dojde k přesunu na tuto zprávu). Přímý přeskok na zvolený offset zajišťuje klávesová zkratka Ctrl+j. A poslední užitečná věc, o které se zmíníme, souvisí s možností tisku obsahu zprávy či zpráv při opouštění kcli s využitím klávesové zkratky Ctrl+p. Obsah aktuálně zobrazených zpráv se pošle na standardní výstup a je ho možné zpracovat dalšími nástroji, například v koloně.

18. Repositář s použitými datovými soubory i skripty

Datové soubory i podpůrné skripty použité v dnešním článku byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/pre­sentations/. V případě, že nebudete chtít klonovat celý repositář, můžete namísto toho použít odkazy na jednotlivé soubory, které naleznete v následující tabulce:

# Příklad/soubor Stručný popis Cesta
1 produce.sh produkce (poslání) obsahu datových souborů do tématu s jedním oddílem https://github.com/tisnik/pre­sentations/blob/master/kcli/pro­duce.sh
2 produce2.sh produkce (poslání) obsahu datových souborů do tématu s větším množstvím oddílů https://github.com/tisnik/pre­sentations/blob/master/kcli/pro­duce2.sh
       
3 05_rules_hits.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/05_ru­les_hits.json
4 05_rules_hits_org1.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/05_ru­les_hits_org1.json
5 10_rules_hits.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/10_ru­les_hits.json
6 10_rules_hits_org1.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/10_ru­les_hits_org1.json
7 15_rules_hits.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/15_ru­les_hits.json
8 15_rules_hits_org1.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/15_ru­les_hits_org1.json
9 big_results.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/big_re­sults.json
10 big_results_no_skips.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/big_re­sults_no_skips.json
11 big_results_no_skips_tutorial.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/big_re­sults_no_skips_tutorial.json
12 big_results_org1.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/big_re­sults_org1.json
13 big_results_tutorial.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/big_re­sults_tutorial.json
14 no_hits.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/no_hit­s.json
15 no_hits_no_skips.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/no_hit­s_no_skips.json
16 result01.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/re­sult01.json
17 result02.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/re­sult02.json
18 result03.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/re­sult03.json
19 result04.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/re­sult04.json
20 result05.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/re­sult05.json
21 tutorial_only.json datový soubor ve formátu JSON, který je poslán do vybraného tématu https://github.com/tisnik/pre­sentations/blob/master/kcli/tu­torial_only.json

19. Odkazy na relevantní články na Rootu

  1. Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách
    https://www.root.cz/clanky/pouziti-nastroje-apache-kafka-v-aplikacich-zalozenych-na-mikrosluzbach/
  2. Apache Kafka: distribuovaná streamovací platforma
    https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/
  3. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw
    https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw/
  4. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
    https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw-2-cast/
  5. Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)
    https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-projektu-apache-kafka-jazyku-clojure-a-knihovne-jackdaw-streamy-a-kolony/
  6. Vývoj služeb postavených na systému Apache Kafka v jazyku Go
    https://www.root.cz/clanky/vyvoj-sluzeb-postavenych-na-systemu-apache-kafka-v-jazyku-go/

20. Odkazy na Internetu

  1. Kcli: is a kafka read only command line browser.
    https://github.com/cswank/kcli
  2. Kcli: a kafka command line browser
    https://go.libhunt.com/kcli-alternatives
  3. Awesome Go
    https://github.com/avelino/awesome-go
  4. Real-Time Payments with Clojure and Apache Kafka (podcast)
    https://www.evidentsystem­s.com/news/confluent-podcast-about-apache-kafka/
  5. Microservices: The Rise Of Kafka
    https://movio.co/blog/microservices-rise-kafka/
  6. Building a Microservices Ecosystem with Kafka Streams and KSQL
    https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
  7. An introduction to Apache Kafka and microservices communication
    https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63
  8. kappa-architecture.com
    http://milinda.pathirage.org/kappa-architecture.com/
  9. Questioning the Lambda Architecture
    https://www.oreilly.com/i­deas/questioning-the-lambda-architecture
  10. Lambda architecture
    https://en.wikipedia.org/wi­ki/Lambda_architecture
  11. Kafka – ecosystem (Wiki)
    https://cwiki.apache.org/con­fluence/display/KAFKA/Eco­system
  12. The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
    http://cloudurable.com/blog/kafka-ecosystem/index.html
  13. A Kafka Operator for Kubernetes
    https://github.com/krallistic/kafka-operator
  14. Kafka Streams
    https://cwiki.apache.org/con­fluence/display/KAFKA/Kaf­ka+Streams
  15. Kafka Streams
    http://kafka.apache.org/do­cumentation/streams/
  16. Kafka Streams (FAQ)
    https://cwiki.apache.org/con­fluence/display/KAFKA/FAQ#FAQ-Streams
  17. Event stream processing
    https://en.wikipedia.org/wi­ki/Event_stream_processing
  18. Part 1: Apache Kafka for beginners – What is Apache Kafka?
    https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html
  19. What are some alternatives to Apache Kafka?
    https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka
  20. What is the best alternative to Kafka?
    https://www.slant.co/opti­ons/961/alternatives/~kaf­ka-alternatives
  21. A super quick comparison between Kafka and Message Queues
    https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0
  22. Kafka Queuing: Kafka as a Messaging System
    https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system
  23. Apache Kafka Logs: A Comprehensive Guide
    https://hevodata.com/learn/apache-kafka-logs-a-comprehensive-guide/
  24. Microservices – Not a free lunch!
    http://highscalability.com/blog/2014/4/8/mi­croservices-not-a-free-lunch.html
  25. Microservices, Monoliths, and NoOps
    http://blog.arungupta.me/microservices-monoliths-noops/
  26. Microservice Design Patterns
    http://blog.arungupta.me/microservice-design-patterns/
  27. REST vs Messaging for Microservices – Which One is Best?
    https://solace.com/blog/experience-awesomeness-event-driven-microservices/
  28. Kappa Architecture Our Experience
    https://events.static.linux­found.org/sites/events/fi­les/slides/ASPgems%20-%20Kappa%20Architecture.pdf
  29. Apache Kafka Streams and Tables, the stream-table duality
    https://towardsdatascience.com/apache-kafka-streams-and-tables-the-stream-table-duality-ee904251a7e?gi=f22a29cd1854

Autor článku

Pavel Tišnovský vystudoval VUT FIT a v současné době pracuje ve společnosti Red Hat, kde vyvíjí nástroje pro OpenShift.io.