Hlavní navigace

Komunikace v distribuovaných systémech: směrování dle obsahu

14. 4. 2021
Doba čtení: 7 minut

Sdílet

 Autor: Depositphotos
Směrování podle obsahu, nebo také Content Based Router, je obecný EIP koncept využitelný na mnoha místech komunikačních a integračních projektů.

Jeho použití ukážu na příkladu uzlů poskytujících několik služeb najednou. Bližší informace najdete také v Content Based Router.

Doposud jsem měl vždy vytvořeno několik poskytovatelů služeb. Každý z nich poslouchal na jedné frontě a poskytoval implementaci jedné služby. Jsou to ale takové ty školní příklady.

V reálném životě budu mít uzlů, rozuměj také poskytovatelů služeb, desítky nebo stovky. Navíc budou obvykle poskytovat více služeb. Opět to mohou být desítky služeb na jeden uzel. Výsledkem pak bude, že budu mít stovky či tisíce front, vyhrazených vždy pro jednu službu jednoho uzlu.

Ono se to dá udělat i jinak. O tom bude tento článek.

Komunikačních uzlů mám desítky či stovky. Mohou být rozesety po různých zemích, různých datových centrech, v domácích počítačích a podobně. Uzly spolu vzájemně komunikují prostřednictvím front zpráv vyhrazených na message brokeru.

Každému uzlu bude na message brokeru přidělena pouze jedna fronta. Pokud bude požadovat nějaký jiný uzel službu, pošle požadavek do této fronty. Na příjemci zprávy pak bude, aby sám rozlišil, o co jej druhá strana žádá. A následně se podle typu požadavku odpovídajícím způsobem zachoval.

Takže koncept směrování podle obsahu použiji na straně poskytovatele služeb k rozlišení, o co vlastně protistrana žádá.

Příklady k tomuto článku je možné najít v package: example09

Předávané zprávy

Všechny definované typy zpráv jsou Java Bean, jejíž definice jsou v package entity

V tomto příkladu jsem definice zpráv rozšířil. Pro přehlednost bude asi lepší UML model tříd definovaných entit:


Autor: Jiří Raška

Mám definovány požadavky a odpovědi pro dvě služby, a to službu A a B.

Každý požadavek má jednoho společného předka, a tím je třída RequestBasis. Stejně tak každá odpověď má jednoho společného předka, a tím je ResponseBasis.

Dále nebudu vypisovat definice všech entit. Zájemce se může podívat do zdrojových kódů pro tenhle článek.

Zastavím se pouze u jednoho bodu, a sice návratový kód odpovědi. Ten má dle definice každá odpověď, protože jej dědí od společného předka.

Definované návratové kódy jsou tři:

  • OK              – služba proběhla v pořádku a výsledkem je validní odpověď
  • ERROR       – při zpracování požadavku nastala chyba
  • REFUSED  – uzel odmítl požadavek zpracovat; v mém případě to znamená, že služba není uzlem podporována

Definované Camel cesty

Takto vypadají definované cesty v Camel:

@Component
public class CamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class);

    @Override
    public void configure() {

//      Applicant Route definitions ...
        from("direct:applicant01").routeId("applicant01")
            .multicast()
                .aggregationStrategy(new GroupedBodyAggregationStrategy())
                .to("activemq:queue:QUEUE-1", "activemq:queue:QUEUE-2", "activemq:queue:QUEUE-3")
            .end();

        from("direct:applicant02").routeId("applicant02")
            .multicast()
                .aggregationStrategy((oldExchange, newExchange) -> {
                    Exchange result;
                    List<ResponseBasis> list;
                    if (oldExchange != null) {
                        list = oldExchange.getIn().getBody(List.class);
                        result = oldExchange;
                    }
                    else {
                        list = new ArrayList<>();
                        result = newExchange;
                    }
                    ResponseBasis resp = newExchange.getMessage().getBody(ResponseBasis.class);
                    if (resp.getCode() == ResponseCodeType.OK) {
                        list.add(newExchange.getMessage().getBody(ResponseBasis.class));
                    }
                    result.getMessage().setBody(list, List.class);
                    return result;
                })
                .to("activemq:queue:QUEUE-1", "activemq:queue:QUEUE-2", "activemq:queue:QUEUE-3")
            .end();

//      Provider Route definitions ...
        from("activemq:queue:QUEUE-1").routeId("provider01")
            .choice()
                .when(body().isInstanceOf(RequestA.class))
                    .process(exchange -> {
                        RequestA request = exchange.getMessage().getBody(RequestA.class);
                        ResponseA response = new ResponseA("provider01", new Date(), ResponseCodeType.OK,request.getValue() + 10);
                        exchange.getMessage().setBody(response);
                    })
                .when(body().isInstanceOf(RequestB.class))
                    .process(exchange -> {
                        RequestB request = exchange.getMessage().getBody(RequestB.class);
                        ResponseB response = new ResponseB("provider01", new Date(), ResponseCodeType.OK, "text length: " + request.getText().length());
                        exchange.getMessage().setBody(response);
                    })
                .otherwise()
                    .process(exchange -> {
                        ResponseBasis response = new ResponseBasis("provider01", new Date(), ResponseCodeType.REFUSED);
                        exchange.getMessage().setBody(response);
                    })
            .end();

        from("activemq:queue:QUEUE-2").routeId("provider02")
            .choice()
                .when(body().isInstanceOf(RequestA.class))
                    .process(exchange -> {
                        RequestA request = exchange.getMessage().getBody(RequestA.class);
                        ResponseA response = new ResponseA("provider02", new Date(), ResponseCodeType.OK,(request.getValue() + 10) * 2);
                        exchange.getMessage().setBody(response);
                    })
                .otherwise()
                    .process(exchange -> {
                        ResponseBasis response = new ResponseBasis("provider02", new Date(), ResponseCodeType.REFUSED);
                        exchange.getMessage().setBody(response);
                    })
            .end();

        from("activemq:queue:QUEUE-3").routeId("provider03")
            .choice()
                .when(body().isInstanceOf(RequestB.class))
                    .process(exchange -> {
                        RequestB request = exchange.getMessage().getBody(RequestB.class);
                        ResponseB response = new ResponseB("provider03", new Date(), ResponseCodeType.OK, ">>> " + request.getText() + " <<<");
                        exchange.getMessage().setBody(response);
                    })
                .otherwise()
                    .process(exchange -> {
                        ResponseBasis response = new ResponseBasis("provider03", new Date(), ResponseCodeType.REFUSED);
                        exchange.getMessage().setBody(response);
                    })
            .end();
    }
}

Mám tedy definovánu jednu cestu pro žadatele – applicant01. Ten rozešle požadavek na tři fronty, které jsou přiděleny poskytovatelům služeb, rozuměj uzlům – provider01, provider02 a provider03.

Doposud tedy nic nového.

Každý poskytovatel po příjmu zprávy z fronty nejdříve zjišťuje, jaký požadavek mu vlastně přišel. 

Používám direktivu choice, která má sekce when a otherwise.

Pokud přijatá zpráva je instancí třídy podporované danou sekcí, pak se provede odpovídající tělo této sekce.

V případě, že se nenajde žádné tělo sekce when, použije se tělo sekce otherwise. V sekci otherwise odesílám odpověď s kódem REFUSED.

Pokud se blíže podíváte na výše uvedené cesty pro jednotlivé poskytovatele, pak zjistíte, že applicant01 umí služby A i B, applicant02 umí pouze službu A. No a na závěr applicant03 umí pouze službu B.

REST API aplikace

V tomto případě se jedná o dvě služby odpovídající volání služby A nebo B. To se pozná podle typu požadavku, který je v těle HTTP dotazu.

@RestController
public class ServiceController {

    private static final String HEADER_OBJECT_COMPRESSION = "ObjectCompression";

    @Autowired
    private ProducerTemplate producerTemplate;

    @RequestMapping(value = "/rest/appl01")
    public ResponseEntity<List<ResponseA>> restApplicant01(
            @RequestBody RequestA request,
            @RequestParam(value = "filter", required = false, defaultValue = "false") boolean filter) {

        if (request.getName() == null)
            request.setName("rest-applicant01");
        request.setTs(new Date());

        Map<String, Object> headers = new HashMap<>();
        String url = (!filter) ? "direct:applicant01" : "direct:applicant02";

        List<ResponseA> response = producerTemplate.requestBodyAndHeaders(url, request, headers, List.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }

    @RequestMapping(value = "/rest/appl02")
    public ResponseEntity<List<ResponseB>> restApplicant02(
            @RequestBody RequestB request,
            @RequestParam(value = "filter", required = false, defaultValue = "false") boolean filter) {

        if (request.getName() == null)
            request.setName("rest-applicant02");
        request.setTs(new Date());

        Map<String, Object> headers = new HashMap<>();
        String url = (!filter) ? "direct:applicant01" : "direct:applicant02";

        List<ResponseB> response = producerTemplate.requestBodyAndHeaders(url, request, headers, List.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }
}

U každé služby mám ještě volitelný parametr filter. K jeho významu a použití se dostanu později.

Jak si to vyzkoušet

Takže si rovnou vyzkoušíme zavolat obě služby a uvidíme, co se stane:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01' | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-03T10:12:19.749+00:00",
    "code": "OK",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-03T10:12:19.787+00:00",
    "code": "OK",
    "result": 2488
  },
  {
    "name": "provider03",
    "ts": "2021-01-03T10:12:19.848+00:00",
    "code": "REFUSED"
  }
]
[raska@localhost ~]$ curl -s -d '{ "text": "hahaha", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl02' | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-03T10:13:02.574+00:00",
    "code": "OK",
    "text": "text length: 6"
  },
  {
    "name": "provider02",
    "ts": "2021-01-03T10:13:02.598+00:00",
    "code": "REFUSED"
  },
  {
    "name": "provider03",
    "ts": "2021-01-03T10:13:02.625+00:00",
    "code": "OK",
    "text": ">>> hahaha <<<"
  }
]

Postupně jsem zavolal REST služby A i B. Z výstupu je zřejmé, že od některých poskytovatelů jsem dostal validní odpověď na danou službu. V případě, že poskytovatel službu nepodporuje, tak jsem dostal odpověď s kódem REFUSED.

Nedalo by se to vylepšit?

To záleží na nás, jestli nám tohle řešení vyhovuje nebo ne. 

Možná bych nechtěl, aby se mně odmítnuté požadavky zařazovaly do výsledků, a já je pak následně musel odstraňovat. Prostě chci pouze platné odpovědi od poskytovatelů, kteří danou službu podporují.

Tohle se dá zajistit na úrovni REST služby. Dále ale ukážu variantu, jak to zajistit na úrovni Camel cesty žadatele.

Patřičně upravení je druhá cesta pro žadatele – applicant02.

Vyhledání platných odpovědí a jejich zahrnutí do výsledného seznamu mohu udělat v rámci agregace výsledků direktivy multicast.

Udělám to tak, že si napíšu vlastní implementaci rozhraní AggregationStrategy.  

Jedná se o implementaci jediné metody:

aggregate(Exchange oldExchange, Exchange newExchange)

Jde o spojení dvou Exchange, přičemž ta první obsahuje doposud agregované odpovědi, a ta druhá aktuálně přijatou další odpověď.

Do výsledku zahrnuji pouze ty odpovědi, které mají návratový kód OK.

root_podpora

No a ještě se vrátím k výše uvedeným REST službám. Ten parametr filter slouží k tomu, abych si vybral Camel cestu žadatele. Buď zavolám tu původní a nebo upravenou s filtrací.

Výsledky volání se zapnutou filtrací pak vypadají takto:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01?filter=true' | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-03T10:16:34.785+00:00",
    "code": "OK",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-03T10:16:34.834+00:00",
    "code": "OK",
    "result": 2488
  }
]
[raska@localhost ~]$ curl -s -d '{ "text": "hahaha", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl02?filter=true' | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-03T10:17:15.623+00:00",
    "code": "OK",
    "text": "text length: 6"
  },
  {
    "name": "provider03",
    "ts": "2021-01-03T10:17:15.666+00:00",
    "code": "OK",
    "text": ">>> hahaha <<<"
  }
]

Byl pro vás článek přínosný?

Autor článku

Jiří Raška pracuje na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.