Pour le moment, vous ne surprendrez personne avec des applications multithreads, mais je pense que dans cet article, vous pouvez trouver des idées intéressantes. Mon étude de Java a commencé avec ce projet, donc peut-être qu'à certains endroits je me tromperai ou je construirai un gros vélo, mais j'espère que quelqu'un sera intéressé par l'expérience d'un débutant en Java. Je vais vous donner plusieurs fonctionnalités de l'application:
- Il fonctionne avec des sauvegardes exclusivement en mémoire, quelle que soit la taille de la sauvegarde
- Ne charge pas la sauvegarde entière en mémoire
- Les opérations de sauvegarde / restauration peuvent être annulées
Sous la coupe sera considérée l'architecture de l'application, ainsi que les principaux problèmes rencontrés et leur solution.
Présentation de l'application
La communication avec l'application se fait via l'interface utilisateur Web, mais à l'avenir, il sera possible d'ajouter une API REST si nécessaire.
L'application peut:
- Créez des sauvegardes et téléchargez-les sur un ou plusieurs stockages
- Restaurez les sauvegardes en les chargeant du stockage
- Supprimer les sauvegardes de tous les stockages
- Créez périodiquement des sauvegardes
Référentiels actuellement pris en charge:
- Système de fichiers local (non pris en charge par Docker)
- Dropbox
Bases de données actuellement prises en charge:
A partir d'une application spéciale, je peux noter:
- Travail correct dans une configuration de cluster
- Une sauvegarde n'est jamais entièrement chargée en mémoire, quelle que soit la taille de la sauvegarde. Le système de fichiers pour le stockage de sauvegarde temporaire n'est pas non plus impliqué. La création d'une sauvegarde et la restauration, et donc le chargement / déchargement d'une sauvegarde, se produisent exclusivement en mémoire.
- Multiplateforme - fonctionne sur Windows et Linux.
- Nous pouvons surveiller toutes les tâches en cours d'exécution et les annuler si nécessaire.
Vous trouverez ci-dessous des captures d'écran de l'interface utilisateur Web qui décrivent clairement les fonctionnalités de l'application.
Gestion de base de données Récupération de sauvegarde Gérer les sauvegardes créées Suivre les tâches en cours
L'architecture
Le travail principal se déroulera dans 3 services - DatabaseBackup , Processor , Storage , et nous les relierons ensemble en utilisant le concept de tâches . À propos de tout cela plus loin.
Sauvegarde de la base de données
Ce service est responsable de la création et de la restauration des sauvegardes en texte brut.
Interface de service:
public interface DatabaseBackup { InputStream createBackup(DatabaseSettings databaseSettings, Integer id); void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id); }
Les deux méthodes d'interface fonctionnent sur les instances InputStream , car nous avons besoin que la sauvegarde entière ne soit pas chargée en mémoire, ce qui signifie que la sauvegarde doit être lue / écrite en mode streaming. L'entité DatabaseSettings est pré-créée à partir de l'interface utilisateur Web et stocke les différents paramètres nécessaires pour accéder à la base de données. Quel est ce paramètre - id
- sera expliqué un peu plus loin.
Les exigences de service sont les suivantes:
- Les deux méthodes ne doivent pas lire l'intégralité de la sauvegarde en mémoire.
- La méthode
restoreBackup()
doit restaurer la sauvegarde en une seule transaction, afin qu'en cas d'erreur, ne laissez pas la base de données dans un état incohérent.
Implémentation pour PostgreSQL (description textuelle)Plus précisément, dans l'implémentation de PostgreSQL, le service est implémenté comme suit:
createBackup()
: un processus pg_dump est créé qui créera une sauvegarde et l'écrira dans le flux de sortie standard. Le flux de sortie de processus standard est renvoyé par la méthode (voir https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream-- ). Les flux d'E / S dans le système sont basés sur un tampon d'une certaine taille, et lorsqu'un processus écrit dans le flux de sortie, il écrit réellement dans le tampon en mémoire. La chose la plus importante ici est que le thread de processus n'écrira pas dans le tampon rempli jusqu'à ce que ce dernier ait été lu par l'autre côté, ce qui signifie que le thread sera dans un état verrouillé et que la sauvegarde ne sera pas complètement chargée en mémoire. Vous avez peut-être rencontré une situation dans laquelle votre programme Java a rencontré un blocage lors de l'utilisation de processus en raison du fait que vous n'avez pas lu la stdout ou la stderr du processus. Il est extrêmement important de surveiller cela, car le processus ne peut pas continuer s'il est bloqué sur un appel de blocage d'E / S lors de l'écriture dans un tampon complet et que personne ne lit ce tampon.restoreBackup()
: un processus psql est créé, la sauvegarde est lue à partir du restoreBackup()
transmis à la méthode et est simultanément écrite dans le flux d'entrée standard psql (voir https://docs.oracle.com/javase/8/docs/api/java/lang/Process. html # getOutputStream-- ). Cela fonctionne parce que la sauvegarde PostgreSQL en texte brut n'est qu'une collection de commandes DDL et DML faciles à comprendre psql.
Il y a beaucoup de code, donc je ne le donnerai pas ici, mais vous pouvez le voir sur GitHub en utilisant le lien Ă la fin de l'article.
Processeur
Ce service est responsable de l'utilisation des processeurs et du retraitement des sauvegardes inversées. Les processeurs sont utilisés avant le téléchargement vers le stockage ou après le déchargement du stockage. Exemple de processeur: compresseur, cryptage.
Interface de service:
public interface Processor { InputStream process(InputStream in); InputStream deprocess(InputStream in); ProcessorType getType();
Chaque processeur a la priorité - si plusieurs processeurs sont spécifiés, ils seront appliqués par ordre décroissant de priorité. En appliquant la fonction inverse dans le même ordre dans lequel les processeurs ont été appliqués, nous obtenons la sauvegarde d'origine.
Stockage
Ce service est responsable du chargement et du déchargement d'une sauvegarde, ainsi que de sa suppression du stockage. Exemple de stockage: Dropbox, système de fichiers local.
Interface de service:
public interface Storage { void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id); InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id); void deleteBackup(StorageSettings storageSettings, String backupName, Integer id); }
Chaque sauvegarde créée se voit attribuer un nom unique - afin que nous puissions la trouver sur l'un des stockages sur lesquels elle a été téléchargée. La façon dont la sauvegarde est présentée au stockage dépend exclusivement de la mise en œuvre du service, mais lors du transfert du nom de la sauvegarde vers l'une des fonctions, nous devons nous attendre au comportement correct. L'entité StorageSettings est pré-créée à partir de l'interface utilisateur Web et stocke les paramètres nécessaires pour accéder au stockage.
Concept de tâche
Nous aimerions pouvoir suivre l'état de nos tâches, gérer les erreurs possibles en fonction de l'avancement de la tâche et également annuler les tâches. Par conséquent, nous continuerons à fonctionner uniquement avec des tâches. Chaque tâche sera représentée dans la base de données par un enregistrement dans le tableau et par programme par l'instance Future (voir Java Future ). Chaque enregistrement de la table est associé à son propre Future (de plus, si plusieurs serveurs sont en cours d'exécution, les instances Future peuvent être dans la mémoire de différents serveurs).
Allons séquentiellement. Tout d'abord, nous avons besoin d'un service pour lancer des tâches - création, restauration et suppression de sauvegardes.
Lancement de la tâche
Création d'une sauvegarde:
public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(storageSettingsNameList); Objects.requireNonNull(processors); Objects.requireNonNull(databaseSettings); BackupProperties backupProperties = backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName()); Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.CREATING); logger.info("Creating backup..."); try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS); logger.info("Applying processors on created backup. Processors: {}", processors); try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.UPLOADING); logger.info("Uploading backup..."); backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Creating backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of created backup", ex); } catch (RuntimeException ex) { logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Backup creating task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; }
La création d'une sauvegarde passe par 3 étapes principales dans l'ordre suivant: création d'une sauvegarde -> application des processeurs -> téléchargement vers le stockage. Dans presque toutes les méthodes de service, nous transmettons l'ID de la tâche en cours afin que le service puisse signaler une erreur à partir d'un thread qui fonctionne en arrière-plan. À propos de la gestion des erreurs, pourquoi InterruptedException est ici et ce qui se passe avec une erreur après avoir reçu une RuntimeException sera discuté plus tard.
Et voici comment nous allons exécuter la tâche de création d'une sauvegarde:
tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings);
Le premier paramètre que nous transmettons à l'initiateur de la tâche: l'utilisateur ou la tâche serveur interne (un exemple de tâche interne est une sauvegarde périodique). La connaissance de l'initiateur de tâche nous permet d'afficher dans l'interface utilisateur Web uniquement les tâches qui ont été lancées par l'utilisateur. Les paramètres restants sont nécessaires pour créer directement une sauvegarde - une liste des stockages, des processeurs à utiliser, une base de données dont vous devez créer le vidage.
Lors de la création d'une sauvegarde, un enregistrement est également créé dans la base de données appelé BackupProperties . Cette entité stockera les propriétés de sauvegarde telles que le nom, les processeurs utilisés et la liste des référentiels dans lesquels la sauvegarde a été téléchargée. De plus, pour restaurer ou supprimer la sauvegarde, nous fonctionnerons avec cette entité particulière.
La tâche dans la base de données est stockée sous la forme suivante:
@Entity @Table(name = "backup_tasks") public class Task { @Id @Column(insertable = false, updatable = false) @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; @Enumerated(EnumType.STRING) @Column(updatable = false) private Type type; @Enumerated(EnumType.STRING) @Column(updatable = false) private RunType runType; @Enumerated(EnumType.STRING) private State state; @Column(insertable = false) private boolean interrupted; @Column(updatable = false) private Integer backupPropertiesId; @Column(updatable = false) private LocalDateTime date; public enum RunType { USER, INTERNAL } public enum State { PLANNED, CREATING, RESTORING, DELETING, APPLYING_PROCESSORS, APPLYING_DEPROCESSORS, DOWNLOADING, UPLOADING, COMPLETED, } public enum Type { CREATE_BACKUP { @Override public String toString() { return "CREATE BACKUP"; } }, RESTORE_BACKUP { @Override public String toString() { return "RESTORE BACKUP"; } }, DELETE_BACKUP { @Override public String toString() { return "DELETE BACKUP"; } } }
Ainsi, vous pouvez décrire le processus de création d'une sauvegarde sous la forme d'un diagramme comme suit:

D'autres types de tâches sont lancés par analogie. Afin de ne pas encombrer l'article avec une énorme quantité de code, pour les curieux, je donnerai le code pour lancer les tâches de restauration et de suppression de la sauvegarde séparément dans le spoiler.
Récupération de sauvegarde public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Objects.requireNonNull(storageSettingsName); Objects.requireNonNull(databaseSettings); Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING); logger.info("Downloading backup..."); try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) { if (Thread.interrupted() || downloadedBackup == null) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS); logger.info("Deprocessing backup..."); try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.RESTORING); logger.info("Restoring backup..."); databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Restoring backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of downloaded backup", ex); } catch (RuntimeException ex) { logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; }
La restauration d'une sauvegarde passe par 3 étapes principales dans l'ordre suivant: déchargement d'une sauvegarde du stockage -> utilisation des déprocesseurs pour obtenir la sauvegarde en texte brut d'origine -> restauration d'une sauvegarde.
Démarrez la récupération comme suit:
tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings);
Processus de restauration d'une sauvegarde sous forme de diagramme:

Supprimer la sauvegarde public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { try { logger.info("Deleting backup started. Backup properties: {}", backupProperties); tasksManager.updateTaskState(taskId, Task.State.DELETING); backupLoadManager.deleteBackup(backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Deleting backup completed. Backup properties: {}", backupProperties); } catch (RuntimeException ex) { logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; }
Le processus de suppression d'une sauvegarde est assez simple: une sauvegarde est simplement supprimée de tous les stockages sur lesquels elle a été téléchargée.
Exécutez la désinstallation comme suit:
tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties);
Processus de suppression d'une sauvegarde sous forme de diagramme:

Annuler la tâche
Qu'est-ce que l'annulation de tâche? Bien sûr, ce n'est rien de plus qu'une terminaison de thread. Vous pouvez voir que tout le code principal exécuté dans Future est encapsulé dans la construction try-catch suivante:
try { ... } catch (InterruptedException ex) { ... tasksManager.setInterrupted(taskId); }
Et également après chaque méthode importante, dont nous contrôlons le flux, la construction suivante est installée:
if (Thread.interrupted()) { throw new InterruptedException(); }
Avant de poursuivre, une brève théorie des interruptions et des états des threads JVM doit être donnée.
Les threads dans la JVM peuvent avoir les états suivants:
- Nouveau
- Runnable
- Attente chronométrée
- En attente
- Bloqué
- Terminé
Nous ne sommes intéressés que par les états d'attente et d'attente. Le Object.wait()
dans l'état Waiting par les méthodes Object.wait()
, Thread.join()
et autres. Le thread est placé dans l'état d' attente Timed (c'est-à -dire une attente qui dure une certaine période de temps) en utilisant les méthodes Object.wait(timeout)
, Thread.join(timeout)
, Thread.sleep(sleeping)
et autres.
La chose la plus importante ici est que si vous interrompez le thread avant d'entrer dans l'état d' attente En attente ou Timed ou lorsque le thread est dans cet état , le thread se réveille, lançant une InterruptedException .
Mais ce n'est pas tout. Ce n'est pas du tout un fait qu'un thread entrera jamais dans les données d'état en créant, en restaurant ou en supprimant une sauvegarde. Comment alors informer le fil qu'il a été interrompu?
La première façon consiste à vérifier indépendamment l'indicateur d'interruption avec le thread en utilisant les méthodes Thread.interrupted Thread.interrupted()
ou Thread.currentThread.isInterrupted()
. La différence entre eux est que la première appelle la méthode native privée currentThread.isInterrupted(boolean ClearInterrupted)
, la passant true
, indiquant que le drapeau d'interruption sera effacé, et le second passant false
, laissant le drapeau d'interruption intact. Le choix entre ces deux méthodes dépend entièrement de la situation. Lorsqu'une InterruptedException est levée, l'indicateur d'interruption est également effacé - cela mérite d'être rappelé.
Mais il doit y avoir un moyen plus facile - et c'est le cas. Dans l'application, il y a énormément de travail avec les flux d'E / S, et donc avec les méthodes d'E / S. Notre tâche consiste à garantir que lors de l'appel des méthodes read()
ou write(int b)
sur le flux d'E / S, une erreur est générée lors de l'interruption, informant que l'appel d'E / S de blocage a été interrompu. Heureusement, Java a une telle exception - InterruptedIOException . Cependant, toutes les méthodes de lecture / écriture de flux ne surveillent pas les interruptions de threads, et en particulier seul PipedInputStream le surveille. Par conséquent, dans les endroits où ce flux n'est pas impliqué, nous devons étendre la méthode de lecture / écriture de sorte qu'en cas d'interruption, une InterruptedIOException soit levée. En fait, l'extension de la méthode read () ne me suffisait dans l'application qu'à un seul endroit - lorsque InputStream revenait de la méthode de téléchargement de sauvegarde. C'est ainsi que nous pouvons découvrir l'origine d'une interruption sans avoir à placer des vérifications de modèle sur le drapeau partout. Cependant, il est important d'attraper cette exception séparément de l'exception IOException et de la gérer séparément. Bien sûr, vous ne pouvez pas vous passer de l'aide d'un modèle de vérification du drapeau à certains endroits, mais c'est déjà mieux.
Il est également important de noter que si l'indicateur a été effacé pendant le traitement d'interruption, il est toujours nécessaire de définir à nouveau l'indicateur d'interruption afin qu'après le retour de la méthode, nous puissions découvrir l'interruption qui s'est produite.
Permettez-moi d'expliquer avec un exemple pourquoi c'est important. Supposons que nous téléchargions une sauvegarde vers le stockage dans la méthode upload () et qu'une interruption se produise. L'interruption est traitée, le travail est arrêté et la méthode revient. L'interruption ne se produit pas avec désinvolture - cela signifie que soit une erreur s'est produite quelque part, soit que l'utilisateur a annulé la tâche. Quelle que soit la raison, nous devons arrêter tout travail dans cet avenir. Mais si vous ne définissez pas à nouveau l'indicateur d'interruption avant de revenir de la méthode de démarrage, nous ne saurons jamais dans le bloc Future principal l'interruption qui s'est produite.
Le mĂŞme exemple de code:
backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <- , if (Thread.interrupted()) {
Par conséquent, il est recommandé de gérer une exception InterruptedException ou InterruptedIOException comme suit:
try { ... } catch (InterruptedException e) {
Eh bien, nous pouvons gérer l'interruption, mais qui va réellement interrompre les threads?
Pour ce faire, nous allons créer une autre entité appelée CancelTask , qui stockera l'ID de la tâche à annuler, et rédigera également une veille qui tentera d'interrompre les tâches. Pourquoi essayer? Parce que:
- Impossible de terminer le thread dans la mémoire d'un autre serveur. Plusieurs serveurs peuvent fonctionner pour nous, ce qui signifie que Future est dispersé sur différents serveurs. Ainsi, lorsqu'une demande d'annulation d'une tâche arrive sur l'un des serveurs, le Future souhaité peut se trouver dans la mémoire d'un autre serveur.
- La tâche ne peut pas être annulée car Future a été perdu en raison d'une panne de serveur.
Décrivez brièvement l'algorithme d'annulation le soir:
Watercher supprime tous les enregistrements de la table cancel_tasks (le verrou n'est pas défini en même temps), parcourt chacun d'eux et essaie d'obtenir le futur correspondant de sa mémoire. Si Future est reçu avec succès, le thread correspondant est interrompu, la tâche est annulée et la demande est supprimée de la table. Si la demande de délai d'expiration pour annuler la tâche est dépassée (ce qui signifie que le serveur est tombé en panne et que Future a été perdu) - la demande est simplement supprimée de la table. Si plusieurs serveurs remarquent un timeout et suppriment l'enregistrement de la table, rien de mauvais ne se produira, car la suppression dans PostgreSQL est idempotente.
CancelTasksWatcher Code:
Texte masqué @Component class CancelTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class); private static final Duration cancelTimeout = Duration.ofMinutes(10); private CancelTasksManager cancelTasksManager; private TasksStarterService tasksStarterService; private TasksManager tasksManager;
Gestion des erreurs
Ci-dessus, vous pouvez voir que tout le code principal en cours d'exécution dans Future est encapsulé dans la construction try-catch suivante:
try { ... } catch (RuntimeException e) { ... errorTasksManager.addErrorTask(taskId); }
RuntimeException , Future , .
addErrorTask(taskId)
, ID , .
? , , , .
:
, , . — PostgreSQL select for update
, select skip locked
. , , revertTask()
, .
ErrorTasksWatcher :
@Component class ErrorTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class); private static final Integer nRows = 10; private TasksManager tasksManager; private ErrorTasksManager errorTasksManager;
revertTask(Task)
:
public void revertTask(@NotNull Task task) { Objects.requireNonNull(task); Task.State state = task.getState(); switch (state) { case DOWNLOADING: case APPLYING_DEPROCESSORS: case RESTORING: case DELETING: { logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString()); break; } case CREATING: case APPLYING_PROCESSORS: { logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString()); Integer backupPropertiesID = task.getBackupPropertiesId(); if (!backupPropertiesManager.existsById(backupPropertiesID)) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } backupPropertiesManager.deleteById(backupPropertiesID); break; } case UPLOADING: { logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state); Integer backupPropertiesId = task.getBackupPropertiesId(); Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId); if (!optionalBackupProperties.isPresent()) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get()); backupPropertiesManager.deleteById(backupPropertiesId); break; } default: { logger.error("Can't revert task: unknown state. Task info: {}", task); } } }
:
- DOWNLOADING , APPLYING_DEPROCESSORS , RESTORING , DELETING — . , .
- CREATING , APPLYING_PROCESSORS — , . BackupProperties , ( BackupProperties Web UI ).
- UPLOADING — . BackupProperties , . .
, . , ? , , Future ( 1), , InputStream ( 2). , 2, 1 2 ?
, , , . Future ( 1) :
public void onError(@NotNull Throwable t, @NotNull Integer taskId) { logger.error("Exception caught. Task ID: {}", taskId, t); Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId); if (!optionalFuture.isPresent()) { logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId); } else { boolean canceled = optionalFuture.get().cancel(true); if (!canceled) { logger.error("Error canceling the Future of task with ID {}", taskId); } else { logger.info("Task canceled. Task ID: {}", taskId); errorTasksManager.setError(taskId); } } }
, , ID , , Future - , ID .
, , , , , .
, :
, , , . — Future.
, , , I/O , — / . , . :
- , . , — .
- — Future , . , / , , ( , — IOException , , ).
, — ( ID , , ), .
, , . , , .
- Web UI: , . ,
Conclusion
:
, ! , GitHub!