Hlavní navigace

Komunikace v distribuovaných systémech: dynamické směrování

2. 3. 2021
Doba čtení: 4 minuty

Sdílet

 Autor: Depositphotos
Doposud jsem měl v příkladech pro každou službu zapsáno ve zdrojovém kódu, které poskytovatele bude oslovovat. To bylo nastaveno v Camel cestě žadatele ve formě URL pro direktivu to nebo multicast.

To může být pro některé služby postačující řešení. Pokud ovšem začneme pracovat se sítěmi, kde jsou zapojeny desítky nebo stovky uzlů, pak již toto řešení postačovat nebude.

Dnes vám ukážu, jak je možné zadat do volání služby, koho chceme oslovit. Cíle již nebudou natvrdo zadané v definicích cest, ale budou se sestavovat dynamicky při volání.

V dnešním článku si vystačím s jedinou službou typu požadavek/odpověď. V rámci jejího volání bude možné zadat identifikace poskytovatele/ů služeb, které se mají oslovit.

Příklady k tomuto článku je možné najít v package: example05

Předávané zprávy

Všechny vydefinované typy zpráv jsou Java Bean, jejíž definice jsou v package entity. Vzhledem k tomu, že se od předchozích článků nezměnily, nebudu je tady dále rozebírat.

Definované Camel cesty

Takto vypadají Camel cesty, které budu potřebovat na prezentaci dynamického směrování:

@Component
public class CamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class);

    private static final String RECIPIENT_LIST = "Recipients";

    @Override
    public void configure() {

//      Applicant Route definitions ...
        from("direct:applicant01").routeId("applicant01")
            .onException(ExchangeTimedOutException.class)
                .handled(true)
                .process(exchange -> {
                    logger.warn("TIMEOUT Exception handled");
                    exchange.getMessage().setBody(null);
                })
            .end()
            .recipientList()
                .header(RECIPIENT_LIST)
                .parallelProcessing()
                .aggregationStrategy(new GroupedBodyAggregationStrategy())
            .end();

//      Provider Route definitions ...
        from("activemq:queue:QUEUE-1").routeId("provider01")
            .process(exchange -> {
                Request request = exchange.getMessage().getBody(Request.class);
                TimeUnit.MILLISECONDS.sleep(1500);
                exchange.getMessage().setBody(new Response("provider01", new Date(), request.getValue() + 10));
            });

        from("activemq:queue:QUEUE-2").routeId("provider02")
            .process(exchange -> {
                Request request = exchange.getMessage().getBody(Request.class);
                TimeUnit.MILLISECONDS.sleep(500);
                exchange.getMessage().setBody(new Response("provider02", new Date(), (request.getValue() + 10) * 2));
            });

        from("activemq:queue:QUEUE-3").routeId("provider03")
            .process(exchange -> {
                Request request = exchange.getMessage().getBody(Request.class);
                TimeUnit.MILLISECONDS.sleep(2500);
                exchange.getMessage().setBody(new Response("provider03", new Date(), (request.getValue() + 50) * request.getValue()));
            });
    }
}

Mám vytvořeny tři poskytovatele služeb, kteří jsou dostupní na frontě QUEUE-1, QUEUE-2 a QUEUE-3. Jejich chování se proti předchozím příkladům nezměnilo. Ponechal jsem také různé časové prodlevy pro odpověď, takže budu moci dále experimentovat i s časovými limity společně s dynamickým směrováním.

Zajímavější je implementace pro cestu jediného žadatele, a sice applicant01. 

Pro oslovení poskytovatelů je použita direktiva recipientList. Ta má definováno, že cíle pro oslovení jsou zadány formou seznamu URL v hlavičce Recipients

Dále je nastaveno paralelní zpracování všech požadavků a agregační strategie pro zařazení odpovědí do seznamu. To ale není nic nového. To jsem používal již v předchozím článku při práci s časovými limity služeb.

REST API aplikace

Aplikace má definovanou jedinou REST službu:

@RestController
public class ServiceController {

    private static final String RECIPIENT_LIST = "Recipients";

    @Value("${routes.applicants.checkerInterval:100}")
    private long checkerInterval;

    @Autowired
    private ProducerTemplate producerTemplate;

    @RequestMapping(value = "/rest/appl01")
    public ResponseEntity<List<Response>> restApplicant01(
            @RequestBody Request request,
            @RequestParam(value = "destination") List<String> destinations,
            @RequestParam(value = "expire", required = false, defaultValue = "5000") Long expire) {
        if (request.getName() == null)
            request.setName("rest-applicant01");
        request.setTs(new Date());

        Map<String, Object> headers = new HashMap<>();
        headers.put(RECIPIENT_LIST, destinations.stream()
                .map(s -> "activemq:queue:" + s + "?explicitQosEnabled=true&requestTimeoutCheckerInterval=" + checkerInterval)
                .collect(Collectors.toList()));
        headers.put(JmsConstants.JMS_REQUEST_TIMEOUT, expire);

        List<Response> response = producerTemplate.requestBodyAndHeaders("direct:applicant01", request, headers, List.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }
}

Požadavek se opět předává jako JSON objekt v HTTP těle. 

Dále můžu nastavit dva parametry:

  • destination – což je seznam názvů front, které mají být osloveny
  • expire – časový limit pro dobu trvání služby; volitelný parametr

Nejdříve si vytvořím mapu pro hlavičky zprávy, do které dále doplním hlavičky RecipientsCamelJmsRequestTimeout.

Do Recipients naplním seznam URL, které mají formát:  "activemq:queue:<QUEUE>?explicitQosEnabled=true&requestTimeoutCheckerInterval=100"

  • <QUEUE> se nahradí hodnotami z parametru služby
  • další volby souvisí s řízením časových limitů, zmiňoval jsem je v předchozím díle 

Výsledek se nakonec předá jako JSON objekt v těle HTTP odpovědi.

Jak si to vyzkoušet

Tak teď již máme vše potřebné, abychom mohli trochu experimentovat.

Takže základní funkčnost:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl01?destination=QUEUE-1 | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-02T11:59:29.732+00:00",
    "result": 1244
  }
]

nebo pro více poskytovatelů:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl01?destination=QUEUE-1,QUEUE-2,QUEUE-3 | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-02T12:00:13.431+00:00",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-02T12:00:12.449+00:00",
    "result": 2488
  },
  {
    "name": "provider03",
    "ts": "2021-01-02T12:00:14.442+00:00",
    "result": 1584456
  }
]

nebo pro žádného:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl01 | jq .
{
  "timestamp": "2021-01-02T12:01:08.829+00:00",
  "status": 400,
  "error": "Bad Request",
  "message": "",
  "path": "/rest/appl01"
}

Tady je hlášena chyba, protože destination je požadovaný parametr volání.

Teď ještě zkouška s omezením časového limitu:

[raska@localhost ~]$ time curl -s -d '{"value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01?destination=QUEUE-1,QUEUE-2,QUEUE-3&expire=2000' | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-02T12:02:14.213+00:00",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-02T12:02:13.242+00:00",
    "result": 2488
  }
]
real    0m2.112s
user    0m0.046s
sys    0m0.007s

a ještě něco, kde nestihne odpovědět žádný poskytovatel:

[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01?destination=QUEUE-1,QUEUE-2,QUEUE-3&expire=100' | jq .
[]
real    0m0.258s
user    0m0.041s
sys    0m0.010

Autor článku

Jiří Raška pracuje v ICZ a.s. na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.