Hlavní navigace

Komunikace v distribuovaných systémech: asynchronní předávání zpráv

21. 1. 2021
Doba čtení: 8 minut

Sdílet

 Autor: Depositphotos
První díl z nové série článků, která se bude zabývat ukázkou návrhu a postupu realizace některých základních funkcí distribuovaného informačního systému. Centrálním bodem bude message broker.

Všeobecný úvod k sérii článků

Zabývám se návrhem a vývojem distribuovaných komunikačních systémů již řadu let. Tak jsem si řekl, že by nebylo špatné shrnout některé své zkušenosti a nabídnout je k širší diskuzi a použití.

Moje představa je taková, že postupně vytvořím sérii článků, ve které ukážu návrh a postup realizace některých základních funkcí distribuovaného informačního systému. O jaký systém se bude tedy jednat?

Jsem dlouhodobým příznivcem komunikačních systémů, které mají jen minimum centrálních služeb (nějaké ty centrální služby vždycky potřebujeme, jen je otázkou, jak moc jich v tom centru musí být). Proto jsem si vybral jako centrální spojovací bod message broker (dále taky MB), přes který budou všechny komponenty komunikovat. Jedná se tedy o hvězdicovou architekturu, kdy každý komunikující IS (dále také uzel nebo node) je připojen pouze a jenom na jeden centrální message broker. Komunikující IS se spolu baví tak, že si posílají zprávy.

Takto to vypadá, že úzkým místem celého řešení je centrální message broker. V případě jeho výpadku pak nekomunikuje nikdo. To je pravda, nicméně v případě message broker se velice jednoduše realizuje clustering, který nám pomůže tento problém velice efektivně a jednoduše vyřešit.

Pro vývoj těchto systémů dlouhodobě užívám jazyk Java, proto jsem pro vlastní realizaci použil projekty:

  • Apacha Camel ver. 3.6, jako integrační nástroj na úrovni jednotlivých uzlů
  • Apache ActiveMQ ver. 5.16, v roli message broker
  • SpringBoot ver. 2.4, framework pro rychlý vývoj aplikace pro komunikační uzly

Plánovaný obsah seriálu

V rámci následujících článků bych se rád ponořil do těchto témat:

  • úvod do výměny zpráv přes message broker, messaging a request-response 
  • vyvolán služeb z REST rozhraní
  • práce s timeout v rámci výměny zpráv
  • dynamické směrování
  • serializace zpráv
  • efektivní serializace zpráv a komprese
  • ověření identity původce zpráv
  • směrování dle obsahu

Technický úvod do série článků

Všechny zdrojové kódy můžete najít: jraska1/jv-distributed-systems-guide

Celou sérii článků připravuji ve virtuálním linuxovém prostředí. 

Používám toto, i když na vlastní funkčnost projektu by to nemělo mít vliv:

  • VirtualBox 6.1 jako virtuální prostředí 
  • Fedora 33 
  • OpenJDK 11 z distribuce
  • Maven 3 z distribuce

Message broker ActiveMQ

Ten mám nainstalován ve stejném prostředí. V této sérii článků bych se nechtěl explicitně zabývat vlastnostmi a nastavováním brokeru, takže používám implicitní konfiguraci. 

K brokeru se tedy budu připojovat protokolem OpenWire na portu 61616, a nebude k tomu potřeba ani účet a heslo.

Pro kontrolu front zpráv je možné použít administrátorské rozhraní, které standardně běží na adrese http://localhost:8161.

Aplikace SpringBoot

Pro zjednodušení vývoje aplikace jsem použil framework SpringBoot.

Jeho nastavení je v rámci pom.xml, který najdete v projektu na GitHub.

Příklady pro konkrétní článek jsou zahrnuty do jednoho package, v jehož kořenu je také vždy hlavní třída pro spuštění. Ta obvykle vypadá nějak takto:

@SpringBootApplication
public class Example01Application {
    public static void main(String[] args) {
        SpringApplication.run(Example01Application.class, args);
    }
}

Komunikační funkce v Camel

Tam, kde to půjde, budu využívat projekt Camel pro implementaci komunikačních funkcí. Cesty jsou definovány zápisem Java kódu ve třídě rozšiřující třídu RouteBuilder. Tato implementace je obvykle v package route.

Takto nějak vypadá definice cest v Camel:

@Component
public class CamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class);

    @Override
    public void configure() {
        from("timer://applicant01?repeatCount=1").routeId("applicant01")
            .process(exchange -> exchange.getMessage().setBody(new Request("applicant01", new Date(), 10)))
            .to("activemq:queue:QUEUE-1");
    }
}

Podrobnější informace pro nastavení testovacího prostředí

Pro vás, kteří potřebujete pomoci s nastavením prostředí a spouštěním jednotlivých příkladů, připojuji detailnější postup.

Ověřte si, že máte dostupné JDK:

[raska@localhost ~]$ java -version
openjdk version "11.0.9.1" 2020-11-04
OpenJDK Runtime Environment 18.9 (build 11.0.9.1+11)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.9.1+11, mixed mode, sharing)

A také nainstalovaný Maven:

[raska@localhost ~]$ mvn -version
Apache Maven 3.6.3 (Red Hat 3.6.3-5)
Maven home: /usr/share/maven
Java version: 11.0.9.1, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.9.11-9.fc33.x86_64
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.9.16-200.fc33.x86_64", arch: "amd64", family: "unix"

Toto je postup krok za krokem, jak stáhnout a rozběhnout AciveMQ v implicitním nastavení:

[raska@localhost]$ mkdir opt
[raska@localhost]$ cd opt
[raska@localhost opt]$ wget https://downloads.apache.org/activemq/5.16.0/apache-activemq-5.16.0-bin.tar.gz
[raska@localhost opt]$ tar xzf apache-activemq-5.16.0-bin.tar.gz
[raska@localhost opt]$ ln -s apache-activemq-5.16.0 activemq
[raska@localhost opt]$ ./activemq/bin/activemq start

Funkčnost si můžete ověřit na webovém administrátorském rozhraní http://localhost:8161.

Příklady vyvíjím a testuji v IntelliJ IDEA. Do tohoto prostředí si můžete stáhnout projekt rovnou z GitHub a pouštět jednotlivé příklady.

Pokud se nechcete zabývat vývojovým prostředím, pak si je můžete pustit i z příkazové řádky. 

Nejjednodušší postup je následující:

[raska@localhost]$ cd opt
[raska@localhost opt]$ git clone https://github.com/jraska1/jv-distributed-systems-guide.git
[raska@localhost opt]$ cd jv-distributed-systems-guide/
raska@localhost jv-distributed-systems-guide]$ mvn compile exec:java -Dexec.mainClass="cz.dsw.distribguide.example01.Example01Application"

Spustí se vám aplikace příkladů z první lekce.

Zasílání zpráv

Odesilatel zprávu odešle příjemci a už se dále nepídí po tom, zda příjemce zprávu dostal, a případně jak s ní naložil.

Příklady k tomuto článku je možné najít v package: example01

Předávané zprávy

Nejdříve musím něco mít, abych to mohl předat někomu jinému, tedy nějakou zprávu. 

V mém případě se bude jednat o Java Bean, které budou takovou zprávu představovat. Jejich definice je v package entity

Je tam abstraktní třída Token, kterou budu používat jako předchůdce pro všechny vyměňované zprávy. Ta vypadá následovně:

public abstract class Token implements Serializable {

    private String name;
    private Date ts;

    public Token(String name, Date ts) {
        this.name = name;
        this.ts = ts;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Date getTs() {
        return ts;
    }
    public void setTs(Date ts) {
        this.ts = ts;
    }
    @Override
    public String toString() {
        return "Token{" + "name='" + name + '\'' + ", ts=" + ts + '}';
    }
}

V každé zprávě bude nějaké jméno uzlu, který zprávu vytvořil a timestamp, kdy se tak stalo.

No a dále tam je další třída Request, která bude představovat vlastní obsah předávané zprávy od odesilatele k příjemci:

public class Request extends Token {

    private long value;

    public Request(String name, Date ts, long value) {
        super(name, ts);
        this.value = value;
    }
    public long getValue() {
        return value;
    }
    public void setValue(long value) {
        this.value = value;
    }
    @Override
    public String toString() {
        return "Request{value=" + value + ", " + super.toString() + "}";
    }
}

Výměna zpráv

Tady se již budu zabývat jednotlivými způsoby výměny zpráv. Budou konkrétně tři.

Nejdříve ukázka definovaných cest a pak se podívám na jednotlivé způsoby samostatně:

@Component
public class CamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class);

    @Override
    public void configure() {

//      Applicant Route definitions ...
        from("timer://applicant01?fixedRate=true&delay=0&repeatCount=1").routeId("applicant01")
            .process(exchange -> exchange.getMessage().setBody(new Request("applicant01", new Date(), 10)))
            .to("activemq:queue:QUEUE-1");

        from("timer://applicant02?fixedRate=true&delay=10000&repeatCount=1").routeId("applicant02")
            .process(exchange -> exchange.getMessage().setBody(new Request("applicant02", new Date(), 20)))
            .multicast()
                .to("activemq:queue:QUEUE-1", "activemq:queue:QUEUE-2", "activemq:queue:QUEUE-3")
            .end();

        from("timer://applicant03?fixedRate=true&delay=20000&repeatCount=1").routeId("applicant03")
            .process(exchange -> exchange.getMessage().setBody(new Request("applicant03", new Date(), 30)))
            .to("activemq:topic:TOPIC");

//      Provider Route definitions ...
        from("activemq:queue:QUEUE-1").routeId("provider01")
            .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class)));

        from("activemq:queue:QUEUE-2").routeId("provider02")
            .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class)));

        from("activemq:queue:QUEUE-3").routeId("provider03")
            .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class)));

        from("activemq:topic:TOPIC").routeId("provider04")
            .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class)));

        from("activemq:topic:TOPIC").routeId("provider05")
            .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class)));

        from("activemq:topic:TOPIC").routeId("provider06")
            .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class)));
    }
}

Aby se mně odesilatelé nastartovali, jsou jejich cesty iniciovány pomocí komponenty timer. Je to jenom berlička pro tyhle příklady. V dalších článcích přejdu na inicializaci z REST rozhraní. 

Předání zprávy jednomu příjemci – metoda send

Je to ta nejjednodušší podoba, kdy jeden odesilatel applicant01 předá zprávu jednomu příjemci provider01 reprezentovanému frontou zpráv QUEUE-1.

Odesilatel nejdříve vytvoří bean třídy Request, kterou následně předá do fronty QUEUE-1. O výsledek se již nestará. Přesněji řečeno, jeho role končí v okamžiku, kdy se zpráva zapíše do fronty na message brokeru.

Příjemce pak vyčítá zprávy ze své fronty a pouze zapíše jejich obsah do logu. No a výsledek by se měl projevit v logu takto:

2021-01-01 18:10:40.766  INFO [nsumer[QUEUE-1]]: ... Request{value=10, Token{name='applicant01', ts=Fri Jan 01 18:10:40 CET 2021}}

Předání zprávy více příjemcům – metoda multicast

O něco komplikovanější, ale ne o moc, je předání jedné zprávy více příjemcům. To dělá odesilatel applicant02. Ten vytvoří zprávu a pak jí procesorem multicast rozešle příjemcům provider01, provider02 a provider03, a to prostřednictvím front QUEUE-1, QUEUE-2 a QUEUE-3. 

Role odesilatele končí v okamžiku předání zpráv do patřičných front v message brokeru. Dále je to již úloha pro příjemce, aby obsah svých front načetli a zapsali do logu.

Takhle vypadá výsledek v logu:

2021-01-01 18:10:50.678  INFO [nsumer[QUEUE-1]]: ... Request{value=20, Token{name='applicant02', ts=Fri Jan 01 18:10:50 CET 2021}}
2021-01-01 18:10:50.726  INFO [nsumer[QUEUE-2]]: ... Request{value=20, Token{name='applicant02', ts=Fri Jan 01 18:10:50 CET 2021}}
2021-01-01 18:10:50.784  INFO [nsumer[QUEUE-3]]: ... Request{value=20, Token{name='applicant02', ts=Fri Jan 01 18:10:50 CET 2021}}

Předání zprávy neznámým příjemcům – metoda broadcast

Poslední metodou, kterou v tomto článku ukážu, je předání zprávy s využitím metody Public-Subscribe.

Odesilatel neví, komu bude zprávu předávat. On ji předá každému, kdo se přihlásí k odběru tématu TOPIC. 

CS24_early

Ve výše uvedeném příkladu je to applicant03, který vytvoří zprávu a předá ji do TOPIC. K odběru jsou přihlášeni provider04, provider05 a provider06, kteří obdrží každý stejnou kopii zprávy.

Výsledek v logu pak vypadá nějak takto:

2021-01-01 18:11:00.685  INFO [Consumer[TOPIC]]: ... Request{value=30, Token{name='applicant03', ts=Fri Jan 01 18:11:00 CET 2021}}
2021-01-01 18:11:00.685  INFO [Consumer[TOPIC]]: ... Request{value=30, Token{name='applicant03', ts=Fri Jan 01 18:11:00 CET 2021}}
2021-01-01 18:11:00.686  INFO [Consumer[TOPIC]]: ... Request{value=30, Token{name='applicant03', ts=Fri Jan 01 18:11:00 CET 2021}}

Byl pro vás článek přínosný?

Autor článku

Jiří Raška pracuje na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.