Hlavní navigace

Souběžné a paralelně běžící úlohy naprogramované v Pythonu (2)

22. 3. 2022
Doba čtení: 35 minut

Sdílet

 Autor: Depositphotos
Dnes si povíme, jak zajistit souběžné či paralelní zpracování úloh s využitím několika různých technologií. Bude se jednat o třídy ThreadPoolExecutor a ProcessPoolExecutor, ovšem zmíníme se i o korutinách a konstrukcích async a await.

Obsah

1. Souběžné a paralelně běžící úlohy naprogramované v Pythonu (2)

2. Třída ThreadPoolExecutor

3. Omezení celkového množství vláken dostupného workerům

4. Návratové hodnoty z jednotlivých úloh

5. Získání vypočtených hodnot

6. Od ThreadPoolExecutor k ProcessPoolExecutor

7. Použití třídy ProcessPoolExecutor

8. Získání výsledků z paralelně běžících procesů

9. Souběžně běžící úlohy a konstrukce asyncawait

10. Základní použití konstrukcí asyncawait

11. Vytvoření a spuštění dvou korutin s čekáním na jejich dokončení

12. Získání výsledků z korutin

13. Fronta jako komunikační kanál

14. Postupné (synchronní) spouštění úloh

15. Asynchronní spouštění úloh ve čtyřech korutinách

16. Knihovna aiohttp – asynchronní operace přes protokol HTTP

17. Příklad použití knihovny aiohttp

18. Doba trvání dotazů vs. celková doba běhu programu

19. Repositář s demonstračními příklady

20. Odkazy na Internetu

1. Souběžné a paralelně běžící úlohy naprogramované v Pythonu (2)

Na článek „Souběžné a paralelně běžící úlohy naprogramované v Pythonu“ dnes navážeme. Ukážeme si totiž, jakým způsobem je možné zajistit souběžné či dokonce paralelní zpracování úloh s využitím několika navzájem zcela odlišných technologií. V první části článku bude popsána třída ThreadPoolExecutor (souběžný běh několika vláken), v části další pak ProcessPoolExecutor (paralelní běh několika procesů) a konečně se ve třetí části článku zmíníme o knihovnách asyncio a aiohttp, s nimiž do značné míry souvisejí i programové konstrukce async a await. Pochopitelně si řekneme i to, jak lze zajistit předávání parametrů úlohám a získávání jejich výsledků.

Poznámka: na tomto místě je vhodné znovu upozornit na fakt, že v Pythonu sice můžeme používat všechny dále popisované balíčky a knihovny, ovšem skutečný paralelní (a nezávislý) běh několika vláken je ve standardním CPythonu do značné míry omezen kvůli existenci techniky zvané GIL neboli Global Interpreter Lock (viz též příslušnou stránku). V mnoha projektech, zejména tehdy, pokud převažují I/O operace, to vadit nemusí, ale při požadavku na rychlé a paralelní výpočty již ano.

2. Třída ThreadPoolExecutor

V úvodní části dnešního článku si popíšeme třídu nazvanou ThreadPoolExecutor, kterou lze s výhodou využít ve chvíli, kdy se mají často spouštět různé asynchronně běžící úlohy. V případě, že se takové úlohy spouštějí často a navíc pokud je délka jejich trvání relativně krátká, může být limitujícím faktorem čas na vytvoření zcela nového vlákna. Navíc pokud si programátor vlákna vytváří sám, neexistuje centrální mechanismus pro omezení jejich počtu atd. Oba tyto problémy jsou řešeny třídou ThreadPoolExecutor z balíčku concurrent.futures.thread. Při konstrukci této třídy je možné zvolit, kolik vláken může běžet současně a následně jsou tato vlákna přidělována jednotlivým úlohám. Zcela typický příklad použití pro tři vlákna a tři úlohy může vypadat následovně – povšimněte si použití idomu založeného na konstrukci with:

from concurrent.futures.thread import ThreadPoolExecutor
import time
 
 
def worker(threadName, delay, n):
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
 
 
with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(worker, "Thread-1", 0.5, 10)
    executor.submit(worker, "Thread-2", 1.0, 10)
    executor.submit(worker, "Thread-3", 1.5, 10)
 
 
print("Done!")

Po spuštění tohoto demonstračního příkladu je patrné, že se skutečně využívá trojice vláken:

$ python3 thread_pool_1.py 
 
Thread-1: 1/10 - Mon Mar 14 18:14:52 2022
Thread-2: 1/10 - Mon Mar 14 18:14:52 2022
Thread-1: 2/10 - Mon Mar 14 18:14:52 2022
Thread-3: 1/10 - Mon Mar 14 18:14:53 2022
Thread-1: 3/10 - Mon Mar 14 18:14:53 2022
Thread-2: 2/10 - Mon Mar 14 18:14:53 2022
Thread-1: 4/10 - Mon Mar 14 18:14:53 2022
Thread-1: 5/10 - Mon Mar 14 18:14:54 2022
Thread-2: 3/10 - Mon Mar 14 18:14:54 2022
Thread-3: 2/10 - Mon Mar 14 18:14:54 2022
Thread-1: 6/10 - Mon Mar 14 18:14:54 2022
Thread-1: 7/10 - Mon Mar 14 18:14:55 2022
Thread-2: 4/10 - Mon Mar 14 18:14:55 2022
Thread-1: 8/10 - Mon Mar 14 18:14:55 2022
Thread-3: 3/10 - Mon Mar 14 18:14:56 2022
Thread-1: 9/10 - Mon Mar 14 18:14:56 2022
Thread-2: 5/10 - Mon Mar 14 18:14:56 2022
Thread-1: 10/10 - Mon Mar 14 18:14:56 2022
Thread-2: 6/10 - Mon Mar 14 18:14:57 2022
Thread-3: 4/10 - Mon Mar 14 18:14:57 2022
Thread-2: 7/10 - Mon Mar 14 18:14:58 2022
Thread-3: 5/10 - Mon Mar 14 18:14:59 2022
Thread-2: 8/10 - Mon Mar 14 18:14:59 2022
Thread-3: 6/10 - Mon Mar 14 18:15:00 2022
Thread-2: 9/10 - Mon Mar 14 18:15:00 2022
Thread-2: 10/10 - Mon Mar 14 18:15:01 2022
Thread-3: 7/10 - Mon Mar 14 18:15:02 2022
Thread-3: 8/10 - Mon Mar 14 18:15:03 2022
Thread-3: 9/10 - Mon Mar 14 18:15:05 2022
Thread-3: 10/10 - Mon Mar 14 18:15:06 2022
Done!

3. Omezení celkového množství vláken dostupného workerům

Nyní si zkusme předchozí příklad nepatrně upravit, a to takovým způsobem, že zvětšíme množství dostupných vláken na deset. Taktéž zvýšíme celkové množství úloh, které se mají souběžně vykonat, a to taktéž na hodnotu deset. Takto upravený příklad bude vypadat následovně:

from concurrent.futures.thread import ThreadPoolExecutor
import time
 
 
def worker(threadName, delay, n):
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
    print("{}: DONE!".format(threadName))
 
 
workers = 10
 
with ThreadPoolExecutor(max_workers=workers) as executor:
    for w in range(workers):
        executor.submit(worker, "Thread-{}".format(w + 1), 0.5 + w / 10.0, 10)
 
 
print("Done!")

Po spuštění demonstračního příkladu je patrné, že každému dostupnému vláknu byla přiřazena přesně jedna úloha a že všechna vlákna byla spuštěna souběžně:

Thread-1: 1/10 - Mon Mar 14 18:16:59 2022
Thread-2: 1/10 - Mon Mar 14 18:16:59 2022
Thread-3: 1/10 - Mon Mar 14 18:16:59 2022
Thread-4: 1/10 - Mon Mar 14 18:16:59 2022
Thread-5: 1/10 - Mon Mar 14 18:16:59 2022
Thread-1: 2/10 - Mon Mar 14 18:16:59 2022
...
...
...
Thread-7: DONE!
Thread-10: 8/10 - Mon Mar 14 18:17:09 2022
Thread-9: 9/10 - Mon Mar 14 18:17:10 2022
Thread-8: 10/10 - Mon Mar 14 18:17:10 2022
Thread-8: DONE!
Thread-10: 9/10 - Mon Mar 14 18:17:11 2022
Thread-9: 10/10 - Mon Mar 14 18:17:11 2022
Thread-9: DONE!
Thread-10: 10/10 - Mon Mar 14 18:17:12 2022
Thread-10: DONE!
Done!

Nástrojem time lze snadno zjistit, že celý skript byl dokončen za přibližně 14 sekund:

real    0m14,058s
user    0m0,051s
sys     0m0,010s

V praxi je ovšem situace, kdy počet dostupných (resp. přesněji řečeno volných) vláken přesně odpovídá počtu úloh, které se mají zpracovat, dosti nepravděpodobná. Často se setkáme se situací, kdy je množství vláken pevně dané a typicky odvozené od počtu jader mikroprocesoru. Pokud bude počet úloh přesahovat počet vláken, budou některé úlohy (pochopitelně) čekat na dokončení předchozích úloh, tedy na situaci, kdy se nějaké vlákno uvolní a vrátí do poolu. Podívejme se tedy nyní na způsob naplánování deseti úloh, z nichž ovšem souběžně poběží maximálně pouze tři:

from concurrent.futures.thread import ThreadPoolExecutor
import time
 
 
def worker(threadName, delay, n):
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
    print("{}: DONE!".format(threadName))
 
 
workers = 10
 
with ThreadPoolExecutor(max_workers=3) as executor:
    for w in range(workers):
        executor.submit(worker, "Thread-{}".format(w + 1), 0.5 + w / 10.0, 10)
 
 
print("Done!")

Nyní skutečně souběžně poběží pouze tři úlohy (tedy budou se opakovat řádky se jmény Thread-1 až Thread-3):

Thread-1: 1/10 - Mon Mar 14 18:17:42 2022
Thread-2: 1/10 - Mon Mar 14 18:17:42 2022
Thread-3: 1/10 - Mon Mar 14 18:17:42 2022
Thread-1: 2/10 - Mon Mar 14 18:17:43 2022
Thread-2: 2/10 - Mon Mar 14 18:17:43 2022
Thread-3: 2/10 - Mon Mar 14 18:17:43 2022
...
...
...
Thread-10: 2/10 - Mon Mar 14 18:18:08 2022
Thread-8: 10/10 - Mon Mar 14 18:18:09 2022
Thread-8: DONE!
Thread-9: 8/10 - Mon Mar 14 18:18:09 2022
Thread-10: 3/10 - Mon Mar 14 18:18:10 2022
Thread-9: 9/10 - Mon Mar 14 18:18:10 2022
Thread-10: 4/10 - Mon Mar 14 18:18:11 2022
Thread-9: 10/10 - Mon Mar 14 18:18:12 2022
Thread-9: DONE!
Thread-10: 5/10 - Mon Mar 14 18:18:13 2022
Thread-10: 6/10 - Mon Mar 14 18:18:14 2022
Thread-10: 7/10 - Mon Mar 14 18:18:15 2022
Thread-10: 8/10 - Mon Mar 14 18:18:17 2022
Thread-10: 9/10 - Mon Mar 14 18:18:18 2022
Thread-10: 10/10 - Mon Mar 14 18:18:20 2022
Thread-10: DONE!
Done!

A navíc budou (opět podle očekávání) všechny úlohy dokončeny až po delší době, zde konkrétně namísto 14 sekund až za 38 sekund:

real    0m38,084s
user    0m0,041s
sys     0m0,013s

4. Návratové hodnoty z jednotlivých úloh

Úlohy spouštěné ve vláknech získaných z poolu mnohdy musí vracet výsledky své činnosti (tedy výpočtů atd.). Vzhledem k tomu, že každá úloha je reprezentována běžnou funkcí, můžeme se samozřejmě pokusit vrátit hodnotu či hodnoty příkazem return. Ovšem funkce představující jednu úlohu se nevolá přímo (to by pochopitelně neběžela souběžně, ale přímo v hlavním vláknu). Úlohy voláme, resp. přesněji řečeno plánujeme, přes ThreadPoolExecutor.Submit. Pokusme se tedy zjistit, jakou hodnotu vlastně tímto způsobem získáme a zda se bude skutečně jednat o návratovou hodnotu dokončené úlohy:

from concurrent.futures.thread import ThreadPoolExecutor
import time
 
 
def worker(threadName, delay, n):
    result = 0
 
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
        result += delay
 
    print("{}: DONE!".format(threadName))
    return result
 
 
workers = 10
 
with ThreadPoolExecutor(max_workers=3) as executor:
    for w in range(workers):
        result = executor.submit(worker, "Thread-{}".format(w + 1), 0.5 + w / 10.0, 10)
        print(w, result)
 
 
print("Done!")

Pokud tento příklad spustíme, vypíše se (alespoň většinou) nejdříve těchto deset řádků:

0 <Future at 0x7f0e8162bc40 state=running>
1 <Future at 0x7f0e814f6a00 state=running>
2 <Future at 0x7f0e814f6d90 state=running>
3 <Future at 0x7f0e814fe190 state=pending>
4 <Future at 0x7f0e814fe310 state=pending>
5 <Future at 0x7f0e814fe430 state=pending>
6 <Future at 0x7f0e814fe550 state=pending>
7 <Future at 0x7f0e814fe670 state=pending>
8 <Future at 0x7f0e814fe790 state=pending>
9 <Future at 0x7f0e814fe8b0 state=pending>

Následuje již běžné spuštění našich deseti úloh:

Thread-1: 1/10 - Wed Mar 16 09:12:52 2022
Thread-2: 1/10 - Wed Mar 16 09:12:52 2022
Thread-3: 1/10 - Wed Mar 16 09:12:52 2022
Thread-1: 2/10 - Wed Mar 16 09:12:53 2022
Thread-2: 2/10 - Wed Mar 16 09:12:53 2022
Thread-3: 2/10 - Wed Mar 16 09:12:53 2022
Thread-1: 3/10 - Wed Mar 16 09:12:53 2022
Thread-2: 3/10 - Wed Mar 16 09:12:54 2022
Thread-1: 4/10 - Wed Mar 16 09:12:54 2022
...
...
...

Ovšem nás bude v tuto chvíli zajímat prvních deset řádků, které odpovídají hodnotám vraceným z:

result = executor.submit(worker, "Thread-{}".format(w + 1), 0.5 + w / 10.0, 10)
print(w, result)

Toto volání tedy nevrátilo přímo výsledek úlohy, což je ovšem očekávatelné – na výsledky bychom museli postupně čekat a opět by se nejednalo o souběžné výpočty. Namísto toho je vrácena hodnota, které se říká future. Co se však vlastně pod pojmem future skrývá? Kromě názvu jednoho typu finančního derivátu :-) představuje future(s) výpočet, který běží asynchronně (tedy většinou souběžně nebo paralelně) k hlavnímu vláknu aplikace. Uživatel pouze daný výpočet spustí a teprve ve chvíli, kdy potřebuje pracovat s výsledkem tohoto výpočtu, začne systém řešit, jakým způsobem má asynchronní výpočet ukončit, tj. jak má provést synchronizaci obou vláken nebo získání výsledku z nějaké k tomu určené (synchronizované) datové struktury. V ideálním případě je výpočet již dokončen, takže se přímo použije zaznamenaný výsledek, v případě opačném se až při čtení výsledku počká na dokončení výpočtu. V obou případech však operujeme s výslednou hodnotou nepřímo.

5. Získání vypočtených hodnot

V případě, že budeme chtít získat skutečně vypočtené hodnoty (a to většinou budeme požadovat), je nutné postupovat nepatrně odlišným způsobem. Nejdříve se vytvoří kontejner určený pro uložení výsledků:

results = []

Dále všechny úlohy spustíme a zapamatujeme si vrácené futures (tedy „obaly“ pro později vypočtené hodnoty):

for w in range(workers):
    result = executor.submit(worker, "Thread-{}".format(w + 1), 0.5 + w / 10.0, 10)
    results.append(result)

Následně můžeme čekat na dokončení všech úloh. Nejedná se o ideální řešení, lepší by pravděpodobně bylo využití fronty (viz další text):

for result in results:
    print(result.result())

Úplný zdrojový kód takto upraveného demonstračního příkladu vypadá následovně:

from concurrent.futures.thread import ThreadPoolExecutor
import time
 
 
def worker(threadName, delay, n):
    result = 0
 
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
        result += delay
 
    print("{}: DONE!".format(threadName))
    return result
 
 
workers = 10
 
results = []
 
with ThreadPoolExecutor(max_workers=3) as executor:
    for w in range(workers):
        result = executor.submit(worker, "Thread-{}".format(w + 1), 0.5 + w / 10.0, 10)
        results.append(result)
        print(w, result)
 
 
print("Computing finished")
 
for result in results:
    print(result.result())
 
print("Done!")

Po spuštění se nejdříve vypíšou hodnoty (futures) vrácené po zavolání executor.submit:

0 <Future at 0x7f26e648bc40 state=running>
1 <Future at 0x7f26e6357a00 state=running>
2 <Future at 0x7f26e6357d90 state=running>
3 <Future at 0x7f26e635e190 state=pending>
4 <Future at 0x7f26e635e310 state=pending>
5 <Future at 0x7f26e635e430 state=pending>
6 <Future at 0x7f26e635e550 state=pending>
7 <Future at 0x7f26e635e670 state=pending>
8 <Future at 0x7f26e635e790 state=pending>
9 <Future at 0x7f26e635e8b0 state=pending>

Ihned poté by se měly zobrazit průběžné informace o probíhajících výpočtech:

Thread-1: 1/10 - Wed Mar 16 09:13:41 2022
Thread-2: 1/10 - Wed Mar 16 09:13:41 2022
Thread-3: 1/10 - Wed Mar 16 09:13:41 2022
Thread-1: 2/10 - Wed Mar 16 09:13:41 2022
Thread-2: 2/10 - Wed Mar 16 09:13:41 2022
Thread-3: 2/10 - Wed Mar 16 09:13:42 2022
Thread-1: 3/10 - Wed Mar 16 09:13:42 2022

Dokončení všech výpočtů:

Thread-9: 10/10 - Wed Mar 16 09:14:10 2022
Thread-9: DONE!
Thread-10: 5/10 - Wed Mar 16 09:14:11 2022
Thread-10: 6/10 - Wed Mar 16 09:14:13 2022
Thread-10: 7/10 - Wed Mar 16 09:14:14 2022
Thread-10: 8/10 - Wed Mar 16 09:14:15 2022
Thread-10: 9/10 - Wed Mar 16 09:14:17 2022
Thread-10: 10/10 - Wed Mar 16 09:14:18 2022
Thread-10: DONE!
Computing finished

A následně se zobrazí hodnoty získané po čekání na dokončení výpočtů:

5.0
5.999999999999999
7.000000000000001
7.999999999999999
9.000000000000002
10.0
10.999999999999998
11.999999999999998
13.000000000000002
14.000000000000002
Done!

Zdrojový kód předchozího příkladu je možné přepsat do idiomatičtější formy následovně:

from concurrent.futures.thread import ThreadPoolExecutor
import time
 
 
def worker(threadName, delay, n):
    result = 0
 
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
        result += delay
 
    print("{}: DONE!".format(threadName))
    return result
 
 
workers = 10
 
with ThreadPoolExecutor(max_workers=3) as executor:
    results = [executor.submit(worker, "Thread-{}".format(w + 1), 0.5 + w / 10.0, 10) for w in range(workers)]
 
print("Computing finished")
 
for result in results:
    print(result.result())

print("Done!")

Zprávy vypisované při spuštění:

Process-1: 1/10 - Wed Mar 16 09:17:49 2022
Process-2: 1/10 - Wed Mar 16 09:17:49 2022
Process-3: 1/10 - Wed Mar 16 09:17:49 2022
Process-4: 1/10 - Wed Mar 16 09:17:49 2022
Process-5: 1/10 - Wed Mar 16 09:17:49 2022
Process-1: 2/10 - Wed Mar 16 09:17:49 2022

Dokončení úloh:

Process-10: DONE!
Computing finished

Výpis získaných výsledků:

5.0
5.999999999999999
7.000000000000001
7.999999999999999
9.000000000000002
10.0
10.999999999999998
11.999999999999998
13.000000000000002
14.000000000000002
Done!

6. Od ThreadPoolExecutor k ProcessPoolExecutor

Již v úvodním článku jsme si řekli, že souběžné vykonání úloh (resp. operací) lze realizovat mnoha různými způsoby, zejména použitím korutin, dále s využitím vláken (threads) nebo spuštěním většího množství procesů (processes) spravovaných operačním systémem. V předchozích kapitolách byl ukázán jeden ze způsobů využití vláken získávaných z poolu s omezenou kapacitou. Prakticky stejný koncept však můžeme použít i ve chvíli, kdy mají být jednotlivé výpočty realizovány v samostatně běžících procesech. Pro tento účel se používá třída nazvaná ProcessPoolExecutor z balíčku concurrent.futures. V navazujících kapitolách si ukážeme základní způsoby využití této třídy.

Poznámka: připomeňme si, že tímto způsobem můžeme obejít GIL, ovšem za tu cenu, že bude spuštěno větší množství virtuálních strojů Pythonu, což je náročnější s ohledem na dostupné systémové prostředky.

7. Použití třídy ProcessPoolExecutor

Základní způsob použití třídy ProcessPoolExecutor se (z pohledu programátora) vlastně žádným zásadním způsobem neodlišuje od použití třídy ThreadPoolExecutor. Ostatně následující demonstrační příklad vypadá velmi podobně, jako příklady uvedené v předchozích kapitolách:

from concurrent.futures import ProcessPoolExecutor
import time
 
 
def worker(processName, delay, n):
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(processName, counter, n, time.ctime(time.time())))
 
 
with ProcessPoolExecutor(max_workers=3) as executor:
    executor.submit(worker, "Process-1", 0.5, 10)
    executor.submit(worker, "Process-2", 1.0, 10)
    executor.submit(worker, "Process-3", 1.5, 10)
 
 
print("Done!")

Po spuštění je patrné, že úlohy jsou vykonány v dalších třech procesech:

Process-1: 1/10 - Wed Mar 16 09:16:53 2022
Process-2: 1/10 - Wed Mar 16 09:16:53 2022
Process-1: 2/10 - Wed Mar 16 09:16:53 2022
Process-3: 1/10 - Wed Mar 16 09:16:54 2022
Process-1: 3/10 - Wed Mar 16 09:16:54 2022
Process-2: 2/10 - Wed Mar 16 09:16:54 2022
Process-1: 4/10 - Wed Mar 16 09:16:54 2022
Process-1: 5/10 - Wed Mar 16 09:16:55 2022
Process-3: 2/10 - Wed Mar 16 09:16:55 2022
Process-2: 3/10 - Wed Mar 16 09:16:55 2022
...
...
...

O tom, kolik procesů je vlastně spuštěno, se můžeme velmi snadno přesvědčit:

$ ps ax |grep python

V mém konkrétním případě je nutné ignorovat první tři procesy, které s příkladem nijak nesouvisí. Důležitý je až proces s PID 671007, který spustil další tři procesy:

    598 ?        Ss     0:00 /usr/bin/python3 /usr/bin/networkd-dispatcher --run-startup-triggers
   1613 ?        S      0:01 /usr/bin/python3 /usr/share/system-config-printer/applet.py
   1653 ?        S      0:00 python3 /usr/lib/blueberry/safechild /usr/sbin/rfkill event
 671007 pts/1    Sl+    0:00 python3 process_pool_1.py
 671008 pts/1    S+     0:00 python3 process_pool_1.py
 671009 pts/1    S+     0:00 python3 process_pool_1.py
 671010 pts/1    S+     0:00 python3 process_pool_1.py

8. Získání výsledků z paralelně běžících procesů

I třída ProcessPoolExecutor, podobně jako třída ThreadPoolExecutor, podporuje přístup k výsledkům úloh s využitím future(s) (interně se ovšem musí používat odlišné techniky, od nichž jsme jako programátoři odstíněni). Podívejme se tedy, jakým způsobem lze získat (a ihned vypsat) hodnoty typu future:

from concurrent.futures import ProcessPoolExecutor
import time
 
 
def worker(processName, delay, n):
    result = 0
 
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(processName, counter, n, time.ctime(time.time())))
        result += delay
 
    print("{}: DONE!".format(processName))
    return result
 
 
workers = 10
 
with ProcessPoolExecutor(max_workers=workers) as executor:
    for w in range(workers):
        print(executor.submit(worker, "Process-{}".format(w + 1), 0.5 + w / 10.0, 10))
 
print("Computing finished")
 
print("Done!")

Chování tohoto demonstračního příkladu po jeho spuštění:

<Future at 0x7f875948c340 state=running>
<Future at 0x7f8759493370 state=pending>
<Future at 0x7f8759493820 state=pending>
<Future at 0x7f8759493940 state=pending>
<Future at 0x7f8759493a60 state=pending>
<Future at 0x7f8759493b80 state=pending>
<Future at 0x7f8759493cd0 state=pending>
<Future at 0x7f8759493e80 state=pending>
<Future at 0x7f8759493fa0 state=pending>
<Future at 0x7f8758417100 state=pending>
Process-1: 1/10 - Wed Mar 16 09:17:24 2022
Process-2: 1/10 - Wed Mar 16 09:17:24 2022
Process-3: 1/10 - Wed Mar 16 09:17:24 2022
Process-4: 1/10 - Wed Mar 16 09:17:24 2022
Process-5: 1/10 - Wed Mar 16 09:17:25 2022
Process-1: 2/10 - Wed Mar 16 09:17:25 2022
...
...
...

Pro vyhodnocení výsledků je nutné počkat na dokončení příslušných úloh a poté nad objekty/hodnotami typu future zavolat metodu result (což v případě, že úloha není ještě dokončena, zajistí čekání na její dokončení):

from concurrent.futures import ProcessPoolExecutor
import time
 
 
def worker(processName, delay, n):
    result = 0
 
    for counter in range(1, n + 1):
        time.sleep(delay)
        print("{}: {}/{} - {}".format(processName, counter, n, time.ctime(time.time())))
        result += delay
 
    print("{}: DONE!".format(processName))
    return result
 
 
workers = 10
 
with ProcessPoolExecutor(max_workers=workers) as executor:
    results = [executor.submit(worker, "Process-{}".format(w + 1), 0.5 + w / 10.0, 10) for w in range(workers)]
 
print("Computing finished")
 
for result in results:
    print(result.result())
 
print("Done!")

Otestování příkladu:

Process-1: 1/10 - Mon Mar 21 15:46:06 2022
Process-2: 1/10 - Mon Mar 21 15:46:06 2022
Process-3: 1/10 - Mon Mar 21 15:46:06 2022
Process-4: 1/10 - Mon Mar 21 15:46:06 2022
Process-5: 1/10 - Mon Mar 21 15:46:06 2022
Process-1: 2/10 - Mon Mar 21 15:46:06 2022
Process-6: 1/10 - Mon Mar 21 15:46:06 2022
Process-7: 1/10 - Mon Mar 21 15:46:07 2022
Process-2: 2/10 - Mon Mar 21 15:46:07 2022
Process-8: 1/10 - Mon Mar 21 15:46:07 2022
Process-9: 1/10 - Mon Mar 21 15:46:07 2022
Process-3: 2/10 - Mon Mar 21 15:46:07 2022
Process-10: 1/10 - Mon Mar 21 15:46:07 2022
Process-1: 3/10 - Mon Mar 21 15:46:07 2022
Process-4: 2/10 - Mon Mar 21 15:46:07 2022
...
...
...
Process-8: 10/10 - Mon Mar 21 15:46:17 2022
Process-8: DONE!
Process-10: 9/10 - Mon Mar 21 15:46:18 2022
Process-9: 10/10 - Mon Mar 21 15:46:18 2022
Process-9: DONE!
Process-10: 10/10 - Mon Mar 21 15:46:19 2022
Process-10: DONE!
Computing finished
5.0
5.999999999999999
7.000000000000001
7.999999999999999
9.000000000000002
10.0
10.999999999999998
11.999999999999998
13.000000000000002
14.000000000000002
Done!

9. Souběžně běžící úlohy a konstrukce asyncawait

V některých situacích je možné dosáhnout zvýšení efektivity aplikace (například zvýšit počet odpovědí, které může server vygenerovat za určitou časovou jednotku) a přitom není možné či vhodné využívat přímočaré ale dosti nízkoúrovňové řešení založené na použití většího množství vláken spravovaných systémem. Naprosto typickým příkladem jsou některé virtuální stroje JavaScriptu, které povětšinou umožňují běh aplikace v jediném vláknu.

Pokud aplikace intenzivně používá I/O, tedy například přístup k datům přes HTTP(s), volání DB operací atd., lze namísto více vláken využít korutiny, což jsou funkce transformované takovým způsobem, že mohou běžet souběžně, nikoli však nutně paralelně. Existuje mnoho způsobů konstrukce a volání korutin. V Pythonu se (vedle zavedeného slova yield) nově používají dvě slova async a await, přičemž async je klíčové slovo umožňující transformaci funkce do korutiny a await je klíčové slovo pro čekání na dokončení korutiny se získáním její návratové hodnoty.

Poznámka: v dalším textu si ukážeme pouze základy použití async a await společně se standardní knihovnou asyncio, popř. s knihovnou aiohttp. Jedná se však o poměrně důležité a rozsáhlé téma, kterému bude věnován samostatný článek.

10. Základní použití konstrukcí async a await

První pokus o vytvoření úlohy, která bude reprezentována korutinou, může vypadat následovně. Úloha je získána transformací funkce task do korutiny, kterou posléze voláme z funkce main:

import asyncio
import time
 
 
async def task():
    print("task started")
    await asyncio.sleep(5)
    print("task finished")
 
 
def main():
    task1 = asyncio.create_task(task())
    print("task created")
 
    await task1
 
    print("done")
 
 
main()

Tento zápis však není syntakticky korektní, na což nás Python upozorní. Problém spočívá v tom, že konstrukci await lze použít pouze ve funkci ztransformované na korutinu slovem async:

  File "async_await_1.py", line 15
    await task1
    ^
SyntaxError: 'await' outside async function

Korektní zápis bude vypadat následovně. Povšimněte si, že korutinu main (tedy nikoli již funkci) musíme volat nepřímo přes asyncio.run:

import asyncio
import time
 
 
async def task():
    print("task started")
    await asyncio.sleep(5)
    print("task finished")
 
 
async def main():
    task1 = asyncio.create_task(task())
    print("task created")
 
    await task1
 
    print("done")
 
 
asyncio.run(main())

Nyní by již výsledek měl odpovídat očekávání:

task created
task started
task finished
done

11. Vytvoření a spuštění dvou korutin s čekáním na jejich dokončení

Ve druhém příkladu je ukázáno, jak se vytvoří a ihned spustí dvě korutiny, které simulují činnost synchronním voláním sleep. Na dokončení obou korutin se čeká v konstrukci await (přesněji řečeno se čeká na dokončení první korutiny a teprve poté na dokončení korutiny druhé):

import asyncio
import time
 
 
async def task(name):
    print(f"{name} task started")
    await asyncio.sleep(5)
    print(f"{name} task finished")
 
 
async def main():
    task1 = asyncio.create_task(task("first"))
    print("first task created")
 
    task2 = asyncio.create_task(task("second"))
    print("second task created")
 
    await task1
    await task2
 
    print("done")
 
 
asyncio.run(main())

Výsledek získaný po spuštění:

first task created
second task created
first task started
second task started
first task finished
second task finished
done

12. Získání výsledků z korutin

Konstrukci await lze použít pro získání (přečtení) výsledků z korutin, což je hodnota/hodnoty vrácené přes return. Opět si ukažme velmi jednoduchý způsob použití:

import asyncio
import time
 
 
async def task(name):
    print(f"{name} task started")
    await asyncio.sleep(5)
    print(f"{name} task finished")
    return name[::-1]
 
 
async def main():
    task1 = asyncio.create_task(task("first"))
    print("first task created")
 
    task2 = asyncio.create_task(task("second"))
    print("second task created")
 
    task3 = asyncio.create_task(task("third"))
    print("third task created")
 
    print("result of task #1:", await task1)
    print("result of task #2:", await task2)
    print("result of task #3:", await task3)
 
    print("done")
 
 
asyncio.run(main())

Jednotlivé korutiny po určité době vrátí své jméno, ovšem znaky jsou uvedeny v opačném pořadí:

first task created
second task created
third task created
first task started
second task started
third task started
first task finished
second task finished
third task finished
result of task #1: tsrif
result of task #2: dnoces
result of task #3: driht
done

13. Fronta jako komunikační kanál

I pro předávání dat mezi korutinami lze využít frontu, v tomto případě se ovšem jedná o instanci třídy asyncio.Queue. Všechny operace s touto frontou (tedy zejména operace put a get) se musí volat v konstrukci await, což je ukázáno v dalším (prozatím velmi jednoduchém) demonstračním příkladu:

import asyncio
import time
 
 
async def task(name, queue):
    while not queue.empty():
        param = await queue.get()
        print(f"Task named {name} started with parameter {param}")
        await asyncio.sleep(5)
        print(f"{name} task finished")
 
 
async def main():
    queue = asyncio.Queue()
 
    for i in range(20):
        await queue.put(i)
 
    for n in range(1, 2):
        asyncio.create_task(task(f"{n}", queue))
 
 
asyncio.run(main())

Po spuštění se nejdříve do fronty vloží několik parametrů úloh a následně se tyto úlohy spustí, a to bez čekání na jejich dokončení!:

Task named 1 started with parameter 0

14. Postupné (synchronní) spouštění úloh

Při spouštění úloh má programátor plně pod kontrolou, zda se bude jednat o spuštění asynchronní či synchronní. V dalším příkladu si sice připravíme parametry pro deset úloh, ale všechny budou vykonány postupně ve smyčce, která bude načítat parametry z fronty v jediné korutině:

import asyncio
import time
 
 
async def task(name, queue):
    while not queue.empty():
        param = await queue.get()
        print(f"Task named {name} started with parameter {param}")
        await asyncio.sleep(5)
        print(f"{name} task finished")
 
 
async def main():
    queue = asyncio.Queue()
 
    for i in range(10):
        await queue.put(i)
 
    for n in range(1, 3):
        await asyncio.gather(asyncio.create_task(task(f"{n}", queue)))
 
 
asyncio.run(main())

Po spuštění tohoto příkladu by se měly vypsat informace o tom, že veškerou práci ve skutečnosti vykonala první korutina (druhá již měla k dispozici pouze prázdnou frontu):

Task named 1 started with parameter 0
1 task finished
Task named 1 started with parameter 1
1 task finished
Task named 1 started with parameter 2
1 task finished
Task named 1 started with parameter 3
1 task finished
Task named 1 started with parameter 4
1 task finished
Task named 1 started with parameter 5
1 task finished
Task named 1 started with parameter 6
1 task finished
Task named 1 started with parameter 7
1 task finished
Task named 1 started with parameter 8
1 task finished
Task named 1 started with parameter 9
1 task finished

15. Asynchronní spouštění úloh ve čtyřech korutinách

Pro spuštění více korutin s čekáním na jejich dokončení je možné použít funkci nazvanou asyncio.gather, které se předají jednotlivé úlohy (tedy korutiny). Příklad použití čtyř korutin zpracovávajících dvacet parametrů získávaných z jediné fronty, může vypadat takto:

import asyncio
import time
 
 
async def task(name, queue):
    while not queue.empty():
        param = await queue.get()
        print(f"Task named {name} started with parameter {param}")
        await asyncio.sleep(5)
        print(f"{name} task finished")
 
 
async def main():
    queue = asyncio.Queue()
 
    for i in range(20):
        await queue.put(i)
 
    await asyncio.gather(
            asyncio.create_task(task(1, queue)),
            asyncio.create_task(task(2, queue)),
            asyncio.create_task(task(3, queue)),
            asyncio.create_task(task(4, queue)))
 
 
asyncio.run(main())

Příklad rozdělení úloh do čtyřech korutin:

Task named 1 started with parameter 0
Task named 2 started with parameter 1
Task named 3 started with parameter 2
Task named 4 started with parameter 3
1 task finished
Task named 1 started with parameter 4
2 task finished
Task named 2 started with parameter 5
3 task finished
Task named 3 started with parameter 6
4 task finished
Task named 4 started with parameter 7
1 task finished
Task named 1 started with parameter 8
2 task finished
Task named 2 started with parameter 9
3 task finished
Task named 3 started with parameter 10
4 task finished
Task named 4 started with parameter 11
1 task finished
Task named 1 started with parameter 12
2 task finished
Task named 2 started with parameter 13
3 task finished
Task named 3 started with parameter 14
4 task finished
Task named 4 started with parameter 15
1 task finished
Task named 1 started with parameter 16
2 task finished
Task named 2 started with parameter 17
3 task finished
Task named 3 started with parameter 18
4 task finished
Task named 4 started with parameter 19
1 task finished
2 task finished
3 task finished
4 task finished

16. Knihovna aiohttp – asynchronní operace přes protokol HTTP

V závěrečné části dnešního článku se zmíníme o knihovně aiohttp. Tato knihovna podporuje asynchronní operace (GET, PUT atd.) prováděné přes protokol HTTP a může tak sloužit jako alternativa ke známé knihovně Requests. Knihovnu aiohttp samozřejmě musíme před prvním použitím nainstalovat:

$ pip3 install --user aiohttp
 
Collecting aiohttp
  Downloading aiohttp-3.8.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.3 MB)
     |████████████████████████████████| 1.3 MB 834 kB/s
Collecting frozenlist>=1.1.1
  Downloading frozenlist-1.3.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (158 kB)
     |████████████████████████████████| 158 kB 936 kB/s
Collecting multidict<7.0,>=4.5
  Downloading multidict-6.0.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (121 kB)
     |████████████████████████████████| 121 kB 953 kB/s
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.7.2-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (308 kB)
     |████████████████████████████████| 308 kB 987 kB/s
Collecting charset-normalizer<3.0,>=2.0
  Downloading charset_normalizer-2.0.12-py3-none-any.whl (39 kB)
Collecting attrs>=17.3.0
  Downloading attrs-21.4.0-py2.py3-none-any.whl (60 kB)
     |████████████████████████████████| 60 kB 817 kB/s
Collecting aiosignal>=1.1.2
  Downloading aiosignal-1.2.0-py3-none-any.whl (8.2 kB)
Collecting async-timeout<5.0,>=4.0.0a3
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Requirement already satisfied: idna>=2.0 in /usr/lib/python3/dist-packages (from yarl<2.0,>=1.0->aiohttp) (2.8)
Installing collected packages: multidict, frozenlist, yarl, charset-normalizer, attrs, async-timeout, aiosignal, aiohttp
Successfully installed aiohttp-3.8.1 aiosignal-1.2.0 async-timeout-4.0.2 attrs-21.4.0 charset-normalizer-2.0.12 frozenlist-1.3.0 multidict-6.0.2 yarl-1.7.2

17. Příklad použití knihovny aiohttp

V prvním příkladu, v němž je knihovna aiohttp použita, je ukázán způsob souběžného stahování (resp. přístupu) na několik webových stránek. Jedná se o typickou úlohu, v níž mají velký význam I/O operace (čekání na odpověď) a tedy je žádoucí, aby tyto operace probíhaly souběžně. V příkladu je pro naplánování úloh použita knihovna asyncio a pro komunikaci fronta reprezentovaná třídou asyncio.Queue. Samotný přístup k webovým stránkám zajišťuje knihovna aiohttp – nejprve se vytvoří sezení (session) a následně se zavolá metoda session.get pro přístup k webové stránce:

import asyncio
import aiohttp
import time
 
 
async def download(name, queue):
    async with aiohttp.ClientSession() as session:
        while not queue.empty():
            url = await queue.get()
            print(f"Task named {name} getting URL: {url}")
            async with session.get(url) as response:
                t = await response.text()
                print(f"Task named {name} downloaded {len(t)} characters")
            print(f"Task named {name} finished")
 
 
async def main():
    queue = asyncio.Queue()
 
    for url in (
        "http://www.root.cz",
        "http://duckduckgo.com",
        "http://seznam.com",
        "https://www.root.cz/programovaci-jazyky/",
        "https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/",
        "https://github.com/"
    ):
        await queue.put(url)
 
    await asyncio.gather(
            asyncio.create_task(download(1, queue)),
            asyncio.create_task(download(2, queue)))
 
 
asyncio.run(main())

Ukázka souběžného přístupu ke dvěma stránkám po spuštění příkladu:

Task named 1 getting URL: http://www.root.cz
Task named 2 getting URL: http://duckduckgo.com
Task named 2 downloaded 5775 characters
Task named 2 finished
Task named 2 getting URL: http://seznam.com
Task named 1 downloaded 249826 characters
Task named 1 finished
Task named 1 getting URL: https://www.root.cz/programovaci-jazyky/
Task named 1 downloaded 251068 characters
Task named 1 finished
Task named 1 getting URL: https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/
Task named 2 downloaded 1555 characters
Task named 2 finished
Task named 2 getting URL: https://github.com/
Task named 2 downloaded 209166 characters
Task named 2 finished
Task named 1 downloaded 235800 characters
Task named 1 finished

18. Doba trvání dotazů vs. celková doba běhu programu

Nepatrnou úpravou předchozího zdrojového kódu si můžeme nechat vypsat dobu trvání jednotlivých dotazů i celkovou dobu běhu demonstračního příkladu. Přitom budeme předpokládat, že celková doba běhu bude kratší než součet doby vyřízení jednotlivých dotazů (resp. doby mezi posláním dotazu a získáním odpovědi). Prozatím budeme vypočtené časy pouze vypisovat na standardní výstup:

import asyncio
import aiohttp
import time
 
 
async def download(name, queue):
    async with aiohttp.ClientSession() as session:
        while not queue.empty():
            url = await queue.get()
            t1 = time.time()
            print(f"Task named {name} getting URL: {url}")
            async with session.get(url) as response:
                t = await response.text()
                t2 = time.time()
                print(f"Task named {name} downloaded {len(t)} characters in {t2-t1} seconds")
            print(f"Task named {name} finished")
 
 
async def main():
    queue = asyncio.Queue()
 
    t1 = time.time()
 
    for url in (
        "http://www.root.cz",
        "http://duckduckgo.com",
        "http://seznam.com",
        "https://www.root.cz/programovaci-jazyky/",
        "https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/",
        "https://www.root.cz/clanky/pywebio-interaktivni-webove-dialogy-a-formulare-v-cistem-pythonu/",
        "https://streamlit.io/",
        "https://pglet.io/",
        "https://www.root.cz/serialy/graficke-uzivatelske-rozhrani-v-pythonu/",
        "https://github.com/"
    ):
        await queue.put(url)
 
    await asyncio.gather(
            asyncio.create_task(download(1, queue)),
            asyncio.create_task(download(2, queue)),
            asyncio.create_task(download(3, queue)))
 
    t2 = time.time()
    print(f"Total time: {t2-t1} seconds")
 
asyncio.run(main())

A takto může vypadat výsledek:

Task named 1 getting URL: http://www.root.cz
Task named 2 getting URL: http://duckduckgo.com
Task named 3 getting URL: http://seznam.com
Task named 2 downloaded 5775 characters in 0.3052408695220947 seconds
Task named 2 finished
Task named 2 getting URL: https://www.root.cz/programovaci-jazyky/
Task named 1 downloaded 249910 characters in 0.4537777900695801 seconds
Task named 1 finished
Task named 1 getting URL: https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/
Task named 3 downloaded 1555 characters in 0.4831697940826416 seconds
Task named 3 finished
Task named 3 getting URL: https://www.root.cz/clanky/pywebio-interaktivni-webove-dialogy-a-formulare-v-cistem-pythonu/
Task named 2 downloaded 251382 characters in 0.5078389644622803 seconds
Task named 2 finished
Task named 2 getting URL: https://streamlit.io/
Task named 1 downloaded 236297 characters in 0.4007532596588135 seconds
Task named 1 finished
Task named 1 getting URL: https://pglet.io/
Task named 3 downloaded 236033 characters in 0.4225172996520996 seconds
Task named 3 finished
Task named 3 getting URL: https://www.root.cz/serialy/graficke-uzivatelske-rozhrani-v-pythonu/
Task named 2 downloaded 444341 characters in 0.42142748832702637 seconds
Task named 2 finished
Task named 2 getting URL: https://github.com/
Task named 1 downloaded 10145 characters in 0.5715913772583008 seconds
Task named 1 finished
Task named 3 downloaded 263715 characters in 0.521721601486206 seconds
Task named 3 finished
Task named 2 downloaded 209167 characters in 0.22428131103515625 seconds
Task named 2 finished
Total time: 1.4610064029693604 seconds
Poznámka: vidíme, že celková doba běhu je rovna 1,46 sekund, zatímco je evidentní, že součet času mezi jednotlivými dotazy a odpovědmi je mnohem větší.

Na závěr si ukažme ještě jednu úpravu tohoto příkladu. Nyní budeme časy mezi dotazem a odpovědí jednotlivých korutin ukládat do fronty results, což nám na závěr umožní vypočítat součet všech časů dotaz-odpověď:

Root obecny

import asyncio
import aiohttp
import time
 
 
async def download(name, queue, results):
    async with aiohttp.ClientSession() as session:
        while not queue.empty():
            url = await queue.get()
            t1 = time.time()
            print(f"Task named {name} getting URL: {url}")
            async with session.get(url) as response:
                t = await response.text()
                t2 = time.time()
                print(f"Task named {name} downloaded {len(t)} characters in {t2-t1} seconds")
                await results.put(t2-t1)
            print(f"Task named {name} finished")
 
 
async def main():
    queue = asyncio.Queue()
    results = asyncio.Queue()
 
    t1 = time.time()
 
    for url in (
        "http://www.root.cz",
        "http://duckduckgo.com",
        "http://seznam.com",
        "https://www.root.cz/programovaci-jazyky/",
        "https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/",
        "https://www.root.cz/clanky/pywebio-interaktivni-webove-dialogy-a-formulare-v-cistem-pythonu/",
        "https://streamlit.io/",
        "https://pglet.io/",
        "https://www.root.cz/serialy/graficke-uzivatelske-rozhrani-v-pythonu/",
        "https://github.com/"
    ):
        await queue.put(url)
 
    await asyncio.gather(
            asyncio.create_task(download(1, queue, results)),
            asyncio.create_task(download(2, queue, results)),
            asyncio.create_task(download(3, queue, results)))
 
    process_time = 0
    while not results.empty():
        process_time += await results.get()
 
    print(f"Process time: {process_time} seconds")
 
    t2 = time.time()
    print(f"Total time:   {t2-t1} seconds")
 
asyncio.run(main())

Výsledky:

Task named 1 getting URL: http://www.root.cz
Task named 2 getting URL: http://duckduckgo.com
Task named 3 getting URL: http://seznam.com
Task named 2 downloaded 5775 characters in 0.31725001335144043 seconds
Task named 2 finished
Task named 2 getting URL: https://www.root.cz/programovaci-jazyky/
Task named 3 downloaded 1555 characters in 0.43852806091308594 seconds
Task named 3 finished
Task named 3 getting URL: https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/
Task named 1 downloaded 249707 characters in 0.535081148147583 seconds
Task named 1 finished
Task named 1 getting URL: https://www.root.cz/clanky/pywebio-interaktivni-webove-dialogy-a-formulare-v-cistem-pythonu/
Task named 2 downloaded 251515 characters in 0.3788483142852783 seconds
Task named 2 finished
Task named 2 getting URL: https://streamlit.io/
Task named 1 downloaded 235679 characters in 0.2868804931640625 seconds
Task named 1 finished
Task named 1 getting URL: https://pglet.io/
Task named 3 downloaded 236045 characters in 0.41786885261535645 seconds
Task named 3 finished
Task named 3 getting URL: https://www.root.cz/serialy/graficke-uzivatelske-rozhrani-v-pythonu/
Task named 2 downloaded 444341 characters in 0.3120858669281006 seconds
Task named 2 finished
Task named 2 getting URL: https://github.com/
Task named 1 downloaded 10145 characters in 0.21546196937561035 seconds
Task named 1 finished
Task named 3 downloaded 263683 characters in 0.29593372344970703 seconds
Task named 3 finished
Task named 2 downloaded 209173 characters in 0.28455424308776855 seconds
Task named 2 finished
Process time: 3.482492685317993 seconds
Total time:   1.2949903011322021 seconds
Poznámka: nyní je z vypsaných výsledků zcela zřejmé, že namísto cca 3,5 sekundy se nám díky korutinám podařilo celkovou dobu běhu programu snížit na cca 1,3 sekundy. Je však možné i další snížení celkového času, a to zvětšením počtu korutin, například na deset:
Task named 1 getting URL: http://www.root.cz
Task named 2 getting URL: http://duckduckgo.com
Task named 3 getting URL: http://seznam.com
Task named 4 getting URL: https://www.root.cz/programovaci-jazyky/
Task named 5 getting URL: https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/
Task named 6 getting URL: https://www.root.cz/clanky/pywebio-interaktivni-webove-dialogy-a-formulare-v-cistem-pythonu/
Task named 7 getting URL: https://streamlit.io/
Task named 8 getting URL: https://pglet.io/
Task named 9 getting URL: https://www.root.cz/serialy/graficke-uzivatelske-rozhrani-v-pythonu/
Task named 10 getting URL: https://github.com/
Task named 8 downloaded 10145 characters in 0.2201550006866455 seconds
Task named 8 finished
Task named 10 downloaded 209167 characters in 0.26105284690856934 seconds
Task named 10 finished
Task named 7 downloaded 444341 characters in 0.31197690963745117 seconds
Task named 7 finished
Task named 2 downloaded 5775 characters in 0.3665473461151123 seconds
Task named 2 finished
Task named 4 downloaded 252883 characters in 0.41240453720092773 seconds
Task named 4 finished
Task named 9 downloaded 265455 characters in 0.41875481605529785 seconds
Task named 9 finished
Task named 3 downloaded 1555 characters in 0.46982383728027344 seconds
Task named 3 finished
Task named 6 downloaded 235728 characters in 0.4692232608795166 seconds
Task named 6 finished
Task named 1 downloaded 250629 characters in 0.4968068599700928 seconds
Task named 1 finished
Task named 5 downloaded 236190 characters in 0.5269002914428711 seconds
Task named 5 finished
Process time: 3.953645706176758 seconds
Total time:   0.529677152633667 seconds

19. Repositář s demonstračními příklady

Zdrojové kódy všech minule i dnes popsaných demonstračních příkladů určených pro programovací jazyk Python 3 byly uloženy do Git repositáře dostupného na adrese https://github.com/tisnik/most-popular-python-libs. V případě, že nebudete chtít klonovat celý repositář (ten je ovšem stále velmi malý, dnes má velikost zhruba několik desítek kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:

# Demonstrační příklad Stručný popis příkladu Cesta
1 multithreading1.py spuštění tří vláken vykonávajících déletrvající činnost https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multithreading1.py
2 multithreading2.py spuštění tří vláken, předání parametrů volaným funkcím https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multithreading2.py
3 multithreading3.py explicitní čekání na dokončení běhu vláken metodou join https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multithreading3.py
4 multithreading4.py sdílený objekt https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multithreading4.py
5 multithreading_join_deamon.py čekání na dokončení vláken s příznakem „daemon“ https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multithreading_join_dea­mon.py
6 multithreading_no_join_deamon.py vlákna s příznakem „daemon“, na jejichž ukončení se nečeká https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multithreading_no_join_de­amon.py
7 multithreading_no_join_no_deamon.py běžná vlákna bez příznaku „daemon“ https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multithreading_no_join_no_de­amon.py
8 multithreading_timeout.py specifikace maximální doby čekání na ukončení vlákna https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multithreading_timeout.py
       
9 multiprocessing1.py zavolání funkce spuštěné v rámci dalšího procesu https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multiprocessing1.py
10 multiprocessing2.py spuštění většího množství procesů https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multiprocessing2.py
11 multiprocessing3.py nepatrná úprava předchozího příkladu https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multiprocessing3.py
12 multiprocessing4.py řízení workerů posílanými příkazy https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multiprocessing4.py
13 multiprocessing5.py řízení workerů posílanými příkazy https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multiprocessing5.py
14 multiprocessing6.py jeden proces a sdílená globální hodnota https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multiprocessing6.py
15 multiprocessing7.py více procesů, které nesdílí hodnoty https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/multiprocessing7.py
       
16 queue_example.py základní vlastnosti sdílené datové struktury Queue https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/queue_example.py
17 simple_queue_example.py základní vlastnosti sdílené datové struktury SimpleQueue https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/simple_queue_example.py
18 priority_queue_example.py základní vlastnosti sdílené datové struktury PriorityQueue https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/priority_queue_example.py
       
19 queues1.py komunikace mezi vlákny s využitím front: základní forma https://github.com/tisnik/most-popular-python-libs/blob/master/concurrent/queues1.py
20 queues2.py komunikace mezi vlákny s využitím front: více konzumentů https://github.com/tisnik/most-popular-python-libs/blob/master/concurrent/queues2.py
21 queues3.py komunikace mezi vlákny s využitím front: více producentů https://github.com/tisnik/most-popular-python-libs/blob/master/concurrent/queues3.py
22 queues4.py komunikace mezi vlákny s využitím front: více producentů i konzumentů https://github.com/tisnik/most-popular-python-libs/blob/master/concurrent/queues4.py
       
23 thread_pool1.py spuštění tří úloh ve třech vláknech https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/thread_pool1.py
24 thread_pool2.py spuštění deseti úloh v deseti vláknech https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/thread_pool2.py
25 thread_pool3.py omezení počtu vláken na 3 pro celkem deset úloh https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/thread_pool3.py
26 thread_pool4.py návratová hodnota získaná po spuštění úlohy https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/thread_pool4.py
27 thread_pool5.py získání vypočtených hodnot https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/thread_pool5.py
28 thread_pool6.py alternativní způsob zápisu předchozího příkladu https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/thread_pool6.py
       
29 process_pool1.py spuštění tří úloh ve vlastních procesech https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/process_pool1.py
30 process_pool2.py návratové hodnoty https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/process_pool2.py
31 process_pool3.py čekání na dokončení úloh + získání návratových hodnot https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/process_pool3.py
       
32 async_await1.py základní způsob použití async a await https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_await1.py
33 async_await2.py funkce main volaná asynchronně https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_await2.py
34 async_await3.py dvě asynchronně běžící úlohy https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_await3.py
35 async_await4.py získání výsledků z asynchronně běžících úloh https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_await4.py
36 async_queue1.py fronty pro kooperace mezi korutinami https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_queue1.py
37 async_queue2.py korektní spuštění většího množství korutin https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_queue2.py
38 async_queue3.py využití asyncio.gather https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_queue3.py
39 async_aiohttp1.py použití knihovny aiohttp https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_aiohttp1.py
40 async_aiohttp2.py záznam časů trvání jednotlivých operací https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_aiohttp2.py
41 async_aiohttp3.py vylepšení předchozího příkladu https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_aiohttp3.py
42 async_aiohttp4.py využití deseti korutin https://github.com/tisnik/most-popular-python-libs/blob/master/concurren­t/async_aiohttp4.py

20. Odkazy na Internetu

  1. Dokumentace Pythonu: balíček queue
    https://docs.python.org/3/li­brary/queue.html
  2. Dokumentace Pythonu: balíček threading
    https://docs.python.org/3/li­brary/threading.html?
  3. Dokumentace Pythonu: balíček multiprocessing
    https://docs.python.org/3/li­brary/multiprocessing.html
  4. Dokumentace Pythonu: balíček asyncio
    https://docs.python.org/3/li­brary/asyncio.html
  5. Synchronization Primitives
    https://docs.python.org/3/li­brary/asyncio-sync.html
  6. Coroutines
    https://docs.python.org/3/li­brary/asyncio-task.html
  7. Queues
    https://docs.python.org/3/li­brary/asyncio-queue.html
  8. python-csp
    https://python-csp.readthedocs.io/en/latest/
  9. TrellisSTM
    http://peak.telecommunity­.com/DevCenter/TrellisSTM
  10. Python Multithreading and Multiprocessing Tutorial
    https://www.toptal.com/pyt­hon/beginners-guide-to-concurrency-and-parallelism-in-python
  11. ThreadPoolExecutor
    https://docs.python.org/3/li­brary/concurrent.futures.html#thre­adpoolexecutor
  12. ProcessPoolExecutor
    https://docs.python.org/3/li­brary/concurrent.futures.html#pro­cesspoolexecutor
  13. asyncio — Asynchronous I/O
    https://docs.python.org/3/li­brary/asyncio.html
  14. Threads vs Async: Has Asyncio Solved Concurrency?
    https://www.youtube.com/wat­ch?v=NZq31Sg8R9E
  15. Python Asynchronous Programming – AsyncIO & Async/Await
    https://www.youtube.com/wat­ch?v=t5Bo1Je9EmE
  16. AsyncIO & Asynchronous Programming in Python
    https://www.youtube.com/wat­ch?v=6RbJYN7SoRs
  17. Coroutines and Tasks
    https://docs.python.org/3/li­brary/asyncio-task.html
  18. Python async/await Tutorial
    https://stackabuse.com/python-async-await-tutorial/
  19. Demystifying Python's Async and Await Keywords
    https://www.youtube.com/wat­ch?v=F19R_M4Nay4
  20. Curio
    https://curio.readthedocs­.io/en/latest/
  21. Trio: a friendly Python library for async concurrency and I/O
    https://trio.readthedocs.i­o/en/stable/
  22. Curio – A Tutorial Introduction
    https://curio.readthedocs­.io/en/latest/tutorial.html
  23. unsync
    https://github.com/alex-sherman/unsync

Autor článku

Pavel Tišnovský vystudoval VUT FIT a v současné době pracuje ve společnosti Red Hat, kde vyvíjí nástroje pro OpenShift.io.