Meine Erfahrung beim Erstellen einer Multithread-Anwendung für die Arbeit mit Backups

Im Moment werden Sie niemanden mit Multithread-Anwendungen überraschen, aber ich denke, dass Sie in diesem Artikel einige interessante Ideen finden können. Mein Java-Studium begann mit diesem Projekt. Vielleicht irre ich mich an einigen Stellen sehr oder baue ein großes Fahrrad, aber ich hoffe, dass sich jemand für die Erfahrung eines Anfängers in Java interessiert. Ich werde einige Funktionen der Anwendung geben:


  • Es funktioniert mit Sicherungen ausschließlich im Speicher, unabhängig von der Größe der Sicherung
  • Lädt nicht die gesamte Sicherung in den Speicher
  • Sicherungs- / Wiederherstellungsvorgänge können abgebrochen werden

Unter dem Schnitt werden die Architektur der Anwendung sowie die aufgetretenen Hauptprobleme und deren Lösung betrachtet.


Anwendungsübersicht


Die Kommunikation mit der Anwendung erfolgt über die Web-Benutzeroberfläche. In Zukunft kann jedoch bei Bedarf eine REST-API hinzugefügt werden.


Die Anwendung kann:


  1. Erstellen Sie Backups und laden Sie sie in einen oder mehrere Speicher hoch
  2. Stellen Sie Sicherungen wieder her, indem Sie sie aus dem Speicher laden
  3. Löschen Sie Backups aus allen Speichern
  4. Erstellen Sie regelmäßig Backups

Derzeit unterstützte Repositorys:


  • Lokales Dateisystem (wird von Docker nicht unterstützt)
  • Dropbox

Derzeit unterstützte Datenbanken:


  • PostgreSQL

Aus einer speziellen Anwendung kann ich Folgendes hervorgehen:


  1. Richtige Arbeit in einer Clusterkonfiguration
  2. Ein Backup wird unabhängig von der Größe des Backups niemals vollständig in den Speicher geladen. Das Dateisystem für die temporäre Sicherungsspeicherung ist ebenfalls nicht beteiligt. Sowohl die Erstellung einer Sicherung als auch die Wiederherstellung und damit das Laden / Entladen einer Sicherung erfolgen ausschließlich im Speicher.
  3. Plattformübergreifend - funktioniert sowohl unter Windows als auch unter Linux.
  4. Wir können alle laufenden Aufgaben überwachen und bei Bedarf abbrechen.

Unten finden Sie Screenshots der Web-Benutzeroberfläche, in denen die Funktionen der Anwendung klar beschrieben werden.


Speicherverwaltung



Datenbankverwaltung



Backup-Erstellung


Backup-Wiederherstellung


Verwalten Sie erstellte Backups


Regelmäßige Sicherungen


Verfolgen Sie laufende Aufgaben




Architektur


Die Hauptarbeit wird in 3 Diensten stattfinden - DatabaseBackup , Prozessor , Speicher , und wir werden sie unter Verwendung des Aufgabenkonzepts miteinander verbinden . Über all das weiter.


Datenbanksicherung


Dieser Dienst ist für das Erstellen und Wiederherstellen von Klartextsicherungen verantwortlich.


Serviceschnittstelle:


public interface DatabaseBackup { InputStream createBackup(DatabaseSettings databaseSettings, Integer id); void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id); } 

Beide Schnittstellenmethoden arbeiten mit InputStream- Instanzen, da die gesamte Sicherung nicht in den Speicher geladen werden muss. Dies bedeutet, dass die Sicherung im Streaming-Modus gelesen / geschrieben werden muss. Die DatabaseSettings- Entität wird über die Web-Benutzeroberfläche vorab erstellt und speichert die verschiedenen Einstellungen, die für den Zugriff auf die Datenbank erforderlich sind. Was ist dieser Parameter - id - wird etwas weiter erklärt.


Die Serviceanforderungen lauten wie folgt:


  1. Beide Methoden sollten nicht die gesamte Sicherung in den Speicher lesen.
  2. Die Methode restoreBackup() sollte die Sicherung in einer einzelnen Transaktion wiederherstellen, damit die Datenbank im Fehlerfall nicht in einem inkonsistenten Zustand restoreBackup() .

Implementierung für PostgreSQL (Textbeschreibung)

Insbesondere in der Implementierung für PostgreSQL wird der Dienst wie folgt implementiert:


  1. createBackup() : Es wird ein pg_dump- Prozess erstellt, der ein Backup erstellt und in den Standardausgabestream schreibt. Der Standardprozessausgabestream wird von der Methode zurückgegeben (siehe https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream-- ). Die E / A-Streams im System basieren auf einem Puffer einer bestimmten Größe. Wenn ein Prozess in den Ausgabestream schreibt, schreibt er tatsächlich in den Puffer im Speicher. Das Wichtigste dabei ist, dass der Prozessthread erst dann in den gefüllten Puffer schreibt, wenn dieser von der anderen Seite gelesen wurde. Dies bedeutet, dass sich der Thread in einem gesperrten Zustand befindet und die Sicherung nicht vollständig in den Speicher geladen wird. Möglicherweise ist Ihr Java-Programm bei der Arbeit mit Prozessen festgefahren, weil Sie das stdout oder stderr des Prozesses nicht gelesen haben. Es ist äußerst wichtig, dies zu überwachen, da der Prozess nicht fortgesetzt werden kann, wenn er bei einem E / A-Blockierungsaufruf beim Schreiben in einen vollständigen Puffer blockiert wird und niemand diesen Puffer liest.
  2. restoreBackup() : Ein psql- Prozess wird erstellt, die Sicherung wird aus dem an die Methode übergebenen restoreBackup() gelesen und gleichzeitig in den psql-Standardeingabestream geschrieben (siehe https://docs.oracle.com/javase/8/docs/api/java/lang/Process). html # getOutputStream-- ). Dies funktioniert, da die PostgreSQL-Sicherung im Klartext nur eine Sammlung von DDL- und DML-Befehlen ist, die leicht verständlich sind.

Es gibt viel Code, daher werde ich ihn hier nicht weitergeben, aber Sie können GitHub über den Link am Ende des Artikels ansehen.


Prozessor


Dieser Service ist für die Verwendung von Prozessoren und die Wiederaufbereitung von Reverse Backups verantwortlich. Prozessoren werden vor dem Herunterladen in den Speicher oder nach dem Entladen aus dem Speicher verwendet. Prozessorbeispiel: Kompressor, Verschlüsselung.


Serviceschnittstelle:


 public interface Processor { InputStream process(InputStream in); InputStream deprocess(InputStream in); ProcessorType getType(); // ProcessorType -  Enum,     int getPrecedence(); //   } 

Jeder Prozessor hat Priorität. Wenn mehrere Prozessoren angegeben sind, werden diese in absteigender Reihenfolge der Priorität angewendet. Wenn Sie die Umkehrfunktion in derselben Reihenfolge anwenden, in der die Prozessoren angewendet wurden, erhalten Sie die ursprüngliche Sicherung.


Lagerung


Dieser Dienst ist für das Laden und Entladen eines Backups sowie dessen Entfernung aus dem Speicher verantwortlich. Speicherbeispiel: Dropbox, lokales Dateisystem.


Serviceschnittstelle:


 public interface Storage { void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id); InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id); void deleteBackup(StorageSettings storageSettings, String backupName, Integer id); } 

Jedem erstellten Backup wird ein eindeutiger Name zugewiesen, sodass wir es in jedem der Speicher finden können, in die es heruntergeladen wurde. Die Art und Weise, wie die Sicherung dem Speicher präsentiert wird, hängt ausschließlich von der Implementierung des Dienstes ab. Wenn Sie jedoch den Sicherungsnamen auf eine der Funktionen übertragen, sollten Sie das richtige Verhalten erwarten. Die StorageSettings- Entität wird über die Web-Benutzeroberfläche vorab erstellt und speichert die erforderlichen Einstellungen für den Zugriff auf den Speicher.




Aufgabenkonzept


Wir möchten in der Lage sein, den Status unserer Aufgaben zu verfolgen, mögliche Fehler abhängig vom Fortschritt der Aufgabe zu behandeln und auch Aufgaben abzubrechen. Daher werden wir weiterhin nur mit Aufgaben arbeiten. Jede Aufgabe wird in der Datenbank durch einen Datensatz in der Tabelle und programmgesteuert durch die Future- Instanz dargestellt (siehe Java Future ). Jeder Datensatz in der Tabelle ist mit einer eigenen Zukunft verknüpft (wenn mehrere Server ausgeführt werden, können sich zukünftige Instanzen im Speicher verschiedener Server befinden).


Lass uns nacheinander gehen. Zunächst benötigen wir einen Service zum Starten von Aufgaben - Erstellen, Wiederherstellen und Löschen von Backups.


Aufgabenstart


Erstellen eines Backups:


 public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(storageSettingsNameList); Objects.requireNonNull(processors); Objects.requireNonNull(databaseSettings); BackupProperties backupProperties = backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName()); Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.CREATING); logger.info("Creating backup..."); try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS); logger.info("Applying processors on created backup. Processors: {}", processors); try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.UPLOADING); logger.info("Uploading backup..."); backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Creating backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of created backup", ex); } catch (RuntimeException ex) { logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Backup creating task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

Das Erstellen eines Backups durchläuft drei Hauptschritte in der folgenden Reihenfolge: Erstellen eines Backups -> Anwendung von Prozessoren -> Hochladen in den Speicher. Bei fast allen Dienstmethoden leiten wir die ID der aktuellen Aufgabe weiter, damit der Dienst einen Fehler von einem Thread melden kann, der im Hintergrund arbeitet. Über die Fehlerbehandlung, warum InterruptedException hier ist und was mit einem Fehler nach Erhalt einer RuntimeException passiert, wird später erläutert.


Und so führen wir die Aufgabe aus, ein Backup zu erstellen:


 tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings); 

Der erste Parameter, den wir an den Initiator der Aufgabe übergeben: der Benutzer oder die interne Serveraufgabe (ein Beispiel für eine interne Aufgabe ist eine regelmäßige Sicherung). Das Wissen des Aufgabeninitiators ermöglicht es uns, in der Web-Benutzeroberfläche nur die Aufgaben anzuzeigen, die vom Benutzer gestartet wurden. Die verbleibenden Parameter sind erforderlich, um eine Sicherung direkt zu erstellen - eine Liste der Speicher, der zu verwendenden Prozessoren und einer Datenbank, deren Speicherauszug Sie erstellen müssen.


Beim Erstellen einer Sicherung wird auch ein Datensatz in der Datenbank mit dem Namen BackupProperties erstellt . Diese Entität speichert Sicherungseigenschaften wie den Namen, die verwendeten Prozessoren und die Liste der Repositorys, in die die Sicherung heruntergeladen wurde. Um die Sicherung wiederherzustellen oder zu löschen, arbeiten wir mit dieser bestimmten Entität.


Die Aufgabe in der Datenbank wird in folgender Form gespeichert:


 @Entity @Table(name = "backup_tasks") public class Task { /** * Identifier of each backup task. Identifier is generated by PostgreSQL database after saving of entity. */ @Id @Column(insertable = false, updatable = false) @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; /** * Backup task type. * <p> * Type is set at the very start of any task and can't be changed. * * @see Type */ @Enumerated(EnumType.STRING) @Column(updatable = false) private Type type; /** * Who initiated a task: user or server. * <p> * We need to know it to show on front only these tasks that was started by user. * * @see RunType */ @Enumerated(EnumType.STRING) @Column(updatable = false) private RunType runType; /** * Backup task state. * <p> * State is updated with every new step in task being executed. * * @see Task.State */ @Enumerated(EnumType.STRING) private State state; /** * Whether task has been interrupted or not. * <p> * Default is {@literal false}. */ @Column(insertable = false) private boolean interrupted; /** * Identifier of {@link BackupProperties}. * <p> * We need to know backup ID to be able to handle occurred errors. */ @Column(updatable = false) private Integer backupPropertiesId; /** * Start time of the task. */ @Column(updatable = false) private LocalDateTime date; public enum RunType { USER, INTERNAL } public enum State { PLANNED, CREATING, RESTORING, DELETING, APPLYING_PROCESSORS, APPLYING_DEPROCESSORS, DOWNLOADING, UPLOADING, COMPLETED, } public enum Type { CREATE_BACKUP { @Override public String toString() { return "CREATE BACKUP"; } }, RESTORE_BACKUP { @Override public String toString() { return "RESTORE BACKUP"; } }, DELETE_BACKUP { @Override public String toString() { return "DELETE BACKUP"; } } } // getters & setters... } 

So können Sie den Vorgang des Erstellens eines Backups in Form eines Diagramms wie folgt beschreiben:
Sicherungsprozess




Andere Arten von Aufgaben werden analog gestartet. Um den Artikel nicht mit einer großen Menge an Code zu überladen, werde ich den Neugierigen den Code zum Starten von Aufgaben zum Wiederherstellen und Löschen von Backups separat im Spoiler geben.


Backup-Wiederherstellung
 public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Objects.requireNonNull(storageSettingsName); Objects.requireNonNull(databaseSettings); Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING); logger.info("Downloading backup..."); try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) { if (Thread.interrupted() || downloadedBackup == null) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS); logger.info("Deprocessing backup..."); try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.RESTORING); logger.info("Restoring backup..."); databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Restoring backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of downloaded backup", ex); } catch (RuntimeException ex) { logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

Das Wiederherstellen einer Sicherung durchläuft drei Hauptschritte in der folgenden Reihenfolge: Entladen einer Sicherung aus dem Speicher -> Verwenden von Deprozessoren zum Abrufen der ursprünglichen Klartextsicherung -> Wiederherstellen einer Sicherung.


Starten Sie die Wiederherstellung wie folgt:


 tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings); 

Der Vorgang des Wiederherstellens einer Sicherung in Form eines Diagramms:


Backup löschen
 public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { try { logger.info("Deleting backup started. Backup properties: {}", backupProperties); tasksManager.updateTaskState(taskId, Task.State.DELETING); backupLoadManager.deleteBackup(backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Deleting backup completed. Backup properties: {}", backupProperties); } catch (RuntimeException ex) { logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

Das Löschen eines Backups ist ganz einfach: Ein Backup wird einfach aus allen Speichern gelöscht, in die es heruntergeladen wurde.


Führen Sie die Deinstallation wie folgt aus:


 tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties); 

Der Vorgang des Löschens einer Sicherung in Form eines Diagramms:




Aufgabe abbrechen


Was ist Aufgabenabbruch? Dies ist natürlich nichts anderes als eine Thread-Beendigung. Sie konnten sehen, dass der gesamte in Future ausgeführte Hauptcode in das folgende Try-Catch-Konstrukt eingeschlossen ist:


 try { ... } catch (InterruptedException ex) { ... tasksManager.setInterrupted(taskId); } 

Und auch nach jeder wichtigen Methode, deren Ausführungsfluss wir steuern, wird die folgende Konstruktion installiert:


 if (Thread.interrupted()) { throw new InterruptedException(); } 

Bevor Sie fortfahren, sollte eine kurze Theorie der Unterbrechungen und Zustände von JVM-Threads gegeben werden.


Threads in der JVM können den folgenden Status haben:


  1. Neu
  2. Runnable
  3. Zeitgesteuertes Warten
  4. Warten
  5. Blockiert
  6. Beendet

Wir sind nur an den Wartezuständen und zeitgesteuerten Wartezuständen interessiert. Der Object.wait() durch die Methoden Object.wait() , Thread.join() und andere in den Status Warten versetzt . Der Thread wird unter Verwendung der Methoden Object.wait(timeout) , Thread.join(timeout) , Thread.sleep(sleeping) und anderer in den zeitgesteuerten Wartezustand (d. H. Eine Wartezeit, die einen bestimmten Zeitraum dauert Thread.sleep(sleeping) .


Das Wichtigste dabei ist, dass der Thread aufwacht und eine InterruptedException auslöst, wenn Sie den Thread unterbrechen, bevor Sie in den Wartezustand oder den zeitgesteuerten Wartezustand wechseln, oder wenn sich der Thread in diesem Zustand befindet.


Das ist aber noch nicht alles. Es ist überhaupt keine Tatsache, dass ein Thread jemals in Statusdaten wechselt, indem er ein Backup erstellt, wiederherstellt oder löscht. Wie kann man dann den Thread darüber informieren, dass er unterbrochen wurde?


Die erste Möglichkeit besteht darin, das Interrupt-Flag mit dem Thread unabhängig mit den Methoden Thread.interrupted Thread.interrupted() oder Thread.currentThread.isInterrupted() . Der Unterschied zwischen ihnen besteht darin, dass der erste die private native Methode currentThread.isInterrupted(boolean ClearInterrupted) , wobei true , um currentThread.isInterrupted(boolean ClearInterrupted) , dass das Interrupt-Flag gelöscht wird, und der zweite false , wobei das Interrupt-Flag intakt bleibt. Die Wahl zwischen diesen beiden Methoden hängt ganz von der Situation ab. Wenn eine InterruptedException ausgelöst wird, wird auch das Interrupt-Flag gelöscht - dies ist zu beachten.


Aber es muss einen Weg geben, der einfacher ist - und das ist es auch. In der Anwendung gibt es eine Menge Arbeit mit E / A-Streams und daher mit E / A-Methoden. Unsere Aufgabe ist es sicherzustellen, dass beim Aufrufen der Methoden read() oder write(int b) im E / A-Stream während der Unterbrechung ein Fehler ausgegeben wird, der darauf hinweist, dass der blockierende E / A-Aufruf unterbrochen wurde. Glücklicherweise hat Java eine solche Ausnahme - InterruptedIOException . Allerdings überwachen nicht alle Lese- / Schreib-Stream-Methoden Thread-Unterbrechungen, und speziell nur PipedInputStream überwacht sie. Daher müssen wir an den Stellen, an denen dieser Stream nicht beteiligt ist, die Lese- / Schreibmethode so erweitern, dass bei einem Interrupt eine InterruptedIOException ausgelöst wird. Tatsächlich war die Erweiterung der read () -Methode für mich in der Anwendung nur an einer Stelle ausreichend - als der InputStream von der Backup-Upload-Methode zurückkehrte. Auf diese Weise können wir den Ursprung eines Interrupts ermitteln, ohne dass das Flag überall mit Vorlagenprüfungen versehen werden muss. Es ist jedoch wichtig, diese Ausnahme getrennt von der IOException abzufangen und separat zu behandeln. Natürlich kann man an einigen Stellen nicht auf die Hilfe einer Vorlagenprüfung der Flagge verzichten, aber sie ist bereits besser geworden.


Es ist auch wichtig zu beachten, dass, wenn das Flag während der Interrupt-Verarbeitung gelöscht wurde, das Interrupt-Flag immer wieder gesetzt werden muss, damit wir nach der Rückkehr von der Methode herausfinden können, welcher Interrupt aufgetreten ist.


Lassen Sie mich anhand eines Beispiels erklären, warum dies wichtig ist. Angenommen, wir laden mit der Methode upload () ein Backup in den Speicher hoch und es tritt ein Interrupt auf. Der Interrupt wird verarbeitet, die Arbeit gestoppt und die Methode zurückgegeben. Eine Unterbrechung tritt nicht gelegentlich auf - dies bedeutet, dass entweder irgendwo ein Fehler aufgetreten ist oder der Benutzer die Aufgabe abgebrochen hat. Unabhängig vom Grund müssen wir alle Arbeiten in dieser Zukunft einstellen. Wenn Sie das Interrupt-Flag jedoch nicht erneut setzen, bevor Sie von der Startmethode zurückkehren, werden wir im Future-Hauptblock nie erfahren, welcher Interrupt aufgetreten ist.
Das gleiche Codebeispiel:


 backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <-   ,       if (Thread.interrupted()) { //      ,      - ,    throw new InterruptedException(); } 

Daher empfiehlt es sich, eine InterruptedException oder InterruptedIOException wie folgt zu behandeln:


 try { ... } catch (InterruptedException e) { //  InterruptedIOException ... // re-interrupt the thread Thread.currentThread().interrupt(); } 

Nun, wir können mit Unterbrechungen umgehen, aber wer wird die Threads tatsächlich unterbrechen?
Zu diesem Zweck erstellen wir eine weitere Entität namens CancelTask , in der die ID der Aufgabe zum Abbrechen gespeichert wird, und schreiben eine Überwachung , die versucht, die Aufgaben zu unterbrechen. Warum versuchen? Weil:


  1. Der Thread im Speicher eines anderen Servers kann nicht beendet werden. Für uns können mehrere Server arbeiten, was bedeutet, dass Future auf verschiedene Server verteilt ist. Wenn also eine Anforderung zum Abbrechen einer Aufgabe auf einem der Server eintrifft, befindet sich die gewünschte Zukunft möglicherweise im Speicher eines anderen Servers.
  2. Die Aufgabe kann nicht abgebrochen werden, da Future aufgrund eines Serverabsturzes verloren gegangen ist.

Beschreiben Sie kurz den Stornierungsalgorithmus am Abend:
Watercher nimmt alle Datensätze aus der Tabelle cancel_tasks heraus (die Sperre wird nicht gleichzeitig gesetzt), geht jeden einzelnen durch und versucht, die entsprechende Zukunft aus seinem Speicher abzurufen . Wenn Future erfolgreich empfangen wurde, wird der entsprechende Thread unterbrochen, die Aufgabe zurückgesetzt und die Anforderung aus der Tabelle gelöscht. Wenn die Timeout-Anforderung zum Abbrechen der Aufgabe überschritten wird (was bedeutet, dass der Server abgestürzt ist und Future verloren gegangen ist), wird die Anforderung einfach aus der Tabelle gelöscht. Wenn mehrere Server eine Zeitüberschreitung feststellen und den Datensatz aus der Tabelle löschen, geschieht nichts Schlimmes, da das Löschen in PostgreSQL idempotent ist.


CancelTasksWatcher-Code:


Versteckter Text
 /** * This class scans for tasks to cancel and tries to cancel them. */ @Component class CancelTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class); private static final Duration cancelTimeout = Duration.ofMinutes(10); private CancelTasksManager cancelTasksManager; private TasksStarterService tasksStarterService; private TasksManager tasksManager; // spring setters... /** * This watcher wakes up every time 10 seconds passed from the last completion, checks if there are any tasks to cancel and tries to * cancel each task. * <p> * Since there are can be working more that one instance of the program, {@literal Future} instance of task can belong to different * servers. We can't get access to {@literal Future} if it's not in memory of the server where task cancellation request was accepted. * So the purpose of this watcher is to be able cancel tasks that works in the other instance of program. Each server has this watcher * checking for available cancellation requests and if any, the watcher tries to cancel corresponding {@literal Future}. * If cancellation is successful task will be also reverted. * <p> * If task cancellation request timeout exceeded, then it means a server that had requested {@literal Future} instances has been * shutdown, so all {@literal Future} instances lost and task can't be canceled. In such case task cancellation request will be ignored. * * @see TasksStarterService#getFuture(Integer) * @see TasksManager#revertTask(Task) */ @Scheduled(fixedDelay = 10 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchTasksToCancel() { Iterable<CancelTask> cancelTasks = cancelTasksManager.findAll(); Iterable<Task> tasks = tasksManager.findAllById(StreamSupport.stream(cancelTasks.spliterator(), false) .map(CancelTask::getTaskId).collect(Collectors.toList())); Map<Integer, Task> tasksAsMap = StreamSupport.stream(tasks.spliterator(), false) .collect(Collectors.toMap(Task::getId, Function.identity())); List<Integer> taskIdsForDeleting = new ArrayList<>(); for (CancelTask cancelTask : cancelTasks) { Integer taskId = cancelTask.getTaskId(); Task task = tasksAsMap.get(taskId); if (task == null) { logger.error("Can't cancel task: no such entity with ID {}", taskId); taskIdsForDeleting.add(taskId); continue; } // timeout exceeded, that is server shutdown and lost all Future instances, so task can't be canceled if (LocalDateTime.now(ZoneOffset.UTC).isAfter(cancelTask.getPutTime().plus(cancelTimeout))) { logger.error("Can't cancel task: timeout exceed. Task ID: {}", taskId); taskIdsForDeleting.add(taskId); continue; } tasksStarterService.getFuture(taskId).ifPresent(future -> { logger.info("Canceling task with ID {}", taskId); boolean canceled = future.cancel(true); if (canceled) { try { // give time to properly handle interrupt Thread.sleep(10000); } catch (InterruptedException e) { // should not happen } tasksManager.revertTask(task); } taskIdsForDeleting.add(taskId); logger.info("Task canceled: {}. Task ID: {}", canceled, taskId); }); } cancelTasksManager.deleteByTaskIdIn(taskIdsForDeleting); } } 



Fehlerbehandlung


Oben sehen Sie, dass der gesamte in Future ausgeführte Hauptcode in das folgende Try-Catch-Konstrukt eingeschlossen ist:


 try { ... } catch (RuntimeException e) { ... errorTasksManager.addErrorTask(taskId); } 

RuntimeException , Future , .


addErrorTask(taskId) , ID , .
? , , , .


:
, , . — PostgreSQL select for update , select skip locked . , , revertTask() , .


ErrorTasksWatcher :


Versteckter Text
 /** * This class scans for erroneous tasks and handles them depending on their state. */ @Component class ErrorTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class); private static final Integer nRows = 10; private TasksManager tasksManager; private ErrorTasksManager errorTasksManager; // spring setters... /** * This watcher wakes up every time 1 minute passed from the last completion, checks backup states periodically and handles erroneous * tasks if any. * <p> * The watcher handles at most N tasks as described by {@link #nRows} constant and skips already locked tasks. * When retrieving error tasks from database pessimistic lock is set. It allows safely run more than one copy of program, as no other * watcher can pick up already being handled error tasks. * <p> * If the server shutdowns while rows was locked, transaction will be rolled back and lock released, so these entities can be picked * up by the other running server. */ @Scheduled(fixedDelay = 60 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchErrorTasks() { for (ErrorTask errorTask : errorTasksManager.findFirstNAndLock(nRows)) { if (!errorTask.isErrorHandled()) { Integer backupTaskId = errorTask.getTaskId(); Optional<Task> optionalTask = tasksManager.findById(backupTaskId); if (!optionalTask.isPresent()) { logger.info("Can't handle erroneous task: no corresponding backup task entity. Backup task ID: {}", backupTaskId); continue; } tasksManager.revertTask(optionalTask.get()); errorTask.setErrorHandled(true); } } } } 

revertTask(Task) :


Versteckter Text
  /** * This function reverts erroneous task by its entity. * <p> * Use this function only after canceling related {@literal Future}. * <p> * If the task was of the type {@link Task.Type#CREATE_BACKUP} then related {@link BackupProperties} will be deleted. * * @param task the entity */ public void revertTask(@NotNull Task task) { Objects.requireNonNull(task); Task.State state = task.getState(); switch (state) { case DOWNLOADING: case APPLYING_DEPROCESSORS: case RESTORING: case DELETING: { logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString()); break; } case CREATING: case APPLYING_PROCESSORS: { logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString()); Integer backupPropertiesID = task.getBackupPropertiesId(); if (!backupPropertiesManager.existsById(backupPropertiesID)) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } backupPropertiesManager.deleteById(backupPropertiesID); break; } case UPLOADING: { logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state); Integer backupPropertiesId = task.getBackupPropertiesId(); Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId); if (!optionalBackupProperties.isPresent()) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get()); backupPropertiesManager.deleteById(backupPropertiesId); break; } default: { logger.error("Can't revert task: unknown state. Task info: {}", task); } } } 

:


  1. DOWNLOADING , APPLYING_DEPROCESSORS , RESTORING , DELETING — . , .
  2. CREATING , APPLYING_PROCESSORS — , . BackupProperties , ( BackupProperties Web UI ).
  3. UPLOADING — . BackupProperties , . .

, . , ? , , Future ( 1), , InputStream ( 2). , 2, 1 2 ?


, , , . Future ( 1) :


  public void onError(@NotNull Throwable t, @NotNull Integer taskId) { logger.error("Exception caught. Task ID: {}", taskId, t); Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId); if (!optionalFuture.isPresent()) { logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId); } else { boolean canceled = optionalFuture.get().cancel(true); if (!canceled) { logger.error("Error canceling the Future of task with ID {}", taskId); } else { logger.info("Task canceled. Task ID: {}", taskId); errorTasksManager.setError(taskId); } } } 

, , ID , , Future - , ID .


, , , , , .


, :


, , , . — Future.


, , , I/O , — / . , . :


  1. , . , — .
  2. — Future , . , / , , ( , — IOException , , ).

, — ( ID , , ), .




, , . , , .



  1. Web UI: , . ,

Fazit


:



, ! , GitHub!

Source: https://habr.com/ru/post/de459478/


All Articles