Consultas paralelas no PostgreSQL


Existem muitos núcleos nas CPUs modernas. Durante anos, os aplicativos enviaram consultas aos bancos de dados em paralelo. Se esta é uma consulta de relatório para várias linhas em uma tabela, ela é executada mais rapidamente quando usa várias CPUs, e no PostgreSQL é possível a partir da versão 9.6.


Foram necessários três anos para implementar a função de consulta paralela - tive que reescrever o código em diferentes estágios da execução da consulta. O PostgreSQL 9.6 introduziu uma infraestrutura para melhorar ainda mais o código. Nas versões subseqüentes, outros tipos de consultas são executados em paralelo.


Limitações


  • Não ative a execução paralela se todos os núcleos já estiverem ocupados, caso contrário, outras solicitações ficarão mais lentas.
  • Mais importante, o processamento paralelo com altos valores WORK_MEM consome muita memória - cada junção ou classificação de hash ocupa memória na quantidade de work_mem.
  • Solicitações OLTP de baixa latência não podem ser aceleradas por execução paralela. E se a consulta retornar uma linha, o processamento paralelo apenas diminuirá a velocidade.
  • Os desenvolvedores gostam de usar o benchmark TPC-H. Talvez você tenha consultas semelhantes para uma execução paralela perfeita.
  • Somente consultas SELECT sem bloqueios de predicado são executadas em paralelo.
  • Às vezes, a indexação correta é melhor do que as varreduras seqüenciais da tabela em paralelo.
  • Suspender consultas e cursores não são suportados.
  • Funções de janela e funções agregadas de conjuntos ordenados não são paralelas.
  • Você não ganha nada na carga de trabalho de E / S.
  • Não há algoritmos de classificação paralela. Mas as consultas classificadas podem ser executadas em paralelo em alguns aspectos.
  • Substitua CTE (WITH ...) por um SELECT aninhado para ativar o processamento paralelo.
  • Os invólucros de dados de terceiros ainda não oferecem suporte ao processamento paralelo (mas poderiam!)
  • JOIN EXTERNO CHEIO não é suportado.
  • max_rows desativa o processamento paralelo.
  • Se a solicitação tiver uma função que não esteja marcada como PARALLEL SAFE, ela será de thread único.
  • O nível de isolamento de transação SERIALIZABLE desabilita o processamento paralelo.

Ambiente de teste


Os desenvolvedores do PostgreSQL tentaram reduzir o tempo de resposta das consultas de benchmark TPC-H. Faça o download do benchmark e adapte-o ao PostgreSQL . Esse é um uso não oficial do benchmark TPC-H - não para comparar bancos de dados ou hardware.


  1. Faça o download do TPC-H_Tools_v2.17.3.zip (ou uma versão mais recente) do TPC externo .
  2. Renomeie o makefile.suite para Makefile e altere conforme descrito aqui: https://github.com/tvondra/pg_tpch . Compile o código com o comando make.
  3. Gerar dados: ./dbgen -s 10 cria um banco de dados de 23 GB. Isso é suficiente para ver a diferença no desempenho de consultas paralelas e não paralelas.
  4. Converta arquivos tbl para csv for e sed .
  5. Clone o repositório pg_tpch e copie os csv para pg_tpch/dss/data .
  6. Crie consultas com o comando qgen .
  7. Faça o upload dos dados no banco de dados com o comando ./tpch.sh .

Varredura sequencial paralela


Pode ser mais rápido, não por causa da leitura paralela, mas porque os dados estão espalhados por muitos núcleos da CPU. Nos sistemas operacionais modernos, os arquivos de dados do PostgreSQL são bem armazenados em cache. Com a leitura antecipada, você pode obter mais do armazenamento do que o daemon PG solicita. Portanto, o desempenho da consulta não é limitado pela E / S do disco. Consome ciclos da CPU para:


  • leia as linhas uma a uma das páginas da tabela;
  • Compare valores de sequência e cláusulas WHERE .

Vamos executar uma consulta de select simples:


 tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------- Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 1146337 Planning Time: 0.203 ms Execution Time: 19035.100 ms 

Uma varredura seqüencial gera muitas linhas sem agregação; portanto, a solicitação é executada por um único núcleo da CPU.


Se você adicionar SUM() , poderá ver que dois fluxos de trabalho ajudarão a acelerar a solicitação:


 explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1) -> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1) Workers Planned: 2 Workers Launched: 2 -> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3) -> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 382112 Planning Time: 0.241 ms Execution Time: 8555.131 ms 

Agregação paralela


O nó Parallel Seq Scan produz seqüências de caracteres para agregação parcial. O nó Agregado Parcial trunca essas linhas usando SUM() . No final, o contador SUM de cada fluxo de trabalho é coletado pelo nó Gather.


O resultado final é calculado pelo nó "Finalizar agregado". Se você possui suas próprias funções de agregação, marque-as como "paralelas seguras".


Número de fluxos de trabalho


O número de fluxos de trabalho pode ser aumentado sem reiniciar o servidor:


 alter system set max_parallel_workers_per_gather=4; select * from pg_reload_conf(); 

Agora vemos 4 trabalhadores na saída de explicação:


 tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1) -> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5) -> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 229267 Planning Time: 0.218 ms Execution Time: 5153.967 ms 

O que está acontecendo aqui? Havia 2 vezes mais fluxos de trabalho, e a solicitação era apenas 1,6599 vezes mais rápida. Os cálculos são interessantes. Tivemos 2 processos de trabalho e 1 líder. Após a mudança, tornou-se 4 + 1.


Nossa aceleração máxima do processamento paralelo: 5/3 = 1,66 (6) vezes.


Como isso funciona?


Os processos


A execução de uma solicitação sempre começa com um processo inicial. O líder faz tudo de forma não paralela e faz parte do processamento paralelo. Outros processos que executam as mesmas solicitações são chamados de fluxos de trabalho. O processamento paralelo usa uma infraestrutura de fluxos de trabalho dinâmicos em segundo plano (desde a versão 9.4). Como outras partes do PostgreSQL usam processos em vez de threads, uma consulta com 3 fluxos de trabalho pode ser 4 vezes mais rápida que o processamento tradicional.


Interação


Os fluxos de trabalho se comunicam com o líder por meio de uma fila de mensagens (com base na memória compartilhada). Cada processo possui 2 filas: para erros e para tuplas.


Quantos processos de trabalho você precisa?


O limite mínimo é definido pelo parâmetro max_parallel_workers_per_gather . Em seguida, o executor de consulta obtém fluxos de trabalho do pool limitados pelo parâmetro de max_parallel_workers size . A última limitação é max_worker_processes , ou seja, o número total de processos em segundo plano.


Se não foi possível alocar um fluxo de trabalho, o processamento será de processo único.


O planejador de consultas pode reduzir os fluxos de trabalho, dependendo do tamanho da tabela ou do índice. Existem parâmetros min_parallel_index_scan_size e min_parallel_index_scan_size para isso.


 set min_parallel_table_scan_size='8MB' 8MB table => 1 worker 24MB table => 2 workers 72MB table => 3 workers x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker 

Cada vez que uma tabela é 3 vezes maior que min_parallel_(index|table)_scan_size , o Postgres adiciona um fluxo de trabalho. O número de processos de trabalho não é baseado em custos. A dependência circular complica implementações complexas. Em vez disso, o planejador usa regras simples.


Na prática, essas regras nem sempre são adequadas para produção, portanto, você pode alterar o número de fluxos de trabalho para uma tabela específica: ALTER TABLE ... SET ( parallel_workers = N ).


Por que o processamento paralelo não é usado?


Além de uma longa lista de restrições, também existem verificações de custos:


parallel_setup_cost - para fazer sem processamento paralelo de solicitações curtas. Este parâmetro calcula o tempo para preparar a memória, iniciar o processo e trocar dados inicial.


parallel_tuple_cost : a comunicação entre um líder e os trabalhadores pode ser atrasada na proporção do número de tuplas dos processos de trabalho. Este parâmetro calcula os custos de troca de dados.


Junção de loop aninhado


 PostgreSQL 9.6+      —   . explain (costs off) select c_custkey, count(o_orderkey) from customer left outer join orders on c_custkey = o_custkey and o_comment not like '%special%deposits%' group by c_custkey; QUERY PLAN -------------------------------------------------------------------------------------- Finalize GroupAggregate Group Key: customer.c_custkey -> Gather Merge Workers Planned: 4 -> Partial GroupAggregate Group Key: customer.c_custkey -> Nested Loop Left Join -> Parallel Index Only Scan using customer_pkey on customer -> Index Scan using idx_orders_custkey on orders Index Cond: (customer.c_custkey = o_custkey) Filter: ((o_comment)::text !~~ '%special%deposits%'::text) 

A coleção ocorre no último estágio, portanto, a Junta esquerda do loop aninhado é uma operação paralela. A Varredura Paralela Somente para Índice apareceu apenas na versão 10. Funciona de maneira semelhante à varredura serial paralela. A condição c_custkey = o_custkey lê um pedido para cada linha do cliente. Portanto, não é paralelo.


Hash Join - Hash Join


Cada fluxo de trabalho cria sua própria tabela de hash antes do PostgreSQL 11. E se houver mais de quatro desses processos, o desempenho não melhorará. Na nova versão, a tabela de hash é compartilhada. Cada fluxo de trabalho pode usar WORK_MEM para criar uma tabela de hash.


 select l_shipmode, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count from orders, lineitem where o_orderkey = l_orderkey and l_shipmode in ('MAIL', 'AIR') and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and l_receiptdate >= date '1996-01-01' and l_receiptdate < date '1996-01-01' + interval '1' year group by l_shipmode order by l_shipmode LIMIT 1; QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1) -> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1) Group Key: lineitem.l_shipmode -> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5) Group Key: lineitem.l_shipmode -> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5) Sort Key: lineitem.l_shipmode Sort Method: external merge Disk: 2304kB Worker 0: Sort Method: external merge Disk: 2064kB Worker 1: Sort Method: external merge Disk: 2384kB Worker 2: Sort Method: external merge Disk: 2264kB Worker 3: Sort Method: external merge Disk: 2336kB -> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5) Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5) Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone)) Rows Removed by Filter: 11934691 -> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5) Buckets: 65536 Batches: 256 Memory Usage: 3840kB -> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5) Planning Time: 0.977 ms Execution Time: 7923.770 ms 

A solicitação 12 do TPC-H ilustra uma conexão hash paralela. Cada fluxo de trabalho está envolvido na criação de uma tabela de hash compartilhada.


Mesclar junção


Uma junção de mesclagem não é paralela por natureza. Não se preocupe se esse for o último estágio da solicitação - ele ainda pode ser executado em paralelo.


 -- Query 2 from TPC-H explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 36 and p_type like '%BRASS' and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' and ps_supplycost = ( select min(ps_supplycost) from partsupp, supplier, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' ) order by s_acctbal desc, n_name, s_name, p_partkey LIMIT 100; QUERY PLAN ---------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey -> Merge Join Merge Cond: (part.p_partkey = partsupp.ps_partkey) Join Filter: (partsupp.ps_supplycost = (SubPlan 1)) -> Gather Merge Workers Planned: 4 -> Parallel Index Scan using <strong>part_pkey</strong> on part Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36)) -> Materialize -> Sort Sort Key: partsupp.ps_partkey -> Nested Loop -> Nested Loop Join Filter: (nation.n_regionkey = region.r_regionkey) -> Seq Scan on region Filter: (r_name = 'AMERICA'::bpchar) -> Hash Join Hash Cond: (supplier.s_nationkey = nation.n_nationkey) -> Seq Scan on supplier -> Hash -> Seq Scan on nation -> Index Scan using idx_partsupp_suppkey on partsupp Index Cond: (ps_suppkey = supplier.s_suppkey) SubPlan 1 -> Aggregate -> Nested Loop Join Filter: (nation_1.n_regionkey = region_1.r_regionkey) -> Seq Scan on region region_1 Filter: (r_name = 'AMERICA'::bpchar) -> Nested Loop -> Nested Loop -> Index Scan using idx_partsupp_partkey on partsupp partsupp_1 Index Cond: (part.p_partkey = ps_partkey) -> Index Scan using supplier_pkey on supplier supplier_1 Index Cond: (s_suppkey = partsupp_1.ps_suppkey) -> Index Scan using nation_pkey on nation nation_1 Index Cond: (n_nationkey = supplier_1.s_nationkey) 

O nó Merge Join está localizado acima da Gather Merge. Portanto, a mesclagem não usa processamento paralelo. Mas o nó Parallel Index Scan ainda ajuda no segmento part_pkey .


Seção Conexão


No PostgreSQL 11, o particionamento é desativado por padrão: possui um agendamento muito caro. Tabelas com particionamento semelhante podem ser unidas seção por seção. Portanto, o Postgres usará tabelas de hash menores. Cada conexão de seção pode ser paralela.


 tpch=# set enable_partitionwise_join=t; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN --------------------------------------------------- Append -> Hash Join Hash Cond: (t2.b = t1.a) -> Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p1 t1 Filter: (b = 0) -> Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p2 t1_1 Filter: (b = 0) tpch=# set parallel_setup_cost = 1; tpch=# set parallel_tuple_cost = 0.01; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN ----------------------------------------------------------- Gather Workers Planned: 4 -> Parallel Append -> Parallel Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Parallel Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p2 t1_1 Filter: (b = 0) -> Parallel Hash Join Hash Cond: (t2.b = t1.a) -> Parallel Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p1 t1 Filter: (b = 0) 

O principal é que a conexão nas seções seja paralela apenas se essas seções forem grandes o suficiente.


Anexo Paralelo - Anexo Paralelo


Anexar paralelo pode ser usado em vez de diferentes blocos em diferentes fluxos de trabalho. Isso geralmente acontece com as consultas UNION ALL. A desvantagem é menos paralelismo, porque cada fluxo de trabalho processa apenas 1 solicitação.


Dois fluxos de trabalho estão em execução aqui, embora quatro estejam incluídos.


 tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day; QUERY PLAN ------------------------------------------------------------------------------------------------ Gather Workers Planned: 2 -> Parallel Append -> Aggregate -> Seq Scan on lineitem Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone) -> Aggregate -> Seq Scan on lineitem lineitem_1 Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) 

As variáveis ​​mais importantes


  • WORK_MEM limita a quantidade de memória para cada processo, não apenas para solicitações: processos de conexão work_mem = muita memória.
  • max_parallel_workers_per_gather - quantos processos de trabalho o programa em execução usará para processamento paralelo do plano.
  • max_worker_processes - ajusta o número total de processos de trabalho ao número de núcleos da CPU no servidor.
  • max_parallel_workers é o mesmo, mas para fluxos de trabalho paralelos.

Sumário


A partir da versão 9.6, o processamento paralelo pode melhorar seriamente o desempenho de consultas complexas que varrem muitas linhas ou índices. No PostgreSQL 10, o processamento paralelo é ativado por padrão. Lembre-se de desativá-lo em servidores com uma grande carga de trabalho OLTP. As verificações sequenciais ou de índice consomem muitos recursos. Se você não estiver relatando todo o conjunto de dados, as consultas poderão ser mais eficientes simplesmente adicionando os índices ausentes ou usando o particionamento correto.


Referências


Source: https://habr.com/ru/post/pt446706/


All Articles