Hlavní navigace

Erlang: procesy a zprávy

18. 8. 2014
Doba čtení: 16 minut

Sdílet

Programovací jazyk Erlang je určený k vytváření distribuovaných systémů pro zpracování velkého množství paralelních úloh. Náš seriál vás provede jeho zajímavými vlastnostmi i konkrétními příklady použití. V dnešním díle si ukážeme, jak se v Erlang spouštějí procesy a jak mohou spolu komunikovat zasíláním zpráv.

Spuštění procesu

Jak bylo řečeno v úvodní kapitole, pro zpracování paralelně běžících úloh existují Erlangu tzv. odlehčené procesy. Nejedná se o vlákna či procesy ve smyslu operačního systému, ale o interní věc runtime prostředí, které má na jejich řízení vlastní scheduler.

Seznam běžících procesů lze zobrazit příkazem shellu i()

21> i().
Pid                   Initial Call                          Heap     Reds Msgs
Registered            Current Function                     Stack
<0.0.0>               otp_ring0:start/2                     1598     3956    0
init                  init:loop/1                              2
<0.3.0>               erlang:apply/2                        1598   677774    0
erl_prim_loader       erl_prim_loader:loop/3                   6
<0.6.0>               gen_event:init_it/6                    376      235    0
error_logger          gen_event:fetch_msg/5                    8
<0.7.0>               erlang:apply/2                        1598      947    0
application_controlle gen_server:loop/6                        7
<0.9.0>               application_master:init/4              376       44    0
                      application_master:main_loop/2           6
<0.10.0>              application_master:start_it/4          233       69    0
                      application_master:loop_it/4             5
...

Pro monitorování systému a mimo jiné pro monitorování jednotlivých procesů v něm (a mnoho dalšího) se hodí program observer. Jedná se o grafický program (Erlang umí malovat grafická okénka, pokud má kam). Spouští se zavoláním funkce observer:start()

22> observer:start().
ok

V programu Observer lze na záložce Processes vidět tabulku se seznamy procesů.

Od minule víme, že pro spuštění funkce je potřeba uvést modul, funkci a argumenty, což se zapisuje

1> io:format("Hello World~n").
Hello World
ok
2>

To samé lze alternativně provést pomocí funkce apply

2> apply(io, format, ["Hello World~n"]).
Hello World
ok
3>

První parametr je modul, druhý funkce v něm (oba jsou atomy) a třetí je list s parametry volané funkce.

Možnost určit spouštěnou funkci a její parametry jako parametry funkce se často využívá při práci s callbacky, kdy jeden modul obsahuje nějaký obecný kód a v něm se na různých místech pouští kód jiného modulu dělajícího nějakou konkrétnější činnost (v objektovém programování se na podobné věci používají virtuální metody – zhruba řečeno).

Funkce spawn vytvoří nový proces a začne v něm vykonávat kód tak, že zavolá funkci, kterou dostane v parametrech, které jsou stejné jako ve funkci apply.

3> spawn(io, format, ["Hello World~n"]).
Hello World
<0.38.0>
4>

Co se stalo. Funkce spawn spustila nový proces, a vrátila jeho tzv. PID (process ID – to jsou ty číslíčka ve špičatých závorkách). Nový proces zavolal funkci io:format/1, která vypsala text a skončila. Návratová hodnota funkce io:format/1 (atom ok) se ztratila. Po ukončení volané funkce (nezbyl už žádný kód, co by se dal vykonávat) nový proces skončil.

Vytvářet nové procesy a v nich pouštět funkce, které hned skončí, není zrovna chytrý nápad, ale než si ukážeme, jak psát kód pro procesy, které běží pokud možno trvale, je třeba si říci něco o komunikaci pomocí zasílání zpráv.

Zprávy

Procesy v Erlangu spolu komunikují zprávami. Každý proces má svoji vlastní frontu zpráv, kde si může číst, co mu ostatní poslali.

Je zaručeno, že pokud jeden proces pošle jinému několik zpráv, dojdou ve stejném pořadí. Pokud více procesů zasílá zprávy jednomu procesu, není zaručeno, že dorazí v pořadí podle času odeslání (pokud byly odeslány hodně brzy po sobě).

Zprávy se odesílají operátorem !. Na levé straně je odkaz na proces, kterému je zpráva určena (nejčastěji PID), na pravé straně je obsah zprávy (libovolná hodnota v Erlangu).

Pid ! {foo, [bar, quix]},

Operátor ! jako každý jiný operátor vrací hodnotu, a tou je obsah přenášené zprávy. Toho lze využít, pokud je např. třeba poslat jednu zprávu více procesům (Pid1 ! Pid2 ! Msg).

Zatím ještě nemáme vytvořený proces, který by uměl přijmout zprávu, ale pro ukázku lze využít toho, že Erlang shell je napsaný v Erlangu a pomocí funkce self() zjistit Pid shellu. Pak lze poslat zprávu sám sobě a příkazem shellu flush() vyzvednout zprávy, co se tak dostaly shellu do ve fronty a vypsat je (pozor, self() je funkce Erlangu, flush() je příkaz shellu).

20> Pid = self().
<0.50.0>
21> Pid ! msg1.
msg1
21> flush().
Shell got msg1
ok
23> Pid ! msg2.
msg2
24> Pid ! msg3.
msg3
25> flush().
Shell got msg2
Shell got msg3
ok

Odeslání zprávy procesu, který již neexistuje, se tiše přejde a nezpůsobí chybu. Proč. Pokud o nic nejde, tak netřeba odesílatele zatěžovat starostí, zda Pid, co kdysi získal, ještě existuje a vytvářet tak nadbytečné vazby. Pokud o něco jde, tak odeslání zprávy existujícímu procesu nic neřeší, protože není známo, zda si ji příjemce přečetl a porozuměl jí. V takovém případě je stejně nutné zavést do protokolu výměny zpráv nějakou formu potvrzování a pokud na zprávu odeslanou neexistujícímu procesu potvrzení nepřijde, tak se chyba projeví.

Příjem zpráv se provádí pomocí klíčového slova receive. Za ním následuje jeden nebo více vzorů (pattern matching) doplněných o případné omezující podmínky (guards). V každé větvi jsou pak příkazy, které se v ní mají vykonat.

receive
  Vzor1 when Podminka1 -> expr, ... expr;
  Vzor2 when Podminka2 -> expr, ... expr;
  ...
  VzorN when PodminkaN -> expr, ... expr
end

Jedná se o konstrukci velmi podobnou příkazu case. Také se zde vezme hodnota a postupně se hledá vzor (s případnou omezující podmínkou), na který pasuje. Pokud se najde, vykonají se příkazy uvedené za vzorem. Rozdíl je v tom, že hodnota k porovnání se zde bere z fronty zpráv. Příkaz receive má návratovou hodnotu a tou je hodnota posledního výrazu ve větvi, která byla zvolena k vykonání. Obdobně jako příkaz case nebo if.

Příkaz receive aplikuje vzory v něm obsažené na první (nejstarší) zprávu ve frontě. Pokud žádný ze vzorů nepasuje, zkusí se následující zpráva. V případě, že žádná ze zpráv ve frontě nepasuje, proces se zastaví (suspenduje) a čeká se na novou zprávu, aby se mohla zkusit porovnat se vzory. V praxi je obvykle situace mnohem jednodušší. Ve frontě není nic a příkaz receive čeká na zprávu, kterou následně umí zpracovat.

Protože se proces může na příkaz receive zastavit (jedná se o blokační operaci), může se občas hodit mít možnost zadat hodnotu timeoutu, po jaké se má přestat čekat. To se zapisuje takto:

receive
  msg1 -> action1();
  msg2 -> action2()
after 1000 -> action_timeout()
end

Čas timeoutu se zapisuje v milisekundách (v tomto případě se jedná o jednu sekundu) a po vypršení času ze zvolí větev za after a návratovou hodnotou operace je poslední výraz v této větvi.

To že příkaz receive nepracuje jen s první zprávou ve frontě, ale zkouší i následující (a pak případně čeká), přináší riziko, že se díky nějaké chybě v programu mohou ve frontě hromadit nevyzvednuté zprávy, které budou pokaždé přeskakovány. To není dobré, neboť nevyzvednuté zprávy obsahují data, která se nikdy neuvolní (memory leak) a tím, že se neustále porovnávají se vzory, při každé příchozí zprávě postupně snižují výkon systému. Časem pak dojde k zastavení runtime prostředí z důvodu nedostatku paměti, nebo jej zoufalý administrátor po stížnostech nespokojených uživatelů restartuje spolu s celým počítačem. Aby se toto nestalo, je dobré se při psaní programu nespoléhat na selektivní vybírání zpráv z fronty. Nejlépe tak, že se vzory na očekávané zprávy vloží do jednoho velkého receive místo několika menších za sebou a na posledním místě se uvede vzor pasující na cokoliv s patřičným ošetřením neočekávaných zpráv (zalogováním, ukončení procesu s chybou a pod. dle okolností).

Příklad – echo proces

Nyní už toho máme dost na to si zkusit napsat první proces, co se spustí, reaguje na zprávy a až je o to požádán, tak se ukončí. Ukážeme si to na procesu, kterému půjde poslat libovolná data a on je podešle zpátky odesilateli (echo proces).

Zdrojový kód modulu echo_proc.erl vypadá takto.

%%
%% Echo proces
%%
-module(echo_proc).
-export([init/0, start/0, stop/1, do_echo/2]).

%%
%% funkce pro vypinani a zapinani procesu
%%

% zapnuti procesu
start() ->
    debug_msg("start"),
    spawn (echo_proc, init, []).

% vypnuti procesu
stop(Pid) ->
    debug_msg ("sending stop"),
    Pid ! stop,
    ok

%%
%% uzivatelelske API
%%

% odeslani dat a prijmuti odpovedi
do_echo (Pid, ReqData) ->
    debug_msg ("sending request"),
    Pid ! {request, self(), ReqData},
    debug_msg ("waiting for reply"),
    receive
       {response, RespData} ->
           debug_msg ("recieve reply"),
           {ok, RespData}
    after 1000 ->
        debug_msg ("reply timeout"),
        timeout
    end.

%%
%% interni funkce souvisejici s fungovanim procesu
%%

% inicializacni funkce
init() ->
   debug_msg("init"),
   loop().

% hlavni cyklus
loop() ->
   debug_msg ("waiting for message"),
   receive
       % echo pozadavek
       {request, From, RequestData} ->
           debug_msg ("recieve request"),
           ResponseData = RequestData,   % echo server - zasila se zpatky co prislo
           debug_msg ("sending response"),
           From ! {response, ResponseData},
       loop ();

       % zadost o zastaveni procesu
       stop ->
           debug_msg ("recieve stop"),
           terminate();

       % osetreni neocekavane zpravy ve fronte
       Other ->
           io:format("unexpected message ~w~n", [Other]),
       loop ()
   end.

% ukoncovaci funkce
terminate() ->
   debug_msg("terminate"),
   ok.

%%
%% lokalni pomocne funkce
%%

% vypis ladicich vypisu
debug_msg (Text) ->
   io:format("[~w] ~s~n", [self(), Text]).

Co se tu děje. První funkce start/0 slouží ke spuštění nového procesu. Jedná se jen o zkratku volající spawn/3, která vytvoří nový proces, v němž se začne vykonávat funkce init/0 (prázdný list znamená nula parametrů). Funkce stop/1 je určena k zaslání požadavku na zastavení procesu. To, že se to dělá zasláním atomu stop, je interní záležitost a úkolem funkce stop je tento detail skrýt před uživatelem. Funkce start/0stop/1 jsou tzv. O&M funkce (operation & maitenace) a jsou určeny pro nějakou řídící část systému, která má za úkol zapínat nebo vypínat (a dozorovat) procesy dle potřeby.

V programu jsou na zajímavých místech ladící výpisy. Ty se vypisují na výstup interní funkcí debug_msg/1, která před text přidá PID procesu, ve kterém byla zavolána.

Zkusíme si modul zkompilovat a spustit proces.

1> c(echo_proc).
{ok,echo_proc}
2> EchoPid = echo_proc:start().
[<0.32.0>] start
[<0.40.0>] init
[<0.40.0>] waiting for message
<0.40.0>

Funkce start/0 se pustila v procesu Erlang shellu (PID <0.32.0>). V ní se vytvořil zavoláním funkce spawn/3 nový proces (PID <0.40.0>) a ten začal vykonávat funkci init/0. Návratovou hodnotou funkce start/0 je PID nově vytvořeného procesu, který se uložil do proměnné EchoPid. Funkce init/0 kromě výpisu textu nic nedělá a zavolá funkci loop/0. Zde je čekání na zprávu ve frontě receive (bez timeoutu), na kterém se proces zastavil a čeká na příchozí zprávu.

Tak tam zkusme něco poslat.

3> EchoPid ! foo.
unexpected message foo
[<0.40.0>] waiting for message

Zpráva foo není něco, s čím protokol tohoto procesu počítá, takže se zpracovala v poslední (třetí) větvi určené pro neočekávané zprávy. Zde je vidět výpis textu a za ním rekuzivní zavolání funkce loop/0, takže po vyřízení zprávy (vypsání textu) se proces opět zastaví a čeká na novou zprávu ve frontě.

V první větvi je vidět, že očekávaná zpráva má podobu n-tice

{request, From, RequestData}

kde request je atom From je PID, kam zaslat odpověď a RequestData jsou přenášená data. Nic nebrání takovou n-tici vytvořit ručně a poslat ji.

4> EchoPid ! {request, self(), test_data}.
[<0.40.0>] recieve request
[<0.40.0>] sending response
{request,<0.32.0>,test_data}
[<0.40.0>] waiting for message
5> flush().
Shell got {response,test_data}
ok

Proces takovou zprávu přijal, sestaví odpověď {response,test_data} a tu poslal zpět odesilateli. V tomto případě se jednalo o shell, jehož fronta pak byla vyprázdněna příkazem flush(). Řádek {request,<0.32.0>,test_data} je jen návratová hodnota operátoru odeslání zprávy (!), kterou je přenášená zpráva. Tím že se jedná o dva různé procesy (shell a echo_proc), se může stát, že se jejich výpisy píší přes sebe.

Vytvoření a odeslání zprávy v očekávaném tvaru a přijmutí odpovědi je zapouzdřeno ve funkci do_echo/2.

6> echo_proc:do_echo(EchoPid, test_data2).
[<0.32.0>] sending request
[<0.32.0>] waiting for reply
[<0.40.0>] recieve request
[<0.40.0>] sending response
[<0.40.0>] waiting for message
[<0.32.0>] recieve reply
{ok,test_data2}

Nejprve se v rámci procesu shellu (PID <0.32.0>) sestavila zpráva, odeslala se echo procesu a shell se zastavil na čekání na odpověď. V echo procesu (PID <0.40.0>) se požadavek přijal, zpracoval, odeslala se odpověď a proces se zastavil na čekání na další zprávu. Odpověď dorazila do volajícího procesu (shell), kde se přijala a sestavila se z ní návratová hodnota funkce do_echo. Funkce se ukončila a co vrátila se zobrazilo na výstupu ({ok,test_data2}).

Obdobně funguje funkce stop/1. Zde se je situace jednodušší že se jen odešle zpráva (atom) stop a nečeká na odpověď. Na straně echo procesu se na takovou zprávu (atom stop) reaguje tak, že se zavolá funkce terminate/0. Po jejím doběhnutí již nezbývá žádný kód k vykonání (rekurze se přeruší) a proces se ukončí.

7> echo_proc:stop(EchoPid).
[<0.32.0>] sending stop
[<0.40.0>] recieve stop
[<0.40.0>] terminate
ok

Po zastavení echo procesu v proměnné EchoPid stále zbývá jeho PID. Na něj lze beztrestně odesílat libovolné zprávy, ale odpověď pochopitelně nepřijde a čekání na ni skončí timeoutem. Ještě že tam je, jinak by se zablokoval shell nekonečným čekáním a v rámci současné úrovně poznání by nezbylo nic jiného než runtime prostředí zastavit a pustit znovu.

8> echo_proc:do_echo(EchoPid, test_data3).
[<0.32.0>] sending request
[<0.32.0>] waiting for reply
[<0.32.0>] reply timeout
timeout

Registrované procesy

Jak bylo ukázáno výše, zprávy se zasílají na tzv. process ID (PID) procesu. K tomu aby bylo možné zaslat procesu zprávu, je nutné tento PID znát. Alternativou k tomu je tzv. registrace procesu. Proces se může zaregistrovat pod nějakým jménem a zprávy pak lze posílat na toto jméno, nikoliv na konkrétní PID. Odesilatel nemusí znát PID příjemce a řešit tak případnou změnu PIDu, pokud se např. proces restartuje.

Registrace se provádí funkcí register/2, zrušení registrace unregister/1, převod jména na PID funkcí whereis/1. Pokud registrovaný proces zanikne, registrace se automaticky zruší. Registrovaným jménem je atom.

1> register(myname, self()).
true
2> myname ! msg1.
msg1
3> myname ! msg2.
msg2
4> flush().
Shell got msg1
Shell got msg2
ok
5> whereis(myname).
<0.32.0>
6> unregister(myname).
true
7> myname ! msg3.
** exception error: bad argument
     in operator  !/2
        called as myname ! msg3

Je vidět, že odeslání zprávy na neexistující registrované jméno skončí chybou.

Příklad – inkrementační proces

V předchozím příkladě echo procesu nezávisely výsledky zasílání zpráv na předchozí činnosti. Proces byl tzv. bezestavový. Často je ale potřeba, aby si proces nějaká data (stavová data) pamatoval a na základě různých událostí (co mu přijdou zprávami) je měnil a uměl vracet aktuální hodnoty. Dále se pak může hodit, aby šlo procesu při startu a inicializaci předat nějaké vstupní parametry, které si bude pamatovat a využívat je při reakcích na zprávy. To si ukážeme v následujícím příkladě, ve kterém půjde o to udržovat hodnotu čítače a zvyšovat ji, pokud na to přijde zpráva. Dále půjde zjistit aktuální hodnotu čítače a nechat jej zresetovat. Při startu půjde zadat maximální hodnotu čítače, po jejímž dosažení se čítač nastaví na nulu.

Tato triviální úloha může být nějakým zárodkem počítání statistiky výskytu událostí, která se pak může protokolem SNMP přenášet do nějakých grafů v dohledovém centru nebo tak něco.

V příkladu se dále bude využívat registrace procesu, takže nebude nutné funkci pro zaslání zprávy dávat v parametrech PID, kde proces běží.

Zdrojový kód modulu counter_proc.erl. Pro zpřehlednění zde nejsou ladící výpisy jako v předchozím případě.

%%
%% priklad - inkrementacni proces
%%
-module(counter_proc).

-export([init/1, start/0, start/1, stop/0, increment/0, reset/0, get_counter/0]).

%%
%% funkce pro vypinani a zapinani procesu
%%

% zapnuti procesu
start() ->
    start (infinity).

start(MaxValue) ->
    spawn (counter_proc, init, [MaxValue]).

% vypnuti procesu
stop() ->
    counter_proc ! stop,
    ok.

%%
%% uzivatelelske API
%%

% zvetseni citace
increment () ->
    counter_proc ! {request, increment},
    ok.

% reset counteru
reset() ->
    counter_proc ! {request, reset},
    ok.

% ziskani hodnoty
get_counter () ->
    counter_proc ! {request, self(), get_counter},
    receive
       {response, Counter} -> {ok, Counter}
    after 1000 -> timeout
    end.

%%
%% interni funkce souvisejici s fungovanim procesu
%%

% inicializacni funkce
init(MaxValue) ->
   register(counter_proc, self()),
   StateData = {0, MaxValue},
   loop(StateData).

% hlavni cyklus
loop (StateData) ->
   receive
       % zjisteni hodnoty pocitadla
       {request, From, get_counter} ->
       {CounterValue, _} = StateData,   % zajima nas jen hodnota pocitadla
           From ! {response, CounterValue},
       loop (StateData);

       % zvyseni hodnoty pocitadla
       {request, increment} ->
       {CounterValue, MaxValue} = StateData,
           NewCounterValue = increment_counter (CounterValue, MaxValue),
       NewStateData = {NewCounterValue, MaxValue},
       loop (NewStateData);

       % reset pocitadla
       {request, reset} ->
       {_ , MaxValue} = StateData,   % zajima nas jen MaxValue
       NewStateData = {0, MaxValue},
       loop (NewStateData);

       % zadost o zastaveni procesu
       stop ->
           terminate(StateData);

       % osetreni neocekavane zpravy ve fronte
       Other ->
           io:format("unexpected message ~w~n", [Other]),
       loop (StateData)
   end.

% ukoncovaci funkce
terminate(_StateData) ->
   ok.

%%
%% lokalni pomocne funkce
%%
increment_counter(Counter, MaxValue) when ((MaxValue == infinity) or (Counter < MaxValue)) ->
    Counter + 1;
increment_counter(_Counter, _MaxValue) -> 0.

Základní myšlenka fungování je stejná jako v předchozím případě, takže budu upozorňovat hlavně na rozdíly.

Funkce start/1 má jeden parametr (maximální hodnota), který je třeba nějak dostat do procesu (funkce start/0 jen zavolá start/1 s defaultní hodnotou). Parametr se předá funkci init/1 při vytváření nového procesu funkcí spawn/3.

V nově vytvořeném procesu funkce init/1 nejprve zaregistruje PID procesu pod jménem counter_proc (bývá zvykem registrovat proces se stejným jménem, jako má příslušný modul). Pak se vytvoří inicializační stavová data, což je n-tice skládající se ze dvou hodnot. Aktuální hodnoty čítače (nula) a maximální hodnoty. S takto vytvořenými stavovými daty se zavolá funkce loop/1.

Funkce loop/1 má na rozdíl od minulého příkladu jeden parametr, a tím jsou stavová data. Při každém rekuzivním zavolání sama sebe se tato stavová data přenáší dále a mohou se do nich zaznamenávat případné změny.

Princip zasílání zpráv a reakce na ně je obdobný jako minule. V první větvi očekává n-tice

{request, From, get_counter}

ze stavových hodnot se vytáhne co je třeba (hodnota čítače) a odešle se.

V dalších dvou větvích se reaguje na zprávy, na které se neodesílá odpověď, ale modifikují se stavová data.

Čtvrtá větev řeší žádost o zastavení procesu a poslední vyzvedává z fronty nečekané zprávy.

Uživatelské funkce na odesílání zpráv (increment/0, reset/0, get_counter/0, stop/0) posílají zprávy na registrované jméno.

Výsledek funguje dle očekávání

1> c(counter_proc).
{ok,counter_proc}
2> counter_proc:start(5).
<0.40.0>
3> counter_proc:get_counter().
{ok,0}
4> counter_proc:increment().
ok
5> counter_proc:increment().
ok
6> counter_proc:increment().
ok
7> counter_proc:increment().
ok
8> counter_proc:increment().
ok
9> counter_proc:get_counter().
{ok,5}
10> counter_proc:increment().
ok
11> counter_proc:get_counter().
{ok,0}
12> counter_proc:stop().
ok

V příkladu je vidět, že některé operace vyžadují odpověď (jsou synchronní), jiné odpověď nevyžadují (jsou asynchronní). Asynchronní je zejména funkce pro zvýšení hodnoty čítače. Dá se očekávat, že tuto funkci by volaly procesy, co pracují nezávisle na sobě s nějakými transakcemi. Pokud v nich nastane potřeba zvýšit čítač, tak jen požádají runtime prostředí, aby nějakému procesu vložil zprávu do fronty. Nemusejí se domlouvat nad nějakým zámkem nad sdílenou pamětí a tím se navzájem ovlivňovat.

UX DAy - tip 2

Vlastní zpracovávání požadavků v procesu je sekvenční. Tj. dokud se nevyřídí jedna zpráva, ostatní čekají ve frontě. Toho lze využít, pokud je naopak třeba, aby se volající procesy nějak synchronizovaly. A to tak, že se použijí synchronní zprávy.

Dále je patrné, že základní kostra kódu v obou příkladech si je nápadně podobná. Jedná se o návrhový vzor procesu typu server. Příště si ukážeme nějaké další návrhové vzory (dva) a řekneme si něco o tom, jak takové procesy psát s využitím knihoven, aby nebylo třeba pokaždé psát obecné funkce na výměnu zpráv znovu.

Kdyby se v Praze konala konference o Erlangu...

Byl pro vás článek přínosný?