Hace un par de meses comencé a estudiar Spark, y en algún momento me encontré con el problema de guardar los cálculos de Streaming Estructurado en la base de datos de Cassandra.
En esta publicación, doy un ejemplo simple de creación y uso de Cassandra Sink para Spark Structured Streaming. Espero que la publicación sea útil para aquellos que recientemente comenzaron a trabajar con Spark Structured Streaming y se preguntan cómo cargar los resultados del cálculo en la base de datos.
La idea de la aplicación es muy simple: recibir y analizar mensajes de Kafka, realizar transformaciones simples en un par y guardar los resultados en cassandra.
Ventajas de la transmisión estructurada
Puede leer más sobre la transmisión estructurada en la
documentación . En resumen, Structured Streaming es un motor de procesamiento de información de transmisión bien escalable que se basa en el motor Spark SQL. Le permite usar un conjunto de datos / marco de datos para agregar datos, calcular funciones de ventana, conexiones, etc. Es decir, la transmisión estructurada le permite usar el buen SQL antiguo para trabajar con flujos de datos.
Cual es el problema
El lanzamiento estable de Spark Structured Streaming se lanzó en 2017. Es decir, esta es una API bastante nueva que implementa la funcionalidad básica, pero algunas cosas deberán ser realizadas por nosotros mismos. Por ejemplo, Structured Streaming tiene funciones estándar para escribir resultados en un archivo, mosaico, consola o memoria, pero para guardar los datos en la base de datos, debe usar el receptor
foreach disponible en Structured Streaming e implementar la interfaz
ForeachWriter .
A partir de Spark 2.3.1, esta funcionalidad solo se puede implementar en Scala y Java .
Supongo que el lector ya sabe cómo funciona Structured Streaming en términos generales, sabe cómo implementar las transformaciones necesarias y ahora está listo para cargar los resultados en la base de datos. Si alguno de los pasos anteriores no está claro, la documentación oficial puede servir como un buen punto de partida para aprender la Transmisión Estructurada. En este artículo, me gustaría centrarme en el último paso cuando necesite guardar los resultados en una base de datos.
A continuación, describiré un ejemplo de implementación del sumidero Cassandra para la transmisión estructurada y explicaré cómo ejecutarlo en un clúster. El código completo está disponible
aquí .
Cuando encontré por primera vez el problema anterior,
este proyecto resultó ser muy útil. Sin embargo, puede parecer un poco complicado si el lector acaba de comenzar a trabajar con Structured Streaming y está buscando un ejemplo simple de cómo cargar datos en Cassandra. Además, el proyecto está escrito para funcionar en modo local y requiere algunos cambios para ejecutarse en el clúster.
También quiero dar ejemplos de cómo guardar datos en
MongoDB y cualquier otra base de datos usando
JDBC .
Solución simple
Para cargar datos a un sistema externo, debe usar el receptor
foreach . Lea más sobre esto
aquí . En resumen, se debe implementar la interfaz
ForeachWriter . Es decir, es necesario determinar cómo abrir la conexión, cómo procesar cada dato y cómo cerrar la conexión al final del procesamiento. El código fuente es el siguiente:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
La definición de
CassandraDriver y la estructura de la tabla de salida que describiré más adelante, pero por ahora, echemos un vistazo más de cerca a cómo funciona el código anterior. Para conectarme a Kasandra desde Spark, creo un objeto
CassandraDriver que proporciona acceso a
CassandraConnector , un conector desarrollado por
DataStax . CassandraConnector es responsable de abrir y cerrar la conexión a la base de datos, por lo que simplemente visualizo mensajes de depuración en los métodos de
apertura y
cierre de la clase
CassandraSinkForeach .
El código anterior se llama desde la aplicación principal de la siguiente manera:
val sink = parsed .writeStream .queryName("KafkaToCassandraForeach") .outputMode("update") .foreach(new CassandraSinkForeach()) .start()
CassandraSinkForeach se crea para cada fila de datos, por lo que cada nodo de trabajo inserta su parte de las filas en la base de datos. Es decir, cada nodo de trabajo ejecuta
val cassandraDriver = new CassandraDriver (); Así es como se ve CassandraDriver:
class CassandraDriver extends SparkSessionBuilder {
Echemos un vistazo más de cerca al objeto de
chispa . El código para
SparkSessionBuilder es el siguiente:
class SparkSessionBuilder extends Serializable {
En cada nodo de trabajo,
SparkSessionBuilder proporciona acceso a la
SparkSession que se creó en el controlador. Para hacer posible dicho acceso, es necesario serializar
SparkSessionBuilder y usar el valor
vago transitorio , que permite que el sistema de serialización ignore los objetos
conf y
spark cuando el programa se inicializa y hasta que se accede a los objetos. Por lo tanto, cuando se inicia el programa,
buildSparkSession se serializa y se envía a cada nodo de trabajo, pero los objetos
conf y
spark solo se permiten cuando el nodo de trabajo está accediendo a ellos.
Ahora veamos el código de la aplicación principal:
object KafkaToCassandra extends SparkSessionBuilder {
Cuando la aplicación se envía para su ejecución,
buildSparkSession se serializa y se envía a los nodos de trabajo, sin embargo, los objetos
conf y
spark permanecen sin resolver. Luego, el controlador crea un objeto de chispa dentro de
KafkaToCassandra y distribuye el trabajo entre los nodos de trabajo. Cada nodo de trabajo lee datos de Kafka, realiza transformaciones simples en la parte recibida de los registros, y cuando el nodo de trabajo está listo para escribir los resultados en la base de datos, permite
conf y objetos de
chispa , obteniendo así acceso a la
SparkSession creada en el controlador.
¿Cómo construir y ejecutar la aplicación?
Cuando me mudé de PySpark a Scala, me llevó un tiempo descubrir cómo construir la aplicación. Por lo tanto,
incluí Maven
pom.xml en mi proyecto. El lector puede construir la aplicación usando Maven ejecutando el comando
mvn package . Después de que la aplicación se puede enviar para su ejecución usando
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
Para construir y ejecutar la aplicación, es necesario reemplazar los nombres de mis máquinas AWS con los suyos (es decir, reemplazar todo lo que parece ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).
Spark and Structured Streaming en particular es un tema nuevo para mí, por lo que agradeceré mucho a los lectores por sus comentarios, discusiones y correcciones.