Obsah
1. Realizace transakcí v systému Redis
2. Instalace Redisu a použití klienta redis-cli
3. Transakce a atomické operace v Redisu
4. Jak jsou transakce realizovány?
6. Od samostatně probíhajících operací k transakcím
7. Přepis hodnot uložených pod jediným klíčem
9. Úprava producenta tak, aby po zápisu neočekávané hodnoty ihned zapsal hodnotu korektní
10. Transakce realizovaná formou pipeline
12. Lze vytvořit pipeline pro operaci get-and-set?
13. Operace get následovaná transakcí
14. Realizace optimistického zamykání
15. Pipeline použitá jako správce kontextu
16. Chování klienta ve chvíli, kdy běží další klient zapisující do prvku se stejným klíčem
17. Konkrétní problém – atomické připojení zpráv do seznamu uloženého v Redisu
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu
1. Realizace transakcí v systému Redis
Databáze Redis (REmote DIctionary Server) patří do rozsáhlé a dnes již značně nehomogenní skupiny nerelačních databází. Konkrétně se v případě Redisu v první řadě jedná o databázi typu key-value, což znamená, že hodnoty ukládané do databáze je možné jednoznačně identifikovat (najít, smazat, změnit jejich hodnotu atd.) na základě klíče, který je reprezentován řetězcem. Podobných databází samozřejmě existuje celá řada; za zmínku stojí především Berkeley DB, dále pak MemcacheDB, Dynamo či InfinityDB. Dnes popisovaná databáze Redis může být pro vývojáře zajímavá a užitečná zejména z toho důvodu, že podporuje i další užitečné datové typy (tedy hodnotami nemusí být jen řetězce, ale například i mapy, seznamy, proudy atd). Dále se jedná o velmi flexibilní systém, který je možné v případě potřeby škálovat nejenom „nahoru“ (distribuované systémy se shardingem a/nebo replikací), ale i „dolů“ (jednoduše nastavitelné datové odkladiště pro jednouživatelskou aplikaci).
Na jedné straně je možné Redis provozovat na jediném stroji s tím, že data budou uložena pouze v operační paměti, na straně druhé je však možné relativně snadno nakonfigurovat Redis takovým způsobem, že data budou rozložena mezi více strojů (sharding), popř. se použije architektura typu master-slave, kdy bude Redis data replikovat na pozadí mezi uzlem typu master a uzly typu slave (se všemi z toho vyplývajícími důsledky).
Díky této flexibilitě je možné systém Redis v praxi využít mnoha různými způsoby. Používá se například ve formě vyrovnávací paměti (cache), přičemž je dokonce možné specifikovat dobu životnosti dat, takže mazání starších položek nemusí být řešeno přímo v aplikaci (životnost údajů lze snadno obnovit, což se hodí například při ukládání informací o session/sezení u webových aplikací). Ovšem Redis můžeme použít i jako plnohodnotnou key-value databázi s tím, že data budou na pozadí ukládána na nevolatilní paměť (typicky na pevný disk či dnes spíše na SSD), což znamená, že údaje přežijí restart Redisu, pád počítače či systému atd. K dispozici mají vývojáři dvě základní strategie ukládání dat do nevolatilní paměti, které lze dokonce použít současně.
V případě potřeby je možné realizovat i systém s transakcemi, popř. využít některé atomické operace, které Redis podporuje už ve své základní sadě příkazů. A právě touto problematikou se budeme zabývat v dnešním článku. Zaměříme se sice především na řešení naprogramované v Pythonu, ovšem samotný princip je zcela univerzální a je ho možné realizovat i v dalších programovacích jazycích.
2. Instalace Redisu a použití klienta redis-cli
Instalace Redisu do Linuxu je snadná, protože ve většině Linuxových distribucí existuje příslušný balíček s touto službou. Tímto tématem jsme se ostatně již zabývali v páté kapitole článku Nástroj huey: užitečná knihovna pro práci s frontami úloh v Pythonu.
Po instalaci a úpravě konfiguračního souboru redis.conf celou službu Redis spustíme příkazem:
$ redis-server redis.conf
S výsledkem:
784933:C 16 Mar 2024 10:22:36.843 * oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo 784933:C 16 Mar 2024 10:22:36.843 * Redis version=7.2.4, bits=64, commit=00000000, modified=0, pid=784933, just started 784933:C 16 Mar 2024 10:22:36.843 * Configuration loaded 784933:M 16 Mar 2024 10:22:36.843 * Increased maximum number of open files to 10032 (it was originally set to 1024). 784933:M 16 Mar 2024 10:22:36.843 * monotonic clock: POSIX clock_gettime 784933:M 16 Mar 2024 10:22:36.844 # Failed to write PID file: Permission denied _._ _.-``__ ''-._ _.-`` `. `_. ''-._ Redis 7.2.4 (00000000/0) 64 bit .-`` .-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in standalone mode |`-._`-...-` __...-.``-._|'` _.-'| Port: 6379 | `-._ `._ / _.-' | PID: 784933 `-._ `-._ `-./ _.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | https://redis.io `-._ `-._`-.__.-'_.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | `-._ `-._`-.__.-'_.-' _.-' `-._ `-.__.-' _.-' `-._ _.-' `-.__.-' 784933:M 16 Mar 2024 10:22:36.844 * Server initialized 784933:M 16 Mar 2024 10:22:36.844 * DB loaded from append only file: 0.000 seconds 784933:M 16 Mar 2024 10:22:36.844 * Opening AOF incr file appendonly.aof.1.incr.aof on server start 784933:M 16 Mar 2024 10:22:36.844 * Ready to accept connections tcp
V jiném terminálu si můžeme pustit klienta ovládaného z příkazového řádku:
$ redis-cli
Měla by se zobrazit takzvaná výzva (prompt) očekávající uživatelem zapisované příkazy:
127.0.0.1:6379>
3. Transakce a atomické operace v Redisu
Jak jsme si napsali v závěru úvodní kapitoly, umožňuje systém Redis do určité míry i realizaci transakcí (ovšem s relativně velkými omezeními, jak ostatně uvidíme dále – některé operace musíme provádět ručně). U příkazů, které jsou součástí transakce, jsou dodrženy (zajištěny) následující podmínky:
- Všechny příkazy uvedené v transakci jsou provedeny v takovém pořadí, v jakém jsou specifikovány uživatelem nebo programátorem.
- Mezi tyto příkazy se nikdy nevmísí příkazy vyžadované jiným klientem.
- Transakce je atomická: buď se provede celá (všechny příkazy), nebo se neprovede vůbec. O provedení či neprovedení transakce rozhoduje poslední příkaz (exec nebo discard).
Pro práci s transakcemi jsou určeny příkazy multi a již výše zmíněné příkazy exec a discard. Podívejme se nyní na velmi jednoduchý „školní“ příklad, v němž se převádí nějaká částka mezi dvěma účty s částkami uloženými pod klíči „ucet1“ a „ucet2“:
127.0.0.1:6379> set ucet1 1000.0 OK 127.0.0.1:6379> set ucet2 0.0 OK
Tuto peněžní transakci musíme provést v databázové transakci, tj. zadáme ji mezi příkazy multi a exec:
127.0.0.1:6379> multi OK 127.0.0.1:6379> incrbyfloat ucet1 -150.50 QUEUED 127.0.0.1:6379> incrbyfloat ucet2 +150.50 QUEUED 127.0.0.1:6379> exec 1) "849.5" 2) "150.5"
Výsledné hodnoty na účtu po provedení transakce získáme následujícím způsobem:
127.0.0.1:6379> get ucet1 "849.5" 127.0.0.1:6379> get ucet2 "150.5"
Získané hodnoty odpovídají hodnotám vypsaným po provedení příkazu exec, kterým se transakce potvrdí a ukončí.
127.0.0.1:6379> multi OK 127.0.0.1:6379(TX)> multi (error) ERR MULTI calls can not be nested
Po příkazu multi se změnila výzva – obsahuje řetězec „(TX)“.
Systém Redis navíc neumožňuje ani rollback v případě, že při zpracování transakce dojde k chybě nějakého příkazu.
4. Jak jsou transakce realizovány?
Samotný systém Redis sice nepodporuje skutečné transakce (s plnohodnotným rollbackem), ovšem alespoň zajišťuje možnost spuštění několika příkazů v jediném nepřerušitelném bloku. Tyto příkazy jsou nejdříve zapamatovány a posléze spuštěny (nebo jsou jako celek zrušeny). Pro tento účel slouží trojice základních příkazů multi, exec a discard, o nichž jsme se zmínili. Ovšem důležitý je především fakt, že se skupina příkazů skutečně provede jako celek a není přerušena jinými příkazy.
Jak se však tento koncept odlišuje od běžných databází? Například si to můžeme ukázat na sekvenci příkazů z předchozí kapitoly:
multi incrbyfloat ucet1 -150.50 incrbyfloat ucet2 +150.50 exec
Příkaz multi ve skutečnosti nezahájí transakci (tedy nezamkne databázi nebo její část). Pouze se změní stav klienta, který si jednotlivé příkazy bude pamatovat a příkazem exec je pošle do Redisu jako jeden blok (takže exec je vlastně kombinací začátek transakce + commit). Neboli jinými slovy: mezi multi a exec bude Redis stále obsluhovat další klienty.
5. Optimistické zamykání
Při pokusu o práci s transakcemi v Redisu se setkáme s pojmem optimistické zamykání (optimistic lock). Jedná se o postup, kdy se pokusíme realizovat transakci a přitom budeme sledovat, zda jiný klient nepřistupuje k hodnotám, které v transakci měníme (zahájení sledování řeší příkaz watch, ukončení sledování pak příkaz unwatch). V případě, že se v průběhu zápisu transakce nějaká hodnota změní, provedeme transakci znovu. Jak uvidíme v dalším textu, je realizace tohoto algoritmu celkem jednoduchá, ale má svoje problémy – zejména se může stát, že s velkým množstvím zápisů nedojde k dokončení transakce nikdy nebo až po delším čase. Také musí být programové řešení zapsáno korektně a nesmí se zapomenout na dokončení transakce a taktéž se nesmí zapomenout na ukončení sledování hodnoty či hodnot.
6. Od samostatně probíhajících operací k transakcím
V navazujících kapitolách si ukážeme postupnou změnu klienta, který do Redisu zapisuje nějaké údaje pod stále stejným klíčem. Původní verze klienta nebude používat transakce. K těm se dostaneme postupně a v závěrečné verzi bude klient realizovat i optimistické zamykání.
Klient bude realizován v Pythonu a bude používat knihovnu Redis-Py, kterou je možné nainstalovat přes pip.
7. Přepis hodnot uložených pod jediným klíčem
Podívejme se nejprve na velmi jednoduchý skript, který neustále do Redisu zapisuje hodnoty uložené pod stejným klíčem. Nejprve se zapíše hodnota (řetězec) „^“ a po určité době řetězec „^$“. Tyto operace se neustále opakují, pouze s malou časovou prodlevou. V Pythonu by bylo možné takového klienta naprogramovat následujícím způsobem:
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "foo" sleep_time = 0.1 while True: v = "^" redis_client.set(key, v) time.sleep(sleep_time) v = "^$" redis_client.set(key, v) time.sleep(sleep_time)
Dále vytvoříme druhého klienta, který naopak bude hodnoty ze stejného klíče číst a bude zjišťovat, o jakou hodnotu se jedná. Pokud se nalezne hodnota odlišná od řetězce „^$“, bude to považováno za chybu, zvýší se počitadlo chyb a vypočte se průměrná frekvence chyb za sekundu:
import redis import time redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "foo" expected_value = "^$" sleep_time = 0.01 start_time = time.perf_counter() errors = 0 while True: value = redis_client.get(key) if value != expected_value: errors += 1 now = time.perf_counter() duration = now - start_time frequency = errors / duration print(f"{value:3} {errors:3} {duration:6.3} {frequency:4.3}") time.sleep(sleep_time)
8. Otestování chování Redisu
Nyní v jednom terminálu pustíme „producenta“ hodnot ukládaných pod klíčem „foo“:
$ python3 producer.py
A v terminálu druhém spustíme konzumenta, který kontroluje, zda je pod klíčem „foo“ uložena hodnota „^$“ nebo hodnota jiná:
$ python3 consumer1.py
Podle očekávání se začne vypisovat značné množství chyb, protože přibližně v 50% případů se přečte odlišná hodnota (viz zdrojový kód producenta):
chyb čas(s) chyb/s ^ 1 0.105 9.52 ^ 2 0.115 17.3 ^ 3 0.126 23.9 ^ 4 0.136 29.4 ^ 5 0.146 34.2 ^ 6 0.157 38.3 ^ 7 0.167 41.9 ^ 8 0.177 45.1 ^ 9 0.188 48.0 ^ 10 0.198 50.6 ^ 11 0.302 36.5 ^ 12 0.313 38.4 ^ 13 0.324 40.1 ^ 14 0.335 41.8 ^ 15 0.346 43.3 ^ 16 0.358 44.7 ^ 17 0.369 46.1 ^ 18 0.38 47.4 ^ 19 0.391 48.6 ^ 20 0.512 39.0 ^ 21 0.523 40.1 ^ 22 0.535 41.2 ^ 23 0.546 42.1
9. Úprava producenta tak, aby po zápisu neočekávané hodnoty ihned zapsal hodnotu korektní
Producenta ve druhém kroku upravíme do takové podoby, že ihned po zápisu hodnoty „^“ zapíše pod stejným klíčem správnou (resp. přesněji řečeno očekávanou) hodnotu „^$“. Tím pochopitelně nedosáhneme atomicity operací, ovšem počet neočekávaných hodnot na straně konzumenta by měl být nižší:
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "foo" sleep_time = 0.1 while True: v = "^" redis_client.set(key, v) v = "^$" redis_client.set(key, v) time.sleep(sleep_time)
Nyní bude po kontrole konzumentem počet chyb (neočekávaných hodnot) o dva řády nižší:
$ python3 consumer1.py chyb čas(s) chyb/s ^ 1 15.3 0.0655 ^ 2 18.9 0.106 ^ 3 20.6 0.145 ^ 4 23.2 0.173 ^ 5 24.2 0.207 ^ 6 24.7 0.243 ^ 7 26.9 0.26 ^ 8 29.3 0.273 ^ 9 31.9 0.282 ^ 10 32.4 0.309 ^ 11 33.9 0.324 ^ 12 35.1 0.342 ^ 13 36.4 0.358 ^ 14 37.9 0.37 ^ 15 38.4 0.391 ^ 16 38.9 0.411
10. Transakce realizovaná formou pipeline
Oba zápisy lze ovšem provést atomicky, tedy takovým způsobem, aby žádný jiný klient nemohl vidět stav uložené hodnoty mezi oběma operacemi set. K tomuto účelu se používají transakce, které jsou v klientovi pro Python realizovány formou takzvané pipeline. V tomto případě se nejprve vytvoří pipeline, příkazy se „zapamatují“ a následně se provedou v jediném bloku, a to tak, že další klienti uvidí až celkový výsledek. Transakce se zahajuje metodou multi a ukončuje metodou execute (nebo discarc, pokud naopak potřebujeme všechny příkazy v pipeline zahodit):
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "foo" sleep_time = 0.1 while True: pipeline = redis_client.pipeline(transaction=True) pipeline.multi() v = "^" pipeline.set(key, v) time.sleep(sleep_time) v = "^$" pipeline.set(key, v) time.sleep(sleep_time) pipeline.execute()
Spuštěním testu (producent + konzument) je možné zjistit, že nyní již konzument nikdy neuvidí mezistav, tedy hodnotu „^“.
11. Operace typu get-and-set
Nyní se předchozí demonstrační příklad pokusíme upravit, a to tak, že naimplementujeme operaci typu get-and-set. Jedná se o operaci, která nejdříve přečte hodnotu z Redisu, nějakým způsobem ji modifikuje a následně ji uloží zpět (stále pod stejným klíčem). Naimplementujeme si například obdobu operace append, kdy budeme k uložené hodnotě přidávat znaky „^“ a „$“:
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "bar" sleep_time = 0.1 while True: v = redis_client.get(key) or "" v += "^" redis_client.set(key, v) # time.sleep(1.4) v += "$" redis_client.set(key, v) time.sleep(sleep_time) print(v)
Výsledek budeme kontrolovat upraveným konzumentem, který bude sledovat pouze poslední dva znaky zprávy (jestli se bude jednat o sekvenci „^$“ či nikoli):
import redis import time redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "bar" expected_value = "^$" sleep_time = 0.01 start_time = time.perf_counter() errors = 0 while True: value = redis_client.get(key) if value is None: continue last_chars = value[-2:] if last_chars != expected_value: errors += 1 now = time.perf_counter() duration = now - start_time frequency = errors / duration print(f"{last_chars:3} {errors:3} {duration:6.3} {frequency:4.3}") time.sleep(sleep_time)
Konzument, opět podle očekávání, nalezne „neočekávané“ hodnoty, protože přidání znaků do hodnoty uložené v Redisu je provedeno po jednotlivých znacích, takže je v některých okamžicích viditelný i mezistav mezi oběma příkazy set:
^ 1 3.21 0.311 $^ 2 3.62 0.552 $^ 3 3.83 0.784 $^ 4 4.03 0.993 $^ 5 4.23 1.18 $^ 6 5.76 1.04 $^ 7 9.84 0.712 $^ 8 10.8 0.744 $^ 9 13.0 0.693 $^ 10 14.1 0.708
12. Lze vytvořit pipeline pro operaci get-and-set?
Pokusme se nyní vytvořit pipeline (a tím pádem i možnost spuštění v transakci) pro operaci get-and-set. Budeme postupovat naprosto stejným způsobem, jako v desáté kapitole, tj. vytvoříme pipeline a operace get a set budeme volat jako metody pipeline:
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "bar" sleep_time = 0.1 while True: pipeline = redis_client.pipeline(transaction=True) pipeline.multi() v = pipeline.get(key) or "" v += "^" pipeline.set(key, v) time.sleep(sleep_time) v += "$" pipeline.set(key, v) pipeline.execute()
Tento skript ovšem nepůjde spustit, protože se detekuje chyba na řádku, v němž se pokoušíme k výsledku operace get přičíst další část řetězce:
$ python3 producer5.py Traceback (most recent call last): File "producer5.py", line 18, in v += "^" TypeError: unsupported operand type(s) for +=: 'Pipeline' and 'str'
Proč tomu tak je? Připomeňme si, že operace zapsané do pipeline jsou nejprve shromážděny a teprve poté poslány (jako jeden nedělitelný blok) do Redisu. To znamená, že výsledek operace get nemůžeme použít jako parametr do navazující funkce set.
13. Operace get následovaná transakcí
Předchozí skript můžeme upravit takovým způsobem, že bude funkční v případě, že se použije jen jediný klient zapisující data. Toto řešení je postaveno na „běžné“ operaci get, která je následována transakcí, v níž již můžeme výsledek operace get bez problémů použít. Jako celek se tedy sice nebude jednat o atomickou operaci, na druhou stranu minimálně hodnota viditelná konzumenty bude vždy končit správnou dvojicí znaků „^$“ (neuvidíme tedy nechtěný „mezistav“):
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "bar" sleep_time = 0.1 while True: pipeline = redis_client.pipeline(transaction=True) error_count = 0 v = redis_client.get(key) or "" v += "^" pipeline.set(key, v) time.sleep(sleep_time) v += "$" pipeline.set(key, v) pipeline.execute() time.sleep(sleep_time) print(v)
14. Realizace optimistického zamykání
V další verzi původního klienta ještě vylepšíme, a to konkrétně o algoritmus optimistického zamykání. Nejprve se vytvoří objekt typu pipeline (může se používat i jako správce kontextu, to uvidíme dále):
pipeline = redis_client.pipeline(transaction=True)
Dále začneme sledovat ostatní klienty, zda přistupují k hodnotě uložené pod daným klíčem:
pipeline.watch(key)
Přečteme hodnotu uloženou pod klíčem (nikoli v pipeline, to jíž víme, že není možné):
v = redis_client.get(key) or "" v += "^"
Samotná modifikace hodnoty je však již v pipeline realizována:
pipeline.multi() pipeline.set(key, v) time.sleep(sleep_time) v += "$" pipeline.set(key, v) pipeline.execute()
Nyní lze pipeline zavřít, popř. navíc přestat sledovat hodnotu uloženou pod daným klíčem:
pipeline.unwatch()
V případě zápisu jiným klientem do klíče se vyvolá výjimka:
except redis.WatchError as err: error_count += 1 print(f"Watch error #{error_count}:", err)
A takto vypadá úplný zdrojový kód již sedmé verze klienta:
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "bar" sleep_time = 0.1 error_count = 0 while True: pipeline = redis_client.pipeline(transaction=True) try: print("transaction started...", end="") pipeline.watch(key) v = redis_client.get(key) or "" v += "^" pipeline.multi() pipeline.set(key, v) time.sleep(sleep_time) v += "$" pipeline.set(key, v) pipeline.execute() pipeline.unwatch() print("...commited") time.sleep(sleep_time) except redis.WatchError as err: error_count += 1 print(f"Watch error #{error_count}:", err) print(v)
15. Pipeline použitá jako správce kontextu
V předchozí kapitole jsme si řekli, že se objekt typu pipeline může použít i jako správce kontextu (context manager), což je (podle mého názoru) čitelnější řešení. Namísto zápisu:
pipeline = redis_client.pipeline(transaction=True)
tedy použijeme zápis:
with redis_client.pipeline(transaction=True) as pipeline: ... ... ...
To vede k takto upravenému zdrojovému kódu klienta:
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "bar" sleep_time = 0.1 error_count = 0 while True: with redis_client.pipeline(transaction=True) as pipeline: try: print("transaction started...", end="") pipeline.watch(key) v = redis_client.get(key) or "" v += "^" pipeline.multi() pipeline.set(key, v) time.sleep(sleep_time) v += "$" pipeline.set(key, v) pipeline.execute() pipeline.unwatch() print("...commited") time.sleep(sleep_time) except redis.WatchError as err: error_count += 1 print(f"Watch error #{error_count}:", err) print(len(v))
16. Chování klienta ve chvíli, kdy běží další klient zapisující do prvku se stejným klíčem
Klient popsaný v předchozí kapitole by se měl chovat korektně i v případě, že se během jeho transakce provede přístup k hodnotě pod stejným klíčem. Můžeme si to ověřit, když souběžně v jiném terminálu spustíme následujícího klienta. Ten se snaží hodnotu zcela přepsat, bez ohledu na transakce či další klienty:
import time import redis redis_client = redis.StrictRedis( host="localhost", port="6379", decode_responses=True, ) key = "bar" sleep_time = 0.18 while True: redis_client.set(key, "that's me") time.sleep(sleep_time)
Po současném spuštění obou producentů (i testovacího konzumenta) je snadné zjistit, že se skutečně detekují zápisy hodnoty pod stejným klíčem a že se v tomto případě transakce (jako celek) neprovede, protože se nezavolá metoda exec (namísto toho se přejde do obsluhy výjimky):
transaction started...Watch error #8: Watched variable changed. transaction started......commited transaction started......commited transaction started......commited transaction started......commited transaction started...Watch error #9: Watched variable changed. transaction started......commited transaction started......commited transaction started......commited transaction started......commited transaction started...Watch error #10: Watched variable changed. transaction started......commited transaction started......commited transaction started......commited transaction started......commited transaction started...Watch error #11: Watched variable changed. transaction started......commited transaction started......commited transaction started......commited
17. Konkrétní problém – atomické připojení zpráv do seznamu uloženého v Redisu
Pro zajímavost se podívejme na konkrétní problém řešený s využitím Redisu. Jedná se o cache určenou pro úschovu konverzace (seznamu zpráv), přičemž kromě klasické operace get potřebujeme realizovat i operaci typu insert_or_append (klasický příkaz append nebude plně funkční). A právě tuto operaci je možné naprogramovat s využitím optimistického zamykání v kombinaci s pipeline:
"""Cache that uses Redis to store cached values.""" import pickle import threading from typing import Any, Optional import logging import redis from redis.retry import Retry class BaseMessage: def __init__(self, content): self.content = content def __str__(self): return self.content class RedisCache: """Cache that uses Redis to store cached values.""" def __init__(self): self.initialize_redis() def initialize_redis(self) -> None: """Initialize the Redis client and logging. This method sets up the Redis client with custom configuration parameters. """ # initialize Redis client self.redis_client = redis.StrictRedis( host="localhost", port=6379, decode_responses=False, # we store serialized messages as bytes, not strings ) # Set custom configuration parameters def get(self, user_id: str, conversation_id: str): """Get the value associated with the given key. Args: user_id: User identification. conversation_id: Conversation ID unique for given user. Returns: The value associated with the key, or None if not found. """ key = user_id + ":" + conversation_id value = self.redis_client.get(key) if value is None: return None return pickle.loads(value, errors="strict") # noqa S301 def insert_or_append(self, user_id: str, conversation_id: str, value) -> None: """Set the value associated with the given key. Args: user_id: User identification. conversation_id: Conversation ID unique for given user. value: The value to set. Raises: OutOfMemoryError: If item is evicted when Redis allocated memory is higher than maxmemory. """ key = user_id + ":" + conversation_id while True: with self.redis_client.pipeline(transaction=True) as pipeline: try: logging.debug("Transaction started") pipeline.watch(key) old_value = self.get(user_id, conversation_id) pipeline.multi() if old_value: old_value.extend(value) pipeline.set( key, pickle.dumps(old_value, protocol=pickle.HIGHEST_PROTOCOL), ) else: pipeline.set( key, pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL) ) pipeline.execute() pipeline.unwatch() logging.debug("Transaction finished") break except redis.WatchError as err: logging.info("Watch error", err)
A takto vypadá test, který ověřuje atomičnost operace insert_or_append i ve chvíli, kdy běží dva klienti připojující zprávy do historie:
import multiprocessing import time import redis PRODUCER_CYCLES = 100 PRODUCER_SLEEP_TIME = 0.1 USER_ID = "1234" CONVERSATION_ID = "5678" from redis_cache import RedisCache from redis_cache import BaseMessage def producer_1(cycles, user_id, conversation_id, sleep_time): redis_cache = RedisCache() for i in range(cycles): message = [BaseMessage(f"message #{i}")] redis_cache.insert_or_append(user_id, conversation_id, message) time.sleep(sleep_time) def producer_2(cycles, user_id, conversation_id, sleep_time): redis_cache = RedisCache() for i in range(cycles): message = [BaseMessage(f"second producer")] redis_cache.insert_or_append(user_id, conversation_id, message) time.sleep(sleep_time) def test_atomic_insert_or_append(): redis_cache = RedisCache() # make sure the conversation is cleared before testing redis_cache.redis_client.delete(USER_ID + ":" + CONVERSATION_ID) queue = multiprocessing.Queue() # construct both producers producer_1_process = multiprocessing.Process( target=producer_1, args=(PRODUCER_CYCLES, USER_ID, CONVERSATION_ID, PRODUCER_SLEEP_TIME), ) producer_2_process = multiprocessing.Process( target=producer_2, args=(PRODUCER_CYCLES, USER_ID, CONVERSATION_ID, PRODUCER_SLEEP_TIME), ) start_time = time.perf_counter() # start both producers producer_1_process.start() producer_2_process.start() # wait for producers print("Waiting for producers to finish") producer_1_process.join() producer_2_process.join() print("Finished") # compute duration end_time = time.perf_counter() duration = end_time - start_time print(f"Duration: {duration:4.3} seconds") # read messages from cache and perform elementary checks messages = redis_cache.get(USER_ID, CONVERSATION_ID) assert len(messages) == PRODUCER_CYCLES * 2 # PRODUCER_CYCLES messages needs to be in cache in proper order index = 0 for message in messages: if message.content == "second producer": continue expected = f"message #{index}" assert message.content == expected, f"Wrong message content {message.content}" index += 1 if __name__ == "__main__": print("Running test") test_atomic_insert_or_append()
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes použitých demonstračních příkladů (resp. spíše pomocných skriptů) určených pro programovací jazyk Python 3 a knihovnu redis-py byly uloženy do Git repositáře dostupného na adrese https://github.com/tisnik/most-popular-python-libs:
19. Odkazy na předchozí části seriálu
V této kapitole jsou uvedeny odkazy na všechny části seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií (a to včetně Redisu – i když dnes jsme Redis používali spíše jako key-value databázi):
- Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Použití databáze Redis v aplikacích naprogramovaných v Go
https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go/ - Použití databáze Redis v aplikacích naprogramovaných v Go (2)
https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go-2/ - Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/ - Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/ - Apache ActiveMQ – další systém implementující message brokera
https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/ - Použití Apache ActiveMQ s protokolem STOMP
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/ - Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/ - Komunikace s message brokery z programovacího jazyka Go
https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/ - Použití message brokeru NATS
https://www.root.cz/clanky/pouziti-message-brokeru-nats/ - NATS Streaming Server
https://www.root.cz/clanky/nats-streaming-server/ - Implementace různých komunikačních strategií s využitím knihovny nanomsg
https://www.root.cz/clanky/implementace-ruznych-komunikacnich-strategii-s-vyuzitim-knihovny-nanomsg/ - Dokončení popisu komunikačních strategií poskytovaných knihovnou nanomsg
https://www.root.cz/clanky/dokonceni-popisu-komunikacnich-strategii-poskytovanych-knihovnou-nanomsg/ - Komunikace s využitím knihovny nanomsg a programovacího jazyka Python
https://www.root.cz/clanky/komunikace-s-vyuzitim-knihovny-nanomsg-a-programovaciho-jazyka-python/ - Dramatiq: knihovna pro práci s frontami úloh v Pythonu
https://www.root.cz/clanky/dramatiq-knihovna-pro-praci-s-frontami-uloh-v-pythonu/ - Durable Queue aneb implementace front zpráv bez použití klasického message brokera
https://www.root.cz/clanky/durable-queue-aneb-implementace-front-zprav-bez-pouziti-klasickeho-message-brokera/ - Nsq – systém pro doručování zpráv bez centrálního message brokera
https://www.root.cz/clanky/nsq-system-pro-dorucovani-zprav-bez-centralniho-message-brokera/ - NSQ – systém pro doručování zpráv bez centrálního message brokera (dokončení)
https://www.root.cz/clanky/nsq-system-pro-dorucovani-zprav-bez-centralniho-message-brokera-dokonceni/ - Fronty zpráv podle Systemu V
https://www.root.cz/clanky/fronty-zprav-podle-systemu-v/ - Implementace front zpráv podle normy POSIX
https://www.root.cz/clanky/implementace-front-zprav-podle-normy-posix/ - Apache Kafka: distribuovaná streamovací platforma
https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/ - Nástroj huey: užitečná knihovna pro práci s frontami úloh v Pythonu
https://www.root.cz/clanky/nastroj-huey-uzitecna-knihovna-pro-praci-s-frontami-uloh-v-pythonu/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw-2-cast/ - Pokročilý streaming založený na projektu Apache Kafka, jazyku Clojure a knihovně Jackdaw (streamy a kolony)
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-projektu-apache-kafka-jazyku-clojure-a-knihovne-jackdaw-streamy-a-kolony/ - Sledování činnosti systému Apache Kafka přes JMX i metriky Promethea
https://www.root.cz/clanky/sledovani-cinnosti-systemu-apache-kafka-pres-jmx-i-metriky-promethea/ - Sledování činnosti systému Apache Kafka přes JMX i metriky Promethea (dokončení)
https://www.root.cz/clanky/sledovani-cinnosti-systemu-apache-kafka-pres-jmx-i-metriky-promethea-dokonceni/ - Využití Redisu z jazyka Clojure pomocí knihovny Carmine
https://www.root.cz/clanky/vyuziti-redisu-z-jazyka-clojure-pomoci-knihovny-carmine/ - Využití Redisu z jazyka Clojure pomocí knihovny Carmine (dokončení)
https://www.root.cz/clanky/vyuziti-redisu-z-jazyka-clojure-pomoci-knihovny-carmine-dokonceni/ - JetStream: nová technologie brokeru NATS konkurující Kafce
https://www.root.cz/clanky/jetstream-nova-technologie-brokeru-nats-konkurujici-kafce/ - Kafka Connect: tvorba producentů a konzumentů bez zdrojového kódu
https://www.root.cz/clanky/kafka-connect-tvorba-producentu-a-konzumentu-bez-zdrojoveho-kodu/ - Kafka Connect: definice a kontrola schématu zpráv
https://www.root.cz/clanky/kafka-connect-definice-a-kontrola-schematu-zprav/ - Témata, oddíly a replikace v systému Apache Kafka
https://www.root.cz/clanky/temata-oddily-a-replikace-v-systemu-apache-kafka/
20. Odkazy na Internetu
- Transactions
https://redis.io/docs/interact/transactions/ - MULTI
https://redis.io/commands/multi/ - EXEC
https://redis.io/commands/exec/ - DISCARD
https://redis.io/commands/discard/ - WATCH
https://redis.io/commands/watch/ - UNWATCH
https://redis.io/commands/unwatch/ - You Don’t Need Transaction Rollbacks in Redis
https://redis.com/blog/you-dont-need-transaction-rollbacks-in-redis/ - Proudy (streams) podporované systémem Redis
https://www.root.cz/clanky/proudy-streams-podporovane-systemem-redis/ - Proudy (streams) podporované systémem Redis (dokončení)
https://www.root.cz/clanky/proudy-streams-podporovane-systemem-redis-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in Linux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250