Hlavní navigace

Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw

1. 12. 2020
Doba čtení: 51 minut

Sdílet

S nástrojem Apache Kafka jsme se již na stránkách Rootu ve stručnosti seznámili. Dnes si ukážeme, jak lze s Kafkou komunikovat z programovacího jazyka Clojure s využitím knihovny Jackdaw.

Obsah

1. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw

2. Instalace a spuštění Kafky

3. Stažení všech knihoven potřebných pro volání funkcí Apache Kafky z jazyka Clojure

4. Interaktivní práce s Kafkou z REPLu popř. z textového editoru/IDE

5. Vytvoření nového tématu s jedním oddílem

6. Korektní nastavení logování v projektu

7. Vytvoření tématu s deseti oddíly

8. Výpis informací o konfiguraci brokeru

9. Přečtení a zobrazení základních informací o Kafka clusteru

10. Popis témat, ke kterým má klient přístup

11. Získání popisu vybraných témat

12. Konfigurace jednotlivých témat

13. Vymazání tématu

14. Vytvoření zprávy s jejím posláním do vybraného tématu (producent zpráv)

15. Producent většího množství zpráv, nastavení klíče

16. Jednoduchý konzument zpráv

17. Obsah následující části článku

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu o programovacím jazyku Clojure

20. Odkazy na Internetu

1. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw

S užitečným a populárním nástrojem Apache Kafka jsme se již na stránkách Rootu několikrát setkali, a to například v článcích Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách a Apache Kafka: distribuovaná streamovací platforma. Pro tuto asi nejznámější streamovací platformu existují knihovny realizující rozhraní pro různé programovací jazyky a jejich ekosystémy. Tato rozhraní jsou vypsána v následující tabulce:

# Jazyk/platforma
1 C/C++
2 Python
3 Go/Golang Go/Golang
4 Erlang
5 .NET
6 Ruby
7 Node.js
8 Perl
9 PHP
10 Rust
11 Storm
12 Scala (DSL jazyk)
13 Clojure
14 Clojure
15 Swift
   
16 CLI (stdin/stdout)

Dnes se budeme zabývat rozhraním určeným pro programovací jazyk Clojure. Toto rozhraní se jmenuje Jackdaw a umožňuje „funkcionální“ přístup k datům (zprávám, událostem) uloženým v Kafce. Využít je například možné takzvané transducery atd.

Obrázek 1: Logo nástroje Apache Kafka, kterému se budeme dnes věnovat.

Nejprve si popíšeme funkce a makra deklarovaná ve jmenných prostorech jackdaw.admin (administrace Kafky) a jackdaw.client (pro implementaci producentů a konzumentů zpráv). V navazujícím článku si ukážeme i použití již zmíněných transducerů, specifikaci skutečného streamu atd. – tedy důvodů, proč vůbec o použití kombinace Clojure a Apache Kafka uvažovat (a právě specifikace streamů je v Clojure jak přímočará, tak i velmi nasaditelná).

2. Instalace a spuštění Kafky

V případě, že je na počítači nainstalováno JRE (Java Runtime Environment), je instalace Kafky pro testovací účely triviální. Tarball s instalací poslední stabilní verze Kafky lze získat z adresy https://www.apache.org/dyn/clo­ser.cgi?path=/kafka/2.6.0/kaf­ka2.13–2.6.0.tgz. Stažení a rozbalení tarballu se provede následovně:

$ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz
$ tar -xzf kafka_2.13-2.6.0.tgz
$ cd kafka_2.13-2.6.0

Po rozbalení získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin). Pro spuštění Zookeepera a brokerů je zapotřebí mít nainstalovánu již zmíněnou JRE (Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).

Prozatím nebudeme žádné další nastavení ani žádné další nástroje potřebovat, ovšem v navazujícím článku se zmíníme o některých užitečných utilitách určených pro administraci a sledování (monitorování) Kafky.

Obrázek 2: Sledování činnosti brokeru přes standardní nástroj JConsole.

Po (doufejme že úspěšné) instalaci Kafky již můžeme spustit ZooKeeper a jednu instanci brokera (a to přesně v tomto pořadí, tedy nejdřívě ZooKeeper!). Konfigurace ZooKeepera je uložena v souboru config/zookeeper.properties a zajímat nás budou především tyto volby – adresář, kam ZooKeeper ukládá svoje data, port, který použijí brokeři a omezení počtu připojení jednoho klienta v daný okamžik:

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
Poznámka: hodnota maxClientCnxns v tomto případě neznamená, že by se nemohly připojit žádní klienti, ale že je vypnutý mechanismus, který zabezpečuje infrastrukturu Kafky před některými typy DOS útoků. Na disku, kde je adresář dataDir by také mělo být dostatek místa, protože ZooKeeper v některých případech mívá větší nároky. Další informace lze nalézt na stránce https://zookeeper.apache.or­g/doc/r3.5.6/.

Nyní již můžeme Zookeepera spustit:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
 
[2020-01-20 17:00:07,823] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2020-01-20 17:00:07,825] WARN config/zookeeper.properties is relative. Prepend ./ to indicate that you're sure! (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2020-01-20 17:00:07,827] INFO clientPortAddress is 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
...
...
[2020-01-20 17:00:07,947] INFO Using checkIntervalMs=60000 maxPerMinute=10000 (org.apache.zookeeper.server.ContainerManager)
[2020-01-20 17:00:26,978] INFO Creating new log file: log.1 (org.apache.zookeeper.server.persistence.FileTxnLog)

Konfigurace jednoho brokera je uložená v souboru config/server.properties. Samotný konfigurační soubor obsahuje několik sekcí:

  1. Port, na kterém broker naslouchá, jeho ID, počet použitých vláken pro IO operace a počet vláken pro komunikaci.
  2. Velikost bufferů, maximální povolená velikost požadavků (což omezuje velikost zprávy) atd.
  3. Nastavení počtu partitions
  4. Nastavení retence dat
  5. Připojení k Zookeeperovi

Pro testovací účely změníme především umístění souborů s logy (tedy s jednotlivými partitions):

broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
 
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
 
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
 
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
Poznámka: i velikost adresáře log.dirs roste, a to mnohdy velmi rychle (typicky se alokuje 20MB pro každou partition), takže se vyplatí sledovat příslušné metriky.

Spuštění jednoho brokera vypadá jednoduše:

$ bin/kafka-server-start.sh config/server.properties

Alternativně je možné Zookeepera i Kafku (jednu instanci brokera) spustit v Dockeru:

$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
Poznámka: předchozí nastavení předpokládá, že současně na stejném stroji nepoběží žádná další instance Kafky ani Zookeepera. Pokud budete potřebovat spustit větší množství brokerů, je nutné minimálně změnit mapování portů (přepínače -p).

Po spuštění jak Zookepera, tak i brokera Kafky se podívejte na obsah adresáře /tmp/kafka-logs/, který byl nakonfigurován v rámci předchozího textu. Prozatím by se v tomto adresáři mělo vyskytovat pouze několik souborů, ovšem žádné adresáře (každý adresář odpovídá jedné partition určeného topicu, popř. právě smazané partition – to uvidíme v dalším textu):

$ ls -l /tmp/kafka-logs/
 
total 4
-rw-rw-r--. 1 ptisnovs ptisnovs  0 Nov 27 14:39 cleaner-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  0 Nov 27 14:39 log-start-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs 88 Nov 27 14:39 meta.properties
-rw-rw-r--. 1 ptisnovs ptisnovs  0 Nov 27 14:39 recovery-point-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  0 Nov 27 14:39 replication-offset-checkpoint

3. Stažení všech knihoven potřebných pro volání funkcí Apache Kafky z jazyka Clojure

Nyní je zapotřebí získat všechny knihovny, které jsou přímo či nepřímo používány pro volání funkcí Apache Kafky z programovacího jazyka Clojure. Využijeme přitom nástroj Leiningen, který je úzce navázán na Maven. Nejprve vytvoříme kostru nového projektu příkazem:

$ lein new app topic-constructor
 
Generating a project called topic-constructor based on the 'app' template.

V nově vytvořeném adresáři topic-constuctor se nachází soubor project.clj, jenž mj. obsahuje i seznam závislostí:

(defproject topic-constructor "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
            :url "https://www.eclipse.org/legal/epl-2.0/"}
  :dependencies [[org.clojure/clojure "1.10.1"]]
  :main ^:skip-aot topic-constructor.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all
                       :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})

Tento soubor upravíme do následující podoby – přidáme závislost na knihovně fundingcircle/jackdaw:

(defproject topic-constructor "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
            :url "https://www.eclipse.org/legal/epl-2.0/"}
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [fundingcircle/jackdaw "0.7.6"]]
  :main ^:skip-aot topic-constructor.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all
                       :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})

Příkazem lein deps se zahájí stažení všech doposud nenainstalovaných závislostí:

$ lein deps
 
Retrieving fundingcircle/jackdaw/0.7.6/jackdaw-0.7.6.pom from clojars
Retrieving aleph/aleph/0.4.6/aleph-0.4.6.pom from clojars
Retrieving manifold/manifold/0.1.8/manifold-0.1.8.pom from clojars
Retrieving io/aleph/dirigiste/0.1.5/dirigiste-0.1.5.pom from central
Retrieving riddley/riddley/0.1.14/riddley-0.1.14.pom from clojars
Retrieving byte-streams/byte-streams/0.2.4/byte-streams-0.2.4.pom from clojars
Retrieving primitive-math/primitive-math/0.1.6/primitive-math-0.1.6.pom from clojars
Retrieving io/netty/netty-transport/4.1.25.Final/netty-transport-4.1.25.Final.pom from central
Retrieving io/netty/netty-parent/4.1.25.Final/netty-parent-4.1.25.Final.pom from central
Retrieving io/netty/netty-buffer/4.1.25.Final/netty-buffer-4.1.25.Final.pom from central
Retrieving io/netty/netty-common/4.1.25.Final/netty-common-4.1.25.Final.pom from central
Retrieving io/netty/netty-resolver/4.1.25.Final/netty-resolver-4.1.25.Final.pom from central
Retrieving io/netty/netty-transport-native-epoll/4.1.25.Final/netty-transport-native-epoll-4.1.25.Final.pom from central
Retrieving io/netty/netty-transport-native-unix-common/4.1.25.Final/netty-transport-native-unix-common-4.1.25.Final.pom from central
Retrieving io/netty/netty-codec/4.1.25.Final/netty-codec-4.1.25.Final.pom from central
Retrieving io/netty/netty-codec-http/4.1.25.Final/netty-codec-http-4.1.25.Final.pom from central
Retrieving io/netty/netty-handler/4.1.25.Final/netty-handler-4.1.25.Final.pom from central
Retrieving io/netty/netty-handler-proxy/4.1.25.Final/netty-handler-proxy-4.1.25.Final.pom from central
Retrieving io/netty/netty-codec-socks/4.1.25.Final/netty-codec-socks-4.1.25.Final.pom from central
Retrieving io/netty/netty-resolver-dns/4.1.25.Final/netty-resolver-dns-4.1.25.Final.pom from central
Retrieving io/netty/netty-codec-dns/4.1.25.Final/netty-codec-dns-4.1.25.Final.pom from central
Retrieving clj-time/clj-time/0.15.1/clj-time-0.15.1.pom from clojars
Retrieving joda-time/joda-time/2.10/joda-time-2.10.pom from central
Retrieving danlentz/clj-uuid/0.1.9/clj-uuid-0.1.9.pom from clojars
Retrieving io/confluent/kafka-schema-registry-client/5.3.1/kafka-schema-registry-client-5.3.1.pom from confluent
Retrieving io/confluent/kafka-schema-registry-parent/5.3.1/kafka-schema-registry-parent-5.3.1.pom from confluent
Retrieving io/confluent/rest-utils-parent/5.3.1/rest-utils-parent-5.3.1.pom from confluent
Retrieving io/confluent/common/5.3.1/common-5.3.1.pom from confluent
Retrieving io/confluent/common-parent/5.3.1/common-parent-5.3.1.pom from confluent
Retrieving org/glassfish/jersey/jersey-bom/2.27/jersey-bom-2.27.pom from central
Retrieving net/java/jvnet-parent/4/jvnet-parent-4.pom from central
Retrieving org/apache/kafka/kafka-clients/5.3.1-ccs/kafka-clients-5.3.1-ccs.pom from confluent
Retrieving com/github/luben/zstd-jni/1.4.0-1/zstd-jni-1.4.0-1.pom from central
Retrieving org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.pom from central
Retrieving org/xerial/snappy/snappy-java/1.1.7.3/snappy-java-1.1.7.3.pom from central
Retrieving org/slf4j/slf4j-api/1.7.26/slf4j-api-1.7.26.pom from central
Retrieving org/slf4j/slf4j-parent/1.7.26/slf4j-parent-1.7.26.pom from central
Retrieving io/confluent/common-config/5.3.1/common-config-5.3.1.pom from confluent
Retrieving io/confluent/common-utils/5.3.1/common-utils-5.3.1.pom from confluent
Retrieving org/apache/zookeeper/zookeeper/3.4.14/zookeeper-3.4.14.pom from central
Retrieving org/slf4j/slf4j-parent/1.7.25/slf4j-parent-1.7.25.pom from central
Retrieving com/github/spotbugs/spotbugs-annotations/3.1.9/spotbugs-annotations-3.1.9.pom from central
Retrieving com/google/code/findbugs/jsr305/3.0.2/jsr305-3.0.2.pom from central
Retrieving jline/jline/0.9.94/jline-0.9.94.pom from central
Retrieving org/apache/yetus/audience-annotations/0.5.0/audience-annotations-0.5.0.pom from central
Retrieving org/apache/yetus/yetus-project/0.5.0/yetus-project-0.5.0.pom from central
Retrieving org/apache/apache/17/apache-17.pom from central
Retrieving io/netty/netty/3.10.6.Final/netty-3.10.6.Final.pom from central
Retrieving com/101tec/zkclient/0.10/zkclient-0.10.pom from central
Retrieving org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.pom from central
Retrieving org/slf4j/slf4j-parent/1.6.1/slf4j-parent-1.6.1.pom from central
Retrieving org/apache/zookeeper/zookeeper/3.4.8/zookeeper-3.4.8.pom from central
Retrieving io/netty/netty/3.7.0.Final/netty-3.7.0.Final.pom from central
Retrieving org/apache/avro/avro/1.8.1/avro-1.8.1.pom from central
Retrieving org/apache/avro/avro-parent/1.8.1/avro-parent-1.8.1.pom from central
Retrieving org/apache/avro/avro-toplevel/1.8.1/avro-toplevel-1.8.1.pom from central
Retrieving org/apache/apache/10/apache-10.pom from central
Retrieving org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.pom from central
Retrieving org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.pom from central
Retrieving com/thoughtworks/paranamer/paranamer/2.7/paranamer-2.7.pom from central
Retrieving com/thoughtworks/paranamer/paranamer-parent/2.7/paranamer-parent-2.7.pom from central
Retrieving org/codehaus/codehaus-parent/4/codehaus-parent-4.pom from central
Retrieving org/xerial/snappy/snappy-java/1.1.1.3/snappy-java-1.1.1.3.pom from central
Retrieving org/apache/commons/commons-compress/1.8.1/commons-compress-1.8.1.pom from central
Retrieving org/apache/commons/commons-parent/33/commons-parent-33.pom from central
Retrieving org/tukaani/xz/1.5/xz-1.5.pom from central
Retrieving org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.pom from central
Retrieving org/slf4j/slf4j-parent/1.7.7/slf4j-parent-1.7.7.pom from central
Retrieving io/confluent/kafka-avro-serializer/5.3.1/kafka-avro-serializer-5.3.1.pom from confluent
Retrieving com/fasterxml/jackson/core/jackson-databind/2.9.9.3/jackson-databind-2.9.9.3.pom from central
Retrieving com/fasterxml/jackson/jackson-base/2.9.9/jackson-base-2.9.9.pom from central
Retrieving com/fasterxml/jackson/jackson-bom/2.9.9/jackson-bom-2.9.9.pom from central
Retrieving com/fasterxml/jackson/jackson-parent/2.9.1.2/jackson-parent-2.9.1.2.pom from central
Retrieving com/fasterxml/oss-parent/34/oss-parent-34.pom from central
Retrieving com/fasterxml/jackson/core/jackson-annotations/2.9.0/jackson-annotations-2.9.0.pom from central
Retrieving com/fasterxml/jackson/jackson-parent/2.9.0/jackson-parent-2.9.0.pom from central
Retrieving com/fasterxml/oss-parent/28/oss-parent-28.pom from central
Retrieving com/fasterxml/jackson/core/jackson-core/2.9.9/jackson-core-2.9.9.pom from central
Retrieving org/apache/kafka/kafka-clients/2.3.1/kafka-clients-2.3.1.pom from central
Retrieving org/apache/kafka/kafka-streams/2.3.1/kafka-streams-2.3.1.pom from central
Retrieving org/apache/kafka/connect-json/2.3.1/connect-json-2.3.1.pom from central
Retrieving org/apache/kafka/connect-api/2.3.1/connect-api-2.3.1.pom from central
Retrieving com/fasterxml/jackson/core/jackson-databind/2.10.0/jackson-databind-2.10.0.pom from central
Retrieving com/fasterxml/jackson/jackson-base/2.10.0/jackson-base-2.10.0.pom from central
Retrieving com/fasterxml/jackson/jackson-bom/2.10.0/jackson-bom-2.10.0.pom from central
Retrieving com/fasterxml/jackson/jackson-parent/2.10/jackson-parent-2.10.pom from central
Retrieving com/fasterxml/oss-parent/38/oss-parent-38.pom from central
Retrieving com/fasterxml/jackson/core/jackson-annotations/2.10.0/jackson-annotations-2.10.0.pom from central
Retrieving com/fasterxml/jackson/core/jackson-core/2.10.0/jackson-core-2.10.0.pom from central
Retrieving com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.10.0/jackson-datatype-jdk8-2.10.0.pom from central
Retrieving com/fasterxml/jackson/module/jackson-modules-java8/2.10.0/jackson-modules-java8-2.10.0.pom from central
Retrieving org/rocksdb/rocksdbjni/5.18.3/rocksdbjni-5.18.3.pom from central
Retrieving org/clojure/data.fressian/0.2.1/data.fressian-0.2.1.pom from central
Retrieving org/fressian/fressian/0.6.6/fressian-0.6.6.pom from central
Retrieving org/clojure/core.cache/0.7.2/core.cache-0.7.2.pom from central
Retrieving org/clojure/data.priority-map/0.0.7/data.priority-map-0.0.7.pom from central
Retrieving io/aleph/dirigiste/0.1.5/dirigiste-0.1.5.jar from central
Retrieving io/netty/netty-transport-native-epoll/4.1.25.Final/netty-transport-native-epoll-4.1.25.Final.jar from central
Retrieving io/netty/netty-buffer/4.1.25.Final/netty-buffer-4.1.25.Final.jar from central
Retrieving io/netty/netty-transport/4.1.25.Final/netty-transport-4.1.25.Final.jar from central
Retrieving io/netty/netty-common/4.1.25.Final/netty-common-4.1.25.Final.jar from central
Retrieving io/netty/netty-transport-native-unix-common/4.1.25.Final/netty-transport-native-unix-common-4.1.25.Final.jar from central
Retrieving io/netty/netty-codec/4.1.25.Final/netty-codec-4.1.25.Final.jar from central
Retrieving io/netty/netty-codec-http/4.1.25.Final/netty-codec-http-4.1.25.Final.jar from central
Retrieving io/netty/netty-handler/4.1.25.Final/netty-handler-4.1.25.Final.jar from central
Retrieving io/netty/netty-handler-proxy/4.1.25.Final/netty-handler-proxy-4.1.25.Final.jar from central
Retrieving io/netty/netty-resolver/4.1.25.Final/netty-resolver-4.1.25.Final.jar from central
Retrieving io/netty/netty-resolver-dns/4.1.25.Final/netty-resolver-dns-4.1.25.Final.jar from central
Retrieving io/netty/netty-codec-socks/4.1.25.Final/netty-codec-socks-4.1.25.Final.jar from central
Retrieving io/netty/netty-codec-dns/4.1.25.Final/netty-codec-dns-4.1.25.Final.jar from central
Retrieving joda-time/joda-time/2.10/joda-time-2.10.jar from central
Retrieving org/apache/avro/avro/1.8.1/avro-1.8.1.jar from central
Retrieving org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar from central
Retrieving org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar from central
Retrieving com/thoughtworks/paranamer/paranamer/2.7/paranamer-2.7.jar from central
Retrieving org/apache/commons/commons-compress/1.8.1/commons-compress-1.8.1.jar from central
Retrieving org/tukaani/xz/1.5/xz-1.5.jar from central
Retrieving org/apache/zookeeper/zookeeper/3.4.14/zookeeper-3.4.14.jar from central
Retrieving com/google/code/findbugs/jsr305/3.0.2/jsr305-3.0.2.jar from central
Retrieving com/github/spotbugs/spotbugs-annotations/3.1.9/spotbugs-annotations-3.1.9.jar from central
Retrieving jline/jline/0.9.94/jline-0.9.94.jar from central
Retrieving org/apache/yetus/audience-annotations/0.5.0/audience-annotations-0.5.0.jar from central
Retrieving io/netty/netty/3.10.6.Final/netty-3.10.6.Final.jar from central
Retrieving com/101tec/zkclient/0.10/zkclient-0.10.jar from central
Retrieving org/lz4/lz4-java/1.6.0/lz4-java-1.6.0.jar from central
Retrieving com/github/luben/zstd-jni/1.4.0-1/zstd-jni-1.4.0-1.jar from central
Retrieving org/xerial/snappy/snappy-java/1.1.7.3/snappy-java-1.1.7.3.jar from central
Retrieving org/apache/kafka/kafka-clients/2.3.1/kafka-clients-2.3.1.jar from central
Retrieving org/slf4j/slf4j-api/1.7.26/slf4j-api-1.7.26.jar from central
Retrieving org/apache/kafka/kafka-streams/2.3.1/kafka-streams-2.3.1.jar from central
Retrieving org/apache/kafka/connect-json/2.3.1/connect-json-2.3.1.jar from central
Retrieving org/apache/kafka/connect-api/2.3.1/connect-api-2.3.1.jar from central
Retrieving com/fasterxml/jackson/core/jackson-databind/2.10.0/jackson-databind-2.10.0.jar from central
Retrieving com/fasterxml/jackson/core/jackson-annotations/2.10.0/jackson-annotations-2.10.0.jar from central
Retrieving com/fasterxml/jackson/core/jackson-core/2.10.0/jackson-core-2.10.0.jar from central
Retrieving com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.10.0/jackson-datatype-jdk8-2.10.0.jar from central
Retrieving org/rocksdb/rocksdbjni/5.18.3/rocksdbjni-5.18.3.jar from central
Retrieving org/clojure/data.fressian/0.2.1/data.fressian-0.2.1.jar from central
Retrieving org/fressian/fressian/0.6.6/fressian-0.6.6.jar from central
Retrieving org/clojure/core.cache/0.7.2/core.cache-0.7.2.jar from central
Retrieving org/clojure/data.priority-map/0.0.7/data.priority-map-0.0.7.jar from central
Retrieving byte-streams/byte-streams/0.2.4/byte-streams-0.2.4.jar from clojars
Retrieving manifold/manifold/0.1.8/manifold-0.1.8.jar from clojars
Retrieving riddley/riddley/0.1.14/riddley-0.1.14.jar from clojars
Retrieving aleph/aleph/0.4.6/aleph-0.4.6.jar from clojars
Retrieving primitive-math/primitive-math/0.1.6/primitive-math-0.1.6.jar from clojars
Retrieving clj-time/clj-time/0.15.1/clj-time-0.15.1.jar from clojars
Retrieving danlentz/clj-uuid/0.1.9/clj-uuid-0.1.9.jar from clojars
Retrieving fundingcircle/jackdaw/0.7.6/jackdaw-0.7.6.jar from clojars
Retrieving io/confluent/kafka-schema-registry-client/5.3.1/kafka-schema-registry-client-5.3.1.jar from confluent
Retrieving io/confluent/common-utils/5.3.1/common-utils-5.3.1.jar from confluent
Retrieving io/confluent/common-config/5.3.1/common-config-5.3.1.jar from confluent
Retrieving io/confluent/kafka-avro-serializer/5.3.1/kafka-avro-serializer-5.3.1.jar from confluent
Poznámka: knihovny jsou nainstalovány do podadresáře ~/.m2.

4. Interaktivní práce s Kafkou z REPLu, popř. z textového editoru/IDE

Pro otestování možností nabízených kombinací Kafka + Clojure je nejlepší využít interaktivní práci v REPLu, který lze zavolat jak z textového editoru, tak i z prakticky jakéhokoli IDE s podporou jazyka Clojure. O této možnosti jsem se zmínil například v souvislosti s textovým editorem Vim v textu Plugin Vim Slime. Téměř ideální kombinace tedy vypadá tak, že se v terminálu otevře Screen, popř. TMux a z adresáře, v němž je projekt https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/kafka-repl spustit interaktivní smyčku REPL:

$ lein repl
 
nREPL server started on port 37371 on host 127.0.0.1 - nrepl://127.0.0.1:37371
REPL-y 0.4.4, nREPL 0.7.0
Clojure 1.10.1
OpenJDK 64-Bit Server VM 1.8.0_191-b12
    Docs: (doc function-name-here)
          (find-doc "part-of-name-here")
  Source: (source function-name-here)
 Javadoc: (javadoc java-object-or-class-here)
    Exit: Control+D or (exit) or (quit)
 Results: Stored in vars *1, *2, *3, an exception in *e
 
kafka-repl.core=>

V dalším terminálu je pak možné spustit editor se souborem https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/kafka-repl/src/kafka_repl/core.clj a klávesovou zkratkou Ctrl+C Ctrl+C postupně do REPLu posílat jednotlivé formy ze sekce comment:

(ns kafka-repl.core
  (:gen-class))
 
(defn -main
  [& args]
  (println "Hello, World!"))
 
(comment
 
 
;; vsechny potrebne jmenne prostory
(require '[jackdaw.admin :as ja])
(require '[jackdaw.client :as jc])
(require '[jackdaw.client.log :as jl])
(require '[clojure.pprint :as pp])
 
 
;; vytvoreni noveho tematu s jednim oddilem
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (ja/create-topics! client [{:topic-name "test1"
                              :partition-count 1
                              :replication-factor 1
                              :topic-config {"cleanup.policy" "compact"}}]))
 
 
;; vytvoreni dalsiho tematu s jednim oddilem
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (ja/create-topics! client [{:topic-name "test2"
                              :partition-count 1
                              :replication-factor 1
                              :topic-config {"cleanup.policy" "compact"}}]))
 
 
;; vytvoreni tematu s deseti oddily
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (ja/create-topics! client [{:topic-name "test3"
                              :partition-count 10
                              :replication-factor 1
                              :topic-config {"cleanup.policy" "compact"}}]))
 
 
;; vymazani temat test1, test2 a test3
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (ja/delete-topics! client [{:topic-name "test1"}
                             {:topic-name "test2"}
                             {:topic-name "test3"}]))
 
 
;; vypis konfigurace brokera
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})
      configs (ja/describe-cluster client)
      id      (->> configs :controller :id)]
  (pp/pprint (ja/get-broker-config client id)))
 
 
;; vypis konfigurace clusteru
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (pp/pprint (ja/describe-cluster client)))
 
 
;; vypis informaci o vybranych tematech
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (pp/pprint (ja/describe-topics client [{:topic-name "test1"}
                                         {:topic-name "test2"}
                                         {:topic-name "test3"}])))
 
 
;; vypis informaci o vsech tematech
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (pp/pprint (ja/describe-topics client)))
 
 
;; vypis konfiguraci o jednom tematu
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (pp/pprint (ja/describe-topics-configs client [{:topic-name "test1"}])))
 
 
;; vypis konfiguraci vybranych temat
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (pp/pprint (ja/describe-topics-configs client [{:topic-name "test1"}
                                                 {:topic-name "test2"}
                                                 {:topic-name "test3"}])))
 
 
;; konfigurace producenta zprav
(def producer-config
  {"bootstrap.servers" "localhost:9092"
   "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
   "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"
   "acks" "all"
   "client.id" "foo"})
 
 
;; poslani jedine zpravy
(with-open [producer (jc/producer producer-config)]
  (let [record-metadata (jc/produce! producer {:topic-name "test1"} "1" "Hello, Kafka!")]
    (pp/pprint @record-metadata)))
 
 
;; poslani 100 zprav
(with-open [producer (jc/producer producer-config)]
  (doseq [i (range 1 101)]
    (let [key (str i)
          value (str "Message #" i)]
      (println "Publishing message with key '" key "' and value '" value "'")
      (let [record-metadata (jc/produce! producer {:topic-name "test1"} key value)]
        (pp/pprint @record-metadata)))))
 
 
;; konfigurace konzumenta zprav
(def consumer-config
  {"bootstrap.servers" "localhost:9092"
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "auto.offset.reset" "earliest"
   "group.id"  "group-A"})

   #"auto.offset.reset" "none"
 
 
;; start konzumenta
(with-open [consumer (-> (jc/consumer consumer-config)
                         (jc/subscribe [{:topic-name "test1"}]))]
  (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)]
    (println "key: " key)
    (println "value: " value)
    (println "partition: " partition)
    (println "timestamp: " timestamp)
    (println "offset: " offset)))
 
 
) ; comment

5. Vytvoření nového tématu s jedním oddílem

Nejprve si popíšeme základy práce se jmenným prostorem jackdaw.admin, v němž jsou deklarovány funkce a makra určená pro administraci Apache Kafky. První demonstrační příklad slouží pro vytvoření tématu (topic), které má jen jediný oddíl (partition). Nejprve se zkonstruuje objekt typu AdminClient, a to specifikací portu, na kterém běží broker (viz též druhou kapitolu):

(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
   ...
   ...
   ...
)

Posléze se použije funkce create-topics! pro vytvoření nového tématu (či témat). Této funkci se předá objekt získaný předchozím voláním a sekvence (například seznam) popisů témat. Nejdůležitějšími atributy je jméno nového tématu a počet oddílů:

(ja/create-topics! client [{:topic-name "test1"
                            :partition-count 1
                            :replication-factor 1
                            :topic-config {"cleanup.policy" "compact"}}])))
Poznámka: vykřičník na konci jména funkce značí, že tato funkce má vedlejší efekt (ono vytvoření nového tématu).

Úplný kód dnešního prvního plnohodnotného demonstračního příkladu vypadá následovně:

(ns topic-constructor.core
  (:require [jackdaw.admin :as ja]))

(defn -main
  []
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
    (ja/create-topics! client [{:topic-name "test1"
                                :partition-count 1
                                :replication-factor 1
                                :topic-config {"cleanup.policy" "compact"}}])))

Příklad spustíme:

$ lein run
 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

V logu brokera by se měly objevit informace o vytvoření nového tématu:

[2020-11-27 14:43:08,474] INFO Created log for partition test1-0 in /tmp/kafka-logs/test2-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.4-IV1, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2020-11-27 14:41:48,973] INFO [Partition test1-0 broker=0] No checkpointed highwatermark is found for partition test1-0 (kafka.cluster.Partition)
[2020-11-27 14:41:48,974] INFO [Partition test1-0 broker=0] Log loaded for partition test1-0 with initial high watermark 0 (kafka.cluster.Partition)
[2020-11-27 14:41:48,974] INFO [Partition test1-0 broker=0] test1-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)

Navíc by se v adresáři /tmp/kafka-logs/ měl objevit nový podadresář nazvaný test1–0. Toto jméno je složeno ze jména tématu a čísla oddílu:

$ ls -l /tmp/kafka-logs/
 
total 16
-rw-rw-r--. 1 ptisnovs ptisnovs   0 Nov 27 14:39 cleaner-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs   4 Nov 27 14:42 log-start-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  88 Nov 27 14:39 meta.properties
-rw-rw-r--. 1 ptisnovs ptisnovs  14 Nov 27 14:42 recovery-point-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  14 Nov 27 14:42 replication-offset-checkpoint
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:41 test1-0

6. Korektní nastavení logování v projektu

Při spuštění předchozího příkladu se zobrazilo varování o tom, že není správně nastavena konfigurace logů. To napravíme v dnešním druhém demonstračním příkladu přidáním další závislosti do souboru project.clj:

(defproject topic-constructor "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
            :url "https://www.eclipse.org/legal/epl-2.0/"}
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [fundingcircle/jackdaw "0.7.6"]
                 [ch.qos.logback/logback-classic "1.1.3"]]
  :main ^:skip-aot topic-constructor.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all
                       :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})

Samotný zdrojový kód příkladu se liší v jediné maličkosti – je vytvořeno téma s odlišným názvem:

(ns topic-constructor.core
  (:require [jackdaw.admin :as ja]))
 
(defn -main
  []
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
    (ja/create-topics! client [{:topic-name "test2"
                                :partition-count 1
                                :replication-factor 1
                                :topic-config {"cleanup.policy" "compact"}}])))

Po spuštění bude příklad již velmi „ukecaný“, a to právě díky nastavení logování:

$ lein run
 
21:17:16.076 [main] INFO  o.a.k.c.admin.AdminClientConfig - AdminClientConfig values:
    bootstrap.servers = [localhost:9092]
    client.dns.lookup = default
    client.id =
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
        ...
        ...
        ...
21:17:16.381 [kafka-admin-client-thread | adminclient-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received
21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.a.kafka.common.network.Selector - [AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 0
21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Completed connection to node 0. Fetching API versions.
21:17:16.382 [kafka-admin-client-thread | adminclient-1] DEBUG o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] Initiating API versions fetch from node 0.

Nově zkonstruované téma je po spuštění příkladu zmíněno i v logu brokera:

[2020-11-27 14:43:08,474] INFO Created log for partition test2-0 in /tmp/kafka-logs/test2-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.4-IV1, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2020-11-27 14:43:08,475] INFO [Partition test2-0 broker=0] No checkpointed highwatermark is found for partition test2-0 (kafka.cluster.Partition)
[2020-11-27 14:43:08,475] INFO [Partition test2-0 broker=0] Log loaded for partition test2-0 with initial high watermark 0 (kafka.cluster.Partition)
[2020-11-27 14:43:08,475] INFO [Partition test2-0 broker=0] test2-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)

A po pohledu do adresáře s daty brokera se objevil nový adresář pro nově zkonstruované téma:

$ ls -l /tmp/kafka-logs/
total 16
-rw-rw-r--. 1 ptisnovs ptisnovs   0 Nov 27 14:39 cleaner-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs   4 Nov 27 14:43 log-start-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  88 Nov 27 14:39 meta.properties
-rw-rw-r--. 1 ptisnovs ptisnovs  14 Nov 27 14:43 recovery-point-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  24 Nov 27 14:43 replication-offset-checkpoint
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:41 test1-0
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:43 test2-0

7. Vytvoření tématu s deseti oddíly

Nyní si předchozí dva demonstrační příklady nepatrně upravíme a vytvoříme nové téma, ovšem rozdělené do deseti oddílů. Takto nakonfigurované téma nám umožní snadný paralelní přístup ke zprávám (událostem). Povšimněte si, že hodnota replication-factor je stále nastavena na jedničku, protože pracujeme pouze s lokálně běžící Kafkou a nikoli (před)produkčním systémem:

(ns topic-constructor.core
  (:require [jackdaw.admin :as ja]))
 
(defn -main
  []
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
    (ja/create-topics! client [{:topic-name "test3"
                                :partition-count 10
                                :replication-factor 1
                                :topic-config {"cleanup.policy" "compact"}}])))

Nyní by měl adresář s daty brokera vypadat následovně:

$ ls -l /tmp/kafka-logs/
total 16
-rw-rw-r--. 1 ptisnovs ptisnovs   0 Nov 27 14:39 cleaner-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs   4 Nov 27 14:43 log-start-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  88 Nov 27 14:39 meta.properties
-rw-rw-r--. 1 ptisnovs ptisnovs  14 Nov 27 14:43 recovery-point-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  24 Nov 27 14:43 replication-offset-checkpoint
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:41 test1-0
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 14:43 test2-0
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-0
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-1
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-2
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-3
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-4
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-5
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-6
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-7
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-8
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 30 11:55 test3-9

8. Výpis informací o konfiguraci brokeru

V dalším kroku si ukážeme, jak lze vypsat základní informace o konfiguraci brokeru. K tomuto účelu se používá funkce:

(jackdaw.admin/get-broker-config client id)

Vidíme, že této funkci musíme předat jak instanci klienta (to již známe):

(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})
   ...
   ...
   ...

Tak i jednoznačný identifikátor brokera. Ten přečteme takto:

(jackdaw.admin/describe-cluster client)

Přičemž z vrácené mapy musíme přečíst pouze ono ID, například s využitím threading makra:

(->> configs :controller :id)]

Celý kód příkladu bude vypadat následovně:

(ns get-broker-config.core
  (:require [jackdaw.admin :as ja])
  (:require [clojure.pprint :as pp]))
 
(defn -main
  [& args]
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})
        configs (ja/describe-cluster client)
        id      (->> configs :controller :id)]
    (pp/pprint (ja/get-broker-config client id))))

Výsledek může vypadat následovně:

{"ssl.enabled.protocols"
 {:name "ssl.enabled.protocols",
  :value "TLSv1.2,TLSv1.1,TLSv1",
  :default? true,
  :read-only? false,
  :sensitive? false},
 "advertised.host.name"
 {:name "advertised.host.name",
  :value nil,
  :default? true,
  :read-only? true,
  :sensitive? false},
 "socket.request.max.bytes"
 ...
 ...
 ...
 {:name "log.preallocate",
  :value "false",
  :default? true,
  :read-only? false,
  :sensitive? false},
 "log.roll.ms"
 {:name "log.roll.ms",
  :value nil,
  :default? true,
  :read-only? false,
  :sensitive? false},
 "password.encoder.cipher.algorithm"
 {:name "password.encoder.cipher.algorithm",
  :value "AES/CBC/PKCS5Padding",
  :default? true,
  :read-only? true,
  :sensitive? false}}

9. Přečtení a zobrazení základních informací o Kafka clusteru

Získat a dále zobrazit (či jinak zpracovat) je možné i o Kafka clusteru, a to poměrně jednoduchým způsobem – zavoláním funkce jackdaw.admin/describe-cluster, s níž jsme se mimochodem seznámili v předchozí kapitole:

(ns describe-cluster.core
  (:require [jackdaw.admin :as ja]
            [clojure.pprint :as pp]))
 
(defn -main
  [& args]
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
    (pp/pprint (ja/describe-cluster client))))

Výsledek by měl (pro lokálně běžící cluster) vypadat takto:

{:cluster-id "hOg8auCqRcK23gy5b8Ekow",
 :controller {:host "localhost", :port 9092, :id 0, :rack nil},
 :nodes [{:host "localhost", :port 9092, :id 0, :rack nil}]}
Poznámka: povšimněte si, že jak všechny konfigurace, tak i vrácená data mají podobu klasické mapy programovacího jazyka Clojure, i když se interně může jednat o Properties (Javovská třída).

10. Popis témat, ke kterým má klient přístup

Rozhraní AdminClient nám umožňuje získat i mapu se základním popisem témat, ke kterým má klient přístup. Použije se přitom funkce nazvaná jackdaw.admin/describe-topics, které se v tom nejjednodušším případě předá jen informace o klientovi:

(ns describe-topics.core
  (:require [jackdaw.admin :as ja]
            [clojure.pprint :as pp]))
 
(defn -main
  [& args]
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
    (pp/pprint (ja/describe-topics client))))

Vrátí se mapa, jejímiž klíči jsou jména témat (řetězce, nikoli keywords) a hodnotami pak informace o tématech. Povšimněte si i podrobnějších informací o replikách (kde je téma uloženo):

{"test1"
 {:is-internal? false,
  :partition-info
  ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 0,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})},
 "test2"
 {:is-internal? false,
  :partition-info
  ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 0,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})},
 "test3"
 {:is-internal? false,
  :partition-info
  ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 0,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 1,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 2,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 3,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 4,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 5,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 6,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 7,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 8,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 9,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})}}

11. Získání popisu vybraných témat

V případě, že budeme chtít získat popis pouze pro vybrané téma nebo několik témat, je možné funkci jackdaw.admin/describe-topics ve druhém parametru předat sekvenci (například vektor) se jmény témat. A podobně jako v jiných případech, i zde se nepředávají „holé“ řetězce, ale mapy s klíčem :topic-name:

(ns describe-topics.core
  (:require [jackdaw.admin :as ja]
            [clojure.pprint :as pp]))
 
(defn -main
  [& args]
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
    (pp/pprint (ja/describe-topics client [{:topic-name "test1"}
                                           {:topic-name "test3"}]))))

Nyní budou ve vrácené mapě informace pouze o vybraných tématech „test1“ a „test3“:

{"test1"
 {:is-internal? false,
  :partition-info
  ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 0,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})},
 "test3"
 {:is-internal? false,
  :partition-info
  ({:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 0,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 1,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 2,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 3,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 4,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 5,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 6,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 7,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 8,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]}
   {:isr [{:host "localhost", :port 9092, :id 0, :rack nil}],
    :leader {:host "localhost", :port 9092, :id 0, :rack nil},
    :partition 9,
    :replicas [{:host "localhost", :port 9092, :id 0, :rack nil}]})}}

12. Konfigurace jednotlivých témat

Poslední demonstrační příklad, který se týká čtení administračních údajů, vypíše konfiguraci všech témat, popř. konfiguraci témat vybraných. Použijeme přitom funkci jackdaw.admin/describe-topics-configs, která se volá následujícím způsobem:

(ns describe-topics-configs.core
  (:require [jackdaw.admin :as ja]
            [clojure.pprint :as pp]))
 
(defn -main
  [& args]
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
    (pp/pprint (ja/describe-topics-configs client [{:topic-name "test1"}
                                                   {:topic-name "test3"}]))))

Výsledkem je opět mapa, tentokrát však složitěji strukturovaná, protože o každém tématu obsahuje všechny konfigurační údaje:

{{:name "test1", :type :config-resource/topic}
 {"min.insync.replicas"
  {:name "min.insync.replicas",
   :value "1",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "follower.replication.throttled.replicas"
  {:name "follower.replication.throttled.replicas",
   :value "",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "flush.messages"
  {:name "flush.messages",
   :value "9223372036854775807",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "leader.replication.throttled.replicas"
  {:name "leader.replication.throttled.replicas",
   :value "",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "segment.bytes"
  {:name "segment.bytes",
   :value "1073741824",
   :default? false,
   :read-only? false,
   :sensitive? false},
  "flush.ms"
  {:name "flush.ms",
   :value "9223372036854775807",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "unclean.leader.election.enable"
  {:name "unclean.leader.election.enable",
   :value "false",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "retention.bytes"
  {:name "retention.bytes",
   :value "-1",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "cleanup.policy"
  {:name "cleanup.policy",
   :value "compact",
   :default? false,
   :read-only? false,
   :sensitive? false},
  "file.delete.delay.ms"
  {:name "file.delete.delay.ms",
   :value "60000",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "segment.index.bytes"
  {:name "segment.index.bytes",
   :value "10485760",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "index.interval.bytes"
  {:name "index.interval.bytes",
   :value "4096",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "delete.retention.ms"
  {:name "delete.retention.ms",
   :value "86400000",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "segment.ms"
  {:name "segment.ms",
   :value "604800000",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "min.compaction.lag.ms"
  {:name "min.compaction.lag.ms",
   :value "0",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "segment.jitter.ms"
  {:name "segment.jitter.ms",
   :value "0",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "message.timestamp.difference.max.ms"
  {:name "message.timestamp.difference.max.ms",
   :value "9223372036854775807",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "max.compaction.lag.ms"
  {:name "max.compaction.lag.ms",
   :value "9223372036854775807",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "preallocate"
  {:name "preallocate",
   :value "false",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "message.downconversion.enable"
  {:name "message.downconversion.enable",
   :value "true",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "max.message.bytes"
  {:name "max.message.bytes",
   :value "1000012",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "message.timestamp.type"
  {:name "message.timestamp.type",
   :value "CreateTime",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "retention.ms"
  {:name "retention.ms",
   :value "604800000",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "min.cleanable.dirty.ratio"
  {:name "min.cleanable.dirty.ratio",
   :value "0.5",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "message.format.version"
  {:name "message.format.version",
   :value "2.4-IV1",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "compression.type"
  {:name "compression.type",
   :value "producer",
   :default? true,
   :read-only? false,
   :sensitive? false}},
 {:name "test3", :type :config-resource/topic}
 {"min.insync.replicas"
  {:name "min.insync.replicas",
   :value "1",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "follower.replication.throttled.replicas"
  {:name "follower.replication.throttled.replicas",
   :value "",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "flush.messages"
  {:name "flush.messages",
   :value "9223372036854775807",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "leader.replication.throttled.replicas"
  {:name "leader.replication.throttled.replicas",
   :value "",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "segment.bytes"
  {:name "segment.bytes",
   :value "1073741824",
   :default? false,
   :read-only? false,
   :sensitive? false},
  "flush.ms"
  {:name "flush.ms",
   :value "9223372036854775807",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "unclean.leader.election.enable"
  {:name "unclean.leader.election.enable",
   :value "false",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "retention.bytes"
  {:name "retention.bytes",
   :value "-1",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "cleanup.policy"
  {:name "cleanup.policy",
   :value "compact",
   :default? false,
   :read-only? false,
   :sensitive? false},
  "file.delete.delay.ms"
  {:name "file.delete.delay.ms",
   :value "60000",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "segment.index.bytes"
  {:name "segment.index.bytes",
   :value "10485760",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "index.interval.bytes"
  {:name "index.interval.bytes",
   :value "4096",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "delete.retention.ms"
  {:name "delete.retention.ms",
   :value "86400000",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "segment.ms"
  {:name "segment.ms",
   :value "604800000",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "min.compaction.lag.ms"
  {:name "min.compaction.lag.ms",
   :value "0",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "segment.jitter.ms"
  {:name "segment.jitter.ms",
   :value "0",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "message.timestamp.difference.max.ms"
  {:name "message.timestamp.difference.max.ms",
   :value "9223372036854775807",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "max.compaction.lag.ms"
  {:name "max.compaction.lag.ms",
   :value "9223372036854775807",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "preallocate"
  {:name "preallocate",
   :value "false",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "message.downconversion.enable"
  {:name "message.downconversion.enable",
   :value "true",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "max.message.bytes"
  {:name "max.message.bytes",
   :value "1000012",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "message.timestamp.type"
  {:name "message.timestamp.type",
   :value "CreateTime",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "retention.ms"
  {:name "retention.ms",
   :value "604800000",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "min.cleanable.dirty.ratio"
  {:name "min.cleanable.dirty.ratio",
   :value "0.5",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "message.format.version"
  {:name "message.format.version",
   :value "2.4-IV1",
   :default? true,
   :read-only? false,
   :sensitive? false},
  "compression.type"
  {:name "compression.type",
   :value "producer",
   :default? true,
   :read-only? false,
   :sensitive? false}}}
Poznámka: jedná se o běžnou mapu jazyka Clojure, kterou je možné zpracovávat funkcemi ze standardní knihovny.

13. Vymazání tématu

Poslední funkcí ze jmenného prostoru jackdaw.admin, kterou si dnes popíšeme, je funkce určená pro smazání tématu popř. většího množství témat. Tato funkce se jmenuje delete-topics!, přičemž vykřičník na konci názvu opět znamená, že se jedná o funkci s vedlejším efektem (která uvnitř mění „stav světa“). Příklad použití – smazání dvojice témat nazvaných „test1“ a „test3“:

(ns topic-destructor.core
  (:require [jackdaw.admin :as ja]))
 
(defn -main
  []
  (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})]
    (ja/delete-topics! client [{:topic-name "test1"}
                               {:topic-name "test3"}])))

Po spuštění příkladu příkazem lein run by se v logu brokeru měla objevit informace o tom, že témata byla přejmenována a bylo naplánováno jejich smazání:

...
...
...
[2020-11-27 19:52:11,127] INFO Log for partition test3-7 is renamed to /tmp/kafka-logs/test3-7.d3fb75bbbcab4f5f95d4b818c3362533-delete and is scheduled for deletion (kafka.log.LogManager)
[2020-11-27 19:52:11,130] INFO Log for partition test3-1 is renamed to /tmp/kafka-logs/test3-1.f351358e5f294ce088335e08611a4304-delete and is scheduled for deletion (kafka.log.LogManager)
[2020-11-27 19:52:11,133] INFO Log for partition test3-4 is renamed to /tmp/kafka-logs/test3-4.51da619c145f4c40a1f6c36b5a16dbdd-delete and is scheduled for deletion (kafka.log.LogManager)
...
...
...

Přejmenování bude ihned viditelné ve výpisu adresáře /tmp/kafka-logs:

total 20
-rw-rw-r--. 1 ptisnovs ptisnovs   4 Nov 27 19:52 cleaner-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs   4 Nov 27 19:52 log-start-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs  88 Nov 27 19:45 meta.properties
-rw-rw-r--. 1 ptisnovs ptisnovs  14 Nov 27 19:52 recovery-point-offset-checkpoint
-rw-rw-r--. 1 ptisnovs ptisnovs 115 Nov 27 19:52 replication-offset-checkpoint
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:45 test1-0.640fd381aef54650aeac15ca775248b8-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-0.a235153017ca4d9facdc380fbd6cdd95-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-1.f351358e5f294ce088335e08611a4304-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-2.c529de213c4f4c1e8374f04fc167af53-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-3.fb2b771909964eb8bc4448ed62410717-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-4.51da619c145f4c40a1f6c36b5a16dbdd-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-5.8788f5813d3d476bb35fe70bddddd52a-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-6.cfb995d4c66c45f0aa4edd313f4eb202-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-7.d3fb75bbbcab4f5f95d4b818c3362533-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-8.318bdc9af16f44b4a454ba2716dc6090-delete
drwxrwxr-x. 2 ptisnovs ptisnovs 120 Nov 27 19:46 test3-9.4efc612d790540deb53a0cb5365f0a91-delete

Po určité době dojde ke skutečnému vymazání (přejmenovaných) témat:

[2020-11-27 19:53:11,180] INFO Deleted log /tmp/kafka-logs/test1-0.640fd381aef54650aeac15ca775248b8-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
[2020-11-27 19:53:11,181] INFO Deleted offset index /tmp/kafka-logs/test1-0.640fd381aef54650aeac15ca775248b8-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
[2020-11-27 19:53:11,183] INFO Deleted time index /tmp/kafka-logs/test1-0.640fd381aef54650aeac15ca775248b8-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
[2020-11-27 19:53:11,183] INFO Deleted log for partition test1-0 in /tmp/kafka-logs/test1-0.640fd381aef54650aeac15ca775248b8-delete. (kafka.log.LogManager)

14. Vytvoření zprávy s jejím posláním do vybraného tématu (producent zpráv)

Nyní se již dostáváme k popisu funkcí a maker deklarovaných ve jmenném prostoru jackdaw.client. Nalezneme zde především funkci nazvanou produce!, která slouží k poslání zprávy do zvoleného tématu (a pochopitelně o funkci měnící stav světa; ostatně právě proto má ve svém jméně vykřičník). Nejdříve je však nutné producenta zpráv nakonfigurovat, a to s využitím mapy, která je interně převedena na Properties (jednotlivé vlastnosti budou podrobněji popsány příště):

{"bootstrap.servers" "localhost:9092"
 "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
 "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"
 "acks" "all"
 "client.id" "foo"})

Dále je nutné vytvořit instanci producenta. A zde narazíme na malý problém – na konci činnosti aplikace je nutné producenta uzavřít. K otevření s automatickým uzavřením slouží standardní makro nazvané with-open:

user=> (doc with-open)
-------------------------
clojure.core/with-open
([bindings & body])
Macro
  bindings => [name init ...]
 
  Evaluates body in a try expression with names bound to the values
  of the inits, and a finally clause that calls (.close name) on each
  name in reverse order.
nil

Použití tohoto makra je snadné a přímočaré (navíc pro Clojure zcela idiomatické):

(with-open [producer (jc/producer producer-config)]
  ...
  ...
  ...
)

Jakmile máme k dispozici objekt představující producenta, můžeme do tématu poslat zprávu s využitím již výše zmíněné funkce produce!, které se předá téma, klíč a vlastní text zprávy (data):

(jc/produce! producer {:topic-name "test1"} "1" "Hello, Kafka!")

Návratovou hodnotou je future s metainformacemi o poslané zprávě.

Úplný zdrojový kód demonstračního příkladu, který zprávu pošle a zobrazí výsledek, je zobrazen pod tímto odstavcem:

(ns produce-messages-1.core
  (:require [jackdaw.client :as jc]
            [clojure.pprint :as pp]))
 
(def producer-config
  {"bootstrap.servers" "localhost:9092"
   "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
   "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"
   "acks" "all"
   "client.id" "foo"})
 
(defn -main
  [& args]
  (with-open [producer (jc/producer producer-config)]
    (let [record-metadata (jc/produce! producer {:topic-name "test1"} "1" "Hello, Kafka!")]
      (pp/pprint @record-metadata))))

15. Producent většího množství zpráv, nastavení klíče

Do zvoleného tématu můžeme poslat větší množství zpráv, přičemž není zapotřebí čekat na potvrzení, že byla zpráva skutečně zpracována (záleží ovšem na konkrétním použití Kafky). Pokud se používá téma s větším množstvím oddílů, což je v našem případě téma „test3“, je nutné se zamyslet nad obsahem klíče, neboť ten je zahashován a tato hodnota je použita pro volbu oddílu. Příklad s využitím klíče obsahujícího řetězec s lokálním číslem zprávy je uveden níže:

(ns produce-messages-3.core
  (:require [jackdaw.client :as jc]
            [clojure.pprint :as pp]))
 
(def producer-config
  {"bootstrap.servers" "localhost:9092"
   "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
   "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"
   "acks" "all"
   "client.id" "foo"})
 
(defn -main
  [& args]
  (with-open [producer (jc/producer producer-config)]
    (doseq [i (range 1 101)]
      (let [key (str i)
            value (str "Message #" i)]
        (println "Publishing message with key '" key "' and value '" value "'")
        (let [record-metadata (jc/produce! producer {:topic-name "test3"} key value)]
          (pp/pprint @record-metadata)))
      )))

Příklad metadat, z nichž je zřejmé, že zprávy byly skutečně distribuovány do různých oddílů:

Publishing message with key ' 1 ' and value ' Message #1 '
{:topic-name "test3",
 :partition 9,
 :timestamp 1606555678845,
 :offset 0,
 :serialized-key-size 1,
 :serialized-value-size 10}
Publishing message with key ' 2 ' and value ' Message #2 '
{:topic-name "test3",
 :partition 8,
 :timestamp 1606555678885,
 :offset 0,
 :serialized-key-size 1,
 :serialized-value-size 10}
Publishing message with key ' 3 ' and value ' Message #3 '
{:topic-name "test3",
 :partition 3,
 :timestamp 1606555678891,
 :offset 0,
 :serialized-key-size 1,
 :serialized-value-size 10}

16. Jednoduchý konzument zpráv

Zprávy uložené do vybraného tématu je pochopitelně možné číst – konzumovat. Ukažme si tedy implementaci velmi jednoduchého (až primitivního) konzumenta zpráv; přičemž podrobnější informace budou uvedeny příště. Konzument náleží do skupiny „group-A“ a v nekonečné smyčce čeká na zprávy, které ihned vypisuje na standardní výstup:

DT2021 tip

(ns consume-messages-1.core
  (:require [jackdaw.client :as jc]
            [jackdaw.client.log :as jl]
            [clojure.pprint :as pp]))
 
(def consumer-config
  {"bootstrap.servers" "localhost:9092"
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "group.id"  "group-A"})
 
(defn -main
  [& args]
  (with-open [consumer (-> (jc/consumer consumer-config)
                           (jc/subscribe [{:topic-name "test1"}]))]
    (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)]
      (println "key: " key)
      (println "value: " value)
      (println "partition: " partition)
      (println "timestamp: " timestamp)
      (println "offset: " offset))))

17. Obsah následující části článku

Posílání a konzumace zpráv není ve skutečnosti tak přímočará, jak by se možná mohlo z demonstračních příkladů zdát. Z tohoto důvodu se tímto tématem budeme zabývat i příště. Ovšem mnohem důležitější je skutečný „streaming“, který knihovna Jackdaw taktéž podporuje. Tímto tématem se tedy budeme taktéž zabývat, a to poměrně podrobně.

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Clojure byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-):

# Projekt Stručný popis Cesta
1 kafka-repl prázdný projekt připravený pro interaktivní práci v REPLu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/kafka-repl
2 topic-constructor vytvoření nového tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor
3 topic-constructor-logger vytvoření nového tématu; konfigurace logování https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor-logger
4 topic-constructor-10-partitions vytvoření nového tématu s deseti oddíly https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor-10-partitions
5 topic-destructor vymazání tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-destructor
6 get-broker-config zobrazení konfigurace brokera https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/get-broker-config
7 describe-cluster zobrazení aktuální konfigurace clusteru https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-cluster
8 describe-all-topics zobrazení všech dostupných témat https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-all-topics
9 describe-topics zobrazení informací o vybraných tématech https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-topics
10 describe-topics-configs zobrazení podrobnějších informací o vybraných tématech https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-topics-configs
11 produce-messages-1 vytvoření jediné zprávy https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-1
12 produce-messages-2 vytvoření 100 zpráv https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-2
13 produce-messages-3 vytvoření 100 zpráv se specifikací klíče https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-3
14 consume-messages-1 konzument zpráv z vybraného tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/consume-messages-1

19. Odkazy na předchozí části seriálu o programovacím jazyku Clojure

  1. Clojure 1: Úvod
    http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm/
  2. Clojure 2: Symboly, kolekce atd.
    http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-2-cast/
  3. Clojure 3: Funkcionální programování
    http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-3-cast-funkcionalni-programovani/
  4. Clojure 4: Kolekce, sekvence a lazy sekvence
    http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-4-cast-kolekce-sekvence-a-lazy-sekvence/
  5. Clojure 5: Sekvence, lazy sekvence a paralelní programy
    http://www.root.cz/clanky/clojure-a-bezpecne-aplikace-pro-jvm-sekvence-lazy-sekvence-a-paralelni-programy/
  6. Clojure 6: Podpora pro paralelní programování
    http://www.root.cz/clanky/programovaci-jazyk-clojure-6-futures-nejsou-jen-financni-derivaty/
  7. Clojure 7: Další funkce pro paralelní programování
    http://www.root.cz/clanky/programovaci-jazyk-clojure-7-dalsi-podpurne-prostredky-pro-paralelni-programovani/
  8. Clojure 8: Identity, stavy, neměnné hodnoty a reference
    http://www.root.cz/clanky/programovaci-jazyk-clojure-8-identity-stavy-nemenne-hodnoty-a-referencni-typy/
  9. Clojure 9: Validátory, pozorovatelé a kooperace s Javou
    http://www.root.cz/clanky/programovaci-jazyk-clojure-9-validatory-pozorovatele-a-kooperace-mezi-clojure-a-javou/
  10. Clojure 10: Kooperace mezi Clojure a Javou
    http://www.root.cz/clanky/programovaci-jazyk-clojure-10-kooperace-mezi-clojure-a-javou-pokracovani/
  11. Clojure 11: Generátorová notace seznamu/list comprehension
    http://www.root.cz/clanky/programovaci-jazyk-clojure-11-generatorova-notace-seznamu-list-comprehension/
  12. Clojure 12: Překlad programů z Clojure do bajtkódu JVM I:
    http://www.root.cz/clanky/programovaci-jazyk-clojure-12-preklad-programu-z-clojure-do-bajtkodu-jvm/
  13. Clojure 13: Překlad programů z Clojure do bajtkódu JVM II:
    http://www.root.cz/clanky/programovaci-jazyk-clojure-13-preklad-programu-z-clojure-do-bajtkodu-jvm-pokracovani/
  14. Clojure 14: Základy práce se systémem maker
    http://www.root.cz/clanky/programovaci-jazyk-clojure-14-zaklady-prace-se-systemem-maker/
  15. Clojure 15: Tvorba uživatelských maker
    http://www.root.cz/clanky/programovaci-jazyk-clojure-15-tvorba-uzivatelskych-maker/
  16. Programovací jazyk Clojure – triky při práci s řetězci
    http://www.root.cz/clanky/programovaci-jazyk-clojure-triky-pri-praci-s-retezci/
  17. Programovací jazyk Clojure – triky při práci s kolekcemi
    http://www.root.cz/clanky/programovaci-jazyk-clojure-triky-pri-praci-s-kolekcemi/
  18. Programovací jazyk Clojure – práce s mapami a množinami
    http://www.root.cz/clanky/programovaci-jazyk-clojure-prace-s-mapami-a-mnozinami/
  19. Programovací jazyk Clojure – základy zpracování XML
    http://www.root.cz/clanky/programovaci-jazyk-clojure-zaklady-zpracovani-xml/
  20. Programovací jazyk Clojure – testování s využitím knihovny Expectations
    http://www.root.cz/clanky/programovaci-jazyk-clojure-testovani-s-vyuzitim-knihovny-expectations/
  21. Programovací jazyk Clojure – některé užitečné triky použitelné (nejenom) v testech
    http://www.root.cz/clanky/programovaci-jazyk-clojure-nektere-uzitecne-triky-pouzitelne-nejenom-v-testech/
  22. Enlive – výkonný šablonovací systém pro jazyk Clojure
    http://www.root.cz/clanky/enlive-vykonny-sablonovaci-system-pro-jazyk-clojure/
  23. Nástroj Leiningen a programovací jazyk Clojure: tvorba vlastních knihoven pro veřejný repositář Clojars
    http://www.root.cz/clanky/nastroj-leiningen-a-programovaci-jazyk-clojure-tvorba-vlastnich-knihoven-pro-verejny-repositar-clojars/
  24. Novinky v Clojure verze 1.8.0
    http://www.root.cz/clanky/novinky-v-clojure-verze-1–8–0/
  25. Asynchronní programování v Clojure s využitím knihovny core.async
    http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async/
  26. Asynchronní programování v Clojure s využitím knihovny core.async (pokračování)
    http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async-pokracovani/
  27. Asynchronní programování v Clojure s využitím knihovny core.async (dokončení)
    http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async-dokonceni/
  28. Vytváříme IRC bota v programovacím jazyce Clojure
    http://www.root.cz/clanky/vytvarime-irc-bota-v-programovacim-jazyce-clojure/
  29. Gorilla REPL: interaktivní prostředí pro programovací jazyk Clojure
    https://www.root.cz/clanky/gorilla-repl-interaktivni-prostredi-pro-programovaci-jazyk-clojure/
  30. Multimetody v Clojure aneb polymorfismus bez použití OOP
    https://www.root.cz/clanky/multimetody-v-clojure-aneb-polymorfismus-bez-pouziti-oop/
  31. Práce s externími Java archivy v programovacím jazyku Clojure
    https://www.root.cz/clanky/prace-s-externimi-java-archivy-v-programovacim-jazyku-clojure/
  32. Clojure 16: Složitější uživatelská makra
    http://www.root.cz/clanky/programovaci-jazyk-clojure-16-slozitejsi-uzivatelska-makra/
  33. Clojure 17: Využití standardních maker v praxi
    http://www.root.cz/clanky/programovaci-jazyk-clojure-17-vyuziti-standardnich-maker-v-praxi/
  34. Clojure 18: Základní techniky optimalizace aplikací
    http://www.root.cz/clanky/programovaci-jazyk-clojure-18-zakladni-techniky-optimalizace-aplikaci/
  35. Clojure 19: Vývojová prostředí pro Clojure
    http://www.root.cz/clanky/programovaci-jazyk-clojure-19-vyvojova-prostredi-pro-clojure/
  36. Clojure 20: Vývojová prostředí pro Clojure (Vimu s REPL)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-20-vyvojova-prostredi-pro-clojure-integrace-vimu-s-repl/
  37. Clojure 21: ClojureScript aneb překlad Clojure do JS
    http://www.root.cz/clanky/programovaci-jazyk-clojure-21-clojurescript-aneb-preklad-clojure-do-javascriptu/
  38. Leiningen: nástroj pro správu projektů napsaných v Clojure
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure/
  39. Leiningen: nástroj pro správu projektů napsaných v Clojure (2)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-2/
  40. Leiningen: nástroj pro správu projektů napsaných v Clojure (3)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-3/
  41. Leiningen: nástroj pro správu projektů napsaných v Clojure (4)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-4/
  42. Leiningen: nástroj pro správu projektů napsaných v Clojure (5)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-5/
  43. Leiningen: nástroj pro správu projektů napsaných v Clojure (6)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-6/
  44. Programovací jazyk Clojure a databáze (1.část)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-databaze-1-cast/
  45. Pluginy pro Leiningen
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-pluginy-pro-leiningen/
  46. Programovací jazyk Clojure a knihovny pro práci s vektory a maticemi
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-knihovny-pro-praci-s-vektory-a-maticemi/
  47. Programovací jazyk Clojure a knihovny pro práci s vektory a maticemi (2)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-knihovny-pro-praci-s-vektory-a-maticemi-2/
  48. Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk
    http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk/
  49. Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk (2)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk-2/
  50. Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure
    http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure/
  51. Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure (2)
    http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure-2/
  52. Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure (3)
    http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure-3/
  53. Programovací jazyk Clojure a práce s Gitem
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-prace-s-gitem/
  54. Programovací jazyk Clojure a práce s Gitem (2)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-prace-s-gitem-2/
  55. Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk (dokončení)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk-dokonceni/
  56. Pixie: lehký skriptovací jazyk s „kouzelnými“ schopnostmi
    https://www.root.cz/clanky/pixie-lehky-skriptovaci-jazyk-s-kouzelnymi-schopnostmi/
  57. Programovací jazyk Pixie: funkce ze základní knihovny a použití FFI
    https://www.root.cz/clanky/pro­gramovaci-jazyk-pixie-funkce-ze-zakladni-knihovny-a-pouziti-ffi/
  58. Novinky v Clojure verze 1.9.0
    https://www.root.cz/clanky/novinky-v-clojure-verze-1–9–0/
  59. Validace dat s využitím knihovny spec v Clojure 1.9.0
    https://www.root.cz/clanky/validace-dat-s-vyuzitim-knihovny-spec-v-clojure-1–9–0/
  60. Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure
    https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure/
  61. Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure (2)
    https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure-2/
  62. Incanter: prostředí pro statistické výpočty s grafickým výstupem založené na Clojure
    https://www.root.cz/clanky/incanter-prostredi-pro-statisticke-vypocty-s-grafickym-vystupem-zalozene-na-clojure/
  63. Incanter: operace s maticemi
    https://www.root.cz/clanky/incanter-operace-s-maticemi/
  64. Pixie: lehký skriptovací jazyk s „kouzelnými“ schopnostmi
    https://www.root.cz/clanky/pixie-lehky-skriptovaci-jazyk-s-kouzelnymi-schopnostmi/
  65. Programovací jazyk Pixie: funkce ze základní knihovny a použití FFI
    https://www.root.cz/clanky/pro­gramovaci-jazyk-pixie-funkce-ze-zakladni-knihovny-a-pouziti-ffi/
  66. Novinky v Clojure verze 1.9.0
    https://www.root.cz/clanky/novinky-v-clojure-verze-1–9–0/
  67. Validace dat s využitím knihovny spec v Clojure 1.9.0
    https://www.root.cz/clanky/validace-dat-s-vyuzitim-knihovny-spec-v-clojure-1–9–0/
  68. Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure
    https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure/
  69. Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure (2)
    https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure-2/
  70. Interpret programovacího jazyka Clojure integrovaný do Jupyter Notebooku
    https://www.root.cz/clanky/interpret-programovaciho-jazyka-clojure-integrovany-do-jupyter-notebooku/
  71. Babashka: interpret Clojure určený pro rychlé spouštění utilit z příkazového řádku
    https://www.root.cz/clanky/babashka-interpret-clojure-urceny-pro-rychle-spousteni-utilit-z-prikazoveho-radku/

20. Odkazy na Internetu

  1. fundingcircle/jackdaw (na Clojars)
    https://clojars.org/fundin­gcircle/jackdaw/versions/0­.7.6
  2. Dokumentace ke knihovně jackdaw
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/re­adme
  3. Jackdaw AdminClient API
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/jac­kdaw-adminclient-api
  4. Jackdaw Client API
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/jac­kdaw-client-api
  5. Kafka.clj
    https://github.com/helins-io/kafka.clj
  6. Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách
    https://www.root.cz/clanky/pouziti-nastroje-apache-kafka-v-aplikacich-zalozenych-na-mikrosluzbach/
  7. Apache Kafka: distribuovaná streamovací platforma
    https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/
  8. Real-Time Payments with Clojure and Apache Kafka (podcast)
    https://www.evidentsystem­s.com/news/confluent-podcast-about-apache-kafka/
  9. Kafka and Clojure – Immutable event streams
    https://practicalli.github.io/kafka-and-clojure/
  10. Kafka Streams, the Clojure way
    https://blog.davemartin.me/posts/kafka-streams-the-clojure-way/
  11. dvlopt.kafka na GitHubu
    https://github.com/helins-io/kafka.clj
  12. kafka-streams-the-clojure-way na GitHubu
    https://github.com/DaveWM/kafka-streams-the-clojure-way
  13. babashka: A Clojure babushka for the grey areas of Bash
    https://github.com/borkdude/babashka
  14. Babashka and the Small Clojure Interpreter @ ClojureD 2020 (slajdy)
    https://speakerdeck.com/bor­kdude/babashka-and-the-small-clojure-interpreter-at-clojured-2020
  15. Babashka: ukázky použití
    https://github.com/borkdu­de/babashka/blob/master/doc/e­xamples.md
  16. clojureD 2020: „Babashka and Small Clojure Interpreter: Clojure in new contexts“ by Michiel Borkent
    https://www.youtube.com/watch?v=Nw8aN-nrdEk&t=5s
  17. Meetup #124 Babashka, implementing an nREPL server & game engines with Clojure
    https://www.youtube.com/wat­ch?v=0YmZYnwyHHc
  18. The Last Programming Language (shrnutí vývoje programovacích jazyků)
    https://www.youtube.com/watch?v=P2yr-3F6PQo
  19. Shebang (Unix): Wikipedia EN
    https://en.wikipedia.org/wi­ki/Shebang_(Unix)
  20. Shebang (Unix): Wikipedia CZ
    https://cs.wikipedia.org/wi­ki/Shebang_(Unix)
  21. How to create Clojure notebooks in Jupyter
    https://s01blog.wordpress­.com/2017/12/10/how-to-create-clojure-notebooks-in-jupyter/
  22. Dokumentace k nástroji Conda
    https://docs.conda.io/en/latest/
  23. Notebook interface
    https://en.wikipedia.org/wi­ki/Notebook_interface
  24. Jypyter: open source, interactive data science and scientific computing across over 40 programming languages
    https://jupyter.org/
  25. Calysto Scheme
    https://github.com/Calysto/ca­lysto_scheme
  26. scheme.py (základ projektu Calysto Scheme)
    https://github.com/Calysto/ca­lysto_scheme/blob/master/ca­lysto_scheme/scheme.py
  27. Humane test output for clojure.test
    https://github.com/pjstadig/humane-test-output
  28. iota
    https://github.com/juxt/iota
  29. 5 Differences between clojure.spec and Schema
    https://lispcast.com/clojure.spec-vs-schema/
  30. Schema: Clojure(Script) library for declarative data description and validation
    https://github.com/plumatic/schema
  31. Zip archiv s Clojure 1.9.0
    http://repo1.maven.org/ma­ven2/org/clojure/clojure/1­.9.0/clojure-1.9.0.zip
  32. Clojure 1.9 is now available
    https://clojure.org/news/2017/12/08/clo­jure19
  33. Deps and CLI Guide
    https://clojure.org/guides/dep­s_and_cli
  34. Changes to Clojure in Version 1.9
    https://github.com/clojure/clo­jure/blob/master/changes.md
  35. clojure.spec – Rationale and Overview
    https://clojure.org/about/spec
  36. Zip archiv s Clojure 1.8.0
    http://repo1.maven.org/ma­ven2/org/clojure/clojure/1­.8.0/clojure-1.8.0.zip
  37. Clojure 1.8 is now available
    http://clojure.org/news/2016/01/19/clo­jure18
  38. Socket Server REPL
    http://dev.clojure.org/dis­play/design/Socket+Server+REPL
  39. CLJ-1671: Clojure socket server
    http://dev.clojure.org/jira/browse/CLJ-1671
  40. CLJ-1449: Add clojure.string functions for portability to ClojureScript
    http://dev.clojure.org/jira/browse/CLJ-1449
  41. Launching a Socket Server
    http://clojure.org/referen­ce/repl_and_main#_launchin­g_a_socket_server
  42. API for clojure.string
    http://clojure.github.io/clo­jure/branch-master/clojure.string-api.html
  43. Clojars:
    https://clojars.org/
  44. Seznam knihoven na Clojars:
    https://clojars.org/projects
  45. Clojure Cookbook: Templating HTML with Enlive
    https://github.com/clojure-cookbook/clojure-cookbook/blob/master/07_webapps/7–11_enlive.asciidoc
  46. An Introduction to Enlive
    https://github.com/swannodette/enlive-tutorial/
  47. Enlive na GitHubu
    https://github.com/cgrand/enlive
  48. Expectations: příklady atd.
    http://jayfields.com/expectations/
  49. Expectations na GitHubu
    https://github.com/jaycfi­elds/expectations
  50. Lein-expectations na GitHubu
    https://github.com/gar3thjon3s/lein-expectations
  51. Testing Clojure With Expectations
    https://semaphoreci.com/blog/2014/09/23/tes­ting-clojure-with-expectations.html
  52. Clojure testing TDD/BDD libraries: clojure.test vs Midje vs Expectations vs Speclj
    https://www.reddit.com/r/Clo­jure/comments/1viilt/cloju­re_testing_tddbdd_librari­es_clojuretest_vs/
  53. Testing: One assertion per test
    http://blog.jayfields.com/2007/06/tes­ting-one-assertion-per-test.html
  54. Rewriting Your Test Suite in Clojure in 24 hours
    http://blog.circleci.com/rewriting-your-test-suite-in-clojure-in-24-hours/
  55. Clojure doc: zipper
    http://clojuredocs.org/clo­jure.zip/zipper
  56. Clojure doc: parse
    http://clojuredocs.org/clo­jure.xml/parse
  57. Clojure doc: xml-zip
    http://clojuredocs.org/clojure.zip/xml-zip
  58. Clojure doc: xml-seq
    http://clojuredocs.org/clo­jure.core/xml-seq
  59. Parsing XML in Clojure
    https://github.com/clojuredocs/guides
  60. Clojure Zipper Over Nested Vector
    https://vitalyper.wordpres­s.com/2010/11/23/clojure-zipper-over-nested-vector/
  61. Understanding Clojure's PersistentVector implementation
    http://blog.higher-order.net/2009/02/01/understanding-clojures-persistentvector-implementation
  62. Understanding Clojure's PersistentHashMap (deftwice…)
    http://blog.higher-order.net/2009/09/08/understanding-clojures-persistenthashmap-deftwice.html
  63. Assoc and Clojure's PersistentHashMap: part ii
    http://blog.higher-order.net/2010/08/16/assoc-and-clojures-persistenthashmap-part-ii.html
  64. Ideal Hashtrees (paper)
    http://lampwww.epfl.ch/pa­pers/idealhashtrees.pdf
  65. Clojure home page
    http://clojure.org/
  66. Clojure (downloads)
    http://clojure.org/downloads
  67. Clojure Sequences
    http://clojure.org/sequences
  68. Clojure Data Structures
    http://clojure.org/data_structures
  69. The Structure and Interpretation of Computer Programs: 2.2.1 Representing Sequences
    http://mitpress.mit.edu/sicp/full-text/book/book-Z-H-15.html#%_sec2.2.1
  70. The Structure and Interpretation of Computer Programs: 3.3.1 Mutable List Structure
    http://mitpress.mit.edu/sicp/full-text/book/book-Z-H-22.html#%_sec3.3.1
  71. Clojure – Functional Programming for the JVM
    http://java.ociweb.com/mar­k/clojure/article.html
  72. Clojure quick reference
    http://faustus.webatu.com/clj-quick-ref.html
  73. 4Clojure
    http://www.4clojure.com/
  74. ClojureDoc (rozcestník s dokumentací jazyka Clojure)
    http://clojuredocs.org/
  75. Clojure (na Wikipedia EN)
    http://en.wikipedia.org/wiki/Clojure
  76. Clojure (na Wikipedia CS)
    http://cs.wikipedia.org/wiki/Clojure
  77. SICP (The Structure and Interpretation of Computer Programs)
    http://mitpress.mit.edu/sicp/
  78. Pure function
    http://en.wikipedia.org/wi­ki/Pure_function
  79. Funkcionální programování
    http://cs.wikipedia.org/wi­ki/Funkcionální_programová­ní
  80. Čistě funkcionální (datové struktury, jazyky, programování)
    http://cs.wikipedia.org/wi­ki/Čistě_funkcionální
  81. Clojure Macro Tutorial (Part I, Getting the Compiler to Write Your Code For You)
    http://www.learningclojure­.com/2010/09/clojure-macro-tutorial-part-i-getting.html
  82. Clojure Macro Tutorial (Part II: The Compiler Strikes Back)
    http://www.learningclojure­.com/2010/09/clojure-macro-tutorial-part-ii-compiler.html
  83. Clojure Macro Tutorial (Part III: Syntax Quote)
    http://www.learningclojure­.com/2010/09/clojure-macro-tutorial-part-ii-syntax.html
  84. Tech behind Tech: Clojure Macros Simplified
    http://techbehindtech.com/2010/09/28/clo­jure-macros-simplified/
  85. Fatvat – Exploring functional programming: Clojure Macros
    http://www.fatvat.co.uk/2009/02/clo­jure-macros.html
  86. Eulerovo číslo
    http://cs.wikipedia.org/wi­ki/Eulerovo_číslo
  87. List comprehension
    http://en.wikipedia.org/wi­ki/List_comprehension
  88. List Comprehensions in Clojure
    http://asymmetrical-view.com/2008/11/18/list-comprehensions-in-clojure.html
  89. Clojure Programming Concepts: List Comprehension
    http://en.wikibooks.org/wi­ki/Clojure_Programming/Con­cepts#List_Comprehension
  90. Clojure core API: for macro
    http://clojure.github.com/clo­jure/clojure.core-api.html#clojure.core/for
  91. cirrus machina – The Clojure for macro
    http://www.cirrusmachina.com/blog/com­ment/the-clojure-for-macro/
  92. Riastradh's Lisp Style Rules
    http://mumble.net/~campbe­ll/scheme/style.txt
  93. Dynamic Languages Strike Back
    http://steve-yegge.blogspot.cz/2008/05/dynamic-languages-strike-back.html
  94. Scripting: Higher Level Programming for the 21st Century
    http://www.tcl.tk/doc/scripting.html
  95. Java Virtual Machine Support for Non-Java Languages
    http://docs.oracle.com/ja­vase/7/docs/technotes/gui­des/vm/multiple-language-support.html
  96. Třída java.lang.String
    http://docs.oracle.com/ja­vase/7/docs/api/java/lang/Strin­g.html
  97. Třída java.lang.StringBuffer
    http://docs.oracle.com/ja­vase/7/docs/api/java/lang/Strin­gBuffer.html
  98. Třída java.lang.StringBuilder
    http://docs.oracle.com/ja­vase/7/docs/api/java/lang/Strin­gBuilder.html
  99. StringBuffer versus String
    http://www.javaworld.com/ar­ticle/2076072/build-ci-sdlc/stringbuffer-versus-string.html
  100. Threading macro (dokumentace k jazyku Clojure)
    https://clojure.github.io/clo­jure/clojure.core-api.html#clojure.core/->
  101. Understanding the Clojure → macro
    http://blog.fogus.me/2009/09/04/un­derstanding-the-clojure-macro/
  102. clojure.inspector
    http://clojure.github.io/clo­jure/clojure.inspector-api.html
  103. The Clojure Toolbox
    http://www.clojure-toolbox.com/
  104. Unit Testing in Clojure
    http://nakkaya.com/2009/11/18/unit-testing-in-clojure/
  105. Testing in Clojure (Part-1: Unit testing)
    http://blog.knoldus.com/2014/03/22/tes­ting-in-clojure-part-1-unit-testing/
  106. API for clojure.test – Clojure v1.6 (stable)
    https://clojure.github.io/clo­jure/clojure.test-api.html
  107. Leiningen: úvodní stránka
    http://leiningen.org/
  108. Leiningen: Git repository
    https://github.com/techno­mancy/leiningen
  109. leiningen-win-installer
    http://leiningen-win-installer.djpowell.net/
  110. Clojure.org: Vars and the Global Environment
    http://clojure.org/Vars
  111. Clojure.org: Refs and Transactions
    http://clojure.org/Refs
  112. Clojure.org: Atoms
    http://clojure.org/Atoms
  113. Clojure.org: Agents as Asynchronous Actions
    http://clojure.org/agents
  114. Transient Data Structures
    http://clojure.org/transients