
Il y a beaucoup de cĆurs dans les processeurs modernes. Pendant des annĂ©es, les applications ont envoyĂ© des requĂȘtes aux bases de donnĂ©es en parallĂšle. S'il s'agit d'une requĂȘte de gĂ©nĂ©ration de rapports pour plusieurs lignes d'une table, elle s'exĂ©cute plus rapidement lorsqu'elle utilise plusieurs processeurs, et dans PostgreSQL, il est possible Ă partir de la version 9.6.
Il a fallu 3 ans pour implĂ©menter la fonction de requĂȘte parallĂšle - j'ai dĂ» réécrire le code Ă diffĂ©rentes Ă©tapes de l'exĂ©cution de la requĂȘte. PostgreSQL 9.6 a introduit une infrastructure pour amĂ©liorer encore le code. Dans les versions ultĂ©rieures, d'autres types de requĂȘtes sont exĂ©cutĂ©es en parallĂšle.
Limitations
- N'activez pas l'exĂ©cution parallĂšle si tous les cĆurs sont dĂ©jĂ pris, sinon les autres requĂȘtes ralentiront.
- Plus important encore, le traitement parallÚle avec des valeurs WORK_MEM élevées consomme beaucoup de mémoire - chaque jointure ou tri de hachage occupe de la mémoire dans la quantité de work_mem.
- Les requĂȘtes OLTP Ă faible latence ne peuvent pas ĂȘtre accĂ©lĂ©rĂ©es par une exĂ©cution parallĂšle. Et si la requĂȘte renvoie une ligne, le traitement parallĂšle ne fera que la ralentir.
- Les dĂ©veloppeurs aiment utiliser le benchmark TPC-H. Vous avez peut-ĂȘtre des requĂȘtes similaires pour une exĂ©cution parallĂšle parfaite.
- Seules les requĂȘtes SELECT sans verrous de prĂ©dicat sont exĂ©cutĂ©es en parallĂšle.
- Parfois, une indexation correcte est meilleure que des analyses de table séquentielles en parallÚle.
- Les requĂȘtes d'interruption et les curseurs ne sont pas pris en charge.
- Les fonctions de fenĂȘtre et les fonctions d'agrĂ©gation des ensembles ordonnĂ©s ne sont pas parallĂšles.
- Vous ne gagnez rien dans la charge de travail d'E / S.
- Il n'y a pas d'algorithme de tri parallĂšle. Mais les requĂȘtes triĂ©es peuvent ĂȘtre exĂ©cutĂ©es en parallĂšle Ă certains Ă©gards.
- Remplacez CTE (WITH ...) par un SELECT imbriqué pour activer le traitement parallÚle.
- Les wrappers de données tiers ne prennent pas encore en charge le traitement parallÚle (mais ils pourraient!)
- FULL OUTER JOIN n'est pas pris en charge.
- max_rows désactive le traitement parallÚle.
- Si la demande a une fonction qui n'est pas marquée comme PARALLEL SAFE, elle sera monothread.
- Le niveau d'isolement des transactions SERIALIZABLE désactive le traitement parallÚle.
Environnement de test
Les dĂ©veloppeurs de PostgreSQL ont essayĂ© de rĂ©duire le temps de rĂ©ponse des requĂȘtes de benchmark TPC-H. TĂ©lĂ©chargez le benchmark et adaptez-le Ă PostgreSQL . Il s'agit d'une utilisation non officielle du benchmark TPC-H - pas pour comparer des bases de donnĂ©es ou du matĂ©riel.
- Téléchargez TPC-H_Tools_v2.17.3.zip (ou une version plus récente) à partir du TPC hors site .
- Renommez makefile.suite en Makefile et modifiez comme décrit ici: https://github.com/tvondra/pg_tpch . Compilez le code avec la commande make.
- Générez des données:
./dbgen -s 10
crĂ©e une base de donnĂ©es de 23 Go. Cela suffit pour voir la diffĂ©rence de performances des requĂȘtes parallĂšles et non parallĂšles. - Convertissez les fichiers
tbl
en csv for
et sed
. - Clonez le référentiel pg_tpch et copiez les
csv
dans pg_tpch/dss/data
. - CrĂ©ez des requĂȘtes avec la commande
qgen
. - Téléchargez les données dans la base de données avec la commande
./tpch.sh
.
Balayage séquentiel parallÚle
Il peut ĂȘtre plus rapide non pas en raison de la lecture parallĂšle, mais parce que les donnĂ©es sont dispersĂ©es sur de nombreux cĆurs de processeur. Sur les systĂšmes d'exploitation modernes, les fichiers de donnĂ©es PostgreSQL sont bien mis en cache. Avec la lecture anticipĂ©e, vous pouvez obtenir plus du stockage que les demandes du dĂ©mon PG. Par consĂ©quent, les performances des requĂȘtes ne sont pas limitĂ©es par les E / S de disque. Il consomme des cycles CPU pour:
- lire les lignes une par une dans les pages du tableau;
- Comparez les valeurs de chaĂźne et les clauses
WHERE
.
ExĂ©cutons une simple requĂȘte de select
:
tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------- Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 1146337 Planning Time: 0.203 ms Execution Time: 19035.100 ms
Une analyse sĂ©quentielle produit trop de lignes sans agrĂ©gation, de sorte que la demande est exĂ©cutĂ©e par un seul cĆur de processeur.
Si vous ajoutez SUM()
, vous pouvez voir que deux workflows aideront à accélérer la demande:
explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1) -> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1) Workers Planned: 2 Workers Launched: 2 -> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3) -> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 382112 Planning Time: 0.241 ms Execution Time: 8555.131 ms
Agrégation parallÚle
Le nĆud Parallel Seq Scan produit des chaĂźnes pour une agrĂ©gation partielle. Le nĆud Partial Aggregate tronque ces lignes Ă l'aide de SUM()
. Ă la fin, le compteur SUM de chaque workflow est collectĂ© par le nĆud Gather.
Le rĂ©sultat final est calculĂ© par le nĆud "Finalize Aggregate". Si vous avez vos propres fonctions d'agrĂ©gation, assurez-vous de les marquer comme "coffre-fort parallĂšle".
Nombre de workflows
Le nombre de workflows peut ĂȘtre augmentĂ© sans redĂ©marrer le serveur:
alter system set max_parallel_workers_per_gather=4; select * from pg_reload_conf();
Maintenant, nous voyons 4 travailleurs dans la sortie d'explication:
tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1) -> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5) -> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 229267 Planning Time: 0.218 ms Execution Time: 5153.967 ms
Que se passe-t-il ici? Il y avait 2 fois plus de workflows et la demande n'était que 1,6599 fois plus rapide. Les calculs sont intéressants. Nous avions 2 processus de travail et 1 leader. AprÚs le changement, il est devenu 4 + 1.
Notre accélération maximale du traitement parallÚle: 5/3 = 1,66 (6) fois.
Comment ça marche?
Les processus
L'exĂ©cution d'une demande commence toujours par un processus pilote. Le leader fait tout ce qui n'est pas parallĂšle et fait partie du traitement parallĂšle. Les autres processus qui exĂ©cutent les mĂȘmes requĂȘtes sont appelĂ©s workflows. Le traitement parallĂšle utilise une infrastructure de workflows d'arriĂšre-plan dynamiques (depuis la version 9.4). Ătant donnĂ© que d'autres parties de PostgreSQL utilisent des processus plutĂŽt que des threads, une requĂȘte avec 3 workflows pourrait ĂȘtre 4 fois plus rapide que le traitement traditionnel.
L'interaction
Les workflows communiquent avec le leader via une file d'attente de messages (basée sur la mémoire partagée). Chaque processus a 2 files d'attente: pour les erreurs et pour les tuples.
De combien de processus de travail avez-vous besoin?
La limite minimale est définie par le paramÚtre max_parallel_workers_per_gather
. Ensuite, l'exĂ©cuteur de requĂȘte prend les workflows du pool limitĂ© par le paramĂštre de max_parallel_workers size
. La derniĂšre limitation est max_worker_processes
, c'est-Ă -dire le nombre total de processus d'arriĂšre-plan.
S'il n'a pas été possible d'allouer un workflow, le traitement se fera en un seul processus.
Le planificateur de requĂȘtes peut raccourcir les workflows en fonction de la taille de la table ou de l'index. Il existe des paramĂštres min_parallel_table_scan_size
et min_parallel_index_scan_size
pour cela.
set min_parallel_table_scan_size='8MB' 8MB table => 1 worker 24MB table => 2 workers 72MB table => 3 workers x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
Chaque fois qu'une table est 3 fois plus grande que min_parallel_(index|table)_scan_size
, Postgres ajoute un workflow. Le nombre de processus de travail n'est pas basé sur les coûts. La dépendance circulaire complique les implémentations complexes. Au lieu de cela, le planificateur utilise des rÚgles simples.
En pratique, ces rÚgles ne sont pas toujours adaptées à la production, vous pouvez donc modifier le nombre de workflows pour une table particuliÚre: ALTER TABLE ... SET ( parallel_workers = N
).
Pourquoi le traitement parallÚle n'est-il pas utilisé?
En plus d'une longue liste de restrictions, il existe également des contrÎles des coûts:
parallel_setup_cost
- pour se passer du traitement parallĂšle des requĂȘtes courtes. Ce paramĂštre estime le temps de prĂ©paration de la mĂ©moire, de dĂ©marrage du processus et d'Ă©change de donnĂ©es initial.
parallel_tuple_cost
: la communication entre un leader et des travailleurs peut ĂȘtre retardĂ©e proportionnellement au nombre de tuples des processus de travail. Ce paramĂštre calcule les coĂ»ts d'Ă©change de donnĂ©es.
Jointure de boucle imbriquée
PostgreSQL 9.6+ â . explain (costs off) select c_custkey, count(o_orderkey) from customer left outer join orders on c_custkey = o_custkey and o_comment not like '%special%deposits%' group by c_custkey; QUERY PLAN -------------------------------------------------------------------------------------- Finalize GroupAggregate Group Key: customer.c_custkey -> Gather Merge Workers Planned: 4 -> Partial GroupAggregate Group Key: customer.c_custkey -> Nested Loop Left Join -> Parallel Index Only Scan using customer_pkey on customer -> Index Scan using idx_orders_custkey on orders Index Cond: (customer.c_custkey = o_custkey) Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
La collecte a lieu à la derniÚre étape, donc la jointure gauche de la boucle imbriquée est une opération parallÚle. Parallel Index Only Scan n'est apparu que dans la version 10. Il fonctionne de maniÚre similaire à la numérisation série parallÚle. La condition c_custkey = o_custkey
lit une commande pour chaque ligne client. Ce n'est donc pas parallĂšle.
Hash Join - Hash Join
Chaque flux de travail crée sa propre table de hachage avant PostgreSQL 11. Et s'il existe plus de quatre de ces processus, les performances ne s'amélioreront pas. Dans la nouvelle version, la table de hachage est partagée. Chaque flux de travail peut utiliser WORK_MEM pour créer une table de hachage.
select l_shipmode, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count from orders, lineitem where o_orderkey = l_orderkey and l_shipmode in ('MAIL', 'AIR') and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and l_receiptdate >= date '1996-01-01' and l_receiptdate < date '1996-01-01' + interval '1' year group by l_shipmode order by l_shipmode LIMIT 1; QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1) -> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1) Group Key: lineitem.l_shipmode -> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5) Group Key: lineitem.l_shipmode -> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5) Sort Key: lineitem.l_shipmode Sort Method: external merge Disk: 2304kB Worker 0: Sort Method: external merge Disk: 2064kB Worker 1: Sort Method: external merge Disk: 2384kB Worker 2: Sort Method: external merge Disk: 2264kB Worker 3: Sort Method: external merge Disk: 2336kB -> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5) Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5) Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone)) Rows Removed by Filter: 11934691 -> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5) Buckets: 65536 Batches: 256 Memory Usage: 3840kB -> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5) Planning Time: 0.977 ms Execution Time: 7923.770 ms
La requĂȘte 12 de TPC-H illustre une connexion de hachage parallĂšle. Chaque flux de travail est impliquĂ© dans la crĂ©ation d'une table de hachage partagĂ©e.
Fusionner rejoindre
Une jointure de fusion n'est pas de nature parallĂšle. Ne vous inquiĂ©tez pas s'il s'agit de la derniĂšre Ă©tape de la demande - elle peut toujours ĂȘtre exĂ©cutĂ©e en parallĂšle.
-- Query 2 from TPC-H explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 36 and p_type like '%BRASS' and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' and ps_supplycost = ( select min(ps_supplycost) from partsupp, supplier, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' ) order by s_acctbal desc, n_name, s_name, p_partkey LIMIT 100; QUERY PLAN ---------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey -> Merge Join Merge Cond: (part.p_partkey = partsupp.ps_partkey) Join Filter: (partsupp.ps_supplycost = (SubPlan 1)) -> Gather Merge Workers Planned: 4 -> Parallel Index Scan using <strong>part_pkey</strong> on part Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36)) -> Materialize -> Sort Sort Key: partsupp.ps_partkey -> Nested Loop -> Nested Loop Join Filter: (nation.n_regionkey = region.r_regionkey) -> Seq Scan on region Filter: (r_name = 'AMERICA'::bpchar) -> Hash Join Hash Cond: (supplier.s_nationkey = nation.n_nationkey) -> Seq Scan on supplier -> Hash -> Seq Scan on nation -> Index Scan using idx_partsupp_suppkey on partsupp Index Cond: (ps_suppkey = supplier.s_suppkey) SubPlan 1 -> Aggregate -> Nested Loop Join Filter: (nation_1.n_regionkey = region_1.r_regionkey) -> Seq Scan on region region_1 Filter: (r_name = 'AMERICA'::bpchar) -> Nested Loop -> Nested Loop -> Index Scan using idx_partsupp_partkey on partsupp partsupp_1 Index Cond: (part.p_partkey = ps_partkey) -> Index Scan using supplier_pkey on supplier supplier_1 Index Cond: (s_suppkey = partsupp_1.ps_suppkey) -> Index Scan using nation_pkey on nation nation_1 Index Cond: (n_nationkey = supplier_1.s_nationkey)
Le nĆud Fusionner la jointure est situĂ© au-dessus de la collecte de fusion. La fusion n'utilise donc pas de traitement parallĂšle. Mais le nĆud Parallel Index Scan aide toujours avec le segment part_pkey
.
Connexion de section
Dans PostgreSQL 11, le partitionnement est dĂ©sactivĂ© par dĂ©faut: il a une planification trĂšs coĂ»teuse. Les tables avec un partitionnement similaire peuvent ĂȘtre jointes section par section. Postgres utilisera donc des tables de hachage plus petites. Chaque connexion de section peut ĂȘtre parallĂšle.
tpch=# set enable_partitionwise_join=t; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN --------------------------------------------------- Append -> Hash Join Hash Cond: (t2.b = t1.a) -> Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p1 t1 Filter: (b = 0) -> Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p2 t1_1 Filter: (b = 0) tpch=# set parallel_setup_cost = 1; tpch=# set parallel_tuple_cost = 0.01; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN ----------------------------------------------------------- Gather Workers Planned: 4 -> Parallel Append -> Parallel Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Parallel Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p2 t1_1 Filter: (b = 0) -> Parallel Hash Join Hash Cond: (t2.b = t1.a) -> Parallel Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p1 t1 Filter: (b = 0)
L'essentiel est que la connexion dans les sections ne soit parallĂšle que si ces sections sont suffisamment grandes.
Parallel Append - Parallel Append
Parallel Append peut ĂȘtre utilisĂ© Ă la place de diffĂ©rents blocs dans diffĂ©rents workflows. Cela se produit gĂ©nĂ©ralement avec les requĂȘtes UNION ALL. L'inconvĂ©nient est moins de parallĂ©lisme, car chaque flux de travail ne traite qu'une seule demande.
2 workflows s'exécutent ici, bien que 4 soient inclus.
tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day; QUERY PLAN ------------------------------------------------------------------------------------------------ Gather Workers Planned: 2 -> Parallel Append -> Aggregate -> Seq Scan on lineitem Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone) -> Aggregate -> Seq Scan on lineitem lineitem_1 Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Les variables les plus importantes
- WORK_MEM limite la quantitĂ© de mĂ©moire pour chaque processus, pas seulement pour les requĂȘtes: processus de connexion work_mem = beaucoup de mĂ©moire.
max_parallel_workers_per_gather
- combien de processus de travail le programme d'exécution utilisera pour le traitement parallÚle à partir du plan.max_worker_processes
- ajuste le nombre total de processus de travail au nombre de cĆurs de processeur sur le serveur.max_parallel_workers
est le mĂȘme, mais pour les workflows parallĂšles.
Résumé
Ă partir de la version 9.6, le traitement parallĂšle peut considĂ©rablement amĂ©liorer les performances des requĂȘtes complexes qui analysent de nombreuses lignes ou index. Dans PostgreSQL 10, le traitement parallĂšle est activĂ© par dĂ©faut. N'oubliez pas de le dĂ©sactiver sur les serveurs avec une charge de travail OLTP importante. Les analyses sĂ©quentielles ou les analyses d'index consomment beaucoup de ressources. Si vous ne gĂ©nĂ©rez pas de rapports sur l'ensemble des donnĂ©es, les requĂȘtes peuvent ĂȘtre rendues plus efficaces en ajoutant simplement les index manquants ou en utilisant le partitionnement correct.
Les références