Minha experiência na criação de um aplicativo multithread para trabalhar com backups

No momento, você não surpreenderá ninguém com aplicativos multithread, mas acho que neste artigo você pode encontrar algumas idéias interessantes. Meu estudo de Java começou com esse projeto em particular; talvez, em alguns lugares, eu esteja muito errado ou construa uma bicicleta grande, mas espero que alguém se interesse pela experiência de um iniciante em Java. Vou dar vários recursos do aplicativo:


  • Funciona com backups exclusivamente na memória, independentemente do tamanho do backup
  • Não carrega o backup inteiro na memória
  • As operações de backup / restauração podem ser canceladas

Sob o corte será considerada a arquitetura do aplicativo, bem como os principais problemas encontrados e sua solução.


Visão geral do aplicativo


A comunicação com o aplicativo ocorre por meio da UI da Web, mas no futuro será possível adicionar uma API REST, se necessário.


O aplicativo pode:


  1. Crie backups e envie-os para um ou mais armazenamentos
  2. Restaurar backups carregando-os do armazenamento
  3. Excluir backups de todos os armazenamentos
  4. Crie backups periodicamente

Repositórios atualmente suportados:


  • Sistema de arquivos local (não suportado no Docker)
  • Dropbox

Bancos de dados atualmente suportados:


  • PostgreSQL

De um aplicativo especial, posso observar:


  1. Trabalho correto em uma configuração de cluster
  2. Um backup nunca é totalmente carregado na memória, independentemente do tamanho do backup. O sistema de arquivos para armazenamento de backup temporário também não está envolvido. A criação de um backup e a restauração e, portanto, o carregamento / descarregamento de um backup, ocorrem exclusivamente na memória.
  3. Plataforma cruzada - funciona em Windows e Linux.
  4. Podemos monitorar todas as tarefas em execução e cancelá-las, se necessário.

Abaixo estão as capturas de tela da interface do usuário da Web que descrevem claramente os recursos do aplicativo.


Gerenciamento de armazenamento



Gerenciamento de banco de dados



Criação de backup


Recuperação de backup


Gerenciar backups criados


Backups periódicos


Controlar tarefas em execução




Arquitetura


O trabalho principal será realizado em três serviços - DatabaseBackup , Processor , Storage , e nós os conectaremos usando o conceito de tarefas . Sobre tudo isso ainda mais.


Databasebackup


Este serviço é responsável por criar e restaurar backups em texto sem formatação.


Interface de serviço:


public interface DatabaseBackup { InputStream createBackup(DatabaseSettings databaseSettings, Integer id); void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id); } 

Os dois métodos de interface operam nas instâncias InputStream , pois precisamos que o backup inteiro não seja carregado na memória, o que significa que o backup deve ser lido / gravado no modo de streaming. A entidade DatabaseSettings é pré-criada a partir da interface da Web da Web e armazena as várias configurações necessárias para acessar o banco de dados. O que é esse parâmetro - id - será explicado um pouco mais.


Os requisitos de serviço são os seguintes:


  1. Ambos os métodos não devem ler o backup inteiro na memória.
  2. O método restoreBackup() deve restaurar o backup em uma única transação, para que, em caso de erro, não deixe o banco de dados em um estado inconsistente.

Implementação para PostgreSQL (descrição do texto)

Especificamente, na implementação do PostgreSQL, o serviço é implementado da seguinte maneira:


  1. createBackup() : é criado um processo pg_dump que cria um backup e o grava no fluxo de saída padrão. O fluxo de saída do processo padrão é retornado do método (consulte https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream-- ). Os fluxos de E / S no sistema são baseados em um buffer de um determinado tamanho e, quando um processo grava no fluxo de saída, ele realmente grava no buffer na memória. O mais importante aqui é que o encadeamento do processo não será gravado no buffer preenchido até que o último tenha sido lido pelo outro lado, o que significa que o encadeamento estará em um estado bloqueado e o backup não será carregado completamente na memória. Você pode ter encontrado uma situação em que seu programa Java travou um impasse ao trabalhar com processos devido ao fato de você não ter lido o stdout ou stderr do processo. É extremamente importante monitorar isso, porque o processo não pode continuar se estiver bloqueado em uma chamada de bloqueio de E / S ao gravar em um buffer completo e ninguém lê esse buffer.
  2. restoreBackup() : um processo psql é criado, o backup é lido a partir do restoreBackup() passado para o método e é gravado simultaneamente no fluxo de entrada padrão psql (consulte https://docs.oracle.com/javase/8/docs/api/java/lang/Process. html # getOutputStream-- ). Isso funciona porque o backup PostgreSQL em texto simples é apenas uma coleção de comandos DDL e DML que são fáceis de entender o psql.

Há muito código, por isso não o darei aqui, mas você pode vê-lo no GitHub usando o link no final do artigo.


Processador


Este serviço é responsável pelo uso de processadores e pelo reprocessamento de backup reverso. Os processadores são usados ​​antes do download para armazenamento ou após o descarregamento do armazenamento. Exemplo de processador: compressor, criptografia.


Interface de serviço:


 public interface Processor { InputStream process(InputStream in); InputStream deprocess(InputStream in); ProcessorType getType(); // ProcessorType -  Enum,     int getPrecedence(); //   } 

Cada processador tem prioridade - se vários processadores forem especificados, eles serão aplicados em ordem decrescente de prioridade. Aplicando a função inversa na mesma ordem em que os processadores foram aplicados, obtemos o backup original.


Armazenamento


Este serviço é responsável por carregar e descarregar um backup, bem como por sua remoção do armazenamento. Exemplo de armazenamento: Dropbox, sistema de arquivos local.


Interface de serviço:


 public interface Storage { void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id); InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id); void deleteBackup(StorageSettings storageSettings, String backupName, Integer id); } 

Cada backup criado recebe um nome exclusivo - para que possamos encontrá-lo em qualquer um dos armazenamentos para os quais foi baixado. A maneira como o backup é apresentado ao armazenamento é uma questão exclusiva da implementação do serviço, mas ao transferir o nome do backup para uma das funções, devemos esperar o comportamento correto. A entidade StorageSettings é pré-criada a partir da interface da Web da Web e armazena as configurações necessárias para acessar o armazenamento.




Conceito de tarefa


Gostaríamos de poder rastrear o status de nossas tarefas, lidar com possíveis erros, dependendo do andamento da tarefa, e também cancelar tarefas. Portanto, continuaremos a operar apenas com tarefas. Cada tarefa será representada no banco de dados por um registro na tabela e programaticamente pela instância Future (consulte Java Future ). Cada registro na tabela está associado ao seu próprio futuro (além disso, se vários servidores estiverem em execução, as instâncias do futuro poderão estar na memória de diferentes servidores).


Vamos sequencialmente. Primeiro de tudo, precisamos de um serviço para iniciar tarefas - criar, restaurar e excluir backups.


Início da tarefa


Criando um backup:


 public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(storageSettingsNameList); Objects.requireNonNull(processors); Objects.requireNonNull(databaseSettings); BackupProperties backupProperties = backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName()); Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.CREATING); logger.info("Creating backup..."); try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS); logger.info("Applying processors on created backup. Processors: {}", processors); try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.UPLOADING); logger.info("Uploading backup..."); backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Creating backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of created backup", ex); } catch (RuntimeException ex) { logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Backup creating task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

A criação de um backup realiza três etapas principais na seguinte ordem: criação de um backup -> aplicativo de processadores -> upload para armazenamento. Em quase todos os métodos de serviço, encaminhamos o ID da tarefa atual para que o serviço possa relatar um erro de um encadeamento que funciona em segundo plano. Sobre o tratamento de erros, por que o InterruptedException está aqui e o que acontece com um erro após o recebimento de um RuntimeException será discutido mais adiante.


E aqui está como executaremos a tarefa de criar um backup:


 tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings); 

O primeiro parâmetro que passamos ao iniciador da tarefa: o usuário ou a tarefa interna do servidor (um exemplo de tarefa interna é um backup periódico). O conhecimento do iniciador de tarefas nos permite mostrar na interface da Web da Web apenas as tarefas que foram iniciadas pelo usuário. Os parâmetros restantes são necessários para criar um backup diretamente - uma lista de armazenamentos, processadores a serem usados, um banco de dados cujo dump você precisa criar.


Ao criar um backup, também é criado um registro no banco de dados chamado BackupProperties . Essa entidade armazenará propriedades de backup, como nome, processadores usados ​​e a lista de repositórios nos quais o backup foi baixado. Além disso, para restaurar ou excluir o backup, operaremos com essa entidade específica.


A tarefa no banco de dados é armazenada no seguinte formato:


 @Entity @Table(name = "backup_tasks") public class Task { /** * Identifier of each backup task. Identifier is generated by PostgreSQL database after saving of entity. */ @Id @Column(insertable = false, updatable = false) @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; /** * Backup task type. * <p> * Type is set at the very start of any task and can't be changed. * * @see Type */ @Enumerated(EnumType.STRING) @Column(updatable = false) private Type type; /** * Who initiated a task: user or server. * <p> * We need to know it to show on front only these tasks that was started by user. * * @see RunType */ @Enumerated(EnumType.STRING) @Column(updatable = false) private RunType runType; /** * Backup task state. * <p> * State is updated with every new step in task being executed. * * @see Task.State */ @Enumerated(EnumType.STRING) private State state; /** * Whether task has been interrupted or not. * <p> * Default is {@literal false}. */ @Column(insertable = false) private boolean interrupted; /** * Identifier of {@link BackupProperties}. * <p> * We need to know backup ID to be able to handle occurred errors. */ @Column(updatable = false) private Integer backupPropertiesId; /** * Start time of the task. */ @Column(updatable = false) private LocalDateTime date; public enum RunType { USER, INTERNAL } public enum State { PLANNED, CREATING, RESTORING, DELETING, APPLYING_PROCESSORS, APPLYING_DEPROCESSORS, DOWNLOADING, UPLOADING, COMPLETED, } public enum Type { CREATE_BACKUP { @Override public String toString() { return "CREATE BACKUP"; } }, RESTORE_BACKUP { @Override public String toString() { return "RESTORE BACKUP"; } }, DELETE_BACKUP { @Override public String toString() { return "DELETE BACKUP"; } } } // getters & setters... } 

Assim, você pode descrever o processo de criação de um backup na forma de um diagrama da seguinte maneira:




Outros tipos de tarefas são iniciados por analogia. Para não sobrecarregar o artigo com uma enorme quantidade de código, para os curiosos, darei o código para iniciar tarefas para restaurar e excluir backup separadamente no spoiler.


Recuperação de backup
 public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Objects.requireNonNull(storageSettingsName); Objects.requireNonNull(databaseSettings); Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING); logger.info("Downloading backup..."); try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) { if (Thread.interrupted() || downloadedBackup == null) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS); logger.info("Deprocessing backup..."); try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.RESTORING); logger.info("Restoring backup..."); databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Restoring backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of downloaded backup", ex); } catch (RuntimeException ex) { logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

A restauração de um backup realiza três etapas principais na seguinte ordem: descarregando um backup do armazenamento -> usando deprocessadores para obter o backup em texto sem formatação original -> restaurando um backup.


Inicie a recuperação da seguinte maneira:


 tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings); 

O processo de restauração de um backup na forma de um diagrama:
Processo de recuperação de backup


Excluir backup
 public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { try { logger.info("Deleting backup started. Backup properties: {}", backupProperties); tasksManager.updateTaskState(taskId, Task.State.DELETING); backupLoadManager.deleteBackup(backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Deleting backup completed. Backup properties: {}", backupProperties); } catch (RuntimeException ex) { logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

O processo de exclusão de um backup é bastante simples: um backup é simplesmente excluído de todos os armazenamentos para os quais foi baixado.


Execute a desinstalação da seguinte maneira:


 tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties); 

O processo de exclusão de um backup na forma de um diagrama:
Processo de remoção de backup




Cancelar tarefa


O que é cancelamento de tarefa? Obviamente, isso nada mais é do que um encerramento de thread. Você pode ver que todo o código principal em execução no Future está agrupado na seguinte construção try-catch:


 try { ... } catch (InterruptedException ex) { ... tasksManager.setInterrupted(taskId); } 

E também após cada método importante, cujo fluxo de execução controlamos, a seguinte construção é instalada:


 if (Thread.interrupted()) { throw new InterruptedException(); } 

Antes de prosseguir, uma breve teoria das interrupções e estados dos encadeamentos da JVM deve ser fornecida.


Os encadeamentos na JVM podem ter os seguintes estados:


  1. Novo
  2. Executável
  3. Espera cronometrada
  4. Esperando
  5. Bloqueado
  6. Terminado

Estamos interessados ​​apenas nos estados Em espera e Em espera temporizada. O Object.wait() no estado Waiting pelos métodos Object.wait() , Thread.join() e outros. O encadeamento é colocado no estado de espera Temporizada (ou seja, uma espera que dura um certo período de tempo) usando os métodos Object.wait(timeout) , Thread.join(timeout) , Thread.sleep(sleeping) e outros.


O mais importante aqui é que, se você interromper o encadeamento antes de entrar no estado Em espera ou Em espera temporizada, ou quando o encadeamento estiver nesse estado , o encadeamento será ativado, lançando uma InterruptedException .


Mas isso não é tudo. Não é de todo um fato que um encadeamento entre nos dados do estado criando, restaurando ou excluindo um backup. Como então informar o thread que foi interrompido?


A primeira maneira é verificar independentemente o sinalizador de interrupção com o thread usando os métodos Thread.interrupted Thread.interrupted() ou Thread.currentThread.isInterrupted() . A diferença entre eles é que o primeiro chama o método nativo particular currentThread.isInterrupted(boolean ClearInterrupted) , passando true , indicando que o sinalizador de interrupção será limpo e o segundo passando false , deixando o sinalizador de interrupção intacto. A escolha entre esses dois métodos depende inteiramente da situação. Quando uma InterruptedException é lançada, o sinalizador de interrupção também é limpo - vale a pena lembrar.


Mas deve haver um caminho mais fácil - e é. No aplicativo, há uma enorme quantidade de trabalho com fluxos de E / S e, portanto, com métodos de E / S. Nossa tarefa é garantir que, ao chamar os métodos read() ou write(int b) no fluxo de E / S, seja gerado um erro durante a interrupção, informando que a chamada de E / S bloqueada foi interrompida. Felizmente, o Java tem essa exceção - InterruptedIOException . No entanto, nem todos os métodos de fluxo de leitura / gravação monitoram interrupções de encadeamento e, especificamente, apenas o PipedInputStream o monitora. Portanto, nos locais em que esse fluxo não está envolvido, devemos estender o método de leitura / gravação para que, quando houver uma interrupção, uma InterruptedIOException seja lançada. De fato, a extensão do método read () foi suficiente para mim no aplicativo apenas em um lugar - quando o InputStream retornou do método de upload de backup. É assim que podemos aprender sobre a origem de uma interrupção sem precisar colocar verificações de modelo na bandeira em todos os lugares. No entanto, é importante capturar essa exceção separadamente do IOException e manipulá-la separadamente. Obviamente, você não pode prescindir da ajuda de uma verificação de modelo da bandeira em alguns lugares, mas ela se tornou melhor.


Também é importante observar que, se o sinalizador foi apagado durante o processamento da interrupção, é sempre necessário configurá-lo novamente para que, após retornar do método, possamos descobrir a interrupção que ocorreu.


Deixe-me explicar com um exemplo por que isso é importante. Suponha que façamos o upload de um backup para o armazenamento no método upload () e ocorra uma interrupção. A interrupção é processada, o trabalho é interrompido e o método retorna. A interrupção não ocorre com casualidade - significa que ocorreu um erro em algum lugar ou o usuário cancelou a tarefa. Independentemente do motivo, devemos interromper todo o trabalho neste futuro. Mas se você não definir o sinalizador de interrupção novamente antes de retornar do método de inicialização, nunca saberemos no bloco principal do futuro a interrupção que ocorreu.
O mesmo exemplo de código:


 backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <-   ,       if (Thread.interrupted()) { //      ,      - ,    throw new InterruptedException(); } 

Portanto, é uma boa prática manipular uma InterruptedException ou InterruptedIOException da seguinte maneira:


 try { ... } catch (InterruptedException e) { //  InterruptedIOException ... // re-interrupt the thread Thread.currentThread().interrupt(); } 

Bem, podemos lidar com a interrupção, mas quem realmente interromperá os threads?
Para fazer isso, criaremos outra entidade chamada CancelTask , que armazenará o ID da tarefa para cancelamento e também escreveremos um relógio que tentará interromper as tarefas. Por que tentar? Porque:


  1. Não foi possível finalizar o encadeamento na memória de outro servidor. Vários servidores podem trabalhar para nós, o que significa que o Future está espalhado em diferentes servidores. Assim, quando uma solicitação para cancelar uma tarefa chega em um dos servidores, o futuro desejado pode estar na memória de outro servidor.
  2. A tarefa não pode ser cancelada porque o Future foi perdido devido a uma falha no servidor.

Descreva brevemente o algoritmo de cancelamento à noite:
Watercher pega todos os registros da tabela cancel_tasks (ao mesmo tempo, o bloqueio não está definido), passa por cada um e tenta obter o futuro correspondente de sua memória. Se o futuro for recebido com êxito, o encadeamento correspondente será interrompido, a tarefa será revertida e a solicitação será excluída da tabela. Se a solicitação de tempo limite para cancelar a tarefa for excedida (o que significa que o servidor travou e o futuro foi perdido) - a solicitação é simplesmente excluída da tabela. Se vários servidores notarem um tempo limite e excluir o registro da tabela, nada de ruim acontecerá, porque a exclusão no PostgreSQL é idempotente.


CancelTasksWatcher Code:


Texto oculto
 /** * This class scans for tasks to cancel and tries to cancel them. */ @Component class CancelTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class); private static final Duration cancelTimeout = Duration.ofMinutes(10); private CancelTasksManager cancelTasksManager; private TasksStarterService tasksStarterService; private TasksManager tasksManager; // spring setters... /** * This watcher wakes up every time 10 seconds passed from the last completion, checks if there are any tasks to cancel and tries to * cancel each task. * <p> * Since there are can be working more that one instance of the program, {@literal Future} instance of task can belong to different * servers. We can't get access to {@literal Future} if it's not in memory of the server where task cancellation request was accepted. * So the purpose of this watcher is to be able cancel tasks that works in the other instance of program. Each server has this watcher * checking for available cancellation requests and if any, the watcher tries to cancel corresponding {@literal Future}. * If cancellation is successful task will be also reverted. * <p> * If task cancellation request timeout exceeded, then it means a server that had requested {@literal Future} instances has been * shutdown, so all {@literal Future} instances lost and task can't be canceled. In such case task cancellation request will be ignored. * * @see TasksStarterService#getFuture(Integer) * @see TasksManager#revertTask(Task) */ @Scheduled(fixedDelay = 10 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchTasksToCancel() { Iterable<CancelTask> cancelTasks = cancelTasksManager.findAll(); Iterable<Task> tasks = tasksManager.findAllById(StreamSupport.stream(cancelTasks.spliterator(), false) .map(CancelTask::getTaskId).collect(Collectors.toList())); Map<Integer, Task> tasksAsMap = StreamSupport.stream(tasks.spliterator(), false) .collect(Collectors.toMap(Task::getId, Function.identity())); List<Integer> taskIdsForDeleting = new ArrayList<>(); for (CancelTask cancelTask : cancelTasks) { Integer taskId = cancelTask.getTaskId(); Task task = tasksAsMap.get(taskId); if (task == null) { logger.error("Can't cancel task: no such entity with ID {}", taskId); taskIdsForDeleting.add(taskId); continue; } // timeout exceeded, that is server shutdown and lost all Future instances, so task can't be canceled if (LocalDateTime.now(ZoneOffset.UTC).isAfter(cancelTask.getPutTime().plus(cancelTimeout))) { logger.error("Can't cancel task: timeout exceed. Task ID: {}", taskId); taskIdsForDeleting.add(taskId); continue; } tasksStarterService.getFuture(taskId).ifPresent(future -> { logger.info("Canceling task with ID {}", taskId); boolean canceled = future.cancel(true); if (canceled) { try { // give time to properly handle interrupt Thread.sleep(10000); } catch (InterruptedException e) { // should not happen } tasksManager.revertTask(task); } taskIdsForDeleting.add(taskId); logger.info("Task canceled: {}. Task ID: {}", canceled, taskId); }); } cancelTasksManager.deleteByTaskIdIn(taskIdsForDeleting); } } 



Tratamento de erros


, , Future, try-catch :


 try { ... } catch (RuntimeException e) { ... errorTasksManager.addErrorTask(taskId); } 

RuntimeException , Future , .


addErrorTask(taskId) , ID , .
? , , , .


:
, , . — PostgreSQL select for update , select skip locked . , , revertTask() , .


ErrorTasksWatcher :


 /** * This class scans for erroneous tasks and handles them depending on their state. */ @Component class ErrorTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class); private static final Integer nRows = 10; private TasksManager tasksManager; private ErrorTasksManager errorTasksManager; // spring setters... /** * This watcher wakes up every time 1 minute passed from the last completion, checks backup states periodically and handles erroneous * tasks if any. * <p> * The watcher handles at most N tasks as described by {@link #nRows} constant and skips already locked tasks. * When retrieving error tasks from database pessimistic lock is set. It allows safely run more than one copy of program, as no other * watcher can pick up already being handled error tasks. * <p> * If the server shutdowns while rows was locked, transaction will be rolled back and lock released, so these entities can be picked * up by the other running server. */ @Scheduled(fixedDelay = 60 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchErrorTasks() { for (ErrorTask errorTask : errorTasksManager.findFirstNAndLock(nRows)) { if (!errorTask.isErrorHandled()) { Integer backupTaskId = errorTask.getTaskId(); Optional<Task> optionalTask = tasksManager.findById(backupTaskId); if (!optionalTask.isPresent()) { logger.info("Can't handle erroneous task: no corresponding backup task entity. Backup task ID: {}", backupTaskId); continue; } tasksManager.revertTask(optionalTask.get()); errorTask.setErrorHandled(true); } } } } 

revertTask(Task) :


  /** * This function reverts erroneous task by its entity. * <p> * Use this function only after canceling related {@literal Future}. * <p> * If the task was of the type {@link Task.Type#CREATE_BACKUP} then related {@link BackupProperties} will be deleted. * * @param task the entity */ public void revertTask(@NotNull Task task) { Objects.requireNonNull(task); Task.State state = task.getState(); switch (state) { case DOWNLOADING: case APPLYING_DEPROCESSORS: case RESTORING: case DELETING: { logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString()); break; } case CREATING: case APPLYING_PROCESSORS: { logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString()); Integer backupPropertiesID = task.getBackupPropertiesId(); if (!backupPropertiesManager.existsById(backupPropertiesID)) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } backupPropertiesManager.deleteById(backupPropertiesID); break; } case UPLOADING: { logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state); Integer backupPropertiesId = task.getBackupPropertiesId(); Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId); if (!optionalBackupProperties.isPresent()) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get()); backupPropertiesManager.deleteById(backupPropertiesId); break; } default: { logger.error("Can't revert task: unknown state. Task info: {}", task); } } } 

Vamos analisar as possíveis situações:


  1. DOWNLOADING , APPLYING_DEPROCESSORS , RESTORING , DELETING — . , .
  2. CREATING , APPLYING_PROCESSORS — , . BackupProperties , ( BackupProperties Web UI ).
  3. UPLOADING — . BackupProperties , . .

, . , ? , , Future ( 1), , InputStream ( 2). , 2, 1 2 ?


, , , . Future ( 1) :


  public void onError(@NotNull Throwable t, @NotNull Integer taskId) { logger.error("Exception caught. Task ID: {}", taskId, t); Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId); if (!optionalFuture.isPresent()) { logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId); } else { boolean canceled = optionalFuture.get().cancel(true); if (!canceled) { logger.error("Error canceling the Future of task with ID {}", taskId); } else { logger.info("Task canceled. Task ID: {}", taskId); errorTasksManager.setError(taskId); } } } 

, , ID , , Future - , ID .


, , , , , .


, :


, , , . — Future.


, , , I/O , — / . , . :


  1. , . , — .
  2. — Future , . , / , , ( , — IOException , , ).

, — ( ID , , ), .




, , . , , .



  1. Web UI: , . ,

Conclusão


:



, ! , GitHub!

Source: https://habr.com/ru/post/pt459478/


All Articles