Hlavní navigace

Komunikace v distribuovaných systémech: synchronní komunikace typu požadavek a odpověď

4. 2. 2021
Doba čtení: 5 minut

Sdílet

 Autor: Depositphotos
Předmětem dnešního článku budou komunikace typu požadavek/odpověď, nebo také synchronní komunikace. Odesilatel osloví jednoho nebo více příjemců tím, že jim pošle nějakou zprávu a očekává od příjemců odpověď.

Příklady k tomuto článku je možné najít v balíčku example02.

Předávané zprávy

Stejně jako tomu bylo v případě předávání zpráv, potřebuji mít vydefinované typy zpráv. 

V mém případě se bude jednat o Java Bean, které budou představovat požadavek na službu a její výsledek. Jejich definice je v balíčku entity

Je tam abstraktní třída Token, kterou budu používat jako předchůdce pro všechny vyměňované zprávy. Ta vypadá následovně:

public abstract class Token implements Serializable {

    private String name;
    private Date ts;

    public Token(String name, Date ts) {
        this.name = name;
        this.ts = ts;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Date getTs() {
        return ts;
    }
    public void setTs(Date ts) {
        this.ts = ts;
    }
    @Override
    public String toString() {
        return "Token{" + "name='" + name + '\'' + ", ts=" + ts + '}';
    }
}

Dále je tam třída Request, která reprezentuje požadavek na službu:

public class Request extends Token {

    private long value;

    public Request(String name, Date ts, long value) {
        super(name, ts);
        this.value = value;
    }
    public long getValue() {
        return value;
    }
    public void setValue(long value) {
        this.value = value;
    }
    @Override
    public String toString() {
        return "Request{value=" + value + ", " + super.toString() + "}";
    }
}

A nakonec je zde třída Response, která reprezentuje odpověď na náš dotaz:

public class Response extends Token {

    private long result;

    public Response(String name, Date ts, long result) {
        super(name, ts);
        this.result = result;
    }
    public long getResult() {
        return result;
    }
    public void setResult(long result) {
        this.result = result;
    }
    @Override
    public String toString() {
        return "Response{result=" + result + ", " + super.toString() + "}";
    }
}

Vyvolání služby

V této části se budu zabývat dvěma způsoby vzdáleného volání služby. Vzájemně se liší v tom, jestli oslovuji jednoho nebo více poskytovatelů služby.

Nejdříve ukázka definovaných cest a pak se podívám na jednotlivé způsoby samostatně:

@Component
public class CamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class);

    @Override
    public void configure() {

//      Applicant Route definitions ...
        from("timer://applicant01?fixedRate=true&delay=0&repeatCount=1").routeId("applicant01")
            .process(exchange -> {
                exchange.getMessage().setBody(new Request("applicant01", new Date(), 10));
                logger.info(">>> {}", exchange.getMessage().getBody(Request.class));
            })
            .setExchangePattern(ExchangePattern.InOut)
            .to("activemq:queue:QUEUE-1")
            .process(exchange -> logger.info("<<< {}", exchange.getMessage().getBody(Response.class)));

        from("timer://applicant02?fixedRate=true&delay=10000&repeatCount=1").routeId("applicant02")
            .process(exchange -> {
                exchange.getMessage().setBody(new Request("applicant02", new Date(), 20));
                logger.info(">>> {}", exchange.getMessage().getBody(Request.class));
            })
            .setExchangePattern(ExchangePattern.InOut)
            .multicast()
                .aggregationStrategy(new GroupedBodyAggregationStrategy())
                .to("activemq:queue:QUEUE-1", "activemq:queue:QUEUE-2", "activemq:queue:QUEUE-3")
            .end()
            .process(exchange -> exchange.getMessage().getBody(List.class).forEach(o -> logger.info("<<< {}", o.toString())));

//      Provider Route definitions ...
        from("activemq:queue:QUEUE-1").routeId("provider01")
            .process(exchange -> {
                Request request = exchange.getMessage().getBody(Request.class);
                exchange.getMessage().setBody(new Response("provider01", new Date(), request.getValue() + 10));
            });

        from("activemq:queue:QUEUE-2").routeId("provider02")
            .process(exchange -> {
                Request request = exchange.getMessage().getBody(Request.class);
                exchange.getMessage().setBody(new Response("provider02", new Date(), (request.getValue() + 10) * 2));
            });

        from("activemq:queue:QUEUE-3").routeId("provider03")
            .process(exchange -> {
                Request request = exchange.getMessage().getBody(Request.class);
                exchange.getMessage().setBody(new Response("provider03", new Date(), (request.getValue() + 50) * request.getValue()));
            });
    }
}

Aby se mně žadatelé nastartovali, jsou jejich cesty iniciovány pomocí komponenty timer. Je to jenom berlička pro tyhle příklady, v dalších článcích přejdu na inicializaci z REST rozhraní.

Mám vytvořeny tři poskytovatele služby, kteří poslouchají na frontě QUEUE-1, QUEUE-2 a QUEUE-3. Všichni jako vstup očekávají objekt třídy Request. Na základě v něm zadané hodnoty pak vytvoří odpověď ve formě objektu třídy Response. Ten pak odešlou zpět žadateli.

Vyvolání služby jednoho poskytovatele – metoda request/response

Asi ta nejjednodušší obdoba lokálního volání procedury, ale v distribuované podobě. Žadatel a poskytovatel mohou běžet na jakémkoliv uzlu, propojeni pouze prostřednictvím fronty zpráv.

Žadatel, v tomto případě applicant01, předá žádost poskytovateli provider01 prostřednictvím fronty QUEUE-1.

Odesilatel vytvoří bean třídy Request a předá je do fronty. Rozdíl proti předávání zpráv je v nastavení vzoru komunikace.

Tímto nastavením komunikačního vzoru říkám, že od příjemce očekávám odpověď:

.setExchangePattern(ExchangePattern.InOut)

Poté, co mně poskytovatel odpoví, výsledek pouze zapíšu do logu.

Takto by to mělo vypadat v logu po spuštění:

2021-01-01 18:40:24.005  INFO [r://applicant01]: >>> Request{value=10, Token{name='applicant01', ts=Fri Jan 01 18:40:24 CET 2021}}
2021-01-01 18:40:24.156  INFO [anager[QUEUE-1]]: <<< Response{result=20, Token{name='provider01', ts=Fri Jan 01 18:40:24 CET 2021}}

Vyvolání služby více poskytovatelů – metoda multicast/response

V tomto případě je situace o něco komplikovanější, ale ne zase o tolik. 

Jako žadatel chci oslovit více poskytovatelů a od každého dostat nějakou odpověď.

V mém příkladu je to applicant02, který nejdříve vytvoří žádost typu Request. Nastaví vzor komunikace na request/response. Doposud je to stejné jako v předchozím případě.

Dále ale požadavek rozešle třem poskytovatelům provider01, provider02 a provider03, adresovaným prostřednictvím jejich front.

Každý provider vytvoří nějakou odpověď dle svých pravidel, a odešle ji zpět žadateli.

V tomto okamžiku ale žadatel dostane více než jednu odpověď. Musí si je nějak spojit do jednoho výsledku. 

Ve výše uvedeném příkladu se používá strategie spojení více odpovědí do seznamu (pochopitelně si můžete napsat i vlastní, ale to teď řešit nebudu):

.aggregationStrategy(new GroupedBodyAggregationStrategy())

Tato strategie zajistí, že všechny odpovědi dostanu jako seznam, tedy v Java je to List<Response>.

A pak již následuje pouze zápis do logu. 

Výsledek v logu by pak měl vypadat nějak takto:

2021-01-01 18:40:33.980  INFO [r://applicant02]: >>> Request{value=20, Token{name='applicant02', ts=Fri Jan 01 18:40:33 CET 2021}}
2021-01-01 18:40:34.098  INFO [anager[QUEUE-3]]: <<< Response{result=30, Token{name='provider01', ts=Fri Jan 01 18:40:33 CET 2021}}
2021-01-01 18:40:34.099  INFO [anager[QUEUE-3]]: <<< Response{result=60, Token{name='provider02', ts=Fri Jan 01 18:40:34 CET 2021}}
2021-01-01 18:40:34.100  INFO [anager[QUEUE-3]]: <<< Response{result=1400, Token{name='provider03', ts=Fri Jan 01 18:40:34 CET 2021}}

Proč není příklad na metodu broadcast/response

V předchozím díle o předávání zpráv jsem měl prezentovánu metodu broadcast. Proč tedy není i tady varianta, která by umožnila broadcast požadavku a posbírání všech odpovědí?

CS24_early

No, taková varianta existuje. Pokud o ní bude mít čtenář zájem, může se podívat třeba na vzor EIP Scatter-Gather.

Nicméně jedná se o vzor použití dost komplikovaný, v reálu přinášející hodně problémů se zajištěním synchronního způsobu volání služby.

Byl pro vás článek přínosný?

Autor článku

Jiří Raška pracuje na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.