
In modernen CPUs gibt es viele Kerne. Seit Jahren senden Anwendungen parallel Anfragen an Datenbanken. Wenn es sich um eine Berichtsabfrage für viele Zeilen in einer Tabelle handelt, wird sie schneller ausgeführt, wenn mehrere CPUs verwendet werden. In PostgreSQL ist dies ab Version 9.6 möglich.
Die Implementierung der parallelen Abfragefunktion dauerte 3 Jahre. Ich musste den Code in verschiedenen Phasen der Abfrageausführung neu schreiben. PostgreSQL 9.6 führte eine Infrastruktur ein, um den Code weiter zu verbessern. In nachfolgenden Versionen werden andere Abfragetypen parallel ausgeführt.
Einschränkungen
- Aktivieren Sie die parallele Ausführung nicht, wenn bereits alle Kerne belegt sind. Andernfalls werden andere Anforderungen verlangsamt.
- Am wichtigsten ist, dass die parallele Verarbeitung mit hohen WORK_MEM-Werten viel Speicherplatz beansprucht - jeder Hash-Join oder jede Sortierung belegt Speicherplatz in der Menge von work_mem.
- OLTP-Anforderungen mit geringer Latenz können nicht durch parallele Ausführung beschleunigt werden. Wenn die Abfrage eine Zeile zurückgibt, wird sie durch die parallele Verarbeitung nur verlangsamt.
- Entwickler verwenden gerne den TPC-H-Benchmark. Möglicherweise haben Sie ähnliche Abfragen für eine perfekte parallele Ausführung.
- Parallel dazu werden nur SELECT-Abfragen ohne Prädikatsperren ausgeführt.
- Manchmal ist eine korrekte Indizierung besser als ein paralleles sequentielles Scannen von Tabellen.
- Anhalten von Abfragen und Cursorn werden nicht unterstützt.
- Fensterfunktionen und Aggregatfunktionen geordneter Mengen sind nicht parallel.
- Sie gewinnen nichts an der E / A-Arbeitslast.
- Es gibt keine parallelen Sortieralgorithmen. Sortierte Abfragen können jedoch in einigen Aspekten parallel ausgeführt werden.
- Ersetzen Sie CTE (WITH ...) durch ein verschachteltes SELECT, um die parallele Verarbeitung zu ermöglichen.
- Daten-Wrapper von Drittanbietern unterstützen noch keine parallele Verarbeitung (aber sie könnten!)
- FULL OUTER JOIN wird nicht unterstützt.
- max_rows deaktiviert die Parallelverarbeitung.
- Wenn die Anforderung eine Funktion hat, die nicht als PARALLEL SAFE markiert ist, wird sie mit einem Thread ausgeführt.
- Die Transaktionsisolationsstufe SERIALIZABLE deaktiviert die Parallelverarbeitung.
Testumgebung
PostgreSQL-Entwickler haben versucht, die Antwortzeit von TPC-H-Benchmark-Abfragen zu reduzieren. Laden Sie den Benchmark herunter und passen Sie ihn an PostgreSQL an . Dies ist eine inoffizielle Verwendung des TPC-H-Benchmarks - nicht zum Vergleichen von Datenbanken oder Hardware.
- Laden Sie TPC-H_Tools_v2.17.3.zip (oder eine neuere Version) von der externen TPC herunter.
- Benennen Sie makefile.suite in Makefile um und ändern Sie es wie hier beschrieben: https://github.com/tvondra/pg_tpch . Kompilieren Sie den Code mit dem Befehl make.
- Daten generieren:
./dbgen -s 10
erstellt eine 23-GB-Datenbank. Dies reicht aus, um den Leistungsunterschied zwischen parallelen und nicht parallelen Abfragen zu erkennen. - Konvertieren Sie
tbl
Dateien csv for
und sed
in csv for
. - Klonen Sie das Repository pg_tpch und kopieren Sie die
csv
nach pg_tpch/dss/data
. - Erstellen Sie Abfragen mit dem Befehl
qgen
. - Laden Sie Daten mit dem Befehl
./tpch.sh
in die Datenbank ./tpch.sh
.
Paralleler sequentieller Scan
Es kann schneller sein, nicht wegen des parallelen Lesens, sondern weil Daten über viele CPU-Kerne verteilt sind. Auf modernen Betriebssystemen sind PostgreSQL-Datendateien gut zwischengespeichert. Mit Read-Ahead können Sie mehr aus dem Speicher herausholen, als der PG-Dämon anfordert. Daher ist die Abfrageleistung nicht durch die Festplatten-E / A beschränkt. Es verbraucht CPU-Zyklen, um:
- Lesen Sie die Zeilen einzeln von den Seiten der Tabelle.
- Vergleichen Sie Zeichenfolgenwerte und
WHERE
Klauseln.
Lassen Sie uns eine einfache select
ausführen:
tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------- Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 1146337 Planning Time: 0.203 ms Execution Time: 19035.100 ms
Ein sequentieller Scan ergibt zu viele Zeilen ohne Aggregation, sodass die Anforderung von einem einzelnen CPU-Kern ausgeführt wird.
Wenn Sie SUM()
hinzufügen, können Sie sehen, dass zwei Workflows die Anforderung beschleunigen:
explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1) -> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1) Workers Planned: 2 Workers Launched: 2 -> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3) -> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 382112 Planning Time: 0.241 ms Execution Time: 8555.131 ms
Parallele Aggregation
Der Knoten Parallel Seq Scan erzeugt Zeichenfolgen für die teilweise Aggregation. Der Partial Aggregate-Knoten schneidet diese Zeilen mit SUM()
. Am Ende wird der SUM-Zähler aus jedem Workflow vom Gather-Knoten erfasst.
Das Endergebnis wird vom Knoten "Aggregat finalisieren" berechnet. Wenn Sie über eigene Aggregationsfunktionen verfügen, müssen Sie diese als "parallel sicher" markieren.
Anzahl der Workflows
Die Anzahl der Workflows kann erhöht werden, ohne den Server neu zu starten:
alter system set max_parallel_workers_per_gather=4; select * from pg_reload_conf();
Jetzt sehen wir 4 Arbeiter in der Erklärungsausgabe:
tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1) -> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5) -> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 229267 Planning Time: 0.218 ms Execution Time: 5153.967 ms
Was ist hier los? Es gab 2-mal mehr Workflows und die Anfrage war nur 1,6599-mal schneller. Die Berechnungen sind interessant. Wir hatten 2 Arbeitsprozesse und 1 Leiter. Nach der Änderung wurde es 4 + 1.
Unsere maximale Beschleunigung durch Parallelverarbeitung: 5/3 = 1,66 (6) mal.
Wie funktioniert es
Die Prozesse
Die Ausführung einer Anfrage beginnt immer mit einem führenden Prozess. Der Leiter erledigt alles nicht parallel und ist Teil der Parallelverarbeitung. Andere Prozesse, die dieselben Anforderungen ausführen, werden als Workflows bezeichnet. Die parallele Verarbeitung verwendet eine Infrastruktur dynamischer Hintergrundworkflows (seit Version 9.4). Da andere Teile von PostgreSQL Prozesse anstelle von Threads verwenden, kann eine Abfrage mit drei Workflows viermal schneller sein als die herkömmliche Verarbeitung.
Interaktion
Workflows kommunizieren mit dem Leiter über eine Nachrichtenwarteschlange (basierend auf dem gemeinsam genutzten Speicher). Jeder Prozess hat 2 Warteschlangen: für Fehler und für Tupel.
Wie viele Arbeitsprozesse benötigen Sie?
Die Mindestgrenze wird durch den Parameter max_parallel_workers_per_gather
. Anschließend nimmt der Abfrage-Executor Workflows aus dem Pool, der durch den Parameter max_parallel_workers size
ist. Die letzte Einschränkung ist max_worker_processes
, d. H. Die Gesamtzahl der Hintergrundprozesse.
Wenn es nicht möglich war, einen Workflow zuzuweisen, erfolgt die Verarbeitung in einem einzigen Prozess.
Der Abfrageplaner kann Workflows abhängig von der Größe der Tabelle oder des Index verkürzen. min_parallel_table_scan_size
min_parallel_index_scan_size
Parameter min_parallel_table_scan_size
und min_parallel_index_scan_size
.
set min_parallel_table_scan_size='8MB' 8MB table => 1 worker 24MB table => 2 workers 72MB table => 3 workers x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
Jedes Mal, wenn eine Tabelle dreimal größer als min_parallel_(index|table)_scan_size
, fügt Postgres einen Workflow hinzu. Die Anzahl der Arbeitsprozesse ist nicht kostenbasiert. Zirkuläre Abhängigkeit erschwert komplexe Implementierungen. Stattdessen verwendet der Scheduler einfache Regeln.
In der Praxis sind diese Regeln nicht immer für die Produktion geeignet, daher können Sie die Anzahl der Workflows für eine bestimmte Tabelle ändern: ALTER TABLE ... SET ( parallel_workers = N
).
Warum wird die Parallelverarbeitung nicht verwendet?
Neben einer langen Liste von Einschränkungen gibt es auch Kostenprüfungen:
parallel_setup_cost
- verzichtet auf die parallele Verarbeitung von Kurzanforderungen. Dieser Parameter schätzt die Zeit für die Vorbereitung des Speichers, den Start des Prozesses und den anfänglichen Datenaustausch.
parallel_tuple_cost
: Die Kommunikation zwischen einem Leiter und Arbeitnehmern kann proportional zur Anzahl der Tupel aus Arbeitsprozessen verzögert werden. Dieser Parameter berechnet die Datenaustauschkosten.
Verschachtelte Schleife verbinden
PostgreSQL 9.6+ — . explain (costs off) select c_custkey, count(o_orderkey) from customer left outer join orders on c_custkey = o_custkey and o_comment not like '%special%deposits%' group by c_custkey; QUERY PLAN -------------------------------------------------------------------------------------- Finalize GroupAggregate Group Key: customer.c_custkey -> Gather Merge Workers Planned: 4 -> Partial GroupAggregate Group Key: customer.c_custkey -> Nested Loop Left Join -> Parallel Index Only Scan using customer_pkey on customer -> Index Scan using idx_orders_custkey on orders Index Cond: (customer.c_custkey = o_custkey) Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
Die Erfassung erfolgt in der letzten Phase, sodass die Linksverknüpfung für verschachtelte Schleifen eine Paralleloperation ist. Nur paralleler Index-Scan wurde nur in Version 10 angezeigt. Er funktioniert ähnlich wie paralleles serielles Scannen. Die Bedingung c_custkey = o_custkey
liest eine Bestellung für jede c_custkey = o_custkey
. Es ist also nicht parallel.
Hash Join - Hash Join
Jeder Workflow erstellt vor PostgreSQL 11 eine eigene Hash-Tabelle. Wenn mehr als vier dieser Prozesse vorhanden sind, wird die Leistung nicht verbessert. In der neuen Version wird die Hash-Tabelle gemeinsam genutzt. Jeder Workflow kann WORK_MEM verwenden, um eine Hash-Tabelle zu erstellen.
select l_shipmode, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count from orders, lineitem where o_orderkey = l_orderkey and l_shipmode in ('MAIL', 'AIR') and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and l_receiptdate >= date '1996-01-01' and l_receiptdate < date '1996-01-01' + interval '1' year group by l_shipmode order by l_shipmode LIMIT 1; QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1) -> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1) Group Key: lineitem.l_shipmode -> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5) Group Key: lineitem.l_shipmode -> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5) Sort Key: lineitem.l_shipmode Sort Method: external merge Disk: 2304kB Worker 0: Sort Method: external merge Disk: 2064kB Worker 1: Sort Method: external merge Disk: 2384kB Worker 2: Sort Method: external merge Disk: 2264kB Worker 3: Sort Method: external merge Disk: 2336kB -> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5) Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5) Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone)) Rows Removed by Filter: 11934691 -> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5) Buckets: 65536 Batches: 256 Memory Usage: 3840kB -> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5) Planning Time: 0.977 ms Execution Time: 7923.770 ms
Die Anforderung 12 von TPC-H zeigt eine parallele Hash-Verbindung. Jeder Workflow ist an der Erstellung einer gemeinsam genutzten Hash-Tabelle beteiligt.
Zusammenführen Verbinden
Ein Merge-Join ist nicht paralleler Natur. Machen Sie sich keine Sorgen, wenn dies die letzte Stufe der Anforderung ist - sie kann weiterhin parallel ausgeführt werden.
-- Query 2 from TPC-H explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 36 and p_type like '%BRASS' and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' and ps_supplycost = ( select min(ps_supplycost) from partsupp, supplier, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' ) order by s_acctbal desc, n_name, s_name, p_partkey LIMIT 100; QUERY PLAN ---------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey -> Merge Join Merge Cond: (part.p_partkey = partsupp.ps_partkey) Join Filter: (partsupp.ps_supplycost = (SubPlan 1)) -> Gather Merge Workers Planned: 4 -> Parallel Index Scan using <strong>part_pkey</strong> on part Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36)) -> Materialize -> Sort Sort Key: partsupp.ps_partkey -> Nested Loop -> Nested Loop Join Filter: (nation.n_regionkey = region.r_regionkey) -> Seq Scan on region Filter: (r_name = 'AMERICA'::bpchar) -> Hash Join Hash Cond: (supplier.s_nationkey = nation.n_nationkey) -> Seq Scan on supplier -> Hash -> Seq Scan on nation -> Index Scan using idx_partsupp_suppkey on partsupp Index Cond: (ps_suppkey = supplier.s_suppkey) SubPlan 1 -> Aggregate -> Nested Loop Join Filter: (nation_1.n_regionkey = region_1.r_regionkey) -> Seq Scan on region region_1 Filter: (r_name = 'AMERICA'::bpchar) -> Nested Loop -> Nested Loop -> Index Scan using idx_partsupp_partkey on partsupp partsupp_1 Index Cond: (part.p_partkey = ps_partkey) -> Index Scan using supplier_pkey on supplier supplier_1 Index Cond: (s_suppkey = partsupp_1.ps_suppkey) -> Index Scan using nation_pkey on nation nation_1 Index Cond: (n_nationkey = supplier_1.s_nationkey)
Der Merge Join-Knoten befindet sich über dem Gather Merge. Die Zusammenführung verwendet also keine Parallelverarbeitung. Der Knoten Parallel Index Scan hilft jedoch weiterhin beim Segment part_pkey
.
Abschnitt Verbindung
In PostgreSQL 11 ist die Partitionierung standardmäßig deaktiviert: Die Planung ist sehr teuer. Tabellen mit ähnlicher Partitionierung können abschnittsweise verbunden werden. Postgres verwendet also kleinere Hash-Tabellen. Jede Abschnittsverbindung kann parallel sein.
tpch=# set enable_partitionwise_join=t; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN --------------------------------------------------- Append -> Hash Join Hash Cond: (t2.b = t1.a) -> Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p1 t1 Filter: (b = 0) -> Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p2 t1_1 Filter: (b = 0) tpch=# set parallel_setup_cost = 1; tpch=# set parallel_tuple_cost = 0.01; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN ----------------------------------------------------------- Gather Workers Planned: 4 -> Parallel Append -> Parallel Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Parallel Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p2 t1_1 Filter: (b = 0) -> Parallel Hash Join Hash Cond: (t2.b = t1.a) -> Parallel Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p1 t1 Filter: (b = 0)
Hauptsache, die Verbindung in Abschnitten ist nur dann parallel, wenn diese Abschnitte groß genug sind.
Paralleles Anhängen - Paralleles Anhängen
Paralleles Anhängen kann anstelle verschiedener Blöcke in verschiedenen Workflows verwendet werden. Dies geschieht normalerweise bei UNION ALL-Abfragen. Der Nachteil ist weniger Parallelität, da jeder Workflow nur eine Anforderung verarbeitet.
Hier werden 2 Workflows ausgeführt, obwohl 4 enthalten sind.
tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day; QUERY PLAN ------------------------------------------------------------------------------------------------ Gather Workers Planned: 2 -> Parallel Append -> Aggregate -> Seq Scan on lineitem Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone) -> Aggregate -> Seq Scan on lineitem lineitem_1 Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Die wichtigsten Variablen
Zusammenfassung
Ab Version 9.6 kann die parallele Verarbeitung die Leistung komplexer Abfragen, die viele Zeilen oder Indizes scannen, erheblich verbessern. In PostgreSQL 10 ist die Parallelverarbeitung standardmäßig aktiviert. Denken Sie daran, es auf Servern mit einer großen OLTP-Arbeitslast zu deaktivieren. Sequentielle Scans oder Index-Scans verbrauchen viele Ressourcen. Wenn Sie nicht über das gesamte Dataset berichten, können Abfragen effizienter gestaltet werden, indem Sie einfach die fehlenden Indizes hinzufügen oder die richtige Partitionierung verwenden.
Referenzen