目前,使用多线程应用程序的用户不会感到惊讶,但是我认为在本文中您可以找到一些有趣的想法。 我对Java的研究是从这个项目开始的,所以也许在某些地方我会犯错或制造一辆大型自行车,但是我希望有人会对Java初学者的经验感兴趣。 我将给出该应用程序的几个功能:
- 无论备份的大小如何,它都可专门用于内存中的备份
- 不会将整个备份加载到内存中
- 备份/还原操作可以取消
切入点将考虑应用程序的体系结构,以及遇到的主要问题及其解决方案。
应用概述
与应用程序的通信是通过Web UI进行的,但是在将来,如有必要,可以添加REST API。
该应用程序可以:
- 创建备份并将其上传到一个或多个存储
- 通过从存储加载备份来还原备份
- 从所有存储中删除备份
- 定期创建备份
当前支持的存储库:
当前支持的数据库:
从一个特殊的应用程序,我可以注意到:
- 纠正集群配置中的工作
- 无论备份的大小如何,备份都不会完全加载到内存中。 还不涉及用于临时备份存储的文件系统。 备份的创建和还原以及备份的加载/卸载都专门在内存中进行。
- 跨平台-在Windows和Linux上均可使用。
- 我们可以监视所有正在运行的任务,并在必要时取消它们。
下面是Web UI的屏幕截图,清楚地描述了应用程序的功能。
建筑学
主要工作将在3个服务中进行: DatabaseBackup , Processor , Storage ,我们将使用任务概念将它们连接在一起。 关于所有这一切。
数据库备份
该服务负责创建和还原纯文本备份。
服务接口:
public interface DatabaseBackup { InputStream createBackup(DatabaseSettings databaseSettings, Integer id); void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id); }
两种接口方法都在InputStream实例上运行,因为我们需要不将整个备份加载到内存中,这意味着必须以流模式读取/写入备份。 DatabaseSettings实体是从Web UI预先创建的,并存储访问数据库所需的各种设置。 参数id
是什么,将进一步解释。
服务要求如下:
- 两种方法都不应将整个备份读入内存。
restoreBackup()
方法应在单个事务中还原备份,以便在发生错误的情况下,不要使数据库处于不一致的状态。
处理器
该服务负责使用处理器和反向备份重新处理。 在下载到存储之前或从存储中卸载之后,将使用处理器。 处理器示例:压缩器,加密。
服务接口:
public interface Processor { InputStream process(InputStream in); InputStream deprocess(InputStream in); ProcessorType getType();
每个处理器都有优先级-如果指定了多个处理器,则将按照优先级从高到低的顺序应用它们。 以应用处理器的相同顺序应用逆函数,我们得到了原始备份。
贮藏
该服务负责加载和卸载备份,以及从存储中删除备份。 存储示例:Dropbox,本地文件系统。
服务接口:
public interface Storage { void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id); InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id); void deleteBackup(StorageSettings storageSettings, String backupName, Integer id); }
每个创建的备份都有一个唯一的名称-因此我们可以在将其下载到的任何存储上找到它。 将备份呈现到存储的方式完全取决于服务的实现,但是将备份名称转移到其中一个功能时,我们应该期望正确的行为。 StorageSettings实体是从Web UI预先创建的,并存储用于访问存储器的必要设置。
任务概念
我们希望能够跟踪任务的状态,根据任务的进度处理可能的错误,以及取消任务。 因此,我们将继续仅处理任务。 每个任务将在数据库中由表中的记录表示,并以编程方式由Future实例表示(请参见Java Future )。 该表中的每个记录都与其自己的Future关联(此外,如果正在运行多个服务器,则Future实例可以位于不同服务器的内存中)。
让我们按顺序去。 首先,我们需要一项用于启动任务的服务-创建,还原和删除备份。
任务启动
创建备份:
public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(storageSettingsNameList); Objects.requireNonNull(processors); Objects.requireNonNull(databaseSettings); BackupProperties backupProperties = backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName()); Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.CREATING); logger.info("Creating backup..."); try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS); logger.info("Applying processors on created backup. Processors: {}", processors); try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.UPLOADING); logger.info("Uploading backup..."); backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Creating backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of created backup", ex); } catch (RuntimeException ex) { logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Backup creating task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; }
创建备份按以下顺序经历了3个主要步骤:创建备份->处理器应用程序->上载到存储。 在几乎所有服务方法中,我们都转发当前任务的ID,以便该服务可以报告来自后台工作线程的错误。 关于错误处理,为什么在这里出现InterruptedException ,以及在收到RuntimeException之后出现错误会发生什么,将在后面讨论。
这是我们将如何运行创建备份的任务:
tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings);
我们传递给任务发起者的第一个参数:用户或内部服务器任务(内部任务的一个示例是定期备份)。 任务发起者的知识使我们能够在Web UI中仅显示由用户启动的那些任务。 其余参数对于直接创建备份是必需的-存储列表,要使用的处理器以及需要创建其转储的数据库。
创建备份时,还将在名为BackupProperties的数据库中创建一条记录。 该实体将存储备份属性,例如名称,使用的处理器以及将备份下载到的存储库列表。 此外,要还原或删除备份,我们将使用此特定实体。
数据库中的任务以以下形式存储:
@Entity @Table(name = "backup_tasks") public class Task { @Id @Column(insertable = false, updatable = false) @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; @Enumerated(EnumType.STRING) @Column(updatable = false) private Type type; @Enumerated(EnumType.STRING) @Column(updatable = false) private RunType runType; @Enumerated(EnumType.STRING) private State state; @Column(insertable = false) private boolean interrupted; @Column(updatable = false) private Integer backupPropertiesId; @Column(updatable = false) private LocalDateTime date; public enum RunType { USER, INTERNAL } public enum State { PLANNED, CREATING, RESTORING, DELETING, APPLYING_PROCESSORS, APPLYING_DEPROCESSORS, DOWNLOADING, UPLOADING, COMPLETED, } public enum Type { CREATE_BACKUP { @Override public String toString() { return "CREATE BACKUP"; } }, RESTORE_BACKUP { @Override public String toString() { return "RESTORE BACKUP"; } }, DELETE_BACKUP { @Override public String toString() { return "DELETE BACKUP"; } } }
因此,您可以以图表的形式描述创建备份的过程,如下所示:

类推启动其他类型的任务。 为了避免用大量代码弄乱文章,出于好奇,我将提供代码来启动任务,以在剧透器中分别还原和删除备份。
备份恢复 public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Objects.requireNonNull(storageSettingsName); Objects.requireNonNull(databaseSettings); Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING); logger.info("Downloading backup..."); try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) { if (Thread.interrupted() || downloadedBackup == null) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS); logger.info("Deprocessing backup..."); try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.RESTORING); logger.info("Restoring backup..."); databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Restoring backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of downloaded backup", ex); } catch (RuntimeException ex) { logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; }
还原备份按以下顺序经历了3个主要步骤:从存储中卸载备份->使用解处理器获得原始的纯文本备份->还原备份。
开始恢复,如下所示:
tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings);
以图表形式还原备份的过程:

删除备份 public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { try { logger.info("Deleting backup started. Backup properties: {}", backupProperties); tasksManager.updateTaskState(taskId, Task.State.DELETING); backupLoadManager.deleteBackup(backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Deleting backup completed. Backup properties: {}", backupProperties); } catch (RuntimeException ex) { logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; }
删除备份的过程非常简单:只需将备份从其下载到的所有存储中删除。
运行卸载,如下所示:
tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties);
以图表形式删除备份的过程:

取消任务
什么是任务取消? 当然,这无非是线程终止。 您会看到,在Future中运行的所有主要代码都包装在以下try-catch构造中:
try { ... } catch (InterruptedException ex) { ... tasksManager.setInterrupted(taskId); }
并且在每种重要方法(我们控制其执行流程)之后,都安装了以下结构:
if (Thread.interrupted()) { throw new InterruptedException(); }
在继续之前,应该给出JVM线程的中断和状态的简要理论。
JVM中的线程可以具有以下状态:
- 新品
- 可运行
- 定时等待
- 等待中
- 受阻
- 已终止
我们仅对等待和定时等待状态感兴趣。 通过Object.wait()
, Thread.join()
等方法将Object.wait()
Waiting状态。 使用Object.wait(timeout)
, Thread.join(timeout)
, Thread.sleep(sleeping)
和其他方法将线程置于Timed等待状态(即,持续一定时间的等待Thread.sleep(sleeping)
。
这里最重要的是,如果在进入 Waiting或Timed等待状态 之前中断线程,或者当线程处于此状态时 ,线程将唤醒,并抛出InterruptedException 。
但这还不是全部。 通过创建,还原或删除备份,线程根本不会进入状态数据。 然后如何通知线程它被中断了?
第一种方法是使用Thread.interrupted Thread.interrupted()
或Thread.currentThread.isInterrupted()
方法独立检查线程的中断标志。 两者之间的区别在于,第一个调用私有本机方法currentThread.isInterrupted(boolean ClearInterrupted)
, currentThread.isInterrupted(boolean ClearInterrupted)
传递true
,表示将清除中断标志,而第二个传递false
,使中断标志保持不变。 这两种方法之间的选择完全取决于情况。 当抛出InterruptedException时,中断标志也会被清除-这值得记住。
但是必须有一种更简单的方法-确实如此。 在应用程序中,使用I / O流以及使用I / O方法的工作量很大。 我们的任务是确保在I / O流上调用read()
或write(int b)
方法时,在中断过程中引发错误,通知阻塞的I / O调用已中断。 幸运的是,Java有一个异常-InterruptedIOException 。 但是,并非所有读/写流方法都监视线程中断,特别是仅PipedInputStream监视它。 因此,在不涉及该流的那些地方,我们必须扩展读/写方法,以便在发生中断时引发InterruptedIOException。 实际上,在我的应用程序中,仅在一个地方对我来说,read()方法的扩展就足够了-当InputStream从备份上传方法返回时。 这样我们就可以了解中断的起源,而不必在任何地方对标志进行模板检查。 但是,将此异常与IOException分开捕获并分别处理是很重要的。 当然,在某些地方,如果不能借助标志的模板检查就无法做,但是它已经变得更好。
同样重要的是要注意,如果在中断处理期间清除了该标志,则始终有必要再次设置该中断标志,以便从该方法返回后,我们可以找出发生的中断。
让我举例说明为什么这很重要。 假设我们使用upload()方法将备份上传到存储中,并且发生了中断。 处理中断,工作停止,方法返回。 中断不会随随便便发生-这意味着错误发生在某个地方,或者用户取消了任务。 无论出于何种原因,我们都必须在此未来中停止所有工作。 但是,如果您在从引导方法返回之前没有再次设置中断标志,则在Future主块中我们将永远不会知道发生了什么中断。
相同的代码示例:
backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <- , if (Thread.interrupted()) {
因此,优良作法是按以下方式处理InterruptedException或InterruptedIOException :
try { ... } catch (InterruptedException e) {
好吧,我们可以处理中断,但是谁会真正中断线程?
为此,我们将创建另一个名为CancelTask的实体,该实体将存储要取消的任务的ID,并编写一个手表来尝试中断任务。 为什么要尝试? 因为:
- 无法终止另一台服务器的内存中的线程。 几个服务器可以为我们工作,这意味着Future分散在不同的服务器上。 因此,当取消任务的请求到达其中一个服务器时,所需的Future可能在另一台服务器的内存中。
- 由于由于服务器崩溃而丢失了Future,因此无法取消任务。
简要介绍一下晚上的取消算法:
Watercher从cancel_tasks表中取出所有记录(同时未设置锁),逐一检查并尝试从其内存中获取相应的Future。 如果成功接收到Future,则将中断相应的线程,还原任务并将请求从表中删除。 如果超过了取消任务的超时请求(这意味着服务器崩溃了,并且Future丢失了),则只需从表中删除该请求即可。 如果多个服务器注意到超时并从表中删除记录,则不会发生任何不良情况,因为PostgreSQL中的删除是幂等的。
CancelTasksWatcher代码:
隐藏文字 @Component class CancelTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class); private static final Duration cancelTimeout = Duration.ofMinutes(10); private CancelTasksManager cancelTasksManager; private TasksStarterService tasksStarterService; private TasksManager tasksManager;
错误处理
, , Future, try-catch :
try { ... } catch (RuntimeException e) { ... errorTasksManager.addErrorTask(taskId); }
RuntimeException , Future , .
addErrorTask(taskId)
, ID , .
? , , , .
:
, , . — PostgreSQL select for update
, select skip locked
. , , revertTask()
, .
ErrorTasksWatcher :
@Component class ErrorTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class); private static final Integer nRows = 10; private TasksManager tasksManager; private ErrorTasksManager errorTasksManager;
revertTask(Task)
:
public void revertTask(@NotNull Task task) { Objects.requireNonNull(task); Task.State state = task.getState(); switch (state) { case DOWNLOADING: case APPLYING_DEPROCESSORS: case RESTORING: case DELETING: { logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString()); break; } case CREATING: case APPLYING_PROCESSORS: { logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString()); Integer backupPropertiesID = task.getBackupPropertiesId(); if (!backupPropertiesManager.existsById(backupPropertiesID)) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } backupPropertiesManager.deleteById(backupPropertiesID); break; } case UPLOADING: { logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state); Integer backupPropertiesId = task.getBackupPropertiesId(); Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId); if (!optionalBackupProperties.isPresent()) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get()); backupPropertiesManager.deleteById(backupPropertiesId); break; } default: { logger.error("Can't revert task: unknown state. Task info: {}", task); } } }
:
- DOWNLOADING , APPLYING_DEPROCESSORS , RESTORING , DELETING — . , .
- CREATING , APPLYING_PROCESSORS — , . BackupProperties , ( BackupProperties Web UI ).
- UPLOADING — . BackupProperties , . .
, . , ? , , Future ( 1), , InputStream ( 2). , 2, 1 2 ?
, , , . Future ( 1) :
public void onError(@NotNull Throwable t, @NotNull Integer taskId) { logger.error("Exception caught. Task ID: {}", taskId, t); Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId); if (!optionalFuture.isPresent()) { logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId); } else { boolean canceled = optionalFuture.get().cancel(true); if (!canceled) { logger.error("Error canceling the Future of task with ID {}", taskId); } else { logger.info("Task canceled. Task ID: {}", taskId); errorTasksManager.setError(taskId); } } }
, , ID , , Future - , ID .
, , , , , .
, :
, , , . — Future.
, , , I/O , — / . , . :
- , . , — .
- — Future , . , / , , ( , — IOException , , ).
, — ( ID , , ), .
, , . , , .
- Web UI: , . ,
结论
:
, ! , GitHub!