تجربتي في إنشاء تطبيق متعدد الخيوط للعمل مع النسخ الاحتياطية

في الوقت الحالي ، لن تفاجئ أي شخص بتطبيقات متعددة مؤشرات الترابط ، لكنني أعتقد أنه في هذه المقالة يمكنك أن تجد بعض الأفكار المثيرة للاهتمام. بدأت دراستي عن Java بهذا المشروع ، لذلك ربما أكون مخطئًا جدًا في بعض الأماكن أو أقوم بإنشاء دراجة كبيرة ، لكنني آمل أن يهتم أحدهم بتجربة أحد المبتدئين في Java. سأقدم العديد من ميزات التطبيق:


  • وهو يعمل مع النسخ الاحتياطية حصريا في الذاكرة ، بغض النظر عن حجم النسخ الاحتياطي
  • لا يتم تحميل النسخة الاحتياطية بالكامل في الذاكرة
  • النسخ الاحتياطي / استعادة العمليات يمكن إلغاؤها

تحت خفض سيتم النظر في بنية التطبيق ، وكذلك المشاكل الرئيسية التي واجهتها وحلها.


نظرة عامة على التطبيق


يحدث التواصل مع التطبيق من خلال Web UI ، ولكن في المستقبل سيكون من الممكن إضافة REST API إذا لزم الأمر.


يمكن للتطبيق:


  1. إنشاء نسخ احتياطية وتحميلها على واحد أو أكثر من المخازن
  2. استعادة النسخ الاحتياطية عن طريق تحميلها من التخزين
  3. حذف النسخ الاحتياطية من جميع المخازن
  4. إنشاء نسخ احتياطية بشكل دوري

المستودعات المدعومة حاليًا:


  • نظام الملفات المحلي (غير معتمد من Docker)
  • دروببوإكس

قواعد البيانات المدعومة حاليا:


  • كيو

من تطبيق خاص ، يمكنني ملاحظة:


  1. العمل الصحيح في تكوين كتلة
  2. لا يتم تحميل نسخة احتياطية بالكامل في الذاكرة ، بغض النظر عن حجم النسخ الاحتياطي. نظام الملفات لتخزين النسخ الاحتياطي المؤقت غير مشترك أيضًا. كل من إنشاء نسخة احتياطية واستعادة ، وبالتالي تحميل / تفريغ نسخة احتياطية ، تحدث حصرا في الذاكرة.
  3. عبر منصة - يعمل على كل من ويندوز ولينكس.
  4. يمكننا مراقبة جميع المهام الجارية وإلغاءها إذا لزم الأمر.

فيما يلي لقطات شاشة Web UI تصف ميزات التطبيق بوضوح.


إدارة التخزين



إدارة قواعد البيانات



إنشاء النسخ الاحتياطي


استعادة النسخ الاحتياطي


إدارة النسخ الاحتياطية التي تم إنشاؤها


النسخ الاحتياطي الدوري


تتبع المهام الجارية




هندسة معمارية


سيتم العمل الرئيسي في 3 خدمات - DatabaseBackup ، المعالج ، التخزين ، وسنقوم بربطها معًا باستخدام مفهوم المهام . حول كل هذا أبعد من ذلك.


DatabaseBackup


هذه الخدمة مسؤولة عن إنشاء واستعادة النسخ الاحتياطية للنص العادي.


واجهة الخدمة:


public interface DatabaseBackup { InputStream createBackup(DatabaseSettings databaseSettings, Integer id); void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id); } 

تعمل كلا أساليب الواجهة على مثيلات InputStream ، حيث نحتاج إلى التأكد من عدم تحميل النسخة الاحتياطية بالكامل في الذاكرة ، مما يعني أنه يجب قراءة النسخة الاحتياطية / كتابتها في وضع التدفق. يتم إنشاء كيان DatabaseSettings مسبقًا من Web UI ويقوم بتخزين الإعدادات المختلفة اللازمة للوصول إلى قاعدة البيانات. ما هي هذه المعلمة - id - سيتم شرح أكثر قليلا.


متطلبات الخدمة هي كما يلي:


  1. كلا الطريقتين لا يجب قراءة النسخة الاحتياطية بالكامل في الذاكرة.
  2. يجب أن تقوم طريقة restoreBackup() باستعادة النسخة الاحتياطية في معاملة واحدة ، بحيث لا تترك قاعدة البيانات في حالة غير متسقة في حالة حدوث خطأ.

تطبيق PostgreSQL (وصف النص)

على وجه التحديد ، في تطبيق PostgreSQL ، يتم تنفيذ الخدمة على النحو التالي:


  1. createBackup() : يتم إنشاء عملية pg_dump من شأنها إنشاء نسخة احتياطية وكتابتها إلى دفق الإخراج القياسي. يتم إرجاع دفق إخراج العملية القياسي من الطريقة (راجع https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream-- ). تستند تدفقات الإدخال / الإخراج في النظام إلى مخزن مؤقت بحجم معين ، وعندما تكتب عملية ما إلى دفق الإخراج ، فإنها تكتب فعليًا إلى المخزن المؤقت في الذاكرة. الشيء الأكثر أهمية هنا هو أن مؤشر ترابط العملية لن يكتب إلى المخزن المؤقت المعبأ حتى تتم قراءة الأخير من الجانب الآخر ، مما يعني أن الخيط سيكون في حالة مقفلة ولن يتم تحميل النسخة الاحتياطية بالكامل في الذاكرة. ربما تكون قد واجهت موقفًا حيث وصل برنامج Java إلى طريق مسدود أثناء العمل مع العمليات نظرًا لحقيقة أنك لم تقرأ الحالة أو العملية. من المهم للغاية مراقبة ذلك ، لأن العملية لا يمكن أن تستمر إذا تم حظرها على مكالمة حظر الإدخال / الإخراج عند الكتابة إلى مخزن مؤقت كامل ولا يقرأ أحد هذا المخزن المؤقت.
  2. restoreBackup() : يتم إنشاء عملية psql ، تتم قراءة النسخة الاحتياطية من restoreBackup() تم تمريرها إلى الطريقة وتتم كتابتها في نفس الوقت إلى دفق الإدخال القياسي psql (راجع https://docs.oracle.com/javase/8/docs/api/java/lang/Process. أتش تي أم أل # getOutputStream-- ). يعمل هذا لأن النسخ الاحتياطي PostgreSQL بنص عادي هو مجرد مجموعة من أوامر DDL و DML التي يسهل فهمها psql.

يوجد الكثير من التعليمات البرمجية ، لذلك لن أعطيها هنا ، ولكن يمكنك مشاهدتها على GitHub باستخدام الرابط في نهاية المقالة.


المعالج


هذه الخدمة مسؤولة عن استخدام المعالجات وإعادة معالجة النسخ الاحتياطي. تستخدم المعالجات قبل التنزيل للتخزين أو بعد التفريغ من التخزين. مثال المعالج: ضاغط ، التشفير.


واجهة الخدمة:


 public interface Processor { InputStream process(InputStream in); InputStream deprocess(InputStream in); ProcessorType getType(); // ProcessorType -  Enum,     int getPrecedence(); //   } 

كل معالج له الأولوية - إذا تم تحديد عدة معالجات ، فسيتم تطبيقها بترتيب تنازلي للأولوية. عند تطبيق الوظيفة العكسية بنفس الترتيب الذي تم به تطبيق المعالجات ، نحصل على النسخة الاحتياطية الأصلية.


تخزين


هذه الخدمة مسؤولة عن تحميل وتفريغ نسخة احتياطية ، وكذلك إزالتها من التخزين. تخزين مثال: Dropbox ، نظام الملفات المحلي.


واجهة الخدمة:


 public interface Storage { void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id); InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id); void deleteBackup(StorageSettings storageSettings, String backupName, Integer id); } 

يتم تعيين اسم فريد لكل نسخة احتياطية تم إنشاؤها - حتى نتمكن من العثور عليها على أي من المخازن التي تم تنزيلها إليها. إن الطريقة التي يتم بها عرض النسخة الاحتياطية على وحدة التخزين هي مسألة تنفيذ الخدمة بشكل حصري ، ولكن عند نقل اسم النسخة الاحتياطية إلى إحدى الوظائف ، يجب أن نتوقع السلوك الصحيح. يتم إنشاء كيان StorageSettings مسبقًا من Web UI ويقوم بتخزين الإعدادات اللازمة للوصول إلى وحدة التخزين.




مفهوم المهمة


نود أن نكون قادرين على تتبع حالة مهامنا ، والتعامل مع الأخطاء المحتملة حسب تقدم المهمة ، وكذلك إلغاء المهام. لذلك ، سوف نستمر في العمل فقط مع المهام. سيتم تمثيل كل مهمة في قاعدة البيانات بسجل في الجدول ، وبرمجياً بواسطة مثيل Future (راجع Java Future ). يرتبط كل سجل في الجدول بمستقبله الخاص (علاوة على ذلك ، إذا كانت هناك عدة خوادم قيد التشغيل ، فيمكن أن تكون مثيلات المستقبل في ذاكرة الخوادم المختلفة).


دعنا نذهب بالتتابع. بادئ ذي بدء ، نحن بحاجة إلى خدمة لبدء المهام - إنشاء النسخ الاحتياطية واستعادتها وحذفها.


إطلاق المهمة


إنشاء نسخة احتياطية:


 public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(storageSettingsNameList); Objects.requireNonNull(processors); Objects.requireNonNull(databaseSettings); BackupProperties backupProperties = backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName()); Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.CREATING); logger.info("Creating backup..."); try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS); logger.info("Applying processors on created backup. Processors: {}", processors); try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.UPLOADING); logger.info("Uploading backup..."); backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Creating backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of created backup", ex); } catch (RuntimeException ex) { logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Backup creating task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

يمر إنشاء نسخة احتياطية بثلاث خطوات رئيسية بالترتيب التالي: إنشاء نسخة احتياطية -> تطبيق المعالجات -> التحميل إلى التخزين. في جميع أساليب الخدمة تقريبًا ، نعيد توجيه معرّف المهمة الحالية حتى تتمكن الخدمة من الإبلاغ عن خطأ من سلسلة رسائل تعمل في الخلفية. حول معالجة الأخطاء ، لماذا يتم مناقشة InterruptedException هنا وما سيحدث مع وجود خطأ بعد تلقي RuntimeException لاحقًا.


وإليك كيفية تشغيل مهمة إنشاء نسخة احتياطية:


 tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings); 

المعلمة الأولى التي نمررها إلى بادئ المهمة: المستخدم أو مهمة الخادم الداخلية (مثال على مهمة داخلية هي نسخة احتياطية دورية). تتيح لنا معرفة بادئ المهمة إظهار تلك المهام التي تم تشغيلها بواسطة المستخدم في Web UI فقط. المعلمات المتبقية ضرورية لإنشاء نسخة احتياطية مباشرة - قائمة من المخازن والمعالجات لاستخدامها ، وقاعدة بيانات تحتاج إلى إنشاء ملف تفريغ.


عند إنشاء نسخة احتياطية ، يتم أيضًا إنشاء سجل في قاعدة البيانات تسمى BackupProperties . سيقوم هذا الكيان بتخزين خصائص النسخ الاحتياطي مثل الاسم والمعالجات المستخدمة وقائمة المستودعات التي تم تنزيل النسخة الاحتياطية عليها. علاوة على ذلك ، لاستعادة أو حذف النسخة الاحتياطية ، سنعمل مع هذا الكيان المحدد.


يتم تخزين المهمة في قاعدة البيانات في النموذج التالي:


 @Entity @Table(name = "backup_tasks") public class Task { /** * Identifier of each backup task. Identifier is generated by PostgreSQL database after saving of entity. */ @Id @Column(insertable = false, updatable = false) @GeneratedValue(strategy = GenerationType.IDENTITY) private Integer id; /** * Backup task type. * <p> * Type is set at the very start of any task and can't be changed. * * @see Type */ @Enumerated(EnumType.STRING) @Column(updatable = false) private Type type; /** * Who initiated a task: user or server. * <p> * We need to know it to show on front only these tasks that was started by user. * * @see RunType */ @Enumerated(EnumType.STRING) @Column(updatable = false) private RunType runType; /** * Backup task state. * <p> * State is updated with every new step in task being executed. * * @see Task.State */ @Enumerated(EnumType.STRING) private State state; /** * Whether task has been interrupted or not. * <p> * Default is {@literal false}. */ @Column(insertable = false) private boolean interrupted; /** * Identifier of {@link BackupProperties}. * <p> * We need to know backup ID to be able to handle occurred errors. */ @Column(updatable = false) private Integer backupPropertiesId; /** * Start time of the task. */ @Column(updatable = false) private LocalDateTime date; public enum RunType { USER, INTERNAL } public enum State { PLANNED, CREATING, RESTORING, DELETING, APPLYING_PROCESSORS, APPLYING_DEPROCESSORS, DOWNLOADING, UPLOADING, COMPLETED, } public enum Type { CREATE_BACKUP { @Override public String toString() { return "CREATE BACKUP"; } }, RESTORE_BACKUP { @Override public String toString() { return "RESTORE BACKUP"; } }, DELETE_BACKUP { @Override public String toString() { return "DELETE BACKUP"; } } } // getters & setters... } 

وبالتالي ، يمكنك وصف عملية إنشاء نسخة احتياطية في شكل رسم تخطيطي على النحو التالي:
عملية النسخ الاحتياطي




يتم إطلاق أنواع أخرى من المهام عن طريق القياس. لكي لا تشوش المقالة بكمية هائلة من التعليمات البرمجية ، لأني فضولي ، سأقدم لك رمزًا لبدء المهام لاستعادة النسخة الاحتياطية وحذفها بشكل منفصل في المفسد.


استعادة النسخ الاحتياطي
 public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName, @NotNull DatabaseSettings databaseSettings) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Objects.requireNonNull(storageSettingsName); Objects.requireNonNull(databaseSettings); Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING); logger.info("Downloading backup..."); try (InputStream downloadedBackup = backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) { if (Thread.interrupted() || downloadedBackup == null) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS); logger.info("Deprocessing backup..."); try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) { if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.RESTORING); logger.info("Restoring backup..."); databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Restoring backup completed. Backup properties: {}", backupProperties); } } catch (IOException ex) { logger.error("Error occurred while closing input stream of downloaded backup", ex); } catch (RuntimeException ex) { logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

تتم استعادة النسخة الاحتياطية من خلال 3 خطوات رئيسية بالترتيب التالي: إلغاء تحميل نسخة احتياطية من التخزين -> استخدام وحدات deprocessors للحصول على النسخة الاحتياطية الأصلية للنص العادي -> استعادة نسخة احتياطية.


بدء الانتعاش على النحو التالي:


 tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings); 

عملية استعادة نسخة احتياطية في شكل رسم بياني:


حذف النسخ الاحتياطي
 public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) { Objects.requireNonNull(runType); Objects.requireNonNull(backupProperties); Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId()); Integer taskId = task.getId(); Future future = tasksStarterExecutorService.submit(() -> { try { logger.info("Deleting backup started. Backup properties: {}", backupProperties); tasksManager.updateTaskState(taskId, Task.State.DELETING); backupLoadManager.deleteBackup(backupProperties, taskId); if (Thread.interrupted()) { throw new InterruptedException(); } tasksManager.updateTaskState(taskId, Task.State.COMPLETED); logger.info("Deleting backup completed. Backup properties: {}", backupProperties); } catch (RuntimeException ex) { logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex); errorTasksManager.addErrorTask(taskId); } catch (InterruptedException ex) { tasksManager.setInterrupted(taskId); logger.error("Task was interrupted. Task ID: {}", taskId); } finally { futures.remove(taskId); } }); futures.put(taskId, future); return task; } 

عملية حذف نسخة احتياطية بسيطة للغاية: يتم ببساطة حذف نسخة احتياطية من جميع المخازن التي تم تنزيلها إليها.


قم بإلغاء التثبيت كما يلي:


 tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties); 

عملية حذف نسخة احتياطية في شكل رسم بياني:




إلغاء المهمة


ما هو إلغاء المهمة؟ بالطبع ، هذا ليس أكثر من إنهاء مؤشر ترابط. يمكنك أن ترى أن جميع التعليمات البرمجية الرئيسية التي يتم تشغيلها في Future يتم تغليفها في بنية try-catch التالية:


 try { ... } catch (InterruptedException ex) { ... tasksManager.setInterrupted(taskId); } 

وأيضًا بعد كل طريقة مهمة يتم التحكم في تنفيذها ، يتم تثبيت الإنشاء التالي:


 if (Thread.interrupted()) { throw new InterruptedException(); } 

قبل الانتقال ، يجب إعطاء نظرية مختصرة عن حالات انقطاع خيوط JVM.


يمكن أن تحتوي مؤشرات الترابط في JVM على الحالات التالية:


  1. جديد
  2. Runnable
  3. توقيت الانتظار
  4. الانتظار
  5. مسدود
  6. إنهاء

نحن مهتمون فقط في حالات الانتظار وتوقيت الانتظار. Object.wait() في حالة انتظار بواسطة الأساليب Object.wait() و Thread.join() وغيرها. يتم وضع مؤشر الترابط في حالة انتظار Timed (أي فترة انتظار تدوم فترة زمنية معينة) باستخدام الأساليب Object.wait(timeout) و Thread.join(timeout) و Thread.sleep(sleeping) وغيرها.


الشيء الأكثر أهمية هنا هو أنه إذا قاطعت سلسلة الرسائل قبل الدخول في حالة الانتظار أو الانتظار ، أو عندما يكون الموضوع في هذه الحالة ، يستيقظ مؤشر الترابط ، ويرمي إجابة InterruptedException .


لكن هذا ليس كل شيء. ليست حقيقة أن مؤشر ترابط سينتقل إلى بيانات الحالة من خلال إنشاء نسخة احتياطية أو استعادتها أو حذفها. كيف بعد ذلك لإبلاغ الخيط الذي انقطع؟


تتمثل الطريقة الأولى في التحقق من علامة المقاطعة مع مؤشر الترابط بشكل مستقل باستخدام الطرق Thread.interrupted Thread.interrupted() أو Thread.currentThread.isInterrupted() . الفرق بين الاثنين هو أن الأول يستدعي الطريقة الأصلية الخاصة currentThread.isInterrupted(boolean ClearInterrupted) ، ويمرر إلى ذلك ، مما يشير إلى أنه سيتم مسح إشارة المقاطعة ، والثاني تمرير false ، وترك علامة المقاطعة دون تغيير. يعتمد الاختيار بين هاتين الطريقتين تمامًا على الموقف. عندما يتم طرح InterruptedException ، يتم مسح علامة المقاطعة أيضًا - وهذا يستحق التذكر.


ولكن يجب أن يكون هناك طريقة أسهل - وهو كذلك. في التطبيق ، هناك قدر كبير من العمل مع تدفقات I / O ، وبالتالي مع أساليب I / O. تتمثل مهمتنا في التأكد من أنه عند استدعاء أساليب read() أو write(int b) على دفق الإدخال / الإخراج ، يتم إلقاء خطأ أثناء المقاطعة ، مع العلم بأنه تمت مقاطعة استدعاء الإدخال / الإخراج المحظور. لحسن الحظ ، لدى Java مثل هذا الاستثناء - InterruptedIOException . ومع ذلك ، لا تراقب كل أساليب دفق القراءة / الكتابة مقاطعة مؤشر الترابط ، وعلى وجه التحديد فقط PipedInputStream تراقبها . لذلك ، في تلك الأماكن التي لا يشارك فيها هذا الدفق ، يجب أن نوسّع طريقة القراءة / الكتابة بحيث يتم إبطال مفعول المقاطعة عندما يكون هناك مقاطعة. في الواقع ، كان امتداد طريقة read () كافياً بالنسبة لي في التطبيق في مكان واحد فقط - عندما عاد InputStream من طريقة التحميل الاحتياطي. هذه هي الطريقة التي يمكننا بها التعرف على أصل المقاطعة دون الاضطرار إلى وضع اختبارات القالب على العلم في كل مكان. ومع ذلك ، من المهم التقاط هذا الاستثناء بشكل منفصل عن IOException ومعالجته بشكل منفصل. بالطبع ، لا يمكنك الاستغناء عن مساعدة قالب التحقق من العلم في بعض الأماكن ، ولكن أصبح بالفعل أفضل.


من المهم أيضًا ملاحظة أنه إذا تم مسح العلم أثناء معالجة المقاطعة ، فمن الضروري دائمًا تعيين إشارة المقاطعة مرة أخرى حتى أنه بعد الرجوع من الطريقة ، يمكننا معرفة المقاطعة التي حدثت.


اسمحوا لي أن أشرح مع مثال لماذا هذا مهم. لنفترض أننا قمنا بتحميل نسخة احتياطية إلى السعة التخزينية في طريقة الرفع () وتحدث مقاطعة. تتم معالجة المقاطعة وتوقف العمل وإرجاع الطريقة. لا يحدث الانقطاع مع الإصابات - فهذا يعني أن أي خطأ قد حدث في مكان ما ، أو أن المستخدم ألغى المهمة. بغض النظر عن السبب ، يجب أن نوقف كل العمل في هذا المستقبل. ولكن إذا لم تقم بتعيين إشارة المقاطعة مرة أخرى قبل العودة من طريقة التمهيد ، فلن نعرف أبدًا في كتلة Future الرئيسية عن المقاطعة التي حدثت.
نفس رمز المثال:


 backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <-   ,       if (Thread.interrupted()) { //      ,      - ,    throw new InterruptedException(); } 

لذلك ، من الممارسات الجيدة التعامل مع InterruptedException أو InterruptedIOException كما يلي:


 try { ... } catch (InterruptedException e) { //  InterruptedIOException ... // re-interrupt the thread Thread.currentThread().interrupt(); } 

حسنًا ، يمكننا التعامل مع المقاطعة ، لكن من الذي سيقاطع الخيوط بالفعل؟
للقيام بذلك ، سنقوم بإنشاء كيان آخر يسمى CancelTask ، والذي سيقوم بتخزين معرف المهمة للإلغاء ، وأيضًا كتابة ساعة ستحاول مقاطعة المهام. لماذا تحاول؟ للأسباب التالية:


  1. غير قادر على إنهاء مؤشر الترابط في ذاكرة خادم آخر. يمكن أن تعمل عدة خوادم لنا ، مما يعني أن Future مبعثرة على خوادم مختلفة. وبالتالي ، عندما يصل طلب لإلغاء مهمة ما على أحد الخوادم ، قد يكون المستقبل المرغوب في ذاكرة خادم آخر.
  2. لا يمكن إلغاء المهمة منذ فقد المستقبل بسبب عطل في الخادم.

صف بإيجاز خوارزمية الإلغاء في المساء:
يقوم واتشر بإخراج جميع السجلات من جدول Cancel_tasks (لم يتم ضبط القفل في نفس الوقت) ، يمر كل واحد ويحاول الحصول على المستقبل المقابل من ذاكرته. في حالة تلقي Future بنجاح ، تتم مقاطعة سلسلة الرسائل المقابلة ، وتعود المهمة ويتم حذف الطلب من الجدول. إذا تم تجاوز طلب المهلة لإلغاء المهمة (مما يعني أن الخادم تعطل وفقد المستقبل) - يتم حذف الطلب ببساطة من الجدول. إذا لاحظت عدة خوادم انتهاء مهلة وحذف السجل من الجدول ، فلن يحدث أي شيء سيئ ، لأن الحذف في PostgreSQL هو أمر ضعيف.


رمز إلغاء المهام:


النص المخفي
 /** * This class scans for tasks to cancel and tries to cancel them. */ @Component class CancelTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class); private static final Duration cancelTimeout = Duration.ofMinutes(10); private CancelTasksManager cancelTasksManager; private TasksStarterService tasksStarterService; private TasksManager tasksManager; // spring setters... /** * This watcher wakes up every time 10 seconds passed from the last completion, checks if there are any tasks to cancel and tries to * cancel each task. * <p> * Since there are can be working more that one instance of the program, {@literal Future} instance of task can belong to different * servers. We can't get access to {@literal Future} if it's not in memory of the server where task cancellation request was accepted. * So the purpose of this watcher is to be able cancel tasks that works in the other instance of program. Each server has this watcher * checking for available cancellation requests and if any, the watcher tries to cancel corresponding {@literal Future}. * If cancellation is successful task will be also reverted. * <p> * If task cancellation request timeout exceeded, then it means a server that had requested {@literal Future} instances has been * shutdown, so all {@literal Future} instances lost and task can't be canceled. In such case task cancellation request will be ignored. * * @see TasksStarterService#getFuture(Integer) * @see TasksManager#revertTask(Task) */ @Scheduled(fixedDelay = 10 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchTasksToCancel() { Iterable<CancelTask> cancelTasks = cancelTasksManager.findAll(); Iterable<Task> tasks = tasksManager.findAllById(StreamSupport.stream(cancelTasks.spliterator(), false) .map(CancelTask::getTaskId).collect(Collectors.toList())); Map<Integer, Task> tasksAsMap = StreamSupport.stream(tasks.spliterator(), false) .collect(Collectors.toMap(Task::getId, Function.identity())); List<Integer> taskIdsForDeleting = new ArrayList<>(); for (CancelTask cancelTask : cancelTasks) { Integer taskId = cancelTask.getTaskId(); Task task = tasksAsMap.get(taskId); if (task == null) { logger.error("Can't cancel task: no such entity with ID {}", taskId); taskIdsForDeleting.add(taskId); continue; } // timeout exceeded, that is server shutdown and lost all Future instances, so task can't be canceled if (LocalDateTime.now(ZoneOffset.UTC).isAfter(cancelTask.getPutTime().plus(cancelTimeout))) { logger.error("Can't cancel task: timeout exceed. Task ID: {}", taskId); taskIdsForDeleting.add(taskId); continue; } tasksStarterService.getFuture(taskId).ifPresent(future -> { logger.info("Canceling task with ID {}", taskId); boolean canceled = future.cancel(true); if (canceled) { try { // give time to properly handle interrupt Thread.sleep(10000); } catch (InterruptedException e) { // should not happen } tasksManager.revertTask(task); } taskIdsForDeleting.add(taskId); logger.info("Task canceled: {}. Task ID: {}", canceled, taskId); }); } cancelTasksManager.deleteByTaskIdIn(taskIdsForDeleting); } } 



خطأ في التعامل


, , Future, try-catch :


 try { ... } catch (RuntimeException e) { ... errorTasksManager.addErrorTask(taskId); } 

RuntimeException , Future , .


addErrorTask(taskId) , ID , .
? , , , .


:
, , . — PostgreSQL select for update , select skip locked . , , revertTask() , .


ErrorTasksWatcher :


 /** * This class scans for erroneous tasks and handles them depending on their state. */ @Component class ErrorTasksWatcher { private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class); private static final Integer nRows = 10; private TasksManager tasksManager; private ErrorTasksManager errorTasksManager; // spring setters... /** * This watcher wakes up every time 1 minute passed from the last completion, checks backup states periodically and handles erroneous * tasks if any. * <p> * The watcher handles at most N tasks as described by {@link #nRows} constant and skips already locked tasks. * When retrieving error tasks from database pessimistic lock is set. It allows safely run more than one copy of program, as no other * watcher can pick up already being handled error tasks. * <p> * If the server shutdowns while rows was locked, transaction will be rolled back and lock released, so these entities can be picked * up by the other running server. */ @Scheduled(fixedDelay = 60 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW) public void watchErrorTasks() { for (ErrorTask errorTask : errorTasksManager.findFirstNAndLock(nRows)) { if (!errorTask.isErrorHandled()) { Integer backupTaskId = errorTask.getTaskId(); Optional<Task> optionalTask = tasksManager.findById(backupTaskId); if (!optionalTask.isPresent()) { logger.info("Can't handle erroneous task: no corresponding backup task entity. Backup task ID: {}", backupTaskId); continue; } tasksManager.revertTask(optionalTask.get()); errorTask.setErrorHandled(true); } } } } 

revertTask(Task) :


  /** * This function reverts erroneous task by its entity. * <p> * Use this function only after canceling related {@literal Future}. * <p> * If the task was of the type {@link Task.Type#CREATE_BACKUP} then related {@link BackupProperties} will be deleted. * * @param task the entity */ public void revertTask(@NotNull Task task) { Objects.requireNonNull(task); Task.State state = task.getState(); switch (state) { case DOWNLOADING: case APPLYING_DEPROCESSORS: case RESTORING: case DELETING: { logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString()); break; } case CREATING: case APPLYING_PROCESSORS: { logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString()); Integer backupPropertiesID = task.getBackupPropertiesId(); if (!backupPropertiesManager.existsById(backupPropertiesID)) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } backupPropertiesManager.deleteById(backupPropertiesID); break; } case UPLOADING: { logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state); Integer backupPropertiesId = task.getBackupPropertiesId(); Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId); if (!optionalBackupProperties.isPresent()) { logger.error("Can't revert task: no related backup properties. Task info: {}", task); return; } tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get()); backupPropertiesManager.deleteById(backupPropertiesId); break; } default: { logger.error("Can't revert task: unknown state. Task info: {}", task); } } } 

:


  1. DOWNLOADING , APPLYING_DEPROCESSORS , RESTORING , DELETING — . , .
  2. CREATING , APPLYING_PROCESSORS — , . BackupProperties , ( BackupProperties Web UI ).
  3. UPLOADING — . BackupProperties , . .

, . , ? , , Future ( 1), , InputStream ( 2). , 2, 1 2 ?


, , , . Future ( 1) :


  public void onError(@NotNull Throwable t, @NotNull Integer taskId) { logger.error("Exception caught. Task ID: {}", taskId, t); Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId); if (!optionalFuture.isPresent()) { logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId); } else { boolean canceled = optionalFuture.get().cancel(true); if (!canceled) { logger.error("Error canceling the Future of task with ID {}", taskId); } else { logger.info("Task canceled. Task ID: {}", taskId); errorTasksManager.setError(taskId); } } } 

, , ID , , Future - , ID .


, , , , , .


, :


, , , . — Future.


, , , I/O , — / . , . :


  1. , . , — .
  2. — Future , . , / , , ( , — IOException , , ).

, — ( ID , , ), .




, , . , , .



  1. Web UI: , . ,

استنتاج


:



, ! , GitHub!

Source: https://habr.com/ru/post/ar459478/


All Articles