Hlavní navigace

Vývoj služeb postavených na systému Apache Kafka v jazyku Go

15. 6. 2021
Doba čtení: 32 minut

Sdílet

 Autor: Apache Foundation
Dnes si ukážeme, jak lze použít základní funkce Apache Kafky v jazyku Go. Demonstrační příklady budou využívat dvě knihovny, a to konkrétně confluent-kafka-go a Sarama. Důležitým tématem je však i testování aplikací.

Obsah

1. Vývoj služeb postavených na systému Apache Kafka v jazyku Go

2. Knihovna confluent-kafka-go (Confluent's Golang Client for Apache Kafka)

3. Producent zpráv založený na knihovně confluent-kafka-go

4. Úplný zdrojový kód producenta zpráv

5. Konzument zpráv založený na knihovně confluent-kafka-go

6. Úplný zdrojový kód konzumenta zpráv

7. Knihovna Sarama

8. Producent zpráv založený na knihovně Sarama

9. Úplný zdrojový kód producenta zpráv

10. Konzument zpráv založený na knihovně Sarama

11. Modulární podoba aplikace představující producenta zpráv

12. Využití nového rozhraní i jeho implementace

13. Modulární podoba aplikace představující konzumenta zpráv

14. Využití nového rozhraní i jeho implementace

15. Sarama/mocks a jednotkové testy

16. Mock objekty dostupné v knihovně Sarama/mocks

17. Ukázka konstrukce jednotkového testu

18. Repositář s demonstračními příklady

19. Odkazy na relevantní články na Rootu

20. Odkazy na Internetu

1. Tvorba služeb postavených na systému Apache Kafka v jazyku Go

S užitečným, populárním a dnes velmi často nasazovaným nástrojem Apache Kafka jsme se již na stránkách Rootu několikrát setkali, a to například v článcích Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách , Apache Kafka: distribuovaná streamovací platforma a taktéž v trojici článků Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw, Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část) a Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony). Pro tuto asi nejznámější streamovací platformu existují knihovny realizující rozhraní pro různé programovací jazyky a jejich ekosystémy. Tato rozhraní jsou vypsána v následující tabulce:

# Jazyk/platforma
1 C/C++
2 Python
3 Go/Golang Go/Golang
4 Erlang
5 .NET
6 Ruby
7 Node.js
8 Perl
9 PHP
10 Rust
11 Storm
12 Scala (DSL jazyk)
13 Clojure
14 Clojure
15 Swift
   
16 CLI (stdin/stdout)

Dnes si ukážeme, jak lze použít základní funkce Apache Kafky v programovacím jazyku Go. Demonstrační příklady budou využívat dvě knihovny, a to konkrétně confluent-kafka-go a Sarama. Důležitým tématem je však i testování aplikací, které jsou založeny na Kafce; zejména se to týká jednotkových testů. Tomuto tématu je věnována druhá polovina článku, která tak nepřímo navazuje na dvojici článků Jazyk Go prakticky: jednotkové testy kódu, který přistupuje k SQL databázím a Jazyk Go prakticky: jednotkové testy kódu, který přistupuje k SQL databázím (dokončení).

Poznámka: jako broker Apache Kafka, tak i ZooKeeper zapisují poměrně velké množství dat na disk. Pokud Kafku spouštíte lokálně pouze pro účely vývoje (a tedy bez reálných dat, které je zapotřebí zachovat), může být užitečné všechny zápisy provádět na ramdisk – výsledkem bude jak rychlejší start obou procesů (cca 2× urychleno), tak i menší „opotřebování“ SSD. Konkrétně na mém vývojovém počítači mám vytvořen ramdisk o velikosti jednoho gigabajtu, který je připojen do adresáře /tmp/ramdisk. Konfigurace Apache Kafky i ZooKeepera je nutné nepatrně pozměnit, aby se pracovní data ukládala do tohoto adresáře (resp. přesněji řečeno do podadresářů). Konkrétní nastavení je ukázáno v dalším odstavci.

Konfigurace ZooKeepera je uložena v souboru config/zookeeper.properties:

...
...
...
# the directory where the snapshot is stored.
dataDir=/tmp/ramdisk/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
 
# Enable snapshot.trust.empty config if the ZK upgrade from 3.4.X to 3.5.6 is failing
# with "java.io.IOException: No snapshot found, but there are log entries" error.
# Check upgrade docs for more details.
# snapshot.trust.empty=true
...
...
...

Konfigurace brokera je uložena v souboru :

...
...
...
############################# Log Basics #############################
 
# A comma separated list of directories under which to store log files
log.dirs=/tmp/ramdisk/kafka-logs
 
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
 
############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
...
...
...

2. Knihovna confluent-kafka-go (Confluent's Golang Client for Apache Kafka)

Pro komunikaci s Apache Kafkou z programovacího jazyka Go lze využít dvě knihovny. První z nich se jmenuje confluent-kafka-go neboli v lidské variantě Confluent's Golang Client for Apache Kafka. Druhá z těchto knihoven má název Sarama. Knihovna confluent-kafka-go nabízí programátorům spíše vysokoúrovňové operace se systémem Apache Kafka, což může být pro mnoho aplikací dostačující. Naproti tomu knihovna Sarama před programátorem odhaluje i některé nízkoúrovňové operace související se samotným komunikačním protokolem, takže tvorba aplikací může být nepatrně složitější, ale o to zajímavější (většinou si ovšem nad Saramou programátoři vytvoří vlastní zjednodušené rozhraní).

Začněme popisem zcela základních operací poskytovaných knihovnou confluent-kafka-go. Tato knihovna je sice naprogramovaná v jazyce Go, ovšem pro svoji korektní činnost vyžaduje i nativní knihovnu nazvanou librdkafka, kterou lze nainstalovat buď z repositářů dané distribuce, nebo běžným postupem:

$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
$ ./configure --prefix /usr
$ make
$ sudo make install

Dále je ještě před instalací knihovny pro Go většinou nutné nastavit PKG_CONFIG_PATH (pokud již není korektně nastavena):

$ export PKG_CONFIG_PATH=$PKG_CONFIG_PATH:/usr/lib/pkgconfig/
Poznámka: předchozí krok na některých systémech není nutné provádět, ovšem pokud uvidíte chybovou zprávu, že nelze nalézt knihovnu librdkafka kvůli problémům s pkgconfig, mělo by nastavení proměnné PKG_CONFIG_PATH pomoci.

Stažení a překlad knihovny confluent-kafka-go pro jazyk Go je posléze již triviální a měl by být proveden bez komplikací:

$ go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

3. Producent zpráv založený na knihovně confluent-kafka-go

Nyní si ukažme, jakým způsobem je možné vytvořit producenta zpráv v programovacím jazyce Go s využitím knihovny confluent-kafka-go. Pro připojení ke Kafce a posílání zpráv musíme znát nejméně dva údaje – počítač a port, na kterém běží broker, a taktéž jméno tématu (topic):

const (
        server = "localhost"
)
 
topic := "upload"
Poznámka: téma je uloženo v řetězci a nikoli v konstantě, protože budeme potřebovat adresu tohoto řetězce (což je, pravda, poněkud nešikovné řešení).

Vytvoření instance datové struktury představující producenta se zajištěním jeho destrukce (odpojení) na konci příslušné funkce může být naprogramováno následovně:

producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": server,
})
 
// kontrola chyby při připojování ke Kafce
if err != nil {
        panic(err)
}
 
// producenta zpráv je nutné na konci odpojit
defer producer.Close()

Samotné vytváření zpráv s jejich naformátováním (do řetězce), převodem na pole bajtů a posíláním do Kafky vypadá následovně:

for i := 0; i < 100; i++ {
        text := fmt.Sprintf("Message #%d", i)
        producer.Produce(&kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          []byte(text),
        }, nil)
}

Vidíme, že je zapotřebí specifikovat téma (topic) a popř. i sekci (partition). Samotná zpráva je reprezentována polem bajtů, může se tedy jednat o libovolná data.

Poznámka: povšimněte si toho, že musíme předávat odkaz (referenci) na jméno tématu, což je důvod, proč není možné použít konstantu (k té nelze v Go získat adresu).

Navíc je ovšem někdy vhodné asynchronně reagovat na události, které mohou při práci se systémem Apache Kafky nastat. Asynchronní zpracování je v Go přímočaré – použijeme k tomu takzvanou gorutinu a budeme reagovat na případné chyby a informace o doručení zprávy:

go func() {
        for event := range producer.Events() {
                switch ev := event.(type) {
                case *kafka.Message:
                        if ev.TopicPartition.Error != nil {
                                fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                        } else {
                                fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                        }
                }
        }
}()
Poznámka: v tomto jednoduchém demonstračním příkladu se nečeká na korektní ukončení této gorutiny – aplikace bude ukončena po všech blocích defer ve funkci main.

4. Úplný zdrojový kód producenta zpráv

Úplný zdrojový kód výše popsaného producenta zpráv naleznete na adrese https://github.com/tisnik/go-root/blob/master/article75/con­fluent_kafka_producer.go:

// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou
// confluent-kafka-go: implementace producenta zpráv.
 
package main
 
import (
        "fmt"
        "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
 
const (
        server = "localhost"
)
 
func main() {
        topic := "upload"
 
        // konstrukce producenta
        producer, err := kafka.NewProducer(&kafka.ConfigMap{
                "bootstrap.servers": server,
        })
 
        // kontrola chyby při připojování ke Kafce
        if err != nil {
                panic(err)
        }
 
        // producenta zpráv je nutné na konci odpojit
        defer producer.Close()
 
        // funkce volaná pro každou událost, která při práci s Kafkou může nastat
        go func() {
                for event := range producer.Events() {
                        switch ev := event.(type) {
                        case *kafka.Message:
                                if ev.TopicPartition.Error != nil {
                                        fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                                } else {
                                        fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                                }
                        }
                }
        }()
 
        // vytváření a produkce zpráv posílaných do zvoleného tématu
        for i := 0; i < 100; i++ {
                text := fmt.Sprintf("Message #%d", i)
                producer.Produce(&kafka.Message{
                        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                        Value:          []byte(text),
                }, nil)
        }
        producer.Flush(15 * 1000)
}

Překlad tohoto producenta lze provést buď s tím, že při spuštění bude vyžadována dynamicky linkovaná knihovna librdkafka:

$ go build producer1.go
 
$ ldd ./producer1
        linux-vdso.so.1 (0x00007ffc70b8e000)
        librdkafka.so.1 => not found
        libpthread.so.0 => /lib64/libpthread.so.0 (0x00007ffb2b6f8000)
        libc.so.6 => /lib64/libc.so.6 (0x00007ffb2b342000)
        /lib64/ld-linux-x86-64.so.2 (0x00007ffb2b916000)

Alternativně lze vynutit statické slinkování, což je v tomto případě podle mého názoru lepší řešení (není zapotřebí nastavovat LD_LIBRARY_PATH atd. a statické linkování je ve světě Go používáno velmi často):

$ go build -tags static producer1.go
 
$ ldd ./producer1
        linux-vdso.so.1 (0x00007ffdd3156000)
        libm.so.6 => /lib64/libm.so.6 (0x00007f424359c000)
        libz.so.1 => /lib64/libz.so.1 (0x00007f4243385000)
        libdl.so.2 => /lib64/libdl.so.2 (0x00007f4243181000)
        libpthread.so.0 => /lib64/libpthread.so.0 (0x00007f4242f63000)
        librt.so.1 => /lib64/librt.so.1 (0x00007f4242d5b000)
        libc.so.6 => /lib64/libc.so.6 (0x00007f42429a5000)
        /lib64/ld-linux-x86-64.so.2 (0x00007f42438e7000)

5. Konzument zpráv založený na knihovně confluent-kafka-go

Konzument zpráv naprogramovaný v jazyce Go, jenž je opět založený na knihovně confluent-kafka-go, je postaven na použití datové struktury pojmenované Consumer:

consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": server,
        "group.id":          group_id,
        "auto.offset.reset": "earliest",
})
// kontrola chyby při připojování ke Kafce
if err != nil {
        panic(err)
}
 
// i konzumenta je nutné na konci uzavřít
defer consumer.Close()

Dále se přihlásíme k příjmu zpráv se zadaným tématem (nebo tématy, protože lze předat řez s větším množstvím témat):

consumer.SubscribeTopics([]string{topic}, nil)

Příjem zpráv (s případným čekáním na nové zprávy) je řešen v nekonečné smyčce, v níž se současně kontroluje, zda nedošlo k nějaké chybě:

for {
        message, err := consumer.ReadMessage(-1)
        if err == nil {
                fmt.Printf("Message on %s: %s %s\n", message.TopicPartition, string(message.Key), string(message.Value))
        } else {
                fmt.Printf("Consumer error: %v (%v)\n", err, message)
        }
}
Poznámka: povšimněte si, že u zprávy můžeme zjistit, ze které sekce (partition) byla přečtena. Samotné informace, kterou zpráva nese, jsou rozděleny na klíč (key) a hodnotu (value), což jsou sekvence bajtů (a mohou tedy obsahovat cokoli).

6. Úplný zdrojový kód konzumenta zpráv

Úplný zdrojový kód výše popsaného konzumenta zpráv naleznete na adrese https://github.com/tisnik/go-root/blob/master/article75/con­fluent_kafka_consumer.go:

// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou
// confluent-kafka-go: implementace konzumenta zpráv.
 
package main
 
import (
        "fmt"
        "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
 
const (
        server   = "localhost:9092"
        topic    = "upload"
        group_id = "group1"
)
 
func main() {
        // konstrukce konzumenta
        consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
                "bootstrap.servers": server,
                "group.id":          group_id,
                "auto.offset.reset": "earliest",
        })
 
        // kontrola chyby při připojování ke Kafce
        if err != nil {
                panic(err)
        }
 
        // i konzumenta je nutné na konci uzavřít
        defer consumer.Close()
 
        // přihlášení konzumenta do zvoleného tématu (či témat)
        consumer.SubscribeTopics([]string{topic}, nil)
 
        // postupné čtení zpráv, které byly do zvoleného tématu publikovány
        for {
                message, err := consumer.ReadMessage(-1)
                if err == nil {
                        fmt.Printf("Message on %s: %s %s\n", message.TopicPartition, string(message.Key), string(message.Value))
                } else {
                        fmt.Printf("Consumer error: %v (%v)\n", err, message)
                }
        }
}

7. Knihovna Sarama

Ve druhé části dnešního článku se seznámíme se základními možnostmi poskytovanými knihovnou nazvanou Sarama. Tato knihovna je opět určena pro komunikaci mezi aplikacemi vyvinutými v programovacím jazyku Go na jedné straně a Apache Kafkou na straně druhé. Sarama je kompletně naprogramována v jazyku Go, takže nevyžaduje ani nativní knihovnu librdkafka (https://github.com/edenhi­ll/librdkafka) ani JVM. Zajímavé je, že Sarama podporuje jak vysokoúrovňový přístup ke Kafce, tak i přístup na nižší úrovni, v němž se „odhalují“ některé vlastnosti protokolu Apache Kafky. Podobně jako tomu bylo u předchozí knihovny si i nyní ukážeme, jak bude vypadat jednoduchý producent zpráv i jejich konzument. Posléze si ukážeme nepatrně složitější kód použitý v reálném projektu.

8. Producent zpráv založený na knihovně Sarama

Ukažme si nyní, jak může vypadat jednoduchý producent zpráv posílaných do Apache Kafky. Na rozdíl od předchozích demonstračních příkladů bude producent založen na použití knihovny Sarama. V knihovně Sarama existují dva typy producentů zpráv – synchronní a asynchronní. Synchronní producent, který je představován instancí SyncProducer, je blokující, tj. aktuálně běžící gorutina musí počkat, až je zpráva poslána. Naproti tomu asynchronní producent, jenž je představován instancí AsyncProducer, se z programátorského pohledu chová jako kanál s kapacitou – zprávy jsou tedy posílány na pozadí (asynchronně).

Základními konfiguračními parametry jsou adresa brokera a jméno tématu (topicu):

const (
        // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker
        KafkaConnectionString = "localhost:9092"
 
        // KafkaTopic obsahuje jméno tématu
        KafkaTopic = "test-topic"
)

V demonstračním příkladu použijeme synchronního producenta zpráv, kterému je nutné při jeho inicializaci konstruktorem NewSyncProducer předat adresy brokerů (minimálně jednoho brokera) a ve druhém parametru konfiguraci. Při výchozí konfiguraci je však možné předat nil, což v našem případě plně dostačuje:

// konstrukce konzumenta
producer, err := sarama.NewSyncProducer([]string{KafkaConnectionString}, nil)
 
// kontrola chyby při připojování ke Kafce
if err != nil {
        log.Fatal(err)
}

Samozřejmě je nutné zajistit, aby se připojení taktéž uzavřelo ve chvíli, kdy již není zapotřebí, tedy konkrétně při ukončování hlavní gorutiny:

// zajištění uzavření připojení ke Kafce
defer func() {
        if err := producer.Close(); err != nil {
                log.Fatal(err)
        }
}()

Posílaná zpráva je reprezentována datovým typem ProducerMessage. Ten obsahuje několik atributů, ovšem nejdůležitější jsou atributy Topic, Key, Value. V Apache Kafce jsou tyto údaje uloženy jako sekvence bajtů, ovšem nic nám nebrání použít řetězce, které jsou na sekvenci bajtů převedeny s využitím jedné (z několika) pomocných funkcí pro serializaci:

// poslání (produkce) zprávy
msg := &sarama.ProducerMessage{Topic: KafkaTopic, Value: sarama.StringEncoder("testing 123")}

Poslání zprávy je provedeno metodou:

func (producer SyncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

Jak je z popisu patrné, vrátí tato metoda informace o oddílu, do kterého byla uložena. Taktéž se vrátí offset uložené zprávy v rámci oddílu a v případě chyby i informace o této chybě (v ideálním případě nil):

partition, offset, err := producer.SendMessage(msg)
if err != nil {
        log.Printf("FAILED to send message: %s\n", err)
} else {
        log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
Poznámka: existuje i varianta této metody slouží pro poslání většího množství zpráv.

Takto naprogramovaného producenta zpráv můžeme otestovat s využitím velmi užitečného nástroje nazvaného kafkacat. Otestování bude probíhat tak, že kafkacat bude použit v režimu konzumenta. To znamená, že v jednom terminálu kafkacat spustíme, samozřejmě se správně nastavenými parametry – adresou a portem brokera a jménem tématu (topicu):

$ kafkacat -C -b localhost:9092 -t test-topic
 
% Reached end of topic test-topic [0] at offset 0

Ve druhém terminálu spustíme producenta zpráv a budeme sledovat, zda jsou posílané zprávy skutečně uloženy do tématu a zkonzumovány:

$ ./sarama-producer
 
2021/06/13 17:48:50 Connected to localhost:9092
2021/06/13 17:48:50 > message sent to partition 0 at offset 0
2021/06/13 17:48:50 Done

Na terminálu se spuštěným kafkacatem by se nyní měla objevit zpráva o přijaté zprávě:

testing 123
% Reached end of topic test-topic [0] at offset 1

9. Úplný zdrojový kód producenta zpráv

Úplný zdrojový kód výše popsaného producenta zpráv naleznete na adrese https://github.com/tisnik/go-root/blob/master/article75/sarama-producer:

// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou
// Sarama: implementace producenta zpráv.
 
package main
 
import (
        "log"
 
        "github.com/Shopify/sarama"
)
 
const (
        // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker
        KafkaConnectionString = "localhost:9092"
 
        // KafkaTopic obsahuje jméno tématu
        KafkaTopic = "test-topic"
)
 
func main() {
        // konstrukce konzumenta
        producer, err := sarama.NewSyncProducer([]string{KafkaConnectionString}, nil)
 
        // kontrola chyby při připojování ke Kafce
        if err != nil {
                log.Fatal(err)
        }
 
        log.Printf("Connected to %s", KafkaConnectionString)
 
        // zajištění uzavření připojení ke Kafce
        defer func() {
                if err := producer.Close(); err != nil {
                        log.Fatal(err)
                }
        }()
 
        // poslání (produkce) zprávy
        msg := &sarama.ProducerMessage{Topic: KafkaTopic, Value: sarama.StringEncoder("testing 123")}
        partition, offset, err := producer.SendMessage(msg)
        if err != nil {
                log.Printf("FAILED to send message: %s\n", err)
        } else {
                log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
        }
 
        log.Print("Done")
}

10. Konzument zpráv založený na knihovně Sarama

V této kapitole si ukážeme způsob vytvoření jednoduchého konzumenta zpráv. Ten je představován instancí Consumer, která se vytváří konstruktorem NewConsumer. Tomuto konstruktoru je nutné předat adresy brokerů (minimálně jednoho brokera) a ve druhém parametru konfiguraci. Při výchozí konfiguraci je však možné předat nil, což v našem případě plně dostačuje, podobně jako v případě producenta zpráv:

const (
        // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker
        KafkaConnectionString = "localhost:9092"
 
)
 
// konstrukce konzumenta
consumer, err := sarama.NewConsumer([]string{KafkaConnectionString}, nil)
// kontrola chyby při připojování ke Kafce
if err != nil {
        log.Fatal(err)
}

Samozřejmě je (opět) nutné zajistit, aby se připojení taktéž uzavřelo ve chvíli, kdy již není zapotřebí, tedy konkrétně při ukončování hlavní gorutiny:

// zajištění uzavření připojení ke Kafce
defer func() {
        if err := consumer.Close(); err != nil {
                log.Fatal(err)
        }
}()

Čtení (tedy konzumaci) zpráv zajišťuje objekt PartitionConsumer, jenž je vytvořen konstruktorem ConsumePartition. Tomuto konstruktoru je nutné předat téma, z něhož se mají zprávy číst, index oddílu a taktéž offset první zprávy, která se má přečíst. Kromě skutečného offsetu (tedy kladného celého čísla) lze použít i symbolické konstanty OffsetNewest a OffsetOldest a tím určit, od jakého offsetu se mají zprávy zpracovat – zda všechny od nejstarší, či naopak zda se mají zpracovat až nově příchozí zprávy:

func (consumer Consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

Konkrétně:

// přihlášení ke zvolenému tématu
partitionConsumer, err := consumer.ConsumePartition(KafkaTopic, 0, sarama.OffsetNewest)
if err != nil {
        log.Fatal(err)
}

Samotné čtení zpráv je v knihovně Sarama provedeno velmi elegantně – s využitím kanálu, který zajistí i blokující čekání na nové zprávy:

for {
        msg := <-partitionConsumer.Messages()
        // vypíšeme pouze offset zprávy, její klíč a tělo (value, payload)
        log.Printf("Consumed message offset %d: %s:%s", msg.Offset, msg.Key, msg.Value)
        consumed++
}

Úplný zdrojový kód konzumenta vypadá následovně:

// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou
// Sarama: implementace konzumenta zpráv.
 
package main
 
import (
        "log"
 
        "github.com/Shopify/sarama"
)
 
const (
        // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker
        KafkaConnectionString = "localhost:9092"
 
        // KafkaTopic obsahuje jméno tématu
        KafkaTopic = "test-topic"
)
 
func main() {
        // konstrukce konzumenta
        consumer, err := sarama.NewConsumer([]string{KafkaConnectionString}, nil)
 
        // kontrola chyby při připojování ke Kafce
        if err != nil {
                log.Fatal(err)
        }
 
        log.Printf("Connected to %s", KafkaConnectionString)
 
        // zajištění uzavření připojení ke Kafce
        defer func() {
                if err := consumer.Close(); err != nil {
                        log.Fatal(err)
                }
        }()
 
        // přihlášení ke zvolenému tématu
        partitionConsumer, err := consumer.ConsumePartition(KafkaTopic, 0, sarama.OffsetNewest)
        if err != nil {
                log.Fatal(err)
        }
 
        // zajištění ukončení přihlášení ke zvolenému tématu
        defer func() {
                if err := partitionConsumer.Close(); err != nil {
                        log.Fatal(err)
                }
        }()
 
        // postupné čtení zpráv, které byly do zvoleného tématu publikovány
        consumed := 0
        for {
                msg := <-partitionConsumer.Messages()
                // vypíšeme pouze offset zprávy, její klíč a tělo (value, payload)
                log.Printf("Consumed message offset %d: %s:%s", msg.Offset, msg.Key, msg.Value)
                consumed++
        }
 
        // výpis počtu zpracovaných zpráv (ovšem sem se stejně nedostaneme :-)
        log.Printf("Consumed: %d", consumed)
        log.Print("Done")
}

Existují i mnohé další možnosti poskytované knihovnou Sarama, například výpis informací o tématech atd.:

// Ukázka použití rozhraní pro systém Apache Kafka představovaného knihovnou
// Sarama: výpis informací o tématech.
 
package main
 
import (
        "log"
 
        "github.com/Shopify/sarama"
)
 
const (
        // KafkaConnectionString obsahuje jméno počítače a port, na kterém běží Kafka broker
        KafkaConnectionString = "localhost:9092"
 
        // KafkaTopic obsahuje jméno tématu
        KafkaTopic = "test-topic"
)
 
func main() {
        // konstrukce rozhraní k brokerovi
        broker := sarama.NewBroker(KafkaConnectionString)
 
        // kontrola chyby při připojování k brokerovi
        err := broker.Open(nil)
        if err != nil {
                log.Fatal(err)
        }
 
        log.Printf("Connected to %s", KafkaConnectionString)
 
        request := sarama.MetadataRequest{Topics: []string{KafkaTopic}}
        response, err := broker.GetMetadata(&request)
        if err != nil {
                _ = broker.Close()
                log.Fatal(err)
        }
 
        if len(response.Topics) == 1 {
                log.Print("There is one topic active in the cluster.")
        } else {
                log.Print("There are", len(response.Topics), "topics active in the cluster.")
        }
 
        if err = broker.Close(); err != nil {
                log.Fatal(err)
        }
 
        log.Print("Done")
}

11. Modulární podoba aplikace představující producenta zpráv

U reálných aplikací je možné knihovnu Sarama „obalit“ vlastním rozhraním, které bude předepisovat pouze ty metody, které jsou skutečně v dané aplikaci zapotřebí. Následující demonstrační příklad je do určité míry založen na reálné aplikaci. Začneme definicí datového typu představujícího minimální užitečnou konfiguraci brokera:

package main
 
// BrokerConfiguration represents configuration of Kafka brokers and topics
type BrokerConfiguration struct {
        Address string `mapstructure:"address" toml:"address"`
        Topic   string `mapstructure:"topic"   toml:"topic"`
}

Dále je deklarováno rozhraní pro všechny implementace brokera. Jsou předepsány pouze ty metody, které jsou upotřebitelné v aplikaci:

// Producer represents any producer
type Producer interface {
        ProduceMessage(message Message) (int32, int64, error)
        Close() error
}

Následuje konkrétní implementace tohoto rozhraní, včetně konstruktoru NewKafkaProducer:

import (
        "encoding/json"
 
        "github.com/Shopify/sarama"
        "github.com/rs/zerolog/log"
)

// KafkaProducer is an implementation of Producer interface
type KafkaProducer struct {
        Configuration BrokerConfiguration
        Producer      sarama.SyncProducer
}

// NewKafkaProducer constructs new implementation of Producer interface
func NewKafkaProducer(brokerConfiguration BrokerConfiguration) (*KafkaProducer, error) {
        producer, err := sarama.NewSyncProducer([]string{brokerConfiguration.Address}, nil)
        if err != nil {
                log.Error().Err(err).Msg("unable to create a new Kafka producer")
                return nil, err
        }
 
        return &KafkaProducer{
                Configuration: brokerConfiguration,
                Producer:      producer,
        }, nil
}
 
// ProduceMessage produces message to selected topic. That function returns
// partition ID and offset of new message or an error value in case of any
// problem on broker side.
func (producer *KafkaProducer) ProduceMessage(message Message) (partitionID int32, offset int64, err error) {
        jsonBytes, err := json.Marshal(message)
 
        if err != nil {
                log.Error().Err(err).Msg("Couldn't turn notification message into valid JSON")
                return -1, -1, err
        }
 
        // construct message to be produced using the provided payload (message body)
        producerMsg := &sarama.ProducerMessage{
                Topic: producer.Configuration.Topic,
                Value: sarama.ByteEncoder(jsonBytes),
        }
 
        // try to produce message
        partitionID, offset, err = producer.Producer.SendMessage(producerMsg)
        if err != nil {
                log.Error().Err(err).Msg("failed to produce message to Kafka")
        } else {
                log.Info().Msgf("message sent to partition %d at offset %d\n", partitionID, offset)
        }
        return
}
 
// Close allow the Sarama producer to be gracefully closed
func (producer *KafkaProducer) Close() error {
        if err := producer.Producer.Close(); err != nil {
                log.Error().Err(err).Msg("unable to close Kafka producer")
                return err
        }
 
        return nil
}

Výsledná struktura typů:

Obrázek 1: Datové struktury a rozhraní deklarované v předchozím kódu.

12. Využití nového rozhraní i jeho implementace

Rozhraní Producer a jeho implementaci KafkaProducer můžeme velmi snadno použít v aplikaci, která do brokera pošle zprávu se třemi atributy uloženými ve formátu JSON. Samotný kód je přímočarý (i když, jak je v jazyce Go zvykem, „prošpikovaný“ kontrolami chyb):

package main
 
import (
        "os"
 
        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
)
 
type Message struct {
        ID      int    `json:"id"`
        Name    string `json:"name"`
        Surname string `json:"surname"`
}
 
func main() {
        log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
        log.Info().Msg("Started")
 
        brokerConfiguration := BrokerConfiguration{
                Address: "localhost:9092",
                Topic:   "test-topic2",
        }
 
        producer, err := NewKafkaProducer(brokerConfiguration)
        if err != nil {
                log.Error().Err(err).Msg("Can not connect to Kafka")
                return
        }
 
        log.Info().Str("address", brokerConfiguration.Address).Msg("Connected to Kafka")
 
        defer producer.Close()
 
        message := Message{
                ID:      42,
                Name:    "Václav",
                Surname: "Trachta",
        }
 
        _, _, err = producer.ProduceMessage(message)
        if err != nil {
                log.Error().Err(err).Msg("Unable to produce message")
                return
        }
 
        log.Info().Msg("Finished")
}

13. Modulární podoba aplikace představující konzumenta zpráv

Stejným způsobem, jako producenta zpráv, můžeme vytvořit „obalující“ rozhraní i pro konzumenta. Ovšem zde se ukazují některá úskalí nízkoúrovňového přístupu knihovny Sarama. Opět začneme definicí typu brokera, nyní s přidanou informací o skupině:

// BrokerConfiguration represents configuration of Kafka brokers and topics
type BrokerConfiguration struct {
        Address string `mapstructure:"address" toml:"address"`
        Topic   string `mapstructure:"topic"   toml:"topic"`
        Group   string `mapstructure:"group"   toml:"group"`
}

Dále definujeme rozhraní představující konzumenta zpráv:

// Consumer represents any consumer
type Consumer interface {
        Serve()
        ProcessMessage(msg *sarama.ConsumerMessage) error
        Close() error
}

Implementace konzumenta je relativně složitá, a to z toho důvodu, že reálný kód musí být schopen se znovu připojit ke Kafce i po rebalancingu (které může nastat v závislosti na nastavené konfiguraci). Následující kód byl vytvořen po několika neblahých zkušenostech s reálným nasazením aplikací:

import (
        "context"
        "encoding/json"

        "github.com/Shopify/sarama"
        "github.com/rs/zerolog/log"
)
 
// KafkaConsumer in an implementation of Consumer interface
type KafkaConsumer struct {
        Configuration                        BrokerConfiguration
        ConsumerGroup                        sarama.ConsumerGroup
        numberOfSuccessfullyConsumedMessages uint64
        numberOfErrorsConsumingMessages      uint64
        ready                                chan bool
        cancel                               context.CancelFunc
}
 
// DefaultSaramaConfig is a config which will be used by default
// here you can use specific version of a protocol for example
// useful for testing
var DefaultSaramaConfig *sarama.Config
 
// NewConsumer constructs new implementation of Consumer interface
func NewKafkaConsumer(brokerCfg BrokerConfiguration) (*KafkaConsumer, error) {
        return NewWithSaramaConfig(brokerCfg, DefaultSaramaConfig)
}
 
// NewWithSaramaConfig constructs new implementation of Consumer interface with custom sarama config
func NewWithSaramaConfig(
        brokerConfiguration BrokerConfiguration,
        saramaConfig *sarama.Config,
) (*KafkaConsumer, error) {
        if saramaConfig == nil {
                saramaConfig = sarama.NewConfig()
                saramaConfig.Version = sarama.V0_10_2_0
 
                /* TODO: we need to do it in production code
                if brokerCfg.Timeout > 0 {
                        saramaConfig.Net.DialTimeout = brokerCfg.Timeout
                        saramaConfig.Net.ReadTimeout = brokerCfg.Timeout
                        saramaConfig.Net.WriteTimeout = brokerCfg.Timeout
                }
                */
        }
 
        consumerGroup, err := sarama.NewConsumerGroup([]string{brokerConfiguration.Address}, brokerConfiguration.Group, saramaConfig)
        if err != nil {
                return nil, err
        }
 
        consumer := &KafkaConsumer{
                Configuration:                        brokerConfiguration,
                ConsumerGroup:                        consumerGroup,
                numberOfSuccessfullyConsumedMessages: 0,
                numberOfErrorsConsumingMessages:      0,
                ready:                                make(chan bool),
        }
 
        return consumer, nil
}
 
// Serve starts listening for messages and processing them. It blocks current thread.
func (consumer *KafkaConsumer) Serve() {
        ctx, cancel := context.WithCancel(context.Background())
        consumer.cancel = cancel
 
        go func() {
                for {
                        // `Consume` should be called inside an infinite loop, when a
                        // server-side rebalance happens, the consumer session will need to be
                        // recreated to get the new claims
                        if err := consumer.ConsumerGroup.Consume(ctx, []string{consumer.Configuration.Topic}, consumer); err != nil {
                                log.Fatal().Err(err).Msg("Unable to recreate Kafka session")
                        }
 
                        // check if context was cancelled, signaling that the consumer should stop
                        if ctx.Err() != nil {
                                log.Info().Err(ctx.Err()).Msg("Stopping consumer")
                                return
                        }
 
                        log.Info().Msg("Created new kafka session")
 
                        consumer.ready = make(chan bool)
                }
        }()
 
        // Await till the consumer has been set up
        log.Info().Msg("Waiting for consumer to become ready")
        <-consumer.ready
        log.Info().Msg("Finished waiting for consumer to become ready")
 
        // Actual processing is done in goroutine created by sarama (see ConsumeClaim below)
        log.Info().Msg("Started serving consumer")
        <-ctx.Done()
        log.Info().Msg("Context cancelled, exiting")
 
        cancel()
}
 
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error {
        log.Info().Msg("New session has been setup")
        // Mark the consumer as ready
        close(consumer.ready)
        return nil
}
 
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error {
        log.Info().Msg("New session has been finished")
        return nil
}
 
// ConsumeClaim starts a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        log.Info().
                Int64("offset", claim.InitialOffset()).
                Msg("Starting messages loop")
 
        for message := range claim.Messages() {
                consumer.HandleMessage(message)
 
                session.MarkMessage(message, "")
        }
 
        return nil
}
 
// Close method closes all resources used by consumer
func (consumer *KafkaConsumer) Close() error {
        if consumer.cancel != nil {
                consumer.cancel()
        }
 
        if consumer.ConsumerGroup != nil {
                if err := consumer.ConsumerGroup.Close(); err != nil {
                        log.Error().
                                Err(err).
                                Msg("Unable to close consumer group")
                }
        }
 
        return nil
}
 
// GetNumberOfSuccessfullyConsumedMessages returns number of consumed messages
// since creating KafkaConsumer obj
func (consumer *KafkaConsumer) GetNumberOfSuccessfullyConsumedMessages() uint64 {
        return consumer.numberOfSuccessfullyConsumedMessages
}
 
// GetNumberOfErrorsConsumingMessages returns number of errors during consuming messages
// since creating KafkaConsumer obj
func (consumer *KafkaConsumer) GetNumberOfErrorsConsumingMessages() uint64 {
        return consumer.numberOfErrorsConsumingMessages
}
 
// HandleMessage handles the message and does all logging, metrics, etc
func (consumer *KafkaConsumer) HandleMessage(msg *sarama.ConsumerMessage) {
        log.Info().
                Int64("offset", msg.Offset).
                Int32("partition", msg.Partition).
                Str("topic", msg.Topic).
                Msg("Started processing message")
 
        err := consumer.ProcessMessage(msg)
 
        log.Info().
                Int64("offset", msg.Offset).
                Int32("partition", msg.Partition).
                Str("topic", msg.Topic).
                Msg("Finished processing message")
 
        // Something went wrong while processing the message.
        if err != nil {
                log.Error().
                        Err(err).
                        Msg("Error processing message consumed from Kafka")
                consumer.numberOfErrorsConsumingMessages++
        } else {
                // The message was processed successfully.
                consumer.numberOfSuccessfullyConsumedMessages++
        }
}
 
// ProcessMessage processes an incoming message
func (consumer *KafkaConsumer) ProcessMessage(msg *sarama.ConsumerMessage) error {
        log.Info().
                Str("key", string(msg.Key)).
                Str("value", string(msg.Value)).
                Msg("Consumed")
 
        var deserialized Message
 
        err := json.Unmarshal(msg.Value, &deserialized)
        if err != nil {
                return err
        }
 
        log.Info().Int("ID", deserialized.ID).
                Str("Name", deserialized.Name).
                Str("Surname", deserialized.Surname).
                Msg("Deserialized message")

        return nil
}
Poznámka: jak je z kódu patrné, akceptují se zprávy ve formátu produkovaného předchozím demonstračním příkladem s implementací producenta zpráv.

Výsledná struktura typů:

Obrázek 2: Datové struktury a rozhraní deklarované v předchozím kódu.

14. Využití nového rozhraní i jeho implementace

I když byla realizace rozhraní KafkaConsumer relativně složitá, samotné jeho použití je snadné a přímočaré, jak je ostatně patrné z následující ukázky zdrojového kódu:

package main
 
import (
        "os"

        "github.com/rs/zerolog"
        "github.com/rs/zerolog/log"
)
 
type Message struct {
        ID      int    `json:"id"`
        Name    string `json:"name"`
        Surname string `json:"surname"`
}
 
func main() {
        log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
        log.Info().Msg("Started")
 
        brokerConfiguration := BrokerConfiguration{
                Address: "localhost:9092",
                Topic:   "test-topic2",
                Group:   "grpX",
        }
 
        consumer, err := NewKafkaConsumer(brokerConfiguration)
        if err != nil {
                log.Error().Err(err).Msg("Can not connect to Kafka")
                return
        }
 
        log.Info().Str("address", brokerConfiguration.Address).Msg("Connected to Kafka")
 
        defer consumer.Close()
 
        consumer.Serve()
 
        log.Info().Msg("Finished")
}

15. Sarama/mocks a jednotkové testy

Knihovna Sarama poskytuje aplikacím poměrně snadno použitelné rozhraní k volání funkcí systému Apache Kafka přes komunikační protokol Kafky. Ovšem v případě, že se mají vytvořit jednotkové testy pro ty funkce a metody, které volají funkce z knihovny Sarama, je nutné skutečné volání Apache Kafky nahradit vhodnými „mocky“. Samozřejmě nám nic nebrání v použití nějaké obecné knihovny pro tvorbu mocků v programovacím jazyce Go, ale knihovna Sarama nabízí vývojářům již připravené mocky, které lze v jednotkových testech inicializovat a přímo použít. Tyto mocky jsou dostupné v balíčku sarama/mocks. Nabízeny jsou tyto datové typy, jejichž konstrukcí vzniknout objekty:

# Datový typ Konstruktor
1 Consumer func NewConsumer(t ErrorReporter, config *sarama.Config) *Consumer
2 PartitionConsumer func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
3 AsyncProducer func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer
4 SyncProducer func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer

V navazující kapitole jsou vypsány všechny metody implementované těmito užitečnými datovými typy.

16. Mock objekty dostupné v knihovně Sarama/mocks

Metody implementované v typu Consumer

# Metoda
1 func (c *Consumer) Close() error
2 func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)
3 func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset int64) *PartitionConsumer
4 func (c *Consumer) HighWaterMarks() map[string]map[int32]int64
5 func (c *Consumer) Partitions(topic string) ([]int32, error)
6 func (c *Consumer) SetTopicMetadata(metadata map[string][]int32)
7 func (c *Consumer) Topics() ([]string, error)
Poznámka: povšimněte si, že se jedná o rozšíření původního datového typu se stejným jménem Consumer o metodu ExpectConsumePartition sloužící pro registraci tématu a oddílu společně se specifikací, jaké operace jsou očekávány (tudíž se jedná o typickou metodu používanou jednotkovými testy).

Metody implementované v typu PartitionConsumer

# Metoda
1 func (pc *PartitionConsumer) AsyncClose()
2 func (pc *PartitionConsumer) Close() error
3 func (pc *PartitionConsumer) Errors() ← chan *sarama.ConsumerError
4 func (pc *PartitionConsumer) ExpectErrorsDrainedOnClose()
5 func (pc *PartitionConsumer) ExpectMessagesDrainedOnClose()
6 func (pc *PartitionConsumer) HighWaterMarkOffset() int64
7 func (pc *PartitionConsumer) Messages() ← chan *sarama.ConsumerMessage
8 func (pc *PartitionConsumer) YieldError(err error)
9 func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage)
Poznámka: tento typ, resp. přesněji řečeno objekt tohoto typu, je získán výše zmíněnou metodou ExpectConsumePartition datového typu Consumer.

Metody implementované v typu AsyncProducer

# Metoda
1 func (mp *AsyncProducer) AsyncClose()
2 func (mp *AsyncProducer) Close() error
3 func (mp *AsyncProducer) Errors() ← chan *sarama.ProducerError
4 func (mp *AsyncProducer) ExpectInputAndFail(err error)
5 func (mp *AsyncProducer) ExpectInputAndSucceed()
6 func (mp *AsyncProducer) ExpectInputWithCheckerFun­ctionAndFail(cf ValueChecker, err error)
7 func (mp *AsyncProducer) ExpectInputWithCheckerFun­ctionAndSucceed(cf ValueChecker)
8 func (mp *AsyncProducer) Input() chan<- *sarama.ProducerMessage
9 func (mp *AsyncProducer) Successes() ← chan *sarama.ProducerMessage
Poznámka: jedná se o rozšíření původního datového typu AsyncProducer doplněného o metody Expect* používané v jednotkových testech.

Metody implementované v typu SyncProducer

# Metoda
1 func (sp *SyncProducer) Close() error
2 func (sp *SyncProducer) ExpectSendMessageAndFail(err error)
3 func (sp *SyncProducer) ExpectSendMessageAndSucceed()
4 func (sp *SyncProducer) ExpectSendMessageWithChec­kerFunctionAndFail(cf ValueChecker, err error)
5 func (sp *SyncProducer) ExpectSendMessageWithChec­kerFunctionAndSucceed(cf ValueChecker)
6 func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
7 func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error
Poznámka: i zde se jedná o rozšíření původního datového typu SyncProducer doplněného o metody Expect* používané v jednotkových testech.

17. Ukázka konstrukce jednotkového testu

Podívejme se nyní, jak lze mock objekty využít při tvorbě jednotkových testů. Zaměříme se na objekt typu SyncProducer, jehož „nemockovanou“ variantu jsme použili v předchozích demonstračních příkladech. Vzhledem k tomu, že tento objekt použijeme v jednotkových testech, deklarujme novou funkci začínající jménem Test a akceptující jediný parametr typu*testing.T:

func TestSyncProducer(t *testing.T) {

V testu nejdříve vytvoříme instanci mockované varianty objektu typu SyncProducer a zajistíme jeho uzavření na konci funkce s definicí jednotkového testu:

syncProducer := NewSyncProducer(t, nil)
 
defer func() {
        err := syncProducer.Close()
        if err != nil {
                t.Error(err)
        }
}()

Nyní nastává nejzajímavější část jednotkového testu – určíme totiž, jak přesně se má mockovaný objekt typu SyncProducer chovat a jaké chování kódu má naopak očekávat. Budeme specifikovat, že mockovaný objekt očekává, že mu bude poslána jedna zpráva (metodou SendMessage a že tato operace bude úspěšná – mockovaný objekt se tedy bude chovat tak, jakoby k poslání zprávy skutečně došlo:

syncProducer.ExpectSendMessageAndSucceed()

Dále vytvoříme zprávu, která se má poslat:

message := &sarama.ProducerMessage{Topic: "test-topic", Value: string("test")}

Zprávu pošleme s kontrolou výsledku provedené operace:

_, offset, err := syncProducer.SendMessage(msg)
if err != nil {
        t.Errorf("Message produce error %s", err)
}
if offset != 1 || offset != message.Offset {
        t.Errorf("Wrong offset %d", msg.Offset)
}

Taktéž můžeme specifikovat, že mockovaný objekt bude očekávat poslání zprávy a posléze nasimuluje chybu při odesílání, resp. přesněji při komunikaci s Kafkou (o jakou chybu se jedná můžeme sami vybrat):

syncProducer.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)

Nyní budeme kontrolovat, jaká chyba byla vrácena:

MIF temata

_, _, err = syncProducer.SendMessage(msg)
if err != sarama.ErrOutOfBrokers {
        t.Errorf("Message produce error %s is different than expected error", err)
}
 
err := syncProducer.Close()
if err != nil {
        t.Error(err)
}
Poznámka: otestování, zda opravdu došlo k odeslání zprávy (tedy zda testovaný kód provádí to, co se od něj očekává) se provede automaticky při zavolání metody syncProducer.Close().

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes použitých demonstračních příkladů byly uloženy do nového Git repositáře, který je dostupný na adrese https://github.com/tisnik/go-root (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má přibližně stovku kilobajtů), můžete namísto toho použít odkazy na jednotlivé demonstrační příklady, které naleznete v následující tabulce:

# Příklad/soubor Stručný popis Cesta
1 confluent_kafka_producer.go jednoduchý producent zpráv založený na knihovně confluent-kafka-go https://github.com/tisnik/go-root/blob/master/article75/con­fluent_kafka_producer.go
2 confluent_kafka_consumer.go jednoduchý konzument zpráv založený na knihovně confluent-kafka-go https://github.com/tisnik/go-root/blob/master/article75/con­fluent_kafka_consumer.go
       
3 sarama-consumer jednoduchý konzument zpráv založený na knihovně Sarama https://github.com/tisnik/go-root/blob/master/article75/sarama-consumer
4 sarama-producer jednoduchý producent zpráv založený na knihovně Sarama https://github.com/tisnik/go-root/blob/master/article75/sarama-producer
5 sarama-list-topics výpis informací o zvoleném tématu https://github.com/tisnik/go-root/blob/master/article75/sarama-list-topics
       
6 sarama-consumer-2 vylepšený konzument zpráv založený na knihovně Sarama https://github.com/tisnik/go-root/blob/master/article75/sarama-consumer-2
7 sarama-producer-2 vylepšený producent zpráv založený na knihovně Sarama https://github.com/tisnik/go-root/blob/master/article75/sarama-producer-2

19. Odkazy na relevantní články na Rootu

V dnešním článku jsme se zabývali dvojicí témat – využitím systému Apache Kafka v programovacím jazyku Go pro tvorbu producentů a konzumentů zpráv (prozatím bez Kafka Streams) a taktéž tvorbou jednotkových testů pro ty jednotky aplikace, které přímo s Apache Kafkou komunikují, tedy většinou s producenty a konzumenty zpráv. Jak Apache Kafkou, tak i problematikou tvorby jednotkových testů pro „okrajové“ části aplikací, jsme se již na stránkách Roota zabývali v těchto článcích:

  1. Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách
    https://www.root.cz/clanky/pouziti-nastroje-apache-kafka-v-aplikacich-zalozenych-na-mikrosluzbach/
  2. Apache Kafka: distribuovaná streamovací platforma
    https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/
  3. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw
    https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw/
  4. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
    https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw-2-cast/
  5. Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)
    https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-projektu-apache-kafka-jazyku-clojure-a-knihovne-jackdaw-streamy-a-kolony/
  6. Knihovny určené pro tvorbu testů v programovacím jazyce Go
    https://www.root.cz/clanky/knihovny-urcene-pro-tvorbu-testu-v-programovacim-jazyce-go/
  7. Testování aplikací naprogramovaných v jazyce Go
    https://www.root.cz/clanky/testovani-aplikaci-naprogramovanych-v-jazyce-go/
  8. Pomůcky při tvorbě jednotkových testů v jazyce Go
    https://www.root.cz/clanky/pomucky-pri-tvorbe-jednotkovych-testu-v-jazyce-go/
  9. Jazyk Go prakticky: jednotkové testy kódu, který přistupuje k SQL databázím
    https://www.root.cz/clanky/jazyk-go-prakticky-jednotkove-testy-kodu-ktery-pristupuje-k-sql-databazim/
  10. Jazyk Go prakticky: jednotkové testy kódu, který přistupuje k SQL databázím (dokončení)
    https://www.root.cz/clanky/jazyk-go-prakticky-jednotkove-testy-kodu-ktery-pristupuje-k-sql-databazim-dokonceni/

20. Odkazy na Internetu

  1. Confluent's Golang Client for Apache Kafka
    https://github.com/conflu­entinc/confluent-kafka-go
  2. Confluent: Kafka Go Client
    https://docs.confluent.io/clients-confluent-kafka-go/current/overview.html
  3. sarama: an MIT-licensed Go client library for Apache Kafka version 0.8 (and later)
    https://github.com/Shopify/sarama
  4. Sarama FAQ
    https://github.com/Shopify/sa­rama/wiki/Frequently-Asked-Questions
  5. Awesome Go
    https://github.com/avelino/awesome-go
  6. Real-Time Payments with Clojure and Apache Kafka (podcast)
    https://www.evidentsystem­s.com/news/confluent-podcast-about-apache-kafka/
  7. Kafka.clj
    https://github.com/helins-io/kafka.clj
  8. Microservices: The Rise Of Kafka
    https://movio.co/blog/microservices-rise-kafka/
  9. Building a Microservices Ecosystem with Kafka Streams and KSQL
    https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
  10. An introduction to Apache Kafka and microservices communication
    https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63
  11. kappa-architecture.com
    http://milinda.pathirage.org/kappa-architecture.com/
  12. Questioning the Lambda Architecture
    https://www.oreilly.com/i­deas/questioning-the-lambda-architecture
  13. Lambda architecture
    https://en.wikipedia.org/wi­ki/Lambda_architecture
  14. Kafka – ecosystem (Wiki)
    https://cwiki.apache.org/con­fluence/display/KAFKA/Eco­system
  15. The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
    http://cloudurable.com/blog/kafka-ecosystem/index.html
  16. A Kafka Operator for Kubernetes
    https://github.com/krallistic/kafka-operator
  17. Kafka Streams
    https://cwiki.apache.org/con­fluence/display/KAFKA/Kaf­ka+Streams
  18. Kafka Streams
    http://kafka.apache.org/do­cumentation/streams/
  19. Kafka Streams (FAQ)
    https://cwiki.apache.org/con­fluence/display/KAFKA/FAQ#FAQ-Streams
  20. Event stream processing
    https://en.wikipedia.org/wi­ki/Event_stream_processing
  21. Part 1: Apache Kafka for beginners – What is Apache Kafka?
    https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html
  22. What are some alternatives to Apache Kafka?
    https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka
  23. What is the best alternative to Kafka?
    https://www.slant.co/opti­ons/961/alternatives/~kaf­ka-alternatives
  24. A super quick comparison between Kafka and Message Queues
    https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0
  25. Kafka Queuing: Kafka as a Messaging System
    https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system