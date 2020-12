Obsah

1. Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)

Ve třetím (předposledním) článku o kombinaci programovacího jazyka Clojure a streamovací platformy Apache Kafka se zaměříme na popis skutečného streamingu využívajícího funkcionální vlastnosti jazyka Clojure. Ukážeme si konstrukci složitější „kolony“, rozdělení streamů (proudu zpráv) do více kolon atd. Popsány, popř. použity budou následující funkce a makra ze jmenného prostoru jackdaw.streams:

# Funkce/makro Použito Stručný popis 1 jackdaw.streams/start řízení aplikace zahájení činnosti kolony 2 jackdaw.streams/close řízení aplikace uzavření kolony 3 jackdaw.streams/kstream vytvoření proudu vytvoření proudu konzumujícího zprávy z vybraného tématu 4 jackdaw.streams/to uvnitř proudu uložení (materializace) zpráv z kolony do tématu 5 jackdaw.streams/map uvnitř proudu transformace zpráv 6 jackdaw.streams/map-values uvnitř proudu transformace zpráv (pouze hodnot zpráv, bez klíčů) 7 jackdaw.streams/peek kolona přečtení zprávy z kolony postranním kanálem bez porušení činnosti kolony 8 jackdaw.streams/through kolona vytvoření nové kolony rozdělením streamu 9 jackdaw.streams/merge kolona spojení dvou kolon 10 jackdaw.streams/filter kolona filtrace zpráv na základě predikátu 11 jackdaw.streams/filter-not kolona filtrace zpráv na základě predikátu

Poznámka: připomeňme si, že z hlediska jazyka Clojure je celý proud zpráv (stream) považován za nekonečnou a současně i líně vyhodnocovanou sekvenci.

2. Kostra pipeline, na které jsou založeny všechny navazující demonstrační příklady

V úvodní části dnešního článku si ukážeme kostru zcela funkční pipeline (kolony), na které budou založeny i všechny navazující demonstrační příklady. Struktura projektu s touto pipeline, která je mimochodem dostupná na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-0, vypadá následovně (zvýrazněny jsou ty soubory, které jsou popsány v navazujícím textu):

. ├── CHANGELOG.md ├── README.md ├── LICENSE ├── doc │ └── intro.md ├── project.clj ├── resources │ └── log4j.properties ├── src │ └── stream_pipe_0 │ └── core.clj └── test └── stream_pipe_0 └── core_test.clj

Tato kostra byla získána příkazem:

$ lein new app stream-pipe-0

Nejdůležitější je popis projektu a jeho závislostí uvedený v souboru project.clj. Ručně provedené změny jsou zvýrazněny tučným písmem:

(defproject stream-pipe-0 "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0" :url "https://www.eclipse.org/legal/epl-2.0/"} :dependencies [[org.clojure/clojure "1.10.1"] [fundingcircle/jackdaw "0.7.6"] [org.clojure/tools.logging "0.3.1"] [log4j/log4j "1.2.17" :exclusions [javax.mail/mail javax.jms/jms com.sun.jmdk/jmxtools com.sun.jmx/jmxri]] [org.slf4j/slf4j-log4j12 "1.6.6"]] :main ^:skip-aot stream-pipe-0.core :target-path "target/%s" :resource-paths ["resources/"] :profiles {:uberjar {:aot :all :jvm-opts ["-Dclojure.compiler.direct-linking=true"]}})

Aby knihovna Jackdaw vypisovala všechny důležité informace při inicializaci i běhu pipeline, musíme nakonfigurovat logování. Jedná se o soubor nazvaný log4.properties uložený v podadresáři resources. V tomto souboru je možné definovat, jakým způsobem bude proveden výpis logovacích informací na konzoli i do logovacího souboru (popř. i na jiná místa). Úroveň logovacích informací postačuje nastavit na WARN, čímž se (oproti úrovním DEBUG či INFO) poměrně dosti zásadním způsobem omezí výpis nepodstatných detailů. Navíc jsou v tomto souboru nastaveny i další důležité vlastnosti logování – maximální velikost logovacího souboru, jeho umístění v souborovém systému, případné rotace, formát výstupu aj.:

log4j.rootLogger=WARN, file, console log4j.appender.file=org.apache.log4j.RollingFileAppender log4j.appender.file.File=log/chainring.log log4j.appender.file.MaxFileSize=5MB log4j.appender.file.MaxBackupIndex=50 log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} | %-5p | %t | %c | %m%n log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} | %-5p | %t | %c | %m%n

3. Konfigurace témat, definice pipeline i aplikace, v jejímž rámci pipeline poběží

Zdrojový kód celé aplikace s pipeline je uložen v souboru nazvaném src/stream_pipe 0 /core.clj. První důležitou hodnotou je mapa obsahující konfiguraci vstupního a výstupního tématu, resp. přesněji řečeno tématu, který je použit pro vstup dat (zpráv) i tématu použitého pro výstup (uložení výsledných zpráv). Obě témata jsou nakonfigurována takovým způsobem, že klíče i hodnoty jsou reprezentovány v JSONu:

(def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}})

Následuje mapa obsahující konfiguraci celé aplikace – její jednoznačný identifikátor použitý v systému Kafka, určení adresy brokeru, nastavení vyrovnávací paměti atd. Interně je tato konfigurace převedena na properties:

(def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0"})

Funkce pojmenovaná build-topology slouží pro specifikaci konfigurace celé kolony. Prozatím se jedná o značně zjednodušenou kolonu:

Je vytvořen proud zpráv čtených (konzumovaných) ze vstupního tématu. Proud zpráv se ukládá do tématu výstupního.

Pro vytvoření proudu se používá funkce nazvaná jackdaw.streams/kstream, pro uložení zpráv z proudu pak funkce jackdaw.streams/to. Povšimněte si použití threading makra, což je jedno z mnoha velmi užitečných maker programovacího jazyka Clojure:

(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/to (:output topic-config))) builder)

Dostáváme se ke dvojici funkcí určených pro spuštění aplikace, resp. pro její zastavení. Spuštění aplikace se skládá z inicializace pipeline, inicializace vlastní aplikace a z jejího následného spuštění funkcí jackdaw.streams/start:

(defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app))

Opakem funkce jackdaw.streams/start je funkce se jménem jackdaw.streams/close, která aplikaci zastaví a uvolní všechny prostředky:

(defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down"))

Aplikace je nakonfigurována a spuštěna z funkce -main. Ovšem ještě před vlastním spuštěním aplikace pro jistotu smažeme existující témata a znovu je vytvoříme:

(defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))

Poznámka: povšimněte si, že v celém zdrojovém kódu důsledně používáme logování dostupné přes dnes již prakticky standardní balíček (jmenný prostor) clojure.tools.logging.

4. Úplný zdrojový kód první aplikace s funkční pipeline

Úplný zdrojový kód aplikace s plně funkční pipeline je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-0/src/stream_pipe 0 /core.clj. Jednotlivé definice hodnot i funkcí jsou popsány v dokumentačních řetězcích:

(ns stream-pipe-0.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/to (:output topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))

5. Spuštění a otestování dnešního prvního demonstračního příkladu

Demonstrační příklad popsaný v předchozích třech kapitolách spustíme příkazem:

$ lein run

Na standardním výstupu by se měly objevit informace o tom, že byla vymazána témata „input“ a „output“, že došlo k znovuvytvoření těchto témat a konečně, že se pipeline (kolona) spustila a očekává zprávy, které budou následně zkonzumovány (současně se v podadresáři log tvoří logovací soubor s podrobnějšími informacemi):

2020-12-23 16:28:36.354 | WARN | main | stream-pipe-0.core | Deleting topic input 2020-12-23 16:28:36.599 | WARN | main | stream-pipe-0.core | Deleting topic output 2020-12-23 16:28:36.618 | WARN | main | stream-pipe-0.core | Creating topic input 2020-12-23 16:28:36.650 | WARN | main | stream-pipe-0.core | Creating topic output 2020-12-23 16:28:36.678 | WARN | main | stream-pipe-0.core | Starting application 2020-12-23 16:28:36.797 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-23 16:28:36.797 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-23 16:28:36.802 | WARN | main | stream-pipe-0.core | Starting pipe 2020-12-23 16:28:36.805 | WARN | main | stream-pipe-0.core | Pipe is up 2020-12-23 16:28:36.807 | WARN | main | stream-pipe-0.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x61d8a491 org.apache.kafka.streams.KafkaStreams@61d8a491]

V dalším terminálu použijeme nástroj kafkacat pro poslání zprávy, resp. přesněji řečeno většího množství zpráv do tématu „input“. Zprávy musí být ve formátu JSON, ovšem jejich obsah není žádným způsobem kontrolován:

$ kafkacat -P -b localhost:9092 -t input {"foo":"bar"} {}

A konečně se ve třetím terminálu pokusíme (opět nástrojem kafkacat) přečíst zprávy z tématu „output“. Formát těchto zpráv i jejich obsah by měl odpovídat zprávám posílaným ze druhého terminálu:

$ kafkacat -C -b localhost:9092 -t output % Reached end of topic output [0] at offset 1 {"foo":"bar"} % Reached end of topic output [0] at offset 2 {} % Reached end of topic output [0] at offset 3

6. Transformace zpráv neboli „T“ v „Extract, transform, load“

Konzumace zpráv z jednoho tématu s jejich přímým uložením do tématu jiného samozřejmě nepředstavuje žádnou přelomovou technologii, takže si nyní ukážeme, jak by mohla probíhat nějaká skutečná transformace zpráv. Samotná transformace může být (v těch jednodušších případech) představována čistou funkcí, která pouze zpracuje svůj vstup (tedy zprávu získanou z proudu) a jejím výstupem bude transformovaná zpráva. Zprávy v proudu obsahují dvojici klíč+hodnota, kterou rozdělíme na klíč a hodnotu a vytvoříme z nich zprávu novou. Toto rozdělení je možné provést buď explicitně:

(defn etl "Transformační funkce." [message] (let [k (nth message 0) v (nth message 1)] [k {:result (+ (:x v) (:y v))}]))

Ovšem kratší a idiomatičtější kód by v Clojure mohl vypadat takto:

(defn etl "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}])

Nyní tedy máme funkci, která dokáže zprávu rozdělit na klíč a hodnotu a vytvořit zprávu novou. Klíč nové zprávy zůstane původní, ale hodnota se bude lišit – z původní zprávy se získají hodnoty uložené pod klíči „x“ a „y“, tyto hodnoty se sečtou a výsledek se uloží do výsledné zprávy pod klíčem „result“ (i výsledná zpráva bude uložena v JSONu).

Transformační funkci přidáme do pipeliny s využitím jackdaw.streams/map (viz podtržený řádek):

(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/map etl) (j/to (:output topic-config))) builder)

Poznámka: tímto způsobem lze do pipeliny přidat i libovolné další množství transformačních funkcí. Ty budou tvořit skutečnou kolonu a představovat tak funkcionální obdobu klasické Unixové kolony.

7. Přečtení zpráv z kolony beze změny její funkce (logování atd.)

V některých případech je užitečné zprávy z kolony získat „vedlejším kanálem“, tedy tak, aby se činnost kolony neporušila. Takto lze například zajistit logování zpráv atd. I pro tuto činnost existuje v knihovně Jackdaw funkce (vkládaná do kolony); ta se jmenuje jackdaw.streams/peek. Této funkci se předá jak vlastní stream, tak i funkce zavolaná pro každou zprávu. Při vložení jackdaw.streams/peek do threading makra se pochopitelně první parametr vynechá.

Snadno tedy můžeme do logu vložit jak informaci o původní zprávě, tak i o zprávě transformované:

(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek log-message) (j/map etl) (j/peek log-message) (j/to (:output topic-config))) builder)

Často se setkáme s tím, že jednoduché funkce (typu log-message či etl v našem případě) jsou definovány jako funkce anonymní:

(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder)

peek je zvoleno korektně, protože se u abstraktních datových typů (a nekonečné sekvence – streamy – sem také spadají) jedná o operaci určenou pro přečtení prvku bez jeho odstranění z daného typu (viz též Poznámka: jméno funkceje zvoleno korektně, protože se u abstraktních datových typů (a nekonečné sekvence – streamy – sem také spadají) jedná o operaci určenou pro přečtení prvku bez jeho odstranění z daného typu (viz též https://en.wikipedia.org/wi­ki/Peek_(data_type_operati­on)

8. Úplný zdrojový kód druhé aplikace s funkční pipeline

Úplný zdrojový kód dnešní druhé aplikace s plně funkční pipeline je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-1/src/stream_pipe 1 /core.clj. Jednotlivé definice hodnot i funkcí jsou popsány v dokumentačních řetězcích:

(ns stream-pipe-1.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))

9. Spuštění a otestování dnešního druhého demonstračního příkladu

Opět si můžeme vyzkoušet činnost dnešního druhého demonstračního příkladu, tentokrát i s transformací zpráv a s logováním přidaným do kolony. Nejprve aplikaci spustíme:

$ lein run 2020-12-23 16:52:16.362 | WARN | main | stream-pipe-1.core | Deleting topic input 2020-12-23 16:52:16.600 | WARN | main | stream-pipe-1.core | Deleting topic output 2020-12-23 16:52:16.615 | WARN | main | stream-pipe-1.core | Creating topic input 2020-12-23 16:52:16.643 | WARN | main | stream-pipe-1.core | Creating topic output 2020-12-23 16:52:16.669 | WARN | main | stream-pipe-1.core | Starting application 2020-12-23 16:52:16.786 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-23 16:52:16.787 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-23 16:52:16.792 | WARN | main | stream-pipe-1.core | Starting pipe 2020-12-23 16:52:16.796 | WARN | main | stream-pipe-1.core | Pipe is up 2020-12-23 16:52:16.798 | WARN | main | stream-pipe-1.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x6a97517 org.apache.kafka.streams.KafkaStreams@6a97517]

Následně do vstupního tématu pošleme několik zpráv ve správném formátu, tedy s klíčem i hodnotou v JSONu, přičemž hodnota obsahuje dvě numerické hodnoty s klíči x a y:

$ kafkacat -P -b localhost:9092 -t input -K: "***":{"x":1, "y":2} "---":{"x":100, "y":-100}

Obě zprávy by měly být správně zpracovány, což se dozvíme z logu (nejprve je zalogována zpráva vstupní, poté zpráva transformovaná):

2020-12-23 16:52:18.853 | WARN | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Received message with key: *** and value: {:x 1, :y 2} 2020-12-23 16:52:18.856 | WARN | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Transformed message with key: *** and value: {:result 3} 2020-12-23 16:52:33.498 | WARN | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Received message with key: --- and value: {:x 100, :y -100} 2020-12-23 16:52:33.498 | WARN | pipe-2d6a6caf-3ee0-49ec-a0d9-35edbf0cfe7c-StreamThread-1 | stream-pipe-1.core | Transformed message with key: --- and value: {:result 0}

A konečně výsledky přečteme z výstupního tématu:

$ kafkacat -C -b localhost:9092 -t output % Reached end of topic output [0] at offset 0 {"result":3} % Reached end of topic output [0] at offset 1 {"result":0}

10. Chování aplikace v případě příjmu nekorektní zprávy

Nyní si vyzkoušejme, co se stane v případě, že do vstupního tématu pošleme zprávu s neočekávaným obsahem, například zprávu, která vůbec není zakódována do JSONu:

$ kafkacat -P -b localhost:9092 -t input xyz

V takovém případě dojde k pádu aplikace, a to již při pokusu o deserializaci zprávy:

2020-12-23 16:32:57.649 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.errors.LogAndFailExceptionHandler | Exception caught during Deserialization, taskId: 0_0, topic: input, partition: 0, offset: 0 java.lang.Exception: JSON error (unexpected character): x at clojure.data.json$_read.invokeStatic(json.clj:226) at clojure.data.json$_read.invoke(json.clj:177) at clojure.data.json$read.invokeStatic(json.clj:272) at clojure.data.json$read.doInvoke(json.clj:228) at clojure.lang.RestFn.applyTo(RestFn.java:139) at clojure.core$apply.invokeStatic(core.clj:667) at clojure.core$apply.invoke(core.clj:660) at clojure.data.json$read_str.invokeStatic(json.clj:278) at clojure.data.json$read_str.doInvoke(json.clj:274) at clojure.lang.RestFn.invoke(RestFn.java:439) at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34) at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) 2020-12-23 16:32:57.652 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.processor.internals.StreamThread | stream-thread [pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately. at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) Caused by: java.lang.Exception: JSON error (unexpected character): x at clojure.data.json$_read.invokeStatic(json.clj:226) at clojure.data.json$_read.invoke(json.clj:177) at clojure.data.json$read.invokeStatic(json.clj:272) at clojure.data.json$read.doInvoke(json.clj:228) at clojure.lang.RestFn.applyTo(RestFn.java:139) at clojure.core$apply.invokeStatic(core.clj:667) at clojure.core$apply.invoke(core.clj:660) at clojure.data.json$read_str.invokeStatic(json.clj:278) at clojure.data.json$read_str.doInvoke(json.clj:274) at clojure.lang.RestFn.invoke(RestFn.java:439) at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34) at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ... 8 more 2020-12-23 16:32:57.662 | ERROR | pipe-934ea525-1ffd-4c1a-b8a0-89a827416553-StreamThread-1 | org.apache.kafka.streams.KafkaStreams | stream-client [pipe-934ea525-1ffd-4c1a-b8a0-89a827416553] All stream threads have died. The instance will be in error state and should be closed.

11. Zalogování chyby bez pádu aplikace

Aby aplikace nezhavarovala při jakémkoli pokusu o zpracování zprávy s nekorektním formátem, budeme muset nastavit odlišný handler, tedy přesněji řečeno třídu implementující rozhraní DeserializationExceptionHandler. To lze udělat více způsoby – vytvořit si takovou třídu s využitím proxy, popř. gen-class. Taková třída musí mít přepsánu metodu handle:

DeserializationExceptionHandler.DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[],byte[]> record, Exception exception)

Existuje ovšem ještě jednodušší řešení – použít již vhodně připravenou třídu, konkrétně třídu org.apache.kafka.streams.e­rrors.LogAndContinueExcep­tionHandler. To se provede následujícím způsobem při konfiguraci aplikace (poslední podtržený řádek):

(def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"})

Poznámka: třídu je nutné specifikovat jménem (řetězec) nebo přímo identifikátorem třídy.

Pro úplnost si uveďme celý zdrojový kód takto upraveného demonstračního příkladu (i když je úprava jednořádková):

(ns stream-pipe-2.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))

12. Chování upravené aplikace v příjmu nekorektní zprávy

Nyní si můžeme chování aplikace upravené v rámci předchozí kapitoly otestovat. Nejdříve s korektními zprávami:

$ kafkacat -P -b localhost:9092 -t input -K: "x":{"x":1, "y":2} "y":{"x":10, "y":20}

Chování aplikace by se v takovém případě nemělo žádným způsobem změnit:

$ kafkacat -C -b localhost:9092 -t output % Reached end of topic output [0] at offset 0 {"result":3,"timestamp":"Fri Dec 25 18:15:44 CET 2020"} % Reached end of topic output [0] at offset 1 {"result":30,"timestamp":"Fri Dec 25 18:16:02 CET 2020"} % Reached end of topic output [0] at offset 2

Nyní zkusme do tématu „input“ předat nekorektně zakódovanou zprávu:

$ kafkacat -P -b localhost:9092 -t input -K: foobar

V takovém případě by se měla chyba (i celý výpis zásobníkových rámců) zalogovat, ale aplikace nezhavaruje:

2020-12-23 17:28:22.197 | WARN | pipe-3cb9ae24-57a5-4d20-b4f7-a89e9a311445-StreamThread-1 | org.apache.kafka.streams.errors.LogAndContinueExceptionHandler | Exception caught during Deserialization, taskId: 0_0, topic: input, partition: 0, offset: 0 java.lang.Exception: JSON error (expected false) at clojure.data.json$_read.invokeStatic(json.clj:215) at clojure.data.json$_read.invoke(json.clj:177) at clojure.data.json$read.invokeStatic(json.clj:272) at clojure.data.json$read.doInvoke(json.clj:228) at clojure.lang.RestFn.applyTo(RestFn.java:139) at clojure.core$apply.invokeStatic(core.clj:667) at clojure.core$apply.invoke(core.clj:660) at clojure.data.json$read_str.invokeStatic(json.clj:278) at clojure.data.json$read_str.doInvoke(json.clj:274) at clojure.lang.RestFn.invoke(RestFn.java:439) at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34) at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788) e020-12-23 17:28:22.205 | WARN | pipe-3cb9ae24-57a5-4d20-b4f7-a89e9a311445-StreamThread-1 | org.apache.kafka.streams.processor.internals.RecordDeserializer | task [0_0] Skipping record due to deserialization error. topic=[input] partition=[0] offset=[0] java.lang.Exception: JSON error (expected false) at clojure.data.json$_read.invokeStatic(json.clj:215) at clojure.data.json$_read.invoke(json.clj:177) at clojure.data.json$read.invokeStatic(json.clj:272) at clojure.data.json$read.doInvoke(json.clj:228) at clojure.lang.RestFn.applyTo(RestFn.java:139) at clojure.core$apply.invokeStatic(core.clj:667) at clojure.core$apply.invoke(core.clj:660) at clojure.data.json$read_str.invokeStatic(json.clj:278) at clojure.data.json$read_str.doInvoke(json.clj:274) at clojure.lang.RestFn.invoke(RestFn.java:439) at jackdaw.serdes.json$deserializer$fn__5866.invoke(json.clj:34) at jackdaw.serdes.fn_impl.FnDeserializer.deserialize(fn_impl.clj:37) at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:60) at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:169) at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:744) at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1045) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

13. Složitější pipeline s několika transformačními funkcemi

Transformaci zpráv můžeme v koloně provést několikrát za sebou, opět s použitím nám již známé funkce jackdaw.streams.map. Pouze je nutné zajistit, aby transformační funkce vracely hodnotu, jejíž struktura odpovídá zprávám (dvojice klíč:hodnota):

(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/map etl-1) (j/map etl-2) (j/to (:output topic-config))) builder)

Druhá transformační funkce do zprávy přidá časové razítko:

(defn etl-2 "Transformační funkce." [[k v]] [k (assoc v :timestamp (str (new java.util.Date)))])

Pro sledování činnosti kolony je však vhodné přidat i logování s informacemi o průběhu jednotlivých transformací, takže samotná kolona bude o dva kroky delší:

(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder)

Opět si pro úplnost ukažme, jak bude vypadat úplný zdrojový kód takto upraveného demonstračního příkladu, jenž nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-3/src/stream_pipe 3 /core.clj:

(ns stream-pipe-3.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl-1 "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn etl-2 "Transformační funkce." [[k v]] [k (assoc v :timestamp (str (new java.util.Date)))]) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))

14. Spuštění a otestování dnešního čtvrtého demonstračního příkladu

Aplikaci popsanou v předchozí kapitole spustíme nám již známým příkazem lein run:

$ lein run 2020-12-25 18:15:32.305 | WARN | main | stream-pipe-3.core | Deleting topic input 2020-12-25 18:15:32.561 | WARN | main | stream-pipe-3.core | Deleting topic output 2020-12-25 18:15:32.587 | WARN | main | stream-pipe-3.core | Creating topic input 2020-12-25 18:15:32.630 | WARN | main | stream-pipe-3.core | Creating topic output 2020-12-25 18:15:32.666 | WARN | main | stream-pipe-3.core | Starting application 2020-12-25 18:15:32.776 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-25 18:15:32.776 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-25 18:15:32.781 | WARN | main | stream-pipe-3.core | Starting pipe 2020-12-25 18:15:32.783 | WARN | main | stream-pipe-3.core | Pipe is up 2020-12-25 18:15:32.784 | WARN | main | stream-pipe-3.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x8f374de org.apache.kafka.streams.KafkaStreams@8f374de]

Po poslání korektně zakódované zprávy je možné v logovacím souboru (popř. přímo na terminálu aplikace) sledovat, jak je zpráva postupně transformována.

První zpráva:

2020-12-25 18:15:44.393 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Received message with key: x and value: {:x 1, :y 2} 2020-12-25 18:15:44.394 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: x and value: {:result 3} 2020-12-25 18:15:44.398 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: x and value: {:result 3, :timestamp Fri Dec 25 18:15:44 CET 2020}

Druhá zpráva:

2020-12-25 18:16:02.476 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Received message with key: y and value: {:x 10, :y 20} 2020-12-25 18:16:02.477 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: y and value: {:result 30} 2020-12-25 18:16:02.478 | WARN | pipe-36f18885-8cba-4d1c-a9d7-995abcfd9a0e-StreamThread-1 | stream-pipe-3.core | Transformed message with key: y and value: {:result 30, :timestamp Fri Dec 25 18:16:02 CET 2020}

15. Rozdělení streamu do více částí

Někdy se setkáme s nutností rozdělení celého streamu do více částí, což by v terminologii klasických Unixových kolon velmi zhruba odpovídalo příkazu tee. I to je pochopitelně v knihovně Jackdaw možné; konkrétně je vytvoření nové větve streamu zajištěno funkcí jackdaw.streams/through, která zajistí uložení zprávy do zvoleného tématu. Pokud tedy budeme chtít jednotlivé (částečně) transformované zprávy postupně uložit do témat nazvaných „output1“, „output2“ a „output3“, bude konfigurace kolony vypadat takto:

(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/through (:output-1 topic-config)) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-2 topic-config)) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output-3 topic-config))) builder)

Zdrojový kód příkladu je následně nutné upravit tak, aby se nová témata na začátku smazala a ihned poté znovu inicializovala:

(ns stream-pipe-4.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-1 {:topic-name "output1" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-2 {:topic-name "output2" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-3 {:topic-name "output3" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl-1 "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn etl-2 "Transformační funkce." [[k v]] [k (assoc v :timestamp (str (new java.util.Date)))]) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/through (:output-1 topic-config)) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-2 topic-config)) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output-3 topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output-1 topic-config)) (delete-topic broker-config (:output-2 topic-config)) (delete-topic broker-config (:output-3 topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output-1 topic-config)) (new-topic broker-config (:output-2 topic-config)) (new-topic broker-config (:output-3 topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))

16. Spuštění a otestování dnešního pátého demonstračního příkladu

Pro otestování funkce dnešního pátého demonstračního příkladu nejdříve do vstupního tématu pošleme zprávu s korektním formátem:

$ kafkacat -P -b localhost:9092 -t input -K: "x":{"x":1, "y":2}

Následně aplikaci spustíme a budeme sledovat její činnost:

$ lein run 2020-12-25 19:09:08.876 | WARN | main | stream-pipe-4.core | Deleting topic input 2020-12-25 19:09:09.099 | WARN | main | stream-pipe-4.core | Deleting topic output1 2020-12-25 19:09:09.115 | WARN | main | stream-pipe-4.core | Deleting topic output2 2020-12-25 19:09:09.128 | WARN | main | stream-pipe-4.core | Deleting topic output3 2020-12-25 19:09:09.140 | WARN | main | stream-pipe-4.core | Creating topic input 2020-12-25 19:09:09.174 | WARN | main | stream-pipe-4.core | Creating topic output1 2020-12-25 19:09:09.205 | WARN | main | stream-pipe-4.core | Creating topic output2 2020-12-25 19:09:09.242 | WARN | main | stream-pipe-4.core | Creating topic output3 2020-12-25 19:09:09.275 | WARN | main | stream-pipe-4.core | Starting application 2020-12-25 19:09:09.367 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-25 19:09:09.367 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-25 19:09:09.372 | WARN | main | stream-pipe-4.core | Starting pipe 2020-12-25 19:09:09.374 | WARN | main | stream-pipe-4.core | Pipe is up 2020-12-25 19:09:09.376 | WARN | main | stream-pipe-4.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x68de8522 org.apache.kafka.streams.KafkaStreams@68de8522] 2020-12-25 19:09:26.385 | WARN | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Received message with key: x and value: {:x 1, :y 2} 2020-12-25 19:09:26.515 | WARN | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Transformed message with key: x and value: {:result 3} 2020-12-25 19:09:26.642 | WARN | pipe-3dc0cffa-c985-47f5-9e61-15896b739fa6-StreamThread-1 | stream-pipe-4.core | Transformed message with key: x and value: {:result 3, :timestamp Fri Dec 25 19:09:26 CET 2020}

Podle zpráv z logu byla zpráva úspěšně zpracována, takže si vypíšeme obsah témat „output1“, „output2“ i „output3“.

V tématu „output1“ je uložena kopie vstupní zprávy:

$ kafkacat -C -b localhost:9092 -t output1 {"x":1,"y":2}

V tématu „output2“ je zpráva s vypočteným výsledkem:

$ kafkacat -C -b localhost:9092 -t output2 {"result":3}

A konečně v tématu „output3“ je zpráva s přidaným časovým razítkem:

$ kafkacat -C -b localhost:9092 -t output3 {"result":3,"timestamp":"Fri Dec 25 19:09:26 CET 2020"}

17. Transformace zpráv založená na uzávěru

V některých aplikacích není transformační funkce čistou funkcí ve smyslu, že se jedná o funkci bez paměti a bez vedlejších efektů. Příkladem může být funkce, která by měla do zpráv vkládat hodnotu nějakého čítače atd. I toho lze – pochopitelně – v programovacím jazyku Clojure dosáhnout, a to konkrétně s použitím uzávěrů (closure); méně idiomatické a koncepčně horší by bylo použití třídy. Ukažme si tedy, jak lze vytvořit transformační uzávěr, který se volá jako běžná funkce, ovšem bude mít vnitřní paměť a umožní nám do zpráv vložit jejich index (tedy hodnotu čítače):

(def etl-3 "Transformační funkce vytvořená ve formě uzávěru." (let [counter (atom 0)] (fn [[k v]] (do (swap! counter inc) [k (assoc v :counter @counter)]))))

Poznámka: vidíme, že čítač je realizován pomocí atomu, což je opět koncepčně v pořádku.

Změněná definice kolony s přidanou transformační funkcí etl-3:

(defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/through (:output-1 topic-config)) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-2 topic-config)) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-3 topic-config)) (j/map etl-3) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output-4 topic-config))) builder)

Následuje výpis úplného zdrojového kódu dnešního posledního demonstračního příkladu:

(ns stream-pipe-5.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp] [clojure.tools.logging :as log])) (def topic-config "Konfigurace témat - vstupního i výstupního." {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-1 {:topic-name "output1" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-2 {:topic-name "output2" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-3 {:topic-name "output3" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output-4 {:topic-name "output4" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) (def app-config "Konfigurace aplikace (ve smyslu knihovny Jackdaw)." {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0" "default.deserialization.exception.handler" "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler"}) (defn delete-topic "Pomocná funkce pro smazání vybraného tématu." [broker-config topic] (try (log/warn "Deleting topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic "Pomocná funkce pro vytvoření nového tématu." [broker-config topic] (try (log/warn "Creating topic" (:topic-name topic)) (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn etl-1 "Transformační funkce." [[k v]] [k {:result (+ (:x v) (:y v))}]) (defn etl-2 "Transformační funkce." [[k v]] [k (assoc v :timestamp (str (new java.util.Date)))]) (def etl-3 "Transformační funkce vytvořená ve formě uzávěru." (let [counter (atom 0)] (fn [[k v]] (do (swap! counter inc) [k (assoc v :counter @counter)])))) (defn build-topology "Definice celé pipeliny (kolony) - základ aplikace." [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (log/warn "Received message with key: " k " and value:" v))) (j/through (:output-1 topic-config)) (j/map etl-1) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-2 topic-config)) (j/map etl-2) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/through (:output-3 topic-config)) (j/map etl-3) (j/peek (fn [[k v]] (log/warn "Transformed message with key:" k " and value:" v))) (j/to (:output-4 topic-config))) builder) (defn start-app "Spuštění aplikace." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (log/warn "Starting pipe") (j/start app) (log/warn "Pipe is up") app)) (defn stop-app "Zastavení aplikace." [app] (log/warn "Stopping pipe") (j/close app) (log/warn "Pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na začátku pro jistotu vymažeme témata používaná pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output-1 topic-config)) (delete-topic broker-config (:output-2 topic-config)) (delete-topic broker-config (:output-3 topic-config)) ;; vytvoření nových témat akceptujících zprávy ve formátu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output-1 topic-config)) (new-topic broker-config (:output-2 topic-config)) (new-topic broker-config (:output-3 topic-config)) ;; spuštění kolony (log/warn "Starting application") (let [app (start-app app-config topic-config)] (log/warn "App created:" app))))

Příklad spustíme:

$ lein run 2020-12-27 15:39:39.101 | WARN | main | stream-pipe-5.core | Deleting topic input 2020-12-27 15:39:39.322 | WARN | main | stream-pipe-5.core | Deleting topic output1 2020-12-27 15:39:39.345 | WARN | main | stream-pipe-5.core | Deleting topic output2 2020-12-27 15:39:39.367 | WARN | main | stream-pipe-5.core | Deleting topic output3 2020-12-27 15:39:39.392 | WARN | main | stream-pipe-5.core | Creating topic input 2020-12-27 15:39:39.433 | WARN | main | stream-pipe-5.core | Creating topic output1 2020-12-27 15:39:39.464 | WARN | main | stream-pipe-5.core | Creating topic output2 2020-12-27 15:39:39.514 | WARN | main | stream-pipe-5.core | Creating topic output3 2020-12-27 15:39:39.542 | WARN | main | stream-pipe-5.core | Starting application 2020-12-27 15:39:39.644 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retries' was supplied but isn't a known config. 2020-12-27 15:39:39.644 | WARN | main | org.apache.kafka.clients.consumer.ConsumerConfig | The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config. 2020-12-27 15:39:39.649 | WARN | main | stream-pipe-5.core | Starting pipe 2020-12-27 15:39:39.651 | WARN | main | stream-pipe-5.core | Pipe is up 2020-12-27 15:39:39.653 | WARN | main | stream-pipe-5.core | App created: #object[org.apache.kafka.streams.KafkaStreams 0x185bf6e0 org.apache.kafka.streams.KafkaStreams@185bf6e0]

Pošleme zprávy (se správným formátem) do vstupního tématu:

$ kafkacat -P -b localhost:9092 -t input -K: "p1":{"x":1,"y":2} "p1":{"x":1,"y":2} "p2":{"x":0,"y":9} "p3":{"y":1,"x":2}

Přesvědčíme se, že jsou všechny zprávy ze vstupu zpracovány korektně:

2020-12-27 15:39:44.513 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key: p1 and value: {:x 1, :y 2} 2020-12-27 15:39:44.639 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1 and value: {:result 3} 2020-12-27 15:39:44.757 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1 and value: {:result 3, :timestamp Sun Dec 27 15:39:44 CET 2020} 2020-12-27 15:39:44.872 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p1 and value: {:result 3, :timestamp Sun Dec 27 15:39:44 CET 2020, :counter 1} 2020-12-27 15:40:01.612 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key: p2 and value: {:x 0, :y 9} 2020-12-27 15:40:01.720 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2 and value: {:result 9} 2020-12-27 15:40:01.828 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2 and value: {:result 9, :timestamp Sun Dec 27 15:40:01 CET 2020} 2020-12-27 15:40:01.937 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p2 and value: {:result 9, :timestamp Sun Dec 27 15:40:01 CET 2020, :counter 2} 2020-12-27 15:40:15.268 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Received message with key: p3 and value: {:y 1, :x 2} 2020-12-27 15:40:15.371 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3 and value: {:result 3} 2020-12-27 15:40:15.475 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3 and value: {:result 3, :timestamp Sun Dec 27 15:40:15 CET 2020} 2020-12-27 15:40:15.578 | WARN | pipe-59b1dc4a-3765-4852-9b26-77f96500c106-StreamThread-1 | stream-pipe-5.core | Transformed message with key: p3 and value: {:result 3, :timestamp Sun Dec 27 15:40:15 CET 2020, :counter 3}

V posledním kroku otestujeme, že v posledním výstupním tématu „output4“ jsou zprávy obsahující i hodnotu čítače:

$ kafkacat -C -b localhost:9092 -t output4 % Reached end of topic output4 [0] at offset 0 {"result":3,"timestamp":"Sun Dec 27 15:39:44 CET 2020","counter":1} % Reached end of topic output4 [0] at offset 1 {"result":9,"timestamp":"Sun Dec 27 15:40:01 CET 2020","counter":2} % Reached end of topic output4 [0] at offset 2 {"result":3,"timestamp":"Sun Dec 27 15:40:15 CET 2020","counter":3} % Reached end of topic output4 [0] at offset 3

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Clojure byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-):

# Projekt Stručný popis Cesta 1 stream-pipe-0 funkční kostra pipeline, na které jsou postaveny i další čtyři demonstrační příklady https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-0 2 stream-pipe-1 jednoduchá transformace zpráv https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-1 3 stream-pipe-2 zalogování chyby bez pádu aplikace v případě, že zpráva není uložena v JSONu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-2 4 stream-pipe-3 složitější pipeline s několika transformačními funkcemi https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-3 5 stream-pipe-4 rozdělení streamu do více částí https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-4 6 stream-pipe-5 transformace zpráv založená na uzávěru https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/stream-pipe-5

Demonstrační příklady z předchozích dvou článků o propojení systému Apache Kafka a Clojure s využitím knihovny Jackdaw [1] [2]:

# Projekt Stručný popis Cesta 1 kafka-repl prázdný projekt připravený pro interaktivní práci v REPLu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/kafka-repl 2 topic-constructor vytvoření nového tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor 3 topic-constructor-logger vytvoření nového tématu; konfigurace logování https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor-logger 4 topic-constructor-10-partitions vytvoření nového tématu s deseti oddíly https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-constructor-10-partitions 5 topic-destructor vymazání tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/topic-destructor 6 get-broker-config zobrazení konfigurace brokera https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/get-broker-config 7 describe-cluster zobrazení aktuální konfigurace clusteru https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-cluster 8 describe-all-topics zobrazení všech dostupných témat https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-all-topics 9 describe-topics zobrazení informací o vybraných tématech https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-topics 10 describe-topics-configs zobrazení podrobnějších informací o vybraných tématech https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/describe-topics-configs 11 produce-messages-1 vytvoření jediné zprávy https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-1 12 produce-messages-2 vytvoření 100 zpráv https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-2 13 produce-messages-3 vytvoření 100 zpráv se specifikací klíče https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/produce-messages-3 14 consume-messages-1 konzument zpráv z vybraného tématu https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/consume-messages-1 15 consume-messages-2 konzument zpráv z vybraného tématu, alternativní práce s offsetem na začátku https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/consume-messages-2 16 edn-1-serializer producent zpráv ve formátu EDN https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/edn-1-serializer 17 edn-2-serializer producent zpráv ve formátu EDN bez konců řádků https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/edn-2-serializer 18 json-serializer producent zpráv ve formátu JSON https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/json-serializer 19 custom-pipe jednoduchá kolona (pipe) https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/custom-pipe 20 custom-pipe-processing-function kolona využívající libovolnou transformační funkci https://github.com/tisnik/message-queues-examples/blob/master/kafka/clo­jure/custom-pipe-processing-function

20. Odkazy na Internetu