我在创建用于备份的多线程应用程序方面的经验

目前,使用多线程应用程序的用户不会感到惊讶,但是我认为在本文中您可以找到一些有趣的想法。 我对Java的研究是从这个项目开始的,所以也许在某些地方我会犯错或制造一辆大型自行车,但是我希望有人会对Java初学者的经验感兴趣。 我将给出该应用程序的几个功能:


  • 无论备份的大小如何,它都可专门用于内存中的备份
  • 不会将整个备份加载到内存中
  • 备份/还原操作可以取消

切入点将考虑应用程序的体系结构,以及遇到的主要问题及其解决方案。


应用概述


与应用程序的通信是通过Web UI进行的,但是在将来,如有必要,可以添加REST API。


该应用程序可以:


  1. 创建备份并将其上传到一个或多个存储
  2. 通过从存储加载备份来还原备份
  3. 从所有存储中删除备份
  4. 定期创建备份

当前支持的存储库:


  • 本地文件系统(Docker不支持)
  • 投寄箱

当前支持的数据库:


  • PostgreSQL的

从一个特殊的应用程序,我可以注意到:


  1. 纠正集群配置中的工作
  2. 无论备份的大小如何,备份都不会完全加载到内存中。 还不涉及用于临时备份存储的文件系统。 备份的创建和还原以及备份的加载/卸载都专门在内存中进行。
  3. 跨平台-在Windows和Linux上均可使用。
  4. 我们可以监视所有正在运行的任务,并在必要时取消它们。

下面是Web UI的屏幕截图,清楚地描述了应用程序的功能。


仓储管理



数据库管理



备份创建


备份恢复


管理创建的备份


定期备份


跟踪运行任务




建筑学


主要工作将在3个服务中进行: DatabaseBackupProcessorStorage ,我们将使用任务概念将它们连接在一起。 关于所有这一切。


数据库备份


该服务负责创建和还原纯文本备份。


服务接口:


public interface DatabaseBackup { InputStream createBackup(DatabaseSettings databaseSettings, Integer id); void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id); } 

两种接口方法都在InputStream实例上运行,因为我们需要不将整个备份加载到内存中,这意味着必须以流模式读取/写入备份。 DatabaseSettings实体是从Web UI预先创建的,并存储访问数据库所需的各种设置。 参数id是什么,将进一步解释。


服务要求如下:


  1. 两种方法都不应将整个备份读入内存。
  2. restoreBackup()方法应在单个事务中还原备份,以便在发生错误的情况下,不要使数据库处于不一致的状态。

PostgreSQL的实现(文本描述)

具体来说,在PostgreSQL的实现中,该服务的实现如下:


  1. createBackup() :创建一个pg_dump进程,该进程将创建一个备份并将其写入标准输出流。 从该方法返回标准流程输出流(请参阅https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream-- )。 系统中的I / O流基于特定大小的缓冲区,并且当进程写入输出流时,它实际上会写入内存中的缓冲区。 这里最重要的是,进程线程将不会写入已填充的缓冲区,直到另一端已读取该缓冲区为止,这意味着该线程将处于锁定状态,并且备份将不会完全加载到内存中。 您可能会遇到这样的情况,即您的Java程序在处理进程时遇到死锁,原因是您没有读取进程的stdout或stderr。 监视这一点非常重要,因为如果在写入一个完整的缓冲区时在I / O阻塞调用中阻塞了该进程,并且没有人读取该缓冲区,则该进程将无法继续。
  2. restoreBackup() :创建一个psql进程,从传递到该方法的InputStream中读取备份,并将其同时写入psql标准输入流(请参阅https://docs.oracle.com/javase/8/docs/api/java/lang/Process。 html#getOutputStream-- )。 之所以可行,是因为纯文本PostgreSQL备份只是DDL和DML命令的集合,这些命令很容易理解psql。

有很多代码,因此在这里我不会给出,但是您可以使用本文结尾的链接在GitHub上看到它。


处理器


该服务负责使用处理器和反向备份重新处理。 在下载到存储之前或从存储中卸载之后,将使用处理器。 处理器示例:压缩器,加密。


服务接口:


 public interface Processor { InputStream process(InputStream in); InputStream deprocess(InputStream in); ProcessorType getType(); // ProcessorType -  Enum,     int getPrecedence(); //   } 

每个处理器都有优先级-如果指定了多个处理器,则将按照优先级从高到低的顺序应用它们。 以应用处理器的相同顺序应用逆函数,我们得到了原始备份。


贮藏


该服务负责加载和卸载备份,以及从存储中删除备份。 存储示例:Dropbox,本地文件系统。


服务接口:


 public interface Storage { void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id); InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id); void deleteBackup(StorageSettings storageSettings, String backupName, Integer id); } 

每个创建的备份都有一个唯一的名称-因此我们可以在将其下载到的任何存储上找到它。 将备份呈现到存储的方式完全取决于服务的实现,但是将备份名称转移到其中一个功能时,我们应该期望正确的行为。 StorageSettings实体是从Web UI预先创建的,并存储用于访问存储器的必要设置。




任务概念


我们希望能够跟踪任务的状态,根据任务的进度处理可能的错误,以及取消任务。 因此,我们将继续仅处理任务。 每个任务将在数据库中由表中的记录表示,并以编程方式由Future实例表示(请参见Java Future )。 该表中的每个记录都与其自己的Future关联(此外,如果正在运行多个服务器,则Future实例可以位于不同服务器的内存中)。


让我们按顺序去。 首先,我们需要一项用于启动任务的服务-创建,还原和删除备份。


任务启动


创建备份:


 public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(storageSettingsNameList); Objects.requireNonNull(processors); Objects.requireNonNull(databaseSettings); BackupProperties backupProperties = backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName()); Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.CREATING); logger.info("Creating backup..."); try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS); logger.info("Applying processors on created backup. Processors: {}", processors); try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.UPLOADING); logger.info("Uploading backup..."); backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Creating backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of created backup", ex); } catch (RuntimeException ex) { logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Backup creating task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

创建备份按以下顺序经历了3个主要步骤:创建备份->处理器应用程序->上载到存储。 在几乎所有服务方法中,我们都转发当前任务的ID,以便该服务可以报告来自后台工作线程的错误。 关于错误处理,为什么在这里出现InterruptedException ,以及在收到RuntimeException之后出现错误会发生什么,将在后面讨论。


这是我们将如何运行创建备份的任务:


 tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings); 

我们传递给任务发起者的第一个参数:用户或内部服务器任务(内部任务的一个示例是定期备份)。 任务发起者的知识使我们能够在Web UI中仅显示由用户启动的那些任务。 其余参数对于直接创建备份是必需的-存储列表,要使用的处理器以及需要创建其转储的数据库。


创建备份时,还将在名为BackupProperties的数据库中创建一条记录。 该实体将存储备份属性,例如名称,使用的处理器以及将备份下载到的存储库列表。 此外,要还原或删除备份,我们将使用此特定实体。


数据库中的任务以以下形式存储:


 @Entity @Table(name = "backup_tasks") public class Task { /** * Identifier of each backup task. Identifier is generated by PostgreSQL database after saving of entity. */ @Id @Column(insertable = false, updatable = false) @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; /** * Backup task type. * <p> * Type is set at the very start of any task and can't be changed. * * @see Type */ @Enumerated(EnumType.STRING) @Column(updatable = false) private Type type; /** * Who initiated a task: user or server. * <p> * We need to know it to show on front only these tasks that was started by user. * * @see RunType */ @Enumerated(EnumType.STRING) @Column(updatable = false) private RunType runType; /** * Backup task state. * <p> * State is updated with every new step in task being executed. * * @see Task.State */ @Enumerated(EnumType.STRING) private State state; /** * Whether task has been interrupted or not. * <p> * Default is {@literal false}. */ @Column(insertable = false) private boolean interrupted; /** * Identifier of {@link BackupProperties}. * <p> * We need to know backup ID to be able to handle occurred errors. */ @Column(updatable = false) private Integer backupPropertiesId; /** * Start time of the task. */ @Column(updatable = false) private LocalDateTime date; public enum RunType { USER, INTERNAL } public enum State { PLANNED, CREATING, RESTORING, DELETING, APPLYING_PROCESSORS, APPLYING_DEPROCESSORS, DOWNLOADING, UPLOADING, COMPLETED, } public enum Type { CREATE_BACKUP { @Override public String toString() { return "CREATE BACKUP"; } }, RESTORE_BACKUP { @Override public String toString() { return "RESTORE BACKUP"; } }, DELETE_BACKUP { @Override public String toString() { return "DELETE BACKUP"; } } } // getters & setters... } 

因此,您可以以图表的形式描述创建备份的过程,如下所示:
备份程序




类推启动其他类型的任务。 为了避免用大量代码弄乱文章,出于好奇,我将提供代码来启动任务,以在剧透器中分别还原和删除备份。


备份恢复
 public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Objects.requireNonNull(storageSettingsName); Objects.requireNonNull(databaseSettings); Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING); logger.info("Downloading backup..."); try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) { if (Thread.interrupted() || downloadedBackup == null) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS); logger.info("Deprocessing backup..."); try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.RESTORING); logger.info("Restoring backup..."); databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Restoring backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of downloaded backup", ex); } catch (RuntimeException ex) { logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

还原备份按以下顺序经历了3个主要步骤:从存储中卸载备份->使用解处理器获得原始的纯文本备份->还原备份。


开始恢复,如下所示:


 tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings); 

以图表形式还原备份的过程:


删除备份
 public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { try { logger.info("Deleting backup started. Backup properties: {}", backupProperties); tasksManager.updateTaskState(taskId, Task.State.DELETING); backupLoadManager.deleteBackup(backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Deleting backup completed. Backup properties: {}", backupProperties); } catch (RuntimeException ex) { logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

删除备份的过程非常简单:只需将备份从其下载到的所有存储中删除。


运行卸载,如下所示:


 tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties); 

以图表形式删除备份的过程:




取消任务


什么是任务取消? 当然,这无非是线程终止。 您会看到,在Future中运行的所有主要代码都包装在以下try-catch构造中:


 try { ... } catch (InterruptedException ex) { ... tasksManager.setInterrupted(taskId); } 

并且在每种重要方法(我们控制其执行流程)之后,都安装了以下结构:


 if (Thread.interrupted()) { throw new InterruptedException(); } 

在继续之前,应该给出JVM线程的中断和状态的简要理论。


JVM中的线程可以具有以下状态:


  1. 新品
  2. 可运行
  3. 定时等待
  4. 等待中
  5. 受阻
  6. 已终止

我们仅对等待和定时等待状态感兴趣。 通过Object.wait()Thread.join()等方法将Object.wait() Waiting状态。 使用Object.wait(timeout)Thread.join(timeout)Thread.sleep(sleeping)和其他方法将线程置于Timed等待状态(即,持续一定时间的等待Thread.sleep(sleeping)


这里最重要的是,如果在进入 Waiting或Timed等待状态 之前中断线程,或者当线程处于此状态时 ,线程将唤醒,并抛出InterruptedException


但这还不是全部。 通过创建,还原或删除备份,线程根本不会进入状态数据。 然后如何通知线程它被中断了?


第一种方法是使用Thread.interrupted Thread.interrupted()Thread.currentThread.isInterrupted()方法独立检查线程的中断标志。 两者之间的区别在于,第一个调用私有本机方法currentThread.isInterrupted(boolean ClearInterrupted)currentThread.isInterrupted(boolean ClearInterrupted)传递true ,表示将清除中断标志,而第二个传递false ,使中断标志保持不变。 这两种方法之间的选择完全取决于情况。 当抛出InterruptedException时,中断标志也会被清除-这值得记住。


但是必须有一种更简单的方法-确实如此。 在应用程序中,使用I / O流以及使用I / O方法的工作量很大。 我们的任务是确保在I / O流上调用read()write(int b)方法时,在中断过程中引发错误,通知阻塞的I / O调用已中断。 幸运的是,Java有一个异常-InterruptedIOException 。 但是,并非所有读/写流方法都监视线程中断,特别是仅PipedInputStream监视它。 因此,在不涉及该流的那些地方,我们必须扩展读/写方法,以便在发生中断时引发InterruptedIOException。 实际上,在我的应用程序中,仅在一个地方对我来说,read()方法的扩展就足够了-当InputStream从备份上传方法返回时。 这样我们就可以了解中断的起源,而不必在任何地方对标志进行模板检查。 但是,将此异常与IOException分开捕获并分别处理是很重要的。 当然,在某些地方,如果不能借助标志的模板检查就无法做,但是它已经变得更好。


同样重要的是要注意,如果在中断处理期间清除了该标志,则始终有必要再次设置该中断标志,以便从该方法返回后,我们可以找出发生的中断。


让我举例说明为什么这很重要。 假设我们使用upload()方法将备份上传到存储中,并且发生了中断。 处理中断,工作停止,方法返回。 中断不会随随便便发生-这意味着错误发生在某个地方,或者用户取消了任务。 无论出于何种原因,我们都必须在此未来中停止所有工作。 但是,如果您在从引导方法返回之前没有再次设置中断标志,则在Future主块中我们将永远不会知道发生了什么中断。
相同的代码示例:


 backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <-   ,       if (Thread.interrupted()) { //      ,      - ,    throw new InterruptedException(); } 

因此,优良作法是按以下方式处理InterruptedExceptionInterruptedIOException


 try { ... } catch (InterruptedException e) { //  InterruptedIOException ... // re-interrupt the thread Thread.currentThread().interrupt(); } 

好吧,我们可以处理中断,但是谁会真正中断线程?
为此,我们将创建另一个名为CancelTask​​的实体,该实体将存储要取消的任务的ID,并编写一个手表来尝试中断任务。 为什么要尝试? 因为:


  1. 无法终止另一台服务器的内存中的线程。 几个服务器可以为我们工作,这意味着Future分散在不同的服务器上。 因此,当取消任务的请求到达其中一个服务器时,所需的Future可能在另一台服务器的内存中。
  2. 由于由于服务器崩溃而丢失了Future,因此无法取消任务。

简要介绍一下晚上的取消算法:
Watercher从cancel_tasks表中取出所有记录(同时未设置锁),逐一检查并尝试从其内存中获取相应的Future。 如果成功接收到Future,则将中断相应的线程,还原任务并将请求从表中删除。 如果超过了取消任务的超时请求(这意味着服务器崩溃了,并且Future丢失了),则只需从表中删除该请求即可。 如果多个服务器注意到超时并从表中删除记录,则不会发生任何不良情况,因为PostgreSQL中的删除是幂等的。


CancelTask​​sWatcher代码:


隐藏文字
 /** * This class scans for tasks to cancel and tries to cancel them. */ @Component class CancelTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class); private static final Duration cancelTimeout = Duration.ofMinutes(10); private CancelTasksManager cancelTasksManager; private TasksStarterService tasksStarterService; private TasksManager tasksManager; // spring setters... /** * This watcher wakes up every time 10 seconds passed from the last completion, checks if there are any tasks to cancel and tries to * cancel each task. * <p> * Since there are can be working more that one instance of the program, {@literal Future} instance of task can belong to different * servers. We can't get access to {@literal Future} if it's not in memory of the server where task cancellation request was accepted. * So the purpose of this watcher is to be able cancel tasks that works in the other instance of program. Each server has this watcher * checking for available cancellation requests and if any, the watcher tries to cancel corresponding {@literal Future}. * If cancellation is successful task will be also reverted. * <p> * If task cancellation request timeout exceeded, then it means a server that had requested {@literal Future} instances has been * shutdown, so all {@literal Future} instances lost and task can't be canceled. In such case task cancellation request will be ignored. * * @see TasksStarterService#getFuture(Integer) * @see TasksManager#revertTask(Task) */ @Scheduled(fixedDelay = 10 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchTasksToCancel() { Iterable<CancelTask> cancelTasks = cancelTasksManager.findAll(); Iterable<Task> tasks = tasksManager.findAllById(StreamSupport.stream(cancelTasks.spliterator(), false) .map(CancelTask::getTaskId).collect(Collectors.toList())); Map<Integer, Task> tasksAsMap = StreamSupport.stream(tasks.spliterator(), false) .collect(Collectors.toMap(Task::getId, Function.identity())); List<Integer> taskIdsForDeleting = new ArrayList<>(); for (CancelTask cancelTask : cancelTasks) { Integer taskId = cancelTask.getTaskId(); Task task = tasksAsMap.get(taskId); if (task == null) { logger.error("Can't cancel task: no such entity with ID {}", taskId); taskIdsForDeleting.add(taskId); continue; } // timeout exceeded, that is server shutdown and lost all Future instances, so task can't be canceled if (LocalDateTime.now(ZoneOffset.UTC).isAfter(cancelTask.getPutTime().plus(cancelTimeout))) { logger.error("Can't cancel task: timeout exceed. Task ID: {}", taskId); taskIdsForDeleting.add(taskId); continue; } tasksStarterService.getFuture(taskId).ifPresent(future -> { logger.info("Canceling task with ID {}", taskId); boolean canceled = future.cancel(true); if (canceled) { try { // give time to properly handle interrupt Thread.sleep(10000); } catch (InterruptedException e) { // should not happen } tasksManager.revertTask(task); } taskIdsForDeleting.add(taskId); logger.info("Task canceled: {}. Task ID: {}", canceled, taskId); }); } cancelTasksManager.deleteByTaskIdIn(taskIdsForDeleting); } } 



错误处理


, , Future, try-catch :


 try { ... } catch (RuntimeException e) { ... errorTasksManager.addErrorTask(taskId); } 

RuntimeException , Future , .


addErrorTask(taskId) , ID , .
? , , , .


:
, , . — PostgreSQL select for update , select skip locked . , , revertTask() , .


ErrorTasksWatcher :


 /** * This class scans for erroneous tasks and handles them depending on their state. */ @Component class ErrorTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class); private static final Integer nRows = 10; private TasksManager tasksManager; private ErrorTasksManager errorTasksManager; // spring setters... /** * This watcher wakes up every time 1 minute passed from the last completion, checks backup states periodically and handles erroneous * tasks if any. * <p> * The watcher handles at most N tasks as described by {@link #nRows} constant and skips already locked tasks. * When retrieving error tasks from database pessimistic lock is set. It allows safely run more than one copy of program, as no other * watcher can pick up already being handled error tasks. * <p> * If the server shutdowns while rows was locked, transaction will be rolled back and lock released, so these entities can be picked * up by the other running server. */ @Scheduled(fixedDelay = 60 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchErrorTasks() { for (ErrorTask errorTask : errorTasksManager.findFirstNAndLock(nRows)) { if (!errorTask.isErrorHandled()) { Integer backupTaskId = errorTask.getTaskId(); Optional<Task> optionalTask = tasksManager.findById(backupTaskId); if (!optionalTask.isPresent()) { logger.info("Can't handle erroneous task: no corresponding backup task entity. Backup task ID: {}", backupTaskId); continue; } tasksManager.revertTask(optionalTask.get()); errorTask.setErrorHandled(true); } } } } 

revertTask(Task) :


  /** * This function reverts erroneous task by its entity. * <p> * Use this function only after canceling related {@literal Future}. * <p> * If the task was of the type {@link Task.Type#CREATE_BACKUP} then related {@link BackupProperties} will be deleted. * * @param task the entity */ public void revertTask(@NotNull Task task) { Objects.requireNonNull(task); Task.State state = task.getState(); switch (state) { case DOWNLOADING: case APPLYING_DEPROCESSORS: case RESTORING: case DELETING: { logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString()); break; } case CREATING: case APPLYING_PROCESSORS: { logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString()); Integer backupPropertiesID = task.getBackupPropertiesId(); if (!backupPropertiesManager.existsById(backupPropertiesID)) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } backupPropertiesManager.deleteById(backupPropertiesID); break; } case UPLOADING: { logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state); Integer backupPropertiesId = task.getBackupPropertiesId(); Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId); if (!optionalBackupProperties.isPresent()) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get()); backupPropertiesManager.deleteById(backupPropertiesId); break; } default: { logger.error("Can't revert task: unknown state. Task info: {}", task); } } } 

:


  1. DOWNLOADING , APPLYING_DEPROCESSORS , RESTORING , DELETING — . , .
  2. CREATING , APPLYING_PROCESSORS — , . BackupProperties , ( BackupProperties Web UI ).
  3. UPLOADING — . BackupProperties , . .

, . , ? , , Future ( 1), , InputStream ( 2). , 2, 1 2 ?


, , , . Future ( 1) :


  public void onError(@NotNull Throwable t, @NotNull Integer taskId) { logger.error("Exception caught. Task ID: {}", taskId, t); Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId); if (!optionalFuture.isPresent()) { logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId); } else { boolean canceled = optionalFuture.get().cancel(true); if (!canceled) { logger.error("Error canceling the Future of task with ID {}", taskId); } else { logger.info("Task canceled. Task ID: {}", taskId); errorTasksManager.setError(taskId); } } } 

, , ID , , Future - , ID .


, , , , , .


, :


, , , . — Future.


, , , I/O , — / . , . :


  1. , . , — .
  2. — Future , . , / , , ( , — IOException , , ).

, — ( ID , , ), .




, , . , , .



  1. Web UI: , . ,

结论


:



, ! , GitHub!

Source: https://habr.com/ru/post/zh-CN459478/


All Articles