Obsah
1. Nástroj MirrorMaker pro Apache Kafku
2. Replikace oddílů v rámci jednoho Kafka clusteru
3. Několik samostatně běžících Kafka clusterů
5. MirrorMaker 1 vs MirrorMaker 2
7. Konfigurace dvou lokálních Kafka clusterů
8. Obsah konfiguračních souborů
12. Producent a konzument pro téma na prvním clusteru
13. Konzument naslouchající na druhém clusteru
14. Témata vytvořená a spravovaná MirrorMakerem
16. Omezení témat, která se mají replikovat
17. Když regulární výrazy nestačí
19. Zrcadlení bez přejmenování témat
1. Nástroj MirrorMaker pro Apache Kafku
Systém Apache Kafka je v současnosti velmi rozšířen a používá se v mnoha oblastech IT. Někdy se setkáme s tím, že je Apache Kafka nasazen a využíván jako pouhý „vylepšený“ message broker, tj. jako centrální část celé architektury sloužící pro komunikaci mezi jednotlivými (mikro)službami a nástroji. V takovém případě se typicky používají komunikační strategie pub-sub nebo push-pull, s nimiž jsme se již na stránkách Roota seznámili v seriálu o message brokerech (například v článku o Apache ActiveMQ).
Ovšem možnosti Apache Kafky jsou ve skutečnosti poněkud větší, a to díky poměrně unikátnímu způsobu práce s takzvanými tématy (topic) a oddíly (partition) i díky tomu, že si offsety čtených zpráv řídí konzumenti, resp. jejich skupiny (consumer groups). Navíc Apache Kafka dokáže zajistit svoji velkou dostupnost a odolnost vůči pádům jednotlivých komponent či síťové infrastruktury (resilience). Tomuto tématu jsme se již věnovali, ovšem zbývá nám popsat ještě jednu technologii, kterou lze použít pro replikaci zpráv z vybraných oddílů mezi oddělenými (a mnohdy i vzdálenými) datovými centry. Tato technologie se jmenuje MirrorMaker, což je název, který poměrně přesně popisuje, k jakým operacím při nasazení MirrorMakeru dochází.
Obrázek 1: Známé logo nástroje Apache Kafka, kterému se budeme věnovat v dnešním článku.
2. Replikace oddílů v rámci jednoho Kafka clusteru
Připomeňme si, že téma (topic), do kterého se posílají zprávy, může být v systému Apache Kafky rozděleno do několika oddílů. V takovém případě producent či producenti nezapisují zprávy do jednoho oddílu (samozřejmě na konec), ale zápis je proveden pouze do jediného z vybraných oddílů. O tom, do kterého oddílu bude zápis (resp. připojení) zprávy proveden, se rozhoduje na základě klíče připojeného ke zprávě. Samotná zpráva je totiž chápána jako dvě sekvence bajtů – první sekvence tvoří klíč zprávy a druhá sekvence tělo zprávy. A právě na základě předaného klíče se vypočítá hash a algoritmus implementovaný v samotném producentovi (resp. v knihovně, kterou používá) rozhodne, do kterého oddílu bude zpráva uložena:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ... +---+---+---+ partition #2 | ... +---+---+---+---+---+---+---+---+---+ partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+
Po přijetí nové zprávy tedy může být zápis proveden například do prvního oddílu na offset číslo 6:
write | +---+---+---+---+---+---+ v partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ... +---+---+---+ partition #2 | ... +---+---+---+---+---+---+---+---+---+ partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+
Nebo se může broker rozhodnout pro připojení zprávy do posledního oddílu atd. atd.:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ... +---+---+---+ partition #2 | ... +---+---+---+---+---+---+---+---+---+ partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+ ^ | write
Další možná a podporovaná konfigurace tématu může vypadat tak, že pro dané téma je vytvořen pouze jediný oddíl, ovšem tento oddíl je replikován mezi několika brokery. Příkladem může být oddíl replikovaný mezi trojicí brokerů běžících v rámci stejného Kafka clusteru. V takovém případě je jeden z těchto oddílů nazvaný leader a veškeré operace viděné zvnějšku Kafky (tedy posílání zpráv a jejich konzumace) probíhá právě s leaderem. Ostatní repliky jsou nazvané follower(s), protože pouze sledují leadera a synchronizují svůj obsah s leaderem. Ovšem zápis nové zprávy primárně proběhne v oddílu leadera:
write | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... (leader) +---+---+---+---+---+---+---+---+---+ ^ ^ | | read | sync | | v +---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+ ^ | | sync | | v +---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+
K čemu je to však dobré? V případě, že nějaký broker z celého Kafka clusteru zhavaruje a tento broker bude (pro dané téma) obsahovat oddíl typu follower, bude komunikace pokračovat dál a teprve po znovupřipojení brokera se follower postupně sesynchronizuje s leaderem. Zajímavější situace nastane ve chvíli, kdy zhavaruje samotný leader. V takovém případě Kafka „povýší“nějakého followera za nového leadera. V případě, že téma (resp. oddíl) je replikováno na N brokerů, může jich zhavarovat N-1 a systém bude stále funkční. Jinými slovy to znamená, že takto nakonfigurované téma dokáže „přežít“ pád některého z brokerů aniž by došlo k závažnějšímu narušení komunikace (pouze se producenti a konzumenti přepojí na jiného brokera, což zabere nějaký čas). Na druhou stranu se však nezvyšuje propustnost v případě připojení většího množství konzumentů ze stejné skupiny konzumentů.
Obě výše zmíněné možnosti je pochopitelně možné v případě potřeby zkombinovat a vytvořit tak konfiguraci tématu, které bude rozděleno na větší množství oddílů a tyto oddíly budou replikovány mezi větší množství brokerů:
+---+---+---+---+---+---+ oddíl #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ oddíl #1 | 0 | 1 | 2 | ... +---+---+---+ (leader) oddíl #2 | ... +---+---+---+---+---+---+---+---+---+ oddíl #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+ ^ | | sync | | v +---+---+---+---+---+---+ oddíl #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ oddíl #1 | 0 | 1 | 2 | ... +---+---+---+ oddíl #2 | ... (follower) +---+---+---+---+---+---+---+---+---+ oddíl #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+
Tím dosáhneme toho, že Kafka cluster „přežije“ pád N-1 brokerů a navíc se zajistí paralelní čtení zpráv větším množstvím konzumentů patřících do stejné skupiny konzumentů.
3. Několik samostatně běžících Kafka clusterů
Podpora pro replikaci oddílů je v současnosti nedílnou součástí Apache Kafky a nebylo by bez ní možné zajistit fungování celého Kafka clusteru i v tom případě (který dříve či později nastane), kdy některý prvek clusteru zhavaruje (nebo se „jen“ stane nedostupným). Jedná se o známou a velmi často používanou technologii, která však má své limity. Na tyto limity poměrně brzy narazíme ve chvíli, kdy je provozováno několik Kafka clusterů, přičemž každý z nich je typicky umístěn v samostatném datovém centru. Zatímco propojení počítačů v rámci jednoho datového centra bývá velmi rychlé (to je ostatně jeden z důvodů, i když nikoli jediný, proč vůbec tato centra vznikají), přenos dat mezi datovými centry bývá pomalejší a mívá zpoždění. Ovšem pokud by jednotlivé uzly Kafka clusteru byly umístěny v různých centrech, celá technologie replikace by zpomalovala zpracování zpráv a byla by zde i větší náchylnost na rozpad clusteru kvůli tomu, že spojení mezi datovými centry může být (i na relativně krátkou chvíli) přerušeno.
V takové situaci se může přistoupit k tomu, že se v každém datovém centru spustí samostatný Kafka cluster, jenž bude prakticky nezávislý na clusterech v jiných datových centrech. Některá témata (topic) mohou být skutečně lokální (například některé téma může být určeno pouze pro Evropu, další téma pro jiný Kafka cluster pro severní Ameriku atd.), ale pochopitelně většinou nastane i situace vyžadující, aby bylo některé téma replikováno i mezi jednotlivými Kafka clustery. A právě v těchto případech lze sáhnout po nástroji nazvaném MirrorMaker, jímž se budeme zabývat v navazujících kapitolách.
4. Nástroj MirrorMaker
Nástroj MirrorMaker slouží, jak již bylo ostatně naznačeno v předchozím textu, k propojení několika Kafka clusterů, přičemž se očekává, že každý takový cluster poběží v odděleném datovém centru, takže komunikace mezi clustery může být obecně pomalejší (s případnými výpadky), než komunikace mezi uzly nacházejícími se v jediném clusteru. Samotný MirrorMaker je z pohledu systému Apache Kafky konzumentem zpráv z vybraných témat a producentem zpráv do jiných témat, přičemž témata se mohou nacházet v různých clusterech. Nejedná se tedy (alespoň ne v případě MirrorMakeru 1) o žádnou „raketovou vědu“, ale vlastně jen o triviální konzumaci zpráv docházejících do tématu (témat) v jednom clusteru a o jejich přeposílání do dalšího clusteru (MirrorMaker 2 je již komplikovanější). Samozřejmě je možné v případě potřeby realizovat i propojení mezi několika clustery, nikoli pouze mezi dvojicí clusterů.
Graficky můžeme nejjednodušší způsob použití MirrorMakeru znázornit takto:
Obrázek 2: MirrorMaker pro replikaci dat z prvního Kafka clusteru do clusteru druhého.
Interně používá MirrorMaker (1) standardní komunikační prostředky Apache Kafky. Konkrétně to znamená, že data (zprávy) načítá jako běžný konzument a naopak data zapisuje jako běžný producent. Z pohledu Kafka clusterů i jednotlivých brokerů se tedy jedná o zcela běžného klienta, který nepotřebuje využívat speciální komunikační prostředky (a navíc je relativně snadné si napsat vlastní verzi MirrorMakeru, pokud to někoho láká):
Obrázek 3: MirrorMaker se interně skládá z konzumenta a producenta zpráv.
Ovšem tento nástroj lze využít i pro další účely, například pro přenos zpráv z vybraných témat z interního (privátního) Kafka clusteru do clusteru dostupného i dalším firmám (data isolation). A taktéž lze stejnou technologii použít pro agregaci dat získaných z libovolného množství Kafka clusterů s uložením do libovolného (dalšího) clusteru. To ale není vše, protože lze realizovat i komunikaci typu fan-in a fan-out (tedy spojení zpráv z více clusterů do jediného výsledného tématu či naopak přečtení zpráv z jednoho tématu s jejich rozesláním do většího množství clusterů).
5. MirrorMaker 1 vs MirrorMaker 2
Jak jsme si již řekli v poznámce uvedené v úvodní kapitole, nacházíme se nyní v situaci, kdy existují dvě vzájemně odlišné verze MirrorMakeru. Původní verze je stále dostupná (a spouští se skriptem bin/kafka-mirror-maker.sh resp. bin/windows/kafka-mirror-maker.bat) a od verze Kafky 2.4.0 (viz též https://archive.apache.org/dist/kafka/2.4.0/RELEASE_NOTES.html) je dostupná i druhá verze MirrorMakeru. Ta se spouští skriptem bin/connect-mirror.maker.sh (pro Windows varianta tohoto skriptu neexistuje :-). Názvy těchto skriptů mohou být zpočátku matoucí, což se pravděpodobně vyřeší ve chvíli, kdy bude původní MirrorMaker 1 z Apache Kafky odstraněn.
Druhá verze MirrorMakeru je založena na technologii Kafka Connect, s níž jsme se seznámili ve dvojici článků Kafka Connect: tvorba producentů a konzumentů bez zdrojového kódu a Kafka Connect: definice a kontrola schématu zpráv. Konkrétně MirrorMaker 2 realizuje hned několik konektorů, z nichž každý je určen pro realizaci odlišných funkcí:
- MirrorSourceConnector: replikace zpráv z lokálního do vzdáleného clusteru, synchronizace offsetů
- MirrorCheckpointConnector: synchronizace offsetů, failower v případě výpadů Kafka clusterů, realizace checkpointů
- MirrorHeartbeatConnector: tzv. heartbeats (zjištění, zda protistrana komunikuje), monitoring replikací, zjištění topologie pro replikace atd.
6. Praktická část
Druhá část dnešního článku bude zaměřená více prakticky. Nejdříve vytvoříme dvě instance Kafka clusterů, přičemž každá instance se bude skládat z jednoho běžícího Zookeepera a z jednoho brokera. Na této instanci si otestujeme způsoby vytváření témat, chování většího množství konzumentů při připojení k tématu, použití většího množství oddílů pro téma atd. Dále oba Kafka clustery propojíme s využitím MirrorMakeru (2) a opět budeme sledovat chování producentů a konzumentů připojených k oběma clusterům.
V praktické části budeme brokera Apache Kafky i Zookeepera spouštět lokálně (popř. z Dockeru či Podmana), takže je nejdříve nutné Apache Kafku nainstalovat. Není to vůbec nic složitého. V případě, že je na počítači nainstalováno JRE (běhové prostředí Javy), je instalace Apache Kafky pro testovací účely triviální. V článku si ukážeme instalaci verze 2.13–3.6.1, ovšem můžete si stáhnout i prakticky libovolnou novější či některé starší verze (3.5.x nebo 3.6.x, ovšem dále popsaný postup by měl být platný i pro ještě starší verze, v podstatě můžeme dojít až k verzi 2.4.0). Tarball s instalací Apache Kafky lze získat z adresy https://downloads.apache.org/kafka/3.6.1/kafka2.13–3.6.1.tgz.
Stažení a rozbalení tarballu zajistí tyto příkazy:
$ wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz $ tar xvfz kafka_2.13-3.6.1.tgz $ cd kafka_2.13-3.6.1/
Po rozbalení staženého tarballu získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin a bin/windows). Pro spuštění Zookeepera a brokerů je zapotřebí, jak jsme si již řekli v předchozím odstavci, mít nainstalovánu JRE (Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).
. ├── bin │ └── windows ├── config │ └── kraft ├── libs ├── licenses └── site-docs 7 directories
Mezi důležité soubory, které budeme používat v rámci dalších kapitol, patří především skripty pro spouštění jednotlivých služeb, konfiguraci témat, produkci zpráv či pro jejich konzumaci. Tyto skripty jsou uloženy v podadresáři bin (a pro Windows ještě v dalším podadresáři windows). A pochopitelně nesmíme zapomenout na skript spouštějící samotný MirrorMaker 1 či 2:
Skript | Stručný popis |
---|---|
bin/kafka-server-start.sh | spuštění brokera |
bin/zookeeper-server-start.sh | spuštění Zookeepera |
bin/kafka-mirror-maker.sh | spuštění MirrorMakeru 1 |
bin/connect-mirror-maker.sh | spuštění MirrorMakeru 2 |
bin/kafka-configs.sh | konfigurace brokerů |
bin/kafka-topics.sh | konfigurace témat, zjištění informace o tématech atd. |
bin/kafka-consumer-groups.sh | konfigurace popř. zjištění informací o skupinách konzumentů |
bin/kafka-run-class.sh | spuštění konkrétní třídy z Apache Kafky (například pro zjištění informací o skupinách konzumentů) |
bin/kafka-console-producer.sh | jednoduchý producent zpráv |
bin/kafka-console-consumer.sh | jednoduchý konzument zpráv |
7. Konfigurace dvou lokálních Kafka clusterů
Pro účely dnešního článku nám postačí si spustit dvojici lokálně běžících Kafka clusterů, ovšem samozřejmě vám nic nebrání ve spuštění dvou vzdálených clusterů. Každý cluster se bude skládat ze dvou prvků – jednoho Zookeepera a jednoho brokera. První cluster bude mít Zookeepera spuštěného na portu 2181 a brokera na portu 9091 (jedničky na konci se budou dobře pamatovat) a druhý cluster bude mít Zookeepera spuštěného na portu 2182 a brokera na portu 9092 (opět – dvojky na konci se budou dobře pamatovat):
Obrázek 4: Dva lokálně běžící Kafka clustery.
Budeme tedy potřebovat celkem pět konfiguračních souborů:
Soubor | Stručný popis |
---|---|
server1.properties | konfigurace brokera z prvního Kafka clusteru |
server2.properties | konfigurace brokera ze druhého Kafka clusteru |
zookeeper1.properties | konfigurace Zookeepera z prvního Kafka clusteru |
zookeeper2.properties | konfigurace Zookeepera ze druhého Kafka clusteru |
mirror-maker.properties | konfigurace MirrorMakeru 2 |
Pro jednoduchost budou tyto konfigurační soubory uloženy v tom adresáři, kam byla nainstalována Apache Kafka.
8. Obsah konfiguračních souborů
Broker z prvního Kafka clusteru bude mít následující konfiguraci. Povšimněte si nastaveného ID, portu, ale i unikátního adresáře pro ukládání témat (resp. přesněji řečeno oddílů). Důležité hodnoty jsou zvýrazněny:
# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # The address the socket server listens on. If not configured, the host name will be equal to the value of listeners=PLAINTEXT://:9091 # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=2 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=4 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 # A comma separated list of directories under which to store log files log.dirs=/tmp/ramdisk/kafka-logs-1 # The default number of log partitions per topic. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. num.recovery.threads.per.data.dir=1 # Internal Topic Settings offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 # Zookeeper connection string (see zookeeper docs for details). zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=18000 # Group Coordinator Settings group.initial.rebalance.delay.ms=0
Broker pro druhý cluster může mít stejné ID (to je unikátní jen v rámci jednoho clusteru), ale musí se lišit jeho port, adresář pro ukládání oddílů i port Zookeepera (pokud tedy cluster budeme provozovat na stejném počítači):
# The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # The address the socket server listens on. If not configured, the host name will be equal to the value of listeners=PLAINTEXT://:9092 # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=2 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=4 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 # A comma separated list of directories under which to store log files log.dirs=/tmp/ramdisk/kafka-logs-2 # The default number of log partitions per topic. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. num.recovery.threads.per.data.dir=1 # Internal Topic Settings offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 # Zookeeper connection string (see zookeeper docs for details). zookeeper.connect=localhost:2182 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=18000 # Group Coordinator Settings group.initial.rebalance.delay.ms=0
Následuje konfigurace Zookeepera pro první Kafka cluster. Opět musíme použít unikátní adresář i unikátní číslo portu:
# the directory where the snapshot is stored. dataDir=/tmp/ramdisk/zookeeper-1 # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 # Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServer=false
A konečně konfigurace Zookeepera pro druhý Kafka cluster:
# the directory where the snapshot is stored. dataDir=/tmp/ramdisk/zookeeper-2 # the port at which the clients will connect clientPort=2182 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 # Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServer=false
9. Konfigurace MirrorMakeru 2
Nejvíc nás ovšem bude zajímat konfigurace samotného MirrorMakeru 2. Ta nejdříve obsahuje symbolická jména Kafka clusterů, ke kterým se bude MirrorMaker 2 připojovat:
Obrázek 5: Pojmenování clusterů (platné pouze pro konkrétní instanci MirrorMakera).
Můžeme ponechat například jména z původního souboru, tedy clustery A a B:
clusters = A, B
Dále je nutné specifikovat seznam brokerů pro každý Kafka cluster. To je v našem případě jednoduché, protože oba clustery (a tedy i jejich brokery) běží na jediném počítači a lišit se budou jen čísla portů:
A.bootstrap.servers = localhost:9091 B.bootstrap.servers = localhost:9092
A konečně – konfigurace témat, která budou zrcadlena z clusteru A do clusteru B či naopak. Zde se používají regulární výrazy. Pro začátek povolíme zrcadlení všech témat, a to oběma směry:
A->B.enabled = true A->B.topics = .* B->A.enabled = true B->A.topics = .*
Reálný konfigurační soubor obsahuje i další vlastnosti, například replikační faktor. Ten prozatím ponecháme na hodnotě 1, protože každý cluster má jen jediného brokera. V praxi by měla být tato hodnota pochopitelně větší:
# specify any number of cluster aliases clusters = A, B # connection information for each cluster # This is a comma separated host:port pairs for each cluster # for e.g. "A_host1:9092, A_host2:9092, A_host3:9092" A.bootstrap.servers = localhost:9091 B.bootstrap.servers = localhost:9092 # enable and configure individual replication flows A->B.enabled = true # regex which defines which topics gets replicated. For eg "foo-.*" A->B.topics = .* B->A.enabled = true B->A.topics = .* # Setting replication factor of newly created remote topics replication.factor=1 ############################# Internal Topic Settings ############################# # The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and # "mm2-offset-syncs.B.internal" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. checkpoints.topic.replication.factor=1 heartbeats.topic.replication.factor=1 offset-syncs.topic.replication.factor=1 # The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and # "mm2-status.B.internal" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offset.storage.replication.factor=1 status.storage.replication.factor=1 config.storage.replication.factor=1 # customize as needed # replication.policy.separator = _ # sync.topic.acls.enabled = false # emit.heartbeats.interval.seconds = 5
10. Spuštění Kafka clusterů
Nyní si oba Kafka clustery spustíme. Připomeňme si, že nejdříve je vždy nutné spustit Zookeepera a teprve poté brokera či brokery. Pro sledování činnosti všech procesů si můžete každého Zookeepera i brokera spustit v samostatném terminálu, využít nástroj screen atd.
Spuštění Zookeepera pro první cluster:
$ cd ${kafka_dir} $ bin/zookeeper-server-start.sh zookeeper1.properties
Spuštění brokera pro první cluster:
$ cd ${kafka_dir} $ bin/kafka-server-start.sh server1.properties
Spuštění Zookeepera pro druhý cluster:
$ cd ${kafka_dir} $ bin/zookeeper-server-start.sh zookeeper2.properties
Spuštění brokera pro druhý cluster:
$ cd ${kafka_dir} $ bin/kafka-server-start.sh server2.properties
11. Spuštění MirrorMakeru
V posledním terminálu spustíme samotného MirrorMakera. Vzhledem k tomu, že používáme MirrorMaker 2, je pro spuštění použit skript bin/connect-mirror-maker.sh:
$ bin/connect-mirror-maker.sh mirror-maker.properties
Pokud se nepodaří připojení k jednomu Kafka clusteru či k oběma Kafka clusterům, budou se vypisovat tyto zprávy:
[2024-02-10 16:35:30,310] WARN [AdminClient clientId=B->A] Connection to node -1 (localhost/127.0.0.1:9091) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:814) [2024-02-10 16:35:31,322] INFO [AdminClient clientId=B->A] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:977)
Naopak při úspěšném připojení by se na terminálu měla objevit tato hlášení:
[2024-02-10 16:38:15,657] INFO [MirrorSourceConnector|worker] Started MirrorSourceConnector with 2 topic-partitions. (org.apache.kafka.connect.mirror.MirrorSourceConnector:174) [2024-02-10 16:38:15,657] INFO [MirrorSourceConnector|worker] Starting MirrorSourceConnector took 56 ms. (org.apache.kafka.connect.mirror.MirrorSourceConnector:175) [2024-02-10 16:38:15,661] INFO [MirrorSourceConnector|worker] No ACL authorizer is configured on the source Kafka cluster, so no topic ACL syncing will take place. Consider disabling topic ACL syncing by setting sync.topic.acls.enabled to 'false'. (org.apache.kafka.connect.mirror.MirrorSourceConnector:533) [2024-02-10 16:38:15,661] INFO [MirrorSourceConnector|worker] syncing topic ACLs took 4 ms (org.apache.kafka.connect.mirror.Scheduler:95) [2024-02-10 16:38:15,667] INFO [MirrorSourceConnector|worker] syncing topic configs took 14 ms (org.apache.kafka.connect.mirror.Scheduler:95) [2024-02-10 16:38:15,671] INFO [MirrorSourceConnector|worker] syncing topic configs took 10 ms (org.apache.kafka.connect.mirror.Scheduler:95)
Poté se již budou periodicky (zhruba po jedné minutě) opakovat zprávy o synchronizaci a o testování stavu obou clusterů:
[2024-02-10 16:40:15,211] INFO [MirrorHeartbeatConnector|task-0|offsets] WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets for 60 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233) [2024-02-10 16:40:15,212] INFO [MirrorSourceConnector|task-0|offsets] WorkerSourceTask{id=MirrorSourceConnector-0} Committing offsets for 60 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
12. Producent a konzument pro téma na prvním clusteru
V této chvíli by tedy mělo běžet všech pět služeb: dvojice Zookeeperů, dvojice Kafka brokerů a samozřejmě i MirrorMaker 2. Zbývá nám otestování funkcionality obou propojených Kafka clusterů. Spustíme tedy producenta zpráv. Konkrétně se bude jednat o producenta, který bude posílat zprávy do tématu nazvaného test1 na prvním Kafka clusteru. Producenta lze pochopitelně vytvořit v prakticky jakémkoli programovacím jazyku, pro který existuje příslušná knihovna s rozhraním k Apache Kafce. Ovšem my dnes využijeme standardního producenta dodávaného přímo s Apache Kafkou, který se spouští následovně (povšimněte si, že se připojujeme k brokeru z prvního clusteru):
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9091 --topic test1
Nyní lze do konzole, na které byl producent spuštěn, zapisovat zprávy, které budou posílány do tématu test1.
V dalším terminálu si otevřeme konzumenta zpráv z tohoto tématu. Opět se bude jednat o standardní implementaci konzumenta dodávaného společně s Apache Kafkou:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test1
Všechny zprávy posílané producentem budou vypsány na terminálu konzumenta – což je obvyklá vlastnost Kafky, do které (zdánlivě) Mirror Maker nijak nezasahuje.
13. Konzument naslouchající na druhém clusteru
Nyní si konečně vyzkoušejme, jak do celého systému přeposílání zpráv zasahuje Mirror Maker. Spustíme dalšího konzumenta, ovšem nyní se bude jednat o konzumenta připojeného ke druhému Kafka clusteru. Konkrétně se tedy budeme muset připojit k brokeru běžícímu na portu 9092. A dochází ještě k jedné změně – původní téma, které se jmenovalo test1, bude na druhém clusteru pojmenováno A.test1; bude tedy obsahovat i symbolické jméno clusteru, odkud je zrcadleno (toto jméno tedy volí Mirror Maker na základě své konfigurace).
Obrázek 6: Připojení producenta a dvojice konzumentů ke zrcadlenému tématu.
Konzumenta tedy spustíme následovně:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic A.test1
Zprávy posílané do tématu test1 do prvního clusteru budou zrcadleny do tématu A.test1 ve druhém clusteru, což si sami můžete velmi snadno ověřit zápisem zpráv do terminálu producenta:
Producent:
>prvni >druha >treti
Konzument:
prvni druha treti
14. Témata vytvořená a spravovaná MirrorMakerem
Podívejme se nyní, jaká témata vlastně v jednotlivých Kafka clusterech vznikla. Nejdříve si vypíšeme témata v prvním clusteru:
$ bin/kafka-topics.sh --bootstrap-server localhost:9091 --list B.checkpoints.internal B.heartbeats __consumer_offsets heartbeats mm2-configs.B.internal mm2-offset-syncs.B.internal mm2-offsets.B.internal mm2-status.B.internal test1
Nalezneme zde tedy zejména naše téma test1, ovšem i témata začínající prefixem B. (tedy symbolickým jménem druhého clusteru) a dále interní témata Mirror Makeru začínající na mm2-.
Naproti tomu ve druhém clusteru budou odlišná témata:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list A.checkpoints.internal A.heartbeats A.test1 __consumer_offsets heartbeats mm2-configs.A.internal mm2-offset-syncs.A.internal mm2-offsets.A.internal mm2-status.A.internal
Namísto tématu test1 je zde téma A.test1 a „zrcadleně“ zde nalezneme témata, v nichž je uložen stav prvního clusteru z pohledu Mirror Makeru.
Některá výše vypsaná témata jsou rozdělena na oddíly, další pak mají pouze jediný oddíl (což jsme ostatně nastavili v konfiguračním souboru):
Topic: heartbeats TopicId: 7IyMK73ETfaQXJl7JfdIDg PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact Topic: heartbeats Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: B.heartbeats TopicId: YgRWcTVMQqO6XHO62yO2Yg PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact Topic: B.heartbeats Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-configs.B.internal TopicId: py8p8QjOTuKRtvalWRsnOg PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact Topic: mm2-configs.B.internal Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-status.B.internal TopicId: 8cw1WB9sRz-p16On6F6ePA PartitionCount: 5 ReplicationFactor: 1 Configs: cleanup.policy=compact Topic: mm2-status.B.internal Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-status.B.internal Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-status.B.internal Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-status.B.internal Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-status.B.internal Partition: 4 Leader: 0 Replicas: 0 Isr: 0 Topic: B.checkpoints.internal TopicId: od5v7gliTI2oM6cfPmUkhQ PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact Topic: B.checkpoints.internal Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal TopicId: zgHMHUQdRQaY7kiTknju6g PartitionCount: 25 ReplicationFactor: 1 Configs: cleanup.policy=compact Topic: mm2-offsets.B.internal Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 4 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 5 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 6 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 7 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 8 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 9 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 10 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 11 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 12 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 13 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 14 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 15 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 16 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 17 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 18 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 19 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 20 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 21 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 22 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 23 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offsets.B.internal Partition: 24 Leader: 0 Replicas: 0 Isr: 0 Topic: mm2-offset-syncs.B.internal TopicId: hXRJKh8VTNWhmwYXCsEL5g PartitionCount: 1 ReplicationFactor: 1 Configs: cleanup.policy=compact Topic: mm2-offset-syncs.B.internal Partition: 0 Leader: 0 Replicas: 0 Isr: 0
15. Obousměrné zrcadlení
Nic nám nebrání v tom, aby se téma test1 vytvořilo i ve druhém Kafka clusteru. Pokud je povolena automatická tvorba témat, je to snadné, protože pro tento účel stačí použít producenta:
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
Zprávy jsou zrcadleny na první Kafka cluster, ovšem do tématu s prefixem, tedy do tématu B.test1:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic B.test1
Nyní budou seznamy témat „symetrické“ v obou clusterech:
B.checkpoints.internal B.heartbeats B.test1 __consumer_offsets heartbeats mm2-configs.B.internal mm2-offset-syncs.B.internal mm2-offsets.B.internal mm2-status.B.internal test1
popř. pro druhý cluster:
A.checkpoints.internal A.heartbeats A.test1 __consumer_offsets heartbeats mm2-configs.A.internal mm2-offset-syncs.A.internal mm2-offsets.A.internal mm2-status.A.internal test1
16. Omezení témat, která se mají replikovat
Témata, která se mají replikovat, jsou specifikována regulárním výrazem. To tedy znamená, že témata je vhodné pojmenovávat tak, aby jména tvořila resp. popisovala určitou strukturu. Například se může používat jmenná konvence odvozená od problémové domény, například public.sales.ecommerce.shoppingcarts nebo private.risk.portfolio.pricingengine.assetpricing. V takovém případě je relativně snadné specifikovat regulární výrazy pro ta témata, která se mají zrcadlit:
# enable and configure individual replication flows A->B.enabled = true # regex which defines which topics gets replicated. For eg "foo-.*" A->B.topics = public\..*
17. Když regulární výrazy nestačí
Regulární výrazy pochopitelně nedokáží vystihnout všechny způsoby zrcadlení. Mohlo by se tedy zdát, že je v tomto ohledu MirrorMaker poměrně omezený. Ovšem díky tomu, že jsou clusterům přiřazena nějaká symbolická jména, je možné stejný cluster v konfiguraci použít vícekrát a pokaždé s jiným nastavením:
clusters = A, B, C, D A.bootstrap.servers = localhost:9091 B.bootstrap.servers = localhost:9091 C.bootstrap.servers = localhost:9091 D.bootstrap.servers = localhost:9092
Nyní se bude MirrorMaker chovat ke clusterům A, B i C, jakoby se jednalo o samostatné entity, i když se ve skutečnosti jedná o stejný cluster.
18. Jednosměrná replikace
Zajištění jednosměrné replikace (resp. zrcadlení) je triviální – každou cestu z cluster A do B či naopak lze zakázat:
clusters = A, B A.bootstrap.servers = localhost:9091 B.bootstrap.servers = localhost:9092 A->B.enabled = true A->B.topics = .* B->A.enabled = false B->A.topics = .*
19. Zrcadlení bez přejmenování témat
V případě, že preferujete skutečné zrcadlení témat bez jejich přejmenování, je nutné do konfiguračního souboru Mirror Makeru přidat tento řádek:
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
20. Odkazy na Internetu
- Kafka mirroring (MirrorMaker)
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330 - Mastering Kafka migration with MirrorMaker 2
https://developers.redhat.com/articles/2024/01/04/mastering-kafka-migration-mirrormaker-2 - Apache Kafka MirrorMaker 2 (MM2) Part 1: Theory
https://www.instaclustr.com/blog/kafka-mirrormaker-2-theory/#h-2-replication-in-kafka - Apache Kafka MirrorMaker 2 (MM2) Part 2: Practice
https://www.instaclustr.com/blog/apache-kafka-mirrormaker-2-practice/ - Demystifying Kafka MirrorMaker 2: Use cases and architecture
https://developers.redhat.com/articles/2023/11/13/demystifying-kafka-mirrormaker-2-use-cases-and-architecture# - How to use Kafka MirrorMaker 2.0 in data migration, replication and the use-cases
https://learn.microsoft.com/en-us/azure/hdinsight/kafka/kafka-mirrormaker-2–0-guide - Release Notes – Kafka – Version 2.4.0
https://archive.apache.org/dist/kafka/2.4.0/RELEASE_NOTES.html - Kafka Mirror Maker Best Practices
https://community.cloudera.com/t5/Community-Articles/Kafka-Mirror-Maker-Best-Practices/ta-p/249269 - Apache Kafka MirrorMaker 2 (MM2) Part 1: Theory
https://www.instaclustr.com/blog/kafka-mirrormaker-2-theory/ - Kcli: is a kafka read only command line browser.
https://github.com/cswank/kcli - Kcli: a kafka command line browser
https://go.libhunt.com/kcli-alternatives - Kafka Connect and Schemas
https://rmoff.net/2020/01/22/kafka-connect-and-schemas/ - JSON and schemas
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas - What, why, when to use Apache Kafka, with an example
https://www.startdataengineering.com/post/what-why-and-how-apache-kafka/ - When NOT to use Apache Kafka?
https://www.kai-waehner.de/blog/2022/01/04/when-not-to-use-apache-kafka/ - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Kafka – ecosystem (Wiki)
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem - The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
http://cloudurable.com/blog/kafka-ecosystem/index.html - A Kafka Operator for Kubernetes
https://github.com/krallistic/kafka-operator - Kafka Streams
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams - Kafka Streams
http://kafka.apache.org/documentation/streams/ - Kafka Streams (FAQ)
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Streams - Event stream processing
https://en.wikipedia.org/wiki/Event_stream_processing - Part 1: Apache Kafka for beginners – What is Apache Kafka?
https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html - What are some alternatives to Apache Kafka?
https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka - What is the best alternative to Kafka?
https://www.slant.co/options/961/alternatives/~kafka-alternatives - A super quick comparison between Kafka and Message Queues
https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0 - Kafka Queuing: Kafka as a Messaging System
https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system - Apache Kafka Logs: A Comprehensive Guide
https://hevodata.com/learn/apache-kafka-logs-a-comprehensive-guide/ - Microservices – Not a free lunch!
http://highscalability.com/blog/2014/4/8/microservices-not-a-free-lunch.html - Microservices, Monoliths, and NoOps
http://blog.arungupta.me/microservices-monoliths-noops/ - Microservice Design Patterns
http://blog.arungupta.me/microservice-design-patterns/ - REST vs Messaging for Microservices – Which One is Best?
https://solace.com/blog/experience-awesomeness-event-driven-microservices/ - Kappa Architecture Our Experience
https://events.static.linuxfound.org/sites/events/files/slides/ASPgems%20-%20Kappa%20Architecture.pdf - Apache Kafka Streams and Tables, the stream-table duality
https://towardsdatascience.com/apache-kafka-streams-and-tables-the-stream-table-duality-ee904251a7e?gi=f22a29cd1854 - Configure Self-Managed Connectors
https://docs.confluent.io/kafka-connectors/self-managed/configuring.html#configure-self-managed-connectors - Schema Evolution and Compatibility
https://docs.confluent.io/platform/current/schema-registry/avro.html#schema-evolution-and-compatibility - Configuring Key and Value Converters
https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configuring-key-and-value-converters - Introduction to Kafka Connectors
https://www.baeldung.com/kafka-connectors-guide - Kafka CLI: command to list all consumer groups for a topic?
https://stackoverflow.com/questions/63883999/kafka-cli-command-to-list-all-consumer-groups-for-a-topic - Java Property File Processing
https://www.w3resource.com/java-tutorial/java-propertyfile-processing.php - Skipping bad records with the Kafka Connect JDBC sink connector
https://rmoff.net/2019/10/15/skipping-bad-records-with-the-kafka-connect-jdbc-sink-connector/ - Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/ - Errors and Dead Letter Queues
https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/ - Confluent Cloud Dead Letter Queue
https://docs.confluent.io/cloud/current/connectors/dead-letter-queue.html - Dead Letter Queues (DLQs) in Kafka
https://medium.com/@sannidhi.s.t/dead-letter-queues-dlqs-in-kafka-afb4b6835309 - Deserializer
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-json.html#json-schema-serializer-and-deserializer - JSON, Kafka, and the need for schema
https://mikemybytes.com/2022/07/11/json-kafka-and-the-need-for-schema/ - Using Kafka Connect with Schema Registry
https://docs.confluent.io/platform/current/schema-registry/connect.html - Zpracování dat reprezentovaných ve formátu JSON nástrojem jq
https://www.root.cz/clanky/zpracovani-dat-reprezentovanych-ve-formatu-json-nastrojem-jq/ - Repositář projektu jq (GitHub)
https://github.com/stedolan/jq - GitHub stránky projektu jq
https://stedolan.github.io/jq/ - 5 modern alternatives to essential Linux command-line tools
https://opensource.com/article/20/6/modern-linux-command-line-tools - Návod k nástroji jq
https://stedolan.github.io/jq/tutorial/ - jq Manual (development version)
https://stedolan.github.io/jq/manual/ - Introducing JSON
https://www.json.org/json-en.html - Understanding JSON schema
https://json-schema.org/understanding-json-schema/index.html - JDBC Sink Connector for Confluent Platform
https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#jdbc-sink-connector-for-cp - JDBC Connector (Source and Sink)
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc - Introduction to Schema Registry in Kafka
https://medium.com/slalom-technology/introduction-to-schema-registry-in-kafka-915ccf06b902 - Understanding JSON Schema Compatibility
https://yokota.blog/2021/03/29/understanding-json-schema-compatibility/