Hlavní navigace

Služby v distribuovaných systémech – vzdálené úložiště dat

2. 6. 2021
Doba čtení: 10 minut

Sdílet

 Autor: Depositphotos
Dnes se zaměříme na možnosti použití vzdáleného ukládání dat. Co kdyby se každý uzel zálohoval sám? V případě nutnosti náhrady bych pak musel na nové instanci pouze nastavit jeho identifikaci a parametry pro připojení na broker.

Obdobnou problematikou jsem se zabýval dva články nazpět. Tehdy šlo o centrální sběr dat od komunikačních uzlů a jejich případné další zpracování. V tomto případě nejde ani tak o to, abych dostal nějaká data na jednu hromadu. Jde o to, aby každý uzel měl možnost odložit nějaká svá data na externí úložiště a následně si je vyzvednout a dále s nimi pracovat.

K čemu by to mohlo být dobré? Zkusím to naznačit na příkladu sítě automatizovaných meteorologických stanic, který jsem používal již dříve. Pokud provozujete síť s velkým počtem uzlů, pak se budete jistě dostávat do situace, kdy vám některý z uzlů zkolabuje a potřebujete jej vyměnit. No a hodilo by se, aby případná výměna uzlu byla co možná technicky nejjednodušší. Můžete unifikovat hardware i software uzlů, ale vždy narazíte při výměně na problém a konfiguračními a stavovými daty každého uzlu. Tady se unifikuje dost obtížně.

Základní přístup každého technika je zálohování. Jak to ale zajistit, pokud jediným přístupným spojením na uzel je message broker?

Co kdyby se každý uzel zálohoval sám? V případě nutnosti náhrady porouchaného uzlu bych pak musel na nové instanci pouze nastavit jeho identifikaci a parametry pro připojení na broker. Po spuštění by pak uzel načetl svá konfigurační a stavová data ze vzdáleného úložiště a mohl by pokračovat v činnosti tam, kde jeho předchůdce skončil (nebo přesněji řečeno v bodě, kdy jeho předchůdce udělal poslední zálohu svých konfiguračních a stavových dat).

Vzdálené úložiště dat

Vyhradím si tedy v síti nějaký uzel, který bude plnit úlohu centrálního úložiště dat pro všechny ostatní uzly v síti. V reálném nasazení budou data uložena v nějakém vhodném typu databáze. Pro ukázku si ale vystačím úložištěm ve formě Java Bean.

Budu používat úložiště typu klíč → hodnota, tedy v případě implementace v Java je to rozhraní Map. Jako klíč budu používat technický identifikátor uzlu (nid). Hodnotou pro mne bude JSON řetězec reprezentující stavová a konfigurační data. Jako úložiště nemusím znát strukturu ukládaných dat. Záleží pouze na jednotlivých uzlech a jejich implementacích, co si do úložiště zapíší.

Operace pro práci s úložištěm odpovídají základním metodám CRUD (create, read, update, delete).

Následující diagram shrnuje předchozí popis a základní představu o práci s externím úložištěm dat:

V mém případě budu používat pouze dvě operace pro práci s úložištěm:

  • načtení konfiguračních dat podle klíče

Uzel s rolí <state-aware> se při svém spuštění pokusí načíst stavová data z centrálního úložiště. V případě, že se mu nepodaří data načíst, vytvoří si stav nový. V průběhu své činnosti zaznamenává počty provedených služeb do svého stavu. Dříve, než je uzel zastavený, zapíše svůj aktuální stav do centrálního úložiště. Je zde pochopitelně nebezpečí, že při nějakém náhodném pádu uzlu nebude stav uložen. Proti této situaci je dobré se bránit nějakými dalšími opatřeními, např. pravidelným ukládáním stavu za běhu nebo při změně stavu. To ale vždy bude záležet na konkrétní situaci, jaká rizika jsou pro nás akceptovatelná.

Uzel, který plní úlohu centrálního úložiště, má přidělenu roli <repository>.

Jako vhodný způsob komunikace je v tomto případě metoda request/response s využitím centrálně definované fronty. Pro toto použití jsem si vytvořil následující typy zpráv:

V dotazu RepositoryRequest je specifikována operace, jakou chci s daty provést. Dále pak klíč, a v některých případech také jeho hodnota. Podle typu operace pak RepositoryResponse obsahuje hodnotu v úložišti pro zadaný klíč.

Role <repository>

Tato role přísluší centrálnímu uzlu, který bude plnit úlohu úložiště pro konfigurační a stavová data poskytovanou všem ostatním uzlům zapojeným do sítě.

Datové komponenty

Jako úložiště se v reálu bude používat nějaký typ perzistentního uložení. Pro ukázku mně bude ale postačovat jako úložiště implementace v Java Bean s rozhraním Map. Implementace takového úložiště je ve třídě RepositoryComponent.

@Component
@Profile(value = "repository")
public class RepositoryComponent {

    @Bean
    public Map<String, String> repository() {
        return Collections.synchronizedMap(new HashMap<>());
    }
}

Komunikační funkce

V rámci komunikační vrstvy jsou implementovány CRUD operace pro přístup k úložišti. Jedná se o třídu RepositoryCamelRoutes:

@Component
@Profile(value = "repository")
public class RepositoryCamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(RepositoryCamelRoutes.class);

    @Autowired
    Map<String, String> repository;

    @Autowired
    TokenFactory factory;

    @Override
    public void configure() throws Exception {

        from("{{services.repository.uri}}").routeId("repository")
            .choice()
                .when(body().isInstanceOf(RepositoryRequest.class))
                    .process(exchange -> {
                        RepositoryRequest request = exchange.getMessage().getBody(RepositoryRequest.class);
                        RepositoryResponse response = factory.tokenInstance(request.getTid(), RepositoryResponse.class);
                        response.setCode(ResponseCodeType.OK);
                        switch (request.getOperation()) {
                            case CREATE:
                                if (!repository.containsKey(request.getKey()))
                                    repository.put(request.getKey(), request.getValue());
                                else
                                    response.setCode(ResponseCodeType.REFUSED);
                                break;
                            case READ:
                                response.setValue(repository.getOrDefault(request.getKey(), null));
                                break;
                            case UPDATE:
                                if (repository.containsKey(request.getKey()))
                                    repository.put(request.getKey(), request.getValue());
                                else
                                    response.setCode(ResponseCodeType.REFUSED);
                                break;
                            case UPSERT:
                                repository.put(request.getKey(), request.getValue());
                                break;
                            case DELETE:
                                if (repository.containsKey(request.getKey()))
                                    repository.remove(request.getKey());
                                else
                                    response.setCode(ResponseCodeType.REFUSED);
                                break;
                        }
                        exchange.getMessage().setBody(response);
                    })
                .otherwise()
                    .process(exchange -> {
                        Request request = exchange.getMessage().getBody(Request.class);
                        RepositoryResponse response = factory.tokenInstance(request.getTid(), RepositoryResponse.class);
                        response.setCode(ResponseCodeType.REFUSED);
                        exchange.getMessage().setBody(response);
                    })
            .end();
    }
}

Kontroluje se, zda přijatá žádost je skutečně typu RepositoryRequest. Pokud ano, pak se provede požadovaná operace na úložišti, jinak se požadavek odmítne.

Aplikační rozhraní

Abych se mohl podívat, co je v úložišti aktuálně zapsáno, vytvořil jsem jednoduchou funkci pro přístup k jeho obsahu. Výsledkem je pole JSON objektů reprezentujících zapsaná data pro klíč.

Implementace rozhraní je ve třídě RepositoryServiceController:

@RestController
@Profile(value = "repository")
public class RepositoryServiceController {

    private static final Logger logger = LoggerFactory.getLogger(RepositoryServiceController.class);

    @Autowired
    Map<String, String> repository;

    @RequestMapping(value = "/repository")
    public ResponseEntity<Map<String, String>> getAuditRecords() throws Exception {
        return new ResponseEntity<Map<String, String>>(repository, HttpStatus.OK);
    }
}

Role <state-aware>

Pokud má uzel tuto roli, pak ukládá svůj stav do centrálního úložiště.

Stavem v mém případě jsou počítadla počtu požadavků na aplikační službu a počtu odpovědí na aplikační službu. Ty jsou spojeny do jednoho objektu typu Map, kde klíčem jsou „requests“ případně „responses“.

Stav uzlu je načítán při startu aplikace, přesněji řečeno po nastartování Spring kontextu. Uložení stavu je provedeno před ukončením Spring kontextu, a to formou reakce na událost ContextClosedEvent. Obě akce jsou vázány na start/ukončení aplikace, proto jsem je zařadil do hlavní třídy Application.

Datové komponenty

Pro aktualizaci počítadel požadavků a odpovědí na služby, načítání a ukládání stavových dat uzlu, jsem vytvořil samostatnou komponentu ApplicationStateComponent:

@Component
@Profile(value = "state-aware")
public class ApplicationStateComponent {

    private static final Logger logger = LoggerFactory.getLogger(ApplicationStateComponent.class);

    private static final long SERVICE_EXPIRE = 1000;

    @Autowired
    private ObjectMapper jsonMapper;

    @Autowired
    private ProducerTemplate producerTemplate;

    @Autowired
    TokenFactory factory;
    private final Map<String, Integer> statistics;

    public ApplicationStateComponent() {
        statistics = new HashMap<>();
        statistics.put("requests", 0);
        statistics.put("responses", 0);
    }

    public void init() {
        RepositoryRequest request = factory.tokenInstance(RepositoryRequest.class);
        request.setOperation(OperationType.READ);
        request.setKey(request.getNid().toString());
        Map<String, Object> headers = new HashMap<>();
        headers.put(JmsConstants.JMS_REQUEST_TIMEOUT, SERVICE_EXPIRE);

        RepositoryResponse response = producerTemplate.requestBodyAndHeaders("direct:state-aware", request, headers, RepositoryResponse.class);
        if (response != null && response.getCode() == ResponseCodeType.OK) {
            logger.info("Read from Repository succeeded.");
            if (response.getValue() != null) {
                try {
                    Map<String, Integer> data = (Map) jsonMapper.readValue(response.getValue(), Map.class);
                    data.entrySet().forEach(e -> statistics.put(e.getKey(), e.getValue()));
                } catch (JsonProcessingException e) {
                    logger.error("Cannot load application state", e);
                }
            }
        }
        else {
            logger.warn("Read from Repository failed!");
        }
        logger.info("Application State: {}", statistics.entrySet().stream().map(e -> e.getKey().toString() + "=" + e.getValue()).collect(Collectors.joining(", ")));
    }

    public void close() {
        try {
            String data = jsonMapper.writeValueAsString(statistics);
            RepositoryRequest request = factory.tokenInstance(RepositoryRequest.class);
            request.setOperation(OperationType.UPSERT);
            request.setKey(request.getNid().toString());
            request.setValue(data);
            Map<String, Object> headers = new HashMap<>();
            headers.put(JmsConstants.JMS_REQUEST_TIMEOUT, SERVICE_EXPIRE);

            RepositoryResponse response = producerTemplate.requestBodyAndHeaders("direct:state-aware", request, headers, RepositoryResponse.class);
            if (response != null && response.getCode() == ResponseCodeType.OK)
                logger.info("Upsert to Repository succeeded.");
            else
                logger.warn("Upsert to Repository failed!");
        } catch (JsonProcessingException e) {
            logger.error("Cannot store application state", e);
        }
    }

    public void requestMade() {
        statistics.put("requests", statistics.get("requests") + 1);
    }

    public void responseMade() {
        statistics.put("responses", statistics.get("responses") + 1);
    }
}

Metoda init() slouží k načtení stavu ze vzdáleného úložiště. Provádí tedy operaci READ prostřednictvím komunikační funkce na URI „direct:state-aware“. Pokud je výsledkem operace vrácený JSON objekt, pak jeho atributy promítne do stavu uzlu.

Vyvolání metody při startu aplikačního kontextu zajišťuje tato část kódu v rámci třídy Application (pokud není role uzlu přidělena, pak je applicationState roven null):

@Autowired(required = false)
private ApplicationStateComponent applicationState;

@Override
public void run(ApplicationArguments args) throws Exception {
    if (applicationState != null)
        applicationState.init();
}

Metoda close() naopak ukládá aktuální stav uzlu do vzdáleného úložiště. Nejdříve stav převede na JSON objekt, který následně odešle do úložiště jako operace UPSERT. Klíčem je vždy nid uzlu. Pro komunikaci je opět využita stejná služba na URI „direct:state-aware“.

Vyvolání metody při ukončení aplikačního kontextu je v rámci třídy Application zajištěno:

public static void main(String[] args) {
    context = SpringApplication.run(Application.class, args);
    addContextListeners();
}

private static final void addContextListeners() {
    context.addApplicationListener(applicationEvent -> {
        if (applicationEvent instanceof ContextClosedEvent) {
            try {
                context.getBean(ApplicationStateComponent.class).close();
            }
            catch (NoSuchBeanDefinitionException e) { }
        }
    });
}

Komunikační funkce

Takto vypadá komunikační vrstva pro roli, třída StateAwareCamelRoutes:

@Component
@Profile(value = "state-aware")
public class StateAwareCamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(StateAwareCamelRoutes.class);

    @Override
    public void configure() throws Exception {
        from("direct:state-aware").routeId("state-aware")
            .onException(ExchangeTimedOutException.class)
                .handled(true)
                .process(exchange -> {
                    exchange.getMessage().setBody(null);
                })
            .end()
            .to("{{services.repository.uri}}");
    }
}

Nastal čas na vyzkoušení

Ukázka základní funkčnosti

Nejdříve si vyzkoušíme konfiguraci uzlů tak, jak jsem je naznačil v sekvenčním diagramu výše. Spustím tedy uzly s těmito rolemi:

  • node01 – applicant, state-aware
  • node02 – provider, state-aware
  • node03 – provider, state-aware
  • node06 – repository

Příkazy spustit každý v samostatném terminálu:

java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node06
java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node01,state-aware
java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node02,state-aware
java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node03,state-aware

Při startu uzlu node01, node02 a node03 by jste měli v logu vidět obdobní hlášení o inicializaci stavu uzlu:

2021-04-03 12:56:22.715  INFO 8338 --- [           main] c.d.d.c.ApplicationStateComponent        : Read from Repository succeeded.
2021-04-03 12:56:22.720  INFO 8338 --- [           main] c.d.d.c.ApplicationStateComponent        : Application State: responses=0, requests=0

A nyní se můžu podívat na obsah úložiště:

[raska@localhost ~]$ curl -s http://localhost:8086/repository | jq .
{}

Aktuálně nemám v úložišti nic zapsáno. To by se mělo změnit při ukončení některého z uzlů s rolí <state-aware>.

Nejdříve si ale zkusím nějaký stav vytvořit tím, že několikrát zavolám aplikační službu, např:

[raska@localhost ~]$ time curl -s http://localhost:8081/rest/appl01?value=1234 | jq .
[
  {
    "nid": "local:node02",
    "name": "node02",
    "tid": "uuid:23a28f0d-6a2d-495f-bf74-00e3530a91a4",
    "ts": "2021-04-03T10:58:38.465+00:00",
    "code": "OK",
    "result": 1744
  },
  {
    "nid": "local:node03",
    "name": "node03",
    "tid": "uuid:23a28f0d-6a2d-495f-bf74-00e3530a91a4",
    "ts": "2021-04-03T10:58:38.459+00:00",
    "code": "OK",
    "result": 1513
  }
]

real    0m3.433s
user    0m0.052s
sys     0m0.009s

Dále zkusím zastavit uzly node01, node02 a node03 (pro zastavení Ctrl-C), a opět se zkusím podívat na obsah úložiště:

[raska@localhost ~]$ curl -s http://localhost:8086/repository | jq .
{
  "local:node01": "{\"responses\":0,\"requests\":2}",
  "local:node02": "{\"responses\":2,\"requests\":0}",
  "local:node03": "{\"responses\":2,\"requests\":0}"
}

Teď již mám stavové informace zapsané pro každý uzel.

Uzel bez role state-aware

A dále můžu pokračovat v experimentování, tak například:

Nastartuji si opět uzly01 a node03, u uzlu node02 odstraním profil state-aware. Dále zkusím vyvolat nějakou službu, a následně opět všechny uzly ukončit.

MIF21_Dolejsova

Výsledek by mohl vypadat následovně:

[raska@localhost ~]$ curl -s http://localhost:8086/repository | jq .
{
  "local:node01": "{\"responses\":0,\"requests\":5}",
  "local:node02": "{\"responses\":2,\"requests\":0}",
  "local:node03": "{\"responses\":5,\"requests\":0}"
}

Aplikační službu jsem zavolal třikrát po sobě. Je vidět, že stav se změnil u uzlu node01 a node03, node02 zůstal ve stavu z předchozího experimentování.

Autor článku

Jiří Raška pracuje na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.