Antrian Tugas PostgreSQL

Antrian Gajah - pixabay.com


Antrian digunakan untuk mengatur pemrosesan alur tugas. Mereka dibutuhkan untuk akumulasi dan distribusi tugas di antara para pemain. Antrian juga dapat memberikan persyaratan tambahan untuk tugas pemrosesan: jaminan pengiriman, jaminan satu kali, penentuan prioritas, dll.


Sebagai aturan, sistem antrian pesan siap pakai digunakan (MQ - antrian pesan), tetapi kadang-kadang Anda perlu mengatur antrian ad hoc atau yang khusus (misalnya, antrian prioritas dan keterlambatan memulai kembali tugas yang tidak diproses karena pengecualian). Pembuatan antrian seperti itu akan dibahas di bawah ini.


Keterbatasan Penerapan


Solusi yang diusulkan dirancang untuk menangani aliran tugas serupa. Mereka tidak cocok untuk mengatur pub / sub atau pesan antara sistem dan komponen yang digabungkan secara longgar.


Antrian di atas basis data relasional bekerja dengan baik untuk beban kecil dan menengah (ratusan ribu tugas per hari, puluhan hingga ratusan pemain), tetapi untuk utas besar lebih baik menggunakan solusi khusus.


Esensi metode dalam lima kata


select ... for update skip locked 

Garis dasar


Untuk mempermudah, selanjutnya hanya pengidentifikasi tugas unik yang akan disimpan dalam tabel. Menambahkan semacam muatan seharusnya tidak sulit.


Tabel untuk antrian paling sederhana berisi tugas itu sendiri dan statusnya:


 create table task ( id bigint not null primary key, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__idx on task (status); 

Menambahkan tugas:


 insert into task (id) values ($1) on conflict (id) do nothing; 

Mendapatkan tugas berikut:


 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Penyelesaian tugas:


 update task set status = 2 where id = $1; 

Antrian Prioritas


Dalam kasus sederhana, id tugas adalah prioritasnya. Hanya permintaan untuk tugas selanjutnya yang diubah - syarat kondisi pengurutan order by id dengan urutan tugas pemrosesan yang diperlukan ditambahkan. Anda juga perlu membuat indeks gabungan berdasarkan (status, id) .


Atau, untuk prioritas, kolom terpisah ditambahkan:


 create table task ( id bigint not null primary key, priority integer not null, status integer not null default 0 -- 0 - , 1 -  , 2 -  ); create index task__status__priority__idx on task (status, priority); 

Menambahkan tugas:
 insert into task (id, priority) values ($1, $2) on conflict (id) do nothing; 

Mendapatkan tugas berikut:
 with next_task as ( select id from task where status = 0 order by priority limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Kolom yang disorot memungkinkan Anda mengubah prioritas tugas dengan cepat.


Antri dengan pengulangan tugas "jatuh"


Kesalahan atau pengecualian dapat terjadi selama pelaksanaan tugas. Dalam kasus seperti itu, tugas harus antri lagi. Kadang-kadang masih perlu menunda waktu pelaksanaannya yang berulang untuk beberapa waktu, misalnya, jika pengecualian disebabkan oleh tidak tersedianya sementara layanan pihak ketiga.


 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, delayed_to timestamp null, error_text text null ); create index task__status__delayed_to__idx on task (status, delayed_to); 

Seperti yang Anda lihat, daftar status telah diperluas dan kolom baru telah ditambahkan:


  • attempt - jumlah upaya; diperlukan untuk membuat keputusan tentang perlunya mencoba lagi (membatasi jumlah upaya) dan memilih penundaan sebelum mencoba lagi (misalnya, setiap upaya berikutnya ditunda oleh 10 * attempt menit 10 * attempt );
  • delayed_to - waktu upaya berikutnya untuk menyelesaikan tugas;
  • error_text - teks kesalahan.

Teks kesalahan diperlukan untuk dikelompokkan berdasarkan jenis kesalahan.


Sebuah contoh Sistem pemantauan melaporkan bahwa ribuan tugas dengan status "kesalahan" telah terakumulasi dalam antrian. Kami memenuhi permintaan:


 select error_text, count(*) from task where status = 3 group by 1 order by 2 desc; 

Untuk detailnya, buka log para pelaku. Perbaiki situasi yang menyebabkan kesalahan (jika mungkin). Jika perlu, kami mempercepat mulai ulang tugas dengan mengatur status ke 0 atau dengan menggeser waktu upaya berikutnya.


Dapatkan tugas baru berikut:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

Mendapatkan tugas berikut yang tertunda karena kesalahan:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, delayed_to = null, error_text = null from next_task where task.id = next_task.id returning task.id; 

Berhasil menyelesaikan tugas:
 update task set status = 2, delayed_to = null, error_text = null where id = $1; 

Tugas gagal, akan ada pengulangan dalam (5 * jumlah upaya) menit:
 update task set status = 3, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2 where id = $1; 

Tugas selesai dengan kesalahan fatal, tidak akan ada coba lagi:
 update task set status = 4, delayed_to = null, error_text = $2 where id = $1; 

Permintaan untuk tugas selanjutnya dibagi menjadi dua sehingga DBMS dapat membangun rencana kueri yang efektif untuk antrian prioritas. Kondisi pemilihan dengan or bisa salah dengan order by sortir.


Koleksi metrik


Tambahkan atribut berikut:


  • waktu pembuatan tugas;
  • waktu tugas berubah;
  • memulai dan mengakhiri waktu tugas.

 create table task ( id bigint not null primary key, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) attempt integer not null default 0, begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); create index task__status__delayed_to__idx on task (status, delayed_to); create index task__updated__idx on task (updated); 

Kami mempertimbangkan kolom yang ditambahkan di semua kueri.


Dapatkan tugas baru berikut:
 with next_task as ( select id from task where status = 0 limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

Mendapatkan tugas berikut yang tertunda karena kesalahan:
 with next_task as ( select id from task where status = 3 and delayed_to < localtimestamp limit 1 for update skip locked ) update task set status = 1, attempt = attempt + 1, begin_time = localtimestamp, end_time = null, delayed_to = null, error_text = null, updated = localtimestamp from next_task where task.id = next_task.id returning task.id; 

Berhasil menyelesaikan tugas:
 update task set status = 2, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

Tugas gagal, akan ada pengulangan dalam (5 * jumlah upaya) menit:
 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp + make_interval(mins => 5 * attempt), error_text = $2, updated = localtimestamp where id = $1; 

Tugas selesai dengan kesalahan fatal, tidak akan ada coba lagi:
 update task set status = 4, end_time = localtimestamp, delayed_to = null, error_text = $2, updated = localtimestamp where id = $1; 

Contoh mengapa ini mungkin diperlukan


Cari dan mulai kembali tugas yang menggantung:


 update task set status = 3, end_time = localtimestamp, delayed_to = localtimestamp, error_text = 'hanged', updated = localtimestamp where status = 1 and updated < localtimestamp - interval '1 hour'; 

Menghapus tugas lama:


 delete from task where updated < localtimestamp - interval '30 days'; 

Statistik untuk menyelesaikan tugas:


 select date_trunc('hour', end_time), count(*), sum(end_time - begin_time), avg(end_time - begin_time) from task where status = 2 and end_time >= '2019-12-16' group by 1 order by 1; 

Mulai ulang tugas yang sudah diselesaikan sebelumnya


Misalnya, dokumen diperbarui, Anda perlu mengindeks ulang untuk pencarian teks lengkap.


 create table task ( id bigint not null primary key, task_updated_at timestamp not null default localtimstamp, status integer not null default 0, -- 0 - , 1 -  , 2 - , 3 - , 4 -   (  ) begin_time timestamp null, end_time timestamp null, delayed_to timestamp null, error_text text null, created timestamp not null default localtimestamp, updated timestamp not null default localtimestamp ); 

Di sini, kolom task_updated_at ditambahkan untuk waktu pembaruan tugas, tetapi bidang yang created dapat digunakan.


Menambah atau memperbarui (memulai kembali) tugas:


 insert into task (id, task_updated_at) values ($1, $2) on conflict (id) do update set task_updated_at = excluded.task_updated_at, status = case when status = 1 then 1 else 0 end, delayed_to = null, error_text = null, updated = localtimestamp where task_updated_at < excluded.task_updated_at; 

Apa yang sedang terjadi di sini. Suatu tugas menjadi "baru" jika tidak selesai sekarang.


Permintaan untuk menyelesaikan tugas juga akan memeriksa apakah itu diubah selama eksekusi.


Permintaan untuk tugas berikutnya sama seperti dalam antrian untuk mengumpulkan metrik.


Berhasil menyelesaikan tugas:


 update task set status = case when begin_time >= updated then 2 else 0 end, end_time = localtimestamp, delayed_to = null, error_text = null, updated = localtimestamp where id = $1; 

Penyelesaian tugas dengan kesalahan: tergantung pada tugas. Anda dapat membuat penundaan tanpa syarat dalam memulai kembali, Anda dapat mengatur status ke "baru" saat memperbarui.


Saluran pipa


Tugas melewati beberapa tahap. Anda dapat membuat antrian terpisah untuk setiap tahap. Atau Anda bisa menambahkan kolom yang sesuai ke tabel.


Contoh berdasarkan antrian dasar agar tidak mengacaukan kode. Semua modifikasi yang dijelaskan sebelumnya dapat diterapkan ke antrian ini tanpa masalah.


 create table task ( id bigint not null primary key, stage integer not null default 0, status integer not null default 0 ); create index task__stage__status__idx on task (stage, status); 

Mendapatkan tugas berikut pada tahap tertentu:


 with next_task as ( select id from task where stage = $1 and status = 0 limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Penyelesaian tugas dengan transisi ke tahap yang ditunjukkan:


 update task set stage = $2, status = 2 where id = $1; 

Atau transisi ke tahap selanjutnya secara berurutan:


 update task set stage = stage + 1, status = 2 where id = $1; 

Tugas Terjadwal


Ini adalah variasi dari antrian ulang.


Setiap tugas dapat memiliki jadwal sendiri (dalam versi paling sederhana, frekuensi peluncuran).


 create table task ( id bigint not null primary key, period integer not null, --     status integer not null default 0, -- 0 - , 1 -   next_run_time timestamp not null default localtimestamp ); create index task__status__next_run_time__idx on task (status, next_run_time); 

Menambahkan tugas:


 insert into task (id, period, next_run_time) values ($1, $2, $3); 

Mendapatkan tugas berikut:


 with next_task as ( select id from task where status = 0 and next_run_time <= localtimestamp limit 1 for update skip locked ) update task set status = 1 from next_task where task.id = next_task.id returning task.id; 

Menyelesaikan tugas dan merencanakan proses selanjutnya:


 update task set status = 0, next_run_time = next_run_time + make_interval(secs => period) where id = $1 

Alih-alih sebuah kesimpulan


Tidak ada yang rumit dalam membuat antrian tugas khusus menggunakan alat RDBMS.


Antrian "buatan sendiri" akan merespons bahkan yang paling liar hampir semua persyaratan bisnis / domain.


Yah, kita tidak boleh lupa bahwa, seperti basis data lainnya, antrian memerlukan penyetelan server, kueri, dan indeks secara bijaksana.

Source: https://habr.com/ru/post/id481556/


All Articles