Hlavní navigace

Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)

29. 12. 2020
Doba čtení: 53 minut

Sdílet

Ve třetím článku o kombinaci Clojure a platformy Apache Kafka se zaměříme na popis skutečného streamingu využívajícího funkcionální vlastnosti jazyka Clojure. Ukážeme si konstrukci složitější „kolony“, rozdělení streamů do více kolon atd.

Obsah

1. Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)

2. Kostra pipeline, na které jsou založeny všechny navazující demonstrační příklady

3. Konfigurace témat, definice pipeline i aplikace, v jejímž rámci pipeline poběží

4. Úplný zdrojový kód první aplikace s funkční pipeline

5. Spuštění a otestování dnešního prvního demonstračního příkladu

6. Transformace zpráv neboli „T“ v „Extract, transform, load“

7. Přečtení zpráv z kolony beze změny její funkce (logování atd.)

8. Úplný zdrojový kód druhé aplikace s funkční pipeline

9. Spuštění a otestování dnešního druhého demonstračního příkladu

10. Chování aplikace v případě příjmu nekorektní zprávy

11. Zalogování chyby bez pádu aplikace

12. Chování upravené aplikace v příjmu nekorektní zprávy

13. Složitější pipeline s několika transformačními funkcemi

14. Spuštění a otestování dnešního čtvrtého demonstračního příkladu

15. Rozdělení streamu do více částí

16. Spuštění a otestování dnešního pátého demonstračního příkladu

17. Transformace zpráv založená na uzávěru

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu o programovacím jazyku Clojure

20. Odkazy na Internetu

1. Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)

Ve třetím (předposledním) článku o kombinaci programovacího jazyka Clojure a streamovací platformy Apache Kafka se zaměříme na popis skutečného streamingu využívajícího funkcionální vlastnosti jazyka Clojure. Ukážeme si konstrukci složitější „kolony“, rozdělení streamů (proudu zpráv) do více kolon atd. Popsány, popř. použity budou následující funkce a makra ze jmenného prostoru jackdaw.streams:

# Funkce/makro Použito Stručný popis
1 jackdaw.streams/start řízení aplikace zahájení činnosti kolony
2 jackdaw.streams/close řízení aplikace uzavření kolony
3 jackdaw.streams/kstream vytvoření proudu vytvoření proudu konzumujícího zprávy z vybraného tématu
4 jackdaw.streams/to uvnitř proudu uložení (materializace) zpráv z kolony do tématu
5 jackdaw.streams/map uvnitř proudu transformace zpráv
6 jackdaw.streams/map-values uvnitř proudu transformace zpráv (pouze hodnot zpráv, bez klíčů)
7 jackdaw.streams/peek kolona přečtení zprávy z kolony postranním kanálem bez porušení činnosti kolony
8 jackdaw.streams/through kolona vytvoření nové kolony rozdělením streamu
9 jackdaw.streams/merge kolona spojení dvou kolon
10 jackdaw.streams/filter kolona filtrace zpráv na základě predikátu
11 jackdaw.streams/filter-not kolona filtrace zpráv na základě predikátu
Poznámka: připomeňme si, že z hlediska jazyka Clojure je celý proud zpráv (stream) považován za nekonečnou a současně i líně vyhodnocovanou sekvenci.

2. Kostra pipeline, na které jsou založeny všechny navazující demonstrační příklady

V úvodní části dnešního článku si ukážeme kostru zcela funkční pipeline (kolony), na které budou založeny i všechny navazující demonstrační příklady. Struktura projektu s touto pipeline, která je mimochodem dostupná na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-0, vypadá následovně (zvýrazněny jsou ty soubory, které jsou popsány v navazujícím textu):

.
├── CHANGELOG.md
├── README.md
├── LICENSE
├── doc
│   └── intro.md
├── project.clj
├── resources
│   └── log4j.properties
├── src
│   └── stream_pipe_0
│       └── core.clj
└── test
    └── stream_pipe_0
        └── core_test.clj

Tato kostra byla získána příkazem:

$ lein new app stream-pipe-0

Nejdůležitější je popis projektu a jeho závislostí uvedený v souboru project.clj. Ručně provedené změny jsou zvýrazněny tučným písmem:

(defproject stream-pipe-0 "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
            :url "https://www.eclipse.org/legal/epl-2.0/"}
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [fundingcircle/jackdaw "0.7.6"]
                 [org.clojure/tools.logging "0.3.1"]
                 [log4j/log4j "1.2.17" :exclusions [javax.mail/mail
                                                    javax.jms/jms
                                                    com.sun.jmdk/jmxtools
                                                    com.sun.jmx/jmxri]]
                 [org.slf4j/slf4j-log4j12 "1.6.6"]]
  :main ^:skip-aot stream-pipe-0.core
  :target-path "target/%s"
  :resource-paths ["resources/"]
  :profiles {:uberjar {:aot :all
                       :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})

Aby knihovna Jackdaw vypisovala všechny důležité informace při inicializaci i běhu pipeline, musíme nakonfigurovat logování. Jedná se o soubor nazvaný log4.properties uložený v podadresáři resources. V tomto souboru je možné definovat, jakým způsobem bude proveden výpis logovacích informací na konzoli i do logovacího souboru (popř. i na jiná místa). Úroveň logovacích informací postačuje nastavit na WARN, čímž se (oproti úrovním DEBUG či INFO) poměrně dosti zásadním způsobem omezí výpis nepodstatných detailů. Navíc jsou v tomto souboru nastaveny i další důležité vlastnosti logování – maximální velikost logovacího souboru, jeho umístění v souborovém systému, případné rotace, formát výstupu aj.:

log4j.rootLogger=WARN, file, console
 
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=log/chainring.log
log4j.appender.file.MaxFileSize=5MB
log4j.appender.file.MaxBackupIndex=50
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} | %-5p | %t | %c | %m%n
 
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} | %-5p | %t | %c | %m%n

3. Konfigurace témat, definice pipeline i aplikace, v jejímž rámci pipeline poběží

Zdrojový kód celé aplikace s pipeline je uložen v souboru nazvaném src/stream_pipe0/core.clj. První důležitou hodnotou je mapa obsahující konfiguraci vstupního a výstupního tématu, resp. přesněji řečeno tématu, který je použit pro vstup dat (zpráv) i tématu použitého pro výstup (uložení výsledných zpráv). Obě témata jsou nakonfigurována takovým způsobem, že klíče i hodnoty jsou reprezentovány v JSONu:

(def topic-config
  "Konfigurace témat - vstupního i výstupního."
  {:input
   {:topic-name "input"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output
   {:topic-name "output"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}})

Následuje mapa obsahující konfiguraci celé aplikace – její jednoznačný identifikátor použitý v systému Kafka, určení adresy brokeru, nastavení vyrovnávací paměti atd. Interně je tato konfigurace převedena na properties:

(def app-config
  "Konfigurace aplikace (ve smyslu knihovny Jackdaw)."
  {"application.id" "pipe"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})

Funkce pojmenovaná build-topology slouží pro specifikaci konfigurace celé kolony. Prozatím se jedná o značně zjednodušenou kolonu:

  1. Je vytvořen proud zpráv čtených (konzumovaných) ze vstupního tématu.
  2. Proud zpráv se ukládá do tématu výstupního.

Pro vytvoření proudu se používá funkce nazvaná jackdaw.streams/kstream, pro uložení zpráv z proudu pak funkce jackdaw.streams/to. Povšimněte si použití threading makra, což je jedno z mnoha velmi užitečných maker programovacího jazyka Clojure:

(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/to (:output topic-config)))
  builder)

Dostáváme se ke dvojici funkcí určených pro spuštění aplikace, resp. pro její zastavení. Spuštění aplikace se skládá z inicializace pipeline, inicializace vlastní aplikace a z jejího následného spuštění funkcí jackdaw.streams/start:

(defn start-app
  "Spuštění aplikace."
  [app-config topic-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder topic-config)
        app (j/kafka-streams topology app-config)]
    (log/warn "Starting pipe")
    (j/start app)
    (log/warn "Pipe is up")
    app))

Opakem funkce jackdaw.streams/start je funkce se jménem jackdaw.streams/close, která aplikaci zastaví a uvolní všechny prostředky:

(defn stop-app
  "Zastavení aplikace."
  [app]
  (log/warn "Stopping pipe")
  (j/close app)
  (log/warn "Pipe is down"))

Aplikace je nakonfigurována a spuštěna z funkce -main. Ovšem ještě před vlastním spuštěním aplikace pro jistotu smažeme existující témata a znovu je vytvoříme:

(defn -main
  [& args]
  (let [broker-config {"bootstrap.servers" "localhost:9092"}]
 
    ;; na začátku pro jistotu vymažeme témata používaná pipou
    (delete-topic broker-config (:input topic-config))
    (delete-topic broker-config (:output topic-config))
 
    ;; vytvoření nových témat akceptujících zprávy ve formátu JSON
    (new-topic broker-config (:input topic-config))
    (new-topic broker-config (:output topic-config))
 
    ;; spuštění kolony
    (log/warn "Starting application")
    (let [app (start-app app-config topic-config)]
      (log/warn "App created:" app))))
Poznámka: povšimněte si, že v celém zdrojovém kódu důsledně používáme logování dostupné přes dnes již prakticky standardní balíček (jmenný prostor) clojure.tools.logging.

4. Úplný zdrojový kód první aplikace s funkční pipeline

Úplný zdrojový kód aplikace s plně funkční pipeline je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-0/src/stream_pipe0/core.clj. Jednotlivé definice hodnot i funkcí jsou popsány v dokumentačních řetězcích:

(ns stream-pipe-0.core
  (:require [jackdaw.admin :as ja]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jl]
            [jackdaw.serdes.json]
            [jackdaw.streams :as j]
            [clojure.pprint :as pp]
            [clojure.tools.logging :as log]))
 
 
(def topic-config
  "Konfigurace témat - vstupního i výstupního."
  {:input
   {:topic-name "input"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output
   {:topic-name "output"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}})
 
 
(def app-config
  "Konfigurace aplikace (ve smyslu knihovny Jackdaw)."
  {"application.id" "pipe"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})
 
 
(defn delete-topic
  "Pomocná funkce pro smazání vybraného tématu."
  [broker-config topic]
  (try
    (log/warn "Deleting topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/delete-topics! client [topic]))
    (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn new-topic
  "Pomocná funkce pro vytvoření nového tématu."
  [broker-config topic]
  (try
    (log/warn "Creating topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/create-topics! client [topic]))
      (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/to (:output topic-config)))
  builder)
 
 
(defn start-app
  "Spuštění aplikace."
  [app-config topic-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder topic-config)
        app (j/kafka-streams topology app-config)]
    (log/warn "Starting pipe")
    (j/start app)
    (log/warn "Pipe is up")
    app))
 
 
(defn stop-app
  "Zastavení aplikace."
  [app]
  (log/warn "Stopping pipe")
  (j/close app)
  (log/warn "Pipe is down"))
 
 
(defn -main
  [& args]
  (let [broker-config {"bootstrap.servers" "localhost:9092"}]
 
    ;; na začátku pro jistotu vymažeme témata používaná pipou
    (delete-topic broker-config (:input topic-config))
    (delete-topic broker-config (:output topic-config))
 
    ;; vytvoření nových témat akceptujících zprávy ve formátu JSON
    (new-topic broker-config (:input topic-config))
    (new-topic broker-config (:output topic-config))
 
    ;; spuštění kolony
    (log/warn "Starting application")
    (let [app (start-app app-config topic-config)]
      (log/warn "App created:" app))))

5. Spuštění a otestování dnešního prvního demonstračního příkladu

Demonstrační příklad popsaný v předchozích třech kapitolách spustíme příkazem:

$ lein run

Na standardním výstupu by se měly objevit informace o tom, že byla vymazána témata „input“ a „output“, že došlo k znovuvytvoření těchto témat a konečně, že se pipeline (kolona) spustila a očekává zprávy, které budou následně zkonzumovány (současně se v podadresáři log tvoří logovací soubor s podrobnějšími informacemi):

2020-12-23 16:28:36.354 | WARN  | main | stream-pipe-0.core | Deleting topic input
2020-12-23 16:28:36.599 | WARN  | main | stream-pipe-0.core | Deleting topic output
2020-12-23 16:28:36.618 | WARN  | main | stream-pipe-0.core | Creating topic input
2020-12-23 16:28:36.650 | WARN  | main | stream-pipe-0.core | Creating topic output
2020-12-23 16:28:36.678 | WARN  | main | stream-pipe-0.core | Starting application
2020-12-23 16:28:36.797 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config.
2020-12-23 16:28:36.797 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
2020-12-23 16:28:36.802 | WARN  | main | stream-pipe-0.core | Starting pipe
2020-12-23 16:28:36.805 | WARN  | main | stream-pipe-0.core | Pipe is up
2020-12-23 16:28:36.807 | WARN  | main | stream-pipe-0.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x61d8a491 org.apache.kafka.streams.KafkaStreams@61d8a491]

V dalším terminálu použijeme nástroj kafkacat pro poslání zprávy, resp. přesněji řečeno většího množství zpráv do tématu „input“. Zprávy musí být ve formátu JSON, ovšem jejich obsah není žádným způsobem kontrolován:

$ kafkacat -P -b localhost:9092 -t input
 
{"foo":"bar"}
{}

A konečně se ve třetím terminálu pokusíme (opět nástrojem kafkacat) přečíst zprávy z tématu „output“. Formát těchto zpráv i jejich obsah by měl odpovídat zprávám posílaným ze druhého terminálu:

$ kafkacat -C -b localhost:9092 -t output
 
% Reached end of topic output [0] at offset 1
{"foo":"bar"}
% Reached end of topic output [0] at offset 2
{}
% Reached end of topic output [0] at offset 3

6. Transformace zpráv neboli „T“ v „Extract, transform, load“

Konzumace zpráv z jednoho tématu s jejich přímým uložením do tématu jiného samozřejmě nepředstavuje žádnou přelomovou technologii, takže si nyní ukážeme, jak by mohla probíhat nějaká skutečná transformace zpráv. Samotná transformace může být (v těch jednodušších případech) představována čistou funkcí, která pouze zpracuje svůj vstup (tedy zprávu získanou z proudu) a jejím výstupem bude transformovaná zpráva. Zprávy v proudu obsahují dvojici klíč+hodnota, kterou rozdělíme na klíč a hodnotu a vytvoříme z nich zprávu novou. Toto rozdělení je možné provést buď explicitně:

(defn etl
  "Transformační funkce."
  [message]
  (let [k (nth message 0)
        v (nth message 1)]
    [k {:result (+ (:x v) (:y v))}]))

Ovšem kratší a idiomatičtější kód by v Clojure mohl vypadat takto:

(defn etl
  "Transformační funkce."
  [[k v]]
  [k {:result (+ (:x v) (:y v))}])

Nyní tedy máme funkci, která dokáže zprávu rozdělit na klíč a hodnotu a vytvořit zprávu novou. Klíč nové zprávy zůstane původní, ale hodnota se bude lišit – z původní zprávy se získají hodnoty uložené pod klíči „x“ a „y“, tyto hodnoty se sečtou a výsledek se uloží do výsledné zprávy pod klíčem „result“ (i výsledná zpráva bude uložena v JSONu).

Transformační funkci přidáme do pipeliny s využitím jackdaw.streams/map (viz podtržený řádek):

(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/map etl)
      (j/to (:output topic-config)))
  builder)
Poznámka: tímto způsobem lze do pipeliny přidat i libovolné další množství transformačních funkcí. Ty budou tvořit skutečnou kolonu a představovat tak funkcionální obdobu klasické Unixové kolony.

7. Přečtení zpráv z kolony beze změny její funkce (logování atd.)

V některých případech je užitečné zprávy z kolony získat „vedlejším kanálem“, tedy tak, aby se činnost kolony neporušila. Takto lze například zajistit logování zpráv atd. I pro tuto činnost existuje v knihovně Jackdaw funkce (vkládaná do kolony); ta se jmenuje jackdaw.streams/peek. Této funkci se předá jak vlastní stream, tak i funkce zavolaná pro každou zprávu. Při vložení jackdaw.streams/peek do threading makra se pochopitelně první parametr vynechá.

Snadno tedy můžeme do logu vložit jak informaci o původní zprávě, tak i o zprávě transformované:

(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek log-message)
      (j/map etl)
      (j/peek log-message)
      (j/to (:output topic-config)))
  builder)

Často se setkáme s tím, že jednoduché funkce (typu log-message či etl v našem případě) jsou definovány jako funkce anonymní:

(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/map etl)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output topic-config)))
  builder)
Poznámka: jméno funkce peek je zvoleno korektně, protože se u abstraktních datových typů (a nekonečné sekvence – streamy – sem také spadají) jedná o operaci určenou pro přečtení prvku bez jeho odstranění z daného typu (viz též https://en.wikipedia.org/wi­ki/Peek_(data_type_operati­on).

8. Úplný zdrojový kód druhé aplikace s funkční pipeline

Úplný zdrojový kód dnešní druhé aplikace s plně funkční pipeline je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-1/src/stream_pipe1/core.clj. Jednotlivé definice hodnot i funkcí jsou popsány v dokumentačních řetězcích:

(ns stream-pipe-1.core
  (:require [jackdaw.admin :as ja]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jl]
            [jackdaw.serdes.json]
            [jackdaw.streams :as j]
            [clojure.pprint :as pp]
            [clojure.tools.logging :as log]))
 
 
(def topic-config
  "Konfigurace témat - vstupního i výstupního."
  {:input
   {:topic-name "input"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output
   {:topic-name "output"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}})
 
 
(def app-config
  "Konfigurace aplikace (ve smyslu knihovny Jackdaw)."
  {"application.id" "pipe"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"})
 
 
(defn delete-topic
  "Pomocná funkce pro smazání vybraného tématu."
  [broker-config topic]
  (try
    (log/warn "Deleting topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/delete-topics! client [topic]))
    (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn new-topic
  "Pomocná funkce pro vytvoření nového tématu."
  [broker-config topic]
  (try
    (log/warn "Creating topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/create-topics! client [topic]))
      (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn etl
  "Transformační funkce."
  [[k v]]
  [k {:result (+ (:x v) (:y v))}])
 
 
(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/map etl)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output topic-config)))
  builder)
 
 
(defn start-app
  "Spuštění aplikace."
  [app-config topic-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder topic-config)
        app (j/kafka-streams topology app-config)]
    (log/warn "Starting pipe")
    (j/start app)
    (log/warn "Pipe is up")
    app))
 
 
(defn stop-app
  "Zastavení aplikace."
  [app]
  (log/warn "Stopping pipe")
  (j/close app)
  (log/warn "Pipe is down"))
 
 
(defn -main
  [& args]
  (let [broker-config {"bootstrap.servers" "localhost:9092"}]
 
    ;; na začátku pro jistotu vymažeme témata používaná pipou
    (delete-topic broker-config (:input topic-config))
    (delete-topic broker-config (:output topic-config))
 
    ;; vytvoření nových témat akceptujících zprávy ve formátu JSON
    (new-topic broker-config (:input topic-config))
    (new-topic broker-config (:output topic-config))
 
    ;; spuštění kolony
    (log/warn "Starting application")
    (let [app (start-app app-config topic-config)]
      (log/warn "App created:" app))))

9. Spuštění a otestování dnešního druhého demonstračního příkladu

Opět si můžeme vyzkoušet činnost dnešního druhého demonstračního příkladu, tentokrát i s transformací zpráv a s logováním přidaným do kolony. Nejprve aplikaci spustíme:

$ lein run
 
2020-12-23 16:52:16.362 | WARN  | main | stream-pipe-1.core | Deleting topic input
2020-12-23 16:52:16.600 | WARN  | main | stream-pipe-1.core | Deleting topic output
2020-12-23 16:52:16.615 | WARN  | main | stream-pipe-1.core | Creating topic input
2020-12-23 16:52:16.643 | WARN  | main | stream-pipe-1.core | Creating topic output
2020-12-23 16:52:16.669 | WARN  | main | stream-pipe-1.core | Starting application
2020-12-23 16:52:16.786 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config.
2020-12-23 16:52:16.787 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
2020-12-23 16:52:16.792 | WARN  | main | stream-pipe-1.core | Starting pipe
2020-12-23 16:52:16.796 | WARN  | main | stream-pipe-1.core | Pipe is up
2020-12-23 16:52:16.798 | WARN  | main | stream-pipe-1.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x6a97517 org.apache.kafka.streams.KafkaStreams@6a97517]

Následně do vstupního tématu pošleme několik zpráv ve správném formátu, tedy s klíčem i hodnotou v JSONu, přičemž hodnota obsahuje dvě numerické hodnoty s klíči x a y:

$ kafkacat -P -b localhost:9092 -t input -K:
 
"***":{"x":1, "y":2}
"---":{"x":100, "y":-100}

Obě zprávy by měly být správně zpracovány, což se dozvíme z logu (nejprve je zalogována zpráva vstupní, poté zpráva transformovaná):

2020-12-23 16:52:18.853 | WARN  | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Received message with key:    ***  and value: {:x 1, :y 2}
2020-12-23 16:52:18.856 | WARN  | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Transformed message with key: ***  and value: {:result 3}
2020-12-23 16:52:33.498 | WARN  | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Received message with key:    ---  and value: {:x 100, :y -100}
2020-12-23 16:52:33.498 | WARN  | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Transformed message with key: ---  and value: {:result 0}

A konečně výsledky přečteme z výstupního tématu:

$ kafkacat -C -b localhost:9092 -t output
 
% Reached end of topic output [0] at offset 0
{"result":3}
% Reached end of topic output [0] at offset 1
{"result":0}

10. Chování aplikace v případě příjmu nekorektní zprávy

Nyní si vyzkoušejme, co se stane v případě, že do vstupního tématu pošleme zprávu s neočekávaným obsahem, například zprávu, která vůbec není zakódována do JSONu:

$ kafkacat -P -b localhost:9092 -t input
 
xyz

V takovém případě dojde k pádu aplikace, a to již při pokusu o deserializaci zprávy:

2020-12-23 16:32:57.649 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.errors.LogAndFailExceptionHandler | Exception caught during Deserialization, taskId: 0_0, topic: input, partition: 0, offset: 0
java.lang.Exception: JSON error (unexpected character): x
        at clojure.data.json$_read.invokeStatic(json.clj:226)
        at clojure.data.json$_read.invoke(json.clj:177)
        at clojure.data.json$read.invokeStatic(json.clj:272)
        at clojure.data.json$read.doInvoke(json.clj:228)
        at clojure.lang.RestFn.applyTo(RestFn.java:139)
        at clojure.core$apply.invokeStatic(core.clj:667)
        at clojure.core$apply.invoke(core.clj:660)
        at clojure.data.json$read_str.invokeStatic(json.clj:278)
        at clojure.data.json$read_str.doInvoke(json.clj:274)
        at clojure.lang.RestFn.invoke(RestFn.java:439)
        at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34)
        at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
2020-12-23 16:32:57.652 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.processor.internals.StreamThread | stream-thread [pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: java.lang.Exception: JSON error (unexpected character): x
        at clojure.data.json$_read.invokeStatic(json.clj:226)
        at clojure.data.json$_read.invoke(json.clj:177)
        at clojure.data.json$read.invokeStatic(json.clj:272)
        at clojure.data.json$read.doInvoke(json.clj:228)
        at clojure.lang.RestFn.applyTo(RestFn.java:139)
        at clojure.core$apply.invokeStatic(core.clj:667)
        at clojure.core$apply.invoke(core.clj:660)
        at clojure.data.json$read_str.invokeStatic(json.clj:278)
        at clojure.data.json$read_str.doInvoke(json.clj:274)
        at clojure.lang.RestFn.invoke(RestFn.java:439)
        at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34)
        at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        ... 8 more
2020-12-23 16:32:57.662 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.KafkaStreams | stream-client [pipe-934ea525-1ffd-4c1a-b8a0-89a827416553] All stream threads have died. The instance will be in error state and should be closed.

11. Zalogování chyby bez pádu aplikace

Aby aplikace nezhavarovala při jakémkoli pokusu o zpracování zprávy s nekorektním formátem, budeme muset nastavit odlišný handler, tedy přesněji řečeno třídu implementující rozhraní DeserializationExceptionHandler. To lze udělat více způsoby – vytvořit si takovou třídu s využitím proxy, popř. gen-class. Taková třída musí mít přepsánu metodu handle:

DeserializationExceptionHandler.DeserializationHandlerResponse handle(ProcessorContext context,
                                                                      ConsumerRecord<byte[],byte[]> record,
                                                                      Exception exception)

Existuje ovšem ještě jednodušší řešení – použít již vhodně připravenou třídu, konkrétně třídu org.apache.kafka.streams.e­rrors.LogAndContinueExcep­tionHandler. To se provede následujícím způsobem při konfiguraci aplikace (poslední podtržený řádek):

(def app-config
  "Konfigurace aplikace (ve smyslu knihovny Jackdaw)."
  {"application.id" "pipe"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"
   "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})
Poznámka: třídu je nutné specifikovat jménem (řetězec) nebo přímo identifikátorem třídy.

Pro úplnost si uveďme celý zdrojový kód takto upraveného demonstračního příkladu (i když je úprava jednořádková):

(ns stream-pipe-2.core
  (:require [jackdaw.admin :as ja]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jl]
            [jackdaw.serdes.json]
            [jackdaw.streams :as j]
            [clojure.pprint :as pp]
            [clojure.tools.logging :as log]))
 
 
(def topic-config
  "Konfigurace témat - vstupního i výstupního."
  {:input
   {:topic-name "input"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output
   {:topic-name "output"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}})
 
 
(def app-config
  "Konfigurace aplikace (ve smyslu knihovny Jackdaw)."
  {"application.id" "pipe"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"
   "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})
 
 
(defn delete-topic
  "Pomocná funkce pro smazání vybraného tématu."
  [broker-config topic]
  (try
    (log/warn "Deleting topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/delete-topics! client [topic]))
    (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn new-topic
  "Pomocná funkce pro vytvoření nového tématu."
  [broker-config topic]
  (try
    (log/warn "Creating topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/create-topics! client [topic]))
      (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn etl
  "Transformační funkce."
  [[k v]]
  [k {:result (+ (:x v) (:y v))}])
 
 
(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/map etl)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output topic-config)))
  builder)
 
 
(defn start-app
  "Spuštění aplikace."
  [app-config topic-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder topic-config)
        app (j/kafka-streams topology app-config)]
    (log/warn "Starting pipe")
    (j/start app)
    (log/warn "Pipe is up")
    app))
 
 
(defn stop-app
  "Zastavení aplikace."
  [app]
  (log/warn "Stopping pipe")
  (j/close app)
  (log/warn "Pipe is down"))
 
 
(defn -main
  [& args]
  (let [broker-config {"bootstrap.servers" "localhost:9092"}]
 
    ;; na začátku pro jistotu vymažeme témata používaná pipou
    (delete-topic broker-config (:input topic-config))
    (delete-topic broker-config (:output topic-config))
 
    ;; vytvoření nových témat akceptujících zprávy ve formátu JSON
    (new-topic broker-config (:input topic-config))
    (new-topic broker-config (:output topic-config))
 
    ;; spuštění kolony
    (log/warn "Starting application")
    (let [app (start-app app-config topic-config)]
      (log/warn "App created:" app))))

12. Chování upravené aplikace v příjmu nekorektní zprávy

Nyní si můžeme chování aplikace upravené v rámci předchozí kapitoly otestovat. Nejdříve s korektními zprávami:

$ kafkacat -P -b localhost:9092 -t input -K:
 
"x":{"x":1, "y":2}
"y":{"x":10, "y":20}

Chování aplikace by se v takovém případě nemělo žádným způsobem změnit:

$ kafkacat -C -b localhost:9092 -t output
 
% Reached end of topic output [0] at offset 0
{"result":3,"timestamp":"Fri Dec 25 18:15:44 CET 2020"}
% Reached end of topic output [0] at offset 1
{"result":30,"timestamp":"Fri Dec 25 18:16:02 CET 2020"}
% Reached end of topic output [0] at offset 2

Nyní zkusme do tématu „input“ předat nekorektně zakódovanou zprávu:

$ kafkacat -P -b localhost:9092 -t input -K:
 
foobar

V takovém případě by se měla chyba (i celý výpis zásobníkových rámců) zalogovat, ale aplikace nezhavaruje:

2020-12-23 17:28:22.197 | WARN  | pipe-3cb9ae24-57a5-4d20-b4f7-a89e9a311445-StreamThread-1 | org.apache.kafka.streams.errors.LogAndContinueExceptionHandler | Exception caught during Deserialization, taskId: 0_0, topic: input, partition: 0, offset: 0
java.lang.Exception: JSON error (expected false)
        at clojure.data.json$_read.invokeStatic(json.clj:215)
        at clojure.data.json$_read.invoke(json.clj:177)
        at clojure.data.json$read.invokeStatic(json.clj:272)
        at clojure.data.json$read.doInvoke(json.clj:228)
        at clojure.lang.RestFn.applyTo(RestFn.java:139)
        at clojure.core$apply.invokeStatic(core.clj:667)
        at clojure.core$apply.invoke(core.clj:660)
        at clojure.data.json$read_str.invokeStatic(json.clj:278)
        at clojure.data.json$read_str.doInvoke(json.clj:274)
        at clojure.lang.RestFn.invoke(RestFn.java:439)
        at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34)
        at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
e020-12-23 17:28:22.205 | WARN  | pipe-3cb9ae24-57a5-4d20-b4f7-a89e9a311445-StreamThread-1 | org.apache.kafka.streams.processor.internals.RecordDeserializer | task [0_0] Skipping record due to deserialization error. topic=[input] partition=[0] offset=[0]
java.lang.Exception: JSON error (expected false)
        at clojure.data.json$_read.invokeStatic(json.clj:215)
        at clojure.data.json$_read.invoke(json.clj:177)
        at clojure.data.json$read.invokeStatic(json.clj:272)
        at clojure.data.json$read.doInvoke(json.clj:228)
        at clojure.lang.RestFn.applyTo(RestFn.java:139)
        at clojure.core$apply.invokeStatic(core.clj:667)
        at clojure.core$apply.invoke(core.clj:660)
        at clojure.data.json$read_str.invokeStatic(json.clj:278)
        at clojure.data.json$read_str.doInvoke(json.clj:274)
        at clojure.lang.RestFn.invoke(RestFn.java:439)
        at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34)
        at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

13. Složitější pipeline s několika transformačními funkcemi

Transformaci zpráv můžeme v koloně provést několikrát za sebou, opět s použitím nám již známé funkce jackdaw.streams.map. Pouze je nutné zajistit, aby transformační funkce vracely hodnotu, jejíž struktura odpovídá zprávám (dvojice klíč:hodnota):

(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/map etl-1)
      (j/map etl-2)
      (j/to (:output topic-config)))
  builder)

Druhá transformační funkce do zprávy přidá časové razítko:

(defn etl-2
  "Transformační funkce."
  [[k v]]
  [k (assoc v :timestamp (str (new java.util.Date)))])

Pro sledování činnosti kolony je však vhodné přidat i logování s informacemi o průběhu jednotlivých transformací, takže samotná kolona bude o dva kroky delší:

(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/map etl-1)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/map etl-2)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output topic-config)))
  builder)

Opět si pro úplnost ukažme, jak bude vypadat úplný zdrojový kód takto upraveného demonstračního příkladu, jenž nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-3/src/stream_pipe3/core.clj:

(ns stream-pipe-3.core
  (:require [jackdaw.admin :as ja]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jl]
            [jackdaw.serdes.json]
            [jackdaw.streams :as j]
            [clojure.pprint :as pp]
            [clojure.tools.logging :as log]))
 
 
(def topic-config
  "Konfigurace témat - vstupního i výstupního."
  {:input
   {:topic-name "input"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output
   {:topic-name "output"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}})
 
 
(def app-config
  "Konfigurace aplikace (ve smyslu knihovny Jackdaw)."
  {"application.id" "pipe"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"
   "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})
 
 
(defn delete-topic
  "Pomocná funkce pro smazání vybraného tématu."
  [broker-config topic]
  (try
    (log/warn "Deleting topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/delete-topics! client [topic]))
    (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn new-topic
  "Pomocná funkce pro vytvoření nového tématu."
  [broker-config topic]
  (try
    (log/warn "Creating topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/create-topics! client [topic]))
      (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn etl-1
  "Transformační funkce."
  [[k v]]
  [k {:result (+ (:x v) (:y v))}])
 
 
(defn etl-2
  "Transformační funkce."
  [[k v]]
  [k (assoc v :timestamp (str (new java.util.Date)))])
 
 
(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/map etl-1)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/map etl-2)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output topic-config)))
  builder)
 
 
(defn start-app
  "Spuštění aplikace."
  [app-config topic-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder topic-config)
        app (j/kafka-streams topology app-config)]
    (log/warn "Starting pipe")
    (j/start app)
    (log/warn "Pipe is up")
    app))
 
 
(defn stop-app
  "Zastavení aplikace."
  [app]
  (log/warn "Stopping pipe")
  (j/close app)
  (log/warn "Pipe is down"))
 
 
(defn -main
  [& args]
  (let [broker-config {"bootstrap.servers" "localhost:9092"}]
 
    ;; na začátku pro jistotu vymažeme témata používaná pipou
    (delete-topic broker-config (:input topic-config))
    (delete-topic broker-config (:output topic-config))
 
    ;; vytvoření nových témat akceptujících zprávy ve formátu JSON
    (new-topic broker-config (:input topic-config))
    (new-topic broker-config (:output topic-config))
 
    ;; spuštění kolony
    (log/warn "Starting application")
    (let [app (start-app app-config topic-config)]
      (log/warn "App created:" app))))

14. Spuštění a otestování dnešního čtvrtého demonstračního příkladu

Aplikaci popsanou v předchozí kapitole spustíme nám již známým příkazem lein run:

$ lein run
 
2020-12-25 18:15:32.305 | WARN  | main | stream-pipe-3.core | Deleting topic input
2020-12-25 18:15:32.561 | WARN  | main | stream-pipe-3.core | Deleting topic output
2020-12-25 18:15:32.587 | WARN  | main | stream-pipe-3.core | Creating topic input
2020-12-25 18:15:32.630 | WARN  | main | stream-pipe-3.core | Creating topic output
2020-12-25 18:15:32.666 | WARN  | main | stream-pipe-3.core | Starting application
2020-12-25 18:15:32.776 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config.
2020-12-25 18:15:32.776 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
2020-12-25 18:15:32.781 | WARN  | main | stream-pipe-3.core | Starting pipe
2020-12-25 18:15:32.783 | WARN  | main | stream-pipe-3.core | Pipe is up
2020-12-25 18:15:32.784 | WARN  | main | stream-pipe-3.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x8f374de org.apache.kafka.streams.KafkaStreams@8f374de]

Po poslání korektně zakódované zprávy je možné v logovacím souboru (popř. přímo na terminálu aplikace) sledovat, jak je zpráva postupně transformována.

První zpráva:

2020-12-25 18:15:44.393 | WARN  | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Received message with key:    x  and value: {:x 1, :y 2}
2020-12-25 18:15:44.394 | WARN  | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: x  and value: {:result 3}
2020-12-25 18:15:44.398 | WARN  | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: x  and value: {:result 3, :timestamp Fri Dec 25 18:15:44 CET 2020}

Druhá zpráva:

2020-12-25 18:16:02.476 | WARN  | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Received message with key:    y  and value: {:x 10, :y 20}
2020-12-25 18:16:02.477 | WARN  | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: y  and value: {:result 30}
2020-12-25 18:16:02.478 | WARN  | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: y  and value: {:result 30, :timestamp Fri Dec 25 18:16:02 CET 2020}

15. Rozdělení streamu do více částí

Někdy se setkáme s nutností rozdělení celého streamu do více částí, což by v terminologii klasických Unixových kolon velmi zhruba odpovídalo příkazu tee. I to je pochopitelně v knihovně Jackdaw možné; konkrétně je vytvoření nové větve streamu zajištěno funkcí jackdaw.streams/through, která zajistí uložení zprávy do zvoleného tématu. Pokud tedy budeme chtít jednotlivé (částečně) transformované zprávy postupně uložit do témat nazvaných „output1“, „output2“ a „output3“, bude konfigurace kolony vypadat takto:

(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/through (:output-1 topic-config))
      (j/map etl-1)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/through (:output-2 topic-config))
      (j/map etl-2)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output-3 topic-config)))
  builder)

Zdrojový kód příkladu je následně nutné upravit tak, aby se nová témata na začátku smazala a ihned poté znovu inicializovala:

(ns stream-pipe-4.core
  (:require [jackdaw.admin :as ja]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jl]
            [jackdaw.serdes.json]
            [jackdaw.streams :as j]
            [clojure.pprint :as pp]
            [clojure.tools.logging :as log]))
 
 
(def topic-config
  "Konfigurace témat - vstupního i výstupního."
  {:input
   {:topic-name "input"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output-1
   {:topic-name "output1"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output-2
   {:topic-name "output2"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output-3
   {:topic-name "output3"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}})
 
 
(def app-config
  "Konfigurace aplikace (ve smyslu knihovny Jackdaw)."
  {"application.id" "pipe"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"
   "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})
 
 
(defn delete-topic
  "Pomocná funkce pro smazání vybraného tématu."
  [broker-config topic]
  (try
    (log/warn "Deleting topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/delete-topics! client [topic]))
    (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn new-topic
  "Pomocná funkce pro vytvoření nového tématu."
  [broker-config topic]
  (try
    (log/warn "Creating topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/create-topics! client [topic]))
      (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn etl-1
  "Transformační funkce."
  [[k v]]
  [k {:result (+ (:x v) (:y v))}])
 
 
(defn etl-2
  "Transformační funkce."
  [[k v]]
  [k (assoc v :timestamp (str (new java.util.Date)))])
 
 
(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/through (:output-1 topic-config))
      (j/map etl-1)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/through (:output-2 topic-config))
      (j/map etl-2)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output-3 topic-config)))
  builder)
 
 
(defn start-app
  "Spuštění aplikace."
  [app-config topic-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder topic-config)
        app (j/kafka-streams topology app-config)]
    (log/warn "Starting pipe")
    (j/start app)
    (log/warn "Pipe is up")
    app))
 
 
(defn stop-app
  "Zastavení aplikace."
  [app]
  (log/warn "Stopping pipe")
  (j/close app)
  (log/warn "Pipe is down"))
 
 
(defn -main
  [& args]
  (let [broker-config {"bootstrap.servers" "localhost:9092"}]
 
    ;; na začátku pro jistotu vymažeme témata používaná pipou
    (delete-topic broker-config (:input topic-config))
    (delete-topic broker-config (:output-1 topic-config))
    (delete-topic broker-config (:output-2 topic-config))
    (delete-topic broker-config (:output-3 topic-config))
 
    ;; vytvoření nových témat akceptujících zprávy ve formátu JSON
    (new-topic broker-config (:input topic-config))
    (new-topic broker-config (:output-1 topic-config))
    (new-topic broker-config (:output-2 topic-config))
    (new-topic broker-config (:output-3 topic-config))
 
    ;; spuštění kolony
    (log/warn "Starting application")
    (let [app (start-app app-config topic-config)]
      (log/warn "App created:" app))))

16. Spuštění a otestování dnešního pátého demonstračního příkladu

Pro otestování funkce dnešního pátého demonstračního příkladu nejdříve do vstupního tématu pošleme zprávu s korektním formátem:

$ kafkacat -P -b localhost:9092 -t input -K:
 
"x":{"x":1, "y":2}

Následně aplikaci spustíme a budeme sledovat její činnost:

$ lein run
 
2020-12-25 19:09:08.876 | WARN  | main | stream-pipe-4.core | Deleting topic input
2020-12-25 19:09:09.099 | WARN  | main | stream-pipe-4.core | Deleting topic output1
2020-12-25 19:09:09.115 | WARN  | main | stream-pipe-4.core | Deleting topic output2
2020-12-25 19:09:09.128 | WARN  | main | stream-pipe-4.core | Deleting topic output3
2020-12-25 19:09:09.140 | WARN  | main | stream-pipe-4.core | Creating topic input
2020-12-25 19:09:09.174 | WARN  | main | stream-pipe-4.core | Creating topic output1
2020-12-25 19:09:09.205 | WARN  | main | stream-pipe-4.core | Creating topic output2
2020-12-25 19:09:09.242 | WARN  | main | stream-pipe-4.core | Creating topic output3
2020-12-25 19:09:09.275 | WARN  | main | stream-pipe-4.core | Starting application
2020-12-25 19:09:09.367 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config.
2020-12-25 19:09:09.367 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
2020-12-25 19:09:09.372 | WARN  | main | stream-pipe-4.core | Starting pipe
2020-12-25 19:09:09.374 | WARN  | main | stream-pipe-4.core | Pipe is up
2020-12-25 19:09:09.376 | WARN  | main | stream-pipe-4.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x68de8522 org.apache.kafka.streams.KafkaStreams@68de8522]
2020-12-25 19:09:26.385 | WARN  | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Received message with key:    x  and value: {:x 1, :y 2}
2020-12-25 19:09:26.515 | WARN  | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Transformed message with key: x  and value: {:result 3}
2020-12-25 19:09:26.642 | WARN  | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Transformed message with key: x  and value: {:result 3, :timestamp Fri Dec 25 19:09:26 CET 2020}

Podle zpráv z logu byla zpráva úspěšně zpracována, takže si vypíšeme obsah témat „output1“, „output2“ i „output3“.

V tématu „output1“ je uložena kopie vstupní zprávy:

$ kafkacat -C -b localhost:9092 -t output1
 
{"x":1,"y":2}

V tématu „output2“ je zpráva s vypočteným výsledkem:

$ kafkacat -C -b localhost:9092 -t output2
 
{"result":3}

A konečně v tématu „output3“ je zpráva s přidaným časovým razítkem:

$ kafkacat -C -b localhost:9092 -t output3
 
{"result":3,"timestamp":"Fri Dec 25 19:09:26 CET 2020"}

17. Transformace zpráv založená na uzávěru

V některých aplikacích není transformační funkce čistou funkcí ve smyslu, že se jedná o funkci bez paměti a bez vedlejších efektů. Příkladem může být funkce, která by měla do zpráv vkládat hodnotu nějakého čítače atd. I toho lze – pochopitelně – v programovacím jazyku Clojure dosáhnout, a to konkrétně s použitím uzávěrů (closure); méně idiomatické a koncepčně horší by bylo použití třídy. Ukažme si tedy, jak lze vytvořit transformační uzávěr, který se volá jako běžná funkce, ovšem bude mít vnitřní paměť a umožní nám do zpráv vložit jejich index (tedy hodnotu čítače):

(def etl-3
  "Transformační funkce vytvořená ve formě uzávěru."
  (let [counter (atom 0)]
    (fn [[k v]] (do
                  (swap! counter inc)
                  [k (assoc v :counter @counter)]))))
Poznámka: vidíme, že čítač je realizován pomocí atomu, což je opět koncepčně v pořádku.

Změněná definice kolony s přidanou transformační funkcí etl-3:

(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/through (:output-1 topic-config))
      (j/map etl-1)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/through (:output-2 topic-config))
      (j/map etl-2)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/through (:output-3 topic-config))
      (j/map etl-3)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output-4 topic-config)))
  builder)

Následuje výpis úplného zdrojového kódu dnešního posledního demonstračního příkladu:

(ns stream-pipe-5.core
  (:require [jackdaw.admin :as ja]
            [jackdaw.client :as jc]
            [jackdaw.client.log :as jl]
            [jackdaw.serdes.json]
            [jackdaw.streams :as j]
            [clojure.pprint :as pp]
            [clojure.tools.logging :as log]))
 
 
(def topic-config
  "Konfigurace témat - vstupního i výstupního."
  {:input
   {:topic-name "input"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output-1
   {:topic-name "output1"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output-2
   {:topic-name "output2"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output-3
   {:topic-name "output3"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}
   :output-4
   {:topic-name "output4"
    :partition-count 1
    :replication-factor 1
    :key-serde (jackdaw.serdes.json/serde)
    :value-serde (jackdaw.serdes.json/serde)}})
 
 
(def app-config
  "Konfigurace aplikace (ve smyslu knihovny Jackdaw)."
  {"application.id" "pipe"
   "bootstrap.servers" "localhost:9092"
   "cache.max.bytes.buffering" "0"
   "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})
 
 
(defn delete-topic
  "Pomocná funkce pro smazání vybraného tématu."
  [broker-config topic]
  (try
    (log/warn "Deleting topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/delete-topics! client [topic]))
    (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn new-topic
  "Pomocná funkce pro vytvoření nového tématu."
  [broker-config topic]
  (try
    (log/warn "Creating topic" (:topic-name topic))
    (let [client (ja/->AdminClient broker-config)]
      (ja/create-topics! client [topic]))
      (catch Exception e (str "caught exception: " (.getMessage e)))))
 
 
(defn etl-1
  "Transformační funkce."
  [[k v]]
  [k {:result (+ (:x v) (:y v))}])
 
 
(defn etl-2
  "Transformační funkce."
  [[k v]]
  [k (assoc v :timestamp (str (new java.util.Date)))])
 
 
(def etl-3
  "Transformační funkce vytvořená ve formě uzávěru."
  (let [counter (atom 0)]
    (fn [[k v]] (do
                  (swap! counter inc)
                  [k (assoc v :counter @counter)]))))
 
 
(defn build-topology
  "Definice celé pipeliny (kolony) - základ aplikace."
  [builder topic-config]
  (-> (j/kstream builder (:input topic-config))
      (j/peek (fn [[k v]]
                (log/warn "Received message with key:   " k " and value:" v)))
      (j/through (:output-1 topic-config))
      (j/map etl-1)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/through (:output-2 topic-config))
      (j/map etl-2)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/through (:output-3 topic-config))
      (j/map etl-3)
      (j/peek (fn [[k v]]
                (log/warn "Transformed message with key:" k " and value:" v)))
      (j/to (:output-4 topic-config)))
  builder)
 
 
(defn start-app
  "Spuštění aplikace."
  [app-config topic-config]
  (let [builder (j/streams-builder)
        topology (build-topology builder topic-config)
        app (j/kafka-streams topology app-config)]
    (log/warn "Starting pipe")
    (j/start app)
    (log/warn "Pipe is up")
    app))
 
 
(defn stop-app
  "Zastavení aplikace."
  [app]
  (log/warn "Stopping pipe")
  (j/close app)
  (log/warn "Pipe is down"))
 
 
(defn -main
  [& args]
  (let [broker-config {"bootstrap.servers" "localhost:9092"}]
 
    ;; na začátku pro jistotu vymažeme témata používaná pipou
    (delete-topic broker-config (:input topic-config))
    (delete-topic broker-config (:output-1 topic-config))
    (delete-topic broker-config (:output-2 topic-config))
    (delete-topic broker-config (:output-3 topic-config))
 
    ;; vytvoření nových témat akceptujících zprávy ve formátu JSON
    (new-topic broker-config (:input topic-config))
    (new-topic broker-config (:output-1 topic-config))
    (new-topic broker-config (:output-2 topic-config))
    (new-topic broker-config (:output-3 topic-config))
 
    ;; spuštění kolony
    (log/warn "Starting application")
    (let [app (start-app app-config topic-config)]
      (log/warn "App created:" app))))

Příklad spustíme:

$ lein run
 
2020-12-27 15:39:39.101 | WARN  | main | stream-pipe-5.core | Deleting topic input
2020-12-27 15:39:39.322 | WARN  | main | stream-pipe-5.core | Deleting topic output1
2020-12-27 15:39:39.345 | WARN  | main | stream-pipe-5.core | Deleting topic output2
2020-12-27 15:39:39.367 | WARN  | main | stream-pipe-5.core | Deleting topic output3
2020-12-27 15:39:39.392 | WARN  | main | stream-pipe-5.core | Creating topic input
2020-12-27 15:39:39.433 | WARN  | main | stream-pipe-5.core | Creating topic output1
2020-12-27 15:39:39.464 | WARN  | main | stream-pipe-5.core | Creating topic output2
2020-12-27 15:39:39.514 | WARN  | main | stream-pipe-5.core | Creating topic output3
2020-12-27 15:39:39.542 | WARN  | main | stream-pipe-5.core | Starting application
2020-12-27 15:39:39.644 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config.
2020-12-27 15:39:39.644 | WARN  | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
2020-12-27 15:39:39.649 | WARN  | main | stream-pipe-5.core | Starting pipe
2020-12-27 15:39:39.651 | WARN  | main | stream-pipe-5.core | Pipe is up
2020-12-27 15:39:39.653 | WARN  | main | stream-pipe-5.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x185bf6e0 org.apache.kafka.streams.KafkaStreams@185bf6e0]

Pošleme zprávy (se správným formátem) do vstupního tématu:

$ kafkacat -P -b localhost:9092 -t input -K:
 
"p1":{"x":1,"y":2}
"p1":{"x":1,"y":2}
"p2":{"x":0,"y":9}
"p3":{"y":1,"x":2}

Přesvědčíme se, že jsou všechny zprávy ze vstupu zpracovány korektně:

2020-12-27 15:39:44.513 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key:    p1  and value: {:x 1, :y 2}
2020-12-27 15:39:44.639 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1  and value: {:result 3}
2020-12-27 15:39:44.757 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1  and value: {:result 3, :timestamp Sun Dec 27 15:39:44 CET 2020}
2020-12-27 15:39:44.872 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1  and value: {:result 3, :timestamp Sun Dec 27 15:39:44 CET 2020, :counter 1}
2020-12-27 15:40:01.612 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key:    p2  and value: {:x 0, :y 9}
2020-12-27 15:40:01.720 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2  and value: {:result 9}
2020-12-27 15:40:01.828 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2  and value: {:result 9, :timestamp Sun Dec 27 15:40:01 CET 2020}
2020-12-27 15:40:01.937 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2  and value: {:result 9, :timestamp Sun Dec 27 15:40:01 CET 2020, :counter 2}
2020-12-27 15:40:15.268 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key:    p3  and value: {:y 1, :x 2}
2020-12-27 15:40:15.371 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3  and value: {:result 3}
2020-12-27 15:40:15.475 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3  and value: {:result 3, :timestamp Sun Dec 27 15:40:15 CET 2020}
2020-12-27 15:40:15.578 | WARN  | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3  and value: {:result 3, :timestamp Sun Dec 27 15:40:15 CET 2020, :counter 3}

V posledním kroku otestujeme, že v posledním výstupním tématu „output4“ jsou zprávy obsahující i hodnotu čítače:

Cloud23

$ kafkacat -C -b localhost:9092 -t output4
 
% Reached end of topic output4 [0] at offset 0
{"result":3,"timestamp":"Sun Dec 27 15:39:44 CET 2020","counter":1}
% Reached end of topic output4 [0] at offset 1
{"result":9,"timestamp":"Sun Dec 27 15:40:01 CET 2020","counter":2}
% Reached end of topic output4 [0] at offset 2
{"result":3,"timestamp":"Sun Dec 27 15:40:15 CET 2020","counter":3}
% Reached end of topic output4 [0] at offset 3

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Clojure byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-):

# Projekt Stručný popis Cesta
1 stream-pipe-0 funkční kostra pipeline, na které jsou postaveny i další čtyři demonstrační příklady https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-0
2 stream-pipe-1 jednoduchá transformace zpráv https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-1
3 stream-pipe-2 zalogování chyby bez pádu aplikace v případě, že zpráva není uložena v JSONu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-2
4 stream-pipe-3 složitější pipeline s několika transformačními funkcemi https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-3
5 stream-pipe-4 rozdělení streamu do více částí https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-4
6 stream-pipe-5 transformace zpráv založená na uzávěru https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-5

Demonstrační příklady z předchozích dvou článků o propojení systému Apache Kafka a Clojure s využitím knihovny Jackdaw [1] [2]:

# Projekt Stručný popis Cesta
1 kafka-repl prázdný projekt připravený pro interaktivní práci v REPLu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/kafka-repl
       
2 topic-constructor vytvoření nového tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor
3 topic-constructor-logger vytvoření nového tématu; konfigurace logování https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor-logger
4 topic-constructor-10-partitions vytvoření nového tématu s deseti oddíly https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor-10-partitions
5 topic-destructor vymazání tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-destructor
6 get-broker-config zobrazení konfigurace brokera https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/get-broker-config
7 describe-cluster zobrazení aktuální konfigurace clusteru https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-cluster
8 describe-all-topics zobrazení všech dostupných témat https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-all-topics
9 describe-topics zobrazení informací o vybraných tématech https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-topics
10 describe-topics-configs zobrazení podrobnějších informací o vybraných tématech https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-topics-configs
11 produce-messages-1 vytvoření jediné zprávy https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-1
12 produce-messages-2 vytvoření 100 zpráv https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-2
13 produce-messages-3 vytvoření 100 zpráv se specifikací klíče https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-3
14 consume-messages-1 konzument zpráv z vybraného tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/consume-messages-1
15 consume-messages-2 konzument zpráv z vybraného tématu, alternativní práce s offsetem na začátku https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/consume-messages-2
       
16 edn-1-serializer producent zpráv ve formátu EDN https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/edn-1-serializer
17 edn-2-serializer producent zpráv ve formátu EDN bez konců řádků https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/edn-2-serializer
18 json-serializer producent zpráv ve formátu JSON https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/json-serializer
19 custom-pipe jednoduchá kolona (pipe) https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/custom-pipe
20 custom-pipe-processing-function kolona využívající libovolnou transformační funkci https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/custom-pipe-processing-function

19. Odkazy na předchozí části seriálu o programovacím jazyku Clojure

  1. Clojure 1: Úvod
    http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm/
  2. Clojure 2: Symboly, kolekce atd.
    http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-2-cast/
  3. Clojure 3: Funkcionální programování
    http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-3-cast-funkcionalni-programovani/
  4. Clojure 4: Kolekce, sekvence a lazy sekvence
    http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-4-cast-kolekce-sekvence-a-lazy-sekvence/
  5. Clojure 5: Sekvence, lazy sekvence a paralelní programy
    http://www.root.cz/clanky/clojure-a-bezpecne-aplikace-pro-jvm-sekvence-lazy-sekvence-a-paralelni-programy/
  6. Clojure 6: Podpora pro paralelní programování
    http://www.root.cz/clanky/programovaci-jazyk-clojure-6-futures-nejsou-jen-financni-derivaty/
  7. Clojure 7: Další funkce pro paralelní programování
    http://www.root.cz/clanky/programovaci-jazyk-clojure-7-dalsi-podpurne-prostredky-pro-paralelni-programovani/
  8. Clojure 8: Identity, stavy, neměnné hodnoty a reference
    http://www.root.cz/clanky/programovaci-jazyk-clojure-8-identity-stavy-nemenne-hodnoty-a-referencni-typy/
  9. Clojure 9: Validátory, pozorovatelé a kooperace s Javou
    http://www.root.cz/clanky/programovaci-jazyk-clojure-9-validatory-pozorovatele-a-kooperace-mezi-clojure-a-javou/
  10. Clojure 10: Kooperace mezi Clojure a Javou
    http://www.root.cz/clanky/programovaci-jazyk-clojure-10-kooperace-mezi-clojure-a-javou-pokracovani/
  11. Clojure 11: Generátorová notace seznamu/list comprehension
    http://www.root.cz/clanky/programovaci-jazyk-clojure-11-generatorova-notace-seznamu-list-comprehension/
  12. Clojure 12: Překlad programů z Clojure do bajtkódu JVM I:
    http://www.root.cz/clanky/programovaci-jazyk-clojure-12-preklad-programu-z-clojure-do-bajtkodu-jvm/
  13. Clojure 13: Překlad programů z Clojure do bajtkódu JVM II:
    http://www.root.cz/clanky/programovaci-jazyk-clojure-13-preklad-programu-z-clojure-do-bajtkodu-jvm-pokracovani/
  14. Clojure 14: Základy práce se systémem maker
    http://www.root.cz/clanky/programovaci-jazyk-clojure-14-zaklady-prace-se-systemem-maker/
  15. Clojure 15: Tvorba uživatelských maker
    http://www.root.cz/clanky/programovaci-jazyk-clojure-15-tvorba-uzivatelskych-maker/
  16. Programovací jazyk Clojure – triky při práci s řetězci
    http://www.root.cz/clanky/programovaci-jazyk-clojure-triky-pri-praci-s-retezci/
  17. Programovací jazyk Clojure – triky při práci s kolekcemi
    http://www.root.cz/clanky/programovaci-jazyk-clojure-triky-pri-praci-s-kolekcemi/
  18. Programovací jazyk Clojure – práce s mapami a množinami
    http://www.root.cz/clanky/programovaci-jazyk-clojure-prace-s-mapami-a-mnozinami/
  19. Programovací jazyk Clojure – základy zpracování XML
    http://www.root.cz/clanky/programovaci-jazyk-clojure-zaklady-zpracovani-xml/
  20. Programovací jazyk Clojure – testování s využitím knihovny Expectations
    http://www.root.cz/clanky/programovaci-jazyk-clojure-testovani-s-vyuzitim-knihovny-expectations/
  21. Programovací jazyk Clojure – některé užitečné triky použitelné (nejenom) v testech
    http://www.root.cz/clanky/programovaci-jazyk-clojure-nektere-uzitecne-triky-pouzitelne-nejenom-v-testech/
  22. Enlive – výkonný šablonovací systém pro jazyk Clojure
    http://www.root.cz/clanky/enlive-vykonny-sablonovaci-system-pro-jazyk-clojure/
  23. Nástroj Leiningen a programovací jazyk Clojure: tvorba vlastních knihoven pro veřejný repositář Clojars
    http://www.root.cz/clanky/nastroj-leiningen-a-programovaci-jazyk-clojure-tvorba-vlastnich-knihoven-pro-verejny-repositar-clojars/
  24. Novinky v Clojure verze 1.8.0
    http://www.root.cz/clanky/novinky-v-clojure-verze-1–8–0/
  25. Asynchronní programování v Clojure s využitím knihovny core.async
    http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async/
  26. Asynchronní programování v Clojure s využitím knihovny core.async (pokračování)
    http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async-pokracovani/
  27. Asynchronní programování v Clojure s využitím knihovny core.async (dokončení)
    http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async-dokonceni/
  28. Vytváříme IRC bota v programovacím jazyce Clojure
    http://www.root.cz/clanky/vytvarime-irc-bota-v-programovacim-jazyce-clojure/
  29. Gorilla REPL: interaktivní prostředí pro programovací jazyk Clojure
    https://www.root.cz/clanky/gorilla-repl-interaktivni-prostredi-pro-programovaci-jazyk-clojure/
  30. Multimetody v Clojure aneb polymorfismus bez použití OOP
    https://www.root.cz/clanky/multimetody-v-clojure-aneb-polymorfismus-bez-pouziti-oop/
  31. Práce s externími Java archivy v programovacím jazyku Clojure
    https://www.root.cz/clanky/prace-s-externimi-java-archivy-v-programovacim-jazyku-clojure/
  32. Clojure 16: Složitější uživatelská makra
    http://www.root.cz/clanky/programovaci-jazyk-clojure-16-slozitejsi-uzivatelska-makra/
  33. Clojure 17: Využití standardních maker v praxi
    http://www.root.cz/clanky/programovaci-jazyk-clojure-17-vyuziti-standardnich-maker-v-praxi/
  34. Clojure 18: Základní techniky optimalizace aplikací
    http://www.root.cz/clanky/programovaci-jazyk-clojure-18-zakladni-techniky-optimalizace-aplikaci/
  35. Clojure 19: Vývojová prostředí pro Clojure
    http://www.root.cz/clanky/programovaci-jazyk-clojure-19-vyvojova-prostredi-pro-clojure/
  36. Clojure 20: Vývojová prostředí pro Clojure (Vimu s REPL)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-20-vyvojova-prostredi-pro-clojure-integrace-vimu-s-repl/
  37. Clojure 21: ClojureScript aneb překlad Clojure do JS
    http://www.root.cz/clanky/programovaci-jazyk-clojure-21-clojurescript-aneb-preklad-clojure-do-javascriptu/
  38. Leiningen: nástroj pro správu projektů napsaných v Clojure
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure/
  39. Leiningen: nástroj pro správu projektů napsaných v Clojure (2)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-2/
  40. Leiningen: nástroj pro správu projektů napsaných v Clojure (3)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-3/
  41. Leiningen: nástroj pro správu projektů napsaných v Clojure (4)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-4/
  42. Leiningen: nástroj pro správu projektů napsaných v Clojure (5)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-5/
  43. Leiningen: nástroj pro správu projektů napsaných v Clojure (6)
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-6/
  44. Programovací jazyk Clojure a databáze (1.část)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-databaze-1-cast/
  45. Pluginy pro Leiningen
    http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-pluginy-pro-leiningen/
  46. Programovací jazyk Clojure a knihovny pro práci s vektory a maticemi
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-knihovny-pro-praci-s-vektory-a-maticemi/
  47. Programovací jazyk Clojure a knihovny pro práci s vektory a maticemi (2)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-knihovny-pro-praci-s-vektory-a-maticemi-2/
  48. Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk
    http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk/
  49. Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk (2)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk-2/
  50. Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure
    http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure/
  51. Seesaw: knihovna pro snadnou tvorbu GUI v azyce Clojure (2)
    http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure-2/
  52. Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure (3)
    http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure-3/
  53. Programovací jazyk Clojure a práce s Gitem
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-prace-s-gitem/
  54. Programovací jazyk Clojure a práce s Gitem (2)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-a-prace-s-gitem-2/
  55. Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk (dokončení)
    http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk-dokonceni/
  56. Pixie: lehký skriptovací jazyk s „kouzelnými“ schopnostmi
    https://www.root.cz/clanky/pixie-lehky-skriptovaci-jazyk-s-kouzelnymi-schopnostmi/
  57. Programovací jazyk Pixie: funkce ze základní knihovny a použití FFI
    https://www.root.cz/clanky/pro­gramovaci-jazyk-pixie-funkce-ze-zakladni-knihovny-a-pouziti-ffi/
  58. Novinky v Clojure verze 1.9.0
    https://www.root.cz/clanky/novinky-v-clojure-verze-1–9–0/
  59. Validace dat s využitím knihovny spec v Clojure 1.9.0
    https://www.root.cz/clanky/validace-dat-s-vyuzitim-knihovny-spec-v-clojure-1–9–0/
  60. Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure
    https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure/
  61. Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure (2)
    https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure-2/
  62. Incanter: prostředí pro statistické výpočty s grafickým výstupem založené na Clojure
    https://www.root.cz/clanky/incanter-prostredi-pro-statisticke-vypocty-s-grafickym-vystupem-zalozene-na-clojure/
  63. Incanter: operace s maticemi
    https://www.root.cz/clanky/incanter-operace-s-maticemi/
  64. Interpret programovacího jazyka Clojure integrovaný do Jupyter Notebooku
    https://www.root.cz/clanky/interpret-programovaciho-jazyka-clojure-integrovany-do-jupyter-notebooku/
  65. Babashka: interpret Clojure určený pro rychlé spouštění utilit z příkazového řádku
    https://www.root.cz/clanky/babashka-interpret-clojure-urceny-pro-rychle-spousteni-utilit-z-prikazoveho-radku/
  66. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw
    https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw/
  67. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
    https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw-2-cast/

20. Odkazy na Internetu

  1. Learn Clojure – Flow Control
    https://clojure.org/guides/learn/flow
  2. ETL Batch Processing With Kafka?
    https://medium.com/swlh/etl-batch-processing-with-kafka-7f66f843e20d
  3. ETL with Kafka
    https://blog.codecentric.de/en/2018/03/e­tl-kafka/
  4. Building ETL Pipelines with Clojure and Transducers
    https://www.grammarly.com/blog/en­gineering/building-etl-pipelines-with-clojure-and-transducers/
  5. pipeline (možné použít pro ETL)
    https://clojuredocs.org/clo­jure.core.async/pipeline
  6. On Track with Apache Kafka – Building a Streaming ETL Solution with Rail Data
    https://www.confluent.io/blog/build-streaming-etl-solutions-with-kafka-and-rail-data/
  7. Kafka – Understanding Offset Commits
    https://www.logicbig.com/tu­torials/misc/kafka/commit­ting-offsets.html
  8. fundingcircle/jackdaw (na Clojars)
    https://clojars.org/fundin­gcircle/jackdaw/versions/0­.7.6
  9. Dokumentace ke knihovně jackdaw
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/re­adme
  10. Jackdaw AdminClient API
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/jac­kdaw-adminclient-api
  11. Jackdaw Client API
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/jac­kdaw-client-api
  12. Kafka.clj
    https://github.com/helins-io/kafka.clj
  13. Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách
    https://www.root.cz/clanky/pouziti-nastroje-apache-kafka-v-aplikacich-zalozenych-na-mikrosluzbach/
  14. Apache Kafka: distribuovaná streamovací platforma
    https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/
  15. Real-Time Payments with Clojure and Apache Kafka (podcast)
    https://www.evidentsystem­s.com/news/confluent-podcast-about-apache-kafka/
  16. Kafka and Clojure – Immutable event streams
    https://practicalli.github.io/kafka-and-clojure/
  17. Kafka Streams, the Clojure way
    https://blog.davemartin.me/posts/kafka-streams-the-clojure-way/
  18. dvlopt.kafka na GitHubu
    https://github.com/helins-io/kafka.clj
  19. kafka-streams-the-clojure-way na GitHubu
    https://github.com/DaveWM/kafka-streams-the-clojure-way
  20. babashka: A Clojure babushka for the grey areas of Bash
    https://github.com/borkdude/babashka
  21. Babashka and the Small Clojure Interpreter @ ClojureD 2020 (slajdy)
    https://speakerdeck.com/bor­kdude/babashka-and-the-small-clojure-interpreter-at-clojured-2020
  22. Babashka: ukázky použití
    https://github.com/borkdu­de/babashka/blob/master/doc/e­xamples.md
  23. clojureD 2020: „Babashka and Small Clojure Interpreter: Clojure in new contexts“ by Michiel Borkent
    https://www.youtube.com/watch?v=Nw8aN-nrdEk&t=5s
  24. Meetup #124 Babashka, implementing an nREPL server & game engines with Clojure
    https://www.youtube.com/wat­ch?v=0YmZYnwyHHc
  25. The Last Programming Language (shrnutí vývoje programovacích jazyků)
    https://www.youtube.com/watch?v=P2yr-3F6PQo
  26. Shebang (Unix): Wikipedia EN
    https://en.wikipedia.org/wi­ki/Shebang_(Unix)
  27. Shebang (Unix): Wikipedia CZ
    https://cs.wikipedia.org/wi­ki/Shebang_(Unix)
  28. How to create Clojure notebooks in Jupyter
    https://s01blog.wordpress­.com/2017/12/10/how-to-create-clojure-notebooks-in-jupyter/
  29. Dokumentace k nástroji Conda
    https://docs.conda.io/en/latest/
  30. Notebook interface
    https://en.wikipedia.org/wi­ki/Notebook_interface
  31. Jypyter: open source, interactive data science and scientific computing across over 40 programming languages
    https://jupyter.org/
  32. Calysto Scheme
    https://github.com/Calysto/ca­lysto_scheme
  33. scheme.py (základ projektu Calysto Scheme)
    https://github.com/Calysto/ca­lysto_scheme/blob/master/ca­lysto_scheme/scheme.py
  34. Humane test output for clojure.test
    https://github.com/pjstadig/humane-test-output
  35. iota
    https://github.com/juxt/iota
  36. 5 Differences between clojure.spec and Schema
    https://lispcast.com/clojure.spec-vs-schema/
  37. Schema: Clojure(Script) library for declarative data description and validation
    https://github.com/plumatic/schema
  38. Zip archiv s Clojure 1.9.0
    http://repo1.maven.org/ma­ven2/org/clojure/clojure/1­.9.0/clojure-1.9.0.zip
  39. Clojure 1.9 is now available
    https://clojure.org/news/2017/12/08/clo­jure19
  40. Deps and CLI Guide
    https://clojure.org/guides/dep­s_and_cli
  41. Changes to Clojure in Version 1.9
    https://github.com/clojure/clo­jure/blob/master/changes.md
  42. clojure.spec – Rationale and Overview
    https://clojure.org/about/spec
  43. Zip archiv s Clojure 1.8.0
    http://repo1.maven.org/ma­ven2/org/clojure/clojure/1­.8.0/clojure-1.8.0.zip
  44. Clojure 1.8 is now available
    http://clojure.org/news/2016/01/19/clo­jure18
  45. Socket Server REPL
    http://dev.clojure.org/dis­play/design/Socket+Server+REPL
  46. CLJ-1671: Clojure socket server
    http://dev.clojure.org/jira/browse/CLJ-1671
  47. CLJ-1449: Add clojure.string functions for portability to ClojureScript
    http://dev.clojure.org/jira/browse/CLJ-1449
  48. Launching a Socket Server
    http://clojure.org/referen­ce/repl_and_main#_launchin­g_a_socket_server
  49. API for clojure.string
    http://clojure.github.io/clo­jure/branch-master/clojure.string-api.html
  50. Clojars:
    https://clojars.org/
  51. Seznam knihoven na Clojars:
    https://clojars.org/projects
  52. Clojure Cookbook: Templating HTML with Enlive
    https://github.com/clojure-cookbook/clojure-cookbook/blob/master/07_webapps/7–11_enlive.asciidoc
  53. An Introduction to Enlive
    https://github.com/swannodette/enlive-tutorial/
  54. Enlive na GitHubu
    https://github.com/cgrand/enlive
  55. Expectations: příklady atd.
    http://jayfields.com/expectations/
  56. Expectations na GitHubu
    https://github.com/jaycfi­elds/expectations
  57. Lein-expectations na GitHubu
    https://github.com/gar3thjon3s/lein-expectations
  58. Testing Clojure With Expectations
    https://semaphoreci.com/blog/2014/09/23/tes­ting-clojure-with-expectations.html
  59. Clojure testing TDD/BDD libraries: clojure.test vs Midje vs Expectations vs Speclj
    https://www.reddit.com/r/Clo­jure/comments/1viilt/cloju­re_testing_tddbdd_librari­es_clojuretest_vs/
  60. Testing: One assertion per test
    http://blog.jayfields.com/2007/06/tes­ting-one-assertion-per-test.html
  61. Rewriting Your Test Suite in Clojure in 24 hours
    http://blog.circleci.com/rewriting-your-test-suite-in-clojure-in-24-hours/
  62. Clojure doc: zipper
    http://clojuredocs.org/clo­jure.zip/zipper
  63. Clojure doc: parse
    http://clojuredocs.org/clo­jure.xml/parse
  64. Clojure doc: xml-zip
    http://clojuredocs.org/clojure.zip/xml-zip
  65. Clojure doc: xml-seq
    http://clojuredocs.org/clo­jure.core/xml-seq
  66. Parsing XML in Clojure
    https://github.com/clojuredocs/guides
  67. Clojure Zipper Over Nested Vector
    https://vitalyper.wordpres­s.com/2010/11/23/clojure-zipper-over-nested-vector/
  68. Understanding Clojure's PersistentVector implementation
    http://blog.higher-order.net/2009/02/01/understanding-clojures-persistentvector-implementation
  69. Understanding Clojure's PersistentHashMap (deftwice…)
    http://blog.higher-order.net/2009/09/08/understanding-clojures-persistenthashmap-deftwice.html
  70. Assoc and Clojure's PersistentHashMap: part ii
    http://blog.higher-order.net/2010/08/16/assoc-and-clojures-persistenthashmap-part-ii.html
  71. Ideal Hashtrees (paper)
    http://lampwww.epfl.ch/pa­pers/idealhashtrees.pdf
  72. 4Clojure
    http://www.4clojure.com/
  73. ClojureDoc (rozcestník s dokumentací jazyka Clojure)
    http://clojuredocs.org/
  74. Clojure (na Wikipedia EN)
    http://en.wikipedia.org/wiki/Clojure
  75. Clojure (na Wikipedia CS)
    http://cs.wikipedia.org/wiki/Clojure
  76. SICP (The Structure and Interpretation of Computer Programs)
    http://mitpress.mit.edu/sicp/
  77. Pure function
    http://en.wikipedia.org/wi­ki/Pure_function
  78. Funkcionální programování
    http://cs.wikipedia.org/wi­ki/Funkcionální_programová­ní
  79. Čistě funkcionální (datové struktury, jazyky, programování)
    http://cs.wikipedia.org/wi­ki/Čistě_funkcionální
  80. Clojure Macro Tutorial (Part I, Getting the Compiler to Write Your Code For You)
    http://www.learningclojure­.com/2010/09/clojure-macro-tutorial-part-i-getting.html
  81. Clojure Macro Tutorial (Part II: The Compiler Strikes Back)
    http://www.learningclojure­.com/2010/09/clojure-macro-tutorial-part-ii-compiler.html
  82. Clojure Macro Tutorial (Part III: Syntax Quote)
    http://www.learningclojure­.com/2010/09/clojure-macro-tutorial-part-ii-syntax.html
  83. Tech behind Tech: Clojure Macros Simplified
    http://techbehindtech.com/2010/09/28/clo­jure-macros-simplified/
  84. Fatvat – Exploring functional programming: Clojure Macros
    http://www.fatvat.co.uk/2009/02/clo­jure-macros.html

Autor článku

Pavel Tišnovský vystudoval VUT FIT a v současné době pracuje ve společnosti Red Hat, kde vyvíjí nástroje pro OpenShift.io.