Pengalaman saya dalam membuat aplikasi multi-utas untuk bekerja dengan cadangan

Saat ini, Anda tidak akan mengejutkan siapa pun dengan aplikasi multithread, tetapi saya pikir dalam artikel ini Anda dapat menemukan beberapa ide menarik. Pelajaran saya tentang Jawa dimulai dengan proyek khusus ini, jadi mungkin di beberapa tempat saya akan sangat salah atau membangun sepeda besar, tetapi saya berharap seseorang akan tertarik dengan pengalaman seorang pemula di Jawa. Saya akan memberikan beberapa fitur aplikasi:


  • Ini bekerja dengan cadangan secara eksklusif di memori, terlepas dari ukuran cadangan
  • Tidak memuat seluruh cadangan ke dalam memori
  • Operasi pencadangan / pengembalian dapat dibatalkan

Di bawah cut akan dianggap arsitektur aplikasi, serta masalah utama yang dihadapi dan solusinya.


Ikhtisar Aplikasi


Komunikasi dengan aplikasi terjadi melalui Web UI, tetapi di masa mendatang akan dimungkinkan untuk menambahkan REST API jika perlu.


Aplikasi dapat:


  1. Buat cadangan dan unggah ke satu atau beberapa penyimpanan
  2. Kembalikan cadangan dengan memuatnya dari penyimpanan
  3. Hapus cadangan dari semua penyimpanan
  4. Buat cadangan secara berkala

Repositori yang saat ini didukung:


  • Sistem file lokal (tidak didukung dari Docker)
  • Dropbox

Database yang saat ini didukung:


  • PostgreSQL

Dari aplikasi khusus, saya dapat mencatat:


  1. Pekerjaan yang benar dalam konfigurasi cluster
  2. Cadangan tidak pernah dimuat penuh ke dalam memori, terlepas dari ukuran cadangan. Sistem file untuk penyimpanan cadangan sementara juga tidak terlibat. Baik pembuatan cadangan maupun pemulihan, dan karenanya pemuatan / pembongkaran cadangan, terjadi secara eksklusif di memori.
  3. Cross-platform - berfungsi pada Windows dan Linux.
  4. Kami dapat memantau semua tugas yang berjalan dan membatalkannya jika perlu.

Di bawah ini adalah tangkapan layar dari Web UI yang menggambarkan dengan jelas fitur-fitur aplikasi.


Manajemen penyimpanan



Manajemen basis data



Pembuatan cadangan


Pemulihan cadangan


Kelola cadangan yang dibuat


Pencadangan berkala


Lacak tugas yang sedang berjalan




Arsitektur


Pekerjaan utama akan berlangsung di 3 layanan - DatabaseBackup , Prosesor , Penyimpanan , dan kami akan menghubungkan mereka bersama-sama menggunakan konsep tugas . Tentang semua ini lebih jauh.


Databasebackup


Layanan ini bertanggung jawab untuk membuat dan memulihkan cadangan teks biasa.


Antarmuka Layanan:


public interface DatabaseBackup { InputStream createBackup(DatabaseSettings databaseSettings, Integer id); void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id); } 

Kedua metode antarmuka beroperasi pada instance InputStream , karena kita membutuhkan seluruh cadangan untuk tidak dimuat ke dalam memori, yang berarti bahwa cadangan tersebut harus dibaca / ditulis dalam mode streaming. Entitas DatabaseSettings dibuat sebelumnya dari UI Web dan menyimpan berbagai pengaturan yang diperlukan untuk mengakses database. Apa parameter ini - id - akan dijelaskan sedikit lebih jauh.


Persyaratan layanan adalah sebagai berikut:


  1. Kedua metode seharusnya tidak membaca seluruh cadangan ke dalam memori.
  2. Metode restoreBackup() harus mengembalikan cadangan dalam satu transaksi, sehingga jika terjadi kesalahan, jangan biarkan database dalam keadaan tidak konsisten.

Implementasi untuk PostgreSQL (deskripsi teks)

Secara khusus, dalam implementasi untuk PostgreSQL, layanan diimplementasikan sebagai berikut:


  1. createBackup() : proses pg_dump dibuat yang akan membuat cadangan dan menuliskannya ke aliran output standar. Aliran output proses standar dikembalikan dari metode (lihat https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream-- ). I / O stream dalam sistem didasarkan pada buffer dengan ukuran tertentu, dan ketika suatu proses menulis ke aliran output, itu sebenarnya menulis ke buffer dalam memori. Yang paling penting di sini adalah bahwa utas proses tidak akan menulis ke buffer yang diisi sampai yang terakhir telah dibaca oleh pihak lain, yang berarti utas akan dalam keadaan terkunci dan cadangan tidak akan dimuat sepenuhnya ke dalam memori. Anda mungkin pernah mengalami situasi di mana program Java Anda menemui jalan buntu saat bekerja dengan proses karena Anda tidak membaca stdout atau stderr proses. Sangat penting untuk memantau ini, karena proses tidak dapat dilanjutkan jika diblokir pada panggilan pemblokiran I / O saat menulis ke buffer penuh dan tidak ada yang membaca buffer ini.
  2. restoreBackup() : proses psql dibuat, cadangan dibaca dari InputStream yang diteruskan ke metode dan secara simultan ditulis ke aliran input standar psql (lihat https://docs.oracle.com/javase/8/docs/api/java/lang/Process. html # getOutputStream-- ). Ini berfungsi karena cadangan teks biasa PostgreSQL hanyalah kumpulan perintah DDL dan DML yang mudah dimengerti psql.

Ada banyak kode, jadi saya tidak akan memberikannya di sini, tetapi Anda dapat melihatnya di GitHub menggunakan tautan di akhir artikel.


Prosesor


Layanan ini bertanggung jawab untuk penggunaan prosesor dan membalikkan pemrosesan ulang cadangan. Prosesor digunakan sebelum mengunduh ke penyimpanan atau setelah membongkar dari penyimpanan. Contoh prosesor: kompresor, enkripsi.


Antarmuka Layanan:


 public interface Processor { InputStream process(InputStream in); InputStream deprocess(InputStream in); ProcessorType getType(); // ProcessorType -  Enum,     int getPrecedence(); //   } 

Setiap prosesor memiliki prioritas - jika beberapa prosesor ditentukan, mereka akan diterapkan dalam urutan prioritas yang menurun. Menerapkan fungsi terbalik dalam urutan yang sama ketika prosesor diterapkan, kami mendapatkan cadangan asli.


Penyimpanan


Layanan ini bertanggung jawab untuk memuat dan membongkar cadangan, serta menghapusnya dari penyimpanan. Contoh penyimpanan: Dropbox, sistem file lokal.


Antarmuka Layanan:


 public interface Storage { void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id); InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id); void deleteBackup(StorageSettings storageSettings, String backupName, Integer id); } 

Setiap cadangan yang dibuat diberi nama unik - jadi kami dapat menemukannya di penyimpanan mana pun yang diunduh. Cara cadangan disajikan ke penyimpanan adalah masalah implementasi layanan secara eksklusif, tetapi ketika mentransfer nama cadangan ke salah satu fungsi, kita harus mengharapkan perilaku yang benar. Entitas StorageSettings sudah dibuat sebelumnya dari UI Web dan menyimpan pengaturan yang diperlukan untuk mengakses penyimpanan.




Konsep tugas


Kami ingin dapat melacak status tugas kami, menangani kemungkinan kesalahan tergantung pada kemajuan tugas, dan juga membatalkan tugas. Karena itu, kami akan terus beroperasi hanya dengan tugas. Setiap tugas akan diwakili dalam database dengan catatan dalam tabel, dan secara terprogram oleh instance Future (lihat Java Future ). Setiap catatan dalam tabel dikaitkan dengan masa depannya sendiri (apalagi, jika beberapa server berjalan, contoh masa depan dapat di memori server yang berbeda).


Mari kita berurutan. Pertama-tama, kami membutuhkan layanan untuk meluncurkan tugas - membuat, memulihkan, dan menghapus cadangan.


Peluncuran tugas


Membuat cadangan:


 public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(storageSettingsNameList); Objects.requireNonNull(processors); Objects.requireNonNull(databaseSettings); BackupProperties backupProperties = backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName()); Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.CREATING); logger.info("Creating backup..."); try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS); logger.info("Applying processors on created backup. Processors: {}", processors); try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.UPLOADING); logger.info("Uploading backup..."); backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Creating backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of created backup", ex); } catch (RuntimeException ex) { logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Backup creating task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

Membuat cadangan berjalan melalui 3 langkah utama dalam urutan berikut: membuat cadangan -> aplikasi prosesor -> mengunggah ke penyimpanan. Di hampir semua metode layanan, kami meneruskan ID tugas saat ini sehingga layanan dapat melaporkan kesalahan dari utas yang berfungsi di latar belakang. Tentang penanganan kesalahan, mengapa InterruptedException di sini dan apa yang terjadi dengan kesalahan setelah menerima RuntimeException akan dibahas nanti.


Dan inilah cara kita menjalankan tugas membuat cadangan:


 tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings); 

Parameter pertama yang kami berikan ke inisiator tugas: tugas pengguna atau server internal (contoh tugas internal adalah cadangan berkala). Pengetahuan tentang inisiator tugas memungkinkan kami untuk menampilkan di UI Web hanya tugas-tugas yang diluncurkan oleh pengguna. Parameter yang tersisa diperlukan untuk membuat cadangan secara langsung - daftar penyimpanan, prosesor yang akan digunakan, database yang dump Anda perlu buat.


Saat membuat cadangan, catatan juga dibuat dalam database yang disebut BackupProperties . Entitas ini akan menyimpan properti cadangan seperti nama, prosesor yang digunakan, dan daftar repositori tempat cadangan diunduh. Selanjutnya, untuk mengembalikan atau menghapus cadangan, kami akan beroperasi dengan entitas khusus ini.


Tugas dalam database disimpan dalam bentuk berikut:


 @Entity @Table(name = "backup_tasks") public class Task { /** * Identifier of each backup task. Identifier is generated by PostgreSQL database after saving of entity. */ @Id @Column(insertable = false, updatable = false) @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; /** * Backup task type. * <p> * Type is set at the very start of any task and can't be changed. * * @see Type */ @Enumerated(EnumType.STRING) @Column(updatable = false) private Type type; /** * Who initiated a task: user or server. * <p> * We need to know it to show on front only these tasks that was started by user. * * @see RunType */ @Enumerated(EnumType.STRING) @Column(updatable = false) private RunType runType; /** * Backup task state. * <p> * State is updated with every new step in task being executed. * * @see Task.State */ @Enumerated(EnumType.STRING) private State state; /** * Whether task has been interrupted or not. * <p> * Default is {@literal false}. */ @Column(insertable = false) private boolean interrupted; /** * Identifier of {@link BackupProperties}. * <p> * We need to know backup ID to be able to handle occurred errors. */ @Column(updatable = false) private Integer backupPropertiesId; /** * Start time of the task. */ @Column(updatable = false) private LocalDateTime date; public enum RunType { USER, INTERNAL } public enum State { PLANNED, CREATING, RESTORING, DELETING, APPLYING_PROCESSORS, APPLYING_DEPROCESSORS, DOWNLOADING, UPLOADING, COMPLETED, } public enum Type { CREATE_BACKUP { @Override public String toString() { return "CREATE BACKUP"; } }, RESTORE_BACKUP { @Override public String toString() { return "RESTORE BACKUP"; } }, DELETE_BACKUP { @Override public String toString() { return "DELETE BACKUP"; } } } // getters & setters... } 

Dengan demikian, Anda dapat menggambarkan proses membuat cadangan dalam bentuk diagram sebagai berikut:
Proses pencadangan




Jenis tugas lain diluncurkan dengan analogi. Agar tidak mengacaukan artikel dengan sejumlah besar kode, bagi yang penasaran saya akan memberikan kode untuk meluncurkan tugas untuk mengembalikan dan menghapus cadangan secara terpisah di spoiler.


Pemulihan cadangan
 public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Objects.requireNonNull(storageSettingsName); Objects.requireNonNull(databaseSettings); Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING); logger.info("Downloading backup..."); try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) { if (Thread.interrupted() || downloadedBackup == null) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS); logger.info("Deprocessing backup..."); try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.RESTORING); logger.info("Restoring backup..."); databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Restoring backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of downloaded backup", ex); } catch (RuntimeException ex) { logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

Memulihkan cadangan melalui 3 langkah utama dalam urutan berikut: membongkar cadangan dari penyimpanan -> menggunakan deprocessor untuk mendapatkan cadangan teks biasa -> memulihkan cadangan.


Mulai pemulihan sebagai berikut:


 tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings); 

Proses pemulihan cadangan dalam bentuk diagram:


Hapus cadangan
 public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { try { logger.info("Deleting backup started. Backup properties: {}", backupProperties); tasksManager.updateTaskState(taskId, Task.State.DELETING); backupLoadManager.deleteBackup(backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Deleting backup completed. Backup properties: {}", backupProperties); } catch (RuntimeException ex) { logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

Proses menghapus cadangan cukup sederhana: cadangan hanya dihapus dari semua penyimpanan yang diunduh.


Jalankan uninstall sebagai berikut:


 tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties); 

Proses menghapus cadangan dalam bentuk diagram:




Batalkan Tugas


Apakah pembatalan tugas itu? Tentu saja, ini tidak lebih dari penghentian utas. Anda bisa melihat bahwa semua kode utama yang berjalan di Future dibungkus dalam konstruksi try-catch berikut:


 try { ... } catch (InterruptedException ex) { ... tasksManager.setInterrupted(taskId); } 

Dan juga setelah setiap metode penting, alur eksekusi yang kami kontrol, konstruksi berikut dipasang:


 if (Thread.interrupted()) { throw new InterruptedException(); } 

Sebelum melanjutkan, teori singkat tentang gangguan dan status utas JVM harus diberikan.


Utas di JVM dapat memiliki status berikut:


  1. Baru
  2. Runnable
  3. Waktunya menunggu
  4. Menunggu
  5. Dicekal
  6. Dihentikan

Kami hanya tertarik pada status Menunggu dan Jangka waktu. Object.wait() di status Menunggu oleh metode Object.wait() , Thread.join() dan lainnya. Utas ditransfer ke status menunggu Jangka Waktu (mis., Tunggu yang berlangsung selama periode waktu tertentu) menggunakan metode Object.wait(timeout) , Thread.join(timeout) , Thread.sleep(sleeping) dan lainnya.


Yang paling penting di sini adalah jika Anda mengganggu utas sebelum memasuki status Menunggu atau Menunggu waktunya atau saat utas dalam keadaan ini , utas bangun, melempar InterruptedException .


Tapi itu belum semuanya. Sama sekali bukan fakta bahwa utas akan pernah masuk ke data negara dengan membuat, memulihkan, atau menghapus cadangan. Lalu bagaimana cara memberi tahu utas bahwa itu terputus?


Cara pertama adalah dengan memeriksa flag interupsi secara independen dengan menggunakan metode Thread.interrupted Thread.interrupted() atau Thread.currentThread.isInterrupted() . Perbedaan antara keduanya adalah bahwa yang pertama memanggil metode private asli currentThread.isInterrupted(boolean ClearInterrupted) , memberikan true , yang menunjukkan bahwa flag interrupt akan dihapus, dan yang kedua passing false , membiarkan flag interrupt tidak tersentuh. Pilihan antara kedua metode ini sepenuhnya tergantung pada situasi. Ketika InterruptedException dilempar, bendera interrupt juga dihapus - ini perlu diingat.


Tapi pasti ada cara yang lebih mudah - dan memang begitu. Dalam aplikasi, ada sejumlah besar pekerjaan dengan aliran I / O, dan karenanya dengan metode I / O. Tugas kami adalah memastikan bahwa ketika memanggil metode read() atau write(int b) pada aliran I / O, kesalahan terjadi saat interupsi, memberi tahu bahwa panggilan pemblokiran I / O terganggu. Untungnya, Java memiliki pengecualian seperti itu - InterruptedIOException . Namun, tidak semua metode aliran baca / tulis memantau gangguan utas, dan secara khusus hanya PipedInputStream yang memantaunya. Oleh karena itu, di tempat-tempat di mana aliran ini tidak terlibat, kita harus memperluas metode baca / tulis sehingga ketika ada interupsi, InterruptedIOException dilemparkan. Bahkan, ekstensi metode read () sudah cukup bagi saya dalam aplikasi hanya di satu tempat - ketika InputStream kembali dari metode unggah cadangan. Ini adalah bagaimana kita bisa belajar tentang asal usul interupsi tanpa harus menempatkan pemeriksaan templat pada bendera di mana-mana. Namun, penting untuk menangkap pengecualian ini secara terpisah dari IOException dan menanganinya secara terpisah. Tentu saja, Anda tidak dapat melakukannya tanpa bantuan pemeriksaan templat bendera di beberapa tempat, tetapi sudah menjadi lebih baik.


Penting juga untuk dicatat bahwa jika flag dikosongkan selama pemrosesan interupsi, selalu diperlukan untuk mengatur flag interrupt lagi sehingga setelah kembali dari metode kita dapat mengetahui tentang interupsi yang terjadi.


Izinkan saya menjelaskan dengan sebuah contoh mengapa ini penting. Misalkan kita mengunggah cadangan ke penyimpanan dalam metode unggah () dan terjadi interupsi. Interupsi diproses, pekerjaan dihentikan dan metode kembali. Gangguan tidak terjadi dengan santai - itu berarti bahwa kesalahan terjadi di suatu tempat, atau pengguna membatalkan tugas. Apa pun alasannya, kita harus menghentikan semua pekerjaan di Masa Depan ini. Tetapi jika Anda tidak mengatur flag interupsi lagi sebelum kembali dari metode boot, kita tidak akan pernah tahu di blok Future utama tentang interupsi yang terjadi.
Contoh kode yang sama:


 backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <-   ,       if (Thread.interrupted()) { //      ,      - ,    throw new InterruptedException(); } 

Oleh karena itu, praktik yang baik untuk menangani InterruptedException atau InterruptedIOException sebagai berikut:


 try { ... } catch (InterruptedException e) { //  InterruptedIOException ... // re-interrupt the thread Thread.currentThread().interrupt(); } 

Ya, kita bisa menangani interupsi, tetapi siapa yang benar-benar akan mengganggu utasnya?
Untuk melakukan ini, kami akan membuat entitas lain yang disebut CancelTask , yang akan menyimpan ID tugas untuk pembatalan, dan juga menulis arloji yang akan mencoba mengganggu tugas. Mengapa mencoba Karena:


  1. Tidak dapat mengakhiri utas di memori server lain. Beberapa server dapat bekerja untuk kita, yang berarti Masa Depan tersebar di berbagai server. Jadi, ketika permintaan untuk membatalkan tugas tiba di salah satu server, Masa Depan yang diinginkan mungkin ada di memori server lain.
  2. Tugas tidak dapat dibatalkan karena Masa Depan hilang karena server crash.

Jelaskan secara singkat algoritma pembatalan di malam hari:
Watercher mengeluarkan semua catatan dari tabel cancel_tasks (kunci tidak diatur pada waktu yang sama), menelusuri masing-masing dan mencoba untuk mendapatkan Masa Depan yang sesuai dari ingatannya. Jika Future berhasil diterima, utas yang terkait terputus, tugas dikembalikan dan permintaan dihapus dari tabel. Jika permintaan batas waktu untuk membatalkan tugas terlampaui (yang berarti bahwa server macet dan Masa Depan hilang) - permintaan dihapus dari tabel. Jika beberapa server melihat batas waktu dan menghapus catatan dari tabel, tidak ada hal buruk yang akan terjadi, karena penghapusan di PostgreSQL idempoten.


CancelTasksWatcher Code:


Teks tersembunyi
 /** * This class scans for tasks to cancel and tries to cancel them. */ @Component class CancelTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class); private static final Duration cancelTimeout = Duration.ofMinutes(10); private CancelTasksManager cancelTasksManager; private TasksStarterService tasksStarterService; private TasksManager tasksManager; // spring setters... /** * This watcher wakes up every time 10 seconds passed from the last completion, checks if there are any tasks to cancel and tries to * cancel each task. * <p> * Since there are can be working more that one instance of the program, {@literal Future} instance of task can belong to different * servers. We can't get access to {@literal Future} if it's not in memory of the server where task cancellation request was accepted. * So the purpose of this watcher is to be able cancel tasks that works in the other instance of program. Each server has this watcher * checking for available cancellation requests and if any, the watcher tries to cancel corresponding {@literal Future}. * If cancellation is successful task will be also reverted. * <p> * If task cancellation request timeout exceeded, then it means a server that had requested {@literal Future} instances has been * shutdown, so all {@literal Future} instances lost and task can't be canceled. In such case task cancellation request will be ignored. * * @see TasksStarterService#getFuture(Integer) * @see TasksManager#revertTask(Task) */ @Scheduled(fixedDelay = 10 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchTasksToCancel() { Iterable<CancelTask> cancelTasks = cancelTasksManager.findAll(); Iterable<Task> tasks = tasksManager.findAllById(StreamSupport.stream(cancelTasks.spliterator(), false) .map(CancelTask::getTaskId).collect(Collectors.toList())); Map<Integer, Task> tasksAsMap = StreamSupport.stream(tasks.spliterator(), false) .collect(Collectors.toMap(Task::getId, Function.identity())); List<Integer> taskIdsForDeleting = new ArrayList<>(); for (CancelTask cancelTask : cancelTasks) { Integer taskId = cancelTask.getTaskId(); Task task = tasksAsMap.get(taskId); if (task == null) { logger.error("Can't cancel task: no such entity with ID {}", taskId); taskIdsForDeleting.add(taskId); continue; } // timeout exceeded, that is server shutdown and lost all Future instances, so task can't be canceled if (LocalDateTime.now(ZoneOffset.UTC).isAfter(cancelTask.getPutTime().plus(cancelTimeout))) { logger.error("Can't cancel task: timeout exceed. Task ID: {}", taskId); taskIdsForDeleting.add(taskId); continue; } tasksStarterService.getFuture(taskId).ifPresent(future -> { logger.info("Canceling task with ID {}", taskId); boolean canceled = future.cancel(true); if (canceled) { try { // give time to properly handle interrupt Thread.sleep(10000); } catch (InterruptedException e) { // should not happen } tasksManager.revertTask(task); } taskIdsForDeleting.add(taskId); logger.info("Task canceled: {}. Task ID: {}", canceled, taskId); }); } cancelTasksManager.deleteByTaskIdIn(taskIdsForDeleting); } } 



Menangani kesalahan


, , Future, try-catch :


 try { ... } catch (RuntimeException e) { ... errorTasksManager.addErrorTask(taskId); } 

RuntimeException , Future , .


addErrorTask(taskId) , ID , .
? , , , .


:
, , . — PostgreSQL select for update , select skip locked . , , revertTask() , .


ErrorTasksWatcher :


 /** * This class scans for erroneous tasks and handles them depending on their state. */ @Component class ErrorTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class); private static final Integer nRows = 10; private TasksManager tasksManager; private ErrorTasksManager errorTasksManager; // spring setters... /** * This watcher wakes up every time 1 minute passed from the last completion, checks backup states periodically and handles erroneous * tasks if any. * <p> * The watcher handles at most N tasks as described by {@link #nRows} constant and skips already locked tasks. * When retrieving error tasks from database pessimistic lock is set. It allows safely run more than one copy of program, as no other * watcher can pick up already being handled error tasks. * <p> * If the server shutdowns while rows was locked, transaction will be rolled back and lock released, so these entities can be picked * up by the other running server. */ @Scheduled(fixedDelay = 60 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchErrorTasks() { for (ErrorTask errorTask : errorTasksManager.findFirstNAndLock(nRows)) { if (!errorTask.isErrorHandled()) { Integer backupTaskId = errorTask.getTaskId(); Optional<Task> optionalTask = tasksManager.findById(backupTaskId); if (!optionalTask.isPresent()) { logger.info("Can't handle erroneous task: no corresponding backup task entity. Backup task ID: {}", backupTaskId); continue; } tasksManager.revertTask(optionalTask.get()); errorTask.setErrorHandled(true); } } } } 

revertTask(Task) :


  /** * This function reverts erroneous task by its entity. * <p> * Use this function only after canceling related {@literal Future}. * <p> * If the task was of the type {@link Task.Type#CREATE_BACKUP} then related {@link BackupProperties} will be deleted. * * @param task the entity */ public void revertTask(@NotNull Task task) { Objects.requireNonNull(task); Task.State state = task.getState(); switch (state) { case DOWNLOADING: case APPLYING_DEPROCESSORS: case RESTORING: case DELETING: { logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString()); break; } case CREATING: case APPLYING_PROCESSORS: { logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString()); Integer backupPropertiesID = task.getBackupPropertiesId(); if (!backupPropertiesManager.existsById(backupPropertiesID)) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } backupPropertiesManager.deleteById(backupPropertiesID); break; } case UPLOADING: { logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state); Integer backupPropertiesId = task.getBackupPropertiesId(); Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId); if (!optionalBackupProperties.isPresent()) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get()); backupPropertiesManager.deleteById(backupPropertiesId); break; } default: { logger.error("Can't revert task: unknown state. Task info: {}", task); } } } 

:


  1. DOWNLOADING , APPLYING_DEPROCESSORS , RESTORING , DELETING — . , .
  2. CREATING , APPLYING_PROCESSORS — , . BackupProperties , ( BackupProperties Web UI ).
  3. UPLOADING — . BackupProperties , . .

, . , ? , , Future ( 1), , InputStream ( 2). , 2, 1 2 ?


, , , . Future ( 1) :


  public void onError(@NotNull Throwable t, @NotNull Integer taskId) { logger.error("Exception caught. Task ID: {}", taskId, t); Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId); if (!optionalFuture.isPresent()) { logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId); } else { boolean canceled = optionalFuture.get().cancel(true); if (!canceled) { logger.error("Error canceling the Future of task with ID {}", taskId); } else { logger.info("Task canceled. Task ID: {}", taskId); errorTasksManager.setError(taskId); } } } 

, , ID , , Future - , ID .


, , , , , .


, :


, , , . — Future.


, , , I/O , — / . , . :


  1. , . , — .
  2. — Future , . , / , , ( , — IOException , , ).

, — ( ID , , ), .




, , . , , .



  1. Web UI: , . ,

Kesimpulan


:



, ! , GitHub!

Source: https://habr.com/ru/post/id459478/


All Articles