قائمة انتظار مهمة PostgreSQL

قائمة انتظار الفيل - pixabay.com


يتم استخدام قوائم الانتظار لتنظيم معالجة تدفق المهمة. هناك حاجة لتراكم وتوزيع المهام بين فناني الأداء. يمكن أن توفر قوائم الانتظار أيضًا متطلبات إضافية لمعالجة مهام التشغيل: ضمان التسليم ، وضمان لمرة واحدة ، وتحديد الأولويات ، إلخ.


كقاعدة عامة ، يتم استخدام أنظمة قائمة انتظار الرسائل الجاهزة (MQ - قائمة انتظار الرسائل) ، ولكن في بعض الأحيان تحتاج إلى تنظيم قائمة انتظار مخصصة أو قائمة متخصصة معينة (على سبيل المثال ، قائمة انتظار أولوية وإعادة تشغيل متأخرة للمهام التي لم تتم معالجتها بسبب استثناءات). ستتم مناقشة إنشاء قوائم الانتظار هذه أدناه.


حدود التطبيق


تم تصميم الحلول المقترحة للتعامل مع تدفق المهام المماثلة. أنها ليست مناسبة لتنظيم الحانة / الفرعية أو المراسلة بين الأنظمة والمكونات المزدوجة.


تعمل قائمة الانتظار الموجودة أعلى قاعدة بيانات علائقية بشكل جيد للأحمال الصغيرة والمتوسطة (مئات الآلاف من المهام في اليوم ، عشرات إلى مئات المؤدين) ، ولكن من الأفضل استخدام حل متخصص للخيوط الكبيرة.


جوهر الأسلوب في خمس كلمات


select ... for update skip locked 

خط الأساس


للبساطة ، سيتم فيما يلي تخزين معرفات المهام الفريدة فقط في الجدول. لا ينبغي أن يكون إضافة نوع من الحمولة الصعبة أمرًا صعبًا.


يحتوي جدول قائمة انتظار أبسط على المهمة نفسها وحالتها:


 create table task ( id bigint not null primary key, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__idx on task (status); 

إضافة مهمة:


 insert into task (id) values ($1) on conflict (id) do nothing; 

الحصول على المهمة التالية:


 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

إنجاز المهمة:


 update task set status = 2 where id = $1; 

قائمة انتظار الأولوية


في الحالة البسيطة ، يكون id المهمة هو أولويته. يتم تغيير طلب المهمة التالية فقط - تتم إضافة order by id شرط الفرز order by id بالترتيب المطلوب لمهام المعالجة. تحتاج أيضًا إلى إنشاء فهرس مركب بواسطة (status, id) .


أو ، للأولوية ، يتم إضافة عمود منفصل:


 create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__priority__idx on task (status, priority); 

إضافة مهمة:
 insert into task (id, priority) values ($1, $2) on conflict (id) do nothing; 

الحصول على المهمة التالية:
 with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

يتيح لك العمود المميز تغيير أولوية المهمة أثناء التنقل.


قائمة انتظار مع تكرار المهام "الساقطة"


قد يحدث خطأ أو استثناء أثناء تنفيذ المهمة. في مثل هذه الحالات ، يجب أن تكون المهمة في قائمة الانتظار مرة أخرى. في بعض الأحيان ، لا يزال من الضروري تأجيل وقت التنفيذ المتكرر لبعض الوقت ، على سبيل المثال ، إذا كان الاستثناء يرجع إلى عدم توفر خدمة جهة خارجية بشكل مؤقت.


 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, delayed_to timestamp null, error_text text null ); create index task__status__delayed_to__idx on task (status, delayed_to); 

كما ترى ، تم توسيع قائمة الحالات وتمت إضافة أعمدة جديدة:


  • attempt - عدد من المحاولات ؛ مطلوب لاتخاذ قرار بشأن الحاجة إلى إعادة المحاولة (تحديد عدد المحاولات) وتحديد تأخير قبل إعادة المحاولة (على سبيل المثال ، يتم تأخير كل محاولة لاحقة بمقدار 10 * attempt دقائق 10 * attempt ) ؛
  • delayed_to - وقت المحاولة التالية لإكمال المهمة ؛
  • error_text - نص الخطأ.

نص الخطأ مطلوب للتجميع حسب نوع الخطأ.


مثال تقارير نظام المراقبة التي تراكمت الآلاف من المهام مع "خطأ" الحالة في قائمة الانتظار. نحن تلبية الطلب:


 select error_text, count(*) from task where status = 3 group by 1 order by 2 desc; 

للحصول على التفاصيل ، انتقل إلى سجلات فناني الأداء. قم بتصحيح الموقف الذي تسبب في الخطأ (إن أمكن). إذا لزم الأمر ، نقوم بتسريع إعادة تشغيل المهام عن طريق تعيين الحالة إلى 0 أو عن طريق تحويل وقت المحاولة التالية.


الحصول على المهمة الجديدة التالية:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

الحصول على المهمة التالية معلقة بسبب خطأ:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

الانتهاء بنجاح من المهمة:
 update task set status = 2, delayed_to = null, error_text = null where id = $1; 

فشلت المهمة ، سيكون هناك تكرار في (5 * عدد المحاولات) بالدقائق:
 update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1; 

المهمة المكتملة بخطأ فادح ، لن تتم إعادة المحاولة:
 update task set status = 4, delayed_to = null, error_text = $2 where id = $1; 

ينقسم طلب المهمة التالية إلى قسمين بحيث يمكن لـ DBMS إنشاء خطة استعلام فعالة لقائمة الانتظار ذات الأولوية. شرط التحديد مع or يمكن أن يحدث خطأ كبير مع order by الفرز order by .


جمع القياسات


أضف السمات التالية:


  • وقت إنشاء المهمة
  • وقت تغيير المهمة
  • بداية ونهاية وقت المهمة.

 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); create index task__status__delayed_to__idx on task (status, delayed_to); create index task__updated__idx on task (updated); 

نحن نعتبر الأعمدة المضافة في جميع الاستعلامات.


الحصول على المهمة الجديدة التالية:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

الحصول على المهمة التالية معلقة بسبب خطأ:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

الانتهاء بنجاح من المهمة:
 update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

فشلت المهمة ، سيكون هناك تكرار في (5 * عدد المحاولات) بالدقائق:
 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1; 

المهمة المكتملة بخطأ فادح ، لن تتم إعادة المحاولة:
 update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1; 

أمثلة على سبب الحاجة لذلك


بحث وإعادة تشغيل المهام المتدلية:


 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour'; 

إزالة المهام القديمة:


 delete from task where updated < localtimestamp - interval '30 days'; 

إحصائيات لإنجاز المهام:


 select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1; 

أعد تشغيل المهام المكتملة مسبقًا


على سبيل المثال ، يتم تحديث المستند ، تحتاج إلى إعادة اختباره للبحث عن النص الكامل.


 create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); 

هنا ، تتم إضافة عمود task_updated_at لوقت تحديث المهمة ، ولكن يمكن استخدام الحقل الذي created .


إضافة أو تحديث (إعادة تشغيل) مهمة:


 insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at; 

ما يجري هنا. تصبح المهمة "جديدة" إذا لم يتم إكمالها الآن.


سوف يتحقق طلب إكمال المهمة أيضًا مما إذا كان قد تم تغييره أثناء التنفيذ.


طلبات المهمة التالية هي نفسها في قائمة انتظار جمع المقاييس.


الانتهاء بنجاح من المهمة:


 update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

الانتهاء من المهمة مع وجود خطأ: اعتمادا على المهمة. يمكنك إجراء تأخير غير مشروط في إعادة التشغيل ، يمكنك تعيين الحالة إلى "جديد" عند التحديث.


خط أنابيب


المهمة تمر بعدة مراحل. يمكنك عمل قائمة انتظار منفصلة لكل مرحلة. أو يمكنك إضافة العمود المقابل إلى الجدول.


مثال يستند إلى قائمة الانتظار الأساسية حتى لا تشوش التعليمات البرمجية. يمكن تطبيق جميع التعديلات الموضحة مسبقًا على قائمة الانتظار هذه دون أي مشاكل.


 create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status); 

الحصول على المهمة التالية في مرحلة معينة:


 with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

الانتهاء من المهمة مع الانتقال إلى المرحلة المشار إليها:


 update task set stage = $2, status = 2 where id = $1; 

أو الانتقال إلى المرحلة التالية بالترتيب:


 update task set stage = stage + 1, status = 2 where id = $1; 

المهام المجدولة


هذا هو اختلاف قائمة الانتظار التكرار.


يمكن أن يكون لكل مهمة جدول زمني خاص بها (في أبسط إصدار ، وتيرة الإطلاق).


 create table task ( id bigint not null primary key, period integer not null, --     status integer not null default 0, -- 0 - , 1 -   next_run_time timestamp not null default localtimestamp ); create index task__status__next_run_time__idx on task (status, next_run_time); 

إضافة مهمة:


 insert into task (id, period, next_run_time) values ($1, $2, $3); 

الحصول على المهمة التالية:


 with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

إكمال المهمة والتخطيط للتشغيل التالي:


 update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1 

بدلا من الاستنتاج


لا يوجد شيء معقد في إنشاء قائمة انتظار مهمة متخصصة باستخدام أدوات RDBMS.


سوف تستجيب قائمة الانتظار "عصامي" حتى الأعنف تقريبا أي عمل / مجال المتطلبات.


حسنًا ، يجب ألا ننسى أن قائمة الانتظار ، مثلها مثل أي قاعدة بيانات أخرى ، تتطلب ضبطًا مدروسًا للخادم والاستعلامات والفهارس.

Source: https://habr.com/ru/post/ar481556/


All Articles