Hlavní navigace

Dramatiq: knihovna pro práci s frontami úloh v Pythonu

Pavel Tišnovský

V dnešní části seriálu o message brokerech si popíšeme projekt nazvaný Dramatiq. Podobně jako v případě projektů RQ a Celery se jedná o systém pro správu front a úloh určený pro použití v aplikacích naprogramovaných v jazyku Python.

Doba čtení: 44 minut

Sdílet

11. Čtvrtý příklad – omezení počtu znovuposlaných zpráv do workera

12. Pátý příklad – nastavení minimálního a maximálního časového intervalu mezi znovuposláním zprávy

13. Konfigurace aplikace v případě použití většího množství workerů a front

14. Šestý demonstrační příklad – tři na sobě nezávislí workeři

15. Úlohy spouštěné ve skupině

16. Uložení výsledků práce workerů

17. Kolona (pipeline) workerů

18. Nástroj dramatiq-dashboard pro sledování front

19. Repositář s demonstračními příklady

20. Odkazy na Internetu

1. Dramatiq – další užitečná knihovna pro práci s frontami úloh v Pythonu

V seriálu o message brokerech a na nich založených technologiích jsme se již setkali s několika projekty, které zprostředkovávají vysokoúrovňové rozhraní k message brokerům pro vývojáře používající programovací jazyk Python. Připomeňme si, že se jedná především o projekty RQ (Redis Queue) a Celery, ovšem existují i další podobné nástroje, například příště popsaný projekt s názvem Huey. Dnes se seznámíme s dalším podobně koncipovaným projektem, který se jmenuje Dramatiq a který je určen pro plánování úloh (job, task) pro samostatně běžící workery. V případě, že worker nějakou úlohu nezpracuje (tj. vyhodí výjimku), je možné přesně specifikovat, co se má s úlohou dále udělat; může se například přeposlat znovu, uložit do takzvané DLQ apod. To však není vše, protože lze vytvářet skupiny (groups) workerů, čekat na jejich výsledky, tvořit kolony (pipeline) workerů s automatickým předáváním mezivýsledků atd.

Sám autor projektu Dramatiq píše, že je dlouholetým uživatelem systému Celery a že je frustrován některými jeho vlastnostmi. A právě z těchto důvodů vytvořil dnes popisovaný projekt Dramatiq, který by měl být oproti Celery v mnoha směrech vylepšen a/nebo zjednodušen. Do jaké míry se to podařilo, by mohlo být patrné z demonstračních příkladů, které budou popsány v navazujících kapitolách.

V následující tabulce jsou pro přehlednost porovnány čtyři systémy s implementací front zpráv pro Python, samozřejmě včetně projektu Dramatiq:

# Vlastnost Dramatiq Celery Huey RQ
1 podpora Pythonu 2
2 jednoduchá implementace
3 automatické přeposlání zhavarovaných úloh
4 zajištění doručení úlohy
5 omezení počtu zpráv
6 specifikace priority úlohy
7 úlohy naplánované na pozdější dobu
8 plánování úloh ve stylu cronu
9 podpora pro kolony (pipeline)
10 možnost uložení výsledků do databáze (Redis…)
11 automatické znovunačtení kódu workera při změně
12 podpora RabbitMQ jako brokera
13 podpora Redisu jako brokera
14 podpora brokera umístěného v paměti
15 podpora greenletů

2. Koncept workerů a skriptů pro plánování úloh

Zprávy (či přesněji řečeno úlohy), s nimiž projekt Dramatiq pracuje, jsou interně spravovány nějakým obecným brokerem. Nativně jsou podporováni dva brokeři. První z nich je založen na databázi Redis, s níž jsme se seznámili v článku Databáze Redis (nejenom) pro vývojáře používající Python. Druhým standardně podporovaným brokerem je RabbitMQ. I ten již známe, viz též články RabbitMQ: jedna z nejúspěšnějších implementací brokera a Pokročilejší operace nabízené systémem RabbitMQ. To však není vše, protože v případě potřeby lze použít i další brokery a dokonce i služby s implementací front zpráv, zejména pak SQS, což je služba nabízená jako součást systému Amazon Web Services (AWS).

Poznámka: ve všech dnešních demonstračních příkladech použijeme message brokera založeného na databázi Redis. Je tomu tak z toho důvodu, že samotný Redis je poměrně snadno konfigurovatelný i nasaditelný. Navíc existuje ještě užitečný projekt dramatiq-dashboard, jehož současná verze podporuje právě Redis (další message brokery prozatím nikoli).

3. Instalace nástroje Dramatiq

Instalace celého systému Dramatiq se skládá ze dvou kroků. V prvním kroku je nutné nainstalovat zvoleného message brokera, který bude interně používán jak pro správu front, tak i pro případné ukládání výsledků práce jednotlivých workerů. V kroku druhém se nainstaluje samotný Dramatiq, a to s využitím nástroje pip nebo pip3.

Nejprve si popišme první krok, a to konkrétně instalaci Redisu. To je většinou snadné, protože balíček s Redisem je součástí většiny linuxových distribucí. Například na Fedoře může instalace vypadat následovně:

$ sudo dnf install redis
 
Last metadata expiration check: 0:15:30 ago on Wed 24 Oct 2018, 22:50:11 CEST.
Dependencies resolved.
================================================================================
 Package           Arch            Version               Repository        Size
================================================================================
Installing:
 redis             x86_64          4.0.9-1.fc27          updates          580 k
Installing dependencies:
 jemalloc          x86_64          4.5.0-5.fc27          updates          210 k
 
Transaction Summary
================================================================================
Install  2 Packages
 
Total download size: 790 k
Installed size: 2.0 M
Is this ok [y/N]:
 

Na systémech založených na Debianu (včetně Ubuntu) lze pro instalaci použít příkaz:

$ apt-get install redis-server

V případě, že budete potřebovat použít nejnovější verzi Redisu, můžete si ho sami přeložit. Postup je jednoduchý (mj. i díky minimálním závislostem na dalších knihovnách) a je podrobně popsán na stránce https://redis.io/topics/quickstart.

Pro vlastní databázi, konfigurační soubor, žurnál a logy Redisu použijeme samostatný adresář, který vytvoříme v domácím adresáři připojeného uživatele:

$ mkdir redis
$ cd redis

Po instalaci se můžeme přesvědčit, že je skutečně k dispozici spustitelný soubor s implementací serveru i řádkového klienta:

$ whereis -b redis-cli
redis-cli: /usr/bin/redis-cli
$ whereis -b redis-server
redis-server: /usr/bin/redis-server

Následně přímo v tomto adresáři vytvoříme konfigurační soubor nazvaný redis.conf. Můžeme se přitom inspirovat souborem /etc/redis/redis.conf (Debian a systémy od něj odvozené), popř. /etc/redis.conf (Fedora, RHEL, CentOS), který je však poměrně rozsáhlý, protože kromě vlastních konfiguračních voleb obsahuje i podrobné informace o významu jednotlivých konfiguračních voleb. Tento soubor je taktéž dostupný na internetu na adrese https://raw.githubusercon­tent.com/antirez/redis/4.0/re­dis.conf.

Následuje výpis obsahu konfiguračního souboru, který je připraven pro lokální spuštění Redisu, bez nebezpečí, že se k běžícímu serveru připojí případný útočník. Důležité volby jsou zvýrazněny. Pokud se vám soubor nechce kopírovat, naleznete ho na adrese https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf:

bind 127.0.0.1
protected-mode yes
port 6379
tcp-backlog 511
timeout 0
tcp-keepalive 300
daemonize no
supervised no
pidfile /var/run/redis_6379.pid
loglevel notice
logfile redis.log
databases 16
always-show-logo yes
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir .
slave-serve-stale-data yes
slave-read-only yes
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-disable-tcp-nodelay no
slave-priority 100
lazyfree-lazy-eviction no
lazyfree-lazy-expire no
lazyfree-lazy-server-del no
slave-lazy-flush no
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
aof-load-truncated yes
aof-use-rdb-preamble no
lua-time-limit 5000
slowlog-log-slower-than 10000
slowlog-max-len 128
latency-monitor-threshold 0
notify-keyspace-events ""
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
list-compress-depth 0
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
hll-sparse-max-bytes 3000
activerehashing yes
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60
hz 10
aof-rewrite-incremental-fsync yes

Nyní již můžeme databázi Redis spustit, aniž by došlo k tomu, že bude její API viditelné ostatním počítačům připojeným do sítě:

$ redis-server redis.conf 

Na druhém terminálu pak již můžeme (čistě pro otestování) spustit klienta Redisu, který uživatelům nabízí interaktivní příkazový řádek:

$ redis-cli
127.0.0.1:6379>

Příkazem „ping“ můžeme otestovat, jestli se klient připojí k serveru a zda od něj dokáže získávat odpovědi:

127.0.0.1:6379> ping
PONG
 
127.0.0.1:6379> ping test
"test"

Jak již víme z předchozího textu, bude následovat instalace samotného systému Dramatiq. Samotná instalace se provede s využitím nástroje pip resp. pip3, protože samotný Dramatiq je pochopitelně dostupný jako balíček na PyPi. Musíme si ovšem dát pozor na to, že instalaci nelze provést pouze takto:

$ pip3 install --user dramatiq

Musíme totiž specifikovat i to, na jakém brokerovi bude nástroj Dramatiq založen. Pokud použijeme RabbitMQ, mohl by příkaz pro instalaci vypadat takto:

$ pip3 install --user 'dramatiq[rabbitmq, watch]'

My ovšem použijeme Redis, takže příkaz pro instalaci bude nepatrně odlišný:

$ pip3 install --user 'dramatiq[redis, watch]'

Samotná instalace proběhne velmi rychle (v mém případě ještě rychleji, protože jsem již mnoho závislostí měl nainstalovaných):

Requirement already satisfied: dramatiq[redis,watch] in ./.local/lib/python3.6/site-packages
Requirement already satisfied: prometheus-client<0.3,>=0.2 in ./.local/lib/python3.6/site-packages (from dramatiq[redis,watch])
Requirement already satisfied: redis<4.0,>=2.0; extra == "redis" in ./.local/lib/python3.6/site-packages (from dramatiq[redis,watch])
Collecting watchdog<0.9,>=0.8; extra == "watch" (from dramatiq[redis,watch])
Collecting watchdog-gevent==0.1; extra == "watch" (from dramatiq[redis,watch])
  Using cached https://files.pythonhosted.org/packages/c1/85/14264c65d46c3e15d201dcf86722838a1d9d8b8443de8bfd8b19299f429b/watchdog_gevent-0.1.0-py3-none-any.whl
Collecting argh>=0.24.1 (from watchdog<0.9,>=0.8; extra == "watch"->dramatiq[redis,watch])
  Using cached https://files.pythonhosted.org/packages/06/1c/e667a7126f0b84aaa1c56844337bf0ac12445d1beb9c8a6199a7314944bf/argh-0.26.2-py2.py3-none-any.whl
Collecting pathtools>=0.1.1 (from watchdog<0.9,>=0.8; extra == "watch"->dramatiq[redis,watch])
Requirement already satisfied: PyYAML>=3.10 in /usr/lib64/python3.6/site-packages (from watchdog<0.9,>=0.8; extra == "watch"->dramatiq[redis,watch])
Collecting gevent>=1.1 (from watchdog-gevent==0.1; extra == "watch"->dramatiq[redis,watch])
  Using cached https://files.pythonhosted.org/packages/f2/ca/5b5962361ed832847b6b2f9a2d0452c8c2f29a93baef850bb8ad067c7bf9/gevent-1.4.0-cp36-cp36m-manylinux1_x86_64.whl
Requirement already satisfied: greenlet>=0.4.14; platform_python_implementation == "CPython" in ./.local/lib/python3.6/site-packages (from gevent>=1.1->watchdog-gevent==0.1; extra == "watch"->dramatiq[redis,watch])
Installing collected packages: argh, pathtools, watchdog, gevent, watchdog-gevent
Successfully installed argh-0.26.2 gevent-1.4.0 pathtools-0.1.2 watchdog-0.8.3 watchdog-gevent-0.1.0

4. Implementace prvního skriptu pro naplánování úlohy

Nyní již tedy máme nainstalovány a nakonfigurovány všechny potřebné balíčky a můžeme se pokusit vytvořit příklad, v němž bude implementován jak worker, tak i skript, který workerovi přiřadí nějakou úlohu (job, task). Příklady budou nepatrně zkomplikovány tím, že budeme používat Redis namísto výchozího RabbitMQ, takže bude nutné explicitně specifikovat, který broker bude použit (v případě RabbitMQ to není bezpodmínečně nutné).

Celá činnost dnešního prvního demonstračního příkladu se dá shrnout do několika bodů:

  1. Skript naplánuje úlohu. Ta se přenese a uloží do brokera, v našem případě do Redisu.
  2. Systém Dramatiq (ve chvíli, kdy bude spuštěn) postupně vybírá úlohy/zprávy z brokera, které jsou určeny zvolenému workeru.
  3. Pokud zprávu přijme, zavolá kód brokera a předá mu příslušný parametr/parametry uložené ve zprávě.
  4. Worker může v případě potřeby uložit výsledky do Redisu.
  5. V případě, že worker zhavaruje (což se může velmi snadno stát, ostatně si to i ukážeme), může být zpráva/úloha zopakována popř. uložena do fronty nazvané DLQ (Dead Letter Queue), kde implicitně vydrží týden.

Podívejme se, jak může vypadat skript, který workerovi zadá (naplánuje) úlohu. Nejprve si ukažme celý kód tohoto skriptu:

import time
import dramatiq
 
from dramatiq.brokers.redis import RedisBroker
 
from test_worker_1 import test_worker, setup_broker
 
 
setup_broker()
 
test_worker.send()

Povšimněte si, že musíme importovat samotného workera, aby bylo možné pracovat s identifikátorem test_worker (to je název funkce s implementací workera – viz další kapitolu). Zadání úlohy workerovi je snadné – vše se provede na tomto řádku:

test_worker.send()
Poznámka: v dalším textu se předpokládá, že systém Redis již běží a že používá výchozí port. Pokud tomu tak není, bude se při pokusech o spuštění dalších skriptů vypisovat chybové hlášení, že se není možné připojit na message brokera.

5. Implementace workera

Dále se podívejme na implementaci workera. Celý skript s workerem může vypadat následovně:

import time
 
import dramatiq
from dramatiq.brokers.redis import RedisBroker
 
 
def setup_broker():
    redis_broker = RedisBroker(host="localhost", port=6379)
    dramatiq.set_broker(redis_broker)
    return redis_broker
 
 
setup_broker()
 
 
@dramatiq.actor
def test_worker():
    print("Working")
    time.sleep(1)
    print("Done")

Povšimněte si především pomocné funkce setup_broker sloužící pro konfiguraci message brokera. V této funkci specifikujeme, na jaké adrese a na jakém portu je dostupné API message brokera; následně je broker nastaven jako výchozí message broker pro všechny dále specifikované workery:

def setup_broker():
    redis_broker = RedisBroker(host="localhost", port=6379)
    dramatiq.set_broker(redis_broker)
    return redis_broker
Poznámka: v praxi by se pochopitelně parametry host a port načítaly z konfiguračního souboru.

Výše popsanou funkci setup_broker je nutné zavolat ještě předtím, než se bude interpretovat samotná implementace workera. Ta je prozatím velmi jednoduchá, protože se jedná o funkci s dekorátorem @dramatiq.actor. Vzhledem k tomu, že našemu prvnímu workeru nebudeme předávat žádné parametry, bude i funkce s jeho implementací bez argumentů:

@dramatiq.actor
def test_worker():
    print("Working")
    time.sleep(1)
    print("Done")
Poznámka: skutečnou práci workera zde simulujeme zavoláním funkce time.sleep
.

6. Naplánování úlohy a její zpracování workerem

Workeři, kterým jsou předávány jednotlivé úlohy, pochopitelně nejsou spouštěni přímo zavoláním funkce, v níž jsou implementováni (zde konkrétně funkce test_worker). Namísto toho je předávání úloh a plánování práce workerů ponecháno na nástroji dramatiq ovládaném z příkazového řádku. Tento nástroj se spustí jednoduše – přesuneme se do adresáře se skriptem obsahujícím kód workera a napíšeme:

$ dramatiq test_worker_1

Povšimněte si, že se spustí celkem osm procesů s čekajícím workerem. Je tomu tak z toho důvodu, že na mém počítači je dostupných osm jader a nástroj dramatiq se snaží využít všechna dostupná jádra (toto nastavení je pochopitelně možné ovlivnit z příkazové řádky):

[2019-07-23 14:01:57,290] [PID 3229] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '1.6.0' is booting up.
[2019-07-23 14:01:57,323] [PID 3235] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.
[2019-07-23 14:01:57,324] [PID 3237] [MainThread] [dramatiq.WorkerProcess(1)] [INFO] Worker process is ready for action.
[2019-07-23 14:01:57,324] [PID 3238] [MainThread] [dramatiq.WorkerProcess(2)] [INFO] Worker process is ready for action.
[2019-07-23 14:01:57,330] [PID 3239] [MainThread] [dramatiq.WorkerProcess(3)] [INFO] Worker process is ready for action.
[2019-07-23 14:01:57,332] [PID 3240] [MainThread] [dramatiq.WorkerProcess(4)] [INFO] Worker process is ready for action.
[2019-07-23 14:01:57,332] [PID 3242] [MainThread] [dramatiq.WorkerProcess(6)] [INFO] Worker process is ready for action.
[2019-07-23 14:01:57,334] [PID 3241] [MainThread] [dramatiq.WorkerProcess(5)] [INFO] Worker process is ready for action.
[2019-07-23 14:01:57,348] [PID 3243] [MainThread] [dramatiq.WorkerProcess(7)] [INFO] Worker process is ready for action.

Náš worker by se měl spustit a vypsat na terminál informaci o tom, že začal zpracovávat (jedinou) úlohu a následně ji i ukončil po cca jedné sekundě „práce“:

Working
Done

Jakmile byla úloha zpracována, můžeme všechny procesy i samotný nástroj dramatiq ukončit, například klávesovou zkratkou Ctrl+C:

[2019-07-23 14:02:51,893] [PID 3414] [MainThread] [dramatiq.MainProcess] [INFO] Sending signal 'SIGTERM' to worker processes...
[2019-07-23 14:02:51,895] [PID 3419] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Stopping worker process...
[2019-07-23 14:02:51,895] [PID 3420] [MainThread] [dramatiq.WorkerProcess(1)] [INFO] Stopping worker process...
[2019-07-23 14:02:51,895] [PID 3421] [MainThread] [dramatiq.WorkerProcess(2)] [INFO] Stopping worker process...
[2019-07-23 14:02:51,895] [PID 3422] [MainThread] [dramatiq.WorkerProcess(3)] [INFO] Stopping worker process...
[2019-07-23 14:02:51,895] [PID 3423] [MainThread] [dramatiq.WorkerProcess(4)] [INFO] Stopping worker process...
[2019-07-23 14:02:51,895] [PID 3424] [MainThread] [dramatiq.WorkerProcess(5)] [INFO] Stopping worker process...
[2019-07-23 14:02:51,895] [PID 3425] [MainThread] [dramatiq.WorkerProcess(6)] [INFO] Stopping worker process...
[2019-07-23 14:02:51,895] [PID 3426] [MainThread] [dramatiq.WorkerProcess(7)] [INFO] Stopping worker process...
[2019-07-23 14:02:52,741] [PID 3420] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-07-23 14:02:52,746] [PID 3421] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-07-23 14:02:52,750] [PID 3419] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-07-23 14:02:52,750] [PID 3422] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-07-23 14:02:52,750] [PID 3423] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-07-23 14:02:52,751] [PID 3424] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-07-23 14:02:52,751] [PID 3425] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-07-23 14:02:52,761] [PID 3426] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-07-23 14:02:53,990] [PID 3424] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2019-07-23 14:02:54,052] [PID 3421] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2019-07-23 14:02:54,082] [PID 3423] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2019-07-23 14:02:54,114] [PID 3426] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2019-07-23 14:02:54,154] [PID 3422] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2019-07-23 14:02:54,202] [PID 3419] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2019-07-23 14:02:54,283] [PID 3425] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2019-07-23 14:02:54,744] [PID 3420] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
Poznámka: povšimněte si, že se korektně ukončilo všech osm procesů s workery.

7. Druhý demonstrační příklad – předání parametrů workerům

První worker byl ve skutečnosti velmi primitivní, protože neakceptoval žádné parametry. Teoreticky by tedy měl při každém zavolání vykonat tu samou činnost – jinými slovy by měl být idempotentní. Můžeme se ovšem pochopitelně pokusit vytvořit workera, který akceptuje nějaký parametr či parametry. Prozatím se předaný parametr pouze vypíše na standardní výstup. Nová implementace workera může vypadat následovně:

@dramatiq.actor
def test_worker(parameter):
    print("Working, received parameter: {param}".format(param=parameter))
    time.sleep(1)
    print("Done")
Poznámka: pro jednoduchost nijak nekontrolujeme typ parametru, pouze předpokládáme, že je převeditelný na řetězec.

Uveďme si celý skript s workerem:

import time
 
import dramatiq
from dramatiq.brokers.redis import RedisBroker
 
 
def setup_broker():
    redis_broker = RedisBroker(host="localhost", port=6379)
    dramatiq.set_broker(redis_broker)
    return redis_broker
 
 
setup_broker()
 
 
@dramatiq.actor
def test_worker(parameter):
    print("Working, received parameter: {param}".format(param=parameter))
    time.sleep(1)
    print("Done")

Pochopitelně se změní i skript, který plánuje jednotlivé úlohy. Nyní tento skript musí generovat i parametry předané workeru. Celkem vytvoříme deset úloh:

for i in range(1, 11):
    test_worker.send(i)
Poznámka: parametry, které mají být předány workeru, jsou součástí zprávy ukládané do message brokera. Z tohoto důvodu se musí jednat o hodnoty, které jsou serializovatelné do formátu JSON.

8. Spuštění druhého demonstračního příkladu

Ihned po spuštění druhého demonstračního příkladu (tedy jak skriptu pro naplánování úloh, tak i nástroje dramatiq) je zajímavé se podívat do logů. Můžeme zde vidět, že se nejprve spustilo osm úloh (protože na testovacím počítači je k dispozici osm jader), a další dvě úlohy mohly být spuštěny až ve chvíli, kdy byly předchozí úlohy dokončeny:

Working, received parameter: 10
Working, received parameter: 3
Working, received parameter: 4
Working, received parameter: 1
Working, received parameter: 5
Working, received parameter: 2
Working, received parameter: 8
Working, received parameter: 7
Done
Done
Done
Done
Done
Working, received parameter: 6
Done
Working, received parameter: 9
Done
Done
Done
Done
Poznámka: podobně by se pokračovalo i v případě, že bychom naplánovali více úloh. Pochopitelně je možné při spuštění nástroje dramatiq určit, kolik procesů se má vytvořit a tedy kolik workerů může současně běžet.

9. Třetí demonstrační příklad – worker vyhazující výjimku

Třetí demonstrační příklad se liší od druhého příkladu v tom, že worker pro některý vstupní parametr (konkrétně pro celočíselný parametr dělitelný třemi) vyhodí výjimku:

@dramatiq.actor
def test_worker(parameter):
    print("Working, received parameter: {param}".format(param=parameter))
    if parameter % 3 == 0:
        raise Exception("I don't like this parameter!")
    time.sleep(1)
    print("Done")

Tímto způsobem se budeme snažit simulovat reálné chyby a výjimky, které mohou při práci workerů nastat.

Další části demonstračního příkladu jsou prakticky totožné s předchozím příkladem a proto je již nebudu znovu uvádět.

10. Spuštění třetího demonstračního příkladu

Po spuštění workerů příkazem dramatiq test_worker3 uvidíme, jak workeři přijali jednotlivé úlohy, ovšem worker, který přijal úlohu s parametrem nastaveným na trojku, posléze zhavaroval:

Working, received parameter: 10
Working, received parameter: 3
Working, received parameter: 4
Working, received parameter: 1
Working, received parameter: 5
Working, received parameter: 2
Working, received parameter: 8
Working, received parameter: 7
[2019-07-23 14:09:40,209] [PID 4347] [Thread-8] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(3) with unhandled exception.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_3.py", line 20, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!

Podobně pochopitelně zhavarují i workery, kterým se předal parametr 6 a 9.

Nástroj Dramatiq se havárii úlohy snaží vyřešit tím, že po určitém čase naplánuje tu samou úlohu, přičemž se časy mezi jednotlivými úlohami postupně zvětšují (protože v naprosté většině případů nechceme dojít do okamžiku, kdy počítač stále dokola opakuje ty samé kroky vedoucí k chybě):

[2019-07-23 14:09:40,216] [PID 4347] [Thread-8] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '73c0f344-b1ae-4601-852e-bab6dc77affb' in 13434 milliseconds.
Working, received parameter: 6
[2019-07-23 14:09:40,221] [PID 4347] [Thread-8] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(3) with unhandled exception.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_3.py", line 20, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!

11. Čtvrtý příklad – omezení počtu znovuposlaných zpráv do workera

V mnoha případech je výchozí chování systému Dramatiq rozumné – pokud dojde k chybě, bude zpráva obsahující informaci o naplánované úloze znovu vložena do fronty a worker ji později zopakuje s tím, že doufáme, že zopakování již proběhne v pořádku. Ovšem toto chování můžeme ovlivnit, a to již při plánování úlohy. Můžeme totiž určit maximální počet pokusů o spuštění úlohy. Pro tento účel slouží nepovinný parametr max_retries dekorátoru dramatiq.actor:

@dramatiq.actor(max_retries=3)
def test_worker(parameter):
    print("Working, received parameter: {param}".format(param=parameter))
    if parameter % 3 == 0:
        raise Exception("I don't like this parameter!")
    time.sleep(1)
    print("Done")

Nyní se chování systému Dramatiq při práci s havarovanými úlohami změní. První pády vedou k zopakování úlohy:

Working, received parameter: 6
[2019-07-23 14:15:05,061] [PID 4946] [Thread-12] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(6) with unhandled exception.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_4.py", line 20, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!

Posléze (po třech opakováních) se však již jen vypíše poslední varování, že úloha nebyla ani napotřetí dokončena a již nebude znovu provedena:

[2019-07-23 14:15:05,065] [PID 4946] [Thread-12] [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'c59cf6e4-f226-4f91-9067-666d0ccd51a1'.
Working, received parameter: 3
[2019-07-23 14:15:14,667] [PID 4949] [Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(3) with unhandled exception.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_4.py", line 20, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!
[2019-07-23 14:15:14,668] [PID 4949] [Thread-5] [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'c83ee481-47c1-4dd1-9bbb-c7a1e3083233'.
Working, received parameter: 9
[2019-07-23 14:15:24,905] [PID 4948] [Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(9) with unhandled exception.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_4.py", line 20, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!
[2019-07-23 14:15:24,908] [PID 4948] [Thread-5] [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'e32d6343-5af0-4ab8-a7a7-b5ef6cd31ad7'.

12. Pátý příklad – nastavení minimálního a maximálního časového intervalu mezi znovuposláním zprávy

Existuje ještě jeden způsob, jakým můžeme omezit opětovnému naplánování úloh, které zhavarovaly. Je totiž možné použít parametry min_backoff a max_backoff, kterými se řídí časy mezi havárií úlohy a jejím novým naplánovaným časem. Oba časy jsou uvedeny v milisekundách a časová prodleva se postupně zvětšuje (násobí konstantou větší než 1, většinou dvojkou). Jakmile je překročena druhá hodnota (maximální), nebude havarující úloha znovu naplánována:

@dramatiq.actor(min_backoff=100, max_backoff=2000)
def test_worker(parameter):
    print("Working, received parameter: {param}".format(param=parameter))
    raise Exception("I don't like this parameter!")
    time.sleep(1)
    print("Done")

Opět se podívejme na konkrétní chování systému. První pády úlohy vedou k jejímu opětovnému naplánování:

Working, received parameter: 42
[2019-07-23 14:18:11,229] [PID 5604] [Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(42) with unhandled exception.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_5.py", line 19, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!
[2019-07-23 14:18:11,233] [PID 5604] [Thread-5] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '257d6c66-b798-47a6-bc44-2373d374eb14' in 70 milliseconds.
Working, received parameter: 42
[2019-07-23 14:18:11,333] [PID 5598] [Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(42) with unhandled exception.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_5.py", line 19, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!

Nakonec se však oznámí, že již úloha nebude znovu naplánována:

Working, received parameter: 42
[2019-07-23 14:18:41,865] [PID 5601] [Thread-7] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(42) with unhandled exception.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_5.py", line 19, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!
[2019-07-23 14:18:41,867] [PID 5601] [Thread-7] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '257d6c66-b798-47a6-bc44-2373d374eb14' in 1562 milliseconds.
Traceback (most recent call last):
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message
    res = actor(*message.args, **message.kwargs)
  File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__
    return self.fn(*args, **kwargs)
  File "./test_worker_5.py", line 19, in test_worker
    raise Exception("I don't like this parameter!")
Exception: I don't like this parameter!
[2019-07-23 14:18:43,741] [PID 5604] [Thread-7] [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message '257d6c66-b798-47a6-bc44-2373d374eb14'.

13. Konfigurace aplikace v případě použití většího množství workerů a front

V dalších kapitolách si ukážeme, jak lze nakonfigurovat větší množství workerů a front. Většinou se každému typu workeru přiřadí vlastní fronta, která je po workeru pojmenována. Ovšem jak se jednotlivé naplánované úlohy budou spouštět? Existuje několik možností:

  1. Workeři jsou na sobě nezávislí a nepředávají si výsledky. Mohou tedy běžet naprosto nezávisle na sobě, což pochopitelně platí i pro naplánované úlohy.
  2. Workeři tvoří skupiny a můžeme požadovat, aby se počkalo na všechny výsledky workerů ve skupině. Tito workeři mohou běžet paralelně, ovšem bude se vždy čekat na dokončení úloh zpracovaných workery ve skupině (to musíme explicitně specifikovat).
  3. A naopak, workeři si mohou postupně předávat výsledky a mohou tak tvořitkolonu (pipeline). Teprve výsledek posledního workera je dále použit.

Jednotlivé možnosti jsou ukázány v navazujícím textu.

14. Šestý demonstrační příklad – tři na sobě nezávislí workeři

V šestém příkladu implementujeme tři workery, kteří jsou na sobě nezávislí (to, že volají společnou funkci, je jen výsledek malého refaktoringu):

def worker(name, parameter):
    print("Worker {w}: working, received parameter: {param}".format(w=name, param=parameter))
    time.sleep(1)
    print("Worker {w}: done".format(w=name))
 
 
@dramatiq.actor
def test_worker_A(parameter):
    worker("A", parameter)
 
 
@dramatiq.actor
def test_worker_B(parameter):
    worker("B", parameter)
 
 
@dramatiq.actor
def test_worker_C(parameter):
    worker("C", parameter)

Naplánování úloh pro tyto workery může vypadat takto – pro každý worker pět úloh:

for i in range(1, 6):
    test_worker_A.send(i)
    test_worker_B.send(i)
    test_worker_C.send(i)

Po spuštění workerů a naplánování úloh můžeme vidět, že jednotlivé úlohy jsou spouštěny „náhodně“ a nezávisle na ostatních úlohách (čekání na „done“ je způsobeno použitím pouhých osmi procesů):

Worker C: working, received parameter: 2
Worker B: working, received parameter: 1
Worker C: working, received parameter: 1
Worker A: working, received parameter: 1
Worker B: working, received parameter: 2
Worker A: working, received parameter: 2
Worker C: working, received parameter: 3
Worker B: working, received parameter: 3
Worker C: done
Worker B: done
Worker A: done
Worker C: done
Worker C: done
Worker B: done
Worker B: done
Worker A: done
Worker C: working, received parameter: 5
Worker A: working, received parameter: 5
Worker A: working, received parameter: 4
Worker B: working, received parameter: 4
Worker A: working, received parameter: 3
Worker B: working, received parameter: 5
Worker C: working, received parameter: 4
Worker C: done
Worker A: done
Worker B: done
Worker A: done
Worker A: done
Worker B: done
Worker C: done

15. Úlohy spouštěné ve skupině

Úlohy ovšem můžeme sdružit do skupiny, a to s využitím konstruktoru group, kterému se předá sekvence (seznam, n-tice) workerů s naplánovanou úlohou. Ovšem pozor – zde se nepoužívá metoda send, ale message:

for i in range(1, 6):
    g = dramatiq.group([
        test_worker_A.message(i),
        test_worker_B.message(i),
        test_worker_C.message(i)
    ]).run()

Ani při použití skupiny však nemusí být zaručeno, že jsou úlohy spouštěny v nějakém pevném pořadí. Je tomu tak proto, že nečekáme na výsledky úloh zařazených do skupiny:

Worker A: working, received parameter: 3
Worker A: working, received parameter: 1
Worker C: working, received parameter: 2
Worker B: working, received parameter: 1
Worker C: working, received parameter: 1
Worker B: working, received parameter: 1
Worker B: working, received parameter: 2
Worker C: working, received parameter: 1
Worker C: working, received parameter: 5
Worker C: working, received parameter: 4
Worker A: working, received parameter: 4
Worker C: working, received parameter: 3
Worker C: working, received parameter: 4
Worker A: working, received parameter: 4
Worker B: working, received parameter: 4
Worker B: working, received parameter: 3

16. Uložení výsledků práce workerů

Pokud ovšem systém nastavíme tak, aby se výsledky práce workerů (výsledky jednotlivých úloh) ukládaly zpět do Redisu, budeme moci čekat na výsledky celé skupiny úloh. Nastavení se provede takto:

def setup_broker_and_backend():
    redis_broker = RedisBroker(host="localhost", port=6379)
    result_backend = RedisBackend()
    dramatiq.set_broker(redis_broker)
    redis_broker.add_middleware(Results(backend=result_backend))
    return redis_broker

Změní se i jednotliví workeři, protože u nich budeme vyžadovat uložení výsledků:

@dramatiq.actor(store_results=True)
def test_worker_A(parameter):
    worker("A", parameter)
    return parameter + "A"
 
 
@dramatiq.actor(store_results=True)
def test_worker_B(parameter):
    worker("B", parameter)
    return parameter + "B"
 
 
@dramatiq.actor(store_results=True)
def test_worker_C(parameter):
    worker("C", parameter)
    return parameter + "C"

Naplánování úloh s jejich rozřazením do skupiny a s čekáním na výsledek (maximálně však dvacet sekund):

for i in range(1, 6):
    print(i)
    g = dramatiq.group([
        test_worker_A.message(str(i)),
        test_worker_B.message(str(i)),
        test_worker_C.message(str(i))
    ]).run()
    g.wait(timeout=20000)

Ještě se podívejme na rozdíl mezi použitím metody send a message. Pokud ve skupině workerů použijeme metodu send, bude ve skutečnosti každý worker zavolán dvakrát s tím stejným parametrem, což většinou je chování, které nám nebude vyhovovat (i když by workeři měli být idempotentní):

for i in range(1, 6):
    print(i)
    g = dramatiq.group([
        test_worker_A.send(str(i)),
        test_worker_B.send(str(i)),
        test_worker_C.send(str(i))
    ]).run()
    g.wait(timeout=20000)

S výsledky:

Worker C: working, received parameter: 1
Worker B: working, received parameter: 1
Worker B: done
Worker C: done
Worker C: working, received parameter: 1
Worker C: done
Worker A: working, received parameter: 1
Worker A: done
Worker B: working, received parameter: 1
Worker A: working, received parameter: 1
Worker A: done
Worker B: done
Worker C: working, received parameter: 2
Worker C: done
Worker A: working, received parameter: 2
Worker A: done
Worker C: working, received parameter: 2
Worker C: done
Worker A: working, received parameter: 2
Worker A: done
Worker B: working, received parameter: 2
Worker B: done
Worker B: working, received parameter: 2
Worker B: done
Worker C: working, received parameter: 3
Worker C: done
Worker B: working, received parameter: 3
Worker C: working, received parameter: 3
Worker A: working, received parameter: 3

Ovšem ve chvíli, kdy se zdrojový kód příkladu změní tak, že se namísto send použije message:

for i in range(1, 6):
    print(i)
    g = dramatiq.group([
        test_worker_A.message(str(i)),
        test_worker_B.message(str(i)),
        test_worker_C.message(str(i))
    ]).run()
    g.wait(timeout=20000)

Bude rozdílné i chování – nyní se vždy spustí všechny tři workery ve skupině a teprve poté se spustí další skupina workerů:

Worker C: working, received parameter: 1
Worker C: done
Worker B: working, received parameter: 1
Worker B: done
Worker A: working, received parameter: 1
Worker A: done
 
Worker C: working, received parameter: 2
Worker B: working, received parameter: 2
Worker B: done
Worker C: done
Worker A: working, received parameter: 2
Worker A: done
 
Worker C: working, received parameter: 3
Worker C: done
Worker B: working, received parameter: 3
Worker A: working, received parameter: 3
Worker A: done
Worker B: done
 
Worker C: working, received parameter: 4
Worker B: working, received parameter: 4
Worker B: done
Worker A: working, received parameter: 4
Worker C: done
Worker A: done
 
Worker C: working, received parameter: 5
Worker B: working, received parameter: 5
Worker B: done
Worker A: working, received parameter: 5
Worker A: done
Worker C: done
Poznámka: pro větší přehlednost jsem jednotlivé zprávy vypisované workery rozdělil prázdným řádkem tam, kde došlo k dokončení celé skupiny úloh.

17. Kolona (pipeline) workerů

V předposledním demonstračním příkladu je ukázáno, jak lze vytvořit takzvanou kolonu (pipeline) složenou z více workerů. Nejprve je spuštěn první worker a jeho výsledek je předán workerovi druhému, výsledek druhého workera je předán workerovi třetímu atd. – zcela stejným způsobem, jako v klasické Unixové koloně (povšimněte si, že workeři nemusí vědět, kdo jim parametry předal a kdo zpracuje jejich výsledek, naprosto stejně, jako je tomu v Unixové pipeline):

p = dramatiq.pipeline([
    test_worker_A.message("!"),
    test_worker_B.message(),
    test_worker_C.message()
]).run()
Poznámka: ve skutečnosti je možné workerům předat i další parametry, ovšem výsledek předchozího workera je vždy předán v posledním parametru.

Implementace všech tří workerů je snadná; každý worker přečte parametr úlohy, přidá k němu jedno písmeno a vrátí jako výsledek:

import time
 
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results.backends import RedisBackend
from dramatiq.results import Results
 
 
def setup_broker_and_backend():
    redis_broker = RedisBroker(host="localhost", port=6379)
    result_backend = RedisBackend()
    dramatiq.set_broker(redis_broker)
    redis_broker.add_middleware(Results(backend=result_backend))
    return redis_broker
 
 
setup_broker_and_backend()
 
 
def worker(name, parameter):
    print("Worker {w}: working, received parameter: {param}".format(w=name, param=parameter))
    print("Worker {w}: done".format(w=name))
 
 
@dramatiq.actor(store_results=True)
def test_worker_A(parameter):
    worker("A", parameter)
    return parameter + "A"
 
 
@dramatiq.actor(store_results=True)
def test_worker_B(parameter):
    worker("B", parameter)
    return parameter + "B"
 
 
@dramatiq.actor(store_results=True)
def test_worker_C(parameter):
    worker("C", parameter)
    return parameter + "C"

Úplný zdrojový kód skriptu, který spustí kolonu workerů, vypadá následovně:

import time
import dramatiq
 
from dramatiq.brokers.redis import RedisBroker
 
from test_worker_9 import test_worker_A, test_worker_B, test_worker_C, setup_broker_and_backend
 
 
setup_broker_and_backend()
 
p = dramatiq.pipeline([
    test_worker_A.message("!"),
    test_worker_B.message(),
    test_worker_C.message()
]).run()
 
print(p.get_result(block=True, timeout=5000))

Podívejme se nyní na to, jak bude vypadat výsledek spojení tří workerů do kolony:

Worker A: working, received parameter: !
Worker A: done
Worker B: working, received parameter: !A
Worker B: done
Worker C: working, received parameter: !AB
Worker C: done

V dnešním posledním demonstračním příkladu opět použijeme kolonu (pipeline) složenou ze tří workerů, tentokrát ovšem bude každá kolona spuštěna třikrát – pokaždé pro jiný vstup. Pro jednoduchost budou vstupy znaky „X“, „Y“ a „Z“ a samotná kolona bude delší (bude sestávat ze šestice workerů):

for parameter in "XYZ":
    p = dramatiq.pipeline([
        test_worker_A.message(parameter),
        test_worker_B.message(),
        test_worker_C.message(),
        test_worker_A.message(),
        test_worker_B.message(),
        test_worker_C.message()
    ]).run()

Úplný tvar skriptu, který úlohy naplánuje, bude vypadat takto:

import time
import dramatiq
 
from dramatiq.brokers.redis import RedisBroker
 
from test_worker_10 import test_worker_A, test_worker_B, test_worker_C, setup_broker_and_backend
 
 
setup_broker_and_backend()
 
for parameter in "XYZ":
    p = dramatiq.pipeline([
        test_worker_A.message(parameter),
        test_worker_B.message(),
        test_worker_C.message(),
        test_worker_A.message(),
        test_worker_B.message(),
        test_worker_C.message()
    ]).run()
    print(p.get_result(block=True, timeout=5000))

Výsledky získané z jednotlivých kolon by tedy měly vypadat následovně:

XABCABC
YABCABC
ZABCABC
Poznámka: teoreticky je ovšem možné, ze se výsledky uloží v jiném pořadí.

Pochopitelně můžeme sledovat i chování jednotlivých workerů při zpracovávání vstupů a generování výsledků:

Worker A: working, received parameter: X
Worker A: done
Worker B: working, received parameter: XA
Worker B: done
Worker C: working, received parameter: XAB
Worker C: done
Worker A: working, received parameter: XABC
Worker A: done
Worker B: working, received parameter: XABCA
Worker B: done
Worker C: working, received parameter: XABCAB
Worker C: done
Worker A: working, received parameter: Y
Worker A: done
Worker B: working, received parameter: YA
Worker B: done
Worker C: working, received parameter: YAB
Worker C: done
Worker A: working, received parameter: YABC
Worker A: done
Worker B: working, received parameter: YABCA
Worker B: done
Worker C: working, received parameter: YABCAB
Worker C: done
Worker A: working, received parameter: Z
Worker A: done
Worker B: working, received parameter: ZA
Worker B: done
Worker C: working, received parameter: ZAB
Worker C: done
Worker A: working, received parameter: ZABC
Worker A: done
Worker B: working, received parameter: ZABCA
Worker B: done
Worker C: working, received parameter: ZABCAB
Worker C: done

18. Nástroj dramatiq-dashboard pro sledování front

Jedním z pomocných nástrojů, o němž se v dnešním článku ve stručnosti zmíníme, je nástroj nazvaný Dramatiq Dashboard. Jedná se o obdobu nástroje rq-dashboard, který byl vyvinut pro konkurenční systém Redis Queue. Nástroj Dramatiq Dashboard je určen pro zobrazení webových stránek s informacemi o použitých frontách, připojených workerech atd. Zajímavé je, že Dramatiq Dashboard je možné zabudovat do libovolné WSGI aplikace či WSGI webového serveru, nemusí se tedy jednat o další samostatně běžící webový server.

Instalaci nástroje (či možná lépe řečeno knihovny) dramatiq_dashboard opět provedeme pomocí pip nebo pip3:

$ pip3 install --user dramatiq_dashboard
 
Collecting dramatiq_dashboard
  Downloading https://files.pythonhosted.org/packages/d9/f6/89bbc958546f18ab3207db3b52fef235528a1f87cc680ceea9001868941a/dramatiq_dashboard-0.2.2-py3-none-any.whl
Requirement already satisfied: redis<4.0,>=2.0 in ./.local/lib/python3.6/site-packages (from dramatiq_dashboard)
Requirement already satisfied: jinja2<3,>=2 in ./.local/lib/python3.6/site-packages (from dramatiq_dashboard)
Requirement already satisfied: dramatiq[redis]<2.0,>=1.6 in ./.local/lib/python3.6/site-packages (from dramatiq_dashboard)
Requirement already satisfied: MarkupSafe>=0.23 in ./.local/lib/python3.6/site-packages (from jinja2<3,>=2->dramatiq_dashboard)
Requirement already satisfied: prometheus-client<0.3,>=0.2 in ./.local/lib/python3.6/site-packages (from dramatiq[redis]<2.0,>=1.6->dramatiq_dashboard)
Installing collected packages: dramatiq-dashboard
Successfully installed dramatiq-dashboard-0.2.2

Jak již bylo řečeno v předchozím odstavci, je možné knihovnu dramatiq_dashboard integrovat do WSGI aplikace. Samotná integrace je popsána přímo na stránkách tohoto projektu a může vypadat následovně:

import bjoern
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq_dashboard import DashboardApp
 
broker = RedisBroker(host="127.0.0.1", port=6379)
dramatiq.set_broker(broker)
 
app = DashboardApp(broker=broker, prefix="")
bjoern.run(app, "127.0.0.1", 8080)
Poznámka: zde se konkrétně používá projekt Bjoern a nikoli například Gunicorn.

19. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Python byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má stále ještě doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce. Každý příklad obsahuje implementaci workera či workerů a taktéž skript pro naplánování úloh:

Příklad Skript Stručný popis Cesta
1 example01/test_worker1.py worker přijímající úlohu bez parametrů https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example01/test_worker.py
2 example01/enqueue_work.py skript pro naplánování jedné úlohy pro workera https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example01/enqueue_wor­k.py
     
3 example02/test_worker2.py worker přijímající úlohu s jedním parametrem https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example02/test_worker.py
4 example02/enqueue_work.py skript pro naplánování deseti úloh pro workera https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example02/enqueue_wor­k.py
     
5 example03/test_worker3.py worker vyhazující výjimku pro některé úlohy https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example03/test_worker.py
6 example03/enqueue_work.py skript pro naplánování deseti úloh pro workera https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example03/enqueue_wor­k.py
     
7 example04/test_worker4.py worker vyhazující výjimku pro některé úlohy https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example04/test_worker.py
8 example04/enqueue_work.py nastavení volby max_retries pro opakované posílání zprávy https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example04/enqueue_wor­k.py
     
9 example05/test_worker5.py worker vyhazující výjimku pro některé úlohy https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example05/test_worker.py
10 example05/enqueue_work.py nastavení volby min_backoff a max_backoff pro opakované posílání zprávy https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example05/enqueue_wor­k.py
     
11 example06/test_worker6.py implementace více workerů https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example06/test_worker.py
12 example06/enqueue_work.py spuštění většího množství workerů (asynchronně, současně) https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example06/enqueue_wor­k.py
     
13 example07/test_worker7.py implementace více workerů https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example07/test_worker.py
14 example07/enqueue_work.py seskupení workerů (groups) https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example07/enqueue_wor­k.py
     
15 example08/test_worker8.py implementace více workerů https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example08/test_worker.py
16 example08/enqueue_work.py nastavení úložiště pro výsledky workerů https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example08/enqueue_wor­k.py
     
17 example09/test_worker9.py implementace více workerů https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example09/test_worker.py
18 example09/enqueue_work.py kolona workerů (pipeline) https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example09/enqueue_wor­k.py
     
19 example10/test_worker10.py implementace více workerů https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example10/test_worker10­.py
20 example10/enqueue_work.py několikanásobné spuštění kolony workerů (pipeline) https://github.com/tisnik/message-queues-examples/blob/master/drama­tiq/example10/enqueue_wor­k.py

20. Odkazy na Internetu

  1. Dramatiq: simple task processing
    https://dramatiq.io/
  2. Cookbook (for Dramatiq)
    https://dramatiq.io/cookbook.html
  3. Balíček dramatiq na PyPi
    https://pypi.org/project/dramatiq/
  4. Dramatiq dashboard
    https://github.com/Bogdan­p/dramatiq_dashboard
  5. Dramatiq na Redditu
    https://www.reddit.com/r/dramatiq/
  6. A Dramatiq broker that can be used with Amazon SQS
    https://github.com/Bogdan­p/dramatiq_sqs
  7. nanomsg na GitHubu
    https://github.com/nanomsg/nanomsg
  8. Referenční příručka knihovny nanomsg
    https://nanomsg.org/v1.1.5/na­nomsg.html
  9. nng (nanomsg-next-generation)
    https://github.com/nanomsg/nng
  10. Differences between nanomsg and ZeroMQ
    https://nanomsg.org/documentation-zeromq.html
  11. NATS
    https://nats.io/about/
  12. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  13. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  14. NATS Introduction
    https://nats.io/documentation/
  15. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  16. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  17. Stránka Apache Software Foundation
    http://www.apache.org/
  18. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  19. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  20. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  21. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  22. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  23. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  24. Qpid Proton
    http://qpid.apache.org/proton/
  25. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  26. Apache ActiveMQ
    http://activemq.apache.org/
  27. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  28. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  29. KahaDB
    http://activemq.apache.or­g/kahadb.html
  30. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  31. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  32. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  33. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  34. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  35. Apache Camel
    https://camel.apache.org/
  36. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  37. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  38. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  39. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  40. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  41. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  42. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  43. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  44. ØMQ – Distributed Messaging
    http://zeromq.org/
  45. ØMQ Community
    http://zeromq.org/community
  46. Get The Software
    http://zeromq.org/intro:get-the-software
  47. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  48. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  49. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  50. ZeroMQ RFC
    https://rfc.zeromq.org/
  51. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  52. zeromq/czmq
    https://github.com/zeromq/czmq
  53. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  54. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  55. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  56. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  57. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  58. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  59. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  60. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  61. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  62. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  63. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  64. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  65. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  66. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  67. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  68. Python binding
    http://zeromq.org/bindings:python
  69. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  70. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  71. About Nanomsg
    https://nanomsg.org/
  72. Advanced Message Queuing Protocol
    https://www.amqp.org/
  73. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  74. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  75. RabbitMQ
    https://www.rabbitmq.com/
  76. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  77. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  78. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  79. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  80. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  81. Erlang
    http://www.erlang.org/
  82. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  83. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  84. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  85. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  86. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  87. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  88. celery na PyPi
    https://pypi.org/project/celery/
  89. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  90. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  91. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  92. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  93. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  94. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  95. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  96. node-celery
    https://github.com/mher/node-celery
  97. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  98. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  99. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  100. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  101. Stránky projektu Redis
    https://redis.io/
  102. Introduction to Redis
    https://redis.io/topics/introduction
  103. Try Redis
    http://try.redis.io/
  104. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  105. Python Redis
    https://redislabs.com/lp/python-redis/
  106. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  107. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  108. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  109. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  110. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  111. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  112. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  113. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  114. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  115. Redis Persistence
    https://redis.io/topics/persistence
  116. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  117. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  118. Ost (knihovna)
    https://github.com/soveran/ost
  119. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  120. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  121. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  122. What Is Sharding?
    https://btcmanager.com/what-sharding/
  123. Redis clients
    https://redis.io/clients
  124. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  125. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  126. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  127. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  128. Resque
    https://github.com/resque/resque
  129. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  130. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  131. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  132. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  133. Pub/Sub
    https://redis.io/topics/pubsub
  134. ZeroMQ distributed messaging
    http://zeromq.org/
  135. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  136. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  137. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  138. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  139. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  140. Redis Protocol specification
    https://redis.io/topics/protocol
  141. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  142. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  143. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  144. Apache Kafka
    https://kafka.apache.org/
  145. Iron
    http://www.iron.io/mq
  146. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  147. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  148. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  149. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  150. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  151. Enqueueing internals
    http://python-rq.org/contrib/
  152. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  153. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  154. Queues
    http://queues.io/
  155. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  156. RestMQ
    http://restmq.com/
  157. ActiveMQ
    http://activemq.apache.org/
  158. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  159. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  160. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  161. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  162. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  163. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  164. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  165. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  166. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  167. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  168. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  169. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  170. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  171. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  172. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  173. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  174. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  175. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  176. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  177. Jupyter
    https://jupyter.org/
  178. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html