PostgreSQL中的并行查询


现代CPU中有很多核心。 多年来,应用程序已将查询并行发送到数据库。 如果这是对表中多个行的报告查询,则在使用多个CPU时运行速度更快,而在PostgreSQL中,可以从9.6版开始运行。


实现并行查询功能花了3年的时间-我不得不在查询执行的不同阶段重写代码。 PostgreSQL 9.6引入了基础结构以进一步改进代码。 在后续版本中,其他类型的查询将并行执行。


局限性


  • 如果所有内核都已使用,则不要启用并行执行,否则其他请求将变慢。
  • 最重要的是,具有高WORK_MEM值的并行处理会消耗大量内存-每个哈希联接或排序都会占用work_mem数量的内存。
  • 低延迟OLTP请求不能通过并行执行来加速。 如果查询返回一行,那么并行处理只会减慢它的速度。
  • 开发人员喜欢使用TPC-H基准测试。 也许您有类似的查询以实现完美的并行执行。
  • 只有没有谓词锁的SELECT查询才能并行执行。
  • 有时正确的索引比并行的顺序表扫描要好。
  • 不支持挂起查询和游标。
  • 窗口函数和有序集合的集合函数不是并行的。
  • 您在I / O工作负载中一无所获。
  • 没有并行排序算法。 但是排序查询可以在某些方面并行执行。
  • 用嵌套的SELECT替换CTE(WITH ...)以启用并行处理。
  • 第三方数据包装器尚不支持并行处理(但是可以!)
  • 不支持FULL OUTER JOIN。
  • max_rows禁用并行处理。
  • 如果该请求具有未标记为PARALLEL SAFE的功能,则它将是单线程的。
  • 事务隔离级别SERIALIZABLE禁用并行处理。

测试环境


PostgreSQL开发人员试图减少TPC-H基准查询的响应时间。 下载基准测试, 使其适应PostgreSQL 。 这是TPC-H基准的非官方使用-并非用于比较数据库或硬件。


  1. 从非现场TPC下载TPC-H_Tools_v2.17.3.zip(或更新版本)。
  2. 将makefile.suite重命名为Makefile并按以下说明进行更改: https : //github.com/tvondra/pg_tpch 。 使用make命令编译代码。
  3. 生成数据: ./dbgen -s 10创建一个23 GB的数据库。 这足以看出并行查询和非并行查询的性能差异。
  4. csv forsedtbl文件转换为csv for
  5. 克隆pg_tpch存储库,然后将csv复制到pg_tpch/dss/data
  6. 使用qgen命令创建查询。
  7. 使用./tpch.sh命令将数据上传到数据库。

并行顺序扫描


可能不是因为并行读取,而是因为数据分散在许多CPU内核中,所以速度更快。 在现代操作系统上,PostgreSQL数据文件被很好地缓存。 通过预读,您可以从存储中获得比PG守护程序请求更多的信息。 因此,查询性能不受磁盘I / O的限制。 它消耗CPU周期来:


  • 从表格的页面一一读取
  • 比较字符串值和WHERE子句。

让我们运行一个简单的select查询:


 tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------- Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 1146337 Planning Time: 0.203 ms Execution Time: 19035.100 ms 

顺序扫描产生太多行而没有聚合,因此该请求由单个CPU内核执行。


如果添加SUM() ,则会看到两个工作流程将有助于加快请求的速度:


 explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1) -> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1) Workers Planned: 2 Workers Launched: 2 -> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3) -> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 382112 Planning Time: 0.241 ms Execution Time: 8555.131 ms 

并行聚合


并行Seq扫描节点生成用于部分聚合的字符串。 部分聚合节点使用SUM()截断这些行。 最后,来自每个工作流的SUM计数器由Gather节点收集。


最终结果由“最终汇总”节点计算。 如果您拥有自己的聚合功能,请确保将其标记为“并行安全”。


工作流程数


无需重启服务器即可增加工作流数量:


 alter system set max_parallel_workers_per_gather=4; select * from pg_reload_conf(); 

现在我们看到4个工人在解释输出:


 tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1) -> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5) -> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 229267 Planning Time: 0.218 ms Execution Time: 5153.967 ms 

这是怎么回事 工作流程增加了2倍,而请求仅提高了1.6599倍。 计算很有趣。 我们有2个工作流程和1个领导者。 更改后,变为4 +1。


我们从并行处理中获得的最大加速:5/3 = 1.66(6)倍。


如何运作?


流程


请求的执行始终始于领先流程。 领导者执行所有非并行的工作,并且是并行处理的一部分。 执行相同请求的其他流程称为工作流程。 并行处理使用动态后台工作流的基础结构(自9.4版开始)。 由于PostgreSQL的其他部分使用进程而不是线程,因此具有3个工作流的查询可能比传统处理快4倍。


互动互动


工作流通过消息队列(基于共享内存)与领导者进行通信。 每个进程都有2个队列:错误队列和元组队列。


您需要多少个工作流程?


最小限制由max_parallel_workers_per_gather参数设置。 然后,查询执行程序从受max_parallel_workers size参数限制的池中获取工作流。 最后一个限制是max_worker_processes ,即后台进程的总数。


如果无法分配工作流程,则处理将是单过程。


查询计划程序可以根据表或索引的大小来缩短工作流程。 min_parallel_table_scan_sizemin_parallel_table_scan_sizemin_parallel_index_scan_size参数。


 set min_parallel_table_scan_size='8MB' 8MB table => 1 worker 24MB table => 2 workers 72MB table => 3 workers x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker 

每次表比min_parallel_(index|table)_scan_size大3倍时,Postgres会添加工作流。 工作流程的数量不是基于成本的。 循环依赖性使复杂的实现复杂化。 而是,调度程序使用简单的规则。


实际上,这些规则并不总是适用于生产,因此您可以更改特定表的工作流数量:ALTER TABLE ... SET( parallel_workers = N )。


为什么不使用并行处理?


除了一长串的限制外,还进行成本检查:


parallel_setup_cost在不对短请求进行并行处理的情况下执行。 此参数估计准备内存,启动进程和初始数据交换的时间。


parallel_tuple_cost :领导者与工人之间的通信可能会延迟,与工作流程中元组的数量成正比。 此参数计算数据交换成本。


嵌套循环联接


 PostgreSQL 9.6+      —   . explain (costs off) select c_custkey, count(o_orderkey) from customer left outer join orders on c_custkey = o_custkey and o_comment not like '%special%deposits%' group by c_custkey; QUERY PLAN -------------------------------------------------------------------------------------- Finalize GroupAggregate Group Key: customer.c_custkey -> Gather Merge Workers Planned: 4 -> Partial GroupAggregate Group Key: customer.c_custkey -> Nested Loop Left Join -> Parallel Index Only Scan using customer_pkey on customer -> Index Scan using idx_orders_custkey on orders Index Cond: (customer.c_custkey = o_custkey) Filter: ((o_comment)::text !~~ '%special%deposits%'::text) 

收集发生在最后阶段,因此嵌套循环左连接是并行操作。 仅并行索引扫描仅在版本10中出现。其工作方式与并行串行扫描相似。 条件c_custkey = o_custkey为每个客户线读取一个订单。 因此它不是并行的。


哈希联接-哈希联接


每个工作流程都在PostgreSQL 11之前创建自己的哈希表。而且,如果这些流程中有四个以上,则性能将不会提高。 在新版本中,哈希表是共享的。 每个工作流程都可以使用WORK_MEM创建哈希表。


 select l_shipmode, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count from orders, lineitem where o_orderkey = l_orderkey and l_shipmode in ('MAIL', 'AIR') and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and l_receiptdate >= date '1996-01-01' and l_receiptdate < date '1996-01-01' + interval '1' year group by l_shipmode order by l_shipmode LIMIT 1; QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1) -> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1) Group Key: lineitem.l_shipmode -> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5) Group Key: lineitem.l_shipmode -> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5) Sort Key: lineitem.l_shipmode Sort Method: external merge Disk: 2304kB Worker 0: Sort Method: external merge Disk: 2064kB Worker 1: Sort Method: external merge Disk: 2384kB Worker 2: Sort Method: external merge Disk: 2264kB Worker 3: Sort Method: external merge Disk: 2336kB -> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5) Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5) Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone)) Rows Removed by Filter: 11934691 -> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5) Buckets: 65536 Batches: 256 Memory Usage: 3840kB -> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5) Planning Time: 0.977 ms Execution Time: 7923.770 ms 

来自TPC-H的请求12说明了并行哈希连接。 每个工作流程都涉及创建共享哈希表。


合并加入


本质上,合并联接不是并行的。 如果这是请求的最后阶段,请不要担心-它仍然可以并行执行。


 -- Query 2 from TPC-H explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 36 and p_type like '%BRASS' and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' and ps_supplycost = ( select min(ps_supplycost) from partsupp, supplier, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' ) order by s_acctbal desc, n_name, s_name, p_partkey LIMIT 100; QUERY PLAN ---------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey -> Merge Join Merge Cond: (part.p_partkey = partsupp.ps_partkey) Join Filter: (partsupp.ps_supplycost = (SubPlan 1)) -> Gather Merge Workers Planned: 4 -> Parallel Index Scan using <strong>part_pkey</strong> on part Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36)) -> Materialize -> Sort Sort Key: partsupp.ps_partkey -> Nested Loop -> Nested Loop Join Filter: (nation.n_regionkey = region.r_regionkey) -> Seq Scan on region Filter: (r_name = 'AMERICA'::bpchar) -> Hash Join Hash Cond: (supplier.s_nationkey = nation.n_nationkey) -> Seq Scan on supplier -> Hash -> Seq Scan on nation -> Index Scan using idx_partsupp_suppkey on partsupp Index Cond: (ps_suppkey = supplier.s_suppkey) SubPlan 1 -> Aggregate -> Nested Loop Join Filter: (nation_1.n_regionkey = region_1.r_regionkey) -> Seq Scan on region region_1 Filter: (r_name = 'AMERICA'::bpchar) -> Nested Loop -> Nested Loop -> Index Scan using idx_partsupp_partkey on partsupp partsupp_1 Index Cond: (part.p_partkey = ps_partkey) -> Index Scan using supplier_pkey on supplier supplier_1 Index Cond: (s_suppkey = partsupp_1.ps_suppkey) -> Index Scan using nation_pkey on nation nation_1 Index Cond: (n_nationkey = supplier_1.s_nationkey) 

合并联接节点位于聚集合并上方。 因此,合并不使用并行处理。 但是“并行索引扫描”节点仍然可以帮助part_pkey段。


节连接


在PostgreSQL 11中,默认情况下禁用分区:它的调度非常昂贵。 具有类似分区的表可以逐节连接。 因此Postgres将使用较小的哈希表。 每个部分的连接可以并行。


 tpch=# set enable_partitionwise_join=t; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN --------------------------------------------------- Append -> Hash Join Hash Cond: (t2.b = t1.a) -> Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p1 t1 Filter: (b = 0) -> Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p2 t1_1 Filter: (b = 0) tpch=# set parallel_setup_cost = 1; tpch=# set parallel_tuple_cost = 0.01; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN ----------------------------------------------------------- Gather Workers Planned: 4 -> Parallel Append -> Parallel Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Parallel Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p2 t1_1 Filter: (b = 0) -> Parallel Hash Join Hash Cond: (t2.b = t1.a) -> Parallel Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p1 t1 Filter: (b = 0) 

最主要的是,只有这些部分足够大时,这些部分中的连接才是平行的。


并行追加-并行追加


在不同的工作流程中,可以使用“ 并行追加”代替不同的块。 这通常发生在UNION ALL查询中。 缺点是并行性较差,因为每个工作流程仅处理1个请求。


尽管其中包含4个工作流,但这里正在运行2个工作流。


 tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day; QUERY PLAN ------------------------------------------------------------------------------------------------ Gather Workers Planned: 2 -> Parallel Append -> Aggregate -> Seq Scan on lineitem Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone) -> Aggregate -> Seq Scan on lineitem lineitem_1 Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) 

最重要的变量



总结


从9.6版开始,并行处理可以严重提高扫描许多行或索引的复杂查询的性能。 在PostgreSQL 10中,默认情况下启用并行处理。 请记住在具有大量OLTP工作负载的服务器上禁用它。 顺序扫描或索引扫描会消耗大量资源。 如果您不是在整个数据集中进行报告,则只需添加缺少的索引或使用正确的分区,便可以提高查询效率。


参考文献


Source: https://habr.com/ru/post/zh-CN446706/


All Articles