Obsah
1. Práce s Kafkou z příkazové řádky: nástroje Kafkacat a Kcli
3. Spuštění služby ZooKeeper a jedné instance brokera
4. Uložení logů na ramdisk pro vývojový počítač
5. Konfigurace většího množství brokerů
7. Instalace nástroje kafkacat
11. Příprava dat – zpráv poslaných do témat
12. Prohlížení témat nástrojem kcli
13. Zápis a prohlížení strukturovaných dat
14. Vytvoření tématu s více oddíly
15. Zápis dat se specifikací oddílů
16. Výběr oddílu v nástroji kcli
17. Vyhledávání zpráv, přesun na zvolený offset a další možnosti nástroje kcli
18. Repositář s použitými datovými soubory i skripty
19. Odkazy na relevantní články na Rootu
1. Práce s Kafkou z příkazové řádky: nástroje Kafkacat a Kcli
Na stránkách Roota jsme se již několikrát setkali s užitečnou službou Apache Kafka, která se v současnosti používá v relativně velkém množství aplikačních oblastí, od zpracování dat, přes klasické pipeliny, mikroslužby až po využití Kafky ve funkci message brokera popř. v architekturách lambda a kappa (resp. spíše původní stránka o tomto tématu). Typicky se s Kafkou komunikuje přímo z aplikací, které mohou být vyvinuty v různých programovacích jazycích (víme již, jak vytvořit klienta pro Python, Go a jazyk Clojure). Jedinou podmínkou je, aby pro daný jazyk (resp. přesněji řečeno pro ekosystém postavený okolo tohoto jazyka) existovala knihovna poskytující rozhraní ke komunikačnímu protokolu využívaného Apache Kafkou.
Obrázek 1: Logo nástroje Apache Kafka, kterému se budeme dnes věnovat.
Existuje ovšem i poněkud odlišný přístup k práci s Kafkou, který oceníme při učení, testování, hledání problémů při posílání či příjmu zpráv, popř. při administrativních úkonech, které s Kafkou souvisí. Tento přístup je založen na využití příkazového řádku a nástrojů, které jsou z něj spouštěny a dokážou s Kafkou pracovat (a to pochopitelně i vzdáleně). Nejedná se vlastně o žádnou žhavou novinku, protože i pro databáze (relační i postrelační) existují nástroje ovládané z příkazové řádky, které tvoří alternativu k databázovým ovladačům využívaných v aplikacích (psql, sqlite3, redis-cli atd.). V případě Apache Kafky se s úspěchem používá nástroj nazvaný Kafkacat, který může být s mnoha výhodami doplněn o poněkud méně známý nástroj pojmenovaný přímočaře Kcli. Se základními možnostmi poskytovanými oběma zmíněnými nástroji se seznámíme v navazujících kapitolách.
Obrázek 2: Příklad rozdělení témat (topic) v clusteru.
Obrázek 3: Příklad použití ekosystému Kafky (Kafka Streams, konektory pro databáze atd.).
2. Instalace Kafky
Popišme si nyní ve stručnosti způsob lokální instalace Apache Kafky. V případě, že je na počítači nainstalováno JRE (tedy běhové prostředí Javy), je instalace Kafky pro testovací a vývojové účely triviální. Tarball s instalací Kafky lze získat z adresy https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka2.12–2.8.0.tgz (verze platná v době vydání dnešního článku). Stažení a rozbalení tarballu se provede následovně:
$ wget http://apache.miloslavbrada.cz/kafka/2.8.0/kafka_2.12-2.8.0.tgz $ tar -xzf kafka_2.12-2.8.0.tgz $ cd kafka_2.12-2.8.0
Po rozbalení stáhnutého tarballu získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin). Pro spuštění Zookeepera a brokerů je zapotřebí mít nainstalovánu JRE (neboli již zmíněný Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).
Obrázek 4: Sledování činnosti brokeru přes standardní nástroj JConsole.
Prozatím nebudeme žádné další nastavení ani žádné další nástroje potřebovat, pochopitelně s výjimkou dále popsaných nástrojů Kafkacat a Kcli.
Obrázek 5: Sledování činnosti brokeru přes standardní nástroj JConsole.
3. Spuštění služby ZooKeeper a jedné instance brokera
Po (doufejme že úspěšné a bezproblémové) instalaci Kafky již můžeme spustit ZooKeeper a jednu instanci brokera (a to přesně v tomto pořadí! – tedy nejdříve ZooKeeper a posléze broker). Konfigurace ZooKeepera je uložena v souboru config/zookeeper.properties a zajímat nás budou především tyto volby – adresář, kam ZooKeeper ukládá svoje data, port, který použijí brokeři a omezení počtu připojení jednoho klienta v daný okamžik:
dataDir=/tmp/zookeeper clientPort=2181 maxClientCnxns=0
Nyní již můžeme Zookeepera spustit z příkazové řádky (ideálně ve vyhrazeném terminálu):
$ bin/zookeeper-server-start.sh config/zookeeper.properties [2020-01-20 17:00:07,823] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2020-01-20 17:00:07,825] WARN config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2020-01-20 17:00:07,827] INFO clientPortAddress is 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... ... ... [2020-01-20 17:00:07,947] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager) [2020-01-20 17:00:26,978] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)
Konfigurace jednoho brokera je uložená v souboru config/server.properties. Samotný konfigurační soubor obsahuje několik sekcí:
- Port, na kterém broker naslouchá, jeho ID, počet použitých vláken pro IO operace a počet vláken pro komunikaci.
- Velikost bufferů, maximální povolená velikost požadavků (což omezuje velikost zprávy) atd.
- Nastavení počtu partitions
- Nastavení retence dat
- Připojení k Zookeeperovi
broker.id=0 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000
Spuštění jednoho brokera z příkazové řádky vypadá jednoduše:
$ bin/kafka-server-start.sh config/server.properties
Alternativně je možné Zookeepera i Kafku (jednu instanci brokera) spustit v Dockeru:
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
Obrázek 6: Schéma aplikace založené na architektuře kappa.
4. Uložení logů na ramdisk pro vývojový počítač
Jak broker Apache Kafka, tak i ZooKeeper zapisují poměrně velké množství dat na disk. Pokud Kafku spouštíte lokálně pouze pro účely vývoje (a tedy bez reálných dat, které je zapotřebí zachovat), může být užitečné všechny zápisy provádět na ramdisk – výsledkem bude jak rychlejší start obou procesů (cca 2× urychleno), tak i menší „opotřebování“ SSD. Konkrétně na mém vývojovém počítači mám vytvořen ramdisk o velikosti jednoho gigabajtu, který je připojen do adresáře /tmp/ramdisk. Konfigurace Apache Kafky i ZooKeepera je nutné nepatrně pozměnit, aby se pracovní data ukládala do tohoto adresáře (resp. přesněji řečeno do podadresářů). Konkrétní nastavení je ukázáno v dalším odstavci.
Konfigurace ZooKeepera je uložena v souboru config/zookeeper.properties:
... ... ... # the directory where the snapshot is stored. dataDir=/tmp/ramdisk/zookeeper # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 # Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServer=false # admin.serverPort=8080 # Enable snapshot.trust.empty config if the ZK upgrade from 3.4.X to 3.5.6 is failing # with "java.io.IOException: No snapshot found, but there are log entries" error. # Check upgrade docs for more details. # snapshot.trust.empty=true ... ... ...
Konfigurace brokera je uložena v souboru :
... ... ... ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/tmp/ramdisk/kafka-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ... ... ...
5. Konfigurace většího množství brokerů
Spuštění většího množství brokerů je možné a v praxi i velmi časté. Postačuje pouze zkopírovat soubor config/server.properties do několika podobných souborů nazvaných například config/server0.properties, config/server1.properties atd. Dále je nutné provést následující úpravy v každém z těchto konfiguračních souborů:
- broker.id musí být unikátní hodnota (každý broker je nutné jednoznačně identifikovat), takže postačuje ID postupně zvyšovat: 0, 1, 2, atd.
- listeners musí obsahovat unikátní číslo portu, takže například 9092, 9192, 9292 atd. (ale můžete použít jakýkoli volný port větší než 1024 – porty s menším číslem jsou vyhrazeny pro roota).
- log.dirs by taktéž mělo ukazovat na unikátní adresář nesdílený s ostatními instancemi brokera. Pokud půjde o sdílený adresář, mohou sice brokeři zdánlivě pracovat, ovšem nikoli bezproblémově.
Po splnění těchto podmínek je možné brokery běžným způsobem spustit – každý broker jako samostatný proces:
$ nohup bin/kafka-server-start.sh config/server1.properties & $ nohup bin/kafka-server-start.sh config/server2.properties & $ nohup bin/kafka-server-start.sh config/server3.properties & ... ... ...
Obrázek 7: Sledování činnosti brokeru přes standardní nástroj JConsole.
6. Nástroj kafkacat
Součástí stále se rozšiřujícího a vylepšujícího se ekosystému vytvořeného okolo Apache Kafky je i užitečný nástroj nazvaný kafkacat (autoři ho taktéž označují jako „netcat for Kafka“, v kontextu REST API by se hodilo i „curl for Kafka“). Tento nástroj, který naleznete na adrese https://github.com/edenhill/kafkacat slouží pro komunikaci s lokálními i vzdáleně běžícími brokery přímo z příkazové řádky. Pochopitelně se s velkou pravděpodobností nebude jednat o řešení používané v produkčním kódu, ovšem možnost vytvořit producenta zpráv či jejich konzumenta přímo z příkazového řádku je použitelná jak při vývoji, tak i při řešení problémů, které mohou při běhu aplikace nastat. Tento nástroj budeme používat později, při ukázkách nasazení Apache Kafky, takže se v této kapitole krátce zmiňme o příkladech použití převzatých z oficiální dokumentace. Všechny ukázky předpokládají, že broker běží na lokálním počítači (localhost) na portu 9092.
Výpis informací o všech dostupných tématech (topic) a jejich konfigurace:
$ kafkacat -L -b localhost:9092
Spuštění nového producenta zpráv čtených ze souborů specifikovaných na příkazové řádce:
$ kafkacat -P -b localhost:9092 -t filedrop -p 0 file1.bin file2.txt /etc/motd dalsi_soubor.tgz
Producent zpráv zapisovaných na standardní vstup uživatelem (co zpráva, to jeden řádek):
$ kafkacat -P -b localhost:9092 -t "upload"
Dtto, ale u každé zprávy lze specifikovat i klíč oddělený od těla zprávy dvojtečkou:
$ kafkacat -P -b localhost:9092 -t "upload" -K:
Konzument zpráv posílaných do tématu „upload“:
$ kafkacat -C -b localhost:9092 -t "upload"
Přečtení posledních 1000 zpráv z tématu s názvem „téma1“. Po této operaci se konzument automaticky ukončí, tj. nebude čekat na další zprávy:
$ kafkacat -C -b localhost:9092 -t téma1 -p 0 -o -1000 -e
Spuštění konzumentů, kteří jsou přihlášení k tématu „téma1“:
$ kafkacat -b localhost:9092 -G skupina_konzumentů téma1
Přihlásit se lze i k odběru většího množství témat:
$ kafkacat -b localhost:9092 -G skupina_konzumentů téma1 téma2
7. Instalace nástroje kafkacat
Nástroj kafkacat se skládá z několika komponent, které jsou většinou naprogramovány v jazyku C popř. v C++. Překlad a slinkování lze provést takovým způsobem, že výsledkem bude jediný spustitelný soubor nazvaný též „kafkacat“, který bude obsahovat i všechny potřebné (tedy staticky slinkované) knihovny, což zjednodušuje nasazení této utility. Na druhou stranu však nebude možné použít běžné prostředky operačního systému při updatu knihoven, například při opravách CVE atd.
Napřed se provede naklonování repositáře se zdrojovými kódy Kafkacatu:
$ git clone git@github.com:edenhill/kafkacat.git
po přesunu do naklonovaného repositáře:
$ cd kafkacat
se překlad provede běžnou trojkombinací příkazů configure+make+make install:
$ ./configure $ make $ sudo make install
Alternativně je ovšem možné použít připravený skript bootstrap.sh, který zajistí stažení všech potřebných knihoven (crypto atd.) s jejich překladem:
$ ./bootstrap.sh
Výsledkem je potom skutečně „tlustý“ binární soubor o velikosti přibližně dvaceti megabajtů:
$ ls -l ~/bin/kafkacat -rwxrwxr-x. 1 ptisnovs ptisnovs 20987784 17. led 14.34 /home/ptisnovs/bin/kafkacat
Některé linuxové distribuce obsahují přímo ve svých repositářích balíček kafkacat, což samozřejmě celý proces instalace (a případných updatů) značně zjednodušuje. Například na systémech založených na Debianu postačuje použít:
$ apt-get install kafkacat
$ kafkacat Error: -b <broker,..> missing Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]] kafkacat - Apache Kafka producer and consumer tool https://github.com/edenhill/kafkacat Copyright (c) 2014-2019, Magnus Edenhill Version 1.5.0-4-ge461fb (JSON, Avro, librdkafka 1.1.0 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer) ... ... ...
K dispozici je pochopitelně i vestavěná nápověda:
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]] kafkacat - Apache Kafka producer and consumer tool https://github.com/edenhill/kafkacat Copyright (c) 2014-2019, Magnus Edenhill Version 1.5.0-4-ge461fb (JSON, Avro, librdkafka 1.1.0 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer) General options: -C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode -G <group-id> Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups) Expects a list of topics to subscribe to -t <topic> Topic to consume from, produce to, or list -p <partition> Partition -b <brokers,..> Bootstrap broker(s) (host[:port]) -D <delim> Message delimiter character: a-z.. | \r | \n | \t | \xNN Default: \n -E Do not exit on non fatal error -K <delim> Key delimiter (same format as -D) -c <cnt> Limit message count -F <config-file> Read configuration properties from file, ... ... ...
8. Nástroj kcli
Ve druhé části článku se zaměříme na popis nástroje nazvaného Kcli. Tento nástroj slouží k interaktivnímu prohlížení zpráv uložených v jednotlivých tématech (topic) Kafky. Kcli se tedy snaží o to, aby data byla přístupná nikoli pouze ve formě proudu (streamu) údajů (to již zajišťuje Kafkacat), ale v ucelené formě, podobně, jako je tomu v databázích. Navíc je podporováno i vyhledávání zpráv podle zadaných vzorů. Sice se v žádném případě nejedná o ten nejrychlejší možný způsob práce s Kafkou, ovšem na druhé straně je tento přístup v mnoha případech velmi užitečný – a do jisté míry zahlazuje dualitu pohledu na to, co vlastně data reprezentují: buď se jedná o uložení konkrétního stavu modelovaného světa, nebo naopak o sérii změn stavu tohoto světa – oba přístupy jsou přitom relevantní a mají své výhody, ale i zápory. Toto téma je rozpracováno například zde v souvislosti s technologiemi Kafka Streams a KTable.
Obrázek 8: Nápověda ke klávesovým zkratkám nástroje kcli.
9. Instalace nástroje kcli
Nástroj Kcli je kompletně naprogramován v jazyku Go, takže jeho instalace je velmi jednoduchá – v Go se totiž snadno tvoří staticky slinkované spustitelné binární soubory. Ty jsou pro všechny kombinace nejpoužívanější operační systém + nejpoužívanější architektura dostupné na adrese https://github.com/cswank/kcli/releases, odkud lze stáhnout tarball, rozbalit ho a ihned začít používat.
Alternativně je možné získat a přeložit poslední vývojářskou verzi nástroje Kcli. Pro tuto operaci je nutné mít nainstalován jazyk Go, a to minimálně ve verzi 1.11 (vyzkoušeno s Go 1.13, starší verze již nemám k dispozici). Stažení zdrojových kódů, jejich překlad a instalace výsledného spustitelného souboru se provede příkazem:
$ go get -u github.com/cswank/kcli
10. Připojení kcli k brokeru
V případě, že broker běží na lokálním počítači na standardním portu (9092), je možné přímo kcli spustit, a to bez nutnosti specifikace dalších parametrů. Pokud je nutné se připojit ke Kafce běžící na odlišném portu (či na jiném počítači), použije se přepínač -a resp. –addresses. Už z pojmenování tohoto přepínače je zřejmé, že se lze připojit k většímu počtu brokerů:
$ kcli --help usage: kcli [<flags>] Flags: --help Show context-sensitive help (also try --help-long and --help-man). -a, --addresses=localhost:9092 ... comma separated list of kafka addresses -l, --log=LOG for debugging, set the log output to a file -t, --topic=TOPIC go directly to a topic -p, --partition=-1 go directly to a partition of a topic -o, --offset=-1 go directly to a message -d, --decoder=DECODER path to a plugin to decode kafka messages
11. Příprava dat – zpráv poslaných do témat
Abychom si mohli kcli odzkoušet v praxi, vytvoříme tři nová témata (topicy) a pošleme do nich nějaké zprávy. Téma se automaticky vytvoří s první poslanou zprávou, čímž se nám situace zjednodušuje. Vytvoříme tedy téma topic1 a na příkazové řádce zadáme trojici zpráv (jejich těl). Zadávání se ukončuje standardní klávesovou zkratkou Ctrl+D:
$ kafkacat -P -b localhost:9092 -t topic1 foo bar baz
Totéž provedeme pro téma názvem topic2:
$ kafkacat -P -b localhost:9092 -t topic2 first second third
A do třetice pošleme tři zprávy do tématu s názvem topic3:
$ kafkacat -P -b localhost:9092 -t topic3 jedna dva tri
Pro zajímavost se nyní podívejme, jak vlastně vypadá struktura souborů a podadresářů vytvořených brokerem. V rámci úvodních kapitol jsme nastavili cestu k logům na adresář /tmp/kafka-logs, který nyní můžeme prozkoumat:
$ tree /tmp/kafka-logs kafka-logs ├── cleaner-offset-checkpoint ├── log-start-offset-checkpoint ├── meta.properties ├── recovery-point-offset-checkpoint ├── replication-offset-checkpoint ├── topic1-0 │ ├── 00000000000000000000.index │ ├── 00000000000000000000.log │ ├── 00000000000000000000.timeindex │ ├── 00000000000000000003.snapshot │ └── leader-epoch-checkpoint ├── topic2-0 │ ├── 00000000000000000000.index │ ├── 00000000000000000000.log │ ├── 00000000000000000000.timeindex │ ├── 00000000000000000003.snapshot │ └── leader-epoch-checkpoint └── topic3-0 ├── 00000000000000000000.index ├── 00000000000000000000.log ├── 00000000000000000000.timeindex ├── 00000000000000000003.snapshot └── leader-epoch-checkpoint 3 directories, 20 files
Pracovní adresář je vytvořen i pro samotného ZooKeepera (což je ale téma mimo rámec dnešního článku):
$ tree zookeeper/ zookeeper/ └── version-2 ├── log.1 ├── log.1e ├── snapshot.0 └── snapshot.1d 1 directory, 4 files
12. Prohlížení témat nástrojem kcli
Nyní již můžeme spustit nástroj kcli:
$ kcli -a localhost:9092
Po spuštění by se měla zobrazit obrazovka s nabídkou všech nalezených témat, která daný broker obsluhuje:
Obrázek 9: Seznam témat obsluhovaných brokerem.
Nyní vybereme první téma nazvané topic1. Zobrazí se nový seznam, tentokrát s oddíly. Dané téma není rozděleno na více oddílů, takže se zobrazí jediný řádek:
Obrázek 10: Výběr oddílů s informacemi o offsetu (čtení) a offsetu poslední (nejmladší) zprávy.
Pokud vybereme první (a jediný) oddíl, zobrazí se již seznam zpráv se začátkem jejich obsahu:
Obrázek 11: Zobrazení offsetu a zkráceného obsahu zpráv uložených ve vybraném tématu a oddílu.
13. Zápis a prohlížení strukturovaných dat
Nástrojem Kafkacat je možné do vybraného tématu poslat i obsahy souborů. Následující skript umožňuje přečíst všechny soubory s koncovkou „json“ umístěné v aktuálním adresáři a poslat jejich obsah do tématu pojmenovaného results:
# Update the port accordingly (this one is for Kafka running inside Docker) KAFKA_PORT=9092 # Produce messages from current directory # All JSON files in current directory will be sent to Kafka via Kafkacat for file in *.json do echo $file kafkacat -b localhost:${KAFKA_PORT} -P -t results $file # It is possible to change the sleep value (or remove it completely) sleep 1 done
V nástroji kcli po výběru tohoto tématu získáme seznam zpráv (nyní skutečně se zkráceným obsahem):
Obrázek 12: Zobrazení offsetu a zkráceného obsahu zpráv uložených ve vybraném tématu a oddílu – pozor ovšem na chybu v kcli, která způsobuje špatný přechod na novou zprávu u víceřádkového zobrazení.
Jednotlivé zprávy lze vybrat kurzorovými klávesami a po stisku Enter se zobrazí celá zpráva:
Obrázek 13: Tělo zprávy.
14. Vytvoření tématu s více oddíly
Systém Apache Kafka umožňuje ukládání zpráv (zde se ovšem, jak již víme, používá termín záznam – record) do různých témat, přičemž každé téma je rozděleno do oddílů neboli partition (samozřejmě je možné pro téma vyhradit pouze jediný oddíl a tvářit se, že máme k dispozici „vylepšenou“ frontu – ostatně přesně takto lze s Kafkou začít). Rozdělení do oddílů se provádí z několika důvodů. Jedním z nich je rozdělení zátěže, protože jednotlivé oddíly mohou být provozovány na různých počítačích v mnohdy i velmi rozsáhlém clusteru.
Dále se dělení provádí z toho důvodu, že každý oddíl obsahuje neměnnou (immutable) sekvenci zpráv. Oddíly pro jednotlivá témata lze zpracovávat v několika brokerech umístěných do clusteru a tak zajistit potřebný load balancing, případnou replikaci zpráv atd. Každá zpráva uložená do oddílu má přiřazen jednoznačný offset (reprezentovaný v Javě typem long). Navíc je možné, aby se pro každé téma udržovalo několik logů (partition logs), což umožňuje připojení většího množství konzumentů zpráv k jednomu tématu s tím, že tito konzumenti budou pracovat paralelně a nezávisle na sobě.
Obrázek 14: Příklad rozdělení témat v clusteru.
U většiny nasazení Kafky se taktéž počítá s využitím většího množství instancí brokerů, z nichž je vytvořen cluster. Zde se setkáme s důležitým termínem replikace – každý oddíl je typicky replikován na několika message brokerech v clusteru (ovšem nemusí se jednat o všechny brokery, replikace se provádí například na tři brokery ve větším clusteru).
To však není vše, jelikož je ve skutečnosti konfigurace poněkud složitější – každý oddíl totiž může být replikován na více počítačích, přičemž jeden z těchto oddílů je takzvaným „leaderem“ a ostatní jsou „followeři“. Zápis nových zpráv popř. čtení se provádí vždy jen v rámci leaderu, ovšem změny jsou replikovány na všechny kopie oddílu. Ve chvíli, kdy z nějakého (libovolného) důvodu dojde k pádu „leadera“, převezme jeho roli jeden z dalších uzlů. Pokud tedy existuje N uzlů s replikou oddílu, bude systém funkční i ve chvíli, kdy zhavaruje N-1 uzlů!
Téma zpracovávané Kafkou může na clusteru vypadat například následovně:
+---+---+---+---+---+---+ oddíl #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ oddíl #1 | 0 | 1 | 2 | ... +---+---+---+ oddíl #2 | ... +---+---+---+---+---+---+---+---+---+ oddíl #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+
Vytvořme si nyní nové téma (tentokrát explicitně) s tím, že bude mít tři oddíly. Pro tento účel použijeme skript kafka-topics.sh, který je dodáván společně s Apache Kafkou (je v podadresáři bin):
$ ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic partitioned Created topic partitioned.
15. Zápis dat se specifikací oddílů
Další pomocný skript zapíše obsahy souborů s koncovkou „.json“, které jsou nalezeny v aktuálním adresáři do stejného tématu, ovšem do všech třech vytvořených oddílů:
# Update the port accordingly (this one is for Kafka running inside Docker) KAFKA_PORT=9092 # Produce messages from current directory # All JSON files in current directory will be sent to Kafka via Kafkacat for file in *.json do echo $file kafkacat -b localhost:${KAFKA_PORT} -P -t partitioned -p 0 $file kafkacat -b localhost:${KAFKA_PORT} -P -t partitioned -p 1 $file kafkacat -b localhost:${KAFKA_PORT} -P -t partitioned -p 2 $file # It is possible to change the sleep value (or remove it completely) sleep 1 done
16. Výběr oddílu v nástroji kcli
Podívejme se nyní, jak se téma rozdělené do většího množství oddílů zobrazí v nástroji kcli:
Obrázek 15: Obrazovka s výběrem tématu zůstala zachována.
Obrázek 16: Téma partitioned je rozděleno na tři oddíly.
Obrázek 17: Zobrazení obsahu tématu partitioned, konkrétně oddílu číslo 1.
Obrázek 18: Opět lze (pochopitelně) zobrazit i celé tělo zprávy (navíc se zvýrazněnou syntaxí).
17. Vyhledávání zpráv, přesun na zvolený offset a další možnosti nástroje kcli
Klávesovou zkratkou / je možné spustit vyhledávání textu ve zprávách, což je velmi užitečná operace. Pokud je řetězec v nějaké zprávě nalezen, je upraven i offset v kcli (tedy dojde k přesunu na tuto zprávu). Přímý přeskok na zvolený offset zajišťuje klávesová zkratka Ctrl+j. A poslední užitečná věc, o které se zmíníme, souvisí s možností tisku obsahu zprávy či zpráv při opouštění kcli s využitím klávesové zkratky Ctrl+p. Obsah aktuálně zobrazených zpráv se pošle na standardní výstup a je ho možné zpracovat dalšími nástroji, například v koloně.
18. Repositář s použitými datovými soubory i skripty
Datové soubory i podpůrné skripty použité v dnešním článku byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/presentations/. V případě, že nebudete chtít klonovat celý repositář, můžete namísto toho použít odkazy na jednotlivé soubory, které naleznete v následující tabulce:
# | Příklad/soubor | Stručný popis | Cesta | |
---|---|---|---|---|
1 | produce.sh | produkce (poslání) obsahu datových souborů do tématu s jedním oddílem | https://github.com/tisnik/presentations/blob/master/kcli/produce.sh | |
2 | produce2.sh | produkce (poslání) obsahu datových souborů do tématu s větším množstvím oddílů | https://github.com/tisnik/presentations/blob/master/kcli/produce2.sh | |
3 | 05_rules_hits.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/05_rules_hits.json | |
4 | 05_rules_hits_org1.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/05_rules_hits_org1.json | |
5 | 10_rules_hits.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/10_rules_hits.json | |
6 | 10_rules_hits_org1.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/10_rules_hits_org1.json | |
7 | 15_rules_hits.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/15_rules_hits.json | |
8 | 15_rules_hits_org1.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/15_rules_hits_org1.json | |
9 | big_results.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/big_results.json | |
10 | big_results_no_skips.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/big_results_no_skips.json | |
11 | big_results_no_skips_tutorial.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/big_results_no_skips_tutorial.json | |
12 | big_results_org1.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/big_results_org1.json | |
13 | big_results_tutorial.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/big_results_tutorial.json | |
14 | no_hits.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/no_hits.json | |
15 | no_hits_no_skips.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/no_hits_no_skips.json | |
16 | result01.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/result01.json | |
17 | result02.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/result02.json | |
18 | result03.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/result03.json | |
19 | result04.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/result04.json | |
20 | result05.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/result05.json | |
21 | tutorial_only.json | datový soubor ve formátu JSON, který je poslán do vybraného tématu | https://github.com/tisnik/presentations/blob/master/kcli/tutorial_only.json |
19. Odkazy na relevantní články na Rootu
- Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách
https://www.root.cz/clanky/pouziti-nastroje-apache-kafka-v-aplikacich-zalozenych-na-mikrosluzbach/ - Apache Kafka: distribuovaná streamovací platforma
https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw-2-cast/ - Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-projektu-apache-kafka-jazyku-clojure-a-knihovne-jackdaw-streamy-a-kolony/ - Vývoj služeb postavených na systému Apache Kafka v jazyku Go
https://www.root.cz/clanky/vyvoj-sluzeb-postavenych-na-systemu-apache-kafka-v-jazyku-go/
20. Odkazy na Internetu
- Kcli: is a kafka read only command line browser.
https://github.com/cswank/kcli - Kcli: a kafka command line browser
https://go.libhunt.com/kcli-alternatives - Awesome Go
https://github.com/avelino/awesome-go - Real-Time Payments with Clojure and Apache Kafka (podcast)
https://www.evidentsystems.com/news/confluent-podcast-about-apache-kafka/ - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Kafka – ecosystem (Wiki)
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem - The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
http://cloudurable.com/blog/kafka-ecosystem/index.html - A Kafka Operator for Kubernetes
https://github.com/krallistic/kafka-operator - Kafka Streams
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams - Kafka Streams
http://kafka.apache.org/documentation/streams/ - Kafka Streams (FAQ)
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Streams - Event stream processing
https://en.wikipedia.org/wiki/Event_stream_processing - Part 1: Apache Kafka for beginners – What is Apache Kafka?
https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html - What are some alternatives to Apache Kafka?
https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka - What is the best alternative to Kafka?
https://www.slant.co/options/961/alternatives/~kafka-alternatives - A super quick comparison between Kafka and Message Queues
https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0 - Kafka Queuing: Kafka as a Messaging System
https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system - Apache Kafka Logs: A Comprehensive Guide
https://hevodata.com/learn/apache-kafka-logs-a-comprehensive-guide/ - Microservices – Not a free lunch!
http://highscalability.com/blog/2014/4/8/microservices-not-a-free-lunch.html - Microservices, Monoliths, and NoOps
http://blog.arungupta.me/microservices-monoliths-noops/ - Microservice Design Patterns
http://blog.arungupta.me/microservice-design-patterns/ - REST vs Messaging for Microservices – Which One is Best?
https://solace.com/blog/experience-awesomeness-event-driven-microservices/ - Kappa Architecture Our Experience
https://events.static.linuxfound.org/sites/events/files/slides/ASPgems%20-%20Kappa%20Architecture.pdf - Apache Kafka Streams and Tables, the stream-table duality
https://towardsdatascience.com/apache-kafka-streams-and-tables-the-stream-table-duality-ee904251a7e?gi=f22a29cd1854