To může být pro některé služby postačující řešení. Pokud ovšem začneme pracovat se sítěmi, kde jsou zapojeny desítky nebo stovky uzlů, pak již toto řešení postačovat nebude.
Dnes vám ukážu, jak je možné zadat do volání služby, koho chceme oslovit. Cíle již nebudou natvrdo zadané v definicích cest, ale budou se sestavovat dynamicky při volání.
V dnešním článku si vystačím s jedinou službou typu požadavek/odpověď. V rámci jejího volání bude možné zadat identifikace poskytovatele/ů služeb, které se mají oslovit.
Příklady k tomuto článku je možné najít v package: example05
Předávané zprávy
Všechny vydefinované typy zpráv jsou Java Bean, jejíž definice jsou v package entity. Vzhledem k tomu, že se od předchozích článků nezměnily, nebudu je tady dále rozebírat.
Definované Camel cesty
Takto vypadají Camel cesty, které budu potřebovat na prezentaci dynamického směrování:
@Component public class CamelRoutes extends RouteBuilder { private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class); private static final String RECIPIENT_LIST = "Recipients"; @Override public void configure() { // Applicant Route definitions ... from("direct:applicant01").routeId("applicant01") .onException(ExchangeTimedOutException.class) .handled(true) .process(exchange -> { logger.warn("TIMEOUT Exception handled"); exchange.getMessage().setBody(null); }) .end() .recipientList() .header(RECIPIENT_LIST) .parallelProcessing() .aggregationStrategy(new GroupedBodyAggregationStrategy()) .end(); // Provider Route definitions ... from("activemq:queue:QUEUE-1").routeId("provider01") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); TimeUnit.MILLISECONDS.sleep(1500); exchange.getMessage().setBody(new Response("provider01", new Date(), request.getValue() + 10)); }); from("activemq:queue:QUEUE-2").routeId("provider02") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); TimeUnit.MILLISECONDS.sleep(500); exchange.getMessage().setBody(new Response("provider02", new Date(), (request.getValue() + 10) * 2)); }); from("activemq:queue:QUEUE-3").routeId("provider03") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); TimeUnit.MILLISECONDS.sleep(2500); exchange.getMessage().setBody(new Response("provider03", new Date(), (request.getValue() + 50) * request.getValue())); }); } }
Mám vytvořeny tři poskytovatele služeb, kteří jsou dostupní na frontě QUEUE-1, QUEUE-2 a QUEUE-3. Jejich chování se proti předchozím příkladům nezměnilo. Ponechal jsem také různé časové prodlevy pro odpověď, takže budu moci dále experimentovat i s časovými limity společně s dynamickým směrováním.
Zajímavější je implementace pro cestu jediného žadatele, a sice applicant01.
Pro oslovení poskytovatelů je použita direktiva recipientList. Ta má definováno, že cíle pro oslovení jsou zadány formou seznamu URL v hlavičce Recipients.
Dále je nastaveno paralelní zpracování všech požadavků a agregační strategie pro zařazení odpovědí do seznamu. To ale není nic nového. To jsem používal již v předchozím článku při práci s časovými limity služeb.
REST API aplikace
Aplikace má definovanou jedinou REST službu:
@RestController public class ServiceController { private static final String RECIPIENT_LIST = "Recipients"; @Value("${routes.applicants.checkerInterval:100}") private long checkerInterval; @Autowired private ProducerTemplate producerTemplate; @RequestMapping(value = "/rest/appl01") public ResponseEntity<List<Response>> restApplicant01( @RequestBody Request request, @RequestParam(value = "destination") List<String> destinations, @RequestParam(value = "expire", required = false, defaultValue = "5000") Long expire) { if (request.getName() == null) request.setName("rest-applicant01"); request.setTs(new Date()); Map<String, Object> headers = new HashMap<>(); headers.put(RECIPIENT_LIST, destinations.stream() .map(s -> "activemq:queue:" + s + "?explicitQosEnabled=true&requestTimeoutCheckerInterval=" + checkerInterval) .collect(Collectors.toList())); headers.put(JmsConstants.JMS_REQUEST_TIMEOUT, expire); List<Response> response = producerTemplate.requestBodyAndHeaders("direct:applicant01", request, headers, List.class); if (response != null) { return ResponseEntity.ok(response); } else { return ResponseEntity.notFound().build(); } } }
Požadavek se opět předává jako JSON objekt v HTTP těle.
Dále můžu nastavit dva parametry:
- destination – což je seznam názvů front, které mají být osloveny
- expire – časový limit pro dobu trvání služby; volitelný parametr
Nejdříve si vytvořím mapu pro hlavičky zprávy, do které dále doplním hlavičky Recipients
a CamelJmsRequestTimeout
.
Do Recipients
naplním seznam URL, které mají formát: "activemq:queue:<QUEUE>?explicitQosEnabled=true&requestTimeoutCheckerInterval=100"
- <QUEUE> se nahradí hodnotami z parametru služby
- další volby souvisí s řízením časových limitů, zmiňoval jsem je v předchozím díle
Výsledek se nakonec předá jako JSON objekt v těle HTTP odpovědi.
Jak si to vyzkoušet
Tak teď již máme vše potřebné, abychom mohli trochu experimentovat.
Takže základní funkčnost:
[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl01?destination=QUEUE-1 | jq .
[
{
"name": "provider01",
"ts": "2021-01-02T11:59:29.732+00:00",
"result": 1244
}
]
nebo pro více poskytovatelů:
[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl01?destination=QUEUE-1,QUEUE-2,QUEUE-3 | jq . [ { "name": "provider01", "ts": "2021-01-02T12:00:13.431+00:00", "result": 1244 }, { "name": "provider02", "ts": "2021-01-02T12:00:12.449+00:00", "result": 2488 }, { "name": "provider03", "ts": "2021-01-02T12:00:14.442+00:00", "result": 1584456 } ]
nebo pro žádného:
[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' http://localhost:8080/rest/appl01 | jq . { "timestamp": "2021-01-02T12:01:08.829+00:00", "status": 400, "error": "Bad Request", "message": "", "path": "/rest/appl01" }
Tady je hlášena chyba, protože destination je požadovaný parametr volání.
Teď ještě zkouška s omezením časového limitu:
[raska@localhost ~]$ time curl -s -d '{"value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01?destination=QUEUE-1,QUEUE-2,QUEUE-3&expire=2000' | jq . [ { "name": "provider01", "ts": "2021-01-02T12:02:14.213+00:00", "result": 1244 }, { "name": "provider02", "ts": "2021-01-02T12:02:13.242+00:00", "result": 2488 } ] real 0m2.112s user 0m0.046s sys 0m0.007s
a ještě něco, kde nestihne odpovědět žádný poskytovatel:
[raska@localhost ~]$ time curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01?destination=QUEUE-1,QUEUE-2,QUEUE-3&expire=100' | jq . [] real 0m0.258s user 0m0.041s sys 0m0.010