Apache Spark, evaluación diferida y consultas SQL de varias páginas

El famoso: spark funciona con marcos de datos, que son algoritmos de transformación. El algoritmo se lanza en el último momento para "dar más espacio" a la optimización y debido a la optimización para ejecutarlo de la manera más eficiente posible.


Debajo del corte, analizaremos cómo descomponer una consulta SQL de varias páginas en átomos (sin pérdida de eficiencia) y cómo reducir significativamente el tiempo de ejecución de la tubería ETL debido a esto.


Evaluación perezosa


Una característica funcional interesante de la chispa es la evaluación perezosa: las transformaciones solo se ejecutan cuando se completan las acciones. Cómo funciona (aproximadamente): los algoritmos para construir los marcos de datos que preceden a la acción están "pegados", el optimizador crea el algoritmo final más eficiente desde su punto de vista, que comienza y da el resultado (el que fue solicitado por la acción).


Lo interesante aquí en el contexto de nuestra presentación: cualquier consulta compleja puede descomponerse en "átomos" sin pérdida de eficiencia. Analicemos un poco más.


SQL multipágina


Hay muchas razones por las que escribimos consultas SQL de "páginas múltiples", una de las principales, probablemente, renuencia a crear objetos intermedios (reticencia respaldada por requisitos de eficiencia). El siguiente es un ejemplo de una consulta relativamente compleja (por supuesto, incluso es muy simple, pero para fines de presentación adicional, tendremos suficiente).


qSel = """ select con.contract_id as con_contract_id, con.begin_date as con_begin_date, con.product_id as con_product_id, cst.contract_status_type_id as cst_status_type_id, sbj.subject_id as sbj_subject_id, sbj.subject_name as sbj_subject_name, pp.birth_date as pp_birth_date from kasko.contract con join kasko.contract_status cst on cst.contract_status_id = con.contract_status_id join kasko.subject sbj on sbj.subject_id = con.owner_subject_id left join kasko.physical_person pp on pp.subject_id = con.owner_subject_id """ dfSel = sp.sql(qSel) 

Que vemos


  • los datos se seleccionan de varias tablas
  • se utilizan diferentes tipos de unión
  • las columnas seleccionables se distribuyen por parte de selección, parte de unión (y donde parte, pero aquí no está aquí; lo eliminé por simplicidad)

Esta consulta podría descomponerse en simples (por ejemplo, primero combine las tablas contract y contract_status, guarde el resultado en una tabla temporal, luego combínelo con el asunto, también guarde el resultado en una tabla temporal, etc.). Seguramente, cuando creamos consultas realmente complejas, hacemos esto, justo entonces, después de la depuración, recopilamos todo esto en un bloque de varias páginas.


¿Qué hay de malo aquí? Nada, de hecho, todo el mundo trabaja así y está acostumbrado.


Pero hay inconvenientes, o mejor dicho, qué mejorar, siga leyendo.


La misma consulta en chispa


Cuando use la chispa para la transformación, por supuesto, puede simplemente tomar y ejecutar esta solicitud (y será bueno, de hecho, también la ejecutaremos), pero puede ir hacia otro lado, intentemoslo.


Descompongamos esta consulta "compleja" en "átomos" - marcos de datos elementales. Obtendremos tantos como el número de tablas involucradas en la consulta (en este caso, 4).


Aquí están - "átomos":


 dfCon = sp.sql("""select contract_id as con_contract_id, begin_date as con_begin_date, product_id as con_product_id, owner_subject_id as con_owner_subject_id, contract_status_id as con_contract_status_id from kasko.contract""") dfCStat = sp.sql("""select contract_status_id as cst_status_id, contract_status_type_id as cst_status_type_id from kasko.contract_status""") dfSubj = sp.sql("""select subject_id as sbj_subject_id, subject_type_id as sbj_subject_type_id, subject_name as sbj_subject_name from kasko.subject""") dfPPers = sp.sql("""select subject_id as pp_subject_id, birth_date as pp_birth_date from kasko.physical_person""") 

Spark te permite unirlos usando expresiones separadas de los "átomos" reales, hagamos esto:


 con_stat = f.col("cst_status_id")==f.col("con_contract_status_id") con_subj_own = f.col("con_owner_subject_id")==f.col("sbj_subject_id") con_ppers_own = f.col("con_owner_subject_id")==f.col("pp_subject_id") 

Entonces nuestra "consulta compleja" se verá así:


 dfAtom = dfCon.join(dfCStat,con_stat, "inner")\ .join(dfSubj,con_subj_own,"inner") \ .join(dfPPers,con_ppers_own, "left") \ .drop("con_contract_status_id","sbj_subject_type_id", "pp_subject_id","con_owner_subject_id","cst_status_id") 

¿Qué hay de bueno aquí? A primera vista, no es nada, todo lo contrario: mediante el uso de SQL "complejo" puede comprender lo que está sucediendo, por nuestra consulta "atómica" es más difícil de entender, necesita mirar "átomos" y expresiones.


Primero, asegurémonos de que estas consultas sean equivalentes: en el libro de Jupyter, por referencia , di planes para cumplir ambas consultas (los curiosos pueden encontrar 10 diferencias, pero la esencia, la equivalencia, es obvia). Esto, por supuesto, no es un milagro, debería serlo (ver arriba para una evaluación y optimización perezosa).


Lo que tenemos al final: la solicitud de "páginas múltiples" y la solicitud "atómica" funcionan con la misma eficiencia (esto es importante, sin que estas consideraciones adicionales pierdan parcialmente su significado).


Bueno, ahora encontremos lo bueno en la forma "atómica" de generar consultas.


Lo que es un "átomo" (marco de datos elemental) es nuestro conocimiento de un subconjunto del área temática (parte de la tabla relacional). Al aislar tales "átomos" seleccionamos automáticamente (y, lo que es más importante, algorítmica y reproduciblemente) una parte significativa de lo ilimitado para nosotros llamado "modelo de datos físicos".


¿Cuál es la expresión que usamos cuando nos unimos? Esto también es conocimiento sobre el área temática: así es como (como se indica en la expresión) las entidades del área temática (tablas en la base de datos) están interconectadas.


Repito, esto es importante, este "conocimiento" (átomos y expresiones) se materializa en el código ejecutable (no en el diagrama o la descripción verbal), este es el código que se ejecuta cada vez que se ejecuta la tubería ETL (el ejemplo se toma, por cierto, de la vida real).


El código ejecutable, como sabemos por el codificador limpio, es uno de los dos artefactos objetivamente existentes que afirman ser el "título" de la documentación. Es decir, el uso de "átomos" nos permite dar un paso adelante en un proceso tan importante como la documentación de datos.


¿Qué más se puede encontrar en la "atomicidad"?


Optimización de transportadores


En la vida real, un ingeniero de datos, por cierto, no me presenté, una tubería ETL consiste en docenas de transformaciones similares a las anteriores. Las tablas se repiten muy a menudo en ellas (de alguna manera calculé en Excel; algunas tablas se usan en el 40% de las consultas).


¿Qué pasa en términos de eficiencia? Desorden: la misma tabla se lee varias veces desde la fuente ...


¿Cómo mejorarlo? Spark tiene un mecanismo para almacenar en caché los marcos de datos: podemos especificar explícitamente qué marcos de datos y cuánto queremos mantener en el caché.


Lo que tenemos que hacer para esto es seleccionar tablas duplicadas y crear consultas de manera que se minimice el tamaño total de la memoria caché (porque todas las tablas, por definición, no encajan en él, entonces hay grandes datos).


¿Se puede hacer esto con consultas SSQ de varias páginas? Sí, pero ... un poco complicado (realmente no tenemos marcos de datos allí, solo tablas, también se pueden almacenar en caché; la comunidad de spark está trabajando en esto).


¿Se puede hacer esto usando consultas atómicas? Si! Y no es difícil, solo necesitamos generalizar los "átomos": agregarles las columnas utilizadas en todas las consultas de nuestra tubería. Si lo piensa, esto es "correcto" desde el punto de vista de la documentación: si se utiliza una columna en alguna consulta (incluso en la parte where), es parte de los datos del área temática que nos interesa.


Y luego todo es simple: almacenamos en caché átomos repetitivos (marcos de datos), construimos una cadena de transformaciones para que la intersección de los marcos de datos en caché sea mínima (por cierto, esto no es trivial, pero puede ser algoritmoizable).


Y obtenemos el transportador más eficiente completamente "gratis". Y además de esto, un artefacto útil e importante es la "preparación" para la documentación de datos sobre el área temática.


Robotización y Automatización


Los átomos son más susceptibles al procesamiento automático que el "gran y poderoso SQL": su estructura es simple y clara, la chispa nos analiza (por lo que le agradecemos especialmente), también crea planes de consulta, analizando los cuales puede reordenar automáticamente la secuencia del procesamiento de la consulta.


Entonces aquí puedes jugar algo.


En conclusión


Tal vez soy demasiado optimista: me parece que esta ruta (atomización de consulta) funciona más que tratar de describir una fuente de datos después del hecho. Además, por cierto, para qué sirven los "aditivos", obtenemos un aumento en la eficiencia. ¿Por qué considero que el enfoque atómico "funciona"? Es parte del proceso regular, lo que significa que los artefactos descritos tienen una posibilidad real de ser relevantes a largo plazo.


Probablemente me perdí algo, ¿ayudar a encontrar (en los comentarios)?

Source: https://habr.com/ru/post/481924/


All Articles