Obsah
2. Kostra pipeline, na které jsou založeny všechny navazující demonstrační příklady
3. Konfigurace témat, definice pipeline i aplikace, v jejímž rámci pipeline poběží
4. Úplný zdrojový kód první aplikace s funkční pipeline
5. Spuštění a otestování dnešního prvního demonstračního příkladu
6. Transformace zpráv neboli „T“ v „Extract, transform, load“
7. Přečtení zpráv z kolony beze změny její funkce (logování atd.)
8. Úplný zdrojový kód druhé aplikace s funkční pipeline
9. Spuštění a otestování dnešního druhého demonstračního příkladu
10. Chování aplikace v případě příjmu nekorektní zprávy
11. Zalogování chyby bez pádu aplikace
12. Chování upravené aplikace v příjmu nekorektní zprávy
13. Složitější pipeline s několika transformačními funkcemi
14. Spuštění a otestování dnešního čtvrtého demonstračního příkladu
15. Rozdělení streamu do více částí
16. Spuštění a otestování dnešního pátého demonstračního příkladu
17. Transformace zpráv založená na uzávěru
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu o programovacím jazyku Clojure
1. Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)
Ve třetím (předposledním) článku o kombinaci programovacího jazyka Clojure a streamovací platformy Apache Kafka se zaměříme na popis skutečného streamingu využívajícího funkcionální vlastnosti jazyka Clojure. Ukážeme si konstrukci složitější „kolony“, rozdělení streamů (proudu zpráv) do více kolon atd. Popsány, popř. použity budou následující funkce a makra ze jmenného prostoru jackdaw.streams:
# | Funkce/makro | Použito | Stručný popis |
---|---|---|---|
1 | jackdaw.streams/start | řízení aplikace | zahájení činnosti kolony |
2 | jackdaw.streams/close | řízení aplikace | uzavření kolony |
3 | jackdaw.streams/kstream | vytvoření proudu | vytvoření proudu konzumujícího zprávy z vybraného tématu |
4 | jackdaw.streams/to | uvnitř proudu | uložení (materializace) zpráv z kolony do tématu |
5 | jackdaw.streams/map | uvnitř proudu | transformace zpráv |
6 | jackdaw.streams/map-values | uvnitř proudu | transformace zpráv (pouze hodnot zpráv, bez klíčů) |
7 | jackdaw.streams/peek | kolona | přečtení zprávy z kolony postranním kanálem bez porušení činnosti kolony |
8 | jackdaw.streams/through | kolona | vytvoření nové kolony rozdělením streamu |
9 | jackdaw.streams/merge | kolona | spojení dvou kolon |
10 | jackdaw.streams/filter | kolona | filtrace zpráv na základě predikátu |
11 | jackdaw.streams/filter-not | kolona | filtrace zpráv na základě predikátu |
2. Kostra pipeline, na které jsou založeny všechny navazující demonstrační příklady
V úvodní části dnešního článku si ukážeme kostru zcela funkční pipeline (kolony), na které budou založeny i všechny navazující demonstrační příklady. Struktura projektu s touto pipeline, která je mimochodem dostupná na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-0, vypadá následovně (zvýrazněny jsou ty soubory, které jsou popsány v navazujícím textu):
. ├── CHANGELOG.md ├── README.md ├── LICENSE ├── doc │ └── intro.md ├── project.clj ├── resources │ └── log4j.properties ├── src │ └── stream_pipe_0 │ └── core.clj └── test └── stream_pipe_0 └── core_test.clj
Tato kostra byla získána příkazem:
$ lein new app stream-pipe-0
Nejdůležitější je popis projektu a jeho závislostí uvedený v souboru project.clj. Ručně provedené změny jsou zvýrazněny tučným písmem:
(defproject stream-pipe-0 "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" :url "https://www.eclipse.org/legal/epl-2.0/"} :dependencies [[org.clojure/clojure "1.10.1"] [fundingcircle/jackdaw "0.7.6"] [org.clojure/tools.logging "0.3.1"] [log4j/log4j "1.2.17" :exclusions [javax.mail/mail javax.jms/jms com.sun.jmdk/jmxtools com.sun.jmx/jmxri]] [org.slf4j/slf4j-log4j12 "1.6.6"]] :main ^:skip-aot stream-pipe-0.core :target-path "target/%s" :resource-paths ["resources/"] :profiles {:uberjar {:aot :all :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})
Aby knihovna Jackdaw vypisovala všechny důležité informace při inicializaci i běhu pipeline, musíme nakonfigurovat logování. Jedná se o soubor nazvaný log4.properties uložený v podadresáři resources. V tomto souboru je možné definovat, jakým způsobem bude proveden výpis logovacích informací na konzoli i do logovacího souboru (popř. i na jiná místa). Úroveň logovacích informací postačuje nastavit na WARN, čímž se (oproti úrovním DEBUG či INFO) poměrně dosti zásadním způsobem omezí výpis nepodstatných detailů. Navíc jsou v tomto souboru nastaveny i další důležité vlastnosti logování – maximální velikost logovacího souboru, jeho umístění v souborovém systému, případné rotace, formát výstupu aj.:
log4j.rootLogger=WARN, file, console log4j.appender.file=org.apache.log4j.RollingFileAppender log4j.appender.file.File=log/chainring.log log4j.appender.file.MaxFileSize=5MB log4j.appender.file.MaxBackupIndex=50 log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} | %-5p | %t | %c | %m%n log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} | %-5p | %t | %c | %m%n
3. Konfigurace témat, definice pipeline i aplikace, v jejímž rámci pipeline poběží
Zdrojový kód celé aplikace s pipeline je uložen v souboru nazvaném src/stream_pipe0/core.clj. První důležitou hodnotou je mapa obsahující konfiguraci vstupního a výstupního tématu, resp. přesněji řečeno tématu, který je použit pro vstup dat (zpráv) i tématu použitého pro výstup (uložení výsledných zpráv). Obě témata jsou nakonfigurována takovým způsobem, že klíče i hodnoty jsou reprezentovány v JSONu:
(def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}})
Následuje mapa obsahující konfiguraci celé aplikace – její jednoznačný identifikátor použitý v systému Kafka, určení adresy brokeru, nastavení vyrovnávací paměti atd. Interně je tato konfigurace převedena na properties:
(def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0"})
Funkce pojmenovaná build-topology slouží pro specifikaci konfigurace celé kolony. Prozatím se jedná o značně zjednodušenou kolonu:
- Je vytvořen proud zpráv čtených (konzumovaných) ze vstupního tématu.
- Proud zpráv se ukládá do tématu výstupního.
Pro vytvoření proudu se používá funkce nazvaná jackdaw.streams/kstream, pro uložení zpráv z proudu pak funkce jackdaw.streams/to. Povšimněte si použití threading makra, což je jedno z mnoha velmi užitečných maker programovacího jazyka Clojure:
(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/to (:output topic-config))) builder)
Dostáváme se ke dvojici funkcí určených pro spuštění aplikace, resp. pro její zastavení. Spuštění aplikace se skládá z inicializace pipeline, inicializace vlastní aplikace a z jejího následného spuštění funkcí jackdaw.streams/start:
(defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app))
Opakem funkce jackdaw.streams/start je funkce se jménem jackdaw.streams/close, která aplikaci zastaví a uvolní všechny prostředky:
(defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down"))
Aplikace je nakonfigurována a spuštěna z funkce -main. Ovšem ještě před vlastním spuštěním aplikace pro jistotu smažeme existující témata a znovu je vytvoříme:
(defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))
4. Úplný zdrojový kód první aplikace s funkční pipeline
Úplný zdrojový kód aplikace s plně funkční pipeline je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-0/src/stream_pipe0/core.clj. Jednotlivé definice hodnot i funkcí jsou popsány v dokumentačních řetězcích:
(ns stream-pipe-0.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/to (:output topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))
5. Spuštění a otestování dnešního prvního demonstračního příkladu
Demonstrační příklad popsaný v předchozích třech kapitolách spustíme příkazem:
$ lein run
Na standardním výstupu by se měly objevit informace o tom, že byla vymazána témata „input“ a „output“, že došlo k znovuvytvoření těchto témat a konečně, že se pipeline (kolona) spustila a očekává zprávy, které budou následně zkonzumovány (současně se v podadresáři log tvoří logovací soubor s podrobnějšími informacemi):
2020-12-23 16:28:36.354 | WARN | main | stream-pipe-0.core | Deleting topic input 2020-12-23 16:28:36.599 | WARN | main | stream-pipe-0.core | Deleting topic output 2020-12-23 16:28:36.618 | WARN | main | stream-pipe-0.core | Creating topic input 2020-12-23 16:28:36.650 | WARN | main | stream-pipe-0.core | Creating topic output 2020-12-23 16:28:36.678 | WARN | main | stream-pipe-0.core | Starting application 2020-12-23 16:28:36.797 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-23 16:28:36.797 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-23 16:28:36.802 | WARN | main | stream-pipe-0.core | Starting pipe 2020-12-23 16:28:36.805 | WARN | main | stream-pipe-0.core | Pipe is up 2020-12-23 16:28:36.807 | WARN | main | stream-pipe-0.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x61d8a491 org.apache.kafka.streams.KafkaStreams@61d8a491]
V dalším terminálu použijeme nástroj kafkacat pro poslání zprávy, resp. přesněji řečeno většího množství zpráv do tématu „input“. Zprávy musí být ve formátu JSON, ovšem jejich obsah není žádným způsobem kontrolován:
$ kafkacat -P -b localhost:9092 -t input {"foo":"bar"} {}
A konečně se ve třetím terminálu pokusíme (opět nástrojem kafkacat) přečíst zprávy z tématu „output“. Formát těchto zpráv i jejich obsah by měl odpovídat zprávám posílaným ze druhého terminálu:
$ kafkacat -C -b localhost:9092 -t output % Reached end of topic output [0] at offset 1 {"foo":"bar"} % Reached end of topic output [0] at offset 2 {} % Reached end of topic output [0] at offset 3
6. Transformace zpráv neboli „T“ v „Extract, transform, load“
Konzumace zpráv z jednoho tématu s jejich přímým uložením do tématu jiného samozřejmě nepředstavuje žádnou přelomovou technologii, takže si nyní ukážeme, jak by mohla probíhat nějaká skutečná transformace zpráv. Samotná transformace může být (v těch jednodušších případech) představována čistou funkcí, která pouze zpracuje svůj vstup (tedy zprávu získanou z proudu) a jejím výstupem bude transformovaná zpráva. Zprávy v proudu obsahují dvojici klíč+hodnota, kterou rozdělíme na klíč a hodnotu a vytvoříme z nich zprávu novou. Toto rozdělení je možné provést buď explicitně:
(defn etl "Transformační funkce." [message] (let [k (nth message 0) v (nth message 1)] [k {:result (+ (:x v) (:y v))}]))
Ovšem kratší a idiomatičtější kód by v Clojure mohl vypadat takto:
(defn etl "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}])
Nyní tedy máme funkci, která dokáže zprávu rozdělit na klíč a hodnotu a vytvořit zprávu novou. Klíč nové zprávy zůstane původní, ale hodnota se bude lišit – z původní zprávy se získají hodnoty uložené pod klíči „x“ a „y“, tyto hodnoty se sečtou a výsledek se uloží do výsledné zprávy pod klíčem „result“ (i výsledná zpráva bude uložena v JSONu).
Transformační funkci přidáme do pipeliny s využitím jackdaw.streams/map (viz podtržený řádek):
(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/map etl) (j/to (:output topic-config))) builder)
7. Přečtení zpráv z kolony beze změny její funkce (logování atd.)
V některých případech je užitečné zprávy z kolony získat „vedlejším kanálem“, tedy tak, aby se činnost kolony neporušila. Takto lze například zajistit logování zpráv atd. I pro tuto činnost existuje v knihovně Jackdaw funkce (vkládaná do kolony); ta se jmenuje jackdaw.streams/peek. Této funkci se předá jak vlastní stream, tak i funkce zavolaná pro každou zprávu. Při vložení jackdaw.streams/peek do threading makra se pochopitelně první parametr vynechá.
Snadno tedy můžeme do logu vložit jak informaci o původní zprávě, tak i o zprávě transformované:
(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek log-message) (j/map etl) (j/peek log-message) (j/to (:output topic-config))) builder)
Často se setkáme s tím, že jednoduché funkce (typu log-message či etl v našem případě) jsou definovány jako funkce anonymní:
(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder)
8. Úplný zdrojový kód druhé aplikace s funkční pipeline
Úplný zdrojový kód dnešní druhé aplikace s plně funkční pipeline je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-1/src/stream_pipe1/core.clj. Jednotlivé definice hodnot i funkcí jsou popsány v dokumentačních řetězcích:
(ns stream-pipe-1.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))
9. Spuštění a otestování dnešního druhého demonstračního příkladu
Opět si můžeme vyzkoušet činnost dnešního druhého demonstračního příkladu, tentokrát i s transformací zpráv a s logováním přidaným do kolony. Nejprve aplikaci spustíme:
$ lein run 2020-12-23 16:52:16.362 | WARN | main | stream-pipe-1.core | Deleting topic input 2020-12-23 16:52:16.600 | WARN | main | stream-pipe-1.core | Deleting topic output 2020-12-23 16:52:16.615 | WARN | main | stream-pipe-1.core | Creating topic input 2020-12-23 16:52:16.643 | WARN | main | stream-pipe-1.core | Creating topic output 2020-12-23 16:52:16.669 | WARN | main | stream-pipe-1.core | Starting application 2020-12-23 16:52:16.786 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-23 16:52:16.787 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-23 16:52:16.792 | WARN | main | stream-pipe-1.core | Starting pipe 2020-12-23 16:52:16.796 | WARN | main | stream-pipe-1.core | Pipe is up 2020-12-23 16:52:16.798 | WARN | main | stream-pipe-1.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x6a97517 org.apache.kafka.streams.KafkaStreams@6a97517]
Následně do vstupního tématu pošleme několik zpráv ve správném formátu, tedy s klíčem i hodnotou v JSONu, přičemž hodnota obsahuje dvě numerické hodnoty s klíči x a y:
$ kafkacat -P -b localhost:9092 -t input -K: "***":{"x":1, "y":2} "---":{"x":100, "y":-100}
Obě zprávy by měly být správně zpracovány, což se dozvíme z logu (nejprve je zalogována zpráva vstupní, poté zpráva transformovaná):
2020-12-23 16:52:18.853 | WARN | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Received message with key: *** and value: {:x 1, :y 2} 2020-12-23 16:52:18.856 | WARN | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Transformed message with key: *** and value: {:result 3} 2020-12-23 16:52:33.498 | WARN | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Received message with key: --- and value: {:x 100, :y -100} 2020-12-23 16:52:33.498 | WARN | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Transformed message with key: --- and value: {:result 0}
A konečně výsledky přečteme z výstupního tématu:
$ kafkacat -C -b localhost:9092 -t output % Reached end of topic output [0] at offset 0 {"result":3} % Reached end of topic output [0] at offset 1 {"result":0}
10. Chování aplikace v případě příjmu nekorektní zprávy
Nyní si vyzkoušejme, co se stane v případě, že do vstupního tématu pošleme zprávu s neočekávaným obsahem, například zprávu, která vůbec není zakódována do JSONu:
$ kafkacat -P -b localhost:9092 -t input xyz
V takovém případě dojde k pádu aplikace, a to již při pokusu o deserializaci zprávy:
2020-12-23 16:32:57.649 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.errors.LogAndFailExceptionHandler | Exception caught during Deserialization, taskId: 0_0, topic: input, partition: 0, offset: 0 java.lang.Exception: JSON error (unexpected character): x at clojure.data.json$_read.invokeStatic(json.clj:226) at clojure.data.json$_read.invoke(json.clj:177) at clojure.data.json$read.invokeStatic(json.clj:272) at clojure.data.json$read.doInvoke(json.clj:228) at clojure.lang.RestFn.applyTo(RestFn.java:139) at clojure.core$apply.invokeStatic(core.clj:667) at clojure.core$apply.invoke(core.clj:660) at clojure.data.json$read_str.invokeStatic(json.clj:278) at clojure.data.json$read_str.doInvoke(json.clj:274) at clojure.lang.RestFn.invoke(RestFn.java:439) at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34) at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) 2020-12-23 16:32:57.652 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.processor.internals.StreamThread | stream-thread [pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately. at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) Caused by: java.lang.Exception: JSON error (unexpected character): x at clojure.data.json$_read.invokeStatic(json.clj:226) at clojure.data.json$_read.invoke(json.clj:177) at clojure.data.json$read.invokeStatic(json.clj:272) at clojure.data.json$read.doInvoke(json.clj:228) at clojure.lang.RestFn.applyTo(RestFn.java:139) at clojure.core$apply.invokeStatic(core.clj:667) at clojure.core$apply.invoke(core.clj:660) at clojure.data.json$read_str.invokeStatic(json.clj:278) at clojure.data.json$read_str.doInvoke(json.clj:274) at clojure.lang.RestFn.invoke(RestFn.java:439) at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34) at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ... 8 more 2020-12-23 16:32:57.662 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.KafkaStreams | stream-client [pipe-934ea525-1ffd-4c1a-b8a0-89a827416553] All stream threads have died. The instance will be in error state and should be closed.
11. Zalogování chyby bez pádu aplikace
Aby aplikace nezhavarovala při jakémkoli pokusu o zpracování zprávy s nekorektním formátem, budeme muset nastavit odlišný handler, tedy přesněji řečeno třídu implementující rozhraní DeserializationExceptionHandler. To lze udělat více způsoby – vytvořit si takovou třídu s využitím proxy, popř. gen-class. Taková třída musí mít přepsánu metodu handle:
DeserializationExceptionHandler.DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[],byte[]> record, Exception exception)
Existuje ovšem ještě jednodušší řešení – použít již vhodně připravenou třídu, konkrétně třídu org.apache.kafka.streams.errors.LogAndContinueExceptionHandler. To se provede následujícím způsobem při konfiguraci aplikace (poslední podtržený řádek):
(def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})
Pro úplnost si uveďme celý zdrojový kód takto upraveného demonstračního příkladu (i když je úprava jednořádková):
(ns stream-pipe-2.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))
12. Chování upravené aplikace v příjmu nekorektní zprávy
Nyní si můžeme chování aplikace upravené v rámci předchozí kapitoly otestovat. Nejdříve s korektními zprávami:
$ kafkacat -P -b localhost:9092 -t input -K: "x":{"x":1, "y":2} "y":{"x":10, "y":20}
Chování aplikace by se v takovém případě nemělo žádným způsobem změnit:
$ kafkacat -C -b localhost:9092 -t output % Reached end of topic output [0] at offset 0 {"result":3,"timestamp":"Fri Dec 25 18:15:44 CET 2020"} % Reached end of topic output [0] at offset 1 {"result":30,"timestamp":"Fri Dec 25 18:16:02 CET 2020"} % Reached end of topic output [0] at offset 2
Nyní zkusme do tématu „input“ předat nekorektně zakódovanou zprávu:
$ kafkacat -P -b localhost:9092 -t input -K: foobar
V takovém případě by se měla chyba (i celý výpis zásobníkových rámců) zalogovat, ale aplikace nezhavaruje:
2020-12-23 17:28:22.197 | WARN | pipe-3cb9ae24-57a5-4d20-b4f7-a89e9a311445-StreamThread-1 | org.apache.kafka.streams.errors.LogAndContinueExceptionHandler | Exception caught during Deserialization, taskId: 0_0, topic: input, partition: 0, offset: 0 java.lang.Exception: JSON error (expected false) at clojure.data.json$_read.invokeStatic(json.clj:215) at clojure.data.json$_read.invoke(json.clj:177) at clojure.data.json$read.invokeStatic(json.clj:272) at clojure.data.json$read.doInvoke(json.clj:228) at clojure.lang.RestFn.applyTo(RestFn.java:139) at clojure.core$apply.invokeStatic(core.clj:667) at clojure.core$apply.invoke(core.clj:660) at clojure.data.json$read_str.invokeStatic(json.clj:278) at clojure.data.json$read_str.doInvoke(json.clj:274) at clojure.lang.RestFn.invoke(RestFn.java:439) at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34) at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) e020-12-23 17:28:22.205 | WARN | pipe-3cb9ae24-57a5-4d20-b4f7-a89e9a311445-StreamThread-1 | org.apache.kafka.streams.processor.internals.RecordDeserializer | task [0_0] Skipping record due to deserialization error. topic=[input] partition=[0] offset=[0] java.lang.Exception: JSON error (expected false) at clojure.data.json$_read.invokeStatic(json.clj:215) at clojure.data.json$_read.invoke(json.clj:177) at clojure.data.json$read.invokeStatic(json.clj:272) at clojure.data.json$read.doInvoke(json.clj:228) at clojure.lang.RestFn.applyTo(RestFn.java:139) at clojure.core$apply.invokeStatic(core.clj:667) at clojure.core$apply.invoke(core.clj:660) at clojure.data.json$read_str.invokeStatic(json.clj:278) at clojure.data.json$read_str.doInvoke(json.clj:274) at clojure.lang.RestFn.invoke(RestFn.java:439) at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34) at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
13. Složitější pipeline s několika transformačními funkcemi
Transformaci zpráv můžeme v koloně provést několikrát za sebou, opět s použitím nám již známé funkce jackdaw.streams.map. Pouze je nutné zajistit, aby transformační funkce vracely hodnotu, jejíž struktura odpovídá zprávám (dvojice klíč:hodnota):
(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/map etl-1) (j/map etl-2) (j/to (:output topic-config))) builder)
Druhá transformační funkce do zprávy přidá časové razítko:
(defn etl-2 "Transformační funkce." [[k v]] [k (assoc v :timestamp (str (new java.util.Date)))])
Pro sledování činnosti kolony je však vhodné přidat i logování s informacemi o průběhu jednotlivých transformací, takže samotná kolona bude o dva kroky delší:
(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder)
Opět si pro úplnost ukažme, jak bude vypadat úplný zdrojový kód takto upraveného demonstračního příkladu, jenž nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-3/src/stream_pipe3/core.clj:
(ns stream-pipe-3.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl-1 "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn etl-2 "Transformační funkce." [[k v]] [k (assoc v :timestamp (str (new java.util.Date)))]) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))
14. Spuštění a otestování dnešního čtvrtého demonstračního příkladu
Aplikaci popsanou v předchozí kapitole spustíme nám již známým příkazem lein run:
$ lein run 2020-12-25 18:15:32.305 | WARN | main | stream-pipe-3.core | Deleting topic input 2020-12-25 18:15:32.561 | WARN | main | stream-pipe-3.core | Deleting topic output 2020-12-25 18:15:32.587 | WARN | main | stream-pipe-3.core | Creating topic input 2020-12-25 18:15:32.630 | WARN | main | stream-pipe-3.core | Creating topic output 2020-12-25 18:15:32.666 | WARN | main | stream-pipe-3.core | Starting application 2020-12-25 18:15:32.776 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-25 18:15:32.776 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-25 18:15:32.781 | WARN | main | stream-pipe-3.core | Starting pipe 2020-12-25 18:15:32.783 | WARN | main | stream-pipe-3.core | Pipe is up 2020-12-25 18:15:32.784 | WARN | main | stream-pipe-3.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x8f374de org.apache.kafka.streams.KafkaStreams@8f374de]
Po poslání korektně zakódované zprávy je možné v logovacím souboru (popř. přímo na terminálu aplikace) sledovat, jak je zpráva postupně transformována.
První zpráva:
2020-12-25 18:15:44.393 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Received message with key: x and value: {:x 1, :y 2} 2020-12-25 18:15:44.394 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: x and value: {:result 3} 2020-12-25 18:15:44.398 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: x and value: {:result 3, :timestamp Fri Dec 25 18:15:44 CET 2020}
Druhá zpráva:
2020-12-25 18:16:02.476 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Received message with key: y and value: {:x 10, :y 20} 2020-12-25 18:16:02.477 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: y and value: {:result 30} 2020-12-25 18:16:02.478 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: y and value: {:result 30, :timestamp Fri Dec 25 18:16:02 CET 2020}
15. Rozdělení streamu do více částí
Někdy se setkáme s nutností rozdělení celého streamu do více částí, což by v terminologii klasických Unixových kolon velmi zhruba odpovídalo příkazu tee. I to je pochopitelně v knihovně Jackdaw možné; konkrétně je vytvoření nové větve streamu zajištěno funkcí jackdaw.streams/through, která zajistí uložení zprávy do zvoleného tématu. Pokud tedy budeme chtít jednotlivé (částečně) transformované zprávy postupně uložit do témat nazvaných „output1“, „output2“ a „output3“, bude konfigurace kolony vypadat takto:
(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/through (:output-1 topic-config)) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-2 topic-config)) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output-3 topic-config))) builder)
Zdrojový kód příkladu je následně nutné upravit tak, aby se nová témata na začátku smazala a ihned poté znovu inicializovala:
(ns stream-pipe-4.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-1 {:topic-name "output1" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-2 {:topic-name "output2" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-3 {:topic-name "output3" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl-1 "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn etl-2 "Transformační funkce." [[k v]] [k (assoc v :timestamp (str (new java.util.Date)))]) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/through (:output-1 topic-config)) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-2 topic-config)) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output-3 topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output-1 topic-config)) (delete-topic broker-config (:output-2 topic-config)) (delete-topic broker-config (:output-3 topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output-1 topic-config)) (new-topic broker-config (:output-2 topic-config)) (new-topic broker-config (:output-3 topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))
16. Spuštění a otestování dnešního pátého demonstračního příkladu
Pro otestování funkce dnešního pátého demonstračního příkladu nejdříve do vstupního tématu pošleme zprávu s korektním formátem:
$ kafkacat -P -b localhost:9092 -t input -K: "x":{"x":1, "y":2}
Následně aplikaci spustíme a budeme sledovat její činnost:
$ lein run 2020-12-25 19:09:08.876 | WARN | main | stream-pipe-4.core | Deleting topic input 2020-12-25 19:09:09.099 | WARN | main | stream-pipe-4.core | Deleting topic output1 2020-12-25 19:09:09.115 | WARN | main | stream-pipe-4.core | Deleting topic output2 2020-12-25 19:09:09.128 | WARN | main | stream-pipe-4.core | Deleting topic output3 2020-12-25 19:09:09.140 | WARN | main | stream-pipe-4.core | Creating topic input 2020-12-25 19:09:09.174 | WARN | main | stream-pipe-4.core | Creating topic output1 2020-12-25 19:09:09.205 | WARN | main | stream-pipe-4.core | Creating topic output2 2020-12-25 19:09:09.242 | WARN | main | stream-pipe-4.core | Creating topic output3 2020-12-25 19:09:09.275 | WARN | main | stream-pipe-4.core | Starting application 2020-12-25 19:09:09.367 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-25 19:09:09.367 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-25 19:09:09.372 | WARN | main | stream-pipe-4.core | Starting pipe 2020-12-25 19:09:09.374 | WARN | main | stream-pipe-4.core | Pipe is up 2020-12-25 19:09:09.376 | WARN | main | stream-pipe-4.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x68de8522 org.apache.kafka.streams.KafkaStreams@68de8522] 2020-12-25 19:09:26.385 | WARN | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Received message with key: x and value: {:x 1, :y 2} 2020-12-25 19:09:26.515 | WARN | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Transformed message with key: x and value: {:result 3} 2020-12-25 19:09:26.642 | WARN | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Transformed message with key: x and value: {:result 3, :timestamp Fri Dec 25 19:09:26 CET 2020}
Podle zpráv z logu byla zpráva úspěšně zpracována, takže si vypíšeme obsah témat „output1“, „output2“ i „output3“.
V tématu „output1“ je uložena kopie vstupní zprávy:
$ kafkacat -C -b localhost:9092 -t output1 {"x":1,"y":2}
V tématu „output2“ je zpráva s vypočteným výsledkem:
$ kafkacat -C -b localhost:9092 -t output2 {"result":3}
A konečně v tématu „output3“ je zpráva s přidaným časovým razítkem:
$ kafkacat -C -b localhost:9092 -t output3 {"result":3,"timestamp":"Fri Dec 25 19:09:26 CET 2020"}
17. Transformace zpráv založená na uzávěru
V některých aplikacích není transformační funkce čistou funkcí ve smyslu, že se jedná o funkci bez paměti a bez vedlejších efektů. Příkladem může být funkce, která by měla do zpráv vkládat hodnotu nějakého čítače atd. I toho lze – pochopitelně – v programovacím jazyku Clojure dosáhnout, a to konkrétně s použitím uzávěrů (closure); méně idiomatické a koncepčně horší by bylo použití třídy. Ukažme si tedy, jak lze vytvořit transformační uzávěr, který se volá jako běžná funkce, ovšem bude mít vnitřní paměť a umožní nám do zpráv vložit jejich index (tedy hodnotu čítače):
(def etl-3 "Transformační funkce vytvořená ve formě uzávěru." (let [counter (atom 0)] (fn [[k v]] (do (swap! counter inc) [k (assoc v :counter @counter)]))))
Změněná definice kolony s přidanou transformační funkcí etl-3:
(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/through (:output-1 topic-config)) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-2 topic-config)) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-3 topic-config)) (j/map etl-3) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output-4 topic-config))) builder)
Následuje výpis úplného zdrojového kódu dnešního posledního demonstračního příkladu:
(ns stream-pipe-5.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-1 {:topic-name "output1" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-2 {:topic-name "output2" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-3 {:topic-name "output3" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-4 {:topic-name "output4" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl-1 "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn etl-2 "Transformační funkce." [[k v]] [k (assoc v :timestamp (str (new java.util.Date)))]) (def etl-3 "Transformační funkce vytvořená ve formě uzávěru." (let [counter (atom 0)] (fn [[k v]] (do (swap! counter inc) [k (assoc v :counter @counter)])))) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/through (:output-1 topic-config)) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-2 topic-config)) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-3 topic-config)) (j/map etl-3) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output-4 topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output-1 topic-config)) (delete-topic broker-config (:output-2 topic-config)) (delete-topic broker-config (:output-3 topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output-1 topic-config)) (new-topic broker-config (:output-2 topic-config)) (new-topic broker-config (:output-3 topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))
Příklad spustíme:
$ lein run 2020-12-27 15:39:39.101 | WARN | main | stream-pipe-5.core | Deleting topic input 2020-12-27 15:39:39.322 | WARN | main | stream-pipe-5.core | Deleting topic output1 2020-12-27 15:39:39.345 | WARN | main | stream-pipe-5.core | Deleting topic output2 2020-12-27 15:39:39.367 | WARN | main | stream-pipe-5.core | Deleting topic output3 2020-12-27 15:39:39.392 | WARN | main | stream-pipe-5.core | Creating topic input 2020-12-27 15:39:39.433 | WARN | main | stream-pipe-5.core | Creating topic output1 2020-12-27 15:39:39.464 | WARN | main | stream-pipe-5.core | Creating topic output2 2020-12-27 15:39:39.514 | WARN | main | stream-pipe-5.core | Creating topic output3 2020-12-27 15:39:39.542 | WARN | main | stream-pipe-5.core | Starting application 2020-12-27 15:39:39.644 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-27 15:39:39.644 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-27 15:39:39.649 | WARN | main | stream-pipe-5.core | Starting pipe 2020-12-27 15:39:39.651 | WARN | main | stream-pipe-5.core | Pipe is up 2020-12-27 15:39:39.653 | WARN | main | stream-pipe-5.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x185bf6e0 org.apache.kafka.streams.KafkaStreams@185bf6e0]
Pošleme zprávy (se správným formátem) do vstupního tématu:
$ kafkacat -P -b localhost:9092 -t input -K: "p1":{"x":1,"y":2} "p1":{"x":1,"y":2} "p2":{"x":0,"y":9} "p3":{"y":1,"x":2}
Přesvědčíme se, že jsou všechny zprávy ze vstupu zpracovány korektně:
2020-12-27 15:39:44.513 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key: p1 and value: {:x 1, :y 2} 2020-12-27 15:39:44.639 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1 and value: {:result 3} 2020-12-27 15:39:44.757 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1 and value: {:result 3, :timestamp Sun Dec 27 15:39:44 CET 2020} 2020-12-27 15:39:44.872 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1 and value: {:result 3, :timestamp Sun Dec 27 15:39:44 CET 2020, :counter 1} 2020-12-27 15:40:01.612 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key: p2 and value: {:x 0, :y 9} 2020-12-27 15:40:01.720 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2 and value: {:result 9} 2020-12-27 15:40:01.828 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2 and value: {:result 9, :timestamp Sun Dec 27 15:40:01 CET 2020} 2020-12-27 15:40:01.937 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2 and value: {:result 9, :timestamp Sun Dec 27 15:40:01 CET 2020, :counter 2} 2020-12-27 15:40:15.268 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key: p3 and value: {:y 1, :x 2} 2020-12-27 15:40:15.371 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3 and value: {:result 3} 2020-12-27 15:40:15.475 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3 and value: {:result 3, :timestamp Sun Dec 27 15:40:15 CET 2020} 2020-12-27 15:40:15.578 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3 and value: {:result 3, :timestamp Sun Dec 27 15:40:15 CET 2020, :counter 3}
V posledním kroku otestujeme, že v posledním výstupním tématu „output4“ jsou zprávy obsahující i hodnotu čítače:
$ kafkacat -C -b localhost:9092 -t output4 % Reached end of topic output4 [0] at offset 0 {"result":3,"timestamp":"Sun Dec 27 15:39:44 CET 2020","counter":1} % Reached end of topic output4 [0] at offset 1 {"result":9,"timestamp":"Sun Dec 27 15:40:01 CET 2020","counter":2} % Reached end of topic output4 [0] at offset 2 {"result":3,"timestamp":"Sun Dec 27 15:40:15 CET 2020","counter":3} % Reached end of topic output4 [0] at offset 3
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Clojure byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-):
# | Projekt | Stručný popis | Cesta |
---|---|---|---|
1 | stream-pipe-0 | funkční kostra pipeline, na které jsou postaveny i další čtyři demonstrační příklady | https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-0 |
2 | stream-pipe-1 | jednoduchá transformace zpráv | https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-1 |
3 | stream-pipe-2 | zalogování chyby bez pádu aplikace v případě, že zpráva není uložena v JSONu | https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-2 |
4 | stream-pipe-3 | složitější pipeline s několika transformačními funkcemi | https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-3 |
5 | stream-pipe-4 | rozdělení streamu do více částí | https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-4 |
6 | stream-pipe-5 | transformace zpráv založená na uzávěru | https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/stream-pipe-5 |
Demonstrační příklady z předchozích dvou článků o propojení systému Apache Kafka a Clojure s využitím knihovny Jackdaw [1] [2]:
19. Odkazy na předchozí části seriálu o programovacím jazyku Clojure
- Clojure 1: Úvod
http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm/ - Clojure 2: Symboly, kolekce atd.
http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-2-cast/ - Clojure 3: Funkcionální programování
http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-3-cast-funkcionalni-programovani/ - Clojure 4: Kolekce, sekvence a lazy sekvence
http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-4-cast-kolekce-sekvence-a-lazy-sekvence/ - Clojure 5: Sekvence, lazy sekvence a paralelní programy
http://www.root.cz/clanky/clojure-a-bezpecne-aplikace-pro-jvm-sekvence-lazy-sekvence-a-paralelni-programy/ - Clojure 6: Podpora pro paralelní programování
http://www.root.cz/clanky/programovaci-jazyk-clojure-6-futures-nejsou-jen-financni-derivaty/ - Clojure 7: Další funkce pro paralelní programování
http://www.root.cz/clanky/programovaci-jazyk-clojure-7-dalsi-podpurne-prostredky-pro-paralelni-programovani/ - Clojure 8: Identity, stavy, neměnné hodnoty a reference
http://www.root.cz/clanky/programovaci-jazyk-clojure-8-identity-stavy-nemenne-hodnoty-a-referencni-typy/ - Clojure 9: Validátory, pozorovatelé a kooperace s Javou
http://www.root.cz/clanky/programovaci-jazyk-clojure-9-validatory-pozorovatele-a-kooperace-mezi-clojure-a-javou/ - Clojure 10: Kooperace mezi Clojure a Javou
http://www.root.cz/clanky/programovaci-jazyk-clojure-10-kooperace-mezi-clojure-a-javou-pokracovani/ - Clojure 11: Generátorová notace seznamu/list comprehension
http://www.root.cz/clanky/programovaci-jazyk-clojure-11-generatorova-notace-seznamu-list-comprehension/ - Clojure 12: Překlad programů z Clojure do bajtkódu JVM I:
http://www.root.cz/clanky/programovaci-jazyk-clojure-12-preklad-programu-z-clojure-do-bajtkodu-jvm/ - Clojure 13: Překlad programů z Clojure do bajtkódu JVM II:
http://www.root.cz/clanky/programovaci-jazyk-clojure-13-preklad-programu-z-clojure-do-bajtkodu-jvm-pokracovani/ - Clojure 14: Základy práce se systémem maker
http://www.root.cz/clanky/programovaci-jazyk-clojure-14-zaklady-prace-se-systemem-maker/ - Clojure 15: Tvorba uživatelských maker
http://www.root.cz/clanky/programovaci-jazyk-clojure-15-tvorba-uzivatelskych-maker/ - Programovací jazyk Clojure – triky při práci s řetězci
http://www.root.cz/clanky/programovaci-jazyk-clojure-triky-pri-praci-s-retezci/ - Programovací jazyk Clojure – triky při práci s kolekcemi
http://www.root.cz/clanky/programovaci-jazyk-clojure-triky-pri-praci-s-kolekcemi/ - Programovací jazyk Clojure – práce s mapami a množinami
http://www.root.cz/clanky/programovaci-jazyk-clojure-prace-s-mapami-a-mnozinami/ - Programovací jazyk Clojure – základy zpracování XML
http://www.root.cz/clanky/programovaci-jazyk-clojure-zaklady-zpracovani-xml/ - Programovací jazyk Clojure – testování s využitím knihovny Expectations
http://www.root.cz/clanky/programovaci-jazyk-clojure-testovani-s-vyuzitim-knihovny-expectations/ - Programovací jazyk Clojure – některé užitečné triky použitelné (nejenom) v testech
http://www.root.cz/clanky/programovaci-jazyk-clojure-nektere-uzitecne-triky-pouzitelne-nejenom-v-testech/ - Enlive – výkonný šablonovací systém pro jazyk Clojure
http://www.root.cz/clanky/enlive-vykonny-sablonovaci-system-pro-jazyk-clojure/ - Nástroj Leiningen a programovací jazyk Clojure: tvorba vlastních knihoven pro veřejný repositář Clojars
http://www.root.cz/clanky/nastroj-leiningen-a-programovaci-jazyk-clojure-tvorba-vlastnich-knihoven-pro-verejny-repositar-clojars/ - Novinky v Clojure verze 1.8.0
http://www.root.cz/clanky/novinky-v-clojure-verze-1–8–0/ - Asynchronní programování v Clojure s využitím knihovny core.async
http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async/ - Asynchronní programování v Clojure s využitím knihovny core.async (pokračování)
http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async-pokracovani/ - Asynchronní programování v Clojure s využitím knihovny core.async (dokončení)
http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async-dokonceni/ - Vytváříme IRC bota v programovacím jazyce Clojure
http://www.root.cz/clanky/vytvarime-irc-bota-v-programovacim-jazyce-clojure/ - Gorilla REPL: interaktivní prostředí pro programovací jazyk Clojure
https://www.root.cz/clanky/gorilla-repl-interaktivni-prostredi-pro-programovaci-jazyk-clojure/ - Multimetody v Clojure aneb polymorfismus bez použití OOP
https://www.root.cz/clanky/multimetody-v-clojure-aneb-polymorfismus-bez-pouziti-oop/ - Práce s externími Java archivy v programovacím jazyku Clojure
https://www.root.cz/clanky/prace-s-externimi-java-archivy-v-programovacim-jazyku-clojure/ - Clojure 16: Složitější uživatelská makra
http://www.root.cz/clanky/programovaci-jazyk-clojure-16-slozitejsi-uzivatelska-makra/ - Clojure 17: Využití standardních maker v praxi
http://www.root.cz/clanky/programovaci-jazyk-clojure-17-vyuziti-standardnich-maker-v-praxi/ - Clojure 18: Základní techniky optimalizace aplikací
http://www.root.cz/clanky/programovaci-jazyk-clojure-18-zakladni-techniky-optimalizace-aplikaci/ - Clojure 19: Vývojová prostředí pro Clojure
http://www.root.cz/clanky/programovaci-jazyk-clojure-19-vyvojova-prostredi-pro-clojure/ - Clojure 20: Vývojová prostředí pro Clojure (Vimu s REPL)
http://www.root.cz/clanky/programovaci-jazyk-clojure-20-vyvojova-prostredi-pro-clojure-integrace-vimu-s-repl/ - Clojure 21: ClojureScript aneb překlad Clojure do JS
http://www.root.cz/clanky/programovaci-jazyk-clojure-21-clojurescript-aneb-preklad-clojure-do-javascriptu/ - Leiningen: nástroj pro správu projektů napsaných v Clojure
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (2)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-2/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (3)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-3/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (4)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-4/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (5)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-5/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (6)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-6/ - Programovací jazyk Clojure a databáze (1.část)
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-databaze-1-cast/ - Pluginy pro Leiningen
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-pluginy-pro-leiningen/ - Programovací jazyk Clojure a knihovny pro práci s vektory a maticemi
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-knihovny-pro-praci-s-vektory-a-maticemi/ - Programovací jazyk Clojure a knihovny pro práci s vektory a maticemi (2)
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-knihovny-pro-praci-s-vektory-a-maticemi-2/ - Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk
http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk/ - Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk (2)
http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk-2/ - Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure
http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure/ - Seesaw: knihovna pro snadnou tvorbu GUI v azyce Clojure (2)
http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure-2/ - Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure (3)
http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure-3/ - Programovací jazyk Clojure a práce s Gitem
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-prace-s-gitem/ - Programovací jazyk Clojure a práce s Gitem (2)
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-prace-s-gitem-2/ - Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk (dokončení)
http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk-dokonceni/ - Pixie: lehký skriptovací jazyk s „kouzelnými“ schopnostmi
https://www.root.cz/clanky/pixie-lehky-skriptovaci-jazyk-s-kouzelnymi-schopnostmi/ - Programovací jazyk Pixie: funkce ze základní knihovny a použití FFI
https://www.root.cz/clanky/programovaci-jazyk-pixie-funkce-ze-zakladni-knihovny-a-pouziti-ffi/ - Novinky v Clojure verze 1.9.0
https://www.root.cz/clanky/novinky-v-clojure-verze-1–9–0/ - Validace dat s využitím knihovny spec v Clojure 1.9.0
https://www.root.cz/clanky/validace-dat-s-vyuzitim-knihovny-spec-v-clojure-1–9–0/ - Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure
https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure/ - Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure (2)
https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure-2/ - Incanter: prostředí pro statistické výpočty s grafickým výstupem založené na Clojure
https://www.root.cz/clanky/incanter-prostredi-pro-statisticke-vypocty-s-grafickym-vystupem-zalozene-na-clojure/ - Incanter: operace s maticemi
https://www.root.cz/clanky/incanter-operace-s-maticemi/ - Interpret programovacího jazyka Clojure integrovaný do Jupyter Notebooku
https://www.root.cz/clanky/interpret-programovaciho-jazyka-clojure-integrovany-do-jupyter-notebooku/ - Babashka: interpret Clojure určený pro rychlé spouštění utilit z příkazového řádku
https://www.root.cz/clanky/babashka-interpret-clojure-urceny-pro-rychle-spousteni-utilit-z-prikazoveho-radku/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw-2-cast/
20. Odkazy na Internetu
- Learn Clojure – Flow Control
https://clojure.org/guides/learn/flow - ETL Batch Processing With Kafka?
https://medium.com/swlh/etl-batch-processing-with-kafka-7f66f843e20d - ETL with Kafka
https://blog.codecentric.de/en/2018/03/etl-kafka/ - Building ETL Pipelines with Clojure and Transducers
https://www.grammarly.com/blog/engineering/building-etl-pipelines-with-clojure-and-transducers/ - pipeline (možné použít pro ETL)
https://clojuredocs.org/clojure.core.async/pipeline - On Track with Apache Kafka – Building a Streaming ETL Solution with Rail Data
https://www.confluent.io/blog/build-streaming-etl-solutions-with-kafka-and-rail-data/ - Kafka – Understanding Offset Commits
https://www.logicbig.com/tutorials/misc/kafka/committing-offsets.html - fundingcircle/jackdaw (na Clojars)
https://clojars.org/fundingcircle/jackdaw/versions/0.7.6 - Dokumentace ke knihovně jackdaw
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/readme - Jackdaw AdminClient API
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/jackdaw-adminclient-api - Jackdaw Client API
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/jackdaw-client-api - Kafka.clj
https://github.com/helins-io/kafka.clj - Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách
https://www.root.cz/clanky/pouziti-nastroje-apache-kafka-v-aplikacich-zalozenych-na-mikrosluzbach/ - Apache Kafka: distribuovaná streamovací platforma
https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/ - Real-Time Payments with Clojure and Apache Kafka (podcast)
https://www.evidentsystems.com/news/confluent-podcast-about-apache-kafka/ - Kafka and Clojure – Immutable event streams
https://practicalli.github.io/kafka-and-clojure/ - Kafka Streams, the Clojure way
https://blog.davemartin.me/posts/kafka-streams-the-clojure-way/ - dvlopt.kafka na GitHubu
https://github.com/helins-io/kafka.clj - kafka-streams-the-clojure-way na GitHubu
https://github.com/DaveWM/kafka-streams-the-clojure-way - babashka: A Clojure babushka for the grey areas of Bash
https://github.com/borkdude/babashka - Babashka and the Small Clojure Interpreter @ ClojureD 2020 (slajdy)
https://speakerdeck.com/borkdude/babashka-and-the-small-clojure-interpreter-at-clojured-2020 - Babashka: ukázky použití
https://github.com/borkdude/babashka/blob/master/doc/examples.md - clojureD 2020: „Babashka and Small Clojure Interpreter: Clojure in new contexts“ by Michiel Borkent
https://www.youtube.com/watch?v=Nw8aN-nrdEk&t=5s - Meetup #124 Babashka, implementing an nREPL server & game engines with Clojure
https://www.youtube.com/watch?v=0YmZYnwyHHc - The Last Programming Language (shrnutí vývoje programovacích jazyků)
https://www.youtube.com/watch?v=P2yr-3F6PQo - Shebang (Unix): Wikipedia EN
https://en.wikipedia.org/wiki/Shebang_(Unix) - Shebang (Unix): Wikipedia CZ
https://cs.wikipedia.org/wiki/Shebang_(Unix) - How to create Clojure notebooks in Jupyter
https://s01blog.wordpress.com/2017/12/10/how-to-create-clojure-notebooks-in-jupyter/ - Dokumentace k nástroji Conda
https://docs.conda.io/en/latest/ - Notebook interface
https://en.wikipedia.org/wiki/Notebook_interface - Jypyter: open source, interactive data science and scientific computing across over 40 programming languages
https://jupyter.org/ - Calysto Scheme
https://github.com/Calysto/calysto_scheme - scheme.py (základ projektu Calysto Scheme)
https://github.com/Calysto/calysto_scheme/blob/master/calysto_scheme/scheme.py - Humane test output for clojure.test
https://github.com/pjstadig/humane-test-output - iota
https://github.com/juxt/iota - 5 Differences between clojure.spec and Schema
https://lispcast.com/clojure.spec-vs-schema/ - Schema: Clojure(Script) library for declarative data description and validation
https://github.com/plumatic/schema - Zip archiv s Clojure 1.9.0
http://repo1.maven.org/maven2/org/clojure/clojure/1.9.0/clojure-1.9.0.zip - Clojure 1.9 is now available
https://clojure.org/news/2017/12/08/clojure19 - Deps and CLI Guide
https://clojure.org/guides/deps_and_cli - Changes to Clojure in Version 1.9
https://github.com/clojure/clojure/blob/master/changes.md - clojure.spec – Rationale and Overview
https://clojure.org/about/spec - Zip archiv s Clojure 1.8.0
http://repo1.maven.org/maven2/org/clojure/clojure/1.8.0/clojure-1.8.0.zip - Clojure 1.8 is now available
http://clojure.org/news/2016/01/19/clojure18 - Socket Server REPL
http://dev.clojure.org/display/design/Socket+Server+REPL - CLJ-1671: Clojure socket server
http://dev.clojure.org/jira/browse/CLJ-1671 - CLJ-1449: Add clojure.string functions for portability to ClojureScript
http://dev.clojure.org/jira/browse/CLJ-1449 - Launching a Socket Server
http://clojure.org/reference/repl_and_main#_launching_a_socket_server - API for clojure.string
http://clojure.github.io/clojure/branch-master/clojure.string-api.html - Clojars:
https://clojars.org/ - Seznam knihoven na Clojars:
https://clojars.org/projects - Clojure Cookbook: Templating HTML with Enlive
https://github.com/clojure-cookbook/clojure-cookbook/blob/master/07_webapps/7–11_enlive.asciidoc - An Introduction to Enlive
https://github.com/swannodette/enlive-tutorial/ - Enlive na GitHubu
https://github.com/cgrand/enlive - Expectations: příklady atd.
http://jayfields.com/expectations/ - Expectations na GitHubu
https://github.com/jaycfields/expectations - Lein-expectations na GitHubu
https://github.com/gar3thjon3s/lein-expectations - Testing Clojure With Expectations
https://semaphoreci.com/blog/2014/09/23/testing-clojure-with-expectations.html - Clojure testing TDD/BDD libraries: clojure.test vs Midje vs Expectations vs Speclj
https://www.reddit.com/r/Clojure/comments/1viilt/clojure_testing_tddbdd_libraries_clojuretest_vs/ - Testing: One assertion per test
http://blog.jayfields.com/2007/06/testing-one-assertion-per-test.html - Rewriting Your Test Suite in Clojure in 24 hours
http://blog.circleci.com/rewriting-your-test-suite-in-clojure-in-24-hours/ - Clojure doc: zipper
http://clojuredocs.org/clojure.zip/zipper - Clojure doc: parse
http://clojuredocs.org/clojure.xml/parse - Clojure doc: xml-zip
http://clojuredocs.org/clojure.zip/xml-zip - Clojure doc: xml-seq
http://clojuredocs.org/clojure.core/xml-seq - Parsing XML in Clojure
https://github.com/clojuredocs/guides - Clojure Zipper Over Nested Vector
https://vitalyper.wordpress.com/2010/11/23/clojure-zipper-over-nested-vector/ - Understanding Clojure's PersistentVector implementation
http://blog.higher-order.net/2009/02/01/understanding-clojures-persistentvector-implementation - Understanding Clojure's PersistentHashMap (deftwice…)
http://blog.higher-order.net/2009/09/08/understanding-clojures-persistenthashmap-deftwice.html - Assoc and Clojure's PersistentHashMap: part ii
http://blog.higher-order.net/2010/08/16/assoc-and-clojures-persistenthashmap-part-ii.html - Ideal Hashtrees (paper)
http://lampwww.epfl.ch/papers/idealhashtrees.pdf - 4Clojure
http://www.4clojure.com/ - ClojureDoc (rozcestník s dokumentací jazyka Clojure)
http://clojuredocs.org/ - Clojure (na Wikipedia EN)
http://en.wikipedia.org/wiki/Clojure - Clojure (na Wikipedia CS)
http://cs.wikipedia.org/wiki/Clojure - SICP (The Structure and Interpretation of Computer Programs)
http://mitpress.mit.edu/sicp/ - Pure function
http://en.wikipedia.org/wiki/Pure_function - Funkcionální programování
http://cs.wikipedia.org/wiki/Funkcionální_programování - Čistě funkcionální (datové struktury, jazyky, programování)
http://cs.wikipedia.org/wiki/Čistě_funkcionální - Clojure Macro Tutorial (Part I, Getting the Compiler to Write Your Code For You)
http://www.learningclojure.com/2010/09/clojure-macro-tutorial-part-i-getting.html - Clojure Macro Tutorial (Part II: The Compiler Strikes Back)
http://www.learningclojure.com/2010/09/clojure-macro-tutorial-part-ii-compiler.html - Clojure Macro Tutorial (Part III: Syntax Quote)
http://www.learningclojure.com/2010/09/clojure-macro-tutorial-part-ii-syntax.html - Tech behind Tech: Clojure Macros Simplified
http://techbehindtech.com/2010/09/28/clojure-macros-simplified/ - Fatvat – Exploring functional programming: Clojure Macros
http://www.fatvat.co.uk/2009/02/clojure-macros.html