Mi experiencia en la creación de una aplicación multiproceso para trabajar con copias de seguridad

En este momento, no sorprenderá a nadie con aplicaciones multiproceso, pero creo que en este artículo puede encontrar algunas ideas interesantes. Mi estudio de Java comenzó con este proyecto, así que tal vez en algunos lugares me equivocaré mucho o construiré una bicicleta grande, pero espero que alguien esté interesado en la experiencia de un principiante en Java. Daré varias características de la aplicación:


  • Funciona con copias de seguridad exclusivamente en la memoria, independientemente del tamaño de la copia de seguridad.
  • No carga la copia de seguridad completa en la memoria
  • Las operaciones de copia de seguridad / restauración pueden cancelarse

Bajo el corte se considerará la arquitectura de la aplicación, así como los principales problemas encontrados y su solución.


Resumen de la aplicación


La comunicación con la aplicación se realiza a través de la interfaz de usuario web, pero en el futuro será posible agregar una API REST si es necesario.


La aplicación puede:


  1. Cree copias de seguridad y cárguelas en uno o más almacenes
  2. Restaurar copias de seguridad cargándolas desde el almacenamiento
  3. Eliminar copias de seguridad de todos los almacenes
  4. Crear copias de seguridad periódicamente

Repositorios actualmente soportados:


  • Sistema de archivos local (no compatible con Docker)
  • Dropbox

Bases de datos compatibles actualmente:


  • PostgreSQL

De una aplicación especial, puedo notar:


  1. Corregir el trabajo en una configuración de clúster
  2. Una copia de seguridad nunca se carga completamente en la memoria, independientemente del tamaño de la copia de seguridad. El sistema de archivos para el almacenamiento de respaldo temporal tampoco está involucrado. Tanto la creación de una copia de seguridad como la restauración, y por lo tanto la carga / descarga de una copia de seguridad, se producen exclusivamente en la memoria.
  3. Multiplataforma: funciona tanto en Windows como en Linux.
  4. Podemos monitorear todas las tareas en ejecución y cancelarlas si es necesario.

A continuación se muestran capturas de pantalla de la interfaz de usuario web que describen claramente las características de la aplicación.


Gestión de almacenamiento



Gestión de bases de datos



Creación de respaldo


Recuperación de copia de seguridad


Administrar copias de seguridad creadas


Copias de seguridad periódicas


Realizar un seguimiento de las tareas en ejecución




Arquitectura


El trabajo principal tendrá lugar en 3 servicios: base de datos , procesador , almacenamiento , y los conectaremos juntos utilizando el concepto de tareas . Sobre todo esto más allá.


Copia de seguridad de la base de datos


Este servicio es responsable de crear y restaurar copias de seguridad de texto sin formato.


Interfaz de servicio:


public interface DatabaseBackup { InputStream createBackup(DatabaseSettings databaseSettings, Integer id); void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id); } 

Ambos métodos de interfaz operan en instancias de InputStream , ya que necesitamos que la copia de seguridad completa no se cargue en la memoria, lo que significa que la copia de seguridad debe leerse / escribirse en modo de transmisión. La entidad DatabaseSettings se crea previamente desde la interfaz de usuario web y almacena las diversas configuraciones necesarias para acceder a la base de datos. ¿Cuál es este parámetro, id ?, Se explicará un poco más.


Los requisitos de servicio son los siguientes:


  1. Ambos métodos no deberían leer la copia de seguridad completa en la memoria.
  2. El método restoreBackup() debe restaurar la copia de seguridad en una sola transacción, de modo que, en caso de error, no deje la base de datos en un estado inconsistente.

Implementación para PostgreSQL (descripción del texto)

Específicamente, en la implementación de PostgreSQL, el servicio se implementa de la siguiente manera:


  1. createBackup() : se crea un proceso pg_dump que creará una copia de seguridad y la escribirá en la secuencia de salida estándar. La secuencia de salida del proceso estándar se devuelve desde el método (consulte https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream-- ). Los flujos de E / S en el sistema se basan en un búfer de cierto tamaño, y cuando un proceso escribe en el flujo de salida, en realidad escribe en el búfer en la memoria. Lo más importante aquí es que el hilo del proceso no escribirá en el búfer lleno hasta que este último haya sido leído por el otro lado, lo que significa que el hilo estará bloqueado y la copia de seguridad no se cargará completamente en la memoria. Es posible que haya encontrado una situación en la que su programa Java se estancó mientras trabajaba con procesos debido al hecho de que no leyó el stdout o stderr del proceso. Es extremadamente importante monitorear esto, porque el proceso no puede continuar si está bloqueado en una llamada de bloqueo de E / S cuando se escribe en un búfer completo y nadie lee este búfer.
  2. restoreBackup() : se crea un proceso psql , la copia de seguridad se lee desde el restoreBackup() pasado al método y se escribe simultáneamente en la secuencia de entrada estándar psql (consulte https://docs.oracle.com/javase/8/docs/api/java/lang/Process. html # getOutputStream-- ). Esto funciona porque la copia de seguridad de PostgreSQL en texto plano es solo una colección de comandos DDL y DML que son fáciles de entender psql.

Hay mucho código, así que no lo daré aquí, pero puedes verlo en GitHub usando el enlace al final del artículo.


Procesador


Este servicio es responsable del uso de procesadores y el reprocesamiento de copias de seguridad inversas. Los procesadores se utilizan antes de descargar al almacenamiento o después de descargar del almacenamiento. Ejemplo de procesador: compresor, cifrado.


Interfaz de servicio:


 public interface Processor { InputStream process(InputStream in); InputStream deprocess(InputStream in); ProcessorType getType(); // ProcessorType -  Enum,     int getPrecedence(); //   } 

Cada procesador tiene prioridad: si se especifican varios procesadores, se aplicarán en orden descendente de prioridad. Aplicando la función inversa en el mismo orden en que se aplicaron los procesadores, obtenemos la copia de seguridad original.


Almacenamiento


Este servicio es responsable de cargar y descargar una copia de seguridad, así como su eliminación del almacenamiento. Ejemplo de almacenamiento: Dropbox, sistema de archivos local.


Interfaz de servicio:


 public interface Storage { void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id); InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id); void deleteBackup(StorageSettings storageSettings, String backupName, Integer id); } 

A cada copia de seguridad creada se le asigna un nombre único, por lo que podemos encontrarla en cualquiera de los almacenes en los que se descargó. La forma en que se presenta la copia de seguridad en el almacenamiento depende exclusivamente de la implementación del servicio, pero al transferir el nombre de la copia de seguridad a una de las funciones, debemos esperar el comportamiento correcto. La entidad StorageSettings se crea previamente desde la interfaz de usuario web y almacena la configuración necesaria para acceder al almacenamiento.




Concepto de tarea


Nos gustaría poder rastrear el estado de nuestras tareas, manejar posibles errores dependiendo del progreso de la tarea y también cancelar tareas. Por lo tanto, continuaremos operando solo con tareas. Cada tarea estará representada en la base de datos por un registro en la tabla y programáticamente por la instancia Future (ver Java Future ). Cada registro en la tabla está asociado con su propio futuro (además, si se están ejecutando varios servidores, las instancias futuras pueden estar en la memoria de diferentes servidores).


Vamos secuencialmente. En primer lugar, necesitamos un servicio para iniciar tareas: crear, restaurar y eliminar copias de seguridad.


Lanzamiento de tarea


Crear una copia de seguridad:


 public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(storageSettingsNameList); Objects.requireNonNull(processors); Objects.requireNonNull(databaseSettings); BackupProperties backupProperties = backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName()); Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.CREATING); logger.info("Creating backup..."); try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS); logger.info("Applying processors on created backup. Processors: {}", processors); try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.UPLOADING); logger.info("Uploading backup..."); backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Creating backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of created backup", ex); } catch (RuntimeException ex) { logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Backup creating task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

La creación de una copia de seguridad pasa por 3 pasos principales en el siguiente orden: creación de una copia de seguridad -> aplicación de procesadores -> carga en almacenamiento. En casi todos los métodos de servicio, reenviamos el ID de la tarea actual para que el servicio pueda informar un error de un hilo que funciona en segundo plano. Sobre el manejo de errores, por qué se interrumpe la excepción aquí y qué sucede con un error después de recibir una excepción RuntimeException se discutirá más adelante.


Y así es como ejecutaremos la tarea de crear una copia de seguridad:


 tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings); 

El primer parámetro que pasamos al iniciador de la tarea: el usuario o la tarea del servidor interno (un ejemplo de una tarea interna es una copia de seguridad periódica). El conocimiento del iniciador de tareas nos permite mostrar en la interfaz de usuario web solo aquellas tareas iniciadas por el usuario. Los parámetros restantes son necesarios para crear una copia de seguridad directamente: una lista de almacenamientos, procesadores para usar, una base de datos cuyo volcado necesita crear.


Al crear una copia de seguridad, también se crea un registro en la base de datos llamada BackupProperties . Esta entidad almacenará las propiedades de la copia de seguridad, como el nombre, los procesadores utilizados y la lista de repositorios en los que se descargó la copia de seguridad. Además, para restaurar o eliminar la copia de seguridad, operaremos con esta entidad en particular.


La tarea en la base de datos se almacena de la siguiente forma:


 @Entity @Table(name = "backup_tasks") public class Task { /** * Identifier of each backup task. Identifier is generated by PostgreSQL database after saving of entity. */ @Id @Column(insertable = false, updatable = false) @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; /** * Backup task type. * <p> * Type is set at the very start of any task and can't be changed. * * @see Type */ @Enumerated(EnumType.STRING) @Column(updatable = false) private Type type; /** * Who initiated a task: user or server. * <p> * We need to know it to show on front only these tasks that was started by user. * * @see RunType */ @Enumerated(EnumType.STRING) @Column(updatable = false) private RunType runType; /** * Backup task state. * <p> * State is updated with every new step in task being executed. * * @see Task.State */ @Enumerated(EnumType.STRING) private State state; /** * Whether task has been interrupted or not. * <p> * Default is {@literal false}. */ @Column(insertable = false) private boolean interrupted; /** * Identifier of {@link BackupProperties}. * <p> * We need to know backup ID to be able to handle occurred errors. */ @Column(updatable = false) private Integer backupPropertiesId; /** * Start time of the task. */ @Column(updatable = false) private LocalDateTime date; public enum RunType { USER, INTERNAL } public enum State { PLANNED, CREATING, RESTORING, DELETING, APPLYING_PROCESSORS, APPLYING_DEPROCESSORS, DOWNLOADING, UPLOADING, COMPLETED, } public enum Type { CREATE_BACKUP { @Override public String toString() { return "CREATE BACKUP"; } }, RESTORE_BACKUP { @Override public String toString() { return "RESTORE BACKUP"; } }, DELETE_BACKUP { @Override public String toString() { return "DELETE BACKUP"; } } } // getters & setters... } 

Por lo tanto, puede describir el proceso de creación de una copia de seguridad en forma de diagrama de la siguiente manera:




Otros tipos de tareas se inician por analogía. Para no saturar el artículo con una gran cantidad de código, para los curiosos les daré el código para iniciar tareas para restaurar y eliminar copias de seguridad por separado en el spoiler.


Recuperación de copia de seguridad
 public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Objects.requireNonNull(storageSettingsName); Objects.requireNonNull(databaseSettings); Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING); logger.info("Downloading backup..."); try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) { if (Thread.interrupted() || downloadedBackup == null) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS); logger.info("Deprocessing backup..."); try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.RESTORING); logger.info("Restoring backup..."); databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Restoring backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of downloaded backup", ex); } catch (RuntimeException ex) { logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

La restauración de una copia de seguridad pasa por 3 pasos principales en el siguiente orden: descargar una copia de seguridad del almacenamiento -> usar deprocesadores para obtener la copia de seguridad de texto sin formato original -> restaurar una copia de seguridad.


Comience la recuperación de la siguiente manera:


 tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings); 

El proceso de recuperación de la copia de seguridad en forma de diagrama:


Eliminar copia de seguridad
 public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { try { logger.info("Deleting backup started. Backup properties: {}", backupProperties); tasksManager.updateTaskState(taskId, Task.State.DELETING); backupLoadManager.deleteBackup(backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Deleting backup completed. Backup properties: {}", backupProperties); } catch (RuntimeException ex) { logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

El proceso de eliminación de una copia de seguridad es bastante simple: una copia de seguridad simplemente se elimina de todos los almacenamientos en los que se descargó.


Ejecute la desinstalación de la siguiente manera:


 tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties); 

El proceso de eliminar una copia de seguridad en forma de diagrama:




Cancelar tarea


¿Qué es la cancelación de tareas? Por supuesto, esto no es más que una terminación de hilo. Podría ver que todo el código principal que se ejecuta en Future está envuelto en la siguiente construcción try-catch:


 try { ... } catch (InterruptedException ex) { ... tasksManager.setInterrupted(taskId); } 

Y también después de cada método importante, cuyo flujo de ejecución controlamos, se instala la siguiente construcción:


 if (Thread.interrupted()) { throw new InterruptedException(); } 

Antes de continuar, se debe dar una breve teoría de interrupciones y estados de subprocesos JVM.


Los subprocesos en la JVM pueden tener los siguientes estados:


  1. Nuevo
  2. Ejecutable
  3. Tiempo de espera
  4. Esperando
  5. Bloqueado
  6. Terminado

Solo nos interesan los estados de espera y de espera cronometrada. El Object.wait() en estado de espera mediante los métodos Object.wait() , Thread.join() y otros. El hilo se transfiere al estado de espera temporizada (es decir, una espera que dura un cierto período de tiempo) utilizando los métodos Object.wait(timeout) , Thread.join(timeout) , Thread.sleep(sleeping) y otros.


Lo más importante aquí es que si interrumpe el hilo antes de entrar en el estado de espera o de espera temporizada o cuando el hilo está en este estado , el hilo se despierta y produce una excepción interrumpida .


Pero eso no es todo. No es en absoluto un hecho que un hilo entrará en datos de estado al crear, restaurar o eliminar una copia de seguridad. ¿Cómo informar entonces al hilo que fue interrumpido?


La primera forma es verificar independientemente el indicador de interrupción con el hilo utilizando los métodos Thread.interrupted Thread.interrupted() o Thread.currentThread.isInterrupted() . La diferencia entre los dos es que el primero llama al método nativo privado currentThread.isInterrupted(boolean ClearInterrupted) , que le pasa true , lo que indica que el indicador de interrupción se borrará, y el segundo pasa false , dejando el indicador de interrupción intacto. La elección entre estos dos métodos depende completamente de la situación. Cuando se lanza una excepción interrumpida, también se borra la bandera de interrupción; vale la pena recordarlo.


Pero debe haber una manera más fácil, y lo es. En la aplicación, hay una gran cantidad de trabajo con flujos de E / S y, por lo tanto, con métodos de E / S. Nuestra tarea es asegurar que al llamar a los métodos read() o write(int b) en el flujo de E / S, se produzca un error durante la interrupción, informando que la llamada de bloqueo de E / S se interrumpió. Afortunadamente, Java tiene una excepción: InterruptedIOException . Sin embargo, no todos los métodos de flujo de lectura / escritura monitorean las interrupciones de subprocesos, y específicamente solo PipedInputStream lo monitorea. Por lo tanto, en aquellos lugares donde esta secuencia no está involucrada, debemos extender el método de lectura / escritura para que cuando haya una interrupción, se produzca una InterruptedIOException. De hecho, la extensión del método read () fue suficiente para mí en la aplicación solo en un lugar, cuando InputStream regresó del método de carga de respaldo. Así es como podemos aprender sobre el origen de una interrupción sin tener que colocar controles de plantilla en la bandera en todas partes. Sin embargo, es importante detectar esta excepción por separado de la IOException y manejarla por separado. Por supuesto, no puede prescindir de la ayuda de una verificación de plantilla de la bandera en algunos lugares, pero ya ha mejorado.


También es importante tener en cuenta que si el indicador se borró durante el proceso de interrupción, siempre es necesario volver a establecer el indicador de interrupción para que después de regresar del método podamos averiguar la interrupción que se produjo.


Permítanme explicar con un ejemplo por qué esto es importante. Supongamos que cargamos una copia de seguridad en el almacenamiento en el método upload () y se produce una interrupción. La interrupción se procesa, el trabajo se detiene y el método regresa. La interrupción no ocurre con casualidad, significa que se produjo un error en alguna parte o que el usuario canceló la tarea. Independientemente de la razón, debemos detener todo el trabajo en este Futuro. Pero si no configura el indicador de interrupción nuevamente antes de regresar del método de arranque, nunca sabremos en el bloque Future principal sobre la interrupción que sucedió.
El mismo ejemplo de código:


 backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <-   ,       if (Thread.interrupted()) { //      ,      - ,    throw new InterruptedException(); } 

Por lo tanto, es una buena práctica manejar una InterruptedException o InterruptedIOException de la siguiente manera:


 try { ... } catch (InterruptedException e) { //  InterruptedIOException ... // re-interrupt the thread Thread.currentThread().interrupt(); } 

Bueno, podemos manejar la interrupción, pero ¿quién realmente interrumpirá los hilos?
Para hacer esto, crearemos otra entidad llamada CancelTask , que almacenará la ID de la tarea para su cancelación, y también escribiremos un reloj que intentará interrumpir las tareas. ¿Por qué intentarlo? Porque:


  1. No se puede terminar el hilo en la memoria de otro servidor. Varios servidores pueden funcionar para nosotros, lo que significa que Future está disperso en diferentes servidores. Por lo tanto, cuando llega una solicitud para cancelar una tarea en uno de los servidores, el futuro deseado puede estar en la memoria de otro servidor.
  2. La tarea no se puede cancelar porque se perdió el futuro debido a un bloqueo del servidor.

Describa brevemente el algoritmo de cancelación en la noche:
Watercher saca todos los registros de la tabla cancel_tasks (el bloqueo no se establece al mismo tiempo), revisa cada uno e intenta obtener el futuro correspondiente de su memoria. Si Future se recibe con éxito, el hilo correspondiente se interrumpe, la tarea se revierte y la solicitud se elimina de la tabla. Si se supera la solicitud de tiempo de espera para cancelar la tarea (lo que significa que el servidor se bloqueó y se perdió el futuro), la solicitud simplemente se elimina de la tabla. Si varios servidores notan un tiempo de espera y eliminan el registro de la tabla, no ocurrirá nada malo, porque la eliminación en PostgreSQL es idempotente.


Código de CancelTasksWatcher:


Texto oculto
 /** * This class scans for tasks to cancel and tries to cancel them. */ @Component class CancelTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class); private static final Duration cancelTimeout = Duration.ofMinutes(10); private CancelTasksManager cancelTasksManager; private TasksStarterService tasksStarterService; private TasksManager tasksManager; // spring setters... /** * This watcher wakes up every time 10 seconds passed from the last completion, checks if there are any tasks to cancel and tries to * cancel each task. * <p> * Since there are can be working more that one instance of the program, {@literal Future} instance of task can belong to different * servers. We can't get access to {@literal Future} if it's not in memory of the server where task cancellation request was accepted. * So the purpose of this watcher is to be able cancel tasks that works in the other instance of program. Each server has this watcher * checking for available cancellation requests and if any, the watcher tries to cancel corresponding {@literal Future}. * If cancellation is successful task will be also reverted. * <p> * If task cancellation request timeout exceeded, then it means a server that had requested {@literal Future} instances has been * shutdown, so all {@literal Future} instances lost and task can't be canceled. In such case task cancellation request will be ignored. * * @see TasksStarterService#getFuture(Integer) * @see TasksManager#revertTask(Task) */ @Scheduled(fixedDelay = 10 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchTasksToCancel() { Iterable<CancelTask> cancelTasks = cancelTasksManager.findAll(); Iterable<Task> tasks = tasksManager.findAllById(StreamSupport.stream(cancelTasks.spliterator(), false) .map(CancelTask::getTaskId).collect(Collectors.toList())); Map<Integer, Task> tasksAsMap = StreamSupport.stream(tasks.spliterator(), false) .collect(Collectors.toMap(Task::getId, Function.identity())); List<Integer> taskIdsForDeleting = new ArrayList<>(); for (CancelTask cancelTask : cancelTasks) { Integer taskId = cancelTask.getTaskId(); Task task = tasksAsMap.get(taskId); if (task == null) { logger.error("Can't cancel task: no such entity with ID {}", taskId); taskIdsForDeleting.add(taskId); continue; } // timeout exceeded, that is server shutdown and lost all Future instances, so task can't be canceled if (LocalDateTime.now(ZoneOffset.UTC).isAfter(cancelTask.getPutTime().plus(cancelTimeout))) { logger.error("Can't cancel task: timeout exceed. Task ID: {}", taskId); taskIdsForDeleting.add(taskId); continue; } tasksStarterService.getFuture(taskId).ifPresent(future -> { logger.info("Canceling task with ID {}", taskId); boolean canceled = future.cancel(true); if (canceled) { try { // give time to properly handle interrupt Thread.sleep(10000); } catch (InterruptedException e) { // should not happen } tasksManager.revertTask(task); } taskIdsForDeleting.add(taskId); logger.info("Task canceled: {}. Task ID: {}", canceled, taskId); }); } cancelTasksManager.deleteByTaskIdIn(taskIdsForDeleting); } } 



Manejo de errores


, , Future, try-catch :


 try { ... } catch (RuntimeException e) { ... errorTasksManager.addErrorTask(taskId); } 

RuntimeException , Future , .


addErrorTask(taskId) , ID , .
? , , , .


:
, , . — PostgreSQL select for update , select skip locked . , , revertTask() , .


ErrorTasksWatcher :


 /** * This class scans for erroneous tasks and handles them depending on their state. */ @Component class ErrorTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class); private static final Integer nRows = 10; private TasksManager tasksManager; private ErrorTasksManager errorTasksManager; // spring setters... /** * This watcher wakes up every time 1 minute passed from the last completion, checks backup states periodically and handles erroneous * tasks if any. * <p> * The watcher handles at most N tasks as described by {@link #nRows} constant and skips already locked tasks. * When retrieving error tasks from database pessimistic lock is set. It allows safely run more than one copy of program, as no other * watcher can pick up already being handled error tasks. * <p> * If the server shutdowns while rows was locked, transaction will be rolled back and lock released, so these entities can be picked * up by the other running server. */ @Scheduled(fixedDelay = 60 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchErrorTasks() { for (ErrorTask errorTask : errorTasksManager.findFirstNAndLock(nRows)) { if (!errorTask.isErrorHandled()) { Integer backupTaskId = errorTask.getTaskId(); Optional<Task> optionalTask = tasksManager.findById(backupTaskId); if (!optionalTask.isPresent()) { logger.info("Can't handle erroneous task: no corresponding backup task entity. Backup task ID: {}", backupTaskId); continue; } tasksManager.revertTask(optionalTask.get()); errorTask.setErrorHandled(true); } } } } 

revertTask(Task) :


  /** * This function reverts erroneous task by its entity. * <p> * Use this function only after canceling related {@literal Future}. * <p> * If the task was of the type {@link Task.Type#CREATE_BACKUP} then related {@link BackupProperties} will be deleted. * * @param task the entity */ public void revertTask(@NotNull Task task) { Objects.requireNonNull(task); Task.State state = task.getState(); switch (state) { case DOWNLOADING: case APPLYING_DEPROCESSORS: case RESTORING: case DELETING: { logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString()); break; } case CREATING: case APPLYING_PROCESSORS: { logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString()); Integer backupPropertiesID = task.getBackupPropertiesId(); if (!backupPropertiesManager.existsById(backupPropertiesID)) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } backupPropertiesManager.deleteById(backupPropertiesID); break; } case UPLOADING: { logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state); Integer backupPropertiesId = task.getBackupPropertiesId(); Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId); if (!optionalBackupProperties.isPresent()) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get()); backupPropertiesManager.deleteById(backupPropertiesId); break; } default: { logger.error("Can't revert task: unknown state. Task info: {}", task); } } } 

:


  1. DOWNLOADING , APPLYING_DEPROCESSORS , RESTORING , DELETING — . , .
  2. CREATING , APPLYING_PROCESSORS — , . BackupProperties , ( BackupProperties Web UI ).
  3. UPLOADING — . BackupProperties , . .

, . , ? , , Future ( 1), , InputStream ( 2). , 2, 1 2 ?


, , , . Future ( 1) :


  public void onError(@NotNull Throwable t, @NotNull Integer taskId) { logger.error("Exception caught. Task ID: {}", taskId, t); Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId); if (!optionalFuture.isPresent()) { logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId); } else { boolean canceled = optionalFuture.get().cancel(true); if (!canceled) { logger.error("Error canceling the Future of task with ID {}", taskId); } else { logger.info("Task canceled. Task ID: {}", taskId); errorTasksManager.setError(taskId); } } } 

, , ID , , Future - , ID .


, , , , , .


, :


, , , . — Future.


, , , I/O , — / . , . :


  1. , . , — .
  2. — Future , . , / , , ( , — IOException , , ).

, — ( ID , , ), .




, , . , , .



  1. Web UI: , . ,

Conclusión


:



, ! , GitHub!

Source: https://habr.com/ru/post/459478/


All Articles