
Hay muchos núcleos en las CPU modernas. Durante años, las aplicaciones han enviado consultas a bases de datos en paralelo. Si se trata de una consulta de informes para varias filas en una tabla, se ejecuta más rápido cuando usa múltiples CPU, y en PostgreSQL es posible a partir de la versión 9.6.
Me llevó 3 años implementar la función de consulta paralela. Tuve que reescribir el código en diferentes etapas de la ejecución de la consulta. PostgreSQL 9.6 introdujo una infraestructura para mejorar aún más el código. En versiones posteriores, otros tipos de consultas se ejecutan en paralelo.
Limitaciones
- No habilite la ejecución paralela si todos los núcleos ya están ocupados, de lo contrario, otras solicitudes se ralentizarán.
- Lo más importante es que el procesamiento en paralelo con valores altos de WORK_MEM consume mucha memoria: cada hash join u sort ocupa memoria en la cantidad de work_mem.
- Las solicitudes OLTP de baja latencia no pueden acelerarse mediante ejecución paralela. Y si la consulta devuelve una fila, el procesamiento paralelo solo la ralentizará.
- A los desarrolladores les gusta usar el punto de referencia TPC-H. Quizás tenga consultas similares para una ejecución paralela perfecta.
- Solo las consultas SELECT sin bloqueos de predicado se ejecutan en paralelo.
- A veces, la indexación correcta es mejor que los escaneos de tablas secuenciales en paralelo.
- Suspender consultas y cursores no son compatibles.
- Las funciones de ventana y las funciones agregadas de conjuntos ordenados no son paralelas.
- No gana nada en la carga de trabajo de E / S.
- No hay algoritmos de clasificación paralelos. Pero las consultas ordenadas se pueden ejecutar en paralelo en algunos aspectos.
- Reemplace CTE (WITH ...) con un SELECT anidado para habilitar el procesamiento en paralelo.
- Los contenedores de datos de terceros aún no admiten el procesamiento paralelo (¡pero podrían hacerlo!)
- FULL OUTER JOIN no es compatible.
- max_rows deshabilita el procesamiento paralelo.
- Si la solicitud tiene una función que no está marcada como PARALELO SEGURO, será de un solo subproceso.
- El nivel de aislamiento de transacción SERIALIZABLE deshabilita el procesamiento paralelo.
Entorno de prueba
Los desarrolladores de PostgreSQL han intentado reducir el tiempo de respuesta de las consultas de referencia TPC-H. Descargue el punto de referencia y adáptelo a PostgreSQL . Este es un uso no oficial del punto de referencia TPC-H, no para comparar bases de datos o hardware.
- Descargue TPC-H_Tools_v2.17.3.zip (o una versión más reciente) del TPC externo .
- Cambie el nombre de makefile.suite a Makefile y cámbielo como se describe aquí: https://github.com/tvondra/pg_tpch . Compile el código con el comando make.
- Generar datos:
./dbgen -s 10
crea una base de datos de 23 GB. Esto es suficiente para ver la diferencia en el rendimiento de las consultas paralelas y no paralelas. - Convierte archivos
tbl
a csv for
y sed
. - Clone el repositorio pg_tpch y copie los
csv
a pg_tpch/dss/data
. - Crea consultas con el comando
qgen
. - Suba datos a la base de datos con el comando
./tpch.sh
.
Exploración secuencial paralela
Puede ser más rápido no debido a la lectura paralela, sino a que los datos están dispersos en muchos núcleos de CPU. En los sistemas operativos modernos, los archivos de datos PostgreSQL están bien almacenados en caché. Con la lectura anticipada, puede obtener más del almacenamiento que las solicitudes del demonio PG. Por lo tanto, el rendimiento de la consulta no está limitado por la E / S de disco. Consume ciclos de CPU para:
- lea las líneas una por una de las páginas de la tabla;
- Compare los valores de cadena y las cláusulas
WHERE
.
Ejecutemos una consulta de select
simple:
tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN -------------------------------------------------------------------------------------------------------------------------- Seq Scan on lineitem (cost=0.00..1964772.00 rows=58856235 width=5) (actual time=0.014..16951.669 rows=58839715 loops=1) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 1146337 Planning Time: 0.203 ms Execution Time: 19035.100 ms
Una exploración secuencial produce demasiadas filas sin agregación, por lo que la solicitud la ejecuta un solo núcleo de CPU.
Si agrega SUM()
, verá que dos flujos de trabajo ayudarán a acelerar la solicitud:
explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1589702.14..1589702.15 rows=1 width=32) (actual time=8553.365..8553.365 rows=1 loops=1) -> Gather (cost=1589701.91..1589702.12 rows=2 width=32) (actual time=8553.241..8555.067 rows=3 loops=1) Workers Planned: 2 Workers Launched: 2 -> Partial Aggregate (cost=1588701.91..1588701.92 rows=1 width=32) (actual time=8547.546..8547.546 rows=1 loops=3) -> Parallel Seq Scan on lineitem (cost=0.00..1527393.33 rows=24523431 width=5) (actual time=0.038..5998.417 rows=19613238 loops=3) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 382112 Planning Time: 0.241 ms Execution Time: 8555.131 ms
Agregación Paralela
El nodo Parallel Seq Scan produce cadenas para la agregación parcial. El nodo Agregado parcial trunca estas líneas usando SUM()
. Al final, el contador Gather recopila el contador SUMA de cada flujo de trabajo.
El resultado final se calcula mediante el nodo "Finalizar agregado". Si tiene sus propias funciones de agregación, asegúrese de marcarlas como "seguridad paralela".
Cantidad de flujos de trabajo
La cantidad de flujos de trabajo se puede aumentar sin reiniciar el servidor:
alter system set max_parallel_workers_per_gather=4; select * from pg_reload_conf();
Ahora vemos 4 trabajadores en la salida de explicación:
tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------- Finalize Aggregate (cost=1440213.58..1440213.59 rows=1 width=32) (actual time=5152.072..5152.072 rows=1 loops=1) -> Gather (cost=1440213.15..1440213.56 rows=4 width=32) (actual time=5151.807..5153.900 rows=5 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial Aggregate (cost=1439213.15..1439213.16 rows=1 width=32) (actual time=5147.238..5147.239 rows=1 loops=5) -> Parallel Seq Scan on lineitem (cost=0.00..1402428.00 rows=14714059 width=5) (actual time=0.037..3601.882 rows=11767943 loops=5) Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone) Rows Removed by Filter: 229267 Planning Time: 0.218 ms Execution Time: 5153.967 ms
¿Qué está pasando aquí? Hubo 2 veces más flujos de trabajo, y la solicitud fue solo 1.6599 veces más rápida. Los cálculos son interesantes. Tuvimos 2 procesos de trabajo y 1 líder. Después del cambio, se convirtió en 4 + 1.
Nuestra aceleración máxima del procesamiento en paralelo: 5/3 = 1.66 (6) veces.
Como funciona
Los procesos
La ejecución de una solicitud siempre comienza con un proceso líder. El líder hace todo lo que no es paralelo y parte del procesamiento paralelo. Otros procesos que realizan las mismas solicitudes se denominan flujos de trabajo. El procesamiento en paralelo utiliza una infraestructura de flujos de trabajo dinámicos en segundo plano (desde la versión 9.4). Dado que otras partes de PostgreSQL usan procesos en lugar de hilos, una consulta con 3 flujos de trabajo podría ser 4 veces más rápida que el procesamiento tradicional.
Interacción
Los flujos de trabajo se comunican con el líder a través de una cola de mensajes (basada en la memoria compartida). Cada proceso tiene 2 colas: para errores y para tuplas.
¿Cuántos procesos de trabajo necesitas?
El límite mínimo lo establece el parámetro max_parallel_workers_per_gather
. Luego, el ejecutor de consultas toma flujos de trabajo del grupo limitado por el parámetro de max_parallel_workers size
. La última limitación es max_worker_processes
, es decir, el número total de procesos en segundo plano.
Si no fue posible asignar un flujo de trabajo, el procesamiento será de un solo proceso.
El planificador de consultas puede reducir los flujos de trabajo según el tamaño de la tabla o índice. Hay parámetros min_parallel_table_scan_size
y min_parallel_index_scan_size
para esto.
set min_parallel_table_scan_size='8MB' 8MB table => 1 worker 24MB table => 2 workers 72MB table => 3 workers x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
Cada vez que una tabla es 3 veces más grande que min_parallel_(index|table)_scan_size
, Postgres agrega un flujo de trabajo. El número de procesos de trabajo no se basa en el costo. La dependencia circular complica las implementaciones complejas. En cambio, el planificador usa reglas simples.
En la práctica, estas reglas no siempre son adecuadas para la producción, por lo que puede cambiar el número de flujos de trabajo para una tabla en particular: ALTER TABLE ... SET ( parallel_workers = N
).
¿Por qué no se usa el procesamiento paralelo?
Además de una larga lista de restricciones, también hay controles de costos:
parallel_setup_cost
: prescindir del procesamiento paralelo de solicitudes cortas. Este parámetro estima el tiempo para preparar la memoria, iniciar el proceso y el intercambio de datos inicial.
parallel_tuple_cost
: la comunicación entre un líder y los trabajadores puede retrasarse en proporción al número de tuplas de los procesos de trabajo. Este parámetro calcula los costos de intercambio de datos.
Unión de bucle anidado
PostgreSQL 9.6+ — . explain (costs off) select c_custkey, count(o_orderkey) from customer left outer join orders on c_custkey = o_custkey and o_comment not like '%special%deposits%' group by c_custkey; QUERY PLAN -------------------------------------------------------------------------------------- Finalize GroupAggregate Group Key: customer.c_custkey -> Gather Merge Workers Planned: 4 -> Partial GroupAggregate Group Key: customer.c_custkey -> Nested Loop Left Join -> Parallel Index Only Scan using customer_pkey on customer -> Index Scan using idx_orders_custkey on orders Index Cond: (customer.c_custkey = o_custkey) Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
La recolección se lleva a cabo en la última etapa, por lo que la unión izquierda del bucle anidado es una operación paralela. La exploración de solo índice paralelo apareció solo en la versión 10. Funciona de manera similar a la exploración en serie paralela. La condición c_custkey = o_custkey
lee un pedido para cada línea de cliente. Entonces no es paralelo.
Hash Join - Hash Join
Cada flujo de trabajo crea su propia tabla hash antes de PostgreSQL 11. Y si hay más de cuatro de estos procesos, el rendimiento no mejorará. En la nueva versión, la tabla hash se comparte. Cada flujo de trabajo puede usar WORK_MEM para crear una tabla hash.
select l_shipmode, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1 else 0 end) as high_line_count, sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1 else 0 end) as low_line_count from orders, lineitem where o_orderkey = l_orderkey and l_shipmode in ('MAIL', 'AIR') and l_commitdate < l_receiptdate and l_shipdate < l_commitdate and l_receiptdate >= date '1996-01-01' and l_receiptdate < date '1996-01-01' + interval '1' year group by l_shipmode order by l_shipmode LIMIT 1; QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Limit (cost=1964755.66..1964961.44 rows=1 width=27) (actual time=7579.592..7922.997 rows=1 loops=1) -> Finalize GroupAggregate (cost=1964755.66..1966196.11 rows=7 width=27) (actual time=7579.590..7579.591 rows=1 loops=1) Group Key: lineitem.l_shipmode -> Gather Merge (cost=1964755.66..1966195.83 rows=28 width=27) (actual time=7559.593..7922.319 rows=6 loops=1) Workers Planned: 4 Workers Launched: 4 -> Partial GroupAggregate (cost=1963755.61..1965192.44 rows=7 width=27) (actual time=7548.103..7564.592 rows=2 loops=5) Group Key: lineitem.l_shipmode -> Sort (cost=1963755.61..1963935.20 rows=71838 width=27) (actual time=7530.280..7539.688 rows=62519 loops=5) Sort Key: lineitem.l_shipmode Sort Method: external merge Disk: 2304kB Worker 0: Sort Method: external merge Disk: 2064kB Worker 1: Sort Method: external merge Disk: 2384kB Worker 2: Sort Method: external merge Disk: 2264kB Worker 3: Sort Method: external merge Disk: 2336kB -> Parallel Hash Join (cost=382571.01..1957960.99 rows=71838 width=27) (actual time=7036.917..7499.692 rows=62519 loops=5) Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem (cost=0.00..1552386.40 rows=71838 width=19) (actual time=0.583..4901.063 rows=62519 loops=5) Filter: ((l_shipmode = ANY ('{MAIL,AIR}'::bpchar[])) AND (l_commitdate < l_receiptdate) AND (l_shipdate < l_commitdate) AND (l_receiptdate >= '1996-01-01'::date) AND (l_receiptdate < '1997-01-01 00:00:00'::timestamp without time zone)) Rows Removed by Filter: 11934691 -> Parallel Hash (cost=313722.45..313722.45 rows=3750045 width=20) (actual time=2011.518..2011.518 rows=3000000 loops=5) Buckets: 65536 Batches: 256 Memory Usage: 3840kB -> Parallel Seq Scan on orders (cost=0.00..313722.45 rows=3750045 width=20) (actual time=0.029..995.948 rows=3000000 loops=5) Planning Time: 0.977 ms Execution Time: 7923.770 ms
La solicitud 12 de TPC-H ilustra una conexión de hash paralela. Cada flujo de trabajo participa en la creación de una tabla hash compartida.
Fusionar unirse
Una combinación de fusión no es paralela en la naturaleza. No se preocupe si esta es la última etapa de la solicitud, aún puede ejecutarse en paralelo.
-- Query 2 from TPC-H explain (costs off) select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and p_size = 36 and p_type like '%BRASS' and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' and ps_supplycost = ( select min(ps_supplycost) from partsupp, supplier, nation, region where p_partkey = ps_partkey and s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and r_name = 'AMERICA' ) order by s_acctbal desc, n_name, s_name, p_partkey LIMIT 100; QUERY PLAN ---------------------------------------------------------------------------------------------------------- Limit -> Sort Sort Key: supplier.s_acctbal DESC, nation.n_name, supplier.s_name, part.p_partkey -> Merge Join Merge Cond: (part.p_partkey = partsupp.ps_partkey) Join Filter: (partsupp.ps_supplycost = (SubPlan 1)) -> Gather Merge Workers Planned: 4 -> Parallel Index Scan using <strong>part_pkey</strong> on part Filter: (((p_type)::text ~~ '%BRASS'::text) AND (p_size = 36)) -> Materialize -> Sort Sort Key: partsupp.ps_partkey -> Nested Loop -> Nested Loop Join Filter: (nation.n_regionkey = region.r_regionkey) -> Seq Scan on region Filter: (r_name = 'AMERICA'::bpchar) -> Hash Join Hash Cond: (supplier.s_nationkey = nation.n_nationkey) -> Seq Scan on supplier -> Hash -> Seq Scan on nation -> Index Scan using idx_partsupp_suppkey on partsupp Index Cond: (ps_suppkey = supplier.s_suppkey) SubPlan 1 -> Aggregate -> Nested Loop Join Filter: (nation_1.n_regionkey = region_1.r_regionkey) -> Seq Scan on region region_1 Filter: (r_name = 'AMERICA'::bpchar) -> Nested Loop -> Nested Loop -> Index Scan using idx_partsupp_partkey on partsupp partsupp_1 Index Cond: (part.p_partkey = ps_partkey) -> Index Scan using supplier_pkey on supplier supplier_1 Index Cond: (s_suppkey = partsupp_1.ps_suppkey) -> Index Scan using nation_pkey on nation nation_1 Index Cond: (n_nationkey = supplier_1.s_nationkey)
El nodo Merge Join se encuentra sobre Gather Merge. Por lo tanto, la fusión no usa procesamiento paralelo. Pero el nodo Parallel Index Scan todavía ayuda con el segmento part_pkey
.
Conexión de sección
En PostgreSQL 11, la partición está deshabilitada de manera predeterminada: tiene una programación muy costosa. Las tablas con particiones similares se pueden unir sección por sección. Entonces Postgres usará tablas hash más pequeñas. Cada conexión de sección puede ser paralela.
tpch=# set enable_partitionwise_join=t; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN --------------------------------------------------- Append -> Hash Join Hash Cond: (t2.b = t1.a) -> Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p1 t1 Filter: (b = 0) -> Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Hash -> Seq Scan on prt1_p2 t1_1 Filter: (b = 0) tpch=# set parallel_setup_cost = 1; tpch=# set parallel_tuple_cost = 0.01; tpch=# explain (costs off) select * from prt1 t1, prt2 t2 where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000; QUERY PLAN ----------------------------------------------------------- Gather Workers Planned: 4 -> Parallel Append -> Parallel Hash Join Hash Cond: (t2_1.b = t1_1.a) -> Parallel Seq Scan on prt2_p2 t2_1 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p2 t1_1 Filter: (b = 0) -> Parallel Hash Join Hash Cond: (t2.b = t1.a) -> Parallel Seq Scan on prt2_p1 t2 Filter: ((b >= 0) AND (b <= 10000)) -> Parallel Hash -> Parallel Seq Scan on prt1_p1 t1 Filter: (b = 0)
Lo principal es que la conexión en secciones es paralela solo si estas secciones son lo suficientemente grandes.
Anexos paralelos - Anexos paralelos
Se puede utilizar un anexo paralelo en lugar de diferentes bloques en diferentes flujos de trabajo. Esto suele suceder con UNION TODAS las consultas. La desventaja es menos paralelismo, porque cada flujo de trabajo procesa solo 1 solicitud.
Aquí se ejecutan 2 flujos de trabajo, aunque se incluyen 4.
tpch=# explain (costs off) select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day union all select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '2000-12-01' - interval '105' day; QUERY PLAN ------------------------------------------------------------------------------------------------ Gather Workers Planned: 2 -> Parallel Append -> Aggregate -> Seq Scan on lineitem Filter: (l_shipdate <= '2000-08-18 00:00:00'::timestamp without time zone) -> Aggregate -> Seq Scan on lineitem lineitem_1 Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Las variables mas importantes
- WORK_MEM limita la cantidad de memoria para cada proceso, no solo para las solicitudes: procesos de conexión work_mem = mucha memoria.
max_parallel_workers_per_gather
: cuántos procesos de trabajo utilizará el programa en ejecución para el procesamiento paralelo del plan.max_worker_processes
: ajusta el número total de procesos de trabajo al número de núcleos de CPU en el servidor.max_parallel_workers
es lo mismo, pero para flujos de trabajo paralelos.
Resumen
A partir de la versión 9.6, el procesamiento paralelo puede mejorar seriamente el rendimiento de consultas complejas que analizan muchas filas o índices. En PostgreSQL 10, el procesamiento paralelo está habilitado de forma predeterminada. Recuerde deshabilitarlo en servidores con una gran carga de trabajo OLTP. Los escaneos secuenciales o los escaneos de índice consumen muchos recursos. Si no está informando en todo el conjunto de datos, las consultas pueden hacerse más eficientes simplemente agregando los índices que faltan o utilizando la partición correcta.
Referencias