Obsah
1. Nové vlastnosti Pythonu 3.14 v praxi: komunikace mezi interpretry
2. Komunikace mezi interpretry s využitím fronty
3. Demonstrační příklad: poslání zprávy mezi úlohami, které běží v různých interpretrech
4. Úplný zdrojový kód prvního demonstračního příkladu
5. Realizace programu pro zpracování úloh v samostatně běžících interpretrech
6. Ukázka chování programu s více úlohami po jeho spuštění
7. Jazyk Python a multiprocessing
8. Spuštění většího množství procesů, čekání na dokončení těchto procesů
9. Realizace programu pro zpracování úloh v samostatných procesech
10. Porovnání realizace workerů založené na interpretrech s realizací založenou na multiprocesingu
11. Přepis programu takovým způsobem, aby se úlohy spouštěly v samostatných vláknech
12. Porovnání realizace workerů založené na interpretrech s realizací založenou na multithreadingu
13. Čekání na ukončení všech úloh s využitím synchronizačních mechanismů fronty
14. Realizace klasického vzoru vzor producent–konzument
15. Souběžně běžící úlohy a konstrukce async a await
16. Přepis programu takovým způsobem, aby se používaly asynchronně běžící úlohy
18. Odkazy na články s problematikou souběžnosti a paralelnosti v Pythonu
19. Repositář s demonstračními příklady
1. Nové vlastnosti Pythonu 3.14 v praxi: komunikace mezi interpretry
Na úvodní dvojici článků o nových vlastnostech Pythonu 3.14 [1] [2] dnes navážeme. Minule jsme si řekli, že v Pythonu 3.14 je možné programově vytvořit větší množství interpretrů a spouštět v nich úlohy, které jsou do značné míry izolovány od úloh, které jsou spouštěny v jiných interpretrech (i když se tato technologie odlišuje od klasického multiprocesingu). A nejen to – úlohy v interpretrech mohou běžet (a většinou i běží) v samostatných vláknech, takže lze dosáhnout souběžného a mnohdy i paralelního běhu výpočtů.
Ovšem současně je nutné nějakým způsobem zajistit komunikaci mezi úlohami, které jsou od sebe tímto způsobem odizolovány. Dnes si ukážeme, jakým způsobem se to provádí. Navíc použitou technologii porovnáme s realizací komunikace mezi asynchronně spouštěnými úlohami, vlákny a procesy (což jsou taktéž technologie zajišťující souběžné provádění úloh, ovšem na odlišných úrovních).
$ python3 Python 3.14.0 free-threading build (main, Oct 18 2025, 10:08:58) [GCC 14.2.1 20240912 (Red Hat 14.2.1-3)] on linux Type "help", "copyright", "credits" or "license" for more information. >>>
2. Komunikace mezi interpretry s využitím fronty
V předchozím článku jsme si ověřili, že pokud je nějaká úloha (kód ve zdrojové podobě nebo funkce) spuštěna v kontextu nového interpretru, nemá přístup k proměnným ostatních interpretrů. To je pochopitelně korektní a očekávané chování protože tato úloha má běžet izolovaně od ostatních úloh. Ovšem mnohdy je nutné zajistit předávání dat mezi jednotlivými (od sebe izolovanými) úlohami. Pro tento účel se používají fronty, což není v ekosystému jazyka Python překvapující, protože pro stejné účely se fronty používají i v kontextu asynchronních úloh, úloh běžících ve vláknech nebo i při spuštění úloh v samostatných procesech.
Nyní tedy víme, že pro komunikaci mezi interpretry (resp. mezi úlohami, které v nich běží) slouží fronty. Jak se taková fronta vytvoří? K tomuto účelu slouží funkce nazvaná create_queue z balíčku concurrent.interpreters (jinou formu fronty nepoužívejte):
create(maxsize=0, *, unbounditems=concurrent.interpreters._queues.UNBOUND)
Return a new cross-interpreter queue.
The queue may be used to pass data safely between interpreters.
"unbounditems" sets the default for Queue.put(); see that method for
supported values. The default value is UNBOUND, which replaces
the unbound item.
Příklad vytvoření fronty:
from concurrent import interpreters q = interpreters.create_queue() ... ... ...
Vytvořená fronta programátorům nabízí několik metod, které postupně použijeme v dalším textu:
| # | Metoda |
|---|---|
| 1 | empty |
| 2 | full |
| 3 | get |
| 4 | get_nowait |
| 5 | put |
| 6 | put_nowait |
| 7 | qsize |
from concurrent import interpreters interp = interpreters.create() q = interpreters.create_queue() interp.call_in_thread(run, q)
3. Demonstrační příklad: poslání zprávy mezi úlohami, které běží v různých interpretrech
Podívejme se nyní na způsob praktické realizace posílání zpráv mezi dvěma úlohami, z nichž každá běží ve vlastním interpretru (navíc v různých vláknech). Nejdříve zkonstruujeme nový interpretr (ten zatím neběží) a následně frontu použitou pro komunikaci mezi úlohami:
from concurrent import interpreters interp = interpreters.create() q = interpreters.create_queue()
Dále nadefinujeme funkci, která bude později spuštěna v novém interpretru. Povšimněte si, že této funkci bude předán parametr q, který by měl být typu Queue (těchto typů existuje v základní knihovně Pythonu několik, ovšem jejich základní vlastnosti jsou shodné). Funkce bude čekat na zprávu, kterou někdo jiný (kód běžící v odlišném vláknu) pošle do fronty. Jakmile takovou zprávu dostane, vypíše její obsah a funkce se následně ukončí:
def run(q):
print("Hello from new interpreter, waiting for message")
message = q.get()
print(f"Message received: '{message}'")
Nyní již postačuje spustit tuto funkci v novém interpretru (navíc v novém vláknu) a předat jí referenci na frontu:
print("Executing run()")
t = interp.call_in_thread(run, q)
Pošleme zprávu do fronty; převezme si ji úloha běžící v jiném interpretru/vláknu:
print("Sending message into queue")
q.put("foo")
print("Message sent")
Nyní již pouze počkáme na dokončení úlohy:
print("Waiting for other interpreter to finish")
t.join()
4. Úplný zdrojový kód prvního demonstračního příkladu
Úplný zdrojový kód příkladu, který byl popsán v předchozí kapitole, vypadá následovně:
from concurrent import interpreters
interp = interpreters.create()
def run(q):
print("Hello from new interpreter, waiting for message")
message = q.get()
print(f"Message received: '{message}'")
q = interpreters.create_queue()
print("Executing run()")
t = interp.call_in_thread(run, q)
print("Sending message into queue")
q.put("foo")
print("Message sent")
print("Waiting for other interpreter to finish")
t.join()
print("Done")
Příklad chování programu po spuštění:
Executing run() Sending message into queue Message sent Waiting for other interpreter to finish Hello from new interpreter, waiting for message Message received: 'foo' Done
5. Realizace programu pro zpracování úloh v samostatně běžících interpretrech
Původní program popsaný v předchozích dvou kapitolách nyní upravíme do nepatrně složitější podoby. V první řadě budeme v programu definovat funkci nazvanou worker. Ta poběží (klidně i několikrát) v různých interpretrech/vláknech a bude přes frontu přijímat úlohy. Vytiskne parametry úlohy a samotnou práci bude simulovat funkcí time.sleep :-). Pokud ovšem bude úloha obsahovat text quit, bude funkce worker ihned ukončena:
def worker(name, q):
"""Worker spuštěný několikrát v samostatných interpretrech."""
while True:
# čtení příkazů z fronty
cmd = q.get()
print(f"Process '{name}' received command '{cmd}'")
if cmd == "quit":
print(f"Process '{name}' is about to quit")
return
time.sleep(1)
Funkce worker bude spuštěna několikrát, abychom si ukázali možnosti škálování úloh:
q = interpreters.create_queue()
# vytvoření tří procesů
names = ("foo", "bar", "baz")
ins = [interpreters.create().call_in_thread(worker, name, q) for name in names]
Povšimněte si, že všem třem workerům je předáno sice odlišné jméno, ale ve druhém parametru získají referenci na instanci stejné fronty. To nám umožní předávat wokerkům úlohy, o které se budou dělit (podle toho, který worker je zrovna dostupný):
print("Sending data to other interpreters")
# komunikace s interpretry přes frontu
for i in range(10):
print(f"Sending 'command {i}'")
q.put("command {}".format(i))
Na konci požádáme workery o jejich ukončení a počkáme, až všechna vlákna, ve kterých běží interpretry, skutečně skončí:
# příkaz pro ukončení procesů
for i in range(3):
q.put("quit")
print("Waiting for other interpreters")
# čekání na ukončení interpretrů
for i in ins:
i.join()
print("All work done!")
Úplný zdrojový kód takto upraveného demonstračního příkladu vypadá následovně:
# Multiprocesing a multithreading v Pythonu:
# - spuštění více úloh v nových interpretrech
# - komunikace mezi interpretry s využitím fronty
import time
from concurrent import interpreters
def worker(name, q):
"""Worker spuštěný několikrát v samostatných interpretrech."""
while True:
# čtení příkazů z fronty
cmd = q.get()
print(f"Process '{name}' received command '{cmd}'")
if cmd == "quit":
print(f"Process '{name}' is about to quit")
return
time.sleep(1)
if __name__ == "__main__":
print("Starting")
# vytvoření fronty pro komunikaci mezi interpretry
q = interpreters.create_queue()
# vytvoření tří procesů
names = ("foo", "bar", "baz")
ins = [interpreters.create().call_in_thread(worker, name, q) for name in names]
print("Sending data to other interpreters")
# komunikace s interpretry přes frontu
for i in range(10):
print(f"Sending 'command {i}'")
q.put("command {}".format(i))
print("Asking other interpreters to finish")
# příkaz pro ukončení procesů
for i in range(3):
q.put("quit")
print("Waiting for other interpreters")
# čekání na ukončení interpretrů
for i in ins:
i.join()
print("All work done!")
6. Ukázka chování programu s více úlohami po jeho spuštění
Po spuštění programu můžeme snadno zjistit, že se workeři skutečně o předané úlohy dělili:
Process 'foo' received command 'command 0' Process 'foo' received command 'command 3' Process 'foo' received command 'command 6' Process 'foo' received command 'command 9' Process 'foo' received command 'quit' Process 'foo' is about to quit Process 'baz' received command 'command 2' Process 'baz' received command 'command 5' Process 'baz' received command 'command 8' Process 'baz' received command 'quit' Process 'baz' is about to quit Process 'bar' received command 'command 1' Process 'bar' received command 'command 4' Process 'bar' received command 'command 7' Process 'bar' received command 'quit' Process 'bar' is about to quit Starting Sending data to other interpreters Sending 'command 0' Sending 'command 1' Sending 'command 2' Sending 'command 3' Sending 'command 4' Sending 'command 5' Sending 'command 6' Sending 'command 7' Sending 'command 8' Sending 'command 9' Asking other interpreters to finish Waiting for other interpreters All work done!
Doba běhu programu je menší než 10 sekund:
real 0m4.171s user 0m0.199s sys 0m0.040s
7. Jazyk Python a multiprocessing
V předchozích kapitolách jsme si popsali technologii spouštění výpočtů v samostatných vláknech, ve kterých navíc běží izolované interpretry. Ovšem existují i další možnosti tvorby aplikací, jejichž části mají běžet buď „pouze“ souběžně nebo které využijí plnou paralelnost nabízenou moderními počítači. Vzhledem k (dnes již volitelné) existenci GILu ve standardním CPythonu může být problematické zajistit paralelní běh v rámci jednoho procesu (tedy vlastně jednoho virtuálního stroje Pythonu), proto je dalším logickým krokem rozdělení (fork) tohoto procesu na větší množství plnohodnotných systémových procesů.
Ty budou moci běžet nezávisle na sobě a navíc toto řešení programátora donutí k tomu, aby explicitně zajistil korektní komunikaci mezi těmito procesy – zde již nemůže dojít k chybám typu „přístup do objektu vlastněného jiným vláknem“ atd. Toto řešení přináší i některé nevýhody – větší systémové nároky, pomalejší spouštění procesů (v porovnání se spouštěním interpretrů) a v některých případech se projeví i delší doba přepínání mezi procesy (ovšem to se podle mého názoru u interpretovaného Pythonu ztratí) a někdy by se skutečně hodilo mít snadný přístup ke sdíleným objektům.
V případě, že se rozhodnete si vyzkoušet spouštění jednotlivých částí algoritmu v samostatných procesech, můžete využít další standardní modul (balíček), který se jmenuje příznačně multiprocessing. Tento modul vývojáře do značné míry odstiňuje od nízkoúrovňových operací, tedy od samotného rozvětvení procesu (fork), spuštění nového interpretru a specifikace, jaký kód má tento interpret použít. Z pohledu vývojáře je totiž použití modulu multiprocessing velmi přímočaré – pouze se zvolí, jaká funkce se má zavolat v novém procesu a jaké mají být této funkci předány argumenty. Navíc modul multiprocessing programátorům nabízí mechanismy umožňující komunikaci mezi procesy. Zejména se jedná o frontu (queue) (ovšem jinou frontu, než jsme doposud používali, i když její API je v některých ohledech prakticky totožné) a taktéž o oboustrannou rouru (pipe).
8. Spuštění většího množství procesů, čekání na dokončení těchto procesů
Podívejme se nyní na velmi jednoduchý demonstrační příklad, na němž jsou ukázány základní mechanismy nabízené standardním modulem multiprocessing. Ve skriptu je definována následující funkce, která se má spustit v samostatném procesu:
def worker(name):
print("hello", name)
Nový proces se spustí následovně:
p = Process(target=worker, args=("foo",))
p.start()
Implementovat je možné i čekání na dokončení tohoto procesu:
p.join()
p = Process(target=worker, args=("foo",)).start()
Úplný zdrojový kód takto vytvořeného příkladu vypadá následovně:
from multiprocessing import Process
def worker(name):
print("hello", name)
def main():
p = Process(target=worker, args=("foo",))
p.start()
p.join()
if __name__ == '__main__':
print("Running main")
main()
Samozřejmě můžeme spustit větší množství procesů a následně si (například nástrojem top nebo htop) tyto procesy zobrazit. Na dokončení procesů lze počkat metodou join – jedná se tedy o prakticky stejný koncept, jaký jsme viděli při práci s větším množstvím interpretrů (a ještě uvidíme při práci s vlákny):
from multiprocessing import Process
import time
def worker(name):
print("hello", name)
time.sleep(5)
print("done", name)
def main():
ps = []
for name in ("foo", "bar", "baz", "other"):
p = Process(target=worker, args=(name,))
p.start()
ps.append(p)
for p in ps:
p.join()
if __name__ == '__main__':
print("Running main")
main()
Výpis procesů (včetně dvou nerelevantních procesů):
$ ps ax |grep python 767 ? Ssl 0:00 /usr/bin/python3 -Es /usr/sbin/firewalld --nofork --nopid 10864 pts/4 S+ 0:00 python3 multiprocessing2.py 10865 pts/4 S+ 0:00 python3 multiprocessing2.py 10866 pts/4 S+ 0:00 python3 multiprocessing2.py 10867 pts/4 S+ 0:00 python3 multiprocessing2.py 10868 pts/4 S+ 0:00 python3 multiprocessing2.py 10947 pts/6 S+ 0:00 grep --color=auto python
Zvýraznění rozvětvení (forku), které bylo provedeno:
$ pstree -c 10864
python3─┬─python3
├─python3
├─python3
└─python3
9. Realizace programu pro zpracování úloh v samostatných procesech
V této chvíli již známe všechny informace pro to, abychom program, ve kterém se úlohy spouští v samostatných interpretrech, změnili tak, aby se úlohy spouštěly v samostatných procesech. I když se jedná o dosti velkou změnu technologie, budou změny (z pohledu zdrojového kódu) relativně malé:
# Multiprocesing a multithreading v Pythonu:
# - spuštění více úloh v nových procesech
# - komunikace mezi procesy s využitím fronty
import time
from multiprocessing import Process, Queue, freeze_support
def worker(name, q):
"""Worker spuštěný několikrát v samostatných procesech."""
while True:
# čtení příkazů z fronty
cmd = q.get()
print(f"Process '{name}' received command '{cmd}'")
if cmd == "quit":
print(f"Process '{name}' is about to quit")
return
time.sleep(1)
if __name__ == "__main__":
print("Starting")
freeze_support()
# vytvoření fronty pro komunikaci mezi procesy
q = Queue()
# vytvoření tří procesů
names = ("foo", "bar", "baz")
ps = [Process(target=worker, args=(name, q)) for name in names]
# spuštění tří procesů
for p in ps:
p.start()
print("Sending data to other processes")
# komunikace s procesy přes frontu
for i in range(10):
print(f"Sending 'command {i}'")
q.put("command {}".format(i))
print("Asking other processes to finish")
# příkaz pro ukončení procesů
for i in range(3):
q.put("quit")
print("Waiting for other processes")
# čekání na ukončení procesů
for p in ps:
p.join()
print("All work done!")
10. Porovnání realizace workerů založené na interpretrech s realizací založenou na multiprocesingu
I když je realizace workerů založená na interpretrech interně značně odlišná od realizace založené na multiprocesingu, jsou zdrojové kódy obou realizací skutečně velmi podobné. Ostatně postačuje se podívat na porovnání obou zdrojových kódů řádek po řádku:
# Multiprocesing a multithreading v Pythonu: # Multiprocesing a multithreading v Pythonu:
# - spuštění více úloh v nových interpretrech # - spuštění více úloh v nových procesech
# - komunikace mezi interpretry s využitím fronty # - komunikace mezi procesy s využitím fronty
import time import time
from concurrent import interpreters from multiprocessing import Process, Queue, freeze_support
def worker(name, q): def worker(name, q):
"""Worker spuštěný několikrát v samostatných interpretrech """Worker spuštěný několikrát v samostatných procesech.""
while True: while True:
# čtení příkazů z fronty # čtení příkazů z fronty
cmd = q.get() cmd = q.get()
print(f"Process '{name}' received command '{cmd}'") print(f"Process '{name}' received command '{cmd}'")
if cmd == "quit": if cmd == "quit":
print(f"Process '{name}' is about to quit") print(f"Process '{name}' is about to quit")
return return
time.sleep(1) time.sleep(1)
if __name__ == "__main__": if __name__ == "__main__":
print("Starting") print("Starting")
freeze_support()
# vytvoření fronty pro komunikaci mezi interpretry # vytvoření fronty pro komunikaci mezi procesy
q = interpreters.create_queue() q = Queue()
# vytvoření tří procesů # vytvoření tří procesů
names = ("foo", "bar", "baz") names = ("foo", "bar", "baz")
ins = [interpreters.create().call_in_thread(worker, name, ps = [Process(target=worker, args=(name, q)) for name in
# spuštění tří procesů
for p in ps:
p.start()
print("Sending data to other interpreters") print("Sending data to other processes")
# komunikace s interpretry přes frontu # komunikace s procesy přes frontu
for i in range(10): for i in range(10):
print(f"Sending 'command {i}'") print(f"Sending 'command {i}'")
q.put("command {}".format(i)) q.put("command {}".format(i))
print("Asking other interpreters to finish") print("Asking other processes to finish")
# příkaz pro ukončení procesů # příkaz pro ukončení procesů
for i in range(3): for i in range(3):
q.put("quit") q.put("quit")
print("Waiting for other interpreters") print("Waiting for other processes")
# čekání na ukončení interpretrů # čekání na ukončení procesů
for i in ins: for p in ps:
i.join() p.join()
print("All work done!") print("All work done!")
11. Přepis programu takovým způsobem, aby se úlohy spouštěly v samostatných vláknech
V Pythonu je možné jednotlivé úlohy spouštět v samostatných vláknech, a to se všemi výhodami a nevýhodami, které tato technologie přináší. Pro komunikaci mezi vlákny se opět (typicky) používají fronty. Navíc je ovšem nutné funkci, která má být zavolána v nově spuštěném vláknu, předat nějaké parametry; v našem konkrétním případě jméno úlohy a referenci na frontu. To nelze provést přímo (funkci totiž nevolá přímo programátor, ale modul threading), nicméně parametry do volané funkce je možné specifikovat, i když nepřímým způsobem:
# vytvoření fronty pro komunikaci mezi vlákny
q = Queue()
# spuštění tří vláken
names = ("foo", "bar", "baz")
ts = [Thread(target=worker, daemon=True, name=name, args=[name, q]) for name in names]
Čekání na dokončení úloh lze zajistit více způsoby, například explicitním čekáním na dokončení vláken:
print("Asking other threads to finish")
# příkaz pro ukončení vláken
for i in range(3):
q.put("quit")
print("Waiting for other threads")
# čekání na zpracování všech zpráv ve frontě
for t in ts:
t.join()
Program upravený do takové podoby, že používá multithreading a nikoli multiprocesing (či více interpretrů) může vypadat následovně:
# Multiprocesing a multithreading v Pythonu:
# - spuštění více úloh v nových vláknech
# - komunikace mezi vlákny s využitím fronty
from queue import Queue
from threading import Thread
import time
def worker(name, q):
"""Worker spuštěný několikrát v samostatných vláknech."""
while True:
# čtení příkazů z fronty
cmd = q.get()
print(f"Thread '{name}' received command '{cmd}'")
if cmd == "quit":
print(f"Thread '{name}' is about to quit")
return
time.sleep(1)
if __name__ == "__main__":
print("Starting")
# vytvoření fronty pro komunikaci mezi vlákny
q = Queue()
# spuštění tří vláken
names = ("foo", "bar", "baz")
ts = [Thread(target=worker, daemon=True, name=name, args=[name, q]) for name in names]
# spuštění tří vláken
for t in ts:
t.start()
print("Sending data to other threads")
# komunikace s vlákny přes frontu
for i in range(10):
print(f"Sending 'command {i}'")
q.put("command {}".format(i))
print("Asking other threads to finish")
# příkaz pro ukončení vláken
for i in range(3):
q.put("quit")
print("Waiting for other threads")
# čekání na zpracování všech zpráv ve frontě
for t in ts:
t.join()
print("All work done!")
12. Porovnání realizace workerů založené na interpretrech s realizací založenou na multithreadingu
Opět si porovnejme realizaci programu, který spouští úlohy v různých interpretrech a programu, který provádí tutéž činnost, ovšem s využitím multithreadingu. Změny ve zdrojovém kódu budou relativně malé:
# Multiprocesing a multithreading v Pythonu: # Multiprocesing a multithreading v Pythonu:
# - spuštění více úloh v nových interpretrech # - spuštění více úloh v nových vláknech
# - komunikace mezi interpretry s využitím fronty # - komunikace mezi vlákny s využitím fronty
from queue import Queue
from threading import Thread
import time import time
from concurrent import interpreters
def worker(name, q): def worker(name, q):
"""Worker spuštěný několikrát v samostatných interpretrec """Worker spuštěný několikrát v samostatných vláknech."""
while True: while True:
# čtení příkazů z fronty # čtení příkazů z fronty
cmd = q.get() cmd = q.get()
print(f"Process '{name}' received command '{cmd}'") print(f"Thread '{name}' received command '{cmd}'")
if cmd == "quit": if cmd == "quit":
print(f"Process '{name}' is about to quit") print(f"Thread '{name}' is about to quit")
return return
time.sleep(1) time.sleep(1)
if __name__ == "__main__": if __name__ == "__main__":
print("Starting") print("Starting")
# vytvoření fronty pro komunikaci mezi interpretry # vytvoření fronty pro komunikaci mezi vlákny
q = interpreters.create_queue() q = Queue()
# vytvoření tří procesů # spuštění tří vláken
names = ("foo", "bar", "baz") names = ("foo", "bar", "baz")
ins = [interpreters.create().call_in_thread(worker, name, ts = [Thread(target=worker, daemon=True, name=name, args=
# spuštění tří vláken
for t in ts:
t.start()
print("Sending data to other interpreters") print("Sending data to other threads")
# komunikace s interpretry přes frontu # komunikace s vlákny přes frontu
for i in range(10): for i in range(10):
print(f"Sending 'command {i}'") print(f"Sending 'command {i}'")
q.put("command {}".format(i)) q.put("command {}".format(i))
print("Asking other interpreters to finish") print("Asking other threads to finish")
# příkaz pro ukončení procesů # příkaz pro ukončení vláken
for i in range(3): for i in range(3):
q.put("quit") q.put("quit")
print("Waiting for other interpreters") print("Waiting for other threads")
# čekání na ukončení interpretrů # čekání na zpracování všech zpráv ve frontě
for i in ins: for t in ts:
i.join() t.join()
print("All work done!") print("All work done!")
13. Čekání na ukončení všech úloh s využitím synchronizačních mechanismů fronty
Ve skutečnosti je možné pro čekání na dokončení všech úloh použít ještě jeden synchronizační mechanismus, který je ze sémantického hlediska výhodnější. Samotní workeři totiž mohou signalizovat dokončení úlohy zavoláním metody Queue.task_done(). A vlákno, ve kterém se čeká na dokončení činnosti workerů, může použít metodu Queue.join().
Realizace workera se tedy může změnit následovně:
def worker(name, q):
"""Worker spuštěný několikrát v samostatných vláknech."""
while True:
# čtení příkazů z fronty
cmd = q.get()
print(f"Thread '{name}' received command '{cmd}'")
if cmd == "quit":
print(f"Thread '{name}' is about to quit")
q.task_done()
return
time.sleep(1)
q.task_done()
Samotné čekání na všechna tři vlákna bude vypadat takto:
# příkaz pro ukončení vláken
for i in range(3):
q.put("quit")
print("Waiting for other threads")
# čekání na zpracování všech zpráv ve frontě
q.join()
Zdrojový kód demonstračního příkladu se změní do následující podoby:
# Multiprocesing a multithreading v Pythonu:
# - spuštění více úloh v nových vláknech
# - komunikace mezi vlákny s využitím fronty
from queue import Queue
from threading import Thread
import time
def worker(name, q):
"""Worker spuštěný několikrát v samostatných vláknech."""
while True:
# čtení příkazů z fronty
cmd = q.get()
q.task_done()
print(f"Thread '{name}' received command '{cmd}'")
if cmd == "quit":
print(f"Thread '{name}' is about to quit")
return
time.sleep(1)
if __name__ == "__main__":
print("Starting")
# vytvoření fronty pro komunikaci mezi vlákny
q = Queue()
# spuštění tří vláken
names = ("foo", "bar", "baz")
ts = [Thread(target=worker, daemon=True, name=name, args=[name, q]) for name in names]
# spuštění tří vláken
for t in ts:
t.start()
print("Sending data to other threads")
# komunikace s vlákny přes frontu
for i in range(10):
print(f"Sending 'command {i}'")
q.put("command {}".format(i))
print("Asking other threads to finish")
# příkaz pro ukončení vláken
for i in range(3):
q.put("quit")
print("Waiting for other threads")
# čekání na zpracování všech zpráv ve frontě
q.join()
print("All work done!")
14. Realizace klasického vzoru vzor producent–konzument
V následujícím demonstračním příkladu je ukázán známý systém producer-consumer, kde jak producenti, tak i konzumenti každý běží v samostatném vláknu a komunikují spolu pouze přes sdílenou frontu q. Producentů i konzumentů může být (prakticky) libovolné množství a navíc je možné je přidávat nebo ubírat na základě požadavků aplikace. V následujícím demonstračním příkladu spolu komunikují tři producenti a čtyři konzumenti:
# Multiprocesing a multithreading v Pythonu:
# - spuštění více úloh v nových vláknech
# - komunikace mezi vlákny s využitím fronty
from queue import Queue
from threading import Thread
import time
def worker(name, q):
"""Worker spuštěný několikrát v samostatných vláknech."""
while True:
# čtení příkazů z fronty
cmd = q.get()
print(f"Thread '{name}' received command '{cmd}'")
if cmd == "quit":
print(f"Thread '{name}' is about to quit")
q.task_done()
return
time.sleep(1)
q.task_done()
def producer(name, q):
for job in range(10):
print(f"{name} thread: Starting producing {job}")
q.put(job)
time.sleep(0.3)
print(f"{name} thread: Produced {job}")
if __name__ == "__main__":
print("Starting")
# vytvoření fronty pro komunikaci mezi vlákny
q = Queue()
# spuštění čtyř producentů
names = ("1st", "2nd", "3rd", "4th")
ps = [Thread(target=producer, daemon=True, name=name, args=[name, q]) for name in names]
# spuštění producentů
for p in ps:
p.start()
# vytvoření tří vláken
names = ("foo", "bar", "baz")
ts = [Thread(target=worker, daemon=True, name=name, args=[name, q]) for name in names]
# spuštění tří vláken
for t in ts:
t.start()
print("Asking other threads to finish")
# čekání na dokončení producentů
for p in ps:
p.join()
# příkaz pro ukončení vláken
for i in range(3):
q.put("quit")
print("Waiting for other threads")
# čekání na zpracování všech zpráv ve frontě
q.join()
print("All work done!")
15. Souběžně běžící úlohy a konstrukce async a await
V některých situacích je možné dosáhnout zvýšení efektivity aplikace (například zvýšit počet odpovědí, které může server vygenerovat za určitou časovou jednotku) a přitom není možné či vhodné využívat přímočaré ale dosti nízkoúrovňové řešení založené na použití většího množství vláken, procesů nebo interpretrů spravovaných runtime systémem Pythonu. Naprosto typickým příkladem jsou některé virtuální stroje JavaScriptu, které povětšinou umožňují běh aplikace v jediném vláknu.
Pokud aplikace intenzivně používá I/O, tedy například přístup k datům přes HTTP(s), volání DB operací atd., lze namísto více vláken využít korutiny, což jsou funkce transformované takovým způsobem, že mohou běžet souběžně, nikoli však nutně paralelně. Existuje mnoho způsobů konstrukce a volání korutin. V Pythonu se (vedle zavedeného slova yield) nově používají dvě slova async a await, přičemž async je klíčové slovo umožňující transformaci funkce do korutiny a await je klíčové slovo pro čekání na dokončení korutiny se získáním její návratové hodnoty.
I pro předávání dat mezi korutinami lze využít frontu, v tomto případě se ovšem jedná o instanci třídy asyncio.Queue. Všechny operace s touto frontou (tedy zejména operace put a get) se musí volat v konstrukci await, což je ukázáno v dalším demonstračním příkladu.
16. Přepis programu takovým způsobem, aby se používaly asynchronně běžící úlohy
Jak bude vypadat přepis našeho programu s několika workery tak, aby byly jednotlivé úlohy prováděny asynchronně? Samotná implementace workerů vyžaduje asynchronní kód (async def) a čtení z fronty tudíž bude obsahovat synchronizaci (await. Implementace může vypadat následovně:
async def worker(name, q):
"""Worker spuštěný několikrát asynchronně."""
while not q.empty():
# čtení příkazů z fronty
cmd = await q.get()
print(f"Task '{name}' received command '{cmd}'")
await sleep(1)
Volání workerů je prováděno asynchronně a z tohoto důvodu Python vyžaduje, aby toto volání bylo prováděno z asynchronní funkce:
async def main():
...
...
...
Připravíme si jednotlivé úlohy a vložíme je do fronty:
# komunikace s úlohami přes frontu
for i in range(10):
print(f"Sending 'command {i}'")
await queue.put("command {}".format(i))
Následně spustíme tři workery, pochopitelně formou asynchronního kódu:
await gather(
create_task(worker("foo", queue)),
create_task(worker("bar", queue)),
create_task(worker("baz", queue)),
)
Úplný zdrojový kód takto upraveného programu bude vypadat následovně:
# Multiprocesing a multithreading v Pythonu:
# - spuštění více úloh asynchronně
# - komunikace mezi procesy s využitím fronty
from asyncio import Queue, sleep, run, gather, create_task
async def worker(name, q):
"""Worker spuštěný několikrát asynchronně."""
while not q.empty():
# čtení příkazů z fronty
cmd = await q.get()
print(f"Task '{name}' received command '{cmd}'")
await sleep(1)
async def main():
print("Starting")
# vytvoření fronty pro komunikaci mezi úlohami
queue = Queue()
print("Sending data to async tasks")
# komunikace s úlohami přes frontu
for i in range(10):
print(f"Sending 'command {i}'")
await queue.put("command {}".format(i))
print("Waiting for all tasks")
await gather(
create_task(worker("foo", queue)),
create_task(worker("bar", queue)),
create_task(worker("baz", queue)),
)
print("All work done!")
run(main())
A takto může vypadat chování programu po jeho spuštění (používám Python bez GILu, to však nemusí mít na pořadí spouštění příkazů žádný zvenku viditelný vliv):
Starting Sending data to async tasks Sending 'command 0' Sending 'command 1' Sending 'command 2' Sending 'command 3' Sending 'command 4' Sending 'command 5' Sending 'command 6' Sending 'command 7' Sending 'command 8' Sending 'command 9' Waiting for all tasks Task 'foo' received command 'command 0' Task 'bar' received command 'command 1' Task 'baz' received command 'command 2' Task 'foo' received command 'command 3' Task 'bar' received command 'command 4' Task 'baz' received command 'command 5' Task 'foo' received command 'command 6' Task 'bar' received command 'command 7' Task 'baz' received command 'command 8' Task 'foo' received command 'command 9' All work done!
17. Shrnutí
Možnost spouštění úloh v samostatných a do značné míry izolovaných interpretrech může být velmi užitečná a současně se jedná o technologii, která doplňuje existující sadu tří technologií určených pro souběžné provádění úloh: asynchronní kód, multithreading a multiprocesing. Při použití interpretrů dosáhneme větší izolace kódu, než tomu je u multithreadingu, ovšem nároky na systémové zdroje budou obecně menší, než při multiprocesingu (to však musíme dokázat měřením). Ovšem důležité je, že pokud je vyžadováno, aby mezi sebou jednotlivé úlohy komunikovaly, použije se ve všech čtyřech řešeních podobná technika – fronta se synchronizovanými (a obecně blokujícími) operacemi put a get. Na demonstračních příkladech jsme si navíc ukázali, že i zdrojové kódy budou do značné míry totožné (až na asynchronní kód), takže přechod mezi různými stupni izolace a paralelnosti nemusí být v praxi příliš komplikovaný (pokud ovšem dodržíme fakt, že úlohy mezi sebou striktně komunikují jen s využitím front).
18. Odkazy na články s problematikou souběžnosti a paralelnosti v Pythonu
Na stránkách Roota jsme se již několikrát setkali s problematikou souběžnosti, paralelnosti a asynchronního běhu v Pythonu. Různé varianty spouštění a řízení více vláken, procesů a asynchronních úloh naleznete v následujících článcích (všechny v článcích uvedené demonstrační příklady by měly být spustitelné i v interpretru Pythonu 3.14 bez GILu):
- Souběžné a paralelně běžící úlohy naprogramované v Pythonu
https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu/ - Souběžné a paralelně běžící úlohy naprogramované v Pythonu (2)
https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu-2/ - Souběžné a paralelně běžící úlohy naprogramované v Pythonu – Curio a Trio
https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu-curio-a-trio/ - Souběžné a paralelně běžící úlohy naprogramované v Pythonu – knihovna Trio
https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu-knihovna-trio/ - Souběžné a paralelně běžící úlohy naprogramované v Pythonu – knihovna Trio (2)
https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu-knihovna-trio-2/ - Souběžné a paralelně běžící úlohy naprogramované v Pythonu – závěrečné zhodnocení
https://www.root.cz/clanky/soubezne-a-paralelne-bezici-ulohy-naprogramovane-v-pythonu-zaverecne-zhodnoceni/ - Interpret Pythonu bez GILu: vyplatí se odstranění velkého zámku?
https://www.root.cz/clanky/interpret-pythonu-bez-gilu-vyplati-se-odstraneni-velkeho-zamku/ - Nové vlastnosti Pythonu 3.14 v praxi: vliv odstranění GIL a využití více interpretrů
https://www.root.cz/clanky/nove-vlastnosti-pythonu-3–14-v-praxi-vliv-odstraneni-gil-a-vyuziti-vice-interpretru/
19. Repositář s demonstračními příklady
Všechny demonstrační příklady, které byly popsány v dnešním článku, naleznete na adresách:
Všechny minule popsané demonstrační příklady jsou vypsány v následující tabulce:
Demonstrační příklady vytvořené pro Python verze 3.14 a popsané v prvním článku najdete v repositáři https://github.com/tisnik/most-popular-python-libs/. Následují odkazy na jednotlivé příklady:
| # | Příklad | Stručný popis | Adresa |
|---|---|---|---|
| 1 | argparse_test.py | skript s definicí přepínačů použitelných na příkazovém řádku | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/argparse_test.py |
| 2 | syntax_error1.py | skript obsahující syntaktické chyby: chybějící či naopak přebývající písmeno v klíčovém slovu nebo identifikátoru | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/syntax_error1.py |
| 2 | syntax_error2.py | skript obsahující syntaktické chyby: chybějící či naopak přebývající písmeno v klíčovém slovu nebo identifikátoru | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/syntax_error2.py |
| 3 | syntax_error3.py | skript obsahující syntaktické chyby: chybějící či naopak přebývající písmeno v klíčovém slovu nebo identifikátoru | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/syntax_error3.py |
| 4 | syntax_error4.py | skript obsahující syntaktické chyby: chybějící či naopak přebývající písmeno v klíčovém slovu nebo identifikátoru | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/syntax_error4.py |
| 5 | syntax_error5.py | skript obsahující syntaktické chyby: chybějící či naopak přebývající písmeno v klíčovém slovu nebo identifikátoru | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/syntax_error5.py |
| 6 | primes.py | realizace výpočtu prvočísel | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/primes.py |
| 7 | test_primes.py | jednotkové testy pro modul primes.py | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/test_primes.py |
| 8 | pep-758-motivation-1.py | zachycení většího množství výjimek v bloku except – motivační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/pep-758-motivation-1.py |
| 9 | pep-758-motivation-2.py | zachycení většího množství výjimek v bloku except – motivační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/pep-758-motivation-2.py |
| 10 | pep-758-usage.py | nový způsob zachycení výjimek definovaný v PEP-758 | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/pep-758-usage.py |
| 11 | pep-758-usage-as.py | klauzule as a nový způsob zachycení výjimek definovaný v PEP-758 | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/pep-758-usage-as.py |
| 12 | pep-765-motivation-1.py | detekce opuštění bloku finally, první demonstrační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/pep-765-motivation-1.py |
| 13 | pep-765-motivation-2.py | detekce opuštění bloku finally, druhý demonstrační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/pep-765-motivation-2.py |
| 14 | pep-765-motivation-3.py | detekce opuštění bloku finally, třetí demonstrační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/pep-765-motivation-3.py |
| 15 | pep-765-motivation-4.py | detekce opuštění bloku finally, čtvrtý demonstrační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/pep-765-motivation-4.py |
| 16 | f-string-1.py | rozdíl mezi f-řetězci a t-řetězci, první demonstrační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/f-string-1.py |
| 17 | t-string-1.py | rozdíl mezi f-řetězci a t-řetězci, první demonstrační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/t-string-1.py |
| 18 | f-string-2.py | rozdíl mezi f-řetězci a t-řetězci, druhý demonstrační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/f-string-2.py |
| 19 | t-string-2.py | rozdíl mezi f-řetězci a t-řetězci, druhý demonstrační příklad | https://github.com/tisnik/most-popular-python-libs/blob/master/python3.14/t-string-2.py |
20. Odkazy na Internetu
- Python 3.14.0
https://test.python.org/downloads/release/python-3140/ - PEP 765 – Disallow return/break/continue that exit a finally block
https://peps.python.org/pep-0765/ - PEP 758 – Allow except and except* expressions without parentheses
https://peps.python.org/pep-0758/ - What’s new in Python 3.14 (official)
https://docs.python.org/3/whatsnew/3.14.html - What’s New In Python 3.13 (official)
https://docs.python.org/3/whatsnew/3.13.html - What’s New In Python 3.12 (official)
https://docs.python.org/3/whatsnew/3.12.html - What’s New In Python 3.11 (official)
https://docs.python.org/3/whatsnew/3.11.html - What’s New In Python 3.12
https://dev.to/mahiuddindev/python-312–4n43 - PEP 698 – Override Decorator for Static Typing
https://peps.python.org/pep-0698/ - PEP 484 – Type Hints
https://www.python.org/dev/peps/pep-0484/ - What’s New In Python 3.5
https://docs.python.org/3.5/whatsnew/3.5.html - 26.1. typing — Support for type hints
https://docs.python.org/3.5/library/typing.html#module-typing - Type Hints – Guido van Rossum – PyCon 2015 (youtube)
https://www.youtube.com/watch?v=2wDvzy6Hgxg - Python 3.5 is on its way
https://lwn.net/Articles/650904/ - Type hints
https://lwn.net/Articles/640359/ - Stránka projektu PDM
https://pdm.fming.dev/latest/ - PDF na GitHubu
https://github.com/pdm-project/pdm - PEP 582 – Python local packages directory
https://peps.python.org/pep-0582/ - PDM na PyPi
https://pypi.org/project/pdm/ - Which Python package manager should you use?
https://towardsdatascience.com/which-python-package-manager-should-you-use-d0fd0789a250 - How to Use PDM to Manage Python Dependencies without a Virtual Environment
https://www.youtube.com/watch?v=qOIWNSTYfcc - What are the best Python package managers?
https://www.slant.co/topics/2666/~best-python-package-managers - PEP 621 – Storing project metadata in pyproject.toml
https://peps.python.org/pep-0621/ - Pick a Python Lockfile and Improve Security
https://blog.phylum.io/pick-a-python-lockfile-and-improve-security/ - PyPA specifications
https://packaging.python.org/en/latest/specifications/ - Creation of virtual environments
https://docs.python.org/3/library/venv.html - How to Use virtualenv in Python
https://learnpython.com/blog/how-to-use-virtualenv-python/ - Python Virtual Environments: A Primer
https://realpython.com/python-virtual-environments-a-primer/ - virtualenv Cheatsheet
https://aaronlelevier.github.io/virtualenv-cheatsheet/ - Installing Python Modules
https://docs.python.org/3/installing/index.html - Python: The Documentary | An origin story
https://www.youtube.com/watch?v=GfH4QL4VqJ0 - History of Python
https://en.wikipedia.org/wiki/History_of_Python - History of Python
https://www.geeksforgeeks.org/python/history-of-python/ - IPython: jedno z nejpropracovanějších interaktivních prostředí pro práci s Pythonem
https://www.root.cz/clanky/ipython-jedno-z-nejpropracova-nejsich-interaktivnich-prostredi-pro-praci-s-pythonem/ - Další kulaté výročí v IT: dvacet let existence Pythonu 2
https://www.root.cz/clanky/dalsi-kulate-vyroci-v-it-dvacet-let-existence-pythonu-2/ - PEP 684 – A Per-Interpreter GIL
https://peps.python.org/pep-0684/ - What Is the Python Global Interpreter Lock (GIL)?
https://realpython.com/python-gil/ - PEP 703 – Making the Global Interpreter Lock Optional in CPython
https://peps.python.org/pep-0703/ - GlobalInterpreterLock
https://wiki.python.org/moin/GlobalInterpreterLock - What is the Python Global Interpreter Lock (GIL)
https://www.geeksforgeeks.org/what-is-the-python-global-interpreter-lock-gil/ - Let's remove the Global Interpreter Lock
https://www.pypy.org/posts/2017/08/lets-remove-global-interpreter-lock-748023554216649595.html - Global interpreter lock
https://en.wikipedia.org/wiki/Global_interpreter_lock - Rychlost CPythonu 3.11 a 3.12 v porovnání s JIT a AOT překladači
https://www.root.cz/clanky/rychlost-cpythonu-3–11-a-3–12-v-porovnani-s-jit-a-aot-prekladaci-pythonu/ - Dokumentace Pythonu: balíček queue
https://docs.python.org/3/library/queue.html - Dokumentace Pythonu: balíček threading
https://docs.python.org/3/library/threading.html? - Dokumentace Pythonu: balíček multiprocessing
https://docs.python.org/3/library/multiprocessing.html - Dokumentace Pythonu: balíček asyncio
https://docs.python.org/3/library/asyncio.html - Synchronization Primitives
https://docs.python.org/3/library/asyncio-sync.html - Coroutines
https://docs.python.org/3/library/asyncio-task.html - Queues
https://docs.python.org/3/library/asyncio-queue.html - python-csp
https://python-csp.readthedocs.io/en/latest/ - TrellisSTM
http://peak.telecommunity.com/DevCenter/TrellisSTM - Python Multithreading and Multiprocessing Tutorial
https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python