1. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw

S užitečným a populárním nástrojem Apache Kafka jsme se již na stránkách Rootu několikrát setkali, a to například v článcích Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách a Apache Kafka: distribuovaná streamovací platforma. Pro tuto asi nejznámější streamovací platformu existují knihovny realizující rozhraní pro různé programovací jazyky a jejich ekosystémy. Tato rozhraní jsou vypsána v následující tabulce:

Dnes se budeme zabývat rozhraním určeným pro programovací jazyk Clojure. Toto rozhraní se jmenuje Jackdaw a umožňuje „funkcionální“ přístup k datům (zprávám, událostem) uloženým v Kafce. Využít je například možné takzvané transducery atd.

Obrázek 1: Logo nástroje Apache Kafka, kterému se budeme dnes věnovat.

Nejprve si popíšeme funkce a makra deklarovaná ve jmenných prostorech jackdaw.admin (administrace Kafky) a jackdaw.client (pro implementaci producentů a konzumentů zpráv). V navazujícím článku si ukážeme i použití již zmíněných transducerů, specifikaci skutečného streamu atd. – tedy důvodů, proč vůbec o použití kombinace Clojure a Apache Kafka uvažovat (a právě specifikace streamů je v Clojure jak přímočará, tak i velmi nasaditelná).

2. Instalace a spuštění Kafky

V případě, že je na počítači nainstalováno JRE (Java Runtime Environment), je instalace Kafky pro testovací účely triviální. Tarball s instalací poslední stabilní verze Kafky lze získat z adresy https://www.apache.org/dyn/clo­ser.cgi?path=/kafka/2.6.0/kaf­ka 2 .13–2.6.0.tgz. Stažení a rozbalení tarballu se provede následovně:

$ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz $ tar -xzf kafka_2.13-2.6.0.tgz $ cd kafka_2.13-2.6.0

Po rozbalení získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin). Pro spuštění Zookeepera a brokerů je zapotřebí mít nainstalovánu již zmíněnou JRE (Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).

Prozatím nebudeme žádné další nastavení ani žádné další nástroje potřebovat, ovšem v navazujícím článku se zmíníme o některých užitečných utilitách určených pro administraci a sledování (monitorování) Kafky.

Obrázek 2: Sledování činnosti brokeru přes standardní nástroj JConsole.

Po (doufejme že úspěšné) instalaci Kafky již můžeme spustit ZooKeeper a jednu instanci brokera (a to přesně v tomto pořadí, tedy nejdřívě ZooKeeper!). Konfigurace ZooKeepera je uložena v souboru config/zookeeper.properties a zajímat nás budou především tyto volby – adresář, kam ZooKeeper ukládá svoje data, port, který použijí brokeři a omezení počtu připojení jednoho klienta v daný okamžik:

dataDir=/tmp/zookeeper clientPort=2181 maxClientCnxns=0

maxClientCnxns v tomto případě neznamená, že by se nemohly připojit žádní klienti, ale že je vypnutý mechanismus, který zabezpečuje infrastrukturu Kafky před některými typy DOS útoků. Na disku, kde je adresář dataDir by také mělo být dostatek místa, protože ZooKeeper v některých případech mívá větší nároky. Další informace lze nalézt na stránce Poznámka: hodnotav tomto případě neznamená, že by se nemohly připojit žádní klienti, ale že je vypnutý mechanismus, který zabezpečuje infrastrukturu Kafky před některými typy DOS útoků. Na disku, kde je adresářby také mělo být dostatek místa, protože ZooKeeper v některých případech mívá větší nároky. Další informace lze nalézt na stránce https://zookeeper.apache.or­g/doc/r3.5.6/

Nyní již můžeme Zookeepera spustit:

$ bin/zookeeper-server-start.sh config/zookeeper.properties [2020-01-20 17:00:07,823] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2020-01-20 17:00:07,825] WARN config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig) [2020-01-20 17:00:07,827] INFO clientPortAddress is 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... ... ... [2020-01-20 17:00:07,947] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager) [2020-01-20 17:00:26,978] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)

Konfigurace jednoho brokera je uložená v souboru config/server.properties. Samotný konfigurační soubor obsahuje několik sekcí:

Port, na kterém broker naslouchá, jeho ID, počet použitých vláken pro IO operace a počet vláken pro komunikaci. Velikost bufferů, maximální povolená velikost požadavků (což omezuje velikost zprávy) atd. Nastavení počtu partitions Nastavení retence dat Připojení k Zookeeperovi

Pro testovací účely změníme především umístění souborů s logy (tedy s jednotlivými partitions):

broker.id=0 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000

Poznámka: i velikost adresáře log.dirs roste, a to mnohdy velmi rychle (typicky se alokuje 20MB pro každou partition), takže se vyplatí sledovat příslušné metriky.

Spuštění jednoho brokera vypadá jednoduše:

$ bin/kafka-server-start.sh config/server.properties

Alternativně je možné Zookeepera i Kafku (jednu instanci brokera) spustit v Dockeru:

$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka

Poznámka: předchozí nastavení předpokládá, že současně na stejném stroji nepoběží žádná další instance Kafky ani Zookeepera. Pokud budete potřebovat spustit větší množství brokerů, je nutné minimálně změnit mapování portů (přepínače -p).

Po spuštění jak Zookepera, tak i brokera Kafky se podívejte na obsah adresáře /tmp/kafka-logs/, který byl nakonfigurován v rámci předchozího textu. Prozatím by se v tomto adresáři mělo vyskytovat pouze několik souborů, ovšem žádné adresáře (každý adresář odpovídá jedné partition určeného topicu, popř. právě smazané partition – to uvidíme v dalším textu):

$ ls -l /tmp/kafka-logs/ total 4 -rw-rw-r--. 1 ptisnovs ptisnovs 0 Nov 27 14:39 cleaner-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 0 Nov 27 14:39 log-start-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 88 Nov 27 14:39 meta.properties -rw-rw-r--. 1 ptisnovs ptisnovs 0 Nov 27 14:39 recovery-point-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 0 Nov 27 14:39 replication-offset-checkpoint

3. Stažení všech knihoven potřebných pro volání funkcí Apache Kafky z jazyka Clojure

Nyní je zapotřebí získat všechny knihovny, které jsou přímo či nepřímo používány pro volání funkcí Apache Kafky z programovacího jazyka Clojure. Využijeme přitom nástroj Leiningen, který je úzce navázán na Maven. Nejprve vytvoříme kostru nového projektu příkazem:

$ lein new app topic-constructor Generating a project called topic-constructor based on the 'app' template.

V nově vytvořeném adresáři topic-constuctor se nachází soubor project.clj, jenž mj. obsahuje i seznam závislostí:

(defproject topic-constructor "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" :url "https://www.eclipse.org/legal/epl-2.0/"} :dependencies [[org.clojure/clojure "1.10.1"]] :main ^:skip-aot topic-constructor.core :target-path "target/%s" :profiles {:uberjar {:aot :all :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})

Tento soubor upravíme do následující podoby – přidáme závislost na knihovně fundingcircle/jackdaw:

(defproject topic-constructor "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" :url "https://www.eclipse.org/legal/epl-2.0/"} :dependencies [[org.clojure/clojure "1.10.1"] [fundingcircle/jackdaw "0.7.6"]] :main ^:skip-aot topic-constructor.core :target-path "target/%s" :profiles {:uberjar {:aot :all :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})

Příkazem lein deps se zahájí stažení všech doposud nenainstalovaných závislostí:

$ lein deps Retrieving fundingcircle/jackdaw/0.7.6/jackdaw-0.7.6.pom from clojars Retrieving aleph/aleph/0.4.6/aleph-0.4.6.pom from clojars Retrieving manifold/manifold/0.1.8/manifold-0.1.8.pom from clojars Retrieving io/aleph/dirigiste/0.1.5/dirigiste-0.1.5.pom from central Retrieving riddley/riddley/0.1.14/riddley-0.1.14.pom from clojars Retrieving byte-streams/byte-streams/0.2.4/byte-streams-0.2.4.pom from clojars Retrieving primitive-math/primitive-math/0.1.6/primitive-math-0.1.6.pom from clojars Retrieving io/netty/netty-transport/4.1.25.Final/netty-transport-4.1.25.Final.pom from central Retrieving io/netty/netty-parent/4.1.25.Final/netty-parent-4.1.25.Final.pom from central Retrieving io/netty/netty-buffer/4.1.25.Final/netty-buffer-4.1.25.Final.pom from central Retrieving io/netty/netty-common/4.1.25.Final/netty-common-4.1.25.Final.pom from central Retrieving io/netty/netty-resolver/4.1.25.Final/netty-resolver-4.1.25.Final.pom from central Retrieving io/netty/netty-transport-native-epoll/4.1.25.Final/netty-transport-native-epoll-4.1.25.Final.pom from central Retrieving io/netty/netty-transport-native-unix-common/4.1.25.Final/netty-transport-native-unix-common-4.1.25.Final.pom from central Retrieving io/netty/netty-codec/4.1.25.Final/netty-codec-4.1.25.Final.pom from central Retrieving io/netty/netty-codec-http/4.1.25.Final/netty-codec-http-4.1.25.Final.pom from central Retrieving io/netty/netty-handler/4.1.25.Final/netty-handler-4.1.25.Final.pom from central Retrieving io/netty/netty-handler-proxy/4.1.25.Final/netty-handler-proxy-4.1.25.Final.pom from central Retrieving io/netty/netty-codec-socks/4.1.25.Final/netty-codec-socks-4.1.25.Final.pom from central Retrieving io/netty/netty-resolver-dns/4.1.25.Final/netty-resolver-dns-4.1.25.Final.pom from central Retrieving io/netty/netty-codec-dns/4.1.25.Final/netty-codec-dns-4.1.25.Final.pom from central Retrieving clj-time/clj-time/0.15.1/clj-time-0.15.1.pom from clojars Retrieving joda-time/joda-time/2.10/joda-time-2.10.pom from central Retrieving danlentz/clj-uuid/0.1.9/clj-uuid-0.1.9.pom from clojars Retrieving io/confluent/kafka-schema-registry-client/5.3.1/kafka-schema-registry-client-5.3.1.pom from confluent Retrieving io/confluent/kafka-schema-registry-parent/5.3.1/kafka-schema-registry-parent-5.3.1.pom from confluent Retrieving io/confluent/rest-utils-parent/5.3.1/rest-utils-parent-5.3.1.pom from confluent Retrieving io/confluent/common/5.3.1/common-5.3.1.pom from confluent Retrieving io/confluent/common-parent/5.3.1/common-parent-5.3.1.pom from confluent Retrieving org/glassfish/jersey/jersey-bom/2.27/jersey-bom-2.27.pom from central Retrieving net/java/jvnet-parent/4/jvnet-parent-4.pom from central Retrieving org/apache/kafka/kafka-clients/5.3.1-ccs/kafka-clients-5.3.1-ccs.pom from confluent Retrieving com/github/luben/zstd-jni/1.4.0-1/zstd-jni-1.4.0-1.pom from central Retrieving org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.pom from central Retrieving org/xerial/snappy/snappy-java/1.1.7.3/snappy-java-1.1.7.3.pom from central Retrieving org/slf4j/slf4j-api/1.7.26/slf4j-api-1.7.26.pom from central Retrieving org/slf4j/slf4j-parent/1.7.26/slf4j-parent-1.7.26.pom from central Retrieving io/confluent/common-config/5.3.1/common-config-5.3.1.pom from confluent Retrieving io/confluent/common-utils/5.3.1/common-utils-5.3.1.pom from confluent Retrieving org/apache/zookeeper/zookeeper/3.4.14/zookeeper-3.4.14.pom from central Retrieving org/slf4j/slf4j-parent/1.7.25/slf4j-parent-1.7.25.pom from central Retrieving com/github/spotbugs/spotbugs-annotations/3.1.9/spotbugs-annotations-3.1.9.pom from central Retrieving com/google/code/findbugs/jsr305/3.0.2/jsr305-3.0.2.pom from central Retrieving jline/jline/0.9.94/jline-0.9.94.pom from central Retrieving org/apache/yetus/audience-annotations/0.5.0/audience-annotations-0.5.0.pom from central Retrieving org/apache/yetus/yetus-project/0.5.0/yetus-project-0.5.0.pom from central Retrieving org/apache/apache/17/apache-17.pom from central Retrieving io/netty/netty/3.10.6.Final/netty-3.10.6.Final.pom from central Retrieving com/101tec/zkclient/0.10/zkclient-0.10.pom from central Retrieving org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.pom from central Retrieving org/slf4j/slf4j-parent/1.6.1/slf4j-parent-1.6.1.pom from central Retrieving org/apache/zookeeper/zookeeper/3.4.8/zookeeper-3.4.8.pom from central Retrieving io/netty/netty/3.7.0.Final/netty-3.7.0.Final.pom from central Retrieving org/apache/avro/avro/1.8.1/avro-1.8.1.pom from central Retrieving org/apache/avro/avro-parent/1.8.1/avro-parent-1.8.1.pom from central Retrieving org/apache/avro/avro-toplevel/1.8.1/avro-toplevel-1.8.1.pom from central Retrieving org/apache/apache/10/apache-10.pom from central Retrieving org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.pom from central Retrieving org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.pom from central Retrieving com/thoughtworks/paranamer/paranamer/2.7/paranamer-2.7.pom from central Retrieving com/thoughtworks/paranamer/paranamer-parent/2.7/paranamer-parent-2.7.pom from central Retrieving org/codehaus/codehaus-parent/4/codehaus-parent-4.pom from central Retrieving org/xerial/snappy/snappy-java/1.1.1.3/snappy-java-1.1.1.3.pom from central Retrieving org/apache/commons/commons-compress/1.8.1/commons-compress-1.8.1.pom from central Retrieving org/apache/commons/commons-parent/33/commons-parent-33.pom from central Retrieving org/tukaani/xz/1.5/xz-1.5.pom from central Retrieving org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.pom from central Retrieving org/slf4j/slf4j-parent/1.7.7/slf4j-parent-1.7.7.pom from central Retrieving io/confluent/kafka-avro-serializer/5.3.1/kafka-avro-serializer-5.3.1.pom from confluent Retrieving com/fasterxml/jackson/core/jackson-databind/2.9.9.3/jackson-databind-2.9.9.3.pom from central Retrieving com/fasterxml/jackson/jackson-base/2.9.9/jackson-base-2.9.9.pom from central Retrieving com/fasterxml/jackson/jackson-bom/2.9.9/jackson-bom-2.9.9.pom from central Retrieving com/fasterxml/jackson/jackson-parent/2.9.1.2/jackson-parent-2.9.1.2.pom from central Retrieving com/fasterxml/oss-parent/34/oss-parent-34.pom from central Retrieving com/fasterxml/jackson/core/jackson-annotations/2.9.0/jackson-annotations-2.9.0.pom from central Retrieving com/fasterxml/jackson/jackson-parent/2.9.0/jackson-parent-2.9.0.pom from central Retrieving com/fasterxml/oss-parent/28/oss-parent-28.pom from central Retrieving com/fasterxml/jackson/core/jackson-core/2.9.9/jackson-core-2.9.9.pom from central Retrieving org/apache/kafka/kafka-clients/2.3.1/kafka-clients-2.3.1.pom from central Retrieving org/apache/kafka/kafka-streams/2.3.1/kafka-streams-2.3.1.pom from central Retrieving org/apache/kafka/connect-json/2.3.1/connect-json-2.3.1.pom from central Retrieving org/apache/kafka/connect-api/2.3.1/connect-api-2.3.1.pom from central Retrieving com/fasterxml/jackson/core/jackson-databind/2.10.0/jackson-databind-2.10.0.pom from central Retrieving com/fasterxml/jackson/jackson-base/2.10.0/jackson-base-2.10.0.pom from central Retrieving com/fasterxml/jackson/jackson-bom/2.10.0/jackson-bom-2.10.0.pom from central Retrieving com/fasterxml/jackson/jackson-parent/2.10/jackson-parent-2.10.pom from central Retrieving com/fasterxml/oss-parent/38/oss-parent-38.pom from central Retrieving com/fasterxml/jackson/core/jackson-annotations/2.10.0/jackson-annotations-2.10.0.pom from central Retrieving com/fasterxml/jackson/core/jackson-core/2.10.0/jackson-core-2.10.0.pom from central Retrieving com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.10.0/jackson-datatype-jdk8-2.10.0.pom from central Retrieving com/fasterxml/jackson/module/jackson-modules-java8/2.10.0/jackson-modules-java8-2.10.0.pom from central Retrieving org/rocksdb/rocksdbjni/5.18.3/rocksdbjni-5.18.3.pom from central Retrieving org/clojure/data.fressian/0.2.1/data.fressian-0.2.1.pom from central Retrieving org/fressian/fressian/0.6.6/fressian-0.6.6.pom from central Retrieving org/clojure/core.cache/0.7.2/core.cache-0.7.2.pom from central Retrieving org/clojure/data.priority-map/0.0.7/data.priority-map-0.0.7.pom from central Retrieving io/aleph/dirigiste/0.1.5/dirigiste-0.1.5.jar from central Retrieving io/netty/netty-transport-native-epoll/4.1.25.Final/netty-transport-native-epoll-4.1.25.Final.jar from central Retrieving io/netty/netty-buffer/4.1.25.Final/netty-buffer-4.1.25.Final.jar from central Retrieving io/netty/netty-transport/4.1.25.Final/netty-transport-4.1.25.Final.jar from central Retrieving io/netty/netty-common/4.1.25.Final/netty-common-4.1.25.Final.jar from central Retrieving io/netty/netty-transport-native-unix-common/4.1.25.Final/netty-transport-native-unix-common-4.1.25.Final.jar from central Retrieving io/netty/netty-codec/4.1.25.Final/netty-codec-4.1.25.Final.jar from central Retrieving io/netty/netty-codec-http/4.1.25.Final/netty-codec-http-4.1.25.Final.jar from central Retrieving io/netty/netty-handler/4.1.25.Final/netty-handler-4.1.25.Final.jar from central Retrieving io/netty/netty-handler-proxy/4.1.25.Final/netty-handler-proxy-4.1.25.Final.jar from central Retrieving io/netty/netty-resolver/4.1.25.Final/netty-resolver-4.1.25.Final.jar from central Retrieving io/netty/netty-resolver-dns/4.1.25.Final/netty-resolver-dns-4.1.25.Final.jar from central Retrieving io/netty/netty-codec-socks/4.1.25.Final/netty-codec-socks-4.1.25.Final.jar from central Retrieving io/netty/netty-codec-dns/4.1.25.Final/netty-codec-dns-4.1.25.Final.jar from central Retrieving joda-time/joda-time/2.10/joda-time-2.10.jar from central Retrieving org/apache/avro/avro/1.8.1/avro-1.8.1.jar from central Retrieving org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar from central Retrieving org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar from central Retrieving com/thoughtworks/paranamer/paranamer/2.7/paranamer-2.7.jar from central Retrieving org/apache/commons/commons-compress/1.8.1/commons-compress-1.8.1.jar from central Retrieving org/tukaani/xz/1.5/xz-1.5.jar from central Retrieving org/apache/zookeeper/zookeeper/3.4.14/zookeeper-3.4.14.jar from central Retrieving com/google/code/findbugs/jsr305/3.0.2/jsr305-3.0.2.jar from central Retrieving com/github/spotbugs/spotbugs-annotations/3.1.9/spotbugs-annotations-3.1.9.jar from central Retrieving jline/jline/0.9.94/jline-0.9.94.jar from central Retrieving org/apache/yetus/audience-annotations/0.5.0/audience-annotations-0.5.0.jar from central Retrieving io/netty/netty/3.10.6.Final/netty-3.10.6.Final.jar from central Retrieving com/101tec/zkclient/0.10/zkclient-0.10.jar from central Retrieving org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.jar from central Retrieving com/github/luben/zstd-jni/1.4.0-1/zstd-jni-1.4.0-1.jar from central Retrieving org/xerial/snappy/snappy-java/1.1.7.3/snappy-java-1.1.7.3.jar from central Retrieving org/apache/kafka/kafka-clients/2.3.1/kafka-clients-2.3.1.jar from central Retrieving org/slf4j/slf4j-api/1.7.26/slf4j-api-1.7.26.jar from central Retrieving org/apache/kafka/kafka-streams/2.3.1/kafka-streams-2.3.1.jar from central Retrieving org/apache/kafka/connect-json/2.3.1/connect-json-2.3.1.jar from central Retrieving org/apache/kafka/connect-api/2.3.1/connect-api-2.3.1.jar from central Retrieving com/fasterxml/jackson/core/jackson-databind/2.10.0/jackson-databind-2.10.0.jar from central Retrieving com/fasterxml/jackson/core/jackson-annotations/2.10.0/jackson-annotations-2.10.0.jar from central Retrieving com/fasterxml/jackson/core/jackson-core/2.10.0/jackson-core-2.10.0.jar from central Retrieving com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.10.0/jackson-datatype-jdk8-2.10.0.jar from central Retrieving org/rocksdb/rocksdbjni/5.18.3/rocksdbjni-5.18.3.jar from central Retrieving org/clojure/data.fressian/0.2.1/data.fressian-0.2.1.jar from central Retrieving org/fressian/fressian/0.6.6/fressian-0.6.6.jar from central Retrieving org/clojure/core.cache/0.7.2/core.cache-0.7.2.jar from central Retrieving org/clojure/data.priority-map/0.0.7/data.priority-map-0.0.7.jar from central Retrieving byte-streams/byte-streams/0.2.4/byte-streams-0.2.4.jar from clojars Retrieving manifold/manifold/0.1.8/manifold-0.1.8.jar from clojars Retrieving riddley/riddley/0.1.14/riddley-0.1.14.jar from clojars Retrieving aleph/aleph/0.4.6/aleph-0.4.6.jar from clojars Retrieving primitive-math/primitive-math/0.1.6/primitive-math-0.1.6.jar from clojars Retrieving clj-time/clj-time/0.15.1/clj-time-0.15.1.jar from clojars Retrieving danlentz/clj-uuid/0.1.9/clj-uuid-0.1.9.jar from clojars Retrieving fundingcircle/jackdaw/0.7.6/jackdaw-0.7.6.jar from clojars Retrieving io/confluent/kafka-schema-registry-client/5.3.1/kafka-schema-registry-client-5.3.1.jar from confluent Retrieving io/confluent/common-utils/5.3.1/common-utils-5.3.1.jar from confluent Retrieving io/confluent/common-config/5.3.1/common-config-5.3.1.jar from confluent Retrieving io/confluent/kafka-avro-serializer/5.3.1/kafka-avro-serializer-5.3.1.jar from confluent

Poznámka: knihovny jsou nainstalovány do podadresáře ~/.m2.

4. Interaktivní práce s Kafkou z REPLu popř. z textového editoru/IDE

Pro otestování možností nabízených kombinací Kafka + Clojure je nejlepší využít interaktivní práci v REPLu, který lze zavolat jak z textového editoru, tak i z prakticky jakéhokoli IDE s podporou jazyka Clojure. O této možnosti jsem se zmínil například v souvislosti s textovým editorem Vim v textu Plugin Vim Slime. Téměř ideální kombinace tedy vypadá tak, že se v terminálu otevře Screen popř. TMux a z adresáře, v němž je projekt https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/kafka-repl spustit interaktivní smyčku REPL:

$ lein repl nREPL server started on port 37371 on host 127.0.0.1 - nrepl://127.0.0.1:37371 REPL-y 0.4.4, nREPL 0.7.0 Clojure 1.10.1 OpenJDK 64-Bit Server VM 1.8.0_191-b12 Docs: (doc function-name-here) (find-doc "part-of-name-here") Source: (source function-name-here) Javadoc: (javadoc java-object-or-class-here) Exit: Control+D or (exit) or (quit) Results: Stored in vars *1, *2, *3, an exception in *e kafka-repl.core=>

V dalším terminálu je pak možné spustit editor se souborem https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/kafka-repl/src/kafka_repl/core.clj a klávesovou zkratkou Ctrl+C Ctrl+C postupně do REPLu posílat jednotlivé formy ze sekce comment:

(ns kafka-repl.core (:gen-class)) (defn -main [& args] (println "Hello, World!")) (comment ;; vsechny potrebne jmenne prostory (require '[jackdaw.admin :as ja]) (require '[jackdaw.client :as jc]) (require '[jackdaw.client.log :as jl]) (require '[clojure.pprint :as pp]) ;; vytvoreni noveho tematu s jednim oddilem (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/create-topics! client [{:topic-name "test1" :partition-count 1 :replication-factor 1 :topic-config {"cleanup.policy" "compact"}}])) ;; vytvoreni dalsiho tematu s jednim oddilem (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/create-topics! client [{:topic-name "test2" :partition-count 1 :replication-factor 1 :topic-config {"cleanup.policy" "compact"}}])) ;; vytvoreni tematu s deseti oddily (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/create-topics! client [{:topic-name "test3" :partition-count 10 :replication-factor 1 :topic-config {"cleanup.policy" "compact"}}])) ;; vymazani temat test1, test2 a test3 (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/delete-topics! client [{:topic-name "test1"} {:topic-name "test2"} {:topic-name "test3"}])) ;; vypis konfigurace brokera (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"}) configs (ja/describe-cluster client) id (->> configs :controller :id)] (pp/pprint (ja/get-broker-config client id))) ;; vypis konfigurace clusteru (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-cluster client))) ;; vypis informaci o vybranych tematech (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-topics client [{:topic-name "test1"} {:topic-name "test2"} {:topic-name "test3"}]))) ;; vypis informaci o vsech tematech (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-topics client))) ;; vypis konfiguraci o jednom tematu (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-topics-configs client [{:topic-name "test1"}]))) ;; vypis konfiguraci vybranych temat (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-topics-configs client [{:topic-name "test1"} {:topic-name "test2"} {:topic-name "test3"}]))) ;; konfigurace producenta zprav (def producer-config {"bootstrap.servers" "localhost:9092" "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" "value.serializer" "org.apache.kafka.common.serialization.StringSerializer" "acks" "all" "client.id" "foo"}) ;; poslani jedine zpravy (with-open [producer (jc/producer producer-config)] (let [record-metadata (jc/produce! producer {:topic-name "test1"} "1" "Hello, Kafka!")] (pp/pprint @record-metadata))) ;; poslani 100 zprav (with-open [producer (jc/producer producer-config)] (doseq [i (range 1 101)] (let [key (str i) value (str "Message #" i)] (println "Publishing message with key '" key "' and value '" value "'") (let [record-metadata (jc/produce! producer {:topic-name "test1"} key value)] (pp/pprint @record-metadata))))) ;; konfigurace konzumenta zprav (def consumer-config {"bootstrap.servers" "localhost:9092" "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "auto.offset.reset" "earliest" "group.id" "group-A"}) #"auto.offset.reset" "none" ;; start konzumenta (with-open [consumer (-> (jc/consumer consumer-config) (jc/subscribe [{:topic-name "test1"}]))] (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)] (println "key: " key) (println "value: " value) (println "partition: " partition) (println "timestamp: " timestamp) (println "offset: " offset))) ) ; comment

5. Vytvoření nového tématu s jedním oddílem

Nejprve si popíšeme základy práce se jmenným prostorem jackdaw.admin, v němž jsou deklarovány funkce a makra určená pro administraci Apache Kafky. První demonstrační příklad slouží pro vytvoření tématu (topic), které má jen jediný oddíl (partition). Nejprve se zkonstruuje objekt typu AdminClient, a to specifikací portu, na kterém běží broker (viz též druhou kapitolu):

(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] ... ... ... )

Posléze se použije funkce create-topics! pro vytvoření nového tématu (či témat). Této funkci se předá objekt získaný předchozím voláním a sekvence (například seznam) popisů témat. Nejdůležitějšími atributy je jméno nového tématu a počet oddílů:

(ja/create-topics! client [{:topic-name "test1" :partition-count 1 :replication-factor 1 :topic-config {"cleanup.policy" "compact"}}])))

Poznámka: vykřičník na konci jména funkce značí, že tato funkce má vedlejší efekt (ono vytvoření nového tématu).

Úplný kód dnešního prvního plnohodnotného demonstračního příkladu vypadá následovně:

(ns topic-constructor.core (:require [jackdaw.admin :as ja])) (defn -main [] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/create-topics! client [{:topic-name "test1" :partition-count 1 :replication-factor 1 :topic-config {"cleanup.policy" "compact"}}])))

Příklad spustíme:

$ lein run SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

V logu brokera by se měly objevit informace o vytvoření nového tématu:

[2020-11-27 14:43:08,474] INFO Created log for partition test1-0 in /tmp/kafka-logs/test2-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.4-IV1, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager) [2020-11-27 14:41:48,973] INFO [Partition test1-0 broker=0] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition) [2020-11-27 14:41:48,974] INFO [Partition test1-0 broker=0] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition) [2020-11-27 14:41:48,974] INFO [Partition test1-0 broker=0] test1-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)

Navíc by se v adresáři /tmp/kafka-logs/ měl objevit nový podadresář nazvaný test1–0. Toto jméno je složeno ze jména tématu a čísla oddílu:

$ ls -l /tmp/kafka-logs/ total 16 -rw-rw-r--. 1 ptisnovs ptisnovs 0 Nov 27 14:39 cleaner-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 4 Nov 27 14:42 log-start-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 88 Nov 27 14:39 meta.properties -rw-rw-r--. 1 ptisnovs ptisnovs 14 Nov 27 14:42 recovery-point-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 14 Nov 27 14:42 replication-offset-checkpoint drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:41 test1-0

6. Korektní nastavení logování v projektu

Při spuštění předchozího příkladu se zobrazilo varování o tom, že není správně nastavena konfigurace logů. To napravíme v dnešním druhém demonstračním příkladu přidáním další závislosti do souboru project.clj:

(defproject topic-constructor "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" :url "https://www.eclipse.org/legal/epl-2.0/"} :dependencies [[org.clojure/clojure "1.10.1"] [fundingcircle/jackdaw "0.7.6"] [ch.qos.logback/logback-classic "1.1.3"]] :main ^:skip-aot topic-constructor.core :target-path "target/%s" :profiles {:uberjar {:aot :all :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})

Samotný zdrojový kód příkladu se liší v jediné maličkosti – je vytvořeno téma s odlišným názvem:

(ns topic-constructor.core (:require [jackdaw.admin :as ja])) (defn -main [] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/create-topics! client [{:topic-name "test2" :partition-count 1 :replication-factor 1 :topic-config {"cleanup.policy" "compact"}}])))

Po spuštění bude příklad již velmi „ukecaný“, a to právě díky nastavení logování:

$ lein run 21:17:16.076 [main] INFO o.a.k.c.admin.AdminClientConfig - AdminClientConfig values: bootstrap.servers = [localhost:9092] client.dns.lookup = default client.id = connections.max.idle.ms = 300000 metadata.max.age.ms = 300000 metric.reporters = [] ... ... ... 21:17:16.381 [kafka-admin-client-thread | adminclient-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent 21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received 21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency 21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.a.kafka.common.network.Selector - [AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 0 21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Completed connection to node 0. Fetching API versions. 21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Initiating API versions fetch from node 0.

Nově zkonstruované téma je po spuštění příkladu zmíněno i v logu brokera:

[2020-11-27 14:43:08,474] INFO Created log for partition test2-0 in /tmp/kafka-logs/test2-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.4-IV1, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager) [2020-11-27 14:43:08,475] INFO [Partition test2-0 broker=0] No checkpointed highwatermark is found for partition test2-0 (kafka.cluster.Partition) [2020-11-27 14:43:08,475] INFO [Partition test2-0 broker=0] Log loaded for partition test2-0 with initial high watermark 0 (kafka.cluster.Partition) [2020-11-27 14:43:08,475] INFO [Partition test2-0 broker=0] test2-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)

A po pohledu do adresáře s daty brokera se objevil nový adresář pro nově zkonstruované téma:

$ ls -l /tmp/kafka-logs/ total 16 -rw-rw-r--. 1 ptisnovs ptisnovs 0 Nov 27 14:39 cleaner-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 4 Nov 27 14:43 log-start-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 88 Nov 27 14:39 meta.properties -rw-rw-r--. 1 ptisnovs ptisnovs 14 Nov 27 14:43 recovery-point-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 24 Nov 27 14:43 replication-offset-checkpoint drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:41 test1-0 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:43 test2-0

7. Vytvoření tématu s deseti oddíly

Nyní si předchozí dva demonstrační příklady nepatrně upravíme a vytvoříme nové téma, ovšem rozdělené do deseti oddílů. Takto nakonfigurované téma nám umožní snadný paralelní přístup ke zprávám (událostem). Povšimněte si, že hodnota replication-factor je stále nastavena na jedničku, protože pracujeme pouze s lokálně běžící Kafkou a nikoli (před)produkčním systémem:

(ns topic-constructor.core (:require [jackdaw.admin :as ja])) (defn -main [] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/create-topics! client [{:topic-name "test3" :partition-count 10 :replication-factor 1 :topic-config {"cleanup.policy" "compact"}}])))

Nyní by měl adresář s daty brokera vypadat následovně:

$ ls -l /tmp/kafka-logs/ total 16 -rw-rw-r--. 1 ptisnovs ptisnovs 0 Nov 27 14:39 cleaner-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 4 Nov 27 14:43 log-start-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 88 Nov 27 14:39 meta.properties -rw-rw-r--. 1 ptisnovs ptisnovs 14 Nov 27 14:43 recovery-point-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 24 Nov 27 14:43 replication-offset-checkpoint drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:41 test1-0 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:43 test2-0 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-0 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-1 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-2 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-3 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-4 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-5 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-6 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-7 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-8 drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-9

8. Výpis informací o konfiguraci brokeru

V dalším kroku si ukážeme, jak lze vypsat základní informace o konfiguraci brokeru. K tomuto účelu se používá funkce:

(jackdaw.admin/get-broker-config client id)

Vidíme, že této funkci musíme předat jak instanci klienta (to již známe):

(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"}) ... ... ...

Tak i jednoznačný identifikátor brokera. Ten přečteme takto:

(jackdaw.admin/describe-cluster client)

Přičemž z vrácené mapy musíme přečíst pouze ono ID, například s využitím threading makra:

(->> configs :controller :id)]

Celý kód příkladu bude vypadat následovně:

(ns get-broker-config.core (:require [jackdaw.admin :as ja]) (:require [clojure.pprint :as pp])) (defn -main [& args] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"}) configs (ja/describe-cluster client) id (->> configs :controller :id)] (pp/pprint (ja/get-broker-config client id))))

Výsledek může vypadat následovně:

{"ssl.enabled.protocols" {:name "ssl.enabled.protocols", :value "TLSv1.2,TLSv1.1,TLSv1", :default? true, :read-only? false, :sensitive? false}, "advertised.host.name" {:name "advertised.host.name", :value nil, :default? true, :read-only? true, :sensitive? false}, "socket.request.max.bytes" ... ... ... {:name "log.preallocate", :value "false", :default? true, :read-only? false, :sensitive? false}, "log.roll.ms" {:name "log.roll.ms", :value nil, :default? true, :read-only? false, :sensitive? false}, "password.encoder.cipher.algorithm" {:name "password.encoder.cipher.algorithm", :value "AES/CBC/PKCS5Padding", :default? true, :read-only? true, :sensitive? false}}

9. Přečtení a zobrazení základních informací o Kafka clusteru

Získat a dále zobrazit (či jinak zpracovat) je možné i o Kafka clusteru, a to poměrně jednoduchým způsobem – zavoláním funkce jackdaw.admin/describe-cluster, s níž jsme se mimochodem seznámili v předchozí kapitole:

(ns describe-cluster.core (:require [jackdaw.admin :as ja] [clojure.pprint :as pp])) (defn -main [& args] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-cluster client))))

Výsledek by měl (pro lokálně běžící cluster) vypadat takto:

{:cluster-id "hOg8auCqRcK23gy5b8Ekow", :controller {:host "localhost", :port 9092, :id 0, :rack nil}, :nodes [{:host "localhost", :port 9092, :id 0, :rack nil}]}

Poznámka: povšimněte si, že jak všechny konfigurace, tak i vrácená data mají podobu klasické mapy programovacího jazyka Clojure, i když se interně může jednat o Properties (Javovská třída).

10. Popis témat, ke kterým má klient přístup

Rozhraní AdminClient nám umožňuje získat i mapu se základním popisem témat, ke kterým má klient přístup. Použije se přitom funkce nazvaná jackdaw.admin/describe-topics, které se v tom nejjednodušším případě předá jen informace o klientovi:

(ns describe-topics.core (:require [jackdaw.admin :as ja] [clojure.pprint :as pp])) (defn -main [& args] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-topics client))))

Vrátí se mapa, jejímiž klíči jsou jména témat (řetězce, nikoli keywords) a hodnotami pak informace o tématech. Povšimněte si i podrobnějších informací o replikách (kde je téma uloženo):

{"test1" {:is-internal? false, :partition-info ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 0, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})}, "test2" {:is-internal? false, :partition-info ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 0, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})}, "test3" {:is-internal? false, :partition-info ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 0, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 1, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 2, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 3, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 4, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 5, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 6, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 7, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 8, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 9, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})}}

11. Získání popisu vybraných témat

V případě, že budeme chtít získat popis pouze pro vybrané téma nebo několik témat, je možné funkci jackdaw.admin/describe-topics ve druhém parametru předat sekvenci (například vektor) se jmény témat. A podobně jako v jiných případech, i zde se nepředávají „holé“ řetězce, ale mapy s klíčem :topic-name:

(ns describe-topics.core (:require [jackdaw.admin :as ja] [clojure.pprint :as pp])) (defn -main [& args] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-topics client [{:topic-name "test1"} {:topic-name "test3"}]))))

Nyní budou ve vrácené mapě informace pouze o vybraných tématech „test1“ a „test3“:

{"test1" {:is-internal? false, :partition-info ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 0, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})}, "test3" {:is-internal? false, :partition-info ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 0, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 1, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 2, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 3, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 4, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 5, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 6, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 7, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 8, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]} {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}], :leader {:host "localhost", :port 9092, :id 0, :rack nil}, :partition 9, :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})}}

12. Konfigurace jednotlivých témat

Poslední demonstrační příklad, který se týká čtení administračních údajů, vypíše konfiguraci všech témat popř. konfiguraci témat vybraných. Použijeme přitom funkci jackdaw.admin/describe-topics-configs, která se volá následujícím způsobem:

(ns describe-topics-configs.core (:require [jackdaw.admin :as ja] [clojure.pprint :as pp])) (defn -main [& args] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (pp/pprint (ja/describe-topics-configs client [{:topic-name "test1"} {:topic-name "test3"}]))))

Výsledkem je opět mapa, tentokrát však složitěji strukturovaná, protože o každém tématu obsahuje všechny konfigurační údaje:

{{:name "test1", :type :config-resource/topic} {"min.insync.replicas" {:name "min.insync.replicas", :value "1", :default? true, :read-only? false, :sensitive? false}, "follower.replication.throttled.replicas" {:name "follower.replication.throttled.replicas", :value "", :default? true, :read-only? false, :sensitive? false}, "flush.messages" {:name "flush.messages", :value "9223372036854775807", :default? true, :read-only? false, :sensitive? false}, "leader.replication.throttled.replicas" {:name "leader.replication.throttled.replicas", :value "", :default? true, :read-only? false, :sensitive? false}, "segment.bytes" {:name "segment.bytes", :value "1073741824", :default? false, :read-only? false, :sensitive? false}, "flush.ms" {:name "flush.ms", :value "9223372036854775807", :default? true, :read-only? false, :sensitive? false}, "unclean.leader.election.enable" {:name "unclean.leader.election.enable", :value "false", :default? true, :read-only? false, :sensitive? false}, "retention.bytes" {:name "retention.bytes", :value "-1", :default? true, :read-only? false, :sensitive? false}, "cleanup.policy" {:name "cleanup.policy", :value "compact", :default? false, :read-only? false, :sensitive? false}, "file.delete.delay.ms" {:name "file.delete.delay.ms", :value "60000", :default? true, :read-only? false, :sensitive? false}, "segment.index.bytes" {:name "segment.index.bytes", :value "10485760", :default? true, :read-only? false, :sensitive? false}, "index.interval.bytes" {:name "index.interval.bytes", :value "4096", :default? true, :read-only? false, :sensitive? false}, "delete.retention.ms" {:name "delete.retention.ms", :value "86400000", :default? true, :read-only? false, :sensitive? false}, "segment.ms" {:name "segment.ms", :value "604800000", :default? true, :read-only? false, :sensitive? false}, "min.compaction.lag.ms" {:name "min.compaction.lag.ms", :value "0", :default? true, :read-only? false, :sensitive? false}, "segment.jitter.ms" {:name "segment.jitter.ms", :value "0", :default? true, :read-only? false, :sensitive? false}, "message.timestamp.difference.max.ms" {:name "message.timestamp.difference.max.ms", :value "9223372036854775807", :default? true, :read-only? false, :sensitive? false}, "max.compaction.lag.ms" {:name "max.compaction.lag.ms", :value "9223372036854775807", :default? true, :read-only? false, :sensitive? false}, "preallocate" {:name "preallocate", :value "false", :default? true, :read-only? false, :sensitive? false}, "message.downconversion.enable" {:name "message.downconversion.enable", :value "true", :default? true, :read-only? false, :sensitive? false}, "max.message.bytes" {:name "max.message.bytes", :value "1000012", :default? true, :read-only? false, :sensitive? false}, "message.timestamp.type" {:name "message.timestamp.type", :value "CreateTime", :default? true, :read-only? false, :sensitive? false}, "retention.ms" {:name "retention.ms", :value "604800000", :default? true, :read-only? false, :sensitive? false}, "min.cleanable.dirty.ratio" {:name "min.cleanable.dirty.ratio", :value "0.5", :default? true, :read-only? false, :sensitive? false}, "message.format.version" {:name "message.format.version", :value "2.4-IV1", :default? true, :read-only? false, :sensitive? false}, "compression.type" {:name "compression.type", :value "producer", :default? true, :read-only? false, :sensitive? false}}, {:name "test3", :type :config-resource/topic} {"min.insync.replicas" {:name "min.insync.replicas", :value "1", :default? true, :read-only? false, :sensitive? false}, "follower.replication.throttled.replicas" {:name "follower.replication.throttled.replicas", :value "", :default? true, :read-only? false, :sensitive? false}, "flush.messages" {:name "flush.messages", :value "9223372036854775807", :default? true, :read-only? false, :sensitive? false}, "leader.replication.throttled.replicas" {:name "leader.replication.throttled.replicas", :value "", :default? true, :read-only? false, :sensitive? false}, "segment.bytes" {:name "segment.bytes", :value "1073741824", :default? false, :read-only? false, :sensitive? false}, "flush.ms" {:name "flush.ms", :value "9223372036854775807", :default? true, :read-only? false, :sensitive? false}, "unclean.leader.election.enable" {:name "unclean.leader.election.enable", :value "false", :default? true, :read-only? false, :sensitive? false}, "retention.bytes" {:name "retention.bytes", :value "-1", :default? true, :read-only? false, :sensitive? false}, "cleanup.policy" {:name "cleanup.policy", :value "compact", :default? false, :read-only? false, :sensitive? false}, "file.delete.delay.ms" {:name "file.delete.delay.ms", :value "60000", :default? true, :read-only? false, :sensitive? false}, "segment.index.bytes" {:name "segment.index.bytes", :value "10485760", :default? true, :read-only? false, :sensitive? false}, "index.interval.bytes" {:name "index.interval.bytes", :value "4096", :default? true, :read-only? false, :sensitive? false}, "delete.retention.ms" {:name "delete.retention.ms", :value "86400000", :default? true, :read-only? false, :sensitive? false}, "segment.ms" {:name "segment.ms", :value "604800000", :default? true, :read-only? false, :sensitive? false}, "min.compaction.lag.ms" {:name "min.compaction.lag.ms", :value "0", :default? true, :read-only? false, :sensitive? false}, "segment.jitter.ms" {:name "segment.jitter.ms", :value "0", :default? true, :read-only? false, :sensitive? false}, "message.timestamp.difference.max.ms" {:name "message.timestamp.difference.max.ms", :value "9223372036854775807", :default? true, :read-only? false, :sensitive? false}, "max.compaction.lag.ms" {:name "max.compaction.lag.ms", :value "9223372036854775807", :default? true, :read-only? false, :sensitive? false}, "preallocate" {:name "preallocate", :value "false", :default? true, :read-only? false, :sensitive? false}, "message.downconversion.enable" {:name "message.downconversion.enable", :value "true", :default? true, :read-only? false, :sensitive? false}, "max.message.bytes" {:name "max.message.bytes", :value "1000012", :default? true, :read-only? false, :sensitive? false}, "message.timestamp.type" {:name "message.timestamp.type", :value "CreateTime", :default? true, :read-only? false, :sensitive? false}, "retention.ms" {:name "retention.ms", :value "604800000", :default? true, :read-only? false, :sensitive? false}, "min.cleanable.dirty.ratio" {:name "min.cleanable.dirty.ratio", :value "0.5", :default? true, :read-only? false, :sensitive? false}, "message.format.version" {:name "message.format.version", :value "2.4-IV1", :default? true, :read-only? false, :sensitive? false}, "compression.type" {:name "compression.type", :value "producer", :default? true, :read-only? false, :sensitive? false}}}

Poznámka: jedná se o běžnou mapu jazyka Clojure, kterou je možné zpracovávat funkcemi ze standardní knihovny.

13. Vymazání tématu

Poslední funkcí ze jmenného prostoru jackdaw.admin, kterou si dnes popíšeme, je funkce určená pro smazání tématu popř. většího množství témat. Tato funkce se jmenuje delete-topics!, přičemž vykřičník na konci názvu opět znamená, že se jedná o funkci s vedlejším efektem (která uvnitř mění „stav světa“). Příklad použití – smazání dvojice témat nazvaných „test1“ a „test3“:

(ns topic-destructor.core (:require [jackdaw.admin :as ja])) (defn -main [] (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/delete-topics! client [{:topic-name "test1"} {:topic-name "test3"}])))

Po spuštění příkladu příkazem lein run by se v logu brokeru měla objevit informace o tom, že témata byla přejmenována a bylo naplánováno jejich smazání:

... ... ... [2020-11-27 19:52:11,127] INFO Log for partition test3-7 is renamed to /tmp/kafka-logs/test3-7.d3fb75bbbcab4f5f95d4b818c3362533-delete and is scheduled for deletion (kafka.log.LogManager) [2020-11-27 19:52:11,130] INFO Log for partition test3-1 is renamed to /tmp/kafka-logs/test3-1.f351358e5f294ce088335e08611a4304-delete and is scheduled for deletion (kafka.log.LogManager) [2020-11-27 19:52:11,133] INFO Log for partition test3-4 is renamed to /tmp/kafka-logs/test3-4.51da619c145f4c40a1f6c36b5a16dbdd-delete and is scheduled for deletion (kafka.log.LogManager) ... ... ...

Přejmenování bude ihned viditelné ve výpisu adresáře /tmp/kafka-logs:

total 20 -rw-rw-r--. 1 ptisnovs ptisnovs 4 Nov 27 19:52 cleaner-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 4 Nov 27 19:52 log-start-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 88 Nov 27 19:45 meta.properties -rw-rw-r--. 1 ptisnovs ptisnovs 14 Nov 27 19:52 recovery-point-offset-checkpoint -rw-rw-r--. 1 ptisnovs ptisnovs 115 Nov 27 19:52 replication-offset-checkpoint drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:45 test1-0.640fd381aef54650aeac15ca775248b8-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-0.a235153017ca4d9facdc380fbd6cdd95-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-1.f351358e5f294ce088335e08611a4304-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-2.c529de213c4f4c1e8374f04fc167af53-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-3.fb2b771909964eb8bc4448ed62410717-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-4.51da619c145f4c40a1f6c36b5a16dbdd-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-5.8788f5813d3d476bb35fe70bddddd52a-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-6.cfb995d4c66c45f0aa4edd313f4eb202-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-7.d3fb75bbbcab4f5f95d4b818c3362533-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-8.318bdc9af16f44b4a454ba2716dc6090-delete drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-9.4efc612d790540deb53a0cb5365f0a91-delete

Po určité době dojde ke skutečnému vymazání (přejmenovaných) témat:

[2020-11-27 19:53:11,180] INFO Deleted log /tmp/kafka-logs/test1-0.640fd381aef54650aeac15ca775248b8-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment) [2020-11-27 19:53:11,181] INFO Deleted offset index /tmp/kafka-logs/test1-0.640fd381aef54650aeac15ca775248b8-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment) [2020-11-27 19:53:11,183] INFO Deleted time index /tmp/kafka-logs/test1-0.640fd381aef54650aeac15ca775248b8-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment) [2020-11-27 19:53:11,183] INFO Deleted log for partition test1-0 in /tmp/kafka-logs/test1-0.640fd381aef54650aeac15ca775248b8-delete. (kafka.log.LogManager)

14. Vytvoření zprávy s jejím posláním do vybraného tématu (producent zpráv)

Nyní se již dostáváme k popisu funkcí a maker deklarovaných ve jmenném prostoru jackdaw.client. Nalezneme zde především funkci nazvanou produce!, která slouží k poslání zprávy do zvoleného tématu (a pochopitelně o funkci měnící stav světa; ostatně právě proto má ve svém jméně vykřičník). Nejdříve je však nutné producenta zpráv nakonfigurovat, a to s využitím mapy, která je interně převedena na Properties (jednotlivé vlastnosti budou podrobněji popsány příště):

{"bootstrap.servers" "localhost:9092" "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" "value.serializer" "org.apache.kafka.common.serialization.StringSerializer" "acks" "all" "client.id" "foo"})

Dále je nutné vytvořit instanci producenta. A zde narazíme na malý problém – na konci činnosti aplikace je nutné producenta uzavřít. K otevření s automatickým uzavřením slouží standardní makro nazvané with-open:

user=> (doc with-open) ------------------------- clojure.core/with-open ([bindings & body]) Macro bindings => [name init ...] Evaluates body in a try expression with names bound to the values of the inits, and a finally clause that calls (.close name) on each name in reverse order. nil

Použití tohoto makra je snadné a přímočaré (navíc pro Clojure zcela idiomatické):

(with-open [producer (jc/producer producer-config)] ... ... ... )

Jakmile máme k dispozici objekt představující producenta, můžeme do tématu poslat zprávu s využitím již výše zmíněné funkce produce!, které se předá téma, klíč a vlastní text zprávy (data):

(jc/produce! producer {:topic-name "test1"} "1" "Hello, Kafka!")

Návratovou hodnotou je future s metainformacemi o poslané zprávě.

Úplný zdrojový kód demonstračního příkladu, který zprávu pošle a zobrazí výsledek, je zobrazen pod tímto odstavcem:

(ns produce-messages-1.core (:require [jackdaw.client :as jc] [clojure.pprint :as pp])) (def producer-config {"bootstrap.servers" "localhost:9092" "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" "value.serializer" "org.apache.kafka.common.serialization.StringSerializer" "acks" "all" "client.id" "foo"}) (defn -main [& args] (with-open [producer (jc/producer producer-config)] (let [record-metadata (jc/produce! producer {:topic-name "test1"} "1" "Hello, Kafka!")] (pp/pprint @record-metadata))))

15. Producent většího množství zpráv, nastavení klíče

Do zvoleného tématu můžeme poslat větší množství zpráv, přičemž není zapotřebí čekat na potvrzení, že byla zpráva skutečně zpracována (záleží ovšem na konkrétním použití Kafky). Pokud se používá téma s větším množstvím oddílů, což je v našem případě téma „test3“, je nutné se zamyslet nad obsahem klíče, neboť ten je zahashován a tato hodnota je použita pro volbu oddílu. Příklad s využitím klíče obsahujícího řetězec s lokálním číslem zprávy je uveden níže:

(ns produce-messages-3.core (:require [jackdaw.client :as jc] [clojure.pprint :as pp])) (def producer-config {"bootstrap.servers" "localhost:9092" "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" "value.serializer" "org.apache.kafka.common.serialization.StringSerializer" "acks" "all" "client.id" "foo"}) (defn -main [& args] (with-open [producer (jc/producer producer-config)] (doseq [i (range 1 101)] (let [key (str i) value (str "Message #" i)] (println "Publishing message with key '" key "' and value '" value "'") (let [record-metadata (jc/produce! producer {:topic-name "test3"} key value)] (pp/pprint @record-metadata))) )))

Příklad metadat, z nichž je zřejmé, že zprávy byly skutečně distribuovány do různých oddílů:

Publishing message with key ' 1 ' and value ' Message #1 ' {:topic-name "test3", :partition 9, :timestamp 1606555678845, :offset 0, :serialized-key-size 1, :serialized-value-size 10} Publishing message with key ' 2 ' and value ' Message #2 ' {:topic-name "test3", :partition 8, :timestamp 1606555678885, :offset 0, :serialized-key-size 1, :serialized-value-size 10} Publishing message with key ' 3 ' and value ' Message #3 ' {:topic-name "test3", :partition 3, :timestamp 1606555678891, :offset 0, :serialized-key-size 1, :serialized-value-size 10}

16. Jednoduchý konzument zpráv

Zprávy uložené do vybraného tématu je pochopitelně možné číst – konzumovat. Ukažme si tedy implementaci velmi jednoduchého (až primitivního) konzumenta zpráv; přičemž podrobnější informace budou uvedeny příště. Konzument náleží do skupiny „group-A“ a v nekonečné smyčce čeká na zprávy, které ihned vypisuje na standardní výstup:

(ns consume-messages-1.core (:require [jackdaw.client :as jc] [jackdaw.client.log :as jl] [clojure.pprint :as pp])) (def consumer-config {"bootstrap.servers" "localhost:9092" "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "group.id" "group-A"}) (defn -main [& args] (with-open [consumer (-> (jc/consumer consumer-config) (jc/subscribe [{:topic-name "test1"}]))] (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)] (println "key: " key) (println "value: " value) (println "partition: " partition) (println "timestamp: " timestamp) (println "offset: " offset))))

17. Obsah následující části článku

Posílání a konzumace zpráv není ve skutečnosti tak přímočará, jak by se možná mohlo z demonstračních příkladů zdát. Z tohoto důvodu se tímto tématem budeme zabývat i příště. Ovšem mnohem důležitější je skutečný „streaming“, který knihovna Jackdaw taktéž podporuje. Tímto tématem se tedy budeme taktéž zabývat, a to poměrně podrobně.

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Clojure byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-):

# Projekt Stručný popis Cesta 1 kafka-repl prázdný projekt připravený pro interaktivní práci v REPLu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/kafka-repl 2 topic-constructor vytvoření nového tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor 3 topic-constructor-logger vytvoření nového tématu; konfigurace logování https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor-logger 4 topic-constructor-10-partitions vytvoření nového tématu s deseti oddíly https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor-10-partitions 5 topic-destructor vymazání tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-destructor 6 get-broker-config zobrazení konfigurace brokera https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/get-broker-config 7 describe-cluster zobrazení aktuální konfigurace clusteru https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-cluster 8 describe-all-topics zobrazení všech dostupných témat https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-all-topics 9 describe-topics zobrazení informací o vybraných tématech https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-topics 10 describe-topics-configs zobrazení podrobnějších informací o vybraných tématech https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-topics-configs 11 produce-messages-1 vytvoření jediné zprávy https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-1 12 produce-messages-2 vytvoření 100 zpráv https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-2 13 produce-messages-3 vytvoření 100 zpráv se specifikací klíče https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-3 14 consume-messages-1 konzument zpráv z vybraného tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/consume-messages-1

