Spark SQL. Un poco sobre el optimizador de consultas

Hola a todos Como introducción, quiero contarles cómo llegué a tal vida.


Antes de reunirme con Big Data y Spark, en particular, tenía muchas y muchas veces para optimizar las consultas SQL, primero para MSSQL, luego para Oracle, y ahora me encontré con SparkSQL.


Y si ya hay muchos libros buenos para el DBMS que describen la metodología y los "bolígrafos" que puede girar para obtener el plan de consulta óptimo, entonces no he visto esos libros para Spark. Encontré más artículos y conjuntos de prácticas, más relacionados con el trabajo a través de la API RDD / Dataset, en lugar de SQL puro. Para mí, uno de los libros de referencia sobre optimización de SQL es el libro de J. Lewis, Oracle. Conceptos básicos de optimización de costos ". Busqué algo similar en profundidad de estudio. ¿Por qué fue objeto de investigación específicamente SparkSQL, y no la API subyacente? Luego, el interés fue causado por las características del proyecto en el que estoy trabajando.




Para uno de nuestros clientes, nuestra empresa está desarrollando un almacén de datos, cuya capa detallada y parte de las vitrinas están en el clúster de Hadoop, y las vitrinas finales están en Oracle. Este proyecto involucra una extensa capa de conversión de datos, que se implementa en Spark. Para acelerar el desarrollo y la conectividad de los desarrolladores de ETL que no están familiarizados con las complejidades de las tecnologías de Big Data, pero están familiarizados con las herramientas SQL y ETL, se ha desarrollado una herramienta que recuerda ideológicamente a otras herramientas ETL, por ejemplo, Informatica, y le permite diseñar visualmente procesos ETL con la generación posterior código para Spark. Debido a la complejidad de los algoritmos y la gran cantidad de transformaciones, los desarrolladores utilizan principalmente consultas SparkSQL.


Aquí es donde comienza la historia, ya que tuve que responder una gran cantidad de preguntas de la forma "¿Por qué la consulta no funciona / funciona lentamente / no funciona como en Oracle?". Esta resultó ser la parte más interesante para mí: "¿Por qué funciona lentamente?". Además, a diferencia del DBMS con el que trabajé antes, puede ingresar al código fuente y obtener la respuesta a sus preguntas.


Limitaciones y Suposiciones


Spark 2.3.0 se utiliza para ejecutar ejemplos y analizar el código fuente.
Se supone que el lector está familiarizado con la arquitectura Spark y los principios generales del optimizador de consultas para uno de los DBMS. Como mínimo, la frase "plan de consulta" ciertamente no debería ser sorprendente.


Además, este artículo intenta no convertirse en una traducción del código del optimizador de Spark al ruso, por lo que para cosas que son muy interesantes desde el punto de vista del optimizador, pero que pueden leerse en el código fuente, simplemente se mencionarán brevemente aquí con enlaces a las clases correspondientes.


Pasar a estudiar


Comencemos con una pequeña consulta para explorar las etapas básicas a través de las cuales se pasa del análisis a la ejecución.


scala> spark.read.orc("/user/test/balance").createOrReplaceTempView("bal") scala> spark.read.orc("/user/test/customer").createOrReplaceTempView("cust") scala> val df = spark.sql(""" | select bal.account_rk, cust.full_name | from bal | join cust | on bal.party_rk = cust.party_rk | and bal.actual_date = cust.actual_date | where bal.actual_date = cast('2017-12-31' as date) | """) df: org.apache.spark.sql.DataFrame = [account_rk: decimal(38,18), full_name: string] scala> df.explain(true) 

El módulo principal responsable de analizar SQL y optimizar el plan de ejecución de consultas es Spark Catalyst.


El resultado ampliado en la descripción del plan de solicitud (df.explain (true)) le permite realizar un seguimiento de todas las etapas por las que pasa la solicitud:


  • Plan lógico analizado: obtenga después de analizar SQL. En esta etapa, solo se verifica la corrección sintáctica de la solicitud.

 == Parsed Logical Plan == 'Project ['bal.account_rk, 'cust.full_name] +- 'Filter ('bal.actual_date = cast(2017-12-31 as date)) +- 'Join Inner, (('bal.party_rk = 'cust.party_rk) && ('bal.actual_date = 'cust.actual_date)) :- 'UnresolvedRelation `bal` +- 'UnresolvedRelation `cust` 

  • Plan lógico analizado: en esta etapa, se agrega información sobre la estructura de las entidades utilizadas, se verifica la correspondencia de la estructura y los atributos solicitados.

 == Analyzed Logical Plan == account_rk: decimal(38,18), full_name: string Project [account_rk#1, full_name#59] +- Filter (actual_date#27 = cast(2017-12-31 as date)) +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- SubqueryAlias bal : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- SubqueryAlias cust +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57... 9 more fields] orc 

  • El plan lógico optimizado es el más interesante para nosotros. En esta etapa, el árbol de consulta resultante se convierte en función de las reglas de optimización disponibles.

 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[ACTUAL_END_DATE#0,ACCOUNT_RK#1,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter ((isnotnull(actual_date#88) && isnotnull(party_rk#57)) && (actual_date#88 = 17531)) +- Relation[ACTUAL_END_DATE#56,PARTY_RK#57,... 9 more fields] orc 

  • Plan físico: las características de acceso a los datos de origen comienzan a tenerse en cuenta, incluidas las optimizaciones para filtrar particiones y datos para minimizar el conjunto de datos resultante. Se selecciona la estrategia de ejecución de combinación (para obtener más detalles sobre las opciones disponibles, consulte a continuación).

 == Physical Plan == *(2) Project [account_rk#1, full_name#59] +- *(2) BroadcastHashJoin [party_rk#18, actual_date#27], [party_rk#57, actual_date#88], Inner, BuildRight :- *(2) Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- *(2) Filter isnotnull(party_rk#18) : +- *(2) FileScan orc [ACCOUNT_RK#1,PARTY_RK#18,ACTUAL_DATE#27] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/balance], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#27), (ACTUAL_DATE#27 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<ACCOUNT_RK:decimal(38,18),PARTY_RK:decimal(38,18)> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(38,18), true], input[2, date, true])) +- *(1) Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- *(1) Filter isnotnull(party_rk#57) +- *(1) FileScan orc [PARTY_RK#57,FULL_NAME#59,ACTUAL_DATE#88] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://cluster:8020/user/test/customer], PartitionCount: 1, PartitionFilters: [isnotnull(ACTUAL_DATE#88), (ACTUAL_DATE#88 = 17531)], PushedFilters: [IsNotNull(PARTY_RK)], ReadSchema: struct<PARTY_RK:decimal(38,18),FULL_NAME:string> 

Las siguientes etapas de optimización y ejecución (por ejemplo, WholeStageCodegen) están más allá del alcance de este artículo, pero se describen con gran detalle (así como las etapas descritas anteriormente) en Mastering Spark Sql .


La lectura del plan de ejecución de consultas generalmente ocurre "desde adentro" y "de abajo hacia arriba", es decir, las partes más anidadas se ejecutan primero y avanzan gradualmente hasta la proyección final ubicada en la parte superior.


Tipos de optimizadores de consultas


Se pueden distinguir dos tipos de optimizadores de consultas:


  • Optimizadores basados ​​en reglas (RBO).
  • Optimizadores basados ​​en una estimación del costo de ejecución de la consulta (Optimizador basado en costos, CBO).

Los primeros se centran en el uso de un conjunto de reglas fijas, por ejemplo, la aplicación de condiciones de filtrado desde donde en etapas anteriores, si es posible, el cálculo de constantes, etc.


Para evaluar la calidad del plan resultante, el optimizador CBO utiliza una función de costo, que generalmente depende de la cantidad de datos procesados, el número de filas que caen bajo los filtros y el costo de realizar ciertas operaciones.


Para obtener más información sobre la especificación de diseño CBO para Apache Spark, siga los enlaces: la especificación y la tarea principal de JIRA para la implementación .


El punto de partida para explorar la gama completa de optimizaciones existentes es el código Optimizer.scala.


Aquí hay un breve extracto de una larga lista de optimizaciones disponibles:


 def batches: Seq[Batch] = { val operatorOptimizationRuleSet = Seq( // Operator push down PushProjectionThroughUnion, ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine CollapseRepartition, CollapseProject, CollapseWindow, CombineFilters, CombineLimits, CombineUnions, // Constant folding and strength reduction NullPropagation, ConstantPropagation, ........ 

Cabe señalar que la lista de estas optimizaciones incluye tanto optimizaciones basadas en reglas como optimizaciones basadas en estimaciones de costos de consulta, que se discutirán a continuación.


Una característica de CBO es que para un funcionamiento correcto necesita conocer y almacenar información sobre las estadísticas de los datos utilizados en la consulta: la cantidad de registros, el tamaño de registro, los histogramas de distribución de datos en las columnas de la tabla.


Para recopilar estadísticas, se usa un conjunto de comandos SQL ANALYZE TABLE ... COMPUTE STATISTICS, además, se necesita un conjunto de tablas para almacenar información, la API se proporciona a través de ExternalCatalog, más precisamente a través de HiveExternalCatalog.


Dado que CBO está actualmente deshabilitado de manera predeterminada, el énfasis principal se pondrá en investigar la optimización y los matices disponibles de RBO.


Tipos y elección de estrategia de unión


En la etapa de formación del plan físico para ejecutar la solicitud, se selecciona la estrategia de unión. Las siguientes opciones están actualmente disponibles en Spark (puede comenzar a aprender el código del código en SparkStrategies.scala).


Difundir hash join


La mejor opción es si una de las partes de la unión es lo suficientemente pequeña (el criterio de suficiencia se establece mediante el parámetro spark.sql.autoBroadcastJoinThreshold en SQLConf). En este caso, este lado se copia completamente a todos los ejecutores, donde hay una unión hash con la tabla principal. Además del tamaño, debe tenerse en cuenta que en el caso de la unión externa, solo se puede copiar el lado externo, por lo tanto, si es posible, como la tabla principal en el caso de la unión externa, debe usar la tabla con la mayor cantidad de datos.


   ,    ,     SQL      Oracle,   /*+ broadcast(t1, t2) */ 

Ordenar fusionar unirse


Con spark.sql.join.preferSortMergeJoin activado de forma predeterminada, este método se aplica de forma predeterminada si se pueden ordenar las claves para la unión.
De las características, se puede observar que, a diferencia del método anterior, la optimización de generación de código para realizar la operación solo está disponible para la unión interna.


Reproducción aleatoria de hash


Si las claves no se pueden ordenar, o la opción de selección de combinación de combinación de clasificación predeterminada está deshabilitada, Catalyst intenta aplicar una combinación aleatoria aleatoria. Además de verificar la configuración, también se verifica que Spark tenga suficiente memoria para construir un mapa de hash local para una partición (el número total de particiones se establece configurando spark.sql.shuffle.partitions )


BroadcastNestedLoopJoin y CartesianProduct


En el caso donde no hay posibilidad de comparación directa por clave (por ejemplo, una condición como) o no hay claves para unir tablas, dependiendo del tamaño de las tablas, se selecciona este tipo o CartesianProduct.


El orden de especificar tablas en join'ah


En cualquier caso, unirse requiere barajar tablas por clave. Por lo tanto, en este momento, el orden de especificar tablas, especialmente en el caso de realizar varias uniones en una fila, es importante (si usted es un aburrido, entonces si CBO no está activado y la configuración JOIN_REORDER_ENABLED no está activada).


Si es posible, el orden de unir tablas debe minimizar el número de operaciones de barajado para tablas grandes, para lo cual las uniones en la misma clave deben ir secuencialmente. Además, no olvide minimizar los datos para unirse, para habilitar Broadcast Hash Join.


Aplicación transitiva de las condiciones del filtro.


Considere la siguiente consulta:


 select bal.account_rk, cust.full_name from balance bal join customer cust on bal.party_rk = cust.party_rk and bal.actual_date = cust.actual_date where bal.actual_date = cast('2017-12-31' as date) 

Aquí conectamos dos tablas que se particionan de la misma manera, de acuerdo con el campo actual_date y aplicamos un filtro explícito solo a la partición de acuerdo con la tabla de equilibrio.


Como se puede ver en el plan de consulta optimizado, el filtro por fecha también se aplica al cliente, y al momento de leer los datos del disco se determina que se necesita exactamente una partición.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join Inner, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter ((isnotnull(actual_date#27) && (actual_date#27 = 17531)) && isnotnull(party_rk#18)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Filter (((actual_date#88 = 17531) && isnotnull(actual_date#88)) && isnotnull(party_rk#57)) +- Relation[,... 9 more fields] orc 

Pero solo necesita reemplazar la unión interna con la izquierda externa en la consulta, ya que el predicado de inserción para la tabla del cliente se cae inmediatamente y se produce un escaneo completo, lo cual es un efecto indeseable.


 == Optimized Logical Plan == Project [account_rk#1, full_name#59] +- Join LeftOuter, ((party_rk#18 = party_rk#57) && (actual_date#27 = actual_date#88)) :- Project [ACCOUNT_RK#1, PARTY_RK#18, ACTUAL_DATE#27] : +- Filter (isnotnull(actual_date#27) && (actual_date#27 = 17531)) : +- Relation[,... 4 more fields] orc +- Project [PARTY_RK#57, FULL_NAME#59, ACTUAL_DATE#88] +- Relation[,... 9 more fields] orc 

Conversión de tipo


Considere un ejemplo simple de selección de una tabla con filtrado por tipo de cliente, en el esquema, el tipo del campo party_type es string.


 select party_rk, full_name from cust where actual_date = cast('2017-12-31' as date) and party_type = 101 --   -- and party_type = '101' --     

Y compare los dos planes resultantes, el primero, cuando nos referimos al tipo incorrecto (habrá una conversión implícita a int), el segundo, cuando el tipo corresponde al esquema.


 PushedFilters: [IsNotNull(PARTY_TYPE)] //            . PushedFilters: [IsNotNull(PARTY_TYPE), EqualTo(PARTY_TYPE,101)] //             . 

Se observa un problema similar para el caso de comparar fechas con una cadena, habrá un filtro para comparar cadenas. Un ejemplo:


 where OPER_DATE = '2017-12-31' Filter (isnotnull(oper_date#0) && (cast(oper_date#0 as string) = 2017-12-31) PushedFilters: [IsNotNull(OPER_DATE)] where OPER_DATE = cast('2017-12-31' as date) PushedFilters: [IsNotNull(OPER_DATE), EqualTo(OPER_DATE,2017-12-31)] 

Para el caso en que es posible una conversión de tipo implícita, por ejemplo, int -> decimal, el optimizador lo hace por sí solo.


Más investigación


Se puede obtener mucha información interesante sobre los "mandos" que se pueden utilizar para ajustar Catalyst, así como sobre las posibilidades (presentes y futuras) del optimizador de SQLConf.scala.


En particular, como puede ver por defecto, el optimizador de costos todavía está apagado en este momento.


 val CBO_ENABLED = buildConf("spark.sql.cbo.enabled") .doc("Enables CBO for estimation of plan statistics when set true.") .booleanConf .createWithDefault(false) 

Además de sus optimizaciones dependientes asociadas con la reordenación de join'ov.


 val JOIN_REORDER_ENABLED = buildConf("spark.sql.cbo.joinReorder.enabled") .doc("Enables join reorder in CBO.") .booleanConf .createWithDefault(false) 

o


 val STARSCHEMA_DETECTION = buildConf("spark.sql.cbo.starSchemaDetection") .doc("When true, it enables join reordering based on star schema detection. ") .booleanConf .createWithDefault(false) 

Resumen breve


Solo se ha tocado una pequeña parte de las optimizaciones existentes, los experimentos con la optimización de costos, que pueden dar mucho más espacio para la conversión de consultas, están por venir. Además, otra pregunta interesante es la comparación de un conjunto de optimizaciones al leer archivos de Parquet y Orc, a juzgar por la jira del proyecto, se trata de paridad, pero ¿es realmente así?


Además


  • El análisis y la optimización de solicitudes es interesante y emocionante, especialmente teniendo en cuenta la disponibilidad de códigos fuente.
  • La inclusión de CBO proporcionará margen para futuras optimizaciones e investigaciones.
  • Es necesario controlar la aplicabilidad de las reglas básicas que le permiten filtrar la mayor cantidad posible de datos "adicionales" en las primeras etapas posibles.
  • Unirse es un mal necesario, pero si es posible, vale la pena minimizarlos y realizar un seguimiento de qué implementación se utiliza bajo el capó.

Source: https://habr.com/ru/post/es417103/


All Articles