Hlavní navigace

Služby v distribuovaných systémech – centrální sběr dat

20. 5. 2021
Doba čtení: 11 minut

Sdílet

 Autor: Depositphotos
V tomto článku bych se chtěl zabývat typickou centrální službou v distribuovaných systémech. Tou je centrální sběr dat. Představme si zařízení rozmístěná po republice s automatizovaným sběrem základních dat.

Námětem následujících tří článků budou centrální služby v distribuovaných systémech. Že je to protimluv? Myslím, že to tak působí jen zdánlivě.

V případě centrálních služeb každý účastník ví, na jakém místě, kdo a jakou službu poskytuje. Je to obvykle zadáno v rámci konfigurace každého uzlu zapojeného do sítě. Na druhou stranu ten, kdo poskytuje nějakou centrální službu, nemusí vědět, kdo a z jakého místa jeho službu využívá. Pro pochopení asi budou lepší příklady.

V tomto článku bych se chtěl zabývat typickou centrální službou v distribuovaných systémech, a tou je centrální sběr dat.

Jako příklad by nám mohla posloužit síť automatizovaných meteorologických stanic. Představme si je jako zařízení rozmístěná po republice s automatizovaným sběrem základních meteorologických dat. Tato zařízení budou zapojena do společné sítě prostřednictvím message brokeru. Dále bude v síti připojeno jedno centrum, jehož účelem bude sbírat data od všech stanic do jedné databáze. Pokud nějaké zařízení provede měření a získá data, odešle je do centra, a tím jeho úloha skončí. Každá stanice má ve své konfiguraci nastaveno, na jaké místo (queue: nebo topic:) má data odeslat. Na druhou stranu uzel s centrální databází vůbec nemusí vědět, kolik stanic máme aktuálně připojených a funkčních. Ten jednoduše sebere všechna data, která přijdou na jeho vstup a patřičným způsobem s nimi naloží (v tomto případě zapíše do databáze).

Mohli bychom rozlišit několik variant komunikace pro centrální sběr dat.

  1. Předání dat prostřednictvím TOPIC:

    Jedná se o „nejvolnější“ formu vztahu mezi odesilatelem dat a jejich příjemcem. Odesilatel svá data předá do tématu, a dále již není schopen ovlivnit, zda jeho data přijal jeden či více příjemců, nebo zda vůbec nějaký.

    Příkladem takového způsobu sběru dat bude centrální databáze auditních záznamů, kterou se budu zabývat v další části tohoto textu.

  2. Předání dat prostřednictvím QUEUE:

    O něco více „pevnější“ formu vztahu mezi odesilatelem a příjemcem reprezentuje využití fronty pro předání dat. Odesilatel odešle svá data do fronty, ze které mohou být data přečtena jen jedním příjemcem (pomiňme v tomto okamžiku možnosti clusterování). Může se stát, že příjemce data nepřečte a ta budou následně smazána díky jejich nastavenému TTL.

    Tento způsob komunikace bych asi využil v dříve uvedeném příkladu s automatizovanými meteorologickými stanicemi.

  3. Předání dat prostřednictvím request/response:

    Nejužší formou kooperace mezi odesilatelem a příjemcem dat je request/response. Odesilatel odešle svá data příjemci jako součást dotazu a očekává, že mu přijetí či odmítnutí dat bude potvrzeno v rámci odpovědi. V případě, že odesilatel nedostane potvrzení o přijetí dat, může pokus v závislosti na podmínkách opakovat, dokud nebude úspěšný.

    V dnešní době je aktuální téma očkování a sběr informací o něm. Pokud by byly součástí sítě ambulantní či klinické systémy, pak by tímto způsobem mohly předávat do centrální databáze informace o provedeném očkování konkrétního pacienta. V tomto případě by jistě bylo na místě využití komunikace typu request/response, aby si odesílající mohl být jistý, že jeho data byla centrem přijata.

Centrální sběr dat – auditní databáze

Jsem velkým zastáncem využívání auditních záznamů pro sledování a vyhodnocování funkčnosti informačních systémů, nejen těch distribuovaných. Přihřeji si tedy svou polívčičku i zde.

Za auditní záznamy považuji data zachycující vznik nějaké aplikační události v informačním systému. Nejedná se tedy o nějaký výběr aplikačních dat (například dat o zákazníkovi), ale záznam o tom, že se s daty zákazníka nějak manipulovalo. Obvykle se zaznamenává vnik dat, jejich aktualizace, případně také jejich výmaz. Může se, či raději bych řekl měl by se, zaznamenávat i přístup k datům.

V další části textu jsem vydefinoval dvě nové role, a to jsou:

  • audited – instance s touto rolí vytváří auditní záznamy o událostech a odesílá je do centrální auditní databáze
  • auditor – představuje roli, která implementuje službu příjmu auditních záznamů, a jejich uložení do databáze, a případné zpřístupnění těchto záznamů pověřeným osobám

Následující diagram zachycuje interakci mezi uzly (rolemi) s návazností na centrální sběr auditních záznamů:

Každý uzel s rolí applicant nebo provider má přidělenu i roli audited. Budou tedy o své činnosti předávat auditní záznamy do uzlu s rolí auditor, což je v případě výše uvedeného schématu uzel node04.

Jako formu komunikace jsem si v tomto případě zvolil nejvolnější variantu, a sice TOPIC. Jedním z důvodu mé volby je to, že auditování by nemělo žádným způsobem ovlivňovat aplikační funkce. Auditovaná aplikace by neměla řešit, zda auditor běží a jak plní svou funkci.

Další otázkou je, jak budu reprezentovat auditní záznamy. Jako východisko použiji dříve definovanou strukturu entit s rozšířením:

Vycházím z třídy Message, která mně slouží jako základ všech asynchronně předávaných zpráv. Od ní je odvozen jeden společný předek pro auditní záznamy – AuditRecord, který je dále ještě konkretizován pro události generované rolemi applicant, provider a conducted (touto rolí se budu zbývat v příštím textu, zde to prosím ignorujte). Z diagramu je také vidět, že pro různé události mohu sbírat rozdílná data.

Role <audited>

Pokud má uzel přiřazenu tuto roli, pak je schopen generovat auditní záznamy a předávat je do URI topic:AUDIT (toto URI je definované v hlavním konfiguračním souboru pod jménem services.audit.uri; znají jej tedy všechny uzly bez rozdílu).

Komunikační funkce

Takto vypadá komunikační vrstva role (definováno ve třídě AuditedCamelRoutes):

@Component
@Profile(value = "audited")
public class AuditedCamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(AuditedCamelRoutes.class);

    @Autowired
    TokenFactory factory;

    @Override
    public void configure() throws Exception {

        from("direct:applicant-audit").routeId("applicant-audit")
            .process(exchange -> {
                Request request = exchange.getProperty("Request", Request.class);
                List<String> recipients = exchange.getProperty("Recipients", List.class);
                List<Response> response = exchange.getMessage().getBody(List.class);
                ApplicantAuditRecord auditRecord = factory.tokenInstance(request.getTid(), ApplicantAuditRecord.class);
                auditRecord.setEvent(request.auditEvent());
                auditRecord.setSeverity("info");
                auditRecord.setDestinations(recipients);
                auditRecord.setResponseFrom(response.stream().map(r -> r.getNid().toString()).collect(Collectors.toList()));
                exchange.getMessage().setBody(auditRecord);
            })
            .to("{{services.audit.uri}}");

        from("direct:provider-audit").routeId("provider-audit")
            .process(exchange -> {
                Request request = exchange.getProperty("Request", Request.class);
                Response response = exchange.getMessage().getBody(Response.class);
                ProviderAuditRecord auditRecord = factory.tokenInstance(response.getTid(), ProviderAuditRecord.class);
                auditRecord.setEvent(response.auditEvent());
                auditRecord.setSeverity("info");
                auditRecord.setRequestFrom(request.getNid().toString());
                auditRecord.setCode(response.getCode());
                exchange.getMessage().setBody(auditRecord);
            })
            .to("{{services.audit.uri}}");

        from("direct:conducted-audit").routeId("conducted-audit")
            .choice()
                .when(body().isInstanceOf(RestartMessage.class))
                    .process(exchange -> {
                        RestartMessage request = exchange.getMessage().getBody(RestartMessage.class);
                        ConductedAuditRecord auditRecord = factory.tokenInstance(request.getTid(), ConductedAuditRecord.class);
                        auditRecord.setEvent("conducted.restarted");
                        auditRecord.setSeverity("info");
                        auditRecord.setRequestFrom(request.getNid().toString());
                        exchange.getMessage().setBody(auditRecord);
                    })
                    .to("{{services.audit.uri}}")
            .end();

        from("direct:conductor-audit").routeId("conductor-audit")
            .choice()
                .when(body().isInstanceOf(RestartMessage.class))
                    .process(exchange -> {
                        RestartMessage request = exchange.getMessage().getBody(RestartMessage.class);
                        AuditRecord auditRecord = factory.tokenInstance(request.getTid(), AuditRecord.class);
                        auditRecord.setEvent("conductor.restart");
                        auditRecord.setSeverity("info");
                        exchange.getMessage().setBody(auditRecord);
                    })
                    .to("{{services.audit.uri}}")
            .end();
    }
}

Jsou zde definovány cesty pro každý typ podporované auditní události. To proto, že u každé se sbírají jiné údaje o proběhlé události.

Společným pojítkem je atribut event, ve kterém se předává strukturované jméno události.

Jak se vlastně Camel cesty volají? Setkali jsme se s nimi již v předchozím dílu, kde jsem vám ukazoval cesty pro roli applicant a provider. Tam byly na konci vždy „ocásky“.

Tak například pro roli applicant vypadá následovně:

.choice()
    .when(simple("${camelContext.hasEndpoint(direct:applicant-audit)} != null"))
        .wireTap("direct:applicant-audit")

V podstatě se jedná o následující:

  • nejdříve zjistím, jestli v Camel kontextu existuje cesta s uri=“direct:applicant-audit“. Pokud ano, pak to znamená, že uzel má přidělenu roli audited.
  • v případě, že existuje, předám do cesty výsledek volání, a to s využitím direktivy wireTap. Ta zajistí asynchronní zpracování bez blokace volající cesty. Více informací o direktivě wireTap.

Obdobně je to uvedeno také u role provider, což na tomto místě nebudu více rozebírat.

Role <auditor>

Role auditor je určena k tomu, aby sbírala auditní záznamy od všech uzlů s rolí audited. Navíc má uživatelské REST rozhraní, aby bylo možné nahlížet na získané záznamy.

Datové komponenty

V reálném nasazení se budou auditní záznamy nejspíše ukládat do nějaké databáze. Abych vám zbytečně nekomplikoval život, tak databázi používat nebudu. Data ponechám v Java Bean, takže pro ukončení uzlu s touto rolí o data přijdu. Pro ukázku funkce to ale bude dostatečné, jen je potřeba s tím počítat.

Data budu ukládat do této komponenty typu seznam (implementováno ve třídě AuditorComponent):

@Component
@Profile(value = "auditor")
public class AuditorComponent {

    @Bean
    public List<AuditRecord> auditStorage() {
        return Collections.synchronizedList(new ArrayList<>());
    }
}

Komunikační funkce

Takto vypadá komunikační vrstva pro roli, třída AuditorCamelRoutes:

@Component
@Profile(value = "auditor")
public class AuditorCamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(AuditorCamelRoutes.class);

    @Autowired
    List<AuditRecord> auditStorage;

    @Override
    public void configure() throws Exception {
        from("{{services.audit.uri}}").routeId("auditor")
            .process(exchange -> {
                AuditRecord auditRecord = exchange.getMessage().getBody(AuditRecord.class);
                logger.info("Audit Record written to the Storage: {}", auditRecord);
                auditStorage.add(auditRecord);
            });
    }
}

Po vyzvednutí záznamu z uri=“topic:AUDIT“ záznam přidám do úložiště, a to je vše.

Aplikační rozhraní

Pro přístup k úložišti mám vytvořenu jednu službu, třída AuditorServiceController:

@RestController
@Profile(value = "auditor")
public class AuditorServiceController {

    private static final Logger logger = LoggerFactory.getLogger(AuditorServiceController.class);

    @Autowired
    List<AuditRecord> auditStorage;

    @RequestMapping(value = "/audit")
    public ResponseEntity<List<AuditRecord>> getAuditRecords() {
        return new ResponseEntity<>(auditStorage, HttpStatus.OK);
    }
}

Služba vrátí aktuální obsah úložiště jako pole JSON objektů.

Nastal čas na vyzkoušení

Ukázka základní funkčnosti

Nejdříve si vyzkoušíme konfiguraci uzlů tak, jak jsem je naznačil v sekvenčním diagramu výše. Spustím tedy uzly s těmito rolemi:

  • node01 – applicant, audited
  • node02 – provider, audited
  • node03 – provider, audited
  • node04 – auditor

Příkazy spustit každý v samostatném terminálu:

java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node01,audited
java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node02,audited
java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node03,audited
java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node04

Nyní si můžeme ověřit, že nám fungují požadavky na službu:

[raska@localhost ~]$ time curl -s http://localhost:8081/rest/appl01?value=1234 | jq .
[
  {
    "nid": "local:node03",
    "name": "node03",
    "tid": "uuid:6429b6c1-6f23-47f5-888c-c0857bf3fec6",
    "ts": "2021-02-21T16:49:20.214+00:00",
    "code": "OK",
    "result": 1534
  },
  {
    "nid": "local:node02",
    "name": "node02",
    "tid": "uuid:6429b6c1-6f23-47f5-888c-c0857bf3fec6",
    "ts": "2021-02-21T16:49:20.214+00:00",
    "code": "OK",
    "result": 1729
  }
]

real    0m3.953s
user    0m0.052s
sys     0m0.004s

Podle očekávání jsem dostal dvě odpovědi od uzlů node02 a node03.

Teď se zkusím podívat, co mně zapsaly všechny uzly do auditní databáze:

[raska@localhost ~]$ curl -s http://localhost:8084/audit | jq .
[
  {
    "nid": "local:node02",
    "name": "node02",
    "tid": "uuid:6429b6c1-6f23-47f5-888c-c0857bf3fec6",
    "ts": "2021-02-21T16:49:20.298+00:00",
    "event": "provider.serviceA",
    "severity": "info",
    "requestFrom": "local:node01",
    "code": "OK"
  },
  {
    "nid": "local:node03",
    "name": "node03",
    "tid": "uuid:6429b6c1-6f23-47f5-888c-c0857bf3fec6",
    "ts": "2021-02-21T16:49:20.367+00:00",
    "event": "provider.serviceA",
    "severity": "info",
    "requestFrom": "local:node01",
    "code": "OK"
  },
  {
    "nid": "local:node01",
    "name": "node01",
    "tid": "uuid:6429b6c1-6f23-47f5-888c-c0857bf3fec6",
    "ts": "2021-02-21T16:49:23.209+00:00",
    "event": "applicant.serviceA",
    "severity": "info",
    "destinations": [
      "QUEUE-1",
      "QUEUE-2",
      "QUEUE-3"
    ],
    "responseFrom": [
      "local:node03",
      "local:node02"
    ]
  }
]

Je vidět, že mám záznam od obou poskytovatelů služeb (to jsou ty události „provider.serviceA“) a také záznam od žadatele (to je událost „applicant.serviceA“). Současně je také vidět, že o každém typu události vedu mírně odlišné informace.

Uzly bez role audited

Vyzkouším ještě variantu, kdy u nějakého uzlu vypnu roli audited.

Zkuste zastavit uzel node02 a opětovně jej spustit bez profilu audited:

java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node02

Zkusím vyvolat službu, a následně se podívat do auditní databáze, jak se nám to promítlo.

Tip do clanku - webcast avast

[raska@localhost ~]$ time curl -s http://localhost:8081/rest/appl02?text=hahaha | jq .
[
  {
    "nid": "local:node03",
    "name": "node03",
    "tid": "uuid:657d1134-d0a3-47f9-ac2c-9a205817ab22",
    "ts": "2021-02-21T17:03:03.101+00:00",
    "code": "OK",
    "text": "text length: 6"
  },
  {
    "nid": "local:node02",
    "name": "node02",
    "tid": "uuid:657d1134-d0a3-47f9-ac2c-9a205817ab22",
    "ts": "2021-02-21T17:03:03.139+00:00",
    "code": "OK",
    "text": "text length: 6"
  }
]

real    0m3.062s
user    0m0.048s
sys     0m0.006s

A takto vypadá obsah databáze o provedení všech výše popsaných akci:

[raska@localhost ~]$ curl -s http://localhost:8084/audit | jq ".[] | {nid, event, tid, ts}"
{
  "nid": "local:node02",
  "event": "provider.serviceA",
  "tid": "uuid:6429b6c1-6f23-47f5-888c-c0857bf3fec6",
  "ts": "2021-02-21T16:49:20.298+00:00"
}
{
  "nid": "local:node03",
  "event": "provider.serviceA",
  "tid": "uuid:6429b6c1-6f23-47f5-888c-c0857bf3fec6",
  "ts": "2021-02-21T16:49:20.367+00:00"
}
{
  "nid": "local:node01",
  "event": "applicant.serviceA",
  "tid": "uuid:6429b6c1-6f23-47f5-888c-c0857bf3fec6",
  "ts": "2021-02-21T16:49:23.209+00:00"
}
{
  "nid": "local:node03",
  "event": "provider.serviceB",
  "tid": "uuid:657d1134-d0a3-47f9-ac2c-9a205817ab22",
  "ts": "2021-02-21T17:03:03.114+00:00"
}
{
  "nid": "local:node01",
  "event": "applicant.serviceB",
  "tid": "uuid:657d1134-d0a3-47f9-ac2c-9a205817ab22",
  "ts": "2021-02-21T17:03:06.094+00:00"
}

Z výpisu je vidět, že nám přibyly v databázi dva záznamy o průběhu realizace služby B. Záznamy zapsal uzel node03 a node01 (záznam od node02 chybí). Je zde také názorně vidět, k čemu je nám dobrý atribut tid – unikátní identifikátor transakce. Na základě něho jsem schopen spojit několik auditních záznamů do jedné provedené transakce.

Autor článku

Jiří Raška pracuje na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.