Obsah
1. Vývoj služeb postavených na systému Apache Kafka v jazyku Go
2. Knihovna confluent-kafka-go (Confluent's Golang Client for Apache Kafka)
3. Producent zpráv založený na knihovně confluent-kafka-go
4. Úplný zdrojový kód producenta zpráv
5. Konzument zpráv založený na knihovně confluent-kafka-go
6. Úplný zdrojový kód konzumenta zpráv
8. Producent zpráv založený na knihovně Sarama
9. Úplný zdrojový kód producenta zpráv
10. Konzument zpráv založený na knihovně Sarama
11. Modulární podoba aplikace představující producenta zpráv
12. Využití nového rozhraní i jeho implementace
13. Modulární podoba aplikace představující konzumenta zpráv
14. Využití nového rozhraní i jeho implementace
15. Sarama/mocks a jednotkové testy
16. Mock objekty dostupné v knihovně Sarama/mocks
17. Ukázka konstrukce jednotkového testu
18. Repositář s demonstračními příklady
19. Odkazy na relevantní články na Rootu
1. Tvorba služeb postavených na systému Apache Kafka v jazyku Go
S užitečným, populárním a dnes velmi často nasazovaným nástrojem Apache Kafka jsme se již na stránkách Rootu několikrát setkali, a to například v článcích Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách , Apache Kafka: distribuovaná streamovací platforma a taktéž v trojici článků Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw, Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část) a Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony). Pro tuto asi nejznámější streamovací platformu existují knihovny realizující rozhraní pro různé programovací jazyky a jejich ekosystémy. Tato rozhraní jsou vypsána v následující tabulce:
# | Jazyk/platforma |
---|---|
1 | C/C++ |
2 | Python |
3 | Go/Golang Go/Golang |
4 | Erlang |
5 | .NET |
6 | Ruby |
7 | Node.js |
8 | Perl |
9 | PHP |
10 | Rust |
11 | Storm |
12 | Scala (DSL jazyk) |
13 | Clojure |
14 | Clojure |
15 | Swift |
16 | CLI (stdin/stdout) |
Dnes si ukážeme, jak lze použít základní funkce Apache Kafky v programovacím jazyku Go. Demonstrační příklady budou využívat dvě knihovny, a to konkrétně confluent-kafka-go a Sarama. Důležitým tématem je však i testování aplikací, které jsou založeny na Kafce; zejména se to týká jednotkových testů. Tomuto tématu je věnována druhá polovina článku, která tak nepřímo navazuje na dvojici článků Jazyk Go prakticky: jednotkové testy kódu, který přistupuje k SQL databázím a Jazyk Go prakticky: jednotkové testy kódu, který přistupuje k SQL databázím (dokončení).
Konfigurace ZooKeepera je uložena v souboru config/zookeeper.properties:
... ... ... # the directory where the snapshot is stored. dataDir=/tmp/ramdisk/zookeeper # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 # Disable the adminserver by default to avoid port conflicts. # Set the port to something non-conflicting if choosing to enable this admin.enableServer=false # admin.serverPort=8080 # Enable snapshot.trust.empty config if the ZK upgrade from 3.4.X to 3.5.6 is failing # with "java.io.IOException: No snapshot found, but there are log entries" error. # Check upgrade docs for more details. # snapshot.trust.empty=true ... ... ...
Konfigurace brokera je uložena v souboru :
... ... ... ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/tmp/ramdisk/kafka-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ... ... ...
2. Knihovna confluent-kafka-go (Confluent's Golang Client for Apache Kafka)
Pro komunikaci s Apache Kafkou z programovacího jazyka Go lze využít dvě knihovny. První z nich se jmenuje confluent-kafka-go neboli v lidské variantě Confluent's Golang Client for Apache Kafka. Druhá z těchto knihoven má název Sarama. Knihovna confluent-kafka-go nabízí programátorům spíše vysokoúrovňové operace se systémem Apache Kafka, což může být pro mnoho aplikací dostačující. Naproti tomu knihovna Sarama před programátorem odhaluje i některé nízkoúrovňové operace související se samotným komunikačním protokolem, takže tvorba aplikací může být nepatrně složitější, ale o to zajímavější (většinou si ovšem nad Saramou programátoři vytvoří vlastní zjednodušené rozhraní).
Začněme popisem zcela základních operací poskytovaných knihovnou confluent-kafka-go. Tato knihovna je sice naprogramovaná v jazyce Go, ovšem pro svoji korektní činnost vyžaduje i nativní knihovnu nazvanou librdkafka, kterou lze nainstalovat buď z repositářů dané distribuce, nebo běžným postupem:
$ git clone https://github.com/edenhill/librdkafka.git $ cd librdkafka $ ./configure --prefix /usr $ make $ sudo make install
Dále je ještě před instalací knihovny pro Go většinou nutné nastavit PKG_CONFIG_PATH (pokud již není korektně nastavena):
$ export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:/usr/lib/pkgconfig/
Stažení a překlad knihovny confluent-kafka-go pro jazyk Go je posléze již triviální a měl by být proveden bez komplikací:
$ go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
3. Producent zpráv založený na knihovně confluent-kafka-go
Nyní si ukažme, jakým způsobem je možné vytvořit producenta zpráv v programovacím jazyce Go s využitím knihovny confluent-kafka-go. Pro připojení ke Kafce a posílání zpráv musíme znát nejméně dva údaje – počítač a port, na kterém běží broker, a taktéž jméno tématu (topic):
const ( server = "localhost" ) topic := "upload"
Vytvoření instance datové struktury představující producenta se zajištěním jeho destrukce (odpojení) na konci příslušné funkce může být naprogramováno následovně:
producer, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": server, }) // kontrola chyby při připojování ke Kafce if err != nil { panic(err) } // producenta zpráv je nutné na konci odpojit defer producer.Close()
Samotné vytváření zpráv s jejich naformátováním (do řetězce), převodem na pole bajtů a posíláním do Kafky vypadá následovně:
for i := 0; i < 100; i++ { text := fmt.Sprintf("Message #%d", i) producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(text), }, nil) }
Vidíme, že je zapotřebí specifikovat téma (topic) a popř. i sekci (partition). Samotná zpráva je reprezentována polem bajtů, může se tedy jednat o libovolná data.
Navíc je ovšem někdy vhodné asynchronně reagovat na události, které mohou při práci se systémem Apache Kafky nastat. Asynchronní zpracování je v Go přímočaré – použijeme k tomu takzvanou gorutinu a budeme reagovat na případné chyby a informace o doručení zprávy:
go func() { for event := range producer.Events() { switch ev := event.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }()
4. Úplný zdrojový kód producenta zpráv
Úplný zdrojový kód výše popsaného producenta zpráv naleznete na adrese https://github.com/tisnik/go-root/blob/master/article75/confluent_kafka_producer.go:
// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou // confluent-kafka-go: implementace producenta zpráv. package main import ( "fmt" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" ) const ( server = "localhost" ) func main() { topic := "upload" // konstrukce producenta producer, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": server, }) // kontrola chyby při připojování ke Kafce if err != nil { panic(err) } // producenta zpráv je nutné na konci odpojit defer producer.Close() // funkce volaná pro každou událost, která při práci s Kafkou může nastat go func() { for event := range producer.Events() { switch ev := event.(type) { case *kafka.Message: if ev.TopicPartition.Error != nil { fmt.Printf("Delivery failed: %v\n", ev.TopicPartition) } else { fmt.Printf("Delivered message to %v\n", ev.TopicPartition) } } } }() // vytváření a produkce zpráv posílaných do zvoleného tématu for i := 0; i < 100; i++ { text := fmt.Sprintf("Message #%d", i) producer.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(text), }, nil) } producer.Flush(15 * 1000) }
Překlad tohoto producenta lze provést buď s tím, že při spuštění bude vyžadována dynamicky linkovaná knihovna librdkafka:
$ go build producer1.go $ ldd ./producer1 linux-vdso.so.1 (0x00007ffc70b8e000) librdkafka.so.1 => not found libpthread.so.0 => /lib64/libpthread.so.0 (0x00007ffb2b6f8000) libc.so.6 => /lib64/libc.so.6 (0x00007ffb2b342000) /lib64/ld-linux-x86-64.so.2 (0x00007ffb2b916000)
Alternativně lze vynutit statické slinkování, což je v tomto případě podle mého názoru lepší řešení (není zapotřebí nastavovat LD_LIBRARY_PATH atd. a statické linkování je ve světě Go používáno velmi často):
$ go build -tags static producer1.go $ ldd ./producer1 linux-vdso.so.1 (0x00007ffdd3156000) libm.so.6 => /lib64/libm.so.6 (0x00007f424359c000) libz.so.1 => /lib64/libz.so.1 (0x00007f4243385000) libdl.so.2 => /lib64/libdl.so.2 (0x00007f4243181000) libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f4242f63000) librt.so.1 => /lib64/librt.so.1 (0x00007f4242d5b000) libc.so.6 => /lib64/libc.so.6 (0x00007f42429a5000) /lib64/ld-linux-x86-64.so.2 (0x00007f42438e7000)
5. Konzument zpráv založený na knihovně confluent-kafka-go
Konzument zpráv naprogramovaný v jazyce Go, jenž je opět založený na knihovně confluent-kafka-go, je postaven na použití datové struktury pojmenované Consumer:
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": server, "group.id": group_id, "auto.offset.reset": "earliest", }) // kontrola chyby při připojování ke Kafce if err != nil { panic(err) } // i konzumenta je nutné na konci uzavřít defer consumer.Close()
Dále se přihlásíme k příjmu zpráv se zadaným tématem (nebo tématy, protože lze předat řez s větším množstvím témat):
consumer.SubscribeTopics([]string{topic}, nil)
Příjem zpráv (s případným čekáním na nové zprávy) je řešen v nekonečné smyčce, v níž se současně kontroluje, zda nedošlo k nějaké chybě:
for { message, err := consumer.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s %s\n", message.TopicPartition, string(message.Key), string(message.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, message) } }
6. Úplný zdrojový kód konzumenta zpráv
Úplný zdrojový kód výše popsaného konzumenta zpráv naleznete na adrese https://github.com/tisnik/go-root/blob/master/article75/confluent_kafka_consumer.go:
// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou // confluent-kafka-go: implementace konzumenta zpráv. package main import ( "fmt" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka" ) const ( server = "localhost:9092" topic = "upload" group_id = "group1" ) func main() { // konstrukce konzumenta consumer, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": server, "group.id": group_id, "auto.offset.reset": "earliest", }) // kontrola chyby při připojování ke Kafce if err != nil { panic(err) } // i konzumenta je nutné na konci uzavřít defer consumer.Close() // přihlášení konzumenta do zvoleného tématu (či témat) consumer.SubscribeTopics([]string{topic}, nil) // postupné čtení zpráv, které byly do zvoleného tématu publikovány for { message, err := consumer.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s %s\n", message.TopicPartition, string(message.Key), string(message.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, message) } } }
7. Knihovna Sarama
Ve druhé části dnešního článku se seznámíme se základními možnostmi poskytovanými knihovnou nazvanou Sarama. Tato knihovna je opět určena pro komunikaci mezi aplikacemi vyvinutými v programovacím jazyku Go na jedné straně a Apache Kafkou na straně druhé. Sarama je kompletně naprogramována v jazyku Go, takže nevyžaduje ani nativní knihovnu librdkafka (https://github.com/edenhill/librdkafka) ani JVM. Zajímavé je, že Sarama podporuje jak vysokoúrovňový přístup ke Kafce, tak i přístup na nižší úrovni, v němž se „odhalují“ některé vlastnosti protokolu Apache Kafky. Podobně jako tomu bylo u předchozí knihovny si i nyní ukážeme, jak bude vypadat jednoduchý producent zpráv i jejich konzument. Posléze si ukážeme nepatrně složitější kód použitý v reálném projektu.
8. Producent zpráv založený na knihovně Sarama
Ukažme si nyní, jak může vypadat jednoduchý producent zpráv posílaných do Apache Kafky. Na rozdíl od předchozích demonstračních příkladů bude producent založen na použití knihovny Sarama. V knihovně Sarama existují dva typy producentů zpráv – synchronní a asynchronní. Synchronní producent, který je představován instancí SyncProducer, je blokující, tj. aktuálně běžící gorutina musí počkat, až je zpráva poslána. Naproti tomu asynchronní producent, jenž je představován instancí AsyncProducer, se z programátorského pohledu chová jako kanál s kapacitou – zprávy jsou tedy posílány na pozadí (asynchronně).
Základními konfiguračními parametry jsou adresa brokera a jméno tématu (topicu):
const ( // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker KafkaConnectionString = "localhost:9092" // KafkaTopic obsahuje jméno tématu KafkaTopic = "test-topic" )
V demonstračním příkladu použijeme synchronního producenta zpráv, kterému je nutné při jeho inicializaci konstruktorem NewSyncProducer předat adresy brokerů (minimálně jednoho brokera) a ve druhém parametru konfiguraci. Při výchozí konfiguraci je však možné předat nil, což v našem případě plně dostačuje:
// konstrukce konzumenta producer, err := sarama.NewSyncProducer([]string{KafkaConnectionString}, nil) // kontrola chyby při připojování ke Kafce if err != nil { log.Fatal(err) }
Samozřejmě je nutné zajistit, aby se připojení taktéž uzavřelo ve chvíli, kdy již není zapotřebí, tedy konkrétně při ukončování hlavní gorutiny:
// zajištění uzavření připojení ke Kafce defer func() { if err := producer.Close(); err != nil { log.Fatal(err) } }()
Posílaná zpráva je reprezentována datovým typem ProducerMessage. Ten obsahuje několik atributů, ovšem nejdůležitější jsou atributy Topic, Key, Value. V Apache Kafce jsou tyto údaje uloženy jako sekvence bajtů, ovšem nic nám nebrání použít řetězce, které jsou na sekvenci bajtů převedeny s využitím jedné (z několika) pomocných funkcí pro serializaci:
// poslání (produkce) zprávy msg := &sarama.ProducerMessage{Topic: KafkaTopic, Value: sarama.StringEncoder("testing 123")}
Poslání zprávy je provedeno metodou:
func (producer SyncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
Jak je z popisu patrné, vrátí tato metoda informace o oddílu, do kterého byla uložena. Taktéž se vrátí offset uložené zprávy v rámci oddílu a v případě chyby i informace o této chybě (v ideálním případě nil):
partition, offset, err := producer.SendMessage(msg) if err != nil { log.Printf("FAILED to send message: %s\n", err) } else { log.Printf("> message sent to partition %d at offset %d\n", partition, offset) }
Takto naprogramovaného producenta zpráv můžeme otestovat s využitím velmi užitečného nástroje nazvaného kafkacat. Otestování bude probíhat tak, že kafkacat bude použit v režimu konzumenta. To znamená, že v jednom terminálu kafkacat spustíme, samozřejmě se správně nastavenými parametry – adresou a portem brokera a jménem tématu (topicu):
$ kafkacat -C -b localhost:9092 -t test-topic % Reached end of topic test-topic [0] at offset 0
Ve druhém terminálu spustíme producenta zpráv a budeme sledovat, zda jsou posílané zprávy skutečně uloženy do tématu a zkonzumovány:
$ ./sarama-producer 2021/06/13 17:48:50 Connected to localhost:9092 2021/06/13 17:48:50 > message sent to partition 0 at offset 0 2021/06/13 17:48:50 Done
Na terminálu se spuštěným kafkacatem by se nyní měla objevit zpráva o přijaté zprávě:
testing 123 % Reached end of topic test-topic [0] at offset 1
9. Úplný zdrojový kód producenta zpráv
Úplný zdrojový kód výše popsaného producenta zpráv naleznete na adrese https://github.com/tisnik/go-root/blob/master/article75/sarama-producer:
// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou // Sarama: implementace producenta zpráv. package main import ( "log" "github.com/Shopify/sarama" ) const ( // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker KafkaConnectionString = "localhost:9092" // KafkaTopic obsahuje jméno tématu KafkaTopic = "test-topic" ) func main() { // konstrukce konzumenta producer, err := sarama.NewSyncProducer([]string{KafkaConnectionString}, nil) // kontrola chyby při připojování ke Kafce if err != nil { log.Fatal(err) } log.Printf("Connected to %s", KafkaConnectionString) // zajištění uzavření připojení ke Kafce defer func() { if err := producer.Close(); err != nil { log.Fatal(err) } }() // poslání (produkce) zprávy msg := &sarama.ProducerMessage{Topic: KafkaTopic, Value: sarama.StringEncoder("testing 123")} partition, offset, err := producer.SendMessage(msg) if err != nil { log.Printf("FAILED to send message: %s\n", err) } else { log.Printf("> message sent to partition %d at offset %d\n", partition, offset) } log.Print("Done") }
10. Konzument zpráv založený na knihovně Sarama
V této kapitole si ukážeme způsob vytvoření jednoduchého konzumenta zpráv. Ten je představován instancí Consumer, která se vytváří konstruktorem NewConsumer. Tomuto konstruktoru je nutné předat adresy brokerů (minimálně jednoho brokera) a ve druhém parametru konfiguraci. Při výchozí konfiguraci je však možné předat nil, což v našem případě plně dostačuje, podobně jako v případě producenta zpráv:
const ( // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker KafkaConnectionString = "localhost:9092" ) // konstrukce konzumenta consumer, err := sarama.NewConsumer([]string{KafkaConnectionString}, nil) // kontrola chyby při připojování ke Kafce if err != nil { log.Fatal(err) }
Samozřejmě je (opět) nutné zajistit, aby se připojení taktéž uzavřelo ve chvíli, kdy již není zapotřebí, tedy konkrétně při ukončování hlavní gorutiny:
// zajištění uzavření připojení ke Kafce defer func() { if err := consumer.Close(); err != nil { log.Fatal(err) } }()
Čtení (tedy konzumaci) zpráv zajišťuje objekt PartitionConsumer, jenž je vytvořen konstruktorem ConsumePartition. Tomuto konstruktoru je nutné předat téma, z něhož se mají zprávy číst, index oddílu a taktéž offset první zprávy, která se má přečíst. Kromě skutečného offsetu (tedy kladného celého čísla) lze použít i symbolické konstanty OffsetNewest a OffsetOldest a tím určit, od jakého offsetu se mají zprávy zpracovat – zda všechny od nejstarší, či naopak zda se mají zpracovat až nově příchozí zprávy:
func (consumer Consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
Konkrétně:
// přihlášení ke zvolenému tématu partitionConsumer, err := consumer.ConsumePartition(KafkaTopic, 0, sarama.OffsetNewest) if err != nil { log.Fatal(err) }
Samotné čtení zpráv je v knihovně Sarama provedeno velmi elegantně – s využitím kanálu, který zajistí i blokující čekání na nové zprávy:
for { msg := <-partitionConsumer.Messages() // vypíšeme pouze offset zprávy, její klíč a tělo (value, payload) log.Printf("Consumed message offset %d: %s:%s", msg.Offset, msg.Key, msg.Value) consumed++ }
Úplný zdrojový kód konzumenta vypadá následovně:
// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou // Sarama: implementace konzumenta zpráv. package main import ( "log" "github.com/Shopify/sarama" ) const ( // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker KafkaConnectionString = "localhost:9092" // KafkaTopic obsahuje jméno tématu KafkaTopic = "test-topic" ) func main() { // konstrukce konzumenta consumer, err := sarama.NewConsumer([]string{KafkaConnectionString}, nil) // kontrola chyby při připojování ke Kafce if err != nil { log.Fatal(err) } log.Printf("Connected to %s", KafkaConnectionString) // zajištění uzavření připojení ke Kafce defer func() { if err := consumer.Close(); err != nil { log.Fatal(err) } }() // přihlášení ke zvolenému tématu partitionConsumer, err := consumer.ConsumePartition(KafkaTopic, 0, sarama.OffsetNewest) if err != nil { log.Fatal(err) } // zajištění ukončení přihlášení ke zvolenému tématu defer func() { if err := partitionConsumer.Close(); err != nil { log.Fatal(err) } }() // postupné čtení zpráv, které byly do zvoleného tématu publikovány consumed := 0 for { msg := <-partitionConsumer.Messages() // vypíšeme pouze offset zprávy, její klíč a tělo (value, payload) log.Printf("Consumed message offset %d: %s:%s", msg.Offset, msg.Key, msg.Value) consumed++ } // výpis počtu zpracovaných zpráv (ovšem sem se stejně nedostaneme :-) log.Printf("Consumed: %d", consumed) log.Print("Done") }
Existují i mnohé další možnosti poskytované knihovnou Sarama, například výpis informací o tématech atd.:
// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou // Sarama: výpis informací o tématech. package main import ( "log" "github.com/Shopify/sarama" ) const ( // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker KafkaConnectionString = "localhost:9092" // KafkaTopic obsahuje jméno tématu KafkaTopic = "test-topic" ) func main() { // konstrukce rozhraní k brokerovi broker := sarama.NewBroker(KafkaConnectionString) // kontrola chyby při připojování k brokerovi err := broker.Open(nil) if err != nil { log.Fatal(err) } log.Printf("Connected to %s", KafkaConnectionString) request := sarama.MetadataRequest{Topics: []string{KafkaTopic}} response, err := broker.GetMetadata(&request) if err != nil { _ = broker.Close() log.Fatal(err) } if len(response.Topics) == 1 { log.Print("There is one topic active in the cluster.") } else { log.Print("There are", len(response.Topics), "topics active in the cluster.") } if err = broker.Close(); err != nil { log.Fatal(err) } log.Print("Done") }
11. Modulární podoba aplikace představující producenta zpráv
U reálných aplikací je možné knihovnu Sarama „obalit“ vlastním rozhraním, které bude předepisovat pouze ty metody, které jsou skutečně v dané aplikaci zapotřebí. Následující demonstrační příklad je do určité míry založen na reálné aplikaci. Začneme definicí datového typu představujícího minimální užitečnou konfiguraci brokera:
package main // BrokerConfiguration represents configuration of Kafka brokers and topics type BrokerConfiguration struct { Address string `mapstructure:"address" toml:"address"` Topic string `mapstructure:"topic" toml:"topic"` }
Dále je deklarováno rozhraní pro všechny implementace brokera. Jsou předepsány pouze ty metody, které jsou upotřebitelné v aplikaci:
// Producer represents any producer type Producer interface { ProduceMessage(message Message) (int32, int64, error) Close() error }
Následuje konkrétní implementace tohoto rozhraní, včetně konstruktoru NewKafkaProducer:
import ( "encoding/json" "github.com/Shopify/sarama" "github.com/rs/zerolog/log" ) // KafkaProducer is an implementation of Producer interface type KafkaProducer struct { Configuration BrokerConfiguration Producer sarama.SyncProducer } // NewKafkaProducer constructs new implementation of Producer interface func NewKafkaProducer(brokerConfiguration BrokerConfiguration) (*KafkaProducer, error) { producer, err := sarama.NewSyncProducer([]string{brokerConfiguration.Address}, nil) if err != nil { log.Error().Err(err).Msg("unable to create a new Kafka producer") return nil, err } return &KafkaProducer{ Configuration: brokerConfiguration, Producer: producer, }, nil } // ProduceMessage produces message to selected topic. That function returns // partition ID and offset of new message or an error value in case of any // problem on broker side. func (producer *KafkaProducer) ProduceMessage(message Message) (partitionID int32, offset int64, err error) { jsonBytes, err := json.Marshal(message) if err != nil { log.Error().Err(err).Msg("Couldn't turn notification message into valid JSON") return -1, -1, err } // construct message to be produced using the provided payload (message body) producerMsg := &sarama.ProducerMessage{ Topic: producer.Configuration.Topic, Value: sarama.ByteEncoder(jsonBytes), } // try to produce message partitionID, offset, err = producer.Producer.SendMessage(producerMsg) if err != nil { log.Error().Err(err).Msg("failed to produce message to Kafka") } else { log.Info().Msgf("message sent to partition %d at offset %d\n", partitionID, offset) } return } // Close allow the Sarama producer to be gracefully closed func (producer *KafkaProducer) Close() error { if err := producer.Producer.Close(); err != nil { log.Error().Err(err).Msg("unable to close Kafka producer") return err } return nil }
Výsledná struktura typů:
Obrázek 1: Datové struktury a rozhraní deklarované v předchozím kódu.
12. Využití nového rozhraní i jeho implementace
Rozhraní Producer a jeho implementaci KafkaProducer můžeme velmi snadno použít v aplikaci, která do brokera pošle zprávu se třemi atributy uloženými ve formátu JSON. Samotný kód je přímočarý (i když, jak je v jazyce Go zvykem, „prošpikovaný“ kontrolami chyb):
package main import ( "os" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) type Message struct { ID int `json:"id"` Name string `json:"name"` Surname string `json:"surname"` } func main() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) log.Info().Msg("Started") brokerConfiguration := BrokerConfiguration{ Address: "localhost:9092", Topic: "test-topic2", } producer, err := NewKafkaProducer(brokerConfiguration) if err != nil { log.Error().Err(err).Msg("Can not connect to Kafka") return } log.Info().Str("address", brokerConfiguration.Address).Msg("Connected to Kafka") defer producer.Close() message := Message{ ID: 42, Name: "Václav", Surname: "Trachta", } _, _, err = producer.ProduceMessage(message) if err != nil { log.Error().Err(err).Msg("Unable to produce message") return } log.Info().Msg("Finished") }
13. Modulární podoba aplikace představující konzumenta zpráv
Stejným způsobem, jako producenta zpráv, můžeme vytvořit „obalující“ rozhraní i pro konzumenta. Ovšem zde se ukazují některá úskalí nízkoúrovňového přístupu knihovny Sarama. Opět začneme definicí typu brokera, nyní s přidanou informací o skupině:
// BrokerConfiguration represents configuration of Kafka brokers and topics type BrokerConfiguration struct { Address string `mapstructure:"address" toml:"address"` Topic string `mapstructure:"topic" toml:"topic"` Group string `mapstructure:"group" toml:"group"` }
Dále definujeme rozhraní představující konzumenta zpráv:
// Consumer represents any consumer type Consumer interface { Serve() ProcessMessage(msg *sarama.ConsumerMessage) error Close() error }
Implementace konzumenta je relativně složitá, a to z toho důvodu, že reálný kód musí být schopen se znovu připojit ke Kafce i po rebalancingu (které může nastat v závislosti na nastavené konfiguraci). Následující kód byl vytvořen po několika neblahých zkušenostech s reálným nasazením aplikací:
import ( "context" "encoding/json" "github.com/Shopify/sarama" "github.com/rs/zerolog/log" ) // KafkaConsumer in an implementation of Consumer interface type KafkaConsumer struct { Configuration BrokerConfiguration ConsumerGroup sarama.ConsumerGroup numberOfSuccessfullyConsumedMessages uint64 numberOfErrorsConsumingMessages uint64 ready chan bool cancel context.CancelFunc } // DefaultSaramaConfig is a config which will be used by default // here you can use specific version of a protocol for example // useful for testing var DefaultSaramaConfig *sarama.Config // NewConsumer constructs new implementation of Consumer interface func NewKafkaConsumer(brokerCfg BrokerConfiguration) (*KafkaConsumer, error) { return NewWithSaramaConfig(brokerCfg, DefaultSaramaConfig) } // NewWithSaramaConfig constructs new implementation of Consumer interface with custom sarama config func NewWithSaramaConfig( brokerConfiguration BrokerConfiguration, saramaConfig *sarama.Config, ) (*KafkaConsumer, error) { if saramaConfig == nil { saramaConfig = sarama.NewConfig() saramaConfig.Version = sarama.V0_10_2_0 /* TODO: we need to do it in production code if brokerCfg.Timeout > 0 { saramaConfig.Net.DialTimeout = brokerCfg.Timeout saramaConfig.Net.ReadTimeout = brokerCfg.Timeout saramaConfig.Net.WriteTimeout = brokerCfg.Timeout } */ } consumerGroup, err := sarama.NewConsumerGroup([]string{brokerConfiguration.Address}, brokerConfiguration.Group, saramaConfig) if err != nil { return nil, err } consumer := &KafkaConsumer{ Configuration: brokerConfiguration, ConsumerGroup: consumerGroup, numberOfSuccessfullyConsumedMessages: 0, numberOfErrorsConsumingMessages: 0, ready: make(chan bool), } return consumer, nil } // Serve starts listening for messages and processing them. It blocks current thread. func (consumer *KafkaConsumer) Serve() { ctx, cancel := context.WithCancel(context.Background()) consumer.cancel = cancel go func() { for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims if err := consumer.ConsumerGroup.Consume(ctx, []string{consumer.Configuration.Topic}, consumer); err != nil { log.Fatal().Err(err).Msg("Unable to recreate Kafka session") } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { log.Info().Err(ctx.Err()).Msg("Stopping consumer") return } log.Info().Msg("Created new kafka session") consumer.ready = make(chan bool) } }() // Await till the consumer has been set up log.Info().Msg("Waiting for consumer to become ready") <-consumer.ready log.Info().Msg("Finished waiting for consumer to become ready") // Actual processing is done in goroutine created by sarama (see ConsumeClaim below) log.Info().Msg("Started serving consumer") <-ctx.Done() log.Info().Msg("Context cancelled, exiting") cancel() } // Setup is run at the beginning of a new session, before ConsumeClaim func (consumer *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error { log.Info().Msg("New session has been setup") // Mark the consumer as ready close(consumer.ready) return nil } // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (consumer *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error { log.Info().Msg("New session has been finished") return nil } // ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages(). func (consumer *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { log.Info(). Int64("offset", claim.InitialOffset()). Msg("Starting messages loop") for message := range claim.Messages() { consumer.HandleMessage(message) session.MarkMessage(message, "") } return nil } // Close method closes all resources used by consumer func (consumer *KafkaConsumer) Close() error { if consumer.cancel != nil { consumer.cancel() } if consumer.ConsumerGroup != nil { if err := consumer.ConsumerGroup.Close(); err != nil { log.Error(). Err(err). Msg("Unable to close consumer group") } } return nil } // GetNumberOfSuccessfullyConsumedMessages returns number of consumed messages // since creating KafkaConsumer obj func (consumer *KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages() uint64 { return consumer.numberOfSuccessfullyConsumedMessages } // GetNumberOfErrorsConsumingMessages returns number of errors during consuming messages // since creating KafkaConsumer obj func (consumer *KafkaConsumer) GetNumberOfErrorsConsumingMessages() uint64 { return consumer.numberOfErrorsConsumingMessages } // HandleMessage handles the message and does all logging, metrics, etc func (consumer *KafkaConsumer) HandleMessage(msg *sarama.ConsumerMessage) { log.Info(). Int64("offset", msg.Offset). Int32("partition", msg.Partition). Str("topic", msg.Topic). Msg("Started processing message") err := consumer.ProcessMessage(msg) log.Info(). Int64("offset", msg.Offset). Int32("partition", msg.Partition). Str("topic", msg.Topic). Msg("Finished processing message") // Something went wrong while processing the message. if err != nil { log.Error(). Err(err). Msg("Error processing message consumed from Kafka") consumer.numberOfErrorsConsumingMessages++ } else { // The message was processed successfully. consumer.numberOfSuccessfullyConsumedMessages++ } } // ProcessMessage processes an incoming message func (consumer *KafkaConsumer) ProcessMessage(msg *sarama.ConsumerMessage) error { log.Info(). Str("key", string(msg.Key)). Str("value", string(msg.Value)). Msg("Consumed") var deserialized Message err := json.Unmarshal(msg.Value, &deserialized) if err != nil { return err } log.Info().Int("ID", deserialized.ID). Str("Name", deserialized.Name). Str("Surname", deserialized.Surname). Msg("Deserialized message") return nil }
Výsledná struktura typů:
Obrázek 2: Datové struktury a rozhraní deklarované v předchozím kódu.
14. Využití nového rozhraní i jeho implementace
I když byla realizace rozhraní KafkaConsumer relativně složitá, samotné jeho použití je snadné a přímočaré, jak je ostatně patrné z následující ukázky zdrojového kódu:
package main import ( "os" "github.com/rs/zerolog" "github.com/rs/zerolog/log" ) type Message struct { ID int `json:"id"` Name string `json:"name"` Surname string `json:"surname"` } func main() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) log.Info().Msg("Started") brokerConfiguration := BrokerConfiguration{ Address: "localhost:9092", Topic: "test-topic2", Group: "grpX", } consumer, err := NewKafkaConsumer(brokerConfiguration) if err != nil { log.Error().Err(err).Msg("Can not connect to Kafka") return } log.Info().Str("address", brokerConfiguration.Address).Msg("Connected to Kafka") defer consumer.Close() consumer.Serve() log.Info().Msg("Finished") }
15. Sarama/mocks a jednotkové testy
Knihovna Sarama poskytuje aplikacím poměrně snadno použitelné rozhraní k volání funkcí systému Apache Kafka přes komunikační protokol Kafky. Ovšem v případě, že se mají vytvořit jednotkové testy pro ty funkce a metody, které volají funkce z knihovny Sarama, je nutné skutečné volání Apache Kafky nahradit vhodnými „mocky“. Samozřejmě nám nic nebrání v použití nějaké obecné knihovny pro tvorbu mocků v programovacím jazyce Go, ale knihovna Sarama nabízí vývojářům již připravené mocky, které lze v jednotkových testech inicializovat a přímo použít. Tyto mocky jsou dostupné v balíčku sarama/mocks. Nabízeny jsou tyto datové typy, jejichž konstrukcí vzniknout objekty:
# | Datový typ | Konstruktor |
---|---|---|
1 | Consumer | func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer |
2 | PartitionConsumer | func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) |
3 | AsyncProducer | func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer |
4 | SyncProducer | func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer |
V navazující kapitole jsou vypsány všechny metody implementované těmito užitečnými datovými typy.
16. Mock objekty dostupné v knihovně Sarama/mocks
Metody implementované v typu Consumer
# | Metoda |
---|---|
1 | func (c *Consumer) Close() error |
2 | func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) |
3 | func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer |
4 | func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 |
5 | func (c *Consumer) Partitions(topic string) ([]int32, error) |
6 | func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) |
7 | func (c *Consumer) Topics() ([]string, error) |
Metody implementované v typu PartitionConsumer
# | Metoda |
---|---|
1 | func (pc *PartitionConsumer) AsyncClose() |
2 | func (pc *PartitionConsumer) Close() error |
3 | func (pc *PartitionConsumer) Errors() ← chan *sarama.ConsumerError |
4 | func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose() |
5 | func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose() |
6 | func (pc *PartitionConsumer) HighWaterMarkOffset() int64 |
7 | func (pc *PartitionConsumer) Messages() ← chan *sarama.ConsumerMessage |
8 | func (pc *PartitionConsumer) YieldError(err error) |
9 | func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) |
Metody implementované v typu AsyncProducer
# | Metoda |
---|---|
1 | func (mp *AsyncProducer) AsyncClose() |
2 | func (mp *AsyncProducer) Close() error |
3 | func (mp *AsyncProducer) Errors() ← chan *sarama.ProducerError |
4 | func (mp *AsyncProducer) ExpectInputAndFail(err error) |
5 | func (mp *AsyncProducer) ExpectInputAndSucceed() |
6 | func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) |
7 | func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) |
8 | func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage |
9 | func (mp *AsyncProducer) Successes() ← chan *sarama.ProducerMessage |
Metody implementované v typu SyncProducer
# | Metoda |
---|---|
1 | func (sp *SyncProducer) Close() error |
2 | func (sp *SyncProducer) ExpectSendMessageAndFail(err error) |
3 | func (sp *SyncProducer) ExpectSendMessageAndSucceed() |
4 | func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) |
5 | func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) |
6 | func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) |
7 | func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error |
17. Ukázka konstrukce jednotkového testu
Podívejme se nyní, jak lze mock objekty využít při tvorbě jednotkových testů. Zaměříme se na objekt typu SyncProducer, jehož „nemockovanou“ variantu jsme použili v předchozích demonstračních příkladech. Vzhledem k tomu, že tento objekt použijeme v jednotkových testech, deklarujme novou funkci začínající jménem Test a akceptující jediný parametr typu*testing.T:
func TestSyncProducer(t *testing.T) {
V testu nejdříve vytvoříme instanci mockované varianty objektu typu SyncProducer a zajistíme jeho uzavření na konci funkce s definicí jednotkového testu:
syncProducer := NewSyncProducer(t, nil) defer func() { err := syncProducer.Close() if err != nil { t.Error(err) } }()
Nyní nastává nejzajímavější část jednotkového testu – určíme totiž, jak přesně se má mockovaný objekt typu SyncProducer chovat a jaké chování kódu má naopak očekávat. Budeme specifikovat, že mockovaný objekt očekává, že mu bude poslána jedna zpráva (metodou SendMessage a že tato operace bude úspěšná – mockovaný objekt se tedy bude chovat tak, jakoby k poslání zprávy skutečně došlo:
syncProducer.ExpectSendMessageAndSucceed()
Dále vytvoříme zprávu, která se má poslat:
message := &sarama.ProducerMessage{Topic: "test-topic", Value: string("test")}
Zprávu pošleme s kontrolou výsledku provedené operace:
_, offset, err := syncProducer.SendMessage(msg) if err != nil { t.Errorf("Message produce error %s", err) } if offset != 1 || offset != message.Offset { t.Errorf("Wrong offset %d", msg.Offset) }
Taktéž můžeme specifikovat, že mockovaný objekt bude očekávat poslání zprávy a posléze nasimuluje chybu při odesílání, resp. přesněji při komunikaci s Kafkou (o jakou chybu se jedná můžeme sami vybrat):
syncProducer.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
Nyní budeme kontrolovat, jaká chyba byla vrácena:
_, _, err = syncProducer.SendMessage(msg) if err != sarama.ErrOutOfBrokers { t.Errorf("Message produce error %s is different than expected error", err) } err := syncProducer.Close() if err != nil { t.Error(err) }
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes použitých demonstračních příkladů byly uloženy do nového Git repositáře, který je dostupný na adrese https://github.com/tisnik/go-root (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má přibližně stovku kilobajtů), můžete namísto toho použít odkazy na jednotlivé demonstrační příklady, které naleznete v následující tabulce:
# | Příklad/soubor | Stručný popis | Cesta |
---|---|---|---|
1 | confluent_kafka_producer.go | jednoduchý producent zpráv založený na knihovně confluent-kafka-go | https://github.com/tisnik/go-root/blob/master/article75/confluent_kafka_producer.go |
2 | confluent_kafka_consumer.go | jednoduchý konzument zpráv založený na knihovně confluent-kafka-go | https://github.com/tisnik/go-root/blob/master/article75/confluent_kafka_consumer.go |
3 | sarama-consumer | jednoduchý konzument zpráv založený na knihovně Sarama | https://github.com/tisnik/go-root/blob/master/article75/sarama-consumer |
4 | sarama-producer | jednoduchý producent zpráv založený na knihovně Sarama | https://github.com/tisnik/go-root/blob/master/article75/sarama-producer |
5 | sarama-list-topics | výpis informací o zvoleném tématu | https://github.com/tisnik/go-root/blob/master/article75/sarama-list-topics |
6 | sarama-consumer-2 | vylepšený konzument zpráv založený na knihovně Sarama | https://github.com/tisnik/go-root/blob/master/article75/sarama-consumer-2 |
7 | sarama-producer-2 | vylepšený producent zpráv založený na knihovně Sarama | https://github.com/tisnik/go-root/blob/master/article75/sarama-producer-2 |
19. Odkazy na relevantní články na Rootu
V dnešním článku jsme se zabývali dvojicí témat – využitím systému Apache Kafka v programovacím jazyku Go pro tvorbu producentů a konzumentů zpráv (prozatím bez Kafka Streams) a taktéž tvorbou jednotkových testů pro ty jednotky aplikace, které přímo s Apache Kafkou komunikují, tedy většinou s producenty a konzumenty zpráv. Jak Apache Kafkou, tak i problematikou tvorby jednotkových testů pro „okrajové“ části aplikací, jsme se již na stránkách Roota zabývali v těchto článcích:
- Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách
https://www.root.cz/clanky/pouziti-nastroje-apache-kafka-v-aplikacich-zalozenych-na-mikrosluzbach/ - Apache Kafka: distribuovaná streamovací platforma
https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw-2-cast/ - Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-projektu-apache-kafka-jazyku-clojure-a-knihovne-jackdaw-streamy-a-kolony/ - Knihovny určené pro tvorbu testů v programovacím jazyce Go
https://www.root.cz/clanky/knihovny-urcene-pro-tvorbu-testu-v-programovacim-jazyce-go/ - Testování aplikací naprogramovaných v jazyce Go
https://www.root.cz/clanky/testovani-aplikaci-naprogramovanych-v-jazyce-go/ - Pomůcky při tvorbě jednotkových testů v jazyce Go
https://www.root.cz/clanky/pomucky-pri-tvorbe-jednotkovych-testu-v-jazyce-go/ - Jazyk Go prakticky: jednotkové testy kódu, který přistupuje k SQL databázím
https://www.root.cz/clanky/jazyk-go-prakticky-jednotkove-testy-kodu-ktery-pristupuje-k-sql-databazim/ - Jazyk Go prakticky: jednotkové testy kódu, který přistupuje k SQL databázím (dokončení)
https://www.root.cz/clanky/jazyk-go-prakticky-jednotkove-testy-kodu-ktery-pristupuje-k-sql-databazim-dokonceni/
20. Odkazy na Internetu
- Confluent's Golang Client for Apache Kafka
https://github.com/confluentinc/confluent-kafka-go - Confluent: Kafka Go Client
https://docs.confluent.io/clients-confluent-kafka-go/current/overview.html - sarama: an MIT-licensed Go client library for Apache Kafka version 0.8 (and later)
https://github.com/Shopify/sarama - Sarama FAQ
https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions - Awesome Go
https://github.com/avelino/awesome-go - Real-Time Payments with Clojure and Apache Kafka (podcast)
https://www.evidentsystems.com/news/confluent-podcast-about-apache-kafka/ - Kafka.clj
https://github.com/helins-io/kafka.clj - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Kafka – ecosystem (Wiki)
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem - The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
http://cloudurable.com/blog/kafka-ecosystem/index.html - A Kafka Operator for Kubernetes
https://github.com/krallistic/kafka-operator - Kafka Streams
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams - Kafka Streams
http://kafka.apache.org/documentation/streams/ - Kafka Streams (FAQ)
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Streams - Event stream processing
https://en.wikipedia.org/wiki/Event_stream_processing - Part 1: Apache Kafka for beginners – What is Apache Kafka?
https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html - What are some alternatives to Apache Kafka?
https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka - What is the best alternative to Kafka?
https://www.slant.co/options/961/alternatives/~kafka-alternatives - A super quick comparison between Kafka and Message Queues
https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0 - Kafka Queuing: Kafka as a Messaging System
https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system