Hlavní navigace

Komunikace v distribuovaných systémech: časové limity služeb

23. 2. 2021
Doba čtení: 13 minut

Sdílet

 Autor: Depositphotos
V tomto dílu se již více posunu k problémům spojeným s budováním a provozem distribuovaných systémů. Budu se zabývat časovými limity služeb, jak je řídit, vynucovat, a jak na nesplnění reagovat.

Představte si, že máte síť desítek nebo stovek uzlů, které jsou rozprostřeny po celé republice. Jediné jejich pojítko je centrální message broker, přes který si vyměňují zprávy. 

Je naprosto běžné, že síťová spojení jsou pomalá a nestabilní. Komunikační uzly se obvykle baví s jinými aplikacemi, od kterých získávají data. No a ty mohou opět vypadávat nebo být pomalé. A navíc se k tomu přidává provoz v různých datových centrech v kombinaci s lidským faktorem.

Z hlediska funkčnosti služeb musíte neustále počítat s tím, že pokud žadatel osloví poskytovatele (nebo více poskytovatelů), pak odpověď může dostat, ale také nemusí.

Co tedy s tím?

Jako základní nástroj, jak se s těmito problémy vypořádat, je nastavení časových limitů pro provedení služeb a adekvátní reakce při jejich nesplnění. Na příkladech to asi bude zřejmější, tak to nebudu prodlužovat. 

Příklady k tomuto článku je možné najít v package: example04

Časové limity podle vzoru komunikace

Co tedy můžeme ovlivnit z hlediska časových limitů s ohledem na druh komunikace mezi žadatelem a poskytovatelem služby?

Vzor komunikace odeslání zpráv

V některém z dřívějších článků jsem napsal něco ve smyslu, že u tohoto vzoru komunikace žadatel zapíše zprávu do message brokeru a dále již neřeší, co se s ní děje. To je sice pravda, ale ne úplná.

Pokud by příjemce nebyl napojen na message broker, pak by zpráva ležela ve frontě a čekala, a čekala. Mohlo by se nám stát, že se budou v některých frontách hromadit zprávy, což může způsobovat provozní problémy. Navíc to může být i aplikační problém, neboť odesilateli se nemusí líbit, že jeho data dlouhodobě leží v brokeru.

Jak to tedy jako odesilatel zprávy můžete ovlivnit? Jednoduše tak, že u zprávy nastavíte tzv. TTL – time to live

Jedná se o čas v milisekundách od počátku všech časů (tedy od 1.1.1970), do kterého je zpráva platná. Pokud si zprávu do této doby příjemce nevyzvedne, pak očekáváme, že ji message broker sám smaže.

Vzor komunikace požadavek/odpověď

V tomto případě si žadatel o službu stanovuje maximální možný čas (obvykle v milisekundách), po který bude čekat na doručení odpovědi od poskytovatele/ů. Jedná se tedy o dobu expirace služby.

Agregace odpovědí od všech poskytovatelů služby končí, pokud jsme obdrželi odpovědi od všech oslovených poskytovatelů nebo vypršel časový limit služby. V případě vypršení časového limitu se vrací pouze ty odpovědi, které dorazily včas.

Předávané zprávy

Všechny vydefinované typy zpráv jsou Java Bean, jejíž definice jsou v package entity. Vzhledem k tomu, že se od předchozích článků nezměnily, nebudu je tady dále rozebírat.

Definované Camel cesty

Jak již je zvykem, tak nejdříve jejich ukázka a pak komentáře k nim:

@Component
public class CamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class);

    @Value("${routes.providers.switchOn:true}")
    private boolean providerOn;

    @Override
    public void configure() {

//      Applicant Route definitions ...
        from("direct:applicant01").routeId("applicant01")
            .to("activemq:queue:QUEUE-1?preserveMessageQos=true");

        from("direct:applicant02").routeId("applicant02")
            .multicast()
                .to("activemq:queue:QUEUE-1?preserveMessageQos=true",
                    "activemq:queue:QUEUE-2?preserveMessageQos=true",
                    "activemq:queue:QUEUE-3?preserveMessageQos=true")
            .end();

        from("direct:applicant03").routeId("applicant03")
            .to("activemq:queue:QUEUE-1?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}");

        from("direct:applicant04").routeId("applicant04")
            .multicast()
                .aggregationStrategy(new GroupedBodyAggregationStrategy())
                .to("activemq:queue:QUEUE-1?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}",
                    "activemq:queue:QUEUE-2?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}",
                    "activemq:queue:QUEUE-3?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}")
            .end();

        from("direct:applicant05").routeId("applicant05")
            .onException(ExchangeTimedOutException.class)
                .handled(true)
                .process(exchange -> {
                    logger.warn("TIMEOUT Exception handled");
                    exchange.getMessage().setBody(null);
                })
            .end()
            .multicast()
                .parallelProcessing()
                .aggregationStrategy(new GroupedBodyAggregationStrategy())
                .to("activemq:queue:QUEUE-1?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}",
                    "activemq:queue:QUEUE-2?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}",
                    "activemq:queue:QUEUE-3?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}")
            .end();


//      Provider Route definitions ...
        if (providerOn) {
            from("activemq:queue:QUEUE-1").routeId("provider01")
                .process(exchange -> {
                    Request request = exchange.getMessage().getBody(Request.class);
                    logger.info(">>> {}", request);
                    Response response = new Response("provider01", new Date(), request.getValue() + 10);
                    TimeUnit.MILLISECONDS.sleep(1500);
                    logger.info("<<< {}", response);
                    exchange.getMessage().setBody(response);
                });

            from("activemq:queue:QUEUE-2").routeId("provider02")
                .process(exchange -> {
                    Request request = exchange.getMessage().getBody(Request.class);
                    logger.info(">>> {}", request);
                    Response response = new Response("provider02", new Date(), (request.getValue() + 10) * 2);
                    TimeUnit.MILLISECONDS.sleep(500);
                    logger.info("<<< {}", response);
                    exchange.getMessage().setBody(response);
                });

            from("activemq:queue:QUEUE-3").routeId("provider03")
                .process(exchange -> {
                    Request request = exchange.getMessage().getBody(Request.class);
                    logger.info(">>> {}", request);
                    Response response = new Response("provider03", new Date(), (request.getValue() + 50) * request.getValue());
                    TimeUnit.MILLISECONDS.sleep(2500);
                    logger.info("<<< {}", response);
                    exchange.getMessage().setBody(response);
                });
        }
    }
}

Vzhledem k tomu, že budu potřebovat prezentovat reakci na nedostupnost služby, je v konfiguračním  souboru application.yml k dispozici parametr routes.providers.switchOn. Tímto parametrem můžete řídit spuštění Camel cest pro poskytovatele služeb. Standardně je parametr nastaven na spuštění, ale v příkladech s ním budu cvičit.

Mám vytvořeny následující cesty:

  • applicant01 a applicant02 – žadatel pro službu odesílání zpráv
    • volbou preserveMessageQos=true se stanovuje, že se bude TTL řídit na úrovni jednotlivých zpráv 
  • applicant03, applicant04 a applicant05 – žadatel pro službu požadavek/odpověď
    • volbou explicitQosEnabled=true se stanovuje, že budeme řídit expiraci služby
    • volba requestTimeoutCheckerInter­val={{routes.applicants.chec­kerInterval}} je zde proto, abych nastavil kratší čas kontroly odpovědí na message brokeru. Standardně je to 1 sekunda, což by v příkladech se zpožděním v desetinách sekundy dělalo problémy. Hodnota parametru se načítá z application.yml.
  • provider01, provider02 a provider03 – poskytovatelé služby
    • Každý poskytovatel služby přijme požadavek, vytvoří dle své implementace odpověď a pošle jí zpět. Navíc ještě zapíše jak požadavek, tak odpověď do logu a zpozdí odpověď o několik sekund.

REST API aplikace

Do aplikace je doplněna komponenta implementující REST Controller. Najdete ji v package rest.

Opět nejdříve ukážu a pak nějaké komentáře:

@RestController
public class ServiceController {

    private static final String JMS_EXPIRATION = "JMSExpiration";

    @Autowired
    private ProducerTemplate producerTemplate;

    @RequestMapping(value = "/mesg/appl01")
    public void sendMessageApplicant01(@RequestBody Request request, @RequestParam(value = "ttl", required = false, defaultValue = "-1") Long ttl) {
        if (request.getName() == null)
            request.setName("mesg-applicant01");
        request.setTs(new Date());

        Map<String, Object> headers = new HashMap<>();
        if (ttl >= 0)
            headers.put(JMS_EXPIRATION, System.currentTimeMillis() + ttl);

        producerTemplate.sendBodyAndHeaders("direct:applicant01", request, headers);
    }

    @RequestMapping(value = "/mesg/appl02")
    public void sendMessageApplicant02(@RequestBody Request request, @RequestParam(value = "ttl", required = false, defaultValue = "-1") Long ttl) {
        if (request.getName() == null)
            request.setName("mesg-applicant02");
        request.setTs(new Date());

        Map<String, Object> headers = new HashMap<>();
        if (ttl >= 0)
            headers.put(JMS_EXPIRATION, System.currentTimeMillis() + ttl);

        producerTemplate.sendBodyAndHeaders("direct:applicant02", request, headers);
    }

    @RequestMapping(value = "/rest/appl01")
    public ResponseEntity<Response> restApplicant01(@RequestBody Request request, @RequestParam(value = "expire", required = false, defaultValue = "20000") Long expire) {
        if (request.getName() == null)
            request.setName("rest-applicant01");
        request.setTs(new Date());

        Response response = producerTemplate.requestBodyAndHeader("direct:applicant03", request, JmsConstants.JMS_REQUEST_TIMEOUT, expire, Response.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }

    @RequestMapping(value = "/rest/appl02")
    public ResponseEntity<List<Response>> restApplicant02(@RequestBody Request request, @RequestParam(value = "expire", required = false, defaultValue = "20000") Long expire) {
        if (request.getName() == null)
            request.setName("rest-applicant02");
        request.setTs(new Date());

        List<Response> response = producerTemplate.requestBodyAndHeader("direct:applicant04", request, JmsConstants.JMS_REQUEST_TIMEOUT, expire, List.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }

    @RequestMapping(value = "/rest/appl03")
    public ResponseEntity<List<Response>> restApplicant03(@RequestBody Request request, @RequestParam(value = "expire", required = false, defaultValue = "20000") Long expire) {
        if (request.getName() == null)
            request.setName("rest-applicant03");
        request.setTs(new Date());

        List<Response> response = producerTemplate.requestBodyAndHeader("direct:applicant05", request, JmsConstants.JMS_REQUEST_TIMEOUT, expire, List.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }
}

Jsou zde dvě skupiny služeb:

  • mesg – pro komunikační vzor odeslání zprávy 
    • dvě služby, jedna pro odeslání jednomu příjemci a druhá pak pro odeslání více příjemcům
    • žádost o službu je v těle HTTP požadavku jako JSON objekt
    • služby mají volitelný parametr ttl, kterým se stanovuje délka platnosti zprávy (parametr je uveden v milisekundách od okamžiku zadání požadavku, proto pozor na správné nastavení TTL ve zprávě: System.currentTimeMillis() + ttl)
    • výsledek je vždy ve formě JSON objektu v těle HTTP odpovědi
  • rest – pro komunikační vzor požadavek/odpověď
    • první pro požadavek jednomu poskytovateli
    • druhá pro požadavek na více poskytovatelů
    • třetí pak prezentující řešení zádrhelů, které se objevily u prvních dvou
    • žádost o službu je v těle HTTP požadavku jako JSON objekt
    • služby mají volitelný parametr expire, který stanovuje dobu v milisekundách pro vypršení požadavku
    • výsledek je vždy ve formě JSON objektu v těle HTTP odpovědi

Jak si to vyzkoušet

Vzor komunikace odeslání zpráv

Takže nejdříve vzor komunikace odeslání zprávy. Obě služby si můžeme vyzkoušet současně s tím, že v prvém běhu si necháme puštěné příjemce zpráv a ve druhém je pak zastavíme. A když se podívám na obsah front do administrátorského rozhraní message brokeru, měl bych vidět rozdíl v chování.

Takže vyvolání služeb:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/mesg/appl01?ttl=30000

Tělo odpovědi je prázdné, v logu bychom měli vidět něco takového:

2021-01-02 12:20:31.376  INFO: >>> Request{value=1234, Token{name='TRPASLIK', ts=Sat Jan 02 12:20:31 CET 2021}}
2021-01-02 12:20:32.906  INFO: <<< Response{result=1244,Token{name='provider01', ts=Sat Jan 02 12:20:31 CET 2021}}}

A volání pro více příjemců:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/mesg/appl02?ttl=30000

Tělo odpovědi je prázdné, v logu bychom měli vidět něco takového:

2021-01-02 12:21:42.076  INFO: >>> Request{value=1234, Token{name='TRPASLIK', ts=Sat Jan 02 12:21:42 CET 2021}}
2021-01-02 12:21:42.092  INFO: >>> Request{value=1234, Token{name='TRPASLIK', ts=Sat Jan 02 12:21:42 CET 2021}}
2021-01-02 12:21:42.117  INFO: >>> Request{value=1234, Token{name='TRPASLIK', ts=Sat Jan 02 12:21:42 CET 2021}}
2021-01-02 12:21:42.596  INFO: <<< Response{result=2488, Token{name='provider02', ts=Sat Jan 02 12:21:42 CET 2021}}
2021-01-02 12:21:43.590  INFO: <<< Response{result=1244, Token{name='provider01', ts=Sat Jan 02 12:21:42 CET 2021}}
2021-01-02 12:21:44.618  INFO: <<< Response{result=1584456, Token{name='provider03', ts=Sat Jan 02 12:21:42 CET 2021}}

Pokud se teď podíváte na fronty zpráv v administrátorském rozhraní, pak by měly být prázdné.

Teď nastavte parametr routes.providers.switchOn=false, restartujte aplikaci a zopakujte stejné kroky ještě jednou. 

V logu by se již neměly objevit odpovědi od providerů.

Pokud se podíváte na fronty, pak je v každé frontě jedna zpráva. Ta tam ale vydrží pouze 30 sekund. Pak vyprší její TTL a zpráva je z fronty message brokerem smazána. Přesněji řečeno, v našem nastavení brokeru je odsunuta do fronty DLQ – dead letter queue, kde je ponechána k manuálnímu řešení.

Vzor komunikace požadavek/odpověď

Tady si pro ověření funkčnosti opět vystačíme s příkazem curl. Měli bychom dostávat také nějaké odpovědi. Při volání používám ještě příkaz time, abyste lépe viděli, jak rychle mně služba odpovídá. A nakonec ještě výsledek posílám do jq pro pěknější formátování JSON.

Nejdříve tedy služba pro oslovení jednoho poskytovatele

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl01?expire=5000 | jq .
{
  "name": "provider01",
  "ts": "2021-01-02T11:26:40.663+00:00",
  "result": 1244
}
real    0m1.807s
user    0m0.047s
sys    0m0.006s

Je vidět, že odpověď jsem dostal za cca. 1,5s, což odpovídá zpoždění nastavenému pro provider01. Vzhledem k tomu, že mám expiraci 5s, tak je to v pořádku.

Zkusím tedy zkrátit expiraci:

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl01?expire=500 | jq .
{
  "timestamp": "2021-01-02T11:27:45.931+00:00",
  "status": 500,
  "error": "Internal Server Error",
  "message": "",
  "path": "/rest/appl01"
}
real    0m0.672s
user    0m0.044s
sys    0m0.008s

Zkrátil jsem expiraci na půl sekundy. Provedení příkazu mně trvalo 0,6s, což je v pořádku. Dostal jsem ale dost ošklivou chybu. Jak s ní naložit, ukážu později. Tohle prostě znamená, že jsem odpověď nedostal.

Dále služba pro oslovení více poskytovatelů

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl02?expire=5000 | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-02T11:29:15.132+00:00",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-02T11:29:16.666+00:00",
    "result": 2488
  },
  {
    "name": "provider03",
    "ts": "2021-01-02T11:29:17.214+00:00",
    "result": 1584456
  }
]
real    0m4.667s
user    0m0.041s
sys    0m0.020s

Tady už je ten výsledek zajímavější. Co se tedy událo?

  1. Nastavil jsem časový limit na 5s. 
  2. Dostal jsem odpovědi od všech služeb (ty mají nastaveno zpoždění 1,5s, 0,5s a 2,5s)
  3. Doba trvání dotazu je 4,7s, což napovídá, že se dotazy realizovaly sekvenčně. Co s tím, ukážu později.

Dobrá, zkusím zkracovat dobu expirace a sledovat, co se bude dít:

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl02?expire=2000 | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-02T11:30:37.812+00:00",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-02T11:30:39.334+00:00",
    "result": 2488
  },
  {
    "name": "REQUESTED by TRPASLIK",
    "ts": "2021-01-02T11:30:37.796+00:00",
    "value": 1234
  }
]
real    0m4.154s
user    0m0.044s
sys    0m0.014s

Zkrátil jsem expiraci na 2s, takže stihli odpovědět pouze provider01 a provider02. Ten třetí to nestihl. Dostal jsem ale tři odpovědi. Co to má znamenat? 

No, ve skutečnosti jsem dostal odpovědi jenom dvě. To třetí není odpověď, ale ten původní dotaz. 

Co s tím, aby se mně nemíchaly odpovědi spolu s expirovanými dotazy, ukážu dále.

A na závěr bádání ještě jedna expirace, a to po 0,5s.

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl02?expire=500 | jq .
{
  "timestamp": "2021-01-02T11:32:46.222+00:00",
  "status": 500,
  "error": "Internal Server Error",
  "message": "",
  "path": "/rest/appl02"
}
real    0m1.740s
user    0m0.043s
sys    0m0.012s

A opět ta zvláštní chyba, kdy se mně nepodařilo získat žádnou odpověď.

Řešení pro dříve naznačené problémy

Odpověď na dříve naznačené problémy najdete u poslední služby, která volá Camel cestu applicant05.

Ta vypadá takto:

from("direct:applicant05").routeId("applicant05")
    .onException(ExchangeTimedOutException.class)
        .handled(true)
        .process(exchange -> {
            logger.warn("TIMEOUT Exception handled");
            exchange.getMessage().setBody(null);
        })
    .end()
    .multicast()
        .parallelProcessing()
        .aggregationStrategy(new GroupedBodyAggregationStrategy())
        .to("activemq:queue:QUEUE-1?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}"
            "activemq:queue:QUEUE-2?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}",
            "activemq:queue:QUEUE-3?explicitQosEnabled=true&requestTimeoutCheckerInterval={{routes.applicants.checkerInterval}}")
    .end();

Prvním z problému, na který jsme narazili, je sekvenční zpracování dotazů na několik poskytovatelů. 

To se dá vyřešit directivou parallelProcessing() u multicast. Ta zajistí, že se všechna volání budou řešit paralelně v rámci samostatných vláken.

Dalším problémem je míchání platných odpovědí a expirovaných požadavků.

To se dá řešit pomocí reakce na výjimku ExchangeTimedOutException.class. V případě jejího výskytu ji označíme za obslouženou a smažeme tělo požadavku z exchange. 

Jako vedlejší příjemný efekt bude, že se v logu nebudou objevovat sáhodlouhé výpisy výjimek, na které stejně nechceme reagovat.

No a jako poslední příjemný efekt bude ten, že se zbavíme i té zvláštní chybové odpovědi v případech, kdy jsme nedostali žádnou odpověď od poskytovatelů.

Takhle by to tedy mělo fungovat:

ICTZ2021

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl03?expire=5000 | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-02T11:38:10.651+00:00",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-02T11:38:10.660+00:00",
    "result": 2488
  },
  {
    "name": "provider03",
    "ts": "2021-01-02T11:38:10.677+00:00",
    "result": 1584456
  }
]
real    0m2.668s
user    0m0.047s
sys    0m0.012s

Po zkrácení expirace:

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl03?expire=2000 | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-02T11:39:49.621+00:00",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-02T11:39:49.655+00:00",
    "result": 2488
  }
]
real    0m2.128s
user    0m0.040s
sys    0m0.016s

Nakonec ještě kratší čas pro expiraci:

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl03?expire=500 | jq .
[]
real    0m0.622s
user    0m0.054s
sys    0m0.011s

Autor článku

Jiří Raška pracuje v ICZ a.s. na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.