
يتم استخدام قوائم الانتظار لتنظيم معالجة تدفق المهمة. هناك حاجة لتراكم وتوزيع المهام بين فناني الأداء. يمكن أن توفر قوائم الانتظار أيضًا متطلبات إضافية لمعالجة مهام التشغيل: ضمان التسليم ، وضمان لمرة واحدة ، وتحديد الأولويات ، إلخ.
كقاعدة عامة ، يتم استخدام أنظمة قائمة انتظار الرسائل الجاهزة (MQ - قائمة انتظار الرسائل) ، ولكن في بعض الأحيان تحتاج إلى تنظيم قائمة انتظار مخصصة أو قائمة متخصصة معينة (على سبيل المثال ، قائمة انتظار أولوية وإعادة تشغيل متأخرة للمهام التي لم تتم معالجتها بسبب استثناءات). ستتم مناقشة إنشاء قوائم الانتظار هذه أدناه.
حدود التطبيق
تم تصميم الحلول المقترحة للتعامل مع تدفق المهام المماثلة. أنها ليست مناسبة لتنظيم الحانة / الفرعية أو المراسلة بين الأنظمة والمكونات المزدوجة.
تعمل قائمة الانتظار الموجودة أعلى قاعدة بيانات علائقية بشكل جيد للأحمال الصغيرة والمتوسطة (مئات الآلاف من المهام في اليوم ، عشرات إلى مئات المؤدين) ، ولكن من الأفضل استخدام حل متخصص للخيوط الكبيرة.
جوهر الأسلوب في خمس كلمات
select ... for update skip locked
خط الأساس
للبساطة ، سيتم فيما يلي تخزين معرفات المهام الفريدة فقط في الجدول. لا ينبغي أن يكون إضافة نوع من الحمولة الصعبة أمرًا صعبًا.
يحتوي جدول قائمة انتظار أبسط على المهمة نفسها وحالتها:
create table task ( id bigint not null primary key, status integer not null default 0
إضافة مهمة:
insert into task (id) values ($1) on conflict (id) do nothing;
الحصول على المهمة التالية:
with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
إنجاز المهمة:
update task set status = 2 where id = $1;
قائمة انتظار الأولوية
في الحالة البسيطة ، يكون id
المهمة هو أولويته. يتم تغيير طلب المهمة التالية فقط - تتم إضافة order by id
شرط الفرز order by id
بالترتيب المطلوب لمهام المعالجة. تحتاج أيضًا إلى إنشاء فهرس مركب بواسطة (status, id)
.
أو ، للأولوية ، يتم إضافة عمود منفصل:
create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0
إضافة مهمة: insert into task (id, priority) values ($1, $2) on conflict (id) do nothing;
الحصول على المهمة التالية: with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
يتيح لك العمود المميز تغيير أولوية المهمة أثناء التنقل.
قائمة انتظار مع تكرار المهام "الساقطة"
قد يحدث خطأ أو استثناء أثناء تنفيذ المهمة. في مثل هذه الحالات ، يجب أن تكون المهمة في قائمة الانتظار مرة أخرى. في بعض الأحيان ، لا يزال من الضروري تأجيل وقت التنفيذ المتكرر لبعض الوقت ، على سبيل المثال ، إذا كان الاستثناء يرجع إلى عدم توفر خدمة جهة خارجية بشكل مؤقت.
create table task ( id bigint not null primary key, status integer not null default 0,
كما ترى ، تم توسيع قائمة الحالات وتمت إضافة أعمدة جديدة:
attempt
- عدد من المحاولات ؛ مطلوب لاتخاذ قرار بشأن الحاجة إلى إعادة المحاولة (تحديد عدد المحاولات) وتحديد تأخير قبل إعادة المحاولة (على سبيل المثال ، يتم تأخير كل محاولة لاحقة بمقدار 10 * attempt
دقائق 10 * attempt
) ؛delayed_to
- وقت المحاولة التالية لإكمال المهمة ؛error_text
- نص الخطأ.
نص الخطأ مطلوب للتجميع حسب نوع الخطأ.
مثال تقارير نظام المراقبة التي تراكمت الآلاف من المهام مع "خطأ" الحالة في قائمة الانتظار. نحن تلبية الطلب:
select error_text, count(*) from task where status = 3 group by 1 order by 2 desc;
للحصول على التفاصيل ، انتقل إلى سجلات فناني الأداء. قم بتصحيح الموقف الذي تسبب في الخطأ (إن أمكن). إذا لزم الأمر ، نقوم بتسريع إعادة تشغيل المهام عن طريق تعيين الحالة إلى 0 أو عن طريق تحويل وقت المحاولة التالية.
الحصول على المهمة الجديدة التالية: with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id;
الحصول على المهمة التالية معلقة بسبب خطأ: with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id;
الانتهاء بنجاح من المهمة: update task set status = 2, delayed_to = null, error_text = null where id = $1;
فشلت المهمة ، سيكون هناك تكرار في (5 * عدد المحاولات) بالدقائق: update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1;
المهمة المكتملة بخطأ فادح ، لن تتم إعادة المحاولة: update task set status = 4, delayed_to = null, error_text = $2 where id = $1;
ينقسم طلب المهمة التالية إلى قسمين بحيث يمكن لـ DBMS إنشاء خطة استعلام فعالة لقائمة الانتظار ذات الأولوية. شرط التحديد مع or
يمكن أن يحدث خطأ كبير مع order by
الفرز order by
.
جمع القياسات
أضف السمات التالية:
- وقت إنشاء المهمة
- وقت تغيير المهمة
- بداية ونهاية وقت المهمة.
create table task ( id bigint not null primary key, status integer not null default 0,
نحن نعتبر الأعمدة المضافة في جميع الاستعلامات.
الحصول على المهمة الجديدة التالية: with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id;
الحصول على المهمة التالية معلقة بسبب خطأ: with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id;
الانتهاء بنجاح من المهمة: update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1;
فشلت المهمة ، سيكون هناك تكرار في (5 * عدد المحاولات) بالدقائق: update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1;
المهمة المكتملة بخطأ فادح ، لن تتم إعادة المحاولة: update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1;
أمثلة على سبب الحاجة لذلك
بحث وإعادة تشغيل المهام المتدلية:
update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour';
إزالة المهام القديمة:
delete from task where updated < localtimestamp - interval '30 days';
إحصائيات لإنجاز المهام:
select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1;
أعد تشغيل المهام المكتملة مسبقًا
على سبيل المثال ، يتم تحديث المستند ، تحتاج إلى إعادة اختباره للبحث عن النص الكامل.
create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0,
هنا ، تتم إضافة عمود task_updated_at
لوقت تحديث المهمة ، ولكن يمكن استخدام الحقل الذي created
.
إضافة أو تحديث (إعادة تشغيل) مهمة:
insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at;
ما يجري هنا. تصبح المهمة "جديدة" إذا لم يتم إكمالها الآن.
سوف يتحقق طلب إكمال المهمة أيضًا مما إذا كان قد تم تغييره أثناء التنفيذ.
طلبات المهمة التالية هي نفسها في قائمة انتظار جمع المقاييس.
الانتهاء بنجاح من المهمة:
update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1;
الانتهاء من المهمة مع وجود خطأ: اعتمادا على المهمة. يمكنك إجراء تأخير غير مشروط في إعادة التشغيل ، يمكنك تعيين الحالة إلى "جديد" عند التحديث.
خط أنابيب
المهمة تمر بعدة مراحل. يمكنك عمل قائمة انتظار منفصلة لكل مرحلة. أو يمكنك إضافة العمود المقابل إلى الجدول.
مثال يستند إلى قائمة الانتظار الأساسية حتى لا تشوش التعليمات البرمجية. يمكن تطبيق جميع التعديلات الموضحة مسبقًا على قائمة الانتظار هذه دون أي مشاكل.
create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status);
الحصول على المهمة التالية في مرحلة معينة:
with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
الانتهاء من المهمة مع الانتقال إلى المرحلة المشار إليها:
update task set stage = $2, status = 2 where id = $1;
أو الانتقال إلى المرحلة التالية بالترتيب:
update task set stage = stage + 1, status = 2 where id = $1;
المهام المجدولة
هذا هو اختلاف قائمة الانتظار التكرار.
يمكن أن يكون لكل مهمة جدول زمني خاص بها (في أبسط إصدار ، وتيرة الإطلاق).
create table task ( id bigint not null primary key, period integer not null,
إضافة مهمة:
insert into task (id, period, next_run_time) values ($1, $2, $3);
الحصول على المهمة التالية:
with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id;
إكمال المهمة والتخطيط للتشغيل التالي:
update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1
بدلا من الاستنتاج
لا يوجد شيء معقد في إنشاء قائمة انتظار مهمة متخصصة باستخدام أدوات RDBMS.
سوف تستجيب قائمة الانتظار "عصامي" حتى الأعنف تقريبا أي عمل / مجال المتطلبات.
حسنًا ، يجب ألا ننسى أن قائمة الانتظار ، مثلها مثل أي قاعدة بيانات أخرى ، تتطلب ضبطًا مدروسًا للخادم والاستعلامات والفهارس.