Obsah
1. Programovací paradigma založené na tocích (dat)
3. Buffery s pevnou kapacitou a neomezené fronty
6. Jednoduchá síť s jediným uzlem
7. Úplný zdrojový kód prvního demonstračního příkladu
8. Nekorektní přímé rozvětvení v síti
9. Přidání procesu se vstupním a současně i výstupním portem
10. Úplný zdrojový kód třetího demonstračního příkladu
11. Proces se dvěma výstupními porty
12. Úplný zdrojový kód čtvrtého demonstračního příkladu
14. Využití bufferů pro komunikaci mezi procesy
15. Úplný zdrojový kód šestého demonstračního příkladu
16. Uzel typu Splitter a Merger
17. Úplný zdrojový kód sedmého demonstračního příkladu
19. Repositář s demonstračními příklady
1. Programovací paradigma založené na tocích (dat)
„τὰ πάντα ῥεῖ καὶ οὐδὲν μένει: everything flows, and nothing stays“
Hérakleitos
V průběhu více než sedmdesáti let vývoje informatiky bylo vytvořeno relativně velké množství paradigmat programování. S mnoha paradigmaty jsme se ostatně již mnohokrát setkali i na stránkách Roota. Připomeňme si například články o nestrukturovaném programování/paradigmatu (BASIC), strukturovaném programování (Algol a Pascal), objektově orientovaném paradigmatu (Smalltalk), funkcionálním paradigmatu (Clojure) či o paradigmatu založeném primárně na zpracování polí (APL, J).
Prozatím jsme se však příliš nezabývali programováním/paradigmatem založeným na tocích (FBP – Flow-based programming). Jedná se o paradigma, v němž se aplikace definují jako sítě procesů – „černé skříňky“ –, které si posílají data předáváním zpráv předem definovanými cestami (tedy nikoli „náhodně“ či libovolně, jako je tomu v klasickém OOP). Tyto procesy neboli černé skříňky lze prakticky libovolně propojovat a vytvářet tak různé aplikace, aniž by bylo nutné je (tedy ony procesy) interně měnit. FBP je tedy paradigma, které zcela přirozeně vede k tvorbě více či méně samostatně pracujících komponent.
A právě paradigmatem založeným na tocích dat se budeme zabývat v dnešním článku. Praktické příklady budou vytvořeny v jazyce Go a používat budou knihovnu nazvanou přímočaře goflow.
2. Základní koncepty FBP
„The real world is asynchronous: don't try to force everything into a synchronous framework derived from the architecture of early computers.“
J. Paul Morrison, inventor/discoverer of Flow-Based Programming
V FBP se upouští od klasického (řekněme von Neumannovského) pohledu na aplikace jako na sekvenční proces. Namísto toho se aplikace modelují jako síť asynchronně běžících procesů, které si předávají data přes buffery (resp. přes fronty). Každý proces v FBP provádí činnost ve chvíli, kdy dostává vstupní data a díky oddělení procesů a jejich nezávislosti na ostatních procesech vlastně zcela zadarmo získáme plnou podporu pro paralelní běh aplikace jako celku.
Základními stavebními prvky při návrhu aplikace při použití paradigmatu FBP jsou komponenty. Interně se typicky jedná o třídy, struktury+metody (Go), uzávěry (closure) či někdy o obyčejné funkce. Instance komponent umístěné do grafu/sítě se nazývají procesy. Z jedné komponenty lze pochopitelně odvodit větší množství procesů.
Jednotlivé procesy spolu nekomunikují přímo (voláním), ale příjmem dat na vstupních portech popř. posíláním dat na porty výstupní. Jednotlivá spojení (connection) mohou obsahovat buffery resp. přesněji řečeno fronty (FIFO, queue), typicky s předem nastavenou kapacitou (tedy maximálním počtem v dané chvíli čekajících zpráv). Některá propojení mohou mít kapacitu nastavenou na nulu; zde na sebe budou procesy čekat a navzájem se tak do jisté míry synchronizovat.
Porty jsou pojmenované popř. je možné pracovat s polem portů. Důležitým konceptem FBP je právě oddělení procesů od jejich propojení – proces typicky neví, odkud získává a kam posílá data, protože proces je „jen“ uzlem v síti. Teprve konfigurací sítě se zvolí potřebná propojení.
3. Buffery s pevnou kapacitou a neomezené fronty
Víme již, že jednotlivé procesy jsou při využití FBP propojeny přes porty. Porty jsou pojmenovány a při deklaraci sítě propojující jednotlivé procesy přes porty se používají právě jména portů, což ostatně uvidíme dále na demonstračních příkladech:
// propojení dvou uzlů n.Connect("converter", "Out", "printer", "Message")
Posílání zpráv je v tomto případě „potvrzováno“ – přijímací proces musí data skutečně přečíst. Dokud k přečtení nedojde, bude první (posílající) proces pozastaven. To však ve skutečnosti není asynchronní chování. Aby se zajistila asynchronicita (a tím pádem nepřímo i lepší využití dostupných prostředků – CPU atd.), je nutné, aby mezi oba komunikační porty byl vložen buffer nebo fronta. Typicky se setkáme s bufferem s omezenou kapacitou. Interně se může jednat například o cyklickou frontu (circular buffer) realizovanou buď s využitím pole (kapacita se nemění) nebo jednosměrně vázaného cyklického seznamu (kapacitu lze zvyšovat na základě dynamických požadavků aplikace):
// propojení dvou uzlů n.ConnectBuf("converter", "Out1", "printer1", "Message", BufferCapacity)
4. Vizuální návrh sítě
Pro návrh sítí procesů a portů, které tvoří součást FBP, se typicky používají nástroje určené pro vizuální tvorbu a následnou úpravu sítí. Těchto nástrojů dnes existuje minimálně několik desítek a některé z nich dokáží síť nejenom navrhnout, ale i exportovat do podoby zdrojového kódu (typicky v Javě). Příkladem velmi jednoduchého editoru FBP je nástroj nazvaný DrawFBP, jehož zdrojové kódy (v Javě) i spustitelný Java archiv (JAR) jsou dostupné na adrese https://github.com/jpaulm/drawfbp. Tento nástroj slouží k návrhu sítě a popř. i k vygenerování zdrojových kódů. Jeho ovládání je sice místy poněkud podivné (zejména se to týká editace hran), ovšem stále se jedná o užitečnou aplikaci, k jejímuž běhu je vyžadována pouze instalace JRE (běhového prostředí Javy).

Obrázek 1: Minimalisticky pojaté prostředí programu DrawFBP.

Obrázek 2: Dialog pro editaci informací o právě vloženém procesu.

Obrázek 3: Součástí projektu DrawFBP je i nápověda.

Obrázek 4: Konfigurační dialog projektu DrawFBP.
Jednou z předností DrawFBP je fakt, že dokáže vygenerovat XML s popisem sítě, které lze dále zpracovat libovolnou knihovnou určenou pro načítání a manipulaci s XML:
<?xml version="1.0"?> <drawfbp_file xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="https://github.com/jpaulm/drawfbp/blob/master/lib/drawfbp_file.xsd"> <net> <desc>Click anywhere on selection area</desc> <blocks> <block> <x> 520 </x> <y> 320 </y> <id> 2 </id> <type>B</type> <width>96</width> <height>64</height> <description>Splitter</description> <multiplex>false</multiplex> <invisible>false</invisible> <issubnet>false</issubnet> </block> <block> <x> 336 </x> <y> 320 </y> <id> 4 </id> <type>C</type> <width>36</width> <height>28</height> <description>Message</description> <substreamsensitive>false</substreamsensitive> <multiplex>false</multiplex> <invisible>false</invisible> <issubnet>false</issubnet> </block> <block> <x> 712 </x> <y> 360 </y> <id> 7 </id> <type>B</type> <width>96</width> <height>64</height> <description>Converter 2</description> <multiplex>false</multiplex> <invisible>false</invisible> <issubnet>false</issubnet> </block> <block> <x> 712 </x> <y> 264 </y> <id> 9 </id> <type>B</type> <width>96</width> <height>64</height> <description>Converter 1</description> <multiplex>false</multiplex> <invisible>false</invisible> <issubnet>false</issubnet> </block> <block> <x> 880 </x> <y> 264 </y> <id> 11 </id> <type>B</type> <width>96</width> <height>64</height> <description>Printer 1</description> <multiplex>false</multiplex> <invisible>false</invisible> <issubnet>false</issubnet> </block> <block> <x> 880 </x> <y> 360 </y> <id> 13 </id> <type>B</type> <width>96</width> <height>64</height> <description>Printer 2</description> <multiplex>false</multiplex> <invisible>false</invisible> <issubnet>false</issubnet> </block> </blocks> <connections> <connection> <fromx>354</fromx> <fromy>320</fromy> <tox>472</tox> <toy>320</toy> <fromid>4</fromid> <toid>2</toid> <id>2</id> <endsatline>false</endsatline> <upstreamport>OUT</upstreamport> <downstreamport>IN</downstreamport> <capacity>100</capacity> <segno>0</segno> </connection> <connection> <fromx>568</fromx> <fromy>304</fromy> <tox>664</tox> <toy>264</toy> <fromid>2</fromid> <toid>9</toid> <id>11</id> <endsatline>false</endsatline> <upstreamport>OUT1</upstreamport> <downstreamport>IN</downstreamport> <capacity>100</capacity> <segno>0</segno> </connection> <connection> <fromx>568</fromx> <fromy>336</fromy> <tox>664</tox> <toy>360</toy> <fromid>2</fromid> <toid>7</toid> <id>12</id> <endsatline>false</endsatline> <upstreamport>OUT2</upstreamport> <downstreamport>IN</downstreamport> <capacity>100</capacity> <segno>0</segno> </connection> <connection> <fromx>760</fromx> <fromy>264</fromy> <tox>832</tox> <toy>264</toy> <fromid>9</fromid> <toid>11</toid> <id>13</id> <endsatline>false</endsatline> <upstreamport>OUT</upstreamport> <downstreamport>IN</downstreamport> <segno>0</segno> </connection> <connection> <fromx>760</fromx> <fromy>360</fromy> <tox>832</tox> <toy>360</toy> <fromid>7</fromid> <toid>13</toid> <id>14</id> <endsatline>false</endsatline> <upstreamport>OUT</upstreamport> <downstreamport>IN</downstreamport> <segno>0</segno> </connection> </connections> </net> </drawfbp_file>
5. Instalace knihovny goflow
Ve druhé části dnešního článku si ukážeme základní způsoby použití knihovny nazvané goflow. Jedná se o knihovnu, která umožňuje využít koncepty FBP v programovacím jazyce Go. Před použitím této knihovny je pochopitelně nutné ji nainstalovat. Vytvoříme tedy prázdný projekt, a následně rámci jeho adresáře provedeme instalaci zmíněné knihovny.
Vytvoření nového projektu se provede následovně:
$ go mod init flow1 go: creating new go.mod: module flow1
Poté již postačuje přejít do adresáře s nově vytvořeným projektem a nainstalovat knihovnu goflow příkazem:
$ go get -v -v github.com/trustmaster/goflow go: downloading github.com/trustmaster/goflow v0.0.0-20210928125717-b7d4fd465ab2 go: added github.com/trustmaster/goflow v0.0.0-20210928125717-b7d4fd465ab2
Od této chvíle bude možné ve všech projektech (psaných v Go) použít tento import:
import ( "github.com/trustmaster/goflow" )
6. Jednoduchá síť s jediným uzlem
Pokusme se nyní v programovém kódu (napsaném v Go) vytvořit jednoduchou síť, která bude obsahovat jeden uzel se vstupním portem. Síť by měla vypadat následovně:

Obrázek 5: Schéma sítě.
Nejprve nadefinujeme datovou strukturu představující uzel (proces). Ten má přijímat zprávy, k čemuž se v knihovně goflow používají kanály. Kanál pojmenujeme například Message:
// Jediný typ uzlu v síti type Printer struct { // vstupní port Message <-chan string }
Dále je nutné nadefinovat metodu Process (bez parametrů – je volána automaticky). V našem jednoduchém příkladu pouze vypíšeme zprávy přečtené z kanálu:
// Zpracování vstupních dat posílaných přes kanál func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) } }
Nejdůležitější částí aplikace je definice samotné sítě. Nejprve vytvoříme kostru prázdné sítě, poté do ní přidáme uzel (a pojmenujeme ho) a nakonec je nutné nadefinovat vstupní port do uzlu (opět pojmenovaný a explicitně navázaný na jméno kanálu):
// konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("printer", new(Printer)) // propojení uzlu s procesem n.MapInPort("Input", "printer", "Message")
Následně již můžeme vytvořit kanál pro vstupní zprávy posílané do sítě a napojit ho na port pojmenovaný „Input“ v předchozím příkazu:
// konstrukce kanálu channel := make(chan string) net.SetInPort("Input", channel)
Spuštění celé sítě:
// spuštění sítě // (kanál bude použit pro čekání na její ukončení) wait := goflow.Run(net)
V této chvíli všechny procesy (v našem případě jediný proces) čekají na data. Ta do sítě pošleme přes právě vytvořený kanál:
// poslání několika zpráv do sítě channel <- "Foo" channel <- "Bar" channel <- "Baz"
Nakonec je nutné činnost procesů ukončit a počkat na skutečné ukončení zpracování dat:
// žádost o ukončení činnosti sítě close(channel) // čekání na ukončení činnosti sítě <-wait
7. Úplný zdrojový kód prvního demonstračního příkladu
Úplný zdrojový kód dnešního prvního demonstračního příkladu (resp. přesněji řečeno celého projektu) lze nalézt na adrese https://github.com/tisnik/go-root/blob/master/article90/flow1/. Samotný program vypadá následovně:
package main import ( "fmt" "github.com/trustmaster/goflow" ) // Jediný typ uzlu v síti type Printer struct { // vstupní port Message <-chan string } // Zpracování vstupních dat posílaných přes kanál func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) } } // Definice grafu sítě s aplikací func NewFlowApp() *goflow.Graph { // konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("printer", new(Printer)) // propojení uzlu s procesem n.MapInPort("Input", "printer", "Message") // výsledný graf return n } func main() { // vytvoření sítě net := NewFlowApp() // konstrukce kanálu channel := make(chan string) net.SetInPort("Input", channel) // spuštění sítě // (kanál bude použit pro čekání na její ukončení) wait := goflow.Run(net) // poslání několika zpráv do sítě channel <- "Foo" channel <- "Bar" channel <- "Baz" // žádost o ukončení činnosti sítě close(channel) // čekání na ukončení činnosti sítě <-wait }
8. Nekorektní přímé rozvětvení v síti
Pokusme se nyní programově nadefinovat následující síť, v níž je jeden vstup posílán do dvou procesů:

Obrázek 6: Přímé rozvětvení v síti.
Teoreticky by to mělo být snadné:
// přidání uzlu s procesem do grafu n.Add("printer", new(Printer)) // přidání uzlu s procesem do grafu n.Add("printer2", new(Printer)) // propojení uzlu s procesem n.MapInPort("Input", "printer", "Message") // propojení uzlu s procesem n.MapInPort("Input", "printer2", "Message")
Můžeme se pokusit tuto část definice sítě zakomponovat do ucelené aplikace:
package main import ( "fmt" "github.com/trustmaster/goflow" ) // Jediný typ uzlu v síti type Printer struct { // vstupní port Message <-chan string } // Zpracování vstupních dat posílaných přes kanál func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) } } // Definice grafu sítě s aplikací func NewFlowApp() *goflow.Graph { // konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("printer", new(Printer)) // přidání uzlu s procesem do grafu n.Add("printer2", new(Printer)) // propojení uzlu s procesem n.MapInPort("Input", "printer", "Message") // propojení uzlu s procesem n.MapInPort("Input", "printer2", "Message") // výsledný graf return n } func main() { // vytvoření sítě net := NewFlowApp() // konstrukce kanálu channel := make(chan string) net.SetInPort("Input", channel) // spuštění sítě // (kanál bude použit pro čekání na její ukončení) wait := goflow.Run(net) // poslání několika zpráv do sítě channel <- "Foo" channel <- "Bar" channel <- "Baz" // žádost o ukončení činnosti sítě close(channel) // čekání na ukončení činnosti sítě <-wait }
Ovšem při ukončování aplikace dojde k chybě při pokusu o opakované uzavření kanálů sloužících pro synchronizaci mezi gorutinami:
$ go run flow2.go Foo Bar Baz fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() /home/ptisnovs/src/go/flow2/flow2.go:63 +0xbd goroutine 6 [semacquire]: sync.runtime_Semacquire(0xc00007c240?) /opt/go/src/runtime/sema.go:56 +0x25 sync.(*WaitGroup).Wait(0xc00004a740?) /opt/go/src/sync/waitgroup.go:136 +0x52 github.com/trustmaster/goflow.(*Graph).Process(0xc00008a000) /home/ptisnovs/go/pkg/mod/github.com/trustmaster/goflow@v0.0.0-20210928125717-b7d4fd465ab2/graph.go:181 +0x19b github.com/trustmaster/goflow.Run.func1() /home/ptisnovs/go/pkg/mod/github.com/trustmaster/goflow@v0.0.0-20210928125717-b7d4fd465ab2/component.go:19 +0x2b created by github.com/trustmaster/goflow.Run /home/ptisnovs/go/pkg/mod/github.com/trustmaster/goflow@v0.0.0-20210928125717-b7d4fd465ab2/component.go:18 +0x98 goroutine 7 [chan receive (nil chan)]: main.(*Printer).Process(0x0?) /home/ptisnovs/src/go/flow2/flow2.go:16 +0x85 github.com/trustmaster/goflow.Run.func1() /home/ptisnovs/go/pkg/mod/github.com/trustmaster/goflow@v0.0.0-20210928125717-b7d4fd465ab2/component.go:19 +0x2b created by github.com/trustmaster/goflow.Run /home/ptisnovs/go/pkg/mod/github.com/trustmaster/goflow@v0.0.0-20210928125717-b7d4fd465ab2/component.go:18 +0x98 goroutine 8 [chan receive]: github.com/trustmaster/goflow.(*Graph).Process.func1() /home/ptisnovs/go/pkg/mod/github.com/trustmaster/goflow@v0.0.0-20210928125717-b7d4fd465ab2/graph.go:175 +0x3a created by github.com/trustmaster/goflow.(*Graph).Process /home/ptisnovs/go/pkg/mod/github.com/trustmaster/goflow@v0.0.0-20210928125717-b7d4fd465ab2/graph.go:174 +0x185 exit status 2
9. Přidání procesu se vstupním a současně i výstupním portem
Nyní celou síť upravíme takovým způsobem, že zprávy budou nejprve upraveny v dalším uzlu (procesu) nazvaném Converter. Ten zprávy převede na velká písmena:

Obrázek 7: Proces se vstupním a současně i výstupním portem.
Jedná se o proces s jedním vstupním a jedním výstupním kanálem, čemuž odpovídá i deklarace datového typu (struktury) představujícího tento proces:
// Druhý typ uzlu v síti type Converter struct { // vstupní port In <-chan string // výstupní port Out chan<- string }
Samotná funkce nového procesu je jednoduchá – každá zpráva načtená ze vstupního kanálu je převedena na velká písmena a poslána do kanálu výstupního:
// Zpracování vstupních dat posílaných přes kanál // a poslání výsledku na výstupní kanál func (c *Converter) Process() { for message := range c.In { converted := strings.ToUpper(message) c.Out <- converted } }
Nyní je nutné vytvořit graf a přidat do něj dva uzly – jeden typu Converter a druhý typu Printer:
// konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("converter", new(Converter)) // přidání uzlu s procesem do grafu n.Add("printer", new(Printer))
Následuje propojení obou uzlů, konkrétně kanálů Out uzlu converter s kanálem Message uzlu printer (to je v tomto příkladu novinka):
// propojení dvou uzlů n.Connect("converter", "Out", "printer", "Message")
Definice vstupu do sítě již známe:
// propojení uzlu s procesem n.MapInPort("Input", "converter", "In")
Po překladu a spuštění získáme tuto trojici zpráv zobrazených na standardním výstupu, což ukazuje, že tok dat opravdu odpovídá nakonfigurované síti:
FOO BAR BAZ
10. Úplný zdrojový kód třetího demonstračního příkladu
Úplný zdrojový kód dnešního třetího demonstračního příkladu (resp. přesněji řečeno celého projektu) lze nalézt na adrese https://github.com/tisnik/go-root/blob/master/article90/flow3/. Samotný program vypadá následovně:
package main import ( "fmt" "github.com/trustmaster/goflow" "strings" ) // První typ uzlu v síti type Printer struct { Message <-chan string } // Zpracování vstupních dat posílaných přes kanál func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) } } // Druhý typ uzlu v síti type Converter struct { // vstupní port In <-chan string // výstupní port Out chan<- string } // Zpracování vstupních dat posílaných přes kanál // a poslání výsledku na výstupní kanál func (c *Converter) Process() { for message := range c.In { converted := strings.ToUpper(message) c.Out <- converted } } // Definice grafu sítě s aplikací func NewFlowApp() *goflow.Graph { // konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("converter", new(Converter)) // přidání uzlu s procesem do grafu n.Add("printer", new(Printer)) // propojení dvou uzlů n.Connect("converter", "Out", "printer", "Message") // propojení uzlu s procesem n.MapInPort("Input", "converter", "In") // výsledný graf return n } func main() { // vytvoření sítě net := NewFlowApp() // konstrukce kanálu channel := make(chan string) net.SetInPort("Input", channel) // spuštění sítě // (kanál bude použit pro čekání na její ukončení) wait := goflow.Run(net) // poslání několika zpráv do sítě channel <- "Foo" channel <- "Bar" channel <- "Baz" // žádost o ukončení činnosti sítě close(channel) // čekání na ukončení činnosti sítě <-wait }
11. Proces se dvěma výstupními porty
Počet vstupních nebo výstupních portů u každého procesu není prakticky žádným způsobem omezen, takže si v rámci této kapitoly ukážeme, jak lze nadefinovat proces s dvojicí výstupních portů pojmenovaných jednoduše Out1 a Out2:

Obrázek 8: Proces se dvěma výstupními porty.
Vidíme, že bude nutné upravit struktur Converter tak, aby obsahovala dva výstupní kanály:
type Converter struct { // vstupní port In <-chan string // první výstupní port Out1 chan<- string // druhý výstupní port Out2 chan<- string }
Upravíme i chování procesu Converter, například tak, že bude vytvářet dvě zprávy (každou s jiným obsahem) a pošle je do obou výstupních kanálů (portů). Ve skutečnosti však záleží pouze na procesu, jaká data a kam bude posílat. Například může rozhodovat o tom, zda je vstupní zpráva relevantní pro externí zákazníky či pro interní administrátory a podle toho se rozhodnout, kam výsledek poslat atd.:
func (c *Converter) Process() { for message := range c.In { converted := "to upper case: " + strings.ToUpper(message) c.Out1 <- converted converted = "to lower case: " + strings.ToLower(message) c.Out2 <- converted } }
Definice sítě s procesy a jejich propojeními bude nepatrně složitější, než je tomu v předchozích příkladech. Síť bude nyní obsahovat trojici uzlů:
// konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("converter", new(Converter)) // přidání uzlu s procesem do grafu n.Add("printer1", new(Printer)) // přidání uzlu s procesem do grafu n.Add("printer2", new(Printer))
Uzel Converter bude přes svoji dvojici výstupních portů propojen se dvěma uzly Printer:
// propojení dvou uzlů n.Connect("converter", "Out1", "printer1", "Message") // propojení dvou uzlů n.Connect("converter", "Out2", "printer2", "Message") // propojení uzlu s procesem n.MapInPort("Input", "converter", "In")
Chování příkladu po spuštění celé sítě, kdy se budou jednotlivé procesy Printer střídat v tisku zpráv, které dostávají:
to upper case: FOO to upper case: BAR to lower case: foo to lower case: bar to lower case: baz to upper case: BAZ
12. Úplný zdrojový kód čtvrtého demonstračního příkladu
Úplný zdrojový kód dnešního čtvrtého demonstračního příkladu (resp. přesněji řečeno celého projektu) lze nalézt na adrese https://github.com/tisnik/go-root/blob/master/article90/flow4/. Samotný program vypadá následovně:
package main import ( "fmt" "github.com/trustmaster/goflow" "strings" ) // První typ uzlu v síti type Printer struct { Message <-chan string } // Zpracování vstupních dat posílaných přes kanál func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) } } // Druhý typ uzlu v síti type Converter struct { // vstupní port In <-chan string // první výstupní port Out1 chan<- string // druhý výstupní port Out2 chan<- string } // Zpracování vstupních dat posílaných přes kanál // a poslání výsledku na výstupní kanály func (c *Converter) Process() { for message := range c.In { converted := "to upper case: " + strings.ToUpper(message) c.Out1 <- converted converted = "to lower case: " + strings.ToLower(message) c.Out2 <- converted } } // Definice grafu sítě s aplikací func NewFlowApp() *goflow.Graph { // konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("converter", new(Converter)) // přidání uzlu s procesem do grafu n.Add("printer1", new(Printer)) // přidání uzlu s procesem do grafu n.Add("printer2", new(Printer)) // propojení dvou uzlů n.Connect("converter", "Out1", "printer1", "Message") // propojení dvou uzlů n.Connect("converter", "Out2", "printer2", "Message") // propojení uzlu s procesem n.MapInPort("Input", "converter", "In") // výsledný graf return n } func main() { // vytvoření sítě net := NewFlowApp() // konstrukce kanálu channel := make(chan string) net.SetInPort("Input", channel) // spuštění sítě // (kanál bude použit pro čekání na její ukončení) wait := goflow.Run(net) // poslání několika zpráv do sítě channel <- "Foo" channel <- "Bar" channel <- "Baz" // žádost o ukončení činnosti sítě close(channel) // čekání na ukončení činnosti sítě <-wait }
13. Déletrvající výpočty
Důležité je taktéž zjištění, jak se bude celá síť chovat v případě, že nějaký proces bude obsahovat déletrvající výpočty (popř. komunikaci s databází atd.). Takový proces můžeme simulovat snadno, a to zavoláním funkce time.Sleep. Upravíme tedy komponentu Printer, a to tak, že pokud bude zpráva začínat malým písmenem, bude simulována akce trvající jednu sekundu. V opačném případě skončí zpracování zprávy prakticky okamžitě:
func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) if unicode.IsLower([]rune(message)[0]) { time.Sleep(1 * time.Second) } } }
Z výstupu získaného po spuštění celé sítě je patrné, že se oba procesy typu Printer (tedy Printer1 a Printer2) ve výstupu střídají. To může vypadat podezřele – vždyť jeden z procesů umí zprávu zpracovat prakticky ihned a druhému to trvá celou sekundu. Problém spočívá v tom, že se při komunikaci (propojení portů) nepoužívají buffery, resp. přesněji řečeno, že tyto buffery mají nulovou kapacitu, takže poslání zprávy z procesu Converter je provedeno s čekáním na její odebrání (a tedy vlastně synchronně):
FOO BAR foo bar BAZ baz FOO foo BAR bar BAZ baz FOO foo BAR bar BAZ baz FOO foo BAR bar BAZ
Pro úplnost si uveďme celý demonstrační příklad, který se však až na zmíněné volání time.Sleep neliší od příkladu předchozího:
package main import ( "fmt" "github.com/trustmaster/goflow" "strings" "time" "unicode" ) // První typ uzlu v síti type Printer struct { Message <-chan string } // Zpracování vstupních dat posílaných přes kanál func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) if unicode.IsLower([]rune(message)[0]) { time.Sleep(1 * time.Second) } } } // Druhý typ uzlu v síti type Converter struct { // vstupní port In <-chan string // první výstupní port Out1 chan<- string // druhý výstupní port Out2 chan<- string } // Zpracování vstupních dat posílaných přes kanál // a poslání výsledku na výstupní kanály func (c *Converter) Process() { for message := range c.In { converted := strings.ToUpper(message) c.Out1 <- converted converted = strings.ToLower(message) c.Out2 <- converted } } // Definice grafu sítě s aplikací func NewFlowApp() *goflow.Graph { // konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("converter", new(Converter)) // přidání uzlu s procesem do grafu n.Add("printer1", new(Printer)) // přidání uzlu s procesem do grafu n.Add("printer2", new(Printer)) // propojení dvou uzlů n.Connect("converter", "Out1", "printer1", "Message") // propojení dvou uzlů n.Connect("converter", "Out2", "printer2", "Message") // propojení uzlu s procesem n.MapInPort("Input", "converter", "In") // výsledný graf return n } func main() { // vytvoření sítě net := NewFlowApp() // konstrukce kanálu channel := make(chan string) net.SetInPort("Input", channel) // spuštění sítě // (kanál bude použit pro čekání na její ukončení) wait := goflow.Run(net) // poslání několika zpráv do sítě for i := 0; i < 10; i++ { channel <- "Foo" channel <- "Bar" channel <- "Baz" } // žádost o ukončení činnosti sítě close(channel) // čekání na ukončení činnosti sítě <-wait }
14. Využití bufferů pro komunikaci mezi procesy
Ve třetí kapitole jsme si řekli, že pro komunikaci mezi procesy se velmi často používají buffery s předem známou kapacitou. V grafu sítě se kapacita většinou zapisuje do kulatých závorek, tedy následovně:

Obrázek 9: Využití bufferů.
Zajištění propojení s využitím bufferů je snadné. Nejdříve si nakonfigurujeme kapacity bufferů (buď všech nebo jen vybraných komunikačních cest):
// Výchozí kapacita bufferů const BufferCapacity = 100
Následně se namísto přímého propojení stylem:
// propojení dvou uzlů n.Connect("converter", "Out1", "printer1", "Message") // propojení dvou uzlů n.Connect("converter", "Out2", "printer2", "Message")
Použije propojení s využitím bufferů:
// propojení dvou uzlů n.ConnectBuf("converter", "Out1", "printer1", "Message", BufferCapacity) // propojení dvou uzlů n.ConnectBuf("converter", "Out2", "printer2", "Message", BufferCapacity)
Lišit se ovšem bude chování celé sítě po jejím spuštění, protože nyní není nutné v procesu converter čekat na jednotlivé procesy printer1 či printer2:
foo FOO BAR BAZ FOO BAR BAZ FOO BAR BAZ FOO BAR BAZ FOO BAR BAZ FOO BAR BAZ FOO BAR BAZ FOO BAR BAZ FOO BAR BAZ FOO BAR BAZ bar baz foo bar ... ... ... (běží již jen printer2) ... ... ...
15. Úplný zdrojový kód šestého demonstračního příkladu
Úplný zdrojový kód dnešního šestého demonstračního příkladu (resp. přesněji řečeno celého projektu) lze nalézt na adrese https://github.com/tisnik/go-root/blob/master/article90/flow6/. Samotný program vypadá následovně:
package main import ( "fmt" "github.com/trustmaster/goflow" "strings" "time" "unicode" ) // Výchozí kapacita bufferů const BufferCapacity = 100 // První typ uzlu v síti type Printer struct { Message <-chan string } // Zpracování vstupních dat posílaných přes kanál func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) if unicode.IsLower([]rune(message)[0]) { time.Sleep(1 * time.Second) } } } // Druhý typ uzlu v síti type Converter struct { // vstupní port In <-chan string // první výstupní port Out1 chan<- string // druhý výstupní port Out2 chan<- string } // Zpracování vstupních dat posílaných přes kanál // a poslání výsledku na výstupní kanály func (c *Converter) Process() { for message := range c.In { converted := strings.ToUpper(message) c.Out1 <- converted converted = strings.ToLower(message) c.Out2 <- converted } } // Definice grafu sítě s aplikací func NewFlowApp() *goflow.Graph { // konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("converter", new(Converter)) // přidání uzlu s procesem do grafu n.Add("printer1", new(Printer)) // přidání uzlu s procesem do grafu n.Add("printer2", new(Printer)) // propojení dvou uzlů n.ConnectBuf("converter", "Out1", "printer1", "Message", BufferCapacity) // propojení dvou uzlů n.ConnectBuf("converter", "Out2", "printer2", "Message", BufferCapacity) // propojení uzlu s procesem n.MapInPort("Input", "converter", "In") // propojení uzlu s procesem // výsledný graf return n } func main() { // vytvoření sítě net := NewFlowApp() // konstrukce kanálu channel := make(chan string) net.SetInPort("Input", channel) // spuštění sítě // (kanál bude použit pro čekání na její ukončení) wait := goflow.Run(net) // poslání několika zpráv do sítě for i := 0; i < 10; i++ { channel <- "Foo" channel <- "Bar" channel <- "Baz" } // žádost o ukončení činnosti sítě close(channel) // čekání na ukončení činnosti sítě <-wait }
16. Uzel typu Splitter a Merger
Velmi často se při návrhu sítě setkáme s požadavkem na implementaci „výhybek“, konkrétně uzlů, v nichž dochází k rozdvojení (roztrojení…) zpráv a taktéž uzlů, které naopak dokážou propojit dva či více toků zpráv do jediného toku. Tyto uzly se nazývají Splitter a Merger. Buď je již implementuje příslušná knihovna či framework, nebo se můžeme pokusit o jejich vlastní implementaci. Nejedná se o nic složitého (minimálně v Go) – ostatně jednoduchou komponentu typu Splitter lze nadeklarovat doslova na několika řádcích (a využít lze v síti mnohokrát):
// Druhý typ uzlu v síti type Splitter struct { // vstupní port In <-chan string // první výstupní port Out1 chan<- string // druhý výstupní port Out2 chan<- string } // Zpracování vstupních dat posílaných přes kanál func (c *Splitter) Process() { for message := range c.In { c.Out1 <- message c.Out2 <- message } }

Obrázek 10: Uzel typu Splitter.
17. Úplný zdrojový kód sedmého demonstračního příkladu
Úplný zdrojový kód dnešního sedmého demonstračního příkladu (resp. přesněji řečeno celého projektu) lze nalézt na adrese https://github.com/tisnik/go-root/blob/master/article90/flow7/. Samotný program vypadá následovně:
package main import ( "fmt" "github.com/trustmaster/goflow" "strings" "time" "unicode" ) // Výchozí kapacita bufferů const BufferCapacity = 100 // První typ uzlu v síti type Printer struct { Message <-chan string } // Zpracování vstupních dat posílaných přes kanál func (c *Printer) Process() { for message := range c.Message { fmt.Println(message) if unicode.IsLower([]rune(message)[0]) { time.Sleep(1 * time.Second) } } } // Druhý typ uzlu v síti type Splitter struct { // vstupní port In <-chan string // první výstupní port Out1 chan<- string // druhý výstupní port Out2 chan<- string } // Zpracování vstupních dat posílaných přes kanál func (c *Splitter) Process() { for message := range c.In { c.Out1 <- message c.Out2 <- message } } // Třetí typ uzlu v síti type Converter1 struct { // vstupní port In <-chan string // výstupní port Out chan<- string } // Zpracování vstupních dat posílaných přes kanál // a poslání výsledku na výstupní kanály func (c *Converter1) Process() { for message := range c.In { converted := strings.ToUpper(message) c.Out <- converted } } // Čtvrtý typ uzlu v síti type Converter2 struct { // vstupní port In <-chan string // výstupní port Out chan<- string } // Zpracování vstupních dat posílaných přes kanál // a poslání výsledku na výstupní kanály func (c *Converter2) Process() { for message := range c.In { converted := strings.ToLower(message) c.Out <- converted } } // Definice grafu sítě s aplikací func NewFlowApp() *goflow.Graph { // konstrukce grafu n := goflow.NewGraph() // přidání uzlu s procesem do grafu n.Add("splitter", new(Splitter)) // přidání uzlu s procesem do grafu n.Add("converter1", new(Converter1)) // přidání uzlu s procesem do grafu n.Add("converter2", new(Converter2)) // přidání uzlu s procesem do grafu n.Add("printer1", new(Printer)) // přidání uzlu s procesem do grafu n.Add("printer2", new(Printer)) // propojení dvou uzlů n.ConnectBuf("splitter", "Out1", "converter1", "In", BufferCapacity) // propojení dvou uzlů n.ConnectBuf("splitter", "Out2", "converter2", "In", BufferCapacity) // propojení dvou uzlů n.ConnectBuf("converter1", "Out", "printer1", "Message", BufferCapacity) // propojení dvou uzlů n.ConnectBuf("converter2", "Out", "printer2", "Message", BufferCapacity) // propojení uzlu s procesem n.MapInPort("Input", "splitter", "In") // výsledný graf return n } func main() { // vytvoření sítě net := NewFlowApp() // konstrukce kanálu channel := make(chan string) net.SetInPort("Input", channel) // spuštění sítě // (kanál bude použit pro čekání na její ukončení) wait := goflow.Run(net) // poslání několika zpráv do sítě for i := 0; i < 10; i++ { channel <- "Foo" channel <- "Bar" channel <- "Baz" } // žádost o ukončení činnosti sítě close(channel) // čekání na ukončení činnosti sítě <-wait }
18. Obsah druhé části článku
V navazujícím článku nejprve dokončíme popis možností nabízených knihovnou goflow. Posléze se seznámíme s některými alternativními knihovnami určenými pro použití v dalších programovacích jazycích (například v Pythonu atd.).
19. Repositář s demonstračními příklady
Zdrojové kódy všech dnes použitých demonstračních příkladů byly uloženy do nového Git repositáře, který je dostupný na adrese https://github.com/tisnik/go-root (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má přibližně stovku kilobajtů), můžete namísto toho použít odkazy na jednotlivé demonstrační příklady, které naleznete v následující tabulce:
# | Příklad/soubor | Stručný popis | Cesta |
---|---|---|---|
1 | flow1/ | ukázka definice sítě s jediným uzlem | https://github.com/tisnik/go-root/blob/master/article90/flow1/ |
2 | flow2/ | nekorektně vytvořená síť | https://github.com/tisnik/go-root/blob/master/article90/flow2/ |
3 | flow3/ | síť s dvojicí uzlů | https://github.com/tisnik/go-root/blob/master/article90/flow3/ |
4 | flow4/ | explicitní rozvětvení v síti | https://github.com/tisnik/go-root/blob/master/article90/flow4/ |
5 | flow5/ | vliv zpoždění při zpracování dat | https://github.com/tisnik/go-root/blob/master/article90/flow5/ |
6 | flow6/ | propojení uzlů přes buffery s kapacitou | https://github.com/tisnik/go-root/blob/master/article90/flow6/ |
7 | flow7/ | složitější síť s explicitním splitterem | https://github.com/tisnik/go-root/blob/master/article90/flow7/ |
20. Odkazy na Internetu
- Get into the flow
https://appliedgo.net/flow/ - Flow-based and dataflow programming library for Go programming language
https://github.com/trustmaster/goflow - Flow-based programming (Wikipedia)
https://en.wikipedia.org/wiki/Flow-based_programming - FlowBasedProgramming (Python wiki)
https://wiki.python.org/moin/FlowBasedProgramming - Flow Based Programming
https://github.com/flowbased/flowbased.org/wiki - Concepts
https://github.com/flowbased/flowbased.org/wiki/Concepts - Circular buffer
https://en.wikipedia.org/wiki/Circular_buffer - Circular Buffers in Linux kernel
https://www.kernel.org/doc/html/latest/core-api/circular-buffers.html - DrawFBP
https://github.com/jpaulm/drawfbp - Panta Rhei
https://blogs.bu.edu/marsh-vocation/2016/09/29/panta-rhei/ - Hérakleitos
https://cs.wikipedia.org/wiki/H%C3%A9rakleitos - FlowBasedProgramming (Wiki)
https://www.jpaulmorrison.com/cgi-bin/wiki.pl - FBP Network Protocol
https://flowbased.github.io/fbp-protocol/ - Flow-based programming specification wiki
https://flow-based.org/ - Flow Based Programming
http://wiki.c2.com/?FlowBasedProgramming - FlowBasedProgramming
http://www.jpaulmorrison.com/cgi-bin/wiki.pl - BrokerageApplication
http://www.jpaulmorrison.com/cgi-bin/wiki.pl?BrokerageApplication - The Go Programming Language Specification
https://go.dev/ref/spec - Generics in Go
https://bitfieldconsulting.com/golang/generics - Tutorial: Getting started with generics
https://go.dev/doc/tutorial/generics - Type parameters in Go
https://bitfieldconsulting.com/golang/type-parameters - Go Data Structures: Binary Search Tree
https://flaviocopes.com/golang-data-structure-binary-search-tree/ - Gobs of data
https://blog.golang.org/gobs-of-data - How the Go runtime implements maps efficiently (without generics)
https://dave.cheney.net/2018/05/29/how-the-go-runtime-implements-maps-efficiently-without-generics - Go 1.18 Release Notes
https://golang.org/doc/go1.18 - Go 1.17 Release Notes
https://golang.org/doc/go1.17 - Go 1.16 Release Notes
https://golang.org/doc/go1.16 - Go 1.15 Release Notes
https://golang.org/doc/go1.15 - Go 1.14 Release Notes
https://golang.org/doc/go1.14 - Go 1.13 Release Notes
https://golang.org/doc/go1.13 - Go 1.12 Release Notes
https://golang.org/doc/go1.12 - Go 1.11 Release Notes
https://golang.org/doc/go1.11 - Go 1.11 Release Notes
https://golang.org/doc/go1.11 - Go 1.10 Release Notes
https://golang.org/doc/go1.10 - Go 1.9 Release Notes
https://golang.org/doc/go1.9 - Go 1.8 Release Notes
https://golang.org/doc/go1.8 - A Proposal for Adding Generics to Go
https://go.dev/blog/generics-proposal - Proposal: Go should have generics
https://github.com/golang/proposal/blob/master/design/15292-generics.md - Know Go: Generics (Kniha)
https://bitfieldconsulting.com/books/generics - Balíček constraints
https://pkg.go.dev/golang.org/x/exp/constraints - What are the libraries/tools you missed from other programming languages in Golang?
https://www.quora.com/What-are-the-libraries-tools-you-missed-from-other-programming-languages-in-Golang?share=1 - Golang Has Generics—Why I Don't Miss Generics Anymore
https://blog.jonathanoliver.com/golang-has-generics/ - Go 1.18 Generics based slice package
https://golangexample.com/go-1–18-generics-based-slice-package/ - The missing slice package
https://github.com/ssoroka/slice - Methods in Go (part I)
https://medium.com/golangspec/methods-in-go-part-i-a4e575dff860