Hlavní navigace

Erlang: návrhové vzory procesů

25. 8. 2014
Doba čtení: 13 minut

Sdílet

Programovací jazyk Erlang je určený k vytváření distrubuovaných systémů pro zpracování velkého množství paralelních úloh. Náš seriál vás provede jeho zajímavými vlastnostmi i konkrétními příklady použití. V dnešním díle si ukážeme další typy procesů a způsoby, jak mezi sebou mohou komunikovat.

Proces typu server

minulém díle jsme si ukázali dva příklady procesů, které příjmaly zprávy a dle stavových dat na ně reagovaly. Proces se spustil, proběhla jeho inicializační fáze, jejímž výstupem byla stavová data. Pak se spustila nekonečná smyčka nad frontou zpráv, na které proces reagoval. Reakce mohla být synchronní (na požadavek se po vyřízení zprávy zaslala odpověď) nebo asynchronní (odpověď se odesílateli neposlala). Díky rekurzivnímu volání sebe sama jako poslední příkaz ve funkci loop (tail rekurze) je možné takto udělat nekonečnou smyčku, aniž by rostl stack. Po přerušení čtecí smyčky (na žádost) se zavolala ukončovací funkce (terminate) a proces skončil.

Proces typu server se používá k udržování dat ve stavových proměnných (může se jednat o relativně velké tabulky) a umožňuje tato data číst a měnit. Přičemž požadavky na změnu jsou serializované (vykonávají se jeden po druhém), takže nehrozí vznik nekonzistentních dat z důvodu vykonávání více souběžných operací.

Příklady procesů typu server byly uvedeny v minulém díle.

Proces typu stavový automat (FSM)

Proces typu stavový automat (finite state machine – FSM) je velmi podobný procesu typu server. Udržuje si nějaká stavová data a přijmá zprávy, na které může odpovídat (synchronní volání) nebo odpovídat nemusí (asynchronní volání). Rozdíl je v tom, že kromě stavových dat se udržuje navíc tzv. stav procesu. Tento stav procesu se obvykle zohledňuje při reakcích na zprávy. Stav procesu se může (ale nemusí) po odbavení zprávy změnit.

FSM by šel snadno vyrobit z obecného procesu typu server přidáním jedné hodnoty ke stavovým datům (atom s označením stavu). Pak by ale vyhodnocování přijatých zpráv mohlo být dost rozsáhlé a komplikované. Reakce na zprávy je obvykle v různých stavech jiná. Proto se při implementaci FSM nepoužívá jedna velká recieve rozhodovací struktura, která by zohledňovala stav procesu, ale pro každý stav existuje zpracování zpráv zvlášť.

Příklad – jednoduchým příkladem FSM je úloha typu zámek (lock). Zámek má dva stavy. Může být buď uvolněný nebo uzamčený. Pokud se nějaký proces pokusí zámek získat tak se to buď povede (zámek byl volný) nebo nepovede (zámek držel jiný proces). Zámek může uvolnit jen ten proces, který jej získal.

%%
%% priklad FSM  - jednoduchy zamek
%% file: simple_lock.erl
%%
-module (simple_lock).

-export([start/1, stop/1, take_lock/1, free_lock/1]).
-export([init/1, free/1, locked/1]).

%%
%% O & M funkce
%%
start(LockName) ->
  InitParams = [LockName],
  spawn (simple_lock, init, [InitParams]),
  ok.

stop (LockName) ->
  LockName ! stop.

%%
%% uzivatelske API
%%
take_lock (LockName) ->
  LockName ! {request, take_lock, self()},
  receive
    {response, Result} -> Result
  after 1000 -> timeout
  end.

free_lock (LockName) ->
  LockName ! {request, free_lock, self()},
  receive
    {response, Result} -> Result
  after 1000 -> timeout
  end.

%%
%% interni funkce
%%
init ([LockName]) ->
  register (LockName, self()),
  StateData = undefined,
  free (StateData).

% zpracovani udalosti pokud je zamek volny
free (LockerPid) ->
  receive

    {request, take_lock, From} ->
      From ! {response, ok},
        locked (From);

    {request, free_lock, From} when From /= LockerPid ->
      From ! {response, {error, not_locked}},
      free (LockerPid);

    stop ->
      terminate (LockerPid);

    Other ->
      io:format ("unexpected message ~w~n", [Other]),
      free (LockerPid)
  end.

% zpracovani udalosti pokud je zamek uzamceny
locked (LockerPid) ->
  receive
    {request, free_lock, From} ->
      case From of
        % ten kdo uvolnuje zamek musi byt ten kdo si jej vyzadal
        P when P == LockerPid ->
          From ! {response, ok},
          free (undefined);
        P when P /= LockerPid ->
          From ! {response, {error, not_owner}},
      locked (LockerPid)
      end;

    {request, take_lock, From} ->
      From ! {response, {error, locked}},
      locked (LockerPid);

    stop ->
      terminate (LockerPid);

    Other ->
      io:format ("unexpected message ~w~n", [Other]),
      locked (LockerPid)
  end.

terminate (_) -> ok.

Jak je vidět, jedná se o proces velmi podobný typu server, hlavní rozdíl je v tom, že místo jedné funkce na zpracování zpráv je jich více (pro každý stav jedna). Zámek funguje dle očekávání.

1> c(simple_lock).
{ok,simple_lock}
2> simple_lock:start(lock1).
ok
3> simple_lock:take_lock(lock1).
ok
4> simple_lock:take_lock(lock1).
{error,locked}
5> simple_lock:free_lock(lock1).
ok
6> simple_lock:free_lock(lock1).
{error,not_locked}

Obsluha událostí (event handlery)

Tento typ procesů je určen k příjímání událostí a jejich následné obsluze. Spustí se proces (tzv. event manager), do kterého přicházejí události (dostává zprávy do fronty). Tyto události jsou obsluhovány tzv. event handlery. Event handler není předem dán, může se kdykoliv zaregistrovat (a pak přijímat události) nebo odregistrovat. Event handlerů může být zaregistrováno více, pak se volají jejich obslužné funkce v pořadí, v jakém jsou zaregistrovány.

Z hlediska procesů existuje jen jeden, a tím je event manager. Jedná se o variaci na téma proces typu server. Event manager je obecný kód a netřeba jej kvůli nějaké konkrétní úloze programovat. V event manageru jsou zaregistrovány jednotlivé event handlery a v nich je naprogramována konkrétní činnost při obsluze událostí. Event manager je něco jako server, který přijímá jen asynchronní zprávy (na události se nezasílá odpověď) a reakce na tyto události je dána event handlery, což jsou něco jako „pluginy“, které lze za běhu event manageru libovolně přidávat či odebírat.

Event handler obdobně jako předchozí typy procesů může mít vstupní parametry, které se předají do inicializační funkce (volá se při registraci). Zde se z nich se vytvoří prvotní stavová data. Při obsluze události event handleru jsou stavová data k dispozici a je třeba je pak poslat dále, při tom je pochopitelně možno v nich provést změny. Po odregistrování event handleru se volá ukončovací funkce, kde je k dispozici poslední verze stavových dat. Event handler nemá vlastní proces. Jedná se o modul, který má předepsáno, jaké funkce musí exportovat, a ty pak volá proces event manageru.

Nejjednoduším příkladem je nějaký logovací framework. Spustí se proces pro příjem logovacích zpráv a pokud se neudělá nic dalšího, tak příchozí události nic nezpůsobují. Kdykoliv lze dle potřeby do logování zapojit modul, co začne logovací zprávy vypisovat na obrazovku, do souboru, syslogu a pod.

Může se ale jedna i o něco jiného. Obecně se event handlery hodí na vytváření míst, kam se mohou napojovat jiné části systému, aby mohly reagovat na nějaké události.

Příklad: Systém má různá spojení (např. TCP/IP linka), nad kterými se posílají jistým protokolem data. Kromě zpráv na přenášení dat zde existují mimo jiné i zprávy, že nějaká adresa (v rámci onoho protokolu) co může být o několik „hopů“ dále, přestala být dosažitelná nebo naopak se její dosažitelnost obnovila. Management linky tuto informaci nemá jak zpracovat. Ta je zajímavá pro vyšší vrstvy protokolu (pokud v nich je obsaženo směrování) nebo pro aplikaci nad ním a záleží, jak tyto vyšší vrstvy vypadají. To ale management linky neví a vědět nepotřebuje. Nabízí se vytvořit event manager (ať už jeden pro všechny linky nebo pro každou linku zvlášť), kam se budou tyto události zasílat, a ty části programu, které je potřebují znát, si v event manageru zaregistrují svůj event handler.

Příklad obecného event manageru. Jedná se o rozsáhlejší kus kódu a je na něm ukázáno více věcí.

%%
%% Priklad obecny event manager
%% file: event_mngr.erl
%%
-module(event_mngr).

-export([start/1, start/2, stop/1]).
-export([register_hander/3, unregister_handler/2, event/2, get_handler_data/2]).
-export([init/1]).

%%
%% O & M funkce
%%

% start - jmeno handleru a list handleru, co se maji ihned
% po startu zaregistrovat. Prvky listu jsou n-tice
% [{modul_halnderu, InicilaizacniData}]
start (ProcName, Handlers) ->
    InitParams = [ProcName, Handlers],
    spawn (event_mngr, init, [InitParams]),  % spusteni procesu
    ok.

% start bez handleru - s prazdnym listem
start (ProcName) -> start (ProcName, []).

% stop - jedna se o synchronni volani, vraci se posledni stavova
% data aktualne zaregistrovanych handleru
stop (ProcName) ->
   ProcName ! {stop, self()},
   get_reply ().

%%
%% uzivatelske API
%%
register_hander (ProcName, HandlerMod, InitData) ->
  call (ProcName, {register_hander, HandlerMod, InitData}).

unregister_handler (ProcName, HandlerMod) ->
  call (ProcName, {unregister_handler, HandlerMod}).

event (ProcName, EventData) ->
  call (ProcName, {event, EventData}).

get_handler_data (ProcName, HandlerMod) ->
  call (ProcName, {get_handler_data, HandlerMod}).

%%
%% funkce procesu
%%

% inicilalizace
init ([ProcName, Handlers])  ->
  register (ProcName, self()), % registrace jmena procesu
  StateData = init_handlers(Handlers),
  loop (StateData).

% reakce na udalosti
handle_request ({register_hander, HandlerMod, InitData}, StateData) ->
  {ok, [{HandlerMod, HandlerMod:init(InitData)} | StateData]};

handle_request ({unregister_handler, HandlerMod}, StateData) ->
  case lists:keysearch(HandlerMod, 1, StateData) of
    {value, {HandlerMod, HanderState}} ->
      TermState = HandlerMod:terminate(HanderState),
      Reply = {ok, TermState},
      NewStateData = lists:keydelete(HandlerMod, 1, StateData),
      {Reply, NewStateData};
    false -> {{error, no_such_handler, StateData}}
  end;

handle_request ({event, EventData}, StateData) ->
  NewStateData = handle_event_handlers (EventData, StateData),
  {ok, NewStateData};

handle_request ({get_handler_data, HandlerMod}, StateData) ->
  case lists:keysearch(HandlerMod, 1, StateData) of
    {value, {HandlerMod, HanderState}} ->
      Reply = {ok, HanderState},
      {Reply, StateData};
    false -> {{error, no_such_handler, StateData}}
  end.

%%
%% lokalni pomocne funkce
%%

% funkce souvisejici s handlery udalosti

% pro kazdy handelr v listu se zavola jeho funkce init
init_handlers ([]) -> [];
init_handlers ([{HandlerMod, InitData} | Tail]) ->
   InitState = HandlerMod:init(InitData),
   [{HandlerMod, InitState} | init_handlers (Tail)].

% pro kazdy handler v listu se zavola jeho funkce terminate
terminate_handlers([]) -> [];
terminate_handlers ([{HandlerMod, HanderState} | Tail]) ->
   TermState = HandlerMod:terminate(HanderState),
   [{HandlerMod, TermState} | terminate_handlers (Tail)].

% pro kazdy handler v listu se zavola funkce handle_event
handle_event_handlers(_, []) -> [];
handle_event_handlers (EventData, [{HandlerMod, HanderState} | Tail]) ->
   NewHanderState = HandlerMod:handle_event(EventData, HanderState),
   [{HandlerMod, NewHanderState} | handle_event_handlers (EventData, Tail)].


% meziprocesovy zpravickovy protokol
call (ProcName, Data) ->
  ProcName ! {request, self(), Data},
  get_reply ().

send_reply (To, Data) ->
  To ! {reply, Data}.

get_reply () -> get_reply (1000).
get_reply (TimeOut) ->
  receive {reply, Reply} -> Reply
  after TimeOut -> timeout end.

% cteci cyklus na zpravy
loop (StateData) ->
  receive
    {request, From, RequestData} ->
      {ReplyData, NewStateData} = handle_request (RequestData, StateData),
      send_reply (From, ReplyData),
      loop (NewStateData);
    {stop, From} ->
      StopData = terminate_handlers (StateData),
      send_reply (From, StopData);
    Other ->
      io:format ("unexpected message ~w~n", [Other]),
      loop (StateData)
  end.

Funkce start/2stop/1 fungují obdobně jako v předchozích příkladech. Výsledný proces se spustí pod zadaným registrovaným jménem (parametr ProcName) a na toto jméno je pak třeba zasílat zprávy. Je to tak proto, že se jedná o obecný kód, ale dá se předpokládat, že tento proces bude spuštěn ve více instancích.

Další rozdíl je ve čtecím cyklu na příchozí zprávy loop/1. Ten je nyní udělán obecně, stejně tak protokol pro výměnu zpráv je ukryt v obecných funkcích call/1, get_reply/1send_reply/1.

Aplikační logika procesu je v inicializační funkci init/1, funkci na zpracování příchozích požadavků handle_request/2 a v ukončovací funkci terminate/1. Vše ostatní co se týče provozu procesu typu server a protokolu na výměnu zpráv by mohlo být přemístěno do nějakého obecného modulu.

Stavovými daty je list n-tic ve tvaru {HandlerModule, HanderState}. Jelikož není dopředu známé, jaké moduly s handlery se budou používat, je třeba umět zavolat funkci z modulu, jehož jméno je uloženo ve proměnné. Od minule víme, že se to dá zařídit funkcí apply/3. Ale dá se to udělat rovnou tak, že místo modulu ve volání funkce se použije proměnná obsahující atom s názvem modulu.

1> io:format("Hello world~n").
Hello world
ok
2> ModuleName = io.
io
3> ModuleName:format("Hello world~n").
Hello world

Nahradit jméno funkce proměnnou obsahující atom stejného jména funguje také. Pokud volaný modul neexistuje, nebo existuje, ale neobsahuje volanou funkci, tak volání skončí s chybou (výjimka) jako každé jiné volání neexistující funkce.

Použití tohoto postupu je vidět ve funkcích init_handlers/1, terminate_handlers/1handle_event_handlers/1. V těchto funkcích se provádí tzv. mapování. Vezme se list a nad každým jeho prvkem se provede nějaká operace. V tomto případě se z prvku listu vezme jméno modulu a hodnota a zavolá se funkce z modulu s touto hodnotou. Výsledkem je opět n-tice se jménem modulu a novou hodnotou, která vznikla zavoláním funkce.

Uživatelské funkce register_hander/1, unregister_hander/1, get_handler_data/1event/1 zasílají zprávu serveru a čekají na odpověď. Ta se vyřizuje v příslušné variantě funkce handle_request/1. Přidání nového handleru rozšiřuje stavová data, pro mazání a vyhledávání v listu s n-ticemi se používají funkce s modulu lists. Tento systémový modul obsahuje řadu užitečných funkcí na manipulaci s listy a projít si jeho zdrojový kód je poučné čtení na téma rekurze a listy.

Pokud se event manager zkompiluje, spustí a pošlou se do něj data:

1> c(event_mngr).
{ok,event_mngr}
2> event_mngr:start(m1).
ok
3> event_mngr:event(m1, data7).
ok

Nestane se nic viditelného (ani neviditelného), neboť není zaregistrovaný žádný handler.

Handlerem může být modul, který exportuje funkce init/1 terminate_handlers/1handle_event/2

Příklad modulu, který příchozí události vypisuje na výstup, počítá zprávy a přidává číslo aktuálního výpisu do textu.

%%
%% priklad event handleru
%% pocita kolikrat udalosti prisly a vypisuje je na vystup
%% file: handler1.erl
%%

-module (handler1).

-export([init/1, handle_event/2, terminate/1]).

% vstupem je Id handleru, ktere se pripisuje do vystupniho textu
init(Id) -> {Id, 0}.

% zpracovani udalosti - prectese Id handleru, zvysi se citac
% a vypise logovaci text
handle_event (EventData, {Id, N}) ->
  NewN = N + 1,
  io:format ("[~w] #~w: ~w~n", [Id, NewN, EventData]),
  {Id, NewN}.

% po ukonceni se vraci Id a pocet udalosti shodou okolnosti
% ve stejnem tvaru v jakem je ulozen ve stavovych datech
terminate ({Id, N}) -> {Id, N}.

Handler funguje dle očekávání.

5> c(handler1).
{ok,handler1}
6> event_mngr:register_hander(m1, handler1, id1).
ok
7> event_mngr:event(m1, data7).
[id1] #1: data7
ok
8> event_mngr:event(m1, {a, b, c, d}).
[id1] #2: {a,b,c,d}
ok
9> event_mngr:register_hander(m1, handler1, id2).
ok
10> event_mngr:event(m1, {a, b, c, d}).
[id2] #1: {a,b,c,d}
[id1] #3: {a,b,c,d}
ok
11> event_mngr:unregister_handler(m1, handler1).
{ok,{id2,1}}

Reference

S výměnou zpráv mezi procesy souvisí datový typ reference. Je datový typ určený k uchovávání automaticky generovaných unikátních hodnot v rámci runtime prostředí (nejedná se o global id). Tyto hodnoty lze pouze mezi sebou porovnávat. Reference se vytvoří funkcí make_ref/0.

1> Ref1 = make_ref().
#Ref<0.0.0.26>
2> Ref2 = make_ref().
#Ref<0.0.0.31>
3> Ref3 = Ref1.
#Ref<0.0.0.26>
4> Ref1 == Ref2.
false
5> Ref1 == foo.
false
6> Ref1 == Ref3.
true

K čemu to může být dobré? Mohou se přidávat do protokolu pro zasílání synchronních zpráv. Klient přidá do požadavku referenci a server ji zkopíruje do odpovědi. Klient si ji pak v při čtení odpovědi zkontroluje. Jako pojistka proti chybě, kdy by při nějaké komplikované komunikaci více procesů mohlo dojít k záměnám zpráv. Nebo pokud je protokol postavený tak, že jeden proces odešle více požadavků, na které pak očekává (asynchronně) odpovědi. Pomocí reference pak lze spojit odeslané požadavky s došlými odpověďmi.

Jak uspořádat procesy

Obecný návod, jak uspořádat procesy v systému, není. Záleží na konkrétní úloze. V Erlangu platí několik doporučení. Vytvořit proces je laciné, takže jakmile se v systému, který modelujete, vyskytne nová úloha, bývá dobré pro ni vytvořit proces. Ale nesmí se to přehánět (uměle rozdvojit nějakou relativně krátkou úlohu a doufat, že se spočítá dřív). Velkou chybou je vytvářet procesy pro jednotlivé funkce. Např. parser příchozí zprávy udělat tak, že se někam zašle vstup a počká se na zprávu s výstupem. Nebo navrhnout vícevrstvový protokol tak, že pro jednotlivé vrstvy běží proces a mezi sebou komunikují zprávami. To vede k tomu, že vznikají jednotlivé procesy, přes které pak běží veškerý provoz. Vzhledem k tomu, že proces pracuje sekvenčně, mohou tak vznikat úzká hrdla, kdy se někde plní vstupní fronta rychleji, než se stačí vyprazdňovat.

ict ve školství 24

Pokud to jde, je třeba vytvářet procesy pro jednotlivé úlohy (transakce). Příklad. Ze socketu vypadnou data a spustí se proces. V něm se data rozparsují, projdou do aplikace, odpověď se zakóduje do protokolu a zapíše do socketu. To vše v jednom procesu, který volá synchronní zprávy jen na místech nezbytně nutných (kde je třeba serializovat požadavky na změny nějakých společných dat, co přicházejí od jednotlivých transakcí).

Příště si ukážeme jak se v Erlangu šíří chyby a jak je zachytávat nebo se o nich jinde dozvědět. Dále jak vytvářet procesy podle dnes popsaných návrhových vzorů s pomocí knihovny OTP.

Autor článku